Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Конкурентність із спільним станом (Shared-State Concurrency)

Передавання повідомлень — це хороший спосіб обробки конкурентності, але це не єдиний спосіб. Інший метод полягає в тому, щоб кілька потоків отримували доступ до тих самих спільних даних. Знову розгляньте цю частину гасла з документації мови Go: “Do not communicate by sharing memory.”

Як би виглядала комунікація через спільне використання пам’яті? Крім того, чому прихильники передавання повідомлень застерігають не використовувати спільне використання пам’яті?

Певним чином канали в будь-якій мові програмування схожі на одиничну власність (ownership), тому що після того, як ви передаєте значення через канал, ви більше не повинні використовувати це значення. Конкурентність із спільною пам’яттю схожа на множинну власність: кілька потоків можуть одночасно отримувати доступ до однієї й тієї самої ділянки пам’яті. Як ви бачили в Розділі 15, де розумні вказівники зробили можливою множинну власність, множинна власність може додавати складності, тому що цими різними власниками потрібно керувати. Система типів Rust і правила власності значною мірою допомагають правильно організувати це керування. Для прикладу, розгляньмо м’ютекси, один із найпоширеніших примітивів конкурентності для спільної пам’яті.

Керування доступом за допомогою м’ютексів

Mutex — це скорочення від mutual exclusion, тобто м’ютекс дозволяє лише одному потоку отримувати доступ до певних даних у будь-який момент часу. Щоб отримати доступ до даних у м’ютексі, потік спочатку має повідомити, що хоче доступу, попросивши захопити блокування м’ютекса. Lock — це структура даних, яка є частиною м’ютекса і відстежує, хто наразі має виключний доступ до даних. Тому про м’ютекс кажуть, що він охороняє дані, які він містить, через систему блокування.

М’ютекси мають репутацію складних у використанні, тому що вам потрібно пам’ятати два правила:

  1. Ви повинні спробувати захопити блокування перед використанням даних.
  2. Коли ви закінчили з даними, які охороняє м’ютекс, ви повинні розблокувати дані, щоб інші потоки могли захопити блокування.

Як реальну метафору для м’ютекса уявіть панельну дискусію на конференції лише з одним мікрофоном. Перш ніж учасник панелі зможе говорити, він має попросити або подати сигнал, що хоче використати мікрофон. Коли він отримує мікрофон, він може говорити стільки, скільки хоче, а потім передати мікрофон наступному учаснику панелі, який попросить слова. Якщо учасник панелі забуде передати мікрофон, коли закінчить із ним, ніхто інший не зможе говорити. Якщо керування спільним мікрофоном піде не так, панель не працюватиме згідно з планом!

Керування м’ютексами може бути неймовірно складним, щоб зробити його правильним, саме тому так багато людей захоплюються каналами. Однак завдяки системі типів Rust і правилам власності ви не можете помилитися з блокуванням і розблокуванням.

API Mutex<T>

Як приклад використання м’ютекса, почнімо з використання м’ютекса в однопотоковому контексті, як показано в Listing 16-12.

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}

Як і для багатьох типів, ми створюємо Mutex<T> за допомогою асоційованої функції new. Щоб отримати доступ до даних усередині м’ютекса, ми використовуємо метод lock, щоб захопити блокування. Цей виклик заблокує поточний потік, тож він не зможе виконувати жодної роботи, доки не настане наша черга мати блокування.

Виклик lock завершився б помилкою, якби інший потік, що утримує блокування, запанікував. У такому разі ніхто ніколи не зміг би отримати блокування, тому ми вирішили unwrap і змусити цей потік запанікувати, якщо ми опинимося в такій ситуації.

Після того як ми захопили блокування, ми можемо поводитися з поверненим значенням, яке тут назване num, як із змінним посиланням на дані всередині. Система типів гарантує, що ми захоплюємо блокування перед використанням значення в m. Тип mMutex<i32>, а не i32, тож ми маємо викликати lock, щоб мати змогу використати значення i32. Ми не можемо забути; система типів не дозволить нам інакше отримати доступ до внутрішнього i32.

