Skip to content

rigging.completion

Completions work with isolated strings of text pre and post generation.

Completion(text: str, generated: str, generator: t.Optional[Generator] = None, **kwargs: t.Any) #

Bases: BaseModel

Represents a completed text generation.

Parameters:

  • text (str) –

    The original text.

  • generated (str) –

    The generated text.

  • generator (Optional[Generator], default: None ) –

    The generator associated with this completion.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments (typically used for serialization).

Source code in rigging/completion.py
def __init__(
    self,
    text: str,
    generated: str,
    generator: t.Optional[Generator] = None,
    **kwargs: t.Any,
):
    """
    Initialize a Completion object.

    Args:
        text: The original text.
        generated: The generated text.
        generator: The generator associated with this completion.
        **kwargs: Additional keyword arguments (typically used for serialization).
    """
    if "generator_id" in kwargs and generator is None:
        # TODO: Should we move params to self.params?
        generator = get_generator(kwargs.pop("generator_id"))

    super().__init__(
        text=text,
        generated=generated,
        generator=generator,
        **kwargs,
    )

all: str property #

Returns both the text and the generation.

error: t.Optional[Exception] = Field(None, exclude=True, repr=False) class-attribute instance-attribute #

Holds any exception that was caught during the generation pipeline.

extra: dict[str, t.Any] = Field(default_factory=dict, repr=False) class-attribute instance-attribute #

Any additional information from the generation.

failed: bool = Field(False, exclude=False, repr=False) class-attribute instance-attribute #

Indicates whether conditions during generation were not met. This is typically used for graceful error handling when parsing.

generated: str instance-attribute #

The generated text.

generator: t.Optional[Generator] = Field(None, exclude=True, repr=False) class-attribute instance-attribute #

The generator associated with the completion.

generator_id: str | None property #

The identifier of the generator used to create the completion

metadata: dict[str, t.Any] = Field(default_factory=dict) class-attribute instance-attribute #

Additional metadata for the completion.

params: t.Optional[GenerateParams] = Field(None, exclude=True, repr=False) class-attribute instance-attribute #

Any additional generation params used for this completion.

stop_reason: StopReason = Field(default='unknown') class-attribute instance-attribute #

The reason the generation stopped.

text: str instance-attribute #

The original text.

timestamp: datetime = Field(default_factory=datetime.now, repr=False) class-attribute instance-attribute #

The timestamp when the completion was created.

usage: t.Optional[Usage] = Field(None, repr=False) class-attribute instance-attribute #

The usage statistics for the generation if available.

uuid: UUID = Field(default_factory=uuid4) class-attribute instance-attribute #

The unique identifier.

clone(*, only_messages: bool = False) -> Completion #

Creates a deep copy of the completion.

Source code in rigging/completion.py
def clone(self, *, only_messages: bool = False) -> Completion:
    """Creates a deep copy of the completion."""
    new = Completion(self.text, self.generated, self.generator)
    if not only_messages:
        new.metadata = deepcopy(self.metadata)
        new.stop_reason = self.stop_reason
        new.usage = self.usage.model_copy() if self.usage is not None else self.usage
        new.extra = deepcopy(self.extra)
        new.params = self.params.model_copy() if self.params is not None else self.params
        new.failed = self.failed
    return new

continue_(text: str) -> CompletionPipeline #

Alias for the rigging.completion.Completion.fork with include_all=True.

Source code in rigging/completion.py
def continue_(self, text: str) -> CompletionPipeline:
    """Alias for the [rigging.completion.Completion.fork][] with `include_all=True`."""
    return self.fork(text, include_all=True)

fork(text: str, *, include_all: bool = False) -> CompletionPipeline #

Forks the completion by creating calling rigging.completion.Completion.restart and appends the specified text.

Parameters:

  • text (str) –

    The text to append.

Returns:

  • CompletionPipeline

    A new instance of the pipeline with the specified messages added.

