Rigatoni Observability Guide
This guide explains how to monitor and troubleshoot Rigatoni pipelines in production using metrics, structured logging, and health checks.
Table of Contents
- Overview
- Metrics
- Prometheus Integration
- Grafana Dashboards
- Alerting
- Performance Tuning
- Troubleshooting
Overview
Rigatoni provides comprehensive observability through three pillars:
- Metrics: Quantitative measurements using the `metrics` crate and Prometheus
- Logging: Structured logging with `tracing`
- Health Checks: Pipeline status and health endpoints
Metrics
Metric Types
Rigatoni uses three types of metrics:
Counters (Monotonically Increasing)
- `rigatoni_events_processed_total` - Total events successfully processed
- `rigatoni_events_failed_total` - Total events that failed processing
- `rigatoni_retries_total` - Total retry attempts
- `rigatoni_batches_written_total` - Total batches written to destinations
- `rigatoni_destination_write_errors_total` - Total destination write errors
Histograms (Value Distributions)
- `rigatoni_batch_size` - Distribution of batch sizes
- `rigatoni_batch_duration_seconds` - Time to process batches
- `rigatoni_destination_write_duration_seconds` - Destination write latency
- `rigatoni_destination_write_bytes` - Size of data written
- `rigatoni_change_stream_lag_seconds` - Change stream lag
Gauges (Point-in-Time Values)
- `rigatoni_active_collections` - Number of monitored collections
- `rigatoni_pipeline_status` - Pipeline status (0=stopped, 1=running, 2=error)
- `rigatoni_batch_queue_size` - Events buffered awaiting write
Metric Labels
All metrics include relevant labels for filtering and aggregation:
- `collection`: MongoDB collection name
- `destination_type`: Destination type (currently only `s3` in 0.1.1)
- `operation`: Operation type (insert, update, delete)
- `error_type`: Error category (timeout_error, connection_error, etc.)
Cardinality Considerations
⚠️ Important: Labels increase cardinality, which affects Prometheus performance and storage.
Safe Labels (low cardinality, <100 unique values):
- collection names
- destination types
- operation types
- error categories
Dangerous Labels (avoid these):
- Document IDs
- Timestamps
- User IDs
- Full error messages
- IP addresses
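To verify that cardinality stays bounded in practice, you can count active series per metric directly in Prometheus. This is plain PromQL, not a Rigatoni-specific feature:

```promql
# Active series per rigatoni metric
count by (__name__) ({__name__=~"rigatoni_.*"})

# Series per collection for one metric (each count should stay well under 100)
count by (collection) (rigatoni_events_processed_total)
```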
Prometheus Integration
Setup
- Enable metrics in your pipeline:

```rust
use rigatoni_core::metrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;

// Initialize metrics
metrics::init_metrics();

// Start the Prometheus exporter on 0.0.0.0:9000
let addr: SocketAddr = ([0, 0, 0, 0], 9000).into();
PrometheusBuilder::new()
    .with_http_listener(addr)
    .install()
    .expect("Failed to install Prometheus exporter");
```
- Configure Prometheus (`prometheus.yml`):

```yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rigatoni'
    static_configs:
      - targets: ['localhost:9000']
    metric_relabel_configs:
      # Keep only rigatoni metrics to bound scrape volume
      - source_labels: [__name__]
        regex: 'rigatoni_.*'
        action: keep
```
- Run the metrics example:

```bash
cargo run --example metrics_prometheus --features metrics-export
```
- Verify metrics:

```bash
curl http://localhost:9000/metrics | grep rigatoni_
```
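A healthy exporter returns the Prometheus text exposition format. The sample below is illustrative only; actual label values depend on your collections:

```text
# TYPE rigatoni_events_processed_total counter
rigatoni_events_processed_total{collection="users",operation="insert"} 1234
rigatoni_events_processed_total{collection="orders",operation="update"} 567
```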
Useful PromQL Queries
Throughput
```promql
# Events processed per second
rate(rigatoni_events_processed_total[5m])

# Events processed per second by collection
sum by (collection) (rate(rigatoni_events_processed_total[5m]))

# Total throughput across all collections
sum(rate(rigatoni_events_processed_total[5m]))
```
Latency
```promql
# 50th percentile write latency
histogram_quantile(0.50, rate(rigatoni_destination_write_duration_seconds_bucket[5m]))

# 95th percentile write latency
histogram_quantile(0.95, rate(rigatoni_destination_write_duration_seconds_bucket[5m]))

# 99th percentile write latency
histogram_quantile(0.99, rate(rigatoni_destination_write_duration_seconds_bucket[5m]))

# Average batch processing time
rate(rigatoni_batch_duration_seconds_sum[5m]) / rate(rigatoni_batch_duration_seconds_count[5m])
```
Error Rates
```promql
# Error rate (events/sec)
rate(rigatoni_events_failed_total[5m])

# Error rate by type
sum by (error_type) (rate(rigatoni_events_failed_total[5m]))

# Error percentage
100 * (
  rate(rigatoni_events_failed_total[5m]) /
  (rate(rigatoni_events_processed_total[5m]) + rate(rigatoni_events_failed_total[5m]))
)
```
Queue Depth
```promql
# Current queue size
rigatoni_batch_queue_size

# Queue size by collection
sum by (collection) (rigatoni_batch_queue_size)

# Queue growth rate (positive = filling up, negative = draining)
deriv(rigatoni_batch_queue_size[5m])
```
Data Volume
```promql
# Bytes written per second
rate(rigatoni_destination_write_bytes_sum[5m])

# Megabytes written per hour (rate() is per-second, so multiply by 3600)
rate(rigatoni_destination_write_bytes_sum[1h]) * 3600 / 1024 / 1024

# Average batch size
rigatoni_batch_size_sum / rigatoni_batch_size_count
```
Grafana Dashboards
Importing the Dashboard
- Navigate to Grafana → Dashboards → Import
- Upload `docs/grafana/rigatoni-dashboard.json`
- Select your Prometheus datasource
- Click Import
Dashboard Panels
Overview Row
- Pipeline Status: Current pipeline state (running/stopped/error)
- Active Collections: Number of collections being monitored
- Events Processed: Real-time throughput graph
- Error Rate: Failed events over time
Performance Row
- Batch Size Distribution: Heatmap showing batch size patterns
- Write Latency Percentiles: p50, p95, p99 latencies
- Batch Processing Time: Time from event receipt to destination write
- Queue Depth: Events waiting to be written
Health Row
- Retry Rate: Retry attempts over time
- Destination Errors: Errors by destination type
- Data Written: Bytes/sec written to destinations
- Change Stream Lag: Delay between MongoDB operation and event receipt
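For the Change Stream Lag panel, a high percentile is usually more informative than an average. Assuming `rigatoni_change_stream_lag_seconds` is exported as a histogram (as listed in the Metrics section):

```promql
# P99 change stream lag over the last 5 minutes
histogram_quantile(0.99, rate(rigatoni_change_stream_lag_seconds_bucket[5m]))
```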
Custom Queries
Add custom panels with these queries:
Success Rate:
```promql
100 * (
  rate(rigatoni_events_processed_total[5m]) /
  (rate(rigatoni_events_processed_total[5m]) + rate(rigatoni_events_failed_total[5m]))
)
```
Average Batch Size by Collection:
```promql
# rigatoni_batch_size is a histogram, so average from its _sum and _count series
sum by (collection) (rate(rigatoni_batch_size_sum[5m]))
  / sum by (collection) (rate(rigatoni_batch_size_count[5m]))
```
Destination Write Success Rate:
```promql
100 * (
  rate(rigatoni_batches_written_total[5m]) /
  (rate(rigatoni_batches_written_total[5m]) + rate(rigatoni_destination_write_errors_total[5m]))
)
```
Alerting
Recommended Alerts
Critical Alerts
Pipeline Down:
```yaml
- alert: RigatoniPipelineDown
  expr: rigatoni_pipeline_status != 1
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Rigatoni pipeline is not running"
    description: "Pipeline status is {{ $value }} (expected 1=running)"
```
High Error Rate:
```yaml
- alert: RigatoniHighErrorRate
  expr: |
    (
      rate(rigatoni_events_failed_total[5m]) /
      (rate(rigatoni_events_processed_total[5m]) + rate(rigatoni_events_failed_total[5m]))
    ) > 0.05
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "High error rate in Rigatoni pipeline"
    description: "Error rate is {{ $value }} (threshold: 5%)"
```
No Events Processed:
```yaml
- alert: RigatoniNoEventsProcessed
  expr: rate(rigatoni_events_processed_total[10m]) == 0
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Rigatoni pipeline not processing events"
    description: "No events processed in the last 10 minutes"
```
Warning Alerts
High Write Latency:
```yaml
- alert: RigatoniHighWriteLatency
  expr: |
    histogram_quantile(0.99,
      rate(rigatoni_destination_write_duration_seconds_bucket[5m])
    ) > 5
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High destination write latency"
    description: "P99 write latency is {{ $value }}s (threshold: 5s)"
```
Queue Growing:
```yaml
- alert: RigatoniQueueGrowing
  expr: deriv(rigatoni_batch_queue_size[10m]) > 10
  for: 15m
  labels:
    severity: warning
  annotations:
    summary: "Batch queue growing"
    description: "Queue is growing at {{ $value }} events/sec for collection {{ $labels.collection }}"
```
High Retry Rate:
```yaml
- alert: RigatoniHighRetryRate
  expr: rate(rigatoni_retries_total[5m]) > 1
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High retry rate"
    description: "Retry rate is {{ $value }} retries/sec for {{ $labels.collection }}"
```
Alert Routing
Configure Alertmanager to route alerts based on severity:
```yaml
route:
  group_by: ['alertname', 'collection']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: warning
      receiver: 'slack'

receivers:
  # The route's default receiver must also be defined
  - name: 'default'
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: '<your-key>'
  - name: 'slack'
    slack_configs:
      - api_url: '<your-webhook>'
        channel: '#rigatoni-alerts'
```
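Optionally, an inhibition rule can silence warning-level alerts while a related critical alert is already firing. This is standard Alertmanager configuration; the label names below match the routing config above:

```yaml
inhibit_rules:
  # Suppress warnings for a collection while a critical alert fires for it
  - source_matchers:
      - severity = critical
    target_matchers:
      - severity = warning
    equal: ['collection']
```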
Performance Tuning
Identifying Bottlenecks
1. High Queue Depth
Symptoms:
- `rigatoni_batch_queue_size` continuously growing
- `deriv(rigatoni_batch_queue_size[10m]) > 0`
Causes:
- Destination writes too slow
- Batch size too large
- Network issues
Solutions:
- Reduce `batch_size` in config
- Decrease `batch_timeout` to flush more frequently
- Scale destination (e.g., more S3 parallel uploads)
- Check destination performance
2. High Write Latency
Symptoms:
```promql
histogram_quantile(0.99, rate(rigatoni_destination_write_duration_seconds_bucket[5m])) > 5
```
Causes:
- Destination overloaded
- Large batch sizes
- Network latency
Solutions:
- Reduce batch size
- Enable compression (if not already)
- Use faster destination (e.g., S3 over HTTP/2)
- Check network connectivity
3. High Error Rate
Symptoms:
```promql
rate(rigatoni_events_failed_total[5m]) > threshold
```
Causes:
- Serialization errors
- Destination connectivity issues
- Permission errors
- Rate limiting
Solutions:
- Check error types: `sum by (error_type) (rate(rigatoni_events_failed_total[5m]))`
- Fix serialization issues
- Verify credentials
- Implement rate limiting backoff
- Increase `max_retries`
4. High Retry Rate
Symptoms:
```promql
rate(rigatoni_retries_total[5m]) > 1
```
Causes:
- Transient network errors
- Destination throttling
- Timeout too aggressive
Solutions:
- Check retry reasons: `sum by (error_type) (rate(rigatoni_retries_total[5m]))`
- Increase retry delay
- Implement exponential backoff
- Contact destination support if persistent
Optimization Strategies
Batch Size Tuning
Monitor the `rigatoni_batch_size` histogram:

```promql
# Show batch size distribution
histogram_quantile(0.50, rate(rigatoni_batch_size_bucket[5m]))  # median
histogram_quantile(0.95, rate(rigatoni_batch_size_bucket[5m]))  # p95
```
Guidelines:
- Too small (<10 events): Overhead from frequent writes
- Optimal (50-200 events): Good balance
- Too large (>500 events): High latency, memory usage
Memory Usage
Monitor queue size to estimate memory:
```promql
# Approximate memory usage in bytes (assuming 1KB per event)
sum(rigatoni_batch_queue_size) * 1024
```
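To turn that estimate into a guardrail, you could alert when the estimated buffer memory exceeds a budget. The 100MB threshold below is an arbitrary example, and the 1KB-per-event assumption carries over from the query above:

```yaml
- alert: RigatoniQueueMemoryHigh
  expr: sum(rigatoni_batch_queue_size) * 1024 > 100 * 1024 * 1024
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Estimated batch queue memory above 100MB"
```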
Guidelines:
- Keep queue size < 10,000 events per collection
- Configure `batch_timeout` to prevent unbounded growth
- Scale horizontally if needed
Troubleshooting
Common Issues
Pipeline Not Starting
Check:
- Pipeline status: `rigatoni_pipeline_status` (should be 1)
- Logs for error messages
- MongoDB connectivity
- Redis connectivity
Debug:
```bash
# Check if metrics endpoint is accessible
curl http://localhost:9000/metrics

# Look for initialization errors in logs
grep "ERROR" pipeline.log | grep -i "start\|init"
```
No Events Being Processed
Check:
- MongoDB change stream: Are there actual changes?
- Collection configuration: Are correct collections monitored?
- Resume token: Is pipeline stuck on old token?
Debug:
```promql
# Should be > 0 if events are flowing
rate(rigatoni_events_processed_total[5m])

# Check if the change stream is receiving events
rate(rigatoni_change_stream_lag_seconds_count[5m])
```
Destination Writes Failing
Check:
- Destination errors: `rigatoni_destination_write_errors_total`
- Error types: `sum by (error_type, destination_type) (rate(rigatoni_destination_write_errors_total[5m]))`
- Credentials and permissions
Debug:
```bash
# Test S3 destination connectivity manually
awslocal s3 ls s3://your-bucket/  # for S3 (LocalStack)
aws s3 ls s3://your-bucket/       # for AWS S3
```
High Memory Usage
Check:
- Queue sizes: `sum(rigatoni_batch_queue_size)`
- Batch sizes: `histogram_quantile(0.99, rate(rigatoni_batch_size_bucket[5m]))`
Solutions:
- Reduce `batch_size`
- Decrease `batch_timeout`
- Add backpressure limits
Performance Degradation
If throughput decreases over time:
```promql
# Compare current vs historical throughput
rate(rigatoni_events_processed_total[5m])            # current
rate(rigatoni_events_processed_total[5m] offset 1h)  # 1 hour ago
```
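The comparison can also be collapsed into a single ratio; a value well below 1 means throughput has dropped relative to an hour ago:

```promql
sum(rate(rigatoni_events_processed_total[5m]))
  / sum(rate(rigatoni_events_processed_total[5m] offset 1h))
```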
Possible Causes:
- Memory pressure → check queue size
- Destination throttling → check error rates
- MongoDB replication lag → check change stream lag
- Network issues → check write latency
Best Practices
1. Set Retention Policies
Configure Prometheus retention. Retention is set via command-line flags, not in `prometheus.yml`:

```bash
prometheus \
  --config.file=prometheus.yml \
  --storage.tsdb.retention.time=30d \
  --storage.tsdb.retention.size=50GB
```
2. Use Recording Rules
Pre-compute expensive queries with recording rules. These belong in a separate rules file (loaded via `rule_files` in `prometheus.yml`), not in `prometheus.yml` itself:

```yaml
groups:
  - name: rigatoni
    interval: 30s
    rules:
      - record: rigatoni:throughput:rate5m
        expr: rate(rigatoni_events_processed_total[5m])

      - record: rigatoni:error_rate:rate5m
        expr: |
          rate(rigatoni_events_failed_total[5m]) /
          (rate(rigatoni_events_processed_total[5m]) + rate(rigatoni_events_failed_total[5m]))

      - record: rigatoni:write_latency:p99
        expr: histogram_quantile(0.99, rate(rigatoni_destination_write_duration_seconds_bucket[5m]))
```
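Recorded series can then back simpler, cheaper alert expressions. A sketch reusing the recorded names above (equivalent to the RigatoniHighErrorRate alert defined earlier):

```yaml
- alert: RigatoniHighErrorRateRecorded
  expr: rigatoni:error_rate:rate5m > 0.05
  for: 5m
  labels:
    severity: critical
```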
3. Implement Dashboards for Each Team
- Operations: Pipeline health, errors, throughput
- Development: Latency distributions, queue depths
- Business: Data volume, collection statistics
4. Regular Review
Schedule weekly reviews of:
- Alert frequency and accuracy
- Dashboard usage
- Metric cardinality
- Performance trends
5. Documentation
Document your specific:
- Alert thresholds and rationale
- Expected throughput ranges
- Maintenance procedures
- Escalation procedures