Unlocking Effortless Asynchrony: Mastering the Easy Event-Driven Paradigm in Rust

From JOHNWICK

Photo by Tima Miroshnichenko: https://www.pexels.com/photo/alarm-clocks-on-wooden-shelves-8327954/

Introduction

Sometimes, when your program has a task that takes a lot of time, like working with databases, web services, or complex calculations, you might want to let it happen in the background.

This way, your program can keep running smoothly without waiting for the time-consuming task to finish. In Rust, we can do this safely thanks to Rust’s ‘fearless concurrency’. This article explores a straightforward example involving a virtual windowing system.

Implementation in Rust

The ResizeEvent, ResizeEventHandler and ResizeEventListener structs

Let’s assume for the sake of simplicity that resizing windows in our hypothetical system is time-consuming. So we want to do that in a different thread.

Let’s start by importing the items we need:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

Line by line:

  • We start by importing some synchronization primitives:
  • mpsc: This stands for Multiple Producer, Single Consumer. This module provides channels to enable safe communication between threads.
  • Arc: Atomic Reference Counted. This type enables safe sharing of data across threads. It does this by tracking references.
  • Mutex: Mutual Exclusion. This type protects resources from concurrent access, so only one thread at a time can access the data.
  • The std::thread module provides functionality for managing threads.
  • We use the std::time::Duration type for specifying timeouts in the event processing loop.
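
To make these primitives concrete, here is a minimal sketch, separate from the windowing example, in which several threads increment a shared counter and report back over a channel. The function name run_workers is made up for this illustration.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Spawns `workers` threads. Each one increments a shared counter and
/// sends its id over a channel. Returns the final count and the sorted ids.
fn run_workers(workers: usize) -> (usize, Vec<usize>) {
    let counter = Arc::new(Mutex::new(0usize));
    let (tx, rx) = mpsc::channel();

    let mut handles = Vec::new();
    for id in 0..workers {
        // Each thread gets its own Arc pointer and its own Sender clone.
        let counter = Arc::clone(&counter);
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            *counter.lock().unwrap() += 1;
            tx.send(id).unwrap();
        }));
    }
    // Drop the original sender so the receiving iterator ends
    // once every worker has finished and dropped its clone.
    drop(tx);

    let mut ids: Vec<usize> = rx.iter().collect();
    ids.sort();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    (total, ids)
}

fn main() {
    let (total, ids) = run_workers(4);
    println!("count = {total}, ids = {ids:?}");
}
```

The Mutex guards the counter, the Arc lets every thread own a pointer to it, and the channel carries results back to the single consumer.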

The ResizeEvent simply encapsulates the new width and height:

#[derive(Debug)]
struct ResizeEvent {
    width: i32,
    height: i32,
}

This event needs a handler:

type ResizeEventHandler = Arc<dyn Fn(ResizeEvent) -> Result<(i32, i32), String> + Send + Sync>;

The ResizeEventHandler is an alias for a thread-safe closure type: a trait object implementing Fn that takes a ResizeEvent as its only parameter and returns either a tuple of i32 values indicating the new width and height, or a String describing an error. The Send and Sync bounds make the closure usable from other threads, and the type is wrapped in an Arc so the handler can be safely shared across multiple threads.
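
As a small illustration of how such an alias can be used, the sketch below builds a handler once and calls it from two threads. The helper min_size_handler is a hypothetical name introduced here; it is not part of the article’s code.

```rust
use std::sync::Arc;
use std::thread;

#[derive(Debug)]
struct ResizeEvent {
    width: i32,
    height: i32,
}

type ResizeEventHandler =
    Arc<dyn Fn(ResizeEvent) -> Result<(i32, i32), String> + Send + Sync>;

/// Hypothetical helper: builds a handler that rejects sizes below `min`.
fn min_size_handler(min: i32) -> ResizeEventHandler {
    Arc::new(move |event: ResizeEvent| {
        if event.width < min || event.height < min {
            Err(format!(
                "{}x{} is below the minimum of {}",
                event.width, event.height, min
            ))
        } else {
            Ok((event.width, event.height))
        }
    })
}

fn main() {
    let handler = min_size_handler(100);

    // Cloning the Arc only copies the pointer; both threads call the same closure.
    let for_thread = Arc::clone(&handler);
    let worker = thread::spawn(move || {
        for_thread(ResizeEvent { width: 1024, height: 768 })
    });

    println!("{:?}", worker.join().unwrap());
    println!("{:?}", handler(ResizeEvent { width: 50, height: 50 }));
}
```

Without the Send + Sync bounds, the compiler would refuse to move the cloned handler into the spawned thread.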

Now we need a way to listen to the ResizeEvent and we do that by implementing a listener:

#[derive(Debug)]
struct ResizeEventListener {
    events: mpsc::Sender<ResizeEvent>,
    stop: mpsc::Sender<()>,
    _thread_handle: thread::JoinHandle<()>,
}

Line by line:

  • The events field is the sending half of the channel on which resize events are transmitted to the background thread.
  • The stop field is a channel sender for stop signals.
  • The last is a handle to the background thread. It is prefixed with an underscore because it is stored but never read. Note that dropping a JoinHandle does not stop the thread; it only detaches it.

The implementation looks as follows:

impl ResizeEventListener {
    fn new(
        handler: ResizeEventHandler,
        window: Arc<Mutex<Window>>,
    ) -> Self {
        let (tx, rx) = mpsc::channel();
        let (stop_tx, stop_rx) = mpsc::channel();
        let rx = Arc::new(Mutex::new(rx));
        let handler_clone = Arc::clone(&handler);
        let window_clone = Arc::clone(&window);

        let thread_handle = thread::spawn(move || {
            loop {
                // Wait for either a resize event or a stop signal
                let event = {
                    let rx = rx.lock();
                    if let Ok(rx) = rx {
                        rx.recv_timeout(Duration::from_millis(100))
                    } else {
                        eprintln!("Failed to lock event receiver");
                        break;
                    }
                };
                match event {
                    Ok(event) => {
                        match handler_clone(event) {
                            Ok((new_width, new_height)) => {
                                if let Ok(mut window) = window_clone.lock() {
                                    window.width = new_width;
                                    window.height = new_height;
                                } else {
                                    eprintln!("Failed to lock window for resizing");
                                }
                            }
                            Err(e) => {
                                eprintln!("Error handling resize event: {}", e);
                                break;
                            }
                        }
                    }
                    Err(mpsc::RecvTimeoutError::Timeout) => {
                        if stop_rx.try_recv().is_ok() {
                            break;
                        }
                    }
                    Err(_) => break,
                }
            }
        });

        Self {
            events: tx,
            stop: stop_tx,
            _thread_handle: thread_handle,
        }
    }

    fn send(&self, event: ResizeEvent) {
        if let Err(e) = self.events.send(event) {
            eprintln!("Failed to send resize event: {}", e);
        }
    }

    fn stop(&self) {
        let _ = self.stop.send(());
    }
}

The new() method

In the new() method, which acts as the constructor, we do the following:

  • We start by creating two channels, one for resize events, one for stop signals.
  • The receiver for the resize events is wrapped both in an Arc and a Mutex. This allows safe sharing and access from different threads.
  • The core of this small system is the background thread which continuously listens for events.
  • Inside the loop the thread periodically checks for new events. Note the timeout.
  • When a resize event is received, it is processed by the handler function.
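  • If no error occurred, the window dimensions are updated. If the handler returned an error, an error message is logged and the thread leaves its loop, so no further events are processed.
  • The thread also checks for stop signals during timeouts, allowing it to exit cleanly when requested via the stop() method. Because the check only happens on a timeout, a stop request is noticed once the event queue has been idle for 100 ms.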
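
The loop described above can be reduced to the following sketch. It is not the article’s listener: the Worker type, the stop_and_join() method and the summing of integers are stand-ins invented for this example. Two details are assumptions of the sketch rather than the original code: the handle is stored in an Option so it can be joined, and the thread drains any queued events once it sees the stop signal.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical worker: sums the numbers it receives until told to stop.
struct Worker {
    events: mpsc::Sender<i32>,
    stop: mpsc::Sender<()>,
    handle: Option<thread::JoinHandle<i32>>,
}

impl Worker {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<i32>();
        let (stop_tx, stop_rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || {
            let mut sum = 0;
            loop {
                match rx.recv_timeout(Duration::from_millis(20)) {
                    Ok(n) => sum += n,
                    // The stop channel is only consulted when the queue was idle.
                    Err(RecvTimeoutError::Timeout) => {
                        if stop_rx.try_recv().is_ok() {
                            // Assumption of this sketch: handle anything that
                            // was queued before the stop signal arrived.
                            while let Ok(n) = rx.try_recv() {
                                sum += n;
                            }
                            break;
                        }
                    }
                    // Every sender is gone, so no more events can arrive.
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            sum
        });
        Self { events: tx, stop: stop_tx, handle: Some(handle) }
    }

    fn send(&self, n: i32) {
        let _ = self.events.send(n);
    }

    /// Signals the thread, waits for it to finish and returns its result.
    /// Returns None if the thread was already joined or panicked.
    fn stop_and_join(&mut self) -> Option<i32> {
        let _ = self.stop.send(());
        self.handle.take().and_then(|h| h.join().ok())
    }
}

fn main() {
    let mut worker = Worker::new();
    for n in [1, 2, 3] {
        worker.send(n);
    }
    println!("sum = {:?}", worker.stop_and_join());
}
```

Joining the handle is what guarantees that the caller waits for queued work; merely keeping the handle in a field does not.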

The send() method

The send() method submits a ResizeEvent to the channel. The listener thread will handle the event.

The stop() method

By sending a message to the stop channel, this method signals the background thread to terminate.

The Window struct

The Window struct looks like this:

#[derive(Debug)]
struct Window {
    listener: Option<ResizeEventListener>,
    title: String,
    width: i32,
    height: i32,
}

Note that the listener is optional. That is because there could be windows without a listener.

Let’s have a look at the implementation:

impl Window {
    fn new(
        title: String,
        width: i32,
        height: i32,
        handler: ResizeEventHandler,
    ) -> Arc<Mutex<Self>> {
        let window = Arc::new(Mutex::new(Self {
            listener: None,
            title,
            width,
            height,
        }));

        let listener = ResizeEventListener::new(handler, Arc::clone(&window));

        // Store the listener; a poisoned lock is reported instead of calling unwrap()
        if let Ok(mut w) = window.lock() {
            w.listener = Some(listener);
        } else {
            eprintln!("Failed to lock window when setting up listener");
        }

        window
    }

    fn resize(&mut self, event: ResizeEvent) {
        if let Some(listener) = &self.listener {
            listener.send(event);
        } else {
            eprintln!("No resize listener available");
        }
    }

    fn open(&self) {
        println!(
            "Opening window: {} with size {}x{}",
            self.title, self.width, self.height
        );
    }

    fn close(&mut self) {
        println!("Closing window: {}", self.title);
        if let Some(listener) = &self.listener {
            listener.stop();
        }
    }
}

Some things to note:

  • The new() method returns an Arc<Mutex<Self>> making sure you can safely share the newly minted object between threads.
  • In the resize() method we test for the existence of a listener before sending the event. One possible enhancement would be to return some sort of error, if there is no listener.
  • The close() method calls the stop() method on the listener, effectively terminating the listener thread.
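
The enhancement mentioned for resize() could look roughly like this. It is a sketch under simplifying assumptions: the Window here holds the channel sender directly instead of a full listener, and the ResizeError enum is a hypothetical type introduced for the example.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
struct ResizeEvent {
    width: i32,
    height: i32,
}

/// Hypothetical error type for a resize request that could not be queued.
#[derive(Debug, PartialEq)]
enum ResizeError {
    /// The window was created without a listener.
    NoListener,
    /// The listener thread has exited and dropped its receiver.
    ListenerGone,
}

/// Simplified stand-in for the article's Window.
struct Window {
    events: Option<mpsc::Sender<ResizeEvent>>,
}

impl Window {
    /// Queues the event, reporting failure to the caller instead of logging it.
    fn resize(&self, event: ResizeEvent) -> Result<(), ResizeError> {
        let sender = self.events.as_ref().ok_or(ResizeError::NoListener)?;
        sender.send(event).map_err(|_| ResizeError::ListenerGone)
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let with_listener = Window { events: Some(tx) };
    let without_listener = Window { events: None };

    println!("{:?}", with_listener.resize(ResizeEvent { width: 1024, height: 768 }));
    println!("{:?}", rx.recv());
    println!("{:?}", without_listener.resize(ResizeEvent { width: 1024, height: 768 }));
}
```

Returning a Result lets the caller decide whether a missing listener is worth reporting, retrying or ignoring.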

Testing time

Now we can test our setup:

fn main() {
    let window = Window::new(
        "My Window".to_string(),
        800,
        600,
        Arc::new(|event: ResizeEvent| {
            if event.width < 100 || event.height < 100 {
                return Err("Window size too small".to_string());
            }
            println!("Resizing window to {}x{}", event.width, event.height);
            Ok((event.width, event.height))
        }),
    );


    if let Ok(w) = window.lock() {
        w.open();
    } else {
        eprintln!("Failed to lock window for opening");
    }

    if let Ok(mut w) = window.lock() {
        w.resize(ResizeEvent { width: 1024, height: 768 });
    } else {
        eprintln!("Failed to lock window for resizing to 1024x768");
    }

    if let Ok(mut w) = window.lock() {
        w.resize(ResizeEvent { width: 50, height: 50 });
    } else {
        eprintln!("Failed to lock window for resizing to 50x50");
    }

    if let Ok(mut w) = window.lock() {
        w.close();
    } else {
        eprintln!("Failed to lock window for closing");
    }
}

Line by line:

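  • We create a window, with a handler function as a closure.
  • We open the window.
  • Then we resize it twice. The second request (50x50) is below the handler’s minimum of 100, so the handler returns an error.
  • Finally, we close the window.

Note that main() never joins the listener thread, so the process can exit before the queued events have been handled. The output may therefore vary between runs.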

Conclusion

Implementing this pattern took some time to get right, but it helped me gain a better understanding of Rust’s ‘fearless concurrency’. As you can see, using the Arc and Mutex types to make access to the handler and the window thread-safe helps to make the rest of the implementation simple. Also, Rust’s closure traits, Fn and its siblings FnMut and FnOnce, help to keep things simple and clean.

One possible enhancement, which I will save for another article, is to combine the Balking Pattern with this pattern to make sure you can’t, for example, resize a closed window or open a window more than once.
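
As a rough idea of what balking means here, the sketch below refuses any operation that does not fit the window’s current state. It leaves out the threading entirely, and the WindowState enum and the bool return values are assumptions made for this illustration, not the author’s design.

```rust
/// Hypothetical lifecycle states for a window.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WindowState {
    Created,
    Open,
    Closed,
}

#[derive(Debug)]
struct Window {
    state: WindowState,
    width: i32,
    height: i32,
}

impl Window {
    fn new(width: i32, height: i32) -> Self {
        Self { state: WindowState::Created, width, height }
    }

    /// Opens the window once; balks if it was already opened or closed.
    fn open(&mut self) -> bool {
        if self.state != WindowState::Created {
            return false;
        }
        self.state = WindowState::Open;
        true
    }

    /// Resizes only an open window; balks otherwise.
    fn resize(&mut self, width: i32, height: i32) -> bool {
        if self.state != WindowState::Open {
            return false;
        }
        self.width = width;
        self.height = height;
        true
    }

    /// Closes only an open window; balks otherwise.
    fn close(&mut self) -> bool {
        if self.state != WindowState::Open {
            return false;
        }
        self.state = WindowState::Closed;
        true
    }
}

fn main() {
    let mut window = Window::new(800, 600);
    println!("open: {}", window.open());
    println!("open again: {}", window.open());
    println!("resize: {}", window.resize(1024, 768));
    println!("close: {}", window.close());
    println!("resize after close: {}", window.resize(640, 480));
    println!("{:?}", window);
}
```

Each method checks the state first and returns immediately when the call does not apply, which is the essence of balking.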


Read the full article here: https://medium.com/rustaceans/unlocking-effortless-asynchrony-mastering-the-easy-event-driven-paradigm-in-rust-code-nomad-40b1f1da3c16