Event sourcing and CQRS: domain events, multiple projections from the same stream, and incremental updates without full replay.
Overview
This example implements a classic event sourcing and CQRS architecture using merkql as the event store. Domain events for a shopping cart are appended to a topic, then multiple consumer groups independently replay and project different views of the same data.
Cart events (`AddItem`, `RemoveItem`, `Checkout`) are modelled as a tagged enum and serialized to JSON. The `#[serde(tag = "type")]` attribute produces discriminated unions in the JSON output.
Each event uses the cart ID as the record key. This routes all events for the same cart to the same partition, preserving per-cart ordering without requiring global ordering across carts.
Two consumer groups read the same event stream independently. “cart-state” materializes the current contents of each cart. “revenue” tracks which carts have checked out and sums their totals. Same data, different views.
After initial replay, both consumers commit their offsets. When new events arrive, each consumer polls and receives only the new events—no full replay needed. This is the production pattern for maintaining live projections.
Walkthrough
Cart events are modelled as a Rust enum with #[serde(tag = "type")] for tagged JSON serialization. Each variant carries a cart_id used as the record key. A helper method extracts the cart ID regardless of variant.
```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum CartEvent {
    AddItem {
        cart_id: String,
        item: String,
        qty: u32,
        price: f64,
    },
    RemoveItem {
        cart_id: String,
        item: String,
    },
    Checkout {
        cart_id: String,
    },
}

impl CartEvent {
    fn cart_id(&self) -> &str {
        match self {
            CartEvent::AddItem { cart_id, .. } => cart_id,
            CartEvent::RemoveItem { cart_id, .. } => cart_id,
            CartEvent::Checkout { cart_id, .. } => cart_id,
        }
    }
}
```
An AddItem event serializes to JSON like:
```json
{"type":"AddItem","cart_id":"cart-1","item":"Widget","qty":2,"price":9.99}
```
Thirty events are produced across three carts. Carts 1 and 2 go through a full lifecycle: add items, remove some, checkout. Cart 3 has items added but hasn't checked out yet. The cart ID is used as the record key.
```rust
let producer = Broker::producer(&broker);

let events: Vec<CartEvent> = vec![
    // Cart 1: add items, remove one, checkout
    CartEvent::AddItem { cart_id: "cart-1".into(),
                         item: "Widget".into(), qty: 2, price: 9.99 },
    CartEvent::AddItem { cart_id: "cart-1".into(),
                         item: "Gadget".into(), qty: 1, price: 24.99 },
    // ... more events ...
    CartEvent::Checkout { cart_id: "cart-1".into() },
    // Cart 2: similar lifecycle
    // Cart 3: items added, no checkout
];

for event in &events {
    let value = serde_json::to_string(event).unwrap();
    producer.send(&ProducerRecord::new(
        "cart-events",
        Some(event.cart_id().to_string()),
        value,
    )).unwrap();
}
```
The “cart-state” consumer group replays all 30 events and builds a `HashMap<cart_id, Vec<LineItem>>`. The `apply_cart_event` function handles each variant: `AddItem` inserts or updates quantities, `RemoveItem` drops the item, and `Checkout` is a no-op for this projection.
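The `LineItem` struct and `apply_cart_event` helper aren't shown in the walkthrough; here is a minimal sketch of what they could look like, assuming `LineItem` stores the item name, quantity, and unit price:

```rust
use std::collections::HashMap;

// CartEvent as defined above (serde derives omitted here for brevity).
enum CartEvent {
    AddItem { cart_id: String, item: String, qty: u32, price: f64 },
    RemoveItem { cart_id: String, item: String },
    Checkout { cart_id: String },
}

// One line of a materialized cart: item name, quantity, unit price.
#[derive(Debug, Clone, PartialEq)]
struct LineItem {
    item: String,
    qty: u32,
    price: f64,
}

// AddItem merges quantities into an existing line (or appends a new one),
// RemoveItem drops the line, Checkout is a no-op for this projection.
fn apply_cart_event(state: &mut HashMap<String, Vec<LineItem>>, event: &CartEvent) {
    match event {
        CartEvent::AddItem { cart_id, item, qty, price } => {
            let lines = state.entry(cart_id.clone()).or_default();
            match lines.iter_mut().find(|l| &l.item == item) {
                Some(line) => line.qty += qty,
                None => lines.push(LineItem { item: item.clone(), qty: *qty, price: *price }),
            }
        }
        CartEvent::RemoveItem { cart_id, item } => {
            if let Some(lines) = state.get_mut(cart_id) {
                lines.retain(|l| &l.item != item);
            }
        }
        CartEvent::Checkout { .. } => {} // checkout doesn't change cart contents
    }
}
```

Merging quantities on `AddItem` is why the output below shows a single `Widget x3` line rather than two separate Widget entries.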
```rust
let mut cart_state: HashMap<String, Vec<LineItem>> = HashMap::new();

let mut consumer = Broker::consumer(&broker, ConsumerConfig {
    group_id: "cart-state".into(),
    auto_commit: false,
    offset_reset: OffsetReset::Earliest,
});
consumer.subscribe(&["cart-events"]).unwrap();

let records = consumer.poll(Duration::from_millis(100)).unwrap();
for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    apply_cart_event(&mut cart_state, &event);
}
consumer.commit_sync().unwrap();
```
```text
Polled 30 records.

cart-1: 7 items, total $125.43
  Widget x3 @ $9.99 = $29.97
  Gadget x1 @ $24.99 = $24.99
  Thingamajig x1 @ $15.00 = $15.00
  Gizmo x2 @ $7.50 = $15.00
  Doodad x3 @ $5.99 = $17.97
  Spring x4 @ $2.25 = $9.00
  Lever x1 @ $13.50 = $13.50
cart-2: 6 items, total $141.25
cart-3: 6 items, total $49.50
```
A second consumer group (“revenue”) reads the same 30 events but builds a different view. It tracks which carts have checked out and only counts their line item totals toward revenue. Cart 3 hasn't checked out, so it's excluded from the total.
```rust
let mut checked_out: HashMap<String, bool> = HashMap::new();
let mut revenue_items: HashMap<String, Vec<LineItem>> = HashMap::new();

let mut consumer = Broker::consumer(&broker, ConsumerConfig {
    group_id: "revenue".into(),
    auto_commit: false,
    offset_reset: OffsetReset::Earliest,
});
consumer.subscribe(&["cart-events"]).unwrap();

let records = consumer.poll(Duration::from_millis(100)).unwrap();
for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    match &event {
        CartEvent::Checkout { cart_id } => {
            checked_out.insert(cart_id.clone(), true);
        }
        _ => apply_cart_event(&mut revenue_items, &event),
    }
}
consumer.commit_sync().unwrap();
```
```text
cart-1 (checked out): $125.43
cart-2 (checked out): $141.25
Total revenue: $266.68
```
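The step from the two maps to the printed total is a filtered sum over checked-out carts only. A sketch, assuming the same `LineItem` shape as the cart-state projection (the `total_revenue` name is illustrative, not part of the example):

```rust
use std::collections::HashMap;

// Same LineItem shape as the cart-state projection.
struct LineItem {
    item: String,
    qty: u32,
    price: f64,
}

// Sum qty * price over all line items, but only for carts marked checked out.
// Cart 3 has no Checkout event yet, so its items contribute nothing.
fn total_revenue(
    checked_out: &HashMap<String, bool>,
    revenue_items: &HashMap<String, Vec<LineItem>>,
) -> f64 {
    revenue_items
        .iter()
        .filter(|(cart_id, _)| checked_out.get(*cart_id).copied().unwrap_or(false))
        .flat_map(|(_, lines)| lines.iter())
        .map(|l| l.qty as f64 * l.price)
        .sum()
}
```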
Five more events complete cart-3: two new items are added, one existing item is removed, another item is added, and finally the cart checks out. These events are appended to the same topic.
```rust
let new_events = vec![
    CartEvent::AddItem { cart_id: "cart-3".into(),
                         item: "Clamp".into(), qty: 5, price: 2.50 },
    CartEvent::AddItem { cart_id: "cart-3".into(),
                         item: "Hinge".into(), qty: 8, price: 1.75 },
    CartEvent::RemoveItem { cart_id: "cart-3".into(),
                            item: "Anchor".into() },
    CartEvent::AddItem { cart_id: "cart-3".into(),
                         item: "Latch".into(), qty: 3, price: 4.00 },
    CartEvent::Checkout { cart_id: "cart-3".into() },
];
```
Both consumer groups poll again. Because each committed its offsets after the initial replay, each receives exactly the 5 new events rather than the full 35. Each projection incrementally updates its view without replaying the entire history.
```rust
// Cart state consumer picks up from offset 30
let records = cart_consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 5);
for record in &records {
    let event: CartEvent = serde_json::from_str(&record.value).unwrap();
    apply_cart_event(&mut cart_state, &event);
}

// Revenue consumer also picks up from offset 30
let records = revenue_consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 5);
```
```text
Cart state consumer: Polled 5 new records (expected 5).
Revenue consumer: Polled 5 new records (expected 5).
```
After processing the 5 new events, both projections reflect the updated state. Cart-3 now has 8 items totalling $76.00 (with Anchor removed, Clamp/Hinge/Latch added), and the revenue report now includes cart-3 since it checked out.
```text
Updated cart state:
cart-1: 7 items, total $125.43
cart-2: 6 items, total $141.25
cart-3: 8 items, total $76.00
  Bolt x100 @ $0.10 = $10.00
  Nut x100 @ $0.08 = $8.00
  Rivet x200 @ $0.03 = $6.00
  Pin x50 @ $0.15 = $7.50
  Screw x150 @ $0.04 = $6.00
  Clamp x5 @ $2.50 = $12.50
  Hinge x8 @ $1.75 = $14.00
  Latch x3 @ $4.00 = $12.00

Updated revenue:
cart-1 (checked out): $125.43
cart-2 (checked out): $141.25
cart-3 (checked out): $76.00
Total revenue: $342.68
```
Concepts
Instead of storing the current state of each cart, this pattern stores the sequence of events that produced that state. The current state is derived by replaying events. This gives you a complete audit trail and the ability to build new views from historical data.
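The derive-by-replay idea fits in one line: the current state is a left fold of an apply function over the event log. A generic sketch, independent of merkql (names are illustrative):

```rust
// state = fold(apply, initial, events): replaying the log from the start
// always reproduces the current state, and a new apply function yields a
// brand-new view of the same history.
fn replay<S, E>(initial: S, events: &[E], apply: impl Fn(S, &E) -> S) -> S {
    events.iter().fold(initial, |state, event| apply(state, event))
}
```

Both projections in this example are instances of this shape, with `apply_cart_event` (and the revenue variant of it) as the apply function.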
The write side (producer) appends events to the log. The read side (consumers) builds materialized views. Each consumer group maintains its own read model optimised for its specific query pattern. Writing events and reading projections are completely decoupled.
merkql's consumer group model maps naturally to CQRS projections. Each group has independent offset tracking, so you can add a new projection at any time—it replays from the beginning while existing projections continue incrementally.
On first startup, a projection replays the entire event stream (offset reset = Earliest). On subsequent runs, it resumes from its committed offset and processes only new events. This is the same pattern used in production event-sourced systems with Kafka.
Using the cart ID as the record key ensures all events for the same cart are ordered within a partition. With multiple partitions, different carts can be processed in parallel while maintaining per-cart consistency.
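Keyed routing is commonly implemented as a hash of the key modulo the partition count. merkql's actual partitioner isn't shown in this example, but the idea sketches as:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// The same key always hashes to the same partition, so all events for one
// cart land on one partition and stay ordered relative to each other.
fn partition_for(key: &str, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}
```

Events with different keys spread across partitions and can be consumed in parallel; only the per-key order is guaranteed.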
Reference
| API | Purpose |
|---|---|
| `ProducerRecord::new()` | Keyed records (`cart_id` routes to a partition) |
| `producer.send()` | Append domain events to the log |
| `ConsumerConfig.group_id` | Independent consumer group per projection |
| `OffsetReset::Earliest` | Replay from the beginning on first run |
| `consumer.subscribe()` | Subscribe to the event topic |
| `consumer.poll()` | Fetch events from the current position |
| `consumer.commit_sync()` | Persist position for incremental updates |
| `consumer.close()` | Clean shutdown of the consumer |
```sh
cargo run -p merkql-event-sourced-cart
```