Resources
Generally, LogQS workflows operate around the following resources:
- Groups
- Logs
- Ingestions
- Message Types
- Topics
- Records
- Extractions
At a high level, an end-to-end workflow through LogQS would have you creating a group, creating a log in that group, ingesting a log file into the log, querying message types for the log, querying topics based on those message types, querying records from a topic and viewing record data, then creating an extraction from the log.
In the next sections, we'll walk through an example of this process.
Groups
Most resources are associated with one, and only one, group. Groups allow users to organize logs in separate partitions so that subresources (such as logs, topics, records, and message types) can be isolated as needed.
Users are associated with groups through a group association resource. Group associations pair a user with a group, which allows the user to access resources in the group. A `role` field on the group association indicates which actions the user is allowed to take on resources in the group. Possible `role` values:
- `viewer`: only has read access to resources in the group
- `editor`: has read and write access to resources in the group
- `owner`: has read and write access to resources in the group and to group association resources (i.e., can grant/revoke group access)
Users also have a boolean field `is_admin` which indicates whether or not the user is an admin. Admins can read and write all resources in all groups in a DataStore.
To fetch groups you have access to:
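A minimal sketch of this listing step, assuming an authenticated `lqs` client as used throughout this notebook (the `lqs.list.group` method name is inferred from the `lqs.list.<resource>` pattern used elsewhere in this notebook):

```python
# List the groups the current user has access to (sketch; assumes an
# authenticated `lqs` client; `lqs.list.group` follows the
# `lqs.list.<resource>` pattern used below).
groups_response = lqs.list.group()
for group in groups_response["data"]:
    print(group["id"], group["name"])
```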
The `list` method for any resource returns a dictionary with pagination parameters `count`, `offset`, `limit`, `order`, and `sort`, which can be used to fetch further batches of resources as needed. The actual resource data is returned as a list in the `data` attribute, where each element is a dictionary representation of the resource.
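These pagination fields can be used to walk through every page of results. Below is a self-contained sketch of that loop; the `fake_list` function is a stand-in emulating the response shape of any `lqs.list.<resource>` method (it is not part of the LogQS client):

```python
def list_all(list_fn, limit=100):
    """Collect every resource by advancing `offset` until `count` is exhausted."""
    results, offset = [], 0
    while True:
        response = list_fn(offset=offset, limit=limit)
        results.extend(response["data"])
        offset += limit
        if offset >= response["count"]:
            return results

# Stand-in list function emulating the paginated response shape described above.
def fake_list(offset=0, limit=100):
    items = [{"id": i} for i in range(250)]
    return {"count": len(items), "offset": offset, "limit": limit,
            "order": "asc", "sort": "created_at",
            "data": items[offset:offset + limit]}

all_items = list_all(fake_list)
print(len(all_items))  # 250
```

In practice you would pass a real client method (e.g., a wrapper around `lqs.list.log`) in place of `fake_list`.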
If we don't see a group named "Test Group", we'll have to ask an admin to create one (this can be accomplished via the API or in the admin app).
The `list` method can take parameters which filter the results. Each resource has its own set of available filters, and filters can be combined (i.e., like an `AND` query).
We can fetch a group by name by using the `name` filter. Note that even though we expect only one result, the `data` field is still a list, so we set `group` to be the first element of the array.
The `group` object is a dictionary, so we can reference values by key:
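These two steps might look like the following sketch, assuming the `lqs` client (the `name` filter on `lqs.list.group` is inferred from the filters shown for other resources):

```python
# Fetch the group by name; `data` is a list even for a single match.
group = lqs.list.group(name="Test Group")["data"][0]
group_id = group["id"]

# The group is a dictionary, so values are referenced by key.
print(group["name"])
```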
Logs
Most workflows will start with the creation of a log resource. Logs can be thought of as containers for topics and, therefore, records. Logs are identified by a required `name` and are associated with a single group through its `group_id`.
Since logs must have unique names within each group, we'll first check to see how many logs already exist with a name like "Test Log", then use that information to create a log name which should be unique.
log_name_like = "Test Log" # change this as needed
logs_response = lqs.list.log(name_like=log_name_like)
logs_count = logs_response["count"]
log_name = f"Test Log ({logs_count})" # use the number of returned logs to ensure uniqueness
Now that we have an appropriate log name, we create the log using the `create` method. Similar to listing objects, the returned object is a dictionary with a `data` field containing the dictionary representation of the log.
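A sketch of this creation step, assuming the `lqs` client (the `lqs.create.log` signature is inferred from the `lqs.create.<resource>` pattern used for ingestions below):

```python
# Create the log in our group (sketch; assumes the `lqs` client).
log = lqs.create.log(
    group_id=group_id,
    name=log_name
)["data"]
log_id = log["id"]
```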
Ingestions
Once we have a log, we'll want to ingest data into it. This is done by associating a log file stored in S3 with the log through an ingestion resource. An ingestion is also associated with a process that, once queued and started, will index the records from the log file into the log.
Note: Although it is natural to associate one log with one log file, it is possible to associate multiple log files with each log resource in LogQS by using multiple ingestions. This may be appropriate if you split your logs by time, topics, etc.
For this example, we'll use a single ROS bag, but we'll provide details on more complex scenarios later.
First, we'll download an example ROS bag to work with. This ROS bag is about 1.5GB and about a minute long, with a single image topic. In Colab, you can find this file once it's finished downloading by clicking the "Files" tab on the left. This download should only take a few minutes.
import requests
import shutil

url = "https://logqs-dev-admin-frontend.s3.amazonaws.com/assets/zeli_test.bag"
local_filename = url.split('/')[-1]

with requests.get(url, stream=True) as r:
    with open(local_filename, 'wb') as f:
        shutil.copyfileobj(r.raw, f)
Currently, there are two approaches for uploading a log file to the system:
- Uploading the log yourself to an S3 bucket which the DataStore has access to and referencing its bucket and key in the ingestion (e.g., appropriate if you already have data existing in S3).
- Uploading the data via the `/presignedUrls` resource (e.g., appropriate if you need to upload data into the system).
We consider option (2) to be the optimal option for most use-cases. We discuss this approach in detail in the Data Management section of this notebook. If your workflow would work better with option (1), you can start with the AWS S3 docs.
To upload and ingest a log file, the steps will be as follows:
- Create an ingestion, leaving the `s3_bucket` and `s3_key` fields blank/null and ensuring the `queued` field is set to `false` (which it is by default).
- Using the ingestion's ID, create a presigned URL for the `put_object` method by making a `POST` request to `/ingestions/:ingestionId/presignedUrls`. This presigned URL will only allow us to upload up to a 5GB log file. Larger files or more efficient workflows will require the use of multipart uploads, which are explained in the Data Management section of this notebook.
- Make a `PUT` request to that URL with the body being the binary data of the log. This will perform the `PutObject` action via the S3 API.
- Update the ingestion, setting `queued` to `true` and the `s3_bucket` and `s3_key` to the parameters provided when creating the presigned URL to start the ingestion process.
Ingestions are associated with a single log via its `log_id` field. An ingestion has an `s3_bucket` field and `s3_key` field, which should point to the log file stored in S3, and a required `format` field, indicating the log format (e.g., `ros`, `mls`, etc.). Ingestions allow for an optional `name` field (used for human-readable identification) and `note` field.
By default, ingestions are created with their boolean `queued` field set to `false`. If this is set to `true`, the ingestion will be created and the process will be started (if possible). If the log file already exists in S3 during the creation of an ingestion, this should be set to `true` so that the ingestion is started immediately.
First, we create the ingestion. Here, we're using the ID of the log we created above.
ingestion = lqs.create.ingestion(
    name="Test Ingestion",
    log_id=log["id"],
    s3_bucket=None,
    s3_key=None,
    format='ros',
    queued=False,
    note='For demo, made with script.'
)["data"]
ingestion_id = ingestion["id"]
Next, we use the `put_object` method of the client's `s3` inner class to upload the file.
Under the hood, this is being stored in an object store (likely S3) using a key specific to the resource. As such, the `key` argument cannot contain any slashes or characters not allowed by S3.
with open(local_filename, 'rb') as f:
    r_headers, r_params, r_body = lqs.s3.put_object(
        resource="ingestion",
        resource_id=ingestion_id,
        key="zeli_test.bag",
        body=f.read()
    )
All `s3` actions return a tuple of objects:
- Response headers (assigned to `r_headers` above), which are the headers returned by the object store service for the request.
- Request parameters (assigned to `r_params` above), which are the parameters used for the action taken.
- Response body (assigned to `r_body` above), which is the actual body of the response (sometimes `None`).
These returned objects are sometimes used later depending on the workflow, and can offer some information for troubleshooting as needed.
With the ingestion created and the file uploaded, we're ready to start the ingestion process. We use the request parameters to find the bucket and key where the object is stored, update the ingestion accordingly, and set `queued` to `True` to kick off the ingestion process.
ingestion = lqs.update.ingestion(
    ingestion_id=ingestion_id,
    data=dict(
        s3_bucket=r_params["Bucket"],
        s3_key=r_params["Key"],
        queued=True
    )
)["data"]
Once an ingestion is queued (whether upon creation or after creation via an update), a task will start to process the ingestion. Once the task successfully starts (usually < 2 minutes from creation), the `status` of the ingestion will change from `ready` to the current state of the processing task. When the status is `complete`, the ingestion is finished processing and the data is completely ready.
Ingestions also include a set of boolean fields, `queued`, `processing`, `errored`, `cancelled`, and `archived`, to indicate their state. When `queued`, `processing`, or `archived` are set to `true`, the ingestion cannot be modified. If `errored` or `cancelled` is `true`, the ingestion can be re-queued by updating its `queued` field to `true`. The ingestion also has a float `progress` field indicating the approximate fraction of completion (i.e., a value of `0.5` indicates the ingestion is roughly 50% complete).
We can periodically fetch the ingestion to monitor its progress. The ingestion of our example should take less than 5 minutes to complete.
ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
print(f"Ingestion {ingestion_id} status: {ingestion['status']}")
print(f"queued: {ingestion['queued']}, processing: {ingestion['processing']}")
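The fetch above can be wrapped in a simple polling loop. A sketch using the `status`, `errored`, and `cancelled` fields described earlier, assuming the `lqs` client:

```python
import time

# Poll until the ingestion completes, errors, or is cancelled (sketch).
ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
while ingestion["status"] != "complete" and not (ingestion["errored"] or ingestion["cancelled"]):
    time.sleep(10)  # avoid hammering the API
    ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
print(f"Final status: {ingestion['status']}")
```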
Ingesting Existing S3 Objects
In the previous section, we described what it takes to create an ingestion, upload data to the ingestion, then process the ingestion. However, there may be cases where your log data already exists in S3, and you just need LogQS to ingest it. This is possible in LogQS with a very similar workflow:
As before, you will need to create a log which the data will be ingested into. You will also create an ingestion like before, but this time you can supply the `s3_bucket` and `s3_key` on creation using the S3 information for the already uploaded object. Similarly, you can set `queued` to `True` to immediately start ingesting the data (otherwise, `queued` will be `False` by default and the ingestion won't start until `queued` is set to `True`). At this point, the process is the same as if you were to upload it directly to LogQS.
For the sake of concreteness, assuming you have an existing ROS bag located at `s3://my-log-bucket/logs/log.bag`, then it can be ingested into the log created previously with:
ingestion = lqs.create.ingestion(
    name="Test Ingestion",
    log_id=log["id"],
    s3_bucket="my-log-bucket",
    s3_key="logs/log.bag",
    format='ros',
    queued=True
)["data"]
Note: the LogQS application needs to have permission to read objects from the bucket you're supplying. To accomplish this, we recommend setting a simple, minimal policy on the bucket that gives the DataStore's AWS account `GetObject` permissions on all objects in the bucket, e.g.,
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<DataStore Account ID>:root"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<Bucket Name>/*"
        }
    ]
}
Where `<DataStore Account ID>` is the AWS Account ID of the DataStore (this can be found at `/version-info` of the API endpoint) and `<Bucket Name>` is the name of the bucket that the policy is being applied to. LogQS only requires `GetObject` permissions to fully operate, which gives it read access to the object data. You can modify the policy to make it as restrictive as needed, as long as LogQS has the ability to get the objects needed for ingestion. Removing this permission will prevent LogQS from being able to ingest data, extract data, or serve record binary data, but otherwise LogQS will remain operational across previously ingested logs.
More information can be found in the AWS docs.
Message Types
During ingestion processing, message types found in a log will be populated in the system if not already present (based on message type name and MD5). Message types are unique to each group, so a log associated with one group will always be associated with a completely different set of message types in another, even if the two share message types with the same name and MD5.
We can fetch message types in a particular group with:
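A sketch of this listing step (the `lqs.list.message_type` call with a `group_id` filter matches the usage shown just below):

```python
# List message types in the group (sketch; assumes the `lqs` client).
message_types = lqs.list.message_type(group_id=group_id)["data"]
for message_type in message_types:
    print(message_type["name"])
```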
We can find message types of a specific type by filtering on `name`:
message_type_name = "sensor_msgs/Image"
message_type = lqs.list.message_type(group_id=group_id, name=message_type_name)["data"][0]
message_type_id = message_type["id"]
Topics
During ingestion processing, the topics for a log will be populated. Topics can be thought of as containers for records which have the same message type and are temporally unique (i.e., two records of the same topic must have different timestamps). Topics in LogQS correspond pretty closely with topics in ROS.
We can fetch a paginated list of topics for a log by filtering the topics on `log_id`:
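A sketch of this step (the `lqs.list.topic` call with a `log_id` filter matches the usage shown further below):

```python
# List the topics belonging to our log (sketch; assumes the `lqs` client).
topics = lqs.list.topic(log_id=log_id)["data"]
for topic in topics:
    print(topic["name"])
```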
We can filter topics by message type using the `message_type_id` filter:
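A sketch of that filtered query, using the `message_type_id` we found above (assumes the `lqs` client; the filter name comes from the text):

```python
# List only the topics with our message type (sketch).
topics = lqs.list.topic(
    log_id=log_id,
    message_type_id=message_type_id
)["data"]
```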
Each topic in a log has a unique name, so we can also filter on a topic's `name` to find a specific topic:
topic_name = topics[0]['name']
topic = lqs.list.topic(log_id=log_id, name=topic_name)["data"][0]
topic_id = topic["id"]
Records
Records are associated with one topic, and each record has a unique timestamp relative to the topic. In order to query records, we must supply their topic ID as a path parameter:
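A sketch of the basic record query (the `lqs.list.record` call with `topic_id` matches the fuller example below; assumes the `lqs` client):

```python
# List records for a topic (sketch); the response is paginated like any list.
records_response = lqs.list.record(topic_id=topic_id)
records = records_response["data"]
print(records_response["count"])
```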
If the records are an image type (i.e., their topic has a message type of either `sensor_msgs/Image` or `sensor_msgs/CompressedImage`), then, during ingestion, a compressed WebP version of their image is cached for fast access later. To access this image, query for records and include an `include_image` URL parameter set to `true`, which will populate the `image_url` field.
We can also filter the records such that we list them at a certain frequency. For example, setting `frequency=10` will return 10 records per second, setting `frequency=1` will return 1 record per second, etc.
Records can be fetched between time ranges as well. For example, we can start fetching records at a timestamp later in the log by using the log's `start_time`, adding the elapsed time to it, and setting the `timestamp_gte` filter to that value.
# fetch fresh log data (i.e., post-ingestion)
log = lqs.get.log(log_id=log_id)["data"]
limit = 10
frequency = 10
timestamp_gte = log['start_time']
records = lqs.list.record(
    topic_id=topic_id,
    frequency=frequency,
    limit=limit,
    timestamp_gte=timestamp_gte,
    include_image=True
)["data"]
Note: the record's `image_url` field is populated with a pre-signed URL that can be used to fetch the cached image:
import base64
from IPython.display import HTML, display, clear_output
def show_image(image_bytes, width=None, height=None):
    data_url = 'data:image/webp;base64,' + base64.b64encode(image_bytes).decode()
    # apply optional display dimensions, if given
    size = (f" width='{width}'" if width else "") + (f" height='{height}'" if height else "")
    display(HTML(f"<img src='{data_url}'{size}>"))
image_url = records[0]["image_url"]
r = requests.get(image_url)
show_image(r.content)
By iterating over the records, we can visualize the images playing back in time:
for record in records:
    image_url = record["image_url"]
    r = requests.get(image_url)
    clear_output(wait=True)
    show_image(r.content)
Fetching Raw Message Data
Similar to the above workflow, the API provides the ability to download the binary message data for a given record. Here, we will demonstrate how we can use this data to retrieve the full-resolution version of the image we saw above.
First, we install and import py3rosmsgs so that we can deserialize the binary data.
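A sketch of that setup cell. The `sensor_msgs.msg` import path is an assumption about how py3rosmsgs exposes the ROS message classes; Pillow is added for the image handling used below:

```python
# In a notebook cell: !pip install py3rosmsgs pillow
from sensor_msgs.msg import Image  # assumption: package provided by py3rosmsgs
import PIL.Image
```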
Next, we make a similar request as before, but now we set the `include_bytes` URL parameter to `true`. You'll notice each record's `bytes_url` field is populated with a pre-signed URL. This points to a file of binary data containing the message data payload.
limit = 10
frequency = 10
timestamp_gte = log['start_time']
records = lqs.list.record(
    topic_id=topic_id,
    frequency=frequency,
    limit=limit,
    timestamp_gte=timestamp_gte,
    include_bytes=True
)["data"]
Since we know this is `Image` data, we can use the `Image` class to deserialize it:
import PIL.Image

# fetch the raw message bytes from the record's pre-signed URL
message_data = requests.get(records[0]["bytes_url"]).content

image = Image()
image.deserialize(message_data)
height, width = image.height, image.width
# convert between image encodings and PIL modes
img_modes = {
    '16UC1': 'L',
    'bayer_grbg8': 'L',
    'mono8': 'L',
    'mono16': 'L',
    '8UC1': 'L',
    '8UC3': 'RGB',
    'rgb8': 'RGB',
    'bgr8': 'RGB',
    'rgba8': 'RGBA',
    'bgra8': 'RGBA',
    'bayer_rggb': 'L',
    'bayer_gbrg': 'L',
    'bayer_grbg': 'L',
    'bayer_bggr': 'L',
    'yuv422': 'YCbCr',
    'yuv411': 'YCbCr'
}
mode = img_modes[image.encoding]
print(f"Image deserialized! Height: {height}, Width: {width}, Mode: {mode}")
img = PIL.Image.frombytes(mode, (width, height), image.data)
if image.encoding == 'bgr8':
    b, g, r = img.split()
    img = PIL.Image.merge('RGB', (r, g, b))
Here, `img` is a full resolution version of the image which can be used for downstream tasks.
Extractions
Working with records through the API is a good way to explore and fetch individual records and records over small ranges of times. But if a larger subset of the data is needed, it may be more efficient to generate an extraction. With an extraction, you can specify a time range over different combinations of topics which is then used in an extraction process to generate a log file (currently only ROS bags are supported) which can be downloaded.
The process for building an extraction is to first create an extraction resource, then to create extraction topic resources which are associated with the extraction. The extraction can then be queued, which starts the task for processing. Once complete, we can create a presigned URL to download the extraction.
First, we create the extraction:
extraction_name = f"{log['name']} - {topic['name']} Extraction"
extraction = lqs.create.extraction(
    name=extraction_name,
    log_id=log["id"],
    note="Made during demo."
)["data"]
extraction_id = extraction["id"]
Extraction Topics
Next, we create an extraction topic. An extraction topic is associated with an extraction through the `extraction_id` field. We associate it with one of the log's topics through the `topic_id` field. We can optionally supply a `start_time` and `end_time` to specify a time range we want to extract (`null` values indicate we extract from the start and to the end of the topic, respectively), and optionally a `frequency` field to only include records at a specific frequency (i.e., similar to fetching records from the records endpoint).
Notice that, for this topic, we'll only extract from the log's start time to 15 seconds elapsed into the log.
extraction_topic = lqs.create.extraction_topic(
    extraction_id=extraction_id,
    topic_id=topic_id,
    start_time=log["start_time"],
    end_time=log["start_time"] + 15.,
    frequency=None
)["data"]
extraction_topic_id = extraction_topic["id"]
Once we've added the extraction topics to the extraction, we can queue the extraction to kick off the process.
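A sketch of the queueing step, assuming `lqs.update.extraction` mirrors the `lqs.update.ingestion` call used earlier in this notebook:

```python
# Queue the extraction to start the processing task (sketch).
extraction = lqs.update.extraction(
    extraction_id=extraction_id,
    data=dict(queued=True)
)["data"]
```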
Similar to ingestions, we can monitor the extraction:
extraction = lqs.get.extraction(extraction_id=extraction_id)["data"]
print(f"Extraction {extraction_id} status: {extraction['status']}")
print(f"queued: {extraction['queued']}, processing: {extraction['processing']}")
Once the extraction process is complete, we can see what files are available to use via presigned URLs:
r_headers, r_params, r_body = lqs.s3.list_objects_v2(
    resource="extraction",
    resource_id=extraction_id
)
extraction_object_key = r_body['ListBucketResult']['Contents']['Key']
We should see a single object listed under `Contents`. This is our extraction file. We now want to create another presigned URL to download this file.
Note: extraction files will always have the same name, so the preceding step will likely be unnecessary in most cases (we're going through the process here for the sake of completeness).
Here, we use the `get_object` method to create a presigned URL allowing us to download the object.
with open("extraction.bag", "wb") as f:
    r_headers, r_params, r_body = lqs.s3.get_object(
        resource="extraction",
        resource_id=extraction_id,
        key=extraction_object_key
    )
    f.write(r_body)
Once complete, you should have a file named `extraction.bag`. Note that this bag is smaller than the original since we only downloaded 15 seconds' worth of data.