Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/api/v1alpha1/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"trpc.group/trpc-go/trpc-a2a-go/server"
Expand Down Expand Up @@ -116,9 +117,15 @@ type AnyType struct {
json.RawMessage `json:",inline"`
}

// +kubebuilder:validation:XValidation:message="maxPayloadSize must be positive",rule="!has(self.maxPayloadSize) || self.maxPayloadSize.Value() > 0"
type A2AConfig struct {
// +kubebuilder:validation:MinItems=1
Skills []AgentSkill `json:"skills,omitempty"`
// MaxPayloadSize is the maximum payload size for A2A requests.
// Supports Kubernetes quantity format (e.g., "50Mi", "100MB").
// Must be positive (> 0). If not specified, uses the default a2a-python limit (~7-8MB).
// +optional
MaxPayloadSize *resource.Quantity `json:"maxPayloadSize,omitempty"`
}

type AgentSkill server.AgentSkill
Expand Down
7 changes: 7 additions & 0 deletions go/api/v1alpha2/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -238,9 +239,15 @@ func (t *TypedLocalReference) GroupKind() schema.GroupKind {
}
}

// +kubebuilder:validation:XValidation:message="maxPayloadSize must be positive",rule="!has(self.maxPayloadSize) || self.maxPayloadSize.Value() > 0"
type A2AConfig struct {
// +kubebuilder:validation:MinItems=1
Skills []AgentSkill `json:"skills,omitempty"`
// MaxPayloadSize is the maximum payload size for A2A requests.
// Supports Kubernetes quantity format (e.g., "50Mi", "100MB").
// Must be positive (> 0). If not specified, uses the default a2a-python limit (~7-8MB).
// +optional
MaxPayloadSize *resource.Quantity `json:"maxPayloadSize,omitempty"`
}

