Map reduce step
A pipeline step that applies a map function to multiple inputs and reduces the results.
```python
MapReduceStep(
    name,
    output_state,
    map_func,
    reduce_func=lambda results: results,
    input_state_map=None,
    runtime_config_map=None,
    fixed_args=None,
    input_map=None,
    retry_config=None,
    error_handler=None,
    cache_store=None,
    cache_config=None,
)
```
Bases: `BasePipelineStep`, `HasInputsMixin`
A step that applies a mapping function to multiple inputs and reduces the results.
This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from `input_state_map`, `runtime_config_map`, and `fixed_args`).
2. A reduce function that combines all the mapped results.
Note on parallel execution:

1. For true parallelism, the `map_func` MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.
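The difference between synchronous and asynchronous map functions can be observed directly with plain `asyncio` (a standalone illustration of the scheduling behavior, not this library's code; all names here are hypothetical):

```python
import asyncio

log: list[str] = []

async def blocking_item(i: int) -> None:
    # No await in the body: the whole item runs to completion
    # before the event loop can switch to the next task.
    log.append(f"sync-start-{i}")
    log.append(f"sync-end-{i}")

async def yielding_item(i: int) -> None:
    log.append(f"async-start-{i}")
    await asyncio.sleep(0)  # Yield control so other items can run.
    log.append(f"async-end-{i}")

async def main() -> None:
    # Blocking bodies run one after another, even under gather().
    await asyncio.gather(blocking_item(1), blocking_item(2))
    # Yielding bodies interleave: both items start before either finishes.
    await asyncio.gather(yielding_item(1), yielding_item(2))

asyncio.run(main())
```

The log shows `sync-start-1, sync-end-1, sync-start-2, sync-end-2` for the blocking pair but `async-start-1, async-start-2, async-end-1, async-end-2` for the yielding pair, which is why a synchronous `map_func` effectively runs sequentially.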
Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match list lengths.
4. If there are no list inputs, applies the map function once to the whole input.
Internally, this step uses asyncio.gather() for efficient parallel execution.
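The broadcasting and gather behavior described above can be sketched with plain `asyncio` (a minimal illustration of the semantics, not this library's implementation; `map_reduce` and `double` are hypothetical names):

```python
import asyncio
from typing import Any, Awaitable, Callable

async def map_reduce(
    map_func: Callable[[dict[str, Any]], Awaitable[Any]],
    reduce_func: Callable[[list[Any]], Any],
    inputs: dict[str, Any],
) -> Any:
    # 1. Detect which inputs are lists.
    list_keys = [k for k, v in inputs.items() if isinstance(v, list)]
    if not list_keys:
        # 4. No list inputs: apply the map function once to the whole input.
        return reduce_func([await map_func(inputs)])

    # 2. All list inputs must have the same length.
    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError("All list inputs must have the same length")
    n = lengths.pop()

    # 3. Broadcast scalars across items and build one input dict per item.
    items = [
        {k: (v[i] if k in list_keys else v) for k, v in inputs.items()}
        for i in range(n)
    ]
    # Run the map function over all items in parallel, then reduce.
    results = await asyncio.gather(*(map_func(item) for item in items))
    return reduce_func(list(results))

async def double(item: dict[str, Any]) -> int:
    return item["x"] * item["factor"]

# "x" is a list, "factor" a scalar broadcast to every item.
result = asyncio.run(map_reduce(double, sum, {"x": [1, 2, 3], "factor": 10}))
```

Here `result` is `sum([10, 20, 30]) == 60`: the scalar `factor` was paired with each element of `x`, the async map ran under `asyncio.gather()`, and the reduce combined the mapped values.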
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | A unique identifier for this step. |
| `map_func` | `Component \| Callable[[dict[str, Any]], Any]` | Function to apply to each input item. Runs in parallel if the function is asynchronous. |
| `reduce_func` | `Callable[[list[Any]], Any]` | Function to reduce the mapped results. |
| `input_map` | `dict[str, str \| Any] \| None` | Unified input map. |
| `output_state` | `str` | Key to store the reduced result in the pipeline state. |
| `retry_policy` | `RetryPolicy \| None` | Configuration for retry behavior using LangGraph's RetryPolicy. |
Initialize a new MapReduceStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
str
|
A unique identifier for this step. |
required |
output_state |
str
|
Key to store the reduced result in the pipeline state. |
required |
map_func |
Component | Callable[[dict[str, Any]], Any]
|
Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args. |
required |
reduce_func |
Callable[[list[Any]], Any]
|
Function to reduce the mapped results. Defaults to a function that returns the list of results as is. |
lambda results: results
|
input_state_map |
dict[str, str] | None
|
Mapping of function arguments to pipeline state keys. Defaults to None. |
None
|
runtime_config_map |
dict[str, str] | None
|
Mapping of arguments to runtime config keys. Defaults to None. |
None
|
fixed_args |
dict[str, Any] | None
|
Fixed arguments to pass to the functions. Defaults to None. |
None
|
input_map |
InputMapSpec | None
|
Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. If provided, it will be used directly instead of synthesizing from maps. Defaults to None. |
None
|
retry_config |
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler |
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache_store |
'BaseCache' | None
|
Cache store to be used for caching. Defaults to None, in which case no cache store is used. |
None
|
cache_config |
dict[str, Any] | None
|
Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. |
None
|
`execute(state, runtime)` *(async)*
Execute the map and reduce operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `dict[str, Any]` | The current state of the pipeline. | *required* |
| `runtime` | `Runtime[dict[str, Any] \| BaseModel]` | Runtime information for this step's execution. | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | The reduced result stored under `output_state`. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If an error occurs during execution. |