Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions packages/toolbox-adk/src/toolbox_adk/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from typing import Any, Dict, List, Optional

from google.auth import credentials as google_creds
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes
from google.adk.auth.auth_tool import AuthConfig, AuthScheme


class CredentialType(Enum):
Expand Down Expand Up @@ -106,3 +108,70 @@ def manual_creds(credentials: Any) -> CredentialConfig:
type=CredentialType.MANUAL_CREDS,
credentials=credentials,
)

@staticmethod
def from_adk_auth_config(auth_config: "AuthConfig") -> CredentialConfig:
"""
Creates a CredentialConfig from an ADK AuthConfig object.

Args:
auth_config: The ADK AuthConfig object.

Returns:
CredentialConfig: The corresponding credential configuration.
"""
if auth_config.raw_auth_credential is None:
raise ValueError("AuthConfig must have a raw_auth_credential.")

return CredentialStrategy.from_adk_credentials(
auth_config.auth_scheme, auth_config.raw_auth_credential
)

@staticmethod
def from_adk_credentials(
auth_scheme: "AuthScheme", auth_credential: "AuthCredential"
) -> CredentialConfig:
"""
Creates a CredentialConfig from ADK AuthScheme and AuthCredential objects.

Args:
auth_scheme: The ADK AuthScheme (e.g. OAuth2, HTTP, etc).
auth_credential: The ADK AuthCredential (containing secrets/tokens).

Returns:
CredentialConfig: The corresponding credential configuration.

Raises:
ValueError: If the credential type is not supported.
"""
# Handle OAuth2
if (
auth_credential.auth_type == AuthCredentialTypes.OAUTH2
and auth_credential.oauth2
):
# Extract client_id, client_secret, and scopes from the credential object.
return CredentialStrategy.user_identity(
client_id=auth_credential.oauth2.client_id,
client_secret=auth_credential.oauth2.client_secret,
scopes=auth_credential.oauth2.scopes,
)

# Handle HTTP Bearer
if (
auth_credential.auth_type == AuthCredentialTypes.HTTP
and auth_credential.http
):
# Verify scheme is Bearer and extract token.
scheme_type = (auth_credential.http.scheme or "").lower()
if (
scheme_type == "bearer"
and auth_credential.http.credentials
and auth_credential.http.credentials.token
):
return CredentialStrategy.manual_token(
token=auth_credential.http.credentials.token, scheme="Bearer"
)

raise ValueError(
f"Unsupported ADK credential type: {auth_credential.auth_type}"
)
68 changes: 68 additions & 0 deletions packages/toolbox-adk/tests/unit/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,71 @@ def test_manual_creds(self):
config = CredentialStrategy.manual_creds(fake_creds)
assert config.type == CredentialType.MANUAL_CREDS
assert config.credentials == fake_creds

def test_from_adk_credentials_oauth2(self):
from google.adk.auth.auth_credential import (
AuthCredential,
AuthCredentialTypes,
OAuth2Auth,
)
from fastapi.openapi.models import OAuth2, OAuthFlows

# Mock objects
oauth2_auth = OAuth2Auth(
client_id="cid", client_secret="csec", scopes=["s1"]
)
cred = AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2, oauth2=oauth2_auth
)
# Scheme is required for type hinting but not used for OAuth2 in this impl
scheme = OAuth2(flows=OAuthFlows())

config = CredentialStrategy.from_adk_credentials(scheme, cred)
assert config.type == CredentialType.USER_IDENTITY
assert config.client_id == "cid"
assert config.client_secret == "csec"
assert config.scopes == ["s1"]

def test_from_adk_credentials_http_bearer(self):
from google.adk.auth.auth_credential import (
AuthCredential,
AuthCredentialTypes,
HttpCredentials,
HttpAuth,
)
from fastapi.openapi.models import HTTPBearer

http_creds = HttpCredentials(token="xyz")
http_auth = HttpAuth(scheme="Bearer", credentials=http_creds)
cred = AuthCredential(
auth_type=AuthCredentialTypes.HTTP, http=http_auth
)
scheme = HTTPBearer()

config = CredentialStrategy.from_adk_credentials(scheme, cred)
assert config.type == CredentialType.MANUAL_TOKEN
assert config.token == "xyz"
assert config.scheme == "Bearer"

def test_from_adk_auth_config(self):
from google.adk.auth.auth_tool import AuthConfig
from google.adk.auth.auth_credential import (
AuthCredential,
AuthCredentialTypes,
OAuth2Auth,
)
from fastapi.openapi.models import OAuth2, OAuthFlows

oauth2_auth = OAuth2Auth(
client_id="cid2", client_secret="csec2", scopes=["s2"]
)
cred = AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2, oauth2=oauth2_auth
)
scheme = OAuth2(flows=OAuthFlows())
auth_config = AuthConfig(auth_scheme=scheme, raw_auth_credential=cred)

config = CredentialStrategy.from_adk_auth_config(auth_config)
assert config.type == CredentialType.USER_IDENTITY
assert config.client_id == "cid2"