Skip to content

Commit 47fb3fc

Browse files
committed
sample conversation/session manager on top of kurrentdb
1 parent 96ba32e commit 47fb3fc

File tree

5 files changed

+407
-7
lines changed

5 files changed

+407
-7
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# KurrentDB Conversation Manager for Strands Agent
2+
3+
A persistent conversation manager implementation for Strands Agent that uses KurrentDB as the storage backend. This manager enables conversation history persistence, state management, and recovery capabilities for AI agents.
4+
5+
## Overview
6+
7+
The `KurrentDBConversationManager` extends the Strands framework's `ConversationManager` to provide:
8+
9+
- **Persistent Message Storage**: All conversation messages are stored as events in KurrentDB streams
10+
- **State Checkpointing**: Save and restore agent state at any point in the conversation
11+
- **Conversation History Management**: Configure retention policies with maximum age or count limits
12+
- **Recovery Capabilities**: Restore agent state and conversation history after restarts
13+
14+
## Installation
15+
16+
### Prerequisites
17+
18+
- Python 3.7+
19+
- KurrentDB instance running (default: `localhost:2113`)
20+
- Required Python packages:
21+
```bash
22+
pip install strands kurrentdbclient
23+
```
24+
## Quick Start
25+
```json
26+
pip install strands-agents[anthropic]
27+
```
28+
Setup an instance of KurrentDB: https://console.kurrent.cloud/signup or https://aws.amazon.com/marketplace/pp/prodview-kxo6grvoovk2y?sr=0-1&ref_=beagle&applicationId=AWSMPContessa
29+
```python
30+
from strands import Agent
31+
from strands.models.anthropic import AnthropicModel
32+
from kurrentdb_session_manager import KurrentDBConversationManager
33+
34+
unique_run_id = "run-01"
35+
kurrentdb_conversation_manager = (
36+
KurrentDBConversationManager(unique_run_id, "connection string here")
37+
) # replace with your actual connection string
38+
39+
# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
40+
model = AnthropicModel(
41+
client_args={
42+
"api_key": "Your API KEY here", # Replace with your actual API key
43+
},
44+
# **model_config
45+
max_tokens= 4096,
46+
model_id="claude-3-5-haiku-latest",
47+
params={
48+
"temperature": 0.7,
49+
}
50+
)
51+
52+
poet_agent = Agent(
53+
system_prompt="You are a hungry poet who loves to write haikus about everything.",
54+
model=model,
55+
conversation_manager=kurrentdb_conversation_manager, # Assuming no specific conversation manager is needed
56+
)
57+
poet_agent("Write a haiku about the beauty of nature.")
58+
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
59+
state={"messages": poet_agent.messages,
60+
"system_prompt": poet_agent.system_prompt})
61+
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
62+
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
63+
poet_agent("What did we just talk about?")
64+
65+
66+
67+
68+
```
69+
70+
## Features
71+
72+
### 1. Persistent Message Storage
73+
74+
Every message in the conversation is automatically stored as an event in KurrentDB:
75+
- Each message is stored with its role (user/assistant/system) as the event type
76+
- Messages are stored in order with stream positions for accurate replay
77+
78+
### 2. State Management
79+
80+
Save and restore complete agent state:
81+
82+
```python
83+
# Save current state
84+
conversation_manager.save_agent_state(
85+
unique_run_id="run-01",
86+
state={
87+
"messages": agent.messages,
88+
"system_prompt": agent.system_prompt,
89+
"custom_data": "any additional state"
90+
}
91+
)
92+
93+
# Restore state later
94+
agent = conversation_manager.restore_agent_state(
95+
agent=agent,
96+
unique_run_id="run-01"
97+
)
98+
```
99+
100+
### 3. Conversation Retention Policies
101+
102+
Configure how long conversations are retained:
103+
104+
```python
105+
# Set maximum age (in seconds)
106+
conversation_manager.set_max_window_age(3600) # Keep messages for 1 hour
107+
108+
# Set maximum message count
109+
conversation_manager.set_max_window_size(100) # Keep last 100 messages
110+
```
111+
112+
### 4. Window Size Management
113+
114+
Control how many messages are loaded into memory:
115+
116+
```python
117+
conversation_manager = KurrentDBConversationManager(
118+
unique_run_id="run-01",
119+
connection_string="esdb://localhost:2113?Tls=false",
120+
window_size=40 # Load last 40 messages by default
121+
)
122+
```
123+
124+
## API Reference
125+
126+
### Constructor
127+
128+
```python
129+
KurrentDBConversationManager(
130+
unique_run_id: str,
131+
connection_string: str = "esdb://localhost:2113?Tls=false",
132+
window_size: int = 40,
133+
reducer_function = lambda x: x
134+
)
135+
```
136+
137+
**Parameters:**
138+
- `unique_run_id`: Unique identifier for the conversation stream
139+
- `connection_string`: KurrentDB connection string
140+
- `window_size`: Maximum number of messages to keep in memory
141+
- `reducer_function`: Function to reduce messages if context limit is exceeded
142+
143+
### Methods
144+
145+
#### `apply_management(messages: Messages) -> None`
146+
Applies management strategies to the messages list and persists new messages to KurrentDB.
147+
148+
#### `reduce_context(messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]`
149+
Reduces the context window size when it exceeds limits using the configured reducer function.
150+
151+
#### `set_max_window_age(max_age: int) -> None`
152+
Sets the maximum age for messages in the conversation (KurrentDB stream metadata).
153+
154+
#### `set_max_window_size(max_count: int) -> None`
155+
Sets the maximum number of messages to retain in the stream.
156+
157+
#### `save_agent_state(unique_run_id: str, state: dict) -> None`
158+
Saves the current agent state to a checkpoint stream.
159+
160+
#### `restore_agent_state(agent: Agent, unique_run_id: str) -> Agent`
161+
Restores agent state from the checkpoint stream.
162+
163+
## How It Works
164+
165+
### Stream Structure
166+
167+
The manager uses two types of streams in KurrentDB:
168+
169+
1. **Conversation Stream** (`{unique_run_id}`):
170+
- Contains all conversation messages as events
171+
- Event types: "user", "assistant", "system", "StateRestored"
172+
- Messages stored in chronological order
173+
174+
2. **Checkpoint Stream** (`strands_checkpoint-{unique_run_id}`):
175+
- Contains agent state snapshots
176+
- Used for recovery and state restoration
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from strands import Agent
2+
from strands.models.anthropic import AnthropicModel
3+
from kurrentdb_session_manager import KurrentDBConversationManager
4+
5+
unique_run_id = "run-01"
6+
kurrentdb_conversation_manager = (
7+
KurrentDBConversationManager(unique_run_id, "esdb://localhost:2113?Tls=false")
8+
) # replace with your actual connection string
9+
10+
# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
11+
model = AnthropicModel(
12+
client_args={
13+
"api_key": "Your API KEY here", # Replace with your actual API key
14+
},
15+
# **model_config
16+
max_tokens= 4096,
17+
model_id="claude-3-5-haiku-latest",
18+
params={
19+
"temperature": 0.7,
20+
}
21+
)
22+
23+
poet_agent = Agent(
24+
system_prompt="You are a hungry poet who loves to write haikus about everything.",
25+
model=model,
26+
conversation_manager=kurrentdb_conversation_manager, # Assuming no specific conversation manager is needed
27+
)
28+
poet_agent("Write a haiku about the beauty of nature.")
29+
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
30+
state={"messages": poet_agent.messages,
31+
"system_prompt": poet_agent.system_prompt})
32+
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
33+
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
34+
poet_agent("What did we just talk about?")
35+
36+
37+
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
from strands.agent.conversation_manager import ConversationManager
2+
from strands.agent import Agent
3+
from strands.types.content import Messages
4+
from typing import Optional
5+
from kurrentdbclient import KurrentDBClient, NewEvent, StreamState
6+
from kurrentdbclient.exceptions import NotFoundError
7+
import json
8+
9+
"""
10+
Example usage:
11+
from strands import Agent
12+
from strands.models.anthropic import AnthropicModel
13+
from kurrentdb_session_manager import KurrentDBConversationManager
14+
15+
unique_run_id = "run-01"
16+
kurrentdb_conversation_manager = (
17+
KurrentDBConversationManager(unique_run_id, "esdb://localhost:2113?Tls=false")
18+
) # replace with your actual connection string
19+
20+
# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
21+
model = AnthropicModel(
22+
client_args={
23+
"api_key": "Your API KEY here", # Replace with your actual API key
24+
},
25+
# **model_config
26+
max_tokens= 4096,
27+
model_id="claude-3-5-haiku-latest",
28+
params={
29+
"temperature": 0.7,
30+
}
31+
)
32+
33+
poet_agent = Agent(
34+
system_prompt="You are a hungry poet who loves to write haikus about everything.",
35+
model=model,
36+
conversation_manager=kurrentdb_conversation_manager, # Assuming no specific conversation manager is needed
37+
)
38+
poet_agent("Write a haiku about the beauty of nature.")
39+
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
40+
state={"messages": poet_agent.messages,
41+
"system_prompt": poet_agent.system_prompt})
42+
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
43+
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
44+
poet_agent("What did we just talk about?")
45+
"""
46+
class KurrentDBConversationManager(ConversationManager):
47+
client: KurrentDBClient
48+
def __init__(self, unique_run_id:str,
49+
connection_string: str = "esdb://localhost:2113?Tls=false",
50+
window_size: int = 40,
51+
reducer_function = lambda x: x) -> None:
52+
"""
53+
Initializes the KurrentDB conversation manager with a connection string.
54+
:param connection_string: The connection string for KurrentDB.
55+
"""
56+
self.client = KurrentDBClient(connection_string)
57+
self.stream_id = unique_run_id
58+
self.checkpoint = -1 # Default checkpoint value, no messages processed yet
59+
self.window_size = window_size # Maximum number of messages to keep in the conversation
60+
self.reducer_function = reducer_function # Function to reduce messages if needed
61+
62+
def apply_management(self, messages: Messages) -> None:
63+
"""Apply management strategies to the messages list."""
64+
justRestored = False
65+
try:
66+
events = self.client.get_stream(
67+
stream_name=self.stream_id,
68+
resolve_links=True,
69+
backwards=True,
70+
limit=1
71+
) # Get the last event in the stream
72+
if len(events) == 1 and events[0].type == "StateRestored":
73+
# then we don't need to remove any message
74+
justRestored = True
75+
self.checkpoint = events[0].stream_position
76+
77+
except NotFoundError as e:
78+
#this means that the stream does not exist yet
79+
if self.checkpoint != -1:
80+
# Handle inconsistency in the outside the conversation manager
81+
raise Exception("Inconsistent state: Stream not found but checkpoint exists.")
82+
if self.checkpoint != -1 and justRestored == False:
83+
# remove already added messages from the messages list
84+
messages = messages[self.checkpoint + 1:] # Keep only new messages
85+
events = []
86+
for message in messages:
87+
metadata = {}
88+
event = NewEvent(type=message["role"], data=bytes(json.dumps(message), 'utf-8'),
89+
content_type='application/json',
90+
metadata=bytes(json.dumps(metadata), 'utf-8'))
91+
events.append(event)
92+
self.client.append_to_stream(
93+
stream_name=self.stream_id,
94+
events=events,
95+
current_version=StreamState.ANY # TODO: tighten this up if needed if agent is called in parallel and order is important(is that possible?)
96+
)
97+
self.checkpoint += len(events) # Update checkpoint after appending messages
98+
99+
100+
def reduce_context(self, messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]:
101+
"""Function to reduce the context window size when it exceeds the model's limit.
102+
"""
103+
return self.reducer_function(messages)
104+
105+
def set_max_window_age(self, max_age: int) -> None:
106+
"""Set the maximum age for messages in the conversation inside KurrentDB."""
107+
self.client.set_stream_metadata(self.stream_id,
108+
metadata={"$maxAge": max_age},
109+
current_version=StreamState.ANY
110+
)
111+
112+
def set_max_window_size(self, max_count: int) -> None:
113+
"""Set the maximum size for the conversation history inside KurrentDB."""
114+
self.client.set_stream_metadata(self.stream_id,
115+
metadata={"$maxCount": max_count},
116+
current_version=StreamState.ANY
117+
)
118+
119+
def save_agent_state(self, unique_run_id: str, state: dict) -> None:
120+
"""
121+
Saves the agent state variables to a checkpoint stream in KurrentDB.
122+
This event contains which position in the stream the agent is at and other state variables.
123+
"""
124+
del state["messages"] # We already keep messages in the stream, so we don't need to save them again.
125+
state["kurrentdb_checkpoint"] = self.checkpoint
126+
state["kurrentdb_checkpoint_stream_id"] = unique_run_id
127+
event = NewEvent(type="agent_state", data=bytes(json.dumps(state), 'utf-8'),
128+
content_type='application/json')
129+
self.client.append_to_stream(
130+
stream_name="strands_checkpoint-" + unique_run_id,
131+
events=[event],
132+
current_version=StreamState.ANY)
133+
134+
135+
def restore_agent_state(self, agent: Agent, unique_run_id: str) -> Agent:
136+
"""
137+
Builds the agent state messages from a stream in KurrentDB.
138+
"""
139+
try:
140+
checkpoint_event = self.client.get_stream(
141+
stream_name="strands_checkpoint-" + unique_run_id,
142+
resolve_links=True,
143+
backwards=True,
144+
limit=1
145+
)
146+
if not checkpoint_event or len(checkpoint_event) == 0:
147+
return None # No state found
148+
149+
state = json.loads(checkpoint_event[0].data.decode('utf-8'))
150+
self.stream_id = state["kurrentdb_checkpoint_stream_id"]
151+
self.checkpoint = state["kurrentdb_checkpoint"]
152+
153+
messages = []
154+
message_events = self.client.get_stream(
155+
stream_name=unique_run_id,
156+
resolve_links=True,
157+
backwards=True,
158+
stream_position=self.checkpoint,
159+
limit=self.window_size
160+
)
161+
for event in message_events:
162+
if event.type == "StateRestored":
163+
break #reached of this state
164+
message = json.loads(event.data.decode('utf-8'))
165+
messages.insert(0,message)
166+
state["messages"] = messages
167+
agent.messages = messages
168+
169+
#append an event to know restore state was called
170+
system_event = NewEvent(
171+
type="StateRestored",
172+
data=bytes("{}", 'utf-8'),
173+
content_type='application/json',
174+
metadata=bytes("{}", 'utf-8')
175+
)
176+
self.client.append_to_stream(
177+
stream_name=unique_run_id,
178+
events=[system_event],
179+
current_version=StreamState.ANY
180+
)
181+
return agent
182+
except NotFoundError as e:
183+
return agent #unchanged agent, no state to restore

0 commit comments

Comments
 (0)