Map reduce step
A pipeline step that applies a map function to multiple inputs and reduces the results.
MapReduceStep(name, input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A step that applies a mapping function to multiple inputs and reduces the results.
This step processes multiple input items in parallel using:
1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args).
2. A reduce function that combines all the mapped results.
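As a minimal illustration of the two callables, here is a hypothetical word-count pair written against the documented signatures (`map_func` takes one dict per item, `reduce_func` takes the list of mapped results). The function names, the `text`/`lowercase` keys, and the plain list comprehension standing in for the step are assumptions made for this sketch, not part of the library.

```python
from collections import Counter
from typing import Any


def count_words(inputs: dict[str, Any]) -> Counter:
    """Map: count the words in one document (hypothetical example)."""
    text = inputs["text"]
    if inputs.get("lowercase", False):
        text = text.lower()
    return Counter(text.split())


def merge_counts(results: list[Counter]) -> Counter:
    """Reduce: combine the per-document counts into one total."""
    total: Counter = Counter()
    for counts in results:
        total.update(counts)
    return total


# Per-item input dicts, shaped like the ones the step builds: "text" differs
# per item, while "lowercase" acts like a fixed argument shared by all items.
per_item_inputs = [
    {"text": "The cat sat", "lowercase": True},
    {"text": "the dog sat", "lowercase": True},
]
mapped = [count_words(item) for item in per_item_inputs]
total = merge_counts(mapped)
print(total["the"], total["sat"], total["cat"])  # 2 2 1
```

Keeping each map call independent of the others is what lets the step run them concurrently; all cross-item work belongs in the reduce function.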
Note on parallel execution:
1. For the map calls to run concurrently, map_func MUST be an async function or a Component.
2. Synchronous map functions block the event loop, so their calls run sequentially.
Input handling:
1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match the list length.
4. If there are no list inputs, applies the map function once to the whole input.
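The input-handling rules can be sketched as a standalone helper that turns one resolved input dict into per-item dicts. `split_inputs` is a hypothetical name, and treating `str`/`bytes` values as scalars rather than sequences is an assumption; the real step may detect sequences differently.

```python
from collections.abc import Sequence
from typing import Any


def split_inputs(inputs: dict[str, Any]) -> list[dict[str, Any]]:
    """Split resolved inputs into per-item dicts (sketch of the documented rules)."""
    # 1. Detect list-like inputs. Strings and bytes are sequences in Python,
    #    but this sketch assumes they should be treated as scalars.
    list_keys = [
        key
        for key, value in inputs.items()
        if isinstance(value, Sequence) and not isinstance(value, (str, bytes))
    ]
    # 4. No list inputs: the map function is applied once to the whole input.
    if not list_keys:
        return [dict(inputs)]
    # 2. All list inputs must share one length.
    lengths = {len(inputs[key]) for key in list_keys}
    if len(lengths) != 1:
        raise ValueError(f"List inputs have mismatched lengths: {sorted(lengths)}")
    (length,) = lengths
    # 3. Broadcast scalars: every item receives the same scalar value.
    return [
        {key: (value[i] if key in list_keys else value) for key, value in inputs.items()}
        for i in range(length)
    ]


items = split_inputs({"question": ["q1", "q2"], "doc": ["d1", "d2"], "top_k": 3})
print(items)
# [{'question': 'q1', 'doc': 'd1', 'top_k': 3}, {'question': 'q2', 'doc': 'd2', 'top_k': 3}]
```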
Internally, this step uses asyncio.gather() to run the map calls concurrently.
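The note on parallel execution can be checked with plain asyncio. This sketch counts how many map calls are in flight at once when `asyncio.gather()` drives an async function versus a synchronous one. The `Tracker` class and the extra `tracker` argument are hypothetical additions used only for measurement; they are not part of the step's map function signature.

```python
import asyncio
import time
from typing import Any


class Tracker:
    """Records the peak number of map calls running at the same time."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    def enter(self) -> None:
        self.active += 1
        self.peak = max(self.peak, self.active)

    def leave(self) -> None:
        self.active -= 1


async def async_map(inputs: dict[str, Any], tracker: Tracker) -> int:
    tracker.enter()
    await asyncio.sleep(0.01)  # yields to the event loop, like waiting on I/O
    tracker.leave()
    return inputs["x"] * 2


def sync_map(inputs: dict[str, Any], tracker: Tracker) -> int:
    tracker.enter()
    time.sleep(0.01)  # blocks the event loop; nothing else runs meanwhile
    tracker.leave()
    return inputs["x"] * 2


async def run_async(items: list[dict[str, Any]]) -> tuple[list[int], int]:
    tracker = Tracker()
    results = await asyncio.gather(*(async_map(item, tracker) for item in items))
    return results, tracker.peak


async def run_sync(items: list[dict[str, Any]]) -> tuple[list[int], int]:
    tracker = Tracker()

    async def call(item: dict[str, Any]) -> int:
        # Wrapping a sync function in a coroutine does not make it concurrent.
        return sync_map(item, tracker)

    results = await asyncio.gather(*(call(item) for item in items))
    return results, tracker.peak


items = [{"x": 1}, {"x": 2}, {"x": 3}]
print(asyncio.run(run_async(items)))  # ([2, 4, 6], 3)
print(asyncio.run(run_sync(items)))   # ([2, 4, 6], 1)
```

With the async function all three calls overlap; with the synchronous one, each call finishes before the next starts, even though both paths go through `asyncio.gather()`.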
Attributes:
Name | Type | Description |
---|---|---|
name | str | A unique identifier for this step. |
map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. Calls run concurrently when it is an async function or a Component. |
reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. |
input_state_map | dict[str, str] | Mapping of function arguments to pipeline state keys. |
output_state | str | Key to store the reduced result in the pipeline state. |
runtime_config_map | dict[str, str] \| None | Mapping of function arguments to runtime config keys. |
fixed_args | dict[str, Any] \| None | Fixed arguments to pass to the functions. |
Initialize a new MapReduceStep.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this step. | required |
input_state_map | dict[str, str] | Mapping of function arguments to pipeline state keys. | required |
output_state | str | Key to store the reduced result in the pipeline state. | required |
map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args. | required |
reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. Defaults to a function that returns the list of results as is. | lambda results: results |
runtime_config_map | dict[str, str] \| None | Mapping of arguments to runtime config keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to pass to the functions. Defaults to None. | None |
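To make the three argument sources concrete, the sketch below assembles the dict that `map_func` receives before any list splitting happens. `resolve_inputs` is hypothetical: it reads runtime values from a plain dict rather than from `RunnableConfig`, and the precedence order (state, then runtime config, then `fixed_args`, with later sources overwriting earlier ones) is an assumption, not documented behavior.

```python
from __future__ import annotations

from typing import Any


def resolve_inputs(
    state: dict[str, Any],
    runtime_values: dict[str, Any],
    input_state_map: dict[str, str],
    runtime_config_map: dict[str, str] | None = None,
    fixed_args: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the argument dict passed to map_func (hypothetical helper)."""
    # Argument name -> value read from the pipeline state.
    inputs = {arg: state[key] for arg, key in input_state_map.items()}
    # Argument name -> value read from the runtime configuration.
    for arg, key in (runtime_config_map or {}).items():
        inputs[arg] = runtime_values[key]
    # Constant arguments included on every call.
    inputs.update(fixed_args or {})
    return inputs


state = {"user_questions": ["What is RAG?", "What is a vector DB?"], "lang": "en"}
inputs = resolve_inputs(
    state,
    {"model_name": "some-model"},
    input_state_map={"query": "user_questions", "language": "lang"},
    runtime_config_map={"model": "model_name"},
    fixed_args={"max_tokens": 256},
)
print(inputs)
# {'query': ['What is RAG?', 'What is a vector DB?'], 'language': 'en', 'model': 'some-model', 'max_tokens': 256}
```

Here `query` is the only list input, so under the input-handling rules the other values would be broadcast to each of the two questions.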
async execute(state, config)
Execute the map and reduce operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration. | required |
Returns:
Type | Description |
---|---|
dict[str, Any] | A dictionary mapping output_state to the reduced result. |
Raises:
Type | Description |
---|---|
RuntimeError | If an error occurs during execution. |
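Putting the pieces together, `execute` can be approximated as: run `map_func` over every item with `asyncio.gather()`, apply `reduce_func`, return the result under the `output_state` key, and wrap any failure in `RuntimeError`. `execute_sketch` is a hypothetical stand-in, not the library's implementation: it takes already-split per-item dicts and does not handle Components.

```python
import asyncio
import inspect
from typing import Any, Callable


async def execute_sketch(
    items: list[dict[str, Any]],
    map_func: Callable[[dict[str, Any]], Any],
    reduce_func: Callable[[list[Any]], Any],
    output_state: str,
) -> dict[str, Any]:
    """Map every item concurrently, reduce, and return {output_state: result}."""

    async def run_one(item: dict[str, Any]) -> Any:
        if inspect.iscoroutinefunction(map_func):
            return await map_func(item)
        return map_func(item)  # sync functions block the event loop

    try:
        results = await asyncio.gather(*(run_one(item) for item in items))
        reduced = reduce_func(list(results))
    except Exception as exc:
        # Surface any failure as RuntimeError, matching the Raises table.
        raise RuntimeError(f"map-reduce failed: {exc}") from exc
    return {output_state: reduced}


async def square(inputs: dict[str, Any]) -> int:
    await asyncio.sleep(0)  # placeholder for real async work
    return inputs["x"] ** 2


update = asyncio.run(execute_sketch([{"x": 1}, {"x": 2}, {"x": 3}], square, sum, "total"))
print(update)  # {'total': 14}
```

Passing `lambda results: results` as `reduce_func` mirrors the documented default and would return `{'total': [1, 4, 9]}` instead.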