Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Від однопотокового до багатопотокового сервера

Наразі сервер оброблятиме кожен запит по черзі, тобто він не обробить друге з’єднання, доки не завершиться обробка першого з’єднання. Якби сервер отримував дедалі більше запитів, таке послідовне виконання ставало б дедалі менш оптимальним. Якщо сервер отримує запит, обробка якого займає багато часу, наступні запити мають чекати, доки довгий запит буде завершено, навіть якщо нові запити можна обробити швидко. Нам потрібно буде це виправити, але спочатку ми подивимося на проблему в дії.

Моделювання повільного запиту

Ми розглянемо, як повільно оброблюваний запит може вплинути на інші запити, зроблені до нашої поточної реалізації сервера. Лістинг 21-10 реалізує обробку запиту до /sleep із змодельованою повільною відповіддю, через що сервер буде спати п’ять секунд перед відповіддю.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Тепер, коли в нас є три випадки, ми перейшли з if до match. Нам потрібно явно зіставляти зі зрізом request_line, щоб виконати зіставлення зі зразком проти значень рядкового літерала; match не робить автоматичного посилання та розіменування, як це робить метод equality.

Перша гілка така сама, як і блок if з Лістинга 21-9. Друга гілка зіставляється із запитом до /sleep. Коли цей запит отримано, сервер буде спати п’ять секунд перед рендерингом успішної HTML-сторінки. Третя гілка така сама, як і блок else з Лістинга 21-9.

Ви можете побачити, наскільки примітивним є наш сервер: реальні бібліотеки обробляли б розпізнавання кількох запитів значно менш багатослівно!

Запустіть сервер за допомогою cargo run. Потім відкрийте два вікна браузера: одне для http://127.0.0.1:7878, а інше для http://127.0.0.1:7878/sleep. Якщо ви кілька разів введете URI /, як і раніше, ви побачите, що він відповідає швидко. Але якщо ви введете /sleep, а потім завантажите /, ви побачите, що / чекає, доки sleep не проспить свої повні п’ять секунд перед завантаженням.

Є кілька технік, які ми могли б використати, щоб уникнути накопичення запитів за повільним запитом, зокрема використання async, як ми робили в Chapter 17; те, що ми реалізуємо, — це thread pool.

Покращення пропускної здатності за допомогою thread pool

Thread pool — це група запущених threads, які готові й чекають, щоб обробити task. Коли програма отримує новий task, вона призначає один із threads у pool для цього task, і цей thread оброблятиме task. Решта threads у pool доступні для обробки будь-яких інших tasks, що надходять, поки перший thread обробляє свій task. Коли перший thread завершує обробку свого task, його повертають до pool незайнятих threads, готових обробити новий task. Thread pool дає змогу обробляти з’єднання concurrently, збільшуючи throughput вашого сервера.

Ми обмежимо кількість threads у pool невеликою кількістю, щоб захиститися від DoS-атак; якби ми змусили нашу програму створювати новий thread для кожного запиту, щойно він надходить, хтось, хто надішле 10 мільйонів запитів до нашого сервера, міг би спричинити хаос, використавши всі ресурси нашого сервера й повністю зупинивши обробку запитів.

Отже, замість створення необмеженої кількості threads, у нас буде фіксована кількість threads, що чекають у pool. Запити, які надходять, надсилаються до pool на обробку. Pool підтримуватиме чергу вхідних запитів. Кожен із threads у pool буде забирати запит із цієї черги, обробляти його, а потім просити чергу надати інший запит. За такого дизайну ми можемо concurrently обробляти до N запитів, де N — це кількість threads. Якщо кожен thread відповідає на довготривалий запит, наступні запити все ще можуть накопичуватися в черзі, але ми збільшили кількість довготривалих запитів, які можемо обробити, перш ніж дійдемо до цього моменту.

