
Data Management

LogQS is based around robotics log data, which is often big and difficult to deal with. As a result, we provide special interfaces which allow for efficiently transferring this data into the system. These interfaces are represented as resources related to other resources where data management is relevant (currently, ingestions and extractions), and they allow users to interact directly with the object store in a secure and efficient way.

To accomplish this, we expose a subset of AWS S3's functionality directly to the user via presigned URLs. To keep the interface simple and consistent, we expose these S3 actions in a one-to-one manner, so that someone familiar with S3 can understand our system naturally. Only a small subset of S3's actions are relevant to our system, so we only expose their corresponding methods:

  • abort_multipart_upload
  • complete_multipart_upload
  • create_multipart_upload
  • delete_object
  • get_object
  • head_object
  • list_multipart_uploads
  • list_object_versions
  • list_objects_v2
  • list_parts
  • put_object
  • restore_object
  • upload_part

We expose these actions via /presignedUrls endpoints related to specific ingestions and extractions, e.g., /ingestions/:ingestionId/presignedUrls. The only available method on these endpoints is POST, and the request body contains the name of the S3 method along with a payload whose specification depends on that method. The POST request creates an ephemeral presigned URL, returned only in that response, which can then be used to perform the action directly against S3.
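
As a rough illustration of what such a request might look like over plain HTTP (the base URL, auth header, body field names, and response shape below are all assumptions for the sake of the sketch; the lqs.s3 helpers used in the examples below wrap this exchange for you):

import requests

# Hypothetical values -- substitute your own API URL, token, and resource ID.
LQS_API_URL = "https://example.com/api"
headers = {"Authorization": "Bearer <token>"}
ingestion_id = "<your-ingestion-id>"

# Ask LogQS for an ephemeral presigned URL for a put_object action.
# The body field names ("method", "params") are illustrative; consult the
# API reference for the exact schema.
resp = requests.post(
    f"{LQS_API_URL}/ingestions/{ingestion_id}/presignedUrls",
    headers=headers,
    json={"method": "put_object", "params": {"key": "log.bag"}},
)
presigned_url = resp.json()["data"]["url"]  # assumed response shape

# The presigned URL is then used directly against S3.
with open("log.bag", "rb") as f:
    requests.put(presigned_url, data=f)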

Multipart Uploads

In the previous sections, we illustrated some of this functionality:

  • we used put_object to upload a log for ingestion
  • we used list_objects_v2 to list available objects related to an extraction
  • we used get_object to download an extraction file

Aside from these actions, you'll likely want to leverage S3's ability to handle multipart uploads. In a nutshell, a multipart upload lets you upload a file to S3 in smaller parts, which allows us to:

  1. upload files without the 5GB limit imposed by put_object
  2. upload files faster by uploading parts in parallel
  3. resume uploads which have failed/been cancelled

Multipart uploads are a bit more complex than the actions we've used up to this point, but conceptually, the steps for performing a multipart upload are quite simple:

  1. First, we create a multipart upload via create_multipart_upload
  2. Next, we upload parts via upload_part
  3. Once all of the parts have been uploaded, we finish the process via complete_multipart_upload

To demonstrate this, we will create an ingestion similar to the one in the previous section, but using multipart uploads:

First, we create a new ingestion pointing to the same log as before (ingesting the same log twice will result in overwriting records, which is not generally recommended, but fine for this purpose).

ingestion = lqs.create.ingestion(
    name="Test Multipart Ingestion",
    log_id=log["id"],
    s3_bucket=None,
    s3_key=None,
    format='ros',
    queued=False,
    note='For demo, made with script.'
)["data"]

ingestion_id = ingestion["id"]

Next, we create the multipart upload.

r_headers, r_params, r_body = lqs.s3.create_multipart_upload(
    resource="ingestion",
    resource_id=ingestion_id,
    key="multi_zeli_test.bag"
)
upload_id = r_body["InitiateMultipartUploadResult"]["UploadId"]

print(f"Upload ID: {upload_id}")

Now, we upload the parts. Here, we're using a ThreadPoolExecutor to upload a few of these parts in parallel (although there are plenty of other ways to go about doing this). Our choice of part size is somewhat arbitrary (S3 requires every part except the last to be at least 5 MiB) and can be tuned accordingly. We have to keep track of each part's PartNumber and ETag during this process.

import os
from concurrent.futures import ThreadPoolExecutor

# Read the log file into memory and note its size.
with open(local_filename, 'rb') as log_file:
    log_size = os.fstat(log_file.fileno()).st_size
    log_data = log_file.read()

def part_uploader(upload_id, i, part_size):
    part_data = log_data[i * part_size : (i + 1) * part_size]
    print(f"Uploading part {i + 1}/{part_count} ({len(part_data)} bytes)")
    r_headers, r_params, r_body = lqs.s3.upload_part(
        resource="ingestion",
        resource_id=ingestion_id,
        key="multi_zeli_test.bag",
        part_number=i + 1,
        upload_id=upload_id,
        body=part_data
    )
    e_tag = r_headers["ETag"]
    return {"PartNumber": i + 1, "ETag": e_tag}

part_size = 5 * 1024 * 1024
part_count = (log_size + part_size - 1) // part_size  # ceiling division, avoids an empty final part
parts = []
futures = []

print(f"Log size: {log_size}; parts: {part_count}")

with ThreadPoolExecutor(max_workers=16) as executor:
    for i in range(part_count):
        future = executor.submit(part_uploader, upload_id, i, part_size)
        futures.append(future)

for future in futures:
    parts.append(future.result())

The above should have completed faster than if you had tried to upload the file all at once. We can take a look at a few of our parts to get an idea of the data we need to keep track of:

parts[:4]

Next, optionally, we can list the uploaded parts:

r_headers, r_params, r_body = lqs.s3.list_parts(
    resource="ingestion",
    resource_id=ingestion_id,
    key="multi_zeli_test.bag",
    upload_id=upload_id
)
r_body["ListPartsResult"]["Part"][:4]

As you can see, we don't actually have to keep track of the part metadata ourselves, since we can fetch it via this endpoint (although it's usually easier to just keep track of it as we go).

You can imagine how using such an endpoint would allow one to resume a failed/cancelled upload: when you go to resume, you'd first list the parts which have already been uploaded, then start where you left off.
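
As a rough sketch of that idea, assuming the parsed ListPartsResult returns its Part entries as a list of dicts with PartNumber and ETag keys (the exact shape may differ, e.g. when only a single part exists):

# Sketch: resume an interrupted upload by skipping parts S3 already has.
r_headers, r_params, r_body = lqs.s3.list_parts(
    resource="ingestion",
    resource_id=ingestion_id,
    key="multi_zeli_test.bag",
    upload_id=upload_id
)
uploaded = r_body["ListPartsResult"].get("Part", [])
parts = [{"PartNumber": int(p["PartNumber"]), "ETag": p["ETag"]} for p in uploaded]
done = {p["PartNumber"] for p in parts}

# Re-upload only the parts that are missing, then sort by part number.
for i in range(part_count):
    if (i + 1) not in done:
        parts.append(part_uploader(upload_id, i, part_size))
parts.sort(key=lambda p: p["PartNumber"])

With every part accounted for, we finish the process by completing the multipart upload: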

lqs.s3.complete_multipart_upload(
    resource="ingestion",
    resource_id=ingestion_id,
    key="multi_zeli_test.bag",
    upload_id=upload_id,
    parts=parts
)

And that's it! The file has been uploaded to S3, and is accessible to the DataStore for further processing. We can double-check by listing the objects in our bucket.

r_headers, r_params, r_body = lqs.s3.list_objects_v2(
    resource="ingestion",
    resource_id=ingestion_id
)
print(r_body)

You should see our file in there.

Lastly, for the sake of completeness, we finish the ingestion:

ingestion = lqs.update.ingestion(
    ingestion_id=ingestion_id,
    data=dict(
        s3_bucket=r_body["ListBucketResult"]["Name"],
        s3_key=r_body["ListBucketResult"]["Contents"]["Key"],
        queued=True
    )
)["data"]
ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
print(f"Ingestion {ingestion_id} status: {ingestion['status']}")
print(f"queued: {ingestion['queued']}, processing: {ingestion['processing']}")
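
If you want to block until the DataStore has finished with the ingestion, you could poll the record. A minimal sketch, assuming the queued and processing flags both return to False once the ingestion has finished (the exact lifecycle and status values may differ in your deployment):

import time

# Poll the ingestion until it is no longer queued or processing.
# Assumption: both flags end up False once the DataStore is done with it.
while True:
    ingestion = lqs.get.ingestion(ingestion_id=ingestion_id)["data"]
    print(f"status: {ingestion['status']}, processing: {ingestion['processing']}")
    if not ingestion["queued"] and not ingestion["processing"]:
        break
    time.sleep(10)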

Downloading Data

Although LogQS provides an efficient and convenient means of accessing log data stored in the cloud, sometimes you'll need to download your uploaded data directly. In LogQS, this is typically accomplished by first listing the objects for a resource using the list_objects_v2 action, then using the get_object action on the objects returned from that request.

Assuming you want to download the log uploaded in the previous section, you'd first list the objects for that ingestion:

r_headers, r_params, r_body = lqs.s3.list_objects_v2(
    resource="ingestion",
    resource_id=ingestion_id
)
print(r_body)

Then, using the information returned from that request, you can get the object:

r_headers, r_params, r_body = lqs.s3.get_object(
    resource="ingestion",
    resource_id=ingestion_id,
    key=r_body["ListBucketResult"]["Contents"]["Key"]
)

Note that the object data is found in r_body. This can be written to disk with something like:

with open("downloaded.bag", "wb") as f:
    f.write(r_body)

And that's it! There are a few other S3 actions which we expose as presigned URLs (such as head_object and delete_object), but they all operate in generally the same manner.
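
For example, head_object can be used to confirm an object exists and check its size without downloading it. A minimal sketch, assuming head_object follows the same calling convention as the other lqs.s3 helpers above and that the interesting values come back in the response headers:

# Sketch: check an object's existence and size without downloading it.
# Assumes head_object mirrors the get_object calling convention used above.
r_headers, r_params, r_body = lqs.s3.head_object(
    resource="ingestion",
    resource_id=ingestion_id,
    key="multi_zeli_test.bag"
)
print(r_headers.get("Content-Length"), r_headers.get("ETag"))

Be sure to check out the AWS S3 docs for details on these and the other available actions.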