
Commit a2ce34a

google-genai-bot authored and copybara-github committed
feat: Migrate BigQuery logging to Storage Write API
This commit refactors the `BigQueryAgentAnalyticsPlugin` to leverage the modern BigQuery Storage Write API, replacing the previous implementation that used the legacy `insert_rows_json` method (based on `tabledata.insertAll`).

**Key Changes:**

* **Switched to Storage Write API:** Event logs are now ingested using the `BigQueryWriteClient` from the `google-cloud-bigquery-storage` library.
* **Utilizes Default Stream:** Data is sent over the `_default` stream, an efficient way to stream rows without managing stream lifecycles. This is ideal for continuous event logging.
* **Apache Arrow Format:** Log entries are converted to the Apache Arrow format using `pyarrow` before being sent, and the BigQuery table schema is dynamically converted to an Arrow schema. This binary format is more efficient than JSON.
* **Updated Initialization:** The plugin now initializes both the standard `bigquery.Client` (for table management) and the `BigQueryWriteClient`.
* **Test Updates:** Unit tests in `test_bigquery_logging_plugin.py` have been comprehensively updated to mock the new `BigQueryWriteClient`, `bq_schema_utils`, and `pyarrow` components. Tests now verify calls to `append_rows` and the data structure used to create the Arrow RecordBatch.

**Benefits of this change:**

* **Improved Performance:** The Storage Write API is designed for high-throughput streaming and offers better performance than the legacy API.
* **Reduced Cost:** Ingesting data via the Storage Write API is generally more cost-effective.
* **Enhanced Reliability:** The Storage Write API provides more robust streaming capabilities.
* **Modernization:** Aligns the plugin with the recommended best practices for BigQuery data ingestion.

This change enhances the efficiency and scalability of the BigQuery logging plugin.

PiperOrigin-RevId: 828655496
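For context, the snippet below is a minimal, self-contained sketch of the default-stream flow this commit adopts: build a one-row Arrow `RecordBatch`, attach the serialized Arrow schema and batch to an `AppendRowsRequest`, and send it over the table's `_default` write stream. It is not the plugin code; the project/dataset/table path, the two-column schema, and the row values are placeholders, and it assumes a `google-cloud-bigquery-storage` release recent enough for `AppendRowsRequest` to expose the `arrow_rows` field.

```python
import asyncio
from datetime import datetime, timezone

import pyarrow as pa
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import BigQueryWriteAsyncClient

# Placeholder table path; the table must already exist with a matching schema.
TABLE_PATH = "projects/PROJECT_ID/datasets/DATASET_ID/tables/TABLE_ID"

# A toy two-column Arrow schema standing in for the plugin's event-log schema.
ARROW_SCHEMA = pa.schema([
    pa.field("timestamp", pa.timestamp("us", tz="UTC")),
    pa.field("event_type", pa.string()),
])


async def append_one_row(row: dict) -> None:
  """Appends a single row to the table's `_default` write stream."""
  client = BigQueryWriteAsyncClient()

  # RecordBatch.from_pydict expects a mapping of column name -> list of values.
  batch = pa.RecordBatch.from_pydict(
      {field.name: [row.get(field.name)] for field in ARROW_SCHEMA},
      schema=ARROW_SCHEMA,
  )

  request = bq_storage_types.AppendRowsRequest(
      write_stream=f"{TABLE_PATH}/_default"
  )
  # Requires a recent google-cloud-bigquery-storage release with Arrow support.
  request.arrow_rows.writer_schema.serialized_schema = (
      ARROW_SCHEMA.serialize().to_pybytes()
  )
  request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes()

  # append_rows is a bidirectional stream: send one request, read one response.
  stream = await client.append_rows(requests=iter([request]))
  async for response in stream:
    if response.row_errors:
      print("Row errors:", response.row_errors)
    break


if __name__ == "__main__":
  asyncio.run(
      append_one_row(
          {"timestamp": datetime.now(timezone.utc), "event_type": "demo"}
      )
  )
```

The plugin's `_log_to_bigquery_async` in the diff below follows the same sequence, but derives the Arrow schema from the BigQuery table schema at initialization time.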
1 parent fa5c546 · commit a2ce34a

File tree

3 files changed: +712, -312 lines


pyproject.toml

Lines changed: 4 additions & 1 deletion
@@ -32,6 +32,8 @@ dependencies = [
   "fastapi>=0.115.0, <1.119.0", # FastAPI framework
   "google-api-python-client>=2.157.0, <3.0.0", # Google API client discovery
   "google-cloud-aiplatform[agent_engines]>=1.125.0, <2.0.0", # For VertexAI integrations, e.g. example store.
+  "google-cloud-bigquery-storage>=2.0.0",
+  "google-cloud-bigquery>=2.2.0",
   "google-cloud-bigtable>=2.32.0", # For Bigtable database
   "google-cloud-discoveryengine>=0.13.12, <0.14.0", # For Discovery Engine Search Tool
   "google-cloud-secret-manager>=2.22.0, <3.0.0", # Fetching secrets in RestAPI Tool
@@ -48,6 +50,7 @@ dependencies = [
   "opentelemetry-exporter-otlp-proto-http>=1.36.0",
   "opentelemetry-resourcedetector-gcp>=1.9.0a0, <2.0.0",
   "opentelemetry-sdk>=1.37.0, <=1.37.0",
+  "pyarrow>=14.0.0",
   "pydantic>=2.0, <3.0.0", # For data validation/models
   "python-dateutil>=2.9.0.post0, <3.0.0", # For Vertext AI Session Service
   "python-dotenv>=1.0.0, <2.0.0", # To manage environment variables
@@ -206,7 +209,7 @@ asyncio_mode = "auto"
 python_version = "3.9"
 exclude = "tests/"
 plugins = ["pydantic.mypy"]
-# Start with non-strict mode, and switch to strict mode later.
+# Start with non-strict mode, and swtich to strict mode later.
 # strict = true
 disable_error_code = ["import-not-found", "import-untyped", "unused-ignore"]
 follow_imports = "skip"

src/google/adk/plugins/bigquery_logging_plugin.py

Lines changed: 217 additions & 38 deletions
@@ -26,13 +26,18 @@
 from typing import List
 from typing import Optional
 from typing import TYPE_CHECKING
+import warnings
 
 import google.api_core.client_info
 import google.auth
 from google.auth import exceptions as auth_exceptions
 from google.cloud import bigquery
 from google.cloud import exceptions as cloud_exceptions
+from google.cloud.bigquery import schema as bq_schema
+from google.cloud.bigquery_storage_v1 import types as bq_storage_types
+from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import BigQueryWriteAsyncClient
 from google.genai import types
+import pyarrow as pa
 
 from .. import version
 from ..agents.base_agent import BaseAgent
@@ -48,6 +53,132 @@
   from ..agents.invocation_context import InvocationContext
 
 
+def _pyarrow_datetime():
+  return pa.timestamp("us", tz=None)
+
+
+def _pyarrow_numeric():
+  return pa.decimal128(38, 9)
+
+
+def _pyarrow_bignumeric():
+  return pa.decimal256(76, 38)
+
+
+def _pyarrow_time():
+  return pa.time64("us")
+
+
+def _pyarrow_timestamp():
+  return pa.timestamp("us", tz="UTC")
+
+
+_BQ_TO_ARROW_SCALARS = {
+    "BOOL": pa.bool_,
+    "BOOLEAN": pa.bool_,
+    "BYTES": pa.binary,
+    "DATE": pa.date32,
+    "DATETIME": _pyarrow_datetime,
+    "FLOAT": pa.float64,
+    "FLOAT64": pa.float64,
+    "GEOGRAPHY": pa.string,
+    "INT64": pa.int64,
+    "INTEGER": pa.int64,
+    "JSON": pa.string,
+    "NUMERIC": _pyarrow_numeric,
+    "BIGNUMERIC": _pyarrow_bignumeric,
+    "STRING": pa.string,
+    "TIME": _pyarrow_time,
+    "TIMESTAMP": _pyarrow_timestamp,
+}
+
+
+def _bq_to_arrow_scalars(bq_scalar: str):
+  return _BQ_TO_ARROW_SCALARS.get(bq_scalar)
+
+
+_BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
+    "GEOGRAPHY": {
+        b"ARROW:extension:name": b"google:sqlType:geography",
+        b"ARROW:extension:metadata": b'{"encoding": "WKT"}',
+    },
+    "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"},
+    "JSON": {b"ARROW:extension:name": b"google:sqlType:json"},
+}
+_STRUCT_TYPES = ("RECORD", "STRUCT")
+
+
+def _bq_to_arrow_struct_data_type(field):
+  arrow_fields = []
+  for subfield in field.fields:
+    arrow_subfield = _bq_to_arrow_field(subfield)
+    if arrow_subfield:
+      arrow_fields.append(arrow_subfield)
+    else:
+      return None
+  return pa.struct(arrow_fields)
+
+
+def _bq_to_arrow_range_data_type(field):
+  if field is None:
+    raise ValueError("Range element type cannot be None")
+  element_type = field.element_type.upper()
+  arrow_element_type = _bq_to_arrow_scalars(element_type)()
+  return pa.struct([("start", arrow_element_type), ("end", arrow_element_type)])
+
+
+def _bq_to_arrow_data_type(field):
+  if field.mode is not None and field.mode.upper() == "REPEATED":
+    inner_type = _bq_to_arrow_data_type(
+        bq_schema.SchemaField(field.name, field.field_type, fields=field.fields)
+    )
+    if inner_type:
+      return pa.list_(inner_type)
+    return None
+
+  field_type_upper = field.field_type.upper() if field.field_type else ""
+  if field_type_upper in _STRUCT_TYPES:
+    return _bq_to_arrow_struct_data_type(field)
+
+  if field_type_upper == "RANGE":
+    return _bq_to_arrow_range_data_type(field.range_element_type)
+
+  data_type_constructor = _bq_to_arrow_scalars(field_type_upper)
+  if data_type_constructor is None:
+    return None
+  return data_type_constructor()
+
+
+def _bq_to_arrow_field(bq_field, array_type=None):
+  arrow_type = _bq_to_arrow_data_type(bq_field)
+  if arrow_type is not None:
+    if array_type is not None:
+      arrow_type = array_type
+    metadata = _BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get(
+        bq_field.field_type.upper() if bq_field.field_type else ""
+    )
+    return pa.field(
+        bq_field.name,
+        arrow_type,
+        nullable=False if bq_field.mode.upper() == "REPEATED" else True,
+        metadata=metadata,
+    )
+
+  warnings.warn(f"Unable to determine Arrow type for field '{bq_field.name}'.")
+  return None
+
+
+def to_arrow_schema(bq_schema_list):
+  """Return the Arrow schema, corresponding to a given BigQuery schema."""
+  arrow_fields = []
+  for bq_field in bq_schema_list:
+    arrow_field = _bq_to_arrow_field(bq_field)
+    if arrow_field is None:
+      return None
+    arrow_fields.append(arrow_field)
+  return pa.schema(arrow_fields)
+
+
 @dataclasses.dataclass
 class BigQueryLoggerConfig:
   """Configuration for the BigQueryAgentAnalyticsPlugin.
@@ -131,9 +262,6 @@ class BigQueryAgentAnalyticsPlugin(BasePlugin):
   - Events yielded by agents
   - Errors during model and tool execution
 
-  Each log entry includes a timestamp, event type, agent name, session ID,
-  invocation ID, user ID, content payload, and any error messages.
-
   Logging behavior can be customized using the BigQueryLoggerConfig.
   """
 
@@ -155,6 +283,8 @@ def __init__(
     self._init_done = False
     self._init_succeeded = False
 
+    self._write_client: BigQueryWriteAsyncClient | None = None
+    self._arrow_schema: pa.Schema | None = None
     if not self._config.enabled:
       logging.info(
           "BigQueryAgentAnalyticsPlugin %s is disabled by configuration.",
@@ -177,24 +307,38 @@ def _ensure_initialized_sync(self):
     self._init_done = True
     try:
      credentials, _ = google.auth.default(
-          scopes=["https://www.googleapis.com/auth/bigquery"]
+          scopes=[
+              "https://www.googleapis.com/auth/bigquery",
+              "https://www.googleapis.com/auth/cloud-platform",  # For Storage Write
+          ]
       )
       client_info = google.api_core.client_info.ClientInfo(
           user_agent=f"google-adk-bq-logger/{version.__version__}"
       )
+
+      # 1. Init BQ Client (for create_dataset/create_table)
       self._bq_client = bigquery.Client(
           project=self._project_id,
           credentials=credentials,
           client_info=client_info,
       )
+
+      # 2. Init BQ Storage Write Client
+      self._write_client = BigQueryWriteAsyncClient(
+          credentials=credentials, client_info=client_info
+      )
+
       logging.info(
-          "BigQuery client initialized for project %s", self._project_id
+          "BigQuery clients (Core & Storage Write) initialized for"
+          " project %s",
+          self._project_id,
       )
       dataset_ref = self._bq_client.dataset(self._dataset_id)
       self._bq_client.create_dataset(dataset_ref, exists_ok=True)
       logging.info("Dataset %s ensured to exist.", self._dataset_id)
       table_ref = dataset_ref.table(self._table_id)
-      # Schema without separate token columns
+
+      # Schema
       schema = [
           bigquery.SchemaField("timestamp", "TIMESTAMP"),
           bigquery.SchemaField("event_type", "STRING"),
@@ -208,6 +352,11 @@ def _ensure_initialized_sync(self):
       table = bigquery.Table(table_ref, schema=schema)
       self._bq_client.create_table(table, exists_ok=True)
       logging.info("Table %s ensured to exist.", self._table_id)
+
+      # 4. Store Arrow schema for Write API
+      self._arrow_schema = to_arrow_schema(schema)  # USE LOCAL VERSION
+      # --- self._table_ref_str removed ---
+
       self._init_succeeded = True
     except (
         auth_exceptions.GoogleAuthError,
@@ -252,13 +401,17 @@ async def _log_to_bigquery_async(self, event_dict: dict[str, Any]):
       )
       # Optionally log a generic message or the error
 
-    def _sync_log():
-      self._ensure_initialized_sync()
-      if not self._init_succeeded or not self._bq_client:
+    try:
+      if not self._init_done:
+        await asyncio.to_thread(self._ensure_initialized_sync)
+
+      # Check for all required Storage Write API components
+      if not (
+          self._init_succeeded and self._write_client and self._arrow_schema
+      ):
+        logging.warning("BigQuery write client not initialized. Skipping log.")
         return
-      table_ref = self._bq_client.dataset(self._dataset_id).table(
-          self._table_id
-      )
+
       default_row = {
           "timestamp": datetime.now(timezone.utc).isoformat(),
           "event_type": None,
@@ -271,21 +424,41 @@ def _sync_log():
       }
       insert_row = {**default_row, **event_dict}
 
-      errors = self._bq_client.insert_rows_json(table_ref, [insert_row])
-      if errors:
-        logging.error(
-            "Errors occurred while inserting to BigQuery table %s.%s: %s",
-            self._dataset_id,
-            self._table_id,
-            errors,
-        )
+      # --- START MODIFIED STORAGE WRITE API LOGIC (using Default Stream) ---
+      # 1. Convert the single row dict to a PyArrow RecordBatch
+      # pa.RecordBatch.from_pydict requires a dict of lists
+      pydict = {
+          field.name: [insert_row.get(field.name)]
+          for field in self._arrow_schema
+      }
+      batch = pa.RecordBatch.from_pydict(pydict, schema=self._arrow_schema)
 
-    try:
-      await asyncio.to_thread(_sync_log)
-    except (
-        cloud_exceptions.GoogleCloudError,
-        auth_exceptions.GoogleAuthError,
-    ) as e:
+      # 2. Create the AppendRowsRequest, pointing to the default stream
+      request = bq_storage_types.AppendRowsRequest(
+          write_stream=(
+              f"projects/{self._project_id}/datasets/{self._dataset_id}"
+              f"/tables/{self._table_id}/_default"
+          )
+      )
+      request.arrow_rows.writer_schema.serialized_schema = (
+          self._arrow_schema.serialize().to_pybytes()
+      )
+
+      request.arrow_rows.rows.serialized_record_batch = (
+          batch.serialize().to_pybytes()
+      )
+
+      # 3. Send the request and check for errors
+      response_iterator = self._write_client.append_rows(requests=[request])
+      async for response in response_iterator:
+        if response.row_errors:
+          logging.error(
+              "Errors occurred while writing to BigQuery (Storage Write"
+              " API): %s",
+              response.row_errors,
+          )
+        break  # Only one response expected
+    except Exception as e:
       logging.exception("Failed to log to BigQuery: %s", e)
 
   async def on_user_message_callback(
@@ -407,20 +580,26 @@ async def before_model_callback(
 
     # Log Full System Instruction
     system_instruction_text = "None"
-    if llm_request.config and hasattr(llm_request.config, "system_instruction"):
+    if llm_request.config and llm_request.config.system_instruction:
       si = llm_request.config.system_instruction
-      if si:
-        if isinstance(si, str):
-          system_instruction_text = si
-        elif hasattr(si, "__iter__"):  # Handles list, tuple, etc. of parts
-          # Join parts together to form the complete system instruction
-          system_instruction_text = "".join(
-              part.text for part in si if hasattr(part, "text")
-          )
-        else:
-          system_instruction_text = str(si)
+      if isinstance(si, str):
+        system_instruction_text = si
+      elif isinstance(si, types.Content):
+        system_instruction_text = "".join(p.text for p in si.parts if p.text)
+      elif isinstance(si, types.Part):
+        system_instruction_text = si.text
+      elif hasattr(si, "__iter__"):
+        texts = []
+        for item in si:
+          if isinstance(item, str):
+            texts.append(item)
+          elif isinstance(item, types.Part) and item.text:
+            texts.append(item.text)
+        system_instruction_text = "".join(texts)
       else:
-        system_instruction_text = "Empty"
+        system_instruction_text = str(si)
+    elif llm_request.config and not llm_request.config.system_instruction:
+      system_instruction_text = "Empty"
 
     content_parts.append(f"System Prompt: {system_instruction_text}")
 
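To make the new `system_instruction` handling above concrete, here is a small standalone sketch (not the plugin code) that applies the same normalization rules to the value types `google.genai.types` allows. The helper name `normalize_system_instruction` and the example strings are invented for illustration.

```python
from google.genai import types


def normalize_system_instruction(si) -> str:
  """Mirrors the branch logic added in the diff above (illustrative only)."""
  if isinstance(si, str):
    return si
  if isinstance(si, types.Content):
    return "".join(p.text for p in si.parts if p.text)
  if isinstance(si, types.Part):
    return si.text or ""  # `or ""` guards against an empty Part
  if hasattr(si, "__iter__"):
    texts = []
    for item in si:
      if isinstance(item, str):
        texts.append(item)
      elif isinstance(item, types.Part) and item.text:
        texts.append(item.text)
    return "".join(texts)
  return str(si)


# A Content made of two Parts collapses into one string.
content = types.Content(
    parts=[types.Part(text="You are a helpful agent. "), types.Part(text="Be brief.")]
)
assert normalize_system_instruction(content) == "You are a helpful agent. Be brief."

# A mixed list of strings and Parts is also supported.
mixed = ["Always answer in English. ", types.Part(text="Cite sources.")]
assert normalize_system_instruction(mixed) == "Always answer in English. Cite sources."
```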
