Webhooks

How to use Aquarium webhooks to communicate with your own services, including automating labeling service integrations.

Overview

This page provides two guides:

  • Integrating with Webhooks (Generically)

  • Integrating with Labeling Using Webhooks

Integrating With Webhooks

As a data operations platform, we encourage you to integrate with your other systems and tooling in your practice with Aquarium. To support use cases where you'd prefer for these events to be pushed to you via an HTTP request, we've implemented webhooks.

These webhooks can be triggered from these events:

  • dataset-complete

  • dataset-failed

  • issues-created

  • issue-update

  • issue-exported

It is up to you to provide an endpoint that the webhooks can communicate with. The webhook will send a POST request with a standard payload. The payload schemas can be seen here.

Configuration

Webhooks are configured for events per project, and can be edited under the Webhooks tab on a project page:

One endpoint can service multiple events, or you can configure a unique endpoint for every event you want to be alerted on. You can also deactivate the entire webhook if you need to quickly disable sending to it.

Testing the Webhook

You can also test the endpoint by sending a minimal test payload using the send icon; we will generate a test event and send it through the same processing pipelines that other events go through.

Payloads will always be POSTed to the endpoint, with the headers Content-Type: application/json

An authenticating header will also be included if one has been configured for the organization (see below).

Specific schemas can be viewed here, but all payloads will follow the same basic schema:

{
  event: str, // the event-type that triggered the call
  project: str; // the name of the project that the event originated in
  [entity]: Dict; // a json representation of the entity that the event refers to, if any
}

Authentication

When you configure a webhook that will be called by an Aquarium service, it will also be available for the public to call. To ensure that any payload you receive on that endpoint originated with Aquarium, we've provided a method to generate a secret key in your organization settings page. This key applies across all configured webhooks in all projects in your organization. We'll generate and show it only once in the UI, so please remember to write it down!

Once a secret key is generated, it will be set in all request headers as X-Aquarium-Secret which you can verify against the secret key you've been shown.

# example of how to compare secret key for verification
aq_secret = request.headers.get("x-aquarium-secret")
if aq_secret != AQ_WEBHOOK_SECRET:
    return f"Bad Request: {msg}", 400

A new secret key will immediately replace the previous one.

Event Schemas

All event payloads will follow the same basic schema:

{
  event: str, // the event-type that triggered the call
  project: str; // the name of the project that the event originated in
  [entity]: Dict; // a json representation of the entity that the event refers to, if any
}

Below is an exhaustive list of schema definitions for every event that Aquarium supports and the shape of the [entity] in the payload.

Dataset Completed Processing

The dataset-complete event fires when a labeled dataset or an inference set for a labeled dataset has been uploaded; the shape will differ slightly depending on what was uploaded.

{
  event: "dataset-complete",
  project: str,
  dataset: {
    id: str,
    archived: bool,
    created_at: str,
    updated_at: str,
    frame_count: int,
    label_count: int,
    data_url: str[],
    embeddings_url?: str[],
    dataflow_status: str
    dataflow_status_postprocess: str,
  }
}

Dataset Failed Processing

The dataset-failed event fires when a labeled dataset or an inference set for a labeled dataset has been uploaded; the shape will differ slightly depending on what was uploaded.

{
  event: "dataset-failed",
  project: str,
  dataset: {
    id: str,
    archived: bool,
    created_at: str,
    updated_at: str,
    frame_count: int,
    label_count: int,
    data_url: str[],
    embeddings_url?: str[],
    dataflow_status: str
    dataflow_status_postprocess: str,
  }
}
Issues Created
{
  event: "issues-created",
  project: str,
  issues: [{
    id: str,
    issue_name: str,
    creation_type: "auto" | "manual",
    element_type: "frame" | "crop",
    num_elements: int,
    originating_inference_set_name?: str // the inference set that was used to generate this issue's element, usually applicable to auto-created issues
  }]
}
Issue Updated
{
  event: "issue-updated",
  project: str,
  issue_update: {
    id: str,
    update_type: "add" | "remove" | "move" | "elt_status" | "issue_status",
    num_elements_added?: int, // if update_type == "add"
    num_elements_removed?: int, // if update_type == "remove"
    num_elements_moved?: int, // if update_type == "move"
    num_elements_already_present?: int, // if update_type == "move"
    original_issue_name?: int, // if update_type == "move", the issue the elements were originally in
    new_elements_status?: str, // if update_type == "elt_status"
    num_elements_updated?: int, // if update_type == "elt_status"
    previous_issue_status?: str, // if update_type == "issue_status"
    new_issue_status?: str // if update_type == "issue_status"
  }
}
Issue Exported
{
  event: "issue-exported",
  project: str,
  issue: {
    id: str,
    elements: [{
      dataset: str,
      inference_set: str,
      issue_name: str,
      element_id: str,
      element_type: "frame" | "crop",
      frame_id: str,
      frame_data: {
        coordinate_frames: [{
          coordinate_frame_id: str,
          coordinate_frame_metadata: Optional[Dict],
          coordinate_frame_type: str,
        }],
        custom_metrics: {
          [custom_metric_type]: int[][] | float,
        },
        date_captured: str,
        device_id: str,
        geo_data: {
          [coordinate_field]: float,
        },
        label_data: [{
          attributes: {
            confidence: float,
            ...
          },
          label: str,
          label_coorindate_frame: str,
          label_type: str,
          linked_labels: str[],
          uuid: str,
        }],        
        sensor_data: [{
          coordinate_frame: str,
          data_urls: {
            image_url: str,
          },
          date_captured: str,
          sensor_id: str,
          sensor_metadata: Dict,
          sensor_type: str,
        }],
        task_id: str,
        [user__metadata_field]: str | int | float | bool,
      },
    }]  
  }
}

