With external handler transforms, you can send data from your Mirror pipeline to an external service over HTTP and feed the processed results back into the pipeline. This lets you bring your own custom logic, programming languages, and external services into the transformation process.

This repo contains an example implementation that enriches ERC-20 Transfer events with an HTTP service.

Key Features of External Handler Transforms:

  • Send data to external services via HTTP.
  • Use any programming language and any external libraries.
  • Handle complex processing outside the pipeline and return results in real time.
  • Rely on guaranteed at-least-once delivery and back-pressure control to preserve data integrity.

How External Handlers work

  1. The pipeline sends a POST request to the external handler with a mini-batch of JSON rows.
  2. The external handler processes the data and returns the transformed rows in the same format and order as received.

Example workflow

  1. The pipeline sends data to an external service (e.g. a custom API).
  2. The service processes the data and returns the results to the pipeline.
  3. The pipeline continues processing the enriched data downstream.

Example HTTP Request

    POST /external-handler
    [
      {"id": 1, "value": "abc"},
      {"id": 2, "value": "def"}
    ]

Example HTTP Response

    [
      {"id": 1, "transformed_value": "xyz"},
      {"id": 2, "transformed_value": "uvw"}
    ]
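
As a sketch, a minimal Node/Express handler that satisfies this contract might look like the following. The route, port, and transformation logic are illustrative, not part of the Mirror API:

    const express = require('express');

    const app = express();
    app.use(express.json());

    // The pipeline POSTs a mini-batch of JSON rows; respond with the
    // transformed rows in the same format and order as received.
    app.post('/external-handler', (req, res) => {
        const rows = req.body;
        const transformed = rows.map((row) => ({
            id: row.id,
            // Placeholder enrichment: replace with your own logic.
            transformed_value: String(row.value).split('').reverse().join(''),
        }));
        res.json(transformed);
    });

    app.listen(8080, () => console.log('Handler listening on port 8080'));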

YAML config with an external transform

transforms:
  my_external_handler_transform:
    type: handler # the transform type. [required]
    primary_key: hash # [required]
    url: http://example-url/example-transform-route # url that your external handler is bound to. [required]
    headers: # [optional]
      Some-Header: some_value # use HTTP headers to pass any tokens your server requires for authentication, or any metadata you find useful.
    from: ethereum.raw_blocks # the input for the handler. Data sent to your handler will have the same schema as this source/transform. [required]
    # A schema override signals to the pipeline that the handler will respond with a schema that differs from the upstream source/transform (in this case ethereum.raw_blocks).
    # No override means that the handler will do some processing, but that its output will maintain the upstream schema.
    # The return type of the handler is equal to the upstream schema after the override is applied. Make sure that your handler returns a response with rows that follow this schema.
    schema_override: # [optional]
      new_column_name: datatype # if you want to add a new column, do so by including its name and datatype. 
      existing_column_name: new_datatype # if you want to change the type of an existing column (e.g. cast an int to string), do so by including its name and the new datatype
      other_existing_column_name: null # if you want to drop an existing column, do so by including its name and setting its datatype to null
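
As a hypothetical illustration of this contract: suppose the upstream schema has columns hash and extra_data, and the override adds enriched_value (a new column) and drops extra_data. Each row sent to the handler follows the upstream schema:

    {"hash": "0xabc123", "extra_data": "0x"}

and each row the handler returns must follow the overridden schema:

    {"hash": "0xabc123", "enriched_value": "some-result"}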

Schema override datatypes

When overriding the schema of the data returned by the handler it’s important to get the datatypes for each column right. The schema_override property is a map of column names to Flink SQL datatypes.
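
For example, a schema_override using concrete Flink SQL datatypes might look like this (the column names are illustrative):

    schema_override:
      enriched_value: STRING # new column returned by the handler
      gas_used: BIGINT # existing column cast to a wider integer type
      raw_payload: null # existing column dropped from the handler's output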

Key considerations

  • Schema Changes: If the external handler’s output schema changes, you will need to redeploy the pipeline with the relevant schema_override.
  • Failure Handling: In case of failures, the pipeline retries requests indefinitely with exponential backoff.
  • Networking & Performance: Deploy your handler in a region close to where the pipelines are deployed (we use AWS us-west-2), and aim to keep p95 latency under 100 milliseconds.
  • Connection & Response times: The maximum allowed response time is 5 minutes and the maximum allowed time to establish a connection is 1 minute.

In-order mode for external handlers

In-order mode allows for subgraph-style processing inside Mirror: records are emitted to the handler in the order in which they appear on-chain.

How to get started

  1. Make sure that the sources that you want to use currently support Fast Scan. If they don’t, submit a request to support.
  2. In your pipeline definition specify the filter and in_order attributes for your source.
  3. Declare a transform of type handler or a sink of type webhook.

Simple transforms (e.g. filtering) between the source and the handler/webhook are allowed, but more complex transforms (e.g. aggregations, joins) can cause loss of ordering.

Example YAML config with in-order mode

name: in-order-pipeline
sources:
  ethereum.raw_transactions:
    dataset_name: ethereum.raw_transactions
    version: 1.1.0
    type: dataset
    filter: block_number > 21875698 # [required]
    in_order: true # [required] enables in-order mode on the given source and its downstream transforms and sinks.
sinks:
  my_in_order_sink:
    type: webhook
    url: https://my-handler.com/process-in-order
    headers:
      WEBHOOK-SECRET: secret_two
      secret_name: HTTPAUTH_SECRET_TWO
    from: another_transform # a transform defined elsewhere in this pipeline
  my_sink:
    type: webhook
    url: https://python-handler.fly.dev/echo
    from: ethereum.raw_transactions

Example in-order webhook sink

const express = require('express');
const { Pool } = require('pg');

const app = express();
app.use(express.json());

// Database connection settings
const pool = new Pool({
    user: 'your_user',
    host: 'localhost',
    database: 'your_database',
    password: 'your_password',
    port: 5432,
});

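// Returns true if a row with this idempotency key has already been applied.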
async function isDuplicate(client, key) {
    const res = await client.query("SELECT 1 FROM processed_messages WHERE key = $1", [key]);
    return res.rowCount > 0;
}

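// Webhook endpoint for the in-order sink: rows arrive in on-chain order, and
// the processed_messages table makes each write idempotent across retries.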
app.post('/webhook', async (req, res) => {
    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        
        const payload = req.body;
        const metadata = payload.metadata || {};
        const data = payload.data; // no default here, so a missing body fails validation below
        const op = metadata.op;
        const key = metadata.key;
        
        if (!key || !op || !data) {
            await client.query('ROLLBACK');
            return res.status(400).json({ error: "Invalid payload" });
        }

        if (await isDuplicate(client, key)) {
            await client.query('ROLLBACK');
            return res.status(200).json({ message: "Duplicate request processed without write side effects" });
        }

        if (op === "INSERT") {
            const fields = Object.keys(data);
            const values = Object.values(data);
            const placeholders = fields.map((_, i) => `$${i + 1}`).join(', ');
            const query = `INSERT INTO my_table (${fields.join(', ')}) VALUES (${placeholders})`;
            await client.query(query, values);
        } else if (op === "DELETE") {
            const conditions = Object.keys(data).map((field, i) => `${field} = $${i + 1}`).join(' AND ');
            const values = Object.values(data);
            const query = `DELETE FROM my_table WHERE ${conditions}`;
            await client.query(query, values);
        } else {
            await client.query('ROLLBACK');
            return res.status(400).json({ error: "Invalid operation" });
        }
        
        await client.query("INSERT INTO processed_messages (key) VALUES ($1)", [key]);
        await client.query('COMMIT');
        return res.status(200).json({ message: "Success" });
    } catch (e) {
        await client.query('ROLLBACK');
        return res.status(500).json({ error: e.message });
    } finally {
        client.release();
    }
});

app.listen(5000, () => {
    console.log('Server running on port 5000');
});
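
This sketch assumes the destination table my_table already exists, along with a processed_messages table that has a unique key column for idempotency tracking, e.g.:

    CREATE TABLE processed_messages (key TEXT PRIMARY KEY);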

In-order mode tips

  • To observe records in order, either run a single instance of your handler, or introduce a coordination mechanism that ensures only one replica of the service answers at a time.
  • When deploying a new version of your service, avoid having old and new instances running at the same time. Instead, stop the current instance and accept a little downtime to preserve ordering.
  • When the handler receives a message it has already processed (a pre-existing idempotency key or a previously seen index, e.g. an already-processed block number), don't apply any side effects, but do respond as usual (processed rows for handlers, a success code for webhook sinks) so that the pipeline knows to keep going.
