Resources

Generally, LogQS workflows operate around the following resources:

  • Groups
  • Logs
  • Ingestions
  • Message Types
  • Topics
  • Records
  • Extractions

At a high level, an end-to-end workflow through LogQS looks like this: create a group, create a log in that group, ingest a log file into the log, query the log's message types, query topics based on those message types, query records from a topic and view their data, then create an extraction from the log.

In the next sections, we'll walk through an example of this process.

Groups

Most resources are associated with one, and only one, group. Groups allow users to organize logs in separate partitions so that subresources (such as logs, topics, records, and message types) can be isolated as needed.

Users are associated with groups through a group association resource. Group associations pair a user with a group which allows the user to access resources in the group. A role field on the group association indicates which actions the user is allowed to take on resources in the group. Possible role values:

  • viewer: only has read access to resources in the group
  • editor: has read and write access to resources in the group
  • owner: has read and write access to resources in the group and to group association resources (i.e., can grant/revoke group access)

Users also have a boolean is_admin field which indicates whether or not the user is an admin. Admins can read and write all resources in all groups in a DataStore.

To fetch groups you have access to:

from lqs_client import LogQSClient

lqs = LogQSClient()

groups = lqs.list.group()["data"]

The list method for any resource returns a dictionary with pagination parameters count, offset, limit, order, and sort which can be used to fetch further batches of resources as needed. The actual resource data is returned as a list in the data attribute, where each element is a dictionary representation of the resource.
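
For example, we could page through all groups in batches (a sketch which assumes the limit and offset values from the response can also be passed as parameters to list, and that count reflects the total number of matching resources):

# sketch: page through groups in batches of 10
offset = 0
limit = 10
all_groups = []
while True:
    response = lqs.list.group(limit=limit, offset=offset)
    all_groups.extend(response["data"])
    if offset + limit >= response["count"]:
        break
    offset += limit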

If we don't see a group named "Test Group", we'll have to ask an admin to create one (this can be accomplished via the API or in the admin app).
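
For example, an admin could create the group and grant a user access to it via the API. A minimal sketch, assuming group associations are created through the same create interface used for other resources (the user ID here is a hypothetical placeholder):

# sketch: as an admin, create the group and grant a user editor access
# (assumes lqs.create.group_association exists and follows the client's create pattern)
group = lqs.create.group(name="Test Group")["data"]

group_association = lqs.create.group_association(
    user_id="<user id>",   # hypothetical placeholder for the target user's ID
    group_id=group["id"],
    role="editor"          # one of: viewer, editor, owner
)["data"]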

The list method can take parameters which filter the results. Each resource has its own set of available filters, and filters can be combined (i.e., like an AND query).

We can fetch a group by name by using the name filter. Note that even though we expect only one result, the data field is still a list, so we set group to be the first element of the array.

group_name = "Test Group"
group = lqs.list.group(name=group_name)["data"][0]

The group object is a dictionary, so we can reference values by key:

group_id = group["id"]

Logs

Most workflows will start with the creation of a log resource. Logs can be thought of as containers for topics and, therefore, records. Logs are identified by a required name and are associated with a single group through its group_id.

Since logs must have unique names within each group, we'll first check to see how many logs already exist with a name like "Test Log", then use that information to create a log name which should be unique.

log_name_like = "Test Log" # change this as needed

logs_response = lqs.list.log(name_like=log_name_like)
logs_count = logs_response["count"]

log_name = f"Test Log ({logs_count})" # use the number of returned logs to ensure uniqueness

Now that we have an appropriate log name, we create the log using the create method. Similar to listing objects, the returned object is a dictionary with a data field containing the dictionary representation of the log.

log = lqs.create.log(
    group_id=group["id"],
    name=log_name,
    note=None
)["data"]

log_id = log["id"]

Ingestions

Once we have a log, we'll want to ingest data into it. This is done by associating a log file stored in S3 with the log through an ingestion resource. An ingestion is also associated with a process that, once queued and started, will index the records from the log file into the log.

Note: Although it is natural to associate one log with one log file, it is possible to associate multiple log files with each log resource in LogQS by using multiple ingestions. This may be appropriate if you split your logs by time, topics, etc.

For this example, we'll use a single ROS bag, but we'll provide details on more complex scenarios later.

First, we'll download an example ROS bag to work with. This ROS bag is about 1.5 GB in size and about a minute long, with a single image topic. In Colab, you can find this file once it's finished downloading by clicking the "Files" tab on the left. This download should only take a few minutes.

url = "https://logqs-dev-admin-frontend.s3.amazonaws.com/assets/zeli_test.bag"
local_filename = url.split('/')[-1]
import requests
import shutil

with requests.get(url, stream=True) as r:
    with open(local_filename, 'wb') as f:
        shutil.copyfileobj(r.raw, f)

Currently, there are two approaches for uploading a log file to the system:

  1. Uploading the log yourself to an S3 bucket which the DataStore has access to and referencing its bucket and key in the ingestion (e.g., appropriate if you already have data existing in S3).
  2. Uploading the data via the /presignedUrls resource (e.g., appropriate if you need to upload data into the system).

We consider option (2) to be the better option for most use cases. We discuss this approach in detail in the Data Management section of this notebook. If your workflow works better with option (1), you can start with the AWS S3 docs.

To upload and ingest a log file, the steps will be as follows:

  1. Create an ingestion, leaving the s3_bucket and s3_key fields blank/null and ensuring the queued field is set to false (which it is by default).
  2. Using the ingestion's ID, create a presigned URL for the put_object method by making a POST request to /ingestions/:ingestionId/presignedUrls. This presigned URL will only allow us to upload a log file of up to 5GB. Larger files or more efficient workflows will require the use of multipart uploads, which are explained in the Data Management section of this notebook.
  3. Make a PUT request to that URL with the body being the binary data of the log. This will perform the PutObject action via the S3 API.
  4. Update the ingestion, setting queued to true and the s3_bucket and s3_key to the parameters provided when creating the presigned URL to start the ingestion process.

Ingestions are associated with a single log via its log_id field. An ingestion has an s3_bucket field and s3_key field, which should point to the log file stored in S3, and a required format field, indicating the log format (e.g., ros, mls, etc.). Ingestions allow for an optional name field (used for human-readable identification) and note field.

By default, ingestions are created with their boolean queued field set to false. If this is set to true, the ingestion will be created and the process will be started (if possible). If the log file already exists in S3 during the creation of an ingestion, this should be set to true so that the ingestion is started immediately.

First, we create the ingestion. Here, we're using the ID of the log we created above.

ingestion = lqs.create.ingestion(
    name="Test Ingestion",
    log_id=log["id"],
    s3_bucket=None,
    s3_key=None,
    format='ros',
    queued=False,
    note='For demo, made with script.'
)["data"]

ingestion_id = ingestion["id"]

Next, we use the put_object method of the client's s3 inner class to upload the file.

Under the hood, this is being stored in an object store (likely S3) using a key specific to the resource. As such, the key argument cannot contain any slashes or characters not allowed by S3.

with open(local_filename, 'rb') as f:
    r_headers, r_params, r_body = lqs.s3.put_object(
        resource="ingestion",
        resource_id=ingestion_id,
        key="zeli_test.bag",
        body=f.read()
    )

All s3 actions return a tuple of objects:

  • Response headers (assigned to r_headers above) which are the headers returned by the object store service for the request.
  • Request parameters (assigned to r_params above) which are the parameters used for the action taken.
  • Response body (assigned to r_body above) which is the actual body of the response (sometimes None).

These returned objects are sometimes used later depending on the workflow, and can offer some information for troubleshooting as needed.
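
For example, we can inspect what came back from the upload (the Bucket and Key request parameters are used below when updating the ingestion; the exact response headers depend on the object store):

# inspect the results of the put_object call
print(r_params["Bucket"], r_params["Key"])  # where the object was stored
print(r_headers)                            # response headers from the object store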

With the ingestion created and the file uploaded, we're ready to start the ingestion process. We use the request parameters to find the bucket and key where the object is stored, update the ingestion accordingly, and set queued to True to kick off the ingestion process.

ingestion = lqs.update.ingestion(
    ingestion_id=ingestion_id,
    data=dict(
        s3_bucket=r_params["Bucket"],
        s3_key=r_params["Key"],
        queued=True
    )
)["data"]

Once an ingestion is queued (whether upon creation or after creation via an update), a task will start to process the ingestion. Once the task successfully starts (usually < 2 minutes from creation), the status of the ingestion will change from ready to the current state of the processing task. When the status is complete, the ingestion is finished processing and the data is fully ready.

Ingestions also include a set of boolean fields, queued, processing, errored, cancelled, and archived, to indicate their state. When queued, processing, or archived is set to true, the ingestion cannot be modified. If errored or cancelled is true, the ingestion can be re-queued by updating its queued field to true. The ingestion also has a float progress field indicating the approximate fraction of completion (i.e., a value of 0.5 indicates the ingestion is roughly 50% complete).
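
For example, an errored ingestion can be re-queued with an update (a sketch using the same update pattern shown above):

# sketch: re-queue the ingestion if it has errored
if ingestion["errored"]:
    ingestion = lqs.update.ingestion(
        ingestion_id=ingestion_id,
        data=dict(queued=True)
    )["data"]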

We can periodically fetch the ingestion to monitor its progress. The ingestion of our example should take less than 5 minutes to complete.

ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
print(f"Ingestion {ingestion_id} status: {ingestion['status']}")
print(f"queued: {ingestion['queued']}, processing: {ingestion['processing']}")

Ingesting Existing S3 Objects

In the previous section, we described what it takes to create an ingestion, upload data to the ingestion, then process the ingestion. However, there may be cases where your log data already exists in S3 and you just need LogQS to ingest it. This is possible in LogQS with a very similar workflow:

As before, you will need to create a log which the data will be ingested into. You will also create an ingestion as before, but this time you can supply the s3_bucket and s3_key on creation using the S3 information for the already-uploaded object. Similarly, you can set queued to True to immediately start ingesting the data (otherwise, queued defaults to False and the ingestion won't start until queued is set to True). From that point, the process is the same as if you had uploaded the data directly to LogQS.

For the sake of concreteness, assuming you have an existing ROS bag located at s3://my-log-bucket/logs/log.bag, then it can be ingested into the log created previously with:

ingestion = lqs.create.ingestion(
    name="Test Ingestion",
    log_id=log["id"],
    s3_bucket="my-log-bucket",
    s3_key="logs/log.bag",
    format='ros',
    queued=True
)["data"]

Note that the LogQS application needs permission to read objects from the bucket you're supplying. To accomplish this, we recommend setting a simple, minimal policy on the bucket that gives the DataStore's AWS account GetObject permissions on all objects in the bucket, e.g.,

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<DataStore Account ID>:root"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<Bucket Name>/*"
        }
    ]
}

Where <DataStore Account ID> is the AWS Account ID of the DataStore (this can be found at /version-info of the API endpoint) and <Bucket Name> is the name of the bucket that the policy is being applied to. LogQS only requires GetObject permissions to fully operate, which gives it read access to the object data. You can modify the policy to make it as restrictive as needed, as long as LogQS has the ability to get the objects needed for ingestion. Removing this permission will prevent LogQS from being able to ingest data, extract data, or serve record binary data, but otherwise LogQS will remain operational across previously ingested logs.
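
For example, the account ID can be fetched from the /version-info endpoint of the API (a sketch; the base URL is a hypothetical placeholder and the exact shape of the response may vary):

import requests

# sketch: look up the DataStore's AWS Account ID via the API
api_url = "https://<your-datastore-endpoint>"  # hypothetical placeholder for your API endpoint
version_info = requests.get(f"{api_url}/version-info").json()
print(version_info)  # the DataStore's AWS Account ID appears in this response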

More information can be found in the AWS docs.

Message Types

During ingestion processing, message types found in a log will be populated in the system if not already present (based on message type name and MD5). Message types are unique to each group, so a log associated with one group will always be associated with a completely different set of message types in another, even if the two share message types with the same name and MD5.

We can fetch message types in a particular group with:

message_types = lqs.list.message_type(group_id=group_id)["data"]

We can find message types of a specific type by filtering on name:

message_type_name = "sensor_msgs/Image"
message_type = lqs.list.message_type(group_id=group_id, name=message_type_name)["data"][0]
message_type_id = message_type["id"]

Topics

During ingestion processing, the topics for a log will be populated. Topics can be thought of as containers for records which share the same message type and are temporally unique (i.e., two records of the same topic must have different timestamps). Topics in LogQS correspond closely to topics in ROS.

We can fetch a paginated list of topics for a log by filtering the topics on log_id:

topics = lqs.list.topic(log_id=log_id)["data"]

We can filter topics by message type using the message_type_id filter:

topics = lqs.list.topic(log_id=log_id, message_type_id=message_type_id)["data"]

Each topic in a log has a unique name, so we can specify a name to find a specific topic. We can also filter on a topic's name:

topic_name = topics[0]['name']

topic = lqs.list.topic(log_id=log_id, name=topic_name)["data"][0]

topic_id = topic["id"]

Records

Records are associated with one topic, and each record has a unique timestamp relative to the topic. In order to query records, we must supply their topic ID as a path parameter:

records = lqs.list.record(topic_id=topic_id)["data"]

If the records are an image type (i.e., their topic has a message type of either sensor_msgs/Image or sensor_msgs/CompressedImage), then, during ingestion, a compressed WEBP version of their image is cached for fast access later. To access this image, query for records and include an include_image URL parameter set to true, which will populate the image_url field.

We can also filter records so that we only list records at a certain frequency. For example, setting frequency=10 returns 10 records per second, setting frequency=1 returns 1 record per second, etc.

Records can be fetched within time ranges as well. For example, we can start fetching records at a timestamp later in the log by taking the log's start_time, adding an elapsed-time offset to it, and setting the timestamp_gte filter to that value.

# fetch fresh log data (i.e., post-ingestion)
log = lqs.get.log(log_id=log_id)["data"]

limit = 10
frequency = 10
timestamp_gte = log['start_time']

records = lqs.list.record(
    topic_id=topic_id,
    frequency=frequency,
    limit=limit,
    timestamp_gte=timestamp_gte,
    include_image=True
)["data"]

Note: the record's image_url field is populated with a pre-signed URL that can be used to fetch the cached image:

import base64
from IPython.display import HTML, display, clear_output

def show_image(image_bytes, width=None, height=None):
    data_url = 'data:image/webp;base64,' + base64.b64encode(image_bytes).decode()
    display(HTML("<img src='%s'>" % (data_url)))

image_url = records[0]["image_url"]
r = requests.get(image_url)
show_image(r.content)

By iterating over the records, we can visualize the images playing back in time:

for record in records:
    image_url = record["image_url"]
    r = requests.get(image_url)
    clear_output(wait=True)
    show_image(r.content)

Fetching Raw Message Data

Similar to the above workflow, the API provides the ability to download the binary message data for a given record. Here, we will demonstrate how we can use this data to retrieve the full-resolution version of the image we saw above.

First, we install and import py3rosmsgs so that we can deserialize the binary data.

!pip install py3rosmsgs
from sensor_msgs.msg import Image

import PIL.Image  # import the Image submodule explicitly so PIL.Image is available below

Next, we make a similar request to the one before, but now we set the include_bytes URL parameter to true. You'll notice each record's bytes_url field is populated with a pre-signed URL. This points to a file of binary data containing the message data payload.

limit = 10
frequency = 10
timestamp_gte = log['start_time']

records = lqs.list.record(
    topic_id=topic_id,
    frequency=frequency,
    limit=limit,
    timestamp_gte=timestamp_gte,
    include_bytes=True
)["data"]
bytes_url = records[0]["bytes_url"]
r = requests.get(bytes_url)
message_data = r.content

Since we know this is Image data, we can use the Image class to deserialize it:

image = Image()
image.deserialize(message_data)
height, width = image.height, image.width

# convert between image encodings and PIL modes
img_modes = {
    '16UC1': 'L',
    'bayer_grbg8': 'L',
    'mono8': 'L',
    'mono16': 'L',
    '8UC1': 'L',
    '8UC3': 'RGB',
    'rgb8': 'RGB',
    'bgr8': 'RGB',
    'rgba8': 'RGBA',
    'bgra8': 'RGBA',
    'bayer_rggb': 'L',
    'bayer_gbrg': 'L',
    'bayer_grbg': 'L',
    'bayer_bggr': 'L',
    'yuv422': 'YCbCr',
    'yuv411': 'YCbCr'
}

mode = img_modes[image.encoding]

print(f"Image deserialized! Height: {height}, Width: {width}, Mode: {mode}")
img = PIL.Image.frombytes(mode, (width, height), image.data)
if image.encoding == 'bgr8':
    b, g, r = img.split()
    img = PIL.Image.merge('RGB', (r, g, b))

Here, img is a full resolution version of the image which can be used for downstream tasks.

img

Extractions

Working with records through the API is a good way to explore and fetch individual records and records over small ranges of time. But if a larger subset of the data is needed, it may be more efficient to generate an extraction. With an extraction, you can specify a time range over different combinations of topics, which is then used in an extraction process to generate a log file (currently only ROS bags are supported) which can be downloaded.

The process for building an extraction is to first create an extraction resource, then to create extraction topic resources which are associated with the extraction. The extraction can then be queued, which starts the task for processing. Once complete, we can create a presigned URL to download the extraction.

First, we create the extraction:

extraction_name = f"{log['name']} - {topic['name']} Extraction"

extraction = lqs.create.extraction(
    name=extraction_name,
    log_id=log["id"],
    note="Made during demo."
)["data"]

extraction_id = extraction["id"]

Extraction Topics

Next, we create an extraction topic. An extraction topic is associated with an extraction through the extraction_id field. We associate it with one of the log's topics through the topic_id field. We can optionally supply a start_time and end_time to specify a time range we want to extract (null values indicate we extract from the start and to the end of the topic, respectively), and optionally a frequency field to only include records at a specific frequency (i.e., similar to fetching records from the records endpoint).

Notice that, for this topic, we'll only extract from the log's start time to 15 seconds elapsed into the log.

extraction_topic_payload = {
    'topic_id': topic_id,
    'start_time': log['start_time'],
    'end_time': log['start_time'] + 15.,  # extract only the first 15 seconds of the topic
    'frequency': None
}

extraction_topic = lqs.create.extraction_topic(
    extraction_id=extraction_id,
    **extraction_topic_payload
)["data"]

extraction_topic_id = extraction_topic["id"]

Once we've added the extraction topics to the extraction, we can queue the extraction to kick off the process.

extraction = lqs.update.extraction(
    extraction_id=extraction_id,
    data=dict(
        queued=True
    )
)["data"]

Similar to ingestions, we can monitor the extraction:

extraction = lqs.get.extraction(extraction_id=extraction_id)["data"]

print(f"Extraction {extraction_id} status: {extraction['status']}")
print(f"queued: {extraction['queued']}, processing: {extraction['processing']}")

Once the extraction process is complete, we can see what files are available to use via presigned URLs:

r_headers, r_params, r_body = lqs.s3.list_objects_v2(
    resource="extraction",
    resource_id=extraction_id
)

extraction_object_key = r_body['ListBucketResult']['Contents']['Key']

We should see a single object listed under Contents. This is our extraction file. We now want to create another presigned URL to download this file.

Note: extraction files will always have the same name, so the preceding step will likely be unnecessary in most cases (we're going through the process here for the sake of completeness).

Here, we use the get_object method to download the object, writing the response body to a local file.

with open("extraction.bag", "wb") as f:
    r_headers, r_params, r_body = lqs.s3.get_object(
        resource="extraction",
        resource_id=extraction_id,
        key=extraction_object_key
    )
    f.write(r_body)

Once complete, you should have a file named extraction.bag. Note that this bag is smaller than the original since we only extracted 15 seconds' worth of data.
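
As a quick sanity check, we can compare the extraction's size to the original download (a sketch using the local filenames from this walkthrough):

import os

# compare the extracted bag's size to the original download
original_size = os.path.getsize(local_filename)   # the ~1.5 GB bag downloaded earlier
extraction_size = os.path.getsize("extraction.bag")
print(f"original: {original_size / 1e9:.2f} GB, extraction: {extraction_size / 1e9:.2f} GB")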