Надання керування runtime
Пригадайте з розділу “Наша перша async-програма” section, що на кожній точці await Rust дає runtime змогу призупинити task і переключитися на інший, якщо future, яке очікується, ще не готове. Зворотне також вірне: Rust лише призупиняє async-блоки та повертає керування назад до runtime на точці await. Усе між точками await є синхронним.
Це означає, що якщо ви робите багато роботи в async-блоці без точки await, то це future заблокує будь-які інші future від просування. Іноді ви можете почути це як те, що одне future starving інші future. У деяких випадках це може бути не так уже й страшно. Однак, якщо ви робите якийсь дорогий setup або довготривалу роботу, або якщо у вас є future, яке буде безкінечно виконувати певне завдання, вам потрібно буде подумати про те, коли і де повертати керування назад до runtime.
Давайте змоделюємо довготривалу операцію, щоб проілюструвати проблему starvation,
а потім розглянемо, як її розв’язати. Listing 17-14 вводить функцію slow.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
// We will call `slow` here later
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Цей код використовує std::thread::sleep замість trpl::sleep, щоб виклик
slow блокував потік, що виконується, на певну кількість мілісекунд. Ми можемо
використовувати slow як заміну для реальних операцій, які є і довготривалими,
і блокувальними.
У Listing 17-15 ми використовуємо slow, щоб імітувати виконання такого
CPU-bound work у парі future.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Кожне future повертає керування назад до runtime лише після виконання кількох повільних операцій. Якщо ви запустите цей код, ви побачите такий вивід:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
Як і в Listing 17-5, де ми використовували trpl::select, щоб змагати future, які
отримують два URLs, select усе ще завершується, щойно a закінчено. Проте
між викликами slow у двох future немає чергування. Future a виконує всю
свою роботу до того, як буде очікувано виклик trpl::sleep, потім future b
виконує всю свою роботу до того, як буде очікувано його власний виклик
trpl::sleep, і нарешті future a завершується. Щоб дозволити обом future
просуватися між їхніми повільними tasks, нам потрібні точки await, щоб ми могли
повернути керування назад до runtime. Це означає, що нам потрібна річ, яку ми
можемо await!
Ми вже можемо бачити таку передачу керування в Listing 17-15: якби ми
прибрали trpl::sleep у кінці future a, воно завершилося б без того, щоб
future b взагалі запускалося. Давайте спробуємо використати функцію
trpl::sleep як відправну точку для того, щоб дозволити операціям переключатися
з просування, як показано в Listing 17-16.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let one_ms = Duration::from_millis(1);
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::sleep(one_ms).await;
slow("a", 10);
trpl::sleep(one_ms).await;
slow("a", 20);
trpl::sleep(one_ms).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::sleep(one_ms).await;
slow("b", 10);
trpl::sleep(one_ms).await;
slow("b", 15);
trpl::sleep(one_ms).await;
slow("b", 350);
trpl::sleep(one_ms).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Ми додали виклики trpl::sleep із точками await між кожним викликом slow.
Тепер робота двох future чергується:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
Future a все ще виконується деякий час, перш ніж передати керування b, тому що
воно викликає slow перед тим, як викликати trpl::sleep, але після цього future
перемикаються туди й назад щоразу, коли одне з них досягає точки await. У цьому
випадку ми зробили це після кожного виклику slow, але ми могли б розбити роботу
будь-яким способом, який має для нас найбільший сенс.
Втім, ми насправді не хочемо sleep тут: ми хочемо просуватися якнайшвидше. Нам
лише потрібно повернути керування назад до runtime. Ми можемо зробити це напряму,
використовуючи функцію trpl::yield_now. У Listing 17-17 ми замінюємо всі ті
виклики trpl::sleep на trpl::yield_now.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::yield_now().await;
slow("a", 10);
trpl::yield_now().await;
slow("a", 20);
trpl::yield_now().await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::yield_now().await;
slow("b", 10);
trpl::yield_now().await;
slow("b", 15);
trpl::yield_now().await;
slow("b", 350);
trpl::yield_now().await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Цей код є і зрозумілішим щодо фактичного наміру, і може бути значно
швидшим за використання sleep, тому що таймери, такі як той, що використовується
sleep, часто мають обмеження на те, наскільки дрібнозернистими вони можуть бути.
Версія sleep, яку ми використовуємо, наприклад, завжди спатиме щонайменше
одну мілісекунду, навіть якщо ми передамо їй Duration в одну наносекунду.
Знову ж таки, сучасні комп’ютери швидкі: вони можуть зробити багато за одну
мілісекунду!
Це означає, що async може бути корисним навіть для compute-bound tasks, залежно від того, що ще робить ваша програма, тому що він надає корисний інструмент для структурування взаємин між різними частинами програми (але ціною накладних витрат async state machine). Це форма cooperative multitasking, де кожне future має змогу визначати, коли воно передає керування через точки await. Отже, кожне future також несе відповідальність за те, щоб уникати надто довгого блокування. У деяких embedded операційних системах на базі Rust це єдиний вид multitasking!
У реальному коді ви зазвичай, звісно, не чергуватимете виклики функцій із точками await на кожному окремому рядку. Хоча надання керування таким способом є відносно недорогим, воно не є безкоштовним. У багатьох випадках спроба розбити compute-bound task може зробити його значно повільнішим, тому іноді краще для загальної продуктивності дозволити операції коротко блокуватися. Завжди вимірюйте, щоб побачити, де насправді є вузькі місця продуктивності вашого коду. Проте базову динаміку важливо тримати на увазі, якщо ви дійсно бачите, що багато роботи відбувається послідовно там, де ви очікували конкурентного виконання!
Створення наших власних async-абстракцій
Ми також можемо компонувати future разом, щоб створювати нові шаблони. Наприклад,
ми можемо побудувати функцію timeout з async-блоків, які вже маємо. Коли ми
завершимо, результатом буде ще один будівельний блок, який ми могли б використати
для створення ще більшої кількості async-абстракцій.
Listing 17-18 показує, як ми очікували б, що цей timeout працюватиме з повільним
future.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
Давайте реалізуємо це! Для початку давайте подумаємо про API для timeout:
- Він сам має бути async-функцією, щоб ми могли await його.
- Його перший параметр має бути future, яке потрібно запустити. Ми можемо зробити його узагальненим, щоб він працював із будь-яким future.
- Його другий параметр буде максимальною кількістю часу для очікування. Якщо ми використаємо
Duration, це буде легко передати доtrpl::sleep. - Він має повертати
Result. Якщо future завершується успішно,ResultбудеOkзі значенням, яке породило future. Якщо timeout спливає першим,ResultбудеErrз тривалістю, яку timeout очікував.
Listing 17-19 показує це оголошення.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
Це задовольняє наші цілі щодо типів. Тепер давайте подумаємо про поведінку, яка нам
потрібна: ми хочемо змагати future, передане всередину, проти тривалості. Ми можемо
використати trpl::sleep, щоб створити future таймера з Duration, і використати
trpl::select, щоб запускати цей таймер разом із future, яке передає викликач.
У Listing 17-20 ми реалізуємо timeout, зіставляючи з результатом await-інгу
trpl::select.
extern crate trpl; // required for mdbook test
use std::time::Duration;
use trpl::Either;
// --snip--
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
match trpl::select(future_to_try, trpl::sleep(max_time)).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time),
}
}
Реалізація trpl::select не є справедливою: вона завжди polling-ить аргументи в
порядку, у якому їх передано (інші реалізації select випадково вибиратимуть,
який аргумент polling-ити першим). Отже, ми передаємо future_to_try до select
першим, щоб воно мало шанс завершитися навіть якщо max_time є дуже короткою
тривалістю. Якщо future_to_try завершується першим, select поверне Left
із виходом від future_to_try. Якщо timer завершується першим, select
поверне Right із виходом таймера ().
Якщо future_to_try успішно завершується і ми отримуємо Left(output), ми
повертаємо Ok(output). Якщо ж таймер sleep спливає першим і ми отримуємо
Right(()), ми ігноруємо () за допомогою _ і натомість повертаємо
Err(max_time).
З цим ми маємо робочий timeout, зібраний із двох інших async-помічників. Якщо
ми запустимо наш код, він виведе режим збою після timeout:
Failed after 2 seconds
Оскільки future компонуються з іншими future, ви можете створювати справді потужні інструменти, використовуючи менші async-блоки. Наприклад, ви можете використати цей самий підхід, щоб поєднати timeout-и з повторними спробами, а потім використовувати їх з операціями, такими як мережеві виклики (такі як у Listing 17-5).
На практиці ви зазвичай працюватимете безпосередньо з async і await, а
додатково — з функціями, такими як select, і макросами, такими як макрос join!,
щоб керувати тим, як виконуються найзовнішні future.
Тепер ми бачили низку способів працювати з кількома future одночасно. Далі ми розглянемо, як ми можемо працювати з кількома future у послідовності з плином часу за допомогою streams.