diff --git a/go/api/v1alpha1/agent_types.go b/go/api/v1alpha1/agent_types.go
index ffd98b31c..2e7da3ac4 100644
--- a/go/api/v1alpha1/agent_types.go
+++ b/go/api/v1alpha1/agent_types.go
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"trpc.group/trpc-go/trpc-a2a-go/server"
@@ -116,9 +117,15 @@ type AnyType struct {
 	json.RawMessage `json:",inline"`
 }
 
+// +kubebuilder:validation:XValidation:message="maxPayloadSize must be positive",rule="!has(self.maxPayloadSize) || self.maxPayloadSize.Value() > 0"
 type A2AConfig struct {
 	// +kubebuilder:validation:MinItems=1
 	Skills []AgentSkill `json:"skills,omitempty"`
+	// MaxPayloadSize is the maximum payload size for A2A requests.
+	// Supports Kubernetes quantity format (e.g., "50Mi", "100M").
+	// Must be positive (> 0). If not specified, uses the default a2a-python limit (~7-8MB).
+	// +optional
+	MaxPayloadSize *resource.Quantity `json:"maxPayloadSize,omitempty"`
 }
 
 type AgentSkill server.AgentSkill
diff --git a/go/api/v1alpha2/agent_types.go b/go/api/v1alpha2/agent_types.go
index 80feeff47..18bc26304 100644
--- a/go/api/v1alpha2/agent_types.go
+++ b/go/api/v1alpha2/agent_types.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -238,9 +239,15 @@ func (t *TypedLocalReference) GroupKind() schema.GroupKind {
 	}
 }
 
+// +kubebuilder:validation:XValidation:message="maxPayloadSize must be positive",rule="!has(self.maxPayloadSize) || self.maxPayloadSize.Value() > 0"
 type A2AConfig struct {
 	// +kubebuilder:validation:MinItems=1
 	Skills []AgentSkill `json:"skills,omitempty"`
+	// MaxPayloadSize is the maximum payload size for A2A requests.
+	// Supports Kubernetes quantity format (e.g., "50Mi", "100M").
+	// Must be positive (> 0). If not specified, uses the default a2a-python limit (~7-8MB).
+	// +optional
+	MaxPayloadSize *resource.Quantity `json:"maxPayloadSize,omitempty"`
 }
 
 type AgentSkill server.AgentSkill
diff --git a/go/internal/adk/types.go b/go/internal/adk/types.go
index ca08f8ab3..a8f6edfd6 100644
--- a/go/internal/adk/types.go
+++ b/go/internal/adk/types.go
@@ -246,23 +246,25 @@ type RemoteAgentConfig struct {
 
 // See `python/packages/kagent-adk/src/kagent/adk/types.py` for the python version of this
 type AgentConfig struct {
-	Model        Model                 `json:"model"`
-	Description  string                `json:"description"`
-	Instruction  string                `json:"instruction"`
-	HttpTools    []HttpMcpServerConfig `json:"http_tools"`
-	SseTools     []SseMcpServerConfig  `json:"sse_tools"`
-	RemoteAgents []RemoteAgentConfig   `json:"remote_agents"`
-	ExecuteCode  bool                  `json:"execute_code,omitempty"`
+	Model          Model                 `json:"model"`
+	Description    string                `json:"description"`
+	Instruction    string                `json:"instruction"`
+	HttpTools      []HttpMcpServerConfig `json:"http_tools"`
+	SseTools       []SseMcpServerConfig  `json:"sse_tools"`
+	RemoteAgents   []RemoteAgentConfig   `json:"remote_agents"`
+	ExecuteCode    bool                  `json:"execute_code,omitempty"`
+	MaxPayloadSize *int64                `json:"max_payload_size,omitempty"`
 }
 
 func (a *AgentConfig) UnmarshalJSON(data []byte) error {
 	var tmp struct {
-		Model        json.RawMessage       `json:"model"`
-		Description  string                `json:"description"`
-		Instruction  string                `json:"instruction"`
-		HttpTools    []HttpMcpServerConfig `json:"http_tools"`
-		SseTools     []SseMcpServerConfig  `json:"sse_tools"`
-		RemoteAgents []RemoteAgentConfig   `json:"remote_agents"`
+		Model          json.RawMessage       `json:"model"`
+		Description    string                `json:"description"`
+		Instruction    string                `json:"instruction"`
+		HttpTools      []HttpMcpServerConfig `json:"http_tools"`
+		SseTools       []SseMcpServerConfig  `json:"sse_tools"`
+		RemoteAgents   []RemoteAgentConfig   `json:"remote_agents"`
+		MaxPayloadSize *int64                `json:"max_payload_size,omitempty"`
 	}
 	if err := json.Unmarshal(data, &tmp); err != nil {
 		return err
@@ -277,6 +279,7 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error {
 	a.HttpTools = tmp.HttpTools
 	a.SseTools = tmp.SseTools
 	a.RemoteAgents = tmp.RemoteAgents
+	a.MaxPayloadSize = tmp.MaxPayloadSize
 	return nil
 }
 
diff --git a/go/internal/controller/translator/agent/adk_api_translator.go b/go/internal/controller/translator/agent/adk_api_translator.go
index 0eeafb507..b516d9bdd 100644
--- a/go/internal/controller/translator/agent/adk_api_translator.go
+++ b/go/internal/controller/translator/agent/adk_api_translator.go
@@ -528,6 +528,20 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al
 		ExecuteCode: false && ptr.Deref(agent.Spec.Declarative.ExecuteCodeBlocks, false), //ignored due to this issue https://github.com/google/adk-python/issues/3921.
 	}
 
+	// Extract maxPayloadSize from A2AConfig if present
+	if agent.Spec.Declarative.A2AConfig != nil && agent.Spec.Declarative.A2AConfig.MaxPayloadSize != nil {
+		maxPayloadSizeBytes := agent.Spec.Declarative.A2AConfig.MaxPayloadSize.Value()
+		// Validate that maxPayloadSize is positive (Kubernetes quantities can be negative)
+		if maxPayloadSizeBytes <= 0 {
+			return nil, nil, nil, fmt.Errorf(
+				"maxPayloadSize must be positive, got %d bytes (quantity: %s)",
+				maxPayloadSizeBytes,
+				agent.Spec.Declarative.A2AConfig.MaxPayloadSize.String(),
+			)
+		}
+		cfg.MaxPayloadSize = &maxPayloadSizeBytes
+	}
+
 	for _, tool := range agent.Spec.Declarative.Tools {
 		// Skip tools that are not applicable to the model provider
 		switch {
diff --git a/go/internal/controller/translator/agent/testdata/inputs/agent_with_max_payload_size.yaml b/go/internal/controller/translator/agent/testdata/inputs/agent_with_max_payload_size.yaml
new file mode 100644
index 000000000..1c54c8a14
--- /dev/null
+++ b/go/internal/controller/translator/agent/testdata/inputs/agent_with_max_payload_size.yaml
@@ -0,0 +1,39 @@
+operation: translateAgent
+targetObject: test-agent
+namespace: test
+
+objects:
+  - apiVersion: v1
+    kind: Secret
+    metadata:
+      name: openai-secret
+      namespace: test
+    data:
+      api-key: c2stdGVzdC1hcGkta2V5 # base64 encoded "sk-test-api-key"
+  - apiVersion: kagent.dev/v1alpha2
+    kind: ModelConfig
+    metadata:
+      name: default-model-config
+      namespace: test
+    spec:
+      provider: OpenAI
+      model: gpt-4o
+      apiKeySecret: openai-secret
+      apiKeySecretKey: api-key
+      defaultHeaders:
+        User-Agent: "kagent/1.0"
+  - apiVersion: kagent.dev/v1alpha2
+    kind: Agent
+    metadata:
+      name: test-agent
+      namespace: test
+    spec:
+      type: Declarative
+      description: Agent with max payload size configuration
+      declarative:
+        modelConfig: default-model-config
+        systemMessage: You are a helpful assistant.
+        a2aConfig:
+          maxPayloadSize: "50Mi"
+        tools: []
+
diff --git a/python/packages/kagent-adk/src/kagent/adk/_a2a.py b/python/packages/kagent-adk/src/kagent/adk/_a2a.py
index 886ec08a2..cd4ebe0bb 100644
--- a/python/packages/kagent-adk/src/kagent/adk/_a2a.py
+++ b/python/packages/kagent-adk/src/kagent/adk/_a2a.py
@@ -20,7 +20,7 @@
 from google.adk.sessions import InMemorySessionService
 from google.genai import types
 
-from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore
+from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore, patch_a2a_payload_limit
 
 from ._agent_executor import A2aAgentExecutor
 from ._lifespan import LifespanManager
@@ -55,6 +55,7 @@ def __init__(
         app_name: str,
         lifespan: Optional[Callable[[Any], Any]] = None,
         plugins: List[BasePlugin] = None,
+        max_payload_size: Optional[int] = None,
     ):
         self.root_agent = root_agent
         self.kagent_url = kagent_url
@@ -62,6 +63,7 @@ def __init__(
         self.agent_card = agent_card
         self._lifespan = lifespan
         self.plugins = plugins if plugins is not None else []
+        self.max_payload_size = max_payload_size
 
     def build(self, local=False) -> FastAPI:
         session_service = InMemorySessionService()
@@ -111,6 +113,10 @@ def create_runner() -> Runner:
         if not local:
             lifespan_manager.add(token_service.lifespan())
 
+        # Patch a2a-python's payload size limit if specified
+        if self.max_payload_size is not None:
+            patch_a2a_payload_limit(self.max_payload_size)
+
         app = FastAPI(lifespan=lifespan_manager)
 
         # Health check/readiness probe
diff --git a/python/packages/kagent-adk/src/kagent/adk/cli.py b/python/packages/kagent-adk/src/kagent/adk/cli.py
index 90a640920..4027192a6 100644
--- a/python/packages/kagent-adk/src/kagent/adk/cli.py
+++ b/python/packages/kagent-adk/src/kagent/adk/cli.py
@@ -67,7 +67,14 @@ def static(
 
     root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
     maybe_add_skills(root_agent)
-    kagent_app = KAgentApp(root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
+    kagent_app = KAgentApp(
+        root_agent,
+        agent_card,
+        app_cfg.url,
+        app_cfg.app_name,
+        plugins=plugins,
+        max_payload_size=agent_config.max_payload_size,
+    )
 
     server = kagent_app.build()
     configure_tracing(server)
@@ -179,7 +186,14 @@ async def test_agent(agent_config: AgentConfig, agent_card: AgentCard, task: str
     plugins = [sts_integration]
     root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
     maybe_add_skills(root_agent)
-    app = KAgentApp(root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
+    app = KAgentApp(
+        root_agent,
+        agent_card,
+        app_cfg.url,
+        app_cfg.app_name,
+        plugins=plugins,
+        max_payload_size=agent_config.max_payload_size,
+    )
 
     await app.test(task)
 
diff --git a/python/packages/kagent-adk/src/kagent/adk/types.py b/python/packages/kagent-adk/src/kagent/adk/types.py
index 90350e95d..2fea53c33 100644
--- a/python/packages/kagent-adk/src/kagent/adk/types.py
+++ b/python/packages/kagent-adk/src/kagent/adk/types.py
@@ -102,6 +102,11 @@ class AgentConfig(BaseModel):
     sse_tools: list[SseMcpServerConfig] | None = None  # SSE MCP tools
     remote_agents: list[RemoteAgentConfig] | None = None  # remote agents
     execute_code: bool | None = None
+    max_payload_size: int | None = Field(
+        default=None,
+        gt=0,
+        description="Maximum payload size in bytes for A2A requests. Must be positive (> 0).",
+    )
 
     def to_agent(self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None) -> Agent:
         if name is None or not str(name).strip():
diff --git a/python/packages/kagent-adk/tests/unittests/test_a2a_payload_size.py b/python/packages/kagent-adk/tests/unittests/test_a2a_payload_size.py
new file mode 100644
index 000000000..0b1242f06
--- /dev/null
+++ b/python/packages/kagent-adk/tests/unittests/test_a2a_payload_size.py
@@ -0,0 +1,144 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for A2A payload size configuration.
+
+NOTE: These tests verify that the configuration value can be set and patched,
+but do NOT verify end-to-end behavior (i.e., that payloads of the configured
+size can actually be sent/received).
+"""
+
+from unittest import mock
+
+import pytest
+
+from kagent.core.a2a import patch_a2a_payload_limit
+
+
+class TestPatchA2APayloadLimit:
+    """Tests for patch_a2a_payload_limit function."""
+
+    def test_patch_max_payload_size_exists(self):
+        """Test patching when MAX_PAYLOAD_SIZE constant exists."""
+        mock_jsonrpc_app = mock.MagicMock()
+        mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024  # 7MB default
+
+        with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
+            patch_a2a_payload_limit(50 * 1024 * 1024)  # 50MB
+
+        assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 50 * 1024 * 1024
+
+    def test_patch_underscore_max_payload_size_exists(self):
+        """Test patching when _MAX_PAYLOAD_SIZE constant exists."""
+        mock_jsonrpc_app = mock.MagicMock()
+        mock_jsonrpc_app._MAX_PAYLOAD_SIZE = 7 * 1024 * 1024  # 7MB default
+        del mock_jsonrpc_app.MAX_PAYLOAD_SIZE  # Ensure MAX_PAYLOAD_SIZE doesn't exist
+
+        with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
+            patch_a2a_payload_limit(100 * 1024 * 1024)  # 100MB
+
+        assert mock_jsonrpc_app._MAX_PAYLOAD_SIZE == 100 * 1024 * 1024
+
+    def test_patch_module_not_found(self):
+        """Test behavior when jsonrpc_app module cannot be imported."""
+
+        def mock_import(name, fromlist=None):
+            if "jsonrpc" in name:
+                raise ImportError("No module named 'a2a.server.apps.jsonrpc'")
+            return mock.MagicMock()
+
+        with mock.patch("builtins.__import__", side_effect=mock_import):
+            # Should not raise an exception, just log debug message
+            patch_a2a_payload_limit(50 * 1024 * 1024)
+
+    def test_patch_no_payload_size_constant(self):
+        """Test behavior when payload size constant doesn't exist."""
+        mock_jsonrpc_app = mock.MagicMock()
+        # Remove any payload size attributes
+        if hasattr(mock_jsonrpc_app, "MAX_PAYLOAD_SIZE"):
+            delattr(mock_jsonrpc_app, "MAX_PAYLOAD_SIZE")
+        if hasattr(mock_jsonrpc_app, "_MAX_PAYLOAD_SIZE"):
+            delattr(mock_jsonrpc_app, "_MAX_PAYLOAD_SIZE")
+
+        with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
+            # Should not raise an exception, just log debug message
+            patch_a2a_payload_limit(50 * 1024 * 1024)
+
+    def test_patch_with_different_import_paths(self):
+        """Test that function tries multiple import paths."""
+        import_calls = []
+
+        def mock_import(name, fromlist=None):
+            import_calls.append(name)
+            if name == "a2a.server.apps.jsonrpc.jsonrpc_app":
+                raise ImportError("First path failed")
+            if name == "a2a.server.apps.jsonrpc_app":
+                mock_jsonrpc_app = mock.MagicMock()
+                mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024
+                return mock_jsonrpc_app
+            return mock.MagicMock()
+
+        with mock.patch("builtins.__import__", side_effect=mock_import):
+            patch_a2a_payload_limit(50 * 1024 * 1024)
+
+        # Should have tried both import paths
+        assert "a2a.server.apps.jsonrpc.jsonrpc_app" in import_calls
+        assert "a2a.server.apps.jsonrpc_app" in import_calls
+
+    def test_patch_raises_error_for_zero(self):
+        """Test that patch_a2a_payload_limit raises ValueError for zero."""
+        with pytest.raises(ValueError, match="must be positive"):
+            patch_a2a_payload_limit(0)
+
+    def test_patch_raises_error_for_negative(self):
+        """Test that patch_a2a_payload_limit raises ValueError for negative values."""
+        with pytest.raises(ValueError, match="must be positive"):
+            patch_a2a_payload_limit(-1)
+        with pytest.raises(ValueError, match="must be positive"):
+            patch_a2a_payload_limit(-100 * 1024 * 1024)
+
+    def test_patch_warns_on_override(self, caplog):
+        """Test that patching with a different value logs a warning."""
+        mock_jsonrpc_app = mock.MagicMock()
+        mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024  # 7MB default
+
+        with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
+            # First patch
+            patch_a2a_payload_limit(50 * 1024 * 1024)  # 50MB
+            assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 50 * 1024 * 1024
+
+            # Second patch with different value - should warn
+            patch_a2a_payload_limit(100 * 1024 * 1024)  # 100MB
+            assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 100 * 1024 * 1024
+
+        # Check that warning was logged
+        assert any(
+            "Overriding previously patched" in record.message and "process-level setting" in record.message
+            for record in caplog.records
+        )
+
+    def test_patch_no_warning_on_same_value(self, caplog):
+        """Test that patching with the same value doesn't log a warning."""
+        mock_jsonrpc_app = mock.MagicMock()
+        mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024  # 7MB default
+
+        with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
+            # First patch
+            patch_a2a_payload_limit(50 * 1024 * 1024)  # 50MB
+            caplog.clear()  # drop any override warning caused by state left over from earlier tests
+            # Second patch with same value - should not warn
+            patch_a2a_payload_limit(50 * 1024 * 1024)  # 50MB again
+
+        # Check that no warning was logged
+        assert not any("Overriding previously patched" in record.message for record in caplog.records)
diff --git a/python/packages/kagent-adk/tests/unittests/test_agent_config.py b/python/packages/kagent-adk/tests/unittests/test_agent_config.py
new file mode 100644
index 000000000..1b77ae478
--- /dev/null
+++ b/python/packages/kagent-adk/tests/unittests/test_agent_config.py
@@ -0,0 +1,127 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import pytest
+
+from kagent.adk.types import AgentConfig, OpenAI
+
+
+class TestAgentConfigMaxPayloadSize:
+    """Tests for AgentConfig max_payload_size field."""
+
+    def test_agent_config_with_max_payload_size(self):
+        """Test AgentConfig parsing with max_payload_size."""
+        config_dict = {
+            "model": {
+                "type": "openai",
+                "model": "gpt-4o",
+                "base_url": "",
+            },
+            "description": "Test agent",
+            "instruction": "You are a helpful assistant.",
+            "max_payload_size": 50 * 1024 * 1024,  # 50MB
+        }
+
+        config = AgentConfig.model_validate(config_dict)
+
+        assert config.max_payload_size == 50 * 1024 * 1024
+        assert config.description == "Test agent"
+        assert config.instruction == "You are a helpful assistant."
+
+    def test_agent_config_without_max_payload_size(self):
+        """Test AgentConfig parsing without max_payload_size (backward compatibility)."""
+        config_dict = {
+            "model": {
+                "type": "openai",
+                "model": "gpt-4o",
+                "base_url": "",
+            },
+            "description": "Test agent",
+            "instruction": "You are a helpful assistant.",
+        }
+
+        config = AgentConfig.model_validate(config_dict)
+
+        assert config.max_payload_size is None
+        assert config.description == "Test agent"
+
+    def test_agent_config_max_payload_size_none(self):
+        """Test AgentConfig parsing with max_payload_size explicitly set to None."""
+        config_dict = {
+            "model": {
+                "type": "openai",
+                "model": "gpt-4o",
+                "base_url": "",
+            },
+            "description": "Test agent",
+            "instruction": "You are a helpful assistant.",
+            "max_payload_size": None,
+        }
+
+        config = AgentConfig.model_validate(config_dict)
+
+        assert config.max_payload_size is None
+
+    def test_agent_config_json_serialization(self):
+        """Test that AgentConfig can be serialized to JSON with max_payload_size."""
+        config = AgentConfig(
+            model=OpenAI(
+                type="openai",
+                model="gpt-4o",
+                base_url="",
+            ),
+            description="Test agent",
+            instruction="You are a helpful assistant.",
+            max_payload_size=100 * 1024 * 1024,  # 100MB
+        )
+
+        json_str = config.model_dump_json()
+        parsed = json.loads(json_str)
+
+        assert parsed["max_payload_size"] == 100 * 1024 * 1024
+        assert parsed["description"] == "Test agent"
+
+    def test_agent_config_max_payload_size_zero_raises_error(self):
+        """Test that AgentConfig validation rejects zero max_payload_size."""
+        config_dict = {
+            "model": {
+                "type": "openai",
+                "model": "gpt-4o",
+                "base_url": "",
+            },
+            "description": "Test agent",
+            "instruction": "You are a helpful assistant.",
+            "max_payload_size": 0,
+        }
+
+        with pytest.raises(Exception):  # Pydantic validation error
+            AgentConfig.model_validate(config_dict)
+
+    def test_agent_config_max_payload_size_negative_raises_error(self):
+        """Test that AgentConfig validation rejects negative max_payload_size."""
+        config_dict = {
+            "model": {
+                "type": "openai",
+                "model": "gpt-4o",
+                "base_url": "",
+            },
+            "description": "Test agent",
+            "instruction": "You are a helpful assistant.",
+            "max_payload_size": -100,
+        }
+
+        with pytest.raises(Exception):  # Pydantic validation error
+            AgentConfig.model_validate(config_dict)
diff --git a/python/packages/kagent-core/src/kagent/core/a2a/__init__.py b/python/packages/kagent-core/src/kagent/core/a2a/__init__.py
index 0665d4123..73188015b 100644
--- a/python/packages/kagent-core/src/kagent/core/a2a/__init__.py
+++ b/python/packages/kagent-core/src/kagent/core/a2a/__init__.py
@@ -23,6 +23,7 @@
     handle_tool_approval_interrupt,
     is_input_required_task,
 )
+from ._payload_limit import patch_a2a_payload_limit
 from ._requests import KAgentRequestContextBuilder
 from ._task_result_aggregator import TaskResultAggregator
 from ._task_store import KAgentTaskStore
@@ -56,4 +57,6 @@
     "is_input_required_task",
     # HITL handlers
     "handle_tool_approval_interrupt",
+    # Payload limit utilities
+    "patch_a2a_payload_limit",
 ]
diff --git a/python/packages/kagent-core/src/kagent/core/a2a/_payload_limit.py b/python/packages/kagent-core/src/kagent/core/a2a/_payload_limit.py
new file mode 100644
index 000000000..07ed19772
--- /dev/null
+++ b/python/packages/kagent-core/src/kagent/core/a2a/_payload_limit.py
@@ -0,0 +1,86 @@
+"""Utilities for configuring A2A payload size limits."""
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Track the last patched value to detect conflicts
+_last_patched_value: int | None = None
+
+
+def patch_a2a_payload_limit(max_body_size: int) -> None:
+    """Attempt to patch the a2a-python library's hardcoded payload size limit.
+
+    This function attempts to patch the a2a-python library's internal payload
+    size limit by modifying the MAX_PAYLOAD_SIZE or _MAX_PAYLOAD_SIZE constant
+    in the jsonrpc_app module.
+
+    Args:
+        max_body_size: Maximum payload size in bytes to set. Must be positive (> 0).
+
+    Raises:
+        ValueError: If max_body_size is not positive.
+
+    Note:
+        **IMPORTANT LIMITATION**: This function modifies a global module-level constant
+        in the a2a-python library. In scenarios where multiple KAgentApp instances are
+        created in the same process with different max_payload_size values, only the
+        last value set will be effective. This is a process-level setting, not per-agent.
+
+        This function gracefully handles cases where:
+        - The jsonrpc_app module cannot be imported
+        - The payload size constant doesn't exist
+        - The module structure has changed
+
+        In such cases, it logs a debug message and continues without raising
+        an exception.
+
+        If called multiple times with different values, a warning will be logged
+        indicating that the previous value is being overridden.
+    """
+    global _last_patched_value
+
+    if max_body_size <= 0:
+        raise ValueError(f"max_body_size must be positive, got {max_body_size}")
+
+    # Warn if patching with a different value than previously set
+    if _last_patched_value is not None and _last_patched_value != max_body_size:
+        logger.warning(
+            f"Overriding previously patched max_payload_size ({_last_patched_value} bytes) "
+            f"with new value ({max_body_size} bytes). This is a process-level setting - "
+            f"all agents in this process will use the new value."
+        )
+
+    try:
+        # Try different import paths for jsonrpc_app module
+        jsonrpc_app = None
+        import_paths = [
+            "a2a.server.apps.jsonrpc.jsonrpc_app",
+            "a2a.server.apps.jsonrpc_app",
+        ]
+        for path in import_paths:
+            try:
+                jsonrpc_app = __import__(path, fromlist=[""])
+                break
+            except ImportError:
+                continue
+
+        if jsonrpc_app is None:
+            logger.debug("Could not find a2a-python jsonrpc_app module to patch")
+            return
+
+        # Check if MAX_PAYLOAD_SIZE or similar constant exists
+        if hasattr(jsonrpc_app, "MAX_PAYLOAD_SIZE"):
+            jsonrpc_app.MAX_PAYLOAD_SIZE = max_body_size
+            logger.info(f"Patched a2a-python MAX_PAYLOAD_SIZE to {max_body_size} bytes")
+            _last_patched_value = max_body_size
+        # Also check for _MAX_PAYLOAD_SIZE or other variants
+        elif hasattr(jsonrpc_app, "_MAX_PAYLOAD_SIZE"):
+            jsonrpc_app._MAX_PAYLOAD_SIZE = max_body_size
+            logger.info(f"Patched a2a-python _MAX_PAYLOAD_SIZE to {max_body_size} bytes")
+            _last_patched_value = max_body_size
+        else:
+            logger.debug("Could not find MAX_PAYLOAD_SIZE constant in a2a-python jsonrpc_app")
+    except (ImportError, AttributeError) as e:
+        # If patching fails, log a debug message but continue
+        logger.debug(f"Could not patch a2a-python payload limit: {e}")
diff --git a/python/packages/kagent-core/src/kagent/core/tracing/_span_processor.py b/python/packages/kagent-core/src/kagent/core/tracing/_span_processor.py
index 673d1949b..d7ab3c2eb 100644
--- a/python/packages/kagent-core/src/kagent/core/tracing/_span_processor.py
+++ b/python/packages/kagent-core/src/kagent/core/tracing/_span_processor.py
@@ -1,8 +1,8 @@
 """Custom span processor to add kagent attributes to all spans in a request context."""
 
 import logging
-from typing import Optional
 from contextvars import Token
+from typing import Optional
 
 from opentelemetry import context as otel_context
 from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
diff --git a/python/packages/kagent-crewai/src/kagent/crewai/_a2a.py b/python/packages/kagent-crewai/src/kagent/crewai/_a2a.py
index 73adc1fc3..41b3a0e36 100644
--- a/python/packages/kagent-crewai/src/kagent/crewai/_a2a.py
+++ b/python/packages/kagent-crewai/src/kagent/crewai/_a2a.py
@@ -13,7 +13,7 @@
 from crewai import Crew, Flow
 
 from kagent.core import KAgentConfig, configure_tracing
-from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore
+from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore, patch_a2a_payload_limit
 
 from ._executor import CrewAIAgentExecutor, CrewAIAgentExecutorConfig
 
@@ -42,12 +42,14 @@ def __init__(
         config: KAgentConfig = KAgentConfig(),
         executor_config: CrewAIAgentExecutorConfig | None = None,
         tracing: bool = True,
+        max_payload_size: int | None = None,
     ):
         self._crew = crew
         self.agent_card = AgentCard.model_validate(agent_card)
         self.config = config
         self.executor_config = executor_config or CrewAIAgentExecutorConfig()
         self.tracing = tracing
+        self.max_payload_size = max_payload_size
 
     def build(self) -> FastAPI:
         http_client = httpx.AsyncClient(base_url=self.config.url)
@@ -73,6 +75,11 @@ def build(self) -> FastAPI:
         )
 
         faulthandler.enable()
+
+        # Patch a2a-python's payload size limit if specified
+        if self.max_payload_size is not None:
+            patch_a2a_payload_limit(self.max_payload_size)
+
         app = FastAPI(
             title=f"KAgent CrewAI: {self.config.app_name}",
             description=f"CrewAI agent with KAgent integration: {self.agent_card.description}",
diff --git a/python/packages/kagent-crewai/src/kagent/crewai/_executor.py b/python/packages/kagent-crewai/src/kagent/crewai/_executor.py
index f74244a60..03feb8ac2 100644
--- a/python/packages/kagent-crewai/src/kagent/crewai/_executor.py
+++ b/python/packages/kagent-crewai/src/kagent/crewai/_executor.py
@@ -23,7 +23,6 @@
 
 from crewai import Crew, Flow
 from crewai.memory import LongTermMemory
-
 from kagent.core.tracing._span_processor import (
     clear_kagent_span_attributes,
     set_kagent_span_attributes,
diff --git a/python/packages/kagent-langgraph/src/kagent/langgraph/_a2a.py b/python/packages/kagent-langgraph/src/kagent/langgraph/_a2a.py
index 57d1af2dd..942e9b2f8 100644
--- a/python/packages/kagent-langgraph/src/kagent/langgraph/_a2a.py
+++ b/python/packages/kagent-langgraph/src/kagent/langgraph/_a2a.py
@@ -15,7 +15,7 @@
 from fastapi.responses import PlainTextResponse
 
 from kagent.core import KAgentConfig, configure_tracing
-from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore
+from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore, patch_a2a_payload_limit
 from langgraph.graph.state import CompiledStateGraph
 
 from ._executor import LangGraphAgentExecutor, LangGraphAgentExecutorConfig
@@ -54,6 +54,7 @@ def __init__(
         config: KAgentConfig,
         executor_config: LangGraphAgentExecutorConfig | None = None,
         tracing: bool = True,
+        max_payload_size: int | None = None,
     ):
         """Initialize the KAgent application.
 
@@ -63,6 +64,7 @@ def __init__(
             config: KAgent configuration
             executor_config: Optional executor configuration
             tracing: Enable OpenTelemetry tracing/logging via kagent.core.tracing
+            max_payload_size: Maximum payload size in bytes for A2A requests
         """
 
         self._graph = graph
@@ -71,6 +73,7 @@ def __init__(
 
         self.executor_config = executor_config or LangGraphAgentExecutorConfig()
         self._enable_tracing = tracing
+        self.max_payload_size = max_payload_size
 
     def build(self) -> FastAPI:
         """Build the FastAPI application with A2A integration.
@@ -111,6 +114,10 @@ def build(self) -> FastAPI:
         # Enable fault handler for debugging
         faulthandler.enable()
 
+        # Patch a2a-python's payload size limit if specified
+        if self.max_payload_size is not None:
+            patch_a2a_payload_limit(self.max_payload_size)
+
         # Create FastAPI application
         app = FastAPI(
             title=f"KAgent LangGraph: {self.config.app_name}",
diff --git a/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py b/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
index bc82a5b1c..1684603c5 100644
--- a/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
+++ b/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
@@ -26,8 +26,6 @@
     TextPart,
 )
 from langchain_core.runnables import RunnableConfig
-from langgraph.graph.state import CompiledStateGraph
-from langgraph.types import Command
 from pydantic import BaseModel
 
 from kagent.core.a2a import (
@@ -43,6 +41,8 @@
     clear_kagent_span_attributes,
     set_kagent_span_attributes,
 )
+from langgraph.graph.state import CompiledStateGraph
+from langgraph.types import Command
 
 from ._converters import _convert_langgraph_event_to_a2a
 from ._error_mappings import get_error_metadata, get_user_friendly_error_message
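
For context, a minimal sketch of how the new setting is expected to flow at runtime, based only on the APIs introduced in this diff; the 50Mi figure is an illustrative value, not a project default.

    # An Agent CR sets spec.declarative.a2aConfig.maxPayloadSize: "50Mi".
    # The controller converts the quantity to bytes and emits it as
    # AgentConfig.max_payload_size, which KAgentApp.build() applies before
    # the FastAPI app is constructed.
    from kagent.core.a2a import patch_a2a_payload_limit

    max_payload_size = 50 * 1024 * 1024  # bytes; what "50Mi" resolves to

    # Process-level setting: if several agents share one interpreter, the last
    # call wins and a warning is logged whenever the value changes.
    patch_a2a_payload_limit(max_payload_size)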