Digesting
Digestions provide a means of processing records _within_ LogQS so that large quantities of records can be processed efficiently. One can think of digestions as a way to bulk export data from LogQS, much as ingestions are a way to bulk import data into LogQS.
Digestions are primarily composed of three resources:
Digestion: The main resource, which represents the digestion itself. It contains metadata about the digestion, such as which log it is associated with and the current state of the digestion process (see Digestions).
Digestion Topic: A resource which is associated with a digestion and a specific topic within a log. The collection of digestion topics associated with a digestion defines which records will be processed during the digestion, including filtering data based on time ranges (see Digestion Topics).
Digestion Part: A resource generated during digestion processing which provides an index of records which are used during processing (see Digestion Parts).
The general digestion process is as follows:
Create a Digestion: A user creates a digestion resource which specifies the log which is to be processed.
Add Digestion Topics: The user adds digestion topics to the digestion resource, specifying which topics within the log should be processed and any filters to apply to the records in those topics.
Process the Digestion: The user updates the digestion to a queued state, which triggers the digestion processing. The LogQS server will then process the records in the specified topics, creating digestion parts as it goes.
Digestions can process records in different ways, as dictated by the specific Workflow (see Workflows). For example, a digestion can be used to create an extraction, which results in the records being stored in a new log file that the user can then download. In their most generic form, digestions can be used to consolidate record data for efficient processing through the API (see Basic Digestions).
Creating a Digestion
Before creating the digestion itself, we will choose an appropriate workflow to process it. For this example, we will use the LogQS-managed “Extraction” workflow, which will produce a new log file that we can download. We can fetch this workflow by its name:
workflow = lqs.list.workflow(
name="LogQS Managed Extractor",
).data[0]
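If you are unsure which workflows are available, you can list them and inspect their names first. This is a minimal sketch, assuming an unfiltered lqs.list.workflow call returns the available workflows:
# List available workflows and print their names
# (assumes an unfiltered list call returns all visible workflows)
for wf in lqs.list.workflow().data:
    print(wf.id, wf.name)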
Next, to create a digestion, we can use the lqs.create.digestion method. This method requires a log_id to specify which log the digestion is associated with and a workflow_id to specify which workflow should be used for processing. We can also provide an optional name and note for the digestion for reference.
digestion = lqs.create.digestion(
log_id=log_id,
name="My Digestion",
note="This is a test digestion.",
workflow_id=workflow.id,
).data
Digestions are created in a ready state by default.
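Before moving on, we can sanity-check the newly created digestion's state, mirroring the state check shown later in this section:
print(f"Digestion '{digestion.name}' is in state '{digestion.state.value}'.")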
Next, we can add digestion topics to the digestion. This is done using the lqs.create.digestion_topic method, which requires a digestion_id and a topic_id to specify which topic within the log should be processed. We can also provide optional start_time and end_time parameters to specify a time range for the records to be processed.
Say we want to process records from all the topics in the log, starting 15 seconds after the start of the log and ending 15 seconds before the end of the log. We can do this by fetching all the topics in the log and then creating a digestion topic for each one:
log = lqs.fetch.log(log_id=log_id).data
topics = lqs.list.topic(log_id=log_id).data
for topic in topics:
    lqs.create.digestion_topic(
        digestion_id=digestion.id,
        topic_id=topic.id,
        start_time=log.start_time + 15e9, # 15 seconds after log start
        end_time=log.end_time - 15e9, # 15 seconds before log end
    )
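As a quick sanity check, we can list the digestion topics back from the digestion before processing. This is a sketch assuming the digestion topic list method follows the same lqs.list.<resource> pattern used elsewhere in this guide and accepts a digestion_id filter, and that the time range we set is echoed back on each digestion topic:
# List the digestion topics attached to this digestion
# (assumes lqs.list.digestion_topic accepts a digestion_id filter)
digestion_topics = lqs.list.digestion_topic(digestion_id=digestion.id).data
for digestion_topic in digestion_topics:
    print(digestion_topic.topic_id, digestion_topic.start_time, digestion_topic.end_time)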
At this point, the digestion is ready to be processed. We can update the digestion to a queued state, which will trigger the digestion processing:
digestion = lqs.update.digestion(
digestion_id=digestion.id,
data={"state": "queued"}
).data
This will start the digestion process; the digestion transitions to a processing state when LogQS starts the actual task. During the processing state, LogQS creates digestion parts for the digestion, which can be used either directly by the user or by a workflow during the finalizing state.
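For instance, once parts have been created, they can be listed and inspected directly. This is a sketch assuming a lqs.list.digestion_part method exists, following the same lqs.list.<resource> pattern as the other list calls in this guide, and accepts a digestion_id filter:
# List the digestion parts created for this digestion so far
# (assumes lqs.list.digestion_part accepts a digestion_id filter)
digestion_parts = lqs.list.digestion_part(digestion_id=digestion.id).data
print(f"Found {len(digestion_parts)} digestion parts so far.")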
We can monitor the state of the digestion by fetching it again:
digestion = lqs.fetch.digestion(digestion_id=digestion.id).data
print(f"Digestion '{digestion.name}' is now in state '{digestion.state.value}'.")
In this example, we’re using the LogQS-managed “Extraction” workflow, which will produce a new log file during the finalizing state and upload that file to the default object store as a log object under the prefix digestions/<digestion ID>/. Since we’re digesting data from a ROS bag, the resulting log file will be a ROS bag as well and will be located at digestions/<digestion ID>/extraction_<digestion ID>.bag.
When the digestion is in the completed state, we can verify that the extraction file was created by listing the log objects in the default object store:
extraction_object = lqs.list.log_object(
log_id=log_id,
prefix=f"digestions/{digestion.id}/",
delimiter="/",
).data[0]
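We can print the object's key to confirm it matches the expected extraction path:
# The key should look like digestions/<digestion ID>/extraction_<digestion ID>.bag
print(extraction_object.key)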
If this checks out, we can download the file using the download utility method (see Utils.download()):
lqs.utils.download(log_id=log_id, object_key=extraction_object.key)
This will download the file to the current working directory, and we can verify that the file was downloaded successfully.
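For example, we can check that the downloaded file exists and is non-empty. This sketch assumes the download utility saves the file under the object key's basename in the current working directory:
import os
# Verify the downloaded bag exists locally and report its size
# (assumes the file is saved under the object key's basename)
local_path = os.path.basename(extraction_object.key)
if os.path.exists(local_path):
    print(f"Downloaded {local_path} ({os.path.getsize(local_path)} bytes).")
else:
    print(f"Expected file {local_path} was not found.")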