Overview

Turbo pipelines are defined using YAML configuration files that specify sources, transforms, and sinks. This guide covers the complete configuration syntax.

Basic Structure

name: <pipeline-name>
resource_size: xs | s | m | l | xl | xxl
description: <optional-description>
job: false # Optional: set to true for one-time jobs

sources: <source-config>

transforms: <transform-config>

sinks: <sink-config>

Top-Level Fields

name (string, required)
Unique identifier for your pipeline. Must use only lowercase letters, numbers, and hyphens, and must start and end with a letter or number (e.g., erc20-tracker, solana-blocks).
resource_size (string, default: "s")
Resource allocation for the pipeline. Each tier roughly doubles the previous tier's CPU and memory. The CPU limit is 3x the CPU request; memory has no limit.

Size   CPU Request   CPU Limit   Memory
xs     0.4           1.2         0.5 Gi
s      0.8           2.4         1.0 Gi
m      1.6           4.8         2.0 Gi
l      3.2           9.6         4.0 Gi
xl     6.4           19.2        8.0 Gi
xxl    12.8          38.4        16.0 Gi
description (string, optional)
Description of what the pipeline does.
job (boolean, default: false)
Run the pipeline as a one-time job instead of a long-running stream. Jobs run to a terminal state (success or failure) and auto-delete 1 hour after termination. See the Job Mode guide for details.
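For example, a one-time backfill job that reads a dataset from the beginning and writes it to PostgreSQL could look like the sketch below (dataset, table, and secret names are placeholders, and it assumes the job reaches a terminal state once the historical data is processed):
name: erc20-backfill-job
resource_size: s
job: true # Run once to completion, then terminate

sources:
  historical_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: earliest

transforms:
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM historical_transfers

sinks:
  postgres_backfill:
    type: postgres
    from: all_transfers
    schema: public
    table: erc20_transfers_backfill
    secret_name: MY_POSTGRES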

Sources

Define where your data comes from. See the Data Sources documentation for detailed examples with EVM and Solana datasets.

Dataset Source

sources:
  <reference-name>:
    type: dataset
    dataset_name: <chain>.<dataset-type>
    version: <version>
    start_at: latest | earliest
Example:
sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

Transforms

Process and transform your data. See the Transforms documentation for details.

SQL Transform

transforms:
  <reference-name>:
    type: sql
    primary_key: <column-name>
    sql: |
      <sql-query>
SQL transforms reference upstream sources and transforms directly in the FROM clause of the query. There is no separate from: field. Example:
transforms:
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(contract_address) as token,
        CAST(value AS DECIMAL) as amount
      FROM polygon_transfers
      WHERE CAST(value AS DECIMAL) > 1000000

Dynamic Table

transforms:
  <reference-name>:
    type: dynamic_table
    backend_type: Postgres | InMemory
    backend_entity_name: <table-name>
    secret_name: <secret-name> # Required for Postgres backend
    sql: | # Optional
      <sql-query>
    schema: <postgres-schema> # Optional, Postgres only
    column: <column-name> # Optional, column used for lookups
    time_column: <column-name> # Optional, column used for refresh bookkeeping
Example:
transforms:
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES
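Downstream SQL transforms can look up values in the dynamic table with dynamic_table_check, as used in the Complete Example below. A minimal sketch, reusing the polygon_transfers source from earlier (sender is assumed to be a column of that dataset):
transforms:
  tracked_wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM polygon_transfers
      WHERE dynamic_table_check('tracked_wallets', sender)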

HTTP Handler Transform

transforms:
  <reference-name>:
    type: handler
    from: <source-or-transform>
    url: <endpoint-url>
    primary_key: <column-name>
    one_row_per_request: true | false # Optional
    headers: # Optional
      <header-name>: <header-value>
    payload_version: <number> # Optional
Example:
transforms:
  enriched_events:
    type: handler
    from: filtered_events
    url: https://api.example.com/enrich
    primary_key: id
    one_row_per_request: false
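The optional headers map is typically used to authenticate with the endpoint. A sketch, assuming the API expects a bearer token (the token value is a placeholder):
transforms:
  enriched_events:
    type: handler
    from: filtered_events
    url: https://api.example.com/enrich
    primary_key: id
    headers:
      Authorization: Bearer <api-token>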

WebAssembly Script Transform

transforms:
  <reference-name>:
    type: script
    from: <source-or-transform>
    language: javascript | typescript
    primary_key: <column-name>
    script: |
      function process(input) {
        // Your code here
        return input;
      }
Example:
transforms:
  custom_processing:
    type: script
    from: raw_data
    language: javascript
    primary_key: id
    script: |
      function process(input) {
        input.processed = true;
        input.timestamp = Date.now();
        return input;
      }

Sinks

Write processed data to destinations. See the Sinks documentation for complete information.

PostgreSQL Sink

sinks:
  <reference-name>:
    type: postgres
    from: <transform-name>
    schema: <schema-name>
    table: <table-name>
    secret_name: <secret-name>
    primary_key: <column-name> # Optional
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 1s
Example:
sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
    primary_key: id

PostgreSQL Aggregation Sink

sinks:
  <reference-name>:
    type: postgres_aggregate
    from: <transform-name>
    schema: <schema-name>
    landing_table: <landing-table-name>
    agg_table: <aggregation-table-name>
    primary_key: <column-name>
    secret_name: <secret-name>
    group_by:
      <column-name>:
        from: <source-column> # Optional
        type: <postgres-type> # Optional
    aggregate:
      <column-name>:
        from: <source-column> # Optional
        fn: sum | count | avg | min | max
        type: <postgres-type> # Optional
Example:
sinks:
  account_balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: balances
    primary_key: transfer_id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum

ClickHouse Sink

sinks:
  <reference-name>:
    type: clickhouse
    from: <transform-name>
    table: <table-name>
    primary_key: <column-name>
    secret_name: <secret-name>
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 1s
Example:
sinks:
  clickhouse_analytics:
    type: clickhouse
    from: aggregated_data
    table: transfers_analytics
    primary_key: id
    secret_name: MY_CLICKHOUSE

Webhook Sink

sinks:
  <reference-name>:
    type: webhook
    from: <transform-name>
    url: <endpoint-url>
    one_row_per_request: true | false # Optional
    headers: # Optional
      <header-name>: <header-value>
    secret_name: <secret-name> # Optional
    payload_version: <number> # Optional
Example:
sinks:
  webhook_alerts:
    type: webhook
    from: high_value_transfers
    url: https://alerts.example.com/webhook
    one_row_per_request: true

Kafka Sink

sinks:
  <reference-name>:
    type: kafka
    from: <transform-name>
    topic: <topic-name>
    data_format: avro | json
    topic_partitions: <number> # Optional
    secret_name: <secret-name> # Optional
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 100ms
Example:
sinks:
  kafka_output:
    type: kafka
    from: processed_events
    topic: processed.events
    topic_partitions: 10
    data_format: avro

SQS Sink

sinks:
  <reference-name>:
    type: sqs_sink
    from: <transform-name>
    queue_url: <queue-url>
    secret_name: <secret-name>
    batch_flush_interval: <duration>
Example:
sinks:
  sqs_output:
    type: sqs_sink
    from: high_value_transfers
    queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
    secret_name: MY_SQS_SECRET
    batch_flush_interval: 1s

Complete Example

Here’s a complete pipeline that demonstrates multiple features:
Complete multi-chain pipeline example
name: multi-chain-token-tracker
resource_size: m
description: Track ERC-20 transfers across multiple chains for specific wallets

sources:
  # Ethereum transfers
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  # Polygon transfers
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for tracked wallets
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *, 'ethereum' as chain, 1 as chain_id FROM ethereum_transfers
      UNION ALL
      SELECT *, 'polygon' as chain, 137 as chain_id FROM polygon_transfers

  # Add transfer direction
  final_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM all_transfers
      WHERE
        dynamic_table_check('tracked_wallets', sender)
        OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  # Store in PostgreSQL
  postgres_archive:
    type: postgres
    from: final_transfers
    schema: public
    table: wallet_transfers
    secret_name: MY_POSTGRES
    primary_key: id

  # Send alerts for tracked-wallet transfers
  webhook_alerts:
    type: webhook
    from: final_transfers
    url: https://api.example.com/transfer-alert

  # Publish to analytics Kafka topic
  kafka_analytics:
    type: kafka
    from: final_transfers
    topic: wallet.transfers.analytics
    topic_partitions: 10
    data_format: avro

Reference Names

Throughout your pipeline, you reference sources and transforms by their configured names:
sources:
  my_source: # Reference name
    type: dataset
    # ...

transforms:
  transform_1: # Reference name
    type: sql
    sql: SELECT * FROM my_source # Use source reference name

  transform_2: # Reference name
    type: sql
    sql: SELECT * FROM transform_1 # Use transform reference name

sinks:
  my_sink:
    type: postgres
    from: transform_2 # Use transform reference name
Naming Guidelines:
  • Use descriptive, lowercase names with underscores or hyphens
  • Avoid special characters except _ and -
  • Examples: ethereum_blocks, filtered-transfers, enriched_data

Secrets

Secrets store sensitive information like database credentials:

Creating Secrets

goldsky secret create MY_SECRET_NAME

Using Secrets

Reference secrets in your pipeline configuration:
transforms:
  my_table:
    type: dynamic_table
    secret_name: MY_POSTGRES # Reference the secret

sinks:
  my_sink:
    type: postgres
    secret_name: MY_POSTGRES # Reference the secret

Secret Formats

PostgreSQL:
postgres://username:password@host:port/database
ClickHouse:
https://username:password@host:port/database
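For example, a PostgreSQL secret and a ClickHouse secret might look like (illustrative values only):
postgres://pipeline_user:s3cr3t@db.example.com:5432/analytics
https://pipeline_user:s3cr3t@clickhouse.example.com:8443/analytics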

Validation

Before deploying, validate your pipeline configuration:
goldsky turbo validate my-pipeline.yaml
This checks for:
  • YAML syntax errors
  • Required fields
  • Invalid parameter values
  • Source/transform/sink references
  • SQL syntax (basic validation)

Best Practices

Choose names that clearly indicate what each component does:
# Good
sources:
  polygon_erc20_transfers:
    type: dataset
    # ...

transforms:
  high_value_transfers:
    type: sql
    # ...

# Avoid
sources:
  source_1:  # Not descriptive
    # ...
Add comments to explain complex logic:
transforms:
  # Filter to only USDC transfers over $10,000
  large_usdc_transfers:
    type: sql
    sql: |
      SELECT * FROM transfers
      WHERE contract = lower('0x...')
        AND CAST(value AS DECIMAL) > 10000000000  -- $10k in 6 decimals
Begin with resource_size: s and scale up if needed:
name: my-pipeline
resource_size: s  # Start small, monitor performance
Unless you need historical data, start from the latest:
sources:
  my_source:
    start_at: latest  # Only process new data
Always validate your configuration:
goldsky turbo validate my-pipeline.yaml
goldsky turbo apply -f my-pipeline.yaml