Asynchrone Verarbeitung
- Selbst heutige Mikrokontroller sind Mehrprozessorsystem
- Die parallele Ausführung von Code spart oft nicht nur Zeit sondern auch Energie
- An vielen Stellen kann auch ein Programm auf Daten warten, d.h. Aufgaben können aufgeschoben werden, wenn sie notwendig sind
- Rust nutzt hier für die
async
/await
Syntax- eine Funktion ist
async
hron - auf der Ergebnis einer asynchronen Funkten muss gewartet werden (
await
)
- eine Funktion ist
- In der Vergangenheit wurden nebenläufige Funktionen oft mit Threads des Betriebssystems umgesetzt
- relativ schwergewichtig sowohl für CPU als auch Arbeitsspeicher
- fast überall einsetzbar
- Durch die Integration in der Sprache, kann der Kompiler besser darüber entscheiden
- asynchrone Funktionen können Rust in einer Runtime ausgeführt werden
- Die Runtime entscheidet, wann welcher Funktion ausgeführt wird und verwaltet die Ausführung über mehrere Threads
- Eine klassische Ausführung über Threads mit Rusts selbst ist möglich
- Bekannte Vertreter für Runtimes sind
tokio
undasync-std
- async Traits werden zurzeit nicht von Rust nativ unterstützt, dafür kann aber das async-trait crate genutzt werden
Probleme von Nebenläufigkeit
Verwendung von gemeinsamen Speicher
- Nur lesen ist einfach
- Wird Speicher verändert müssen die Zugriffe verwaltet werden
- Verwaltung kostet Leistung
Lösung:
- Um dies besser zu implementiert, kommt uns der Borrow-Checker zu Hilfe
- Explizter Ausdrücke für veränderliche Speicherbereihe
- Borrow-Konzept markiert Bereiche die veränderlich/unveränderlich sind
- Shared-Nothing-Konzepte sollen bevorzugt werden, hierfür können bspw.
Channel
verwendet werden, um zwischen Funktionen Speicherbereiche zu "versenden" - Synchronisierungsobjekte wie
Mutex
oder Verwendung von Objekten, dieSend
undSync
Traits implementieren
Lebenszeit von Speicher
- Solange Speicher von Funktionen nur auf dem Stack stattfindet ist es einfacher
- Speicherbereiche müssen allokiert werden und ggf. ausgetauscht werden
- Mehrere getrennte Funktionen greifen unabhängig auf Speicher zu
Lösung:
- Referenzzähler für eine asynchrone Verarbeitung, da Nutzung von Objekten nicht mehr im Voraus planbar ist
- statische Lebenszeit für Objekte
Beispiel
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use tokio::task;
use tokio::time::{sleep, Duration};
/// Produziert Werte sendet sie über den Channel
async fn async_producer(tx: Sender<u32>, number: u32) {
loop {
let value: u32 = rand::random::<u32>() % 1000;
println!("{:02}: Sende {}", number, value);
let res = tx.send(value);
if res.is_err() {
break;
}
let delay = value + 1000u32;
sleep(Duration::from_millis(delay.into())).await;
}
}
/// Konsumiert die Werte aus dem Channel. Nach `count` Werten wird die Ausführung beendet
async fn async_consumer(rx: Receiver<u32>, count: u32) {
for _ in 0..count {
let value = rx.recv().unwrap();
println!("Empfangen: {}", value);
}
}
/// Ausführung in Tokio Runtime.
///
/// ```
/// let rt = tokio::runtime::Runtime::new().unwrap();
/// rt.block_on(async {
/// // async code
/// });
/// ```
#[tokio::main]
async fn main() {
let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel();
for num in 0..7 {
task::spawn(async_producer(tx.clone(), num.clone()));
}
task::spawn(async_consumer(rx, 10)).await.unwrap();
}