rigging.data
ElasticMapping = {'properties': {'generated': {'type': 'nested'}, 'messages': {'type': 'nested'}}}
module-attribute
#
Default index mapping for chat objects in elastic.
ElasticOpType = t.Literal['index', 'create', 'delete']
module-attribute
#
Available operations for bulk operations.
chats_to_df(chats: Chat | t.Sequence[Chat]) -> pd.DataFrame
#
Convert a Chat or list of Chat objects into a pandas DataFrame.
Note
The messages will be flattened and can be joined by the chat_id column.
Parameters:
Returns:
-
DataFrame
–A pandas DataFrame containing the chat data.
Source code in rigging/data.py
chats_to_elastic(chats: Chat | t.Sequence[Chat], index: str, client: es.AsyncElasticsearch, *, op_type: ElasticOpType = 'index', create_index: bool = True, **kwargs: t.Any) -> int
async
#
Convert chat data to Elasticsearch bulk operation format and store it with a client.
Parameters:
-
chats
(Chat | Sequence[Chat]
) –The chat or list of chats to be converted and stored.
-
index
(str
) –The name of the Elasticsearch index where the data will be stored.
-
client
(AsyncElasticsearch
) –The AsyncElasticsearch client instance.
-
op_type
(ElasticOpType
, default:'index'
) –The operation type for Elasticsearch. Defaults to "index".
-
create_index
(bool
, default:True
) –Whether to create the index if it doesn't exist and update its mapping.
-
kwargs
(Any
, default:{}
) –Additional keyword arguments to be passed to the Elasticsearch client.
Returns:
-
int
–The indexed count from the bulk operation
Source code in rigging/data.py
chats_to_elastic_data(chats: Chat | t.Sequence[Chat], index: str, *, op_type: ElasticOpType = 'index') -> list[dict[str, t.Any]]
#
Convert chat data to Elasticsearch bulk operation format.
Parameters:
-
chats
(Chat | Sequence[Chat]
) –The chat or list of chats to be converted.
-
op_type
(ElasticOpType
, default:'index'
) –The operation type for Elasticsearch.
Returns:
-
list[dict[str, Any]]
–A list of formatted bulk operation dicts.
Source code in rigging/data.py
df_to_chats(df: pd.DataFrame) -> list[Chat]
#
Convert a pandas DataFrame into a list of Chat objects.
Note
The DataFrame should have the same structure as the one
generated by the chats_to_df
function.
Parameters:
-
df
(DataFrame
) –A pandas DataFrame containing the chat data.
Returns:
-
list[Chat]
–A list of Chat objects.
Source code in rigging/data.py
elastic_data_to_chats(data: t.Mapping[str, t.Any] | ObjectApiResponse[t.Any]) -> list[Chat]
#
Convert the raw elastic results into a list of Chat objects.
Source code in rigging/data.py
elastic_to_chats(query: t.Mapping[str, t.Any], index: str, client: es.AsyncElasticsearch, *, max_results: int | None = None, **kwargs: t.Any) -> list[Chat]
async
#
Retrieve chat data from Elasticsearch and convert it to a list of Chat objects.
Parameters:
-
query
(Mapping[str, Any]
) –The Elasticsearch query to be executed.
-
index
(str
) –The name of the Elasticsearch index from which the data will be retrieved.
-
client
(AsyncElasticsearch
) –The Elasticsearch client instance.
-
max_results
(int | None
, default:None
) –The maximum number of results to retrieve.
-
kwargs
(Any
, default:{}
) –Additional keyword arguments to be passed to the Elasticsearch client.
Returns:
-
list[Chat]
–A list of Chat objects.
Source code in rigging/data.py
flatten_chats(chats: Chat | t.Sequence[Chat]) -> list[dict[t.Any, t.Any]]
#
Flatten a list of chats into individual messages with duplicated properties relevant to the chat.
Parameters:
Returns:
-
list[dict[Any, Any]]
–A list of flat Message objects.
Source code in rigging/data.py
s3_bucket_exists(client: S3Client, bucket: str) -> bool
async
#
Determine if an S3 bucket exists.
Parameters:
-
client
(S3Client
) –The S3 client to use.
-
bucket
(str
) –The bucket to check.
Returns:
-
bool
–True if the bucket exists, False otherwise.
Source code in rigging/data.py
s3_object_exists(client: S3Client, bucket: str, key: str) -> bool
async
#
Determine if an S3 object exists.
Parameters:
-
client
(S3Client
) –The S3 client to use.
-
bucket
(str
) –The bucket to check.
-
key
(str
) –The key to check.
Returns:
-
bool
–True if the object exists, False otherwise.
Source code in rigging/data.py
unflatten_chats(messages: t.Sequence[dict[t.Any, t.Any]]) -> list[Chat]
#
Unflatten a list of messages into a list of Chat objects.
Parameters:
-
messages
(Sequence[dict[Any, Any]]
) –A list of flat Message objects in the format from rigging.data.flatten_chats.
Returns:
-
list[Chat]
–A list of Chat objects.