With external handler transforms, you can send data from your Mirror pipeline to an external service via HTTP and feed the processed results back into the pipeline. This lets you bring your own custom logic, programming languages, and external services into the transformation process.
This repo contains an example implementation that enriches ERC-20 Transfer events using an HTTP service.
Key Features of External Handler Transforms:
Send data to external services via HTTP.
Use a wide variety of programming languages and external libraries.
Handle complex processing outside the pipeline and return results in real time.
Guaranteed at-least-once delivery and back-pressure control to ensure data integrity.
```yaml
transforms:
  my_external_handler_transform:
    type: handler # the transform type. [required]
    primary_key: hash # [required]
    url: http://example-url/example-transform-route # url that your external handler is bound to. [required]
    headers: # [optional]
      Some-Header: some_value # use http headers to pass any tokens your server requires for authentication or any metadata that you think is useful.
    from: ethereum.raw_blocks # the input for the handler. Data sent to your handler will have the same schema as this source/transform. [required]
    # A schema override signals to the pipeline that the handler will respond with a schema that differs from the upstream source/transform (in this case ethereum.raw_blocks).
    # No override means that the handler will do some processing, but that its output will maintain the upstream schema.
    # The return type of the handler is equal to the upstream schema after the override is applied. Make sure that your handler returns a response with rows that follow this schema.
    schema_override: # [optional]
      new_column_name: datatype # if you want to add a new column, do so by including its name and datatype.
      existing_column_name: new_datatype # if you want to change the type of an existing column (e.g. cast an int to string), do so by including its name and the new datatype
      other_existing_column_name: null # if you want to drop an existing column, do so by including its name and setting its datatype to null
```
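A minimal Python handler for a transform like the one above might look like the following sketch. It assumes the pipeline POSTs a JSON array of rows and expects a JSON array of rows back; the exact request and response format is an assumption here, so inspect a real request before relying on it. `apply_schema_override` mirrors the placeholder override (add `new_column_name`, cast `existing_column_name` to a string, drop `other_existing_column_name`); the value `"some value"` and port 8080 are hypothetical. Only the Python standard library is used.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def apply_schema_override(row: dict) -> dict:
    """Hypothetical processing matching the placeholder schema_override above."""
    out = dict(row)  # copy so the input row is not mutated
    out["new_column_name"] = "some value"  # added column (placeholder value)
    if out.get("existing_column_name") is not None:
        out["existing_column_name"] = str(out["existing_column_name"])  # e.g. int -> string
    out.pop("other_existing_column_name", None)  # dropped column (override set to null)
    return out


def handle_batch(rows: list) -> list:
    # One output row per input row; the primary key (hash) passes through unchanged.
    return [apply_schema_override(r) for r in rows]


class TransformHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Assumption: the request body is a JSON array of row objects.
        length = int(self.headers.get("Content-Length", 0))
        rows = json.loads(self.rfile.read(length) or b"[]")
        body = json.dumps(handle_batch(rows)).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


if __name__ == "__main__":
    # This server answers on every path; the pipeline's `url` should point at it.
    HTTPServer(("0.0.0.0", 8080), TransformHandler).serve_forever()
```

Keeping the row logic in a plain function (`handle_batch`) separate from the HTTP layer makes it easy to unit-test the schema of the returned rows without starting a server.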
When overriding the schema of the data returned by the handler, make sure each column's datatype is correct. The schema_override property maps column names to Flink SQL datatypes (for example STRING, BIGINT, or BOOLEAN).
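Because a wrong datatype only surfaces once the pipeline consumes the response, it can help to check your handler's output rows against the expected schema (the upstream schema with the override applied) in your own tests. The sketch below is hypothetical: it assumes rows are JSON-decoded into Python values and covers only a handful of Flink SQL types; the `FLINK_TO_PYTHON` mapping is an illustration, not part of Mirror.

```python
# Hypothetical mapping from a few Flink SQL type names to the Python values
# a JSON-decoded row would contain for them. Not exhaustive.
FLINK_TO_PYTHON = {
    "STRING": (str,),
    "BOOLEAN": (bool,),
    "INT": (int,),
    "BIGINT": (int,),
    "DOUBLE": (float, int),
}


def row_matches_schema(row: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the row matches the schema."""
    problems = []
    for column, flink_type in schema.items():
        if column not in row:
            problems.append(f"missing column {column}")
            continue
        value = row[column]
        if value is None:
            continue  # treat NULL as allowed (nullable column)
        expected = FLINK_TO_PYTHON.get(flink_type.upper())
        if expected is None:
            problems.append(f"unchecked type {flink_type} for {column}")
        elif isinstance(value, bool) and bool not in expected:
            # bool is a subclass of int in Python, so reject it explicitly for numeric types
            problems.append(f"{column}: bool is not {flink_type}")
        elif not isinstance(value, expected):
            problems.append(f"{column}: {type(value).__name__} is not {flink_type}")
    extra = set(row) - set(schema)
    problems.extend(f"unexpected column {c}" for c in sorted(extra))
    return problems
```

For example, a row that returns `"5"` (a string) for a BIGINT column yields `["number: str is not BIGINT"]`, which points directly at a missing cast or a missing override entry.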
Schema Changes: If the external handler's output schema changes, you need to redeploy the pipeline with an updated schema_override.
Failure Handling: In case of failures, the pipeline retries requests indefinitely with exponential backoff.
Networking & Performance: For optimal performance, deploy your handler in a region close to where the pipelines are deployed (AWS us-west-2). Aim to keep p95 latency under 100 milliseconds.
Connection & Response Times: The maximum allowed response time is 5 minutes, and the maximum allowed time to establish a connection is 1 minute.
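To check the p95 latency target, you can time each request your handler serves and compute the 95th percentile over a window of samples. The sketch below uses the nearest-rank definition (one of several common percentile definitions) with integer arithmetic to avoid floating-point rounding; `timed` and the sample loop are illustrative, and where you record samples in your own service is up to you.

```python
import time


def p95(latencies_ms: list):
    """Nearest-rank 95th percentile of a non-empty list of latencies (ms)."""
    if not latencies_ms:
        raise ValueError("need at least one latency sample")
    ordered = sorted(latencies_ms)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) using integer arithmetic
    return ordered[rank - 1]


def timed(fn, *args):
    """Call fn(*args) and return (result, elapsed milliseconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, (time.perf_counter() - start) * 1000.0


if __name__ == "__main__":
    samples = []
    for _ in range(200):
        # Replace sum(range(...)) with your handler's processing function.
        _, ms = timed(sum, range(10_000))
        samples.append(ms)
    value = p95(samples)
    print(f"p95 = {value:.3f} ms, under 100 ms target: {value < 100}")
```

With 100 samples of 1 to 100 ms, the nearest-rank p95 is 95 ms. Note that this measures processing time inside the handler only; network round trips to us-west-2 add to what the pipeline observes.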
In-order mode enables subgraph-style processing inside Mirror: records are emitted to the handler in the order in which they appear on-chain.
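To see why ordering matters for subgraph-style processing, consider state where each record overwrites the previous value. The sketch below is hypothetical: it assumes records are decoded token transfers with `token_id` and `to_address` fields (names chosen for illustration, not part of any Mirror dataset) and tracks the current owner of each token, which is only correct if transfers are applied in on-chain order.

```python
class LatestOwners:
    """Hypothetical subgraph-style entity store: the current owner of each token ID.

    Each transfer overwrites the previous owner (last write wins), so the final
    state is only correct when transfers are applied in on-chain order.
    """

    def __init__(self):
        self.owner_of = {}

    def apply(self, transfer: dict) -> None:
        self.owner_of[transfer["token_id"]] = transfer["to_address"]


if __name__ == "__main__":
    transfers = [
        {"block_number": 100, "token_id": 7, "to_address": "0xaaa"},
        {"block_number": 101, "token_id": 7, "to_address": "0xbbb"},
    ]
    in_order, reversed_order = LatestOwners(), LatestOwners()
    for t in transfers:
        in_order.apply(t)
    for t in reversed(transfers):
        reversed_order.apply(t)
    print(in_order.owner_of[7], reversed_order.owner_of[7])  # prints "0xbbb 0xaaa"
```

Applied in chain order, token 7 ends up with its latest owner; applied out of order, the store silently reports a stale owner. In-order mode removes that class of bug.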
How to get started
Make sure the sources you want to use support Fast Scan. If they don't, submit a request to support.
In your pipeline definition specify the filter and in_order attributes for your source.
Declare a transform of type handler or a sink of type webhook.
Simple transforms (e.g. filtering) between the source and the handler/webhook are allowed, but more complex transforms (e.g. aggregations, joins) can break ordering.
Example YAML config, with in-order mode
```yaml
name: in-order-pipeline
sources:
  ethereum.raw_transactions:
    dataset_name: ethereum.raw_transactions
    version: 1.1.0
    type: dataset
    filter: block_number > 21875698 # [required]
    in_order: true # [required] enables in-order mode on the given source and its downstream transforms and sinks.
sinks:
  my_in_order_sink:
    type: webhook
    url: https://my-handler.com/process-in-order
    headers:
      WEBHOOK-SECRET: secret_two
    secret_name: HTTPAUTH_SECRET_TWO
    from: another_transform # another_transform must be defined in a transforms: section (not shown in this example)
  my_sink:
    type: webhook
    url: https://python-handler.fly.dev/echo
    from: ethereum.raw_transactions
```
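On the receiving side, a webhook endpoint like `my_in_order_sink` can reject requests that lack the expected secret header before doing any work. The following is a minimal sketch using only the Python standard library. It assumes each request carries the configured `WEBHOOK-SECRET` header value verbatim, that the body is JSON, and that the expected secret comes from a `WEBHOOK_SECRET` environment variable; all three are assumptions, as are port 8080 and the `handle_payload` placeholder.

```python
import hmac
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer

# Assumption: the shared secret is provided via an environment variable;
# the default matches the placeholder value in the example config.
EXPECTED_SECRET = os.environ.get("WEBHOOK_SECRET", "secret_two")

received_payloads = []  # hypothetical stand-in for your in-order processing


def is_authorized(headers) -> bool:
    """Constant-time comparison of the WEBHOOK-SECRET header with the expected value."""
    received = headers.get("WEBHOOK-SECRET") or ""
    return hmac.compare_digest(received.encode(), EXPECTED_SECRET.encode())


def handle_payload(payload) -> None:
    received_payloads.append(payload)


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        if not is_authorized(self.headers):
            self.send_response(401)
            self.end_headers()
            return
        length = int(self.headers.get("Content-Length", 0))
        handle_payload(json.loads(self.rfile.read(length) or b"null"))
        # A 2xx status tells the webhook sink the delivery succeeded.
        self.send_response(200)
        self.end_headers()


if __name__ == "__main__":
    # Plain HTTPServer handles one request at a time, which suits in-order delivery.
    HTTPServer(("0.0.0.0", 8080), WebhookHandler).serve_forever()
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not leak how much of the secret matched.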
To observe records in order, either run a single instance of your handler, OR introduce a coordination mechanism that ensures only one replica of the service answers at a time.
When deploying your service, avoid running old and new instances at the same time. Instead, stop the current instance before starting the new one, accepting a little downtime to preserve ordering.
When your handler receives messages it has already processed (for example, a pre-existing idempotency key or an earlier index such as an already-seen block number), don't introduce any side effects, but still respond as usual (the processed messages for handlers, a success status code for webhook sinks) so that the pipeline knows to keep going.
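The single-instance and redelivery guidance above can be combined in the core of an in-order handler. The sketch below is a hypothetical illustration: the `hash` idempotency key and `block_number` index field names are assumptions, state is kept in memory only (a real service would persist it, ideally alongside its side effects), and the lock serializes work within one process only. Coordinating several replicas would need an external mechanism such as a database lock, which is not shown.

```python
import threading


class InOrderProcessor:
    """Sketch of the core of a single-instance, in-order handler.

    - A lock ensures only one record is processed at a time in this process.
    - Records from an earlier block than the latest one, or with a key already seen
      in the latest block, are treated as redeliveries: no side effects, but they
      are still acknowledged.
    """

    def __init__(self, key_field: str = "hash", index_field: str = "block_number"):
        self.key_field = key_field      # assumed idempotency key field
        self.index_field = index_field  # assumed ordering index field
        self._lock = threading.Lock()
        self.last_index = None
        self.keys_at_last_index = set()  # several records can share one block
        self.side_effects = []  # stand-in for database writes, notifications, etc.

    def _already_processed(self, record: dict) -> bool:
        index = record[self.index_field]
        if self.last_index is None or index > self.last_index:
            return False
        if index < self.last_index:
            return True  # a block we have already moved past
        return record[self.key_field] in self.keys_at_last_index

    def handle(self, record: dict) -> dict:
        with self._lock:
            if not self._already_processed(record):
                index = record[self.index_field]
                if index != self.last_index:
                    # New block: keys from older blocks are covered by the index check.
                    self.last_index = index
                    self.keys_at_last_index = set()
                self.keys_at_last_index.add(record[self.key_field])
                self.side_effects.append(record[self.key_field])
        # Respond as usual either way so the pipeline keeps going.
        return record
```

Because delivery is in order, only the keys for the latest block need to be remembered: anything from an earlier block is a redelivery by definition, which keeps the idempotency state small.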