Integrating with Labeling Using Webhooks

Motivation

A common use case for Aquarium datasets is to identify problem labels and group them using issues, download a JSON or CSV representation of an issue's elements, and then use scripting or a manual process to reformat the data for submission to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to send data to a labeling service directly from the UI without writing code each time.

The issue-exported event will POST the issue's elements to a webhook. A full schema can be found in the Event Schemas section of the general Webhooks page.

Schema with Example Map Keys
{
event: "issue-exported",
project: str,
issue: {
id: str,
elements: [{
dataset: str,
inference_set: str,
issue_name: str,
element_id: str,
element_type: "frame" | "crop",
frame_id: str,
frame_data: {
coordinate_frames: [{
coordinate_frame_id: str,
coordinate_frame_metadata: Optional[Dict],
coordinate_frame_type: str,
}],
custom_metrics: {
[custom_metric_type]: int[][] | float,
...
},
date_captured: str,
device_id: str,
geo_data: {
[coordinate_field]: float,
...
},
label_data: [{
attributes: {
confidence: float,
...
},
label: str,
label_coordinate_frame: str,
label_type: str,
linked_labels: str[],
uuid: str,
}, ...],
sensor_data: [{
coordinate_frame: str,
data_urls: {
image_url: str,
...
},
date_captured: str,
sensor_id: str,
sensor_metadata: Dict,
sensor_type: str,
}, ...],
task_id: str,
[user__metadata_field]: str,
},
}, ...]
}
}

Shaping the Webhook Payload To Send to Labeling

You can then use the elements in issue to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.