Виклик lock повертає тип під назвою MutexGuard, загорнутий у LockResult, з яким ми впоралися за допомогою виклику unwrap. Тип MutexGuard реалізує Deref, щоб вказувати на наші внутрішні дані; цей тип також має реалізацію Drop, яка автоматично звільняє блокування, коли MutexGuard виходить з області видимості, що відбувається наприкінці внутрішньої області видимості. Як наслідок, ми не ризикуємо забути звільнити блокування і заблокувати використання м’ютекса іншими потоками, тому що звільнення блокування відбувається автоматично.

Після скидання блокування ми можемо надрукувати значення м’ютекса і побачити, що змогли змінити внутрішній i32 на 6.

Спільний доступ до Mutex<T>

Тепер спробуймо поділити значення між кількома потоками за допомогою Mutex<T>. Ми запустимо 10 потоків і змусимо кожен із них збільшити значення лічильника на 1, тож лічильник перейде від 0 до 10. Приклад у Listing 16-13 матиме помилку компілятора, і ми використаємо цю помилку, щоб дізнатися більше про використання Mutex<T> і про те, як Rust допомагає нам використовувати його правильно.

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Ми створюємо змінну counter, щоб зберігати i32 усередині Mutex<T>, як і в Listing 16-12. Далі ми створюємо 10 потоків, ітеруючись по діапазону чисел. Ми використовуємо thread::spawn і даємо всім потокам одне й те саме замикання: таке, що переміщує counter у потік, захоплює блокування на Mutex<T>, викликаючи метод lock, а потім додає 1 до значення в м’ютексі. Коли потік завершує виконання свого замикання, num вийде з області видимості та звільнить блокування, щоб інший потік міг його захопити.

У головному потоці ми збираємо всі приєднувані дескриптори. Потім, як ми робили в Listing 16-2, ми викликаємо join для кожного дескриптора, щоб переконатися, що всі потоки завершилися. У цей момент головний потік захопить блокування і надрукує результат цієї програми.

