This page documents the full Pipeline configuration schema. For a conceptual introduction to Pipelines, please refer to the About Pipeline page.
name
string
required
Name of the pipeline. Must contain only lowercase letters, numbers and hyphens, and should be fewer than 50 characters.
sources
object
required
Sources represent the origin of the data flowing into the pipeline. Supported source types, described below: Subgraph Entity, Dataset.
transforms
object
required
Transforms represent data transformation logic to be applied to a source and/or another transform in the pipeline. If your pipeline does not need to transform data, this attribute can be an empty object. Supported transform types, described below: SQL, Handler.
sinks
object
required
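Sinks represent the destination for source and/or transform data leaving the pipeline. Supported sink types, described below: PostgreSQL, Clickhouse, MySQL, Elastic Search, Kafka, File, DynamoDB, Webhook, SQS.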
resource_size
string
Defines the amount of compute power allocated to the pipeline. It can take one of the following values: “s”, “m”, “l”, “xl”, “xxl”. When creating a new pipeline, it defaults to “s”. For updates, it defaults to the pipeline’s current resource_size.
description
string
Description of the pipeline.
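
Putting the top-level attributes together, a complete configuration might look like the sketch below. It only combines attributes and example values that appear elsewhere on this page. The pipeline name, the source, transform and sink key names, the table name and the secret name are hypothetical placeholders, the id column is assumed to exist in the dataset, and your CLI version may require additional top-level fields that this page does not cover.

```yaml
# Hypothetical pipeline: all names, the table and the secret are placeholders.
name: base-logs-to-postgres        # lowercase letters, numbers, hyphens; under 50 characters
description: Copies filtered Base logs into PostgreSQL
resource_size: s                   # one of s, m, l, xl, xxl
sources:
  base_logs:                       # key name, referenced by the transform below
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
transforms:
  contract_logs:                   # key name, referenced by the sink below
    sql: SELECT id FROM base_logs WHERE address = '0x21552aeb494579c772a601f655e9b3c514fda960'
    primary_key: id                # assumes the dataset exposes an id column
sinks:
  postgres_contract_logs:
    type: postgres
    from: contract_logs            # reads the transform output by its key name
    table: contract_logs
    schema: public
    secret_name: MY_POSTGRES_SECRET
```

The data flows in one direction: the transform refers to the source by its key name, and the sink refers to the transform by its key name through the from attribute.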

Sources

Represents the origin of the data into the pipeline. Each source has a unique name to be used as a reference in transforms/sinks. sources.<key_name> is used as the referenceable name in other transforms and sinks.

Subgraph Entity

Use your subgraph as a source for your pipeline.

Example

In the sources section of your pipeline configuration, you can add a subgraph_entity per subgraph entity that you want to use.
example.yaml
sources:
  subgraph_account:
    type: subgraph_entity
    name: account
    subgraphs:
      - name: qidao-optimism
        version: 1.1.0
  subgraph_market_daily_snapshot:
    type: subgraph_entity
    name: market_daily_snapshot
    subgraphs:
      - name: qidao-optimism
        version: 1.1.0

Schema

sources.<key_name>
string
required
Unique name of the source. This is a user provided value.
type
string
required
Defines the type of the source. For Subgraph Entity sources, it is always subgraph_entity.
description
string
Description of the source.
name
string
required
Entity name in your subgraph.
start_at
string
earliest processes data from the first block. latest processes data from the latest block at pipeline start time. Defaults to latest.
filter
string
Filter expression that does a fast scan on the dataset. Only useful when start_at is set to earliest. The expression follows the SQL standard for what comes after the WHERE clause. A few examples:
address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' OR address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' AND amount > 500
subgraphs
subgraphReference[]
required
References the deployed subgraph(s) that contain the entity named in the name attribute.
subgraphs:
  - name: polymarket
    version: 1.0.0
Subgraphs deployed across multiple chains are also supported (the cross-chain use case).
subgraphs:
  - name: polymarket
    version: 1.0.0
  - name: base
    version: 1.1.0
Cross-chain subgraph full example
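
As a sketch of the cross-chain case, the source below reads a single entity from two subgraph deployments. The source key, the description and the entity name account are assumptions chosen for illustration; the subgraph names and versions are the ones used in the snippet above.

```yaml
sources:
  cross_chain_account:             # hypothetical source key
    type: subgraph_entity
    description: Accounts merged from two subgraph deployments
    name: account                  # the entity must exist in every subgraph listed below
    start_at: latest               # the default; use earliest to process from the first block
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: base
        version: 1.1.0
```

Transforms and sinks would then refer to this source by its key name, cross_chain_account, regardless of how many subgraphs feed it.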

Dataset

Dataset lets you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.

Example

sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0

Schema

sources.<key_name>
string
required
Unique name of the source. This is a user provided value.
type
string
required
Defines the type of the source. For Dataset sources, it is always dataset.
description
string
Description of the source.
dataset_name
string
required
Name of a Goldsky dataset. Run goldsky dataset list and select your chain of choice. Please refer to supported chains for an overview of what data is available for individual chains.
version
string
required
Version of the Goldsky dataset named in dataset_name.
start_at
string
earliest processes data from the first block. latest processes data from the latest block at pipeline start time. Defaults to latest.
filter
string
Filter expression that does a fast scan on the dataset. Only useful when start_at is set to earliest. The expression follows the SQL standard for what comes after the WHERE clause. A few examples:
address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' OR address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' AND amount > 500

Fast Scan

Processing a full dataset (starting from earliest, also known as a backfill) requires the pipeline to process a significant amount of data, which affects how quickly it reaches the edge (the latest record in the dataset). This is especially true for datasets of larger chains. In many use cases, however, a pipeline is only interested in a small subset of the historical data. In such cases, you can enable Fast Scan on your pipeline by defining the filter attribute in the dataset source. The filter is pre-applied at the source level, which makes the initial ingestion of historical data much faster. When defining a filter, be sure to use attributes that exist in the dataset. You can get the schema of a dataset by running goldsky dataset get <dataset_name>. The example below pre-applies a filter based on contract address:
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'
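
Since the filter attribute is described as useful only when start_at is set to earliest, and start_at defaults to latest, a backfill configuration would plausibly set both explicitly. The sketch below shows that combination with a two-address filter; the second address is a placeholder (the zero address) rather than a real contract of interest.

```yaml
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    start_at: earliest             # process from the first block (the default is latest)
    # Only rows matching the filter are ingested during the backfill.
    # The expression is double-quoted so YAML reads it as a single string.
    filter: "address = '0x21552aeb494579c772a601f655e9b3c514fda960' OR address = '0x0000000000000000000000000000000000000000'"
```

Quoting the whole expression is optional for simple filters, but it avoids surprises if the expression ever contains characters that YAML treats specially, such as a colon followed by a space.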

Transforms

Represents data transformation logic to be applied to either a source and/or transform in the pipeline. Each transform has a unique name to be used as a reference in transforms/sinks. transforms.<key_name> is used as the referenceable name in other transforms and sinks.

SQL

SQL query that transforms or filters the data from a source or another transform.

Example

transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
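
Because every transform is referenceable by its key name, one SQL transform can read from another. The sketch below repeats the transform above and adds a second one that selects from it; the second transform's name and its WHERE clause are hypothetical and only illustrate the chaining.

```yaml
transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
  # Hypothetical second transform that reads the first one by its key name.
  negative_fpmm_non_null_ids:
    sql: SELECT id FROM negative_fpmm_scaled_liquidity_parameter WHERE id IS NOT NULL
    primary_key: id
```

A sink could then use either key name in its from attribute, depending on which stage of the data it should receive.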

Schema

Handler

Lets you transform data by sending data to a handler endpoint.

Example

transforms:
  my_external_handler_transform:
    type: handler
    primary_key: id
    url: http://example-url/example-transform-route
    from: ethereum.raw_blocks
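
The output of a handler transform is referenced like any other transform. As a sketch, the configuration below routes the handler output from the example above into a PostgreSQL sink; the sink key, table name and secret name are hypothetical, and the sink attributes are the ones shown in the PostgreSQL example on this page.

```yaml
transforms:
  my_external_handler_transform:
    type: handler
    primary_key: id
    url: http://example-url/example-transform-route
    from: ethereum.raw_blocks
sinks:
  handler_output_sink:                      # hypothetical sink key
    type: postgres
    from: my_external_handler_transform     # the handler transform's key name
    table: handler_output                   # hypothetical table
    schema: public
    secret_name: MY_POSTGRES_SECRET         # hypothetical Goldsky Secret
```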

Schema

Sinks

Represents the destination for source and/or transform data leaving the pipeline. Because sinks are the end of the dataflow in the pipeline, a sink, unlike a source or transform, does not need to be referenced elsewhere in the configuration. Most sinks are provided by the user, so the pipeline needs credentials to be able to write data to them. To supply those credentials, create a Goldsky Secret and reference it in the sink.

PostgreSQL

Lets you sink data to a PostgreSQL table.

Example

sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Schema

Clickhouse

Lets you sink data to a Clickhouse table.

Example

sinks:
  my_clickhouse_sink:
    type: clickhouse
    from: my_source
    table: my_table
    database: my_database
    secret_name: MY_CLICKHOUSE_CREDENTIALS

Schema

MySQL

Lets you sink data to a MySQL table.

Example

sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Schema

Elastic Search

Lets you sink data to an Elastic Search index.

Example

sinks:
  my_elasticsearch_sink:
    type: elasticsearch
    from: my_source
    index: my_index
    secret_name: MY_ELASTICSEARCH_CREDENTIALS

Schema

description
string
type
string
required
from
string
required
index
string
required
secret_name
string
required

Kafka

Lets you sink data to a Kafka topic.

Example

sinks:
  kafka_topic_sink:
    type: kafka
    from: my_source
    topic: accounts
    secret_name: KAFKA_SINK_SECRET_CR343D
    topic_partitions: 2

Schema

File

Example

sinks:
  s3_write:
    type: file
    path: s3://goldsky/linea/traces/
    format: parquet
    from: linea.traces
    secret_name: GOLDSKY_S3_CREDS

Schema

DynamoDB

Example

sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Schema

Webhook

Example

sinks:
  webhook_publish:
    type: webhook
    from: base.logs
    url: https://webhook.site/d06324e8-d273-45b4-a18b-c4ad69c6e7e6
    secret_name: WEBHOOK_SECRET_CM3UPDBJC0

Schema

SQS

Lets you sink data to an AWS SQS queue.

Example

sinks:
  my_sqs_sink:
    type: sqs
    url: https://sqs.us-east-1.amazonaws.com/335342423/dev-logs
    secret_name: SQS_SECRET_IAM
    from: my_transform

Schema

Pipeline runtime attributes

Sources, transforms and sinks define the business logic of your pipeline, while the attributes below change how the pipeline is executed at runtime. If you need a refresher on pipelines, make sure to check out About Pipeline; here we’ll just focus on specific attributes. The following are request-level attributes that only control the behavior of a particular request on the pipeline. These attributes should be passed as arguments to the goldsky pipeline apply <config_file> <arguments/flags> command.
status
string
Defines the desired status for the pipeline which can be one of the three: “ACTIVE”, “INACTIVE”, “PAUSED”. If not provided it will default to the current status of the pipeline.
save_progress
boolean
Defines whether the pipeline should attempt to create a fresh snapshot before this configuration is applied. The pipeline needs to be in a healthy state for snapshot to be created successfully. It defaults to true.
use_latest_snapshot
boolean
Defines whether the pipeline should be started from the latest available snapshot. This attribute is useful in restart scenarios. To restart a pipeline from scratch, use --use-latest-snapshot false. It defaults to true.
restart
boolean
Instructs the pipeline to restart. Useful in scenarios where the pipeline needs to be restarted but no configuration change is needed. It defaults to undefined.

Pipeline Runtime Commands

Commands that change the pipeline runtime. Many commands aim to abstract away the above attributes into meaningful actions.

Start

There are multiple ways to do this:
  • goldsky pipeline start <name_or_path_to_config_file>
  • goldsky pipeline apply <name_or_path_to_config_file> --status ACTIVE
This command has no effect on a pipeline that already has a desired status of ACTIVE.

Pause

Pause will attempt to take a snapshot and stop the pipeline so that it can be resumed later. There are multiple ways to do this:
  • goldsky pipeline pause <name_or_path_to_config_file>
  • goldsky pipeline apply <name_or_path_to_config_file> --status PAUSED

Stop

Stopping a pipeline does not attempt to take a snapshot. There are multiple ways to do this:
  • goldsky pipeline stop <pipeline_name(if exists) or path_to_config>
  • goldsky pipeline apply <path_to_config> --status INACTIVE --from-snapshot none
  • goldsky pipeline apply <path_to_config> --status INACTIVE --save-progress false (prior to CLI version 11.0.0)

Update

Make any needed changes to the pipeline configuration file and run goldsky pipeline apply <name_or_path_to_config_file>. By default, any update to a RUNNING pipeline will attempt to take a snapshot before applying the update. If you’d like to avoid taking a snapshot as part of the update, run:
  • goldsky pipeline apply <name_or_path_to_config_file> --from-snapshot last
  • goldsky pipeline apply <name_or_path_to_config_file> --save-progress false (prior to CLI version 11.0.0)
This is useful in situations where the pipeline is running into issues and the snapshot would therefore not succeed, which would block the very update that is meant to fix the issue.

Resize

Useful in scenarios where the pipeline is running into resource constraints. There are multiple ways to do this:
  • goldsky pipeline resize <resource_size>
  • goldsky pipeline apply <name_or_path_to_config_file> with the config file having the attribute:
resource_size: xl

Restart

Useful in scenarios where a restart is needed but there are no changes in the configuration. For example, the pipeline sink’s database connection got stuck because the database was restarted. There are multiple ways to restart a RUNNING pipeline without any configuration changes:
  1. goldsky pipeline restart <path_to_config_or_name> --from-snapshot last|none
The above command will attempt to restart the pipeline. To restart with no snapshot aka from scratch, provide the --from-snapshot none option. To restart with last available snapshot, provide the --from-snapshot last option.
  2. goldsky pipeline apply <path_to_configuration> --restart (CLI version below 10.0.0)
By default, the above command will attempt a new snapshot and start the pipeline from that particular snapshot. To avoid using any existing snapshot or triggering a new one (that is, to start from scratch), add --from-snapshot none, or --save-progress false --use-latest-snapshot false if you are using a CLI version older than 11.0.0.

Monitor

Provides pipeline runtime information that is helpful for monitoring and developing a pipeline. Although this command does not change the runtime, it provides information such as status, metrics and logs that helps with developing a pipeline:
goldsky pipeline monitor <name_or_path_to_config_file>