Asynchrone Verarbeitung

  • Selbst heutige Mikrokontroller sind Mehrprozessorsystem
  • Die parallele Ausführung von Code spart oft nicht nur Zeit sondern auch Energie
  • An vielen Stellen kann auch ein Programm auf Daten warten, d.h. Aufgaben können aufgeschoben werden, wenn sie notwendig sind
  • Rust nutzt hier für die async/await Syntax
    • eine Funktion ist asynchron
    • auf der Ergebnis einer asynchronen Funkten muss gewartet werden (await)
  • In der Vergangenheit wurden nebenläufige Funktionen oft mit Threads des Betriebssystems umgesetzt
    • relativ schwergewichtig sowohl für CPU als auch Arbeitsspeicher
    • fast überall einsetzbar
  • Durch die Integration in der Sprache, kann der Kompiler besser darüber entscheiden
  • asynchrone Funktionen können Rust in einer Runtime ausgeführt werden
    • Die Runtime entscheidet, wann welcher Funktion ausgeführt wird und verwaltet die Ausführung über mehrere Threads
    • Eine klassische Ausführung über Threads mit Rusts selbst ist möglich
    • Bekannte Vertreter für Runtimes sind tokio und async-std
  • async Traits werden zurzeit nicht von Rust nativ unterstützt, dafür kann aber das async-trait crate genutzt werden

Probleme von Nebenläufigkeit

Verwendung von gemeinsamen Speicher

  • Nur lesen ist einfach
  • Wird Speicher verändert müssen die Zugriffe verwaltet werden
  • Verwaltung kostet Leistung

Lösung:

  • Um dies besser zu implementiert, kommt uns der Borrow-Checker zu Hilfe
  • Explizter Ausdrücke für veränderliche Speicherbereihe
  • Borrow-Konzept markiert Bereiche die veränderlich/unveränderlich sind
  • Shared-Nothing-Konzepte sollen bevorzugt werden, hierfür können bspw. Channel verwendet werden, um zwischen Funktionen Speicherbereiche zu "versenden"
  • Synchronisierungsobjekte wie Mutex oder Verwendung von Objekten, die Send und Sync Traits implementieren

Lebenszeit von Speicher

  • Solange Speicher von Funktionen nur auf dem Stack stattfindet ist es einfacher
  • Speicherbereiche müssen allokiert werden und ggf. ausgetauscht werden
  • Mehrere getrennte Funktionen greifen unabhängig auf Speicher zu

Lösung:

  • Referenzzähler für eine asynchrone Verarbeitung, da Nutzung von Objekten nicht mehr im Voraus planbar ist
  • statische Lebenszeit für Objekte

Beispiel

use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use tokio::task;
use tokio::time::{sleep, Duration};

/// Produziert Werte sendet sie über den Channel
async fn async_producer(tx: Sender<u32>, number: u32) {
    loop {
        let value: u32 = rand::random::<u32>() % 1000;
        println!("{:02}: Sende {}", number, value);

        let res = tx.send(value);
        if res.is_err() {
            break;
        }

        let delay = value + 1000u32;
        sleep(Duration::from_millis(delay.into())).await;
    }
}

/// Konsumiert die Werte aus dem Channel. Nach `count` Werten wird die Ausführung beendet
async fn async_consumer(rx: Receiver<u32>, count: u32) {
    for _ in 0..count {
        let value = rx.recv().unwrap();
        println!("Empfangen: {}", value);
    }
}

/// Ausführung in Tokio Runtime.
///
/// ```
/// let rt = tokio::runtime::Runtime::new().unwrap();
/// rt.block_on(async {
///   // async code
/// });
/// ```
#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel();

    for num in 0..7 {
        task::spawn(async_producer(tx.clone(), num.clone()));
    }

    task::spawn(async_consumer(rx, 10)).await.unwrap();
}