Ми натякнули, що цей приклад не скомпілюється. Тепер з’ясуймо, чому!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
 5 |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
 8 |     for _ in 0..10 {
   |     -------------- inside of this loop
 9 |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
 8 ~     let mut value = counter.lock();
 9 ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Повідомлення про помилку каже, що значення counter було переміщене під час попередньої ітерації циклу. Rust повідомляє нам, що ми не можемо перемістити власність на блокування counter у кілька потоків. Давайте виправимо помилку компілятора за допомогою методу множинної власності, який ми обговорювали в Розділі 15.

Множинна власність з кількома потоками

У Розділі 15 ми надали значення кільком власникам, використовуючи розумний вказівник Rc<T> для створення значення з підрахунком посилань. Зробімо те саме тут і подивімося, що станеться. Ми загорнемо Mutex<T> в Rc<T> у Listing 16-14 і клонуватимемо Rc<T> перед переміщенням власності до потоку.

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Ще раз компілюємо і отримуємо… інші помилки! Компілятор багато чому нас навчає:

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<std::sync::Mutex<i32>>`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:723:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Ого, це повідомлення про помилку дуже багатослівне! Ось на чому важливо зосередитися: `Rc<Mutex<i32>>` cannot be sent between threads safely. Компілятор також повідомляє нам причину: the trait `Send` is not implemented for `Rc<Mutex<i32>>`. Про Send ми поговоримо в наступному розділі: це один із трейтів, які гарантують, що типи, які ми використовуємо з потоками, призначені для використання в конкурентних ситуаціях.

На жаль, Rc<T> не є безпечним для спільного використання між потоками. Коли Rc<T> керує лічильником посилань, він додає до лічильника для кожного виклику clone і віднімає від лічильника, коли кожен клон видаляється. Але він не використовує жодних примітивів конкурентності, щоб переконатися, що зміни лічильника не можуть бути перервані іншим потоком. Це може призвести до неправильних лічильників — тонких помилок, які у свою чергу можуть призвести до витоків пам’яті або до того, що значення буде видалене до того, як ми з ним закінчимо. Нам потрібен тип, який є точно таким самим, як Rc<T>, але який робить зміни лічильника посилань у безпечний для потоків спосіб.

Атомарне лічення посилань з Arc<T>

На щастя, Arc<T> є типом, подібним до Rc<T>, який безпечно використовувати в конкурентних ситуаціях. a означає atomic, тобто це тип із атомарним підрахунком посилань. Атоміки — це додатковий вид примітива конкурентності, який ми не будемо детально розглядати тут: дивіться документацію стандартної бібліотеки для std::sync::atomic для отримання додаткових відомостей. На цьому етапі вам достатньо знати, що атоміки працюють як примітивні типи, але безпечні для спільного використання між потоками.

Тоді ви можете спитати, чому всі примітивні типи не є атомарними і чому типи стандартної бібліотеки не реалізовані так, щоб за замовчуванням використовувати Arc<T>. Причина в тому, що безпечність для потоків має пов’язаний із нею штраф за продуктивність, який ви хочете платити лише тоді, коли вам це справді потрібно. Якщо ви просто виконуєте операції над значеннями в межах одного потоку, ваш код може працювати швидше, якщо йому не потрібно забезпечувати гарантії, які надають атоміки.

Повернімося до нашого прикладу: Arc<T> і Rc<T> мають той самий API, тож ми виправляємо нашу програму, змінюючи рядок use, виклик new і виклик clone. Код у Listing 16-15 нарешті скомпілюється і запуститься.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Цей код виведе таке:

Result: 10

Ми зробили це! Ми порахували від 0 до 10, що може й не здаватися дуже вражаючим, але це навчило нас багато чому про Mutex<T> і безпечність для потоків. Ви також можете використати структуру цієї програми для виконання складніших операцій, ніж просто збільшення лічильника. Використовуючи цю стратегію, ви можете розбити обчислення на незалежні частини, розподілити ці частини між потоками, а потім використати Mutex<T>, щоб кожен потік оновлював остаточний результат своєю частиною.

Зауважте, що якщо ви виконуєте прості числові операції, існують типи простіші, ніж типи Mutex<T>, які надає std::sync::atomic module of the standard library. Ці типи забезпечують безпечний, конкурентний, атомарний доступ до примітивних типів. Ми вирішили використати Mutex<T> з примітивним типом для цього прикладу, щоб зосередитися на тому, як працює Mutex<T>.

Порівняння RefCell<T>/Rc<T> і Mutex<T>/Arc<T>

Ви могли помітити, що counter є незмінним, але ми могли отримати змінне посилання на значення всередині нього; це означає, що Mutex<T> забезпечує внутрішню змінність, як і родина Cell. Так само, як ми використовували RefCell<T> у Розділі 15, щоб мати змогу змінювати вміст усередині Rc<T>, ми використовуємо Mutex<T> для зміни вмісту усередині Arc<T>.

Ще одна деталь, на яку варто звернути увагу: Rust не може захистити вас від усіх видів логічних помилок, коли ви використовуєте Mutex<T>. Згадайте з Розділу 15, що використання Rc<T> несло ризик створення циклів посилань, коли два значення Rc<T> посилаються одне на одне, спричиняючи витоки пам’яті. Подібним чином Mutex<T> несе ризик створення deadlocks. Вони виникають, коли операції потрібно заблокувати два ресурси, і два потоки вже захопили по одному з блокувань, через що вони чекають один на одного вічно. Якщо вас цікавлять deadlocks, спробуйте створити програму Rust, у якій є deadlock; потім дослідіть стратегії пом’якшення deadlock для м’ютексів у будь-якій мові і спробуйте реалізувати їх у Rust. Документація API стандартної бібліотеки для Mutex<T> і MutexGuard містить корисну інформацію.

Ми завершимо цей розділ, говорячи про трейт Send і Sync та про те, як ми можемо використовувати їх із власними типами.