Skip to content

Agent

This module initializes the agent package.

Exposes the core agent classes and interfaces.

Author

Christian Trisno Sen Long Chen (christian.t.s.l.chen@gdplabs.id)

AgentInterface(name, instruction, description=None, lm_invoker=None, config=None, **kwargs)

Bases: ABC

A general and minimal interface for agent implementations.

Defines core execution methods (__init__, run, arun, arun_stream). Concrete subclasses must implement all abstract methods.

Initializes the agent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The core directive or system prompt for the agent.

required
description str | None

Human-readable description. Defaults to instruction if not provided.

None
lm_invoker BaseLMInvoker | None

The language model invoker to use for LLM interactions. Defaults to None.

None
config BaseAgentConfig | None

Additional configuration for the agent.

None
**kwargs Any

Additional keyword arguments for concrete implementations.

{}

add_mcp_server(mcp_config) abstractmethod

Adds a new MCP server configuration.

Parameters:

Name Type Description Default
mcp_config dict[str, dict[str, Any]]

Dictionary containing server name as key and its configuration as value.

required

Raises:

Type Description
ValueError

If mcp_config is empty or None, or if any server configuration is invalid.

KeyError

If any server name already exists in the configuration.

arun(query, **kwargs) abstractmethod async

Asynchronously runs the agent.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments for execution.

{}

Returns:

Type Description
dict[str, Any]

Dict containing at least {'output': ...}.

arun_stream(query, **kwargs) abstractmethod async

Asynchronously streams the agent's response.

Parameters:

Name Type Description Default
query str

The input query.

required
**kwargs Any

Extra parameters for execution.

{}

Yields:

Type Description
AsyncGenerator[str | dict[str, Any], None]

Chunks of output (strings or dicts).

register_a2a_agents(agents) abstractmethod

Registers A2A agents from a list of AgentCards.

Parameters:

Name Type Description Default
agents list[AgentCard]

A list of AgentCard instances.

required

run(query, **kwargs) abstractmethod

Synchronously runs the agent.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments for execution.

{}

Returns:

Type Description
dict[str, Any]

Dict containing at least {'output': ...}.

BaseAgent(name, instruction, description=None, model=None, tools=None, config=None, **kwargs)

Bases: AgentInterface

Base class for agents, providing common A2A client method implementations.

Concrete agent implementations (e.g., LangGraphAgent, GoogleADKAgent) should inherit from this class if they need to utilize the shared A2A client functionalities.

This class now supports flexible model handling: - model: Optional[Any] - can be an lm_invoker, string/ModelId, LangChain BaseChatModel, or other types - Automatically sets self.lm_invoker if an lm_invoker is provided or can be built - Stores the original model in self.model for subclass use

Initializes the BaseAgent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The core directive or system prompt for the agent.

required
description str | None

Human-readable description. Defaults to instruction if not provided.

None
model Any | None

The model to use. Can be: - BaseLMInvoker instance (will be set as self.lm_invoker) - String or ModelId (will build an lm_invoker) - LangChain BaseChatModel (will be stored in self.model) - Any other type (will be stored in self.model)

None
tools list[Any] | None

List of tools available to the agent.

None
config BaseAgentConfig | dict[str, Any] | None

Additional configuration for the agent. Can be a BaseAgentConfig instance or dict.

None
**kwargs Any

Additional keyword arguments for AgentInterface.

{}

model_provider: str property

Get the provider of the model with simplified logic.

Returns:

Name Type Description
str str

The provider of the model.

add_mcp_server(mcp_config)

Adds MCP servers to the agent.

Parameters:

Name Type Description Default
mcp_config dict[str, dict[str, Any]]

A dictionary containing MCP server configurations.

required

Raises:

Type Description
ValueError

If the MCP configuration is empty or None.

KeyError

If a server with the same name already exists in the MCP configuration.

asend_to_agent(agent_card, message, **kwargs) async

Asynchronously sends a message to another agent using the A2A protocol.

This method handles the core A2A communication logic, creating and sending properly formatted A2A messages and processing the responses.

Parameters:

Name Type Description Default
agent_card AgentCard

The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities.

required
message str | dict[str, Any]

The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error'

Raises:

Type Description
HTTPError

If there's an HTTP-related error during the request.

Exception

For any other unexpected errors during message sending or processing.

astream_to_agent(agent_card, message, **kwargs) async

Asynchronously sends a streaming message to another agent using the A2A protocol.

This method supports streaming responses from the target agent, yielding chunks of the response as they become available. It handles various types of streaming events including task status updates, artifact updates, and message parts.

Parameters:

