Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Передавання даних між потоками за допомогою передавання повідомлень

Один дедалі популярніший підхід до забезпечення безпечної конкурентності — передавання повідомлень (message passing), за якого потоки або актори спілкуються, надсилаючи один одному повідомлення, що містять дані. Ось ідея у вигляді гасла з документації мови Go: «Не спілкуйтеся, ділячись пам’яттю; натомість діліться пам’яттю, спілкуючись».

Щоб реалізувати конкурентність із надсиланням повідомлень, стандартна бібліотека Rust надає реалізацію каналів. Канал — це загальна концепція програмування, за допомогою якої дані надсилаються з одного потоку до іншого.

У програмуванні канал можна уявити як спрямований водний канал, наприклад струмок або річку. Якщо ви кинете щось на кшталт гумової качечки в річку, вона попливе вниз за течією до кінця водного шляху.

Канал має дві половини: передавач і приймач. Половина передавача — це верхня за течією локація, куди ви кладете гумову качечку в річку, а половина приймача — це місце нижче за течією, де гумова качечка опиняється. Одна частина вашого коду викликає методи на передавачі з даними, які ви хочете надіслати, а інша частина перевіряє кінцеву точку приймання на наявність повідомлень, що надходять. Про канал кажуть, що він закритий, якщо або половина передавача, або половина приймача видалена.

Тут ми дійдемо до програми, яка має один потік для генерації значень і надсилання їх каналом, та інший потік, який отримуватиме значення й друкуватиме їх. Ми надсилатимемо прості значення між потоками за допомогою каналу, щоб продемонструвати цю можливість. Коли ви ознайомитеся з цією технікою, ви зможете використовувати канали для будь-яких потоків, яким потрібно спілкуватися один з одним, наприклад для чат-системи або системи, де багато потоків виконують частини обчислення й надсилають ці частини одному потоку, який агрегує результати.

Спочатку, у переліку 16-6, ми створимо канал, але нічого з ним не робитимемо. Зверніть увагу, що це ще не скомпілюється, оскільки Rust не може визначити, значення якого типу ми хочемо надсилати через канал.

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Ми створюємо новий канал за допомогою функції mpsc::channel; mpsc означає multiple producer, single consumer. Коротко кажучи, спосіб, у який стандартна бібліотека Rust реалізує канали, означає, що канал може мати кілька sending кінців, які створюють значення, але лише один receiving кінець, який споживає ці значення. Уявіть кілька струмків, що впадають в одну велику річку: усе, що надіслано будь-яким зі струмків, опиниться наприкінці в одній річці. Поки що ми почнемо з одного producer, але додамо кількох producers, коли доведемо цей приклад до робочого стану.

Функція mpsc::channel повертає кортеж, перший елемент якого — це кінець надсилання — передавач, а другий елемент — це кінець отримання — приймач. Скорочення tx і rx традиційно використовуються в багатьох сферах для transmitter і receiver, відповідно, тож ми називаємо наші змінні саме так, щоб позначити кожен кінець. Ми використовуємо оператор let із зразком, який деструктурує кортежі; про використання зразків в операторах let і деструктурування ми поговоримо в главі 19. Поки що знайте, що використання оператора let у такий спосіб — це зручний підхід до вилучення частин кортежу, повернутого mpsc::channel.

Давайте перемістимо кінець передавання в створений потік і змусимо його надіслати один рядок, щоб створений потік спілкувався з головним потоком, як показано в переліку 16-7. Це як покласти гумову качечку в річку вище за течією або надіслати чат-повідомлення з одного потоку до іншого.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Знову ми використовуємо thread::spawn, щоб створити новий потік, а потім використовуємо move, щоб перемістити tx у замикання, щоб створений потік володів tx. Створений потік повинен володіти передавачем, щоб мати змогу надсилати повідомлення через канал.

У передавача є метод send, який приймає значення, яке ми хочемо надіслати. Метод send повертає тип Result<T, E>, тож якщо приймач уже видалено і нікуди надсилати значення, операція надсилання поверне помилку. У цьому прикладі ми викликаємо unwrap, щоб викликати паніку у разі помилки. Але в реальному застосунку ми обробили б це належним чином: поверніться до глави 9, щоб переглянути стратегії належної обробки помилок.

У переліку 16-8 ми отримаємо значення від приймача в головному потоці. Це як дістати гумову качечку з води наприкінці річки або отримати чат-повідомлення.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

У приймача є два корисні методи: recv і try_recv. Ми використовуємо recv, скорочення від receive, який заблокує виконання головного потоку і чекатиме, доки в канал не буде надіслано значення. Щойно значення буде надіслано, recv поверне його в Result<T, E>. Коли передавач закривається, recv поверне помилку, щоб сигналізувати, що більше значень не надходитиме.

