
Map reduce step

A pipeline step that applies a map function to multiple inputs and reduces the results.

Authors

Kadek Denaya (kadek.d.r.diana@gdplabs.id)


MapReduceStep(name, input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args).
2. A reduce function that combines all the mapped results.
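As a rough illustration of the two roles, the sketch below applies a map function to per-item dictionaries and then reduces the results. It uses plain Python and does not call MapReduceStep itself; the names word_count and total and the "text" key are hypothetical.

```python
from typing import Any


def word_count(item: dict[str, Any]) -> int:
    """Map function: receives one item's inputs as a dictionary."""
    return len(item["text"].split())


def total(results: list[int]) -> int:
    """Reduce function: combines all mapped results into one value."""
    return sum(results)


# One dictionary per input item ("text" is a hypothetical argument name).
items = [{"text": "map then reduce"}, {"text": "two words"}]

mapped = [word_count(item) for item in items]
reduced = total(mapped)
print(mapped, reduced)
```

With the default reduce_func (lambda results: results), the reduced value would simply be the list of mapped results.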

Note on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.

Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match list lengths.
4. If there are no list inputs, applies the map function once to the whole input.
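The input-handling rules above can be sketched roughly as follows. This is not the library's implementation: expand_inputs is a hypothetical helper, and treating only list and tuple values as "lists" (so strings count as scalars) is an assumption.

```python
from typing import Any


def expand_inputs(inputs: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn a dict of lists and scalars into one dict per item."""
    # Assumption: only list/tuple values count as list inputs.
    list_keys = [k for k, v in inputs.items() if isinstance(v, (list, tuple))]
    if not list_keys:
        # No list inputs: the map function is applied once to the whole input.
        return [dict(inputs)]

    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError(f"List inputs must share one length, got {sorted(lengths)}")

    n = lengths.pop()
    # Scalars are repeated for every item; list values are indexed per item.
    return [
        {k: (v[i] if k in list_keys else v) for k, v in inputs.items()}
        for i in range(n)
    ]


items = expand_inputs({"text": ["a", "b", "c"], "lang": "en"})
print(items)
```

Each dictionary in items is what a map function would receive for one item under these assumptions.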

Internally, this step uses asyncio.gather() to run the map calls concurrently.
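The difference between an async map function and one that blocks can be seen with asyncio.gather() alone. The sketch below is standalone and does not use MapReduceStep; async_map, blocking_map, and timed are hypothetical names, and modelling a synchronous map function as a coroutine that calls time.sleep() is an assumption about how blocking work behaves on the event loop.

```python
import asyncio
import time


async def async_map(item: dict) -> int:
    await asyncio.sleep(0.1)  # yields to the event loop while waiting
    return item["x"] * 2


async def blocking_map(item: dict) -> int:
    time.sleep(0.1)  # blocks the event loop, so the calls run one after another
    return item["x"] * 2


async def timed(map_func, items):
    """Run map_func over all items with asyncio.gather and time the batch."""
    start = time.perf_counter()
    results = await asyncio.gather(*(map_func(item) for item in items))
    return results, time.perf_counter() - start


items = [{"x": i} for i in range(5)]
async_results, async_elapsed = asyncio.run(timed(async_map, items))
blocking_results, blocking_elapsed = asyncio.run(timed(blocking_map, items))
print(async_results, f"{async_elapsed:.2f}s vs {blocking_elapsed:.2f}s")
```

asyncio.gather() returns results in the order the awaitables were passed in, so a reduce function sees the mapped results in input order regardless of which call finishes first.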

Attributes:

- name (str): A unique identifier for this step.
- map_func (Component | Callable[[dict[str, Any]], Any]): Function to apply to each input item. Will be run in parallel if the function is an asynchronous function.
- reduce_func (Callable[[list[Any]], Any]): Function to reduce the mapped results.
- input_state_map (dict[str, str]): Mapping of function arguments to pipeline state keys.
- output_state (str): Key to store the reduced result in the pipeline state.
- runtime_config_map (dict[str, str] | None): Mapping of function arguments to runtime config keys.
- fixed_args (dict[str, Any] | None): Fixed arguments to pass to the functions.

Initialize a new MapReduceStep.

Parameters:

- name (str, required): A unique identifier for this step.
- input_state_map (dict[str, str], required): Mapping of function arguments to pipeline state keys.
- output_state (str, required): Key to store the reduced result in the pipeline state.
- map_func (Component | Callable[[dict[str, Any]], Any], required): Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.
- reduce_func (Callable[[list[Any]], Any], optional): Function to reduce the mapped results. Defaults to a function that returns the list of results as is (lambda results: results).
- runtime_config_map (dict[str, str] | None, optional): Mapping of arguments to runtime config keys. Defaults to None.
- fixed_args (dict[str, Any] | None, optional): Fixed arguments to pass to the functions. Defaults to None.
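To make the three argument sources concrete, here is a minimal sketch of how a map function's input dictionary might be assembled. It is not the library's code: resolve_args is a hypothetical helper, the runtime config is modelled as a plain dictionary, and the precedence (fixed_args over runtime config over state) is an assumption that this page does not confirm.

```python
from __future__ import annotations

from typing import Any


def resolve_args(
    state: dict[str, Any],
    runtime_config: dict[str, Any],
    input_state_map: dict[str, str],
    runtime_config_map: dict[str, str] | None = None,
    fixed_args: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Collect map_func arguments from state, runtime config, and fixed values."""
    # Argument name -> value looked up under the mapped pipeline state key.
    args = {arg: state[key] for arg, key in input_state_map.items()}
    # Assumption: runtime config values and fixed args are layered on top.
    for arg, key in (runtime_config_map or {}).items():
        args[arg] = runtime_config[key]
    args.update(fixed_args or {})
    return args


args = resolve_args(
    state={"documents": ["doc a", "doc b"]},
    runtime_config={"target_language": "id"},
    input_state_map={"text": "documents"},
    runtime_config_map={"lang": "target_language"},
    fixed_args={"max_words": 50},
)
print(args)
```

Here "text" resolves to a list while "lang" and "max_words" are scalars, so under the input-handling rules the scalars would be broadcast to each of the two items.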

execute(state, config) async

Execute the map and reduce operations.

Parameters:

- state (dict[str, Any], required): The current state of the pipeline.
- config (RunnableConfig, required): Runtime configuration.

Returns:

- dict[str, Any]: The reduced result stored under output_state.

Raises:

- RuntimeError: If an error occurs during execution.
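The contract described for execute (map, reduce, return the result under output_state, raise RuntimeError on failure) can be approximated by the standalone sketch below. execute_sketch, square, and fail are hypothetical names, the per-item dictionaries are passed in directly rather than derived from state and config, and the exact error message and wrapping behavior of the real method are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def execute_sketch(
    items: list[dict[str, Any]],
    map_func: Callable[[dict[str, Any]], Awaitable[Any]],
    reduce_func: Callable[[list[Any]], Any],
    output_state: str,
) -> dict[str, Any]:
    """Map over items concurrently, reduce, and return a state update."""
    try:
        mapped = await asyncio.gather(*(map_func(item) for item in items))
        return {output_state: reduce_func(list(mapped))}
    except Exception as exc:
        # Assumption: any failure is surfaced as a RuntimeError.
        raise RuntimeError(f"Map-reduce step failed: {exc}") from exc


async def square(item: dict[str, Any]) -> int:
    return item["n"] ** 2


async def fail(item: dict[str, Any]) -> int:
    raise ValueError("bad item")


update = asyncio.run(
    execute_sketch([{"n": 2}, {"n": 3}], square, sum, "sum_of_squares")
)
print(update)
```

Calling the same sketch with fail as the map function raises RuntimeError, with the original ValueError attached as its cause.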