Name Type Description Default
agent_card AgentCard

The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities.

required
message str | dict[str, Any]

The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data.

required
**kwargs Any

Additional keyword arguments.

{}

Yields:

Type Description
AsyncGenerator[dict[str, Any], None]

Dictionaries containing streaming response chunks: For successful chunks: - status (str): 'success' - content (str): Extracted text content from the chunk - task_id (str): ID of the associated task - task_state (str): Current state of the task - final (bool): Whether this is the final chunk - artifact_name (str, optional): Name of the artifact if chunk is an artifact update For error chunks: - status (str): 'error' - error_type (str): Type of error encountered - message (str): Error description

Raises:

Type Description
HTTPError

If there's an HTTP-related error during the streaming request.

Exception

For any other unexpected errors during message streaming or processing.

discover_agents(a2a_config, **kwargs) classmethod

Discover agents from the URLs specified in a2a_config.discovery_urls.

This concrete implementation fetches and parses .well-known/agent.json from each discovery URL to build a list of available agents.

Parameters:

Name Type Description Default
a2a_config A2AClientConfig

Configuration containing discovery URLs and other A2A settings.

required
**kwargs Any

Additional keyword arguments (unused in this implementation).

{}

Returns:

Type Description
list[AgentCard]

A list of AgentCard objects representing discovered agents.

format_agent_description(agent_card) staticmethod

Format the description of an agent card including skills information.

Parameters:

Name Type Description Default
agent_card AgentCard

The agent card to format.

required

Returns:

Name Type Description
str str

The formatted description including skills.

get_name_preprocessor()

Get the name preprocessor based on the provider.

This will be used to correct the agent name and tool name. (mostly tool name)

Returns:

Name Type Description
NamePreprocessor NamePreprocessor

The name preprocessor for the model.

send_to_agent(agent_card, message, **kwargs)

Synchronously sends a message to another agent using the A2A protocol.

This method is a synchronous wrapper around asend_to_agent. It handles the creation of an event loop if one doesn't exist, and manages the asynchronous call internally.

Parameters:

Name Type Description Default
agent_card AgentCard

The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities.

required
message str | dict[str, Any]

The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data.

required
**kwargs Any

Additional keyword arguments passed to asend_to_agent.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error'

Raises:

Type Description
RuntimeError

If called from within an existing event loop or if asend_to_agent encounters an unhandled exception.

to_a2a(agent_card, **kwargs)

Converts the agent to an A2A-compatible ASGI application.

This implementation provides a base setup for A2A server components. Subclasses can override this method if they need custom executor or task store implementations.

Parameters:

Name Type Description Default
agent_card AgentCard

The agent card to use for the A2A application.

required
**kwargs Any

Additional keyword arguments for ASGI application configuration.

{}

Returns:

Type Description
Starlette

A Starlette ASGI application that can be used with any ASGI server.

BaseLangGraphAgent(name, instruction, description=None, model=None, tools=None, state_schema=None, thread_id_key='thread_id', event_emitter=None, checkpointer=None, **kwargs)

Bases: BaseAgent

Base class for LangGraph-based agents with unified tool approach.

Provides core LangGraph functionality including: - Graph compilation and execution - State schema management - I/O mapping between user inputs and graph states - Event emission support - Tool resolution and handling - A2A communication capabilities via tools - Agent delegation capabilities via tools - MCP server integration via tools - Enhanced output extraction from various state formats

Tool Management: - regular_tools: Standard LangChain tools provided during initialization - mcp_tools: Tools retrieved from MCP servers - resolved_tools: Combined collection of all tools for graph execution

Subclasses must implement: - define_graph(): Define the specific graph structure - _prepare_graph_input(): Convert user input to graph state - _format_graph_output(): Convert final graph state to user output

Initialize the BaseLangGraphAgent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The system instruction for the agent.

required
description str | None

Human-readable description of the agent.

None
model Any | None

The model to use (lm_invoker, LangChain model, string, etc.).

None
tools Sequence[BaseTool] | None

Sequence of regular LangChain tools (not A2A or delegation tools).

None
state_schema Type | None

The state schema for the LangGraph. Defaults to basic message state.

None
thread_id_key str

Key for thread ID in configuration.

'thread_id'
event_emitter EventEmitter | None

Optional event emitter for streaming updates.

None
checkpointer Checkpointer | None

Optional checkpointer for conversation persistence.

None
**kwargs Any

Additional keyword arguments passed to BaseAgent.

{}

add_mcp_server(mcp_config)

Add MCP server configuration.

Parameters:

Name Type Description Default
mcp_config dict[str, dict[str, Any]]