Source code in rigging/completion.py
def fork(self, text: str, *, include_all: bool = False) -> CompletionPipeline:
    """
    Forks the completion by creating calling [rigging.completion.Completion.restart][] and appends the specified text.

    Args:
        text: The text to append.

    Returns:
        A new instance of the pipeline with the specified messages added.
    """
    return self.restart(include_all=include_all).add(text)

meta(**kwargs: t.Any) -> Completion #

Updates the metadata of the completion with the provided key-value pairs.

Parameters:

  • **kwargs (Any, default: {} ) –

    Key-value pairs representing the metadata to be updated.

Returns:

Source code in rigging/completion.py
def meta(self, **kwargs: t.Any) -> Completion:
    """
    Updates the metadata of the completion with the provided key-value pairs.

    Args:
        **kwargs: Key-value pairs representing the metadata to be updated.

    Returns:
        The updated completion object.
    """
    new = self.clone()
    new.metadata.update(kwargs)
    return new

restart(*, generator: t.Optional[Generator] = None, include_all: bool = False) -> CompletionPipeline #

Attempt to convert back to a CompletionPipeline for further generation.

Parameters:

  • generator (Optional[Generator], default: None ) –

    The generator to use for the restarted completion. Otherwise the generator from the original CompletionPipeline will be used.

  • include_all (bool, default: False ) –

    Whether to include the generation before the next round.

Returns: The restarted completion.

Raises:

  • ValueError

    If the completion was not created with a CompletionPipeline and no generator is provided.

Source code in rigging/completion.py
def restart(self, *, generator: t.Optional[Generator] = None, include_all: bool = False) -> CompletionPipeline:
    """
    Attempt to convert back to a CompletionPipeline for further generation.

    Args:
        generator: The generator to use for the restarted completion. Otherwise
            the generator from the original CompletionPipeline will be used.
        include_all: Whether to include the generation before the next round.
    Returns:
        The restarted completion.

    Raises:
        ValueError: If the completion was not created with a CompletionPipeline and no generator is provided.
    """

    text = self.all if include_all else self.generated
    if generator is None:
        generator = self.generator
    if generator is None:
        raise ValueError("Cannot restart a completion without an associated generator")
    return generator.complete(text, self.params)

CompletionPipeline(generator: Generator, text: str, *, params: t.Optional[GenerateParams] = None, watch_callbacks: t.Optional[list[WatchCompletionCallback]] = None) #

Pipeline to manipulate and produce completions.

Source code in rigging/completion.py
def __init__(
    self,
    generator: Generator,
    text: str,
    *,
    params: t.Optional[GenerateParams] = None,
    watch_callbacks: t.Optional[list[WatchCompletionCallback]] = None,
):
    self.generator: Generator = generator
    """The generator object responsible for generating the completion."""
    self.text = text
    """The text to be completed."""
    self.params = params
    """The parameters for generating the completion."""
    self.metadata: dict[str, t.Any] = {}
    """Additional metadata associated with the completion."""
    self.errors_to_fail_on: set[type[Exception]] = set()
    """
    The list of exceptions to catch during generation if you are including or skipping failures.

    ExhuastedMaxRounds is implicitly included.
    """
    self.on_failed: FailMode = "raise"
    """How to handle failures in the pipeline unless overriden in calls."""

    # (callback, all_text, max_rounds)
    self.until_callbacks: list[tuple[UntilCompletionCallback, bool, int]] = []
    self.until_types: list[type[Model]] = []
    self.then_callbacks: list[ThenCompletionCallback] = []
    self.map_callbacks: list[MapCompletionCallback] = []
    self.watch_callbacks: list[WatchCompletionCallback] = watch_callbacks or []

errors_to_fail_on: set[type[Exception]] = set() instance-attribute #

The list of exceptions to catch during generation if you are including or skipping failures.

ExhuastedMaxRounds is implicitly included.

generator: Generator = generator instance-attribute #

The generator object responsible for generating the completion.

metadata: dict[str, t.Any] = {} instance-attribute #

Additional metadata associated with the completion.

on_failed: FailMode = 'raise' instance-attribute #

How to handle failures in the pipeline unless overriden in calls.

params = params instance-attribute #

The parameters for generating the completion.

