Mirror Pipeline Configuration Schema
Schema details for pipeline configurations
We recently released v3 of pipeline configurations which uses a more intuitive and user friendly format to define and configure pipelines using a yaml file. For backward compatibility purposes, we will still support the previous v2 format. This is why you will find references to each format in each yaml file presented across the documentation. Feel free to use whichever is more comfortable for you but we encourage you to start migrating to v3 format.
This page includes info on the full Pipeline configuration schema. For conceuptal learning about Pipelines, please refer to the about Pipeline page.
Name of the pipeline. Must only contain lowercase letters, numbers, hyphens and should be less than 50 characters.
Sources represent origin of the data into the pipeline.
Supported source types:
Transforms represent data transformation logic to be applied to either a source and/or transform in the pipeline. If your pipeline does not need to transform data, this attribute can be an empty object.
Supported transform types:
Sinks represent destination for source and/or transform data out of the pipeline.
Supported sink types:
It defines the amount of compute power to add to the pipeline. It can take one of the following values: “s”, “m”, “l”, “xl”, “xxl”. For new pipeline creation, it defaults to “s”. For updates, it defaults to the current resource_size of the pipeline.
Description of the pipeline.
Sources
Represents the origin of the data into the pipeline. Each source has a unique name to be used as a reference in transforms/sinks.
sources.<key_name>
is used as the referenceable name in other transforms and sinks.
Subgraph Entity
Use your subgraph as a source for your pipeline.
Example
In the sources section of your pipeline configuration, you can add a subgraph_entity
per subgraph entity that you want to use.
Schema
Unique name of the source. This is a user provided value.
Defines the type of the source, for Subgraph Entity sources, it is always subgraph_entity
.
Description of the source
Entity name
in your subgraph.
earliest
processes data from the first block.
latest
processes data from the latest block at pipeline start time.
Defaults to latest
Filter expression that does a fast scan on the dataset. Only useful when start_at
is set to earliest
.
Expression follows the SQL standard for what comes after the WHERE clause. Few examples:
References deployed subgraphs(s) that have the entity mentioned in the name
attribute.
Supports subgraphs deployed across multiple chains aka cross-chain usecase.
Dataset
Dataset lets you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.
Example
Schema
Unique name of the source. This is a user provided value.
Defines the type of the source, for Dataset sources, it is always dataset
Description of the source
Name of a goldsky dataset. Please use goldsky dataset list
and select your chain of choice.
Please refer to supported chains for an overview of what data is available for individual chains.
Version of the goldsky dataset in dataset_name
.
earliest
processes data from the first block.
latest
processes data from the latest block at pipeline start time.
Defaults to latest
Fast Scan
Processing full datasets (starting from earliest
) (aka doing a Backfill) requires the pipeline to process significant amount of data which affects how quickly it reaches at edge (latest record in the dataset). This is especially true for datasets for larger chains.
However, in many use-cases, pipeline may only be interested in a small-subset of the historical data. In such cases, you can enable Fast Scan on your pipeline by defining the filter
attribute in the dataset
source.
The filter is pre-applied at the source level; making the initial ingestion of historical data much faster. When defining a filter
please be sure to use attributes that exist in the dataset. You can get the schema of the dataset by running goldsky dataset get <dataset_name>
.
See example below where we pre-apply a filter based on contract address:
Transforms
Represents data transformation logic to be applied to either a source and/or transform in the pipeline. Each transform has a unique name to be used as a reference in transforms/sinks.
transforms.<key_name>
is used as the referenceable name in other transforms and sinks.
SQL
SQL query that transforms or filters the data from a source
or another transform
.
Example
Schema
Unique name of the transform. This is a user provided value.
Defines the type of the transform, for SQL transforms it is always sql
The SQL query to be executed on either source or transform in the pipeline.
The source data for sql transform is determined by the FROM <table_name>
part of the query. Any source or transform can be referenced as SQL table.
The primary key for the transformation. If there are any two rows with the same primary_key, the pipeline will override it with the latest value.
Handler
Lets you transform data by sending data to a handler endpoint.
Example
Schema
Unique name of the transform. This is a user provided value.
Defines the type of the transform, for Handler transforms it is always handler
Endpoint to send the data for transformation.
Data source for the transform. Reference a source/transform defined in this pipeline.
Data sent to your handler will have the same schema as this source/transform.
The primary key for the transformation. If there are any two rows with the same primary_key, the pipeline will override it with the latest value.
The primary key for the transformation. If there are any two rows with the same primary_key, the pipeline will override it with the latest value.
Allows overriding the schema of the response data returned by the handler. Default is to expect the same schema as source|transform
referenced in the from
attribute.
A map of column names to Flink SQL datatypes. If the handler response schema changes the pipeline needs to be re-deployed with this attribute updated.
To add a new attribute: new_attribute_name: datatype
To remove an existing attribute: existing_attribute_name: null
To change an existing attribute’s datatype: existing_attribute_name: datatype
Headers to be sent in the request from the pipeline to the handler endpoint.
A common use case is to pass any tokens your server requires for authentication or any metadata.
Goldksy secret name that contains credentials for calls between the pipeline and the handler.
For handler transform, use the httpauth
secret type.
Sinks
Represents destination for source and/or transform data out of the pipeline. Since sinks represent the end of the dataflow in the pipeline, unlike source and transform, it does not need to be referenced elsewhere in the configuration.
Most sinks are either databases such as postgresql
, dynamodb
etc. Or channels such as kafka
, sqs
etc.
Also, most sinks are provided by the user, hence the pipeline needs credentials to be able to write data to a sink. Thus, users need to create a Goldsky Secret and reference it in the sink.
PostgreSQL
Lets you sink data to a PostgreSQL table.
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for postgresql it is always postgressql
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For postgres sink, use the jdbc
secret type.
The maximum time (in milliseconds) the pipeline will batch records. Default 100
The maximum time the pipeline will batch records before flushing to sink. Default: ‘1s’
Enables auto commit. Default: true
Rewrite individual insert statements into multi-value insert statements. Default true
Optional column that will be used to select the ‘correct’ row in case of conflict using the ‘greater’ wins strategy: - ie later date, higher number. The column must be numeric.
Clickhouse
Lets you sink data to a Clickhouse table.
Example
v3 example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Clickhouse it is always clickhouse
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For postgres sink, use the jdbc
secret type.
The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.
The maximum time (in milliseconds) the pipeline will batch records. Default 1000
The maximum time the pipeline will batch records before flushing to sink. Default: ‘1s’
Only do inserts on the table and not update or delete.
Increases insert speed and reduces Flush exceptions (which happen when too many mutations are queued up).
More details in the Clickhouse guide. Default true
.
Column name to be used as a version number. Only used in append_only_mode = true
.
Use a different primary key than the one that automatically inferred from the source and/or transform.
Ability to override the automatic schema propagation from the pipeline to Clickhouse. Map of column_name -> clickhouse_datatype
Useful in situations when data type is incompatible between the pipeline and Clickhouse. Or when wanting to use specific type for a column.
MySQL
Lets you sink data to a MySQL table.
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for postgresql it is always postgressql
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Database name
The destination table. It will be created if it doesn’t exist. Schema is defined in the secret credentials.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For postgres sink, use the jdbc
secret type.
The maximum time (in milliseconds) the pipeline will batch events. Default 100
The maximum time the pipeline will batch events before flushing to sink. Default: ‘1s’
Enables auto commit. Default: true
Rewrite individual insert statements into multi-value insert statements. Default true
Optional column that will be used to select the ‘correct’ row in case of conflict using the ‘greater’ wins strategy: - ie later date, higher number. The column must be numeric.
Elastic Search
Lets you sink data to a Elastic Search index.
Example
v3 example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Elastic Search it is always elasticsearch
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Elastic search index to write to.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For Elastic Search sink, use the elasticSearch
secret type.
Open Search
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Elastic Search it is always elasticsearch
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Elastic search index to write to.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For Elastic Search sink, use the elasticSearch
secret type.
Kafka
Lets you sink data to a Kafka topic.
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Kafka sink it is always kafka
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Kafka topic name to write to. Will be created if it does not exist.
Number of paritions to be set in the topic. Only applicable if topic does not exists.
When set to true
, the sink will emit tombstone messages (null values) for DELETE operations instead of the actual payload. This is useful for maintaining the state in Kafka topics where the latest state of a key is required, and older states should be logically deleted. Default false
Format of the record in the topic. Supported types: json
, avro
. Requires Schema Registry credentials in the secret for avro
type.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For Kafka sink, use the kafka
secret type.
File
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for File sink it is always file
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Path to write to. Use prefix s3://
. Currently, only S3
is supported.
Format of the output file. Supported types: parquet
, csv
.
Enables auto-compaction which helps optimize the output file size. Default false
Columns to be used for partitioning. Multiple columns are comma separated. For eg: "col1,col2"
The maximum sink file size before creating a new one. Default: 128MB
The maximum time the pipeline will batch records before flushing to sink. Default: 30min
DynamoDB
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Clickhouse it is always clickhouse
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
For DynamoDB sink, use the dynamodb
secret type.
The destination table. It will be created if it doesn’t exist.
Endpoint override, useful when writing to a DynamoDB VPC
Maximum number of requests in flight. Default 50
Batch max size. Default: 25
Maximum number of records to buffer. Default: 10000
Fail the sink on write error. Default false
Webhook
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for Webhook sinks it is always webhook
Defines the URL to send the record(s) to.
Send only one record per call to the provided url
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
Use this if you do not want to expose authenciation details in plain text in the headers
attribute.
For webhook sink, use the httpauth
secret type.
Headers to be sent in the request from the pipeline to the url
User provided description.
SQS
Lets you sink data to a AWS SQS topic.
Example
Schema
Unique name of the sink. This is a user provided value.
Defines the type of the sink, for postgresql it is always postgressql
User provided description.
Data source for the sink. Reference to either a source or a transform defined in this pipeline.
Goldksy secret name that contains credentials for calls between the pipeline and the sink.
Use this if you do not want to expose authenciation details in plain text in the headers
attribute.
For sqs sink, use the sqs
secret type.
SQS topic URL
Fail the sink on write error. Default false
Pipeline runtime attributes
While sources, transforms and sinks define the business logic of your pipeline. There are attributes that change the pipeline execution/runtime.
If you need a refresher on the of pipelines make sure to check out About Pipeline, here we’ll just focus on specific attributes.
Following are request-level attributes that only controls the behavior of a particular request on the pipeline. These attributes should be passed via arguments to the goldsky pipeline apply <config_file> <arguments/flags>
command.
Defines the desired status for the pipeline which can be one of the three: “ACTIVE”, “INACTIVE”, “PAUSED”. If not provided it will default to the current status of the pipeline.
Defines whether the pipeline should attempt to create a fresh snapshot before this configuration is applied. The pipeline needs to be in a healthy state for snapshot to be created successfully. It defaults to true
.
Defines whether the pipeline should be started from the latest available snapshot. This attribute is useful in restarting scenarios.
To restart a pipeline from scratch, use --use_latest_snapshot false
. It defaults to true
.
Instructs the pipeline to restart. Useful in scenarios where the pipeline needs to be restarted but no configuration change is needed. It defaults to undefined
.
Pipeline Runtime Commands
Commands that change the pipeline runtime.
Start
There are multiple ways to do this:
goldsky pipeline start <name_or_path_to_config_file>
goldsky pipeline apply <name_or_path_to_config_file> --status ACTIVE
This command will have no effect on pipeline that already has a desired status of ACTIVE
.
Pause
Pause will attempt to take a snapshot and stop the pipeline so that it can be resumed later.
There are multiple ways to do this:
goldsky pipeline pause <name_or_path_to_config_file>
goldsky pipeline apply <name_or_path_to_config_file> --status PAUSED
Stop
Stopping a pipeline does not attempt to take a snapshot.
There are multiple ways to do this:
goldsky pipeline stop <pipeline_name(if exists) or path_to_config>
goldsky pipeline apply <path_to_config> --status INACTIVE --from-snapshot none
goldsky pipeline apply <path_to_config> --status INACTIVE --save-progress false
(prior to CLI version11.0.0
)
Update
Make any needed changes to the pipeline configuration file and run goldsky pipeline apply <name_or_path_to_config_file>
.
By default any update on a RUNNING
pipeline will attempt to take a snapshot before applying the update.
If you’d like to avoid taking snapshot as part of the update, run:
goldsky pipeline apply <name_or_path_to_config_file> --from-snapshot last
goldsky pipeline apply <name_or_path_to_config_file> --save-progress false
(prior to CLI version11.0.0
)
This is useful in a situations where the pipeline is running into issues, hence the snapshot will not succeed, blocking the update that is to fix the issue.
Resize
Useful in scenarios where the pipeline is running into resource constraints.
There are multiple ways to do this:
goldsky pipeline resize <resource_size>
goldsky pipeline apply <name_or_path_to_config_file>
with the config file having the attribute:
Restart
goldsky pipeline apply <path_to_configuration> --restart
Useful in the scenarios where a restart is needed but there are no changes in the configuration. For example, pipeline sink’s database connection got stuck because the database has restarted.
By default, restart
will attempt a new snapshot and start the pipeline from that particular snapshot.
To avoid using any existing snapshot or triggering a new one (aka starting from scratch) add the --from-snapshot none
or --save-progress false --use-latest-snapshot false
if you are using CLI version older than 11.0.0
.
Monitor
Provides pipeline runtime information that is helpful for monitoring/developing a pipeline. Although this command does not change the runtime, it provides info like status, metrics, logs etc. that helps with devleloping a pipeline.
goldsky pipeline monitor <name_or_path_to_config_file>
Was this page helpful?