Dictionary containing MCP server configurations.

required

arun(query, **kwargs) async

Asynchronously run the LangGraph agent.

If MCP configuration exists, connects to the MCP server and registers tools before running.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments including configurable for LangGraph.

{}

Returns:

Type Description
dict[str, Any]

Dictionary containing the agent's response and full final state.

arun_a2a_stream(query, **kwargs) async

Asynchronously streams the agent's response in a generic format for A2A.

If MCP configuration exists, connects to the MCP server and registers tools before streaming.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments.

{}

Yields:

Type Description
AsyncGenerator[dict[str, Any], None]

Dictionaries with "status" and "content" keys.

AsyncGenerator[dict[str, Any], None]

Possible statuses: "working", "completed", "failed", "canceled".

arun_stream(query, **kwargs) async

Asynchronously stream the LangGraph agent's response.

If MCP configuration exists, connects to the MCP server and registers tools before streaming. This method properly handles both LM Invoker and LangChain model streaming: - For LM Invoker: Uses StreamEventHandler to capture streaming events - For LangChain models: Uses LangGraph's native streaming implementation

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments.

{}

Yields:

Type Description
AsyncGenerator[str | dict[str, Any], None]

Chunks of output (strings or dicts) from the streaming response.

define_graph(graph_builder) abstractmethod

Define the specific graph structure for this agent type.

Subclasses must implement this method to: 1. Add nodes to the graph_builder 2. Add edges and conditional edges 3. Set entry points 4. Return the compiled graph

Parameters:

Name Type Description Default
graph_builder StateGraph

The StateGraph builder to define nodes and edges on.

required

Returns:

Type Description
CompiledStateGraph

The compiled graph ready for execution.

register_a2a_agents(agent_cards)

Register A2A communication capabilities using the A2A tool manager.

Parameters:

Name Type Description Default
agent_cards list[AgentCard]

List of AgentCard instances for external communication.

required

register_delegation_agents(agents)

Register internal agent delegation capabilities using the delegation tool manager.

Parameters:

Name Type Description Default
agents list[BaseAgent]

List of BaseAgent instances for internal task delegation.

required

run(query, **kwargs)

Synchronously run the LangGraph agent.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
dict[str, Any]

Dictionary containing the agent's response.

update_regular_tools(new_tools, rebuild_graph=None)

Update regular tools (not capabilities).

Parameters:

Name Type Description Default
new_tools list[BaseTool]

New list of regular tools to use.

required
rebuild_graph bool | None

Whether to rebuild graph. If None, uses auto_rebuild_graph setting.

None

GoogleADKAgent(name, instruction, model, tools=None, description=None, max_iterations=3, agents=None, **kwargs)

Bases: BaseAgent

An agent that wraps a native Google ADK Agent.

This class implements the AgentInterface and uses Google's LlmAgent to handle the core conversation and tool execution logic via ADK's async-first design.

Initializes the GoogleADKAgent.

Parameters:

Name Type Description Default
name str

The name of this wrapper agent.

required
instruction str

The instruction for this wrapper agent.

required
model str

The name of the Google ADK model to use (e.g., "gemini-1.5-pro-latest").

required
tools Optional[list[Any]]

An optional list of callable tools for the ADK agent.

None
description Optional[str]

An optional human-readable description.

None
max_iterations int

Maximum number of iterations to run (default: 3).

3
agents Optional[List[GoogleADKAgent]]

Optional list of sub-agents that this agent can delegate to using ADK's built-in multi-agent capabilities. These will be passed as sub_agents to the underlying LlmAgent.

None
**kwargs Any

Additional keyword arguments passed to the parent __init__.

{}

add_mcp_server(mcp_config)

Adds a new MCP server configuration.

Parameters:

Name Type Description Default
mcp_config Dict[str, Dict[str, Any]]

Dictionary containing server name as key and its configuration as value.

required

Raises:

Type Description
ValueError

If mcp_config is empty or None, or if any server configuration is invalid.

KeyError

If any server name already exists in the configuration.

arun(query, **kwargs) async

Asynchronously runs the agent with the given query and returns the response.

Parameters:

Name Type Description Default
query str

The user's query to process.

required
**kwargs Any

Additional keyword arguments. Supports "session_id", "user_id", "app_name".

{}

Returns:

Type Description
Dict[str, Any]

A dictionary containing the output and other metadata.

arun_a2a_stream(query, configurable=None, **kwargs) async

Asynchronously streams the agent's response in a format compatible with A2A.

