Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Застосування конкурентності з Async

У цьому розділі ми застосуємо async до деяких із тих самих викликів конкурентності, які ми розв’язували з потоками в Розділі 16. Оскільки ми вже обговорили там багато ключових ідей, у цьому розділі ми зосередимося на тому, що відрізняється між потоками та futures.

У багатьох випадках API для роботи з конкурентністю з використанням async дуже схожі на ті, що використовуються з потоками. В інших випадках вони виявляються зовсім різними. Навіть коли API схожі між потоками та async, вони часто мають іншу поведінку — і майже завжди мають інші характеристики продуктивності.

Створення нової task з spawn_task

Першою операцією, яку ми розв’язували в розділі «Створення нового потоку з spawn» у Розділі 16, було підраховування на двох окремих потоках. Давайте зробимо те саме з використанням async. Крейт trpl надає функцію spawn_task, яка дуже схожа на API thread::spawn, і функцію sleep, яка є async-версією API thread::sleep. Ми можемо використати їх разом, щоб реалізувати приклад з підрахунком, як показано в Лістингу 17-6.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Як нашу початкову точку, ми налаштовуємо нашу функцію main з trpl::block_on, щоб наша функція верхнього рівня могла бути async.

Примітка: Від цього моменту далі в розділі кожен приклад міститиме цей самий обгортковий код із trpl::block_on у main, тож ми часто будемо його пропускати, як і з main. Пам’ятайте додавати його у свій код!

Потім ми пишемо два цикли всередині цього блоку, кожен із викликом trpl::sleep, який чекає пів секунди (500 мілісекунд) перед надсиланням наступного повідомлення. Ми поміщаємо один цикл у тіло trpl::spawn_task, а інший — у верхньорівневий цикл for. Ми також додаємо await після викликів sleep.

Цей код поводиться подібно до реалізації на основі потоків — зокрема, включно з тим фактом, що ви можете побачити, що повідомлення з’являються в іншому порядку у своєму терміналі, коли ви запускаєте його:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Ця версія зупиняється щойно цикл for у тілі основного async-блоку завершується, тому що task, створена spawn_task, завершується, коли функція main закінчується. Якщо ви хочете, щоб вона виконувалася аж до завершення task, вам потрібно буде використати join handle, щоб дочекатися завершення першої task. З потоками ми використовували метод join, щоб “блокуватися”, аж поки потік не завершить виконання. У Лістингу 17-7 ми можемо використати await, щоб зробити те саме, тому що сам handle task є future. Її тип Output — це Result, тож ми також розгортаємо його після await.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

Ця оновлена версія виконується, доки не завершаться обидва цикли:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Поки що виглядає так, ніби async і потоки дають нам подібні результати, лише з різним синтаксисом: використання await замість виклику join на join handle, і await для викликів sleep.

Більша відмінність полягає в тому, що нам не потрібно було створювати ще один операційний системний потік, щоб зробити це. Насправді нам тут навіть не потрібно створювати task. Оскільки async-блоки компілюються в анонімні futures, ми можемо помістити кожен цикл в async-блок і змусити runtime виконати їх обидва до завершення за допомогою функції trpl::join.

У розділі «Очікування завершення всіх потоків» в Розділі 16 ми показали, як використовувати метод join на типі JoinHandle, який повертається, коли ви викликаєте std::thread::spawn. Функція trpl::join схожа, але для futures. Коли ви даєте їй дві futures, вона створює одну нову future, чиїм output є кортеж, що містить output кожної future, яку ви передали, після того як обидві завершаться. Таким чином, у Лістингу 17-8 ми використовуємо trpl::join, щоб дочекатися завершення і fut1, і fut2. Ми не очікуємо fut1 і fut2, а натомість нову future, створену trpl::join. Ми ігноруємо output, тому що це просто кортеж, що містить два значення unit.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

Коли ми запускаємо це, ми бачимо, що обидві futures виконуються до завершення:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Тепер ви бачитимете точно той самий порядок щоразу, що дуже відрізняється від того, що ми бачили з потоками та з trpl::spawn_task у Лістингу 17-7. Це тому, що функція trpl::join є справедливою, тобто вона перевіряє кожну future однаково часто, чергуючись між ними, і ніколи не дає одній вискочити вперед, якщо інша готова. З потоками операційна система вирішує, який потік перевіряти і як довго йому дозволяти працювати. З async Rust runtime вирішує, яку task перевіряти. (На практиці деталі ускладнюються, тому що async runtime може використовувати потоки операційної системи під капотом як частину того, як він керує конкурентністю, тож гарантування справедливості може вимагати більше роботи для runtime — але це все ще можливо!) Runtime не зобов’язані гарантувати справедливість для будь-якої конкретної операції, і вони часто пропонують різні API, щоб ви могли вибрати, чи хочете ви справедливість.