Метод try_recv не блокує, а натомість негайно повертає Result<T, E>: значення Ok, що містить повідомлення, якщо воно доступне, і значення Err, якщо цього разу жодних повідомлень немає. Використання try_recv корисне, якщо цей потік має іншу роботу, поки очікує повідомлень: ми могли б написати цикл, який час від часу викликає try_recv, обробляє повідомлення, якщо воно доступне, а інакше певний час виконує іншу роботу, доки не перевірить знову.

У цьому прикладі ми використали recv для простоти; у головного потоку немає іншої роботи, окрім очікування повідомлень, тож блокування головного потоку є доречним.

Коли ми запустимо код у переліку 16-8, ми побачимо, як значення надруковано з головного потоку:

Got: hi

Чудово!

Передавання власності через канали

Правила власності відіграють життєво важливу роль у надсиланні повідомлень, оскільки вони допомагають вам писати безпечний, конкурентний код. Запобігання помилкам у конкурентному програмуванні — це перевага того, що ви мислите про власність упродовж усіх ваших програм Rust. Давайте проведемо експеримент, щоб показати, як канали й власність працюють разом, щоб запобігати проблемам: ми спробуємо використати значення val у створеному потоці після того, як ми надіслали його каналом. Спробуйте скомпілювати код у переліку 16-9, щоб побачити, чому цей код не дозволений.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Тут ми намагаємося надрукувати val після того, як надіслали його каналом через tx.send. Дозволити це було б поганою ідеєю: щойно значення надіслано в інший потік, той потік може змінити його або видалити, перш ніж ми спробуємо використати значення знову. Потенційно зміни іншого потоку могли б спричинити помилки або неочікувані результати через несумісні або відсутні дані. Однак Rust видає нам помилку, якщо ми спробуємо скомпілювати код у переліку 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Наша помилка конкурентності спричинила помилку часу компіляції. Функція send приймає власність свого параметра, і коли значення переміщується, приймач бере його у власність. Це зупиняє нас від випадкового повторного використання значення після надсилання; система власності перевіряє, що все гаразд.

Надсилання кількох значень

Код у переліку 16-8 скомпілювався і запрацював, але він не показав нам чітко, що два окремі потоки спілкувалися один з одним через канал.

У переліку 16-10 ми внесли деякі зміни, які доведуть, що код у переліку 16-8 виконується конкурентно: створений потік тепер надсилатиме кілька повідомлень і робитиме паузу на секунду між кожним повідомленням.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Цього разу створений потік має вектор рядків, які ми хочемо надіслати в головний потік. Ми ітеруємося по них, надсилаючи кожен окремо, і робимо паузу між кожним, викликаючи функцію thread::sleep зі значенням Duration в одну секунду.

У головному потоці ми більше не викликаємо функцію recv явно: натомість ми розглядаємо rx як ітератор. Для кожного отриманого значення ми його друкуємо. Коли канал закривається, ітерація завершується.

Під час запуску коду в переліку 16-10 ви повинні побачити такий вивід із паузою в одну секунду між кожним рядком:

Got: hi
Got: from
Got: the
Got: thread

Оскільки у нас немає жодного коду, який робить паузу або затримку в циклі for у головному потоці, ми можемо зрозуміти, що головний потік чекає отримання значень від створеного потоку.

Створення кількох producer

Раніше ми згадували, що mpsc — це акронім для multiple producer, single consumer. Давайте застосуємо mpsc на практиці й розширимо код у переліку 16-10, щоб створити кілька потоків, які всі надсилатимуть значення одному й тому самому приймачу. Ми можемо зробити це, клонуючи передавач, як показано в переліку 16-11.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

Цього разу, перед тим як ми створимо перший створений потік, ми викликаємо clone на передавачі. Це дасть нам новий передавач, який ми зможемо передати першому створеному потоку. Ми передаємо оригінальний передавач другому створеному потоку. Це дає нам два потоки, кожен із яких надсилає різні повідомлення одному приймачу.

Коли ви запустите код, ваш вивід має виглядати приблизно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Ви можете побачити значення в іншому порядку, залежно від вашої системи. Саме це робить конкурентність цікавою, а також складною. Якщо ви поекспериментуєте з thread::sleep, задаючи йому різні значення в різних потоках, кожен запуск буде більш недетермінованим і щоразу створюватиме інший вивід.

Тепер, коли ми подивилися, як працюють канали, давайте подивимося на інший метод конкурентності.