rigging.chat

Chats are used pre- and post-generation to hold messages.

They are the primary way to interact with the generator.

DEFAULT_MAX_ROUNDS = 5 module-attribute #

Maximum number of internal callback rounds to attempt during generation before giving up.

FailMode = t.Literal['raise', 'skip', 'include'] module-attribute #

How to handle failures in pipelines.

  • raise: Raise an exception when a failure is encountered.
  • skip: Ignore the error and do not include the failed chat in the final output.
  • include: Mark the chat as failed and include it in the final output.
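
For illustration, a minimal sketch of selecting a failure mode for a batch of runs (the generator id is a placeholder):

import asyncio
import rigging as rg

async def main() -> None:
    # "gpt-4o-mini" is a placeholder generator id
    pipeline = rg.get_generator("gpt-4o-mini").chat("Write a haiku about the sea.")
    # "skip": failed chats are dropped from the returned list
    chats = await pipeline.run_many(5, on_failed="skip")
    print(len(chats))

asyncio.run(main())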

Chat(messages: Messages, generated: Messages | None = None, generator: t.Optional[Generator] = None, **kwargs: t.Any) #

Bases: BaseModel

Represents a completed chat conversation.

Parameters:

  • messages (Messages) –

    The messages for the chat.

  • generated (Messages | None, default: None ) –

    The next messages for the chat.

  • generator (Optional[Generator], default: None ) –

    The generator associated with this chat.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments (typically used for deserialization)

Source code in rigging/chat.py
def __init__(
    self,
    messages: Messages,
    generated: Messages | None = None,
    generator: t.Optional[Generator] = None,
    **kwargs: t.Any,
):
    """
    Initialize a Chat object.

    Args:
        messages: The messages for the chat.
        generated: The next messages for the chat.
        generator: The generator associated with this chat.
        **kwargs: Additional keyword arguments (typically used for deserialization)
    """

    if "generator_id" in kwargs and generator is None:
        # TODO: Should we move params to self.params?
        generator = get_generator(kwargs.pop("generator_id"))

    # We can't deserialize an error
    if isinstance(kwargs.get("error"), str):
        kwargs.pop("error")

    super().__init__(
        messages=Message.fit_as_list(messages),
        generated=Message.fit_as_list(generated) if generated is not None else [],
        generator=generator,
        **kwargs,
    )
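
For illustration, a minimal sketch of constructing a completed Chat directly from message dicts (assuming Chat is re-exported at the package root; otherwise import it from rigging.chat):

import rigging as rg

chat = rg.Chat(
    [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Say hello."},
    ],
    generated=[{"role": "assistant", "content": "Hello!"}],
)
print(chat.last.content)  # -> "Hello!"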

all: list[Message] property #

Returns all messages in the chat, including the next messages.

conversation: str property #

Returns a string representation of the chat.

error: t.Optional[t.Annotated[Exception, PlainSerializer(lambda x: str(x), return_type=str, when_used='json-unless-none')]] = Field(None, repr=False) class-attribute instance-attribute #

Holds any exception that was caught during the generation pipeline.

extra: dict[str, t.Any] = Field(default_factory=dict, repr=False) class-attribute instance-attribute #

Any additional information from the generation.

failed: bool = Field(False, exclude=False, repr=True) class-attribute instance-attribute #

Indicates whether conditions during generation were not met. This is typically used for graceful error handling when parsing.

generated: list[Message] = Field(default_factory=list) class-attribute instance-attribute #

The list of messages resulting from the generation.

generator: t.Optional[Generator] = Field(None, exclude=True, repr=False) class-attribute instance-attribute #

The generator associated with the chat.

generator_id: str | None property #

The identifier of the generator used to create the chat.

last: Message property #

Alias for .all[-1]

message_dicts: list[MessageDict] property #

Returns the chat as a list of minimal message dictionaries.

Returns:

  • list[MessageDict] –

    The list of messages as simple dictionaries.

messages: list[Message] instance-attribute #

The list of messages prior to generation.

metadata: dict[str, t.Any] = Field(default_factory=dict) class-attribute instance-attribute #

Additional metadata for the chat.

next: list[Message] property #

Alias for the .generated property

params: t.Optional[GenerateParams] = Field(None, exclude=True, repr=False) class-attribute instance-attribute #

Any additional generation params used for this chat.

prev: list[Message] property #

Alias for the .messages property

stop_reason: StopReason = Field(default='unknown') class-attribute instance-attribute #

The reason the generation stopped.

timestamp: datetime = Field(default_factory=datetime.now, repr=False) class-attribute instance-attribute #

The timestamp when the chat was created.

usage: t.Optional[Usage] = Field(None, repr=False) class-attribute instance-attribute #

The usage statistics for the generation if available.

uuid: UUID = Field(default_factory=uuid4) class-attribute instance-attribute #

The unique identifier for the chat.

apply(**kwargs: str) -> Chat #

Calls rigging.message.Message.apply on the last message in the chat with the given keyword arguments.

Parameters:

  • **kwargs (str, default: {} ) –

    The string mapping of replacements.

Returns:

  • Chat

    The modified Chat object.

Source code in rigging/chat.py
def apply(self, **kwargs: str) -> Chat:
    """
    Calls [rigging.message.Message.apply][] on the last message in the chat with the given keyword arguments.

    Args:
        **kwargs: The string mapping of replacements.

    Returns:
        The modified Chat object.
    """
    if self.generated:
        self.generated[-1] = self.generated[-1].apply(**kwargs)
    else:
        self.messages[-1] = self.messages[-1].apply(**kwargs)
    return self
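
A minimal sketch, assuming Message.apply performs $-style substitution as in string.Template:

import rigging as rg

chat = rg.Chat([{"role": "user", "content": "My name is $name."}])
chat = chat.apply(name="Ada")
print(chat.last.content)  # -> "My name is Ada."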

apply_to_all(**kwargs: str) -> Chat #

Calls rigging.message.Message.apply on all messages in the chat with the given keyword arguments.

Parameters:

  • **kwargs (str, default: {} ) –

    The string mapping of replacements.

Returns:

  • Chat

    The modified chat object.

Source code in rigging/chat.py
def apply_to_all(self, **kwargs: str) -> Chat:
    """
    Calls [rigging.message.Message.apply][] on all messages in the chat with the given keyword arguments.

    Args:
        **kwargs: The string mapping of replacements.

    Returns:
        The modified chat object.
    """
    self.messages = Message.apply_to_list(self.messages, **kwargs)
    self.generated = Message.apply_to_list(self.generated, **kwargs)
    return self

clone(*, only_messages: bool = False) -> Chat #

Creates a deep copy of the chat.

Parameters:

  • only_messages (bool, default: False ) –

    If True, only the messages will be cloned. If False (default), the entire chat object will be cloned.

Returns:

  • Chat

    A new instance of Chat.

Source code in rigging/chat.py
def clone(self, *, only_messages: bool = False) -> Chat:
    """
    Creates a deep copy of the chat.

    Args:
        only_messages: If True, only the messages will be cloned.
            If False (default), the entire chat object will be cloned.

    Returns:
        A new instance of Chat.
    """
    new = Chat(
        [m.model_copy() for m in self.messages],
        [m.model_copy() for m in self.generated],
        self.generator,
    )
    if not only_messages:
        new.metadata = deepcopy(self.metadata)
        new.params = self.params.model_copy() if self.params is not None else None
        new.stop_reason = self.stop_reason
        new.usage = self.usage.model_copy() if self.usage is not None else None
        new.extra = deepcopy(self.extra)
        new.failed = self.failed
        new.error = self.error
    return new

continue_(messages: t.Sequence[Message] | t.Sequence[MessageDict] | Message | str) -> ChatPipeline #

Alias for rigging.chat.Chat.fork with include_all=True.

Source code in rigging/chat.py
def continue_(self, messages: t.Sequence[Message] | t.Sequence[MessageDict] | Message | str) -> ChatPipeline:
    """Alias for the [rigging.chat.Chat.fork][] with `include_all=True`."""
    return self.fork(messages, include_all=True)

fork(messages: t.Sequence[Message] | t.Sequence[MessageDict] | Message | MessageDict | str, *, include_all: bool = False) -> ChatPipeline #

Forks the chat by calling rigging.chat.Chat.restart and appending the specified messages.

Parameters:

  • messages (Sequence[Message] | Sequence[MessageDict] | Message | MessageDict | str) –

    The messages to be added to the new ChatPipeline instance.

  • include_all (bool, default: False ) –

    Whether to include the next messages in the restarted chat.

Returns:

  • ChatPipeline

    A new instance of ChatPipeline with the specified messages added.

Source code in rigging/chat.py
def fork(
    self,
    messages: t.Sequence[Message] | t.Sequence[MessageDict] | Message | MessageDict | str,
    *,
    include_all: bool = False,
) -> ChatPipeline:
    """
    Forks the chat by calling [rigging.chat.Chat.restart][] and appending the specified messages.

    Args:
        messages:
            The messages to be added to the new `ChatPipeline` instance.
        include_all: Whether to include the next messages in the restarted chat.

    Returns:
        A new instance of `ChatPipeline` with the specified messages added.

    """
    return self.restart(include_all=include_all).add(messages)
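
A short sketch of the difference between the two entry points, assuming chat is a completed Chat with an associated generator (run inside an async context):

# continue_ keeps the generated messages; fork drops them by default
followup = chat.continue_("Now translate that to French.")
alternate = chat.fork("Answer again, in one sentence.")

translated = await followup.run()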

inject_system_content(content: str) -> Message #

Injects content into the chat as a system message.

Note

If the chat is empty or the first message is not a system message, a new system message with the given content is inserted at the beginning of the chat. If the first message is a system message, the content is appended to it.

Parameters:

  • content (str) –

    The content to be injected.

Returns:

  • Message

    The updated system message.

Source code in rigging/chat.py
def inject_system_content(self, content: str) -> Message:
    """
    Injects content into the chat as a system message.

    Note:
        If the chat is empty or the first message is not a system message,
        a new system message with the given content is inserted at the beginning of the chat.
        If the first message is a system message, the content is appended to it.

    Args:
        content: The content to be injected.

    Returns:
        The updated system message.
    """
    if len(self.messages) == 0 or self.messages[0].role != "system":
        self.messages.insert(0, Message(role="system", content=content))
    elif self.messages[0].role == "system":
        self.messages[0].content += "\n\n" + content
    return self.messages[0]
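
A small sketch of both branches of the behavior described above:

import rigging as rg

chat = rg.Chat([{"role": "user", "content": "Hi"}])
chat.inject_system_content("Always answer in English.")
assert chat.messages[0].role == "system"  # a new system message was inserted

chat.inject_system_content("Be concise.")
assert chat.messages[0].content.endswith("Be concise.")  # appended, not duplicated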

inject_tool_prompt(tools: t.Sequence[Tool]) -> None #

Injects a default tool use prompt into the system prompt.

Parameters:

  • tools (Sequence[Tool]) –

    A sequence of Tool objects.

Source code in rigging/chat.py
def inject_tool_prompt(self, tools: t.Sequence[Tool]) -> None:
    """
    Injects a default tool use prompt into the system prompt.

    Args:
        tools: A sequence of Tool objects.
    """
    call_format = ToolCalls.xml_example()
    tool_description_list = ToolDescriptionList(tools=[t.get_description() for t in tools])
    tool_system_prompt = system_tool_extension(call_format, tool_description_list.to_pretty_xml())
    self.inject_system_content(tool_system_prompt)

meta(**kwargs: t.Any) -> Chat #

Updates the metadata of the chat with the provided key-value pairs.

Parameters:

  • **kwargs (Any, default: {} ) –

    Key-value pairs representing the metadata to be updated.

Returns:

  • Chat

    The updated chat object.

Source code in rigging/chat.py
def meta(self, **kwargs: t.Any) -> Chat:
    """
    Updates the metadata of the chat with the provided key-value pairs.

    Args:
        **kwargs: Key-value pairs representing the metadata to be updated.

    Returns:
        The updated chat object.
    """
    self.metadata.update(kwargs)
    return self

restart(*, generator: t.Optional[Generator] = None, include_all: bool = False) -> ChatPipeline #

Attempt to convert back to a ChatPipeline for further generation.

Parameters:

  • generator (Optional[Generator], default: None ) –

    The generator to use for the restarted chat. Otherwise the generator from the original ChatPipeline will be used.

  • include_all (bool, default: False ) –

    Whether to include the next messages in the restarted chat.

Returns:

  • ChatPipeline –

    The restarted chat as a new ChatPipeline.

Raises:

  • ValueError

    If the chat was not created with a ChatPipeline and no generator is provided.

Source code in rigging/chat.py
def restart(self, *, generator: t.Optional[Generator] = None, include_all: bool = False) -> ChatPipeline:
    """
    Attempt to convert back to a ChatPipeline for further generation.

    Args:
        generator: The generator to use for the restarted chat. Otherwise
            the generator from the original ChatPipeline will be used.
        include_all: Whether to include the next messages in the restarted chat.

    Returns:
        The restarted chat.

    Raises:
        ValueError: If the chat was not created with a ChatPipeline and no generator is provided.
    """
    messages = self.all if include_all else self.messages
    if generator is None:
        generator = self.generator
    if generator is None:
        raise ValueError("Cannot restart a chat without an associated generator")
    return generator.chat(messages, self.params)
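
A minimal sketch, assuming chat is a completed Chat with an associated generator (run inside an async context):

# Carry the generated messages forward and keep going
pipeline = chat.restart(include_all=True)
continued = await pipeline.add("Tell me more.").run()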

strip(model_type: type[Model], fail_on_missing: bool = False) -> Chat #

Strips all parsed parts of a particular type from the message content.

Parameters:

  • model_type (type[Model]) –

    The type of model to keep in the chat.

  • fail_on_missing (bool, default: False ) –

    Whether to raise an exception if a message of the specified model type is not found.

Returns:

  • Chat

    A new Chat object with only the messages of the specified model type.

Source code in rigging/chat.py
def strip(self, model_type: type[Model], fail_on_missing: bool = False) -> Chat:
    """
    Strips all parsed parts of a particular type from the message content.

    Args:
        model_type: The type of model to keep in the chat.
        fail_on_missing: Whether to raise an exception if a message of the specified model type is not found.

    Returns:
        A new Chat object with only the messages of the specified model type.
    """
    new = self.clone()
    for message in new.all:
        message.strip(model_type, fail_on_missing=fail_on_missing)
    return new

to_df() -> t.Any #

Converts the chat to a Pandas DataFrame.

See rigging.data.chats_to_df for more information.

Returns:

  • Any

    The chat as a DataFrame.

Source code in rigging/chat.py
def to_df(self) -> t.Any:
    """
    Converts the chat to a Pandas DataFrame.

    See [rigging.data.chats_to_df][] for more information.

    Returns:
        The chat as a DataFrame.
    """
    # Late import for circular
    from rigging.data import chats_to_df

    return chats_to_df(self)

to_elastic(index: str, client: AsyncElasticsearch, *, op_type: ElasticOpType = 'index', create_index: bool = True, **kwargs: t.Any) -> int async #

Converts the chat data to Elasticsearch format and indexes it.

See rigging.data.chats_to_elastic for more information.

Returns:

  • int

    The number of chats indexed.

Source code in rigging/chat.py
async def to_elastic(
    self,
    index: str,
    client: AsyncElasticsearch,
    *,
    op_type: ElasticOpType = "index",
    create_index: bool = True,
    **kwargs: t.Any,
) -> int:
    """
    Converts the chat data to Elasticsearch format and indexes it.

    See [rigging.data.chats_to_elastic][] for more information.

    Returns:
        The number of chats indexed.
    """
    from rigging.data import chats_to_elastic

    return await chats_to_elastic(self, index, client, op_type=op_type, create_index=create_index, **kwargs)

ChatList #

Bases: list[Chat]

Represents a list of chat objects.

Inherits from the built-in list class and is specialized for storing Chat objects.

to_df() -> t.Any #

Converts the chat list to a Pandas DataFrame.

See rigging.data.chats_to_df for more information.

Returns:

  • Any

    The chat list as a DataFrame.

Source code in rigging/chat.py
def to_df(self) -> t.Any:
    """
    Converts the chat list to a Pandas DataFrame.

    See [rigging.data.chats_to_df][] for more information.

    Returns:
        The chat list as a DataFrame.
    """
    # Late import for circular
    from rigging.data import chats_to_df

    return chats_to_df(self)

to_elastic(index: str, client: AsyncElasticsearch, *, op_type: ElasticOpType = 'index', create_index: bool = True, **kwargs: t.Any) -> int async #

Converts the chat list to Elasticsearch format and indexes it.

See rigging.data.chats_to_elastic for more information.

Returns:

  • int

    The number of chats indexed.

Source code in rigging/chat.py
async def to_elastic(
    self,
    index: str,
    client: AsyncElasticsearch,
    *,
    op_type: ElasticOpType = "index",
    create_index: bool = True,
    **kwargs: t.Any,
) -> int:
    """
    Converts the chat list to Elasticsearch format and indexes it.

    See [rigging.data.chats_to_elastic][] for more information.

    Returns:
        The number of chats indexed.
    """
    from rigging.data import chats_to_elastic

    return await chats_to_elastic(self, index, client, op_type=op_type, create_index=create_index, **kwargs)

to_json() -> list[dict[str, t.Any]] #

Helper to convert the chat list to a list of dictionaries.

Source code in rigging/chat.py
def to_json(self) -> list[dict[str, t.Any]]:
    """
    Helper to convert the chat list to a list of dictionaries.
    """
    return [chat.model_dump() for chat in self]

ChatPipeline(generator: Generator, messages: t.Sequence[Message], *, params: t.Optional[GenerateParams] = None, watch_callbacks: t.Optional[list[WatchChatCallback]] = None) #

Pipeline to manipulate and produce chats.

Source code in rigging/chat.py
def __init__(
    self,
    generator: Generator,
    messages: t.Sequence[Message],
    *,
    params: t.Optional[GenerateParams] = None,
    watch_callbacks: t.Optional[list[WatchChatCallback]] = None,
):
    self.generator: Generator = generator
    """The generator object responsible for generating the chat."""
    self.chat: Chat = Chat(messages)
    """The chat object representing the conversation."""
    self.params = params
    """The parameters for generating messages."""
    self.metadata: dict[str, t.Any] = {}
    """Additional metadata associated with the chat."""
    self.errors_to_fail_on: set[type[Exception]] = set()
    """
    The list of exceptions to catch during generation if you are including or skipping failures.

    ExhaustedMaxRounds is implicitly included.
    """
    self.errors_to_exclude: set[type[Exception]] = set()
    """The list of exceptions to exclude from the catch list."""
    self.on_failed: FailMode = "raise"
    """How to handle failures in the pipeline unless overriden in calls."""

    # (callback, attempt_recovery, drop_dialog, max_rounds)
    self.until_callbacks: list[tuple[UntilMessageCallback, bool, bool, int]] = []
    self.until_types: list[type[Model]] = []
    self.until_tools: list[Tool] = []
    self.inject_tool_prompt: bool = True
    self.force_tool: bool = False
    self.then_callbacks: list[ThenChatCallback] = []
    self.map_callbacks: list[MapChatCallback] = []
    self.watch_callbacks: list[WatchChatCallback] = watch_callbacks or []
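
Pipelines are usually obtained from a generator rather than constructed directly. A minimal sketch of the fluent flow (the generator id is a placeholder; run inside an async context):

import rigging as rg

chat = await (
    rg.get_generator("gpt-4o-mini")
    .chat("Summarize the plot of Hamlet in two sentences.")
    .meta(task="summarize")
    .with_(temperature=0.7)
    .run()
)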

chat: Chat = Chat(messages) instance-attribute #

The chat object representing the conversation.

errors_to_exclude: set[type[Exception]] = set() instance-attribute #

The list of exceptions to exclude from the catch list.

errors_to_fail_on: set[type[Exception]] = set() instance-attribute #

The list of exceptions to catch during generation if you are including or skipping failures.

ExhaustedMaxRounds is implicitly included.

generator: Generator = generator instance-attribute #

The generator object responsible for generating the chat.

metadata: dict[str, t.Any] = {} instance-attribute #

Additional metadata associated with the chat.

on_failed: FailMode = 'raise' instance-attribute #

How to handle failures in the pipeline unless overridden in calls.

params = params instance-attribute #

The parameters for generating messages.

add(messages: t.Sequence[MessageDict] | t.Sequence[Message] | MessageDict | Message | str) -> ChatPipeline #

Appends new message(s) to the internal chat before generation.

Note

If the last message in the chat is the same role as the first new message, the content will be appended instead of a new message being created.

Parameters:

  • messages (Sequence[MessageDict] | Sequence[Message] | MessageDict | Message | str) –

    The messages to be added to the chat. It can be a single message or a sequence of messages.

Returns:

  • ChatPipeline –

    The updated ChatPipeline object.

Source code in rigging/chat.py
def add(
    self, messages: t.Sequence[MessageDict] | t.Sequence[Message] | MessageDict | Message | str
) -> ChatPipeline:
    """
    Appends new message(s) to the internal chat before generation.

    Note:
        If the last message in the chat is the same role as the first new message,
        the content will be appended instead of a new message being created.

    Args:
        messages: The messages to be added to the chat. It can be a single message or a sequence of messages.

    Returns:
        The updated ChatPipeline object.
    """
    message_list = Message.fit_as_list(messages)
    # If the last message is the same role as the first new message, append to it
    if self.chat.all and self.chat.all[-1].role == message_list[0].role:
        self.chat.all[-1].content += "\n" + message_list[0].content
        message_list = message_list[1:]
    self.chat.generated += message_list
    return self
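
A small sketch of the same-role merge described in the note above (the generator id is a placeholder):

import rigging as rg

pipeline = rg.get_generator("gpt-4o-mini").chat("Write a limerick")
pipeline.add("about the ocean.")  # same "user" role: merged (joined with a newline) into the prior message
pipeline.add({"role": "assistant", "content": "There once was..."})  # appended as a new message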

apply(**kwargs: str) -> ChatPipeline #

Clones this chat pipeline and calls rigging.chat.Chat.apply with the given keyword arguments.

Parameters:

  • **kwargs (str, default: {} ) –

    Keyword arguments to be applied to the chat.

Returns:

  • ChatPipeline

    A new instance of ChatPipeline with the applied arguments.

Source code in rigging/chat.py
def apply(self, **kwargs: str) -> ChatPipeline:
    """
    Clones this chat pipeline and calls [rigging.chat.Chat.apply][] with the given keyword arguments.

    Args:
        **kwargs: Keyword arguments to be applied to the chat.

    Returns:
        A new instance of ChatPipeline with the applied arguments.
    """
    new = self.clone()
    new.chat.apply(**kwargs)
    return new

apply_to_all(**kwargs: str) -> ChatPipeline #

Clones this chat pipeline and calls rigging.chat.Chat.apply_to_all with the given keyword arguments.

Parameters:

  • **kwargs (str, default: {} ) –

    Keyword arguments to be applied to the chat.

Returns:

  • ChatPipeline

    A new instance of ChatPipeline with the applied arguments.

Source code in rigging/chat.py
def apply_to_all(self, **kwargs: str) -> ChatPipeline:
    """
    Clones this chat pipeline and calls [rigging.chat.Chat.apply_to_all][] with the given keyword arguments.

    Args:
        **kwargs: Keyword arguments to be applied to the chat.

    Returns:
        A new instance of ChatPipeline with the applied arguments.
    """
    new = self.clone()
    new.chat.apply_to_all(**kwargs)
    return new

catch(*errors: type[Exception], on_failed: FailMode | None = None, exclude: list[type[Exception]] | None = None) -> ChatPipeline #

Adds exceptions to catch during generation when including or skipping failures.

Parameters:

  • *errors (type[Exception], default: () ) –

    The exception types to catch.

  • on_failed (FailMode | None, default: None ) –

    How to handle failures in the pipeline unless overridden in calls.

  • exclude (list[type[Exception]] | None, default: None ) –

    Exception types to exclude from the catch list.

Returns:

  • ChatPipeline –

    The updated ChatPipeline object.

Source code in rigging/chat.py
def catch(
    self, *errors: type[Exception], on_failed: FailMode | None = None, exclude: list[type[Exception]] | None = None
) -> ChatPipeline:
    """
    Adds exceptions to catch during generation when including or skipping failures.

    Args:
        *errors: The exception types to catch.
        on_failed: How to handle failures in the pipeline unless overridden in calls.
        exclude: Exception types to exclude from the catch list.

    Returns:
        The updated ChatPipeline object.
    """
    self.errors_to_fail_on.update(errors)
    self.errors_to_exclude.update(exclude or [])
    self.on_failed = on_failed or self.on_failed
    return self
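
A minimal sketch, assuming pipeline is an existing ChatPipeline (run inside an async context):

# Swallow timeouts and value errors, but keep failed chats
# (failed=True) in the output for later inspection
chats = await pipeline.catch(TimeoutError, ValueError, on_failed="include").run_many(10)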

clone(*, only_messages: bool = False) -> ChatPipeline #

Creates a clone of the current ChatPipeline instance.

Parameters:

  • only_messages (bool, default: False ) –

    If True, only the messages will be cloned. If False (default), the entire ChatPipeline instance will be cloned including until callbacks, types, tools, metadata, etc.

Returns:

  • ChatPipeline

    A new instance of ChatPipeline that is a clone of the current instance.

Source code in rigging/chat.py
def clone(self, *, only_messages: bool = False) -> ChatPipeline:
    """
    Creates a clone of the current `ChatPipeline` instance.

    Args:
        only_messages: If True, only the messages will be cloned.
            If False (default), the entire `ChatPipeline` instance will be cloned
            including until callbacks, types, tools, metadata, etc.

    Returns:
        A new instance of `ChatPipeline` that is a clone of the current instance.
    """
    new = ChatPipeline(
        self.generator,
        [],
        params=self.params.model_copy() if self.params is not None else None,
        watch_callbacks=self.watch_callbacks,
    )
    new.chat = self.chat.clone()
    if not only_messages:
        new.until_callbacks = self.until_callbacks.copy()
        new.until_types = self.until_types.copy()
        new.until_tools = self.until_tools.copy()
        new.inject_tool_prompt = self.inject_tool_prompt
        new.force_tool = self.force_tool
        new.metadata = deepcopy(self.metadata)
        new.then_callbacks = self.then_callbacks.copy()
        new.map_callbacks = self.map_callbacks.copy()
        new.on_failed = self.on_failed
        new.errors_to_fail_on = self.errors_to_fail_on.copy()
        new.errors_to_exclude = self.errors_to_exclude.copy()
    return new

fork(messages: t.Sequence[MessageDict] | t.Sequence[Message] | MessageDict | Message | str) -> ChatPipeline #

Creates a new instance of ChatPipeline by forking the current chat and adding the specified messages.

This is a convenience method for calling clone().add(messages).

Parameters:

  • messages (Sequence[MessageDict] | Sequence[Message] | MessageDict | Message | str) –

    A sequence of messages or a single message to be added to the new chat.

Returns:

  • ChatPipeline

    A new instance of the pipeline with the specified messages added.

Source code in rigging/chat.py
def fork(
    self, messages: t.Sequence[MessageDict] | t.Sequence[Message] | MessageDict | Message | str
) -> ChatPipeline:
    """
    Creates a new instance of `ChatPipeline` by forking the current chat and adding the specified messages.

    This is a convenience method for calling `clone().add(messages)`.

    Args:
        messages: A sequence of messages or a single message to be added to the new chat.

    Returns:
        A new instance of the pipeline with the specified messages added.
    """
    return self.clone().add(messages)

map(callback: MapChatCallback) -> ChatPipeline #

Registers a callback to be executed after the generation process completes.

Note

You must return a list of Chat objects from the callback which will represent the state of chats for the remainder of the callbacks and the final return of control.

async def process(chats: list[Chat]) -> list[Chat]:
    ...

await pipeline.map(process).run()

Parameters:

  • callback (MapChatCallback) –

    The callback function to be executed.

Returns:

  • ChatPipeline –

    The current instance of the pipeline.

Source code in rigging/chat.py
def map(self, callback: MapChatCallback) -> ChatPipeline:
    """
    Registers a callback to be executed after the generation process completes.

    Note:
        You must return a list of Chat objects from the callback which will
        represent the state of chats for the remainder of the callbacks and
        the final return of control.

    ```
    async def process(chats: list[Chat]) -> list[Chat]:
        ...

    await pipeline.map(process).run()
    ```

    Args:
        callback: The callback function to be executed.

    Returns:
        The current instance of the pipeline.
    """
    if not asyncio.iscoroutinefunction(callback):
        raise TypeError(f"Callback '{callback.__name__}' must be an async function")  # type: ignore

    self.map_callbacks.append(callback)
    return self

meta(**kwargs: t.Any) -> ChatPipeline #

Updates the metadata of the chat with the provided key-value pairs.

Parameters:

  • **kwargs (Any, default: {} ) –

    Key-value pairs representing the metadata to be updated.

Returns:

  • ChatPipeline –

    The updated pipeline object.

Source code in rigging/chat.py
def meta(self, **kwargs: t.Any) -> ChatPipeline:
    """
    Updates the metadata of the chat with the provided key-value pairs.

    Args:
        **kwargs: Key-value pairs representing the metadata to be updated.

    Returns:
        The updated pipeline object.
    """
    self.metadata.update(kwargs)
    return self

prompt(func: t.Callable[P, t.Coroutine[None, None, R]]) -> Prompt[P, R] #

Decorator to convert a function into a prompt bound to this pipeline.

See rigging.prompt.prompt for more information.

Parameters:

  • func (Callable[P, Coroutine[None, None, R]]) –

    The function to be converted into a prompt.

Returns:

  • Prompt[P, R]

    The prompt.

Source code in rigging/chat.py
def prompt(self, func: t.Callable[P, t.Coroutine[None, None, R]]) -> Prompt[P, R]:
    """
    Decorator to convert a function into a prompt bound to this pipeline.

    See [rigging.prompt.prompt][] for more information.

    Args:
        func: The function to be converted into a prompt.

    Returns:
        The prompt.
    """
    from rigging.prompt import prompt

    return prompt(func, pipeline=self)
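
A minimal sketch based on the signature above, assuming pipeline is an existing ChatPipeline, that the docstring guides generation, and that the bound prompt is awaited directly (run inside an async context):

@pipeline.prompt
async def summarize(text: str) -> str:
    """Summarize the text in one sentence."""
    ...

summary = await summarize("Rigging structures LLM interactions as chats.")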

run(*, allow_failed: bool = False, on_failed: FailMode | None = None) -> Chat async #

Execute the generation process to produce the final chat.

Parameters:

  • allow_failed (bool, default: False ) –

    Ignore any errors and potentially return the chat in a failed state.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate. (this is used as an alternative to allow_failed)

Returns:

  • Chat

    The generated Chat.

Source code in rigging/chat.py
async def run(self, *, allow_failed: bool = False, on_failed: FailMode | None = None) -> Chat:
    """
    Execute the generation process to produce the final chat.

    Parameters:
        allow_failed: Ignore any errors and potentially
            return the chat in a failed state.
        on_failed: The behavior when a message fails to generate.
            (this is used as an alternative to allow_failed)

    Returns:
        The generated Chat.
    """
    if on_failed is None:
        on_failed = "include" if allow_failed else self.on_failed

    if on_failed == "skip":
        raise ValueError(
            "Cannot use 'skip' mode with single message generation (pass allow_failed=True or on_failed='include'/'raise')"
        )

    return (await self.run_many(1, on_failed=on_failed))[0]
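
A small sketch of inspecting a failed result, assuming pipeline is an existing ChatPipeline (run inside an async context):

chat = await pipeline.run(allow_failed=True)
if chat.failed:
    print(f"Generation failed: {chat.error}")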

run_batch(many: t.Sequence[t.Sequence[Message]] | t.Sequence[Message] | t.Sequence[MessageDict] | t.Sequence[str] | MessageDict | str, params: t.Sequence[t.Optional[GenerateParams]] | None = None, *, on_failed: FailMode | None = None) -> ChatList async #

Executes the generation process across multiple input messages.

Note

Anything already in this chat pipeline will be prepended to the input messages.

Parameters:

  • many (Sequence[Sequence[Message]] | Sequence[Message] | Sequence[MessageDict] | Sequence[str] | MessageDict | str) –

    A sequence of sequences of messages to be generated.

  • params (Sequence[Optional[GenerateParams]] | None, default: None ) –

    A sequence of parameters to be used for each set of messages.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate.

Returns:

  • ChatList

    A list of generated Chats.

Source code in rigging/chat.py
async def run_batch(
    self,
    many: t.Sequence[t.Sequence[Message]]
    | t.Sequence[Message]
    | t.Sequence[MessageDict]
    | t.Sequence[str]
    | MessageDict
    | str,
    params: t.Sequence[t.Optional[GenerateParams]] | None = None,
    *,
    on_failed: FailMode | None = None,
) -> ChatList:
    """
    Executes the generation process across multiple input messages.

    Note:
        Anything already in this chat pipeline will be prepended to the input messages.

    Parameters:
        many: A sequence of sequences of messages to be generated.
        params: A sequence of parameters to be used for each set of messages.
        on_failed: The behavior when a message fails to generate.

    Returns:
        A list of generated Chats.
    """
    on_failed = on_failed or self.on_failed
    states = self._initialize_batch_states(many, params)
    return await self._run(states, on_failed, batch_mode=True)
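
A minimal sketch, assuming pipeline is an existing ChatPipeline (run inside an async context):

# Each entry becomes its own generation, prefixed by anything already in the pipeline
chats = await pipeline.run_batch(
    ["Translate 'hello' to French.", "Translate 'hello' to Spanish."],
    on_failed="skip",
)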

run_many(count: int, *, params: t.Sequence[t.Optional[GenerateParams]] | None = None, on_failed: FailMode | None = None) -> ChatList async #

Executes the generation process multiple times with the same inputs.

Parameters:

  • count (int) –

    The number of times to execute the generation process.

  • params (Sequence[Optional[GenerateParams]] | None, default: None ) –

    A sequence of parameters to be used for each execution.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate.

Returns:

  • ChatList

    A list of generated Chats.

Source code in rigging/chat.py
async def run_many(
    self,
    count: int,
    *,
    params: t.Sequence[t.Optional[GenerateParams]] | None = None,
    on_failed: FailMode | None = None,
) -> ChatList:
    """
    Executes the generation process multiple times with the same inputs.

    Parameters:
        count: The number of times to execute the generation process.
        params: A sequence of parameters to be used for each execution.
        on_failed: The behavior when a message fails to generate.

    Returns:
        A list of generated Chats.
    """
    on_failed = on_failed or self.on_failed
    states = self._initialize_states(count, params)
    return await self._run(states, on_failed)

run_over(*generators: Generator | str, include_original: bool = True, on_failed: FailMode | None = None) -> ChatList async #

Executes the generation process across multiple generators.

For each generator, this pipeline is cloned and the generator is replaced before the run call. All callbacks and parameters are preserved.

Parameters:

  • *generators (Generator | str, default: () ) –

    A sequence of generators to be used for the generation process.

  • include_original (bool, default: True ) –

    Whether to include the original generator in the list of runs.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate.

Returns:

  • ChatList

    A list of generated Chats.

Source code in rigging/chat.py
async def run_over(
    self, *generators: Generator | str, include_original: bool = True, on_failed: FailMode | None = None
) -> ChatList:
    """
    Executes the generation process across multiple generators.

    For each generator, this pipeline is cloned and the generator is replaced
    before the run call. All callbacks and parameters are preserved.

    Parameters:
        *generators: A sequence of generators to be used for the generation process.
        include_original: Whether to include the original generator in the list of runs.
        on_failed: The behavior when a message fails to generate.

    Returns:
        A list of generated Chats.
    """
    on_failed = on_failed or self.on_failed

    _generators: list[Generator] = [g if isinstance(g, Generator) else get_generator(g) for g in generators]
    if include_original:
        _generators.append(self.generator)

    coros: list[t.Coroutine[t.Any, t.Any, Chat]] = []
    for generator in _generators:
        sub = self.clone()
        sub.generator = generator
        coros.append(sub.run(allow_failed=(on_failed != "raise")))

    chats = await asyncio.gather(*coros)

    return await self._post_run(chats, on_failed)
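
A minimal sketch comparing output across providers, assuming pipeline is an existing ChatPipeline (the generator ids are placeholders; run inside an async context):

chats = await pipeline.run_over("gpt-4o-mini", "claude-3-haiku-20240307")
for chat in chats:
    print(chat.generator_id, chat.last.content[:60])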

run_prompt(prompt: Prompt[P, R], /, *args: P.args, **kwargs: P.kwargs) -> R async #

Calls rigging.prompt.Prompt.run with this pipeline.

Warning

This method is deprecated and will be removed in a future release. Use Prompt.bind(pipeline) instead.

Source code in rigging/chat.py
async def run_prompt(self, prompt: Prompt[P, R], /, *args: P.args, **kwargs: P.kwargs) -> R:
    """
    Calls [rigging.prompt.Prompt.run][] with this pipeline.

    Warning:
        This method is deprecated and will be removed in a future release.
        Use [Prompt.bind(pipeline)][rigging.prompt.Prompt.bind] instead.
    """
    warnings.warn("run_prompt is deprecated, use Prompt.bind(pipeline) instead", DeprecationWarning, stacklevel=2)
    return await prompt.bind(self)(*args, **kwargs)

run_prompt_many(prompt: Prompt[P, R], count: int, /, *args: P.args, **kwargs: P.kwargs) -> list[R] async #

Calls rigging.prompt.Prompt.run_many with this pipeline.

Warning

This method is deprecated and will be removed in a future release. Use Prompt.bind_many(pipeline) instead.

Source code in rigging/chat.py
async def run_prompt_many(self, prompt: Prompt[P, R], count: int, /, *args: P.args, **kwargs: P.kwargs) -> list[R]:
    """
    Calls [rigging.prompt.Prompt.run_many][] with this pipeline.

    Warning:
        This method is deprecated and will be removed in a future release.
        Use [Prompt.bind_many(pipeline)][rigging.prompt.Prompt.bind_many] instead.
    """
    warnings.warn(
        "run_prompt_many is deprecated, use Prompt.bind_many(pipeline) instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return await prompt.bind_many(self)(count, *args, **kwargs)

run_prompt_over(prompt: Prompt[P, R], generators: t.Sequence[Generator | str], /, *args: P.args, **kwargs: P.kwargs) -> list[R] async #

Calls rigging.prompt.Prompt.run_over with this pipeline.

Warning

This method is deprecated and will be removed in a future release. Use Prompt.bind_over(pipeline) instead.

Source code in rigging/chat.py
async def run_prompt_over(
    self, prompt: Prompt[P, R], generators: t.Sequence[Generator | str], /, *args: P.args, **kwargs: P.kwargs
) -> list[R]:
    """
    Calls [rigging.prompt.Prompt.run_over][] with this pipeline.

    Warning:
        This method is deprecated and will be removed in a future release.
        Use [Prompt.bind_over(pipeline)][rigging.prompt.Prompt.bind_over] instead.
    """
    warnings.warn(
        "run_prompt_over is deprecated, use Prompt.bind_over(pipeline) instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return await prompt.bind_over(self)(generators, *args, **kwargs)

then(callback: ThenChatCallback) -> ChatPipeline #

Registers a callback to be executed after the generation process completes.

Note

Returning a Chat object from the callback will replace the current chat for the remainder of the callbacks and the return value of run(). This is optional.

async def process(chat: Chat) -> Chat | None:
    ...

await pipeline.then(process).run()

Parameters:

  • callback (ThenChatCallback) –

    The callback function to be executed.

Returns:

  • ChatPipeline –

    The current instance of the pipeline.

Source code in rigging/chat.py
def then(self, callback: ThenChatCallback) -> ChatPipeline:
    """
    Registers a callback to be executed after the generation process completes.

    Note:
        Returning a Chat object from the callback will replace the current chat
        for the remainder of the callbacks and the return value of `run()`. This
        is optional.

    ```
    async def process(chat: Chat) -> Chat | None:
        ...

    await pipeline.then(process).run()
    ```

    Args:
        callback: The callback function to be executed.

    Returns:
        The current instance of the pipeline.
    """
    if not asyncio.iscoroutinefunction(callback):
        raise TypeError(f"Callback '{callback.__name__}' must be an async function")  # type: ignore

    self.then_callbacks.append(callback)
    return self

until(callback: UntilMessageCallback, *, attempt_recovery: bool = True, drop_dialog: bool = True, max_rounds: int = DEFAULT_MAX_ROUNDS) -> ChatPipeline #

Registers a callback to participate in validating the generation process.

# Takes the next message being generated, and returns whether or not to continue
# generating new messages in addition to a list of messages to append before continuing

def callback(message: Message) -> tuple[bool, list[Message]]:
    if is_valid(message):
        return (False, [message])
    else:
        return (True, [message, ...])

await pipeline.until(callback).run()
Note

In general, your callback function should always include the message that was passed to it.

Whether these messages get used or discarded in the next round depends on attempt_recovery.

Parameters:

  • callback (UntilMessageCallback) –

    The callback function to be executed.

  • attempt_recovery (bool, default: True ) –

    Whether to attempt recovery by continuing to append prior messages before the next round of generation.

  • drop_dialog (bool, default: True ) –

    Whether to drop the intermediate dialog of recovery before returning the final chat back to the caller.

  • max_rounds (int, default: DEFAULT_MAX_ROUNDS ) –

    The maximum number of rounds to attempt generation + callbacks before giving up.

Returns:

  • ChatPipeline –

    The current instance of the pipeline.

Source code in rigging/chat.py
def until(
    self,
    callback: UntilMessageCallback,
    *,
    attempt_recovery: bool = True,
    drop_dialog: bool = True,
    max_rounds: int = DEFAULT_MAX_ROUNDS,
) -> ChatPipeline:
    """
    Registers a callback to participate in validating the generation process.

    ```py
    # Takes the next message being generated, and returns whether or not to continue
    # generating new messages in addition to a list of messages to append before continuing

    def callback(message: Message) -> tuple[bool, list[Message]]:
        if is_valid(message):
            return (False, [message])
        else:
            return (True, [message, ...])

    await pipeline.until(callback).run()
    ```

    Note:
        In general, your callback function should always include the message that was passed to it.

        Whether these messages get used or discarded in the next round depends on `attempt_recovery`.

    Args:
        callback: The callback function to be executed.
        attempt_recovery: Whether to attempt recovery by continuing to append prior messages
            before the next round of generation.
        drop_dialog: Whether to drop the intermediate dialog of recovery before returning
            the final chat back to the caller.
        max_rounds: The maximum number of rounds to attempt generation + callbacks
            before giving up.

    Returns:
        The current instance of the pipeline.
    """
    self.until_callbacks.append((callback, attempt_recovery, drop_dialog, max_rounds))
    return self

until_parsed_as(*types: type[ModelT], attempt_recovery: bool = False, drop_dialog: bool = True, max_rounds: int = DEFAULT_MAX_ROUNDS) -> ChatPipeline #

Adds the specified types to the list of types which should successfully parse before the generation process completes.

Parameters:

  • *types (type[ModelT], default: () ) –

    The type or types of models to wait for.

  • attempt_recovery (bool, default: False ) –

    Whether to attempt recovery if parsing fails by providing validation feedback to the model before the next round.

  • drop_dialog (bool, default: True ) –

    Whether to drop the intermediate dialog of recovery efforts before returning the final chat to the caller.

  • max_rounds (int, default: DEFAULT_MAX_ROUNDS ) –

    The maximum number of rounds to try to parse successfully.

Returns:

  • ChatPipeline –

    The updated ChatPipeline object.

Source code in rigging/chat.py
def until_parsed_as(
    self,
    *types: type[ModelT],
    attempt_recovery: bool = False,
    drop_dialog: bool = True,
    max_rounds: int = DEFAULT_MAX_ROUNDS,
) -> ChatPipeline:
    """
    Adds the specified types to the list of types which should successfully parse
    before the generation process completes.

    Args:
        *types: The type or types of models to wait for.
        attempt_recovery: Whether to attempt recovery if parsing fails by providing
            validation feedback to the model before the next round.
        drop_dialog: Whether to drop the intermediate dialog of recovery efforts
            before returning the final chat to the caller.
        max_rounds: The maximum number of rounds to try to parse successfully.

    Returns:
        The updated ChatPipeline object.
    """
    self.until_types += types
    if next((c for c in self.until_callbacks if c[0] == self._until_parse_callback), None) is None:
        self.until_callbacks.append((self._until_parse_callback, attempt_recovery, drop_dialog, max_rounds))

    return self
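
A minimal sketch using a simple rigging model, assuming Model.xml_tags() and Message.parse() behave as shown in the rigging documentation (the generator id is a placeholder; run inside an async context):

import rigging as rg

class Answer(rg.Model):
    content: str

chat = (
    await rg.get_generator("gpt-4o-mini")
    .chat(f"Say hello between {Answer.xml_tags()} tags.")
    .until_parsed_as(Answer)
    .run()
)
print(chat.last.parse(Answer).content)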

using(*tools: Tool, force: bool = False, attempt_recovery: bool = True, drop_dialog: bool = False, max_rounds: int = DEFAULT_MAX_ROUNDS, inject_prompt: bool | None = None) -> ChatPipeline #

Adds a tool or a sequence of tools to participate in the generation process.

Parameters:

  • tools (Tool, default: () ) –

    The tool or sequence of tools to be added.

  • force (bool, default: False ) –

    Whether to force the use of the tool(s) at least once.

  • attempt_recovery (bool, default: True ) –

    Whether to attempt recovery if the tool(s) fail by providing validation feedback to the model before the next round.

  • drop_dialog (bool, default: False ) –

    Whether to drop the intermediate dialog of recovery efforts before returning the final chat to the caller.

  • max_rounds (int, default: DEFAULT_MAX_ROUNDS ) –

    The maximum number of rounds to attempt recovery.

  • inject_prompt (bool | None, default: None ) –

    Whether to inject the tool guidance prompt into a system message. If provided, this overrides self.inject_tool_prompt.

Returns:

  • ChatPipeline –

    The updated ChatPipeline object.

Source code in rigging/chat.py
def using(
    self,
    *tools: Tool,
    force: bool = False,
    attempt_recovery: bool = True,
    drop_dialog: bool = False,
    max_rounds: int = DEFAULT_MAX_ROUNDS,
    inject_prompt: bool | None = None,
) -> ChatPipeline:
    """
    Adds a tool or a sequence of tools to participate in the generation process.

    Args:
        tools: The tool or sequence of tools to be added.
        force: Whether to force the use of the tool(s) at least once.
        attempt_recovery: Whether to attempt recovery if the tool(s) fail by providing
            validation feedback to the model before the next round.
        drop_dialog: Whether to drop the intermediate dialog of recovery efforts
            before returning the final chat to the caller.
        max_rounds: The maximum number of rounds to attempt recovery.
        inject_prompt: Whether to inject the tool guidance prompt into a
            system message. If provided, this overrides self.inject_tool_prompt.

    Returns:
        The updated ChatPipeline object.
    """
    self.until_tools += tools
    self.inject_tool_prompt = inject_prompt if inject_prompt is not None else self.inject_tool_prompt
    self.force_tool = force
    if next((c for c in self.until_callbacks if c[0] == self._until_tools_callback), None) is None:
        self.until_callbacks.append(
            (
                self._until_tools_callback,
                attempt_recovery,
                drop_dialog,
                max_rounds,
            )
        )
    return self
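
A sketch of the class-based Tool interface, assuming name/description attributes and Annotated parameter descriptions as in the rigging tool docs, and that pipeline is an existing ChatPipeline (run inside an async context):

from typing import Annotated
import rigging as rg

class Weather(rg.Tool):
    name = "weather"
    description = "Look up the current weather for a city."

    def for_city(self, city: Annotated[str, "The city name"]) -> str:
        return f"It is sunny in {city}."

chat = await pipeline.using(Weather(), force=True).run()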

watch(*callbacks: WatchChatCallback, allow_duplicates: bool = False) -> ChatPipeline #

Registers a callback to monitor any chats produced.

Parameters:

  • *callbacks (WatchChatCallback, default: () ) –

    The callback functions to be executed.

  • allow_duplicates (bool, default: False ) –

    Whether to allow (seemingly) duplicate callbacks to be added.

async def log(chats: list[Chat]) -> None:
    ...

await pipeline.watch(log).run()

Returns:

  • ChatPipeline –

    The current instance of the pipeline.

Source code in rigging/chat.py
def watch(self, *callbacks: WatchChatCallback, allow_duplicates: bool = False) -> ChatPipeline:
    """
    Registers a callback to monitor any chats produced.

    Args:
        *callbacks: The callback functions to be executed.
        allow_duplicates: Whether to allow (seemingly) duplicate callbacks to be added.

    ```
    async def log(chats: list[Chat]) -> None:
        ...

    await pipeline.watch(log).run()
    ```

    Returns:
        The current instance of the pipeline.
    """
    for callback in callbacks:
        if allow_duplicates or callback not in self.watch_callbacks:
            self.watch_callbacks.append(callback)
    return self

with_(params: t.Optional[GenerateParams] = None, **kwargs: t.Any) -> ChatPipeline #

Assign specific generation parameter overloads for this chat.

Note

This will trigger a clone if overload params have already been set.

Parameters:

  • params (Optional[GenerateParams], default: None ) –

    The parameters to set for the chat.

  • **kwargs (Any, default: {} ) –

    An alternative way to pass parameters as keyword arguments.

Returns:

  • ChatPipeline

    A new instance of ChatPipeline with the updated parameters.

Source code in rigging/chat.py
def with_(self, params: t.Optional[GenerateParams] = None, **kwargs: t.Any) -> ChatPipeline:
    """
    Assign specific generation parameter overloads for this chat.

    Note:
        This will trigger a `clone` if overload params have already been set.

    Args:
        params: The parameters to set for the chat.
        **kwargs: An alternative way to pass parameters as keyword arguments.

    Returns:
        A new instance of ChatPipeline with the updated parameters.
    """
    if params is None:
        params = GenerateParams(**kwargs)

    if self.params is not None:
        new = self.clone()
        new.params = self.params.merge_with(params)
        return new

    self.params = params
    return self
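
A small sketch of both calling styles, assuming temperature and max_tokens are valid GenerateParams fields and pipeline is an existing ChatPipeline:

import rigging as rg

pipeline = pipeline.with_(temperature=0.9, max_tokens=256)
pipeline = pipeline.with_(rg.GenerateParams(temperature=0.9, max_tokens=256))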

wrap(func: t.Callable[[CallableT], CallableT]) -> ChatPipeline #

Helper for rigging.generator.base.Generator.wrap.

Parameters:

  • func (Callable[[CallableT], CallableT]) –

    The function to wrap the calls with.

Returns:

  • ChatPipeline –

    The current instance of the pipeline.

Source code in rigging/chat.py
def wrap(self, func: t.Callable[[CallableT], CallableT]) -> ChatPipeline:
    """
    Helper for [rigging.generator.base.Generator.wrap][].

    Args:
        func: The function to wrap the calls with.

    Returns:
        The current instance of the pipeline.
    """
    self.generator = self.generator.wrap(func)
    return self
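
A sketch using tenacity to retry transient generator failures; any function that returns a decorator of this shape works, and pipeline is an existing ChatPipeline:

import tenacity

pipeline = pipeline.wrap(
    tenacity.retry(
        stop=tenacity.stop_after_attempt(3),
        wait=tenacity.wait_exponential(min=1, max=10),
    )
)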

MapChatCallback #

Bases: Protocol

__call__(chats: list[Chat]) -> t.Awaitable[list[Chat]] #

Passed the finalized chats to process. Can replace chats in the pipeline by returning a new list of chat objects.

Source code in rigging/chat.py
def __call__(self, chats: list[Chat], /) -> t.Awaitable[list[Chat]]:
    """
    Passed the finalized chats to process. Can replace chats in the pipeline by
    returning a new list of chat objects.
    """
    ...

ThenChatCallback #

Bases: Protocol

__call__(chat: Chat) -> t.Awaitable[Chat | None] #

Passed a finalized chat to process and can return a new chat to replace it.

Source code in rigging/chat.py
def __call__(self, chat: Chat, /) -> t.Awaitable[Chat | None]:
    """
    Passed a finalized chat to process and can return a new chat to replace it.
    """
    ...

UntilMessageCallback #

Bases: Protocol

__call__(message: Message) -> tuple[bool, list[Message]] #

Passed the next message, returns whether or not to continue and an optional list of messages to append before continuing.

Source code in rigging/chat.py
def __call__(self, message: Message, /) -> tuple[bool, list[Message]]:
    """
    Passed the next message, returns whether or not to continue and an
    optional list of messages to append before continuing.
    """
    ...

WatchChatCallback #

Bases: Protocol

__call__(chats: list[Chat]) -> t.Awaitable[None] #

Passed any created chat objects for monitoring/logging.

Source code in rigging/chat.py
def __call__(self, chats: list[Chat], /) -> t.Awaitable[None]:
    """
    Passed any created chat objects for monitoring/logging.
    """
    ...