Agent
This module initializes the agent package.
Exposes the core agent classes and interfaces.
AgentInterface(name, instruction, description=None, lm_invoker=None, config=None, **kwargs)
Bases: ABC
A general and minimal interface for agent implementations.
Defines core execution methods (__init__, run, arun, arun_stream).
Concrete subclasses must implement all abstract methods.
Initializes the agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The core directive or system prompt for the agent. | required |
| description | str \| None | Human-readable description. Defaults to instruction if not provided. | None |
| lm_invoker | BaseLMInvoker \| None | The language model invoker to use for LLM interactions. Defaults to None. | None |
| config | BaseAgentConfig \| None | Additional configuration for the agent. | None |
| **kwargs | Any | Additional keyword arguments for concrete implementations. | {} |
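The sketch below shows a minimal concrete subclass that implements every abstract method. The import path and the EchoAgent class are illustrative assumptions; only the signatures come from this reference.

```python
from typing import Any, AsyncGenerator

from agent import AgentInterface  # assumed import path


class EchoAgent(AgentInterface):
    """Toy implementation that satisfies every abstract method."""

    def run(self, query: str, **kwargs: Any) -> dict[str, Any]:
        # Must return a dict containing at least {'output': ...}.
        return {"output": f"echo: {query}"}

    async def arun(self, query: str, **kwargs: Any) -> dict[str, Any]:
        return self.run(query, **kwargs)

    async def arun_stream(self, query: str, **kwargs: Any) -> AsyncGenerator[str | dict[str, Any], None]:
        # Yield chunks of output (strings or dicts).
        yield f"echo: {query}"

    def add_mcp_server(self, mcp_config: dict[str, dict[str, Any]]) -> None:
        if not mcp_config:
            raise ValueError("mcp_config must not be empty")
        # A real implementation would validate and store the server configs here.

    def register_a2a_agents(self, agents: list) -> None:
        # A real implementation would register the provided AgentCards.
        pass


agent = EchoAgent(name="echo", instruction="Echo the user's query.")
print(agent.run("hello"))  # {'output': 'echo: hello'}
```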
add_mcp_server(mcp_config)
abstractmethod
Adds a new MCP server configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mcp_config | dict[str, dict[str, Any]] | Dictionary containing server name as key and its configuration as value. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If mcp_config is empty or None, or if any server configuration is invalid. |
| KeyError | If any server name already exists in the configuration. |
arun(query, **kwargs)
abstractmethod
async
Asynchronously runs the agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments for execution. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict containing at least {'output': ...}. |
arun_stream(query, **kwargs)
abstractmethod
async
Asynchronously streams the agent's response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query. | required |
| **kwargs | Any | Extra parameters for execution. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[str \| dict[str, Any], None] | Chunks of output (strings or dicts). |
register_a2a_agents(agents)
abstractmethod
Registers A2A agents from a list of AgentCards.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents | list[AgentCard] | A list of AgentCard instances. | required |
run(query, **kwargs)
abstractmethod
Synchronously runs the agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments for execution. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict containing at least {'output': ...}. |
BaseAgent(name, instruction, description=None, model=None, tools=None, config=None, tool_configs=None, **kwargs)
Bases: AgentInterface
Base class for agents, providing common A2A client method implementations.
Concrete agent implementations (e.g., LangGraphAgent, GoogleADKAgent) should inherit from this class if they need to utilize the shared A2A client functionalities.
This class supports flexible model handling:
- model: Optional[Any], which can be an lm_invoker, string/ModelId, LangChain BaseChatModel, or other types
- Automatically sets self.lm_invoker if an lm_invoker is provided or can be built
- Stores the original model in self.model for subclass use
- Enhanced credential support with automatic type detection
Initializes the BaseAgent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The core directive or system prompt for the agent. | required |
| description | str \| None | Human-readable description. Defaults to instruction if not provided. | None |
| model | Any \| None | The model to use. Can be: a BaseLMInvoker instance (will be set as self.lm_invoker), a string or ModelId (will build an lm_invoker), a LangChain BaseChatModel (will be stored in self.model), or any other type (will be stored in self.model). | None |
| tools | list[Any] \| None | List of tools available to the agent. | None |
| config | BaseAgentConfig \| dict[str, Any] \| None | Additional configuration for the agent. Can be a BaseAgentConfig instance or dict. | None |
| tool_configs | dict[str, Any] \| None | Default tool configurations applied to all tool calls from this agent. | None |
| **kwargs | Any | Additional keyword arguments for AgentInterface. | {} |
mcp_config: dict[str, dict[str, Any]]
property
writable
A copy of the current MCP configuration.
The getter returns a copy to prevent direct mutation of internal state; use add_mcp_server() to apply changes.
model_provider: str
property
Get the provider of the model.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The provider of the model. |
add_mcp_server(mcp_config)
Adds MCP servers to the agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mcp_config | dict[str, dict[str, Any]] | A dictionary containing MCP server configurations. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If the MCP configuration is empty or None. |
| KeyError | If a server with the same name already exists in the MCP configuration. |
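A short usage sketch follows. The per-server fields ("transport", "url") are illustrative assumptions about a typical MCP configuration, not a schema documented here.

```python
def attach_search_server(agent) -> None:
    """Attach one illustrative MCP server configuration to an existing agent."""
    agent.add_mcp_server(
        {
            "search": {
                "transport": "sse",                  # assumed field
                "url": "http://localhost:8001/sse",  # assumed field
            }
        }
    )
    # A duplicate server name raises KeyError; an empty mapping raises ValueError.
```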
asend_to_agent(agent_card, message, **kwargs)
async
Asynchronously sends a message to another agent using the A2A protocol.
This method handles the core A2A communication logic, creating and sending properly formatted A2A messages and processing the responses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_card | AgentCard | The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. | required |
| message | str \| dict[str, Any] | The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error' |
Raises:
| Type | Description |
|---|---|
| HTTPError | If there's an HTTP-related error during the request. |
| Exception | For any other unexpected errors during message sending or processing. |
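A brief usage sketch, assuming you already hold an AgentCard for the target agent (for example, one returned by discover_agents). It only relies on the response keys documented above.

```python
import asyncio


async def ask_remote(agent, card) -> str:
    """Send a message over A2A and handle the documented response shape."""
    result = await agent.asend_to_agent(card, "Summarize today's tickets.")
    if result["status"] == "success":
        return result["content"]
    # On error, 'error_type' and 'message' describe what went wrong.
    raise RuntimeError(f"{result.get('error_type')}: {result.get('message')}")


# asyncio.run(ask_remote(my_agent, target_card))
```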
astream_to_agent(agent_card, message, **kwargs)
async
Asynchronously sends a streaming message to another agent using the A2A protocol.
This method supports streaming responses from the target agent, yielding chunks of the response as they become available. It handles various types of streaming events including task status updates, artifact updates, and message parts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_card | AgentCard | The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. | required |
| message | str \| dict[str, Any] | The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[dict[str, Any], None] | Dictionaries containing streaming response chunks. For successful chunks: - status (str): 'success' - content (str): Extracted text content from the chunk - task_id (str): ID of the associated task - task_state (str): Current state of the task - final (bool): Whether this is the final chunk - artifact_name (str, optional): Name of the artifact if the chunk is an artifact update. For error chunks: - status (str): 'error' - error_type (str): Type of error encountered - message (str): Error description |
Raises:
| Type | Description |
|---|---|
| HTTPError | If there's an HTTP-related error during the streaming request. |
| Exception | For any other unexpected errors during message streaming or processing. |
discover_agents(a2a_config, **kwargs)
classmethod
Discover agents from the URLs specified in a2a_config.discovery_urls.
This concrete implementation fetches and parses .well-known/agent.json from each discovery URL to build a list of available agents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| a2a_config | A2AClientConfig | Configuration containing discovery URLs and other A2A settings. | required |
| **kwargs | Any | Additional keyword arguments (unused in this implementation). | {} |
Returns:
| Type | Description |
|---|---|
| list[AgentCard] | A list of AgentCard objects representing discovered agents. |
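A sketch combining discovery with registration. The A2AClientConfig constructor keyword and the import paths are assumptions; the discovery_urls field and the classmethod behavior come from the description above.

```python
from agent import BaseAgent                 # assumed import path
from agent.config import A2AClientConfig    # assumed import path


def refresh_remote_agents(agent: BaseAgent, urls: list[str]) -> None:
    """Discover A2A agents from .well-known/agent.json endpoints and register them."""
    a2a_config = A2AClientConfig(discovery_urls=urls)  # assumed constructor keyword
    cards = BaseAgent.discover_agents(a2a_config)      # classmethod; returns list[AgentCard]
    agent.register_a2a_agents(cards)
```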
format_agent_description(agent_card)
staticmethod
Format the description of an agent card including skills information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_card | AgentCard | The agent card to format. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The formatted description including skills. |
get_name_preprocessor()
Get the name preprocessor based on the provider.
It is used to correct the agent name and tool names (primarily tool names).
Returns:
| Name | Type | Description |
|---|---|---|
| NamePreprocessor | NamePreprocessor | The name preprocessor for the model. |
send_to_agent(agent_card, message, **kwargs)
Synchronously sends a message to another agent using the A2A protocol.
This method is a synchronous wrapper around asend_to_agent. It handles the creation of an event loop if one doesn't exist, and manages the asynchronous call internally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_card | AgentCard | The AgentCard instance containing the target agent's details including URL, authentication requirements, and capabilities. | required |
| message | str \| dict[str, Any] | The message to send to the agent. Can be either a string for simple text messages or a dictionary for structured data. | required |
| **kwargs | Any | Additional keyword arguments passed to asend_to_agent. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A dictionary containing the response details: - status (str): 'success' or 'error' - content (str): Extracted text content from the response - task_id (str, optional): ID of the created/updated task - task_state (str, optional): Current state of the task - raw_response (str): Complete JSON response from the A2A client - error_type (str, optional): Type of error if status is 'error' - message (str, optional): Error message if status is 'error' |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called from within an existing event loop or if asend_to_agent encounters an unhandled exception. |
to_a2a(agent_card, **kwargs)
Converts the agent to an A2A-compatible ASGI application.
This implementation provides a base setup for A2A server components. Subclasses can override this method if they need custom executor or task store implementations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_card | AgentCard | The agent card to use for the A2A application. | required |
| **kwargs | Any | Additional keyword arguments for ASGI application configuration. | {} |
Returns:
| Type | Description |
|---|---|
| Starlette | A Starlette ASGI application that can be used with any ASGI server. |
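Because to_a2a returns a Starlette ASGI application, the agent can be served with any ASGI server. A minimal sketch, assuming uvicorn is installed and `card` is the agent's own AgentCard:

```python
import uvicorn


def serve_over_a2a(agent, card) -> None:
    """Expose `agent` as an A2A endpoint; `card` is the agent's own AgentCard."""
    app = agent.to_a2a(card)  # Starlette ASGI application
    uvicorn.run(app, host="0.0.0.0", port=8080)
```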
BaseLangGraphAgent(name, instruction, description=None, model=None, tools=None, state_schema=None, thread_id_key='thread_id', event_emitter=None, checkpointer=None, enable_a2a_token_streaming=False, **kwargs)
Bases: BaseAgent
Base class for LangGraph-based agents with unified tool approach.
Provides core LangGraph functionality including:
- Graph compilation and execution
- State schema management
- I/O mapping between user inputs and graph states
- Event emission support
- Tool resolution and handling
- A2A communication capabilities via tools
- Agent delegation capabilities via tools
- MCP server integration via tools
- Enhanced output extraction from various state formats

Tool Management:
- regular_tools: Standard LangChain tools provided during initialization
- mcp_tools: Tools retrieved from MCP servers
- resolved_tools: Combined collection of all tools for graph execution

Subclasses must implement:
- define_graph(): Define the specific graph structure
- _prepare_graph_input(): Convert user input to graph state
- _format_graph_output(): Convert final graph state to user output
Initialize the BaseLangGraphAgent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The system instruction for the agent. | required |
| description | str \| None | Human-readable description of the agent. | None |
| model | Any \| None | The model to use (lm_invoker, LangChain model, string, etc.). | None |
| tools | Sequence[BaseTool] \| None | Sequence of regular LangChain tools (not A2A or delegation tools). | None |
| state_schema | type \| None | The state schema for the LangGraph. Defaults to basic message state. | None |
| thread_id_key | str | Key for thread ID in configuration. | 'thread_id' |
| event_emitter | EventEmitter \| None | Optional event emitter for streaming updates. | None |
| checkpointer | Checkpointer \| None | Optional checkpointer for conversation persistence. | None |
| enable_a2a_token_streaming | bool | Enable token-level streaming for A2A responses. False (default): stream message-level events only. True: stream individual tokens plus message-level events. | False |
| **kwargs | Any | Additional keyword arguments passed to BaseAgent (including tool_configs and memory settings). Memory settings include: memory_backend (e.g., "mem0"), agent_id (agent identifier for memory scoping), memory_namespace, and save_interaction_to_memory (whether to save interactions, default True). | {} |
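A skeletal subclass sketch follows. The import path, the node logic, and the exact signatures of the private hooks (_prepare_graph_input, _format_graph_output) are assumptions based on their descriptions above; define_graph's signature comes from this reference.

```python
from langgraph.graph import END, StateGraph
from langgraph.graph.state import CompiledStateGraph

from agent import BaseLangGraphAgent  # assumed import path


class SingleStepAgent(BaseLangGraphAgent):
    """Illustrative agent with a single 'respond' node."""

    def define_graph(self, graph_builder: StateGraph) -> CompiledStateGraph:
        graph_builder.add_node("respond", self._respond)
        graph_builder.set_entry_point("respond")
        graph_builder.add_edge("respond", END)
        return graph_builder.compile()

    def _respond(self, state: dict) -> dict:
        # Node bodies receive and return (partial) graph state.
        return {"output": f"Handled: {state.get('input', '')}"}

    def _prepare_graph_input(self, query: str, **kwargs) -> dict:
        # Assumed hook shape: convert the user query into the initial graph state.
        return {"input": query}

    def _format_graph_output(self, final_state: dict) -> dict:
        # Assumed hook shape: convert the final graph state into the user-facing dict.
        return {"output": final_state.get("output", "")}
```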
arun(query, **kwargs)
async
Asynchronously run the LangGraph agent with lazy MCP initialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments including configurable for LangGraph. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing the agent's response and full final state. |
arun_a2a_stream(query, **kwargs)
async
Asynchronously streams the agent's response in A2A format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[dict[str, Any], None] | Dictionaries with "status" and "content" keys. |
| AsyncGenerator[dict[str, Any], None] | Possible statuses: "working", "completed", "failed", "canceled". |
arun_stream(query, **kwargs)
async
Asynchronously stream the LangGraph agent's response.
If MCP configuration exists, connects to the MCP server and registers tools before streaming. This method properly handles both LM Invoker and LangChain model streaming:
- For LM Invoker: uses StreamEventHandler to capture streaming events
- For LangChain models: uses LangGraph's native streaming implementation
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[str \| dict[str, Any], None] | Chunks of output (strings or dicts) from the streaming response. |
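Consuming the stream looks the same whether chunks arrive as strings or dicts; a minimal sketch:

```python
import asyncio


async def stream_answer(agent, question: str) -> None:
    """Print text chunks as they arrive and dump structured events as dicts."""
    async for chunk in agent.arun_stream(question):
        if isinstance(chunk, str):
            print(chunk, end="", flush=True)  # token / text chunk
        else:
            print(chunk)                      # structured event dict


# asyncio.run(stream_answer(my_agent, "What changed in the last release?"))
```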
cleanup()
async
Cleanup MCP resources including persistent sessions.
This method performs best-effort cleanup of MCP client resources. Errors during cleanup are logged but do not raise exceptions to ensure the cleanup process completes gracefully.
define_graph(graph_builder)
abstractmethod
Define the specific graph structure for this agent type.
Subclasses must implement this method to:
1. Add nodes to the graph_builder
2. Add edges and conditional edges
3. Set entry points
4. Return the compiled graph
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph_builder | StateGraph | The StateGraph builder to define nodes and edges on. | required |
Returns:
| Type | Description |
|---|---|
| CompiledStateGraph | The compiled graph ready for execution. |
register_a2a_agents(agent_cards)
Register A2A communication capabilities using the A2A tool manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_cards | list[AgentCard] | List of AgentCard instances for external communication. | required |
register_delegation_agents(agents)
Register internal agent delegation capabilities using the delegation tool manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents | list[BaseAgent] | List of BaseAgent instances for internal task delegation. | required |
run(query, **kwargs)
Synchronously run the LangGraph agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing the agent's response. |
set_operation_mode(mode)
Set the operation mode for dependency tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mode | str | Operation mode: "parallel" (default) or "sequential". | required |
update_regular_tools(new_tools, rebuild_graph=None)
Update regular tools (not capabilities).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| new_tools | list[BaseTool] | New list of regular tools to use. | required |
| rebuild_graph | bool \| None | Whether to rebuild the graph. If None, uses the auto_rebuild_graph setting. | None |
GoogleADKAgent(name, instruction, model, tools=None, description=None, max_iterations=3, agents=None, **kwargs)
Bases: BaseAgent
An agent that wraps a native Google ADK Agent with MCP support.
This class implements the AgentInterface and uses Google's LlmAgent to handle the core conversation and tool execution logic via ADK's async-first design. It includes persistent MCP session management for stateful tool execution across multiple calls.
The agent supports:
- Native ADK tools (FunctionTool, LangchainTool)
- MCP tools via GoogleADKMCPClient with session persistence
- Sub-agent delegation using ADK's built-in multi-agent capabilities
- A2A communication through tool integration
Initializes the GoogleADKAgent with MCP support.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of this wrapper agent. | required |
| instruction | str | The instruction for this wrapper agent. | required |
| model | str | The name of the Google ADK model to use (e.g., "gemini-1.5-pro-latest"). | required |
| tools | Optional[list[Any]] | An optional list of callable tools for the ADK agent. | None |
| description | Optional[str] | An optional human-readable description. | None |
| max_iterations | int | Maximum number of iterations to run (default: 3). | 3 |
| agents | Optional[List[GoogleADKAgent]] | Optional list of sub-agents that this agent can delegate to using ADK's built-in multi-agent capabilities. These will be passed as sub_agents to the underlying LlmAgent. | None |
| **kwargs | Any | Additional keyword arguments passed to the parent class. | {} |
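A construction sketch, assuming the class is importable from the agent package and valid Google ADK credentials are configured in the environment. The tool and query are illustrative.

```python
from agent import GoogleADKAgent  # assumed import path


def lookup_order(order_id: str) -> str:
    """Toy callable tool exposed to the ADK agent."""
    return f"Order {order_id} is in transit."


adk_agent = GoogleADKAgent(
    name="support_agent",
    instruction="Help customers track their orders.",
    model="gemini-1.5-pro-latest",  # ADK model name, as in the example above
    tools=[lookup_order],
    max_iterations=3,
)

# run() supports "session_id", "user_id", and "app_name" keyword arguments.
result = adk_agent.run("Where is order 1234?", session_id="demo-session")
print(result["output"])
```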
arun(query, **kwargs)
async
Asynchronously runs the agent with MCP tool support.
This method ensures MCP tools are properly initialized before execution and provides persistent session management for stateful MCP tools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The user's query to process. | required |
| **kwargs | Any | Additional keyword arguments. Supports "session_id", "user_id", "app_name". | {} |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | A dictionary containing the output, tool_calls, and session_id. |
arun_a2a_stream(query, configurable=None, **kwargs)
async
Asynchronously streams the agent's response in a format compatible with A2A.
This method formats the ADK agent's streaming responses into a consistent format that the A2A executor can understand and process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| configurable | Optional[Dict[str, Any]] | Optional dictionary for configuration, may include: - thread_id: The A2A task ID (used as session_id). | None |
| **kwargs | Any | Additional keyword arguments. Supports "user_id", "app_name". | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[Dict[str, Any], None] | Dictionary with 'status' and 'content' fields that describe the agent's response state. |
arun_stream(query, **kwargs)
async
Runs the agent with the given query and streams the response parts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The user's query to process. | required |
| **kwargs | Any | Additional keyword arguments. Supports "session_id", "user_id", "app_name". | {} |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[str] | Text response chunks from the model. If an error occurs, the error message is yielded. |
cleanup()
async
Clean up ADK and MCP resources.
register_a2a_agents(agent_cards)
Convert known A2A agents to LangChain tools.
This method takes the agents from a2a_config.known_agents, creates A2AAgent instances for each one, and wraps them in LangChain tools.
Returns:
| Name | Type | Description |
|---|---|---|
| None | None | The tools are added to the existing tools list. |
run(query, **kwargs)
Synchronously runs the Google ADK agent by wrapping the internal async run method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments passed to the internal async run method. Supports "session_id", "user_id", "app_name". | {} |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | A dictionary containing the agent's response. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If |
LangChainAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, tool_output_manager=None, **kwargs)
Bases: LangGraphReactAgent
Alias for LangGraphReactAgent.
Initialize the LangGraph ReAct Agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The system instruction for the agent. | DEFAULT_INSTRUCTION |
| model | BaseChatModel \| str \| Any \| None | The model to use (lm_invoker, LangChain model, string, etc.). | None |
| tools | Sequence[BaseTool] \| None | Sequence of LangChain tools available to the agent. | None |
| agents | Sequence[Any] \| None | Optional sequence of sub-agents for delegation (coordinator mode). | None |
| description | str \| None | Human-readable description of the agent. | None |
| thread_id_key | str | Key for thread ID in configuration. | 'thread_id' |
| event_emitter | EventEmitter \| None | Optional event emitter for streaming updates. | None |
| tool_output_manager | ToolOutputManager \| None | Optional ToolOutputManager instance for tool output management. When provided, enables tool output storage, reference resolution, and sharing capabilities. This enables multi-agent workflows where agents can access each other's tool outputs. If None, tool output management is disabled for this agent. | None |
| **kwargs | Any | Additional keyword arguments passed to BaseLangGraphAgent. | {} |
LangGraphAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, tool_output_manager=None, **kwargs)
Bases: LangGraphReactAgent
Alias for LangGraphReactAgent.
Initialize the LangGraph ReAct Agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The system instruction for the agent. | DEFAULT_INSTRUCTION |
| model | BaseChatModel \| str \| Any \| None | The model to use (lm_invoker, LangChain model, string, etc.). | None |
| tools | Sequence[BaseTool] \| None | Sequence of LangChain tools available to the agent. | None |
| agents | Sequence[Any] \| None | Optional sequence of sub-agents for delegation (coordinator mode). | None |
| description | str \| None | Human-readable description of the agent. | None |
| thread_id_key | str | Key for thread ID in configuration. | 'thread_id' |
| event_emitter | EventEmitter \| None | Optional event emitter for streaming updates. | None |
| tool_output_manager | ToolOutputManager \| None | Optional ToolOutputManager instance for tool output management. When provided, enables tool output storage, reference resolution, and sharing capabilities. This enables multi-agent workflows where agents can access each other's tool outputs. If None, tool output management is disabled for this agent. | None |
| **kwargs | Any | Additional keyword arguments passed to BaseLangGraphAgent. | {} |
LangGraphReactAgent(name, instruction=DEFAULT_INSTRUCTION, model=None, tools=None, agents=None, description=None, thread_id_key='thread_id', event_emitter=None, tool_output_manager=None, **kwargs)
Bases: LangGraphHitLMixin, BaseLangGraphAgent
A ReAct agent template built on LangGraph.
This agent can use either:
- An LMInvoker (if self.lm_invoker is set by BaseAgent)
- A LangChain BaseChatModel (if self.model is set by BaseAgent)
The graph structure follows the standard ReAct pattern: agent -> tools -> agent (loop) -> END
Initialize the LangGraph ReAct Agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| instruction | str | The system instruction for the agent. | DEFAULT_INSTRUCTION |
| model | BaseChatModel \| str \| Any \| None | The model to use (lm_invoker, LangChain model, string, etc.). | None |
| tools | Sequence[BaseTool] \| None | Sequence of LangChain tools available to the agent. | None |
| agents | Sequence[Any] \| None | Optional sequence of sub-agents for delegation (coordinator mode). | None |
| description | str \| None | Human-readable description of the agent. | None |
| thread_id_key | str | Key for thread ID in configuration. | 'thread_id' |
| event_emitter | EventEmitter \| None | Optional event emitter for streaming updates. | None |
| tool_output_manager | ToolOutputManager \| None | Optional ToolOutputManager instance for tool output management. When provided, enables tool output storage, reference resolution, and sharing capabilities. This enables multi-agent workflows where agents can access each other's tool outputs. If None, tool output management is disabled for this agent. | None |
| **kwargs | Any | Additional keyword arguments passed to BaseLangGraphAgent. | {} |
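A usage sketch follows. The import path for the agent class and the model identifier string are assumptions; the LangChain @tool decorator is standard.

```python
from langchain_core.tools import tool

from agent import LangGraphReactAgent  # assumed import path


@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


react_agent = LangGraphReactAgent(
    name="calculator",
    instruction="You are a careful assistant. Use tools for arithmetic.",
    model="openai/gpt-4o-mini",  # illustrative model identifier
    tools=[add],
)

result = react_agent.run("What is 17 + 25?")
print(result["output"])
```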
define_graph(graph_builder)
Define the ReAct agent graph structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph_builder | StateGraph | The StateGraph builder to define the graph structure. | required |
Returns:
| Type | Description |
|---|---|
| CompiledStateGraph | Compiled LangGraph ready for execution. |
LangflowAgent(name, flow_id, description=None, base_url=None, api_key=None, config=None, **kwargs)
Bases: BaseAgent
Langflow agent implementation for executing Langflow flows.
This agent integrates with Langflow APIs to execute flows while providing full compatibility with the SDK's agent framework, including:
- Synchronous and asynchronous execution
- Regular and A2A streaming support
- Session management for conversation continuity
- Error handling and retry logic
- Credential management through BaseAgent
The agent inherits from BaseAgent to leverage existing A2A infrastructure while implementing Langflow-specific execution logic.
Initialize the LangflowAgent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the agent. | required |
| flow_id | str | The unique identifier of the Langflow flow to execute. | required |
| description | str \| None | Human-readable description. | None |
| base_url | str \| None | The base URL of the Langflow API server. | None |
| api_key | str \| None | The API key for Langflow authentication. | None |
| config | LangflowAgentConfig \| dict[str, Any] \| None | Langflow-specific configuration or dict. | None |
| **kwargs | Any | Additional keyword arguments passed to BaseAgent. | {} |
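A construction sketch using only the parameters above; the import path, flow ID, URL, and key are placeholders.

```python
from agent import LangflowAgent  # assumed import path

flow_agent = LangflowAgent(
    name="faq_flow",
    flow_id="00000000-0000-0000-0000-000000000000",  # placeholder flow ID
    base_url="http://localhost:7860",                # placeholder Langflow server
    api_key="YOUR_LANGFLOW_API_KEY",                 # placeholder credential
)

result = flow_agent.run("What are your opening hours?")
print(result["output"])
```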
add_mcp_server(mcp_config)
Add MCP server configuration (not supported for Langflow agents).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mcp_config | dict[str, dict[str, Any]] | MCP server configuration. | required |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Langflow agents don't support MCP servers. |
arun(query, **kwargs)
async
Asynchronously run the Langflow agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing the agent's response and metadata. |
arun_a2a_stream(query, **kwargs)
async
Asynchronously stream the agent's response in A2A format.
This method converts Langflow streaming events into A2A-compatible events following the patterns established by BaseLangGraphAgent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[dict[str, Any], None] | A2A-compatible event dictionaries with semantic event types. |
arun_stream(query, **kwargs)
async
Asynchronously stream the Langflow agent's response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[str \| dict[str, Any], None] | Chunks of output (strings or dicts) from the streaming response. |
health_check()
async
Check if the Langflow API is accessible.
Returns:
| Type | Description |
|---|---|
| bool | True if the API is accessible, False otherwise. |
register_a2a_agents(agents)
Register A2A agents (not supported for Langflow agents).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents | list[AgentCard] | List of AgentCard instances. | required |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Langflow agents don't support A2A agent registration. |
run(query, **kwargs)
Synchronously run the Langflow agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The input query for the agent. | required |
| **kwargs | Any | Additional keyword arguments. | {} |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing the agent's response. |
MemoryRecallAgent(memory, **kwargs)
Bases: LangGraphReactAgent
Simplified mini-agent for automatic memory retrieval and query enhancement.
This agent has a simple 2-node LangGraph (agent + tools) and uses existing memory infrastructure to enhance user queries with relevant context. It acts as a preprocessing layer that automatically attempts memory retrieval for every query.
Key features:
- Uses runtime memory_user_id provided via call arguments (no static storage)
- Uses simplified instruction reusing existing guidance
- Standard 2-node LangGraph pattern (agent → tools → agent)
- Automatically enhances queries with memory context when available
- Returns original query unchanged if no relevant memories found
Initialize the MemoryRecallAgent with memory backend and configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| memory | | Memory backend instance (Mem0Memory or compatible). | required |
| **kwargs | | Additional arguments passed to BaseLangGraphAgent, including: - memory_agent_id: Fallback user ID for memory operations - model: LLM model to use for memory decisions - Other BaseLangGraphAgent parameters | {} |
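A construction sketch follows. The import paths, the Mem0Memory constructor call, the model identifier, and passing memory_user_id as a call-time keyword argument are assumptions based on the notes above.

```python
from agent import MemoryRecallAgent  # assumed import path
from agent.memory import Mem0Memory  # assumed import path

memory = Mem0Memory()  # or any compatible memory backend
recall_agent = MemoryRecallAgent(
    memory,
    model="openai/gpt-4o-mini",      # illustrative; used for memory decisions
    memory_agent_id="default-user",  # fallback user ID for memory operations
)

# The runtime memory_user_id is supplied per call rather than stored on the agent (assumed keyword).
result = recall_agent.run("What did I say about my deadline?", memory_user_id="user-42")
print(result["output"])
```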
define_graph(graph_builder)
Define the 3-node memory recall LangGraph for this agent.
This creates a streamlined ReAct-inspired structure that reuses
LangGraphReactAgent helpers for robust LM invocation, token usage tracking,
error handling, and tool execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph_builder | StateGraph | LangGraph | required |
Returns:
| Name | Type | Description |
|---|---|---|
| CompiledStateGraph | CompiledStateGraph | The compiled memory recall graph ready for execution. |