# External Handler Transforms

> Transforming data with an external HTTP service.

With external handler transforms, you can send data from your Mirror pipeline to an external service via HTTP and return the processed results to the pipeline. This lets you bring your own custom logic, programming languages, and external services into the transformation process.

[In this repo](https://github.com/goldsky-io/documentation-examples/tree/main/mirror-pipelines/goldsky-enriched-erc20-pipeline) you can see an example implementation of enriching ERC-20 Transfer Events with an HTTP service.

**Key Features of External Handler Transforms:**

* Send data to external services via HTTP.
* Use a wide variety of programming languages and external libraries.
* Handle complex processing outside the pipeline and return results in real time.
* Rely on guaranteed at-least-once delivery and back-pressure control to ensure data integrity.

### How External Handlers work

1. The pipeline sends a POST request to the external handler with a mini-batch of JSON rows.
2. The external handler processes the data and returns the transformed rows in the same format and order as received.
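
The request/response contract above can be sketched as a small Node.js service built on the standard `http` module. This is a minimal illustration under stated assumptions, not a reference implementation: the `/external-handler` route, the `HANDLER_PORT` environment variable, the `transformRow` helper, and the `value` and `transformed_value` field names are all hypothetical.

```javascript
const http = require('http');

// Hypothetical per-row transform: uppercases `value` into a new field.
function transformRow(row) {
  return { id: row.id, transformed_value: String(row.value).toUpperCase() };
}

// Maps the batch one-to-one so the response keeps the request's order and length.
function transformBatch(rows) {
  return rows.map(transformRow);
}

const server = http.createServer((req, res) => {
  if (req.method !== 'POST' || req.url !== '/external-handler') {
    res.writeHead(404);
    res.end();
    return;
  }
  let body = '';
  req.on('data', (chunk) => { body += chunk; });
  req.on('end', () => {
    try {
      const rows = JSON.parse(body);
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify(transformBatch(rows)));
    } catch (err) {
      // Malformed JSON in the request body: report a client error.
      res.writeHead(400, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ error: err.message }));
    }
  });
});

// Assumption: the server only starts when HANDLER_PORT is set, e.g. HANDLER_PORT=8080.
if (process.env.HANDLER_PORT) {
  server.listen(Number(process.env.HANDLER_PORT));
}
```

Keeping the per-batch logic in a pure function such as `transformBatch` makes the ordering guarantee easy to test separately from the HTTP plumbing.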

### Example workflow

1. The pipeline sends data to an external service (e.g. a custom API).
2. The service processes the data and returns the results to the pipeline.
3. The pipeline continues processing the enriched data downstream.

### Example HTTP Request

```json theme={null}
    POST /external-handler
    [
      {"id": 1, "value": "abc"},
      {"id": 2, "value": "def"}
    ]
```

### Example HTTP Response

```json theme={null}
    [
      {"id": 1, "transformed_value": "xyz"},
      {"id": 2, "transformed_value": "uvw"}
    ]
```

### YAML config with an external transform

<Tab title="example.yaml">
  ```YAML theme={null}
  transforms:
    my_external_handler_transform:
      type: handler # the transform type. [required]
      primary_key: hash # [required]
      url: http://example-url/example-transform-route # url that your external handler is bound to. [required]
      headers: # [optional]
        Some-Header: some_value # use HTTP headers to pass any tokens your server requires for authentication or any metadata that you think is useful.
      from: ethereum.raw_blocks # the input for the handler. Data sent to your handler will have the same schema as this source/transform. [required]
      # A schema override signals to the pipeline that the handler will respond with a schema that differs from the upstream source/transform (in this case ethereum.raw_blocks).
      # No override means that the handler will do some processing, but that its output will maintain the upstream schema.
      # The return type of the handler is equal to the upstream schema after the override is applied. Make sure that your handler returns a response with rows that follow this schema.
      schema_override: # [optional]
        new_column_name: datatype # if you want to add a new column, do so by including its name and datatype. 
        existing_column_name: new_datatype # if you want to change the type of an existing column (e.g. cast an int to string), do so by including its name and the new datatype
        other_existing_column_name: null # if you want to drop an existing column, do so by including its name and setting its datatype to null
      # The number of records the pipeline will send together in a batch. Default: `100`
      batch_size: 100 # [optional]
      # The maximum time the pipeline will batch records before flushing. Examples: 60s, 1m, 1h. Default: `1s`
      batch_flush_interval: 1s # [optional]
      # Specify which columns to send to the external handler. When defined, only these columns are serialized and sent.
      # The handler's response is merged with the original full row. When omitted, all columns are sent.
      payload_columns: [column_name, other_column_name] # [optional] array of column names
  ```
</Tab>

### Payload columns

The `payload_columns` attribute allows you to optimize bandwidth and control which data is sent to your external handler.

**How it works:**

1. When `payload_columns` is defined:
   * Only the specified columns are serialized to JSON and sent to the external handler.
   * A copy of the full original row is kept in memory.
   * The handler's response is joined back with the original full row.

2. When `payload_columns` is omitted:
   * All columns are sent to the handler.
   * Handler response replaces the entire row.

**Purpose:**

* **Bandwidth optimization:** Send only necessary columns to reduce payload size.
* **Data filtering:** Keep sensitive or large data local while only transmitting a subset.
* **Response merging:** Handler enriches the data, which is then merged with the original complete row.

**Example:**

If you have a row with columns `[transaction_hash, block_number, from_address, to_address, value, input_data, gas_used, gas_price, ...]` but only want to send `transaction_hash` and `from_address` to an API:

```yaml theme={null}
transforms:
  my_handler:
    type: handler
    primary_key: transaction_hash
    url: http://example.com/enrich
    from: ethereum.raw_transactions
    payload_columns: ["transaction_hash", "from_address"]
```

Only `transaction_hash` and `from_address` are sent in the HTTP request.

To filter out a row, return `null` in its position in the JSON array of the response. The array length must remain the same as in the request.

You can also send a new column back as part of that array to enrich the final result. The new column will be joined with the existing columns.
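
As a rough sketch of the filtering and enrichment rules above, the function below handles a batch sent with `payload_columns: ["transaction_hash", "from_address"]`. The `is_watched` column, the `WATCHED_ADDRESSES` set, and the choice to drop rows with a missing `from_address` are hypothetical and only serve the illustration. A new column such as `is_watched` would also need a matching entry in `schema_override`.

```javascript
// Hypothetical watch list used to enrich rows; replace with your own lookup.
const WATCHED_ADDRESSES = new Set(['0x00000000000000000000000000000000000000aa']);

// Returns one entry per input row, in the same order.
// `null` marks a row to be filtered out; the array length never changes.
function enrichBatch(rows) {
  return rows.map((row) => {
    if (!row.from_address) {
      return null; // filter this row out
    }
    return {
      transaction_hash: row.transaction_hash,
      from_address: row.from_address,
      // New column added by the handler (assumed name).
      is_watched: WATCHED_ADDRESSES.has(row.from_address.toLowerCase()),
    };
  });
}
```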

### Schema override datatypes

When overriding the schema of the data returned by the handler, it's important to get the datatype of each column right. The `schema_override` property is a map of column names to Flink SQL datatypes.

Data types are nullable by default. If you need a non-nullable type, use `<data type> NOT NULL`, for example `STRING NOT NULL`.

<Accordion title="Complete list of supported datatypes">
  | Data Type      | Notes                               |
  | :------------- | :---------------------------------- |
  | STRING         |                                     |
  | BOOLEAN        |                                     |
  | BYTE           |                                     |
  | DECIMAL        | Supports fixed precision and scale. |
  | SMALLINT       |                                     |
  | INTEGER        |                                     |
  | BIGINT         |                                     |
  | FLOAT          |                                     |
  | DOUBLE         |                                     |
  | TIME           | Supports only a precision of 0.     |
  | TIMESTAMP      |                                     |
  | TIMESTAMP\_LTZ |                                     |
  | ARRAY          |                                     |
  | ROW            |                                     |
</Accordion>

### Key considerations

* **Schema Changes:** If the external handler's output schema changes, you will need to redeploy the pipeline with the relevant `schema_override`.
* **Failure Handling:** In case of failures, the pipeline retries requests indefinitely with exponential backoff.
* **Networking & Performance:** For optimal performance, deploy your handler in a region close to where the pipelines are deployed (we use AWS `us-west-2`). Aim to keep p95 latency under 100 milliseconds for best results.
* **Latency vs Throughput:** Use lower `batch_size`/`batch_flush_interval` values to achieve low latency and higher values to achieve high throughput (useful when backfilling/bootstrapping).
* **Connection & Response Times:** The maximum allowed response time is 5 minutes and the maximum allowed time to establish a connection is 1 minute.
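
To check a handler against the p95 latency target mentioned above, one option is to compute the percentile from response times you record yourself. The sketch below uses the nearest-rank method, which is one common definition of a percentile; the `percentile` helper and the sample values are hypothetical.

```javascript
// Nearest-rank percentile: the smallest sample such that at least p% of
// all samples are less than or equal to it.
function percentile(samplesMs, p) {
  if (samplesMs.length === 0) {
    throw new Error('no samples');
  }
  const sorted = [...samplesMs].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

// Hypothetical response times in milliseconds.
const latenciesMs = [12, 18, 25, 31, 40, 44, 52, 60, 75, 140];
const p95 = percentile(latenciesMs, 95);
console.log(p95 <= 100 ? 'p95 within target' : `p95 is ${p95} ms, above the 100 ms target`);
```

With these sample values a single slow response is enough to push p95 above the target, which is why tail latency deserves attention alongside the average.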

### Working with JSON data

HTTP handlers have limitations when working with JSON data types. If you need to send JSON objects from your external service:

1. **In the handler**: Return JSON data as strings (e.g., using `json.dumps()` in Python)
2. **In the handler schema\_override**: Specify the field as `string` type
3. **In the sink schema\_override**: Cast the string to `jsonb` type

**Example configuration:**

```yaml theme={null}
transforms:
  my_handler:
    type: handler
    url: https://my-service.com/process
    from: my_source
    schema_override:
      decoded_data: string  # Handler returns JSON as string

sinks:
  my_sink:
    type: postgres
    from: my_handler
    schema_override:
      decoded_data: jsonb   # Sink casts string to JSONB
```

This approach prevents "Failed to deserialize response body as JSON" errors when working with complex JSON structures.
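
On the handler side, the first step might look like the sketch below. In a Node.js handler, `JSON.stringify()` plays the role that `json.dumps()` plays in Python. The `decodeRow` helper, the shape of the decoded object, and the `to_address` and `value` input fields are assumptions made for illustration.

```javascript
// Hypothetical decoder: builds a nested object from the incoming row.
function decodeRow(row) {
  const decoded = { method: 'transfer', args: { to: row.to_address, amount: row.value } };
  // Serialize to a string so the field matches `decoded_data: string`
  // in the handler's schema_override.
  return { ...row, decoded_data: JSON.stringify(decoded) };
}

// Maps the batch one-to-one, preserving order and length.
function handleBatch(rows) {
  return rows.map(decodeRow);
}
```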

### In-order mode for external handlers

In-order mode allows for subgraph-style processing inside Mirror. Records are emitted to the handler in the order that they appear on-chain.

**How to get started**

1. Make sure that the sources that you want to use currently support [Fast Scan](/mirror/sources/direct-indexing). If they don't, submit a request to support.
2. In your pipeline definition specify the `filter` and `in_order` attributes for your source.
3. Declare a transform of type `handler` or a sink of type `webhook`.

Simple transforms (e.g. filtering) between the source and the handler/webhook are allowed, but more complex transforms (e.g. aggregations, joins) can cause loss of ordering.

**Example YAML config, with in-order mode**

<Tab title="in-order-mode-example.yaml">
  ```YAML theme={null}
  name: in-order-pipeline
  sources:
    ethereum.raw_transactions:
      dataset_name: ethereum.raw_transactions
      version: 1.1.0
      type: dataset
      filter: block_number > 21875698 # [required]
      in_order: true # [required] enables in-order mode on the given source and its downstream transforms and sinks.
  sinks:
    my_in_order_sink:
      type: webhook
      url: https://my-handler.com/process-in-order
      headers:
        WEBHOOK-SECRET: secret_two
        secret_name: HTTPAUTH_SECRET_TWO
      from: another_transform
    my_sink:
      type: webhook
      url: https://python-handler.fly.dev/echo
      from: ethereum.raw_transactions
  ```
</Tab>

**Example in-order webhook sink**

```javascript theme={null}
const express = require('express');
const { Pool } = require('pg');

const app = express();
app.use(express.json());

// Database connection settings
const pool = new Pool({
    user: 'your_user',
    host: 'localhost',
    database: 'your_database',
    password: 'your_password',
    port: 5432,
});

async function isDuplicate(client, key) {
    const res = await client.query("SELECT 1 FROM processed_messages WHERE key = $1", [key]);
    return res.rowCount > 0;
}

app.post('/webhook', async (req, res) => {
    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        
        const payload = req.body;
        const metadata = payload.metadata || {};
        const data = payload.data || {};
        const op = metadata.op;
        const key = metadata.key;
        
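        // `data` defaults to {}, so check for an empty object rather than a falsy value.
        if (!key || !op || Object.keys(data).length === 0) {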
            await client.query('ROLLBACK');
            return res.status(400).json({ error: "Invalid payload" });
        }

        if (await isDuplicate(client, key)) {
            await client.query('ROLLBACK');
            return res.status(200).json({ message: "Duplicate request processed without write side effects" });
        }

        const fields = Object.keys(data);
        const values = Object.values(data);

        // Column names are interpolated into the SQL text, so accept only plain identifiers.
        if (!fields.every((field) => /^[A-Za-z_][A-Za-z0-9_]*$/.test(field))) {
            await client.query('ROLLBACK');
            return res.status(400).json({ error: "Invalid column name" });
        }

        if (op === "INSERT") {
            const placeholders = fields.map((_, i) => `$${i + 1}`).join(', ');
            const query = `INSERT INTO my_table (${fields.join(', ')}) VALUES (${placeholders})`;
            await client.query(query, values);
        } else if (op === "DELETE") {
            const conditions = fields.map((field, i) => `${field} = $${i + 1}`).join(' AND ');
            const query = `DELETE FROM my_table WHERE ${conditions}`;
            await client.query(query, values);
        } else {
            await client.query('ROLLBACK');
            return res.status(400).json({ error: "Invalid operation" });
        }
        
        await client.query("INSERT INTO processed_messages (key) VALUES ($1)", [key]);
        await client.query('COMMIT');
        return res.status(200).json({ message: "Success" });
    } catch (e) {
        await client.query('ROLLBACK');
        return res.status(500).json({ error: e.message });
    } finally {
        client.release();
    }
});

app.listen(5000, () => {
    console.log('Server running on port 5000');
});
```

**In-order mode tips**

* To observe records in order, either run a single instance of your handler or introduce a coordination mechanism so that only one replica of the service can answer at a time.
* When deploying your service, avoid having old and new instances running at the same time. Instead, discard the current instance and incur a little downtime to preserve ordering.
* When you receive a message that has already been processed (its idempotency key already exists, or its index, such as the block number, has already been seen), **don't** introduce any side effects on your side, but **do** respond as usual (the processed rows for a handler, a success code for a webhook sink) so that the pipeline knows to keep going.
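
For a handler transform, the last tip can be sketched as follows. This is an illustration under assumptions rather than a prescribed pattern: the `id` field used as the idempotency key, the in-memory `processedKeys` set, and the `applySideEffect` stand-in are hypothetical, and a real service would persist the processed keys so they survive restarts.

```javascript
// In-memory record of processed keys (assumption: persisted storage in production).
const processedKeys = new Set();

// Stand-in for writes to your own systems; records which rows caused a side effect.
const sideEffects = [];
function applySideEffect(row) {
  sideEffects.push(row.id);
}

// Processes rows in the order received. Redelivered rows skip the side effect
// but are still returned, so the response keeps the request's order and length.
function handleInOrderBatch(rows) {
  return rows.map((row) => {
    if (!processedKeys.has(row.id)) {
      applySideEffect(row);
      processedKeys.add(row.id);
    }
    return row;
  });
}
```

If a batch is redelivered after a retry, each row still appears in the response, but `applySideEffect` runs only once per key.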

### Useful tips

* **Schema Changes:** A change in the output schema of the external handler requires redeployment with `schema_override`.
* **Failure Handling:** The pipeline retries indefinitely with exponential backoff.
* **Networking:** Deploy the handler close to where the pipeline runs for better performance.
* **Latency:** Keep handler response times under 100ms to ensure smooth operation.
