Event-sourced cart

Event sourcing and CQRS: domain events, multiple projections from the same stream, and incremental updates without full replay.

Event sourcing CQRS Projections Keyed records Incremental

What this example covers

This example implements a classic event sourcing and CQRS architecture using merkql as the event store. Domain events for a shopping cart are appended to a topic, then multiple consumer groups independently replay and project different views of the same data.

Domain event modelling

Cart events (AddItem, RemoveItem, Checkout) are modelled as a tagged enum and serialized to JSON. The serde(tag = "type") attribute produces discriminated unions in the JSON output.

Entity-keyed records

Each event uses the cart ID as the record key. This routes all events for the same cart to the same partition, preserving per-cart ordering without requiring global ordering across carts.

CQRS with multiple projections

Two consumer groups read the same event stream independently. “cart-state” materializes the current contents of each cart. “revenue” tracks which carts have checked out and sums their totals. Same data, different views.

Incremental updates

After initial replay, both consumers commit their offsets. When new events arrive, each consumer polls and receives only the new events—no full replay needed. This is the production pattern for maintaining live projections.

CQRS projections: event log with two consumer groups building different materialized views Event Log (topic: cart-events) AddItem AddItem RemoveItem AddItem Checkout ... 30 events total Cart State Materialized view cart-1: Widget x3, Gadget x1 cart-2: Bolt x100, Nut x100 cart-3: Rivet x200, Pin x50 group: cart-state Revenue Report Materialized view cart-1: $125.43 (checked out) cart-2: $141.25 (checked out) cart-3: pending group: revenue offset: 30 offset: 30 +5 new events AddItem Checkout cart-3 completes cart-3: 8 items, $76 offset: 30 → 35 total: $342.68 offset: 30 → 35 Same event stream, different views. Each consumer group processes only new events after commit. No full replay needed for incremental updates.

Step by step

Step 1

Define event types

Cart events are modelled as a Rust enum with #[serde(tag = "type")] for tagged JSON serialization. Each variant carries a cart_id used as the record key. A helper method extracts the cart ID regardless of variant.

#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum CartEvent {
    AddItem {
        cart_id: String,
        item: String,
        qty: u32,
        price: f64,
    },
    RemoveItem {
        cart_id: String,
        item: String,
    },
    Checkout {
        cart_id: String,
    },
}

impl CartEvent {
    fn cart_id(&self) -> &str {
        match self {
            CartEvent::AddItem { cart_id, .. } => cart_id,
            CartEvent::RemoveItem { cart_id, .. } => cart_id,
            CartEvent::Checkout { cart_id, .. } => cart_id,
        }
    }
}

An AddItem event serializes to JSON like:

{"type":"AddItem","cart_id":"cart-1","item":"Widget","qty":2,"price":9.99}
Steps 2–3

Produce cart events

30 events are produced across 3 carts. Carts 1 and 2 go through a full lifecycle: add items, remove some, checkout. Cart 3 has items added but hasn't checked out yet. The cart ID is used as the record key.

let producer = Broker::producer(&broker);

let events: Vec<CartEvent> = vec![
    // Cart 1: add items, remove one, checkout
    CartEvent::AddItem { cart_id: "cart-1".into(),
        item: "Widget".into(), qty: 2, price: 9.99 },
    CartEvent::AddItem { cart_id: "cart-1".into(),
        item: "Gadget".into(), qty: 1, price: 24.99 },
    // ... more events ...
    CartEvent::Checkout { cart_id: "cart-1".into() },
    // Cart 2: similar lifecycle
    // Cart 3: items added, no checkout
];

for event in &events {
    let value = serde_json::to_string(event).unwrap();
    producer.send(&ProducerRecord::new(
        "cart-events",
        Some(event.cart_id().to_string()),
        value,
    )).unwrap();
}
Step 4

Projection 1 — Cart state

The “cart-state” consumer group replays all 30 events and builds a HashMap<cart_id, Vec<LineItem>>. The apply_cart_event function handles each variant: AddItem inserts or updates quantities, RemoveItem drops the item, Checkout is a no-op for this projection.

let mut cart_state: HashMap<String, Vec<LineItem>> = HashMap::new();

