Integrating with Labeling Using Webhooks

Motivation

A common use case for Aquarium datasets is to identify problem labels and group them using segments, download a JSON or CSV representation of a segment's elements, and then use scripting or a manual process to reformat the data and submit it to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to send data to a labeling service directly from the UI without writing code each time.

The issue-exported event will POST the issue's elements to a webhook. A full schema can be found in the Event Schemas section of the general Webhooks page.

Schema with Example Map Keys
{
  event: "issue-exported",
  project: str,
  issue: {
    id: str,
    elements: [{
      dataset: str,
      inference_set: str,
      issue_name: str,
      element_id: str,
      element_type: "frame" | "crop",
      frame_id: str,
      frame_data: {
        coordinate_frames: [{
          coordinate_frame_id: str,
          coordinate_frame_metadata: Optional[Dict],
          coordinate_frame_type: str,
        }],
        custom_metrics: {
          [custom_metric_type]: int[][] | float,
          ...
        },
        date_captured: str,
        device_id: str,
        geo_data: {
          [coordinate_field]: float,
          ...
        },
        label_data: [{
          attributes: {
            confidence: float,
            ...
          },
          label: str,
          label_coordinate_frame: str,
          label_type: str,
          linked_labels: str[],
          uuid: str,
        }, ...],        
        sensor_data: [{
          coordinate_frame: str,
          data_urls: {
            image_url: str,
            ...
          },
          date_captured: str,
          sensor_id: str,
          sensor_metadata: Dict,
          sensor_type: str,
        }, ...],
        task_id: str,
        [user__metadata_field]: str,
      },
    }, ...]  
  }
}

Shaping the Webhook Payload To Send to Labeling

You can then use the elements in issue to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.

server.py
import hmac
import os

from flask import Flask, request
from python_graphql_client import GraphqlClient

# Runtime configuration, read from the environment (None when a variable is unset).
AQ_WEBHOOK_SECRET = os.environ.get("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.environ.get("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.environ.get("LABELING_API_ENDPOINT")

# Replace with the proper API key header, if any, for your service.
labeling_api_headers = dict(Authorization=f"Bearer {LABELING_API_KEY}")

# GraphQL client used to send mutations to the labeling service.
client = GraphqlClient(
    endpoint=LABELING_API_ENDPOINT,
    headers=labeling_api_headers,
)

# GraphQL mutation sent to the labeling service for every exported issue.
# Replace with the appropriate GraphQL mutation for your service.
# In this example, the way to requeue a label is to remove it and mark it as a
# template ($makeTemplates defaults to true in the mutation signature below).
# Variables: $projectId (required), $labelIds (list of label ids to delete).
relabel_mutation_fragment = """
  mutation BulkDeleteLabels (
    $projectId: ID!,
    $makeTemplates: Boolean = true,
    $labelIds: [ID!]) {
        project (where: {id: $projectId}) {
            bulkDeleteLabels (
                where: {
                  id_in: $labelIds
                },
                makeTemplates: $makeTemplates,
                waitForQueue: true
            ) {
                count
            }
        }
    }
"""

# Flask application object; routes are registered on it with @app.route.
app = Flask(__name__)


def _secrets_match(received, expected):
    """Return True when the received webhook secret equals the configured one.

    Two strings are compared in constant time so the check does not reveal how
    many leading characters matched. ``None`` only matches ``None``, which keeps
    verification optional when no secret is configured and none is sent.
    """
    if received is None or expected is None:
        return received is expected
    return hmac.compare_digest(received.encode("utf-8"), expected.encode("utf-8"))


def _reject(msg, prefix="Bad Request", status=400):
    """Log ``msg`` and build the matching plain-text error response tuple."""
    print(f"error: {msg}")
    return f"{prefix}: {msg}", status


@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    """Handle a webhook POST from Aquarium and forward it to the labeling API.

    Validates the shared secret header and the payload envelope, then, for
    ``issue-exported`` events, passes the project and issue to
    ``_format_and_export_to_graphql_api``.

    Returns:
        ``("", 204)`` on success; a ``(message, 401)`` tuple when the secret
        does not match; a ``(message, 400)`` tuple when the payload is missing,
        malformed, or of an unsupported event type.
    """
    # Optionally verify that the payload came from Aquarium.
    aq_secret = request.headers.get("x-aquarium-secret")
    if not _secrets_match(aq_secret, AQ_WEBHOOK_SECRET):
        # The message is defined here; previously this branch referenced an
        # unassigned variable and crashed instead of rejecting the request.
        return _reject(
            "missing or invalid x-aquarium-secret header",
            prefix="Unauthorized",
            status=401,
        )

    payload_envelope = request.get_json()
    if not payload_envelope:
        return _reject("no payload body received")

    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        return _reject("invalid webhook payload format")

    event_type = payload_envelope["event"]
    if event_type != "issue-exported":
        return _reject(f"endpoint not setup to handle {event_type} events yet")

    if not payload_envelope.get("issue"):
        return _reject("webhook payload did not contain expected key: issue")

    # The documented issue-exported schema names this key `project`;
    # `project_name` is still accepted as a fallback for older payloads.
    project = payload_envelope.get("project") or payload_envelope.get("project_name")
    if not project:
        return _reject("webhook payload did not contain expected key: project")

    _format_and_export_to_graphql_api(project, payload_envelope["issue"])
    return ("", 204)


def _format_and_export_to_graphql_api(project_name, issue):
    label_ids = set()
    for element in issue["elements"]:
        if element["element_type"] == "crop":
            label_ids.add(element["crop_data"]["uuid"])
        else:
            for label in element["frame_data"]["label_data"]:
                label_ids.add(label["uuid"])
    
    variables = {
        "projectId": project_name,
        "labelIds": list(label_ids)
    }
    
    client.execute(
        query=relabel_mutation_fragment,
        variables=variables,
    )


if __name__ == "__main__":
    # Listen on $PORT when it is set to a non-empty value, otherwise on 8080.
    PORT = int(os.getenv("PORT") or 8080)

    # debug=True turns on Flask's debug mode; intended for local development only.
    app.run(debug=True, port=PORT, host="127.0.0.1")

Triggering a Segment Export

To trigger an export, click the Export Issue button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.

Last updated