Mirror pipeline definitions
Creating pipeline definitions for use with Goldsky Mirror
We recently released v3 of pipeline definitions, which uses a more intuitive and user-friendly format to define and configure pipelines with a YAML file. For backward compatibility, we still support the previous v2 format, which is why you will find references to both formats in the YAML files presented across the documentation. Feel free to use whichever is more comfortable for you, but we encourage you to start migrating to the v3 format.
Mirror pipelines are defined using a YAML file. This YAML file has a twofold purpose:
- It defines the sources, transformations, and sinks that comprise your pipeline.
- It contains configuration attributes that alter certain properties of your pipeline, such as its status or resource_size.
These two purposes can be combined in a single definition file.
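For instance, the following minimal sketch in the v3 format holds both the definition (a Base logs dataset source, one SQL transform, and a PostgreSQL sink) and the configuration attributes status and resource_size. The version value and the SQL column names are assumptions, so adjust them to your own data:

```yaml
name: base-logs-pipeline
status: ACTIVE
resource_size: s
sources:
  base.logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0            # assumed: use the version reported by goldsky dataset list
transforms:
  logs_cleaned:
    # column names are illustrative; adjust them to the dataset's actual schema
    sql: SELECT id, address, data, block_number FROM base.logs
    primary_key: id
sinks:
  logs_to_postgres:
    type: postgres
    from: logs_cleaned
    table: base_logs
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
```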
Keys in the v3 format for sources, transforms, and sinks are user-provided values. In this example, the source reference name base.logs matches the actual dataset_name. This is the default format that you'll typically see across examples since it's convenient. However, you can use a custom reference name as the key instead; just make sure to use that name when referring to the source in your transforms and sinks.
Alternatively, the two purposes can be split into separate files:
- Definition files
- Configuration files
In the following sections we'll look at the attributes that correspond to the pipeline definition and the pipeline configuration, respectively.
Defining a pipeline
In order to create a pipeline, you need to define its sources, transformations, and sinks.
If you already have a pipeline, you can get its configuration file with the following command: goldsky pipeline get <pipeline-name>. If you are planning on creating a new pipeline, you can use goldsky pipeline apply <path-to-pipeline-config-file>.
The pipeline config YAML schema consists of three primary sections:
- Sources: Denotes the origin of the data.
- Transforms: Lists the transformations to be applied to the data from the source.
- Sinks: Specifies where the transformed data should be sent.
Each source and transform has a key that can be used by other transforms or sinks. You can compose multiple transforms together as you need and pipe the results to a sink.
Below, we’ll explain each section of the YAML structure, and provide an example for each subset of the schema.
Sources
The sources object contains one or more source configurations. There are currently two supported source types:
- Subgraph Entities
- Datasets
Subgraph Entities
This lets you define your own subgraphs as a pipeline source.
Example
Here you'll use the name of your deployed subgraph as the name.
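A minimal sketch of a subgraph_entity source, assuming a deployed subgraph named polymarket at version 1.0.0; the entity field and its value are assumptions, so point it at an entity that actually exists in your subgraph:

```yaml
sources:
  polymarket.market:
    type: subgraph_entity
    name: polymarket     # name of your deployed subgraph
    version: 1.0.0       # version of your deployed subgraph
    entity: market       # assumed field: the subgraph entity to stream
```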
Datasets
Datasets let you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.
Example
To obtain the dataset_name property, please use goldsky dataset list and select your chain of choice. Please refer to supported chains for an overview of what data is available for individual chains.
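A minimal sketch of a dataset source for Base logs; the version value is an assumption and should match what goldsky dataset list reports for your chain:

```yaml
sources:
  base.logs:
    type: dataset
    dataset_name: base.logs   # obtained via goldsky dataset list
    version: 1.0.0            # assumed: use the version reported by the CLI
```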
Fast Scan
Consuming full datasets can require a significant amount of storage on your sink, especially for chains with short block times, since those datasets tend to grow quickly. This process of consuming historical chain data is called Backfill, and it is the default mode when you create a pipeline.
In many cases, consuming the entire history of a chain can be time-consuming and cost-inefficient. There are two ways you can address this situation with Mirror:
- Enable Fast Scan on your pipeline by defining filters on the source in the pipeline definition. These filters are pre-applied at the source level, making the initial ingestion of historical data much faster. You can apply these filters to any dataset; just make sure to use attributes that exist in that dataset. See the first sketch after this list, where we pre-apply a filter based on contract address.
- Consume only edge data by setting startAt: latest in your source pipeline definition. This informs Mirror to only consume data at the edge from the time the pipeline is created. See the second sketch after this list.
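Minimal sketches of both approaches on a Base logs dataset. The filter attribute name and the contract address are assumptions used for illustration; startAt: latest is the attribute described above:

```yaml
# Fast Scan: pre-apply a filter at the source level
sources:
  base.logs:
    type: dataset
    dataset_name: base.logs
    # assumed attribute name and example contract address
    filter: address = '0x4200000000000000000000000000000000000006'
```

```yaml
# Edge-only consumption: skip the historical backfill entirely
sources:
  base.logs:
    type: dataset
    dataset_name: base.logs
    startAt: latest
```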
Remember that these two methods are incompatible: if you use the startAt attribute in your definition, it will render Fast Scan useless, so be mindful of which method you would like to use.
Transforms
The transforms object contains one or many transform configurations, each with the following properties:
- Its property name in the transforms object: The name of the transformation. This can be used by sinks as a from property, or in any other transform's SQL as a table.
- sql: The SQL query to be performed. This can refer to any source or other transform in the pipeline as a SQL table.
- primary_key: The primary key for the transformation. If any two rows share the same primary_key, the pipeline will override the earlier one with the latest value.
Transform Example
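A minimal sketch of a transform over the base.logs source sketched earlier, keyed by the log id; the column names and the topics filter are assumptions about the dataset's schema:

```yaml
transforms:
  erc20_transfer_logs:
    # Any source (or prior transform) can be referenced as a table in the SQL
    sql: >
      SELECT id, address, data, block_number
      FROM base.logs
      WHERE topics LIKE '0xddf252ad%'
    primary_key: id
```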
Sinks
The sinks object contains one or many sink configurations, each with the following properties:
- Its property name in the sinks object: A name of your choice to uniquely identify this sink within a pipeline.
- type: The sink type. This could be postgres or elasticsearch.
- from: The source or transform to use for the sink.
- table: The table name to load into. This is required for sinks of type postgres.
- schema: The schema for the sink. This is required for sinks of type postgres.
- secret_name: The name of the secret for the sink. This could be API_POSTGRES_CREDENTIALS or REDPANDA_CREDENTIALS.
- topic: The topic to produce to. This is required for sinks of type kafka.
Sink Example
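A minimal sketch of a PostgreSQL sink fed by the transform sketched above; it assumes a secret named API_POSTGRES_CREDENTIALS already exists in your project:

```yaml
sinks:
  logs_to_postgres:
    type: postgres
    from: erc20_transfer_logs       # source or transform reference
    table: erc20_transfer_logs      # destination table
    schema: goldsky                 # destination schema
    secret_name: API_POSTGRES_CREDENTIALS
```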
Pipeline definition examples
You can run the following examples by copying each one into a local YAML file and then running goldsky pipeline apply <path-to-file>.
Syncing a subgraph into postgres
This pipeline pulls data from a single subgraph_entity source, processes the data with a single SQL transformation, and stores the result in a PostgreSQL sink. As a prerequisite, you will need an existing subgraph with the name/version combination polymarket/1.0.0.
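A minimal sketch of what this pipeline could look like; the entity, the SQL columns, and the sink table name are assumptions, so adapt them to your polymarket subgraph's actual entities:

```yaml
name: polymarket-pipeline
sources:
  polymarket.market:
    type: subgraph_entity
    name: polymarket     # name of the deployed subgraph
    version: 1.0.0       # version of the deployed subgraph
    entity: market       # assumed entity name
transforms:
  markets_cleaned:
    sql: SELECT id, question, outcome FROM polymarket.market   # illustrative columns
    primary_key: id
sinks:
  markets_to_postgres:
    type: postgres
    from: markets_cleaned
    table: polymarket_markets
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
```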
Merging subgraphs cross-chain
This pipeline is named poap-extended-1. It pulls data from two subgraph_entity sources, does not perform any transformations, and stores the results in two separate PostgreSQL sinks.
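A minimal sketch of such a pipeline; the subgraph names, versions, entity, and table names are assumptions chosen for illustration:

```yaml
name: poap-extended-1
sources:
  poap_eth.token:
    type: subgraph_entity
    name: poap-ethereum     # assumed deployed subgraph name
    version: 1.0.0
    entity: token           # assumed entity name
  poap_gnosis.token:
    type: subgraph_entity
    name: poap-gnosis       # assumed deployed subgraph name
    version: 1.0.0
    entity: token
transforms: {}
sinks:
  eth_tokens_to_postgres:
    type: postgres
    from: poap_eth.token
    table: poap_tokens_eth
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
  gnosis_tokens_to_postgres:
    type: postgres
    from: poap_gnosis.token
    table: poap_tokens_gnosis
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
```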
Syncing a dataset into a postgres database
This pipeline is named decoded-logs-pipeline. It pulls data from a curated Goldsky dataset, without performing any transformations, and stores the result in a PostgreSQL sink, in a table called eth_logs in the goldsky schema.
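A minimal sketch of this pipeline; the exact dataset_name for decoded logs and the version are assumptions, so confirm them with goldsky dataset list:

```yaml
name: decoded-logs-pipeline
sources:
  ethereum.decoded_logs:
    type: dataset
    dataset_name: ethereum.decoded_logs   # assumed; confirm with goldsky dataset list
    version: 1.0.0                        # assumed dataset version
transforms: {}
sinks:
  logs_to_postgres:
    type: postgres
    from: ethereum.decoded_logs
    table: eth_logs
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
```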
Configuring a pipeline
Once a pipeline has been defined and deployed, there is a set of attributes we can use to configure its lifecycle and behaviour.
This is done using the goldsky pipeline apply <config_file> command.
If you need a refresher on the lifecycle of pipelines, make sure to check out Run a Pipeline; here we'll focus on the configuration attributes that can be used:
The pipeline name is required so that Mirror knows which pipeline to apply the configuration to. Remember that, if a definition is not provided, the pipeline name must refer to an already deployed pipeline in your project.
As explained above, sources are a list of datasets being sent to a transform or a sink. If a pipeline has already been deployed, there is no need to include sources unless you want to update any of their properties, but there is no downside to leaving them in.
Transforms can be an empty object, or one or more objects that include SQL transforms run against either a source, or another transform within the same pipeline.
Sinks are a list of sinks that are configured with secrets in your account. They reference either a source, or a transform and point to a configured sink, which is usually a table for database sinks.
It defines the desired status for the pipeline, which can be one of: "ACTIVE", "INACTIVE", or "PAUSED". If not provided, it defaults to the current status of the pipeline.
It defines the amount of compute power to allocate to the pipeline. It can take one of the following values: "s", "m", "l", "xl", "xxl". If not provided, it defaults to the current resource_size of the pipeline.
It defines whether the pipeline should create a snapshot when this configuration is applied. It defaults to false.
It defines whether the pipeline should restart from the latest available snapshot. Note that this is useful in restart scenarios. It defaults to true.
It instructs the pipeline to restart when this configuration is applied. It defaults to false.
Pipeline configuration examples
Take a snapshot and restart the pipeline with more resources
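A sketch of a configuration-only file for this scenario. The name and resource_size attributes are documented above; the snapshot and restart attribute names below are assumptions that mirror the behaviour described above, so check the CLI reference for the exact keys:

```yaml
name: decoded-logs-pipeline
resource_size: xl
save_progress: true       # assumed attribute name: take a snapshot when this configuration is applied
restart: true             # assumed attribute name: restart the pipeline after applying
```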
Restarting a pipeline from the latest safe point
Consider a scenario where your pipeline got into a corrupted state and you would like to restart it from a safe point in the past, without taking a snapshot of its current state. You could achieve that with a configuration like the following:
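The sketch below uses the same assumed attribute names as the previous example (only name is taken directly from this page); check the CLI reference for the exact keys:

```yaml
name: decoded-logs-pipeline
save_progress: false          # assumed attribute name: do not snapshot the current (corrupted) state
use_latest_snapshot: true     # assumed attribute name: restore from the latest available snapshot
restart: true                 # assumed attribute name: restart when this configuration is applied
```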
If you run into any issues at all with setting pipelines up, feel free to ask us for help!