Jump to content

How a Rust Future gets polled: from async fn to state machine

From JOHNWICK

Having worked on a few async rust project, several go & nodejs, I decided to see what’s really going on. I mean, it’s simple right? Take a function, instead of blocking sequentially, just the way you spin up functions in threads. However, after studying a few resources, I got to learn more about concurrency, parallelism, and the async state machine.

Starting with, why? Why do we even need all these constructs. Simple, we want to support multiple actions at a time. You’re probably listening to music while reading this, your device is downloading resources, your screen is refreshing (how you see changes like notifications and co), and several thousand background tasks. Hence, your device needs to be able to handle multiple tasks concurrently (.i.e all tasks proceed) or in parallel (tasks actually running at same time using multicore systems).

I wrote a piece on concurrency here: https://xpanvictor.github.io/technology/systems/os/2023/11/24/amortization-preemptive_schedulers-context_switching.html and some other concurrency blogs on same blog site. However, those are on the Operating System level. Now how do async work? Concurrency is handled on the OS level using threads which is fine but a lot of times won’t be the necessity. From the previous blog, you’ll see the overhead involved with preemptive multitasking using threads. The context switch cost. In a program however, you mostly just want to signify that a task needs to wait for some IO bound event’s completion eg reading from a file. While it’s reading from such file, you probably want to attend to other users’ requests or perform some other action. In that case, all we need is to tell a “system”, pause the execution of this task until the data is generated, handle any other action you have and once the data is complete, come back to continue the execution. This is called cooperative multitasking and it’s different from preemptive in that all tasks are executing and allowing each other execute not that the system interrupts abruptly, keeping some state.

Now, how do we handle this (rust specifically). It’s where async comes in. When you wrap a block/function with async, it’s just a sugar coated generator for a future. A future is (well from the name), something you expect to happen or be completed later.

async fn guaranteed_random_gen(): u32 {
  4u32;
} 
// the above translates to
struct RandFuture;
impl Future for RandFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Since there are no await points, the function body can be
        // executed immediately. It's already complete.
        Poll::Ready(4u32)
    }
}
fn guaranteed_random_gen() -> impl Future<Output = u32> {
  RandFuture
}

Now, first thing to note here is async functions in rust are lazy. Calling them does nothing but return a Future. Now, it’s up to some form of logic to drive that to completion. This might seem a bit disturbing if coming from a language like js where async returns a promise that begins execution immediately or go where goroutines begin immediately after spinning but there are advantages.

  • zero cost abstraction: no unnecessary heap allocation of the state machine. It’s explicitly stored on the stack hence can be moved around and there’s predictable performance.
  • avoid performance jitters: there’s no garbage collector. No need for those pauses due to the holy grail GC. Also makes rust good for embedded systems with memory constraints. Also, one thing I learnt during is how go handles things. Know what? Let me expand and cook the language a bit (I’m a big go lover btw). Go manages concurrency within the program (without the OS) but uses pre-emptive scheduling without need for cooperation between threads. Hence see why you don’t need async-await notation in go code. Issue with this is a big runtime.
  • gosh, I like control. With this, you can bring in your own runtime or even make your own which I did here: https://github.com/xpanvictor/rusty/tree/master/marshal-async

Now, we’ve established we’re working with a state machine on the stack. Let’s examine the future’s trait a bit more:

pub trait Future {
    type Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

There’s the output, whatever your async block should return. Then there’s a poll associated method. This is exactly what your runtime (eg tokio, async-rs, my marshal_async, etc) calls. Now, let’s break down that flow.

Your runtime usually has at least these 2 components: Spawner, Executor. The spawner handles handling futures. It picks them up, sends to a task queue. You can see a brief example from my implementation here:

impl Spawner {
    pub fn new(sd_ch: Sender<Arc<Task<()>>>, active: Arc<AtomicUsize>) -> Self {
        Spawner { sd_ch, index: 0, active }
    }

    pub fn spawn<T, F>(&mut self, f: F) -> JoinHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = channel();

        // Wrap user future, deliver T to JoinHandle via channel; task future outputs ()
        let fut_wrapper = async move {
            let result = f.await;
            let _ = tx.send(result);
        };

        self.active.fetch_add(1, atomic::Ordering::SeqCst);

        let task = Task {
            future: Mutex::new(Some(
                Box::pin(fut_wrapper) as Pin<Box<dyn Future<Output = ()> + Send + 'static>>
            )),
            sender: self.sd_ch.clone(),
            id: self.index,
            active: self.active.clone(),
        };

