Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/toolbox-adk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pip install toolbox-adk
from toolbox_adk import ToolboxToolset, CredentialStrategy

# Configure auth (e.g., Use the agent's identity)
creds = CredentialStrategy.TOOLBOX_IDENTITY()
creds = CredentialStrategy.toolbox_identity()

# Create the toolset
toolset = ToolboxToolset(
Expand Down
10 changes: 5 additions & 5 deletions packages/toolbox-adk/src/toolbox_adk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

from .client import ToolboxClient
from .credentials import CredentialConfig, CredentialStrategy, CredentialType
from .tool import ToolboxContext, ToolboxTool
from .toolset import ToolboxToolset
# from .tool import ToolboxContext, ToolboxTool
# from .toolset import ToolboxToolset
from .version import __version__

__all__ = [
"CredentialStrategy",
"CredentialConfig",
"CredentialType",
"ToolboxClient",
"ToolboxTool",
"ToolboxContext",
"ToolboxToolset",
# "ToolboxTool",
# "ToolboxContext",
# "ToolboxToolset",
]
155 changes: 155 additions & 0 deletions packages/toolbox-adk/src/toolbox_adk/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from contextvars import ContextVar
from typing import Any, Awaitable, Callable, Dict, Optional, Union

import google.auth
import toolbox_core
from google.auth import compute_engine, transport
from google.auth.transport import requests
from google.oauth2 import id_token

from .credentials import CredentialConfig, CredentialType

USER_TOKEN_CONTEXT_VAR: ContextVar[Optional[str]] = ContextVar(
"toolbox_user_token", default=None
)


class ToolboxClient:
"""
Wraps toolbox_core.ToolboxClient to provide ADK-native authentication strategy support.
"""

def __init__(
self,
server_url: str,
credentials: Optional[CredentialConfig] = None,
additional_headers: Optional[
Dict[str, Union[str, Callable[[], str], Callable[[], Awaitable[str]]]]
] = None,
**kwargs: Any,
):
"""
Args:
server_url: The URL of the Toolbox server.
credentials: The CredentialConfig object (from CredentialStrategy).
additional_headers: Dictionary of headers (static or dynamic callables).
**kwargs: Additional arguments passed to toolbox_core.ToolboxClient.
"""
self._server_url = server_url
self._credentials = credentials
self._additional_headers = additional_headers or {}

self._core_client_headers: Dict[
str, Union[str, Callable[[], str], Callable[[], Awaitable[str]]]
] = {}

# Add static additional headers
for k, v in self._additional_headers.items():
self._core_client_headers[k] = v

if credentials:
self._configure_auth(credentials)

self._client = toolbox_core.ToolboxClient(
url=server_url, client_headers=self._core_client_headers, **kwargs
)

def _configure_auth(self, creds: CredentialConfig) -> None:
if creds.type == CredentialType.TOOLBOX_IDENTITY:
# No auth headers needed
pass

elif creds.type == CredentialType.WORKLOAD_IDENTITY:
if not creds.target_audience:
raise ValueError(
"target_audience is required for WORKLOAD_IDENTITY"
)

# Create an async capable token getter
self._core_client_headers["Authorization"] = self._create_adc_token_getter(
creds.target_audience
)

elif creds.type == CredentialType.MANUAL_TOKEN:
if not creds.token:
raise ValueError("token is required for MANUAL_TOKEN")
scheme = creds.scheme or "Bearer"
self._core_client_headers["Authorization"] = f"{scheme} {creds.token}"

elif creds.type == CredentialType.MANUAL_CREDS:
if not creds.credentials:
raise ValueError("credentials object is required for MANUAL_CREDS")

# Adapter for google-auth credentials object to callable
self._core_client_headers["Authorization"] = (
self._create_creds_token_getter(creds.credentials)
)

elif creds.type == CredentialType.USER_IDENTITY:
# For USER_IDENTITY (3LO), the *Tool* handles the interactive flow at runtime.

def get_user_token() -> str:
token = USER_TOKEN_CONTEXT_VAR.get()
if not token:
return ""
return f"Bearer {token}"

self._core_client_headers["Authorization"] = get_user_token

def _create_adc_token_getter(self, audience: str) -> Callable[[], str]:
"""Returns a callable that fetches a fresh ID token using ADC."""

def get_token() -> str:
request = requests.Request()
try:
token = id_token.fetch_id_token(request, audience)
return f"Bearer {token}"
except Exception:
# Fallback to default credentials
creds, _ = google.auth.default()
if not creds.valid:
creds.refresh(request)

if hasattr(creds, "id_token") and creds.id_token:
return f"Bearer {creds.id_token}"

curr_token = getattr(creds, "token", None)
return f"Bearer {curr_token}" if curr_token else ""

return get_token

def _create_creds_token_getter(self, credentials: Any) -> Callable[[], str]:
def get_token() -> str:
request = requests.Request()
if not credentials.valid:
credentials.refresh(request)
return f"Bearer {credentials.token}"

return get_token

@property
def credential_config(self) -> Optional[CredentialConfig]:
return self._credentials

async def load_toolset(self, toolset_name: str, **kwargs: Any) -> Any:
return await self._client.load_toolset(toolset_name, **kwargs)

async def load_tool(self, tool_name: str, **kwargs: Any) -> Any:
return await self._client.load_tool(tool_name, **kwargs)

async def close(self):
await self._client.close()
22 changes: 11 additions & 11 deletions packages/toolbox-adk/src/toolbox_adk/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

class CredentialType(Enum):
TOOLBOX_IDENTITY = "TOOLBOX_IDENTITY"
APPLICATION_DEFAULT_CREDENTIALS = "APPLICATION_DEFAULT_CREDENTIALS"
WORKLOAD_IDENTITY = "WORKLOAD_IDENTITY"
USER_IDENTITY = "USER_IDENTITY"
MANUAL_TOKEN = "MANUAL_TOKEN"
MANUAL_CREDS = "MANUAL_CREDS"
Expand All @@ -30,7 +30,7 @@ class CredentialType(Enum):
@dataclass
class CredentialConfig:
type: CredentialType
# For APPLICATION_DEFAULT_CREDENTIALS
# For WORKLOAD_IDENTITY
target_audience: Optional[str] = None
# For USER_IDENTITY
client_id: Optional[str] = None
Expand All @@ -47,32 +47,32 @@ class CredentialStrategy:
"""Factory for creating credential configurations for ToolboxToolset."""

@staticmethod
def TOOLBOX_IDENTITY() -> CredentialConfig:
def toolbox_identity() -> CredentialConfig:
"""
No credentials are sent. Relies on the remote Toolbox server's own identity.
"""
return CredentialConfig(type=CredentialType.TOOLBOX_IDENTITY)

@staticmethod
def APPLICATION_DEFAULT_CREDENTIALS(target_audience: str) -> CredentialConfig:
def workload_identity(target_audience: str) -> CredentialConfig:
"""
Uses the agent ADC to generate a Google-signed ID token for the specified audience.
This is suitable for Cloud Run, GKE, or local development with `gcloud auth login`.
"""
return CredentialConfig(
type=CredentialType.APPLICATION_DEFAULT_CREDENTIALS,
type=CredentialType.WORKLOAD_IDENTITY,
target_audience=target_audience,
)

@staticmethod
def WORKLOAD_IDENTITY(target_audience: str) -> CredentialConfig:
def application_default_credentials(target_audience: str) -> CredentialConfig:
"""
Alias for APPLICATION_DEFAULT_CREDENTIALS.
Alias for workload_identity.
"""
return CredentialStrategy.APPLICATION_DEFAULT_CREDENTIALS(target_audience)
return CredentialStrategy.workload_identity(target_audience)

@staticmethod
def USER_IDENTITY(
def user_identity(
client_id: str, client_secret: str, scopes: Optional[List[str]] = None
) -> CredentialConfig:
"""
Expand All @@ -87,7 +87,7 @@ def USER_IDENTITY(
)

@staticmethod
def MANUAL_TOKEN(token: str, scheme: str = "Bearer") -> CredentialConfig:
def manual_token(token: str, scheme: str = "Bearer") -> CredentialConfig:
"""
Send a hardcoded token string in the Authorization header.
"""
Expand All @@ -98,7 +98,7 @@ def MANUAL_TOKEN(token: str, scheme: str = "Bearer") -> CredentialConfig:
)

@staticmethod
def MANUAL_CREDS(credentials: Any) -> CredentialConfig:
def manual_creds(credentials: Any) -> CredentialConfig:
"""
Uses a provided Google Auth Credentials object.
"""
Expand Down
Loading