text = text instance-attribute #

The text to be completed.

add(text: str) -> CompletionPipeline #

Appends new text to the internal text before generation.

Parameters:

  • text (str) –

    The text to be added to the completion.

Returns:

Source code in rigging/completion.py
def add(self, text: str) -> CompletionPipeline:
    """
    Appends new text to the internal text before generation.

    Args:
        text: The text to be added to the completion.

    Returns:
        The updated CompletionPipeline object.
    """
    self.text += text
    return self

apply(**kwargs: str) -> CompletionPipeline #

Applies keyword arguments to the text using string template substitution.

Note

This produces a clone of the CompletionPipeline, leaving the original unchanged.

Parameters:

  • **kwargs (str, default: {} ) –

    Keyword arguments to be applied to the text.

Returns:

Source code in rigging/completion.py
def apply(self, **kwargs: str) -> CompletionPipeline:
    """
    Applies keyword arguments to the text using string template substitution.

    Note:
        This produces a clone of the CompletionPipeline, leaving the original unchanged.

    Args:
        **kwargs: Keyword arguments to be applied to the text.

    Returns:
        A new instance of CompletionPipeline with the applied arguments.
    """
    new = self.clone()
    template = string.Template(self.text)
    new.text = template.safe_substitute(**kwargs)
    return new

catch(*errors: type[Exception], on_failed: FailMode | None = None) -> CompletionPipeline #

Adds exceptions to catch during generation when including or skipping failures.

Parameters:

  • *errors (type[Exception], default: () ) –

    The exception types to catch.

  • on_failed (FailMode | None, default: None ) –

    How to handle failures in the pipeline unless overriden in calls.

Returns:

Source code in rigging/completion.py
def catch(self, *errors: type[Exception], on_failed: FailMode | None = None) -> CompletionPipeline:
    """
    Adds exceptions to catch during generation when including or skipping failures.

    Args:
        *errors: The exception types to catch.
        on_failed: How to handle failures in the pipeline unless overriden in calls.

    Returns:
        The updated CompletionPipeline object.
    """
    self.errors_to_fail_on.update(errors)
    self.on_failed = on_failed or self.on_failed
    return self

clone(*, only_text: bool = False) -> CompletionPipeline #

Creates a clone of the current CompletionPipeline instance.

Parameters:

  • only_text (bool, default: False ) –

    If True, only the text will be cloned. If False (default), the entire CompletionPipeline instance will be cloned including until callbacks, types, and metadata.

Returns:

  • CompletionPipeline

    A new instance of CompletionPipeline that is a clone of the current instance.

Source code in rigging/completion.py
def clone(self, *, only_text: bool = False) -> CompletionPipeline:
    """
    Creates a clone of the current `CompletionPipeline` instance.

    Args:
        only_text: If True, only the text will be cloned.
            If False (default), the entire `CompletionPipeline` instance will be cloned
            including until callbacks, types, and metadata.

    Returns:
        A new instance of `CompletionPipeline` that is a clone of the current instance.
    """
    new = CompletionPipeline(
        self.generator,
        self.text,
        params=self.params.model_copy() if self.params is not None else None,
        watch_callbacks=self.watch_callbacks,
    )
    if not only_text:
        new.until_callbacks = self.until_callbacks.copy()
        new.until_types = self.until_types.copy()
        new.metadata = deepcopy(self.metadata)
        new.then_callbacks = self.then_callbacks.copy()
        new.map_callbacks = self.map_callbacks.copy()
    return new

fork(text: str) -> CompletionPipeline #

Creates a new instance of CompletionPipeline by forking the current completion and adding the specified text.

This is a convenience method for calling clone().add(text).

Parameters:

  • text (str) –

    The text to be added to the new completion.

Returns:

  • CompletionPipeline

    A new instance of CompletionPipeline with the specified text added.

Source code in rigging/completion.py
def fork(self, text: str) -> CompletionPipeline:
    """
    Creates a new instance of `CompletionPipeline` by forking the current completion and adding the specified text.

    This is a convenience method for calling `clone().add(text)`.

    Args:
        text: The text to be added to the new completion.

    Returns:
        A new instance of `CompletionPipeline` with the specified text added.
    """
    return self.clone().add(text)