Integrating with Labeling Using Webhooks

Motivation

A common use case for Aquarium datasets is to identify problem labels and group them using segments, downloading a JSON or CSV representation of a segment's elements, and then using scripting or a manual process to reformat the data to submit to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to directly send data to a labeling service directly from the UI without writing code each time.

The issue-exported event will POST the issue's elements to a webhook. A full schema can be found here:

Schema with Example Map Keys
{
  event: "issue-exported",
  project: str,
  issue: {
    id: str,
    elements: [{
      dataset: str,
      inference_set: str,
      issue_name: str,
      element_id: str,
      element_type: "frame" | "crop",
      frame_id: str,
      frame_data: {
        coordinate_frames: [{
          coordinate_frame_id: str,
          coordinate_frame_metadata: Optional[Dict],
          coordinate_frame_type: str,
        }],
        custom_metrics: {
          [custom_metric_type]: int[][] | float,
          ...
        },
        date_captured: str,
        device_id: str,
        geo_data: {
          [coordinate_field]: float,
          ...
        },
        label_data: [{
          attributes: {
            confidence: float,
            ...
          },
          label: str,
          label_coorindate_frame: str,
          label_type: str,
          linked_labels: str[],
          uuid: str,
        }, ...],        
        sensor_data: [{
          coordinate_frame: str,
          data_urls: {
            image_url: str,
            ...
          },
          date_captured: str,
          sensor_id: str,
          sensor_metadata: Dict,
          sensor_type: str,
        }, ...],
        task_id: str,
        [user__metadata_field]: str,
      },
    }, ...]  
  }
}

Shaping the Webhook Payload To Send to Labeling

You can then use the elements in segment to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.

server.py
from flask import Flask, request
import os
from python_graphql_client import GraphqlClient

AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.getenv("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")

labeling_api_headers = {
    # Replace with the proper API key header if any for your service
    "Authorization": f"Bearer {LABELING_API_KEY}"
}
client = GraphqlClient(endpoint=LABELING_API_ENDPOINT, headers=labeling_api_headers)

# Replace with appropriate graphql mutation.
# In this example, the way to requeue a label is to remove it and mark as a template
relabel_mutation_fragment = """
  mutation BulkDeleteLabels (
    $projectId: ID!,
    $makeTemplates: Boolean = true,
    $labelIds: [ID!]) {
        project (where: {id: $projectId}) {
            bulkDeleteLabels (
                where: {
                  id_in: $labelIds
                },
                makeTemplates: $makeTemplates,
                waitForQueue: true
            ) {
                count
            }
        }
    }
"""

app = Flask(__name__)


@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
    # Optionally verify that the payload came from Aquarium
    aq_secret = request.headers.get("x-aquarium-secret")
    if aq_secret != AQ_WEBHOOK_SECRET:
        return f"Bad Request: {msg}", 400
    
    payload_envelope = request.get_json()
    if not payload_envelope:
        msg = "no payload body received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
        msg = "invalid webhook payload format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    
    event_type = payload_envelope["event"]
    
    if event_type == "issue-exported":
        if not payload_envelope.get("issue"):
            msg = "webhook payload did not contain expected key: issue"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
        _format_and_export_to_graphql_api(payload_envelope["project_name"], payload_envelope["issue"])
    else:
        msg = f"endpoint not setup to handle {event_type} events yet"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    return ("", 204)


def _format_and_export_to_graphql_api(project_name, issue):
    label_ids = set()
    for element in issue["elements"]:
        if element["element_type"] == "crop":
            label_ids.add(element["crop_data"]["uuid"])
        else:
            for label in element["frame_data"]["label_data"]:
                label_ids.add(label["uuid"])
    
    variables = {
        "projectId": project_name,
        "labelIds": list(label_ids)
    }
    
    client.execute(
        query=relabel_mutation_fragment,
        variables=variables,
    )


if __name__ == "__main__":
    PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 8080

    app.run(host="127.0.0.1", port=PORT, debug=True)

Triggering a Segment Export

To trigger an export, click the Export To Labeling button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.

If the payload was sent to the webhook successfully the modal will close. If not, an error will display in the modal window.

Last updated