Rethinking Concurrency: The Actor Model and Ractor in Rust

The moment you start building systems that handle thousands of concurrent operations, you realize that traditional threading models start to crack under pressure. Shared memory concurrency forces you into a world of mutexes, race conditions, and the constant fear that somewhere, somehow, two threads are fighting over the same piece of data. The cognitive overhead becomes overwhelming. You spend more time reasoning about locks than about the actual business logic you’re trying to implement.

The Actor model offers a fundamentally different approach. Instead of threads competing for shared memory, you get isolated processes that communicate exclusively through messages. It’s not just a different API for the same underlying problem. It’s a different way of thinking about concurrent systems altogether.

The Problem with Shared State

Consider a typical multithreaded application processing network requests. Each thread might need to update some global statistics, check a shared cache, or modify a connection pool. Every one of these operations becomes a potential bottleneck. You wrap everything in locks, but now you’re dealing with lock contention. Fine-grained locking helps with contention but introduces deadlock risks. Lock-free algorithms using atomics seem attractive until you realize you’re essentially building a distributed consensus protocol at the CPU instruction level.
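To make that concrete, here is a minimal sketch of the shared-state approach (the Stats struct and the thread counts are illustrative, not from any real codebase): eight threads all funneling through a single mutex. Every update serializes on the lock, and under real load this is exactly where contention shows up.

use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative shared state: every thread funnels through this one lock.
struct Stats {
    requests: u64,
}

fn main() {
    let stats = Arc::new(Mutex::new(Stats { requests: 0 }));

    let handles: Vec<_> = (0..8)
        .map(|_| {
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                for _ in 0..1_000 {
                    // Correct, but every increment contends for the same mutex.
                    stats.lock().unwrap().requests += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("requests: {}", stats.lock().unwrap().requests);
}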

The real issue runs deeper than performance. Shared mutable state makes reasoning about your program nearly impossible. When any thread can modify any piece of data at any time, you can’t look at a function and understand what it does. You have to trace through every possible interleaving of operations across all threads. The state space explodes combinatorially.

Actors as Isolated State Machines

The Actor model sidesteps this entire class of problems by eliminating shared mutable state. Each actor is an independent entity with its own private state. No other actor can directly access or modify that state. The only way to interact with an actor is to send it a message.

When an actor receives a message, it processes that message sequentially. There’s no concurrent access to its internal state because only the actor itself can touch its own state. This gives you the reasoning power of single-threaded code while maintaining the performance characteristics of concurrent systems.

Think of each actor as a microservice running in the same process. It has a well-defined interface (the messages it accepts), private state (its internal data structures), and processes requests one at a time. The difference is that creating an actor costs microseconds rather than milliseconds, and sending a message takes nanoseconds rather than network round trips.
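Strip away the library support and an actor is just a task that owns its state and drains a mailbox. Here's a minimal sketch of that shape using a plain tokio channel rather than Ractor's API, just to show there's no magic underneath:

use tokio::sync::mpsc;

enum Msg {
    Add(i64),
    Print,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Msg>(64);

    // The "actor": sole owner of `value`, handling one message at a time.
    let actor = tokio::spawn(async move {
        let mut value = 0i64;
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Add(n) => value += n, // no lock needed: nobody else can see `value`
                Msg::Print => println!("value = {}", value),
            }
        }
    });

    tx.send(Msg::Add(41)).await.unwrap();
    tx.send(Msg::Add(1)).await.unwrap();
    tx.send(Msg::Print).await.unwrap();
    drop(tx); // closing the mailbox ends the actor's loop
    actor.await.unwrap();
}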

Message Passing as the Sole Communication Mechanism

Messages in the Actor model are not method calls. When you send a message to an actor, you’re not waiting for it to execute. You’re placing that message in the actor’s mailbox and moving on. The actor will process it when it gets to it. This asynchronous-by-default nature naturally leads to systems that don’t block.

There’s an elegance to this constraint. Because actors can only communicate through messages, you naturally end up with loosely coupled components. An actor doesn’t need to know anything about the internal implementation of another actor. It just needs to know what messages that actor understands. This makes it trivial to swap implementations, add new actors, or distribute actors across machines.

The asynchronous nature also means backpressure becomes explicit. If an actor’s mailbox fills up, you have to decide what to do. Drop messages? Block the sender? Route to a different actor? These are application-level decisions rather than runtime surprises.
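To see what those choices look like in code, here is a sketch using a plain bounded tokio channel (not Ractor's mailbox API): when the channel is full, try_send hands the message back and the sender has to make a policy decision.

use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Debug)]
struct Job(u32);

#[tokio::main]
async fn main() {
    // A deliberately tiny mailbox so overflow is easy to trigger.
    let (tx, mut rx) = mpsc::channel::<Job>(2);

    for i in 0..5 {
        match tx.try_send(Job(i)) {
            Ok(()) => println!("enqueued job {}", i),
            // Mailbox full: drop, retry later, or reroute -- an explicit,
            // application-level decision rather than a runtime surprise.
            Err(TrySendError::Full(job)) => println!("dropped {:?}: mailbox full", job),
            Err(TrySendError::Closed(_)) => break,
        }
    }

    while let Ok(job) = rx.try_recv() {
        println!("processed {:?}", job);
    }
}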

Why This Matters for Systems Programming

The Actor model predates Erlang (Carl Hewitt formalized it in the early 1970s), but Erlang is where it proved itself at industrial scale, in telecom switches that could never go down. The constraints of that environment (millions of concurrent connections, hot code reloading, fault tolerance) shaped a model that turns out to be remarkably applicable to modern distributed systems.

When you’re building a system that processes streams of data from Kafka, each actor can represent a partition consumer. When you’re implementing a cache, each actor can own a shard of the keyspace. When you’re building a request router, each actor can maintain affinity for a subset of clients. The model maps naturally to the kinds of problems you encounter in distributed systems.

The supervision model, which we’ll explore in depth in the next post, gives you fault tolerance almost for free. When an actor crashes, its supervisor can restart it, and because actors are isolated, that crash doesn’t take down the entire system. This is the kind of resilience that’s nearly impossible to achieve with traditional threading models.

Enter Ractor

Rust’s ownership system and the Actor model are a natural fit. Actors already enforce isolated mutable state through their message-passing constraint. Rust enforces it at compile time through its type system. When you send a message containing data to an actor in Ractor, you typically transfer ownership of that data. The compiler guarantees you can’t accidentally share mutable state between actors.
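You can see that ownership transfer in miniature with nothing but a channel (a hypothetical Msg type, no Ractor involved): once the payload moves into the message, the sender can no longer touch it, and the compiler enforces that.

use tokio::sync::mpsc;

struct Msg {
    payload: Vec<u8>, // owned data travels with the message
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Msg>(8);

    let payload = vec![1, 2, 3];
    tx.send(Msg { payload }).await.unwrap();
    // `payload` has been moved into the message. Uncommenting the next line
    // is a compile error, so shared mutation is ruled out before runtime:
    // println!("{:?}", payload);

    let msg = rx.recv().await.unwrap();
    println!("receiver now owns {:?}", msg.payload);
}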

Ractor is a pure Rust implementation of the Actor model, heavily inspired by Erlang’s OTP and Akka. It gives you the core abstractions: actors with mailboxes, message passing with both fire-and-forget and request-response patterns, supervision trees for fault tolerance, and actor lifecycle management.

Let’s build something concrete to see how these concepts translate into code.

Building a Distributed Counter System

We’ll implement a counter system that demonstrates the core patterns you’ll use in real applications. Multiple client actors will send increment and decrement operations to counter actors, and we’ll aggregate statistics across all counters. This mirrors the kinds of aggregation problems you encounter when tracking metrics across a distributed system.

First, the dependencies in your Cargo.toml:

[dependencies]
ractor = "0.9"
tokio = { version = "1", features = ["full"] }

Now let’s define our counter actor. The pattern in Ractor centers on three key types: the message enum that defines what the actor can receive, the state struct that holds the actor’s private data, and the actor implementation itself.

use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub enum CounterMessage {
    Increment { amount: i64 },
    Decrement { amount: i64 },
    GetValue(RpcReplyPort<i64>),
    GetStats(RpcReplyPort<CounterStats>),
}

#[derive(Debug, Clone)]
pub struct CounterStats {
    pub current_value: i64,
    pub total_operations: u64,
    pub operations_per_second: f64,
}

pub struct CounterActor {
    id: String,
}

pub struct CounterState {
    id: String,
    value: i64,
    total_operations: u64,
    start_time: Instant,
}

#[ractor::async_trait]
impl Actor for CounterActor {
    type Msg = CounterMessage;
    type State = CounterState;
    type Arguments = String;

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        id: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        println!("[{}] Counter starting", id);
        Ok(CounterState {
            id,
            value: 0,
            total_operations: 0,
            start_time: Instant::now(),
        })
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            CounterMessage::Increment { amount } => {
                state.value += amount;
                state.total_operations += 1;
            }
            CounterMessage::Decrement { amount } => {
                state.value -= amount;
                state.total_operations += 1;
            }
            CounterMessage::GetValue(reply) => {
                reply.send(state.value)?;
            }
            CounterMessage::GetStats(reply) => {
                let elapsed = state.start_time.elapsed().as_secs_f64();
                let ops_per_sec = if elapsed > 0.0 {
                    state.total_operations as f64 / elapsed
                } else {
                    0.0
                };
                
                reply.send(CounterStats {
                    current_value: state.value,
                    total_operations: state.total_operations,
                    operations_per_second: ops_per_sec,
                })?;
            }
        }
        Ok(())
    }

    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        println!(
            "[{}] Counter stopping with final value {} after {} operations",
            state.id, state.value, state.total_operations
        );
        Ok(())
    }
}

The actor’s lifecycle flows through three phases. The pre_start hook runs before the actor begins processing messages, giving you a place to initialize state or acquire resources. The handle method processes each message sequentially, with exclusive mutable access to the state. The post_stop hook runs during shutdown, perfect for cleanup or final logging.

Notice how the state updates happen. When we increment the counter, we directly mutate state.value. There's no lock acquisition, no atomic operation, no concern about other threads. The actor runtime guarantees that handle is never called concurrently for the same actor. This is the simplicity dividend of the Actor model.

Now let’s create an aggregator actor that collects statistics from multiple counters:

use ractor::rpc::CallResult;

pub enum AggregatorMessage {
    RegisterCounter(ActorRef<CounterMessage>),
    CollectStats,
    GetTotalStats(RpcReplyPort<AggregatedStats>),
}

#[derive(Debug, Clone)]
pub struct AggregatedStats {
    pub total_value: i64,
    pub total_operations: u64,
    pub num_counters: usize,
    pub avg_ops_per_second: f64,
}

pub struct AggregatorActor;

pub struct AggregatorState {
    counters: Vec<ActorRef<CounterMessage>>,
    last_stats: Option<AggregatedStats>,
}

#[ractor::async_trait]
impl Actor for AggregatorActor {
    type Msg = AggregatorMessage;
    type State = AggregatorState;
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        println!("[Aggregator] Starting");
        Ok(AggregatorState {
            counters: Vec::new(),
            last_stats: None,
        })
    }

    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            AggregatorMessage::RegisterCounter(counter_ref) => {
                println!("[Aggregator] Registered counter");
                state.counters.push(counter_ref);
            }
            AggregatorMessage::CollectStats => {
                let mut total_value = 0i64;
                let mut total_operations = 0u64;
                let mut total_ops_per_sec = 0.0;

                for counter in &state.counters {
                    match counter.call(
                        |reply| CounterMessage::GetStats(reply),
                        Some(Duration::from_secs(1))
                    ).await {
                        // `call` resolves to a CallResult, not the bare reply
                        Ok(CallResult::Success(stats)) => {
                            total_value += stats.current_value;
                            total_operations += stats.total_operations;
                            total_ops_per_sec += stats.operations_per_second;
                        }
                        Ok(CallResult::Timeout) => {
                            eprintln!("[Aggregator] Stats request timed out");
                        }
                        Ok(CallResult::SenderError) => {
                            eprintln!("[Aggregator] Counter dropped the reply channel");
                        }
                        Err(e) => {
                            eprintln!("[Aggregator] Failed to send stats request: {}", e);
                        }
                    }
                }

                let aggregated = AggregatedStats {
                    total_value,
                    total_operations,
                    num_counters: state.counters.len(),
                    avg_ops_per_second: if state.counters.is_empty() {
                        0.0
                    } else {
                        total_ops_per_sec / state.counters.len() as f64
                    },
                };

                println!("[Aggregator] Total value: {}, Total ops: {}, Avg ops/sec: {:.2}",
                    aggregated.total_value,
                    aggregated.total_operations,
                    aggregated.avg_ops_per_second
                );

                state.last_stats = Some(aggregated);
                
                // Schedule next collection
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    let _ = myself.cast(AggregatorMessage::CollectStats);
                });
            }
            AggregatorMessage::GetTotalStats(reply) => {
                if let Some(stats) = &state.last_stats {
                    reply.send(stats.clone())?;
                } else {
                    reply.send(AggregatedStats {
                        total_value: 0,
                        total_operations: 0,
                        num_counters: 0,
                        avg_ops_per_second: 0.0,
                    })?;
                }
            }
        }
        Ok(())
    }
}

The aggregator demonstrates request-response communication through the call method. When it collects statistics, it sends a message to each counter and waits for the reply. This is synchronous from the aggregator's perspective but asynchronous at the system level. The counters process these requests in their own time, and the aggregator waits without blocking other actors.
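For reference, here are the two send patterns side by side, sketched against the counter actor defined above (the demo function itself is just an illustration). Note that call resolves to a CallResult, which is why the aggregator matches on CallResult::Success rather than a bare value:

// Assumes `counter` is an ActorRef<CounterMessage> from the code above.
async fn demo(counter: ractor::ActorRef<CounterMessage>) -> Result<(), Box<dyn std::error::Error>> {
    use ractor::rpc::CallResult;
    use std::time::Duration;

    // cast: fire-and-forget, returns as soon as the message is mailboxed.
    counter.cast(CounterMessage::Increment { amount: 1 })?;

    // call: request-response. This task waits for the reply (or timeout),
    // but no other actor in the system is blocked while it does.
    match counter
        .call(CounterMessage::GetValue, Some(Duration::from_secs(1)))
        .await?
    {
        CallResult::Success(value) => println!("counter value: {}", value),
        CallResult::Timeout => eprintln!("counter did not reply in time"),
        CallResult::SenderError => eprintln!("reply channel was dropped"),
    }
    Ok(())
}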

Here’s the complete system wired together:

use tokio::time::sleep; // Duration is already imported from std::time above

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Starting distributed counter system\n");

    // Spawn the aggregator
    let (aggregator_ref, _aggregator_handle) = Actor::spawn(
        Some("aggregator".to_string()),
        AggregatorActor,
        (),
    ).await?;

    // Spawn multiple counter actors
    let mut counter_refs = Vec::new();
    for i in 0..4 {
        let counter_id = format!("counter-{}", i);
        let (counter_ref, _counter_handle) = Actor::spawn(
            Some(counter_id.clone()),
            CounterActor { id: counter_id.clone() },
            counter_id,
        ).await?;

        // Register with aggregator
        aggregator_ref.cast(AggregatorMessage::RegisterCounter(counter_ref.clone()))?;
        counter_refs.push(counter_ref);
    }

    // Start periodic stats collection
    aggregator_ref.cast(AggregatorMessage::CollectStats)?;

    // Simulate concurrent operations from multiple clients
    let mut handles = Vec::new();
    for (idx, counter_ref) in counter_refs.iter().enumerate() {
        let counter = counter_ref.clone();
        let handle = tokio::spawn(async move {
            for i in 0..50 {
                if i % 3 == 0 {
                    let _ = counter.cast(CounterMessage::Decrement { 
                        amount: (idx + 1) as i64 
                    });
                } else {
                    let _ = counter.cast(CounterMessage::Increment { 
                        amount: (idx + 1) as i64 
                    });
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        });
        handles.push(handle);
    }

    // Wait for operations to complete
    for handle in handles {
        handle.await?;
    }

    // Give time for final stats collection
    sleep(Duration::from_secs(3)).await;

    // Get final aggregated stats
    let final_stats = match aggregator_ref.call(
        |reply| AggregatorMessage::GetTotalStats(reply),
        Some(Duration::from_secs(1))
    ).await? {
        // `call` resolves to a CallResult, so unwrap the success case explicitly
        ractor::rpc::CallResult::Success(stats) => stats,
        _ => return Err("final stats request failed or timed out".into()),
    };

    println!("\n=== Final System Statistics ===");
    println!("Total value across all counters: {}", final_stats.total_value);
    println!("Total operations: {}", final_stats.total_operations);
    println!("Number of counters: {}", final_stats.num_counters);
    println!("Average ops/second per counter: {:.2}", final_stats.avg_ops_per_second);

    // Shutdown
    for counter in counter_refs {
        counter.stop(None);
    }
    aggregator_ref.stop(None);

    sleep(Duration::from_millis(100)).await;

    Ok(())
}

When you run this, you’ll see counters processing operations concurrently while the aggregator periodically collects statistics. Each counter maintains perfect consistency despite the concurrent updates because those updates are serialized through the actor’s mailbox. The aggregator gets a consistent snapshot of each counter’s state through the request-response pattern.

The Foundation for Complex Systems

This example might seem simple, but it demonstrates the patterns you’ll use in production systems. Replace the counter with a partition consumer and you have a Kafka processing pipeline. Replace it with a cache shard and you have a distributed cache. Replace it with a connection handler and you have a network server.

The Actor model gives you a vocabulary for building concurrent systems that scales from these simple examples to massively distributed architectures. In the next post, we’ll explore supervision trees and fault tolerance, using a trading system as our example. That’s where the Actor model really shows its strength, when things go wrong and your system needs to recover gracefully.

For now, the key insight is this: by constraining how components communicate and eliminating shared mutable state, the Actor model makes concurrent programming approachable. You reason about one actor at a time, just like you reason about one function at a time. The runtime handles the concurrency, and the type system ensures safety. That’s the promise of Ractor in Rust.

Code is available here: https://github.com/aovestdipaperino/medium-posts/tree/main/actor-i.