map(callback: MapCompletionCallback) -> CompletionPipeline #

Registers a callback to be executed after the generation process completes.

Note

You must return a list of completion objects from the callback which will represent the state of completions for the remainder of the callbacks and return.

async def process(completions: list[Completion]) -> list[Completion]:
    ...

await pipeline.map(process).run()

Parameters:

Returns:

Source code in rigging/completion.py
def map(self, callback: MapCompletionCallback) -> CompletionPipeline:
    """
    Registers a callback to be executed after the generation process completes.

    Note:
        You must return a list of completion objects from the callback which will
        represent the state of completions for the remainder of the callbacks and return.

    ```
    async def process(completions: list[Completion]) -> list[Completion]:
        ...

    await pipeline.map(process).run()
    ```

    Args:
        callback: The callback function to be executed.

    Returns:
        The current instance of the completion.
    """
    self.map_callbacks.append(callback)
    return self

meta(**kwargs: t.Any) -> CompletionPipeline #

Updates the metadata of the completion with the provided key-value pairs.

Parameters:

  • **kwargs (Any, default: {} ) –

    Key-value pairs representing the metadata to be updated.

Returns:

Source code in rigging/completion.py
def meta(self, **kwargs: t.Any) -> CompletionPipeline:
    """
    Updates the metadata of the completion with the provided key-value pairs.

    Args:
        **kwargs: Key-value pairs representing the metadata to be updated.

    Returns:
        The updated completion object.
    """
    self.metadata.update(kwargs)
    return self

run(*, allow_failed: bool = False, on_failed: FailMode | None = None) -> Completion async #

Execute the generation process to produce the final chat.

Parameters:

  • allow_failed (bool, default: False ) –

    Ignore any errors and potentially return the chat in a failed state.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate. (this is used as an alternative to allow_failed)

Returns:

Source code in rigging/completion.py
async def run(self, *, allow_failed: bool = False, on_failed: FailMode | None = None) -> Completion:
    """
    Execute the generation process to produce the final chat.

    Parameters:
        allow_failed: Ignore any errors and potentially
            return the chat in a failed state.
        on_failed: The behavior when a message fails to generate.
            (this is used as an alternative to allow_failed)

    Returns:
        The generated Completion.
    """
    if on_failed is None:
        on_failed = "include" if allow_failed else self.on_failed

    if on_failed == "skip":
        raise ValueError(
            "Cannot use 'skip' mode with single completion generation (pass allow_failed=True or on_failed='include'/'raise')"
        )

    on_failed = on_failed or self.on_failed
    states = self._initialize_states(1)

    with tracer.span(
        "Completion with {generator_id}",
        generator_id=self.generator.to_identifier(),
        params=self.params.to_dict() if self.params is not None else {},
    ) as span:
        return (await self._run(span, states, on_failed))[0]

run_batch(many: t.Sequence[str], params: t.Sequence[t.Optional[GenerateParams]] | None = None, *, on_failed: FailMode = 'raise') -> list[Completion] async #

Executes the generation process accross multiple input messages.

Note

Anything already in this pending completion will be prepended to the text.

Parameters:

  • many (Sequence[str]) –

    A sequence of texts to generate with.

  • params (Sequence[Optional[GenerateParams]] | None, default: None ) –

    A sequence of parameters to be used for each text.

  • on_failed (FailMode, default: 'raise' ) –

    How to handle failures in the pipeline unless overriden in calls.

Returns:

  • list[Completion]

    A list of generatated Completions.

