We recently released v3 of the pipeline configuration format, which offers a more intuitive and user-friendly way to define and configure pipelines in a YAML file. For backward compatibility, the previous v2 format is still supported, which is why the YAML files presented across the documentation include references to both formats. Use whichever format you are more comfortable with, but we encourage you to start migrating to v3.

This page describes the full Pipeline configuration schema. For a conceptual introduction to Pipelines, please refer to the About Pipeline page.

name
string
required

Name of the pipeline. Must contain only lowercase letters, numbers and hyphens, and should be less than 50 characters long.

sources
object
required

Sources represent the origin of the data entering the pipeline.

Supported source types:

  • Subgraph Entity
  • Dataset

transforms
object
required

Transforms represent data transformation logic to be applied to a source and/or another transform in the pipeline. If your pipeline does not need to transform data, this attribute can be an empty object.

Supported transform types:

  • SQL
  • Handler

sinks
object
required

Sinks represent the destination for source and/or transform data leaving the pipeline.

Supported sink types:

  • PostgreSQL
  • Clickhouse
  • MySQL
  • Elastic Search
  • Kafka
  • File
  • DynamoDB
  • Webhook
  • SQS

resource_size
string

Defines the amount of compute power allocated to the pipeline. It can take one of the following values: “s”, “m”, “l”, “xl”, “xxl”. When a new pipeline is created, it defaults to “s”. For updates, it defaults to the current resource_size of the pipeline.

description
string

Description of the pipeline.
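
Putting the top-level attributes together, a complete configuration might look like the sketch below. It is assembled only from the attributes and example values documented on this page. The key names base_logs, filtered_logs and postgres_filtered_logs, the column names used in the query, and the pipeline name are illustrative assumptions, and any additional top-level keys your CLI version may expect are not shown.

```yaml
# Hypothetical pipeline: lowercase letters, numbers and hyphens only.
name: base-logs-to-postgres
description: Filtered Base logs written to PostgreSQL
resource_size: s
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
transforms:
  # Reads from the source by its key name; 'id' and 'address' are assumed columns.
  filtered_logs:
    type: sql
    primary_key: id
    sql: SELECT id, address FROM base_logs WHERE address = '0x21552aeb494579c772a601f655e9b3c514fda960'
sinks:
  # Writes the transform output; sinks are never referenced elsewhere.
  postgres_filtered_logs:
    type: postgres
    from: filtered_logs
    table: filtered_logs
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
```

The data flows top to bottom: the sink's from points at the transform key, and the transform's FROM clause points at the source key.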

Sources

Represents the origin of the data into the pipeline. Each source has a unique name to be used as a reference in transforms/sinks.

sources.<key_name> is used as the referenceable name in other transforms and sinks.

Subgraph Entity

Use your subgraph as a source for your pipeline.

Example

In the sources section of your pipeline configuration, you can add one subgraph_entity source for each subgraph entity that you want to use.

sources:
  subgraph_account:
    type: subgraph_entity
    name: account
    subgraphs:
      - name: qidao-optimism
        version: 1.1.0
  subgraph_market_daily_snapshot:
    type: subgraph_entity
    name: market_daily_snapshot
    subgraphs:
      - name: qidao-optimism
        version: 1.1.0

Schema

sources.<key_name>
string
required

Unique name of the source. This is a user provided value.

type
string
required

Defines the type of the source. For Subgraph Entity sources, it is always subgraph_entity.

description
string

Description of the source.

name
string
required

Entity name in your subgraph.

start_at
string

earliest processes data from the first block.

latest processes data from the latest block at pipeline start time.

Defaults to latest

filter
string

Filter expression that does a fast scan on the dataset. Only useful when start_at is set to earliest.

The expression follows the SQL standard for what comes after the WHERE clause. A few examples:

address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' OR address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' AND amount > 500

subgraphs
subgraphReference[]
required

References the deployed subgraph(s) that contain the entity named in the name attribute.

subgraphs:
  - name: polymarket
    version: 1.0.0

Supports subgraphs deployed across multiple chains (the cross-chain use case).

subgraphs:
  - name: polymarket
    version: 1.0.0
  - name: base
    version: 1.1.0

Cross-chain subgraph full example
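
A full cross-chain configuration could be sketched as follows. It combines the subgraphs list above with the top-level attributes and the PostgreSQL sink documented on this page. The pipeline name, the key names, and the assumption that both subgraphs expose an entity called account are illustrative, not taken from a real deployment.

```yaml
name: cross-chain-account
sources:
  # One source reads the same entity from subgraphs on two chains.
  cross_chain_account:
    type: subgraph_entity
    name: account            # assumed to exist in both subgraphs
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: base
        version: 1.1.0
# No transformation needed, so transforms is an empty object.
transforms: {}
sinks:
  postgres_account:
    type: postgres
    from: cross_chain_account
    table: account
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
```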

Dataset

Dataset lets you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.

Example

sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0

Schema

sources.<key_name>
string
required

Unique name of the source. This is a user provided value.

type
string
required

Defines the type of the source. For Dataset sources, it is always dataset.

description
string

Description of the source.

dataset_name
string
required

Name of a Goldsky dataset. Please run goldsky dataset list and select the dataset for your chain of choice.

Please refer to supported chains for an overview of what data is available for individual chains.

version
string
required

Version of the Goldsky dataset in dataset_name.

start_at
string

earliest processes data from the first block.

latest processes data from the latest block at pipeline start time.

Defaults to latest

filter
string

Filter expression that does a fast scan on the dataset. Only useful when start_at is set to earliest.

The expression follows the SQL standard for what comes after the WHERE clause. A few examples:

address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' OR address = '0x21552aeb494579c772a601f655e9b3c514fda960'
address = '0xb794f5ea0ba39494ce839613ff2qasdf34353dga' AND amount > 500

Fast Scan

Processing a full dataset (starting from earliest, also known as a backfill) requires the pipeline to process a significant amount of data, which affects how quickly it reaches the edge (the latest record in the dataset). This is especially true for datasets of larger chains.

However, in many use cases, a pipeline may only be interested in a small subset of the historical data. In such cases, you can enable Fast Scan on your pipeline by defining the filter attribute in the dataset source.

The filter is pre-applied at the source level, making the initial ingestion of historical data much faster. When defining a filter, be sure to use attributes that exist in the dataset. You can get the schema of the dataset by running goldsky dataset get <dataset_name>.

See the example below, where we pre-apply a filter based on contract address:

sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'
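
Since the filter is only useful when start_at is set to earliest, a backfill-oriented variant of the same source might set both attributes explicitly. This is a minimal sketch using only attributes documented above; it assumes address is a column of the base.logs dataset, which you can confirm with goldsky dataset get base.logs.

```yaml
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    # Process data from the first block (backfill) instead of the default, latest.
    start_at: earliest
    # Pre-applied at the source level; same syntax as the body of a SQL WHERE clause.
    filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'
```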

Transforms

Represents data transformation logic to be applied to a source and/or another transform in the pipeline. Each transform has a unique name that is used as a reference in other transforms and sinks.

transforms.<key_name> is used as the referenceable name in other transforms and sinks.

SQL

SQL query that transforms or filters the data from a source or another transform.

Example

transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id

Schema

transforms.<key_name>
string
required

Unique name of the transform. This is a user provided value.

type
string
required

Defines the type of the transform. For SQL transforms, it is always sql.

sql
string
required

The SQL query to be executed on a source or another transform in the pipeline.

The source data for a sql transform is determined by the FROM <table_name> part of the query. Any source or transform can be referenced as a SQL table.

primary_key
string
required

The primary key for the transformation. If two rows have the same primary_key, the pipeline will overwrite the earlier row with the latest value.
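
Because any source or transform can be referenced as a SQL table, transforms can be chained. The sketch below shows one transform reading from a source and a second transform reading from the first, each referenced by its key name. The column names id, address and block_number are assumptions about the base.logs schema, and the transform names are hypothetical.

```yaml
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
transforms:
  # First transform: reads from the source via its key name.
  contract_logs:
    type: sql
    primary_key: id
    sql: SELECT id, address, block_number FROM base_logs WHERE address = '0x21552aeb494579c772a601f655e9b3c514fda960'
  # Second transform: reads from the first transform via its key name.
  recent_contract_logs:
    type: sql
    primary_key: id
    sql: SELECT id, address FROM contract_logs WHERE block_number > 1000000
```

A sink would then set from to recent_contract_logs to receive only the output of the second step.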

Handler

Lets you transform data by sending data to a handler endpoint.

Example

transforms:
  my_external_handler_transform:
    type: handler
    primary_key: id
    url: http://example-url/example-transform-route
    from: ethereum.raw_blocks

Schema

transforms.<key_name>
string
required

Unique name of the transform. This is a user provided value.

type
string
required

Defines the type of the transform. For Handler transforms, it is always handler.

url
string
required

Endpoint to send the data for transformation.

from
string
required

Data source for the transform. Reference a source/transform defined in this pipeline.

Data sent to your handler will have the same schema as this source/transform.

primary_key
string
required

The primary key for the transformation. If two rows have the same primary_key, the pipeline will overwrite the earlier row with the latest value.

payload_columns
string[]
required

The primary key for the transformation. If there are any two rows with the same primary_key, the pipeline will override it with the latest value.

schema_override
object

Allows overriding the schema of the response data returned by the handler. By default, the response is expected to have the same schema as the source/transform referenced in the from attribute.

A map of column names to Flink SQL datatypes. If the handler response schema changes, the pipeline needs to be re-deployed with this attribute updated.

  • To add a new attribute: new_attribute_name: datatype
  • To remove an existing attribute: existing_attribute_name: null
  • To change an existing attribute’s datatype: existing_attribute_name: datatype

headers
object

Headers to be sent in the request from the pipeline to the handler endpoint.

A common use case is to pass any tokens your server requires for authentication or any metadata.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the handler. For handler transform, use the httpauth secret type.
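
The optional handler attributes can be combined as in the sketch below, which extends the example above. The secret name, the header name and value, and the three overridden columns are hypothetical; STRING and BIGINT are standard Flink SQL datatypes.

```yaml
transforms:
  my_external_handler_transform:
    type: handler
    primary_key: id
    url: http://example-url/example-transform-route
    from: ethereum.raw_blocks
    # Goldsky secret of type httpauth (hypothetical name).
    secret_name: MY_HANDLER_SECRET
    # Extra headers sent with each request (placeholder name and value).
    headers:
      X-Request-Source: goldsky-pipeline
    # Describes how the handler response differs from the input schema.
    schema_override:
      enriched_label: STRING   # add a new attribute
      gas_used: BIGINT         # change an existing attribute's datatype
      extra_data: null         # remove an existing attribute
```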

Sinks

Represents the destination for source and/or transform data leaving the pipeline. Since sinks represent the end of the dataflow in the pipeline, unlike sources and transforms, they do not need to be referenced elsewhere in the configuration.

Most sinks are either databases, such as postgresql and dynamodb, or channels, such as kafka and sqs.

Also, most sinks are provided by the user, so the pipeline needs credentials to be able to write data to them. Users therefore need to create a Goldsky Secret and reference it in the sink.

PostgreSQL

Lets you sink data to a PostgreSQL table.

Example

sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For PostgreSQL, it is always postgres.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

table
string
required

The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For postgres sink, use the jdbc secret type.

batch_size
integer

The maximum number of records the pipeline will batch before flushing to the sink. Default: 100

batch_flush_interval
string

The maximum time the pipeline will batch records before flushing to sink. Default: ‘1s’

scan_autocommit
boolean

Enables auto commit. Default: true

rewrite_batched_inserts
boolean

Rewrite individual insert statements into multi-value insert statements. Default true

conditional_upsert_column
string

Optional column that will be used to select the ‘correct’ row in case of conflict, using a ‘greater wins’ strategy (i.e. later date, higher number). The column must be numeric.
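
As an illustration of the optional attributes, the sink from the example above could be tuned as sketched here. The batching values are arbitrary, the '5s' interval mirrors the format of the documented default, and block_number is a hypothetical numeric column assumed to be present in the data being written.

```yaml
sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
    # Flush after 500 buffered records or 5 seconds (illustrative values).
    batch_size: 500
    batch_flush_interval: '5s'
    rewrite_batched_inserts: true
    # On conflict, the row with the greater value wins (hypothetical column).
    conditional_upsert_column: block_number
```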

Clickhouse

Lets you sink data to a Clickhouse table.

Example

v3 example

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For Clickhouse, it is always clickhouse.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For postgres sink, use the jdbc secret type.

table
string
required

The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.

batch_size
integer

The maximum number of records the pipeline will batch before flushing to the sink. Default: 1000

batch_flush_interval
string

The maximum time the pipeline will batch records before flushing to sink. Default: ‘1s’

append_only_mode
boolean

Only do inserts on the table and not update or delete. Increases insert speed and reduces Flush exceptions (which happen when too many mutations are queued up). More details in the Clickhouse guide. Default true.

version_column_name
string

Column name to be used as a version number. Only used when append_only_mode is true.

primary_key_override
string

Use a different primary key than the one automatically inferred from the source and/or transform.

schema_override
object

Overrides the automatic schema propagation from the pipeline to Clickhouse. A map of column_name -> clickhouse_datatype.

Useful when a data type is incompatible between the pipeline and Clickhouse, or when you want to use a specific type for a column.
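
A Clickhouse sink using the attributes above might look like the following sketch. The sink key, source name, table, secret name and column names are all hypothetical. UInt64 and String are standard Clickhouse datatypes.

```yaml
sinks:
  clickhouse_logs:
    type: clickhouse
    from: base_logs                      # hypothetical source key
    table: base_logs
    secret_name: CLICKHOUSE_SINK_SECRET  # hypothetical secret name
    batch_size: 1000
    batch_flush_interval: '1s'
    # Insert-only writes; rows are versioned by the column below.
    append_only_mode: true
    version_column_name: block_number
    primary_key_override: id
    # Force specific Clickhouse types for selected columns.
    schema_override:
      block_number: UInt64
      address: String
```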

MySQL

Lets you sink data to a MySQL table.

Example

sinks:
  mysql_test_negative_fpmm_scaled_liquidity_parameter:
    type: mysql
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_MYSQL_CREDENTIALS

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For MySQL, it is always mysql.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

schema
string
required

Database name

table
string
required

The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For MySQL sink, use the jdbc secret type.

batch_size
integer

The maximum number of events the pipeline will batch before flushing to the sink. Default: 100

batch_flush_interval
string

The maximum time the pipeline will batch events before flushing to sink. Default: ‘1s’

scan_autocommit
boolean

Enables auto commit. Default: true

rewrite_batched_inserts
boolean

Rewrite individual insert statements into multi-value insert statements. Default true

conditional_upsert_column
string

Optional column that will be used to select the ‘correct’ row in case of conflict, using a ‘greater wins’ strategy (i.e. later date, higher number). The column must be numeric.

Elastic Search

Lets you sink data to an Elastic Search index.

Example

sinks:
  my_elasticsearch_sink:
    description: <optional_description>
    type: elasticsearch
    from: <source_or_transform_name>
    index: <index_name>
    secret_name: <secret_name>

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For Elastic Search, it is always elasticsearch.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

index
string
required

Elastic Search index to write to.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For Elastic Search sink, use the elasticSearch secret type.

Kafka

Lets you sink data to a Kafka topic.

Example

sinks:
  kafka_topic_sink:
    type: kafka
    from: my_source
    topic: accounts
    secret_name: KAFKA_SINK_SECRET_CR343D
    topic_partitions: 2

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For Kafka sinks, it is always kafka.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

topic
string
required

Kafka topic name to write to. Will be created if it does not exist.

topic_partitions
integer

Number of partitions to be set in the topic. Only applicable if the topic does not exist.

upsert_mode
boolean

When set to true, the sink will emit tombstone messages (null values) for DELETE operations instead of the actual payload. This is useful for maintaining the state in Kafka topics where the latest state of a key is required, and older states should be logically deleted. Default false

format
string

Format of the record in the topic. Supported types: json, avro. Requires Schema Registry credentials in the secret for avro type.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For Kafka sink, use the kafka secret type.
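
To keep only the latest state per key in the topic, the example above could be extended with upsert_mode and an explicit format, as sketched below. It reuses the names from the example. Choosing avro assumes the referenced secret also carries Schema Registry credentials.

```yaml
sinks:
  kafka_topic_sink:
    type: kafka
    from: my_source
    topic: accounts
    # Only applied if the topic does not exist yet.
    topic_partitions: 2
    # Emit tombstone (null value) messages for DELETE operations.
    upsert_mode: true
    # avro requires Schema Registry credentials in the secret.
    format: avro
    secret_name: KAFKA_SINK_SECRET_CR343D
```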

File

Example

sinks:
  s3_write:
    type: file
    path: s3://goldsky/linea/traces/
    format: parquet
    from: linea.traces
    secret_name: GOLDSKY_S3_CREDS

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For File sinks, it is always file.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

path
string
required

Path to write to. Use prefix s3://. Currently, only S3 is supported.

format
string
required

Format of the output file. Supported types: parquet, csv.

auto_compaction
boolean

Enables auto-compaction which helps optimize the output file size. Default false

partition_columns
string

Columns to be used for partitioning. Multiple columns are comma separated. For example: "col1,col2"

batch_size
string

The maximum sink file size before creating a new one. Default: 128MB

batch_flush_interval
string

The maximum time the pipeline will batch records before flushing to sink. Default: 30min
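
The optional File sink attributes could be combined as in this sketch, which extends the example above. The partition column names are hypothetical, and the 256MB and 10min values simply mirror the format of the documented defaults (128MB and 30min), so treat the exact value syntax as an assumption.

```yaml
sinks:
  s3_write:
    type: file
    path: s3://goldsky/linea/traces/
    format: parquet
    from: linea.traces
    secret_name: GOLDSKY_S3_CREDS
    # Optimize output file sizes.
    auto_compaction: true
    # Comma-separated list of partition columns (hypothetical names).
    partition_columns: "block_date,block_number"
    # Start a new file at 256MB, and flush at least every 10 minutes.
    batch_size: 256MB
    batch_flush_interval: 10min
```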

DynamoDB

Example

sinks:
  dynamodb_test_negative_fpmm_scaled_liquidity_parameter:
    type: dynamodb
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    secret_name: API_DYNAMODB_CREDENTIALS

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For DynamoDB, it is always dynamodb.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

secret_name
string
required

Goldsky secret name that contains credentials for calls between the pipeline and the sink. For DynamoDB sink, use the dynamodb secret type.

table
string
required

The destination table. It will be created if it doesn’t exist.

endpoint
string

Endpoint override, useful when writing to a DynamoDB VPC.

request_max_in_flight
integer

Maximum number of requests in flight. Default 50

batch_max_size
integer

Batch max size. Default: 25

request_max_buffered
integer

Maximum number of records to buffer. Default: 10000

fail_on_error
boolean

Fail the sink on write error. Default false

Webhook

Example

sinks:
  webhook_publish:
    type: webhook
    from: base.logs
    url: https://webhook.site/d06324e8-d273-45b4-a18b-c4ad69c6e7e6
    secret_name: WEBHOOK_SECRET_CM3UPDBJC0

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For Webhook sinks, it is always webhook.

url
string
required

Defines the URL to send the record(s) to.

one_row_per_request
boolean

Send only one record per call to the provided url

secret_name
string

Goldsky secret name that contains credentials for calls between the pipeline and the sink. Use this if you do not want to expose authentication details in plain text in the headers attribute.

For webhook sink, use the httpauth secret type.

headers
object

Headers to be sent in the request from the pipeline to the url

description
string

User provided description.
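
A webhook sink that uses the optional attributes might be configured as sketched below. It reuses the example above. The header name and value and the description are placeholders, and credentials stay in the httpauth secret rather than in headers.

```yaml
sinks:
  webhook_publish:
    type: webhook
    from: base.logs
    url: https://webhook.site/d06324e8-d273-45b4-a18b-c4ad69c6e7e6
    description: Publishes Base logs to a webhook
    # Send each record in its own request.
    one_row_per_request: true
    # Non-sensitive metadata only (placeholder header).
    headers:
      X-Pipeline-Name: base-logs-webhook
    # Authentication details live in the secret, not in headers.
    secret_name: WEBHOOK_SECRET_CM3UPDBJC0
```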

SQS

Lets you sink data to an AWS SQS queue.

Example

sinks:
  my_sqs_sink:
    type: sqs
    url: https://sqs.us-east-1.amazonaws.com/335342423/dev-logs
    secret_name: SQS_SECRET_IAM
    from: my_transform

Schema

sinks.<key_name>
string
required

Unique name of the sink. This is a user provided value.

type
string
required

Defines the type of the sink. For SQS sinks, it is always sqs.

description
string

User provided description.

from
string
required

Data source for the sink. Reference to either a source or a transform defined in this pipeline.

secret_name
string

Goldsky secret name that contains credentials for calls between the pipeline and the sink.

For sqs sink, use the sqs secret type.

url
string

SQS queue URL

fail_on_error
boolean

Fail the sink on write error. Default false

Pipeline runtime attributes

While sources, transforms and sinks define the business logic of your pipeline, there are also attributes that change the pipeline execution/runtime.

If you need a refresher on pipelines, make sure to check out About Pipeline; here we’ll just focus on specific attributes.

The following are request-level attributes that only control the behavior of a particular request on the pipeline. These attributes should be passed as arguments to the goldsky pipeline apply <config_file> <arguments/flags> command.

status
string

Defines the desired status for the pipeline which can be one of the three: “ACTIVE”, “INACTIVE”, “PAUSED”. If not provided it will default to the current status of the pipeline.

save_progress
boolean

Defines whether the pipeline should attempt to create a fresh snapshot before this configuration is applied. The pipeline needs to be in a healthy state for snapshot to be created successfully. It defaults to true.

use_latest_snapshot
boolean

Defines whether the pipeline should be started from the latest available snapshot. This attribute is useful in restarting scenarios. To restart a pipeline from scratch, use --use-latest-snapshot false. It defaults to true.

restart
boolean

Instructs the pipeline to restart. Useful in scenarios where the pipeline needs to be restarted but no configuration change is needed. It defaults to undefined.

Pipeline Runtime Commands

Commands that change the pipeline runtime.

Start

There are multiple ways to do this:

  • goldsky pipeline start <name_or_path_to_config_file>
  • goldsky pipeline apply <name_or_path_to_config_file> --status ACTIVE

This command will have no effect on a pipeline that already has a desired status of ACTIVE.

Pause

Pause will attempt to take a snapshot and stop the pipeline so that it can be resumed later.

There are multiple ways to do this:

  • goldsky pipeline pause <name_or_path_to_config_file>
  • goldsky pipeline apply <name_or_path_to_config_file> --status PAUSED

Stop

Stopping a pipeline does not attempt to take a snapshot.

There are multiple ways to do this:

  • goldsky pipeline stop <pipeline_name(if exists) or path_to_config>
  • goldsky pipeline apply <path_to_config> --status INACTIVE --from-snapshot none
  • goldsky pipeline apply <path_to_config> --status INACTIVE --save-progress false (prior to CLI version 11.0.0)

Update

Make any needed changes to the pipeline configuration file and run goldsky pipeline apply <name_or_path_to_config_file>.

By default any update on a RUNNING pipeline will attempt to take a snapshot before applying the update.

If you’d like to avoid taking a snapshot as part of the update, run:

  • goldsky pipeline apply <name_or_path_to_config_file> --from-snapshot last
  • goldsky pipeline apply <name_or_path_to_config_file> --save-progress false (prior to CLI version 11.0.0)

This is useful in situations where the pipeline is running into issues and the snapshot would therefore not succeed, blocking the update that is meant to fix the issue.

Resize

Useful in scenarios where the pipeline is running into resource constraints.

There are multiple ways to do this:

  • goldsky pipeline resize <resource_size>
  • goldsky pipeline apply <name_or_path_to_config_file> with the config file having the attribute:
resource_size: xl

Restart

goldsky pipeline apply <path_to_configuration> --restart

Useful in scenarios where a restart is needed but there are no changes in the configuration. For example, the pipeline sink’s database connection got stuck because the database was restarted.

By default, restart will attempt a new snapshot and start the pipeline from that particular snapshot.

To avoid using any existing snapshot or triggering a new one (that is, to start from scratch), add --from-snapshot none, or --save-progress false --use-latest-snapshot false if you are using a CLI version older than 11.0.0.

Monitor

Provides pipeline runtime information that is helpful for monitoring and developing a pipeline. Although this command does not change the runtime, it provides information such as status, metrics and logs that helps with developing a pipeline.

goldsky pipeline monitor <name_or_path_to_config_file>