Rigatoni

A high-performance, type-safe CDC and data replication framework for Rust, focused on real-time data pipelines.

Overview

Rigatoni is a modern Change Data Capture (CDC) and data replication framework built in Rust for production real-time data pipelines. It pairs Rust's performance and safety with an intuitive API for reliably replicating data from databases to data lakes and other destinations.

Key Features

  • πŸš€ High Performance - Async/await architecture with Tokio for concurrent processing
  • πŸ”’ Type Safety - Leverage Rust’s type system for compile-time guarantees
  • πŸ“Š MongoDB CDC - Real-time change stream listening with resume token support
  • πŸ“¦ S3 Integration - Multiple formats (JSON, CSV, Parquet, Avro) with compression
  • πŸ”„ Retry Logic - Exponential backoff with configurable limits
  • 🎯 Batching - Automatic batching based on size and time windows
  • 🎨 Composable - Build data replication workflows from simple, testable components
  • πŸ“ Observable - Comprehensive tracing and metrics support

Quick Start

Installation

Add Rigatoni to your Cargo.toml:

[dependencies]
rigatoni-core = "0.1.1"
rigatoni-destinations = { version = "0.1.1", features = ["s3"] }
rigatoni-stores = { version = "0.1.1", features = ["memory"] }

Your First Pipeline

Create a simple MongoDB to S3 pipeline:

use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure state store
    let store = MemoryStore::new();

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;

    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .collections(vec!["users", "orders"])
        .batch_size(1000)
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
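
Note: MongoDB change streams are only available on replica sets and sharded clusters, which is why the example URI includes ?replicaSet=rs0; a standalone mongod will not serve a change stream.

The Key Features above mention time-window batching and retry limits alongside batch size. The sketch below shows how such tuning might look; batch_timeout and max_retries are illustrative names, not confirmed PipelineConfig methods, so check the rigatoni-core docs for the exact builder API:

use std::time::Duration;
use rigatoni_core::pipeline::PipelineConfig;

fn tuned_config() -> Result<PipelineConfig, Box<dyn std::error::Error>> {
    Ok(PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .collections(vec!["users", "orders"])
        // Flush a batch once it reaches 1,000 events...
        .batch_size(1000)
        // ...or once 5 seconds have elapsed, whichever comes first
        // (batch_timeout is an assumed method name).
        .batch_timeout(Duration::from_secs(5))
        // Give up after 5 rounds of exponential backoff
        // (max_retries is an assumed method name).
        .max_retries(5)
        .build()?)
}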

Architecture

Rigatoni is organized as a workspace with three main crates:

Core Components

  • rigatoni-core - Core traits, pipeline orchestration, and MongoDB integration
  • rigatoni-destinations - Destination implementations (S3 available, BigQuery and Kafka coming soon)
  • rigatoni-stores - State store implementations for checkpoint/resume
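
This split keeps each concern behind a small trait surface, so custom destinations and stores can be plugged in and unit-tested in isolation. As a rough illustration (not the published API; the trait and type names below are invented stand-ins for the traits defined in rigatoni-core), a destination boils down to an async write/flush surface:

use async_trait::async_trait;

// Invented stand-ins for types that rigatoni-core defines.
pub struct Event;
#[derive(Debug)]
pub struct DestinationError;

// Hypothetical shape of a destination; see rigatoni-core for the real trait.
#[async_trait]
pub trait Destination {
    // Deliver one batch of change events to the sink.
    async fn write_batch(&mut self, events: Vec<Event>) -> Result<(), DestinationError>;

    // Flush any buffered data, e.g. before shutdown.
    async fn flush(&mut self) -> Result<(), DestinationError>;
}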

Pipeline Flow

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  MongoDB    │─────▢│  Pipeline │─────▢│  Destination β”‚
β”‚ Change      β”‚      β”‚           β”‚      β”‚   (S3)       β”‚
β”‚ Stream      β”‚      β”‚ (batching,β”‚      β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚  retry)   β”‚      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Learn more about architecture β†’

Use Cases

Real-time CDC to Data Lake

Stream MongoDB changes to S3 for analytics:

  • Format: Parquet for efficient columnar storage
  • Partitioning: Hive-style for query performance
  • Compression: Zstandard for optimal ratio and speed
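
Put together, a configuration for this use case might look like the sketch below. The bucket, region, and prefix calls mirror the Quick Start; the format, compression, and partitioning methods and their enum values are assumed names for the options described above, so verify them against the rigatoni-destinations docs. The backup and event-sourcing scenarios that follow differ only in the values chosen (for example JSON plus Gzip with date-based prefixes):

// Compression, Format, and Partitioning are illustrative type names.
use rigatoni_destinations::s3::{Compression, Format, Partitioning, S3Config, S3Destination};

async fn analytics_destination() -> Result<S3Destination, Box<dyn std::error::Error>> {
    let config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        // Columnar Parquet output for analytics engines (assumed method).
        .format(Format::Parquet)
        // Zstandard for a good compression ratio/speed trade-off (assumed method).
        .compression(Compression::Zstd)
        // Hive-style layout, e.g. collection=users/date=2024-01-01/ (assumed method).
        .partitioning(Partitioning::Hive)
        .build()?;
    Ok(S3Destination::new(config).await?)
}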

Backup and Archive

Continuous backup of MongoDB collections:

  • Format: JSON for flexibility
  • Partitioning: Date-based for lifecycle policies
  • Compression: Gzip for compatibility

Event Sourcing

Capture all database changes for audit and replay:

  • Format: Avro for schema evolution
  • Partitioning: Collection-based for isolation
  • State Management: Resume tokens for checkpointed, resumable delivery
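
In practice, resume tokens are what make restarts safe: the pipeline checkpoints the token of the last processed event in its state store and resumes from that position on the next start. The in-memory store from the Quick Start forgets tokens when the process exits, so for event sourcing you would pass a store that persists them; the sketch below only assumes such a store sits behind the same constructor pattern:

use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Swap MemoryStore for a persistent store in production; MemoryStore
    // loses its checkpoints, and thus its resume position, on restart.
    let store = MemoryStore::new();

    let s3_config = S3Config::builder()
        .bucket("audit-archive") // example bucket name
        .region("us-east-1")
        .prefix("events")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .collections(vec!["orders"])
        .build()?;

    // A fresh pipeline starts at the current oplog position; after a crash
    // or redeploy it resumes from the token checkpointed in the store.
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;
    Ok(())
}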

License

Rigatoni is licensed under the Apache License 2.0.


Next Steps

Ready to build your first pipeline?

Get Started with Rigatoni β†’