Overview

Turbo pipelines are defined using YAML configuration files that specify sources, transforms, and sinks. This guide covers the complete configuration syntax.

Basic Structure

name: <pipeline-name>
resource_size: s | m | l
description: <optional-description>
job: false  # Optional: set to true for one-time jobs

sources:
  <source-config>

transforms:
  <transform-config>

sinks:
  <sink-config>

Top-Level Fields

name (string, required)
Unique identifier for your pipeline. Must be lowercase with hyphens (e.g., erc20-tracker, solana-blocks).

resource_size (string, required)
Resource allocation for the pipeline:
  • s - Small: 1 worker
  • m - Medium: 2 workers
  • l - Large: 4 workers

description (string, optional)
Description of what the pipeline does.

job (boolean, default: false)
Run the pipeline as a one-time job instead of a long-running stream. Jobs run to completion and auto-delete after 1 hour. See the Job Mode guide for details.
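
For example, the top-level fields for a one-time backfill might look like this (a minimal sketch; the pipeline name is illustrative, and the sources, transforms, and sinks sections are omitted):
name: usdc-transfer-backfill
resource_size: s
description: One-time backfill of USDC transfers
job: true  # Runs to completion, then auto-deletes after 1 hour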

Sources

Define where your data comes from. See the Guides for detailed examples with EVM and Solana datasets.

Dataset Source

sources:
  <reference-name>:
    type: dataset
    dataset_name: <chain>.<dataset-type>
    version: <version>
    start_at: latest | earliest
Example:
sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

Transforms

Process and transform your data. See the Transforms documentation for details.

SQL Transform

transforms:
  <reference-name>:
    type: sql
    primary_key: <column-name>
    from: <source-or-transform>  # Optional
    sql: |
      <sql-query>
Example:
transforms:
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(contract_address) as token,
        CAST(value AS DECIMAL) as amount
      FROM polygon_transfers
      WHERE CAST(value AS DECIMAL) > 1000000

Dynamic Table

transforms:
  <reference-name>:
    type: dynamic_table
    backend_type: Postgres | InMemory
    backend_entity_name: <table-name>
    secret_name: <secret-name>  # Required for Postgres
    sql: |  # Optional
      <sql-query>
Example:
transforms:
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES
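
The example above uses a Postgres backend and omits the optional sql block. A sketch of an InMemory-backed table with the sql field filled in might look like the following (the table name and query are illustrative; see the Transforms documentation for exactly how the query populates the table):
transforms:
  allowlisted_tokens:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: allowlisted_tokens
    sql: |
      SELECT DISTINCT lower(contract_address) as token
      FROM polygon_transfers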

HTTP Handler Transform

transforms:
  <reference-name>:
    type: handler
    from: <source-or-transform>
    url: <endpoint-url>
    primary_key: <column-name>
    one_row_per_request: true | false
Example:
transforms:
  enriched_events:
    type: handler
    from: filtered_events
    url: https://api.example.com/enrich
    primary_key: id
    one_row_per_request: false

WebAssembly Script Transform

transforms:
  <reference-name>:
    type: script
    from: <source-or-transform>
    language: javascript | typescript
    primary_key: <column-name>
    script: |
      function process(input) {
        // Your code here
        return input;
      }
Example:
transforms:
  custom_processing:
    type: script
    from: raw_data
    language: javascript
    primary_key: id
    script: |
      function process(input) {
        input.processed = true;
        input.timestamp = Date.now();
        return input;
      }

Sinks

Write processed data to destinations. See the Sinks documentation for complete information.

PostgreSQL Sink

sinks:
  <reference-name>:
    type: postgres
    from: <transform-name>
    schema: <schema-name>
    table: <table-name>
    secret_name: <secret-name>
    primary_key: <column-name>  # Optional
Example:
sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
    primary_key: id

ClickHouse Sink

sinks:
  <reference-name>:
    type: clickhouse
    from: <transform-name>
    table: <table-name>
    primary_key: <column-name>
    secret_name: <secret-name>
Example:
sinks:
  clickhouse_analytics:
    type: clickhouse
    from: aggregated_data
    table: transfers_analytics
    primary_key: id
    secret_name: MY_CLICKHOUSE

Webhook Sink

sinks:
  <reference-name>:
    type: webhook
    from: <transform-name>
    url: <endpoint-url>
    one_row_per_request: true | false
Example:
sinks:
  webhook_alerts:
    type: webhook
    from: high_value_transfers
    url: https://alerts.example.com/webhook
    one_row_per_request: true

Kafka Sink

sinks:
  <reference-name>:
    type: kafka
    from: <transform-name>
    topic: <topic-name>
    topic_partitions: <number>
    data_format: avro
Example:
sinks:
  kafka_output:
    type: kafka
    from: processed_events
    topic: processed.events
    topic_partitions: 10
    data_format: avro

Complete Example

Here’s a complete pipeline that demonstrates multiple features:
name: multi-chain-token-tracker
resource_size: m
description: Track ERC-20 transfers across multiple chains for specific wallets

sources:
  # Ethereum transfers
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  # Polygon transfers
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for tracked wallets
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *, 'ethereum' as chain, 1 as chain_id FROM ethereum_transfers
      UNION ALL
      SELECT *, 'polygon' as chain, 137 as chain_id FROM polygon_transfers

  # Add transfer direction
  final_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM all_transfers
      WHERE
        dynamic_table_check('tracked_wallets', sender)
        OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  # Store in PostgreSQL
  postgres_archive:
    type: postgres
    from: final_transfers
    schema: public
    table: wallet_transfers
    secret_name: MY_POSTGRES
    primary_key: id

  # Send webhook alerts for tracked wallet transfers
  webhook_alerts:
    type: webhook
    from: final_transfers
    url: https://api.example.com/transfer-alert

  # Publish to analytics Kafka topic
  kafka_analytics:
    type: kafka
    from: final_transfers
    topic: wallet.transfers.analytics
    topic_partitions: 10
    data_format: avro

Reference Names

Throughout your pipeline, you reference sources and transforms by their configured names:
sources:
  my_source:  # Reference name
    type: dataset
    # ...

transforms:
  transform_1:  # Reference name
    type: sql
    sql: SELECT * FROM my_source  # Use source reference name

  transform_2:  # Reference name
    type: sql
    from: transform_1  # Use transform reference name
    sql: SELECT * FROM transform_1

sinks:
  my_sink:
    type: postgres
    from: transform_2  # Use transform reference name
Naming Guidelines:
  • Use descriptive, lowercase names with underscores or hyphens
  • Avoid special characters except _ and -
  • Examples: ethereum_blocks, filtered-transfers, enriched_data

Secrets

Secrets store sensitive information such as database credentials.

Creating Secrets

goldsky secret create MY_SECRET_NAME

Using Secrets

Reference secrets in your pipeline configuration:
transforms:
  my_table:
    type: dynamic_table
    secret_name: MY_POSTGRES  # Reference the secret

sinks:
  my_sink:
    type: postgres
    secret_name: MY_POSTGRES  # Reference the secret

Secret Formats

PostgreSQL:
postgres://username:password@host:port/database
ClickHouse:
https://username:password@host:port/database
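
For example, with placeholder credentials (substitute your own host, port, user, and password):
postgres://pipeline_user:s3cretpass@db.example.com:5432/analytics
https://ch_user:s3cretpass@clickhouse.example.com:8443/analytics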

Validation

Before deploying, validate your pipeline configuration:
goldsky turbo validate my-pipeline.yaml
This checks for:
  • YAML syntax errors
  • Required fields
  • Invalid parameter values
  • Source/transform/sink references
  • SQL syntax (basic validation)

Best Practices

Choose names that clearly indicate what each component does:
# Good
sources:
  polygon_erc20_transfers:
    type: dataset
    # ...

transforms:
  high_value_transfers:
    type: sql
    # ...

# Avoid
sources:
  source_1:  # Not descriptive
    # ...
Add comments to explain complex logic:
transforms:
  # Filter to only USDC transfers over $10,000
  large_usdc_transfers:
    type: sql
    sql: |
      SELECT * FROM transfers
      WHERE contract = lower('0x...')
        AND CAST(value AS DECIMAL) > 10000000000  -- $10k in 6 decimals
Begin with resource_size: s and scale up if needed:
name: my-pipeline
resource_size: s  # Start small, monitor performance
Unless you need historical data, start from the latest:
sources:
  my_source:
    start_at: latest  # Only process new data
Always validate your configuration before applying it:
goldsky turbo validate my-pipeline.yaml
goldsky turbo apply -f my-pipeline.yaml

Next Steps