Ця техніка — лише один із багатьох способів покращити throughput web server. Інші варіанти, які ви можете дослідити, — це модель fork/join, однопотокова модель async I/O та багатопотокова модель async I/O. Якщо вас цікава ця тема, ви можете прочитати більше про інші рішення і спробувати реалізувати їх; з низькорівневою мовою, такою як Rust, усі ці варіанти можливі.

Перш ніж ми почнемо реалізовувати thread pool, давайте поговоримо про те, як мало б виглядати використання pool. Коли ви намагаєтеся спроєктувати код, спочатку написання інтерфейсу клієнта може допомогти спрямувати ваш дизайн. Напишіть API коду так, щоб він був структурований у спосіб, у який ви хочете його викликати; потім реалізуйте функціональність усередині цієї структури, а не реалізовуйте функціональність, а потім проєктуйте public API.

Подібно до того, як ми використовували test-driven development у проєкті в Chapter 12, тут ми використовуватимемо compiler-driven development. Ми напишемо код, який викликає потрібні нам функції, а потім подивимося на помилки від компілятора, щоб визначити, що нам слід змінити далі, аби код запрацював. Однак перш ніж ми це зробимо, ми дослідимо техніку, яку не будемо використовувати, як відправну точку.

Запуск thread для кожного запиту

Спочатку давайте подивимося, як міг би виглядати наш код, якби він створював новий thread для кожного з’єднання. Як згадувалося раніше, це не наш кінцевий план через проблеми з потенційним створенням необмеженої кількості threads, але це відправна точка, щоб спочатку отримати працездатний багатопотоковий сервер. Потім ми додамо thread pool як покращення, і зіставляти ці два рішення буде простіше.

Лістинг 21-11 показує зміни до main, щоб запускати новий thread для обробки кожного stream у циклі for.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Як ви дізналися в Chapter 16, thread::spawn створить новий thread, а потім запустить код у замиканні в новому thread. Якщо ви запустите цей код і завантажите /sleep у браузері, а потім / у двох інших вкладках браузера, ви справді побачите, що запити до / не мають чекати, поки /sleep завершиться. Однак, як ми згадували, це зрештою перевантажить систему, тому що ви створюватимете нові threads без жодного обмеження.

Ви також можете згадати з Chapter 17, що це саме той тип ситуації, де async і await справді розкриваються! Пам’ятайте про це, коли ми будуватимемо thread pool і думатимемо про те, як усе виглядало б інакше або так само з async.

Створення подібного інтерфейсу для скінченної кількості threads

Ми хочемо, щоб наш thread pool працював подібним, звичним способом, щоб перехід від threads до thread pool не вимагав великих змін у коді, який використовує наш API. Лістинг 21-12 показує гіпотетичний інтерфейс для структури ThreadPool, яку ми хочемо використовувати замість thread::spawn.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Ми використовуємо ThreadPool::new, щоб створити новий thread pool із налаштовуваною кількістю threads, у цьому випадку чотири. Потім, у циклі for, pool.execute має подібний інтерфейс до thread::spawn у тому, що він приймає замикання, яке pool має запускати для кожного stream. Нам потрібно реалізувати pool.execute так, щоб він приймав замикання і передавав його thread у pool для виконання. Цей код ще не скомпілюється, але ми спробуємо, щоб компілятор міг підказати нам, як це виправити.

Побудова ThreadPool за допомогою compiler-driven development

Внесіть зміни з Лістинга 21-12 до src/main.rs, а потім використаємо помилки компілятора з cargo check, щоб спрямувати нашу розробку. Ось перша помилка, яку ми отримуємо:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Чудово! Ця помилка повідомляє нам, що нам потрібен тип або модуль ThreadPool, тому ми зараз його створимо. Наша реалізація ThreadPool буде незалежною від того, яку саме роботу виконує наш web server. Отже, давайте перетворимо крейт hello з binary crate на library crate, щоб зберігати нашу реалізацію ThreadPool. Після переходу на library crate ми також могли б використовувати окрему library thread pool для будь-якої роботи, яку захочемо виконувати за допомогою thread pool, а не лише для обслуговування web requests.

