The definition YAML defines the sources, transforms, and sinks that comprise your pipeline. Use the --definition-path flag to point the CLI at a definition file (you can fetch one for an existing pipeline with goldsky pipeline get-definition <pipeline-name>), or use --definition to inline a full definition directly in the command.
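For example, a quick sketch of the file-based flow (the pipeline and file names here are placeholders, and we assume get-definition prints the definition to stdout):

```shell
# Save the definition of an existing pipeline to a local file
goldsky pipeline get-definition my-pipeline > pipeline.yaml

# Create a new pipeline from that definition file
goldsky pipeline create my-new-pipeline --definition-path pipeline.yaml
```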
The YAML schema consists of three primary sections:
- Sources: Denotes the origin of the data.
- Transforms: Lists the transformations to be applied to the data from the source.
- Sinks: Specifies where the transformed data should be sent.
Each source and transform has a reference name that can be used by other transforms or sinks. You can compose multiple transforms as needed and pipe the results into a sink.
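As a sketch (the reference names here are hypothetical), a pipeline might chain two SQL transforms, with the second reading the first as a table and the sink consuming the second:

```yaml
transforms:
  # The first transform reads a source's referenceName as a table
  - referenceName: recent_logs
    type: sql
    sql: SELECT id, address, event_signature FROM ethereum.decoded_logs
    primaryKey: id
  # The second transform composes on top of the first
  - referenceName: filtered_logs
    type: sql
    sql: SELECT id, address FROM recent_logs WHERE event_signature = 'Transfer(address,address,uint256)'
    primaryKey: id
sinks:
  # The sink consumes the final transform by its reference name
  - type: postgres
    sourceStreamName: filtered_logs
    table: filtered_logs
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
```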
Below, we’ll explain each section of the YAML structure, and provide an example for each subset of the schema.
The sources array contains one or more source objects. There are currently two supported source types:
- Subgraph Entities
This lets you define your own subgraphs as a pipeline source.
```yaml
sources:
  - type: subgraphEntity
    deployments:
      - id: QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb
    entity:
      name: fixed_product_market_maker
    referenceName: polymarket.fixed_product_market_maker
```
- Datasets
Datasets let you define Direct Indexing sources. These data sources are curated by the Goldsky team, with automated QA to guarantee correctness.
```yaml
sources:
  - type: dataset
    referenceName: ethereum.decoded_logs
    version: 1.0.0
```
To obtain the referenceName property, please use goldsky dataset list and select your chain of choice. Please refer to supported chains for an overview of what data is available for individual chains.
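For example:

```shell
# Lists the curated datasets; select your chain of choice
# and note the referenceName to use in your source
goldsky dataset list
```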
The transforms array contains one or more transform objects, each with the following properties:
- referenceName: The name of the transformation. This can be used by sinks as a sourceStreamName, or in any other transform's SQL as a table.
- type: The type of the transformation. Currently, only sql is supported.
- sql: The SQL query to be performed. This can refer to the referenceName of any source or other transform as a SQL table.
- primaryKey: The primary key for the transformation. If any two rows share the same primaryKey, the pipeline overwrites the earlier row with the latest value. For instance, if two rows arrive with the same id, only the most recent one is kept.
```yaml
transforms:
  - referenceName: negative_fpmm_scaled_liquidity_parameter
    type: sql
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primaryKey: id
```
The sinks array contains one or more sink objects, each with the following properties:
- type: The sink type, for example postgres as in the examples below.
- sourceStreamName: The referenceName of the source or transform that feeds this sink.
- referenceName: A name of your choice to uniquely identify this sink within a pipeline.
- table: The table name to load into. This is required for sinks of type postgres.
- schema: The schema for the sink. This is required for sinks of type postgres.
- secretName: The name of the secret that holds the credentials for the sink, for example API_POSTGRES_CREDENTIALS in the examples below.
- topic: The topic to produce to. This is required for topic-based sinks.
```yaml
sinks:
  - type: postgres
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    referenceName: postgres_test_negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
```
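For contrast, a hedged sketch of a topic-based sink (the kafka type and the secret name here are assumptions for illustration, not taken from the schema above):

```yaml
sinks:
  - type: kafka                    # assumed sink type for topic-based delivery
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    referenceName: kafka_test_sink
    topic: test-topic              # required instead of table/schema
    secretName: KAFKA_CREDENTIALS  # hypothetical secret name
```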
End-to-end examples
You can run the following examples by copying the YAML into a local file and then creating a pipeline from it:

```shell
# Assuming the yaml config is in pipeline.yaml
goldsky pipeline create test-pipeline --definition-path pipeline.yaml
```
Syncing a subgraph into postgres
This pipeline pulls data from a single subgraphEntity source, processes the data with a single SQL transformation, and stores the result in a PostgreSQL sink.

As a prerequisite to running this pipeline, you will need an existing subgraph with the deployment ID QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb.
```yaml
sources:
  - type: subgraphEntity
    deployments:
      - id: QmVcgRByfiFSzZfi7RZ21gkJoGKG2jeRA1DrpvCQ6ficNb
    namespace: polygon
    entity:
      name: fixed_product_market_maker
transforms:
  - referenceName: negative_fpmm_scaled_liquidity_parameter
    type: sql
    sql: SELECT id FROM polygon.fixed_product_market_maker WHERE scaled_liquidity_parameter < 0
    primaryKey: id
sinks:
  - type: postgres
    sourceStreamName: negative_fpmm_scaled_liquidity_parameter
    table: test_negative_fpmm_scaled_liquidity_parameter
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
```
Merging subgraphs cross-chain
This pipeline is named poap-extended-1. It pulls data from two subgraphEntity sources, does not perform any transformations, and stores the results in two separate PostgreSQL sinks.
```yaml
sources:
  - type: subgraphEntity
    deployments:
      - id: QmbsFSmqsWFFcbxnGedXifyeTbKBSypczRcwPrBxdQdyXE
      - id: QmNSwC6QjZSFcSm2Tmoy6Van7g6zSEqD3yz4tDWRFdZiKh
      - id: QmZUh5Rp3edMhYj3wCH58zSNvZvrPSQyeM6AN5HTmyw2Ch
    referenceName: hashflow_cross_chain.pool_created
    entity:
      name: pool_created
  - type: subgraphEntity
    deployments:
      - id: QmbsFSmqsWFFcbxnGedXifyeTbKBSypczRcwPrBxdQdyXE
      - id: QmNSwC6QjZSFcSm2Tmoy6Van7g6zSEqD3yz4tDWRFdZiKh
      - id: QmZUh5Rp3edMhYj3wCH58zSNvZvrPSQyeM6AN5HTmyw2Ch
    referenceName: hashflow_cross_chain.update_router_permissions
    entity:
      name: update_router_permissions
transforms:
sinks:
  - type: postgres
    sourceStreamName: hashflow_cross_chain.pool_created
    table: test_pool_created
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
  - type: postgres
    sourceStreamName: hashflow_cross_chain.update_router_permissions
    table: test_update_router_permissions
    schema: public
    secretName: API_POSTGRES_CREDENTIALS
```
Syncing a dataset into a postgres database
This pipeline is named decoded-logs. It pulls data from a curated Goldsky dataset, selects the relevant columns with a single SQL transformation, and stores the result in a PostgreSQL sink, in a table called eth_logs in the goldsky schema.
```yaml
sources:
  - referenceName: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: latest
transforms:
  - sql: |
      SELECT
        id,
        address,
        event_signature,
        event_params,
        raw_log.block_number as block_number,
        raw_log.block_hash as block_hash,
        raw_log.transaction_hash as transaction_hash
      FROM ethereum.decoded_logs
    referenceName: logs
    type: sql
    primaryKey: id
sinks:
  - type: postgres
    table: eth_logs
    schema: goldsky
    secretName: API_POSTGRES_CREDENTIALS
    sourceStreamName: logs
```
If you run into any issues at all setting up pipelines, feel free to ask us for help!