İş Parçacıkları Arasında Veri Aktarmak için Mesaj Geçişini Kullanma

Güvenli eşzamanlılık sağlamak için giderek daha popüler hale gelen bir yaklaşım, iş parçacıklarının veya aktörlerin birbirlerine veri içeren mesajlar göndererek iletişim kurduğu mesaj geçişidir. İşte Go dil dokümantasyonundan bir slogan: “Belleği paylaşarak iletişim kurmayın; bunun yerine iletişim kurarak belleği paylaşın.”

Mesaj gönderme eşzamanlılığını gerçekleştirmek için Rust'ın standart kütüphanesi bir kanal süreklemesi sağlar. Kanal, verilerin bir iş parçacığından diğerine gönderildiği genel bir programlama kavramıdır.

Programlamada bir kanalı, bir dere veya nehir gibi yönlü bir su kanalı gibi düşünebilirsiniz. Bir nehre lastik ördek gibi bir şey koyarsanız, su yolunun sonuna kadar aşağı doğru hareket edecektir.

Bir kanalın iki yarısı vardır: bir verici ve bir alıcı. Verici yarı, nehre lastik ördek koyduğunuz yukarı akış konumudur ve alıcı yarı, lastik ördeğin aşağı akışta sona erdiği yerdir. Kodunuzun bir kısmı göndermek istediğiniz verilerle vericideki yöntemleri çağırır ve başka bir kısmı da gelen mesajlar için alıcı ucunu kontrol eder. Verici veya alıcı yarısından herhangi biri düşerse bir kanalın kapalı olduğu söylenir.

Burada, değerler üreten ve bunları bir kanaldan gönderen bir iş parçacığına ve değerleri alıp yazdıracak başka bir iş parçacığına sahip bir program üzerinde çalışacağız. Özelliği göstermek için bir kanal kullanarak iş parçacıkları arasında basit değerler göndereceğiz. Tekniğe aşina olduktan sonra, sohbet sistemi veya birçok iş parçacığının bir hesaplamanın parçalarını gerçekleştirdiği ve parçaları sonuçları toplayan bir iş parçacığına gönderdiği bir sistem gibi birbirleri arasında iletişim kurması gereken herhangi bir iş parçacığı için kanalları kullanabilirsiniz.

İlk olarak, Liste 16-6'da bir kanal oluşturacağız ancak onunla hiçbir şey yapmayacağız. Bunun henüz derlenmeyeceğini unutmayın çünkü Rust kanal üzerinden ne tür değerler göndermek istediğimizi söyleyemez.

Dosya adı: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Liste 16-6: Bir kanal oluşturma ve iki yarıyı tx ve rx olarak atama

mpsc::channel fonksiyonunu kullanarak yeni bir kanal oluşturuyoruz; mpsc çoklu üretici, tekli tüketici anlamına geliyor. Kısacası, Rust'ın standart kütüphanesinin kanalları sürekleme şekli, bir kanalın değer üreten birden fazla gönderen uca sahip olabileceği, ancak bu değerleri tüketen yalnızca bir alıcı uca sahip olabileceği anlamına gelir. Birden fazla akarsuyun birlikte büyük bir nehre aktığını düşünün: akarsulardan herhangi birine gönderilen her şey sonunda tek bir nehirde son bulacaktır. Şimdilik tek bir üretici ile başlayacağız, ancak bu örneği çalıştırdığımızda birden fazla üretici ekleyeceğiz.

mpsc::channel fonksiyonu bir tuple döndürür, ilk elemanı gönderen uç-verici- ve ikinci elemanı alan uç-alıcıdır. tx ve rx kısaltmaları geleneksel olarak birçok alanda sırasıyla verici ve alıcı için kullanılır, bu nedenle değişkenlerimizi her bir ucu belirtmek için bu şekilde adlandırıyoruz. tuple'ları yıkıma uğratan bir kalıp ile let ifade yapısını kullanıyoruz; let ifade yapılarında kalıp kullanımı ve yıkım konusunu Bölüm 18'de tartışacağız. Şimdilik, let deyimini bu şekilde kullanmanın mpsc::channel tarafından döndürülen tuple parçalarını ayıklamak için uygun bir yaklaşım olduğunu bilin.

