Map reduce step

A pipeline step that applies a map function to multiple inputs and reduces the results.

Authors

Kadek Denaya (kadek.d.r.diana@gdplabs.id)

MapReduceStep(name, output_state, map_func, reduce_func=lambda results: results, input_state_map=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep, HasInputsMixin

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args).
2. A reduce function that combines all the mapped results.

Note on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.
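The difference between the two cases in the note above can be seen with plain asyncio, without this library. The sketch below is illustrative only: `run_all`, `make_async_map`, `make_sync_map`, and `trace` are hypothetical helpers, and the way `run_all` wraps the map function is an assumption, not the actual implementation of MapReduceStep.

```python
import asyncio
from typing import Any, Callable


def make_async_map(log: list) -> Callable[[dict[str, Any]], Any]:
    """Build an async map function that records when it starts and ends."""
    async def map_func(item: dict[str, Any]) -> int:
        log.append(("start", item["x"]))
        await asyncio.sleep(0)  # yield to the event loop so other items can start
        log.append(("end", item["x"]))
        return item["x"] * 2
    return map_func


def make_sync_map(log: list) -> Callable[[dict[str, Any]], Any]:
    """Build a synchronous map function; it never yields to the event loop."""
    def map_func(item: dict[str, Any]) -> int:
        log.append(("start", item["x"]))
        log.append(("end", item["x"]))
        return item["x"] * 2
    return map_func


async def run_all(map_func: Callable, items: list[dict[str, Any]]) -> list[Any]:
    """Hypothetical runner: call map_func per item and gather the results."""
    async def call(item: dict[str, Any]) -> Any:
        result = map_func(item)
        if asyncio.iscoroutine(result):
            result = await result
        return result
    return await asyncio.gather(*(call(item) for item in items))


def trace(make_map: Callable) -> tuple[list[Any], list]:
    """Run two items through a map function and return (results, event log)."""
    log: list = []
    results = asyncio.run(run_all(make_map(log), [{"x": 1}, {"x": 2}]))
    return results, log
```

With the async map function, both items start before either one ends. With the synchronous one, each item runs to completion before the next begins, even though `asyncio.gather()` is used in both cases.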

Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match list lengths.
4. If no list inputs, applies the map function once to the whole input.
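The input handling rules above can be sketched as a small standalone function. This is a minimal illustration, not the library's code: `expand_inputs` is a hypothetical name, only `list` and `tuple` are treated as list inputs here (strings count as scalars), and raising `ValueError` on a length mismatch is an assumption.

```python
from typing import Any


def expand_inputs(inputs: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn one resolved input dict into one dict per item to map over."""
    # Rule 1: detect which inputs are lists (assumption: list/tuple only).
    list_keys = [k for k, v in inputs.items() if isinstance(v, (list, tuple))]

    # Rule 4: with no list inputs, the map function is applied once.
    if not list_keys:
        return [dict(inputs)]

    # Rule 2: all list inputs must have the same length.
    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError(f"List inputs have different lengths: {sorted(lengths)}")
    n = lengths.pop()

    # Rule 3: scalars are repeated (broadcast) for every item.
    return [
        {k: (v[i] if k in list_keys else v) for k, v in inputs.items()}
        for i in range(n)
    ]
```

For example, `expand_inputs({"query": ["a", "b"], "top_k": 3})` yields two item dicts, each carrying one query and the same `top_k`.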

Internally, this step uses asyncio.gather() to run the map calls concurrently on the event loop.
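As a rough picture of the overall flow, the sketch below gathers the mapped results, applies a reduce function, and returns the result under an output key. It assumes an async map function; `map_then_reduce`, `word_count`, and `demo` are hypothetical names, and the real step may differ in details such as retries, caching, and error handling.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def map_then_reduce(
    items: list[dict[str, Any]],
    map_func: Callable[[dict[str, Any]], Awaitable[Any]],
    reduce_func: Callable[[list[Any]], Any] = lambda results: results,
    output_state: str = "result",
) -> dict[str, Any]:
    """Map every item concurrently, reduce, and store under output_state."""
    # asyncio.gather() returns results in the same order as the inputs.
    mapped = await asyncio.gather(*(map_func(item) for item in items))
    return {output_state: reduce_func(list(mapped))}


async def word_count(item: dict[str, Any]) -> int:
    """Example async map function: count the words in one text."""
    await asyncio.sleep(0)  # stand-in for real async work such as an API call
    return len(item["text"].split())


def demo() -> dict[str, Any]:
    """Count words per text, then sum the counts."""
    items = [{"text": "a b c"}, {"text": "d e"}]
    return asyncio.run(map_then_reduce(items, word_count, sum, "total_words"))
```

Because `asyncio.gather()` preserves input order, the default reduce function (which returns the list unchanged) gives results aligned with the input items.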

Attributes:

Name Type Description
name str

A unique identifier for this step.

map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. Runs in parallel only if it is an async function or a Component; a synchronous function runs sequentially.

reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results.

input_map dict[str, str | Any] | None

Unified input map.

output_state str

Key to store the reduced result in the pipeline state.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initialize a new MapReduceStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
input_state_map dict[str, str] | None

Mapping of function arguments to pipeline state keys. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of arguments to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements:

1. str for identity mapping
2. dict[str, str] for state/config mapping
3. dict[str, Val] for fixed args

If provided, it will be used directly instead of synthesizing from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Execute the map and reduce operations.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

The reduced result stored under output_state.

Raises:

Type Description
RuntimeError

If an error occurs during execution.