Agent
This module initializes the agent package.
Exposes the core agent classes and interfaces.
AgentInterface(name, instruction, description=None, lm_invoker=None, config=None, **kwargs)
Bases: ABC
A general and minimal interface for agent implementations.
Defines core execution methods (__init__
, run
, arun
, arun_stream
).
Concrete subclasses must implement all abstract methods.
Initializes the agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The core directive or system prompt for the agent. |
required |
description |
str | None
|
Human-readable description. Defaults to instruction if not provided. |
None
|
lm_invoker |
BaseLMInvoker | None
|
The language model invoker to use for LLM interactions. Defaults to None. |
None
|
config |
BaseAgentConfig | None
|
Additional configuration for the agent. |
None
|
**kwargs |
Any
|
Additional keyword arguments for concrete implementations. |
{}
|
add_mcp_server(mcp_config)
abstractmethod
Adds a new MCP server configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mcp_config |
dict[str, dict[str, Any]]
|
Dictionary containing server name as key and its configuration as value. |
required |
Raises:
Type | Description |
---|---|
ValueError
|
If mcp_config is empty or None, or if any server configuration is invalid. |
KeyError
|
If any server name already exists in the configuration. |
arun(query, **kwargs)
abstractmethod
async
Asynchronously runs the agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments for execution. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dict containing at least {'output': ...}. |
arun_stream(query, **kwargs)
abstractmethod
async
Asynchronously streams the agent's response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query. |
required |
**kwargs |
Any
|
Extra parameters for execution. |
{}
|
Yields:
Type | Description |
---|---|
AsyncGenerator[str | dict[str, Any], None]
|
Chunks of output (strings or dicts). |
register_a2a_agents(agents)
abstractmethod
Registers A2A agents from a list of AgentCards.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agents |
list[AgentCard]
|
A list of AgentCard instances. |
required |
run(query, **kwargs)
abstractmethod
Synchronously runs the agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments for execution. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dict containing at least {'output': ...}. |
BaseAgent(name, instruction, description=None, model=None, tools=None, config=None, **kwargs)
Bases: AgentInterface
Base class for agents, providing common A2A client method implementations.
Concrete agent implementations (e.g., LangGraphAgent, GoogleADKAgent) should inherit from this class if they need to utilize the shared A2A client functionalities.
This class now supports flexible model handling: - model: Optional[Any] - can be an lm_invoker, string/ModelId, LangChain BaseChatModel, or other types - Automatically sets self.lm_invoker if an lm_invoker is provided or can be built - Stores the original model in self.model for subclass use
Initializes the BaseAgent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The core directive or system prompt for the agent. |
required |
description |
str | None
|
Human-readable description. Defaults to instruction if not provided. |
None
|
model |
Any | None
|
The model to use. Can be: - BaseLMInvoker instance (will be set as self.lm_invoker) - String or ModelId (will build an lm_invoker) - LangChain BaseChatModel (will be stored in self.model) - Any other type (will be stored in self.model) |
None
|
tools |
list[Any] | None
|
List of tools available to the agent. |
None
|
config |
BaseAgentConfig | dict[str, Any] | None
|
Additional configuration for the agent. Can be a BaseAgentConfig instance or dict. |
None
|
**kwargs |
Any
|
Additional keyword arguments for AgentInterface. |
{}
|
model_provider: str
property
Get the provider of the model with simplified logic.
Returns:
Name | Type | Description |
---|---|---|
str |
str
|
The provider of the model. |
add_mcp_server(mcp_config)
Adds MCP servers to the agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mcp_config |
dict[str, dict[str, Any]]
|
A dictionary containing MCP server configurations. |
required |
Raises:
Type | Description |
---|---|
ValueError
|
If the MCP configuration is empty or None. |
KeyError
|
If a server with the same name already exists in the MCP configuration. |
asend_to_agent(agent_card, message, **kwargs)
async
Asynchronously sends a message to another agent using the A2A protocol.
This method handles the core A2A communication logic, creating and sending properly formatted A2A messages and processing the responses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_card |
AgentCard
|
The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. |
required |
message |
str | dict[str, Any]
|
The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. |
required |
**kwargs |
Any
|
Additional keyword arguments. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error' |
Raises:
Type | Description |
---|---|
HTTPError
|
If there's an HTTP-related error during the request. |
Exception
|
For any other unexpected errors during message sending or processing. |
astream_to_agent(agent_card, message, **kwargs)
async
Asynchronously sends a streaming message to another agent using the A2A protocol.
This method supports streaming responses from the target agent, yielding chunks of the response as they become available. It handles various types of streaming events including task status updates, artifact updates, and message parts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_card |
AgentCard
|
The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. |
required |
message |
str | dict[str, Any]
|
The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. |
required |
**kwargs |
Any
|
Additional keyword arguments. |
{}
|
Yields:
Type | Description |
---|---|
AsyncGenerator[dict[str, Any], None]
|
Dictionaries containing streaming response chunks: For successful chunks: - status (str): 'success' - content (str): Extracted text content from the chunk - task_id (str): ID of the associated task - task_state (str): Current state of the task - final (bool): Whether this is the final chunk - artifact_name (str, optional): Name of the artifact if chunk is an artifact update For error chunks: - status (str): 'error' - error_type (str): Type of error encountered - message (str): Error description |
Raises:
Type | Description |
---|---|
HTTPError
|
If there's an HTTP-related error during the streaming request. |
Exception
|
For any other unexpected errors during message streaming or processing. |
discover_agents(a2a_config, **kwargs)
classmethod
Discover agents from the URLs specified in a2a_config.discovery_urls.
This concrete implementation fetches and parses .well-known/agent.json from each discovery URL to build a list of available agents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
a2a_config |
A2AClientConfig
|
Configuration containing discovery URLs and other A2A settings. |
required |
**kwargs |
Any
|
Additional keyword arguments (unused in this implementation). |
{}
|
Returns:
Type | Description |
---|---|
list[AgentCard]
|
A list of AgentCard objects representing discovered agents. |
format_agent_description(agent_card)
staticmethod
Format the description of an agent card including skills information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_card |
AgentCard
|
The agent card to format. |
required |
Returns:
Name | Type | Description |
---|---|---|
str |
str
|
The formatted description including skills. |
get_name_preprocessor()
Get the name preprocessor based on the provider.
This will be used to correct the agent name and tool name. (mostly tool name)
Returns:
Name | Type | Description |
---|---|---|
NamePreprocessor |
NamePreprocessor
|
The name preprocessor for the model. |
send_to_agent(agent_card, message, **kwargs)
Synchronously sends a message to another agent using the A2A protocol.
This method is a synchronous wrapper around asend_to_agent. It handles the creation of an event loop if one doesn't exist, and manages the asynchronous call internally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_card |
AgentCard
|
The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. |
required |
message |
str | dict[str, Any]
|
The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. |
required |
**kwargs |
Any
|
Additional keyword arguments passed to asend_to_agent. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error' |
Raises:
Type | Description |
---|---|
RuntimeError
|
If called from within an existing event loop or if asend_to_agent encounters an unhandled exception. |
to_a2a(agent_card, **kwargs)
Converts the agent to an A2A-compatible ASGI application.
This implementation provides a base setup for A2A server components. Subclasses can override this method if they need custom executor or task store implementations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_card |
AgentCard
|
The agent card to use for the A2A application. |
required |
**kwargs |
Any
|
Additional keyword arguments for ASGI application configuration. |
{}
|
Returns:
Type | Description |
---|---|
Starlette
|
A Starlette ASGI application that can be used with any ASGI server. |
BaseLangGraphAgent(name, instruction, description=None, model=None, tools=None, state_schema=None, thread_id_key='thread_id', event_emitter=None, checkpointer=None, **kwargs)
Bases: BaseAgent
Base class for LangGraph-based agents with unified tool approach.
Provides core LangGraph functionality including: - Graph compilation and execution - State schema management - I/O mapping between user inputs and graph states - Event emission support - Tool resolution and handling - A2A communication capabilities via tools - Agent delegation capabilities via tools - MCP server integration via tools - Enhanced output extraction from various state formats
Tool Management: - regular_tools: Standard LangChain tools provided during initialization - mcp_tools: Tools retrieved from MCP servers - resolved_tools: Combined collection of all tools for graph execution
Subclasses must implement: - define_graph(): Define the specific graph structure - _prepare_graph_input(): Convert user input to graph state - _format_graph_output(): Convert final graph state to user output
Initialize the BaseLangGraphAgent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The system instruction for the agent. |
required |
description |
str | None
|
Human-readable description of the agent. |
None
|
model |
Any | None
|
The model to use (lm_invoker, LangChain model, string, etc.). |
None
|
tools |
Sequence[BaseTool] | None
|
Sequence of regular LangChain tools (not A2A or delegation tools). |
None
|
state_schema |
Type | None
|
The state schema for the LangGraph. Defaults to basic message state. |
None
|
thread_id_key |
str
|
Key for thread ID in configuration. |
'thread_id'
|
event_emitter |
EventEmitter | None
|
Optional event emitter for streaming updates. |
None
|
checkpointer |
Checkpointer | None
|
Optional checkpointer for conversation persistence. |
None
|
**kwargs |
Any
|
Additional keyword arguments passed to BaseAgent. |
{}
|
add_mcp_server(mcp_config)
Add MCP server configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mcp_config |
dict[str, dict[str, Any]]
|
Dictionary containing MCP server configurations. |
required |
arun(query, **kwargs)
async
Asynchronously run the LangGraph agent.
If MCP configuration exists, connects to the MCP server and registers tools before running.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments including configurable for LangGraph. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dictionary containing the agent's response and full final state. |
arun_a2a_stream(query, **kwargs)
async
Asynchronously streams the agent's response in a generic format for A2A.
If MCP configuration exists, connects to the MCP server and registers tools before streaming.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments. |
{}
|
Yields:
Type | Description |
---|---|
AsyncGenerator[dict[str, Any], None]
|
Dictionaries with "status" and "content" keys. |
AsyncGenerator[dict[str, Any], None]
|
Possible statuses: "working", "completed", "failed", "canceled". |
arun_stream(query, **kwargs)
async
Asynchronously stream the LangGraph agent's response.
If MCP configuration exists, connects to the MCP server and registers tools before streaming. This method properly handles both LM Invoker and LangChain model streaming: - For LM Invoker: Uses StreamEventHandler to capture streaming events - For LangChain models: Uses LangGraph's native streaming implementation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments. |
{}
|
Yields:
Type | Description |
---|---|
AsyncGenerator[str | dict[str, Any], None]
|
Chunks of output (strings or dicts) from the streaming response. |
define_graph(graph_builder)
abstractmethod
Define the specific graph structure for this agent type.
Subclasses must implement this method to: 1. Add nodes to the graph_builder 2. Add edges and conditional edges 3. Set entry points 4. Return the compiled graph
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph_builder |
StateGraph
|
The StateGraph builder to define nodes and edges on. |
required |
Returns:
Type | Description |
---|---|
CompiledStateGraph
|
The compiled graph ready for execution. |
register_a2a_agents(agent_cards)
Register A2A communication capabilities using the A2A tool manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_cards |
list[AgentCard]
|
List of AgentCard instances for external communication. |
required |
register_delegation_agents(agents)
Register internal agent delegation capabilities using the delegation tool manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agents |
list[BaseAgent]
|
List of BaseAgent instances for internal task delegation. |
required |
run(query, **kwargs)
Synchronously run the LangGraph agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments. |
{}
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dictionary containing the agent's response. |
update_regular_tools(new_tools, rebuild_graph=None)
Update regular tools (not capabilities).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_tools |
list[BaseTool]
|
New list of regular tools to use. |
required |
rebuild_graph |
bool | None
|
Whether to rebuild graph. If None, uses auto_rebuild_graph setting. |
None
|
GoogleADKAgent(name, instruction, model, tools=None, description=None, max_iterations=3, agents=None, **kwargs)
Bases: BaseAgent
An agent that wraps a native Google ADK Agent.
This class implements the AgentInterface and uses Google's LlmAgent to handle the core conversation and tool execution logic via ADK's async-first design.
Initializes the GoogleADKAgent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of this wrapper agent. |
required |
instruction |
str
|
The instruction for this wrapper agent. |
required |
model |
str
|
The name of the Google ADK model to use (e.g., "gemini-1.5-pro-latest"). |
required |
tools |
Optional[list[Any]]
|
An optional list of callable tools for the ADK agent. |
None
|
description |
Optional[str]
|
An optional human-readable description. |
None
|
max_iterations |
int
|
Maximum number of iterations to run (default: 3). |
3
|
agents |
Optional[List[GoogleADKAgent]]
|
Optional list of sub-agents that this agent can delegate to using ADK's built-in multi-agent capabilities. These will be passed as sub_agents to the underlying LlmAgent. |
None
|
**kwargs |
Any
|
Additional keyword arguments passed to the parent |
{}
|
add_mcp_server(mcp_config)
Adds a new MCP server configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mcp_config |
Dict[str, Dict[str, Any]]
|
Dictionary containing server name as key and its configuration as value. |
required |
Raises:
Type | Description |
---|---|
ValueError
|
If mcp_config is empty or None, or if any server configuration is invalid. |
KeyError
|
If any server name already exists in the configuration. |
arun(query, **kwargs)
async
Asynchronously runs the agent with the given query and returns the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The user's query to process. |
required |
**kwargs |
Any
|
Additional keyword arguments. Supports "session_id", "user_id", "app_name". |
{}
|
Returns:
Type | Description |
---|---|
Dict[str, Any]
|
A dictionary containing the output and other metadata. |
arun_a2a_stream(query, configurable=None, **kwargs)
async
Asynchronously streams the agent's response in a format compatible with A2A.
This method formats the ADK agent's streaming responses into a consistent format that the A2A executor can understand and process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
configurable |
Optional[Dict[str, Any]]
|
Optional dictionary for configuration, may include: - thread_id: The A2A task ID (used as session_id). |
None
|
**kwargs |
Any
|
Additional keyword arguments. Supports "user_id", "app_name". |
{}
|
Yields:
Type | Description |
---|---|
AsyncGenerator[Dict[str, Any], None]
|
Dictionary with 'status' and 'content' fields that describe the agent's response state. |
arun_stream(query, **kwargs)
async
Runs the agent with the given query and streams the response parts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The user's query to process. |
required |
**kwargs |
Any
|
Additional keyword arguments. Supports "session_id", "user_id", "app_name". |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[str]
|
Text response chunks from the model. If an error occurs, the error message is yielded. |
register_a2a_agents(agent_cards)
Convert known A2A agents to LangChain tools.
This method takes the agents from a2a_config.known_agents, creates A2AAgent instances for each one, and wraps them in LangChain tools.
Returns:
Name | Type | Description |
---|---|---|
None |
None
|
The tools are added to the existing tools list. |
run(query, **kwargs)
Synchronously runs the Google ADK agent by wrapping the internal async run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str
|
The input query for the agent. |
required |
**kwargs |
Any
|
Additional keyword arguments passed to the internal async run method. Supports "session_id", "user_id", "app_name". |
{}
|
Returns:
Type | Description |
---|---|
Dict[str, Any]
|
A dictionary containing the agent's response. |
Raises:
Type | Description |
---|---|
RuntimeError
|
If |
LangChainAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)
Bases: LangGraphReactAgent
Alias for LangGraphReactAgent.
Initialize the LangGraph ReAct Agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The system instruction for the agent. |
DEFAULT_INSTRUCTION
|
model |
BaseChatModel | str | Any | None
|
The model to use (lm_invoker, LangChain model, string, etc.). |
None
|
tools |
Sequence[BaseTool] | None
|
Sequence of LangChain tools available to the agent. |
None
|
agents |
Sequence[Any] | None
|
Optional sequence of sub-agents for delegation (coordinator mode). |
None
|
description |
str | None
|
Human-readable description of the agent. |
None
|
thread_id_key |
str
|
Key for thread ID in configuration. |
'thread_id'
|
event_emitter |
EventEmitter | None
|
Optional event emitter for streaming updates. |
None
|
**kwargs |
Any
|
Additional keyword arguments passed to BaseLangGraphAgent. |
{}
|
LangGraphAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)
Bases: LangGraphReactAgent
Alias for LangGraphReactAgent.
Initialize the LangGraph ReAct Agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The system instruction for the agent. |
DEFAULT_INSTRUCTION
|
model |
BaseChatModel | str | Any | None
|
The model to use (lm_invoker, LangChain model, string, etc.). |
None
|
tools |
Sequence[BaseTool] | None
|
Sequence of LangChain tools available to the agent. |
None
|
agents |
Sequence[Any] | None
|
Optional sequence of sub-agents for delegation (coordinator mode). |
None
|
description |
str | None
|
Human-readable description of the agent. |
None
|
thread_id_key |
str
|
Key for thread ID in configuration. |
'thread_id'
|
event_emitter |
EventEmitter | None
|
Optional event emitter for streaming updates. |
None
|
**kwargs |
Any
|
Additional keyword arguments passed to BaseLangGraphAgent. |
{}
|
LangGraphReactAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, **kwargs)
Bases: BaseLangGraphAgent
A ReAct agent template built on LangGraph.
This agent can use either: - An LMInvoker (if self.lm_invoker is set by BaseAgent) - A LangChain BaseChatModel (if self.model is set by BaseAgent)
The graph structure follows the standard ReAct pattern: agent -> tools -> agent (loop) -> END
Initialize the LangGraph ReAct Agent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the agent. |
required |
instruction |
str
|
The system instruction for the agent. |
DEFAULT_INSTRUCTION
|
model |
BaseChatModel | str | Any | None
|
The model to use (lm_invoker, LangChain model, string, etc.). |
None
|
tools |
Sequence[BaseTool] | None
|
Sequence of LangChain tools available to the agent. |
None
|
agents |
Sequence[Any] | None
|
Optional sequence of sub-agents for delegation (coordinator mode). |
None
|
description |
str | None
|
Human-readable description of the agent. |
None
|
thread_id_key |
str
|
Key for thread ID in configuration. |
'thread_id'
|
event_emitter |
EventEmitter | None
|
Optional event emitter for streaming updates. |
None
|
**kwargs |
Any
|
Additional keyword arguments passed to BaseLangGraphAgent. |
{}
|
define_graph(graph_builder)
Define the ReAct agent graph structure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph_builder |
StateGraph
|
The StateGraph builder to define the graph structure. |
required |
Returns:
Type | Description |
---|---|
CompiledStateGraph
|
Compiled LangGraph ready for execution. |