Integrating with Labeling Using Webhooks

Motivation

A common use case for Aquarium datasets is to identify problem labels and group them using segments, download a JSON or CSV representation of a segment's elements, and then use scripting or a manual process to reformat the data for submission to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to send data to a labeling service directly from the UI without writing code each time.
The issue-exported event will POST the issue's elements to a webhook. A full schema can be found in the Event Schemas section of the general Webhooks page.
Schema with Example Map Keys
{
  event: "issue-exported",
  project: str,
  issue: {
    id: str,
    elements: [{
      dataset: str,
      inference_set: str,
      issue_name: str,
      element_id: str,
      element_type: "frame" | "crop",
      frame_id: str,
      frame_data: {
        coordinate_frames: [{
          coordinate_frame_id: str,
          coordinate_frame_metadata: Optional[Dict],
          coordinate_frame_type: str,
        }],
        custom_metrics: {
          [custom_metric_type]: int[][] | float,
          ...
        },
        date_captured: str,
        device_id: str,
        geo_data: {
          [coordinate_field]: float,
          ...
        },
        label_data: [{
          attributes: {
            confidence: float,
            ...
          },
          label: str,
          label_coorindate_frame: str,
          label_type: str,
          linked_labels: str[],
          uuid: str,
        }, ...],
        sensor_data: [{
          coordinate_frame: str,
          data_urls: {
            image_url: str,
            ...
          },
          date_captured: str,
          sensor_id: str,
          sensor_metadata: Dict,
          sensor_type: str,
        }, ...],
        task_id: str,
        [user__metadata_field]: str,
      },
    }, ...]
  }
}
Copied!

Shaping the Webhook Payload To Send to Labeling

You can then use the elements in issue to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.
GraphQL
REST
server.py
1
from flask import Flask, request
2
import os
3
from python_graphql_client import GraphqlClient
4
​
5
# Configuration is read from the environment; os.getenv returns None for any
# variable that is not set.
AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.getenv("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")

labeling_api_headers = {
    # Replace with the proper API key header if any for your service
    "Authorization": f"Bearer {LABELING_API_KEY}"
}
client = GraphqlClient(endpoint=LABELING_API_ENDPOINT, headers=labeling_api_headers)

# Replace with appropriate graphql mutation.
# In this example, the way to requeue a label is to remove it and mark as a template
relabel_mutation_fragment = """
mutation BulkDeleteLabels (
    $projectId: ID!,
    $makeTemplates: Boolean = true,
    $labelIds: [ID!]) {
    project (where: {id: $projectId}) {
        bulkDeleteLabels (
            where: {
                id_in: $labelIds
            },
            makeTemplates: $makeTemplates,
            waitForQueue: true
        ) {
            count
        }
    }
}
"""

app = Flask(__name__)
37
​
38
​
39
def _bad_request(msg):
    """Log *msg* and build the 400 response tuple used for every rejection."""
    print(f"error: {msg}")
    return f"Bad Request: {msg}", 400


@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    """Receive an Aquarium webhook POST and forward exported issues for relabeling.

    Returns:
        ``("", 204)`` once an ``issue-exported`` event has been passed to
        ``_format_and_export_to_graphql_api``; otherwise a
        ``("Bad Request: <reason>", 400)`` tuple describing why the request
        was rejected.
    """
    # Imported locally so the secret check can use a constant-time comparison.
    import hmac

    # Optionally verify that the payload came from Aquarium
    aq_secret = request.headers.get("x-aquarium-secret") or ""
    expected_secret = AQ_WEBHOOK_SECRET or ""
    if not hmac.compare_digest(
        aq_secret.encode("utf-8"), expected_secret.encode("utf-8")
    ):
        return _bad_request("webhook secret did not match")

    # silent=True yields None on a missing/unparseable JSON body so the
    # rejection below is produced here rather than by Flask aborting.
    payload_envelope = request.get_json(silent=True)
    if not payload_envelope:
        return _bad_request("no payload body received")

    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        return _bad_request("invalid webhook payload format")

    event_type = payload_envelope["event"]
    if event_type != "issue-exported":
        return _bad_request(f"endpoint not setup to handle {event_type} events yet")

    if not payload_envelope.get("issue"):
        return _bad_request("webhook payload did not contain expected key: issue")

    # The issue-exported schema names this key "project"; "project_name" is
    # accepted as a fallback.
    # NOTE(review): confirm the key against the full Event Schemas reference.
    project = payload_envelope.get("project") or payload_envelope.get("project_name")
    if not project:
        return _bad_request("webhook payload did not contain expected key: project")

    _format_and_export_to_graphql_api(project, payload_envelope["issue"])
    return ("", 204)
71
​
72
​
73
def _format_and_export_to_graphql_api(project_name, issue):
    """Collect the label ids referenced by an exported issue and requeue them.

    Args:
        project_name: Value sent as the ``projectId`` mutation variable.
        issue: The ``issue`` object of an ``issue-exported`` payload; its
            ``elements`` list is scanned for label uuids.

    Returns:
        None. When no label ids are found, the labeling API is not called.
    """
    label_ids = set()
    for element in issue.get("elements") or []:
        if element["element_type"] == "crop":
            # NOTE(review): "crop_data" is not part of the example schema on
            # this page; confirm its shape against the full event schema.
            label_ids.add(element["crop_data"]["uuid"])
        else:
            # A frame may carry no labels at all; treat that as empty.
            frame_labels = (element.get("frame_data") or {}).get("label_data") or []
            label_ids.update(label["uuid"] for label in frame_labels)

    if not label_ids:
        # Avoid issuing a bulk mutation with an empty id filter.
        print("no label ids found in issue; nothing to requeue")
        return

    variables = {
        "projectId": project_name,
        # Sorted so the request body is deterministic for a given issue.
        "labelIds": sorted(label_ids),
    }

    client.execute(
        query=relabel_mutation_fragment,
        variables=variables,
    )
