Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

М’яке завершення роботи та очищення

Код у Listing 21-20 асинхронно відповідає на запити через використання пулу потоків, як ми й задумували. Ми отримуємо деякі попередження про поля workers, id і thread, які ми не використовуємо безпосереднім чином, що нагадує нам, що ми нічого не очищаємо. Коли ми використовуємо менш елегантний метод ctrl-C, щоб зупинити головний потік, усі інші потоки також зупиняються негайно, навіть якщо вони перебувають у середині обслуговування запиту.

Далі ми реалізуємо трейт Drop, щоб викликати join для кожного з потоків у пулі, щоб вони могли завершити запити, над якими працюють, перед закриттям. Потім ми реалізуємо спосіб повідомити потокам, що вони мають перестати приймати нові запити та завершити роботу. Щоб побачити цей код у дії, ми змінимо наш сервер так, щоб він приймав лише два запити перед м’яким завершенням роботи свого пулу потоків.

Одне, що варто помітити в міру того, як ми просуваємося: жодне з цього не впливає на частини коду, які обробляють виконання замикань, тож усе тут було б таким самим, якби ми використовували пул потоків для async runtime.

Реалізація трейтa Drop для ThreadPool

Почнімо з реалізації Drop для нашого пулу потоків. Коли пул буде скинуто, усі наші потоки мають приєднатися, щоб переконатися, що вони завершили свою роботу. Listing 21-22 показує першу спробу реалізації Drop; цей код ще не зовсім працюватиме.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Спочатку ми проходимося циклом по кожному з workers пулу потоків. Ми використовуємо &mut для цього, тому що self є змінним посиланням, і нам також потрібно мати змогу змінювати worker. Для кожного worker ми виводимо повідомлення про те, що цей конкретний екземпляр Worker завершує роботу, а потім викликаємо join для потоку цього екземпляра Worker. Якщо виклик join завершується помилкою, ми використовуємо unwrap, щоб Rust запанікував і перейшов до негнучкого завершення.

Ось помилка, яку ми отримуємо під час компіляції цього коду:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

Помилка каже нам, що ми не можемо викликати join, тому що ми маємо лише змінне запозичення кожного worker, а join забирає власність свого аргументу. Щоб розв’язати цю проблему, нам потрібно перемістити потік із екземпляра Worker, який володіє thread, щоб join міг спожити потік. Один зі способів зробити це — застосувати той самий підхід, який ми використали в Listing 18-15. Якби Worker містив Option<thread::JoinHandle<()>>, ми могли б викликати метод take для Option, щоб перемістити значення з варіанта Some і залишити на його місці варіант None. Іншими словами, Worker, який працює, мав би варіант Some у thread, а коли ми захотіли б очистити Worker, ми б замінили Some на None, щоб у Worker не було потоку для виконання.

Однак це сталося б лише тоді, коли Worker скидається. Натомість нам довелося б мати справу з Option<thread::JoinHandle<()>> у будь-якому місці, де ми звертаємося до worker.thread. Ідіоматичний Rust досить часто використовує Option, але коли ви виявляєте, що обгортаєте щось, що, як ви знаєте, завжди буде присутнє, в Option як обхідний шлях, як у цьому випадку, варто пошукати альтернативні підходи, щоб зробити ваш код чистішим і менш схильним до помилок.

У цьому випадку існує краща альтернатива: метод Vec::drain. Він приймає параметр діапазону, щоб указати, які елементи видалити з вектора, і повертає ітератор цих елементів. Передавання синтаксису діапазону .. видалить кожне значення з вектора.

Отже, нам потрібно оновити реалізацію drop для ThreadPool ось так:

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Це усуває помилку компілятора і не вимагає жодних інших змін у нашому коді. Зверніть увагу, що, оскільки drop може бути викликаний під час паніки, unwrap також може запанікувати й спричинити подвійну паніку, що негайно аварійно завершує програму та припиняє будь-яке очищення, яке триває. Для прикладної програми це нормально, але для production-коду це не рекомендується.

Сигналізація потокам, щоб вони перестали слухати завдання

Після всіх змін, які ми внесли, наш код компілюється без жодних попереджень. Однак погана новина полягає в тому, що цей код ще не працює так, як ми хочемо. Ключ у логіці в замиканнях, які виконуються потоками екземплярів Worker: зараз ми викликаємо join, але це не завершить роботу потоків, тому що вони безкінечно loop-ляться в пошуках завдань. Якщо ми спробуємо скинути наш ThreadPool із поточною реалізацією drop, головний потік буде заблокований назавжди, очікуючи на завершення першого потоку.