Спробуйте кілька з цих варіацій із await для futures і подивіться, що вони роблять:

  • Приберіть async-блок навколо одного або обох циклів.
  • Виконайте await для кожного async-блоку одразу після його визначення.
  • Обгорніть лише перший цикл в async-блок і виконайте await для отриманої future після тіла другого циклу.

Для додаткового виклику спробуйте визначити, яким буде вивід у кожному випадку до запуску коду!

Надсилання даних між двома tasks за допомогою передавання повідомлень

Спільне використання даних між futures також буде знайомим: ми знову використаємо передавання повідомлень, але цього разу з async-версіями типів і функцій. Ми підемо дещо іншим шляхом, ніж у розділі «Передавання даних між потоками за допомогою передавання повідомлень» в Розділі 16, щоб проілюструвати деякі ключові відмінності між потоковою конкурентністю та конкурентністю на основі futures. У Лістингу 17-9 ми почнемо лише з одного async-блоку — не створюючи окрему task, як ми створювали окремий потік.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

Тут ми використовуємо trpl::channel, async-версію API каналу multiple-producer, single-consumer, який ми використовували з потоками ще в Розділі 16. Async- версія API лише трохи відрізняється від потокової версії: вона використовує змінний, а не незмінний receiver rx, і її метод recv створює future, яку нам потрібно await-ити, замість того щоб повертати значення безпосередньо. Тепер ми можемо надсилати повідомлення від sender до receiver. Зверніть увагу, що нам не потрібно створювати окремий потік або навіть task; нам лише потрібно await-ити виклик rx.recv.

Синхронний метод Receiver::recv у std::mpsc::channel блокується, доки не отримає повідомлення. Метод trpl::Receiver::recv цього не робить, тому що він async. Замість блокування він передає керування назад runtime, доки або не буде отримано повідомлення, або не закриється сторона надсилання каналу. На відміну від цього, ми не await-имо виклик send, тому що він не блокується. Йому це й не потрібно, тому що канал, у який ми надсилаємо, є необмеженим.

Примітка: Оскільки весь цей async-код виконується в async-блоці в виклику trpl::block_on, усе всередині нього може уникати блокування. Однак код поза ним блокуватиметься на тому, що функція block_on повертається. У цьому й полягає вся суть функції trpl::block_on: вона дає вам вибирати, де блокуватися на деякому наборі async-коду, а отже, де переходити між sync і async-кодом.

Зверніть увагу на дві речі в цьому прикладі. По-перше, повідомлення надійде одразу. По-друге, хоча ми використовуємо тут future, конкурентності ще немає. Усе в лістингу відбувається послідовно, так само, як і було б, якби futures не брали участі.

Давайте розв’яжемо першу частину, надсилаючи серію повідомлень і роблячи паузи між ними, як показано в Лістингу 17-10.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Окрім надсилання повідомлень, нам потрібно їх і отримувати. У цьому випадку, оскільки ми знаємо, скільки повідомлень надійде, ми могли б зробити це вручну, викликавши rx.recv().await чотири рази. У реальному світі, однак, ми зазвичай чекатимемо на деяку невідому кількість повідомлень, тож нам потрібно продовжувати чекати, доки ми не визначимо, що повідомлень більше немає.

У Лістингу 16-10 ми використовували цикл for, щоб обробити всі елементи, отримані з синхронного каналу. Однак у Rust ще немає способу використовувати цикл for із серією елементів, створених асинхронно, тому нам потрібно використати цикл, який ми ще не бачили: умовний цикл while let. Це циклічна версія конструкції if let, яку ми бачили в розділі «Стиснений потік керування з if let і let...else» у Розділі 6. Цикл продовжуватиме виконуватися, доки зразок, який він задає, продовжує зіставлятися зі значенням.

Виклик rx.recv створює future, яку ми await-имо. Runtime призупинить цю future, доки вона не буде готова. Коли надійде повідомлення, future резольвиться в Some(message) стільки разів, скільки надходить повідомлення. Коли канал закривається, незалежно від того, чи надійшли якісь повідомлення, future натомість резольвиться в None, щоб показати, що більше немає значень і, отже, нам слід припинити polling — тобто припинити await-ити.

Цикл while let об’єднує все це. Якщо результат виклику rx.recv().await — це Some(message), ми отримуємо доступ до повідомлення й можемо використати його в тілі циклу, так само, як могли б з if let. Якщо результат — None, цикл завершується. Щоразу, коли цикл завершується, він знову доходить до точки await, тож runtime знову призупиняє його, доки не надійде інше повідомлення.

Тепер код успішно надсилає й отримує всі повідомлення. На жаль, усе ще залишається кілька проблем. По-перше, повідомлення не надходять з інтервалом у пів секунди. Вони надходять усі одразу, через 2 секунди (2 000 мілісекунд) після запуску програми. По-друге, ця програма також ніколи не завершується! Натомість вона вічно чекає нових повідомлень. Вам доведеться зупинити її за допомогою ctrl-C.

Код всередині одного async-блоку виконується лінійно