91
​
92
​
93
if __name__ == "__main__":
    # Use the PORT environment variable when it is set and non-empty,
    # otherwise listen on 8080.
    PORT = int(os.getenv("PORT") or 8080)

    app.run(host="127.0.0.1", port=PORT, debug=True)
97
​
Copied!
server.py
1
from flask import Flask, request
2
import os
3
import requests
4
​
5
# Settings come from the process environment; a variable that is not set
# yields None.
AQ_WEBHOOK_SECRET = os.environ.get("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.environ.get("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.environ.get("LABELING_API_ENDPOINT")

# Substitute with the proper API key header if any for your service
labeling_api_headers = {"Authorization": f"Bearer {LABELING_API_KEY}"}


app = Flask(__name__)
16
​
17
​
18
def _bad_request(msg):
    """Log *msg* and build the 400 response tuple used for every rejection."""
    print(f"error: {msg}")
    return f"Bad Request: {msg}", 400


@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    """Receive an Aquarium webhook POST and forward exported issues for relabeling.

    Returns:
        ``("", 204)`` once an ``issue-exported`` event has been passed to
        ``_format_and_export_to_rest_api``; otherwise a
        ``("Bad Request: <reason>", 400)`` tuple describing why the request
        was rejected.
    """
    # Imported locally so the secret check can use a constant-time comparison.
    import hmac

    # Optionally verify that the payload came from Aquarium
    aq_secret = request.headers.get("x-aquarium-secret") or ""
    expected_secret = AQ_WEBHOOK_SECRET or ""
    if not hmac.compare_digest(
        aq_secret.encode("utf-8"), expected_secret.encode("utf-8")
    ):
        return _bad_request("webhook secret did not match")

    # silent=True yields None on a missing/unparseable JSON body so the
    # rejection below is produced here rather than by Flask aborting.
    payload_envelope = request.get_json(silent=True)
    if not payload_envelope:
        return _bad_request("no payload body received")

    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        return _bad_request("invalid webhook payload format")

    event_type = payload_envelope["event"]
    if event_type != "issue-exported":
        return _bad_request(f"endpoint not setup to handle {event_type} events yet")

    if not payload_envelope.get("issue"):
        return _bad_request("webhook payload did not contain expected key: issue")

    # The issue-exported schema names this key "project"; "project_name" is
    # accepted as a fallback.
    # NOTE(review): confirm the key against the full Event Schemas reference.
    project = payload_envelope.get("project") or payload_envelope.get("project_name")
    if not project:
        return _bad_request("webhook payload did not contain expected key: project")

    _format_and_export_to_rest_api(project, payload_envelope["issue"])
    return ("", 204)
50
​
51
​
52
def _format_and_export_to_rest_api(project_name, issue):
    """Resubmit the frames of an exported issue to the labeling API as a new batch.

    Args:
        project_name: Value sent as the ``project`` field of the POST body.
        issue: The ``issue`` object of an ``issue-exported`` payload; its
            ``elements`` list supplies the frames to relabel.

    Returns:
        None. Nothing is posted when the issue yields no frame with a media URL.

    Raises:
        requests.HTTPError: if the labeling API answers with an error status.
    """
    elements = issue.get("elements") or []

    relabel_frames = []
    seen_frame_ids = set()
    for element in elements:
        frame_id = element["frame_id"]
        if frame_id in seen_frame_ids:
            # Several elements (e.g. crops) can point at the same frame.
            continue
        seen_frame_ids.add(frame_id)

        # sensor_data is a list of sensors; use the first one exposing the URL.
        sensors = (element.get("frame_data") or {}).get("sensor_data") or []
        image_url = next(
            (
                sensor["data_urls"]["image_url"]  # select the right key for your media type
                for sensor in sensors
                if "image_url" in (sensor.get("data_urls") or {})
            ),
            None,
        )
        if image_url is None:
            print(f"warning: no image_url found for frame {frame_id}; skipping")
            continue
        relabel_frames.append({"id": frame_id, "url": image_url})

    if not relabel_frames:
        print("no frames found in issue; nothing to resubmit")
        return

    # In the example schema, dataset and issue_name are fields of each
    # element; issue-level keys are consulted only as a fallback.
    first_element = elements[0]
    dataset = first_element.get("dataset") or issue.get("dataset")
    issue_name = first_element.get("issue_name") or issue.get("issue_name")

    # Replace with appropriate post body.
    # In this example, we assume the way to requeue a label is to resubmit the frame(s) as a new batch
    new_dataset_payload = {
        "name": f"{dataset}_relabel_{issue_name}",
        "project": project_name,
        "frames": relabel_frames,
    }

    response = requests.post(
        LABELING_API_ENDPOINT,
        json=new_dataset_payload,
        headers=labeling_api_headers,
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    # Surface labeling-API failures instead of reporting success to Aquarium.
    response.raise_for_status()
69
​
70
​
71
if __name__ == "__main__":
    # Use the PORT environment variable when it is set and non-empty,
    # otherwise listen on 8080.
    PORT = int(os.getenv("PORT") or 8080)

    app.run(host="127.0.0.1", port=PORT, debug=True)
75
​
Copied!

Triggering a Segment Export

To trigger an export, click the Export Issue button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.
Export to Labeling
Confirmation step where you can preview the payload