254 changes: 254 additions & 0 deletions docs/rate_limiting.md
@@ -0,0 +1,254 @@
# Rate Limiting in MassGen

MassGen implements multi-dimensional rate limiting to stay within AI provider rate limits and avoid exhausting API quotas.

## Overview

The rate limiting system supports three types of limits:

- **RPM (Requests Per Minute)**: Maximum number of API requests allowed per minute
- **TPM (Tokens Per Minute)**: Maximum number of tokens (input + output) allowed per minute
- **RPD (Requests Per Day)**: Maximum number of API requests allowed per 24-hour period

All limits are enforced using **sliding windows** for accurate, real-time tracking.

## Configuration

Rate limits are defined in `massgen/configs/rate_limits/rate_limits.yaml`:

```yaml
gemini:
  gemini-2.5-flash:
    rpm: 9          # 9 requests per minute
    tpm: 240000     # 240K tokens per minute
    rpd: 245        # 245 requests per day

  gemini-2.5-pro:
    rpm: 2          # 2 requests per minute
    tpm: 120000     # 120K tokens per minute
    rpd: 48         # 48 requests per day

  default:
    rpm: 7
    tpm: 100000
    rpd: 100
```

### Adding New Providers

To add rate limits for a new provider (e.g., OpenAI):

```yaml
openai:
  gpt-4:
    rpm: 500
    tpm: 150000
    rpd: 10000

  gpt-3.5-turbo:
    rpm: 3000
    tpm: 250000
    rpd: 50000
```

## How It Works

### 1. Configuration Loading

Rate limits are loaded from the YAML file when the backend initializes:

```python
from massgen.configs.rate_limits import get_rate_limit_config

config = get_rate_limit_config()
limits = config.get_limits('gemini', 'gemini-2.5-flash')
# Returns: {'rpm': 9, 'tpm': 240000, 'rpd': 245}
```
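
A minimal sketch of what the loader might look like, assuming PyYAML and the fallback-to-`default` behavior shown in the configuration above (the actual `RateLimitConfig` implementation may differ):

```python
from pathlib import Path

import yaml  # PyYAML


class RateLimitConfig:
    """Sketch: loads rate_limits.yaml and resolves per-model limits."""

    def __init__(self, path: Path):
        with open(path) as f:
            self._data = yaml.safe_load(f) or {}

    def get_limits(self, provider: str, model: str) -> dict:
        provider_cfg = self._data.get(provider, {})
        # Unknown models fall back to the provider's `default` entry
        limits = provider_cfg.get(model) or provider_cfg.get("default") or {}
        return {k: limits.get(k) for k in ("rpm", "tpm", "rpd")}
```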

### 2. Rate Limiter Creation

The backend creates a shared multi-dimensional rate limiter:

```python
from massgen.rate_limiter import GlobalRateLimiter

limiter = GlobalRateLimiter.get_multi_limiter_sync(
    provider='gemini-2.5-flash',
    rpm=limits['rpm'],
    tpm=limits['tpm'],
    rpd=limits['rpd']
)
```

### 3. Request Enforcement

Before each API request, the rate limiter checks all limits:

```python
async with self.rate_limiter:
    # Rate limiter ensures all limits are respected
    response = await api_call()
```

If any limit is exceeded, the request automatically waits until it's safe to proceed.
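
This works because the limiter implements the async context-manager protocol. A minimal, self-contained sketch of that surface (the real `MultiRateLimiter` internals may differ):

```python
import asyncio


class RateLimiterSketch:
    """Sketch of the context-manager surface; real internals differ."""

    async def _wait_for_capacity(self):
        # Placeholder: a real limiter checks its RPM/TPM/RPD windows here
        await asyncio.sleep(0)

    async def __aenter__(self):
        # Block until every window has headroom before the API call runs
        await self._wait_for_capacity()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Never suppress exceptions raised by the wrapped API call
        return False
```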

### 4. Token Tracking

After receiving a response, token usage is recorded for TPM tracking:

```python
await self.rate_limiter.record_tokens(total_tokens)
```

## Rate Limiter Behavior

### Sliding Windows

All limits use sliding windows, not fixed time periods:

- **RPM**: Tracks requests in the last 60 seconds
- **TPM**: Tracks tokens used in the last 60 seconds
- **RPD**: Tracks requests in the last 86400 seconds (24 hours)

This provides accurate, real-time enforcement without "reset" windows.

### Waiting Logic

When a limit is hit, the rate limiter:

1. Calculates wait time until the oldest request/tokens expire from the window
2. Logs a message explaining which limit was hit
3. Automatically sleeps until the request can proceed
4. Retries the limit check and allows the request
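
A minimal sketch of one sliding-window dimension with this waiting logic, assuming a deque of event timestamps (the actual `MultiRateLimiter` tracks three such windows and logs before sleeping):

```python
import asyncio
import time
from collections import deque


class SlidingWindow:
    """At most `limit` events within the trailing `window` seconds."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.events: deque = deque()  # timestamps of recent events

    async def acquire(self):
        while True:
            now = time.monotonic()
            # 1. Expire events that have slid out of the window
            while self.events and now - self.events[0] > self.window:
                self.events.popleft()
            if len(self.events) < self.limit:
                self.events.append(now)
                return
            # 2-4. Sleep until the oldest event expires, then re-check
            await asyncio.sleep(self.window - (now - self.events[0]))
```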

### Example Log Output

```
[MultiRateLimiter] Rate limit reached: RPM limit (9/9). Waiting 12.5s...
[MultiRateLimiter] Rate limit reached: TPM limit (245000/240000 tokens). Waiting 8.2s...
```

## Global Rate Limiter

Rate limiters are **shared globally** across all instances of the same model:

```python
# Multiple agents using the same model share the same rate limiter
agent1 = GeminiBackend(model='gemini-2.5-flash') # Uses shared limiter
agent2 = GeminiBackend(model='gemini-2.5-flash') # Uses SAME limiter
agent3 = GeminiBackend(model='gemini-2.5-pro') # Uses different limiter
```

This ensures that total usage across all agents respects the provider's limits.
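
One plausible shape for this sharing is a process-wide registry keyed by provider string. This is an illustrative sketch, not necessarily how `GlobalRateLimiter` is written:

```python
import threading

from massgen.rate_limiter import MultiRateLimiter


class GlobalRateLimiterSketch:
    _limiters: dict = {}
    _lock = threading.Lock()

    @classmethod
    def get_multi_limiter_sync(cls, provider: str, rpm=None, tpm=None, rpd=None):
        # Same provider key -> same limiter instance, so usage is pooled
        with cls._lock:
            if provider not in cls._limiters:
                cls._limiters[provider] = MultiRateLimiter(rpm=rpm, tpm=tpm, rpd=rpd)
            return cls._limiters[provider]
```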

## Advanced Usage

### Programmatic Configuration

You can also create rate limiters programmatically:

```python
from massgen.rate_limiter import MultiRateLimiter

limiter = MultiRateLimiter(
    rpm=10,        # 10 requests per minute
    tpm=100000,    # 100K tokens per minute
    rpd=500        # 500 requests per day
)

async with limiter:
    response = await your_api_call()
    await limiter.record_tokens(response.total_tokens)
```

### Disabling Limits

Set a limit to `None` to disable it:

```yaml
gemini:
  gemini-2.5-flash:
    rpm: 9
    tpm: null   # No TPM limit
    rpd: null   # No RPD limit
```
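
The programmatic equivalent is passing `None` (the YAML `null`) for a dimension:

```python
from massgen.rate_limiter import MultiRateLimiter

# Only RPM is enforced; the TPM and RPD checks are skipped entirely
limiter = MultiRateLimiter(rpm=9, tpm=None, rpd=None)
```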

### Conservative Limits

The default configuration uses **conservative limits** (slightly below the providers' actual API limits) to provide a safety buffer and prevent hitting rate limits due to timing variations. For example, `gemini-2.5-flash` is configured at 9 RPM and 245 RPD, just under the provider's published free-tier caps.

## Architecture

```
┌─────────────────────────────────────┐
│          rate_limits.yaml           │
│         (Configuration file)        │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│           RateLimitConfig           │
│       (Loads and parses YAML)       │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│          GlobalRateLimiter          │
│      (Manages shared limiters)      │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│          MultiRateLimiter           │
│      (Enforces RPM, TPM, RPD)       │
│  - Sliding windows                  │
│  - Automatic waiting                │
│  - Token tracking                   │
└─────────────────────────────────────┘
```

## Testing

To test rate limiting behavior:

```python
import asyncio
from massgen.backend.gemini import GeminiBackend

async def test_rate_limiting():
    backend = GeminiBackend(model='gemini-2.5-flash')

    # Make multiple rapid requests
    for i in range(20):
        print(f"Request {i+1}...")
        async for chunk in backend.stream_with_tools(
            messages=[{"role": "user", "content": f"Hello {i}"}],
            tools=[]
        ):
            if chunk.type == "content":
                print(chunk.content, end='')
        print()

asyncio.run(test_rate_limiting())
```

You should see rate limiting messages when limits are exceeded.

## Benefits

1. **Prevents API errors**: Requests wait locally instead of triggering provider rate-limit errors
2. **Automatic retry**: No need to handle rate limit errors manually
3. **Multi-agent safe**: Shared limiters work correctly with multiple agents
4. **Configurable**: Easy to update limits without code changes
5. **Accurate**: Sliding windows provide precise enforcement
6. **Transparent**: Clear logging shows when and why requests are delayed

## Future Enhancements

Potential improvements:

- [ ] Per-user rate limiting
- [ ] Dynamic limit adjustment based on API responses
- [ ] Rate limit metrics and dashboards
- [ ] Circuit breaker integration
- [ ] Cost tracking alongside rate limiting
92 changes: 81 additions & 11 deletions massgen/backend/gemini.py
@@ -20,10 +20,15 @@
"""

import asyncio
import contextlib
import json
import logging
import os
from typing import Any, AsyncGenerator, Dict, List, Optional
import re
import time
from io import BytesIO
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union

from ..api_params_handler._gemini_api_params_handler import GeminiAPIParamsHandler
from ..formatter._gemini_formatter import GeminiFormatter
@@ -34,6 +39,8 @@
    log_tool_call,
    logger,
)
from ..configs.rate_limits import get_rate_limit_config
from ..rate_limiter import GlobalRateLimiter
from .base import FilesystemSupport, StreamChunk
from .base_with_custom_tool_and_mcp import (
    CustomToolAndMCPBackend,
@@ -109,6 +116,11 @@ def __init__(self, api_key: Optional[str] = None, **kwargs):
        # Store Gemini-specific API key before calling parent init
        gemini_api_key = api_key or os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")

        # Extract and remove enable_rate_limit BEFORE calling parent init
        # This prevents it from being stored in self.config and passed to Gemini SDK
        enable_rate_limit = kwargs.pop('enable_rate_limit', False)
        model_name = kwargs.get('model', '')

        # Call parent class __init__ - this initializes custom_tool_manager and MCP-related attributes
        super().__init__(gemini_api_key, **kwargs)

@@ -129,6 +141,56 @@ def __init__(self, api_key: Optional[str] = None, **kwargs):

        # Active tool result capture during manual tool execution
        self._active_tool_result_store: Optional[Dict[str, str]] = None

        # Initialize multi-dimensional rate limiter for Gemini API
        # Supports RPM (Requests Per Minute), TPM (Tokens Per Minute), RPD (Requests Per Day)
        # Configuration loaded from massgen/configs/rate_limits/rate_limits.yaml
        # This is shared across ALL instances of the SAME MODEL

        if enable_rate_limit:
            # Load rate limits from configuration
            rate_config = get_rate_limit_config()
            limits = rate_config.get_limits('gemini', model_name)

            # Create a unique provider key for the rate limiter
            # Use the full model name to distinguish between different models
            provider_key = f"gemini-{model_name}" if model_name else "gemini-default"

            # Initialize multi-dimensional rate limiter
            self.rate_limiter = GlobalRateLimiter.get_multi_limiter_sync(
                provider=provider_key,
                rpm=limits.get('rpm'),
                tpm=limits.get('tpm'),
                rpd=limits.get('rpd')
            )

            # Log the active rate limits
            active_limits = []
            if limits.get('rpm'):
                active_limits.append(f"RPM: {limits['rpm']}")
            if limits.get('tpm'):
                active_limits.append(f"TPM: {limits['tpm']:,}")
            if limits.get('rpd'):
                active_limits.append(f"RPD: {limits['rpd']}")

            if active_limits:
                logger.info(
                    f"[Gemini] Multi-dimensional rate limiter enabled for '{model_name}': "
                    f"{', '.join(active_limits)}"
                )
            else:
                logger.info(f"[Gemini] No rate limits configured for '{model_name}'")
        else:
            # No rate limiting - use a pass-through limiter
            self.rate_limiter = None
            logger.info(f"[Gemini] Rate limiting disabled for '{model_name}'")

    def _get_rate_limiter_context(self):
        """Get rate limiter context manager (or nullcontext if rate limiting is disabled)."""
        if self.rate_limiter is not None:
            return self.rate_limiter
        else:
            return contextlib.nullcontext()

    def _setup_permission_hooks(self):
        """Override base class - Gemini uses session-based permissions, not function hooks."""
@@ -235,6 +297,10 @@ async def stream_with_tools(self, messages: List[Dict[str, Any]], tools: List[Di
source="mcp_tools",
)

# Remove enable_rate_limit from kwargs if present (it's already been consumed in __init__)
# This prevents it from being passed to Gemini SDK API calls
kwargs.pop('enable_rate_limit', None)

# Merge constructor config with stream kwargs
all_params = {**self.config, **kwargs}

Expand Down Expand Up @@ -412,11 +478,13 @@ async def stream_with_tools(self, messages: List[Dict[str, Any]], tools: List[Di
        # ====================================================================
        # Streaming Phase: Stream with simple function call detection
        # ====================================================================
        stream = await client.aio.models.generate_content_stream(
            model=model_name,
            contents=full_content,
            config=config,
        )
        # Use async streaming call with sessions/tools (with rate limiting if enabled)
        async with self._get_rate_limiter_context():
            stream = await client.aio.models.generate_content_stream(
                model=model_name,
                contents=full_content,
                config=config,
            )

        # Simple list accumulation for function calls (no trackers)
        captured_function_calls = []
@@ -731,11 +799,13 @@ async def stream_with_tools(self, messages: List[Dict[str, Any]], tools: List[Di
        last_continuation_chunk = None

        while True:
            continuation_stream = await client.aio.models.generate_content_stream(
                model=model_name,
                contents=conversation_history,
                config=config,
            )
            # Use same config as before (with rate limiting if enabled)
            async with self._get_rate_limiter_context():
                continuation_stream = await client.aio.models.generate_content_stream(
                    model=model_name,
                    contents=conversation_history,
                    config=config,
                )
            stream = continuation_stream

            new_function_calls = []