Створіть файл src/lib.rs, який міститиме таке, — це найпростіше визначення структури ThreadPool, яке ми поки що можемо мати:

pub struct ThreadPool;

Потім відредагуйте файл main.rs, щоб імпортувати ThreadPool в область видимості з library crate, додавши такий код на початок src/main.rs:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Цей код усе ще не працюватиме, але давайте перевіримо його знову, щоб отримати наступну помилку, яку нам потрібно виправити:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Ця помилка вказує, що наступним кроком нам потрібно створити асоційовану функцію під назвою new для ThreadPool. Ми також знаємо, що new має мати один параметр, який може приймати 4 як аргумент, і має повертати екземпляр ThreadPool. Давайте реалізуємо найпростішу функцію new, яка матиме ці характеристики:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Ми обрали usize як тип параметра size, тому що знаємо, що від’ємна кількість threads не має жодного сенсу. Ми також знаємо, що використовуватимемо це 4 як кількість елементів у колекції threads, а для цього і призначений тип usize, як обговорювалося в розділі “Integer Types” у Chapter 3.

Давайте перевіримо код ще раз:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Тепер помилка виникає тому, що в нас немає методу execute на ThreadPool. Згадайте з розділу “Створення подібного інтерфейсу для скінченної кількості threads”, що ми вирішили, що наш thread pool має мати інтерфейс, подібний до thread::spawn. Крім того, ми реалізуємо функцію execute так, щоб вона приймала замикання, яке їй передають, і передавала його незайнятому thread у pool для виконання.

Ми визначимо метод execute на ThreadPool, щоб він приймав замикання як параметр. Згадайте з розділу [“Переміщення захоплених значень із замикань”] moving-out-of-closures у Chapter 13, що ми можемо приймати замикання як параметри за допомогою трьох різних трейтів: Fn, FnMut і FnOnce. Нам потрібно вирішити, який саме тип замикання тут використати. Ми знаємо, що зрештою зробимо щось подібне до стандартної реалізації library thread::spawn, тож можемо подивитися, які обмеження має сигнатура thread::spawn на свій параметр. Документація показує таке:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Параметр типу F — це той, який нас тут цікавить; параметр типу T пов’язаний із значенням, що повертається, і він нас не цікавить. Ми бачимо, що spawn використовує FnOnce як trait bound для F. Ймовірно, це саме те, що ми хочемо і тут, тому що зрештою передамо аргумент, який отримуємо в execute, до spawn. Ми можемо ще більше впевнитися, що FnOnce — це той trait, який нам потрібно використовувати, тому що thread, який виконує запит, лише один раз виконає замикання цього запиту, що відповідає Once у FnOnce.

Параметр типу F також має trait bound Send і lifetime bound 'static, які корисні в нашій ситуації: нам потрібен Send, щоб передати замикання з одного thread в інший, і 'static, тому що ми не знаємо, скільки часу thread знадобиться на виконання. Давайте створимо метод execute на ThreadPool, який прийматиме узагальнений параметр типу F із цими обмеженнями:

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Ми й далі використовуємо () після FnOnce, тому що цей FnOnce представляє замикання, яке не приймає параметрів і повертає unit type (). Так само, як і у визначеннях функцій, тип, що повертається, можна опустити із сигнатури, але навіть якщо в нас немає параметрів, дужки все одно потрібні.

Знову ж таки, це найпростіша реалізація методу execute: вона нічого не робить, але ми лише намагаємося змусити наш код скомпілюватися. Давайте перевіримо його ще раз:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Воно компілюється! Але зауважте: якщо ви спробуєте cargo run і зробите запит у браузері, то побачите помилки в браузері, які ми бачили на початку розділу. Наша library насправді ще не викликає замикання, передане до execute!

