Zarifçe Kapatma ve Temizleme

Liste 20-20'deki kod, amaçladığımız gibi bir iş parçacığı havuzu kullanarak isteklere eşzamansız olarak yanıt veriyor. Doğrudan kullanmadığımız workers, id ve thread alanları hakkında bize hiçbir şeyi temizlemediğimizi hatırlatan bazı uyarılar alıyoruz. Ana iş parçacığını durdurmak için daha az zarif olan ctrl-c yöntemini kullandığımızda, bir isteği sunmanın ortasında olsalar bile diğer tüm iş parçacıkları da hemen durdurulur.

Şimdi, havuzdaki (thread) her bir iş parçacığına katılmak için Drop tanımını uygulayacağız, böylece onlar kapanmadan önce üzerinde çalıştıkları istekleri tamamlayabilirler. Ardından, ileti dizilerine yeni istekleri kabul etmeyi bırakmaları ve kapatmaları gerektiğini söylemenin bir yolunu uygulayacağız. Bu kodu çalışırken görmek için, iş parçacığı havuzunu zarif bir şekilde kapatmadan önce sunucumuzu yalnızca iki isteği kabul edecek şekilde değiştireceğiz.

ThreadPool'da Drop Tanımını Süreklemek

Hadi havuzumuz için Drop tanımını sürekleyelim. Her ne zaman havuz bırakılırsa, iş parçacıklarımızın tamamı işlerini bitirmelidir. Liste 20-22 bize Drop süreklemesinin ilk girişimini gösteriyor. Tabii bu kod henüz tam anlamıyla çalışmıyor.

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Liste 20-22: Havuz alandan ayrıldığında her iş parçacığına girmek

İlk olarak, iş parçacığı havuzunun workers alanı arasında döngü yaparız. Bunun için &mut kullanıyoruz çünkü self değişken bir referanstır ve ayrıca worker'ı değiştirebiliyor olmamız gerekir. Her çalışan için, bu belirli çalışanın kapatıldığını söyleyen bir mesaj yazdırırız ve ardından o çalışanın iş parçacığına katılmayı çağırırız. Katılma çağrısı (join) başarısız olursa, Rust'ı paniğe sürüklemek ve uygunsuz bir kapatmaya gitmek için paketi açarız (unwrap).

Bu kodu derlediğimizde aldığımız hata:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

Hata bize, her bir çalışanın yalnızca değişken bir referansını almaya sahip olduğumuz ve join argümanının sahipliğini üstlendiği için join diyemeyeceğimizi söylüyor. Bu sorunu çözmek için, join'in thread'i tüketebilmesi için thread'i thread'in sahibi olan Worker örneğinin dışına taşımamız gerekiyor. Bunu Liste 17-15'te yaptık: Worker bunun yerine bir Option<thread::JoinHandle<()>> tutarsa, değeri Some değişkeninin dışına taşımak ve içinde bir None değişkeni bırakmak için Option üzerindeki take fonksiyonunu çağırabiliriz. Başka bir deyişle, çalışan bir Worker'ın iş parçacığında Some varyantı olacaktır ve bir Worker'ı temizlemek istediğimizde, Worker'ın çalıştıracak bir iş parçacığı olmaması için Some'yi None ile değiştiririz.

Dolayısıyla, Worker tanımını şu şekilde güncellemek istediğimizi biliyoruz:

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Şimdi değişmesi gereken diğer yerleri bulmak için derleyiciye bakalım. Bu kodu kontrol ederken iki hata alıyoruz:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, Some(thread) }
   |                      +++++      +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

Worker::new'in sonundaki koda işaret eden ikinci hatayı ele alalım; yeni bir Worker oluşturduğumuzda, iş parçacığı (thread) değerini Some içine sarmamız gerekir. Bu hatayı düzeltmek için aşağıdaki değişiklikleri yapın:

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

İlk hata Drop süreklemesindedir. thread'i worker'ın dışına taşımak için Option değerini alabilmek için take fonksiyonunu çağırmayı amaçladığımızdan daha önce bahsetmiştik.

Aşağıdaki değişiklikler bunu yapacaktır:

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Bölüm 17'de söylendiği gibi, Option üzerindeki take fonksiyonu Some değişkenini çıkarır ve onun yerine None'u bırakır. Some'ı yok etmek ve ipliği almak için if let'i kullanıyoruz; sonra iplik üzerinde join'i çağırıyoruz. Bir işçinin iş parçacığı zaten None ise, işçinin iş parçacığını zaten temizlediğini biliyoruz, bu nedenle bu durumda hiçbir şey değişmeyecektir.

İşleri Dinlemeyi Durdurmak İçin İpliklere Sinyal Vermek

Yaptığımız tüm değişikliklerle kodumuz herhangi bir uyarı olmadan derleniyor. Ancak kötü haber şu ki, bu kod henüz istediğimiz gibi çalışmıyor. Anahtar, Worker örneklerinin iş parçacıkları tarafından çalıştırılan kapatmalardaki mantıktır: şu anda buna join diyoruz, ancak bu, iş aramak için sonsuza kadar döngü yaptıkları için iş parçacıklarını kapatmaz. Mevcut drop uygulamamızla ThreadPool'umuzu düşürmeye çalışırsak, ana iş parçacığı sonsuza kadar ilk iş parçacığının bitmesini bekleyecek. Bu sorunu çözmek için ThreadPool drop süreklemesinde bir değişikliğe ve ardından Worker döngüsünde (loop) bir değişikliğe ihtiyacımız olacak.

