S3 Configuration Guide

Learn how to configure the S3 destination for your use case.

Table of contents

  1. Basic Configuration
  2. Configuration Options
    1. Required Fields
      1. bucket(impl Into<String>)
      2. region(impl Into<String>)
    2. Optional Fields
      1. prefix(impl Into<String>)
      2. format(SerializationFormat)
      3. compression(Compression)
      4. key_strategy(KeyGenerationStrategy)
      5. max_retries(u32)
      6. endpoint_url(impl Into<String>)
      7. force_path_style(bool)
  3. Complete Example
  4. AWS Credentials
    1. Environment Variables
    2. AWS Credentials File
    3. IAM Instance Profile
    4. Environment Variables (Alternative)
    5. Required IAM Permissions
  5. LocalStack Configuration
  6. Configuration Validation
    1. Valid Configuration
    2. Invalid Configurations
    3. Error Handling
  7. Environment-Based Configuration
    1. Using Environment Variables
    2. Configuration File (TOML)
  8. Best Practices
    1. Use Descriptive Prefixes
    2. Enable Compression
    3. Use Appropriate Partitioning
    4. Increase Retries for Production
    5. Use IAM Roles Instead of Access Keys
  9. Troubleshooting
    1. Access Denied
    2. Bucket Not Found
    3. Invalid Bucket Name
  10. Next Steps

Basic Configuration

The minimal S3 configuration requires only a bucket and region:

use rigatoni_destinations::s3::S3Config;

let config = S3Config::builder()
    .bucket("my-data-lake")
    .region("us-east-1")
    .build()?;

Configuration Options

Required Fields

bucket(impl Into<String>)

The S3 bucket name where data will be written.

Validation Rules:

  • Must be 3-63 characters long
  • Only lowercase letters, numbers, hyphens, and periods
  • Cannot start or end with a hyphen

.bucket("my-data-lake")  // ✓ Valid
.bucket("MyBucket")       // ✗ Invalid (uppercase)
.bucket("ab")             // ✗ Invalid (too short)
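The three rules above can be sketched as a standalone check. This is a minimal illustration, not the crate's validator: `validate_bucket_name` and its error messages are hypothetical names chosen for this example.

```rust
/// Checks a bucket name against the three rules listed above.
/// Hypothetical helper for illustration; the crate's own validation
/// may differ in details and error wording.
fn validate_bucket_name(name: &str) -> Result<(), String> {
    // Rule 1: length between 3 and 63 characters.
    let len = name.chars().count();
    if !(3..=63).contains(&len) {
        return Err(format!("must be 3-63 characters long, got {}", len));
    }
    // Rule 2: only lowercase letters, digits, hyphens, and periods.
    let allowed = |c: char| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.';
    if let Some(bad) = name.chars().find(|c| !allowed(*c)) {
        return Err(format!("invalid character '{}'", bad));
    }
    // Rule 3: no leading or trailing hyphen.
    if name.starts_with('-') || name.ends_with('-') {
        return Err("cannot start or end with a hyphen".to_string());
    }
    Ok(())
}
```

Running such a check before calling the builder can give earlier, more specific feedback, for example when bucket names come from user input.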

region(impl Into<String>)

AWS region where the bucket is located.

.region("us-east-1")
.region("eu-west-1")
.region("ap-southeast-2")

Optional Fields

prefix(impl Into<String>)

Key prefix for all objects. Useful for organizing data within a bucket.

.prefix("mongodb-cdc")
.prefix("production/events")

Validation Rules:

  • Cannot contain .. (path traversal)
  • Cannot start with /

Example Key:

Without prefix: users/2025/01/15/10/1705318800000.jsonl
With prefix:    mongodb-cdc/users/2025/01/15/10/1705318800000.jsonl
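As a rough sketch of how the prefix rules and the example key fit together, the helpers below validate a prefix and prepend it to a key. Both function names are hypothetical, and trimming a trailing `/` from the prefix is an assumption of this sketch rather than documented behavior.

```rust
/// Validates a key prefix against the two rules above.
/// Hypothetical helper; not part of the crate's API.
fn validate_prefix(prefix: &str) -> Result<(), String> {
    if prefix.starts_with('/') {
        return Err("prefix cannot start with '/'".to_string());
    }
    if prefix.contains("..") {
        return Err("prefix cannot contain '..'".to_string());
    }
    Ok(())
}

/// Prepends the prefix to a key, separated by a single '/'.
/// An empty prefix leaves the key unchanged.
/// Assumption: a trailing '/' on the prefix is dropped to avoid "//".
fn apply_prefix(prefix: &str, key: &str) -> String {
    let trimmed = prefix.trim_end_matches('/');
    if trimmed.is_empty() {
        key.to_string()
    } else {
        format!("{}/{}", trimmed, key)
    }
}
```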

format(SerializationFormat)

Serialization format for events. See the Formats and Compression guide.

Available formats:

  • SerializationFormat::Json - Newline-delimited JSON (default)
  • SerializationFormat::Csv - Comma-separated values
  • SerializationFormat::Parquet - Apache Parquet columnar format
  • SerializationFormat::Avro - Apache Avro binary format

use rigatoni_destinations::s3::SerializationFormat;

.format(SerializationFormat::Parquet)

compression(Compression)

Compression algorithm for objects. See the Formats and Compression guide.

Available compression:

  • Compression::None - No compression (default)
  • Compression::Gzip - Gzip compression (requires gzip feature)
  • Compression::Zstd - Zstandard compression (requires zstandard feature)

use rigatoni_destinations::s3::Compression;

.compression(Compression::Zstd)
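The example keys in this guide end in `.jsonl` and `.parquet.zst`, which suggests the object extension combines the format and the compression. The sketch below illustrates that idea with local stand-in enums (`Format`, `Codec`), not the crate's types. The `csv`, `avro`, and `.gz` spellings are assumptions; only `jsonl`, `parquet`, and `.zst` appear in this guide.

```rust
// Local stand-ins for illustration; these are not the crate's enums.
#[derive(Clone, Copy)]
enum Format { Json, Csv, Parquet, Avro }

#[derive(Clone, Copy)]
enum Codec { None, Gzip, Zstd }

/// Builds a file extension such as "parquet.zst" from format and codec.
fn file_extension(format: Format, codec: Codec) -> String {
    let base = match format {
        Format::Json => "jsonl",      // seen in this guide's example keys
        Format::Csv => "csv",         // assumed spelling
        Format::Parquet => "parquet", // seen in this guide's example keys
        Format::Avro => "avro",       // assumed spelling
    };
    let suffix = match codec {
        Codec::None => "",
        Codec::Gzip => ".gz",  // assumed spelling
        Codec::Zstd => ".zst", // seen in this guide's example keys
    };
    format!("{}{}", base, suffix)
}
```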

key_strategy(KeyGenerationStrategy)

Strategy for generating S3 object keys. See the Partitioning Strategies guide.

Available strategies:

  • KeyGenerationStrategy::DateHourPartitioned - Date and hour (default)
  • KeyGenerationStrategy::HivePartitioned - Hive-style partitioning
  • KeyGenerationStrategy::DatePartitioned - Date only
  • KeyGenerationStrategy::CollectionBased - Collection grouping
  • KeyGenerationStrategy::Flat - Flat structure

use rigatoni_destinations::s3::KeyGenerationStrategy;

.key_strategy(KeyGenerationStrategy::HivePartitioned)

max_retries(u32)

Maximum number of retries for S3 operations. Default: 3.

.max_retries(5)  // Retry up to 5 times
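To make the retry count concrete, here is a minimal sketch of a retry loop. It assumes that "retries" excludes the initial attempt, so `max_retries(5)` would allow up to six attempts in total; that reading is an assumption, not something this guide states. The `with_retries` helper is hypothetical and omits the delay between attempts that a real client would normally add.

```rust
/// Runs `op` once, then retries up to `max_retries` more times on failure.
/// `op` receives the zero-based attempt index.
/// Assumption: the retry count does not include the first attempt.
fn with_retries<T, E>(
    max_retries: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= max_retries => return Err(err),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```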

endpoint_url(impl Into<String>)

Custom S3 endpoint URL for S3-compatible storage (LocalStack, MinIO, etc.).

.endpoint_url("http://localhost:4566")  // LocalStack
.endpoint_url("http://minio:9000")      // MinIO

force_path_style(bool)

Use path-style addressing instead of virtual-hosted style. Required for LocalStack and MinIO.

.force_path_style(true)

URL Styles:

  • Virtual-hosted: https://bucket.s3.region.amazonaws.com/key
  • Path-style: https://s3.region.amazonaws.com/bucket/key
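The difference between the two styles can be sketched with plain string formatting. The helpers below are hypothetical and only mirror the URL shapes listed above; the AWS SDK builds the real request URLs itself.

```rust
/// Builds an AWS object URL in either addressing style.
/// Hypothetical helper that mirrors the two URL shapes listed above.
fn object_url(bucket: &str, region: &str, key: &str, force_path_style: bool) -> String {
    if force_path_style {
        // Path-style: the bucket is the first path segment.
        format!("https://s3.{}.amazonaws.com/{}/{}", region, bucket, key)
    } else {
        // Virtual-hosted: the bucket is part of the host name.
        format!("https://{}.s3.{}.amazonaws.com/{}", bucket, region, key)
    }
}

/// Path-style URL against a custom endpoint such as LocalStack or MinIO.
fn path_style_url(endpoint: &str, bucket: &str, key: &str) -> String {
    format!("{}/{}/{}", endpoint.trim_end_matches('/'), bucket, key)
}
```

With a custom endpoint like `http://localhost:4566`, the host name has no room for a bucket subdomain unless extra DNS is set up, which is why path-style addressing is the usual choice for local emulators.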

Complete Example

use rigatoni_destinations::s3::{
    S3Config, Compression, SerializationFormat, KeyGenerationStrategy
};

let config = S3Config::builder()
    // Required
    .bucket("analytics-data")
    .region("us-west-2")

    // Optional
    .prefix("mongodb-events")
    .format(SerializationFormat::Parquet)
    .compression(Compression::Zstd)
    .key_strategy(KeyGenerationStrategy::HivePartitioned)
    .max_retries(5)

    .build()?;

This configuration writes objects like the following (shown as the bucket name followed by the object key):

analytics-data/mongodb-events/collection=users/year=2025/month=01/day=15/hour=10/1705318800000.parquet.zst
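The key layouts shown in this guide can be reproduced with simple formatting. The sketch below covers only the two layouts that appear in the examples (date-and-hour and Hive-style) and builds the key portion without the bucket name, assuming the leading `analytics-data` segment above is the bucket. `Partition`, `date_hour_key`, and `hive_key` are hypothetical names, and the partition values are passed in directly rather than derived from an event timestamp.

```rust
/// Partition values for one object. Hypothetical type for illustration.
struct Partition<'a> {
    collection: &'a str,
    year: u16,
    month: u8,
    day: u8,
    hour: u8,
}

/// Date-and-hour layout: prefix/collection/YYYY/MM/DD/HH/file
fn date_hour_key(prefix: &str, p: &Partition<'_>, file_name: &str) -> String {
    format!(
        "{}/{}/{:04}/{:02}/{:02}/{:02}/{}",
        prefix, p.collection, p.year, p.month, p.day, p.hour, file_name
    )
}

/// Hive-style layout: prefix/collection=.../year=.../month=.../day=.../hour=.../file
fn hive_key(prefix: &str, p: &Partition<'_>, file_name: &str) -> String {
    format!(
        "{}/collection={}/year={:04}/month={:02}/day={:02}/hour={:02}/{}",
        prefix, p.collection, p.year, p.month, p.day, p.hour, file_name
    )
}
```

The `name=value` segments in the Hive layout are what let query engines that understand Hive partitioning treat each path segment as a partition column.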

AWS Credentials

The S3 destination uses the AWS SDK, which loads credentials from:

1. Environment Variables

export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-east-1

2. AWS Credentials File

~/.aws/credentials:

[default]
aws_access_key_id = your_access_key
aws_secret_access_key = your_secret_key

~/.aws/config:

[default]
region = us-east-1

3. IAM Instance Profile

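On EC2 instances or ECS tasks, attach an IAM role (an instance profile on EC2, a task role on ECS). The SDK then obtains temporary credentials automatically, so no access keys need to be configured.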

4. Environment Variables (Alternative)

To select a named profile from the files above instead of setting keys directly:

export AWS_PROFILE=production
export AWS_DEFAULT_REGION=us-west-2
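When credentials fail to load, it can help to see what the environment actually provides. The sketch below is a hypothetical diagnostic, not the SDK's resolution logic: it only inspects the variables named in this section and reports which environment-based source appears to be configured.

```rust
/// What the environment variables suggest about credential configuration.
/// Hypothetical diagnostic type; the AWS SDK does its own resolution.
#[derive(Debug, PartialEq)]
enum EnvCredentialHint {
    /// Both AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set.
    StaticKeys,
    /// Only one of the two key variables is set (likely a mistake).
    IncompleteKeys,
    /// AWS_PROFILE selects a named profile from the credentials file.
    Profile(String),
    /// Nothing in the environment; files or an IAM role would be needed.
    NotInEnvironment,
}

/// `lookup` returns the value of an environment variable, if set.
fn env_credential_hint(lookup: impl Fn(&str) -> Option<String>) -> EnvCredentialHint {
    let key_id = lookup("AWS_ACCESS_KEY_ID");
    let secret = lookup("AWS_SECRET_ACCESS_KEY");
    match (key_id, secret) {
        (Some(_), Some(_)) => EnvCredentialHint::StaticKeys,
        (Some(_), None) | (None, Some(_)) => EnvCredentialHint::IncompleteKeys,
        (None, None) => match lookup("AWS_PROFILE") {
            Some(profile) => EnvCredentialHint::Profile(profile),
            None => EnvCredentialHint::NotInEnvironment,
        },
    }
}
```

In an application this could be called as `env_credential_hint(|name| std::env::var(name).ok())`. Taking the lookup as a closure keeps the function testable without touching the process environment.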

Required IAM Permissions

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": "arn:aws:s3:::my-data-lake/*"
    }
  ]
}

LocalStack Configuration

For local development and testing with LocalStack:

let config = S3Config::builder()
    .bucket("test-bucket")
    .region("us-east-1")
    .endpoint_url("http://localhost:4566")
    .force_path_style(true)
    .build()?;

Environment Setup:

# Install LocalStack
pip install localstack

# Start LocalStack
localstack start -d

# Set credentials (dummy values)
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_REGION=us-east-1

# Create bucket
awslocal s3 mb s3://test-bucket

See the LocalStack Development Guide for more details.


Configuration Validation

The builder validates the configuration when build() is called:

Valid Configuration

let config = S3Config::builder()
    .bucket("my-bucket")
    .region("us-east-1")
    .build()?;  // ✓ OK

Invalid Configurations

// Missing required field
S3Config::builder()
    .bucket("my-bucket")
    // Missing region
    .build()?;  // ✗ Error: region is required

// Invalid bucket name
S3Config::builder()
    .bucket("MyBucket")  // Uppercase not allowed
    .region("us-east-1")
    .build()?;  // ✗ Error: invalid bucket name

// Invalid prefix
S3Config::builder()
    .bucket("my-bucket")
    .region("us-east-1")
    .prefix("data/../secrets")  // Path traversal
    .build()?;  // ✗ Error: invalid prefix

Error Handling

use rigatoni_destinations::s3::{S3Config, S3ConfigError};

match S3Config::builder()
    .bucket("ab")
    .region("us-east-1")
    .build()
{
    Ok(config) => { /* use config */ }
    Err(S3ConfigError::InvalidBucket { name, reason }) => {
        eprintln!("Invalid bucket '{}': {}", name, reason);
    }
    Err(e) => {
        eprintln!("Config error: {}", e);
    }
}

Environment-Based Configuration

Using Environment Variables

use rigatoni_destinations::s3::S3Config;
use std::env;

let config = S3Config::builder()
    .bucket(env::var("S3_BUCKET")?)
    .region(env::var("AWS_REGION")?)
    .prefix(env::var("S3_PREFIX").unwrap_or_default())
    .build()?;
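A bare `env::var(...)?` reports a missing variable without saying which one. As a sketch of one way to get clearer messages, the helper below gathers the same three variables into a plain struct and names the missing variable in the error. `S3EnvSettings` and `settings_from_env` are hypothetical names; the resulting values would then be passed to the builder as shown above.

```rust
/// Values read from the environment before building the S3 config.
/// Hypothetical struct for illustration.
#[derive(Debug, PartialEq)]
struct S3EnvSettings {
    bucket: String,
    region: String,
    prefix: String,
}

/// Reads S3_BUCKET and AWS_REGION (required) and S3_PREFIX (optional).
/// `lookup` returns the value of an environment variable, if set.
fn settings_from_env(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<S3EnvSettings, String> {
    // Name the missing variable so the error is actionable.
    let required = |name: &str| {
        lookup(name).ok_or_else(|| format!("missing required environment variable {}", name))
    };
    Ok(S3EnvSettings {
        bucket: required("S3_BUCKET")?,
        region: required("AWS_REGION")?,
        // An unset prefix falls back to the empty string.
        prefix: lookup("S3_PREFIX").unwrap_or_default(),
    })
}
```

In an application the lookup would typically be `|name| std::env::var(name).ok()`.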

Configuration File (TOML)

config.toml:

[s3]
bucket = "my-data-lake"
region = "us-east-1"
prefix = "mongodb-cdc"
format = "parquet"
compression = "zstd"

Load it with the config crate:

use config::{Config, File};
use rigatoni_destinations::s3::S3Config;
use serde::Deserialize;

#[derive(Deserialize)]
struct AppConfig {
    s3: S3Settings,
}

#[derive(Deserialize)]
struct S3Settings {
    bucket: String,
    region: String,
    prefix: Option<String>,
}

let settings = Config::builder()
    .add_source(File::with_name("config"))
    .build()?
    .try_deserialize::<AppConfig>()?;

let s3_config = S3Config::builder()
    .bucket(&settings.s3.bucket)
    .region(&settings.s3.region)
    .prefix(settings.s3.prefix.unwrap_or_default())
    .build()?;
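The TOML file above also sets `format` and `compression`, which the loader does not read. One way to handle them is to deserialize both as strings and map them to enum values before calling the builder. The sketch below uses local stand-in enums (`FormatSetting`, `CompressionSetting`) rather than the crate's types. Only the `"parquet"` and `"zstd"` spellings appear in this guide; the others are assumptions.

```rust
use std::str::FromStr;

// Local stand-ins for illustration; map these to the crate's enums in real code.
#[derive(Debug, PartialEq, Clone, Copy)]
enum FormatSetting { Json, Csv, Parquet, Avro }

#[derive(Debug, PartialEq, Clone, Copy)]
enum CompressionSetting { None, Gzip, Zstd }

impl FromStr for FormatSetting {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Case-insensitive match on the configured string.
        match s.to_ascii_lowercase().as_str() {
            "json" => Ok(Self::Json), // assumed spelling
            "csv" => Ok(Self::Csv),   // assumed spelling
            "parquet" => Ok(Self::Parquet),
            "avro" => Ok(Self::Avro), // assumed spelling
            other => Err(format!("unknown format '{}'", other)),
        }
    }
}

impl FromStr for CompressionSetting {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "none" => Ok(Self::None), // assumed spelling
            "gzip" => Ok(Self::Gzip), // assumed spelling
            "zstd" => Ok(Self::Zstd),
            other => Err(format!("unknown compression '{}'", other)),
        }
    }
}
```

With `format: Option<String>` and `compression: Option<String>` added to `S3Settings`, a call such as `"parquet".parse::<FormatSetting>()` would turn the configured value into an enum, and an unknown value would fail with a readable error instead of being silently ignored.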

Best Practices

1. Use Descriptive Prefixes

Organize data by environment and purpose:

// Development
.prefix("dev/mongodb-cdc")

// Production
.prefix("prod/mongodb-cdc")

// By source system
.prefix("mongodb/production/cdc")

2. Enable Compression

Reduce storage costs and bandwidth:

.compression(Compression::Zstd)  // Good balance of compression ratio and speed

3. Use Appropriate Partitioning

Choose based on query patterns:

// Analytics: Hive partitioning
.key_strategy(KeyGenerationStrategy::HivePartitioned)

// Backups: Date partitioning
.key_strategy(KeyGenerationStrategy::DatePartitioned)

4. Increase Retries for Production

.max_retries(10)  // More retries for production

5. Use IAM Roles Instead of Access Keys

For EC2/ECS deployments, use IAM roles (an instance profile on EC2, a task role on ECS) instead of hardcoding access keys.


Troubleshooting

Access Denied

Error:

Error: S3 operation failed: Access Denied

Solutions:

  1. Verify AWS credentials are configured
  2. Check IAM permissions include s3:PutObject
  3. Verify bucket name and region are correct

Bucket Not Found

Error:

Error: S3 operation failed: NoSuchBucket

Solutions:

  1. Create the bucket: aws s3 mb s3://my-bucket
  2. Verify bucket name spelling
  3. Verify region matches bucket location

Invalid Bucket Name

Error:

Error: invalid bucket name: MyBucket (must contain only lowercase letters, numbers, hyphens, and periods)

Solution: Use only lowercase letters, numbers, hyphens, and periods:

.bucket("my-bucket")  // ✓ Correct

Next Steps