Почнімо з того, що розглянемо, чому повідомлення надходять усі разом після повної затримки, а не з паузами між кожним із них. Усередині будь-якого async-блоку порядок, у якому ключові слова await з’являються в коді, також є порядком, у якому вони виконуються під час роботи програми.

У Лістингу 17-10 є лише один async-блок, тож усе в ньому виконується лінійно. Конкурентності все ще немає. Усі виклики tx.send відбуваються, перемежовуючись усіма викликами trpl::sleep та пов’язаними з ними точками await. Лише після цього цикл while let отримує змогу пройти через будь-які точки await на викликах recv.

Щоб отримати бажану поведінку, коли затримка сну відбувається між кожним повідомленням, нам потрібно помістити операції tx і rx у власні async-блоки, як показано в Лістингу 17-11. Тоді runtime може виконувати кожен із них окремо, використовуючи trpl::join, так само, як у Лістингу 17-8. Знову ж таки, ми await-имо результат виклику trpl::join, а не окремі futures. Якби ми await-или окремі futures послідовно, ми б просто знову повернулися до послідовного потоку — саме того, чого ми намагаємося не робити.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

З оновленим кодом у Лістингу 17-11 повідомлення друкуються з інтервалом 500 мілісекунд, а не всі поспіль після 2 секунд.

Переміщення власності в async-блок

Програма все ще ніколи не завершується, однак, через те, як цикл while let взаємодіє з trpl::join:

  • Future, яку повертає trpl::join, завершується лише тоді, коли завершаться обидві futures, передані їй.
  • Future tx_fut завершується, коли вона закінчує спати після надсилання останнього повідомлення в vals.
  • Future rx_fut не завершиться, доки не закінчиться цикл while let.
  • Цикл while let не закінчиться, доки await-ання rx.recv не дасть None.
  • await-ання rx.recv поверне None лише тоді, коли інший кінець каналу закрито.
  • Канал закриється лише якщо ми викличемо rx.close або коли сторона sender, tx, буде викинута.
  • Ми ніде не викликаємо rx.close, і tx не буде викинуто, доки зовнішній async-блок, переданий до trpl::block_on, не завершиться.
  • Блок не може завершитися, тому що він заблокований на завершенні trpl::join, що повертає нас на початок цього списку.

Зараз async-блок, у якому ми надсилаємо повідомлення, лише запозичує tx, оскільки надсилання повідомлення не потребує власності, але якби ми могли перемістити tx у цей async-блок, він був би викинутий, щойно цей блок закінчиться. У розділі [«Захоплення посилань або переміщення власності»] capture-or-move в Розділі 13 ви дізналися, як використовувати ключове слово move із замиканнями, і, як обговорювалося в розділі «Використання move-замикань з потоками» у Розділі 16, нам часто потрібно переміщати дані в замикання, коли ми працюємо з потоками. Ті самі базові динаміки застосовуються до async-блоків, тож ключове слово move працює з async-блоками так само, як і з замиканнями.

У Лістингу 17-12 ми змінюємо блок, який використовується для надсилання повідомлень, з async на async move.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Коли ми запускаємо цю версію коду, вона коректно завершується після надсилання й отримання останнього повідомлення. Далі давайте подивимося, що потрібно змінити, щоб надсилати дані з більш ніж однієї future.

Об’єднання кількох futures за допомогою макроса join!

Цей async-канал також є каналом multiple-producer, тож ми можемо викликати clone на tx, якщо хочемо надсилати повідомлення з кількох futures, як показано в Лістингу 17-13.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}

Спочатку ми клонуємо tx, створюючи tx1 поза першим async-блоком. Ми переміщаємо tx1 у цей блок так само, як робили це раніше з tx. Потім, пізніше, ми переміщаємо оригінальний tx у новий async-блок, де надсилаємо більше повідомлень із трохи повільнішою затримкою. Так сталося, що ми розміщуємо цей новий async-блок після async-блоку для отримання повідомлень, але він міг би бути і перед ним. Ключове тут — порядок, у якому futures await-яться, а не порядок, у якому вони створюються.

Обидва async-блоки для надсилання повідомлень мають бути блоками async move, щоб і tx, і tx1 були викинуті, коли ці блоки завершаться. Інакше ми знову опинимося в тому самому нескінченному циклі, з якого почали.

Нарешті, ми переходимо з trpl::join на trpl::join!, щоб обробити додаткову future: макрос join! await-ить довільну кількість futures, коли ми знаємо кількість futures під час компіляції. Ми обговоримо очікування колекції невідомої кількості futures пізніше в цьому розділі.

Тепер ми бачимо всі повідомлення з обох futures, що надсилають, і оскільки futures, що надсилають, використовують трохи різні затримки після надсилання, повідомлення також отримуються з цими різними інтервалами:

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Ми дослідили, як використовувати передавання повідомлень для надсилання даних між futures, як код всередині async-блоку виконується послідовно, як переміщувати власність в async-блок і як об’єднувати кілька futures. Далі давайте обговоримо, як і чому повідомляти runtime, що він може перейти до іншої task.