This method formats the ADK agent's streaming responses into a consistent format that the A2A executor can understand and process.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
configurable Optional[Dict[str, Any]]

Optional dictionary for configuration, may include: - thread_id: The A2A task ID (used as session_id).

None
**kwargs Any

Additional keyword arguments. Supports "user_id", "app_name".

{}

Yields:

Type Description
AsyncGenerator[Dict[str, Any], None]

Dictionary with 'status' and 'content' fields that describe the agent's response state.

arun_stream(query, **kwargs) async

Runs the agent with the given query and streams the response parts.

Parameters:

Name Type Description Default
query str

The user's query to process.

required
**kwargs Any

Additional keyword arguments. Supports "session_id", "user_id", "app_name".

{}

Yields:

Type Description
AsyncIterator[str]

Text response chunks from the model. If an error occurs, the error message is yielded.

register_a2a_agents(agent_cards)

Convert known A2A agents to LangChain tools.

This method takes the agents from a2a_config.known_agents, creates A2AAgent instances for each one, and wraps them in LangChain tools.

Returns:

Name Type Description
None None

The tools are added to the existing tools list.

run(query, **kwargs)

Synchronously runs the Google ADK agent by wrapping the internal async run method.

Parameters:

Name Type Description Default
query str

The input query for the agent.

required
**kwargs Any

Additional keyword arguments passed to the internal async run method. Supports "session_id", "user_id", "app_name".

{}

Returns:

Type Description
Dict[str, Any]

A dictionary containing the agent's response.

Raises:

Type Description
RuntimeError

If asyncio.run() is called from an already running event loop, or for other unhandled errors during synchronous execution.

LangChainAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)

Bases: LangGraphReactAgent

Alias for LangGraphReactAgent.

Initialize the LangGraph ReAct Agent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The system instruction for the agent.

DEFAULT_INSTRUCTION
model BaseChatModel | str | Any | None

The model to use (lm_invoker, LangChain model, string, etc.).

None
tools Sequence[BaseTool] | None

Sequence of LangChain tools available to the agent.

None
agents Sequence[Any] | None

Optional sequence of sub-agents for delegation (coordinator mode).

None
description str | None

Human-readable description of the agent.

None
thread_id_key str

Key for thread ID in configuration.

'thread_id'
event_emitter EventEmitter | None

Optional event emitter for streaming updates.

None
**kwargs Any

Additional keyword arguments passed to BaseLangGraphAgent.

{}

LangGraphAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)

Bases: LangGraphReactAgent

Alias for LangGraphReactAgent.

Initialize the LangGraph ReAct Agent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The system instruction for the agent.

DEFAULT_INSTRUCTION
model BaseChatModel | str | Any | None

The model to use (lm_invoker, LangChain model, string, etc.).

None
tools Sequence[BaseTool] | None

Sequence of LangChain tools available to the agent.

None
agents Sequence[Any] | None

Optional sequence of sub-agents for delegation (coordinator mode).

None
description str | None

Human-readable description of the agent.

None
thread_id_key str

Key for thread ID in configuration.

'thread_id'
event_emitter EventEmitter | None

Optional event emitter for streaming updates.

None
**kwargs Any

Additional keyword arguments passed to BaseLangGraphAgent.

{}

LangGraphReactAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)

Bases: BaseLangGraphAgent

A ReAct agent template built on LangGraph.

This agent can use either: - An LMInvoker (if self.lm_invoker is set by BaseAgent) - A LangChain BaseChatModel (if self.model is set by BaseAgent)

The graph structure follows the standard ReAct pattern: agent -> tools -> agent (loop) -> END

Initialize the LangGraph ReAct Agent.

Parameters:

Name Type Description Default
name str

The name of the agent.

required
instruction str

The system instruction for the agent.

DEFAULT_INSTRUCTION
model BaseChatModel | str | Any | None

The model to use (lm_invoker, LangChain model, string, etc.).

None
tools Sequence[BaseTool] | None

Sequence of LangChain tools available to the agent.

None
agents Sequence[Any] | None

Optional sequence of sub-agents for delegation (coordinator mode).

None
description str | None

Human-readable description of the agent.

None
thread_id_key str

Key for thread ID in configuration.

'thread_id'
event_emitter EventEmitter | None

Optional event emitter for streaming updates.

None
**kwargs Any

Additional keyword arguments passed to BaseLangGraphAgent.

{}

define_graph(graph_builder)

Define the ReAct agent graph structure.

Parameters:

Name Type Description Default
graph_builder StateGraph

The StateGraph builder to define the graph structure.

required

Returns:

Type Description
CompiledStateGraph

Compiled LangGraph ready for execution.