Workflows
Workflows let users tie custom, external processes to LogQS processes, such as ingestions and digestions. A workflow is effectively a collection of Hooks, which are webhooks triggered by process state transitions. A user defines a workflow, assigns a set of hooks to it, and then assigns the workflow to a process. When the process transitions to a state that has a hook assigned to it, the hook is triggered.
Tutorial: Receiving Workflow Hook Requests in AWS Lambda
To demonstrate this feature, we will create a simple workflow which generates some statistics about records selected via a digestion. We will use Amazon Web Services (AWS) (specifically, a Lambda function with a Function URL) to facilitate this process, but any service that can receive a webhook can be used.
2. Create a Lambda Function
We’ll use a Python 3.11 runtime for this example. We will check “Enable function URL,” use the “NONE” Auth type, and configure CORS.
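If you prefer the AWS CLI, the Function URL can be added to an existing function with something like the following (the function name my-logqs-hook is a placeholder, and the permissive CORS settings are for this tutorial only):

aws lambda create-function-url-config \
    --function-name my-logqs-hook \
    --auth-type NONE \
    --cors '{"AllowOrigins": ["*"], "AllowMethods": ["*"]}'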
3. Add the LogQS Client Lambda Layer
Once the function is created, we will add the pre-made LogQS layer to the function. This layer contains the LogQS client. The current version of the layer is 9, and its ARN is arn:aws:lambda:us-east-1:902375420399:layer:LogQS:9.
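This can also be done with the AWS CLI. Note that --layers replaces the function's entire layer list, so include any other layers the function already uses (the function name my-logqs-hook is a placeholder):

aws lambda update-function-configuration \
    --function-name my-logqs-hook \
    --layers arn:aws:lambda:us-east-1:902375420399:layer:LogQS:9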
4. Add the Event Receiving Code
We can now add the code for the function. The code below is a simple boilerplate for a Lambda function that will receive a webhook and return a response.
from lqs.client.server import lambda_handler_handler

def event_callback(event):
    return {"status": "ok", "event": event.model_dump(mode="json")}

def lambda_handler(event, context):
    return lambda_handler_handler(event, context, event_callback=event_callback)
Once the code is added to the editor, we can save and deploy the function.
5. Test the Function Endpoint
At this point, the function is ready to receive a hook request. We can test this by manually making a POST request to the function’s URL, which can be found in the top right corner of the Lambda function’s page.
This endpoint expects to receive an EventCreateRequest payload, which looks like the following:
class EventCreateRequest(BaseModel):
    previous_state: Optional[ProcessState] = None
    current_state: ProcessState
    process_type: ProcessType
    resource_id: UUID
    workflow_id: Optional[UUID] = None
    hook_id: Optional[UUID] = None
    datastore_id: Optional[UUID] = None
    datastore_endpoint: Optional[str] = None
We will use a mock payload for testing purposes:
{
    "current_state": "finalizing",
    "process_type": "digestion",
    "resource_id": "00000000-0000-0000-0000-000000000000"
}
To test this, we’ll use curl to make a POST request to the /events endpoint of the function’s URL. Be sure to include the Content-Type header with the value application/json.
curl -X POST https://<Function URL ID>.lambda-url.us-east-1.on.aws/events \
-H "Content-Type: application/json" \
-d '{"current_state": "finalizing", "process_type": "digestion", "resource_id": "00000000-0000-0000-0000-000000000000"}'
If the request is successful, the function will return a response with the status ok and the event payload, i.e.,
{
    "status": "ok",
    "event": {
        "previous_state": null,
        "current_state": "finalizing",
        "process_type": "digestion",
        "resource_id": "00000000-0000-0000-0000-000000000000",
        "workflow_id": null,
        "hook_id": null,
        "datastore_id": null,
        "datastore_endpoint": null
    }
}
6. Set the LogQS Credentials
At this point, we have a Lambda function that can receive a webhook request. For this to be useful, however, the function needs to interact with LogQS, which means supplying it with the credentials needed to authenticate. Under the hood, we're just using the LogQS client, so we can configure it the same way we would in any other application: through environment variables, which can be set in the Lambda function's configuration.
In the Lambda function’s configuration, we can add the necessary environment variables.
NOTE: While you’re modifying the function’s configuration, you should also set the function’s timeout to a reasonable value (say, 30 seconds) to prevent the function from being terminated prematurely.
As per typical usage, we will need to supply the following environment variables:
LQS_API_KEY_ID
LQS_API_KEY_SECRET
LQS_DATASTORE_ID
An API Key can be generated by following the instructions in the Generating an API Key section.
Additionally, it’s a good idea to protect your endpoint with a secret to prevent unauthorized access. When using the client’s server module, the secret can be set using the LQS_EXPECTED_SECRET environment variable. This can be any string, but it should be sufficiently complex and kept secret.
For example, you could run something like openssl rand -base64 16 and use the output, such as iy2BL/OlhxFuuCcpXYB7vw==, as the secret.
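If you prefer the AWS CLI to the console, the timeout and environment variables can be set in one call (the function name and the bracketed values are placeholders):

aws lambda update-function-configuration \
    --function-name my-logqs-hook \
    --timeout 30 \
    --environment "Variables={LQS_API_KEY_ID=<key-id>,LQS_API_KEY_SECRET=<key-secret>,LQS_DATASTORE_ID=<datastore-id>,LQS_EXPECTED_SECRET=<secret>}"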
We can test that these keys work by having the Lambda function make an authenticated request to LogQS. We can do this by using the lqs.client.server.lambda_handler_handler function, which will handle the request and authenticate it using the environment variables.
from lqs import LogQS
from lqs.client.server import lambda_handler_handler

def event_callback(event):
    lqs = LogQS()
    me = lqs.fetch.me().data
    return {"status": "ok", "me": me.model_dump(mode="json")}

def lambda_handler(event, context):
    return lambda_handler_handler(event, context, event_callback=event_callback)
You can now test the function like before by making a POST request to the function’s URL. If the request is successful, the function will return a response with the status ok and the user’s information. If you supplied the LQS_EXPECTED_SECRET environment variable, you will need to include it in the request as a header with the key Authorization and the value Bearer <base64-encoded-secret>.
You can get the base64-encoded secret by running echo -n "<secret here>" | base64 and using the output as the value.
curl -X POST https://<Function URL ID>.lambda-url.us-east-1.on.aws/events \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <base-64-encoded-secret>" \
-d '{"current_state": "finalizing", "process_type": "digestion", "resource_id": "00000000-0000-0000-0000-000000000000"}'
7. Doing Something with the Event
Now that we have a Lambda function that can receive a webhook request and authenticate with LogQS, we can do something with the event. In this example, we will fetch the digestion and its associated parts, generate some statistics about the selected records, and store that information in the context of the digestion's log.
from concurrent.futures import ThreadPoolExecutor

from lqs import LogQS
from lqs.client.server import lambda_handler_handler

def event_callback(event):
    print(f"Received event: {event.model_dump(mode='json')}")
    lqs = LogQS()

    # fetch the digestion resource
    digestion = lqs.fetch.digestion(digestion_id=event.resource_id).data

    # first, we need to fetch all digestion parts
    limit = 10  # we set this to 10 to avoid getting too many results
    parts = lqs.list.digestion_part(digestion_id=digestion.id, limit=limit).data

    # next, we iterate over each part and collect some information
    record_count = 0
    record_size = 0
    with ThreadPoolExecutor() as executor:
        full_parts = executor.map(
            lambda part: lqs.fetch.digestion_part(
                digestion_id=digestion.id,
                digestion_part_id=part.id
            ).data,
            parts
        )
        for part in full_parts:
            for entry in part.index:
                # each entry of the index is a tuple with the following structure:
                (
                    topic_id,
                    ingestion_id,
                    source,
                    data_offset,
                    data_length,
                    chunk_compression,
                    chunk_offset,
                    chunk_length,
                    timestamp
                ) = entry
                record_count += 1
                if chunk_length is not None:
                    # if chunk_length is present, then we should use this
                    # as the record's size
                    record_size += chunk_length
                else:
                    # otherwise, we should use data_length
                    record_size += data_length

    # next, we fetch the log and update its context
    log = lqs.fetch.log(log_id=digestion.log_id).data

    # we don't want to overwrite an existing context, so we'll
    # make sure to keep the existing context and add some data
    context = log.context or {}
    context["digestion_stats"] = {
        "digestion_id": str(digestion.id),
        "record_count": record_count,
        "record_size": record_size
    }
    lqs.update.log(
        log_id=log.id,
        data=dict(
            context=context
        )
    )

    # once we've updated the log's context,
    # we can transition the digestion to completed
    lqs.update.digestion(
        digestion_id=digestion.id,
        data=dict(
            state="completed"
        )
    )
    return {"status": "ok"}

def lambda_handler(event, context):
    return lambda_handler_handler(event, context, event_callback=event_callback)
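Before deploying, you can sanity-check the callback locally by calling lambda_handler with a mock event in the same module. The event shape below is a minimal sketch of what a Lambda Function URL delivers; the exact fields lambda_handler_handler consumes are an assumption, and the LogQS environment variables must be set in your local shell:

import json

# a minimal mock of the Function URL event shape (an assumption; adjust
# the fields to match what lambda_handler_handler actually expects)
mock_event = {
    "rawPath": "/events",
    "headers": {"content-type": "application/json"},
    "requestContext": {"http": {"method": "POST", "path": "/events"}},
    "body": json.dumps({
        "current_state": "finalizing",
        "process_type": "digestion",
        "resource_id": "00000000-0000-0000-0000-000000000000",
    }),
}

print(lambda_handler(mock_event, None))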
When using more complex functions, it's a good idea to use CloudWatch to troubleshoot any issues that arise. You can access the logs for a given Lambda function from the "Monitor" tab in the Lambda console.
8. Creating a Workflow
Now that we have a Lambda function that can receive a webhook request and authenticate with LogQS, we can create a workflow that will use this function. To do this, we will need to create a new workflow in LogQS and assign a hook to it.
First, create a workflow in LogQS and name it something like "AWS Lambda Test". Then, create a hook for the workflow and name it "AWS Lambda Test Hook". Assign the hook to the "finalizing" trigger state of the "digestion" process type. The URI should be the URL of the Lambda function, not including the /events path. If you configured an expected secret, supply it in the "Secret" field (do not base64 encode it or include the Bearer prefix).
Once created, this workflow can be attached to digestions and will be triggered when the digestion transitions to the “finalizing” state.
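If you'd rather script this step than use the UI, a sketch along these lines should work with the LogQS client. Note that the create methods and parameter names here are assumptions modeled on the fetch/update calls used earlier, so check the client reference for the exact signatures:

from lqs import LogQS

lqs = LogQS()

# hypothetical sketch: method and parameter names are assumptions
workflow = lqs.create.workflow(name="AWS Lambda Test").data
hook = lqs.create.hook(
    workflow_id=workflow.id,
    name="AWS Lambda Test Hook",
    trigger_process="digestion",
    trigger_state="finalizing",
    uri="https://<Function URL ID>.lambda-url.us-east-1.on.aws",
    secret="<secret here>",  # the plain secret, not base64-encoded
).data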
9. Testing the Workflow
To test the workflow, create a new digestion and assign the workflow to it. We can create a digestion through the Player view by opening the "Create a Digestion" sidebar, which is done by clicking the "scissors" icon in the top right corner of the page. We supply an optional name for the digestion, then make sure to select the workflow we just created. We then select the topics and time ranges we want to digest (the selection is arbitrary for this example) and click "CREATE DIGESTION".
Once the digestion is created, LogQS will transition the digestion to the “processing” state automatically. During this state, LogQS will generate Digestion Parts for the digestion corresponding to the topics and time ranges you selected. Once this is complete, the digestion will transition to the “finalizing” state, which will trigger the hook we created. This will send a webhook request to the Lambda function.
At this point, the digestion should run to a completed state, and the log's context should include the information we gathered in the Lambda function.
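We can verify this by fetching the log and inspecting its context (replace the placeholder with the ID of the log the digestion was created from):

from lqs import LogQS

lqs = LogQS()

# placeholder: the ID of the log the digestion was created from
log = lqs.fetch.log(log_id="<log id>").data
print(log.context["digestion_stats"])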