Maelstrom-rust part 1 - The beginning

In November 2024 I stumbled upon fly.io's Gossip Glomers, a series of distributed systems challenges built on Jepsen's Maelstrom, which is "a workbench for learning distributed systems by writing your own".
Maelstrom has a specific protocol for communicating with the runtime, so all you need to do is:

  1. Pass a binary that speaks that protocol and implements a solution to a challenge.
  2. Tell Maelstrom which challenge you're trying to solve.
  3. Pass in some additional parameters to make things harder. (optional)

Maelstrom will run the binary against the selected challenge and give you a whole lot of insights!
The cool thing is that you can write the binary in any language that you want to :)

The folks at Fly did a pretty cool thing: they designed and implemented a Golang package (a package is a Go 'lib') to interact with Maelstrom.
The package doesn't implement any solutions for you. Instead, it takes care of that whole "writing a binary that speaks the protocol" part, so you can focus on the 'distributed systems' part of the challenges.
I was obviously hyped about this: I could finally put into practice what I'd been studying about distributed systems.

But... what about Rust?...

"WHAT ABOUT IT!?", you may ask.

To which I'll reply:
First of all, rude...
And well... what if I want to do something like this but using Rust instead of Go?
So I did the first thing that every sane developer would do: I searched for a crate (a crate is a Rust 'lib') that could do the same!
Aaaand I couldn't find one. I did find a video of Jon Gjengset doing a couple of those in Rust, but that's all.

So I did the second thing that every sane developer would do! (I guess)
I created the crate myself! Or at least I'm trying to.

Keep in mind that the Rust crate will be heavily inspired by the Golang package. BUT! I'll try to express the system's constraints using the type system (respecting the Rusty way). So, without further ado...

The runtime

Before diving into the implementation of each section, let's first vaguely define our system's 'layers' and how it interacts with messages.

  1. Each message arrives via stdin as JSON, and each message has a 'type' field.
  2. According to the 'type' field, we call a user-defined function, passing the received message as a parameter. The arrival of a message without a user-provided handler is a no-op.
  3. The user can send messages to other nodes and can reply to a message.

Let's also define some things that I really want the runtime to have/be:

  1. It has to support concurrent handling of each message with tokio tasks.
  2. It has to use synchronization primitives other than Mutex<T>.
  3. The API has to mimic the Golang version whenever possible, or implement a more ergonomic solution.

Although very vague, those constraints give us a very good starting point!
As you'll see later, one of my first mistakes was trying to impose constraints where they didn't exist. That's why I'm avoiding using too many constraints.

We could start our implementation through either the first or the second 'layer'.
Going for the third one first would be a little harder, since the implementation would depend on previous work.

Messages all the way down

Now, for a philosophical opening: What is a Message?
Since our system is a never-ending stream of Messages to manage, it would be wise to have a concrete definition of what one is xD.

Translating the protocol into a Rust struct, we get the following type for a message:

use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message {
    pub src: String,
    pub dest: String,
    pub body: MessageBody,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBody {
    #[serde(rename = "type")]
    pub ty: String,
    pub msg_id: Option<isize>,
    pub in_reply_to: Option<isize>,

    #[serde(flatten)]
    pub payload: Value,
}

This 'Value' type, the one imported from the serde_json crate, is a type that represents any valid JSON.
There are some type <-> payload value mappings in the protocol, but since this is a library supposed to be used by people who want to study, it's a good thing for it to be more flexible.
Now that we have a proper Message struct, we can start to think of a way to map an incoming message of some type to a user-provided function handler.
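To make the msg_id and in_reply_to fields a bit more concrete, here is a minimal sketch of how a reply could be built from a request. It assumes simplified stand-ins for the types above (no serde, and a plain String instead of the Value payload, so it runs with the standard library alone). The reply helper and its parameters are hypothetical names for illustration, not part of the crate's API.

```rust
// Simplified stand-ins: the real types derive serde traits and carry a
// serde_json::Value payload. A plain String keeps this sketch std-only.
#[derive(Clone, Debug, PartialEq)]
pub struct MessageBody {
    pub ty: String,
    pub msg_id: Option<isize>,
    pub in_reply_to: Option<isize>,
    pub payload: String,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub src: String,
    pub dest: String,
    pub body: MessageBody,
}

impl Message {
    // Hypothetical helper: builds a reply by swapping src and dest and
    // pointing in_reply_to at the msg_id of the request.
    pub fn reply(&self, ty: &str, msg_id: isize, payload: &str) -> Message {
        Message {
            src: self.dest.clone(),
            dest: self.src.clone(),
            body: MessageBody {
                ty: ty.to_string(),
                msg_id: Some(msg_id),
                in_reply_to: self.body.msg_id,
                payload: payload.to_string(),
            },
        }
    }
}

fn main() {
    let request = Message {
        src: "c1".to_string(),
        dest: "n1".to_string(),
        body: MessageBody {
            ty: "echo".to_string(),
            msg_id: Some(1),
            in_reply_to: None,
            payload: "hello".to_string(),
        },
    };
    let response = request.reply("echo_ok", 10, "hello");
    println!("{:?}", response);
}
```

The point is only the bookkeeping: a reply travels back to whoever sent the request, and in_reply_to lets the sender match it to the original msg_id.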

The MessageRouter type

A disclaimer: There are a bunch of concepts that I'll have to gloss over; otherwise, this post series would be absurdly long, so bear with me. Whenever possible, I'll try to explain the concepts loosely and briefly and also point you towards references to learn a bit more about them, but that's as far as I'll go.

We need a way to map a unique type of message to a function handler, and one of the best data structures for mapping one thing to another is our good ol' HashMap<K, V>!
In an effort to express things using types, we first have to define what that 'user-defined handler function' is, so we can use it as the value in our HashMap<K, V>.

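use std::collections::HashMap;
use std::sync::{Arc, RwLock}; // RwLock is used by the router struct further down

pub trait HandlerFunc: Fn(Message, &Node) + Send + Sync + 'static {}
impl<T> HandlerFunc for T where T: Fn(Message, &Node) + Send + Sync + 'static {}

pub type RpcMap = HashMap<String, Arc<dyn HandlerFunc>>;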

Ok, a lot of big words there, so let's go over them!

For Send + Sync + 'static and the Arc type, know that those are required when we operate in a concurrent environment [1] (before the 'actually' people come for my flesh, I'll just remind you that... taps disclaimer sign).

For the pub trait HandlerFunc: we're just defining a new trait (traits are Rust's take on interfaces) that combines all those bounds: Fn(Message, &Node), Send, Sync, and 'static.

In Rust, closures can be stored and passed around as trait objects, which are 'dynamically dispatched' [2][3]. The Fn(Message, &Node) bound specifies that the passed function will receive a Message and a reference to a Node.

The impl<T> HandlerFunc ... is just us saying that if the passed user-defined closure has the correct signature and it is Send + Sync + 'static (taps disclaimer sign again), then we're satisfied with it!

The RpcMap type is a way to express that for a given string, we may have a corresponding function; or, "for a given type of message, we may have a user-provided handler".
The dyn keyword is a way to express that the handler will be called through dynamic dispatch [2][3].
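To see what the blanket impl buys us, here is a small sketch showing that both a closure and a plain function qualify as a HandlerFunc, and that the resulting Arc<dyn HandlerFunc> can be moved to another thread. Message and Node are hypothetical stand-ins (the real ones are richer), run_on_thread is a helper invented for this example, and std::thread is used in place of tokio so the sketch stays dependency-free.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-ins for the crate's real Message and Node types.
#[derive(Clone, Debug)]
pub struct Message {
    pub ty: String,
}

pub struct Node {
    // Records which handlers ran, so the effect of a call is observable.
    pub log: Mutex<Vec<String>>,
}

pub trait HandlerFunc: Fn(Message, &Node) + Send + Sync + 'static {}
impl<T> HandlerFunc for T where T: Fn(Message, &Node) + Send + Sync + 'static {}

// A plain function with the right signature satisfies the blanket impl.
fn echo_handler(msg: Message, node: &Node) {
    node.log.lock().unwrap().push(format!("fn:{}", msg.ty));
}

// Runs a type-erased handler on another thread and returns the node's log.
// Moving the handler into the thread only compiles because of Send + Sync.
pub fn run_on_thread(handler: Arc<dyn HandlerFunc>, ty: &str) -> Vec<String> {
    let node = Arc::new(Node { log: Mutex::new(Vec::new()) });
    let msg = Message { ty: ty.to_string() };
    let node_for_thread = Arc::clone(&node);
    thread::spawn(move || handler(msg, &*node_for_thread))
        .join()
        .expect("handler thread panicked");
    let log = node.log.lock().unwrap().clone();
    log
}

fn main() {
    // A closure works too, as long as it is Send + Sync + 'static.
    let closure: Arc<dyn HandlerFunc> = Arc::new(|msg: Message, node: &Node| {
        node.log.lock().unwrap().push(format!("closure:{}", msg.ty));
    });
    println!("{:?}", run_on_thread(closure, "echo"));
    println!("{:?}", run_on_thread(Arc::new(echo_handler), "echo"));
}
```

A closure that captured something non-thread-safe (an Rc, for example) would be rejected at compile time, which is exactly the kind of constraint we want the type system to carry for us.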

Now that we have a type that expresses what we want, we can go ahead and create a struct that has a field that is of said type.

pub struct RpcRouter {
    pub router: Arc<RwLock<Option<RpcMap>>>,
}

New words there!
The RwLock<T> is a cousin of Mutex<T>. Instead of giving exclusive access to a single caller at a time, it lets any number of readers hold the lock at once, as long as no writer holds it. In other words: you can all read as long as no one is writing.
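That rule can be checked with the standard library alone. The sketch below uses std::sync::RwLock with try_read and try_write, which return immediately instead of blocking, to observe who is allowed in. The two helper functions are names made up for this example.

```rust
use std::sync::RwLock;
use std::thread;

// Holds one read guard, then asks another thread to take a second one.
// Returns true when both readers are allowed in at the same time.
pub fn readers_can_overlap(lock: &RwLock<Vec<String>>) -> bool {
    let _first = lock.read().expect("lock poisoned");
    thread::scope(|s| {
        s.spawn(|| {
            // try_read never blocks; it reports whether a read guard was available.
            let ok = lock.try_read().is_ok();
            ok
        })
        .join()
        .expect("reader thread panicked")
    })
}

// Holds a read guard, then asks another thread to take the write lock.
// Returns true when the writer is turned away.
pub fn writer_is_excluded_while_reading(lock: &RwLock<Vec<String>>) -> bool {
    let _reader = lock.read().expect("lock poisoned");
    thread::scope(|s| {
        s.spawn(|| {
            // try_write fails right away while any reader holds the lock.
            let blocked = lock.try_write().is_err();
            blocked
        })
        .join()
        .expect("writer thread panicked")
    })
}

fn main() {
    let lock = RwLock::new(vec!["echo".to_string()]);
    println!("readers overlap: {}", readers_can_overlap(&lock));
    println!("writer excluded: {}", writer_is_excluded_while_reading(&lock));
    // With no guards alive, a writer gets exclusive access.
    lock.write().expect("lock poisoned").push("broadcast".to_string());
    println!("entries: {}", lock.read().expect("lock poisoned").len());
}
```

For a router that is written a handful of times at startup and read on every incoming message, this read-mostly behavior is a reasonable fit.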

So far so good, now we only need a couple methods to read and write to this router.

impl RpcRouter {
    pub fn new() -> Self {
        RpcRouter {
            router: Arc::new(RwLock::new(None)),
        }
    }

    pub fn route<F>(&mut self, rpc_type: &str, handler: F)
    where
        F: HandlerFunc
    {
        // Wrap the handler in an Arc and insert it into the router map,
        // creating the map on first use
        let arced_handler: Arc<dyn HandlerFunc> = Arc::new(handler);
        let _ = self
            .router
            .write()
            .expect("error on write lock in RpcRouter::route")
            .get_or_insert_with(RpcMap::default)
            .insert(rpc_type.to_string(), arced_handler);
    }

    pub fn get(&self, key: &str) -> Option<Arc<dyn HandlerFunc>> {
        if let Some(ref map) = *self
            .router
            .read()
            .expect("error on read lock in RpcRouter::get")
        {
            // Clone the Arc so the handler outlives the read guard
            map.get(key).cloned()
        } else {
            None
        }
    }
}

Friends and folks, we got it!
We can map some type of message to a user-provided function and then later get that function based on an incoming message of some type.
(also, feel free to copy this code and test it yourself!)
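If you want something you can paste into a single file and run, here is a condensed sketch of the router together with a tiny dispatch function. It is not the crate's final API: Message and Node are hypothetical stand-ins, dispatch is a helper invented for this example, and std::thread stands in for tokio tasks so that no dependencies are needed.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-ins for the crate's Message and Node.
pub struct Message {
    pub ty: String,
}
pub struct Node {
    pub handled: AtomicUsize,
}

pub trait HandlerFunc: Fn(Message, &Node) + Send + Sync + 'static {}
impl<T> HandlerFunc for T where T: Fn(Message, &Node) + Send + Sync + 'static {}

pub type RpcMap = HashMap<String, Arc<dyn HandlerFunc>>;

pub struct RpcRouter {
    router: Arc<RwLock<Option<RpcMap>>>,
}

impl RpcRouter {
    pub fn new() -> Self {
        RpcRouter { router: Arc::new(RwLock::new(None)) }
    }

    // Registers a handler for a message type, creating the map on first use.
    pub fn route<F: HandlerFunc>(&mut self, rpc_type: &str, handler: F) {
        let handler: Arc<dyn HandlerFunc> = Arc::new(handler);
        self.router
            .write()
            .expect("router lock poisoned")
            .get_or_insert_with(RpcMap::default)
            .insert(rpc_type.to_string(), handler);
    }

    // Returns a clone of the Arc, so the read lock is released before the call.
    pub fn get(&self, key: &str) -> Option<Arc<dyn HandlerFunc>> {
        let guard = self.router.read().expect("router lock poisoned");
        let found = guard.as_ref().and_then(|map| map.get(key).cloned());
        found
    }
}

// Looks up the handler for the message's type and runs it on its own thread.
// Returns false when no handler is registered, which is the no-op case.
pub fn dispatch(router: &RpcRouter, node: &Arc<Node>, msg: Message) -> bool {
    match router.get(&msg.ty) {
        Some(handler) => {
            let node = Arc::clone(node);
            thread::spawn(move || handler(msg, &*node))
                .join()
                .expect("handler thread panicked");
            true
        }
        None => false,
    }
}

fn main() {
    let mut router = RpcRouter::new();
    router.route("echo", |_msg: Message, node: &Node| {
        node.handled.fetch_add(1, Ordering::SeqCst);
    });
    let node = Arc::new(Node { handled: AtomicUsize::new(0) });
    let ran = dispatch(&router, &node, Message { ty: "echo".to_string() });
    let ignored = !dispatch(&router, &node, Message { ty: "unknown".to_string() });
    println!("ran: {}, ignored: {}", ran, ignored);
    println!("handled: {}", node.handled.load(Ordering::SeqCst));
}
```

Joining the thread right away keeps the example deterministic. A real runtime would let handlers run concurrently instead of waiting for each one.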

Although it may seem that I went for this implementation from the start, you may be interested in a little cautionary tale on how I first went for the most wild implementation, and what I learned from it!

A Cautionary tale

I was going all in on the whole "express things using the type system" stuff, so my first train of thought was to write an API that enabled something like this:

/// Workload would be in the spirit of a marker trait
/// (https://doc.rust-lang.org/std/marker/index.html)
pub trait Workload {}

// Debug is derived so the handler below can print the payload
#[derive(Debug, serde::Deserialize)]
pub struct APayloadType { /* fields */ }

impl Workload for APayloadType {}

let node = Node::new();

let handler = |message: APayloadType| println!("{:?}", message);

node.handle::<APayloadType>(handler);

I wanted the user to define a struct, imposing that said struct should implement Serde's Deserialize and 'be a Workload' (by implementing the workload trait), then say that whenever that 'type' of message came, it should be routed to the passed callback. I wanted to guarantee that the 'type' of message would map to a concrete type and that concrete type would be the handler function's parameter.

I ended up with something like this:

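use std::any::TypeId;
use std::collections::HashMap;
use std::fmt::Debug;

// Note: Message<P> below is the generic message type from this first design,
// with the payload typed as P (its definition is not shown in this post).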

/// the Value on this map would be the handler functions that the user passed
pub type TypeFuncMap = HashMap<TypeId, Box<dyn Fn(Box<dyn std::any::Any>)>>;

/// this is to accommodate the TypeFuncMap
pub struct MessageRouter {
    pub router: Option<Box<TypeFuncMap>>,
}

impl MessageRouter {
    pub fn new() -> Self {
        MessageRouter { router: None }
    }

    pub fn route<P>(&mut self, type_id: TypeId, handler: impl Fn(Message<P>) + 'static)
    where
        P: Clone + Debug + for<'a> serde::Deserialize<'a> + 'static,
    {
        // I would box the provided function into a callback that just called the
        // passed function with a typecast on the concrete type
        let boxed_handler = Box::new(move |msg: Box<dyn std::any::Any>| {
            // downcast the any into a concrete type
            if let Ok(message) = msg.downcast::<Message<P>>() {
                handler(*message);
            } else {
                // if it got here, the type system had failed me
                panic!("Type mismatch on router dispatch: expected Message<P>");
            }
        }) as Box<dyn Fn(Box<dyn std::any::Any>)>;

        // insert the boxed handler into the router map
        let _ = self
            .router
            .get_or_insert_with(Box::default)
            .insert(type_id, boxed_handler);
    }
}

/// this Node is a struct that would grow to store everything that
/// the API would eventually need. That's why it has only one field for now.
struct Node {
    pub message_router: MessageRouter
}

impl Node {
    /// we need this level of indirection so that the compiler can generate
    /// a different implementation of 'handle' for each Type of message,
    /// because each Type has a different TypeId.
    pub fn handle<P>(&mut self, handler: impl Fn(Message<P>) + 'static)
    where
        P: Clone + Debug + for<'a> serde::Deserialize<'a> + 'static,
    {
        self.message_router.route(TypeId::of::<P>(), handler);
    }
}

The workload would define the Requests the node would accept. At first glance, this seems like a great idea, but if you look at it for more than 20 seconds you'll notice that:

a. Maelstrom's protocol allows a user to extend the messages passed, while my API made that rule more strict. I was imposing a constraint through my API that did not exist in the underlying protocol, and that's a no-no.

b. If my type was malformed, any message arriving with a field 'type' that should be routed to my handler, would not - because the deserialization would fail.
I was, once again, imposing constraints through my API that did not exist in the underlying protocol.

c. The final handler was highly sophisticated, but it introduced a whole array of technical constraints that I honestly don't have enough sanity to deal with.
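The downcast at the heart of that first router can be tried in isolation. The sketch below is a stripped-down, std-only variation of the idea, not the code above: TypedRouter, Echo, and Broadcast are hypothetical names, and handlers return a String just so the result can be inspected. It shows how a payload whose concrete type was never registered simply has nowhere to go.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical payload types, standing in for user-defined workloads.
#[derive(Debug, PartialEq)]
pub struct Echo(pub String);
#[derive(Debug, PartialEq)]
pub struct Broadcast(pub u64);

type ErasedHandler = Box<dyn Fn(Box<dyn Any>) -> Option<String>>;

pub struct TypedRouter {
    handlers: HashMap<TypeId, ErasedHandler>,
}

impl TypedRouter {
    pub fn new() -> Self {
        TypedRouter { handlers: HashMap::new() }
    }

    // Wraps a typed handler in a closure that downcasts before calling it.
    pub fn handle<P: Any>(&mut self, handler: impl Fn(P) -> String + 'static) {
        let erased: ErasedHandler = Box::new(move |payload: Box<dyn Any>| {
            // downcast returns Err when the runtime type is not P.
            payload.downcast::<P>().ok().map(|p| handler(*p))
        });
        self.handlers.insert(TypeId::of::<P>(), erased);
    }

    // Dispatches on the payload's runtime TypeId; None means "no handler".
    pub fn dispatch(&self, payload: Box<dyn Any>) -> Option<String> {
        // Deref first: calling type_id on the Box itself would give the
        // TypeId of Box<dyn Any>, not of the value inside it.
        let id = (*payload).type_id();
        self.handlers.get(&id).and_then(|h| h(payload))
    }
}

fn main() {
    let mut router = TypedRouter::new();
    router.handle(|e: Echo| format!("echo: {}", e.0));
    println!("{:?}", router.dispatch(Box::new(Echo("hi".to_string()))));
    println!("{:?}", router.dispatch(Box::new(Broadcast(7))));
}
```

Every message has to be turned into one specific Rust type before it can be routed at all, which is where the extra strictness compared to the string-keyed RpcRouter comes from.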

At the end of the day, I was trying to define something by applying constraints, instead of letting it be defined by what it does and does not do.

hey man I'm all the way back here, I can't hear you!!!

I said 'I was trying to define something by applying constraints, instead of letting it be defined by what it does and does not do!!'

And that's a wrap for today!
Feel free to reach out to me on LinkedIn if you have anything to say about this!
The next post is still a WIP (pun intended), but it's coming!

References

  1. Send + Sync:

    1. Send + Sync on the Rust Book
    2. Jon Gjengset's video on Send + Sync
    3. Send + Sync documentation
    4. Send + Sync on the Rustonomicon

  2. The Fn trait in Rust:

    1. Fn documentation
    2. Jon Gjengset's video on Fn, FnMut, and FnOnce
    3. Rust Book chapter on closures

  3. What is dynamic dispatch?

    1. Virtual functions in C++ (trust me)
    2. Vtable in C++