Event
Modules concerning the event handling system used throughout the Gen AI applications.
EventEmitter(handlers, event_level=EventLevel.INFO, hooks=None)
Handles events emitting using event handlers with various levels and types.
The EventEmitter class is responsible for handling and emitting events using a list of event handlers.
Events are processed based on their severity level and type, with the option to disable specific handlers.
Attributes:
| Name | Type | Description |
|---|---|---|
handlers |
list[BaseEventHandler]
|
A list of event handlers to process emitted events. |
severity |
int
|
The minimum severity level of events to be processed. |
Examples:
Basic usage:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
await event_emitter.emit("Hello, world!")
Emitting an event object:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
event = Event(id="123", value="Hello, world!", level=EventLevel.INFO, type="object", metadata={})
await event_emitter.emit(event)
Using with hooks:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()], hooks=[JSONStringifyEventHook()])
await event_emitter.emit("Hello, world!")
Using with customized event level:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()], event_level=EventLevel.DEBUG)
await event_emitter.emit("Hello, world!")
Using with console handler:
event_emitter = EventEmitter.with_console_handler()
await event_emitter.emit("Hello, world!")
Using with print handler:
event_emitter = EventEmitter.with_print_handler()
await event_emitter.emit("Hello, world!")
Using with stream handler:
async def function(event_emitter: EventEmitter):
await event_emitter.emit("Hello, world!")
event_emitter = EventEmitter.with_stream_handler()
asyncio.create_task(function(event_emitter))
async for event in event_emitter.stream():
print(event)
Initializes a new instance of the EventEmitter class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
handlers |
list[BaseEventHandler]
|
A list of event handlers to process emitted events. |
required |
event_level |
EventLevel
|
The minimum severity level of events to be processed. Defaults to EventLevel.INFO. |
INFO
|
hooks |
list[BaseEventHook] | None
|
A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
|
close()
async
Closes all handlers in the handler list.
This method iterates through the list of handlers and calls the close method on each handler.
Returns:
| Type | Description |
|---|---|
None
|
None |
emit(value, event_level=EventLevel.DEBUG, event_type=EventType.STATUS.value, metadata=None, event_id=None, disabled_handlers=None)
async
Emits an event using the configured event handlers.
If the value is an Event object, it is used directly and other parameters are ignored. Otherwise, a new Event object is created with the provided parameters. The event is then passed to all handlers except those listed in disabled_handlers. Events are only processed if their severity level meets or exceeds the EventEmitter's configured level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
value |
str | dict[str, Any] | Event
|
The event value or Event object to emit. If an Event object is provided, other parameters are ignored and the Event is used as-is. |
required |
event_level |
EventLevel
|
The severity level of the event. Defaults to EventLevel.DEBUG. |
DEBUG
|
event_type |
str
|
The type of event (e.g., "status", "response"). Defaults to EventType.STATUS.value. |
value
|
metadata |
dict[str, Any] | None
|
Additional metadata for the event. Defaults to None, which creates an empty dictionary. |
None
|
event_id |
str | None
|
Unique identifier for the event. Defaults to None, which creates an empty string. |
None
|
disabled_handlers |
list[str] | None
|
Names of handlers to skip for this event. Defaults to None. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If the provided event_level is not a valid EventLevel. |
stream()
Streams events from the StreamEventHandler.
This method is a convenience method that calls the stream method on the StreamEventHandler.
To use this method, the event emitter must have exactly one StreamEventHandler.
Returns:
| Name | Type | Description |
|---|---|---|
AsyncGenerator |
AsyncGenerator
|
An asynchronous generator yielding events from the StreamEventHandler. |
with_console_handler(event_level=EventLevel.INFO, hooks=None)
classmethod
Creates a new instance of the EventEmitter class with a single ConsoleEventHandler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_level |
EventLevel
|
The minimum severity level of events to be processed. Defaults to EventLevel.INFO. |
INFO
|
hooks |
list[BaseEventHook] | None
|
A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
EventEmitter |
EventEmitter
|
A new instance of the EventEmitter class with a single ConsoleEventHandler. |
with_print_handler(event_level=EventLevel.INFO, hooks=None)
classmethod
Creates a new instance of the EventEmitter class with a single PrintEventHandler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_level |
EventLevel
|
The minimum severity level of events to be processed. Defaults to EventLevel.INFO. |
INFO
|
hooks |
list[BaseEventHook] | None
|
A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
EventEmitter |
EventEmitter
|
A new instance of the EventEmitter class with a single PrintEventHandler. |
with_stream_handler(event_level=EventLevel.INFO, hooks=None)
classmethod
Creates a new instance of the EventEmitter class with a single StreamEventHandler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_level |
EventLevel
|
The minimum severity level of events to be processed. Defaults to EventLevel.INFO. |
INFO
|
hooks |
list[BaseEventHook] | None
|
A list of event hooks to be applied to the emitted events. Defaults to None, in which case the event emitter will not apply any hooks. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
EventEmitter |
EventEmitter
|
A new instance of the EventEmitter class with a single StreamEventHandler. |
Messenger(message, is_template=True)
Bases: Component
Emits a custom event message with possible access to the state variables.
This component acts as an intermediary step, designed to be placed between other pipeline steps. It allows for event messaging operations to be performed outside individual components but still within the context of the pipeline execution.
Attributes:
| Name | Type | Description |
|---|---|---|
message |
str
|
The message to be sent, may contain placeholders enclosed in curly braces |
is_template |
bool
|
Whether the message is a template that can be injected with state variables. Defaults to True. |
variable_keys |
list[str]
|
The keys of the message that can be injected with state variables.
Only used if |
Plain string message example:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
kwargs = {"event_emitter": event_emitter}
messenger = Messenger("Executing component.", is_template=False)
await messenger.run(**kwargs)
Template message example:
event_emitter = EventEmitter(handlers=[ConsoleEventHandler()])
state_variables = {"query": "Hi!", "top_k": 10}
kwargs = {"event_emitter": event_emitter, "state_variables": state_variables}
messenger = Messenger("Executing component for query {query} and top k {top_k}.")
await messenger.run(**kwargs)
Initializes a new instance of the Messenger class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message |
str
|
The message to be sent, may contain placeholders enclosed in curly braces |
required |
is_template |
bool
|
Whether the message is a template that can be injected with state variables. Defaults to True. |
True
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If the keys of the message does not match the provided keys. |
send_message(event_emitter, state_variables=None, emit_kwargs=None)
async
Emits the message to the event emitter.
This method validates the variables, formats the message if required, and then emits the message using the event emitter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_emitter |
EventEmitter
|
The event emitter instance to emit the message. |
required |
state_variables |
dict[str, Any] | None
|
The state variables to be injected into the message
placeholders. Can only be provided if |
None
|
emit_kwargs |
dict[str, Any] | None
|
The keyword arguments to be passed to the event emitter's emit method. Defaults to None. |
None
|