type AgentSkill server.AgentSkill
Expand Down
29 changes: 16 additions & 13 deletions go/internal/adk/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,23 +246,25 @@ type RemoteAgentConfig struct {

// See `python/packages/kagent-adk/src/kagent/adk/types.py` for the python version of this
type AgentConfig struct {
Model Model `json:"model"`
Description string `json:"description"`
Instruction string `json:"instruction"`
HttpTools []HttpMcpServerConfig `json:"http_tools"`
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
ExecuteCode bool `json:"execute_code,omitempty"`
Model Model `json:"model"`
Description string `json:"description"`
Instruction string `json:"instruction"`
HttpTools []HttpMcpServerConfig `json:"http_tools"`
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
ExecuteCode bool `json:"execute_code,omitempty"`
MaxPayloadSize *int64 `json:"max_payload_size,omitempty"`
}

func (a *AgentConfig) UnmarshalJSON(data []byte) error {
var tmp struct {
Model json.RawMessage `json:"model"`
Description string `json:"description"`
Instruction string `json:"instruction"`
HttpTools []HttpMcpServerConfig `json:"http_tools"`
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
Model json.RawMessage `json:"model"`
Description string `json:"description"`
Instruction string `json:"instruction"`
HttpTools []HttpMcpServerConfig `json:"http_tools"`
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
MaxPayloadSize *int64 `json:"max_payload_size,omitempty"`
}
if err := json.Unmarshal(data, &tmp); err != nil {
return err
Expand All @@ -277,6 +279,7 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error {
a.HttpTools = tmp.HttpTools
a.SseTools = tmp.SseTools
a.RemoteAgents = tmp.RemoteAgents
a.MaxPayloadSize = tmp.MaxPayloadSize
return nil
}

Expand Down
14 changes: 14 additions & 0 deletions go/internal/controller/translator/agent/adk_api_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,20 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al
ExecuteCode: false && ptr.Deref(agent.Spec.Declarative.ExecuteCodeBlocks, false), //ignored due to this issue https://github.com/google/adk-python/issues/3921.
}

// Extract maxPayloadSize from A2AConfig if present
if agent.Spec.Declarative.A2AConfig != nil && agent.Spec.Declarative.A2AConfig.MaxPayloadSize != nil {
maxPayloadSizeBytes := agent.Spec.Declarative.A2AConfig.MaxPayloadSize.Value()
Copy link

Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extracted maxPayloadSizeBytes value is not validated before being assigned to the config. While Kubernetes resource.Quantity parsing handles invalid formats, it can accept negative values (e.g., "-10Mi"). Consider adding validation to ensure the value is positive before assigning it to cfg.MaxPayloadSize.

Suggested change
maxPayloadSizeBytes := agent.Spec.Declarative.A2AConfig.MaxPayloadSize.Value()
maxPayloadSizeBytes := agent.Spec.Declarative.A2AConfig.MaxPayloadSize.Value()
if maxPayloadSizeBytes <= 0 {
return nil, nil, nil, fmt.Errorf("invalid maxPayloadSize: must be positive, got %d", maxPayloadSizeBytes)
}

Copilot uses AI. Check for mistakes.
// Validate that maxPayloadSize is positive (Kubernetes quantities can be negative)
if maxPayloadSizeBytes <= 0 {
return nil, nil, nil, fmt.Errorf(
"maxPayloadSize must be positive, got %d bytes (quantity: %s)",
maxPayloadSizeBytes,
agent.Spec.Declarative.A2AConfig.MaxPayloadSize.String(),
)
}
cfg.MaxPayloadSize = &maxPayloadSizeBytes
}

for _, tool := range agent.Spec.Declarative.Tools {
// Skip tools that are not applicable to the model provider
switch {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
operation: translateAgent
targetObject: test-agent
namespace: test

objects:
- apiVersion: v1
kind: Secret
metadata:
name: openai-secret
namespace: test
data:
api-key: c2stdGVzdC1hcGkta2V5 # base64 encoded "sk-test-api-key"
- apiVersion: kagent.dev/v1alpha2
kind: ModelConfig
metadata:
name: default-model-config
namespace: test
spec:
provider: OpenAI
model: gpt-4o
apiKeySecret: openai-secret
apiKeySecretKey: api-key
defaultHeaders:
User-Agent: "kagent/1.0"
- apiVersion: kagent.dev/v1alpha2
kind: Agent
metadata:
name: test-agent
namespace: test
spec:
type: Declarative
description: Agent with max payload size configuration
declarative:
modelConfig: default-model-config
systemMessage: You are a helpful assistant.
a2aConfig:
maxPayloadSize: "50Mi"
tools: []

8 changes: 7 additions & 1 deletion python/packages/kagent-adk/src/kagent/adk/_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from google.adk.sessions import InMemorySessionService
from google.genai import types

from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore
from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore, patch_a2a_payload_limit

from ._agent_executor import A2aAgentExecutor
from ._lifespan import LifespanManager
Expand Down Expand Up @@ -55,13 +55,15 @@ def __init__(
app_name: str,
lifespan: Optional[Callable[[Any], Any]] = None,
plugins: List[BasePlugin] = None,
max_payload_size: Optional[int] = None,
):
self.root_agent = root_agent
self.kagent_url = kagent_url
self.app_name = app_name
self.agent_card = agent_card
self._lifespan = lifespan
self.plugins = plugins if plugins is not None else []
self.max_payload_size = max_payload_size

def build(self, local=False) -> FastAPI:
session_service = InMemorySessionService()
Expand Down Expand Up @@ -111,6 +113,10 @@ def create_runner() -> Runner:
if not local:
lifespan_manager.add(token_service.lifespan())

# Patch a2a-python's payload size limit if specified
if self.max_payload_size is not None:
patch_a2a_payload_limit(self.max_payload_size)

app = FastAPI(lifespan=lifespan_manager)

# Health check/readiness probe
Expand Down
18 changes: 16 additions & 2 deletions python/packages/kagent-adk/src/kagent/adk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ def static(
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
maybe_add_skills(root_agent)

kagent_app = KAgentApp(root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
kagent_app = KAgentApp(
root_agent,
agent_card,
app_cfg.url,
app_cfg.app_name,
plugins=plugins,
max_payload_size=agent_config.max_payload_size,
)

server = kagent_app.build()
configure_tracing(server)
Expand Down Expand Up @@ -179,7 +186,14 @@ async def test_agent(agent_config: AgentConfig, agent_card: AgentCard, task: str
plugins = [sts_integration]
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
maybe_add_skills(root_agent)
app = KAgentApp(root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
app = KAgentApp(
root_agent,
agent_card,
app_cfg.url,
app_cfg.app_name,
plugins=plugins,
max_payload_size=agent_config.max_payload_size,
)
await app.test(task)


Expand Down
5 changes: 5 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class AgentConfig(BaseModel):
sse_tools: list[SseMcpServerConfig] | None = None # SSE MCP tools
remote_agents: list[RemoteAgentConfig] | None = None # remote agents
execute_code: bool | None = None
max_payload_size: int | None = Field(
default=None,
gt=0,
description="Maximum payload size in bytes for A2A requests. Must be positive (> 0).",
)

def to_agent(self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None) -> Agent:
if name is None or not str(name).strip():
Expand Down
144 changes: 144 additions & 0 deletions python/packages/kagent-adk/tests/unittests/test_a2a_payload_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for A2A payload size configuration.

NOTE: These tests verify that the configuration value can be set and patched,
but do NOT verify end-to-end behavior (i.e., that payloads of the configured
size can actually be sent/received).
"""

from unittest import mock

import pytest

Comment on lines +24 to +25
Copy link

Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'pytest' is not used.

Suggested change
import pytest

Copilot uses AI. Check for mistakes.
from kagent.core.a2a import patch_a2a_payload_limit


class TestPatchA2APayloadLimit:
"""Tests for patch_a2a_payload_limit function."""

def test_patch_max_payload_size_exists(self):
"""Test patching when MAX_PAYLOAD_SIZE constant exists."""
mock_jsonrpc_app = mock.MagicMock()
mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024 # 7MB default

with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
patch_a2a_payload_limit(50 * 1024 * 1024) # 50MB

assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 50 * 1024 * 1024

def test_patch_underscore_max_payload_size_exists(self):
"""Test patching when _MAX_PAYLOAD_SIZE constant exists."""
mock_jsonrpc_app = mock.MagicMock()
mock_jsonrpc_app._MAX_PAYLOAD_SIZE = 7 * 1024 * 1024 # 7MB default
del mock_jsonrpc_app.MAX_PAYLOAD_SIZE # Ensure MAX_PAYLOAD_SIZE doesn't exist

with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
patch_a2a_payload_limit(100 * 1024 * 1024) # 100MB

assert mock_jsonrpc_app._MAX_PAYLOAD_SIZE == 100 * 1024 * 1024

def test_patch_module_not_found(self):
"""Test behavior when jsonrpc_app module cannot be imported."""

def mock_import(name, fromlist=None):
if "jsonrpc" in name:
raise ImportError("No module named 'a2a.server.apps.jsonrpc'")
return mock.MagicMock()

with mock.patch("builtins.__import__", side_effect=mock_import):
# Should not raise an exception, just log debug message
patch_a2a_payload_limit(50 * 1024 * 1024)

def test_patch_no_payload_size_constant(self):
"""Test behavior when payload size constant doesn't exist."""
mock_jsonrpc_app = mock.MagicMock()
# Remove any payload size attributes
if hasattr(mock_jsonrpc_app, "MAX_PAYLOAD_SIZE"):
delattr(mock_jsonrpc_app, "MAX_PAYLOAD_SIZE")
if hasattr(mock_jsonrpc_app, "_MAX_PAYLOAD_SIZE"):
delattr(mock_jsonrpc_app, "_MAX_PAYLOAD_SIZE")

with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
# Should not raise an exception, just log debug message
patch_a2a_payload_limit(50 * 1024 * 1024)

def test_patch_with_different_import_paths(self):
"""Test that function tries multiple import paths."""
import_calls = []

def mock_import(name, fromlist=None):
import_calls.append(name)
if name == "a2a.server.apps.jsonrpc.jsonrpc_app":
raise ImportError("First path failed")
if name == "a2a.server.apps.jsonrpc_app":
mock_jsonrpc_app = mock.MagicMock()
mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024
return mock_jsonrpc_app
return mock.MagicMock()

with mock.patch("builtins.__import__", side_effect=mock_import):
patch_a2a_payload_limit(50 * 1024 * 1024)

# Should have tried both import paths
assert "a2a.server.apps.jsonrpc.jsonrpc_app" in import_calls
assert "a2a.server.apps.jsonrpc_app" in import_calls

def test_patch_raises_error_for_zero(self):
"""Test that patch_a2a_payload_limit raises ValueError for zero."""
with pytest.raises(ValueError, match="must be positive"):
patch_a2a_payload_limit(0)

def test_patch_raises_error_for_negative(self):
"""Test that patch_a2a_payload_limit raises ValueError for negative values."""
with pytest.raises(ValueError, match="must be positive"):
patch_a2a_payload_limit(-1)
with pytest.raises(ValueError, match="must be positive"):
patch_a2a_payload_limit(-100 * 1024 * 1024)

def test_patch_warns_on_override(self, caplog):
"""Test that patching with a different value logs a warning."""
mock_jsonrpc_app = mock.MagicMock()
mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024 # 7MB default

with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
# First patch
patch_a2a_payload_limit(50 * 1024 * 1024) # 50MB
assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 50 * 1024 * 1024

# Second patch with different value - should warn
patch_a2a_payload_limit(100 * 1024 * 1024) # 100MB
assert mock_jsonrpc_app.MAX_PAYLOAD_SIZE == 100 * 1024 * 1024

# Check that warning was logged
assert any(
"Overriding previously patched" in record.message and "process-level setting" in record.message
for record in caplog.records
)

def test_patch_no_warning_on_same_value(self, caplog):
"""Test that patching with the same value doesn't log a warning."""
mock_jsonrpc_app = mock.MagicMock()
mock_jsonrpc_app.MAX_PAYLOAD_SIZE = 7 * 1024 * 1024 # 7MB default

with mock.patch("builtins.__import__", return_value=mock_jsonrpc_app):
# First patch
patch_a2a_payload_limit(50 * 1024 * 1024) # 50MB

# Second patch with same value - should not warn
patch_a2a_payload_limit(50 * 1024 * 1024) # 50MB again

# Check that no warning was logged
assert not any("Overriding previously patched" in record.message for record in caplog.records)
Loading