
# Map reduce step

A pipeline step that applies a map function to multiple inputs and reduces the results.

```python
MapReduceStep(
    name,
    output_state,
    map_func,
    reduce_func=lambda results: results,
    input_map=None,
    retry_config=None,
    error_handler=None,
    cache=None,
)
```

Bases: `BasePipelineStep`, `HasInputsMixin`

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. It receives a dictionary containing the input values for the current item (derived from `input_map`).
2. A reduce function that combines all the mapped results.

Note on parallel execution:

1. For true parallelism, `map_func` MUST be an async function or a `Component`.
2. Synchronous map functions block the event loop and run sequentially.

Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match the list length.
4. If there are no list inputs, applies the map function once to the whole input.
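The input-handling rules above can be sketched as a small normalization helper. This is a simplified illustration, not the library's internal code; the name `broadcast_inputs` is hypothetical:

```python
from typing import Any


def broadcast_inputs(inputs: dict[str, Any]) -> list[dict[str, Any]]:
    """Expand a dict of scalar/list inputs into one dict per mapped item."""
    # Detect list inputs (strings count as scalars).
    list_keys = {k for k, v in inputs.items() if isinstance(v, (list, tuple))}
    if not list_keys:
        # No list inputs: the map function is applied once to the whole input.
        return [inputs]

    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError(f"All list inputs must have the same length, got {lengths}")
    n = lengths.pop()

    # Broadcast scalars so every item sees a complete input dict.
    return [
        {k: (inputs[k][i] if k in list_keys else inputs[k]) for k in inputs}
        for i in range(n)
    ]
```

For example, `broadcast_inputs({"doc": ["a", "b"], "model": "x"})` yields two item dicts, with the scalar `"model"` repeated in each.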

Internally, this step uses `asyncio.gather()` for efficient parallel execution.
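The map/reduce execution then amounts to fanning the per-item dicts out through `asyncio.gather` and folding the results. The sketch below illustrates the idea; `map_reduce` and `score` are hypothetical helpers, not the step's actual methods:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def map_reduce(
    items: list[dict[str, Any]],
    map_func: Callable[[dict[str, Any]], Awaitable[Any]],
    reduce_func: Callable[[list[Any]], Any] = lambda results: results,
) -> Any:
    # Async map functions run concurrently; a synchronous function here
    # would block the event loop and effectively serialize the work.
    results = await asyncio.gather(*(map_func(item) for item in items))
    return reduce_func(list(results))


async def score(item: dict[str, Any]) -> int:
    await asyncio.sleep(0)  # stand-in for real async work (I/O, model call)
    return len(item["doc"])


total = asyncio.run(
    map_reduce([{"doc": "alpha"}, {"doc": "be"}], score, reduce_func=sum)
)
# total == 7 (5 + 2)
```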

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | A unique identifier for this step. |
| `map_func` | `Component \| Callable[[dict[str, Any]], Any]` | Function to apply to each input item. Runs in parallel when it is an async function. |
| `reduce_func` | `Callable[[list[Any]], Any]` | Function to reduce the mapped results. |
| `input_map` | `InputMapSpec \| None` | Unified input map. |
| `output_state` | `str` | Key under which the reduced result is stored in the pipeline state. |
| `retry_config` | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's `RetryConfig`. |

Initialize a new MapReduceStep.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | A unique identifier for this step. | *required* |
| `output_state` | `str` | Key under which the reduced result is stored in the pipeline state. | *required* |
| `map_func` | `Component \| Callable[[dict[str, Any]], Any]` | Function to apply to each input item; receives a dictionary of input values derived from `input_map`. | *required* |
| `reduce_func` | `Callable[[list[Any]], Any]` | Function to reduce the mapped results. Defaults to a function that returns the list of results unchanged. | `lambda results: results` |
| `input_map` | `InputMapSpec \| None` | Unified input map. Either a dict (`arg -> str \| Val \| Group`) or a list whose elements are: (1) `str` for identity mapping; (2) `dict[str, str]` for state/config mapping; (3) `dict[str, Val]` for fixed args; (4) `dict[str, Group]` to keep wrapped lookup/literal values intact across fan-out. | `None` |
| `retry_config` | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's `RetryConfig`. If `None`, no retry config is applied. | `None` |
| `error_handler` | `BaseStepErrorHandler \| None` | Strategy for handling errors during execution. If `None`, `RaiseStepErrorHandler` is used. | `None` |
| `cache` | `CacheConfig \| None` | Configuration for the cache store, including the cache store instance and its settings. If `None`, no caching is used. | `None` |
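As a concrete illustration of the `input_map` forms listed above (a hypothetical retrieval-style mapping; `Val` stands in for the library's fixed-value wrapper and is stubbed here so the snippet is self-contained):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Val:
    """Illustrative stub for the library's fixed-value wrapper."""
    value: Any


# List form: mix identity mappings, state/config renames, and fixed args.
input_map = [
    "documents",                   # identity: state["documents"] -> documents
    {"top_k": "retrieval_top_k"},  # rename: state/config["retrieval_top_k"] -> top_k
    {"model": Val("small-model")}, # fixed argument, identical for every item
]

# Dict form of the same mapping: arg name -> source key or Val.
input_map_dict = {
    "documents": "documents",
    "top_k": "retrieval_top_k",
    "model": Val("small-model"),
}
```

If `documents` is a list in the pipeline state, the step fans out one map call per document, broadcasting `top_k` and `model` to each call.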

`execute(state, runtime, config=None)` *(async)*

Execute the map and reduce operations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `state` | `PipelineState` | The current state of the pipeline. | *required* |
| `runtime` | `Runtime` | Runtime information for this step's execution. | *required* |
| `config` | `RunnableConfig \| None` | The runnable configuration. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | The reduced result stored under `output_state`. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If an error occurs during execution. |