Sabot: High-Performance Python Data Processing

⚠️ EXPERIMENTAL - ALPHA SOFTWARE ⚠️

The Python alternative to PySpark and Ray for high-performance columnar data processing.

Sabot is a Python framework that brings Apache Arrow's columnar performance to data processing workflows. Where PySpark pays JVM overhead and Ray adds distributed-coordination complexity, Sabot provides zero-copy Arrow operations with Cython acceleration for high throughput on a single machine.

🎯 NEW: Unified Architecture (October 2025)

Sabot now has a unified entry point for all functionality:

from sabot import Sabot

# Create unified engine
engine = Sabot(mode='local')  # or 'distributed'

# Stream processing
stream = engine.stream.from_kafka('topic').filter(lambda b: b.column('x') > 10)

# SQL processing
result = engine.sql("SELECT * FROM table WHERE x > 10")

# Graph processing
matches = engine.graph.cypher("MATCH (a)-[:KNOWS]->(b) RETURN a, b")

# Clean shutdown
engine.shutdown()

See: Unified Architecture Guide

Classic API (Still Supported)

from sabot import cyarrow as ca
from sabot.api.stream import Stream
from sabot.cyarrow import compute as pc

# Load 10M rows from Parquet file
data = ca.read_parquet('transactions.parquet')

# Create batch processing pipeline
stream = (Stream.from_table(data, batch_size=100_000)
    # Filter high-value transactions (SIMD-accelerated)
    .filter(lambda batch: pc.greater(batch.column('amount'), 10000))

    # Compute fraud score using auto-compiled Numba UDF
    .map(lambda batch: batch.append_column('fraud_score',
        compute_fraud_score(batch)))  # 10-100x Numba speedup

    # Select output columns
    .select('transaction_id', 'amount', 'fraud_score'))

# Execute: 104M rows/sec hash joins, 10-100x UDF speedup
for batch in stream:
    print(f"Processed {batch.num_rows} rows")

# Same code works for streaming (infinite) sources!
# stream = Stream.from_kafka('transactions')  # Never terminates

Project Status

This is an experimental research project exploring the design space of:

  • Zero-copy Arrow columnar processing in Python
  • Cython acceleration for data-intensive operations
  • High-performance batch processing alternatives
  • Kafka streaming integration with columnar efficiency

Current State (v0.1.0-alpha):

  • ✅ CyArrow: Production-ready zero-copy Arrow operations (104M rows/sec joins)
  • ✅ Graph Processing: High-performance graph analytics (3-37M matches/sec pattern matching)
  • ✅ SabotQL: SPARQL query engine for RDF triple stores (23,798 queries/sec, integrated into pipelines)
  • ✅ DataLoader: High-performance CSV/Arrow IPC loading (52x faster than CSV)
  • ✅ Streaming Agents: Faust-inspired Kafka processing with columnar data
  • ✅ Cython Acceleration: SIMD-accelerated compute kernels
  • ⚠️ Distributed features are experimental (checkpoints, state management)
  • ⚠️ Test coverage is limited (~5%)
  • ⚠️ Not recommended for production use

Performance: Why Choose Sabot Over PySpark/Ray?

Sabot delivers PySpark-level performance without the JVM overhead:

Core Data Processing (M3 MacBook Pro, 11.2M rows)

  • Hash Joins: 104M rows/sec (11.2M row join in 107ms)
  • Data Loading (Arrow IPC): 5M rows/sec (10M rows in 2 seconds, memory-mapped)
  • Data Loading (CSV): 0.5-1.0M rows/sec (multi-threaded)
  • Arrow IPC vs CSV: 52x faster loading (10M rows: 2s vs 103s)
  • File Compression: 50-70% size reduction (5.6GB → 2.4GB)
  • Zero-copy operations: ~2-3ns per element (SIMD-accelerated)

Fintech Enrichment Pipeline (Complete workflow):

  • Total Pipeline: 2.3 seconds end-to-end
  • Hash Join: 104.6M rows/sec throughput
  • Window Operations: ~2-3ns per element (SIMD-accelerated)
  • Spread Calculation: Vectorized compute kernels
  • Data Loading: 2.1 seconds (10M + 1.2M rows)

vs PySpark (same workload, anecdotal):

  • PySpark: ~30-60 seconds (JVM startup + serialization overhead)
  • Sabot: 2.3 seconds (pure Python + Arrow + Cython)

vs Ray:

  • Ray: Distributed coordination overhead even for single-machine
  • Sabot: Direct columnar operations with zero serialization

Streaming Performance

Real-time Fraud Detection (Kafka + columnar processing):

  • Throughput: 143K-260K transactions/sec
  • Latency p50/p95/p99: 0.01ms / 0.01ms / 0.01ms
  • Pattern Detection: velocity, amount anomaly, geo-impossible
  • Stateful Processing: 1M+ state operations/sec
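
For intuition, the velocity pattern can be sketched framework-agnostically as below (thresholds and window size are illustrative assumptions; in Sabot this state would live in a state backend such as sb.MapState):

from collections import defaultdict, deque

MAX_TXNS = 5           # hypothetical threshold
WINDOW_SECONDS = 60.0  # hypothetical sliding window

recent = defaultdict(deque)  # card_id -> timestamps of recent transactions

def velocity_alert(card_id: str, ts: float) -> bool:
    """Flag a card that transacts more than MAX_TXNS times within the window."""
    q = recent[card_id]
    q.append(ts)
    while q and ts - q[0] > WINDOW_SECONDS:
        q.popleft()  # evict timestamps that fell out of the window
    return len(q) > MAX_TXNS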

Experimental Features (In Development):

  • Distributed agent runtime (Ray-like actor model)
  • RocksDB state backend (persistent state)
  • GPU acceleration via RAFT
  • Complex event processing (CEP)

Auto-Numba UDF Compilation

Automatic 10-100x speedup for user-defined functions - transparently!

Sabot automatically compiles Python UDFs with Numba JIT for massive performance gains. No code changes required - just write normal Python and Sabot handles the rest.

Status: ✅ Phase 2 Complete - Fully integrated with CythonMapOperator, tested and benchmarked.

# User writes normal Python - Sabot auto-compiles it!
import pyarrow as pa
from sabot.api.stream import Stream

def my_transform(batch):
    # Extract numpy arrays from Arrow columns
    values = batch.column('value').to_numpy()
    results = []

    # Numba-compiled computation
    for i in range(len(values)):
        total = 0
        for j in range(100):
            total += values[i] * j
        results.append(total)

    # Return new RecordBatch
    return batch.append_column('computed', pa.array(results))

# Automatically compiled with Numba @njit (10-50x faster)
stream = Stream.from_kafka('data').map(my_transform)

How it works:

  1. AST analysis detects function patterns (loops, NumPy array ops, etc.)
  2. Chooses optimal compilation strategy (@njit vs @vectorize)
  3. Compiles with Numba transparently for batch processing
  4. Falls back to Python if compilation fails
  5. Caches compiled functions for reuse
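
A simplified sketch of the cache-and-fallback step (illustrative only; maybe_compile and call_with_fallback are hypothetical names, and the real implementation also performs the AST-based strategy selection described above):

import numba

_udf_cache = {}

def maybe_compile(func):
    """Wrap a UDF with Numba's @njit once, then reuse the wrapper."""
    if func not in _udf_cache:
        _udf_cache[func] = numba.njit(func)  # compilation is deferred to first call
    return _udf_cache[func]

def call_with_fallback(func, *args):
    try:
        return maybe_compile(func)(*args)  # ~100ms compile on first call, <1ms after
    except Exception:
        return func(*args)  # graceful fallback to plain Python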

Performance:

  • Scalar loops: 10-50x speedup
  • NumPy operations: 50-100x speedup
  • Compilation overhead: <100ms (first-time only)
  • Cache hit: <1ms

Works with Sabot's batch-first architecture.

Design Goals

🚀 PySpark Performance in Pure Python

  • CyArrow: Zero-copy Arrow operations (104M rows/sec joins)
  • Arrow IPC: Memory-mapped data loading (52x faster than CSV)
  • SIMD Acceleration: Vectorized operations beating PySpark throughput
  • Cython DataLoader: Multi-threaded CSV parsing, auto-format detection

⚡ Ray-Like Distributed Processing (Experimental)

  • Actor-based agents for distributed computation
  • Distributed checkpointing (Chandy-Lamport barriers)
  • Complex event processing (CEP) with pattern matching
  • Stateful stream processing with persistence backends

🔧 Pythonic API - No JVM, No Serialization

  • Unified imports: import sabot as sb
  • Zero-copy operations: from sabot.cyarrow import load_data, hash_join_batches (sketched below)
  • Decorator-based agents: @app.agent() (experimental - see note below)
  • Multiple data formats: Arrow IPC, CSV, Parquet, Avro

Note on Agent API: The @app.agent() decorator is experimental. Agents register successfully but stream consumption requires manual message deserialization. See examples/fraud_app.py for working pattern.
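
A minimal sketch of that zero-copy import path in use (file names and the hash_join_batches argument order are illustrative assumptions, not the documented signature):

from sabot.cyarrow import load_data, hash_join_batches

# Hypothetical Arrow IPC files; load_data memory-maps them without copying
securities = load_data('securities.arrow')
quotes = load_data('quotes.arrow')

# Assumed signature: join two batch sets on a shared key column
enriched = hash_join_batches(quotes, securities, 'security_id', 'security_id')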

Quick Start

1. Install

Prerequisites: Python 3.9+, C++ compiler, CMake 3.16+

# Clone with vendored Arrow C++ submodule
git clone --recursive https://github.com/yourusername/sabot.git
cd sabot

# Option A: Quick install (builds Arrow C++ automatically, ~30-60 mins first time)
pip install -e .

# Option B: Build Arrow C++ manually first (recommended for development)
python build.py          # One-time Arrow build (~30-60 mins)
pip install -e .         # Fast install (<1 min)

# Verify vendored Arrow is working
python -c "from sabot import cyarrow; print(f'Vendored Arrow: {cyarrow.USING_ZERO_COPY}')"

# Note: Use sabot.cyarrow (our optimized Arrow), not pyarrow

# Dependencies Note: We vendor critical Cython dependencies (RocksDB, Arrow components)
# because they are tightly bound to native C/C++ libraries, not standard Python packages.
# This ensures consistent builds and performance across platforms.

2. Start Infrastructure

# Start Kafka, Postgres, Redis via Docker
docker compose up -d

# Check services
docker compose ps

3. Create Your First App

fraud_app.py:

import sabot as sb
import json

# Create Sabot application
app = sb.App(
    'fraud-detection',
    broker='kafka://localhost:19092',
    value_serializer='json'
)

# Define fraud detector with state
detector = FraudDetector()  # your own detection class (see examples/fraud_app.py)

@app.agent('bank-transactions')
async def detect_fraud(stream):
    """Process transactions and detect fraud patterns."""
    async for message in stream:
        # Deserialize message (stream yields raw bytes/str)
        if isinstance(message, bytes):
            txn = json.loads(message.decode('utf-8'))
        else:
            txn = json.loads(message) if isinstance(message, str) else message

        # Process transaction
        alerts = await detector.detect_fraud(txn)

        # Handle fraud alerts
        for alert in alerts:
            print(f"🚨 FRAUD: {alert['type']} - {alert['details']}")

4. Run with CLI

# Start worker (Faust-style)
sabot -A fraud_app:app worker --loglevel=INFO

# Or with concurrency
sabot -A fraud_app:app worker -c 4

Building from Source

Sabot uses a unified build system that automatically builds all dependencies:

# Clone the repository
git clone https://github.com/sabot/sabot.git
cd sabot

# Initialize submodules (Arrow, Tonbo, etc.)
git submodule update --init --recursive

# Build everything (Arrow C++, vendored extensions, Cython modules)
python build.py

# The build system will:
# 1. Check dependencies (Arrow, RocksDB, Tonbo, Rust, hiredis)
# 2. Build Arrow C++ (20-60 min first time, then cached)
# 3. Build vendor extensions (CyRedis, RocksDB, Tonbo) if available
# 4. Auto-discover and build 56 Cython modules
# 5. Validate builds and report summary

Build Commands:

python build.py              # Build everything
python build.py --clean      # Remove .so/.c/.cpp artifacts
python build.py --clean-all  # Also remove Arrow/vendor builds
python build.py --skip-arrow # Don't rebuild Arrow (use existing)
python build.py --skip-vendor # Skip vendor extensions

Dependencies:

  • Required: CMake, C++ compiler (clang++/g++), Python 3.11+, Cython, NumPy
  • Optional: RocksDB (for persistent state), Rust toolchain (for Tonbo), hiredis (for CyRedis)

The build system automatically skips unavailable optional dependencies and reports what was built/skipped.

Architecture

Sabot combines Arrow's columnar performance with Python's ecosystem:

┌────────────────────────────────────────────────────────┐
│                   Application Layer                    │
│   @app.agent() decorators, Faust-style API             │
└────────────────────────────────────────────────────────┘
                           ↓
┌────────────────────────────────────────────────────────┐
│                Sabot Core (Clean API)                  │
│   import sabot as sb                                   │
│   - sb.App, sb.agent()                                 │
│   - sb.Barrier, sb.BarrierTracker (checkpoints)        │
│   - sb.MemoryBackend, sb.ValueState (state)            │
│   - sb.WatermarkTracker, sb.Timers (time)              │
└────────────────────────────────────────────────────────┘
                           ↓
┌────────────────────────────────────────────────────────┐
│   Cython-Accelerated Modules (10-100x faster)          │
│   - Checkpoint coordination (Chandy-Lamport)           │
│   - State management (RocksDB, memory)                 │
│   - Time/watermark tracking                            │
│   - Arrow batch processing (SIMD-accelerated)          │
└────────────────────────────────────────────────────────┘
                           ↓
┌────────────────────────────────────────────────────────┐
│                 Infrastructure Layer                   │
│   Kafka, Redpanda | PostgreSQL | Redis | RocksDB       │
└────────────────────────────────────────────────────────┘

Core Modules

| Module | Description | Performance |
|---|---|---|
| cyarrow | Zero-copy Arrow operations (hash joins, windows) | 104M rows/sec |
| DataLoader | Multi-threaded data loading (CSV, Arrow IPC) | 5M rows/sec (Arrow) |
| checkpoint | Distributed snapshots (Chandy-Lamport) | <10μs initiation |
| state | Managed state (Memory, RocksDB, Redis, Tonbo) | 1M+ ops/sec |
| time | Watermarks, timers, event-time | <5μs tracking |
| agents | Actor-based stream processors (experimental) | - |
| features | Feature engineering for ML pipelines (CyRedis) | Streaming features |
| tonbo | LSM state backend (Rust FFI, production-ready) | 72K writes/sec, 241K reads/sec |
| shuffle | Lock-free network transport (Arrow Flight) | Zero-copy distributed shuffle |
| graph | Graph storage, traversal, and pattern matching | 3-37M matches/sec |

Feature Engineering (sabot/features/)

Real-time feature computation and storage for ML pipelines:

from sabot import Stream
from sabot.features import FeatureStore

# Initialize CyRedis-backed feature store
feature_store = FeatureStore(redis_url="localhost:6379", db=0)
await feature_store.initialize()

# Compute features using standard operators + feature extractors
stream = (Stream.from_kafka("ticker-data")
    .with_features([
        'price_rolling_mean_5m',
        'volume_std_1h',
        'spread_percentile_95'
    ])
    .to_feature_store(
        feature_store=feature_store,
        entity_key_column='symbol',
        feature_columns=['price_rolling_mean_5m', 'volume_std_1h', 'spread_percentile_95'],
        ttl=300
    ))

Features:

  • ✅ Cython-accelerated extractors (RollingMean, RollingStd, Percentile, TimeBased)
  • ✅ CyRedis backend with async batch writes
  • ✅ TTL-based expiration for streaming features
  • ✅ Composable with standard Stream API operators
  • ✅ Working demo: examples/crypto_features_demo.py

Tonbo State Backend

Production-ready LSM storage engine (Rust FFI):

from sabot.stores import TonboBackend

# Create Tonbo state backend
backend = TonboBackend(path="./state/tonbo", cython_enabled=True)
await backend.start()

# High-performance operations
await backend.set("user:123", {"name": "Alice", "tier": "gold"})
value = await backend.get("user:123")
await backend.batch_set([("key1", val1), ("key2", val2)])

Performance (Oct 6, 2025):

  • ✅ 72K writes/sec, 241K reads/sec, 135K batch ops/sec
  • ✅ Rust FFI with zero-copy Arrow integration
  • ✅ Used for: dimension tables, checkpoints, materializations
  • ✅ 7 test files passing (4 unit, 3 integration)
  • ✅ Status: PRODUCTION READY

Lock-Free Network Shuffle

Zero-copy distributed data transfer using Arrow Flight:

Features:

  • ✅ Lock-free Arrow Flight transport (flight_transport_lockfree.pyx) - atomic connection pooling
  • ✅ SPSC/MPSC ring buffers (lock_free_queue.pyx) - concurrent partition queues (sketched below)
  • ✅ Atomic partition store (atomic_partition_store.pyx) - LMAX Disruptor-style hash table
  • ✅ Zero-copy network transfer - direct Arrow RecordBatch serialization
  • ✅ 8 compiled modules in sabot/_cython/shuffle/

Use Cases:

  • Distributed joins and aggregations
  • Repartitioning operations
  • Multi-stage dataflow pipelines
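
For intuition, a pure-Python sketch of the single-producer/single-consumer ring buffer structure that lock_free_queue.pyx implements (illustrative only; the real module is lock-free Cython using atomics, not Python-level code):

class SPSCRingBuffer:
    """Bounded queue where one producer pushes and one consumer pops."""

    def __init__(self, capacity: int):
        self._buf = [None] * capacity
        self._capacity = capacity
        self._head = 0  # advanced only by the consumer
        self._tail = 0  # advanced only by the producer

    def push(self, item) -> bool:
        nxt = (self._tail + 1) % self._capacity
        if nxt == self._head:  # full (one slot is kept empty)
            return False
        self._buf[self._tail] = item
        self._tail = nxt
        return True

    def pop(self):
        if self._head == self._tail:  # empty
            return None
        item = self._buf[self._head]
        self._head = (self._head + 1) % self._capacity
        return item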

Graph Processing (sabot/_cython/graph/)

High-performance graph analytics and pattern matching built on Arrow columnar format.

Features:

  • ✅ Columnar Graph Storage: Property graphs with CSR/CSC adjacency
  • ✅ Graph Traversal: BFS, DFS, shortest paths, PageRank, centrality, connected components
  • ✅ Pattern Matching: 2-hop, 3-hop, variable-length paths with cost-based optimization
  • ✅ Zero-Copy Operations: Direct Arrow buffer access for maximum throughput

Performance (M1 Pro, October 2025):

  • 2-hop patterns: 3-37M matches/sec
  • 3-hop patterns: 2.7-5.6M matches/sec
  • Graph traversal: 10-50M nodes/sec
  • Join optimizer: <0.130ms overhead

API Example:

import pyarrow as pa
from sabot._cython.graph import PropertyGraph, VertexTable, EdgeTable
from sabot._cython.graph.query import match_2hop
from sabot._cython.graph.traversal import pagerank

# Create social graph
vertices = pa.table({
    'id': pa.array([0, 1, 2, 3], type=pa.int64()),
    'label': pa.array(['Person', 'Person', 'Person', 'Company']).dictionary_encode(),
    'name': ['Alice', 'Bob', 'Charlie', 'Acme Corp']
})

edges = pa.table({
    'source': pa.array([0, 1, 0, 2], type=pa.int64()),
    'target': pa.array([1, 2, 3, 3], type=pa.int64()),
    'type': pa.array(['KNOWS', 'KNOWS', 'WORKS_AT', 'WORKS_AT']).dictionary_encode()
})

# Create property graph
graph = PropertyGraph(VertexTable(vertices), EdgeTable(edges))
graph.build_csr()

# Find 2-hop patterns: Person → Person → Person
result = match_2hop(edges, edges)
print(f"Found {result.num_matches()} friend-of-friend connections")

# Run PageRank
ranks = pagerank(edges, num_vertices=4, damping=0.85, max_iterations=20)
print(f"PageRank scores: {ranks.to_pylist()}")

# Get neighbors
neighbors = graph.get_neighbors(0)  # Alice's neighbors
print(f"Alice knows: {neighbors.to_pylist()}")

Use Cases:

  • Social network analysis (friend-of-friend recommendations)
  • Fraud detection (money laundering pattern matching)
  • Knowledge graph inference
  • Supply chain tracking
  • Network influence analysis

Comprehensive Documentation: See GRAPH_QUERY_ENGINE.md for complete API reference, examples, and benchmarks.

Working Examples:

  • examples/social_network_analysis.py - Friend recommendations
  • examples/fraud_detection_optimizer.py - Money laundering patterns
  • examples/pattern_sabot_integration.py - Integration with Sabot operators

Test Suite: 27 tests, 100% passing

pytest tests/unit/graph/test_pattern_matching.py -v

Example: Fintech Data Enrichment (Zero-Copy Arrow)

The fintech enrichment demo shows how Sabot processes millions of rows with Arrow columnar operations:

# One-time: Convert CSV to Arrow IPC format (52x faster loading)
cd examples/fintech_enrichment_demo
python convert_csv_to_arrow.py

# Enable Arrow IPC and run
export SABOT_USE_ARROW=1
./run_demo.sh --securities 10000000 --quotes 1200000

How Arrow Batch Processing Works:

  • Arrow Joins: Convert RecordBatches to Tables, perform SIMD-accelerated hash joins
  • Zero-Copy Operations: Direct memory access to Arrow buffers (no Python object creation)
  • SIMD Acceleration: PyArrow compute kernels use vectorized operations
  • Memory-Mapped Loading: Arrow IPC files loaded directly into memory without copying
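
The join pattern can be illustrated with stock PyArrow on toy data (Sabot's CyArrow kernels accelerate this same path):

import pyarrow as pa

quotes = pa.table({'security_id': [1, 2, 3], 'bid': [99.5, 101.0, 47.2]})
securities = pa.table({'security_id': [1, 2, 3], 'name': ['AAPL', 'MSFT', 'XYZ']})

# Hash join on the shared key column; PyArrow dispatches to vectorized kernels
enriched = quotes.join(securities, keys='security_id')
print(enriched.num_rows)  # 3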

Performance Results (M3 MacBook Pro, 11.2M rows):

  • Total Pipeline: 2.3 seconds end-to-end (vs 103s with CSV)
  • Hash Join: 104.6M rows/sec (11.2M rows joined in 107ms)
  • Data Loading: 2.1 seconds (10M securities + 1.2M quotes)
  • File Size: 50-70% smaller than CSV (5.6GB → 2.4GB)
  • Speedup: 46x faster than traditional CSV processing

vs PySpark for this workload:

  • PySpark: ~30-60 seconds (JVM startup, serialization, garbage collection)
  • Sabot: 2.3 seconds (pure Python + Arrow + Cython acceleration)

See DATA_FORMATS.md for format guide and PERFORMANCE_SUMMARY.md for detailed benchmarks.

CLI Reference

Sabot provides a Faust-style CLI for production deployments:

# Start worker
sabot -A myapp:app worker

# With concurrency
sabot -A myapp:app worker -c 4

# Override broker
sabot -A myapp:app worker -b kafka://prod:9092

# Set log level
sabot -A myapp:app worker --loglevel=DEBUG

# Full options
sabot -A myapp:app worker \
  --concurrency 4 \
  --broker kafka://localhost:9092 \
  --loglevel INFO

API Reference

Creating an App

import sabot as sb

app = sb.App(
    'my-app',
    broker='kafka://localhost:19092',      # Kafka broker URL
    value_serializer='json',                # 'json', 'arrow', 'avro'
    enable_distributed_state=True,          # Use Redis for state
    database_url='postgresql://localhost/sabot'  # For durable execution
)

Defining Agents

@app.agent('my-topic')
async def process_events(stream):
    """Stateful event processor."""
    async for event in stream:
        # Process event
        result = transform(event)
        # Yield to output
        yield result

State Management

# Memory backend (fast, not persistent)
config = sb.BackendConfig(
    backend_type="memory",
    max_size=100000,
    ttl_seconds=300.0
)
backend = sb.MemoryBackend(config)

# RocksDB backend (persistent, larger state)
rocksdb = sb.RocksDBBackend(
    sb.BackendConfig(backend_type="rocksdb", path="./state")
)

# State types
value_state = sb.ValueState(backend, "counter")      # Single value
map_state = sb.MapState(backend, "user_profiles")    # Key-value map
list_state = sb.ListState(backend, "events")         # Ordered list

Checkpointing

# Barrier tracking for distributed checkpoints
tracker = sb.BarrierTracker(num_channels=3)

# Register barrier
aligned = tracker.register_barrier(
    channel=0,
    checkpoint_id=1,
    total_inputs=3
)

# Checkpoint coordinator
coordinator = sb.Coordinator()

Time & Watermarks

# Track watermarks across partitions
watermark_tracker = sb.WatermarkTracker(num_partitions=3)
watermark_tracker.update_watermark(partition_id=0, timestamp=12345)

# Timer service for delayed processing
timers = sb.Timers()

Installation Details

System Requirements

  • Python: 3.9+ (3.11+ to build from source)
  • OS: Linux, macOS (Windows via WSL)
  • Memory: 4GB+ recommended
  • Dependencies: See requirements.txt

Optional Dependencies

# GPU acceleration (RAFT)
pip install cudf cupy raft-dask pylibraft

# Kafka support
pip install confluent-kafka aiokafka

# Redis state backend
pip install redis hiredis

# RocksDB state backend
pip install rocksdb

# All optional features
pip install sabot[all]

Building from Source

# Install Cython and dependencies
uv pip install cython numpy  # Use sabot.cyarrow, not pyarrow

# Build Cython extensions
python setup.py build_ext --inplace

# Install in development mode
pip install -e .

Docker Compose Infrastructure

The included docker-compose.yml provides a complete streaming stack:

services:
  redpanda:      # Kafka-compatible broker (port 19092)
  console:       # Redpanda web UI (port 8080)
  postgres:      # PostgreSQL for durable execution (port 5432)
  redis:         # Redis for distributed state (port 6379)

Start all services:

docker compose up -d

Access Redpanda Console:

open http://localhost:8080

View logs:

docker compose logs -f redpanda

Stop all services:

docker compose down

Examples

| Example | Description | Performance | Location |
|---|---|---|---|
| Fintech Enrichment | 11.2M row joins with Arrow IPC | 104M rows/sec | examples/fintech_enrichment_demo/ |
| Arrow Data Loading | CSV to Arrow IPC conversion | 52x faster | examples/fintech_enrichment_demo/convert_csv_to_arrow.py |
| Zero-Copy Operations | Hash joins, windows, filtering | SIMD-accelerated | examples/fintech_enrichment_demo/arrow_optimized_enrichment.py |
| Fraud Detection | Real-time fraud detection (experimental) | 3-6K txn/s | examples/fraud_app.py |
| Crypto Features | Real-time feature engineering pipeline | Redis feature store | examples/crypto_features_demo.py |
| Graph Pattern Matching | Friend-of-friend recommendations | 3-37M matches/sec | examples/social_network_analysis.py |
| Graph Fraud Detection | Money laundering pattern detection | 2.7-5.6M matches/sec | examples/fraud_detection_optimizer.py |
| Graph Traversal | BFS, DFS, PageRank, shortest paths | 10-50M nodes/sec | examples/property_graph_demo.py |

Benchmark Results

Fintech Enrichment Demo (M3 MacBook Pro, 11.2M rows):

  • Hash Join: 104.6M rows/sec (11.2M rows in 107ms)
  • Arrow IPC Loading: 5M rows/sec (10M rows in 2 seconds)
  • CSV Loading: 0.5M rows/sec (multi-threaded)
  • Total Pipeline: 2.3 seconds (vs 103s with CSV - 46x faster)
  • File Size: 50-70% reduction (5.6GB → 2.4GB)
  • Memory Usage: Memory-mapped (minimal footprint)

CyArrow Zero-Copy Operations:

  • Window Computation: ~2-3ns per element (SIMD)
  • Filtering: 50-100x faster than Python loops
  • Sorting: 10M+ rows/sec with zero-copy slicing
  • Buffer Access: ~5ns per element (direct C++ pointers)

State Backend Operations (MemoryBackend):

  • Get/Put latency: Sub-millisecond
  • Sustained throughput: 1M+ operations/second (Cython)

Notes on Benchmarks:

  • Measured on M3 MacBook Pro (8-core, 18GB RAM)
  • Arrow IPC with memory-mapped I/O
  • Real fintech data (10M securities, 1.2M quotes)
  • All benchmarks are reproducible (see PERFORMANCE_SUMMARY.md)

Documentation

Performance & Data Formats

  • DATA_FORMATS.md - Data format guide (Arrow IPC vs CSV)
  • PERFORMANCE_SUMMARY.md - Detailed benchmark results

Graph Processing

  • GRAPH_QUERY_ENGINE.md - Complete graph API documentation
    • Columnar graph storage (CSR/CSC adjacency)
    • Graph traversal algorithms (BFS, DFS, PageRank, etc.)
    • Pattern matching (2-hop, 3-hop, variable-length paths)
    • Query optimization and planning
    • Performance benchmarks (3-37M matches/sec)

Architecture & Development

Comparison to Other Frameworks

| Feature | Sabot | PySpark | Ray | Apache Flink |
|---|---|---|---|---|
| Language | Python | Python (JVM) | Python | Java/Scala |
| Performance | ✅ 104M rows/sec (Arrow + Cython) | 🐌 10-50x slower (JVM serialization) | ⚠️ Distributed overhead | ✅ Production-scale |
| Columnar Processing | ✅ Zero-copy Arrow (SIMD-accelerated) | ⚠️ Arrow integration | ❌ No | ⚠️ Limited |
| Data Loading | ✅ 52x faster (Arrow IPC) | 🐌 Standard | 🐌 Standard | 🐌 Standard |
| Memory Efficiency | ✅ Memory-mapped (no copies) | 🐌 JVM heap | ⚠️ Object serialization | ✅ Native |
| Graph Processing | ✅ 3-37M matches/sec (native) | ⚠️ GraphX (JVM overhead) | ❌ No native support | ⚠️ Gelly (limited) |
| Setup Complexity | ✅ Single pip install | 🐌 JVM + Spark cluster | 🐌 Distributed setup | 🐌 Cluster management |
| Debugging | ✅ Pure Python (pdb, breakpoints) | 🐌 JVM stack traces | ⚠️ Distributed complexity | 🐌 JVM debugging |
| Streaming | ⚠️ Experimental agents | ✅ Structured Streaming | ✅ Ray Data | ✅ Production |
| Production Ready | ✅ CyArrow (yes), ✅ Graph (yes), ⚠️ Streaming (no) | ✅ Yes | ✅ Yes | ✅ Yes |

Roadmap

Current Status (v0.1.0-alpha)

Working (Production-Quality):

  • ✅ CyArrow: Zero-copy hash joins (104M rows/sec)
  • ✅ DataLoader: Multi-threaded CSV, memory-mapped Arrow IPC
  • ✅ Arrow IPC Format: 52x faster than CSV, 50-70% smaller files
  • ✅ SIMD Operations: Window functions, filtering, sorting
  • ✅ Graph Processing: Storage, traversal, pattern matching (3-37M matches/sec)
  • ✅ Cython checkpoint coordinator (Chandy-Lamport barriers)
  • ✅ Memory state backend with Cython acceleration
  • ✅ Fintech enrichment demo (11.2M rows in 2.3s)

Working (Experimental):

  • ⚠️ Basic Kafka source/sink with schema registry
  • ⚠️ Watermark tracking primitives
  • ⚠️ CLI scaffolding (Faust-style)
  • ⚠️ Fraud detection demo (3K-6K txn/s)

In Progress:

  • 🚧 Agent runtime execution layer (partially stubbed)
  • 🚧 RocksDB state backend integration
  • 🚧 Distributed coordination
  • 🚧 Complex event processing (CEP)

Known Limitations:

  • ⚠️ Test coverage ~5% for streaming features
  • ⚠️ Agent runtime has mock implementations
  • ⚠️ CLI uses stubs in places
  • ✅ CyArrow & DataLoader are well-tested and performant

Planned (v0.2.0)

  • 🎯 Complete agent runtime implementation
  • 🎯 Comprehensive integration tests
  • 🎯 RocksDB state backend completion
  • 🎯 Improved error handling and recovery
  • 🎯 Performance benchmarking suite
  • 🎯 Production-ready checkpointing

Future Ideas (v0.3.0+)

  • 📋 GPU acceleration via RAFT
  • 📋 Advanced CEP patterns
  • 📋 SQL/Table API
  • 📋 Web UI for monitoring
  • 📋 S3/HDFS connectors
  • 📋 Query optimizer

Contributing

Sabot is an experimental research project and welcomes contributions! This is a learning-focused project exploring streaming architecture design.

High-Impact Areas:

  1. Testing: Expand test coverage beyond current ~5%
  2. Agent Runtime: Complete the execution layer implementation
  3. RocksDB Integration: Finish state backend implementation
  4. Documentation: Improve guides and examples
  5. Benchmarking: Add comprehensive performance tests

Before Contributing:

  • This is alpha software with many incomplete features
  • Focus on learning and experimentation rather than production readiness
  • Check existing issues and roadmap before starting major work
  • Add tests for any new functionality

See CONTRIBUTING.md for guidelines (if available).

License

GNU Affero General Public License v3.0 (AGPL-3.0) - See LICENSE file for details.

Credits

Inspired by:

  • Apache Arrow - Zero-copy columnar data processing
  • PySpark - DataFrame API and distributed processing concepts
  • Ray - Actor model and distributed computing patterns
  • Faust - Python streaming framework, CLI design, and agent patterns

Built with:

  • Cython - High-performance compiled modules for Arrow acceleration
  • PyArrow - SIMD-accelerated compute kernels and memory management
  • Redpanda - Kafka-compatible streaming infrastructure
  • PostgreSQL - Durable execution (DBOS-inspired)
  • RocksDB - Embedded key-value store for state management

When to Choose Sabot vs PySpark vs Ray

Choose Sabot when:

  • You need PySpark-level performance without JVM overhead
  • You're processing large columnar datasets (Arrow IPC, Parquet)
  • You need high-performance graph analytics (pattern matching, traversals)
  • You want pure Python debugging (pdb, breakpoints, no JVM stack traces)
  • Single-machine performance is your primary concern
  • You need fast iteration during development

Choose PySpark when:

  • You need production-scale distributed processing
  • Your team is already invested in the Spark ecosystem
  • You have existing Spark clusters and infrastructure
  • Java/Scala performance is acceptable for your use case

Choose Ray when:

  • You need distributed actor-based processing
  • You're building complex ML pipelines with distributed training
  • Python-first distributed computing is your priority

This is experimental alpha software. The CyArrow columnar processing and graph analytics are production-quality, but streaming features are experimental. We welcome feedback and contributions!

Ready to try it?