İletim ucunu oluşturulmuş bir iş parçacığına taşıyalım ve bir dize göndermesini sağlayalım, böylece oluşturulmuş iş parçacığı Liste 16-7'de gösterildiği gibi ana iş parçacığı ile iletişim kurar. Bu, nehrin yukarısına bir lastik ördek koymak veya bir iş parçacığından diğerine bir sohbet mesajı göndermek gibidir.

Dosya adı: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Liste 16-7: txi doğmuş bir iş parçacığına taşıma ve “hello” gönderme

Yine, yeni bir iş parçacığı oluşturmak için thread::spawn kullanıyoruz ve ardından tx'i kapanışa taşımak için move kullanıyoruz, böylece oluşturulan iş parçacığı tx'e sahip oluyor. Ortaya çıkan iş parçacığının kanal üzerinden mesaj gönderebilmesi için vericiye sahip olması gerekir. Verici, göndermek istediğimiz değeri alan bir send metoda sahiptir. send metodu Result<T, E> türü döndürür, bu nedenle alıcı zaten bırakılmışsa ve değer gönderilecek bir yer yoksa, send metodu bir hata döndürür. Bu örnekte, hata durumunda paniklemek için unwrap'i çağırıyoruz. Ancak gerçek bir süreklemede bunu düzgün bir şekilde ele alırız: düzgün hata işleme stratejilerini gözden geçirmek için Bölüm 9'a dönün.

Liste 16-8'de, ana iş parçacığındaki alıcıdan değeri alacağız. Bu, nehrin sonundaki sudan lastik ördeği almak veya bir sohbet mesajı almak gibidir.

Dosya adı: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Liste 16-8: Ana iş parçacığında “hi” değerinin alınması ve yazdırılması

Alıcının iki faydalı yöntemi vardır: recv ve try_recv. Ana iş parçacığının çalışmasını engelleyecek ve kanaldan bir değer gönderilinceye kadar bekleyecek olan recv'yi kullanıyoruz. Bir değer gönderildiğinde, recv bu değeri Result<T, E> olarak döndürür. Verici kapandığında, recv daha fazla değer gelmeyeceğini belirtmek için bir hata döndürecektir.

try_recv yöntemi bloke olmaz, ancak bunun yerine hemen bir Result<T, E> döndürür: mevcutsa bir mesaj içeren bir Ok değeri ve bu sefer herhangi bir mesaj yoksa bir Err değeri. Bu iş parçacığının mesajları beklerken yapacak başka işleri varsa try_recv'i kullanmak yararlıdır: try_recv'i sık sık çağıran, varsa bir mesajı işleyen ve aksi takdirde tekrar kontrol edene kadar kısa bir süre başka işler yapan bir döngü yazabiliriz.

Bu örnekte basitlik için recv kullandık; ana iş parçacığının mesajları beklemek dışında yapacağı başka bir işimiz yok, bu nedenle ana iş parçacığını engellemek uygundur.

Liste 16-8'deki kodu çalıştırdığımızda, ana iş parçacığından yazdırılan değeri göreceğiz:

Got: hi

Mükemmel!

Kanallar ve Sahiplik Transferi

Sahiplik kuralları mesaj göndermede hayati bir rol oynar çünkü güvenli, eşzamanlı kod yazmanıza yardımcı olurlar. Eşzamanlı programlamada hataları önlemek, Rust programlarınız boyunca sahiplik hakkında düşünmenin avantajıdır. Kanalların ve sahipliğin sorunları önlemek için nasıl birlikte çalıştığını göstermek için bir deney yapalım: kanaldan aşağı gönderdikten sonra ortaya çıkan iş parçacığında bir val değeri kullanmaya çalışacağız. Bu koda neden izin verilmediğini görmek için Liste 16-9'daki kodu derlemeyi deneyin:

Dosya adı: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Liste 16-9: Kanaldan gönderdikten sonra val kullanmayı denemek

