Working with Records

Most non-trivial uses of LogQS will involve interacting with records which have been ingested. On this page, we’ll cover a few different aspects of working with records.

Record Auxiliary Data

First, we’ll load a log, a topic from that log which is of type sensor_msgs/Image, and a single record from that topic. We’ll dump the contents of the record to see what it looks like:

log = lqs.resource.Log.fetch("Demo Log")
topic = log.list_topics(type_name="sensor_msgs/Image")[0]
record = topic.list_records(limit=1)[0]

print(record.model_dump())

>> {'locked': False,
 'locked_by': None,
 'locked_at': None,
 'lock_token': None,
 'timestamp': 1655235727034130944,
 'created_at': datetime.datetime(2023, 12, 18, 22, 25, 10, 453947, tzinfo=TzInfo(UTC)),
 'updated_at': None,
 'deleted_at': None,
 'created_by': None,
 'updated_by': None,
 'deleted_by': None,
 'log_id': UUID('f94c2773-6075-44d3-9638-89489e99d0c0'),
 'topic_id': UUID('0f552dad-30b5-4d93-b6a2-67403527fa3a'),
 'ingestion_id': UUID('707e51ae-25a7-42ff-8ed5-9d8ed603b883'),
 'data_offset': 18122,
 'data_length': 1710802,
 'chunk_compression': None,
 'chunk_offset': None,
 'chunk_length': None,
 'source': None,
 'error': None,
 'query_data': None,
 'auxiliary_data': None,
 'raw_data': None,
 'context': None,
 'note': None}

In LogQS, records can be associated with “auxiliary” data which allows us to augment records with arbitrary JSON data stored in an object store. This data is not included on records by default, as loading the data incurs a performance hit per record, but it can be loaded by setting the include_auxiliary_data parameter to True when fetching or listing records.

Note: auxiliary data can be arbitrarily large, so loading a large number of records with auxiliary data can be problematic (including errors related to payload limits). It’s usually best to load records with auxiliary data one at a time, or in small batches.
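
If you do need auxiliary data for a larger set of records, a minimal batching sketch might look like the following (this assumes list_records accepts a standard offset parameter for pagination, which may vary by client version):

batch_size = 10
offset = 0
while True:
    batch = topic.list_records(
        limit=batch_size,
        offset=offset,  # assumed pagination parameter
        include_auxiliary_data=True,
    )
    if not batch:
        break
    for batch_record in batch:
        pass  # process batch_record.auxiliary_data here
    offset += batch_size

For this walkthrough, we’ll simply re-fetch our single record with its auxiliary data included: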

record = topic.list_records(limit=1, include_auxiliary_data=True)[0]
print(record.model_dump())

>> {'locked': False,
 'locked_by': None,
 'locked_at': None,
 'lock_token': None,
 'timestamp': 1655235727034130944,
 'created_at': datetime.datetime(2023, 12, 18, 22, 25, 10, 453947, tzinfo=TzInfo(UTC)),
 'updated_at': None,
 'deleted_at': None,
 'created_by': None,
 'updated_by': None,
 'deleted_by': None,
 'log_id': UUID('f94c2773-6075-44d3-9638-89489e99d0c0'),
 'topic_id': UUID('0f552dad-30b5-4d93-b6a2-67403527fa3a'),
 'ingestion_id': UUID('707e51ae-25a7-42ff-8ed5-9d8ed603b883'),
 'data_offset': 18122,
 'data_length': 1710802,
 'chunk_compression': None,
 'chunk_offset': None,
 'chunk_length': None,
 'source': None,
 'error': None,
 'query_data': None,
 'auxiliary_data': {'image': 'UklGR...e/8f0gQAAAA',
  'max_size': 640,
  'quality': 80,
  'format': 'webp'},
 'raw_data': None,
 'context': None,
 'note': None}

You should see that the auxiliary data for this record includes an ‘image’ field with a base64-encoded image. In LogQS, we automatically process certain types of data, such as images, to generate this auxiliary data on-demand. Other types of data may not have auxiliary data generated automatically, in which case a user will need to manually create it.
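
If you want to work with the preview image directly, you can decode it yourself. Here’s a minimal sketch, assuming the ‘image’ field is a base64-encoded image (WebP in this case, per the ‘format’ field) and that your Pillow build supports that format:

import base64
import io

from PIL import Image as ImagePIL

# decode the base64 payload from the auxiliary data and open it with Pillow
preview_bytes = base64.b64decode(record.auxiliary_data["image"])
preview = ImagePIL.open(io.BytesIO(preview_bytes))
print(preview.format, preview.size)

That said, you usually won’t need to do this by hand.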

The record model includes a helper method to display the image in notebooks:

record.load_auxiliary_data_image()

The “thumbnail” associated with the record.

Note that the image you’d find in the auxiliary data of a record is typically downscaled and compressed, making it unsuitable for high-quality image processing. We refer to these images as “preview” images since they’re appropriate for quick reference.

If you need a full-resolution image, you’ll need to fetch and deserialize the original data from the log file.

Fetching Record Data

When we want to fetch the original log data for a record, we have to jump through a few hoops to actually get it. The record provides enough information to locate and fetch the original log data from the log file in the object store, but doing this by hand is quite cumbersome.

To make this process easier, we’ve provided a utility method for fetching the record bytes given a record. Note that this process can be slow, especially when performed on a single record at a time:

record_bytes = lqs.utils.fetch_record_bytes(record)

print(record_bytes[:10])

>> b'`\n\x00\x00\x8e\xe4\xa8b(p'
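
These are the raw serialized message bytes. Since this is a ROS 1 message, its fields are packed little-endian in declaration order, so as a quick sanity check we can read the leading header fields by hand (a sketch, assuming ROS 1 serialization of sensor_msgs/Image):

import struct

# the first two little-endian uint32 values are header.seq and header.stamp.secs
seq, stamp_secs = struct.unpack_from("<II", record_bytes)
print(seq, stamp_secs)  # should match the deserialized header shown below

Hand-parsing doesn’t scale beyond a couple of fields, though.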

LogQS comes with deserialization utilities for different log formats. There are different ways of accessing these utilities, but if you’re interested in fetching and deserializing the original log data for a record, the following method is the most straightforward:

record_data = lqs.utils.get_deserialized_record_data(record)

# we omit the "data" field since it's big and not very interesting to see
print({ k: v for k, v in record_data.items() if k != "data" })

>> {'header': {'seq': 2656,
  'stamp': {'secs': 1655235726, 'nsecs': 999977000},
  'frame_id': 'crl_rzr/multisense_front/aux_camera_frame'},
 'height': 594,
 'width': 960,
 'encoding': 'bgr8',
 'is_bigendian': 0,
 'step': 2880}

Our deserialization utilities will return a dictionary with the deserialized data in a format closely matching the original data schema. In the case of sensor_msgs/Image topics, you’ll find that the dictionary looks similar to the ROS message definition.
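
If you’d rather work with the pixel data directly, you can interpret these fields yourself. Here’s a sketch assuming the standard sensor_msgs/Image layout, where the “data” field is a row-major buffer with “step” bytes per row and, for the bgr8 encoding, three bytes per pixel in B, G, R order:

import numpy as np

height = record_data["height"]
width = record_data["width"]
step = record_data["step"]

# reshape the flat byte buffer into rows, trim any row padding, then split channels
buf = np.frombuffer(bytes(record_data["data"]), dtype=np.uint8)
bgr = buf.reshape(height, step)[:, : width * 3].reshape(height, width, 3)
rgb = bgr[:, :, ::-1]  # reverse the channel axis to get RGB
print(rgb.shape)  # (594, 960, 3) for this record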

If we want to view this image, we’ll have to do a little processing to convert the image data to a format that can be displayed in a Jupyter notebook. We’ll use the PIL library to do this:

from PIL import Image as ImagePIL

mode = "RGB" # different encodings may use different modes
img = ImagePIL.frombuffer(
    mode,
    (record_data["width"], record_data["height"]),
    bytes(record_data["data"]),
    "raw",
    mode,
    0,
    1,
)

# in this case, we actually have a BGR image, not an RGB, so we need to swap the channels
b, g, r = img.split()
img = ImagePIL.merge("RGB", (r, g, b))

img

The full-resolution image associated with the record.

Of course, we also offer a utility function which can do this for you:

from lqs.common.utils import get_record_image

img = get_record_image(record_data, format="PNG")
img

The same full-resolution image associated with the record.

Listing Records

If we need to work with more than one record in this way, a few approaches can help improve performance, depending on the context. For example, if we’re interested in a list of records across time, but we don’t need every record within that span, we can use the frequency parameter to specify how many records we want to fetch per second. This can be useful for getting a representative sample of records across time without having to load every single one.

records = topic.list_records(frequency=0.1) # 0.1 record per second, or 1 record every 10 seconds

print(f"Found {len(records)} records")

>> Found 7 records
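
As a quick sanity check, we can convert the record timestamps, which are integer nanoseconds since the Unix epoch, into datetimes and confirm the sampled records are spaced roughly ten seconds apart:

from datetime import datetime, timezone

# record timestamps are nanoseconds since the Unix epoch
for sampled_record in records:
    print(datetime.fromtimestamp(sampled_record.timestamp / 1e9, tz=timezone.utc))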

We can then proceed as we did above to fetch the original log data for each record, but the methods used above aren’t optimized for working with a batch of records (you’ll incur unnecessary overhead for each record).

Instead, you’d want to use the iter_record_data utility method which takes a list of records as input and produces an iterator which yields a tuple of the record and the record’s data. This method is optimized for fetching data for multiple records at once as well as re-using lookup data and the deserialization utilities across multiple records:

for idx, (record, record_data) in enumerate(lqs.utils.iter_record_data(records, deserialize_results=True)):
    image = get_record_image(
        record_data,
        format="PNG",
    )
    image.thumbnail((200, 200)) # make them small for the sake of compactness, but the record_data is full-res
    display(image)

The preview thumbnails for each of the seven sampled records.

Basic Digestions

Digestions are processes managed by LogQS which provide a way to handle batches of records efficiently. Digestions are composed of four main processing steps: first, the record index information for the provided selection of records (based on topics, time ranges, etc.) is fetched from the database; next, the underlying message data for each record is fetched and stored in record “blobs” in the object store; then, the record blobs are processed according to the digestion workflow; and finally, the record blobs are deleted from the object store.

Sometimes, you may want to work with batches of records locally, so you _don’t_ have a managed workflow ready to process the generated record blobs, but you effectively want to process these blobs yourself. In this case, you can use the lqs.utils.iter_digestion_records method to create an iterator which will yield the record data from these blobs. The benefit of doing this (versus iterating over record data directly, as in the previous section) is that, by copying only the relevant records into the object store as blobs, you can stream the data from the object store more efficiently, without having to download unnecessary record data or incur the overhead of reading data scattered across the original log files.

As an example, assume we’ve created a digestion which contains a single topic of image data. We want to download these images and store them as individual files locally. When we create the digestion, we specify that it should use the Basic Digestion workflow. This workflow will create the record indexes as well as the record blobs, but will stop at the finalizing state. Once it has reached the finalizing state, you could run something like the following:

import os
from lqs import LogQS
from lqs.transcode import Transcode

lqs = LogQS()
transcoder = Transcode()

digestion_id = "cf893115-e194-4b93-bf24-06fcf7c49437" # our digestion's ID
image_dir = f"digestion_images/{digestion_id}"
os.makedirs(image_dir, exist_ok=True)

topics = {}
record_iter = lqs.utils.iter_digestion_records(digestion_id)
for digestion_part_id, part_entry, record_bytes in record_iter:
    topic_id = str(part_entry.topic_id)
    if topic_id not in topics:
        topics[topic_id] = lqs.fetch.topic(topic_id).data
    topic = topics[topic_id]
    record_data = transcoder.deserialize(
        message_bytes=record_bytes,
        type_name=topic.type_name,
        type_encoding=topic.type_encoding,
        type_data=topic.type_data,
    )
    image_bytes = lqs.utils.get_record_image(record_data, format="PNG", return_bytes=True)
    with open(os.path.join(image_dir, f"{part_entry.timestamp}.png"), "wb") as f:
        # image_bytes is io.BytesIO
        f.write(image_bytes.getbuffer())

The result of running this will be a folder with all the images from the digestion stored as PNG files.

The decision to leverage digestions like this, compared to fetching record data through the iter_record_data method or on a record-by-record basis, depends on a number of context-dependent variables. Digestions incur some overhead: the indexes and blobs need to be processed, and the iterator takes some time to set up. Once that’s done, though, downloading data this way is much more efficient than the other approaches. In general, fetch record data on a record-by-record basis if you only need a few records; use the record iterator if you need many records and selecting them beforehand isn’t feasible; and use the digestion iterator if you need many more records still and can afford the overhead of creating the digestion.