We recently released v3 of the pipeline definition format, which uses a more intuitive and user-friendly YAML structure for defining and configuring pipelines. For backward compatibility, the previous v2 format is still supported, which is why the YAML examples across the documentation reference both formats. Use whichever you are more comfortable with, but we encourage you to start migrating to v3.

Mirror pipelines are defined using a YAML file. This file serves two purposes:

  • It defines the sources, transformations, and sinks that comprise your pipeline.
  • It contains configuration attributes that alter certain properties of your pipeline such as its status or resource_size.

Both the definition and the configuration can live in the same file, as in the following example:

base-logs.yaml
name: base-logs-pipeline
version: 1
status: ACTIVE
resource_size: s
apiVersion: 3
sources:
  base.logs:
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
    description: Enriched logs for events emitted from contracts. Contains the
      contract address, data, topics, decoded event and metadata for blocks and
      transactions.
    display_name: Logs
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    description: "Postgres sink for: base.logs"
    from: base.logs

In the v3 format, the keys under sources, transforms and sinks are user-provided values. In this example, the source reference name base.logs matches the actual dataset_name. This is the convention you’ll typically see across examples because it’s convenient. However, you can use a custom reference name as the key instead; just remember to use that name when referring to the source in your transforms and sinks.
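As a minimal sketch of a custom reference name, the fragment below keys the same dataset under my_logs and points the sink's from attribute at that key. The name my_logs is made up for illustration; everything else is copied from the example above.

```yaml
sources:
  # "my_logs" is a hypothetical, user-chosen reference name
  my_logs:
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    # refer to the source by its key, not by its dataset_name
    from: my_logs
```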

Alternatively, they can be separated:

  1. Definition files
base-logs.yaml
name: base-logs-pipeline
description: An example description of a pipeline
version: 1
status: ACTIVE
resource_size: s
apiVersion: 3
sources:
  base.logs:
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
    description: Enriched logs for events emitted from contracts. Contains the
      contract address, data, topics, decoded event and metadata for blocks and
      transactions.
    display_name: Logs
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    description: "Postgres sink for: base.logs"
    from: base.logs
  2. Configuration files
base-logs.yaml
name: base-logs-pipeline
description: An example description of a pipeline
resource_size: s

In the following sections we’ll look at the attributes that belong to the pipeline definition and to the pipeline configuration.

Defining a pipeline

In order to create a pipeline, you need to define its sources, transformations, and sinks.

If you already have a pipeline you can get its configuration file with the following command: goldsky pipeline get <pipeline-name>

If you are planning on creating a new pipeline you can use goldsky pipeline apply <path-to-pipeline-config-file>.


The pipeline config YAML schema consists of three primary sections:

  • Sources: Denotes the origin of the data.
  • Transforms: Lists the transformations to be applied to the data from the source.
  • Sinks: Specifies where the transformed data should be sent.

Each source and transform has a key that can be used by other transforms or sinks. You can compose multiple transforms together as you need and pipe the results to a sink.

Below, we’ll explain each section of the YAML structure, and provide an example for each subset of the schema.

Sources

The sources object contains one or more source configurations. There are currently two supported source types:

  • Subgraph Entities
  • Datasets

Subgraph Entities

This lets you define your own subgraphs as a pipeline source.

Example

sources:
  polymarket.fixed_product_market_maker:
    type: subgraph_entity
    name: fixed_product_market_maker
    subgraphs:
    - name: polymarket
      version: 1.0.0

Here, the name under subgraphs is the name of your deployed subgraph, and the top-level name is the entity to read from it.

Datasets

Datasets let you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.

Example

sources:
  my_ethereum_decoded_logs:
    type: dataset
    dataset_name: ethereum.decoded_logs
    version: 1.0.0

To obtain the dataset_name property, please use goldsky dataset list and select your chain of choice. Please refer to supported chains for an overview of what data is available for individual chains.

Transforms

The transforms object contains one or many transform configurations, each with the following properties:

  • Its property name in the transforms object: The name of the transformation. This can be used by sinks as a from property, or in any other transform’s SQL as a table.
  • sql: The SQL query to be performed. This can refer to any key in the sources object, or to any other transform, as a SQL table.
  • primary_key: The primary key for the transformation. If two rows share the same primary_key value, the pipeline overwrites the earlier row with the latest value.

Transform Example

transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
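Because a transform's key can be used as a table in another transform's SQL, transforms can be chained. The sketch below splits the query above into two steps to illustrate this. The transform names fpmm_liquidity and negative_fpmm_liquidity are hypothetical, and the source key is the one from the subgraph entity example.

```yaml
transforms:
  # first step: keep only the columns needed downstream
  fpmm_liquidity:
    sql: SELECT id, scaled_liquidity_parameter FROM polymarket.fixed_product_market_maker
    primary_key: id
  # second step: reads the first transform by its key, as if it were a table
  negative_fpmm_liquidity:
    sql: SELECT id FROM fpmm_liquidity WHERE scaled_liquidity_parameter < 0
    primary_key: id
```

A sink would then set from: negative_fpmm_liquidity to receive the output of the second step.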

Sinks

The sinks object contains one or many sink configurations, each with the following properties:

  • Its property name in the sinks object: A name of your choice to uniquely identify this sink within a pipeline.
  • type: The sink type, for example postgres, elasticsearch or kafka.
  • from: The source or transform to use for the sink.
  • table: The table name to load into. This is required for sinks of type postgres.
  • schema: The schema for the sink. This is required for sinks of type postgres.
  • secret_name: The name of the secret holding the credentials for the sink, for example API_POSTGRES_CREDENTIALS or REDPANDA_CREDENTIALS.
  • topic: The topic to produce to. This is required for sinks of type kafka.

Sink Example

sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
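For a sink of type kafka, the property list above suggests that topic takes the place of table and schema. The following is a sketch built only from the properties listed above. The sink name and topic are made up for illustration, and a real kafka sink may require further properties that this section does not cover.

```yaml
sinks:
  # hypothetical sink name and topic, for illustration only
  kafka_negative_fpmm:
    type: kafka
    from: negative_fpmm_scaled_liquidity_parameter
    topic: negative-fpmm
    secret_name: REDPANDA_CREDENTIALS
```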

Pipeline definition examples

You can run the following examples by copying each one into a local YAML file and then running:

# Assuming the yaml config is in pipeline.yaml
goldsky pipeline apply pipeline.yaml

Syncing a subgraph into postgres

This pipeline pulls data from a single subgraph_entity source, processes the data with a single SQL transformation, and stores the result into a PostgreSQL sink.

An existing subgraph with the name/version combination polymarket/1.0.0 is a prerequisite for running this pipeline.

name: syncing-a-subgraph-into-postgres
apiVersion: 3
sources:
  polygon.fixed_product_market_maker:
    type: subgraph_entity
    name: fixed_product_market_maker
    subgraphs:
    - name: polymarket
      version: 1.0.0
transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
sinks:
  postgres_polygon_sink:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Merging subgraphs cross-chain

This pipeline is named poap-extended-1. It pulls data from two subgraph_entity sources, does not perform any transformations, and stores the result into two separate PostgreSQL sinks.

name: poap-extended-1
apiVersion: 3
sources:
  hashflow_cross_chain.pool_created:
    type: subgraph_entity
    name: pool_created
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: hashflow
        version: 1.0.0
  hashflow_cross_chain.update_router_permissions:
    type: subgraph_entity
    name: update_router_permissions
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: hashflow
        version: 1.0.0
transforms: {}
sinks:
  pool_created_sink:
    type: postgres
    from: hashflow_cross_chain.pool_created
    table: test_pool_created
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
  update_router_permissions_sink:
    type: postgres
    from: hashflow_cross_chain.update_router_permissions
    table: test_update_router_permissions
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Syncing a dataset into a postgres database

This pipeline is named decoded-logs-pipeline. It pulls data from a curated Goldsky dataset, applies a single SQL transformation that selects a subset of fields, and stores the result into a PostgreSQL sink, in a table called eth_logs in the goldsky schema.

name: decoded-logs-pipeline
apiVersion: 3
sources:
  my_ethereum_decoded_logs:
    dataset_name: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    start_at: latest

transforms:
  logs:
    sql: |
      SELECT
          id,
          address,
          event_signature,
          event_params,
          raw_log.block_number as block_number,
          raw_log.block_hash as block_hash,
          raw_log.transaction_hash as transaction_hash
      FROM
          my_ethereum_decoded_logs
    primary_key: id

sinks:
  logs_sink:
    type: postgres
    table: eth_logs
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
    from: logs

Configuring a pipeline

Once a pipeline has been defined and deployed there’s a certain set of attributes we can use to configure its lifecycle and behaviour. This is done using the goldsky pipeline apply <config_file> command.

If you need a refresher on the lifecycle of pipelines, check out Run a Pipeline. Here we’ll focus on the configuration attributes that can be used:

name
string
required

The pipeline name is required so that Mirror knows which pipeline to apply the configuration to. If a definition is not provided, the name must refer to an already deployed pipeline in your project.

definition
string

As explained above, the definition is comprised of sources, transforms and sinks. If a pipeline has already been deployed there’s no need to include its definition unless you want to update any of its properties.

status
string

It defines the desired status of the pipeline, which can be one of “ACTIVE”, “INACTIVE” or “PAUSED”. If not provided, it defaults to the current status of the pipeline.

resource_size
string

It defines the amount of compute power to add to the pipeline. It can take one of the following values: “s”, “m”, “l”, “xl”, “xxl”. If not provided it will default to the current resource_size of the pipeline.

save_progress
boolean

It defines whether the pipeline should create a snapshot when this configuration is applied. It defaults to false.

use_latest_snapshot
boolean

It defines whether the pipeline should restart from the latest available snapshot. This attribute is useful in restart scenarios. It defaults to true.

restart
boolean

It instructs the pipeline to restart when this configuration is applied. It defaults to false.

Pipeline configuration examples

Take a snapshot and restart the pipeline with more resources

name: base-logs-pipeline
restart: true
# save_progress defaults to false, so it must be set to take the snapshot
save_progress: true
resource_size: xl

Restarting a pipeline from the latest safe point

Consider the scenario where your pipeline has ended up in a corrupted state and you would like to restart it from a safe point in the past, without taking a snapshot of its current state. You could achieve that with a configuration like the following:

name: base-logs-pipeline
description: a new description for my pipeline
restart: true
use_latest_snapshot: true
save_progress: false
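Based on the status attribute described above, pausing a pipeline should only need the pipeline name and the desired status. A minimal sketch, assuming base-logs-pipeline is already deployed in your project:

```yaml
# name must refer to an already deployed pipeline, since no definition is included
name: base-logs-pipeline
status: PAUSED
```

As with the other examples, this would be applied with goldsky pipeline apply <config_file>.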

If you run into any issues at all with setting pipelines up, feel free to ask us for help!