Примітка: Ви можете почути вислів про мови зі строгими компіляторами, такими як Haskell і Rust: «If the code compiles, it works.» Але цей вислів не є універсально правильним. Наш проєкт компілюється, але абсолютно нічого не робить! Якби ми будували реальний, повний проєкт, це був би гарний час почати писати unit tests, щоб перевірити, що код компілюється і має потрібну нам поведінку.

Поміркуйте: що було б інакше тут, якби ми збиралися виконувати future, а не замикання?

Перевірка кількості threads у new

Ми нічого не робимо з параметрами new і execute. Давайте реалізуємо тіла цих функцій із потрібною нам поведінкою. Для початку подумаємо про new. Раніше ми вибрали беззнаковий тип для параметра size, тому що pool із від’ємною кількістю threads не має сенсу. Однак pool із нульовою кількістю threads теж не має сенсу, хоча нуль є цілком допустимим usize. Ми додамо код, щоб перевірити, що size більший за нуль, перед тим як повернути екземпляр ThreadPool, і змусимо програму panic, якщо вона отримає нуль, використавши макрос assert!, як показано в Лістингу 21-13.

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Ми також додали деяку документацію для нашого ThreadPool за допомогою doc comments. Зауважте, що ми дотрималися добрих практик документації, додавши розділ, який вказує на ситуації, у яких наша функція може panic, як обговорювалося в Chapter 14. Спробуйте запустити cargo doc --open і натиснути на структуру ThreadPool, щоб побачити, як виглядає згенерована документація для new!

Замість того щоб додавати макрос assert!, як ми зробили тут, ми могли б змінити new на build і повертати Result, як ми зробили з Config::build у I/O проєкті в Лістингу 12-9. Але в цьому випадку ми вирішили, що спроба створити thread pool без жодного thread є непереборною помилкою. Якщо ви налаштовані амбітно, спробуйте написати функцію під назвою build із такою сигнатурою, щоб порівняти її з функцією new:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

Створення місця для зберігання threads

Тепер, коли ми маємо спосіб переконатися, що в нас є дійсна кількість threads для зберігання в pool, ми можемо створити ці threads і зберегти їх у структурі ThreadPool перед тим, як повертати структуру. Але як нам «зберегти» thread? Давайте ще раз подивимося на сигнатуру thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Функція spawn повертає JoinHandle<T>, де T — це тип, який повертає замикання. Давайте спробуємо використати JoinHandle теж і подивимося, що станеться. У нашому випадку замикання, які ми передаємо до thread pool, оброблятимуть з’єднання і нічого не повертатимуть, тож T буде unit type ().

Код у Лістингу 21-14 скомпілюється, але поки що не створює жодного thread. Ми змінили визначення ThreadPool, щоб воно містило vector thread::JoinHandle<()>, ініціалізували vector із ємністю size, налаштували цикл for, який виконуватиме код для створення threads, і повернули екземпляр ThreadPool, що містить їх.

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Ми імпортували std::thread у область видимості library crate, тому що використовуємо thread::JoinHandle як тип елементів у vector у ThreadPool.

Після отримання дійсного size наш ThreadPool створює новий vector, який може містити size елементів. Функція with_capacity виконує ту саму задачу, що й Vec::new, але з важливою відмінністю: вона попередньо виділяє місце у vector. Оскільки ми знаємо, що нам потрібно зберігати size елементів у vector, робити це виділення наперед трохи ефективніше, ніж використовувати Vec::new, який змінює свій розмір у міру вставляння елементів.

Коли ви знову запустите cargo check, він має успішно завершитися.

Надсилання коду з ThreadPool до thread

Ми залишили коментар у циклі for в Лістингу 21-14 щодо створення threads. Тут ми подивимося, як саме створювати threads. Standard library надає thread::spawn як спосіб створення threads, і thread::spawn очікує отримати деякий код, який thread має виконати одразу після створення thread. Однак у нашому випадку ми хочемо створити threads і змусити їх чекати на код, який ми надішлемо пізніше. Реалізація threads у standard library не містить способу зробити це; нам потрібно реалізувати це вручну.