Source code in rigging/completion.py
async def run_batch(
    self,
    many: t.Sequence[str],
    params: t.Sequence[t.Optional[GenerateParams]] | None = None,
    *,
    on_failed: FailMode = "raise",
) -> list[Completion]:
    """
    Executes the generation process accross multiple input messages.

    Note:
        Anything already in this pending completion will be prepended to the text.

    Parameters:
        many: A sequence of texts to generate with.
        params: A sequence of parameters to be used for each text.
        on_failed: How to handle failures in the pipeline unless overriden in calls.

    Returns:
        A list of generatated Completions.
    """
    on_failed = on_failed or self.on_failed
    params = self._fit_params(len(many), params)

    states: list[RunState] = [RunState(m, p, self._process()) for m, p in zip(many, params)]
    for state in states:
        next(state.processor)

    with tracer.span(
        "Completion batch with {generator_id} ({count})",
        count=len(states),
        generator_id=self.generator.to_identifier(),
        params=self.params.to_dict() if self.params is not None else {},
    ) as span:
        return await self._run(span, states, on_failed, batch_mode=True)

run_many(count: int, *, params: t.Sequence[t.Optional[GenerateParams]] | None = None, on_failed: FailMode | None = None) -> list[Completion] async #

Executes the generation process multiple times with the same inputs.

Parameters:

  • count (int) –

    The number of times to execute the generation process.

  • params (Sequence[Optional[GenerateParams]] | None, default: None ) –

    A sequence of parameters to be used for each execution.

  • on_failed (FailMode | None, default: None ) –

    How to handle failures in the pipeline unless overriden in calls.

Returns:

  • list[Completion]

    A list of generatated Completions.

Source code in rigging/completion.py
async def run_many(
    self,
    count: int,
    *,
    params: t.Sequence[t.Optional[GenerateParams]] | None = None,
    on_failed: FailMode | None = None,
) -> list[Completion]:
    """
    Executes the generation process multiple times with the same inputs.

    Parameters:
        count: The number of times to execute the generation process.
        params: A sequence of parameters to be used for each execution.
        on_failed: How to handle failures in the pipeline unless overriden in calls.

    Returns:
        A list of generatated Completions.
    """
    on_failed = on_failed or self.on_failed
    states = self._initialize_states(count, params)

    with tracer.span(
        "Completion with {generator_id} (x{count})",
        count=count,
        generator_id=self.generator.to_identifier(),
        params=self.params.to_dict() if self.params is not None else {},
    ) as span:
        return await self._run(span, states, on_failed)

run_over(*generators: Generator | str, include_original: bool = True, on_failed: FailMode | None = None) -> list[Completion] async #

Executes the generation process across multiple generators.

For each generator, this pipeline is cloned and the generator is replaced before the run call. All callbacks and parameters are preserved.

Parameters:

  • *generators (Generator | str, default: () ) –

    A sequence of generators to be used for the generation process.

  • include_original (bool, default: True ) –

    Whether to include the original generator in the list of runs.

  • on_failed (FailMode | None, default: None ) –

    The behavior when a message fails to generate.

Returns:

  • list[Completion]

    A list of generatated Completions.

Source code in rigging/completion.py
async def run_over(
    self, *generators: Generator | str, include_original: bool = True, on_failed: FailMode | None = None
) -> list[Completion]:
    """
    Executes the generation process across multiple generators.

    For each generator, this pipeline is cloned and the generator is replaced
    before the run call. All callbacks and parameters are preserved.

    Parameters:
        *generators: A sequence of generators to be used for the generation process.
        include_original: Whether to include the original generator in the list of runs.
        on_failed: The behavior when a message fails to generate.

    Returns:
        A list of generatated Completions.
    """
    on_failed = on_failed or self.on_failed

    _generators: list[Generator] = [g if isinstance(g, Generator) else get_generator(g) for g in generators]
    if include_original:
        _generators.append(self.generator)

    coros: list[t.Coroutine[t.Any, t.Any, Completion]] = []
    for generator in _generators:
        sub = self.clone()
        sub.generator = generator
        coros.append(sub.run(allow_failed=(on_failed != "raise")))

    with tracer.span("Completion over {count} generators", count=len(coros)):
        completions = await asyncio.gather(*coros)
        return await self._post_run(completions, on_failed)

then(callback: ThenCompletionCallback) -> CompletionPipeline #

Registers a callback to be executed after the generation process completes.

Note

