Production Deployment Guide
Best practices for deploying Rigatoni pipelines to production environments.
Table of contents
- Pre-Deployment Checklist
- Configuration Best Practices
- Deployment Architectures
- Docker Deployment
- Kubernetes Deployment
- Monitoring and Observability
- Graceful Shutdown
- Security Best Practices
- Performance Tuning
- Disaster Recovery
- Troubleshooting
- Monitoring Checklist
- Next Steps
Pre-Deployment Checklist
Before deploying to production, ensure you have:
- ✅ MongoDB Replica Set - Change streams require replica sets
- ✅ AWS Credentials - IAM role or access keys configured
- ✅ S3 Bucket - Created with appropriate lifecycle policies
- ✅ Monitoring - Logging and metrics collection set up
- ✅ Testing - Integration tests passing
- ✅ Graceful Shutdown - Signal handling implemented
Configuration Best Practices
1. Environment-Based Configuration
Use environment variables for deployment-specific settings:
use std::env;
let config = PipelineConfig::builder()
.mongodb_uri(env::var("MONGODB_URI")?)
.database(env::var("MONGODB_DATABASE")?)
.collections(
env::var("MONGODB_COLLECTIONS")?
.split(',')
.map(|s| s.to_string())
.collect()
)
.batch_size(
    env::var("BATCH_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1000) // default to 1000 when unset or unparsable
)
.build()?;
let s3_config = S3Config::builder()
.bucket(env::var("S3_BUCKET")?)
.region(env::var("AWS_REGION")?)
.prefix(env::var("S3_PREFIX")?)
.build()?;
Environment File:
# .env.production
MONGODB_URI=mongodb://mongo1,mongo2,mongo3/?replicaSet=rs0
MONGODB_DATABASE=production
MONGODB_COLLECTIONS=users,orders,products
BATCH_SIZE=5000
S3_BUCKET=prod-data-lake
AWS_REGION=us-east-1
S3_PREFIX=mongodb-cdc/production
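If the platform does not inject these variables for you (for example, when running locally), you can load the file at startup. A minimal sketch assuming the dotenvy crate, which is not a Rigatoni dependency:
use dotenvy::from_filename;
fn load_env() {
    // Loads .env.production if present; variables already set in the real
    // environment (e.g. by Docker or Kubernetes) are not overridden.
    if let Err(e) = from_filename(".env.production") {
        eprintln!("no .env.production loaded: {e}");
    }
}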
2. Optimize for Throughput
PipelineConfig::builder()
.batch_size(5000) // Larger batches
.batch_timeout_ms(30000) // 30 second timeout
.num_workers(4) // More workers per collection
.channel_buffer_size(10000) // Larger channel buffer
.build()?
3. Robust Retry Configuration
PipelineConfig::builder()
.max_retries(10) // More retries
.retry_delay_ms(1000) // 1 second initial delay
.max_retry_delay_ms(60000) // 1 minute max delay
.build()?
4. Production Logging
use tracing_subscriber::{fmt, EnvFilter};
fn init_logging() {
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
fmt()
.with_env_filter(filter)
.with_target(true)
.with_thread_ids(true)
.with_line_number(true)
.json() // JSON logging for structured logs
.init();
}
Deployment Architectures
Single-Instance Deployment
┌─────────────────────────────────┐
│ Docker Container/EC2 │
│ │
│ ┌──────────────────────────┐ │
│ │ Rigatoni Pipeline │ │
│ │ │ │
│ │ MongoDB ──▶ S3 │ │
│ │ │ │
│ │ Collections: │ │
│ │ - users │ │
│ │ - orders │ │
│ │ - products │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
Pros:
- Simple to deploy and manage
- Lower operational overhead
Cons:
- Single point of failure
- Limited horizontal scaling
Best for:
- Low to medium volume (< 10,000 events/sec)
- Development and staging environments
Multi-Instance Deployment
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Instance 1 │ │ Instance 2 │ │ Instance 3 │
│ │ │ │ │ │
│ Collections:│ │ Collections:│ │ Collections:│
│ - users │ │ - orders │ │ - products │
│ - comments │ │ - payments │ │ - inventory │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└───────────────────┴───────────────────┘
│
┌─────▼─────┐
│ S3 │
└───────────┘
⚠️ CRITICAL LIMITATION: Same-Collection Constraint
Multiple pipeline instances MUST NOT watch the same collection simultaneously. This will cause:
- ❌ Duplicate event processing - All instances receive all events
- ❌ Resume token race conditions - Last write wins, leading to data loss on restart
- ❌ No distributed locking - Redis store uses simple SET (no SETNX or Redlock)
Supported Configuration:
// Instance 1
.collections(vec!["users".to_string(), "comments".to_string()])
// Instance 2
.collections(vec!["orders".to_string(), "payments".to_string()])
// Instance 3
.collections(vec!["products".to_string(), "inventory".to_string()])
NOT Supported (Will Cause Duplicates):
// ❌ Both instances watching "users" - DUPLICATES!
// Instance 1
.collections(vec!["users".to_string()])
// Instance 2
.collections(vec!["users".to_string()]) // ← Same collection!
Workaround for High-Volume Collections:
For collections that need more than one instance:
- Use MongoDB sharding to split the collection
- Deploy separate Rigatoni instances per shard
- Configure each instance to watch different shard ranges
See internal-docs/issues/multi-instance-same-collection-support.md for planned distributed locking support.
Pros:
- Horizontal scaling by collection partitioning
- Fault isolation per instance
- Can dedicate resources per collection
Cons:
- More complex coordination
- Higher operational overhead
- Cannot scale single high-volume collections without MongoDB sharding
Best for:
- High volume (> 10,000 events/sec)
- Critical production workloads with many collections
- Need for high availability
Docker Deployment
Dockerfile
# Build stage
FROM rust:1.88 AS builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
# Runtime stage
FROM debian:bookworm-slim
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
ca-certificates \
libssl3 \
&& rm -rf /var/lib/apt/lists/*
# Copy binary
COPY --from=builder /app/target/release/my-pipeline /usr/local/bin/pipeline
# Create non-root user
RUN useradd -m -u 1000 pipeline
USER pipeline
ENTRYPOINT ["pipeline"]
docker-compose.yml
version: '3.8'
services:
pipeline:
build: .
restart: unless-stopped
environment:
- MONGODB_URI=${MONGODB_URI}
- MONGODB_DATABASE=${MONGODB_DATABASE}
- MONGODB_COLLECTIONS=${MONGODB_COLLECTIONS}
- S3_BUCKET=${S3_BUCKET}
- AWS_REGION=${AWS_REGION}
- S3_PREFIX=${S3_PREFIX}
- RUST_LOG=info
env_file:
- .env.production
# Mount AWS credentials (or use IAM role)
volumes:
- ~/.aws:/home/pipeline/.aws:ro
Build and Run
# Build image
docker build -t my-pipeline:latest .
# Run container
docker-compose up -d
# View logs
docker-compose logs -f pipeline
# Stop
docker-compose down
Kubernetes Deployment
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: rigatoni-pipeline
labels:
app: rigatoni
spec:
replicas: 1  # one replica per collection set; see the same-collection constraint above
selector:
matchLabels:
app: rigatoni
template:
metadata:
labels:
app: rigatoni
spec:
serviceAccountName: rigatoni-sa
containers:
- name: pipeline
image: my-pipeline:latest
env:
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb-secret
key: uri
- name: MONGODB_DATABASE
value: "production"
- name: MONGODB_COLLECTIONS
value: "users,orders"
- name: S3_BUCKET
value: "prod-data-lake"
- name: AWS_REGION
value: "us-east-1"
- name: RUST_LOG
value: "info"
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
Service Account (for IAM Roles)
apiVersion: v1
kind: ServiceAccount
metadata:
name: rigatoni-sa
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/rigatoni-s3-role
Monitoring and Observability
1. Structured Logging
use tracing::{info, warn, error, instrument};
#[instrument(skip(self), fields(collection = %collection))]
async fn process_batch(&self, collection: String, events: Vec<ChangeEvent>) {
info!(
event_count = events.len(),
"Processing batch"
);
match self.write_to_destination(&events).await {
Ok(_) => {
info!(
event_count = events.len(),
"Batch written successfully"
);
}
Err(e) => {
error!(
event_count = events.len(),
error = %e,
"Failed to write batch"
);
}
}
}
2. Metrics Collection
use metrics::{counter, gauge, histogram};
// Track events processed
counter!("events_processed_total", "collection" => collection).increment(events.len() as u64);
// Track batch size
histogram!("batch_size", "collection" => collection).record(events.len() as f64);
// Track active workers
gauge!("active_workers").set(worker_count as f64);
// Track errors
counter!("write_errors_total", "collection" => collection).increment(1);
3. Health Checks
use axum::{Router, routing::get, Json};
use serde_json::json;
async fn health_check() -> Json<serde_json::Value> {
Json(json!({
"status": "healthy",
"version": env!("CARGO_PKG_VERSION"),
}))
}
// In main()
let app = Router::new()
.route("/health", get(health_check));
tokio::spawn(async move {
axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())
.serve(app.into_make_service())
.await
});
Graceful Shutdown
Signal Handling
use std::error::Error;
use tokio::signal;
use tracing::{error, info};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut pipeline = Pipeline::new(config, destination).await?;
// Handle graceful shutdown
tokio::select! {
result = pipeline.run() => {
if let Err(e) = result {
error!("Pipeline error: {}", e);
return Err(e.into());
}
}
_ = signal::ctrl_c() => {
info!("Received shutdown signal, shutting down gracefully...");
pipeline.shutdown().await?;
info!("Pipeline shut down successfully");
}
}
Ok(())
}
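Note that Docker and Kubernetes stop containers with SIGTERM (see below), which signal::ctrl_c() alone does not catch. A minimal sketch that resolves on either signal, assuming tokio's signal feature is enabled; it can replace signal::ctrl_c() in the select! above:
use tokio::signal::unix::SignalKind;
// Completes when the process receives SIGINT (Ctrl+C) or SIGTERM (Unix-only).
async fn shutdown_signal() {
    let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
        .expect("failed to install SIGTERM handler");
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = sigterm.recv() => {}
    }
}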
Docker Shutdown
# Use SIGTERM for graceful shutdown
STOPSIGNAL SIGTERM
# Allow time for graceful shutdown (30 seconds)
# docker stop will wait this long before SIGKILL
Kubernetes Shutdown
spec:
containers:
- name: pipeline
# ...
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 15"]
terminationGracePeriodSeconds: 30
Security Best Practices
1. Use IAM Roles
AWS ECS/EKS:
# IAM role for task
taskRoleArn: arn:aws:iam::123456789012:role/rigatoni-task-role
IAM Policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": "arn:aws:s3:::prod-data-lake/*"
}
]
}
2. Encrypt Secrets
Use AWS Secrets Manager or Kubernetes Secrets:
use aws_sdk_secretsmanager::Client;
use std::error::Error;
async fn get_mongodb_uri() -> Result<String, Box<dyn Error>> {
let config = aws_config::load_from_env().await;
let client = Client::new(&config);
let secret = client
.get_secret_value()
.secret_id("mongodb-uri")
.send()
.await?;
Ok(secret.secret_string().unwrap().to_string())
}
3. Network Security
- Use VPC endpoints for S3 (no internet gateway required)
- Restrict MongoDB access to VPC
- Use TLS for MongoDB connections
.mongodb_uri("mongodb://mongo1,mongo2,mongo3/?tls=true&replicaSet=rs0")
Performance Tuning
1. Resource Allocation
CPU:
- Minimum: 1 vCPU
- Recommended: 2-4 vCPUs for multi-worker pipelines
Memory:
- Minimum: 512 MB
- Recommended: 1-2 GB
- Formula:
base (256 MB) + (workers × 128 MB) + (batch_size × 1 KB)
Example:
4 workers × 128 MB = 512 MB
batch_size=5000 × 1 KB = 5 MB
Total: 256 MB + 512 MB + 5 MB ≈ 773 MB (provision ~1 GB)
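The same arithmetic as a throwaway helper, in case you want to size containers programmatically; this is illustrative only and not part of Rigatoni:
/// Rough memory estimate in MB using the formula above
/// (assumes roughly 1 KB per buffered event).
fn estimated_memory_mb(workers: u64, batch_size: u64) -> u64 {
    256 + workers * 128 + batch_size / 1000
}
// estimated_memory_mb(4, 5000) = 256 + 512 + 5 = 773, so provision ~1 GB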
2. Batching Optimization
// High throughput, higher latency
.batch_size(10000)
.batch_timeout_ms(60000)
// Low latency, lower throughput
.batch_size(100)
.batch_timeout_ms(1000)
// Balanced (recommended)
.batch_size(5000)
.batch_timeout_ms(30000)
3. S3 Upload Optimization
use rigatoni_destinations::s3::{Compression, KeyGenerationStrategy, SerializationFormat};
S3Config::builder()
// Use Parquet for best compression
.format(SerializationFormat::Parquet)
// Use Zstd for best compression ratio and speed
.compression(Compression::Zstd)
// Use Hive partitioning for analytics
.key_strategy(KeyGenerationStrategy::HivePartitioned)
.build()?
Disaster Recovery
1. Resume Token Persistence
Use Redis for distributed state:
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;
// Configure Redis with connection pooling and TTL
let redis_config = RedisConfig::builder()
.url("redis://localhost:6379")
.pool_size(10)
.ttl(Duration::from_secs(14 * 24 * 60 * 60)) // 14 days
.max_retries(3)
.build()?;
let store = RedisStore::new(redis_config).await?;
let pipeline = Pipeline::with_store(config, destination, store).await?;
Redis Configuration for Production:
- Use TLS for encryption: rediss:// scheme (see the sketch after this list)
- Set TTL to prevent unbounded growth (7-30 days recommended)
- Configure pool size based on concurrent pipelines (2× pipeline count)
- Enable Redis AUTH for authentication
- Use Redis Sentinel for high availability
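A TLS-enabled variant of the RedisConfig shown above, reusing the same builder; the rediss:// URL, host, and password are placeholders for your own values:
let redis_config = RedisConfig::builder()
    // rediss:// enables TLS; the password satisfies Redis AUTH
    .url("rediss://:your-password@redis.example.internal:6380")
    .pool_size(10) // roughly 2x the number of concurrent pipelines
    .ttl(Duration::from_secs(14 * 24 * 60 * 60)) // within the 7-30 day window
    .max_retries(3)
    .build()?;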
2. Backup Resume Tokens
# Redis backup
redis-cli SAVE
# Copy backup
scp /var/lib/redis/dump.rdb backup-server:/backups/
3. Recovery Procedure
- Stop pipeline
- Restore Redis from backup
- Start pipeline (resumes from last checkpoint)
Troubleshooting
High Memory Usage
Symptoms:
- Container OOM kills
- Slow performance
Solutions:
- Reduce batch_size (see the sketch after this list)
- Reduce channel_buffer_size
- Increase container memory limits
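A reduced-footprint sketch using the builder options shown earlier; the values are starting points to tune against your workload, not fixed recommendations:
PipelineConfig::builder()
    .batch_size(1000)          // buffer fewer events per batch
    .channel_buffer_size(2000) // shrink the in-flight channel buffer
    .num_workers(2)            // fewer workers, per the sizing formula above
    .build()?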
Pipeline Lag
Symptoms:
- Events not processed in real-time
- Growing backlog
Solutions:
- Increase num_workers
- Increase batch_size
- Scale horizontally (more instances)
S3 Throttling
Symptoms:
- Frequent 503 errors
- Slow uploads
Solutions:
- Use S3 Transfer Acceleration
- Increase retry delays
- Use prefix sharding
Monitoring Checklist
- CPU and memory metrics
- Events processed per second
- Batch write latency
- Error rate
- Retry count
- Resume token age
- S3 upload latency
- MongoDB connection health
Next Steps
- Monitoring and Observability - Set up comprehensive monitoring
- Error Handling - Handle failures gracefully
- Testing Strategies - Test your pipelines thoroughly