1. SQL Transforms

SQL transforms allow you to write SQL queries to modify and shape data from multiple sources within the pipeline. This is ideal for operations that need to be performed within the data pipeline itself, such as filtering, aggregating, or joining datasets.

Depending on how you choose to source your data, you might run into one of two challenges:

  1. You only care about a few contracts
    Rather than fill your database with a ton of extra data, you’d prefer to filter it down to a smaller set.

  2. The data is still a bit raw
    Maybe you’d rather track gwei rounded to the nearest whole number instead of wei. You want to map the data to a different format so you don’t have to run this calculation over and over again.

The SQL Solution

You can use SQL-based transforms to solve both of these challenges, which would otherwise have you writing your own indexer or data pipeline. Instead, Goldsky can run these transforms for you automatically using just three pieces of information:

  • name: A shortname for this transform
    You can refer to it from sinks via from, or treat it as a table in the SQL of other transforms.
  • sql: The actual SQL
    To filter your data, use a WHERE clause, e.g. WHERE liquidity > 1000.
    To map your data, use an AS clause combined with SELECT, e.g. SELECT wei / 1000000000 AS gwei (see the second example below).
  • primary_key: A unique ID
    This should be unique per row, but you can also use it to intentionally de-duplicate data: the latest row with a given ID replaces all earlier ones.

Combine them together into your config:

transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
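
To map data, the same structure applies. Here is a sketch of a transform that converts wei to gwei; the source table and column names are hypothetical, so substitute your own dataset:

transforms:
  gwei_values:
    # illustrative source and column names - replace with your own dataset
    sql: SELECT id, wei / 1000000000 AS gwei FROM ethereum.raw_transactions
    primary_key: id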

That’s it. You can now filter and map data to exactly what you need.

2. External Handler Transforms (New)

With external handler transforms, you can send data from your Mirror pipeline to an external service via HTTP and return the processed results back into the pipeline. This opens up a world of possibilities by allowing you to bring your own custom logic, programming languages, and external services into the transformation process.

Key Features of External Handler Transforms:

  • Send data to external services via HTTP.
  • Use any programming language and external libraries you like.
  • Handle complex processing outside the pipeline and return results in real time.
  • Get guaranteed at-least-once delivery and back-pressure control to ensure data integrity.

How External Handlers work

  1. The pipeline sends a POST request to the external handler with a mini-batch of JSON rows.
  2. The external handler processes the data and returns the transformed rows in the same format and order as received.

Example workflow

  1. The pipeline sends data to an external service (e.g. a custom API).
  2. The service processes the data and returns the results to the pipeline.
  3. The pipeline continues processing the enriched data downstream.

Example HTTP Request

    POST /external-handler
    [
      {"id": 1, "value": "abc"},
      {"id": 2, "value": "def"}
    ]

Example HTTP Response

    [
      {"id": 1, "transformed_value": "xyz"},
      {"id": 2, "transformed_value": "uvw"}
    ]
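
A minimal handler only needs to accept the POSTed mini-batch and respond with one output row per input row, in the same order. Below is a sketch in Python using only the standard library; the port, route handling, and field names are illustrative and simply mirror the example request/response above:

    # Minimal external handler sketch (assumptions: plain HTTP, no auth headers checked).
    import json
    from http.server import BaseHTTPRequestHandler, HTTPServer

    class TransformHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            rows = json.loads(self.rfile.read(length))  # mini-batch of JSON rows

            # Return one output row per input row, in the same order as received.
            out = [{"id": row["id"], "transformed_value": row["value"].upper()} for row in rows]

            body = json.dumps(out).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    if __name__ == "__main__":
        HTTPServer(("0.0.0.0", 8080), TransformHandler).serve_forever()

You can exercise a handler like this locally (for example with curl) by posting the example request body above and checking that the response preserves row order, before pointing a pipeline at it.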

Example YAML config with an external transform

transforms:
  my_external_handler_transform:
    type: handler # the transform type. [required]
    primary_key: hash # [required]
    url: http://example-url/example-transform-route # url that your external handler is bound to. [required]
    headers: # [optional]
      Some-Header: some_value # use HTTP headers to pass any tokens your server requires for authentication, or any metadata you find useful.
    from: ethereum.raw_blocks # the input for the handler. Data sent to your handler will have the same schema as this source/transform. [required]
    # A schema override signals to the pipeline that the handler will respond with a schema that differs from the upstream source/transform (in this case ethereum.raw_blocks).
    # No override means that the handler will do some processing, but that its output will maintain the upstream schema.
    # The return type of the handler is equal to the upstream schema after the override is applied. Make sure that your handler returns a response with rows that follow this schema.
    schema_override: # [optional]
      new_column_name: datatype # to add a new column, include its name and datatype.
      existing_column_name: new_datatype # to change the type of an existing column (e.g. cast an int to a string), include its name and the new datatype.
      other_existing_column_name: null # to drop an existing column, include its name and set its datatype to null.

Schema override datatypes

When overriding the schema of the data returned by the handler, it’s important to get the datatype of each column right. The schema_override property is a map of column names to Flink SQL datatypes.
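
For example, a hypothetical override for a handler that adds a gas_gwei column, casts block_number to a string, and drops extra_data could look like this (the column names are illustrative; BIGINT and STRING are standard Flink SQL datatypes):

schema_override:
  gas_gwei: BIGINT # new column returned by the handler
  block_number: STRING # existing column cast to a string
  extra_data: null # existing column dropped from the handler output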

Key considerations

  • Schema Changes: If the external handler’s output schema changes, you will need to redeploy the pipeline with the relevant schema_override.
  • Failure Handling: In case of failures, the pipeline retries requests indefinitely with exponential backoff.
  • Networking & Performance: For optimal performance, deploy your handler in a region close to where the pipelines are deployed (we use aws us-west-2). Aim to keep p95 latency under 100 milliseconds for best results.

Example Implementation

In this repo you can see an example implementation of HTTP Transforms enriching ERC-20 Transfer Events.