        self.index += 1;
        self.sd_ch.send(Arc::from(task)).unwrap();

        JoinHandle::new(rx)
    }
}

Ignore the join handle for now. Essentially, the idea here is, take the future (should implement Send & ‘straight traits so we can move across threads), I’m using a channel here as we’ll be passing a sender to a custom Task struct I created which wraps the future.

For the active atomic uint, I’m planning to just implement some basic job stealing for concurrency like tokio but I’ll look into the testing later.

Then see, we wrap the future (note the pin to ensure same memory address for self referential state machine) into the task and send into the runtime’s queue.

This is where our executor picks it up, pushes the future state machine further, checks if it’s Ready, completes then or finds a way to push back to queue if not (I’ll explain this in a bit).

impl Executor {
    pub fn new(rx: Receiver<Arc<Task<()>>>) -> Self {
        Self { rx }
    }

    /// Blocking run loop: drains tasks until channel is closed and no active tasks remain
    pub fn run(&self, active: &std::sync::Arc<std::sync::atomic::AtomicUsize>) {
        loop {
            match self.rx.try_recv() {
                Ok(task) => self.poll(task),
                Err(TryRecvError::Empty) => {
                    if active.load(Ordering::SeqCst) == 0 {
                        break;
                    } else {
                        // let os do somn else
                        std::thread::yield_now();
                    }
                }
                Err(TryRecvError::Disconnected) => {
                    // Channel closed; finish only when active zero
                    if active.load(Ordering::SeqCst) == 0 {
                        break;
                    } else {
                        std::thread::yield_now();
                    }
                }
            }
        }
    }

    // non blocking ver
    pub fn run_once(&self) {
        if let Ok(task) = self.rx.try_recv() {
            self.poll(task);
        }
    }

    // polling part
    pub fn poll(&self, t: Arc<Task<()>>) {
        let waker_ref = waker_ref(&t);
        let mut cx = Context::from_waker(&*waker_ref);

        let mut future_slot = t.future.lock().unwrap();
        if let Some(mut future) = future_slot.take() {
            match future.as_mut().poll(&mut cx) {
                Poll::Pending => {
                    *future_slot = Some(future);
                }
                Poll::Ready(_) => {
                    // finished: decrement shared counter
                    t.active.fetch_sub(1, Ordering::SeqCst);
                }
            }
        }
    }
}

The idea is just in the poll match. Notice the context we pass however. This is where the magic comes. The context has a key component called the Waker. The waker is the entity that actually notifies the runtime that the future can be repolled. It’s not busy waiting, if the item isn’t ready, it’s not repolled. For example, I created an async future to show where waker is called

use std::sync::atomic::{AtomicBool, Ordering};

/// A future that yields once to the executor.
pub fn yield_now() -> YieldNow {
    YieldNow {
        yielded: Arc::new(AtomicBool::new(false)),
    }
}

pub struct YieldNow {
    yielded: Arc<AtomicBool>,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First poll: mark yielded and wake
        if !self.yielded.swap(true, Ordering::SeqCst) {
            let waker: &Waker = cx.waker();
            waker.wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

You can see the waker here. Just before pending is send, the waker is called for the yield. However, in this example the waker is called immediately. If we were implementing something like an async timer or network fetch, you can spawn a different thread, keep the waker (clone) into that thread and when it times out or network fetch is complete, just call the waker back. So while the thread is working, you can safely just return pending. And folks, it’s that simple. However, before we wrap up, I should mention something fun. Maybe you shouldn’t console.log everywhere. Thing is, most times we mix blocking IO in async code. Async doesn’t mean the function disappears somewhere else. It’s not multithreaded (can be). It’s not parallelism (can be). Async is concurrent across asyncs not within async. So in an async block, everything is sequential. So if you do something like

async {
  let f = async_fetch_some_file().await.unwrap();
  regular_log_very_big_file(f.some_handler);
  return 0
}

You just missed the point unless it’s fine for your implementation. The first await yields control back to the runtime, fine. However, after the waker is called and that one is done, regular_log… starts running and can block the entire execution thread. This is called starving the thread. Well, most console logs are very small and even runtimes manage them well (flushing logic; well compilers, interpreters are pretty smart, even your printf in c++ might just buffer your logs). Well, I expect some errors here, if any, please reach out to [email protected] or comment here. Thank you.

Read the full article here: https://medium.com/@xpanvictor/how-a-rust-future-gets-polled-from-async-fn-to-state-machine-033bc9cb0126