Ми реалізуємо таку поведінку, ввівши нову структуру даних між ThreadPool і threads, яка керуватиме цією новою поведінкою. Ми назвемо цю структуру даних Worker, що є поширеним терміном у реалізаціях pooling. Worker забирає код, який потрібно виконати, і виконує цей код у своєму thread.

Подумайте про людей, які працюють на кухні в ресторані: workers чекають, поки від клієнтів надійдуть замовлення, а потім відповідають за те, щоб узяти ці замовлення й виконати їх.

Замість зберігання vector екземплярів JoinHandle<()> у thread pool ми зберігатимемо екземпляри структури Worker. Кожен Worker зберігатиме один екземпляр JoinHandle<()>. Потім ми реалізуємо метод на Worker, який прийматиме замикання коду для виконання і надсилатиме його вже запущеному thread для виконання. Ми також надамо кожному Worker id, щоб можна було розрізняти різні екземпляри Worker у pool під час логування або налагодження.

Ось новий процес, який відбуватиметься, коли ми створюємо ThreadPool. Ми реалізуємо код, який надсилає замикання до thread, після того як налаштуємо Worker у такий спосіб:

  1. Визначте структуру Worker, яка містить id і JoinHandle<()>.
  2. Змініть ThreadPool так, щоб він містив vector екземплярів Worker.
  3. Визначте функцію Worker::new, яка приймає число id і повертає екземпляр Worker, що містить id і thread, запущений із порожнім замиканням.
  4. У ThreadPool::new використовуйте лічильник циклу for, щоб згенерувати id, створити новий Worker із цим id і зберегти Worker у vector.

Якщо ви готові до виклику, спробуйте реалізувати ці зміни самостійно, перш ніж дивитися на код у Лістингу 21-15.

Готові? Ось Лістинг 21-15 з одним із способів внести попередні зміни.

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Ми змінили назву поля в ThreadPool з threads на workers, тому що тепер воно містить екземпляри Worker замість екземплярів JoinHandle<()>. Ми використовуємо лічильник у циклі for як аргумент для Worker::new, і зберігаємо кожен новий Worker у vector під назвою workers.

Зовнішньому коду (як нашому серверу в src/main.rs) не потрібно знати деталі реалізації щодо використання структури Worker всередині ThreadPool, тому ми робимо структуру Worker і її функцію new приватними. Функція Worker::new використовує id, який ми їй даємо, і зберігає екземпляр JoinHandle<()>, створений шляхом запуску нового thread із порожнім замиканням.

Примітка: Якщо операційна система не може створити thread через те, що бракує системних ресурсів, thread::spawn викличе panic. Це призведе до panic усього нашого сервера, навіть якщо створення деяких threads може успішно завершитися. Для простоти така поведінка прийнятна, але в production-реалізації thread pool, ймовірно, вам варто було б використати std::thread::Builder і його метод spawn, який натомість повертає Result.

Цей код скомпілюється й зберігатиме кількість екземплярів Worker, яку ми вказали як аргумент для ThreadPool::new. Але ми все ще не обробляємо замикання, яке отримуємо в execute. Давайте подивимося, як зробити це далі.

Надсилання запитів до threads через channels

Наступна проблема, яку ми розв’яжемо, полягає в тому, що замикання, передані до thread::spawn, абсолютно нічого не роблять. Наразі ми отримуємо замикання, яке хочемо виконати, у методі execute. Але нам потрібно дати thread::spawn замикання, яке потрібно виконати, коли ми створюємо кожен Worker під час створення ThreadPool.

Ми хочемо, щоб щойно створені структури Worker отримували код для виконання з черги, що зберігається в ThreadPool, і надсилали цей код до свого thread для виконання.