let mut consumer = Broker::consumer(&broker, ConsumerConfig {
    group_id: "cart-state".into(),
    auto_commit: false,
    offset_reset: OffsetReset::Earliest,
});
consumer.subscribe(&["cart-events"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();

for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    apply_cart_event(&mut cart_state, &event);
}
consumer.commit_sync().unwrap();
Polled 30 records.
cart-1: 7 items, total $125.43
  Widget x3 @ $9.99 = $29.97
  Gadget x1 @ $24.99 = $24.99
  Thingamajig x1 @ $15.00 = $15.00
  Gizmo x2 @ $7.50 = $15.00
  Doodad x3 @ $5.99 = $17.97
  Spring x4 @ $2.25 = $9.00
  Lever x1 @ $13.50 = $13.50
cart-2: 6 items, total $141.25
cart-3: 6 items, total $49.50
Step 5

Projection 2 — Revenue report

A second consumer group (“revenue”) reads the same 30 events but builds a different view. It tracks which carts have checked out and only counts their line item totals toward revenue. Cart 3 hasn't checked out, so it's excluded from the total.

let mut checked_out: HashMap<String, bool> = HashMap::new();
let mut revenue_items: HashMap<String, Vec<LineItem>> = HashMap::new();

let mut consumer = Broker::consumer(&broker, ConsumerConfig {
    group_id: "revenue".into(),
    auto_commit: false,
    offset_reset: OffsetReset::Earliest,
});
consumer.subscribe(&["cart-events"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();

for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    match &event {
        CartEvent::Checkout { cart_id } => {
            checked_out.insert(cart_id.clone(), true);
        }
        _ => apply_cart_event(&mut revenue_items, &event),
    }
}
consumer.commit_sync().unwrap();
cart-1 (checked out): $125.43
cart-2 (checked out): $141.25
Total revenue: $266.68
Step 6

Produce incremental events

5 more events complete cart-3: add two new items, remove one existing item, add another, and finally checkout. These events are appended to the same topic.

let new_events = vec![
    CartEvent::AddItem { cart_id: "cart-3".into(),
        item: "Clamp".into(), qty: 5, price: 2.50 },
    CartEvent::AddItem { cart_id: "cart-3".into(),
        item: "Hinge".into(), qty: 8, price: 1.75 },
    CartEvent::RemoveItem { cart_id: "cart-3".into(),
        item: "Anchor".into() },
    CartEvent::AddItem { cart_id: "cart-3".into(),
        item: "Latch".into(), qty: 3, price: 4.00 },
    CartEvent::Checkout { cart_id: "cart-3".into() },
];
Step 7

Incremental projection updates

Both consumer groups poll again. Because they committed their offsets after step 4/5, they each receive exactly the 5 new events—not the full 35. Each projection incrementally updates its view without replaying the entire history.

// Cart state consumer picks up from offset 30
let records = cart_consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 5);
for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    apply_cart_event(&mut cart_state, &event);
}

// Revenue consumer also picks up from offset 30
let records = revenue_consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 5);
Cart state consumer: Polled 5 new records (expected 5).
Revenue consumer:   Polled 5 new records (expected 5).
Step 8

Updated projections

After processing the 5 new events, both projections reflect the updated state. Cart-3 now has 8 items totalling $76.00 (with Anchor removed, Clamp/Hinge/Latch added), and the revenue report now includes cart-3 since it checked out.

Updated cart state:
cart-1: 7 items, total $125.43
cart-2: 6 items, total $141.25
cart-3: 8 items, total $76.00
  Bolt x100 @ $0.10 = $10.00
  Nut x100 @ $0.08 = $8.00
  Rivet x200 @ $0.03 = $6.00
  Pin x50 @ $0.15 = $7.50
  Screw x150 @ $0.04 = $6.00
  Clamp x5 @ $2.50 = $12.50
  Hinge x8 @ $1.75 = $14.00
  Latch x3 @ $4.00 = $12.00

Updated revenue:
cart-1 (checked out): $125.43
cart-2 (checked out): $141.25
cart-3 (checked out): $76.00
Total revenue: $342.68

Event sourcing with merkql

Events as the source of truth

Instead of storing the current state of each cart, this pattern stores the sequence of events that produced that state. The current state is derived by replaying events. This gives you a complete audit trail and the ability to build new views from historical data.

CQRS: Command Query Responsibility Segregation

The write side (producer) appends events to the log. The read side (consumers) builds materialized views. Each consumer group maintains its own read model optimised for its specific query pattern. Writing events and reading projections are completely decoupled.

Consumer groups as projection builders

merkql's consumer group model maps naturally to CQRS projections. Each group has independent offset tracking, so you can add a new projection at any time—it replays from the beginning while existing projections continue incrementally.

Event replay vs incremental update

On first startup, a projection replays the entire event stream (offset reset = Earliest). On subsequent runs, it resumes from its committed offset and processes only new events. This is the same pattern used in production event-sourced systems with Kafka.

Keyed records for entity ordering

Using the cart ID as the record key ensures all events for the same cart are ordered within a partition. With multiple partitions, different carts can be processed in parallel while maintaining per-cart consistency.


APIs used

APIPurpose
ProducerRecord::new()Keyed records (cart_id routes to partition)
producer.send()Append domain events to the log
ConsumerConfig.group_idIndependent consumer group per projection
OffsetReset::EarliestReplay from beginning on first run
consumer.subscribe()Subscribe to the event topic
consumer.poll()Fetch events from current position
consumer.commit_sync()Persist position for incremental updates
consumer.close()Clean shutdown of consumer
cargo run -p merkql-event-sourced-cart