Returning a Completion object from the callback will replace the current completion. for the remainder of the callbacks + return value of run().

async def process(completion: Completion) -> Completion | None:
    ...

await pipeline.then(process).run()

Parameters:

Returns:

Source code in rigging/completion.py
def then(self, callback: ThenCompletionCallback) -> CompletionPipeline:
    """
    Registers a callback to be executed after the generation process completes.

    Note:
        Returning a Completion object from the callback will replace the current completion.
        for the remainder of the callbacks + return value of `run()`.

    ```
    async def process(completion: Completion) -> Completion | None:
        ...

    await pipeline.then(process).run()
    ```

    Args:
        callback: The callback function to be executed.

    Returns:
        The current instance of the pipeline.
    """
    self.then_callbacks.append(callback)
    return self

until(callback: UntilCompletionCallback, *, use_all_text: bool = False, max_rounds: int = DEFAULT_MAX_ROUNDS) -> CompletionPipeline #

Registers a callback to participate in validating the generation process.

# Takes the generated text, and returns whether or not to retry generation.

def callback(text: str) -> bool:
    if is_valid(text):
        return False
    else:
        return True

await pipeline.until(callback).run()

Parameters:

  • callback (UntilCompletionCallback) –

    The callback function to be executed.

  • use_all_text (bool, default: False ) –

    Whether to pass the entire text (including prompt) to the callback.

  • max_rounds (int, default: DEFAULT_MAX_ROUNDS ) –

    The maximum number of rounds to attempt generation + callbacks before giving up.

Returns:

Source code in rigging/completion.py
def until(
    self,
    callback: UntilCompletionCallback,
    *,
    use_all_text: bool = False,
    max_rounds: int = DEFAULT_MAX_ROUNDS,
) -> CompletionPipeline:
    """
    Registers a callback to participate in validating the generation process.

    ```py
    # Takes the generated text, and returns whether or not to retry generation.

    def callback(text: str) -> bool:
        if is_valid(text):
            return False
        else:
            return True

    await pipeline.until(callback).run()
    ```

    Args:
        callback: The callback function to be executed.
        use_all_text: Whether to pass the entire text (including prompt) to the callback.

        max_rounds: The maximum number of rounds to attempt generation + callbacks
            before giving up.

    Returns:
        The current instance of the completion.
    """
    self.until_callbacks.append((callback, use_all_text, max_rounds))
    return self

until_parsed_as(*types: type[ModelT], use_all_text: bool = False, max_rounds: int = DEFAULT_MAX_ROUNDS) -> CompletionPipeline #

Adds the specified types to the list of types which should successfully parse before the generation process completes.

Parameters:

  • *types (type[ModelT], default: () ) –

    The type or types of models to wait for.

  • use_all_text (bool, default: False ) –

    Whether to pass the entire text (including prompt) to the parser.

  • max_rounds (int, default: DEFAULT_MAX_ROUNDS ) –

    The maximum number of rounds to try to parse successfully.

Returns:

Source code in rigging/completion.py
def until_parsed_as(
    self,
    *types: type[ModelT],
    use_all_text: bool = False,
    max_rounds: int = DEFAULT_MAX_ROUNDS,
) -> CompletionPipeline:
    """
    Adds the specified types to the list of types which should successfully parse
    before the generation process completes.

    Args:
        *types: The type or types of models to wait for.
        use_all_text: Whether to pass the entire text (including prompt) to the parser.
        max_rounds: The maximum number of rounds to try to parse successfully.

    Returns:
        The updated CompletionPipeline object.
    """
    self.until_types += types
    if next((c for c in self.until_callbacks if c[0] == self._until_parse_callback), None) is None:
        self.until_callbacks.append((self._until_parse_callback, use_all_text, max_rounds))

    return self

watch(*callbacks: WatchCompletionCallback, allow_duplicates: bool = False) -> CompletionPipeline #

Registers a callback to monitor any completions produced.

Parameters:

  • *callbacks (WatchCompletionCallback, default: () ) –

    The callback functions to be executed.

  • allow_duplicates (bool, default: False ) –

    Whether to allow (seemingly) duplicate callbacks to be added.

