# Turbo Pipeline Configuration

> Complete reference for Turbo pipeline YAML syntax

## Overview

Turbo pipelines are defined using YAML configuration files that specify sources, transforms, and sinks. This guide covers the complete configuration syntax.

## Basic Structure

```yaml theme={null}
name: <pipeline-name>
resource_size: xs | s | m | l | xl | xxl
description: <optional-description>
job: false # Optional: set to true for one-time jobs

sources: <source-config>

transforms: <transform-config>

sinks: <sink-config>
```

## Top-Level Fields

<ParamField path="name" type="string" required>
  Unique identifier for your pipeline. Must use only lowercase letters, numbers, and hyphens, and start and end with a letter or number (e.g., `erc20-tracker`, `solana-blocks`).
</ParamField>

<ParamField path="resource_size" type="string" default="s">
  Resource allocation for the pipeline. Each tier doubles the previous tier's CPU and memory. The CPU limit is three times the CPU request; memory has no limit.

  | Size  | CPU Request | CPU Limit | Memory  |
  | ----- | ----------- | --------- | ------- |
  | `xs`  | 0.4         | 1.2       | 0.5 Gi  |
  | `s`   | 0.8         | 2.4       | 1.0 Gi  |
  | `m`   | 1.6         | 4.8       | 2.0 Gi  |
  | `l`   | 3.2         | 9.6       | 4.0 Gi  |
  | `xl`  | 6.4         | 19.2      | 8.0 Gi  |
  | `xxl` | 12.8        | 38.4      | 16.0 Gi |
</ParamField>

<ParamField path="description" type="string">
  Optional description of what the pipeline does.
</ParamField>

<ParamField path="job" type="boolean" default="false">
  Run the pipeline as a one-time job instead of a long-running stream. Jobs run to a terminal state (success or failure) and auto-delete 1 hour after termination. See the [Job Mode](/turbo-pipelines/job-mode) guide for details.
</ParamField>
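
As a minimal sketch of how the top-level fields fit together in job mode (the name and description are hypothetical, and the three sections are left as placeholders):

```yaml theme={null}
name: erc20-backfill-job   # hypothetical; lowercase letters, numbers, and hyphens only
resource_size: m           # 1.6 CPU request, 4.8 CPU limit, 2.0 Gi memory per the table above
description: One-time backfill of ERC-20 transfers
job: true                  # run to a terminal state instead of streaming

sources: <source-config>

transforms: <transform-config>

sinks: <sink-config>
```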

## Sources

Define where your data comes from. See the [Data Sources](/turbo-pipelines/sources/overview) documentation for detailed examples with EVM and Solana datasets.

### Dataset Source

```yaml theme={null}
sources:
  <reference-name>:
    type: dataset
    dataset_name: <chain>.<dataset-type>
    version: <version>
    start_at: latest | earliest
```

**Example:**

```yaml theme={null}
sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest
```
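
When historical data is needed, the same source can be pointed at the beginning of the dataset with `start_at: earliest`. A sketch reusing the dataset and version from the example above; the reference name `polygon_transfers_backfill` is hypothetical:

```yaml theme={null}
sources:
  polygon_transfers_backfill:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: earliest # Read from the start of the dataset, not only new data
```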

## Transforms

Process and transform your data. See the [Transforms](/turbo-pipelines/transforms/overview) documentation for details.

### SQL Transform

```yaml theme={null}
transforms:
  <reference-name>:
    type: sql
    primary_key: <column-name>
    sql: |
      <sql-query>
```

SQL transforms reference upstream sources and transforms directly in the `FROM` clause of the query. There is no separate `from:` field.

**Example:**

```yaml theme={null}
transforms:
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(contract_address) as token,
        CAST(value AS DECIMAL) as amount
      FROM polygon_transfers
      WHERE CAST(value AS DECIMAL) > 1000000
```
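
Because upstream references are resolved in the `FROM` clause, one SQL transform can read from another by its reference name. A sketch that builds on `filtered_transfers` from the example above; the name `single_token_transfers` and the address literal are placeholders:

```yaml theme={null}
transforms:
  single_token_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, token, amount
      FROM filtered_transfers   -- reference name of the upstream transform
      WHERE token = '0x...'     -- placeholder; token was lowercased upstream
```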

### Dynamic Table

```yaml theme={null}
transforms:
  <reference-name>:
    type: dynamic_table
    backend_type: Postgres | InMemory
    backend_entity_name: <table-name>
    secret_name: <secret-name> # Required for Postgres backend
    sql: | # Optional
      <sql-query>
    schema: <postgres-schema> # Optional, Postgres only
    column: <column-name> # Optional, column used for lookups
    time_column: <column-name> # Optional, column used for refresh bookkeeping
```

**Example:**

```yaml theme={null}
transforms:
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES
```
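
A dynamic table is typically consulted from a SQL transform. The complete example later on this page does so with `dynamic_table_check`; the condensed sketch below follows the same pattern. It assumes the upstream `polygon_transfers` source exposes a `sender` column, as that example does, and the name `tracked_outgoing` is hypothetical:

```yaml theme={null}
transforms:
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  tracked_outgoing:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_wallets', sender)
```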

### HTTP Handler Transform

```yaml theme={null}
transforms:
  <reference-name>:
    type: handler
    from: <source-or-transform>
    url: <endpoint-url>
    primary_key: <column-name>
    one_row_per_request: true | false # Optional
    headers: # Optional
      <header-name>: <header-value>
    payload_version: <number> # Optional
```

**Example:**

```yaml theme={null}
transforms:
  enriched_events:
    type: handler
    from: filtered_events
    url: https://api.example.com/enrich
    primary_key: id
    one_row_per_request: false
```
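
The optional fields combine with the required ones in the same block. A sketch that sends one row per request with a custom header; the header name and value are illustrative, not something the platform requires:

```yaml theme={null}
transforms:
  enriched_events:
    type: handler
    from: filtered_events
    url: https://api.example.com/enrich
    primary_key: id
    one_row_per_request: true
    headers:
      X-Source-Pipeline: erc20-tracker # illustrative header
```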

### WebAssembly Script Transform

```yaml theme={null}
transforms:
  <reference-name>:
    type: script
    from: <source-or-transform>
    language: javascript | typescript
    primary_key: <column-name>
    script: |
      function process(input) {
        // Your code here
        return input;
      }
```

**Example:**

```yaml theme={null}
transforms:
  custom_processing:
    type: script
    from: raw_data
    language: javascript
    primary_key: id
    script: |
      function process(input) {
        input.processed = true;
        input.timestamp = Date.now();
        return input;
      }
```
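
A script can also derive new fields from existing ones. A sketch, assuming every input row carries a string `contract_address` field (an assumption about the upstream data, not something this page guarantees); the transform name is hypothetical:

```yaml theme={null}
transforms:
  normalized_addresses:
    type: script
    from: raw_data
    language: javascript
    primary_key: id
    script: |
      function process(input) {
        // Assumes contract_address is a string on every row
        input.token = input.contract_address.toLowerCase();
        return input;
      }
```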

## Sinks

Write processed data to destinations. See the [Sinks](/turbo-pipelines/sinks) documentation for complete information.

### PostgreSQL Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: postgres
    from: <transform-name>
    schema: <schema-name>
    table: <table-name>
    secret_name: <secret-name>
    primary_key: <column-name> # Optional
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 1s
```

**Example:**

```yaml theme={null}
sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
    primary_key: id
```
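
Batching can be adjusted with the optional fields from the template. The values below are illustrative only, not recommended defaults:

```yaml theme={null}
sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
    primary_key: id
    batch_size: 1000         # illustrative value
    batch_flush_interval: 1s # duration format shown in the template above
```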

### PostgreSQL Aggregation Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: postgres_aggregate
    from: <transform-name>
    schema: <schema-name>
    landing_table: <landing-table-name>
    agg_table: <aggregation-table-name>
    primary_key: <column-name>
    secret_name: <secret-name>
    group_by:
      <column-name>:
        from: <source-column> # Optional
        type: <postgres-type> # Optional
    aggregate:
      <column-name>:
        from: <source-column> # Optional
        fn: sum | count | avg | min | max
        type: <postgres-type> # Optional
```

**Example:**

```yaml theme={null}
sinks:
  account_balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: balances
    primary_key: transfer_id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
```
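
The other aggregate functions follow the same shape. A sketch that counts transfers per token with `fn: count`; the sink, table, and column names are hypothetical, and `from` is omitted under `aggregate` on the assumption that it is optional for `count`, as the template suggests:

```yaml theme={null}
sinks:
  transfer_counts:
    type: postgres_aggregate
    from: filtered_transfers
    schema: public
    landing_table: transfer_log
    agg_table: token_transfer_counts
    primary_key: id
    secret_name: MY_POSTGRES
    group_by:
      token:
        type: text
    aggregate:
      transfer_count:
        fn: count
```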

### ClickHouse Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: clickhouse
    from: <transform-name>
    table: <table-name>
    primary_key: <column-name>
    secret_name: <secret-name>
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 1s
```

**Example:**

```yaml theme={null}
sinks:
  clickhouse_analytics:
    type: clickhouse
    from: aggregated_data
    table: transfers_analytics
    primary_key: id
    secret_name: MY_CLICKHOUSE
```

### Webhook Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: webhook
    from: <transform-name>
    url: <endpoint-url>
    one_row_per_request: true | false # Optional
    headers: # Optional
      <header-name>: <header-value>
    secret_name: <secret-name> # Optional
    payload_version: <number> # Optional
```

**Example:**

```yaml theme={null}
sinks:
  webhook_alerts:
    type: webhook
    from: high_value_transfers
    url: https://alerts.example.com/webhook
    one_row_per_request: true
```

### Kafka Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: kafka
    from: <transform-name>
    topic: <topic-name>
    data_format: avro | json
    topic_partitions: <number> # Optional
    secret_name: <secret-name> # Optional
    batch_size: <number> # Optional
    batch_flush_interval: <duration> # Optional, e.g. 100ms
```

**Example:**

```yaml theme={null}
sinks:
  kafka_output:
    type: kafka
    from: processed_events
    topic: processed.events
    topic_partitions: 10
    data_format: avro
```
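
For consumers that expect JSON rather than Avro, `data_format` can be switched. A sketch with illustrative batching values; the sink and topic names are hypothetical:

```yaml theme={null}
sinks:
  kafka_json_output:
    type: kafka
    from: processed_events
    topic: processed.events.json
    data_format: json
    batch_size: 500             # illustrative value
    batch_flush_interval: 100ms # duration format shown in the template above
```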

### SQS Sink

```yaml theme={null}
sinks:
  <reference-name>:
    type: sqs_sink
    from: <transform-name>
    queue_url: <queue-url>
    secret_name: <secret-name>
    batch_flush_interval: <duration>
```

**Example:**

```yaml theme={null}
sinks:
  sqs_output:
    type: sqs_sink
    from: high_value_transfers
    queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
    secret_name: MY_SQS_SECRET
    batch_flush_interval: 1s
```

## Complete Example

Here's a complete pipeline that demonstrates multiple features:

```yaml Complete multi-chain pipeline example expandable theme={null}
name: multi-chain-token-tracker
resource_size: m
description: Track ERC-20 transfers across multiple chains for specific wallets

sources:
  # Ethereum transfers
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  # Polygon transfers
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for tracked wallets
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *, 'ethereum' as chain, 1 as chain_id FROM ethereum_transfers
      UNION ALL
      SELECT *, 'polygon' as chain, 137 as chain_id FROM polygon_transfers

  # Add transfer direction
  final_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM all_transfers
      WHERE
        dynamic_table_check('tracked_wallets', sender)
        OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  # Store in PostgreSQL
  postgres_archive:
    type: postgres
    from: final_transfers
    schema: public
    table: wallet_transfers
    secret_name: MY_POSTGRES
    primary_key: id

  # Send an alert for each tracked-wallet transfer
  webhook_alerts:
    type: webhook
    from: final_transfers
    url: https://api.example.com/transfer-alert

  # Publish to analytics Kafka topic
  kafka_analytics:
    type: kafka
    from: final_transfers
    topic: wallet.transfers.analytics
    topic_partitions: 10
    data_format: avro
```

## Reference Names

Throughout your pipeline, you reference sources and transforms by their configured names:

```yaml theme={null}
sources:
  my_source: # Reference name
    type: dataset
    # ...

transforms:
  transform_1: # Reference name
    type: sql
    sql: SELECT * FROM my_source # Use source reference name

  transform_2: # Reference name
    type: sql
    sql: SELECT * FROM transform_1 # Use transform reference name

sinks:
  my_sink:
    type: postgres
    from: transform_2 # Use transform reference name
```

**Naming Guidelines:**

* Use descriptive, lowercase names with underscores or hyphens
* Avoid special characters except `_` and `-`
* Examples: `ethereum_blocks`, `filtered-transfers`, `enriched_data`

## Secrets

Secrets store sensitive information such as database credentials.

### Creating Secrets

```bash theme={null}
goldsky secret create MY_SECRET_NAME
```

### Using Secrets

Reference secrets in your pipeline configuration:

```yaml theme={null}
transforms:
  my_table:
    type: dynamic_table
    secret_name: MY_POSTGRES # Reference the secret

sinks:
  my_sink:
    type: postgres
    secret_name: MY_POSTGRES # Reference the secret
```

### Secret Formats

**PostgreSQL:**

```
postgres://username:password@host:port/database
```

**ClickHouse:**

```
https://username:password@host:port/database
```

## Validation

Before deploying, validate your pipeline configuration:

```bash theme={null}
goldsky turbo validate my-pipeline.yaml
```

This checks for:

* YAML syntax errors
* Required fields
* Invalid parameter values
* Source/transform/sink references
* SQL syntax (basic validation)
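
As an illustration of the reference check, the sketch below contains a sink whose `from` names a transform that is not defined. Validation would be expected to flag it, though the exact error output is not reproduced here. All names are hypothetical:

```yaml theme={null}
transforms:
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id FROM polygon_transfers

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfer # Typo: no transform with this name is defined
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
```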

## Best Practices

<AccordionGroup>
  <Accordion title="1. Use descriptive names">
    Choose names that clearly indicate what each component does:

    ```yaml theme={null}
    # Good
    sources:
      polygon_erc20_transfers:
        type: dataset
        # ...

    transforms:
      high_value_transfers:
        type: sql
        # ...

    # Avoid
    sources:
      source_1:  # Not descriptive
        # ...
    ```
  </Accordion>

  <Accordion title="2. Comment your configuration">
    Add comments to explain complex logic:

    ```yaml theme={null}
    transforms:
      # Filter to only USDC transfers over $10,000
      large_usdc_transfers:
        type: sql
        sql: |
          SELECT * FROM transfers
          WHERE contract = lower('0x...')
            AND CAST(value AS DECIMAL) > 10000000000  -- $10k in 6 decimals
    ```
  </Accordion>

  <Accordion title="3. Start with small resource sizes">
    Begin with `resource_size: s` and scale up if needed:

    ```yaml theme={null}
    name: my-pipeline
    resource_size: s  # Start small, monitor performance
    ```
  </Accordion>

  <Accordion title="4. Use `start_at: latest` for new pipelines">
    Unless you need historical data, start from the latest data so that only new records are processed:

    ```yaml theme={null}
    sources:
      my_source:
        start_at: latest  # Only process new data
    ```
  </Accordion>

  <Accordion title="5. Validate before deploying">
    Always validate your configuration:

    ```bash theme={null}
    goldsky turbo validate my-pipeline.yaml
    goldsky turbo apply -f my-pipeline.yaml
    ```
  </Accordion>
</AccordionGroup>
