The definition YAML file defines the sources, transformations, and sinks that make up your pipeline. Pass a definition file to the CLI with the --definition-path flag (you can fetch the definition of an existing pipeline with goldsky pipeline get-definition <pipeline-name>), or use --definition to inline a full definition directly in the command.
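
For example, to copy an existing pipeline's definition and create a new pipeline from it (the pipeline names here are placeholders, and we assume get-definition writes the definition to stdout):

# Save the definition of an existing pipeline to a local file
goldsky pipeline get-definition my-existing-pipeline > pipeline.yaml

# Create a new pipeline from that definition
goldsky pipeline create my-new-pipeline --definition-path pipeline.yaml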

The YAML schema consists of three primary sections:

  • Sources: Denotes the origin of the data.
  • Transforms: Lists the transformations to be applied to the data from the source.
  • Sinks: Specifies where the transformed data should be sent.

Each source and transform has a reference name that other transforms and sinks can use to refer to it. You can compose as many transforms as you need and pipe the results into a sink.

Below, we’ll explain each section of the YAML structure and provide an example for each part of the schema.

Sources

The sources array contains one or more source objects. There are currently two supported source types:

  • Subgraph Entities
  • Datasets

Subgraph Entities

This lets you use your own subgraphs as a pipeline source.

Example

sources:
  - type: subgraphEntity
    deployments:
      - id: QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb
    entity:
      name: fixed_product_market_maker
    referenceName: polymarket.fixed_product_market_maker

Datasets

Datasets let you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA to ensure correctness.

Example

sources:
  - type: dataset
    referenceName: ethereum.decoded_logs
    version: 1.0.0

To find the correct referenceName, run goldsky dataset list and select your chain of choice. Refer to supported chains for an overview of what data is available for individual chains.

Transforms

The transforms array contains one or more transform objects, each with the following properties:

  • referenceName: The name of the transformation. This can be used by sinks as a sourceStreamName, or in any other transform’s SQL as a table.
  • type: The type of the transformation. Currently, only sql is supported.
  • sql: The SQL query to perform. This can refer to the referenceName of any source or any other transform as a SQL table.
  • primaryKey: The primary key for the transformation. If two rows share the same primaryKey, the pipeline overwrites the earlier row with the latest value.

Transform Example

transforms:
  - referenceName: negative_fpmm_scaled_liquidity_parameter
    type: sql
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primaryKey: id
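
Transforms can also be chained: any transform’s referenceName is usable as a table in another transform’s SQL. A minimal sketch building on the example above, where the first transform now also selects scaled_liquidity_parameter so the second can filter on it, and the second transform’s name and -100 threshold are purely illustrative:

transforms:
  - referenceName: negative_fpmm_scaled_liquidity_parameter
    type: sql
    sql: SELECT id, scaled_liquidity_parameter FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primaryKey: id
  - referenceName: very_negative_fpmm
    type: sql
    sql: SELECT id FROM negative_fpmm_scaled_liquidity_parameter WHERE scaled_liquidity_parameter < -100
    primaryKey: id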

Sinks

The sinks array contains one or more sink objects, each with the following properties:

  • type: The sink type. This could be postgres, elasticsearch, or kafka.
  • sourceStreamName: The source stream name for the sink.
  • referenceName: A name of your choice to uniquely identify this sink within a pipeline.
  • table: The table name to load into. This is required for sinks of type postgres.
  • schema: The schema for the sink. This is required for sinks of type postgres.
  • secretName: The name of the secret for the sink. This could be API_POSTGRES_CREDENTIALS or REDPANDA_CREDENTIALS.
  • topic: The topic to produce to. This is required for sinks of type kafka.

Sink Example

sinks:
  - type: postgres
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    referenceName: postgres_test_negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
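
For message-queue sinks the same structure applies, with topic in place of table and schema. A minimal sketch of a kafka sink, where the referenceName and topic values are illustrative:

sinks:
  - type: kafka
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    referenceName: kafka_negative_fpmm
    topic: negative-fpmm
    secretName: REDPANDA_CREDENTIALS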

End-to-end examples

You can run the following examples by copying the config into a local YAML file and then running:

# Assuming the yaml config is in pipeline.yaml
goldsky pipeline create test-pipeline --definition-path pipeline.yaml

Syncing a subgraph into postgres

This pipeline pulls data from a single subgraphEntity source, processes the data with a single SQL transformation, and stores the result into a PostgreSQL sink.

As a prerequisite, you will need an existing subgraph with the deployment ID QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb. Note that the source’s namespace (polygon) combined with the entity name is what the transform references as the polygon.fixed_product_market_maker table.

sources:
  - type: subgraphEntity
    deployments:
      - id: QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb
    namespace: polygon
    entity:
      name: fixed_product_market_maker
transforms:
  - referenceName: negative_fpmm_scaled_liquidity_parameter
    type: sql
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primaryKey: id
sinks:
  - type: postgres
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secretName: API_POSTGRES_CREDENTIALS

Merging subgraphs cross-chain

This pipeline pulls data from two subgraphEntity sources, each merging the same entity across three subgraph deployments, performs no transformations, and stores the results into two separate PostgreSQL tables.

sources:
  - type: subgraphEntity
    deployments:
      - id: QmbsFSmqsWFFcbxnGedXifyeTbKBSypczRcwPrBxdQdyXE
      - id: QmNSwC6QjZSFcSm2Tmoy6Van7g6zSEqD3yz4tDWRFdZiKh
      - id: QmZUh5Rp3edMhYj3wCH58zSNvZvrPSQyeM6AN5HTmyw2Ch
    referenceName: hashflow_cross_chain.pool_created
    entity:
      name: pool_created
  - type: subgraphEntity
    deployments:
      - id: QmbsFSmqsWFFcbxnGedXifyeTbKBSypczRcwPrBxdQdyXE
      - id: QmNSwC6QjZSFcSm2Tmoy6Van7g6zSEqD3yz4tDWRFdZiKh
      - id: QmZUh5Rp3edMhYj3wCH58zSNvZvrPSQyeM6AN5HTmyw2Ch
    referenceName: hashflow_cross_chain.update_router_permissions
    entity:
      name: update_router_permissions
transforms:
sinks:
  - type: postgres
    sourceStreamName: hashflow_cross_chain.pool_created
    table: test_pool_created
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
  - type: postgres
    sourceStreamName: hashflow_cross_chain.update_router_permissions
    table: test_update_router_permissions
    schema: public
    secretName: API_POSTGRES_CREDENTIALS

Syncing a dataset into a postgres database

This pipeline is named decoded-logs. It pulls data from a curated Goldsky dataset, flattens a few nested raw_log fields with a single SQL transformation, and stores the result into a PostgreSQL sink, in a table called eth_logs in the goldsky schema. The startAt: latest property on the source starts the pipeline from the latest data rather than backfilling the dataset’s full history.

sources:
  - referenceName: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: latest

transforms:
  - sql: |
      SELECT
          id,
          address,
          event_signature,
          event_params,
          raw_log.block_number as block_number,
          raw_log.block_hash as block_hash,
          raw_log.transaction_hash as transaction_hash
      FROM
          ethereum.decoded_logs
    referenceName: logs
    type: sql
    primaryKey: id

sinks:
  - type: postgres
    table: eth_logs
    schema: goldsky
    secretName: API_POSTGRES_CREDENTIALS
    sourceStreamName: logs

If you run into any issues setting up pipelines, feel free to ask us for help!