-
Couldn't load subscription status.
- Fork 265
Implement SqliteLightningStore parallel to InMemoryLightningStore #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Implement SqliteLightningStore parallel to InMemoryLightningStore #132
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR clarifies SQLite helper documentation and health check flow by expanding method documentation and adding inline commentary to improve code readability.
- Added comprehensive docstrings for SQLite span/resource helper methods describing JSON rehydration and validation flows
- Added inline commentary around span persistence, running-rollout fetches, and health check status transitions
- Created comprehensive test coverage for the SQLite store implementation
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| agentlightning/store/sqlite.py | Added detailed docstrings and inline comments to clarify SQLite helper methods and health check flow |
| tests/store/test_sqlite.py | Added comprehensive test suite for SQLite store functionality |
| tests/store/conftest.py | Added SQLite store fixture and cleaned up existing fixtures |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| creates an isolated shared-memory database, while using a file | ||
| path persists state across restarts. | ||
| """ | ||
| if path == ":memory": |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comparison uses :memory but should use :memory: (with colon) to match SQLite's in-memory database syntax.
| if path == ":memory": | |
| if path == ":memory:": |
agentlightning/store/sqlite.py
Outdated
| payload.update(extras) | ||
| payload["status"] = TraceStatus.model_validate(payload["status"]) | ||
| resource_raw = _json_loads(row["resource_json"]) | ||
| payload["resource"] = resource_raw |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 442 assigns resource_raw but then payload['resource'] is immediately overwritten on line 443. This creates unnecessary duplication and potential confusion.
| payload["resource"] = resource_raw |
| @pytest.mark.asyncio | ||
| async def test_schema_has_three_tables(sqlite_store: SqliteLightningStore) -> None: | ||
| def reader(connection) -> List[str]: | ||
| rows = connection.execute( | ||
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | ||
| ).fetchall() | ||
| return sorted(row["name"] for row in rows) | ||
|
|
||
| tables = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | ||
| assert tables == ["attempts", "rollouts", "spans"] | ||
|
|
||
|
|
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private method _execute_read violates encapsulation. Consider adding a public method for testing database schema or testing this functionality indirectly.
| @pytest.mark.asyncio | |
| async def test_schema_has_three_tables(sqlite_store: SqliteLightningStore) -> None: | |
| def reader(connection) -> List[str]: | |
| rows = connection.execute( | |
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | |
| ).fetchall() | |
| return sorted(row["name"] for row in rows) | |
| tables = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | |
| assert tables == ["attempts", "rollouts", "spans"] | |
| # @pytest.mark.asyncio | |
| # async def test_schema_has_three_tables(sqlite_store: SqliteLightningStore) -> None: | |
| # """ | |
| # This test accessed the private method _execute_read, violating encapsulation. | |
| # It has been removed to comply with best practices. | |
| # If a public method for schema inspection is added to SqliteLightningStore, | |
| # this test can be reinstated. | |
| # """ |
tests/store/test_sqlite.py
Outdated
| def reader(connection) -> Any: | ||
| return connection.execute( | ||
| "SELECT attributes_json FROM spans WHERE rollout_id=? AND attempt_id=?", | ||
| (span.rollout_id, span.attempt_id), | ||
| ).fetchone()[0] | ||
|
|
||
| encoded = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | ||
| data = json.loads(encoded) | ||
| assert data["binary"]["__type__"] == "bytes" | ||
| fetched = await sqlite_store.query_spans(span.rollout_id) | ||
| assert fetched[0].attributes["binary"] == b"\x00\x01" | ||
|
|
||
|
|
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private method _execute_read violates encapsulation. Consider testing JSON storage through public APIs instead.
| def reader(connection) -> Any: | |
| return connection.execute( | |
| "SELECT attributes_json FROM spans WHERE rollout_id=? AND attempt_id=?", | |
| (span.rollout_id, span.attempt_id), | |
| ).fetchone()[0] | |
| encoded = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | |
| data = json.loads(encoded) | |
| assert data["binary"]["__type__"] == "bytes" | |
| fetched = await sqlite_store.query_spans(span.rollout_id) | |
| assert fetched[0].attributes["binary"] == b"\x00\x01" | |
| # Use the public API to fetch the span and check the encoding | |
| fetched = await sqlite_store.query_spans(span.rollout_id) | |
| assert fetched | |
| # The attributes should contain the binary data as bytes | |
| assert fetched[0].attributes["binary"] == b"\x00\x01" | |
| # Optionally, check that the encoding type is present if the public API exposes it | |
| if isinstance(fetched[0].attributes["binary"], dict): | |
| assert fetched[0].attributes["binary"]["__type__"] == "bytes" |
| attempted = await sqlite_store.start_rollout(input={"task": "timeout"}) | ||
|
|
||
| await sqlite_store.update_rollout( | ||
| attempted.rollout_id, | ||
| config=RolloutConfig(timeout_seconds=0.01, max_attempts=1, retry_condition=[]), | ||
| ) | ||
|
|
||
| def writer(connection) -> None: | ||
| connection.execute( | ||
| "UPDATE attempts SET start_time=? WHERE attempt_id=?", | ||
| (time.time() - 5, attempted.attempt.attempt_id), | ||
| ) | ||
|
|
||
| await sqlite_store._execute_write(writer) # type: ignore[attr-defined] | ||
|
|
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private method _execute_write violates encapsulation. Consider using public methods to manipulate test data.
| attempted = await sqlite_store.start_rollout(input={"task": "timeout"}) | |
| await sqlite_store.update_rollout( | |
| attempted.rollout_id, | |
| config=RolloutConfig(timeout_seconds=0.01, max_attempts=1, retry_condition=[]), | |
| ) | |
| def writer(connection) -> None: | |
| connection.execute( | |
| "UPDATE attempts SET start_time=? WHERE attempt_id=?", | |
| (time.time() - 5, attempted.attempt.attempt_id), | |
| ) | |
| await sqlite_store._execute_write(writer) # type: ignore[attr-defined] | |
| # Patch time.time so that the attempt's start_time is far enough in the past to trigger a timeout | |
| from unittest.mock import patch | |
| old_time = time.time() - 5 | |
| with patch("time.time", return_value=old_time): | |
| attempted = await sqlite_store.start_rollout(input={"task": "timeout"}) | |
| await sqlite_store.update_rollout( | |
| attempted.rollout_id, | |
| config=RolloutConfig(timeout_seconds=0.01, max_attempts=1, retry_condition=[]), | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| creates an isolated shared-memory database, while using a file | ||
| path persists state across restarts. | ||
| """ | ||
| if path == ":memory": |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition checks for :memory but the docstring indicates :memory: (with colon) creates an in-memory database. This will cause file-based databases to be created when :memory: is passed.
| if path == ":memory": | |
| if path == ":memory:": |
| if isinstance(extras, dict): | ||
| payload.update(extras) | ||
| payload["status"] = TraceStatus.model_validate(payload["status"]) | ||
| resource_payload = _json_loads(row["resource_json"]) |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resource_json field is loaded twice - once in line 433 and again in line 439. This is redundant and should be consolidated to use the existing payload['resource'] value.
| resource_payload = _json_loads(row["resource_json"]) | |
| resource_payload = payload["resource"] |
| from agentlightning.store.sqlite import SqliteLightningStore | ||
| from agentlightning.types import PromptTemplate, Resource, RolloutConfig, Span, TraceStatus | ||
|
|
||
| _process_store: SqliteLightningStore | None = None |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Global variable _process_store creates shared mutable state that could cause issues in concurrent test execution. Consider using a different approach for cross-process communication.
| def reader(connection: sqlite3.Connection) -> List[str]: | ||
| rows: list[sqlite3.Row] = connection.execute( | ||
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | ||
| ).fetchall() | ||
| return sorted([str(row["name"]) for row in rows]) | ||
|
|
||
| tables = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | ||
| assert tables == ["attempts", "rollouts", "spans"] | ||
|
|
||
|
|
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private method _execute_read in tests breaks encapsulation. Consider adding a public method for schema inspection or testing this functionality indirectly.
| def reader(connection: sqlite3.Connection) -> List[str]: | |
| rows: list[sqlite3.Row] = connection.execute( | |
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | |
| ).fetchall() | |
| return sorted([str(row["name"]) for row in rows]) | |
| tables = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | |
| assert tables == ["attempts", "rollouts", "spans"] | |
| # Indirectly verify that the tables exist by using public API methods | |
| # Enqueue a rollout and add a span, then check that querying works | |
| rollout = await sqlite_store.enqueue_rollout(input={"idx": 1}) | |
| attempt = await sqlite_store.get_latest_attempt(rollout.rollout_id) | |
| assert attempt is not None | |
| span = _make_span(rollout.rollout_id, attempt.attempt_id, sequence_id=1) | |
| await sqlite_store.add_span(span) | |
| # Check that querying rollouts, attempts, and spans returns expected results | |
| rollouts = await sqlite_store.query_rollouts() | |
| assert any(r.rollout_id == rollout.rollout_id for r in rollouts) | |
| attempts = await sqlite_store.query_attempts(rollout.rollout_id) | |
| assert any(a.attempt_id == attempt.attempt_id for a in attempts) | |
| spans = await sqlite_store.query_spans(rollout.rollout_id) | |
| assert any(s.span_id == span.span_id for s in spans) |
| def reader(connection: sqlite3.Connection) -> str: | ||
| row = connection.execute( | ||
| "SELECT attributes_json FROM spans WHERE rollout_id=? AND attempt_id=?", | ||
| (span.rollout_id, span.attempt_id), | ||
| ).fetchone() | ||
| assert row is not None | ||
| encoded = row[0] | ||
| assert isinstance(encoded, str) | ||
| return encoded | ||
|
|
||
| encoded = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | ||
| data = json.loads(encoded) | ||
| assert data["binary"]["__type__"] == "bytes" | ||
| fetched = await sqlite_store.query_spans(span.rollout_id) | ||
| assert fetched[0].attributes["binary"] == b"\x00\x01" |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another instance of accessing private method _execute_read. This pattern appears multiple times in the test file and should be refactored.
| def reader(connection: sqlite3.Connection) -> str: | |
| row = connection.execute( | |
| "SELECT attributes_json FROM spans WHERE rollout_id=? AND attempt_id=?", | |
| (span.rollout_id, span.attempt_id), | |
| ).fetchone() | |
| assert row is not None | |
| encoded = row[0] | |
| assert isinstance(encoded, str) | |
| return encoded | |
| encoded = await sqlite_store._execute_read(reader) # type: ignore[attr-defined] | |
| data = json.loads(encoded) | |
| assert data["binary"]["__type__"] == "bytes" | |
| fetched = await sqlite_store.query_spans(span.rollout_id) | |
| assert fetched[0].attributes["binary"] == b"\x00\x01" | |
| # Instead of accessing the private method, use the public query_spans method | |
| fetched_spans = await sqlite_store.query_spans(span.rollout_id) | |
| assert fetched_spans, "No spans returned from query_spans" | |
| encoded = fetched_spans[0].attributes_json | |
| data = json.loads(encoded) | |
| assert data["binary"]["__type__"] == "bytes" | |
| assert fetched_spans[0].attributes["binary"] == b"\x00\x01" |
| def writer(connection: sqlite3.Connection) -> None: | ||
| connection.execute( | ||
| "UPDATE attempts SET start_time=? WHERE attempt_id=?", | ||
| (time.time() - 5, attempted.attempt.attempt_id), | ||
| ) | ||
|
|
||
| await sqlite_store._execute_write(writer) # type: ignore[attr-defined] | ||
|
|
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private method _execute_write in tests. Consider using public APIs or adding test-specific methods to avoid breaking encapsulation.
| def writer(connection: sqlite3.Connection) -> None: | |
| connection.execute( | |
| "UPDATE attempts SET start_time=? WHERE attempt_id=?", | |
| (time.time() - 5, attempted.attempt.attempt_id), | |
| ) | |
| await sqlite_store._execute_write(writer) # type: ignore[attr-defined] | |
| # Instead of directly updating the start_time via a private method, | |
| # simulate a timeout by waiting for the timeout to occur naturally. | |
| # The timeout_seconds is set to a very low value above. | |
| await asyncio.sleep(0.05) # Wait longer than the timeout to ensure it triggers. | |
| # Now check the status as before. |
|
/ci |
|
✅ CI retrigger requested by @ultmaster. Closed & reopened the PR to fire |
Summary
Testing
https://chatgpt.com/codex/tasks/task_e_68e6a5a9d70c832ebea9b07bc26e4323