# Rigatoni

A high-performance, type-safe CDC/data replication framework for Rust, focused on real-time data pipelines.
## Overview
Rigatoni is a modern CDC (Change Data Capture) and data replication framework built with Rust, designed for production-ready real-time data pipelines. It combines the performance and safety of Rust with an intuitive API for reliably replicating data from databases to data lakes and other destinations.
## Key Features

- High Performance - Async/await architecture with Tokio for concurrent processing
- Type Safety - Leverage Rust's type system for compile-time guarantees
- MongoDB CDC - Real-time change stream listening with resume token support
- S3 Integration - Multiple formats (JSON, CSV, Parquet, Avro) with compression
- Retry Logic - Exponential backoff with configurable limits (see the sketch after this list)
- Batching - Automatic batching based on size and time windows
- Composable - Build data replication workflows from simple, testable components
- Observable - Comprehensive tracing and metrics support
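To make the retry behavior concrete, here is a minimal sketch of the exponential-backoff pattern. This is a generic illustration of the technique, not Rigatoni's internal code; the initial delay, multiplier, and retry cap are made-up values.

```rust
use std::time::Duration;
use tokio::time::sleep;

// Conceptual sketch of exponential backoff with a retry cap; the constants
// here are illustrative, not Rigatoni's defaults.
async fn with_backoff<T, E, F, Fut>(mut op: F, max_retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_retries => return Err(err),
            Err(_) => {
                sleep(delay).await;
                delay = delay.saturating_mul(2); // double the wait each attempt
                attempt += 1;
            }
        }
    }
}
```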
## Quick Start

### Installation

Add Rigatoni to your `Cargo.toml`:
```toml
[dependencies]
rigatoni-core = "0.1.1"
rigatoni-destinations = { version = "0.1.1", features = ["s3"] }
rigatoni-stores = { version = "0.1.1", features = ["memory"] }
```
### Your First Pipeline
Create a simple MongoDB to S3 pipeline:
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the state store (in-memory, so resume tokens are lost on restart)
    let store = MemoryStore::new();

    // Configure the S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;

    let destination = S3Destination::new(s3_config).await?;

    // Configure the pipeline.
    // Note: MongoDB change streams require a replica set (hence replicaSet=rs0).
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .collections(vec!["users", "orders"])
        .batch_size(1000)
        .build()?;

    // Create and run the pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```
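Since `start()` runs until the change stream ends, in practice you will want to couple it to a shutdown signal. A minimal sketch, replacing the `pipeline.start().await?;` line above; it assumes cancelling the `start()` future is an acceptable way to stop (Rigatoni may expose a dedicated shutdown API, which would be preferable):

```rust
// Hedged sketch: race the pipeline against Ctrl-C. Assumes dropping the
// `start()` future is a safe way to stop the pipeline.
tokio::select! {
    result = pipeline.start() => result?,
    _ = tokio::signal::ctrl_c() => println!("shutting down"),
}
```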
## Architecture

Rigatoni is organized as a workspace with three main crates:

### Core Components
- `rigatoni-core` - Core traits, pipeline orchestration, and MongoDB integration (an illustrative trait sketch follows this list)
- `rigatoni-destinations` - Destination implementations (S3 available; BigQuery and Kafka coming soon)
- `rigatoni-stores` - State store implementations for checkpoint/resume
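As a purely illustrative example of the composability idea, a custom destination might be shaped roughly like the trait below. This is a sketch under assumptions: the actual trait in `rigatoni-core`, its name, methods, and event type are not documented here and will differ.

```rust
use async_trait::async_trait;

// Placeholder event type for this sketch; not Rigatoni's real type.
pub struct ChangeEvent {
    pub collection: String,
    pub payload: Vec<u8>,
}

// Illustrative only: the real trait lives in `rigatoni-core` and its
// exact shape is not shown in this README.
#[async_trait]
pub trait Destination {
    type Error: std::error::Error + Send + Sync;

    /// Write one batch of change events to the sink.
    async fn write_batch(&mut self, events: Vec<ChangeEvent>) -> Result<(), Self::Error>;

    /// Flush any buffered data before shutdown.
    async fn flush(&mut self) -> Result<(), Self::Error>;
}
```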
### Pipeline Flow
```text
┌──────────────┐      ┌─────────────┐      ┌───────────────┐
│   MongoDB    │─────▶│  Pipeline   │─────▶│  Destination  │
│   Change     │      │  (batching, │      │     (S3)      │
│   Stream     │      │   retry)    │      │               │
└──────────────┘      └─────────────┘      └───────────────┘
```
Learn more about architecture →
## Use Cases

### Real-time CDC to Data Lake

Stream MongoDB changes to S3 for analytics (a configuration sketch follows the list):
- Format: Parquet for efficient columnar storage
- Partitioning: Hive-style for query performance
- Compression: Zstandard for optimal ratio and speed
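A configuration for this use case might look like the sketch below. The `.bucket`, `.region`, and `.prefix` builder calls come from the Quick Start; the `.format(...)` and `.compression(...)` calls are assumptions added for illustration and may not match the real `S3Config` API.

```rust
use rigatoni_destinations::s3::{S3Config, S3Destination};

// Hypothetical sketch: exact builder methods for format and compression
// are assumptions, not confirmed API.
async fn make_analytics_destination() -> Result<S3Destination, Box<dyn std::error::Error>> {
    let config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("analytics/mongodb-cdc")
        .format("parquet")   // assumption: columnar Parquet output
        .compression("zstd") // assumption: Zstandard compression
        .build()?;
    Ok(S3Destination::new(config).await?)
}
```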
### Backup and Archive
Continuous backup of MongoDB collections:
- Format: JSON for flexibility
- Partitioning: Date-based for lifecycle policies
- Compression: Gzip for compatibility
### Event Sourcing

Capture all database changes for audit and replay (see the resume-token sketch after the list):
- Format: Avro for schema evolution
- Partitioning: Collection-based for isolation
- State Management: Resume tokens for exactly-once semantics
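Resume tokens are a MongoDB change stream feature: each event carries a token that can be handed back to the server to continue the stream from that exact point. The sketch below shows the raw pattern using the official `mongodb` driver (3.x API assumed) to illustrate what Rigatoni's state stores persist for you; it is not Rigatoni API.

```rust
use futures_util::StreamExt;
use mongodb::{bson::Document, Client};

// Conceptual sketch against the raw `mongodb` driver; Rigatoni automates
// this token bookkeeping via its state stores.
async fn tail(client: &Client) -> mongodb::error::Result<()> {
    let coll = client.database("mydb").collection::<Document>("users");
    let mut stream = coll.watch().await?;
    while let Some(event) = stream.next().await.transpose()? {
        // After processing each event, persist the current resume token.
        // On restart, replaying the stored token resumes the stream
        // where it left off instead of re-reading everything.
        let _token = stream.resume_token();
        println!("operation: {:?}", event.operation_type);
    }
    Ok(())
}
```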
## Documentation
- Getting Started - Installation, setup, and your first pipeline
- Architecture - System design and core concepts
- User Guides - Task-specific guides and examples
- API Reference - Complete API documentation
- Contributing - Contribution guidelines
## Community
- GitHub: valeriouberti/rigatoni
- Issues: Report bugs or request features
- Discussions: Ask questions and share ideas
## License
Rigatoni is licensed under the Apache License 2.0.
## Next Steps

Ready to build your first pipeline? Start with the Getting Started guide above.