GraphQL
REST
GraphQL
server.py
from flask import Flask, request
import os
from python_graphql_client import GraphqlClient
# Secrets and the labeling endpoint are read from the environment; each value
# is None when the corresponding variable is not set.
AQ_WEBHOOK_SECRET = os.environ.get("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.environ.get("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.environ.get("LABELING_API_ENDPOINT")

# Replace with the proper API key header if any for your service
labeling_api_headers = {"Authorization": f"Bearer {LABELING_API_KEY}"}

# Shared GraphQL client used by the export helper to call the labeling API.
client = GraphqlClient(
    endpoint=LABELING_API_ENDPOINT,
    headers=labeling_api_headers,
)

# Replace with appropriate graphql mutation.
# In this example, the way to requeue a label is to remove it and mark as a template
relabel_mutation_fragment = """
mutation BulkDeleteLabels (
$projectId: ID!,
$makeTemplates: Boolean = true,
$labelIds: [ID!]) {
project (where: {id: $projectId}) {
bulkDeleteLabels (
where: {
id_in: $labelIds
},
makeTemplates: $makeTemplates,
waitForQueue: true
) {
count
}
}
}
"""

# Flask application that serves the webhook endpoint.
app = Flask(__name__)
@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    """Receive an Aquarium webhook POST and forward issue exports to the labeling API.

    Returns:
        An empty 204 response when an ``issue-exported`` event was handed to
        ``_format_and_export_to_graphql_api``; otherwise a
        ``("Bad Request: <reason>", 400)`` tuple when the shared secret does
        not match, the body is missing or malformed, a required key is absent,
        or the event type is not handled by this endpoint.
    """
    # Imported locally so the handler is self-contained.
    import hmac

    def _reject(msg):
        # Log the reason and build the 400 response used by every failure path.
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    # Optionally verify that the payload came from Aquarium.
    # Compared in constant time so the secret cannot be probed via timing.
    # When AQ_WEBHOOK_SECRET is unset, requests without the header are accepted.
    aq_secret = request.headers.get("x-aquarium-secret")
    if not hmac.compare_digest(
        (aq_secret or "").encode("utf-8"),
        (AQ_WEBHOOK_SECRET or "").encode("utf-8"),
    ):
        return _reject("missing or invalid x-aquarium-secret header")

    # silent=True yields None for an unparseable body, so it is reported by
    # the same "no payload" branch instead of a framework-generated error.
    payload_envelope = request.get_json(silent=True)
    if not payload_envelope:
        return _reject("no payload body received")
    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        return _reject("invalid webhook payload format")

    event_type = payload_envelope["event"]
    if event_type != "issue-exported":
        return _reject(f"endpoint not setup to handle {event_type} events yet")

    if not payload_envelope.get("issue"):
        return _reject("webhook payload did not contain expected key: issue")
    # The issue-exported payload names the project under "project".
    if not payload_envelope.get("project"):
        return _reject("webhook payload did not contain expected key: project")

    _format_and_export_to_graphql_api(
        payload_envelope["project"], payload_envelope["issue"]
    )
    return ("", 204)
def _format_and_export_to_graphql_api(project_name, issue):
    """Collect the label ids referenced by an exported issue and requeue them.

    Args:
        project_name: Value sent as the mutation's ``projectId`` variable.
        issue: The ``issue`` object from the webhook payload; only its
            ``elements`` list is read.

    Returns:
        None. When no label ids are found, no request is sent.
    """
    label_ids = set()
    for element in issue.get("elements") or []:
        if element["element_type"] == "crop":
            # NOTE(review): the example schema only shows frame_data; confirm
            # that crop elements carry crop_data.uuid.
            label_ids.add(element["crop_data"]["uuid"])
        else:
            label_ids.update(
                label["uuid"] for label in element["frame_data"]["label_data"]
            )

    if not label_ids:
        # Avoid issuing a bulk mutation with an empty id filter.
        print("no label ids found in exported issue; skipping relabel request")
        return

    client.execute(
        query=relabel_mutation_fragment,
        variables={
            "projectId": project_name,
            # Sorted so the request is deterministic for a given issue.
            "labelIds": sorted(label_ids),
        },
    )
if __name__ == "__main__":
    # Listen on $PORT when it is set to a non-empty value, else on 8080.
    port_setting = os.getenv("PORT")
    PORT = int(port_setting) if port_setting else 8080
    # debug=True turns on Flask's debugger and reloader: local development only.
    app.run(host="127.0.0.1", port=PORT, debug=True)
REST
server.py
from flask import Flask, request
import os
import requests
# Secrets and the labeling endpoint are read from the environment; each value
# is None when the corresponding variable is not set.
AQ_WEBHOOK_SECRET = os.environ.get("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.environ.get("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.environ.get("LABELING_API_ENDPOINT")

# Substitute with the proper API key header if any for your service
labeling_api_headers = {"Authorization": f"Bearer {LABELING_API_KEY}"}

# Flask application that serves the webhook endpoint.
app = Flask(__name__)
@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    """Receive an Aquarium webhook POST and forward issue exports to the labeling API.

    Returns:
        An empty 204 response when an ``issue-exported`` event was handed to
        ``_format_and_export_to_rest_api``; otherwise a
        ``("Bad Request: <reason>", 400)`` tuple when the shared secret does
        not match, the body is missing or malformed, a required key is absent,
        or the event type is not handled by this endpoint.
    """
    # Imported locally so the handler is self-contained.
    import hmac

    def _reject(msg):
        # Log the reason and build the 400 response used by every failure path.
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    # Optionally verify that the payload came from Aquarium.
    # Compared in constant time so the secret cannot be probed via timing.
    # When AQ_WEBHOOK_SECRET is unset, requests without the header are accepted.
    aq_secret = request.headers.get("x-aquarium-secret")
    if not hmac.compare_digest(
        (aq_secret or "").encode("utf-8"),
        (AQ_WEBHOOK_SECRET or "").encode("utf-8"),
    ):
        return _reject("missing or invalid x-aquarium-secret header")

    # silent=True yields None for an unparseable body, so it is reported by
    # the same "no payload" branch instead of a framework-generated error.
    payload_envelope = request.get_json(silent=True)
    if not payload_envelope:
        return _reject("no payload body received")
    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        return _reject("invalid webhook payload format")

    event_type = payload_envelope["event"]
    if event_type != "issue-exported":
        return _reject(f"endpoint not setup to handle {event_type} events yet")

    if not payload_envelope.get("issue"):
        return _reject("webhook payload did not contain expected key: issue")
    # The issue-exported payload names the project under "project".
    if not payload_envelope.get("project"):
        return _reject("webhook payload did not contain expected key: project")

    _format_and_export_to_rest_api(
        payload_envelope["project"], payload_envelope["issue"]
    )
    return ("", 204)
def _format_and_export_to_rest_api(project_name, issue):
    """Resubmit the frames of an exported issue to the labeling API as a new batch.

    Args:
        project_name: Value sent as the ``project`` field of the POST body.
        issue: The ``issue`` object from the webhook payload; only its
            ``elements`` list is read.

    Returns:
        None. When no element yields a frame with an image URL, no request
        is sent.

    Raises:
        requests.HTTPError: If the labeling API answers with an error status.
    """
    elements = issue.get("elements") or []

    relabel_frames = []
    seen_frame_ids = set()
    for element in elements:
        frame_id = element["frame_id"]
        # Several elements can point at the same frame; submit each frame once.
        if frame_id in seen_frame_ids:
            continue

        # sensor_data is a list with one entry per sensor, so look through the
        # entries instead of indexing it with a string key.
        sensors = (element.get("frame_data") or {}).get("sensor_data") or []
        image_url = next(
            (
                sensor["data_urls"]["image_url"]  # select the right key for your media type
                for sensor in sensors
                if (sensor.get("data_urls") or {}).get("image_url")
            ),
            None,
        )
        if image_url is None:
            print(f"warning: no image_url found for frame {frame_id}; skipping")
            continue

        seen_frame_ids.add(frame_id)
        relabel_frames.append({"id": frame_id, "url": image_url})

    if not relabel_frames:
        print("no frames found in exported issue; skipping relabel request")
        return

    # dataset and issue_name live on each element, not on the issue object;
    # every element belongs to the exported issue, so the first one is used.
    first_element = elements[0]

    # Replace with appropriate post body.
    # In this example, we assume the way to requeue a label is to resubmit the frame(s) as a new batch
    new_dataset_payload = {
        "name": f"{first_element['dataset']}_relabel_{first_element['issue_name']}",
        "project": project_name,
        "frames": relabel_frames,
    }
    response = requests.post(
        LABELING_API_ENDPOINT,
        json=new_dataset_payload,
        headers=labeling_api_headers,
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    # Surface labeling API failures instead of reporting success to Aquarium.
    response.raise_for_status()
if __name__ == "__main__":
    # Fall back to 8080 when $PORT is unset or empty.
    configured_port = os.getenv("PORT")
    PORT = 8080 if not configured_port else int(configured_port)
    # debug=True turns on Flask's debugger and reloader: local development only.
    app.run(host="127.0.0.1", port=PORT, debug=True)

Triggering an Issue Export

To trigger an export, click the Export Issue button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.

Export to Labeling
Confirmation step where you can preview the payload