async def log(completions: list[Completion]) -> None:
    ...

await pipeline.watch(log).run()

Returns:

Source code in rigging/completion.py
def watch(self, *callbacks: WatchCompletionCallback, allow_duplicates: bool = False) -> CompletionPipeline:
    """
    Registers a callback to monitor any completions produced.

    Args:
        *callbacks: The callback functions to be executed.
        allow_duplicates: Whether to allow (seemingly) duplicate callbacks to be added.

    ```
    async def log(completions: list[Completion]) -> None:
        ...

    await pipeline.watch(log).run()
    ```

    Returns:
        The current instance.
    """
    for callback in callbacks:
        if allow_duplicates or callback not in self.watch_callbacks:
            self.watch_callbacks.append(callback)
    return self

with_(params: t.Optional[GenerateParams] = None, **kwargs: t.Any) -> CompletionPipeline #

Assign specific generation parameter overloads for this completion.

Note

This will trigger a clone if overload params have already been set.

Parameters:

  • params (Optional[GenerateParams], default: None ) –

    The parameters to set for the completion.

  • **kwargs (Any, default: {} ) –

    An alternative way to pass parameters as keyword arguments.

Returns:

Source code in rigging/completion.py
def with_(self, params: t.Optional[GenerateParams] = None, **kwargs: t.Any) -> CompletionPipeline:
    """
    Assign specific generation parameter overloads for this completion.

    Note:
        This will trigger a `clone` if overload params have already been set.

    Args:
        params: The parameters to set for the completion.
        **kwargs: An alternative way to pass parameters as keyword arguments.

    Returns:
        The current (or cloned) instance of the completion.
    """
    if params is None:
        params = GenerateParams(**kwargs)

    if self.params is not None:
        new = self.clone()
        new.params = self.params.merge_with(params)
        return new

    self.params = params
    return self

wrap(func: t.Callable[[CallableT], CallableT]) -> CompletionPipeline #

Helper for rigging.generator.base.Generator.wrap.

Parameters:

  • func (Callable[[CallableT], CallableT]) –

    The function to wrap the calls with.

Returns:

Source code in rigging/completion.py
def wrap(self, func: t.Callable[[CallableT], CallableT]) -> CompletionPipeline:
    """
    Helper for [rigging.generator.base.Generator.wrap][].

    Args:
        func: The function to wrap the calls with.

    Returns:
        The current instance of the pipeline.
    """
    self.generator = self.generator.wrap(func)
    return self

MapCompletionCallback #

Bases: Protocol

__call__(completions: list[Completion]) -> t.Awaitable[list[Completion]] #

Passed a finalized completion to process.

This callback can replace, remove, or extend completions in the pipeline.

Source code in rigging/completion.py
def __call__(self, completions: list[Completion], /) -> t.Awaitable[list[Completion]]:
    """
    Passed a finalized completion to process.

    This callback can replace, remove, or extend completions
    in the pipeline.
    """
    ...

ThenCompletionCallback #

Bases: Protocol

__call__(completion: Completion) -> t.Awaitable[Completion | None] #

Passed a finalized completion to process and can return a new completion to replace it.

Source code in rigging/completion.py
def __call__(self, completion: Completion, /) -> t.Awaitable[Completion | None]:
    """
    Passed a finalized completion to process and can return a new completion to replace it.
    """
    ...

UntilCompletionCallback #

Bases: Protocol

__call__(text: str) -> bool #

A callback function that takes the generated text and returns whether or not to retry generation.

Source code in rigging/completion.py
def __call__(self, text: str, /) -> bool:
    """
    A callback function that takes the generated text and returns whether or not to retry generation.
    """
    ...

WatchCompletionCallback #

Bases: Protocol

__call__(completions: list[Completion]) -> t.Awaitable[None] #

Passed any created completion objects for monitoring/logging.

Source code in rigging/completion.py
def __call__(self, completions: list[Completion], /) -> t.Awaitable[None]:
    """
    Passed any created completion objects for monitoring/logging.
    """
    ...