İlk olarak, ThreadPool drop süreklemesini, ileti dizilerinin bitmesini beklemeden önce göndereni açıkça bırakacak şekilde değiştireceğiz.

Liste 20-23, göndereni açıkça bırakmak (drop) için ThreadPool'daki değişiklikleri gösterir. Göndericiyi (sender) ThreadPool'un dışına taşıyabilmek için aynı Optionu kullanıyoruz ve iş parçacığında yaptığımız gibi tekniği alıyoruz:

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Liste 20-23: Çalışan iş parçacıklarına katılmadan önce sender'ı açıkça bırakın

sender'ı bırakmak, kanalı kapatır, bu da daha fazla mesaj gönderilmeyeceğini gösterir. Bu olduğunda, işçilerin sonsuz döngüde yaptığı tüm recv çağrıları bir hata döndürür. Liste 20-24'te, bu durumda döngüden zarif bir şekilde çıkmak için Worker döngüsünü değiştiriyoruz; bu, ThreadPool drop süreklemesi, join çağrısı yaptığında iş parçacıklarının biteceği anlamına geliyor.

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            match receiver.lock().unwrap().recv() {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Liste 20-24: "recv" bir hata döndürdüğünde açıkça döngüden çıkar

Bu kodu çalışırken görmek için, Liste 20-25'te gösterildiği gibi, sunucuyu düzgün bir şekilde kapatmadan önce main'i yalnızca iki isteği kabul edecek şekilde değiştirelim.

Dosya adı: src/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Liste 20-25: Döngüden çıkıp iki istek sunduktan sonra sunucuyu kapatır

Gerçek dünya uygulamasında bir web sunucusunun yalnızca iki istek sunduktan sonra kapanmasını istemezsiniz. Bu kod, yalnızca zarif kapatma ve temizlemenin çalışır durumda olduğunu gösterir.

take metodu, Iterator tanımında tanımlanır ve yinelemeyi en fazla ilk iki öğeyle sınırlar. ThreadPool, main sonunda kapsam dışına çıkacak ve drop süreklemesi çalışacaktır.

Sunucuyu cargo run ile başlatın ve üç istekte bulunun. Üçüncü istek hata vermeli ve uçbiriminizde buna benzer bir çıktı görmelisiniz:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Farklı bir çalışan sıralaması ve yazdırılan mesajlar görebilirsiniz. Bu kodun nasıl çalıştığını mesajlardan görebiliriz: 0 ve 3 numaralı işçiler ilk iki isteği aldı. Sunucu, ikinci bağlantıdan sonra bağlantıları kabul etmeyi durdurdu ve ThreadPool'daki Drop süreklemesi, işçi 3 daha işine başlamadan önce yürütülmeye başladı. sender'ı bırakmak, tüm çalışanların bağlantısını keser ve onlara kapatmalarını söyler.

Çalışanların her biri, bağlantıyı kestiklerinde bir mesaj yazdırır ve ardından iş parçacığı havuzu, her bir çalışan iş parçacığının bitmesini beklemek için birleştirme çağırır.

Bu uygulamanın ilginç bir yönüne dikkat edin: ThreadPool sender'ı bıraktı (drop) ve herhangi bir çalışan bir hata almadan önce, işçi 0'a girmeye çalıştık.

İşçi 0, recv'den henüz bir hata almamıştı, bu nedenle ana iş parçacığı, işçi 0'ı beklemeyi engelledi. bitirmek için. Bu arada, işçi 3 bir iş aldı ve ardından tüm iş parçacıkları bir hata aldı. İşçi 0 bittiğinde, ana iş parçacığı diğer işçilerin bitirmesini bekledi. Bu noktada, hepsi döngülerinden çıkmış ve durmuşlardı. Tebrikler! Artık projemizi tamamladık; zaman uyumsuz olarak yanıt vermek için bir iş parçacığı havuzu kullanan temel bir web sunucumuz var. Havuzdaki tüm iş parçacıklarını temizleyen sunucunun zarif bir şekilde kapatılmasını gerçekleştirebiliyoruz.

Referans için tam kod:

Dosya adı: src/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Dosya adı: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender: Some(sender) }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Daha fazlasını da yapabilirdik! Eğer bu projeyi geliştirmeye devam etmek istiyorsanız, burada size yardımcı olacak bazı fikirler var:

  • ThreadPool'a ve genel yöntemlerine daha fazla belge ekleyin.
  • Kütüphanenin işlevselliğine ilişkin testler ekleyin
  • Daha kararlı hata işleme için fonksiyon çağrılarına unwrap ekleyin.
  • Web isteklerini sunmaktan başka bir görevi gerçekleştirmek için ThreadPool'u kullanın.
  • crates.io'da iş parçacığı havuzu arayın ve yaptığımız web sunucusunu bulduğunuz kasayda sürekleyin. Daha sonra süreklediğimizle arasındaki API kararlılığını karşılaştırın.

Özet

Bravo! Kitabın sonuna geldiniz! Rust'un bu turuna katıldığınız için size teşekkür etmek istiyoruz. Artık kendi Rust projelerinizi uygulamaya ve diğer insanların projelerine yardım etmeye hazırsınız. Rust yolculuğunuzda karşılaştığınız her türlü zorlukta size yardım etmeyi sevecek diğer Rustseverlerden oluşan hoş bir topluluk olduğunu unutmayın.