Channels, про які ми дізналися в Chapter 16, — простий спосіб комунікації між двома threads — ідеально підходять для цього випадку. Ми використаємо channel як чергу jobs, а execute надсилатиме job від ThreadPool до екземплярів Worker, які надсилатимуть job до свого thread. Ось план:

  1. ThreadPool створить channel і зберігатиме sender.
  2. Кожен Worker зберігатиме receiver.
  3. Ми створимо нову структуру Job, яка зберігатиме замикання, які ми хочемо передавати через channel.
  4. Метод execute надсилатиме job, який він хоче виконати, через sender.
  5. У своєму thread Worker буде ітеруватися по receiver і виконувати замикання будь-яких jobs, які він отримує.

Почнемо зі створення channel у ThreadPool::new і збереження sender в екземплярі ThreadPool, як показано в Лістингу 21-16. Структура Job поки що нічого не зберігає, але буде типом елемента, який ми передаємо через channel.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

У ThreadPool::new ми створюємо новий channel і змушуємо pool зберігати sender. Це успішно скомпілюється.

Спробуймо передати receiver channel кожному Worker, коли thread pool створює channel. Ми знаємо, що хочемо використовувати receiver у thread, який запускають екземпляри Worker, тому в замиканні ми звернемося до параметра receiver. Код у Лістингу 21-17 ще не зовсім скомпілюється.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Ми внесли кілька невеликих і простих змін: передаємо receiver до Worker::new, а потім використовуємо його всередині замикання.

Коли ми намагаємося перевірити цей код, отримуємо таку помилку:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

Код намагається передати receiver кільком екземплярам Worker. Це не спрацює, як ви пам’ятаєте з Chapter 16: реалізація channel, яку надає Rust, є multiple producer, single consumer. Це означає, що ми не можемо просто клонувати споживчий кінець channel, щоб виправити цей код. Ми також не хочемо надсилати повідомлення кілька разів кільком споживачам; нам потрібен один список повідомлень із кількома екземплярами Worker, так щоб кожне повідомлення оброблялося один раз.

Крім того, забирати job із черги channel означає змінювати receiver, тому threads потрібен безпечний спосіб ділити receiver і змінювати його; інакше ми можемо отримати race conditions (як розглядалося в Chapter 16).

Згадайте thread-safe smart pointers, про які йшлося в Chapter 16: щоб розділяти ownership між кількома threads і дозволяти threads змінювати значення, нам потрібно використовувати Arc<Mutex<T>>. Тип Arc дасть змогу кільком екземплярам Worker володіти receiver, а Mutex забезпечить, що лише один Worker отримуватиме job із receiver за раз. Лістинг 21-18 показує зміни, які нам потрібно зробити.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

У ThreadPool::new ми поміщаємо receiver в Arc і Mutex. Для кожного нового Worker ми клонуємо Arc, щоб збільшити лічильник посилань так, щоб екземпляри Worker могли спільно володіти receiver.

З цими змінами код компілюється! Ми рухаємося далі!

Реалізація методу execute

Нарешті давайте реалізуємо метод execute на ThreadPool. Ми також змінимо Job зі структури на псевдонім типу для trait object, який містить тип замикання, що його отримує execute. Як обговорювалося в розділі “Type Synonyms and Type Aliases” у Chapter 20, type aliases дають змогу зробити довгі типи коротшими для зручності використання. Подивіться на Лістинг 21-19.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Після створення нового екземпляра Job за допомогою замикання, яке ми отримуємо в execute, ми надсилаємо цю job через sending end channel. Ми викликаємо unwrap на send на випадок, якщо надсилання не вдасться. Це може статися, якщо, наприклад, ми зупинимо виконання всіх наших threads, тобто receiving end припинить отримувати нові повідомлення. Наразі ми не можемо зупинити виконання наших threads: вони продовжують виконуватися, доки pool існує. Причина, чому ми використовуємо unwrap, полягає в тому, що ми знаємо, що випадок помилки не станеться, але компілятор цього не знає.

