Mirror pipeline definitions
Creating pipeline definitions for use with Goldsky Mirror
We recently released v3 of pipeline definitions, which uses a more intuitive and user-friendly format to define and configure pipelines using a YAML file. For backward compatibility, we still support the previous v2 format, which is why you will find references to both formats in the YAML examples across the documentation. Feel free to use whichever is more comfortable for you, but we encourage you to start migrating to the v3 format.
Mirror pipelines are defined using a yaml file. This yaml file has a twofold purpose:
- It defines the sources, transformations, and sinks that comprise your pipeline.
- It contains configuration attributes that alter certain properties of your pipeline, such as its status or resource_size.
Both purposes can be served by a single definition file, as in the following example:
name: base-logs-pipeline
version: 1
status: ACTIVE
resource_size: s
apiVersion: 3
sources:
  base.logs:
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
    description: Enriched logs for events emitted from contracts. Contains the
      contract address, data, topics, decoded event and metadata for blocks and
      transactions.
    display_name: Logs
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    description: "Postgres sink for: base.logs"
    from: base.logs
Keys in the v3 format for sources, transforms, and sinks are user-provided values. In this example, the source reference name base.logs matches the actual dataset_name. This is the default format you'll typically see across examples since it's convenient. However, you can use a custom reference name as the key instead; just remember to use that name when referring to the source in your transforms and sinks.
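As a sketch of a custom reference name (the key raw_logs below is arbitrary; what matters is that the sink's from matches it):

```yaml
sources:
  raw_logs: # custom reference name, chosen by you
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    from: raw_logs # must match the custom key above
```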
Alternatively, the definition and the configuration can be kept in separate files:
- Definition files
name: base-logs-pipeline
description: An example description of a pipeline
version: 1
status: ACTIVE
resource_size: s
apiVersion: 3
sources:
  base.logs:
    dataset_name: base.logs
    version: 1.0.0
    type: dataset
    description: Enriched logs for events emitted from contracts. Contains the
      contract address, data, topics, decoded event and metadata for blocks and
      transactions.
    display_name: Logs
transforms: {}
sinks:
  postgres_base_logs:
    type: postgres
    table: base_logs
    schema: public
    secret_name: GOLDSKY_SECRET
    description: "Postgres sink for: base.logs"
    from: base.logs
- Configuration files
name: base-logs-pipeline
description: An example description of a pipeline
resource_size: s
In the following sections we'll look at the attributes that correspond to each section of the pipeline definition and the pipeline configuration.
Defining a pipeline
In order to create a pipeline, you need to define its sources, transformations, and sinks.
If you already have a pipeline, you can get its configuration file with the following command: goldsky pipeline get <pipeline-name>
If you are planning on creating a new pipeline, you can use goldsky pipeline apply <path-to-pipeline-config-file>.
The pipeline config YAML schema consists of three primary sections:
- Sources: Denotes the origin of the data.
- Transforms: Lists the transformations to be applied to the data from the source.
- Sinks: Specifies where the transformed data should be sent.
Each source and transform has a key that can be used by other transforms or sinks. You can compose multiple transforms together as you need and pipe the results to a sink.
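As a sketch of this composition (the source, transform, table, and address names below are illustrative), one transform can read from another by its key, and a sink can read from the final transform:

```yaml
transforms:
  decoded: # reads from a source keyed my_source
    sql: SELECT id, address FROM my_source
    primary_key: id
  filtered: # reads from the transform above by its key
    sql: SELECT id, address FROM decoded WHERE address = '0xabc'
    primary_key: id
sinks:
  example_sink:
    type: postgres
    from: filtered # pipes the final transform into the sink
    table: filtered_rows
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
```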
Below, we’ll explain each section of the YAML structure, and provide an example for each subset of the schema.
Sources
The sources object contains one or more source configurations. There are currently two supported source types:
- Subgraph Entities
- Datasets
Subgraph Entities
This lets you define your own subgraphs as a pipeline source.
Example
sources:
  polymarket.fixed_product_market_maker:
    type: subgraph_entity
    name: fixed_product_market_maker
    subgraphs:
      - name: polymarket
        version: 1.0.0
Here, the name property under subgraphs must match the name of your deployed subgraph.
Datasets
Datasets let you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA guaranteeing correctness.
Example
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
To obtain the dataset_name property, use goldsky dataset list and select your chain of choice. Please refer to supported chains for an overview of what data is available for individual chains.
Fast Scan
Consuming full datasets can require a significant amount of storage on your sink, especially for chains with short block times, since their datasets grow quickly. This process of consuming historical chain data is called Backfill, and it is the default pipeline creation mode when you define a pipeline.
In many cases, consuming the entire history of a chain can be time-consuming and cost-inefficient. There are two ways you can address this situation with Mirror:
- Enable Fast Scan on your pipeline by defining filters in the source pipeline definition. These filters are pre-applied at the source level, making the initial ingestion of historical data much faster. You can apply filters to any dataset; just make sure the attributes you filter on exist in that dataset. See the example below, where we pre-apply a filter based on contract address:
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'
- Consume only edge data by setting start_at: latest in your source pipeline definition. This informs Mirror to consume only data at the edge from the time the pipeline is created. See example:
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    start_at: latest
Remember that these two methods are mutually exclusive. If you set start_at: latest in your definition, the pipeline skips historical data entirely and Fast Scan has nothing to scan, so be mindful of which method you want to use.
Transforms
The transforms object contains one or many transform configurations, each with the following properties:
- Its property name in the transforms object: the name of the transformation. This can be used by sinks as a from property, or in any other transform's SQL as a table name.
- sql: the SQL query to be performed. This can refer to any source or other transform as a SQL table.
- primary_key: the primary key for the transformation. If any two rows share the same primary_key, the pipeline overwrites the earlier one with the latest value.
Transform Example
transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polymarket.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
Sinks
The sinks object contains one or many sink configurations, each with the following properties:
- Its property name in the sinks object: a name of your choice to uniquely identify this sink within a pipeline.
- type: the sink type. This could be postgres or elasticsearch.
- from: the source or transform to use for the sink.
- table: the table name to load into. This is required for sinks of type postgres.
- schema: the schema for the sink. This is required for sinks of type postgres.
- secret_name: the name of the secret for the sink. This could be API_POSTGRES_CREDENTIALS or REDPANDA_CREDENTIALS.
- topic: the topic to produce to. This is required for sinks of type kafka.
Sink Example
sinks:
  postgres_test_negative_fpmm_scaled_liquidity_parameter:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
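A kafka sink follows the same shape, using topic in place of table and schema. The sink key and topic name below are illustrative:

```yaml
sinks:
  kafka_example_sink:
    type: kafka
    from: negative_fpmm_scaled_liquidity_parameter
    topic: example-topic # illustrative topic name
    secret_name: REDPANDA_CREDENTIALS
```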
Pipeline definition examples
You can run the following examples by copying each one into a local YAML file and applying it:
# Assuming the yaml config is in pipeline.yaml
goldsky pipeline apply pipeline.yaml
Syncing a subgraph into postgres
This pipeline pulls data from a single subgraph_entity
source, processes the data with a single SQL transformation, and stores the result into a PostgreSQL sink.
You will need an existing subgraph deployed with the name/version combo polymarket/1.0.0 as a prerequisite to running this pipeline.
name: syncing-a-subgraph-into-postgres
apiVersion: 3
sources:
  polygon.fixed_product_market_maker:
    type: subgraph_entity
    name: fixed_product_market_maker
    subgraphs:
      - name: polymarket
        version: 1.0.0
transforms:
  negative_fpmm_scaled_liquidity_parameter:
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primary_key: id
sinks:
  postgres_polygon_sink:
    type: postgres
    from: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
Merging subgraphs cross-chain
This pipeline is named poap-extended-1
. It pulls data from two subgraph_entity
sources, does not perform any transformations, and stores the result into two separate PostgreSQL sinks.
name: poap-extended-1
apiVersion: 3
sources:
  hashflow_cross_chain.pool_created:
    type: subgraph_entity
    name: pool_created
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: hashflow
        version: 1.0.0
  hashflow_cross_chain.update_router_permissions:
    type: subgraph_entity
    name: update_router_permissions
    subgraphs:
      - name: polymarket
        version: 1.0.0
      - name: hashflow
        version: 1.0.0
transforms: {}
sinks:
  pool_created_sink:
    type: postgres
    from: hashflow_cross_chain.pool_created
    table: test_pool_created
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
  update_router_permissions_sink:
    type: postgres
    from: hashflow_cross_chain.update_router_permissions
    table: test_update_router_permissions
    schema: public
    secret_name: API_POSTGRES_CREDENTIALS
Syncing a dataset into a postgres database
This pipeline is named decoded-logs-pipeline. It pulls data from a curated Goldsky dataset, processes it with a single SQL transformation, and stores the result into a PostgreSQL sink, in a table called eth_logs in the goldsky schema.
name: decoded-logs-pipeline
apiVersion: 3
sources:
  my_ethereum_decoded_logs:
    dataset_name: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    start_at: latest
transforms:
  logs:
    sql: |
      SELECT
        id,
        address,
        event_signature,
        event_params,
        raw_log.block_number as block_number,
        raw_log.block_hash as block_hash,
        raw_log.transaction_hash as transaction_hash
      FROM
        my_ethereum_decoded_logs
    primary_key: id
sinks:
  logs_sink:
    type: postgres
    table: eth_logs
    schema: goldsky
    secret_name: API_POSTGRES_CREDENTIALS
    from: logs
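Before applying a definition like the ones above, a quick structural check can catch dangling references between sections. The following is an illustrative sketch, not part of the goldsky CLI; the dictionary mirrors the YAML structure of a v3 definition:

```python
# Illustrative sanity check for a v3 pipeline definition (hypothetical
# helper, not part of the goldsky CLI). The dict mirrors the YAML layout.

def check_pipeline(doc: dict) -> list:
    """Return a list of problems: sinks whose 'from' does not match
    any declared source or transform key."""
    sources = set(doc.get("sources") or {})
    transforms = set(doc.get("transforms") or {})
    known = sources | transforms
    problems = []
    for name, sink in (doc.get("sinks") or {}).items():
        ref = sink.get("from")
        if ref not in known:
            problems.append(f"sink '{name}' reads from unknown input '{ref}'")
    return problems

# Mirrors the base-logs example from earlier in this page.
definition = {
    "name": "base-logs-pipeline",
    "apiVersion": 3,
    "sources": {
        "base.logs": {
            "type": "dataset",
            "dataset_name": "base.logs",
            "version": "1.0.0",
        }
    },
    "transforms": {},
    "sinks": {
        "postgres_base_logs": {
            "type": "postgres",
            "from": "base.logs",
            "table": "base_logs",
            "schema": "public",
            "secret_name": "GOLDSKY_SECRET",
        }
    },
}

print(check_pipeline(definition))  # an empty list means all references resolve
```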
Configuring a pipeline
Once a pipeline has been defined and deployed there’s a certain set of attributes we can use to configure its lifecycle and behaviour.
This is done using the goldsky pipeline apply <config_file>
command.
If you need a refresher on the lifecycle of pipelines, make sure to check out Run a Pipeline; here we'll focus on detailing the configuration attributes that can be used:
The pipeline name is required so that Mirror knows which pipeline to apply the configuration to. Remember that if a definition is not provided, the pipeline name must refer to an already deployed pipeline in your project.
As explained above, sources are a list of datasets being sent to a transform or a sink. If a pipeline has already been deployed, there's no need to include sources unless you want to update any of their properties, but there's no downside to leaving them in.
Transforms can be an empty object, or one or more objects that include SQL transforms run against either a source, or another transform within the same pipeline.
Sinks are a list of sinks that are configured with secrets in your account. They reference either a source, or a transform and point to a configured sink, which is usually a table for database sinks.
status: defines the desired status for the pipeline, one of "ACTIVE", "INACTIVE", or "PAUSED". If not provided, it defaults to the current status of the pipeline.
resource_size: defines the amount of compute power to allocate to the pipeline. It can take one of the following values: "s", "m", "l", "xl", "xxl". If not provided, it defaults to the current resource_size of the pipeline.
save_progress: defines whether the pipeline should create a snapshot when this configuration is applied. It defaults to false.
use_latest_snapshot: defines whether the pipeline should restart from the latest available snapshot. Notice this attribute is useful in restart scenarios. It defaults to true.
restart: instructs the pipeline to restart when this configuration is applied. It defaults to false.
Pipeline configuration examples
Take a snapshot and restart the pipeline with more resources
name: base-logs-pipeline
restart: true
resource_size: xl
Restarting a pipeline from the latest safe point
Consider a scenario where your pipeline got into a corrupted state and you would like to restart it from a safe point in the past, without taking a snapshot of its current state. You could achieve that with a configuration like the following:
name: base-logs-pipeline
description: a new description for my pipeline
restart: true
use_latest_snapshot: true
save_progress: false
If you run into any issues at all with setting pipelines up, feel free to ask us for help!