Base langgraph agent
Base class for LangGraph-based agent implementations.
This class provides the core LangGraph machinery including graph compilation, state handling, and I/O mapping for LangGraph agents.
BaseLangGraphAgent(name, instruction, description=None, model=None, tools=None, state_schema=None, thread_id_key='thread_id', event_emitter=None, checkpointer=None, enable_a2a_token_streaming=False, **kwargs)
Bases: BaseAgent
Base class for LangGraph-based agents with unified tool approach.
Provides core LangGraph functionality including:

- Graph compilation and execution
- State schema management
- I/O mapping between user inputs and graph states
- Event emission support
- Tool resolution and handling
- A2A communication capabilities via tools
- Agent delegation capabilities via tools
- MCP server integration via tools
- Enhanced output extraction from various state formats

Tool Management:

- regular_tools: Standard LangChain tools provided during initialization
- mcp_tools: Tools retrieved from MCP servers
- resolved_tools: Combined collection of all tools for graph execution

Subclasses must implement:

- define_graph(): Define the specific graph structure
- _prepare_graph_input(): Convert user input to graph state
- _format_graph_output(): Convert final graph state to user output
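The input and output hooks listed above can be illustrated with a minimal sketch. It does not import the real base class: `SketchAgent` is a hypothetical stand-in, the hook signatures are assumptions, and `_run_graph` replaces actual graph execution with a canned echo step.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchAgent(ABC):
    """Hypothetical stand-in that mimics the input -> graph -> output flow."""

    @abstractmethod
    def _prepare_graph_input(self, query: str) -> dict[str, Any]:
        """Convert user input to graph state."""

    @abstractmethod
    def _format_graph_output(self, final_state: dict[str, Any]) -> str:
        """Convert final graph state to user output."""

    def _run_graph(self, state: dict[str, Any]) -> dict[str, Any]:
        # Placeholder for executing the compiled graph: appends a canned reply.
        last = state["messages"][-1]["content"]
        reply = {"role": "assistant", "content": f"echo: {last}"}
        return {"messages": state["messages"] + [reply]}

    def run(self, query: str) -> dict[str, Any]:
        final_state = self._run_graph(self._prepare_graph_input(query))
        return {"output": self._format_graph_output(final_state), "state": final_state}


class EchoAgent(SketchAgent):
    def _prepare_graph_input(self, query: str) -> dict[str, Any]:
        # Wrap the raw query in a message-style state (assumed shape).
        return {"messages": [{"role": "user", "content": query}]}

    def _format_graph_output(self, final_state: dict[str, Any]) -> str:
        # Use the content of the last message as the user-facing output.
        return final_state["messages"][-1]["content"]


result = EchoAgent().run("hello")
print(result["output"])  # echo: hello
```

Keeping the two mapping hooks separate from graph execution means a subclass can change its state shape without touching the run logic.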
Initialize the BaseLangGraphAgent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | The name of the agent. | required |
| `instruction` | `str` | The system instruction for the agent. | required |
| `description` | `str \| None` | Human-readable description of the agent. | `None` |
| `model` | `Any \| None` | The model to use (lm_invoker, LangChain model, string, etc.). | `None` |
| `tools` | `Sequence[BaseTool] \| None` | Sequence of regular LangChain tools (not A2A or delegation tools). | `None` |
| `state_schema` | `type \| None` | The state schema for the LangGraph. Defaults to basic message state. | `None` |
| `thread_id_key` | `str` | Key for thread ID in configuration. | `'thread_id'` |
| `event_emitter` | `EventEmitter \| None` | Optional event emitter for streaming updates. | `None` |
| `checkpointer` | `Checkpointer \| None` | Optional checkpointer for conversation persistence. | `None` |
| `enable_a2a_token_streaming` | `bool` | Enable token-level streaming for A2A responses. `False` (default) streams message-level events only; `True` streams individual tokens plus message-level events. | `False` |
| `**kwargs` | `Any` | Additional keyword arguments passed to BaseAgent, including `tool_configs` and memory settings. Memory settings include `memory_backend` (memory backend, e.g., "mem0"), `agent_id` (agent identifier for memory scoping), `memory_namespace` (memory namespace), and `save_interaction_to_memory` (whether to save interactions, default `True`). | `{}` |
arun(query, **kwargs)
async
Asynchronously run the LangGraph agent with lazy MCP initialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The input query for the agent. | required |
| `**kwargs` | `Any` | Additional keyword arguments, including `configurable` for LangGraph. | `{}` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary containing the agent's response and full final state. |
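LangGraph conventionally carries per-run settings such as the thread ID under a `configurable` mapping. The sketch below builds such keyword arguments; `build_run_kwargs` is a hypothetical helper, and how the agent reads `thread_id_key` internally is an assumption rather than documented behavior.

```python
from typing import Any


def build_run_kwargs(thread_id: str, thread_id_key: str = "thread_id") -> dict[str, Any]:
    """Hypothetical helper: wrap a thread ID in a LangGraph-style `configurable` mapping."""
    return {"configurable": {thread_id_key: thread_id}}


kwargs = build_run_kwargs("session-42")
# Intended use (not executed here, requires a real agent instance):
#     result = await agent.arun("Hi", **kwargs)
```

Reusing the same thread ID across calls is what lets a checkpointer, if one is configured, associate the calls with one conversation.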
arun_a2a_stream(query, **kwargs)
async
Asynchronously streams the agent's response in A2A format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The input query for the agent. | required |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |
Yields:
| Type | Description |
|---|---|
| `AsyncGenerator[dict[str, Any], None]` | Dictionaries with "status" and "content" keys. Possible statuses: "working", "completed", "failed", "canceled". |
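A consumer of this stream typically loops until it sees a final status. The sketch below uses `fake_a2a_stream`, a hypothetical stand-in for `arun_a2a_stream`, and assumes that every status other than "working" is terminal, which the reference does not state explicitly.

```python
import asyncio
from typing import Any, AsyncGenerator

# Assumption: these statuses end the stream; "working" signals progress.
TERMINAL_STATUSES = {"completed", "failed", "canceled"}


async def fake_a2a_stream(query: str) -> AsyncGenerator[dict[str, Any], None]:
    """Hypothetical stand-in for agent.arun_a2a_stream(query)."""
    yield {"status": "working", "content": "Thinking..."}
    yield {"status": "completed", "content": f"Answer to: {query}"}


async def collect_final(stream: AsyncGenerator[dict[str, Any], None]) -> dict[str, Any]:
    """Return the first event with a terminal status, skipping progress updates."""
    async for event in stream:
        if event["status"] in TERMINAL_STATUSES:
            return event
    raise RuntimeError("stream ended without a terminal status")


final = asyncio.run(collect_final(fake_a2a_stream("ping")))
```

With a real agent, `fake_a2a_stream("ping")` would be replaced by `agent.arun_a2a_stream("ping")`.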
arun_stream(query, **kwargs)
async
Asynchronously stream the LangGraph agent's response.
If MCP configuration exists, connects to the MCP server and registers tools before streaming. This method properly handles both LM Invoker and LangChain model streaming:

- For LM Invoker: Uses StreamEventHandler to capture streaming events
- For LangChain models: Uses LangGraph's native streaming implementation
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The input query for the agent. | required |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |
Yields:
| Type | Description |
|---|---|
| `AsyncGenerator[str \| dict[str, Any], None]` | Chunks of output (strings or dicts) from the streaming response. |
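Because chunks can be either strings or dicts, a consumer needs to branch on the chunk type. A minimal sketch follows; `fake_stream` is a hypothetical stand-in for `arun_stream`, and the keys in the dict chunk are invented for illustration only.

```python
import asyncio
from typing import Any, AsyncGenerator, Union

Chunk = Union[str, dict[str, Any]]


async def fake_stream() -> AsyncGenerator[Chunk, None]:
    """Hypothetical stand-in for agent.arun_stream(query)."""
    yield "Hel"
    yield {"event": "tool_call", "name": "search"}  # illustrative dict chunk
    yield "lo"


async def split_chunks(
    stream: AsyncGenerator[Chunk, None],
) -> tuple[str, list[dict[str, Any]]]:
    """Concatenate string chunks and collect dict chunks separately."""
    text_parts: list[str] = []
    events: list[dict[str, Any]] = []
    async for chunk in stream:
        if isinstance(chunk, str):
            text_parts.append(chunk)
        else:
            events.append(chunk)
    return "".join(text_parts), events


text, events = asyncio.run(split_chunks(fake_stream()))
```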
cleanup()
async
Cleanup MCP resources including persistent sessions.
This method performs best-effort cleanup of MCP client resources. Errors during cleanup are logged but do not raise exceptions to ensure the cleanup process completes gracefully.
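The best-effort pattern described here can be sketched as a loop that logs failures and keeps going. This is an illustration of the pattern only, not the library's implementation; `FlakyResource` and `best_effort_cleanup` are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("cleanup_sketch")


class FlakyResource:
    """Hypothetical resource whose close() may fail."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.closed = False

    async def close(self) -> None:
        if self.fail:
            raise ConnectionError(f"{self.name} already disconnected")
        self.closed = True


async def best_effort_cleanup(resources: list[FlakyResource]) -> int:
    """Close every resource; log failures instead of raising. Returns the failure count."""
    failures = 0
    for resource in resources:
        try:
            await resource.close()
        except Exception as exc:  # broad on purpose: one failure must not stop the rest
            failures += 1
            logger.warning("cleanup of %s failed: %s", resource.name, exc)
    return failures


resources = [FlakyResource("a"), FlakyResource("b", fail=True), FlakyResource("c")]
failures = asyncio.run(best_effort_cleanup(resources))
```

Callers would usually invoke `cleanup()` from a `finally` block so that it runs even when the agent call itself fails.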
define_graph(graph_builder)
abstractmethod
Define the specific graph structure for this agent type.
Subclasses must implement this method to:

1. Add nodes to the graph_builder
2. Add edges and conditional edges
3. Set entry points
4. Return the compiled graph
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `graph_builder` | `StateGraph` | The StateGraph builder to define nodes and edges on. | required |
Returns:
| Type | Description |
|---|---|
| `CompiledStateGraph` | The compiled graph ready for execution. |
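The four steps of a `define_graph` implementation can be sketched as follows. To stay runnable without LangGraph installed, `RecordingBuilder` is a hypothetical stand-in that supports only a linear chain of nodes; its method names mirror LangGraph's `StateGraph` (`add_node`, `add_edge`, `set_entry_point`, `compile`), but the `"END"` marker and the execution loop are simplifications, and conditional edges are omitted.

```python
from typing import Any, Callable, Optional

State = dict[str, Any]
Node = Callable[[State], State]


class RecordingBuilder:
    """Hypothetical stand-in for StateGraph; handles a linear chain only."""

    def __init__(self) -> None:
        self.nodes: dict[str, Node] = {}
        self.edges: dict[str, str] = {}
        self.entry: Optional[str] = None

    def add_node(self, name: str, fn: Node) -> None:
        self.nodes[name] = fn

    def add_edge(self, start: str, end: str) -> None:
        self.edges[start] = end

    def set_entry_point(self, name: str) -> None:
        self.entry = name

    def compile(self) -> Node:
        def run(state: State) -> State:
            # Walk the chain from the entry node until the "END" marker.
            current = self.entry
            while current is not None and current != "END":
                state = self.nodes[current](state)
                current = self.edges.get(current)
            return state

        return run


def define_graph(graph_builder: RecordingBuilder) -> Node:
    # 1. Add nodes
    graph_builder.add_node("draft", lambda s: {**s, "answer": s["query"].upper()})
    graph_builder.add_node("finish", lambda s: {**s, "done": True})
    # 2. Add edges
    graph_builder.add_edge("draft", "finish")
    graph_builder.add_edge("finish", "END")
    # 3. Set the entry point
    graph_builder.set_entry_point("draft")
    # 4. Return the compiled graph
    return graph_builder.compile()


graph = define_graph(RecordingBuilder())
final_state = graph({"query": "hi"})
```

In a real subclass the builder passed in is a `StateGraph`, and the method returns the result of its `compile()` call.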
register_a2a_agents(agent_cards)
Register A2A communication capabilities using the A2A tool manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agent_cards` | `list[AgentCard]` | List of AgentCard instances for external communication. | required |
register_delegation_agents(agents)
Register internal agent delegation capabilities using the delegation tool manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agents` | `list[BaseAgent]` | List of BaseAgent instances for internal task delegation. | required |
run(query, **kwargs)
Synchronously run the LangGraph agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The input query for the agent. | required |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary containing the agent's response. |
set_operation_mode(mode)
Set the operation mode for dependency tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | `str` | Operation mode: "parallel" (default) or "sequential". | required |
update_regular_tools(new_tools, rebuild_graph=None)
Update regular tools (not capabilities).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `new_tools` | `list[BaseTool]` | New list of regular tools to use. | required |
| `rebuild_graph` | `bool \| None` | Whether to rebuild the graph. If `None`, uses the `auto_rebuild_graph` setting. | `None` |
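The tri-state `rebuild_graph` argument can be read as "an explicit value wins, `None` defers to the agent-level setting". A minimal sketch of that resolution follows; `resolve_rebuild` is a hypothetical helper, not part of the class.

```python
from typing import Optional


def resolve_rebuild(rebuild_graph: Optional[bool], auto_rebuild_graph: bool) -> bool:
    """Explicit True/False wins; None falls back to the agent-level setting."""
    return auto_rebuild_graph if rebuild_graph is None else rebuild_graph


decisions = [
    resolve_rebuild(None, auto_rebuild_graph=True),
    resolve_rebuild(None, auto_rebuild_graph=False),
    resolve_rebuild(False, auto_rebuild_graph=True),
    resolve_rebuild(True, auto_rebuild_graph=False),
]
```

Testing against `None` rather than relying on truthiness matters here, since an explicit `False` must override an `auto_rebuild_graph` of `True`.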