Working with Records

LogQS allows us to work with individual records of our logs. We can fetch record metadata (such as timestamps and cached fields for reference), fetch record bytes (which we can deserialize and use like any other ROS message locally), and insert records. Furthermore, LogQS allows us to manage message types, so that we can depend on the API to ensure we have the correct message deserialization code to handle our specific record formats.
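As a quick preview (a minimal sketch using the same client calls demonstrated later in this guide; the topic ID is a placeholder), fetching record metadata and record bytes looks like this:

from lqs_client import LogQSClient

lqs = LogQSClient()

# fetch record metadata for a topic (replace with a real topic ID)
records = lqs.list.records(topic_id="<topic-id>")["data"]

# fetch the same records along with URLs for their serialized bytes
records_with_bytes = lqs.list.records(
    topic_id="<topic-id>",
    include_bytes=True,
)["data"]

The rest of this guide walks through these operations in detail, starting with record insertion.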

Record Insertion

LogQS allows us to insert new records into logs directly through the API rather than through an ingestion. Although this process isn't nearly as efficient as ingestion, there are cases where doing so makes sense. By utilizing the Message Type resource, we can also define custom message types not found in the original log.

At a high level, the process works like this:

  • First, we write a message definition and create a message type within LogQS with it.
  • Next, we use that message definition to serialize our data.
  • Then, we create a topic using that message type.
  • Finally, we insert records with our serialized data as a payload into that topic.

Message Definitions

First, it's important to understand what a message definition is in this context. In ROS, nodes communicate with each other by publishing messages to topics. A message is a simple data structure, comprising typed fields.

Creating Message Definitions

In LogQS, we organize message definitions through the Message Type resource. Message Types are created using a message definition and are associated with topics so that LogQS knows how to serialize/deserialize message data automatically.

The following steps use the lqs-client library to generate a Python class for a given message definition. You do not need to use lqs-client for this; we offer it as a convenience, but you can perform these steps either with ROS or manually, as sketched below.
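For example, because ROS 1 serialization is little-endian and prefixes variable-length arrays and strings with a uint32 length, the manual route for the Sample message used below comes down to a few struct.pack calls. This is a minimal sketch for that specific field layout (float32 x, float32 y, uint8[] data, string tag), not a general-purpose serializer:

import struct

# ROS 1 serialization: little-endian, with uint32 length prefixes
# for variable-length arrays and strings
payload = struct.pack("<ff", 1.0, 2.0)        # float32 x, float32 y

arr = bytes([1, 2, 3, 4, 5])                  # uint8[] data
payload += struct.pack("<I", len(arr)) + arr

tag = b"Hello World!"                         # string tag
payload += struct.pack("<I", len(tag)) + tag

The resulting payload matches the bytes produced by the generated message class later in this guide.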

To begin, we'll instantiate our LogQS client:

import io
import base64
import time
import requests
from lqs_client import LogQSClient

lqs = LogQSClient()
2023-04-26 19:59:48,950  (INFO - rospy.topics): topicmanager initialized

First, let's take a look at our custom msg file:

with open("assets/Sample.msg") as f:
    msg = f.read()

print(msg)
# This message contains sample data for testing

float32 x         # float value
float32 y         # float value
uint8[] data      # array of unsigned 8-bit integers
string tag        # a string

We generate message definition packages/files for our message type, which we call lqs_msgs/Sample:

lqs.gen.process_message_definition(msg, "lqs_msgs", "Sample")

With the definition files in place, we generate the message classes we can use in Python (this will store message definitions/classes in /tmp):

lqs.gen.generate_messages()

With those messages generated, we:

  1. Import the message definition class
  2. Instantiate an object for the message
  3. Fill it with data
  4. Serialize it to a buffer
  5. Base64 encode it

Sample = lqs.gen.get_message_class("lqs_msgs/Sample")

sample = Sample()

sample.x = 1.0
sample.y = 2.0
sample.data = [1, 2, 3, 4, 5]
sample.tag = "Hello World!"

buff = io.BytesIO()
sample.serialize(buff)

buff.seek(0)
data = buff.read()

print("Bytes:", data)

b64_data = base64.b64encode(data).decode()
print("Base64 Encoded Data:", b64_data)
Bytes: b'\x00\x00\x80?\x00\x00\x00@\x05\x00\x00\x00\x01\x02\x03\x04\x05\x0c\x00\x00\x00Hello World!'
Base64 Encoded Data: AACAPwAAAEAFAAAAAQIDBAUMAAAASGVsbG8gV29ybGQh

To verify this all worked, we decode the base64 representation and deserialize it back into the message class. The values we print should match what we provided.

decoded_data = base64.b64decode(b64_data)

sample = Sample()
sample.deserialize(decoded_data)

print(sample)
x: 1.0
y: 2.0
data: [1, 2, 3, 4, 5]
tag: "Hello World!"

One last thing to note is that our message class also provides us with the message definition's md5sum, among other useful data:

msg_md5sum = Sample._md5sum
msg_type = Sample._type
msg_def = Sample._full_text

print(msg_md5sum)
print(msg_type)
print(msg_def)
a0c4b35aa34912e1af6e823036f4dff6
lqs_msgs/Sample
# This message contains sample data for testing

float32 x         # float value
float32 y         # float value
uint8[] data      # array of unsigned 8-bit integers
string tag        # a string

Great! So we have our message data in a format we can send to LogQS.

Once we have our message data serialized (using lqs-client or otherwise), we can create the resources in LogQS.

Message types in LogQS are organized by group. That is, each message type belongs to one, and only one, group. If you want to reuse a message type in a log in a different group, you need to create a new resource for that group.
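For example (a hypothetical sketch; "Other Group" is a placeholder name), reusing lqs_msgs/Sample in a second group would mean registering a second Message Type resource with the same definition under that group's ID:

other_group = lqs.list.groups(name="Other Group")["data"][0]

# same name, md5, and definition as before, but scoped to the other group
lqs.create.message_type(
    group_id=other_group["id"],
    name=msg_type,
    md5=msg_md5sum,
    definition=msg_def,
)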

We will be using the "Test Group," so first we fetch the group data.

group = lqs.list.groups(name="Test Group")["data"][0]

Next, we query for the message type based on the group ID, name, and MD5 sum. If it doesn't exist, we'll create it.

message_type_response = lqs.list.message_types(group_id=group["id"], name=msg_type, md5=msg_md5sum)["data"]

if len(message_type_response) == 0:
    message_type = lqs.create.message_type(
        group_id=group["id"],
        name=msg_type,
        md5=msg_md5sum,
        definition=msg_def,
    )["data"]
    print(f"Created message type: {message_type['name']}")
else:
    message_type = message_type_response[0]
    print(f"Found message type: {message_type['name']}")
Found message type: lqs_msgs/Sample

We can start using the message type now.

Creating Custom Records in LogQS

Now that we have a message type for our custom message stored in LogQS, we can use it to insert our custom records.

To demonstrate, we'll create a new log called "Custom Records."

log_response = lqs.list.logs(name="Custom Records")["data"]

if len(log_response) == 0:
    log = lqs.create.log(
        name="Custom Records",
        group_id=group["id"],
    )["data"]
    print(f"Created log: {log['name']}")
else:
    log = log_response[0]
    print(f"Found log: {log['name']}")
Found log: Custom Records

Within this log, we'll create a topic for our custom records.

Note that we associate the topic with our newly created message type.

topic_response = lqs.list.topics(name="/samples", log_id=log["id"])["data"]

if len(topic_response) == 0:
    topic = lqs.create.topic(
        name="/samples",
        log_id=log["id"],
        message_type_id=message_type["id"],
    )["data"]
    print(f"Created topic: {topic['name']}")
else:
    topic = topic_response[0]
    print(f"Found topic: {topic['name']}")
Found topic: /samples

Finally, we create a record in this topic with the Base64 encoded data supplied in the payload_bytes field. This will store the payload in the object store backend as well as populate relevant fields in the record.

timestamp = time.time()

lqs.create.record(
    timestamp=timestamp,
    topic_id=topic["id"],
    payload_bytes=b64_data,
)["data"]
{'timestamp': 1682539196.9702437,
 'topic_id': 'd3595d1a-25d9-4a0a-908e-6ccea5df3946',
 'log_id': 'a6db3294-0902-40c8-895a-62cd0ec984fd',
 'message_type_id': 'f2086a93-ba1c-4fc6-a3ac-83cafb392e49',
 'ingestion_id': None,
 'offset': 0,
 'length': 33,
 'data_offset': 0,
 'data_length': 33,
 'errored': False,
 'archived': False,
 'error': None,
 'note': None,
 'chunk_compression': None,
 'chunk_offset': None,
 'chunk_length': None,
 's3_bucket': 'lqs',
 's3_key': 'logs/a6db3294-0902-40c8-895a-62cd0ec984fd/topics/d3595d1a-25d9-4a0a-908e-6ccea5df3946/records/1682539196.9702437/record.bin',
 'format': None,
 'message_data': {'x': 1.0,
  'y': 2.0,
  'tag': 'Hello World!',
  'data': [1, 2, 3, 4, 5]},
 'context': None,
 'nanosecond': 1682539196970243584,
 'image_url': None,
 'bytes_url': None,
 'created_at': '2023-04-26T19:59:58.925227',
 'updated_at': None,
 'deleted_at': None,
 'created_by': '3c0979c0-fbcf-4084-9202-c32bedc0a6e9',
 'updated_by': None,
 'deleted_by': None}

Note that we can now query for that record based on the message data found in the serialized payload:

lqs.list.records(
    topic_id=topic["id"],
    data_filter=[{
        "var": "x",
        "op": "eq",
        "val": 1.0,
    }]
)["data"][0]
{'timestamp': 1682536809.0731158,
 'topic_id': 'd3595d1a-25d9-4a0a-908e-6ccea5df3946',
 'log_id': 'a6db3294-0902-40c8-895a-62cd0ec984fd',
 'message_type_id': 'f2086a93-ba1c-4fc6-a3ac-83cafb392e49',
 'ingestion_id': None,
 'offset': 0,
 'length': 33,
 'data_offset': 0,
 'data_length': 33,
 'errored': False,
 'archived': False,
 'error': None,
 'note': None,
 'chunk_compression': None,
 'chunk_offset': None,
 'chunk_length': None,
 's3_bucket': 'lqs',
 's3_key': 'logs/a6db3294-0902-40c8-895a-62cd0ec984fd/topics/d3595d1a-25d9-4a0a-908e-6ccea5df3946/records/1682536809.0731158/record.bin',
 'format': None,
 'message_data': {'x': 1.0,
  'y': 2.0,
  'tag': 'Hello World!',
  'data': [1, 2, 3, 4, 5]},
 'context': None,
 'nanosecond': 1682536809073115904,
 'image_url': None,
 'bytes_url': None,
 'created_at': '2023-04-26T19:20:09.386515',
 'updated_at': None,
 'deleted_at': None,
 'created_by': '3c0979c0-fbcf-4084-9202-c32bedc0a6e9',
 'updated_by': None,
 'deleted_by': None}

We can also set include_bytes to retrieve this record's serialized bytes:

record = lqs.list.records(
    topic_id=topic["id"],
    data_filter=[{
        "var": "x",
        "op": "eq",
        "val": 1.0,
    }],
    include_bytes=True
)["data"][0]

bytes_url = record['bytes_url']

These bytes should be the exact same bytes we sent as our payload. To verify this, we can pull the bytes down and deserialize them with our Sample class.

data = requests.get(bytes_url).content

sample = Sample()
sample.deserialize(data)

print(sample)
x: 1.0
y: 2.0
data: [1, 2, 3, 4, 5]
tag: "Hello World!"

Using JSON Message Payloads

In the above examples, we jumped through some hoops to serialize our message data. LogQS provides a best-effort means of doing this for us. Although the previous approach should be preferred to ensure data is delivered as expected, this method is offered as a convenience.

Here, we supply the data as a dictionary in the payload_dict field, with the dictionary's key-value pairs corresponding to the fields of the message definition.

timestamp = time.time()

lqs.create.record(
    timestamp=timestamp,
    topic_id=topic["id"],
    payload_dict={
        "x": 2.0,
        "y": 3.0,
        "data": [1, 2, 3, 4, 5],
        "tag": "Hello World, again!",
    },
)["data"]
{'timestamp': 1682539205.083629,
 'topic_id': 'd3595d1a-25d9-4a0a-908e-6ccea5df3946',
 'log_id': 'a6db3294-0902-40c8-895a-62cd0ec984fd',
 'message_type_id': 'f2086a93-ba1c-4fc6-a3ac-83cafb392e49',
 'ingestion_id': None,
 'offset': 0,
 'length': 40,
 'data_offset': 0,
 'data_length': 40,
 'errored': False,
 'archived': False,
 'error': None,
 'note': None,
 'chunk_compression': None,
 'chunk_offset': None,
 'chunk_length': None,
 's3_bucket': 'lqs',
 's3_key': 'logs/a6db3294-0902-40c8-895a-62cd0ec984fd/topics/d3595d1a-25d9-4a0a-908e-6ccea5df3946/records/1682539205.083629/record.bin',
 'format': None,
 'message_data': {'x': 2.0,
  'y': 3.0,
  'tag': 'Hello World, again!',
  'data': [1, 2, 3, 4, 5]},
 'context': None,
 'nanosecond': 1682539205083628800,
 'image_url': None,
 'bytes_url': None,
 'created_at': '2023-04-26T20:00:07.144116',
 'updated_at': None,
 'deleted_at': None,
 'created_by': '3c0979c0-fbcf-4084-9202-c32bedc0a6e9',
 'updated_by': None,
 'deleted_by': None}

Once again, we can query for this record and fetch its bytes:

record = lqs.list.records(
    topic_id=topic["id"],
    data_filter=[{
        "var": "x",
        "op": "eq",
        "val": 2.0,
    }],
    include_bytes=True
)["data"][0]

bytes_url = record['bytes_url']

And verify that the record data is what we expect:

data = requests.get(bytes_url).content

sample = Sample()
sample.deserialize(data)

print(sample)
x: 2.0
y: 3.0
data: [1, 2, 3, 4, 5]
tag: "Hello World, again!"

Fetching Record Data

Up to this point, we've fetched record data using LogQS' bytes_url field. Although this is convenient and generally usable, it's not as performant as fetching the record data directly from S3. The LogQS Client provides utility functions to handle this for us.

To begin, we fetch records but we don't request their bytes URL. In doing so, we avoid the potential overhead of LogQS having to fetch the bytes and store them in a location from which we can retrieve them. Instead, we use the record metadata (specifically, the offset information) to locate the data we need for the record and make the request ourselves. In order for this to work, we need to have AWS access credentials which provide read access to our log objects in S3. Similar to LogQS credentials, those can either be supplied directly to the LogQS Client during instantiation or as environment variables (e.g., in our .env file). The values we need are:

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_DEFAULT_REGION
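For example, a .env file supplying these credentials might look like the following (the values shown are placeholders):

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
AWS_DEFAULT_REGION=<your-region, e.g., us-east-1>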

Continuing from the example above, we'll first use the LogQS Client to fetch the message data for the record directly from S3:

record = lqs.list.records(
    topic_id=topic["id"],
    data_filter=[{
        "var": "x",
        "op": "eq",
        "val": 2.0,
    }],
    include_bytes=False # note: we're avoiding backend overhead by not including the bytes
)["data"][0]
direct_data = lqs.utils.get_message_data_from_record(record)
assert direct_data == data

Note that the data we fetched using the above utility function (a) came directly from S3 and (b) is exactly the same as the data we would have received from the bytes_url payload.

Next, we can get the message class for that record's data from LogQS and use it to deserialize the data:

message_class = lqs.utils.get_message_class(record['message_type_id'])
message = message_class()
message.deserialize(direct_data)

print(message)
x: 2.0
y: 3.0
data: [1, 2, 3, 4, 5]
tag: "Hello World, again!"

For convenience, we can do this in one go:

message = lqs.utils.get_ros_message_from_record(record)

print(message)
x: 2.0
y: 3.0
data: [1, 2, 3, 4, 5]
tag: "Hello World, again!"

Not only is this pretty convenient, but it's also much faster!
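If you'd like to measure the difference yourself, here's a rough timing sketch (assuming the record and client from above; actual numbers will depend on your network and S3 configuration):

# time the direct-from-S3 path
start = time.perf_counter()
direct_data = lqs.utils.get_message_data_from_record(record)
print(f"Direct S3 fetch: {time.perf_counter() - start:.3f}s")

# time the bytes_url path for comparison
start = time.perf_counter()
record_with_bytes = lqs.list.records(
    topic_id=record["topic_id"],
    timestamp_gte=record["timestamp"],
    limit=1,
    include_bytes=True,
)["data"][0]
url_data = requests.get(record_with_bytes["bytes_url"]).content
print(f"bytes_url fetch: {time.perf_counter() - start:.3f}s")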

As an added utility, we provide a means for fetching images from image records using a single method:

# Data from previous docs

group_name = "Test Group"
group = lqs.list.groups(name=group_name)["data"][0]
group_id = group["id"]

message_type_name = "sensor_msgs/Image"
message_type = lqs.list.message_types(group_id=group_id, name=message_type_name)["data"][0]
message_type_id = message_type["id"]

topic = lqs.list.topics(message_type_id=message_type_id)["data"][0]
topic_id = topic["id"]

limit = 10
frequency = 10
timestamp_gte = log['start_time']  # assumes `log` (from above) covers this image topic

record = lqs.list.records(
    topic_id=topic_id,
    frequency=frequency,
    limit=limit,
    timestamp_gte=timestamp_gte,
    include_bytes=False
)["data"][0]
image = lqs.utils.get_image_from_record(record)
image

(image output: the fetched image rendered as a PNG)