Стріми: future у послідовності
Пригадайте, як ми використовували одержувач для нашого async-каналу раніше в цьому розділі
в розділі «Передавання повідомлень». Асинхронний
метод recv породжує послідовність елементів з часом. Це є прикладом
набагато більш загальної моделі, відомої як stream. Багато понять природно
представляються як stream: елементи, що стають доступними в черзі, фрагменти даних,
які поступово зчитуються з файлової системи, коли повний набір даних занадто
великий для пам’яті комп’ютера, або дані, що надходять по мережі з часом.
Оскільки stream є future, ми можемо використовувати їх з будь-яким іншим видом future і
поєднувати їх цікавими способами. Наприклад, ми можемо пакетувати події, щоб уникнути
запуску занадто великої кількості мережевих викликів, встановлювати тайм-аути для послідовностей тривалих
операцій або обмежувати події інтерфейсу користувача, щоб уникнути виконання непотрібної роботи.
Ми бачили послідовність елементів ще в Главі 13, коли розглядали trait Iterator
у розділі «Трейт Iterator і метод next», але є дві відмінності між ітераторами та
одержувачем async-каналу. Перша відмінність — це час: ітератори
синхронні, тоді як одержувач каналу асинхронний. Друга відмінність — це API.
Коли ми працюємо безпосередньо з Iterator, ми викликаємо його синхронний
метод next. Зокрема, з stream trpl::Receiver ми натомість викликали
асинхронний метод recv. В іншому ці API дуже схожі,
і ця схожість не є випадковістю. Stream — це ніби асинхронна форма
ітерації. Хоча trpl::Receiver спеціально чекає на отримання
повідомлень, загальний API stream є значно ширшим: він надає
наступний елемент так, як це робить Iterator, але асинхронно.
Подібність між ітераторами та stream у Rust означає, що ми фактично можемо
створити stream з будь-якого ітератора. Як і з ітератором, ми можемо працювати з
stream, викликаючи його метод next, а потім очікуючи результат, як у Listing
17-21, який ще не скомпілюється.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Ми починаємо з масиву чисел, який перетворюємо на ітератор, а потім
викликаємо на ньому map, щоб подвоїти всі значення. Потім ми перетворюємо ітератор на
stream за допомогою функції trpl::stream_from_iter. Далі ми
проходимо циклом по елементах у stream у міру їх надходження за допомогою циклу while let.
На жаль, коли ми намагаємося запустити код, він не компілюється, а натомість
повідомляє, що метод next недоступний:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Як пояснює цей вивід, причина помилки компілятора полягає в тому, що нам потрібен
правильний трейт в області видимості, щоб мати змогу використовувати метод next.
З огляду на нашу попередню розмову, ви могли б обґрунтовано очікувати, що цим
трейт буде Stream, але насправді це StreamExt. Скорочення від extension,
Ext — це поширений шаблон у
спільноті Rust для розширення одного трейт іншим.
Трейт Stream визначає низькорівневий інтерфейс, який фактично поєднує
traits Iterator і Future. StreamExt надає набір API вищого рівня
поверх Stream, включно з методом next, а також інші допоміжні
методи, подібні до тих, що надає trait Iterator. Stream і
StreamExt ще не є частиною стандартної бібліотеки Rust, але більшість крейтів екосистеми
використовують подібні визначення.
Виправлення помилки компілятора полягає в тому, щоб додати оператор use для
trpl::StreamExt, як у Listing 17-22.
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Коли всі ці частини зібрані разом, цей код працює так, як ми хочемо! Більше того,
тепер, коли StreamExt є в області видимості, ми можемо використовувати всі його допоміжні
методи, як і з ітераторами.