Skip to content

Event

Modules concerning the event handling system used throughout the Gen AI applications.

EventEmitter(handlers, event_level=EventLevel.INFO, hooks=None)

Handles events emitting using event handlers with various levels and types.

The EventEmitter class is responsible for handling and emitting events using a list of event handlers. Events are processed based on their severity level and type, with the option to disable specific handlers.

Attributes:

Name Type Description
handlers list[BaseEventHandler]

A list of event handlers to process emitted events.

severity int

The minimum severity level of events to be processed.

Examples:

Basic usage:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
await event_emitter.emit("Hello, world!")

Emitting an event object:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
event = Event(id="123", value="Hello, world!", level=EventLevel.INFO, type="object", metadata={})
await event_emitter.emit(event)

Using with hooks:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()], hooks=[JSONStringifyEventHook()])
await event_emitter.emit("Hello, world!")

Using with customized event level:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()], event_level=EventLevel.DEBUG)
await event_emitter.emit("Hello, world!")

Using with console handler:

event_emitter = EventEmitter.with_console_handler()
await event_emitter.emit("Hello, world!")

Using with print handler:

event_emitter = EventEmitter.with_print_handler()
await event_emitter.emit("Hello, world!")

Using with stream handler:

async def function(event_emitter: EventEmitter):
    await event_emitter.emit("Hello, world!")

event_emitter = EventEmitter.with_stream_handler()
asyncio.create_task(function(event_emitter))
async for event in event_emitter.stream():
    print(event)

Initializes a new instance of the EventEmitter class.

Parameters:

Name Type Description Default
handlers list[BaseEventHandler]

A list of event handlers to process emitted events.

required
event_level EventLevel

The minimum severity level of events to be processed. Defaults to EventLevel.INFO.

INFO
hooks list[BaseEventHook] | None

A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks.

None

Raises:

Type Description
ValueError
  1. If the event handlers list is empty.
  2. If an invalid event level is provided.

close() async

Closes all handlers in the handler list.

This method iterates through the list of handlers and calls the close method on each handler.

Returns:

Type Description
None

None

emit(value, event_level=EventLevel.DEBUG, event_type=EventType.STATUS.value, metadata=None, event_id=None, disabled_handlers=None) async

Emits an event using the configured event handlers.

If the value is an Event object, it is used directly and other parameters are ignored. Otherwise, a new Event object is created with the provided parameters. The event is then passed to all handlers except those listed in disabled_handlers. Events are only processed if their severity level meets or exceeds the EventEmitter's configured level.

Parameters:

Name Type Description Default
value str | dict[str, Any] | Event

The event value or Event object to emit. If an Event object is provided, other parameters are ignored and the Event is used as-is.

required
event_level EventLevel

The severity level of the event. Defaults to EventLevel.DEBUG.

DEBUG
event_type str

The type of event (e.g., "status", "response"). Defaults to EventType.STATUS.value.

value
metadata dict[str, Any] | None

Additional metadata for the event. Defaults to None, which creates an empty dictionary.

None
event_id str | None

Unique identifier for the event. Defaults to None, which creates an empty string.

None
disabled_handlers list[str] | None

Names of handlers to skip for this event. Defaults to None.

None

Raises:

Type Description
ValueError

If the provided event_level is not a valid EventLevel.

stream()

Streams events from the StreamEventHandler.

This method is a convenience method that calls the stream method on the StreamEventHandler. To use this method, the event emitter must have exactly one StreamEventHandler.

Returns:

Name Type Description
AsyncGenerator AsyncGenerator

An asynchronous generator yielding events from the StreamEventHandler.

with_console_handler(event_level=EventLevel.INFO, hooks=None) classmethod

Creates a new instance of the EventEmitter class with a single ConsoleEventHandler.

Parameters:

Name Type Description Default
event_level EventLevel

The minimum severity level of events to be processed. Defaults to EventLevel.INFO.

INFO
hooks list[BaseEventHook] | None

A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks.

None

Returns:

Name Type Description
EventEmitter EventEmitter

A new instance of the EventEmitter class with a single ConsoleEventHandler.

with_print_handler(event_level=EventLevel.INFO, hooks=None) classmethod

Creates a new instance of the EventEmitter class with a single PrintEventHandler.

Parameters:

Name Type Description Default
event_level EventLevel

The minimum severity level of events to be processed. Defaults to EventLevel.INFO.

INFO
hooks list[BaseEventHook] | None

A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks.

None

Returns:

Name Type Description
EventEmitter EventEmitter

A new instance of the EventEmitter class with a single PrintEventHandler.

with_stream_handler(event_level=EventLevel.INFO, hooks=None) classmethod

Creates a new instance of the EventEmitter class with a single StreamEventHandler.

Parameters:

Name Type Description Default
event_level EventLevel

The minimum severity level of events to be processed. Defaults to EventLevel.INFO.

INFO
hooks list[BaseEventHook] | None

A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks.

None

Returns:

Name Type Description
EventEmitter EventEmitter

A new instance of the EventEmitter class with a single StreamEventHandler.

Messenger(message, is_template=True)

Bases: Component

Emits a custom event message with possible access to the state variables.

This component acts as an intermediary step, designed to be placed between other pipeline steps. It allows for event messaging operations to be performed outside individual components but still within the context of the pipeline execution.

Attributes:

Name Type Description
message str

The message to be sent, may contain placeholders enclosed in curly braces {}.

is_template bool

Whether the message is a template that can be injected with state variables. Defaults to True.

variable_keys list[str]

The keys of the message that can be injected with state variables. Only used if is_template is set to True.

Plain string message example:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
kwargs = {"event_emitter": event_emitter}

messenger = Messenger("Executing component.", is_template=False)
await messenger.run(**kwargs)

Template message example:

event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
state_variables = {"query": "Hi!", "top_k": 10}
kwargs = {"event_emitter": event_emitter, "state_variables": state_variables}

messenger = Messenger("Executing component for query {query} and top k {top_k}.")
await messenger.run(**kwargs)

Initializes a new instance of the Messenger class.

Parameters:

Name Type Description Default
message str

The message to be sent, may contain placeholders enclosed in curly braces {}.

required
is_template bool

Whether the message is a template that can be injected with state variables. Defaults to True.

True

Raises:

Type Description
ValueError

If the keys of the message does not match the provided keys.

send_message(event_emitter, state_variables=None, emit_kwargs=None) async

Emits the message to the event emitter.

This method validates the variables, formats the message if required, and then emits the message using the event emitter.

Parameters:

Name Type Description Default
event_emitter EventEmitter

The event emitter instance to emit the message.

required
state_variables dict[str, Any] | None

The state variables to be injected into the message placeholders. Can only be provided if is_template is set to True. Defaults to None.

None
emit_kwargs dict[str, Any] | None

The keyword arguments to be passed to the event emitter's emit method. Defaults to None.

None