class: title

# Asynchrony in Rust

## Scott Rixner and Alan Cox

---

layout: true

---

## OS Threads

* OS threads are **heavyweight**.
  * Each thread has its own stack (typically 2MB).
  * Context switches involve OS overhead.
* **Good for CPU-bound tasks:** Each thread can run on a separate core.
* **Bad for I/O-bound tasks:** Most threads sit idle waiting for I/O.
  * You don't want expensive resources tied up waiting.

---

## Rust Asynchronous Features

**Rust provides the building blocks for asynchronous execution**

* `std::future::Future` trait: the core abstraction.
* `async` keyword: marks functions (and blocks) that return futures.
* `.await` operator: yields control and waits for completion of a future.

This allows you to write asynchronous code, but you need an async runtime library (e.g., `tokio`, `async-std`, `smol`) to actually run it!

---

## Writing Async Functions

Use `async fn`, which returns a `Future` that must be polled to run.

```rust
async fn fetch_data(url: &str) -> String { // Actually returns impl Future<Output = String>
    let response = http_get(url).await;
    process(response).await
}

// Must .await (or hand to a runtime) to actually execute the async function.
let result = fetch_data("https://api.example.com").await;
```

* Inside an `async` block or function, use `.await` to wait for a future.
* `.await` yields control to the runtime until the future is ready.

---

## Rust Futures are "Lazy"

```rust
let future = http_get("https://api.example.com"); // Does NOTHING
let result = future.await;                        // Runs NOW
```

* Rust futures are lazy: they do nothing until driven.
* The function body **does not run** until the future is polled.
* Dropping a Rust future before completion cancels it.
* Compare to JavaScript Promises, which execute immediately on creation.

---

## The `Future` and `Wake` Traits

```rust
enum Poll<T> {
    Ready(T),
    Pending,
}

trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

trait Wake {
    fn wake(self: Arc<Self>);
}
```

* `poll` returns `Ready` when the future is done, or `Pending` if it needs to wait.
* The `Context` includes a waker that the future uses to signal when to poll it again.
* `Pin` is a wrapper around a pointer whose referent cannot be moved.

---

class: line-numbers

## A Future That Counts Down

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

struct CountdownFuture { remaining: u32 }

impl Future for CountdownFuture {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("Boom!".into());
        }
        self.remaining -= 1;
        let waker = cx.waker().clone(); // create clone of waker for the timer thread
        thread::spawn(move || {         // a real system would not spawn a thread per timer
            thread::sleep(Duration::from_secs(1));
            waker.wake();               // tell the driver: re-poll me!
        });
        Poll::Pending
    }
}
```

---

class: line-numbers

## A Driver That Blocks

```rust
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); } // resume the sleeping driver thread
}

fn drive<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = pin!(future); // pin: about to poll, future can no longer move
    loop {
        match pinned.as_mut().poll(&mut cx) { // as_mut borrows the pinned future for polling
            Poll::Ready(val) => return val,
            Poll::Pending => thread::park(), // sleep until ThreadWaker::wake() → unpark()
                                             // safe: unpark() deposits a token if it arrives first
        }
    }
}

fn main() {
    let result = drive(CountdownFuture { remaining: 3 });
    // poll 1: remaining → 2, spawn timer → Pending, park
    // poll 2: remaining → 1, spawn timer → Pending, park
    // poll 3: remaining → 0, spawn timer → Pending, park  ← timer needed to trigger poll 4
    // poll 4: remaining == 0 → Ready
    println!("{}", result);
}
```

---

## The Sync/Async Boundary

```rust
fn main() {
    let future = CountdownFuture { remaining: 3 };
    // future.await - compile-time error!
    drive(future);
}
```

* `main` is synchronous: you can't `.await` inside it.
* Something has to call `poll` in a loop.
* That's exactly what our `drive` function does.

---

class: middle

## Discussion: Async Runtimes

Why does Rust only provide building blocks for asynchronous execution?

---

## From `poll` to `.await`

* `async fn` is compiled to a state machine implementing `Future`.
* That state machine calls `poll` at each internal `.await`.
  * If `poll` returns `Pending`, it saves its state and yields to the runtime.
  * If `poll` returns `Ready`, it continues executing the `async fn`.
* The waker is the signal path from I/O completion back to the scheduler.
  * When I/O is ready, the waker's `wake` method is called.
  * Upon "wake", the runtime re-polls the top-level future, which then polls any futures that it is awaiting (and so on).
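---

## Sketch: Counting Polls and Wakes

The re-polling described on the previous slide can be observed with a small variation on `drive`. This is a minimal sketch using only the standard library; `YieldTimes`, `CountingWaker`, `drive_counting`, and `add_one` are hypothetical names invented for illustration. It assumes a leaf future that wakes itself immediately, so the driver can simply loop instead of parking.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical leaf future: returns Pending `remaining` times, then Ready(42).
struct YieldTimes { remaining: u32 }

impl Future for YieldTimes {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(42);
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again right away
        Poll::Pending
    }
}

/// Hypothetical waker that only counts how often it is called.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) { self.0.fetch_add(1, Ordering::SeqCst); }
}

/// Polls `future` to completion; returns (output, number of polls, number of wakes).
fn drive_counting<F: Future>(future: F) -> (F::Output, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut pinned = pin!(future);
    let mut polls = 0;
    loop {
        polls += 1;
        // Each poll of the async fn's state machine polls the inner future in turn.
        if let Poll::Ready(val) = pinned.as_mut().poll(&mut cx) {
            return (val, polls, counter.0.load(Ordering::SeqCst));
        }
        // No parking: the leaf future has already called the waker.
    }
}

/// The compiler turns this into a state machine with one suspension point.
async fn add_one() -> u32 {
    let v = YieldTimes { remaining: 2 }.await;
    v + 1
}

fn main() {
    let (value, polls, wakes) = drive_counting(add_one());
    println!("value={value} polls={polls} wakes={wakes}");
}
```

* The driver only ever polls the top-level future; the inner `YieldTimes` is polled through it.
* Two `Pending` results mean two wakes and three polls in total.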
---

## Async State Machines

```rust
async fn fetch_data() -> String {
    let response = http_get("https://api.example.com").await;
    process(response).await
}
```

The compiler transforms this into a state machine:

```rust
enum FetchDataFuture {
    Start,
    WaitingForHttp(HttpFuture),
    WaitingForProcess(ProcessFuture),
    Done,
}
```

* The same pattern we wrote by hand in `poll` for `CountdownFuture`.
* States store local variables and the current execution point.

---

## From `drive` to a Full Runtime

`drive` is a working single-future runtime. A complete async runtime must also:

* Scale to handle many concurrent tasks, not just one future.
* Provide I/O functions that integrate with the runtime.
* Use an I/O driver (epoll/kqueue) to wake tasks when I/O is ready.
* Run ready tasks in parallel across a thread pool.
* Reschedule tasks on that thread pool via a work queue.

---

## Real Async Runtimes

* `async-std`: Models its API closely on the Rust standard library.
* `smol`: Minimal runtime that composes small, reusable async primitives.
* `tokio`: Most widely used, full-featured runtime.

---

## Using Tokio

Use `#[tokio::main]` to enter async code from `main`:

```rust
#[tokio::main]
async fn main() {
    let result = fetch_data("https://api.example.com").await;
    println!("{}", result);
}
```

* `#[tokio::main]` is a macro that wraps the body in `block_on`.
* `block_on` is effectively `drive`, with Tokio's full runtime underneath.

---

## Tasks in Tokio

* Exist as compiler-generated state machines from `async` code.
* Share threads cooperatively.
  * Tasks do not have stacks of their own.
  * Only one task runs on a thread at a time.
* Release the thread when `.await` polling returns `Pending`.
* Resume on any thread in the pool when their waker is called.
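---

## Sketch: Tasks Sharing One Thread

To make "share threads cooperatively" concrete, here is a minimal single-threaded sketch using only the standard library. It is not how Tokio's scheduler is implemented: `YieldNow`, `NoopWaker`, `run_all`, `worker`, and `interleaving` are hypothetical names, and the sketch assumes a pending task can simply go to the back of the queue instead of waiting for its waker.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: returns Pending once, then Ready.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

/// Waker that does nothing: this sketch re-queues pending tasks unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A task is a heap-allocated, pinned state machine.
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Round-robin loop: poll each task once, re-queue it if it is not finished.
fn run_all(mut queue: VecDeque<Task>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            queue.push_back(task); // released the thread; runs again later
        }
    }
}

/// Records one log entry per step and yields the thread after each one.
async fn worker(name: &'static str, steps: u32, log: Rc<RefCell<Vec<String>>>) {
    for i in 0..steps {
        log.borrow_mut().push(format!("{name}{i}"));
        YieldNow(false).await;
    }
}

/// Runs two workers on one thread and returns the order in which their steps ran.
fn interleaving() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut queue: VecDeque<Task> = VecDeque::new();
    queue.push_back(Box::pin(worker("a", 2, log.clone())));
    queue.push_back(Box::pin(worker("b", 2, log.clone())));
    run_all(queue);
    let order = log.borrow().clone();
    order
}

fn main() {
    println!("{:?}", interleaving());
}
```

* Only one task runs at a time, yet the steps interleave: `a0`, `b0`, `a1`, `b1`.
* A real runtime re-queues a task only when its waker fires, and spreads the queue over several worker threads.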
---

## `tokio::spawn` vs `thread::spawn`

```rust
// Closure runs on an OS thread: has its own stack, OS schedules it
let thd = thread::spawn(|| fetch_data_sync());

// Future runs as a task: a state machine, Tokio schedules it
let tsk = tokio::spawn(async { fetch_data().await });

// Both return handles you can wait on:
let result = thd.join().unwrap(); // threads
let result = tsk.await.unwrap();  // tasks
```

* Threads can block freely: the OS will context switch.
* Tasks must not block: they would freeze the worker thread.
* For blocking or CPU-bound work, use `spawn_blocking`.
  * Takes a closure, not a future.
  * Returns a future that you can `.await` from async code.

---

## Tokio's Scheduler

* Rust's threads are 1:1 (one Rust thread per OS thread).
* Tokio multiplexes async tasks across a worker thread pool (M:N threading).
* Each Tokio worker thread runs the same event loop:
  * Pick a ready task from its local queue or work-steal from another thread.
  * Poll the task once.
  * If `poll` returns `Pending`, the task waits until its waker re-enqueues it.
  * If `poll` returns `Ready`, the task's result is handed to whoever awaits its `JoinHandle`.
* Scheduling is **cooperative**, not preemptive.
  * Tasks voluntarily yield at `.await` points.
  * The runtime cannot interrupt a running task mid-execution.
* The I/O driver waits for I/O events and wakes tasks when they are ready; in Tokio, an idle worker thread runs it rather than a dedicated thread.

---

## Each Task is Sequential

```rust
let a = fetch_a().await; // b doesn't start until a finishes
let b = fetch_b().await;
```

* Within a task, futures do nothing until polled.
* Each `.await` suspends the task until that future is `Ready`.
* No other work in this task happens in the meantime.

---

## Concurrent Futures: `join!` and `select!`

Tokio provides macros to poll multiple futures within a single task.

```rust
// Poll both until both complete, like JavaScript's Promise.all()
let (a, b) = tokio::join!(fetch_a(), fetch_b());

// Poll both until one completes, then cancel the other, like JavaScript's Promise.race()
tokio::select! {
    result = fetch_a() => println!("a: {}", result),
    result = fetch_b() => println!("b: {}", result),
}
```

* `join!` runs futures concurrently and waits for all to complete.
* `select!` runs futures concurrently until the first one completes.
* Both macros expand to a single state machine that polls all of the futures.

---

class: middle

## Discussion: Tokio

What features or design choices do you think make Tokio popular?

Are there any downsides to using it?

---

## Lifetimes and `.await`

Local variables live **inside** the state machine and persist across `.await` points.

```rust
async fn example() {
    let data = vec![1, 2, 3];
    some_async_fn().await;  // task suspends, "data" stays in the state machine
    println!("{:?}", data); // "data" is still there when the task resumes
}
```

A spawned task cannot borrow from its spawner: it must own its data or borrow data that lives forever (`'static`).

```rust
tokio::spawn(async {
    println!("{:?}", data); // ERROR: data may not live long enough
});

tokio::spawn(async move {
    println!("{:?}", data); // OK: data moved into the task's state machine
});
```

---

## Async State Machines: `Pin` and `Send`

Tasks can migrate between threads at any `.await`.

* Borrows across `.await` points make state machines self-referential.
* Relocating a self-referential struct would invalidate its internal borrows.
* `Pin<&mut T>` prevents relocation in memory, protecting those borrows.
* `Send` permits the pinned state machine to be handed to another thread.
* `tokio::spawn` requires `Future + Send + 'static`, verified by the compiler.

---

## `Send` in Practice: Mutexes

`std::sync::MutexGuard` is not `Send`: pthreads require lock/unlock on the same thread.

```rust
async fn broken() {
    let mutex = std::sync::Mutex::new(0);
    let guard = mutex.lock().unwrap(); // locked on thread A
    some_async_fn().await;             // may migrate to thread B
    println!("{}", *guard);            // ERROR when spawned: MutexGuard is not Send
}
```

`tokio::sync::Mutex` is implemented in user-space: its guard is `Send` and `.lock()` is async.
```rust
async fn fixed() {
    let mutex = tokio::sync::Mutex::new(0);
    let guard = mutex.lock().await; // yields until lock is available
    some_async_fn().await;          // guard is Send, migration is safe
    println!("{}", *guard);
}
```

---

class: middle

## Discussion: Async in Practice

When should you use async Rust instead of threads?

What are the advantages and disadvantages of async Rust?

---

## Async Summary

High-performance concurrent code that scales to millions of connections.

* Language provides syntax and state machine generation.
* Runtime must provide scheduling, I/O integration, and task management.
* More complex than threads, best for I/O-bound workloads.
* Every yield point is explicit in the code.
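---

## Sketch: Checking `Send` Without a Runtime

Returning to the `Send` requirement on spawned tasks: another common way to satisfy it is to drop a `std::sync::MutexGuard` before the next `.await`. The sketch below uses only the standard library. `require_send`, `YieldOnce`, `NoopWaker`, `block_on`, `increment`, and `run` are hypothetical names, and `require_send` only imitates the `Send` bound that `tokio::spawn` places on its future.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Compiles only if `T` is `Send` (imitates the bound on `tokio::spawn`).
fn require_send<T: Send>(value: T) -> T { value }

/// Hypothetical stand-in for real async work: Pending once, then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling driver, enough for this sketch.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = pin!(future);
    loop {
        if let Poll::Ready(val) = pinned.as_mut().poll(&mut cx) {
            return val;
        }
    }
}

async fn increment(counter: Arc<Mutex<u32>>) -> u32 {
    {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    } // guard dropped here, before the yield point
    YieldOnce(false).await; // no MutexGuard is stored in the state machine
    let value = *counter.lock().unwrap();
    value
}

fn run() -> u32 {
    let counter = Arc::new(Mutex::new(0));
    // Would fail to compile if the guard were still alive across the `.await`.
    let future = require_send(increment(counter));
    block_on(future)
}

fn main() {
    println!("{}", run());
}
```

* Scoping the guard keeps the lock short and the future `Send`.
* Use `tokio::sync::Mutex` when the lock really must be held across an `.await`.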