Детальніший погляд на трейтів для async
Упродовж розділу ми використовували трейт Future, Stream і StreamExt
різними способами. Але дотепер ми уникали занурення в
деталі того, як вони працюють або як вони поєднуються, що здебільшого добре
для вашої щоденної роботи з Rust. Проте інколи ви зіткнетеся з
ситуаціями, де вам потрібно буде зрозуміти ще кілька деталей цих трейтів,
разом із типом Pin і трейтів Unpin. У цьому розділі ми зануримося
достатньо, щоб допомогти в таких сценаріях, усе ще залишаючи справді
глибоке занурення для іншої документації.
Трейт Future
Почнемо з того, що детальніше поглянемо на те, як працює трейт Future. Ось
як Rust визначає його:
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}
Це визначення трейту включає безліч нових типів, а також деякий синтаксис, який ми ще не бачили, тож пройдемося по визначенню по частинах.
По-перше, асоційований тип Output у Future вказує, до чого future
розв’язується. Це аналогічно асоційованому типу Item для трейту
Iterator.
По-друге, Future має метод poll, який приймає спеціальне посилання Pin
для свого параметра self і змінне посилання на тип Context, а
повертає Poll<Self::Output>. Про Pin і Context ми поговоримо трохи
згодом. Наразі зосередьмося на тому, що повертає метод — тип Poll:
#![allow(unused)]
fn main() {
pub enum Poll<T> {
Ready(T),
Pending,
}
}
Цей тип Poll подібний до Option. Він має один варіант, що має значення,
Ready(T), і один, що не має його, Pending. Однак Poll означає дещо
зовсім інше, ніж Option! Варіант Pending вказує на те, що future
ще має роботу, яку треба виконати, тож викликачеві потрібно буде перевірити
знову пізніше. Варіант Ready вказує на те, що Future завершив свою роботу
і значення T доступне.
Note: Рідко виникає потреба викликати
pollбезпосередньо, але якщо вам усе ж потрібно це зробити, майте на увазі, що для більшості futures викликач не повинен викликатиpollзнову після того, як future повернувReady. Багато futures викличуть паніку, якщо їх опитати знову після того, як вони стали готовими. Futures, які безпечно опитувати знову, прямо скажуть про це в своїй документації. Це подібно до того, як поводитьсяIterator::next.
Коли ви бачите код, що використовує await, Rust компілює його під капотом у
код, який викликає poll. Якщо ви повернетеся до Listing 17-4, де ми
виводили заголовок сторінки для однієї URL-адреси після того, як вона
розв’язувалася, Rust компілює це в щось на кшталт такого (хоча й не зовсім):
match page_title(url).poll() {
Ready(page_title) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// But what goes here?
}
}
Що нам слід робити, коли future усе ще Pending? Нам потрібен якийсь спосіб
спробувати знову, і знову, і знову, доки future нарешті не буде готовим. Іншими
словами, нам потрібен цикл:
let mut page_title_fut = page_title(url);
loop {
match page_title_fut.poll() {
Ready(value) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// continue
}
}
}
Якби Rust скомпілював це точно в такий код, кожен await був би
блокувальним — цілком протилежним тому, чого ми прагнули! Натомість Rust
гарантує, що цикл може передати керування чомусь, що може призупинити роботу
над цим future, щоб попрацювати над іншими futures, а потім перевірити цей
знову пізніше. Як ми бачили, цим чимось є async runtime, і це планування та
координація — одне з його головних завдань.
У розділі “Sending Data Between Two Tasks Using Message
Passing” ми описували очікування на
rx.recv. Виклик recv повертає future, а очікування future опитує його. Ми
зазначали, що runtime призупинить future, доки воно не буде готове з Some(message)
або None, коли канал закривається. З нашим глибшим розумінням трейту Future,
і конкретно Future::poll, ми можемо побачити, як це працює. Runtime знає, що
future не готове, коли воно повертає Poll::Pending. Навпаки, runtime знає, що
future готове і просуває його далі, коли poll повертає
Poll::Ready(Some(message)) або Poll::Ready(None).
Точні деталі того, як runtime робить це, виходять за межі цієї книги, але ключове тут — побачити базову механіку futures: runtime опитує кожне future, за яке він відповідає, знову присипляючи future, коли воно ще не готове.
Тип Pin і трейт Unpin
Ще в Listing 17-13 ми використовували макрос trpl::join!, щоб очікувати три
futures. Однак часто буває колекція, наприклад вектор, що містить певну
кількість futures, які не будуть відомі до часу виконання. Змінимо Listing
17-13 на код у Listing 17-23, який поміщає три futures у вектор
і натомість викликає функцію trpl::join_all, яка ще не скомпілюється.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Ми помістили кожне future всередину Box, щоб зробити їх трейт-об’єктами,
так само як ми робили в розділі “Returning Errors from run” у Chapter 12.
(Ми розглянемо трейт-об’єкти детально в Chapter 18.) Використання трейт-об’єктів
дає змогу розглядати кожне з анонімних futures, створених цими типами, як один
і той самий тип, тому що всі вони реалізують трейт Future.
Це може здатися несподіваним. Зрештою, жоден із async-блоків нічого не
повертає, тож кожен із них створює Future<Output = ()>. Пам’ятайте, однак, що
Future — це трейт, а компілятор створює унікальний enum для кожного async
блоку, навіть коли вони мають ідентичні типи виходу. Так само як ви не можете
помістити дві різні вручну написані структури у Vec, ви не можете змішувати
enum-и, згенеровані компілятором.
Потім ми передаємо колекцію futures до функції trpl::join_all і
очікуємо результат. Однак це не компілюється; ось відповідна частина
повідомлень про помилки.
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Примітка в цьому повідомленні про помилку каже нам, що ми маємо використати
макрос pin!, щоб pin-ити значення, тобто помістити їх усередину типу
Pin, який гарантує, що значення не будуть переміщені в пам’яті. Повідомлення
про помилку каже, що pinning потрібен тому, що dyn Future<Output = ()> має
реалізовувати трейт Unpin, а наразі цього не робить.
Функція trpl::join_all повертає структуру під назвою JoinAll. Ця структура
є узагальненою за типом F, для якого накладено обмеження реалізовувати
трейт Future. Безпосереднє очікування future за допомогою await
неявно pin-ить future. Саме тому нам не потрібно використовувати pin!
скрізь, де ми хочемо очікувати futures.
Однак тут ми не очікуємо future безпосередньо. Замість цього ми створюємо
нове future, JoinAll, передаючи колекцію futures у функцію join_all.
Сигнатура join_all вимагає, щоб типи елементів у колекції всі реалізовували
трейт Future, а Box<T> реалізує Future лише якщо T, яке він
обгортає, є future, яке реалізує трейт Unpin.
Це багато для сприйняття! Щоб по-справжньому зрозуміти це, занурмося трохи
далі в те, як насправді працює трейт Future, зокрема навколо pinning. Ще
раз погляньте на визначення трейту Future:
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}
Параметр cx і його тип Context — це ключ до того, як runtime насправді
знає, коли перевіряти будь-яке конкретне future, залишаючись при цьому
лінивим. Знову ж таки, деталі того, як це працює, виходять за межі цього
розділу, і зазвичай вам потрібно думати про це лише тоді, коли ви пишете
власну реалізацію Future. Натомість ми зосередимося на типі для self,
оскільки це перший раз, коли ми бачимо метод, де self має анотацію типу.
Анотація типу для self працює так само, як анотації типу для інших параметрів
функції, але з двома ключовими відмінностями:
- Вона каже Rust, яким має бути тип
self, щоб метод можна було викликати. - Це не може бути будь-який тип. Він обмежений типом, на якому реалізовано
метод, посиланням або розумним вказівником на цей тип, або
Pin, що обгортає посилання на цей тип.
Ми побачимо більше про цей синтаксис у Chapter 18. Наразі
достатньо знати, що якщо ми хочемо опитати future, щоб перевірити, чи воно
Pending або Ready(Output), нам потрібне змінне посилання, обгорнуте в Pin,
на цей тип.
Pin — це обгортка для вказівникоподібних типів, таких як &, &mut, Box
і Rc.
(Технічно, Pin працює з типами, що реалізують трейт Deref або DerefMut,
але це фактично еквівалентно роботі лише з посиланнями та розумними
вказівниками.) Pin сам по собі не є вказівником і не має власної поведінки,
такої як Rc і Arc з підрахунком посилань; це суто інструмент, який
компілятор може використовувати, щоб забезпечувати обмеження на використання
вказівників.
Пригадування того, що await реалізовано через виклики poll, починає
пояснювати повідомлення про помилку, яке ми бачили раніше, але там ішлося про
Unpin, а не про Pin. То як саме Pin пов’язаний із Unpin, і чому Future
потрібен self у типі Pin, щоб викликати poll?
Пам’ятайте з раніше в цьому розділі, що серія точок await у future компілюється в автомат станів, і компілятор переконується, що цей автомат станів дотримується всіх звичайних правил Rust щодо безпеки, включно із запозиченням і власністю. Щоб це працювало, Rust дивиться на те, які дані потрібні між однією точкою await і або наступною точкою await, або кінцем async-блоку. Потім він створює відповідний варіант у скомпільованому автоматі станів. Кожен варіант отримує доступ, який йому потрібен, до даних, що будуть використані в цій частині вихідного коду, або шляхом отримання власності на ці дані, або шляхом отримання до них змінного чи незмінного посилання.
Поки що все добре: якщо ми щось неправильно зробимо з власністю або
посиланнями в певному async-блоці, перевірник запозичень нам про це скаже.
Коли ж ми хочемо перемістити future, яке відповідає цьому блоку, — наприклад,
перемістити його у Vec, щоб передати до join_all, — усе стає складніше.
Коли ми переміщуємо future — чи то шляхом додавання його в структуру даних,
щоб використати як ітератор із join_all, чи то шляхом повернення його з
функції, — це насправді означає переміщення автомата станів, який Rust
створює для нас. І, на відміну від більшості інших типів у Rust, futures,
які Rust створює для async-блоків, можуть у полях будь-якого конкретного
варіанта виявитися з посиланнями на самі себе, як показано в спрощеній
ілюстрації на Figure 17-4.
За замовчуванням, однак, будь-який об’єкт, що має посилання на самого себе, є небезпечним для переміщення, тому що посилання завжди вказують на фактичну адресу пам’яті того, на що вони посилаються (див. Figure 17-5). Якщо ви перемістите саму структуру даних, ці внутрішні посилання залишаться вказувати на старе місце. Однак це місце пам’яті тепер невалідне. По-перше, його значення не буде оновлено, коли ви внесете зміни до структури даних. По-друге — і це важливіше — комп’ютер тепер може повторно використати цю пам’ять для інших цілей! Згодом ви можете прочитати зовсім не пов’язані дані.
Теоретично, компілятор Rust міг би намагатися оновлювати кожне посилання на об’єкт щоразу, коли його переміщують, але це могло б додати значні накладні витрати продуктивності, особливо якщо потрібно оновлювати цілу мережу посилань. Якби замість цього ми могли переконатися, що відповідна структура даних не переміщується в пам’яті, нам не довелося б оновлювати жодні посилання. Саме для цього й існує перевірник запозичень Rust: у безпечному коді він не дає вам переміщувати будь-який елемент з активним посиланням на нього.
Pin спирається на це, щоб дати нам саме ту гарантію, яка нам потрібна. Коли
ми pin-имо значення, обгортаючи вказівник на це значення в Pin, воно
більше не може переміщуватися. Отже, якщо у вас є Pin<Box<SomeType>>, ви
насправді pin-ите значення SomeType, а не вказівник Box. Figure 17-6
ілюструє цей процес.
Насправді вказівник Box усе ще може вільно переміщуватися. Пам’ятайте: нас
цікавить, щоб дані, на які врешті-решт посилаються, залишалися на місці. Якщо
вказівник переміщується, але дані, на які він вказує, залишаються на тому ж
місці, як на Figure 17-7, проблеми немає. (Як окрему вправу, подивіться
документацію для цих типів, а також модуль std::pin, і спробуйте зрозуміти,
як би ви зробили це з Pin, що обгортає Box.) Ключ у тому, що сама
самореферентна структура не може переміщуватися, бо вона все ще pin-ена.
Однак більшість типів цілком безпечно переміщувати, навіть якщо вони
знаходяться за вказівником Pin. Нам потрібно думати про pinning лише тоді,
коли елементи мають внутрішні посилання. Примітивні значення, такі як числа й
булеві значення, безпечні, тому що вони явно не мають жодних внутрішніх
посилань.
Більшість типів, з якими ви зазвичай працюєте в Rust, також не мають таких
посилань. Наприклад, ви можете переміщувати Vec без хвилювання. З огляду на
те, що ми побачили дотепер, якщо у вас є Pin<Vec<String>>, вам довелося б
робити все через безпечні, але обмежувальні API, які надає Pin, хоча
Vec<String> завжди безпечно переміщувати, якщо немає інших посилань на нього.
Нам потрібен спосіб сказати компілятору, що в таких випадках можна
переміщувати елементи, — і саме тут вступає в дію Unpin.
Unpin — це marker trait, подібний до трейтів Send і Sync, які ми бачили
в Chapter 16, і тому не має власної функціональності. Marker traits існують
лише для того, щоб сказати компілятору, що безпечно використовувати тип, який
реалізує певний трейт, у конкретному контексті. Unpin інформує компілятор,
що певний тип не мусить дотримуватися жодних гарантій щодо того, чи може
значення безпечно переміщуватися.
Так само як і для Send і Sync, компілятор реалізує Unpin автоматично
для всіх типів, для яких може довести, що це безпечно. Особливий випадок,
знову подібний до Send і Sync, — це коли Unpin не реалізовано для
типу. Позначення для цього таке: impl !Unpin for SomeType,
де SomeType — це назва типу, який справді має
дотримуватися цих гарантій, щоб бути безпечним усякий раз, коли в Pin
використовується вказівник на цей тип.
Іншими словами, є дві речі, про які слід пам’ятати щодо зв’язку між Pin і
Unpin. По-перше, Unpin — це “нормальний” випадок, а !Unpin — особливий
випадок. По-друге, чи реалізує тип Unpin, чи !Unpin, лише має значення,
коли ви використовуєте pin-ений вказівник на цей тип, такий як Pin<&mut
SomeType>.
Щоб зробити це конкретним, подумайте про String: у нього є довжина й Unicode
символи, з яких він складається. Ми можемо обгорнути String у Pin, як
показано на Figure 17-8. Однак String автоматично реалізує Unpin, як і
більшість інших типів у Rust.
У результаті ми можемо робити речі, які були б незаконними, якби замість цього
String реалізовував !Unpin, наприклад замінювати один рядок іншим у точно
тому самому місці в пам’яті, як на Figure 17-9. Це не порушує контракт Pin,
тому що String не має внутрішніх посилань, які робили б його небезпечним для
переміщення. Саме тому він реалізує Unpin, а не !Unpin.
Тепер ми знаємо достатньо, щоб зрозуміти помилки, повідомлені для того
виклику join_all ще з Listing 17-23. Спочатку ми намагалися перемістити
futures, створені async-блоками, у Vec<Box<dyn Future<Output = ()>>>, але,
як ми бачили, ці futures можуть мати внутрішні посилання, тож вони не
реалізують Unpin автоматично. Щойно ми їх pin-имо, ми можемо передати
отриманий тип Pin у Vec, будучи впевненими, що базові дані у futures не
будуть переміщені. Listing 17-24 показує, як виправити код, викликавши макрос
pin! там, де визначено кожне з трьох futures, і скоригувавши тип
трейт-об’єкта.
extern crate trpl; // required for mdbook test
use std::pin::{Pin, pin};
// --snip--
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = pin!(async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
});
let rx_fut = pin!(async {
// --snip--
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
let tx_fut = pin!(async move {
// --snip--
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
});
let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
Цей приклад тепер компілюється й запускається, і ми могли б додавати або видаляти futures з вектора під час виконання та об’єднувати їх усі.
Pin і Unpin здебільшого важливі для побудови бібліотек нижчого рівня або
коли ви будуєте сам runtime, а не для щоденного коду Rust. Однак, коли ви
бачите ці трейтів у повідомленнях про помилки, тепер у вас буде краще уявлення
про те, як виправити свій код!
Note: Це поєднання
PinіUnpinдає змогу безпечно реалізувати в Rust цілий клас складних типів, які інакше були б складними, тому що вони самореферентні. Типи, що потребуютьPin, найчастіше трапляються в async Rust сьогодні, але час від часу ви можете побачити їх і в інших контекстах.Конкретні подробиці того, як працюють
PinіUnpin, і правила, яких вони мають дотримуватися, детально описані в документації API дляstd::pin, тож якщо ви хочете дізнатися більше, це чудове місце для початку.Якщо ви хочете зрозуміти, як усе працює під капотом ще детальніше, дивіться Chapters 2 і 4 з Asynchronous Programming in Rust.
Трейт Stream
Тепер, коли ви краще розумієте трейт Future, Pin і Unpin, ми можемо
перейти до трейту Stream. Як ви дізналися раніше в цьому розділі, streams
схожі на асинхронні ітератори. Однак, на відміну від Iterator і Future,
Stream на момент написання не має визначення у стандартній бібліотеці, але
існує дуже поширене визначення з крейту futures, яке використовується в
усьому екосистемному просторі.
Перш ніж дивитися, як трейт Stream може об’єднати їх, згадаємо визначення
трейтів Iterator і Future. Від Iterator ми маємо ідею послідовності:
його метод next надає Option<Self::Item>. Від Future ми маємо ідею
готовності з часом: його метод poll надає Poll<Self::Output>. Щоб
представити послідовність елементів, які стають готовими з часом, ми
визначаємо трейт Stream, який поєднує ці можливості:
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}
}
Трейт Stream визначає асоційований тип під назвою Item для типу елементів,
які продукує stream. Це подібно до Iterator, де може бути від нуля до
багатьох елементів, і на відміну від Future, де завжди є один Output,
навіть якщо це unit-тип ().
Stream також визначає метод для отримання цих елементів. Ми називаємо його
poll_next, щоб було зрозуміло, що він опитує так само, як це робить
Future::poll, і продукує послідовність елементів так само, як це робить
Iterator::next. Його тип повернення поєднує Poll з Option. Зовнішній тип
— Poll, тому що його треба перевіряти на готовність, так само як future.
Внутрішній тип — Option, тому що він має сигналізувати, чи є ще повідомлення,
так само як ітератор.
Імовірно, щось дуже схоже на це визначення згодом стане частиною стандартної бібліотеки Rust. Тим часом це частина інструментарію більшості runtime, тож ви можете на це покладатися, і все, що ми розглянемо далі, загалом має застосовуватися!
У прикладах, які ми бачили в розділі “Streams: Futures in Sequence”, однак, ми не використовували ні poll_next, ні Stream, а
натомість використовували next і StreamExt. Звісно, ми могли б працювати
безпосередньо через API poll_next, написавши вручну власні автомати станів
Stream, так само як ми могли б працювати з futures безпосередньо через
їхній метод poll. Використовувати await набагато приємніше, і трейт
StreamExt надає метод next, щоб ми могли саме так і робити:
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
trait StreamExt: Stream {
async fn next(&mut self) -> Option<Self::Item>
where
Self: Unpin;
// other methods...
}
}
Note: Справжнє визначення, яке ми використали раніше в розділі, виглядає трохи інакше, ніж це, тому що воно підтримує версії Rust, які ще не підтримували використання async-функцій у трейтів. У результаті воно виглядає так:
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;Тип
Next— цеstruct, яка реалізуєFutureі дає нам змогу назвати час життя посилання наselfза допомогоюNext<'_, Self>, щобawaitміг працювати з цим методом.
Трейт StreamExt також є домом для всіх цікавих методів, доступних для
використання зі streams. StreamExt автоматично реалізується для кожного
типу, який реалізує Stream, але ці трейтів визначені окремо, щоб дати змогу
спільноті розвивати API зручності без впливу на базовий трейт.
У версії StreamExt, що використовується в крейті trpl, трейт не лише
визначає метод next, але й надає стандартну реалізацію next, яка правильно
обробляє деталі виклику Stream::poll_next. Це означає, що навіть коли вам
потрібно написати власний тип потокових даних, вам лише потрібно реалізувати
Stream, і тоді будь-хто, хто використовує ваш тип даних, автоматично зможе
використовувати StreamExt і його методи з ним.
Це все, що ми збираємося охопити щодо деталей цих трейтів нижчого рівня. На завершення розгляньмо, як futures (включно зі streams), tasks і threads усе разом поєднуються!