Щоб виправити цю проблему, нам потрібна зміна в реалізації drop для ThreadPool, а потім зміна в циклі Worker.

Спочатку ми змінимо реалізацію drop для ThreadPool, щоб явно скинути sender перед очікуванням завершення потоків. Listing 21-23 показує зміни до ThreadPool, щоб явно скинути sender. На відміну від потоку, тут нам дійсно потрібно використовувати Option, щоб мати змогу перемістити sender із ThreadPool за допомогою Option::take.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Скидання sender закриває канал, що означає, що більше не буде надіслано повідомлень. Коли це станеться, усі виклики recv, які екземпляри Worker виконують у нескінченному циклі, повертатимуть помилку. У Listing 21-24 ми змінюємо цикл Worker, щоб у такому разі він м’яко виходив із циклу, що означає, що потоки завершать роботу, коли реалізація drop для ThreadPool викличе для них join.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

Щоб побачити цей код у дії, давайте змінимо main, щоб він приймав лише два запити перед м’яким завершенням роботи сервера, як показано в Listing 21-25.

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Ви б не хотіли, щоб реальний вебсервер завершував роботу після обслуговування лише двох запитів. Цей код лише демонструє, що м’яке завершення роботи та очищення працюють.

Метод take визначено в трейті Iterator і обмежує ітерацію щонайбільше першими двома елементами. ThreadPool вийде з області видимості в кінці main, і буде виконано реалізацію drop.

Запустіть сервер за допомогою cargo run і зробіть три запити. Третій запит має завершитися помилкою, і у вашому терміналі ви маєте побачити вивід, подібний до такого:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Ви можете побачити інший порядок Worker-ID та надрукованих повідомлень. Ми можемо зрозуміти, як працює цей код, із повідомлень: екземпляри Worker 0 і 3 отримали перші два запити. Сервер перестав приймати з’єднання після другого з’єднання, і реалізація Drop для ThreadPool починає виконуватися ще до того, як Worker 3 навіть починає свою роботу. Скидання sender від’єднує всі екземпляри Worker і повідомляє їм, що вони мають завершити роботу. Кожен екземпляр Worker друкує повідомлення, коли від’єднується, а потім пул потоків викликає join, щоб дочекатися завершення кожного потоку Worker.

Зверніть увагу на один цікавий аспект цього конкретного виконання: ThreadPool скинув sender, і перш ніж будь-який Worker отримав помилку, ми спробували приєднати Worker 0. Worker 0 ще не отримав помилку від recv, тож головний потік заблокувався, очікуючи завершення Worker 0. Тим часом Worker 3 отримав завдання, а потім усі потоки отримали помилку. Коли Worker 0 завершився, головний потік дочекався завершення решти екземплярів Worker. У цей момент вони всі вийшли зі своїх циклів і зупинилися.

Вітаємо! Ми завершили наш проєкт; тепер у нас є базовий вебсервер, який використовує пул потоків для асинхронної відповіді. Ми можемо виконати м’яке завершення роботи сервера, яке очищає всі потоки в пулі.

Ось повний код для довідки:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Ми могли б зробити тут ще більше! Якщо ви хочете продовжити вдосконалювати цей проєкт, ось кілька ідей:

  • Додайте більше документації до ThreadPool та його публічних методів.
  • Додайте тести функціональності бібліотеки.
  • Змініть виклики unwrap на надійнішу обробку помилок.
  • Використайте ThreadPool, щоб виконувати якусь іншу задачу, ніж обслуговування вебзапитів.
  • Знайдіть крейт пулу потоків на crates.io і реалізуйте подібний вебсервер, використовуючи замість цього крейт. Потім порівняйте його API та надійність із пулом потоків, який ми реалізували.

Підсумок

Чудова робота! Ви дійшли до кінця книги! Ми хочемо подякувати вам за те, що ви пройшли разом із нами цей огляд мови програмування Rust. Тепер ви готові реалізовувати власні проєкти Rust і допомагати з проєктами інших людей. Пам’ятайте, що існує привітна спільнота інших растацеанців (Rustaceans), які із задоволенням допоможуть вам із будь-якими викликами, з якими ви зіткнетеся у своїй подорожі Rust.