A complete IoT data pipeline: produce, consume, batch, compress, retain, and survive broker restarts.
Overview
This example exercises the core produce/consume lifecycle through a realistic IoT scenario: 4 temperature sensors streaming readings into a single topic.
Each sensor reading uses the sensor ID as the record key. Keys determine partition routing—all readings from the same sensor land in the same partition, preserving per-sensor ordering.
Two hundred records are sent one at a time via producer.send(), then 800 more via producer.send_batch(). Batch sends amortise fsync overhead: one flush per batch instead of one per record.
Two consumer groups (“analytics” and “alerts”) read the same topic independently, each maintaining its own offset position. One group's progress never affects the other.
The broker is opened with Compression::Lz4. All records are compressed before storage and transparently decompressed on read. Hashes are computed on uncompressed data, so merkle proofs remain valid regardless of compression.
The topic is configured with max_records: Some(500). After 1,000 records are produced, consumers starting from OffsetReset::Earliest see only the most recent 500.
The broker is dropped and reopened from the same directory. Committed consumer group offsets survive the restart—no duplicate delivery.
Walkthrough
Open a broker with LZ4 compression and a retention policy of 500 records per partition. The BrokerConfig struct uses builder-style defaults—..BrokerConfig::new(path) fills in auto_create_topics: true and default_partitions: 1.
```rust
let config = BrokerConfig {
    compression: Compression::Lz4,
    default_retention: RetentionConfig {
        max_records: Some(500),
    },
    ..BrokerConfig::new(&data_path)
};
let broker = Broker::open(config).unwrap();
```
Send 200 sensor readings one at a time. Each ProducerRecord carries a topic name, an optional key, and a string value. The key is the sensor ID—merkql hashes the key to determine partition assignment, so all readings from sensor-a go to the same partition.
```rust
let producer = Broker::producer(&broker);
let sensors = ["sensor-a", "sensor-b", "sensor-c", "sensor-d"];
for i in 0..200 {
    let sensor_id = sensors[i % sensors.len()];
    let reading = SensorReading {
        temp: 20.0 + (i as f64 * 0.1) % 20.0,
        ts: 1700000000 + i as u64,
    };
    let value = serde_json::to_string(&reading).unwrap();
    producer.send(&ProducerRecord::new(
        "sensors",
        Some(sensor_id.to_string()),
        value,
    )).unwrap();
}
```
Each send() call appends the record to the partition, updates the merkle tree, flushes the index, and fsyncs. This guarantees durability per record but costs ~31µs each.
Send 800 more readings in 8 batches of 100 using send_batch(). Internally, the batch API buffers all writes and performs a single fsync at the end—significantly higher throughput for bulk ingestion.
```rust
for batch_num in 0..8 {
    let batch: Vec<ProducerRecord> = (0..100)
        .map(|j| {
            let idx = 200 + batch_num * 100 + j;
            // Build the reading inside the closure; the `reading` from the
            // earlier per-record loop is out of scope here.
            let reading = SensorReading {
                temp: 20.0 + (idx as f64 * 0.1) % 20.0,
                ts: 1700000000 + idx as u64,
            };
            ProducerRecord::new(
                "sensors",
                Some(sensors[idx % sensors.len()].to_string()),
                serde_json::to_string(&reading).unwrap(),
            )
        })
        .collect();
    producer.send_batch(&batch).unwrap();
}
```
Create a consumer in the “analytics” group with auto_commit: false and OffsetReset::Earliest. Subscribe to the “sensors” topic and poll. Because retention is set to 500 records, only the most recent 500 are available—the first 500 have been logically trimmed.
```rust
let mut consumer = Broker::consumer(
    &broker,
    ConsumerConfig {
        group_id: "analytics".into(),
        auto_commit: false,
        offset_reset: OffsetReset::Earliest,
    },
);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();
```
After processing, commit_sync() persists the current offset to disk so the group can resume later without re-reading committed records.
```rust
// Compute min/max/avg per sensor, then commit
consumer.commit_sync().unwrap();
consumer.close().unwrap();
```
```text
Polled 500 records.
sensor-a: min=20.0, max=39.6, avg=30.8 (125 readings)
sensor-b: min=20.1, max=39.7, avg=30.9 (125 readings)
sensor-c: min=20.2, max=39.8, avg=31.0 (125 readings)
sensor-d: min=20.3, max=39.9, avg=31.1 (125 readings)
```
A second consumer group (“alerts”) reads the same topic independently. It filters for readings above 35.0°C and prints alerts. Each group has its own committed offsets—the “analytics” group's earlier commit has no effect on where “alerts” starts reading.
```rust
let mut consumer = Broker::consumer(
    &broker,
    ConsumerConfig {
        group_id: "alerts".into(),
        auto_commit: false,
        offset_reset: OffsetReset::Earliest,
    },
);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();
for record in &records {
    let reading: SensorReading = serde_json::from_str(&record.value).unwrap();
    if reading.temp > 35.0 {
        println!("ALERT: {} temp={:.1} at ts={}",
            record.key.as_deref().unwrap_or("unknown"), reading.temp, reading.ts);
    }
}
```
```text
ALERT: sensor-d temp=35.1 at ts=1700000551
ALERT: sensor-a temp=35.2 at ts=1700000552
ALERT: sensor-b temp=35.3 at ts=1700000553
... and 142 more alerts
Total alerts: 147 out of 500 readings
```
Drop the broker entirely and reopen it from the same directory. All state—topics, partitions, merkle trees, consumer group offsets—is recovered from disk. This simulates a process restart.
```rust
drop(broker);
let config = BrokerConfig {
    compression: Compression::Lz4,
    default_retention: RetentionConfig {
        max_records: Some(500),
    },
    ..BrokerConfig::new(&data_path)
};
let broker = Broker::open(config).unwrap();
```
The “analytics” consumer reconnects after the restart. Because it committed its offsets before the broker was dropped, it resumes from where it left off and receives zero records.
```rust
let mut consumer = Broker::consumer(&broker, analytics_config);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();
// records.len() == 0
```

```text
Polled 0 records (expected 0).
```
Produce 100 more readings, then poll with the “analytics” group again. It receives exactly the 100 new records—no more, no less. This confirms that offset tracking works correctly across broker restarts.
```rust
// Produce 100 more readings...
let records = consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 100);
```

```text
Produced 100 more readings.
Polled 100 records (expected 100).
```
The example prints the total size of the data directory. With LZ4 compression and 1,100 JSON sensor readings, the entire dataset (including merkle tree nodes, indices, and consumer group state) fits in approximately 563 KB.
```text
Data directory: 562.8 KB
```
Concepts
When a record has a key, merkql hashes the key and routes to hash(key) % num_partitions. Without a key, records are distributed round-robin. This ensures all events for the same entity (sensor, user, cart) are ordered within a single partition.
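The routing rule can be sketched in a few lines. This is illustrative only: merkql's actual hash function is not shown in this example, so a standard-library hasher stands in to demonstrate the `hash(key) % num_partitions` rule and the keyless round-robin fallback.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of keyed routing: same key, same partition.
/// (DefaultHasher is a stand-in; it is not stable across processes.)
fn partition_for(key: Option<&str>, num_partitions: u64, round_robin: &mut u64) -> u64 {
    match key {
        Some(k) => {
            let mut h = DefaultHasher::new();
            k.hash(&mut h);
            h.finish() % num_partitions
        }
        None => {
            // Keyless records rotate across partitions.
            let p = *round_robin % num_partitions;
            *round_robin += 1;
            p
        }
    }
}

fn main() {
    let mut rr = 0;
    // The same key always maps to the same partition...
    let p1 = partition_for(Some("sensor-a"), 4, &mut rr);
    let p2 = partition_for(Some("sensor-a"), 4, &mut rr);
    assert_eq!(p1, p2);
    // ...while keyless records cycle round-robin.
    let q: Vec<u64> = (0..4).map(|_| partition_for(None, 4, &mut rr)).collect();
    assert_eq!(q, vec![0, 1, 2, 3]);
    println!("sensor-a -> partition {p1}");
}
```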
Each consumer group stores its committed offset per topic-partition in .merkql/groups/{group_id}/offsets.bin. On subscribe(), the consumer checks for committed offsets and resumes from there. If none exist, it falls back to the offset_reset policy (Earliest or Latest).
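The resume rule can be modelled with a map standing in for offsets.bin. This is a hypothetical sketch, not merkql's implementation: a committed offset wins; otherwise the group falls back to its offset_reset policy.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum OffsetReset { Earliest, Latest }

/// Sketch of the resume rule on subscribe(): a committed offset wins;
/// with no commit, fall back to the offset_reset policy.
fn starting_offset(
    committed: &HashMap<(String, u32), u64>, // (topic, partition) -> next offset
    topic: &str,
    partition: u32,
    reset: OffsetReset,
    earliest: u64,
    latest: u64,
) -> u64 {
    match committed.get(&(topic.to_string(), partition)) {
        Some(&off) => off,
        None => match reset {
            OffsetReset::Earliest => earliest,
            OffsetReset::Latest => latest,
        },
    }
}

fn main() {
    let mut committed = HashMap::new();
    committed.insert(("sensors".to_string(), 0), 1000);
    // A group that committed at offset 1000 resumes there...
    assert_eq!(starting_offset(&committed, "sensors", 0, OffsetReset::Earliest, 500, 1000), 1000);
    // ...while a fresh group with Earliest starts at the retention floor.
    let fresh = HashMap::new();
    assert_eq!(starting_offset(&fresh, "sensors", 0, OffsetReset::Earliest, 500, 1000), 500);
}
```

This is why the restarted "analytics" consumer in the walkthrough polls zero records while a brand-new group would read all 500 retained ones.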
When max_records is set, the partition advances its min_valid_offset so that at most N records are readable. Records below this watermark return None on read. The data remains on disk (in the pack file) but is logically invisible.
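Logical trimming can be sketched as a watermark that advances on append. The struct below is an illustration under the stated behaviour, not merkql's internal type: bytes stay put, only the first readable offset moves.

```rust
/// Illustrative sketch of retention: the partition advances a watermark;
/// reads below it return None even though the bytes stay on disk.
struct Partition {
    records: Vec<String>,   // stand-in for the on-disk pack file
    min_valid_offset: u64,  // first readable offset
    max_records: Option<u64>,
}

impl Partition {
    fn append(&mut self, value: String) {
        self.records.push(value);
        if let Some(max) = self.max_records {
            let next = self.records.len() as u64; // next offset to assign
            if next > max {
                self.min_valid_offset = next - max;
            }
        }
    }

    fn read(&self, offset: u64) -> Option<&String> {
        if offset < self.min_valid_offset {
            return None; // logically trimmed, though still in the pack file
        }
        self.records.get(offset as usize)
    }
}

fn main() {
    let mut p = Partition { records: Vec::new(), min_valid_offset: 0, max_records: Some(500) };
    for i in 0..1000 {
        p.append(format!("reading-{i}"));
    }
    assert_eq!(p.min_valid_offset, 500);
    assert!(p.read(499).is_none());  // below the watermark
    assert!(p.read(500).is_some());  // first of the retained 500
}
```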
LZ4 compression is applied per-object in the pack file. Each compressed entry starts with a 1-byte marker (0x00 for none, 0x01 for LZ4), so a broker can read data written with a different compression setting. Hashes are computed on uncompressed data.
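The marker-byte framing is what makes mixed-compression reads possible, and it can be sketched without the codec itself. In this dependency-free illustration the LZ4 round trip is replaced by an identity transform; only the framing and dispatch logic are the point, and the marker values are taken from the description above.

```rust
const MARKER_NONE: u8 = 0x00;
const MARKER_LZ4: u8 = 0x01;

/// Sketch of per-object framing: one marker byte, then the payload.
/// The real codec is swapped for an identity transform here.
fn encode(payload: &[u8], compress: bool) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 1);
    if compress {
        out.push(MARKER_LZ4);
        out.extend_from_slice(&fake_lz4_compress(payload));
    } else {
        out.push(MARKER_NONE);
        out.extend_from_slice(payload);
    }
    out
}

/// A reader dispatches on the marker, so data written under one
/// compression setting stays readable under another.
fn decode(entry: &[u8]) -> Vec<u8> {
    match entry[0] {
        MARKER_NONE => entry[1..].to_vec(),
        MARKER_LZ4 => fake_lz4_decompress(&entry[1..]),
        m => panic!("unknown compression marker: {m:#04x}"),
    }
}

// Identity stand-ins for the real LZ4 round trip.
fn fake_lz4_compress(data: &[u8]) -> Vec<u8> { data.to_vec() }
fn fake_lz4_decompress(data: &[u8]) -> Vec<u8> { data.to_vec() }

fn main() {
    let record = br#"{"temp":21.5,"ts":1700000000}"#;
    // Round-trip under both settings; a single reader handles both.
    assert_eq!(decode(&encode(record, true)), record.to_vec());
    assert_eq!(decode(&encode(record, false)), record.to_vec());
}
```

Because hashes are computed on the uncompressed payload (the `decode` output, in this sketch), changing the compression setting never invalidates a merkle proof.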
Reference
| API | Purpose |
|---|---|
| `BrokerConfig` | LZ4 compression, retention of 500 records |
| `Broker::open()` | Open or create a broker from a directory |
| `Broker::producer()` | Create a producer for the broker |
| `Broker::consumer()` | Create a consumer with group config |
| `ProducerRecord::new()` | Keyed records with JSON values |
| `producer.send()` | Individual record production with fsync |
| `producer.send_batch()` | Batch production with amortised fsync |
| `consumer.subscribe()` | Subscribe to topics, load committed offsets |
| `consumer.poll()` | Fetch records from the current position |
| `consumer.commit_sync()` | Persist the current position to disk |
| `consumer.close()` | Close the consumer (auto-commits if configured) |

Run the example with:

```shell
cargo run -p merkql-sensor-pipeline
```