Але ми ще не зовсім завершили! У Worker наше замикання, яке передається до thread::spawn, досі лише посилається на receiving end channel. Натомість нам потрібно, щоб замикання безкінечно ітерувалося, запитуючи receiving end channel про job і запускаючи job, коли отримає її. Давайте внесемо зміну, показану в Лістингу 21-20, до Worker::new.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Тут ми спочатку викликаємо lock на receiver, щоб отримати mutex, а потім викликаємо unwrap, щоб викликати panic у разі будь-яких помилок. Отримання lock може завершитися невдачею, якщо mutex перебуває в poisoned стані, що може статися, якщо якийсь інший thread викликав panic, утримуючи lock, а не звільняючи його. У цій ситуації виклик unwrap, щоб цей thread викликав panic, є правильною дією. За бажанням ви можете змінити цей unwrap на expect із повідомленням про помилку, яке вам зрозуміле.

Якщо ми отримуємо lock на mutex, ми викликаємо recv, щоб отримати Job із channel. Остаточний unwrap також проходить повз будь-які помилки тут, які можуть виникнути, якщо thread, що утримує sender, завершив роботу, подібно до того, як метод send повертає Err, якщо receiver завершує роботу.

Виклик recv блокує, тож якщо job ще немає, поточний thread чекатиме, доки job не стане доступною. Mutex<T> забезпечує, що лише один thread Worker за раз намагається запросити job.

Наша thread pool тепер у робочому стані! Запустіть її за допомогою cargo run і зробіть кілька запитів:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

Успіх! Тепер у нас є thread pool, який виконує з’єднання asynchronously. Ніколи не створюється більше ніж чотири threads, тож наша система не буде перевантажена, якщо сервер отримає багато запитів. Якщо ми зробимо запит до /sleep, сервер зможе обслуговувати інші запити, змушуючи інший thread виконувати їх.

Примітка: Якщо ви відкриєте /sleep у кількох вікнах браузера одночасно, вони можуть завантажуватися по одному з інтервалами в п’ять секунд. Деякі web browsers послідовно виконують кілька екземплярів того самого запиту з міркувань кешування. Це обмеження не спричинене нашим web server.

Зараз гарний момент зупинитися й поміркувати, чим би відрізнявся код у Лістингах 21-18, 21-19 і 21-20, якби ми використовували future замість замикання для роботи, яку потрібно виконати. Які типи змінилися б? Як би відрізнялися сигнатури методів, якщо б відрізнялися взагалі? Які частини коду залишилися б такими самими?

Після вивчення циклу while let у Chapter 17 і Chapter 19 ви можете замислитися, чому ми не написали код thread Worker, як показано в Лістингу 21-21.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Цей код компілюється й запускається, але не дає бажаної поведінки threading: повільний запит усе одно змушуватиме інші запити чекати на обробку. Причина дещо тонка: структура Mutex не має публічного методу unlock, тому що ownership lock ґрунтується на lifetime MutexGuard<T> всередині LockResult<MutexGuard<T>>, який повертає метод lock. На етапі компіляції перевірник запозичень може забезпечити правило, що ресурс під захистом Mutex не можна отримати доступ, якщо ми не тримаємо lock. Однак ця реалізація також може призвести до того, що lock утримуватиметься довше, ніж задумано, якщо ми не зважатимемо на lifetime MutexGuard<T>.

Код у Лістингу 21-20, який використовує let job = receiver.lock().unwrap().recv().unwrap();, працює тому, що з let будь-які тимчасові значення, використані у виразі праворуч від знака рівності, негайно видаляються, коли оператор let завершується. Однак while letif let, і match) не видаляє тимчасові значення до кінця пов’язаного блоку. У Лістингу 21-21 lock залишається утримуваним протягом виклику job(), тобто інші екземпляри Worker не можуть отримувати jobs.