Sensor pipeline

A complete IoT data pipeline: produce, consume, batch, compress, retain, and survive broker restarts.


What this example covers

This example exercises the core produce/consume lifecycle through a realistic IoT scenario: 4 temperature sensors streaming readings into a single topic.

Keyed records

Each sensor reading uses the sensor ID as the record key. Keys determine partition routing—all readings from the same sensor land in the same partition, preserving per-sensor ordering.

Individual and batch production

200 records sent one at a time via producer.send(), then 800 more via producer.send_batch(). Batch sends amortise fsync overhead—one flush per batch instead of one per record.

Multiple consumer groups

Two consumer groups (“analytics” and “alerts”) read the same topic independently, each maintaining its own offset position. One group's progress never affects the other.

LZ4 compression

The broker is opened with Compression::Lz4. All records are compressed before storage and transparently decompressed on read. Hashes are computed on uncompressed data, so merkle proofs remain valid regardless of compression.

Retention policy

Configured with max_records: Some(500). After 1,000 records are produced, consumers starting from OffsetReset::Earliest only see the most recent 500.

Restart persistence

The broker is dropped and reopened from the same directory. Committed consumer group offsets survive the restart—no duplicate delivery.

Diagram: sensor pipeline flow. 4 sensors feed a Producer (send() / send_batch()) into the topic "sensors" (LZ4, retention 500, 1,000 records), which fans out to two consumer groups: "analytics" (min / max / avg) and "alerts" (temp > 35.0 filter). Committed offsets are preserved across a broker restart.

Step by step

Step 1

Configure the broker

Open a broker with LZ4 compression and a retention policy of 500 records per partition. The BrokerConfig struct uses builder-style defaults—..BrokerConfig::new(path) fills in auto_create_topics: true and default_partitions: 1.

let config = BrokerConfig {
    compression: Compression::Lz4,
    default_retention: RetentionConfig {
        max_records: Some(500),
    },
    ..BrokerConfig::new(&data_path)
};
let broker = Broker::open(config).unwrap();
Step 2

Produce individual records

Send 200 sensor readings one at a time. Each ProducerRecord carries a topic name, an optional key, and a string value. The key is the sensor ID—merkql hashes the key to determine partition assignment, so all readings from sensor-a go to the same partition.

let producer = Broker::producer(&broker);
let sensors = ["sensor-a", "sensor-b", "sensor-c", "sensor-d"];

for i in 0..200 {
    let sensor_id = sensors[i % sensors.len()];
    let reading = SensorReading {
        temp: 20.0 + (i as f64 * 0.1) % 20.0,
        ts: 1700000000 + i as u64,
    };
    let value = serde_json::to_string(&reading).unwrap();
    producer.send(&ProducerRecord::new(
        "sensors",
        Some(sensor_id.to_string()),
        value,
    )).unwrap();
}

Each send() call appends the record to the partition, updates the merkle tree, flushes the index, and fsyncs. This guarantees durability per record but costs ~31µs each.

Step 3

Batch produce

Send 800 more readings in 8 batches of 100 using send_batch(). Internally, the batch API buffers all writes and performs a single fsync at the end—significantly higher throughput for bulk ingestion.

for batch_num in 0..8 {
    let batch: Vec<ProducerRecord> = (0..100)
        .map(|j| {
            let idx = 200 + batch_num * 100 + j;
            let reading = SensorReading {
                temp: 20.0 + (idx as f64 * 0.1) % 20.0,
                ts: 1700000000 + idx as u64,
            };
            ProducerRecord::new(
                "sensors",
                Some(sensors[idx % sensors.len()].to_string()),
                serde_json::to_string(&reading).unwrap(),
            )
            )
        })
        .collect();
    producer.send_batch(&batch).unwrap();
}
Step 4

Consume with the “analytics” group

Create a consumer in the “analytics” group with auto_commit: false and OffsetReset::Earliest. Subscribe to the “sensors” topic and poll. Because retention is set to 500 records, only the most recent 500 are available—the first 500 have been logically trimmed.

let mut consumer = Broker::consumer(
    &broker,
    ConsumerConfig {
        group_id: "analytics".into(),
        auto_commit: false,
        offset_reset: OffsetReset::Earliest,
    },
);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();

After processing, commit_sync() persists the current offset to disk so the group can resume later without re-reading committed records.

// Compute min/max/avg per sensor, then commit
consumer.commit_sync().unwrap();
consumer.close().unwrap();
Polled 500 records.
sensor-a: min=20.0, max=39.6, avg=30.8 (125 readings)
sensor-b: min=20.1, max=39.7, avg=30.9 (125 readings)
sensor-c: min=20.2, max=39.8, avg=31.0 (125 readings)
sensor-d: min=20.3, max=39.9, avg=31.1 (125 readings)
Step 5

Independent consumer group

A second consumer group (“alerts”) reads the same topic independently. It filters for readings above 35.0°C and prints alerts. Each group has its own committed offsets—the “analytics” group's earlier commit has no effect on where “alerts” starts reading.

let mut consumer = Broker::consumer(
    &broker,
    ConsumerConfig {
        group_id: "alerts".into(),
        auto_commit: false,
        offset_reset: OffsetReset::Earliest,
    },
);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();

for record in &records {
    let reading: SensorReading = serde_json::from_str(&record.value).unwrap();
    if reading.temp > 35.0 {
        println!("ALERT: {} temp={:.1} at ts={}",
            record.key.as_deref().unwrap_or("unknown"), reading.temp, reading.ts);
    }
}
ALERT: sensor-d temp=35.1 at ts=1700000551
ALERT: sensor-a temp=35.2 at ts=1700000552
ALERT: sensor-b temp=35.3 at ts=1700000553
... and 142 more alerts
Total alerts: 147 out of 500 readings
Step 6

Broker restart

Drop the broker entirely and reopen it from the same directory. All state—topics, partitions, merkle trees, consumer group offsets—is recovered from disk. This simulates a process restart.

drop(broker);

let config = BrokerConfig {
    compression: Compression::Lz4,
    default_retention: RetentionConfig {
        max_records: Some(500),
    },
    ..BrokerConfig::new(&data_path)
};
let broker = Broker::open(config).unwrap();
Step 7

Resume without duplicates

The “analytics” consumer reconnects after the restart. Because it committed its offsets before the broker was dropped, it resumes from where it left off and receives zero records.

// Same ConsumerConfig as step 4: group "analytics", OffsetReset::Earliest.
let mut consumer = Broker::consumer(&broker, analytics_config);
consumer.subscribe(&["sensors"]).unwrap();
let records = consumer.poll(Duration::from_millis(100)).unwrap();
// records.len() == 0
Polled 0 records (expected 0).
Step 8

Incremental consumption

Produce 100 more readings, then poll with the “analytics” group again. It receives exactly the 100 new records—no more, no less. This confirms that offset tracking works correctly across broker restarts.

// Produce 100 more readings...

let records = consumer.poll(Duration::from_millis(100)).unwrap();
assert_eq!(records.len(), 100);
Produced 100 more readings.
Polled 100 records (expected 100).
Step 9

Disk usage

The example prints the total size of the data directory. With LZ4 compression and 1,100 JSON sensor readings, the entire dataset (including merkle tree nodes, indices, and consumer group state) fits in approximately 563 KB.

Data directory: 562.8 KB

How it works

Record routing

When a record has a key, merkql hashes the key and routes to hash(key) % num_partitions. Without a key, records are distributed round-robin. This ensures all events for the same entity (sensor, user, cart) are ordered within a single partition.
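merkql's actual hash function is not specified here, so this sketch stands in std's `DefaultHasher` to illustrate the `hash(key) % num_partitions` rule:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative partitioner: DefaultHasher is an assumption standing in
// for whatever hash merkql actually uses.
fn partition_for(key: &str, num_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % num_partitions
}

fn main() {
    // The same key always maps to the same partition, which is what
    // preserves per-sensor ordering.
    let p1 = partition_for("sensor-a", 4);
    let p2 = partition_for("sensor-a", 4);
    assert_eq!(p1, p2);
    assert!(p1 < 4);
    println!("sensor-a -> partition {p1}");
}
```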

Consumer group offsets

Each consumer group stores its committed offset per topic-partition in .merkql/groups/{group_id}/offsets.bin. On subscribe(), the consumer checks for committed offsets and resumes from there. If none exist, it falls back to the offset_reset policy (Earliest or Latest).
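The resume decision can be summarised as: committed offset if one exists, otherwise the reset policy. A minimal sketch (the function and its parameters are illustrative, not merkql's API):

```rust
#[derive(Clone, Copy)]
enum OffsetReset {
    Earliest,
    Latest,
}

// Hypothetical helper: pick where a consumer group starts reading.
fn starting_offset(
    committed: Option<u64>,
    reset: OffsetReset,
    earliest: u64, // partition's min valid offset (after retention)
    latest: u64,   // next offset to be written
) -> u64 {
    match (committed, reset) {
        (Some(off), _) => off, // committed offsets always win
        (None, OffsetReset::Earliest) => earliest,
        (None, OffsetReset::Latest) => latest,
    }
}

fn main() {
    // First subscribe: nothing committed, the reset policy decides.
    assert_eq!(starting_offset(None, OffsetReset::Earliest, 500, 1000), 500);
    // After commit_sync() at offset 1000, the group resumes exactly there,
    // which is why the post-restart poll in step 7 returns zero records.
    assert_eq!(starting_offset(Some(1000), OffsetReset::Earliest, 500, 1000), 1000);
}
```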

Diagram: consumer offset timeline across the broker restart. Before the restart, offsets 0–999 exist and both groups ("analytics" and "alerts") are committed at 999. After reopening and producing 100 new records (offsets 1000–1099), "analytics" advances to 1099, polling exactly the 100 new records with no duplicates. Each consumer group tracks its position independently.

Retention

When max_records is set, the partition advances its min_valid_offset so that at most N records are readable. Records below this watermark return None on read. The data remains on disk (in the pack file) but is logically invisible.

Compression

LZ4 compression is applied per-object in the pack file. Each compressed entry starts with a 1-byte marker (0x00 for none, 0x01 for LZ4), so a broker can read data written with a different compression setting. Hashes are computed on uncompressed data.


APIs used

API                       Purpose
BrokerConfig              LZ4 compression, retention of 500 records
Broker::open()            Open or create a broker from a directory
Broker::producer()        Create a producer for the broker
Broker::consumer()        Create a consumer with group config
ProducerRecord::new()     Keyed records with JSON values
producer.send()           Individual record production with per-record fsync
producer.send_batch()     Batch production with amortised fsync
consumer.subscribe()      Subscribe to topics, load committed offsets
consumer.poll()           Fetch records from the current position
consumer.commit_sync()    Persist the current position to disk
consumer.close()          Close the consumer (auto-commits if configured)
Run the example:

cargo run -p merkql-sensor-pipeline