Burada, tx.send aracılığıyla kanala gönderdikten sonra val'ı yazdırmayı deniyoruz. Buna izin vermek kötü bir fikir olacaktır: değer başka bir iş parçacığına gönderildikten sonra, biz değeri tekrar kullanmaya çalışmadan önce o iş parçacığı değeri değiştirebilir veya düşürebilir. Potansiyel olarak, diğer iş parçacığının değişiklikleri tutarsız veya var olmayan veriler nedeniyle hatalara veya beklenmedik sonuçlara neden olabilir. Ancak, Liste 16-9'daki kodu derlemeye çalışırsak Rust bize bir hata verir:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

Eşzamanlılık hatamız bir derleme zamanı hatasına neden oldu. send fonksiyonu parametresinin sahipliğini alır ve değer taşındığında alıcı onun sahipliğini alır. Bu, değeri gönderdikten sonra yanlışlıkla tekrar kullanmamızı engeller; sahiplik sistemi her şeyin yolunda olup olmadığını kontrol eder.

Birden Fazla Değer Gönderme ve Alıcının Beklediğini Görme

Liste 16-8'deki kod derlendi ve çalıştırıldı, ancak bize iki ayrı iş parçacığının kanal üzerinden birbiriyle konuştuğunu açıkça göstermedi. Liste 16-10'da, Liste 16-8'deki kodun eşzamanlı olarak çalıştığını kanıtlayacak bazı değişiklikler yaptık: ortaya çıkan iş parçacığı artık birden fazla mesaj gönderecek ve her mesaj arasında bir saniye duracak.

Dosya adı: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

Liste 16-10: Birden fazla mesaj gönderme ve her biri arasında duraklama

Bu kez, ortaya çıkan iş parçacığı ana iş parçacığına göndermek istediğimiz dizelerden oluşan bir vektöre sahiptir. Bunların üzerinde yineleme yaparak her birini ayrı ayrı gönderiyoruz ve thread::sleep fonksiyonunu 1 saniyelik bir Duration değeriyle çağırarak her biri arasında duraklatıyoruz.

Ana iş parçacığında, artık recv fonksiyonunu açıkça çağırmıyoruz: bunun yerine, rx'i yineleyici olarak ele alıyoruz. Alınan her değer için onu yazdırıyoruz. Kanal kapatıldığında, yineleme sona erecektir.

Liste 16-10'daki kodu çalıştırdığınızda, her satır arasında 1 saniyelik bir duraklama ile aşağıdaki çıktıyı görmelisiniz:

Got: hi
Got: from
Got: the
Got: thread

Ana iş parçacığındaki for döngüsünde duraklatan veya geciktiren herhangi bir kodumuz olmadığından, ana iş parçacığının ortaya çıkan iş parçacığından değer almayı beklediğini söyleyebiliriz.

Vericiyi Klonlayarak Birden Fazla Üretici Oluşturma

Daha önce mpsc'nin çoklu üretici, tekli tüketici için kullanılan bir kısaltma olduğundan bahsetmiştik. Şimdi mpsc'yi kullanalım ve hepsi aynı alıcıya değer gönderen birden fazla iş parçacığı oluşturmak için Liste 16-10'daki kodu genişletelim. Bunu, Liste 16-11'de gösterildiği gibi vericiyi klonlayarak yapabiliriz:

Dosya adı: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
}

Liste 16-11: Birden fazla üreticiden birden fazla mesaj gönderme

Bu kez, ilk iş parçacığını oluşturmadan önce, vericide clone çağrısı yapıyoruz. Bu bize ilk iş parçacığına aktarabileceğimiz yeni bir verici verecektir. Orijinal vericiyi ikinci bir iş parçacığına aktarırız. Bu bize her biri bir alıcıya farklı mesajlar gönderen iki iş parçacığı verir.

Kodu çalıştırdığınızda, çıktınız aşağıdaki gibi görünmelidir:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Sisteminize bağlı olarak değerleri farklı bir sırada görebilirsiniz. Eşzamanlılığı zor olduğu kadar ilginç kılan da budur. Thread::sleep ile denemeler yaparsanız, farklı iş parçacıklarında farklı değerler verirseniz, her çalıştırma daha belirsiz olacak ve her seferinde farklı çıktılar oluşturacaktır.

Kanalların nasıl çalıştığına baktığımıza göre, şimdi farklı bir eşzamanlılık yöntemine bakalım.