Maelstrom-rust part 1 - The beginning

In November 2024 I stumbled upon fly.io's Gossip Glomers, a series of distributed systems challenges built on Jepsen's Maelstrom, which is "a workbench for learning distributed systems by writing your own".
Maelstrom has a specific protocol for communicating with the runtime, so all you need to do is:

  1. Pass a binary that speaks that protocol and implements a solution to a challenge.
  2. Tell Maelstrom which challenge you're trying to solve.
  3. Pass in some additional parameters to make things harder. (optional)

Maelstrom will run the binary against the selected challenge and give you a whole lot of insights!
The cool thing is that you can write the binary in any language you want :)

The folks at Fly did a pretty cool thing: they designed and implemented a Golang package (a package is a Go 'lib') to interact with Maelstrom.
The package doesn't implement any solutions for you. Instead, it takes care of that whole "writing a binary that speaks the protocol" part, so you can focus on the 'distributed systems' part of the challenges.
I was obviously hyped about this: I could finally put into practice what I'd been studying about distributed systems.

But... what about Rust?...

"WHAT ABOUT IT!?", you may ask.

To which I'll reply:
First of all, rude...
And well... what if I want to do something like this but using Rust instead of Go?
So I did the first thing that every sane developer would do: I searched for a crate (a crate is a Rust 'lib') that could do the same!
Aaaand I couldn't find one. I did find a video of Jon Gjengset doing a couple of those in Rust, but that's all.

So I did the second thing that every sane developer would do! (I guess)
I created the crate myself! Or at least I'm trying to.

Keep in mind that the Rust crate will be heavily inspired by the Golang package. BUT! I'll try to express the system's constraints using the type system (respecting the Rusty way). So, without further ado...

The runtime

Before diving into the implementation of each section, let's first vaguely define our system's 'layers' and how it interacts with messages.

  1. Each message arrives via stdin as JSON, and each message has a 'type' field.
  2. According to the 'type' field, we call a user-defined function, passing the received message as a parameter. The arrival of a message without a user-provided handler is a no-op.
  3. The user can send messages to other nodes and can reply to a message.
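To make the second layer a bit more concrete, here's a minimal, std-only sketch of "look at the 'type', call the matching handler, otherwise do nothing". Everything in it (Incoming, Dispatcher, on, dispatch) is a hypothetical name made up for illustration, and JSON parsing is skipped on purpose: the sketch assumes the 'type' has already been extracted from the incoming line.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed message: only what dispatching needs.
#[derive(Clone, Debug, PartialEq)]
pub struct Incoming {
    /// Value of the message's 'type' field.
    pub ty: String,
    /// The original JSON line, left unparsed in this sketch.
    pub raw: String,
}

/// A handler may return a line to write back, or nothing at all.
type Handler = Box<dyn Fn(&Incoming) -> Option<String>>;

/// Maps a message type to the user-provided handler for it.
pub struct Dispatcher {
    handlers: HashMap<String, Handler>,
}

impl Dispatcher {
    pub fn new() -> Self {
        Dispatcher { handlers: HashMap::new() }
    }

    /// Registers `handler` for messages whose type equals `ty`.
    pub fn on(&mut self, ty: &str, handler: impl Fn(&Incoming) -> Option<String> + 'static) {
        self.handlers.insert(ty.to_string(), Box::new(handler));
    }

    /// Calls the matching handler; a message without a handler is a no-op (None).
    pub fn dispatch(&self, msg: &Incoming) -> Option<String> {
        self.handlers.get(&msg.ty).and_then(|handler| handler(msg))
    }
}
```

Registering a handler with `on("echo", ...)` and then dispatching a message of type "echo" runs the closure, while dispatching a type nobody registered simply returns None.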

Let's also define some things that I really want the runtime to have/be:

  1. It has to support concurrent handling of each message with tokio tasks.
  2. It has to use synchronization primitives other than Mutex<T>.
  3. The API has to mimic the Golang version whenever possible, or implement a more ergonomic solution.
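As a taste of what the second wish could look like, here's a small sketch of a message id generator built on an atomic integer instead of a Mutex<T>. It's only an illustration under a few assumptions: MsgIdGen and collect_ids are hypothetical names, and plain std threads stand in for tokio tasks so the snippet needs no external crates.

```rust
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical id generator: hands out unique, increasing ids without a Mutex.
#[derive(Debug, Default)]
pub struct MsgIdGen {
    next: AtomicIsize,
}

impl MsgIdGen {
    /// Returns the next id. fetch_add returns the previous value, so ids start at 1.
    pub fn next_id(&self) -> isize {
        self.next.fetch_add(1, Ordering::Relaxed) + 1
    }
}

/// Spawns `workers` threads that each take `per_worker` ids from one shared
/// generator, then returns every id that was handed out, sorted.
pub fn collect_ids(workers: usize, per_worker: usize) -> Vec<isize> {
    let ids = Arc::new(MsgIdGen::default());
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let ids = Arc::clone(&ids);
            thread::spawn(move || (0..per_worker).map(|_| ids.next_id()).collect::<Vec<_>>())
        })
        .collect();
    let mut all: Vec<isize> = handles
        .into_iter()
        .flat_map(|handle| handle.join().expect("worker thread panicked"))
        .collect();
    all.sort_unstable();
    all
}
```

Since every fetch_add is a single atomic step, no two callers can ever get the same id, even though nobody takes a lock.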

Although very vague, this gives us a good start!
As you'll see later, one of my first mistakes was trying to impose constraints where none existed. That's why I'm avoiding too many constraints.

We could start by implementing either the first or the second 'layer'.
Going for the third one first would be a little harder, since the implementation would depend on previous work.

Messages all the way down

Now, for a philosophical opening: What is a Message?
Since our system is all about managing a never-ending stream of Messages, it would be wise to have a concrete definition of what one is xD.

The final Message struct is:

use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message {
    pub src: String,
    pub dest: String,
    pub body: MessageBody,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBody {
    #[serde(rename = "type")]
    pub ty: String,
    pub msg_id: Option<isize>,
    pub in_reply_to: Option<isize>,

    #[serde(flatten)]
    pub payload: Value,
}

This 'Value' type, imported from the serde_json crate, can represent any valid JSON value. Thanks to #[serde(flatten)], every body field other than type, msg_id and in_reply_to ends up inside payload. Now that we have a proper Message struct, we can start to think of a way to map a message type to a user-provided handler function. (When I say 'message type', I mean the string value of the type field of a given Message struct.)
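To see how msg_id and in_reply_to play together, here's a rough sketch of building a reply: the reply goes back to whoever sent the request, and its in_reply_to carries the request's msg_id. Take it with a grain of salt: it is a std-only stand-in, so the serde derives are dropped, the payload is a plain string map instead of serde_json's Value, and reply_to is a hypothetical helper rather than part of the crate.

```rust
use std::collections::BTreeMap;

/// Std-only stand-in for the Message struct above (no serde derives).
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub src: String,
    pub dest: String,
    pub body: MessageBody,
}

/// Assumption: the payload is a string map here instead of serde_json::Value.
#[derive(Clone, Debug, PartialEq)]
pub struct MessageBody {
    pub ty: String,
    pub msg_id: Option<isize>,
    pub in_reply_to: Option<isize>,
    pub payload: BTreeMap<String, String>,
}

/// Hypothetical helper: builds a reply to `request`.
/// src and dest are swapped, and in_reply_to echoes the request's msg_id.
pub fn reply_to(
    request: &Message,
    ty: &str,
    msg_id: isize,
    payload: BTreeMap<String, String>,
) -> Message {
    Message {
        src: request.dest.clone(),
        dest: request.src.clone(),
        body: MessageBody {
            ty: ty.to_string(),
            msg_id: Some(msg_id),
            in_reply_to: request.body.msg_id,
            payload,
        },
    }
}
```

For an "echo" request from c1 to n1 with msg_id 1, this produces an "echo_ok" message from n1 to c1 whose in_reply_to is 1.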

The MessageRouter type

Before we dive into that, a little detour for a cautionary tale. (If you don't want to hear it, you can skip it.)

A cautionary tale

I was going all in on the whole "express things using the type system" stuff, so my first train of thought was to write an API that enabled something like this:

/// Workload would be in the spirit of a marker trait
/// (https://doc.rust-lang.org/std/marker/index.html)
pub trait Workload {}

#[derive(Debug, serde::Deserialize)]
pub struct APayloadType { /* fields */ }

impl Workload for APayloadType {}

let node = Node::new();

// Debug is derived above so the payload can be printed with {:?}
let handler = |message: APayloadType| { println!("{:?}", message) };

node.handle::<APayloadType>(handler);

I wanted the user to define a struct that implements Serde's Deserialize and 'is a Workload' (by implementing the Workload trait), and then declare that whenever that 'type' of message arrived, it should be routed to the given callback. I wanted to guarantee that the 'type' of message would map to a concrete type, and that this concrete type would be the handler function's parameter.

I ended up with something like this:

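use std::any::TypeId;
use std::collections::HashMap;
use std::fmt::Debug;

// At this stage Message was generic over its payload type (Message<P>),
// unlike the final, non-generic struct shown earlier.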

/// the Value on this map would be the handler functions that the user passed
pub type TypeFuncMap = HashMap<TypeId, Box<dyn Fn(Box<dyn std::any::Any>)>>;

/// this is to accommodate the TypeFuncMap
pub struct MessageRouter {
    pub router: Option<Box<TypeFuncMap>>,
}

impl MessageRouter {
    pub fn new() -> Self {
        MessageRouter { router: None }
    }

    pub fn route<P>(&mut self, type_id: TypeId, handler: impl Fn(Message<P>) + 'static)
    where
        P: Clone + Debug + for<'a> serde::Deserialize<'a> + 'static,
    {
        // I would box the provided function into a callback that just called the
        // passed function with a typecast on the concrete type
        let boxed_handler = Box::new(move |msg: Box<dyn std::any::Any>| {
            // downcast the any into a concrete type
            if let Ok(message) = msg.downcast::<Message<P>>() {
                handler(*message);
            } else {
                // if it got here, the type system had failed me
                panic!("Type mismatch on router dispatch: expected Message<P>");
            }
        }) as Box<dyn Fn(Box<dyn std::any::Any>)>;

        // insert the boxed handler into the router map
        let _ = self
            .router
            .get_or_insert_with(Box::default)
            .insert(type_id, boxed_handler);
    }
}

/// this Node is a struct that would grow to store everything that
/// the API would eventually need. That's why it has only one field for now.
struct Node {
    pub message_router: MessageRouter
}

impl Node {
    /// we need this level of indirection so that the compiler can generate
    /// a different implementation of 'handle' for each Type of message,
    /// because each Type has a different TypeId.
    pub fn handle<P>(&mut self, handler: impl Fn(Message<P>) + 'static)
    where
        P: Clone + Debug + for<'a> serde::Deserialize<'a> + 'static,
    {
        self.message_router.route(TypeId::of::<P>(), handler);
    }
}

The workload would define the Requests the node would accept. At first glance, this seems like a great idea, but if you look at it for more than 20 seconds you'll notice that:

a. Maelstrom's protocol allows a user to extend the messages passed, while my API made that rule stricter. I was imposing a constraint through my API that did not exist in the underlying protocol, and that's a no-no.

b. If my type was malformed, any message arriving with a 'type' field that should have been routed to my handler would not be, because the deserialization would fail. I was, once again, imposing constraints through my API that did not exist in the underlying protocol.

c. The final handler was highly sophisticated, but it introduced a whole array of technical constraints that I frankly have no sanity to deal with.
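Point (b) is easier to feel with a toy example. The sketch below is not the crate's code: it fakes deserialization with a plain string map so it runs with std alone, and EchoPayload, route_typed and route_loose are hypothetical names. The typed route drops a message as soon as the struct doesn't match, while the loose route still hands the raw fields to the handler.

```rust
use std::collections::BTreeMap;

/// Stand-in for a decoded message body: field name -> value as text.
pub type Fields = BTreeMap<String, String>;

/// Hypothetical strictly-typed payload, in the spirit of APayloadType.
#[derive(Debug, PartialEq)]
pub struct EchoPayload {
    pub echo: String,
}

impl EchoPayload {
    /// Mimics a strict deserializer: fails when the required field is missing.
    pub fn from_fields(fields: &Fields) -> Result<Self, String> {
        fields
            .get("echo")
            .map(|value| EchoPayload { echo: value.clone() })
            .ok_or_else(|| "missing field `echo`".to_string())
    }
}

/// Typed routing: the handler only runs if the payload decodes.
pub fn route_typed(fields: &Fields, handler: impl Fn(EchoPayload) -> String) -> Option<String> {
    EchoPayload::from_fields(fields).ok().map(handler)
}

/// Loose routing: the handler always sees the raw fields and decides for itself.
pub fn route_loose(fields: &Fields, handler: impl Fn(&Fields) -> String) -> Option<String> {
    Some(handler(fields))
}
```

With a body that only contains a misspelled "ecoh" field, route_typed never calls the handler and returns None, even though the message had the right 'type'. route_loose calls it regardless.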

At the end of the day, I was trying to define something by applying constraints, instead of letting it be defined by what it does and does not do.

hey man I'm all the way back here, I can't hear you!!!

I said 'I was trying to define something by applying constraints, instead of letting it be defined by what it does and does not do!!'