# SQL

> Filter, project, and transform streaming data with SQL

## Overview

SQL transforms let you process streaming data with SQL. They run on
[Apache DataFusion](https://datafusion.apache.org/), so the dialect is standard
DataFusion SQL — with Turbo-specific extensions for blockchain data. See the
[SQL Functions Reference](/turbo-pipelines/reference/sql-functions) for the full
list of built-in functions.

<Note>
  **Streaming SQL Limitations**: Joins, aggregations, and window functions are
  not supported in streaming mode. Each transform must read from a single base
  table. Use [dynamic tables](/turbo-pipelines/transforms/dynamic-tables) for
  lookup-style joins and chain SQL transforms for multi-step logic.
</Note>

## Basic Configuration

```yaml theme={null}
transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT column1, column2
      FROM source_name
      WHERE condition = true
```

### Parameters

<ParamField path="type" type="string" required>
  Must be `sql`
</ParamField>

<ParamField path="primary_key" type="string" required>
  The column that uniquely identifies each row (e.g., `id`, `transaction_hash`,
  `log_index`)
</ParamField>

<ParamField path="sql" type="string" required>
  The SQL query to execute. Reference sources or upstream transforms by name
  directly in the `FROM` clause — there is no separate `from:` field for SQL
  transforms.
</ParamField>

## Supported SQL Features

### SELECT and Projections

Select specific columns from your data:

```yaml theme={null}
transforms:
  selected_columns:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT
        transaction_hash,
        block_number,
        block_timestamp,
        from_address,
        to_address,
        value
      FROM ethereum_transactions
```

### WHERE Clauses (Filtering)

Filter rows based on conditions:

```yaml theme={null}
transforms:
  high_value_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE CAST(amount AS DECIMAL) > 1000000
```
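
Conditions can be combined with `AND`, `OR`, and `IN`. The following is a minimal sketch, assuming the `erc20_transfers` source exposes `address`, `sender`, `recipient`, and `amount` columns; the transform name and the two contract addresses are placeholders, not real tokens:

```yaml
transforms:
  filtered_token_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, address, sender, recipient, amount
      FROM erc20_transfers
      -- Placeholder addresses; replace with the contracts you care about
      WHERE lower(address) IN (
          '0x0000000000000000000000000000000000000001',
          '0x0000000000000000000000000000000000000002'
        )
        AND CAST(amount AS DECIMAL) > 1000000
```

Lowercasing `address` before the comparison avoids missing rows whose addresses arrive in mixed case.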

### Column Transformations

Transform column values using functions:

```yaml theme={null}
transforms:
  normalized_addresses:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) / 1e18 as value_eth,
        block_timestamp
      FROM transfers
```

### CAST and Type Conversions

Convert between data types:

```yaml theme={null}
transforms:
  typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        CAST(block_number AS BIGINT) as block_num,
        CAST(value AS VARCHAR) as value_string,
        CAST(timestamp AS TIMESTAMP) as block_time
      FROM source
```
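
Standard DataFusion SQL also has `TRY_CAST`, which returns `NULL` instead of raising an error when a value cannot be converted. The sketch below assumes Turbo exposes it unchanged and that `value` may contain non-numeric strings; the transform name is hypothetical:

```yaml
transforms:
  safely_typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        -- NULL when value is not a valid number (assumes TRY_CAST is available)
        TRY_CAST(value AS DECIMAL(38, 0)) as value_decimal
      FROM source
```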

### String Functions

Common string operations:

```yaml theme={null}
transforms:
  string_operations:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(address) as address_lower,
        upper(symbol) as symbol_upper,
        concat('0x', tx_hash) as full_hash,
        substring(address, 1, 10) as short_address,
        length(data) as data_length
      FROM tokens
```

### Date and Time Functions

Work with timestamps:

```yaml theme={null}
transforms:
  time_analysis:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        to_timestamp_micros(blockTime * 1000000) as block_timestamp,
        date_part('epoch', current_timestamp()) - blockTime as block_age_seconds,
        date_part('hour', to_timestamp(blockTime)) as hour_of_day
      FROM solana_blocks
```
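
Timestamps can also be bucketed to a coarser granularity. This sketch assumes the standard DataFusion `date_trunc` function is available in Turbo and reuses the `blockTime` column (seconds since epoch) from the example above; the transform name is hypothetical:

```yaml
transforms:
  block_time_buckets:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        -- Round each block time down to the start of its hour and day
        date_trunc('hour', to_timestamp(blockTime)) as block_hour,
        date_trunc('day', to_timestamp(blockTime)) as block_day
      FROM solana_blocks
```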

### Array Functions

Process array columns:

```yaml theme={null}
transforms:
  array_ops:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        array_length(transactions) as transaction_count,
        transactions[1] as first_transaction
      FROM solana_blocks
```

### Conditional Logic (CASE)

Use `CASE` expressions for conditional transformations:

```yaml theme={null}
transforms:
  categorized_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN CAST(value AS DECIMAL) > 1000000 THEN 'large'
          WHEN CAST(value AS DECIMAL) > 100000 THEN 'medium'
          ELSE 'small'
        END as transfer_size
      FROM erc20_transfers
```

### Adding Literal Columns

Add constant values or labels:

```yaml theme={null}
transforms:
  labeled_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as chain,
        'erc20' as token_standard,
        137 as chain_id
      FROM matic_transfers
```

## Custom SQL Functions

Turbo pipelines include more than 100 custom SQL functions for blockchain data processing:

* **EVM Functions** - Decode logs with `evm_log_decode()`, hash with `_gs_keccak256()`
* **Solana Functions** - Decode instructions, analyze transactions, track balances
* **Large Number Arithmetic** - U256/I256 functions for precise token calculations
* **Array Processing** - `array_filter()`, `array_enumerate()`, `zip_arrays()`
* **JSON Functions** - Query and construct JSON with `json_query()`, `json_object()`
* **Encoding** - Hex, Base58, and binary conversions

**Example: Decode ERC-20 Transfer Events**

```yaml theme={null}
transforms:
  decoded_transfers:
    type: sql
    primary_key: id
    sql: |
      WITH decoded AS (
        SELECT
          id,
          evm_log_decode(
            '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
            topics,
            data
          ) as evt
        FROM ethereum_logs
        WHERE SPLIT_INDEX(topics, ',', 0) = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
      )
      SELECT
        id,
        lower(evt.event_params[1]) as from_address,
        lower(evt.event_params[2]) as to_address,
        evt.event_params[3] as value
      FROM decoded
```

**Example: Calculate Token Amounts with U256**

```yaml theme={null}
sql: |
  SELECT
    id,
    sender,
    recipient,
    amount as amount_wei,
    -- Convert wei to whole ETH using U256 (integer division; any fractional part is dropped)
    u256_to_string(
      to_u256(amount) / to_u256('1000000000000000000')
    ) as amount_eth
  FROM erc20_transfers
```

<Note>
  U256/I256 types support the standard operators (`+`, `-`, `*`, `/`, `%`), which
  the SQL preprocessor automatically rewrites to their function equivalents.
</Note>
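
As a sketch of the operator form, the query below splits an amount into whole tokens and a remainder using `/` and `%`. It reuses `to_u256()` and `u256_to_string()` from the example above, and it assumes an 18-decimal token and that U256 division is integer division; the output column names are hypothetical:

```yaml
sql: |
  SELECT
    id,
    -- Whole-token part: integer division by 10^18
    u256_to_string(
      to_u256(amount) / to_u256('1000000000000000000')
    ) as whole_tokens,
    -- Leftover base units: remainder after dividing by 10^18
    u256_to_string(
      to_u256(amount) % to_u256('1000000000000000000')
    ) as remainder_base_units
  FROM erc20_transfers
```

Keeping both parts as strings preserves the full precision of the original amount for downstream consumers.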

See the [SQL Functions Reference](/turbo-pipelines/reference/sql-functions) for the complete list of functions with detailed examples.

## Dynamic Table Integration

Use `dynamic_table_check()` to filter based on values in a dynamic table:

```yaml theme={null}
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES

  filtered_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM ethereum_logs
      WHERE dynamic_table_check('tracked_contracts', address)
```

See the [Dynamic Tables](/turbo-pipelines/transforms/dynamic-tables) documentation for more details.
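
The check can presumably be combined with other predicates like any boolean expression. This sketch assumes the `tracked_contracts` dynamic table defined above and reuses the Transfer topic filter from the decoding example; the transform name is hypothetical:

```yaml
transforms:
  tracked_transfer_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT id, address, topics, data
      FROM ethereum_logs
      -- Keep only Transfer events emitted by tracked contracts
      WHERE dynamic_table_check('tracked_contracts', address)
        AND SPLIT_INDEX(topics, ',', 0) = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
```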

## Limitations

<Warning>
  The following SQL features are **not supported** in streaming mode:

  * **Joins** - Use [dynamic tables](/turbo-pipelines/transforms/dynamic-tables) for lookup-style joins
  * **Aggregations** (GROUP BY, COUNT, SUM, etc.) - Require stateful processing; use [Postgres aggregate sinks](/turbo-pipelines/sinks/postgres) for aggregations at write time
  * **Window functions** - Not supported in streaming context

  Each SQL transform must read from a single base source or upstream transform.
  CTEs (`WITH ...`) and derived subqueries in the `FROM` clause are supported as
  long as they ultimately reference one base table. `UNION ALL` between queries
  on the same base table is also allowed.
</Warning>
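
To illustrate the supported shapes, here is a minimal sketch of a CTE combined with `UNION ALL` over a single base table. It assumes the `erc20_transfers` source has `id`, `sender`, `recipient`, and `block_number` columns; the transform name, output columns, and block threshold are hypothetical:

```yaml
transforms:
  transfer_parties:
    type: sql
    primary_key: id
    sql: |
      WITH recent AS (
        SELECT id, sender, recipient, block_number
        FROM erc20_transfers
        WHERE block_number > 18000000
      )
      -- One output row per party; suffix the id so it stays unique
      SELECT concat(id, '-sender') as id, sender as address, block_number
      FROM recent
      UNION ALL
      SELECT concat(id, '-recipient') as id, recipient as address, block_number
      FROM recent
```

Both branches read from the `recent` CTE, which itself reads only from `erc20_transfers`, so the transform still has a single base table. The `concat()` suffixes are one way to keep `id` unique once each transfer is emitted twice.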

## Best Practices

<AccordionGroup>
  <Accordion title="1. Select only needed columns">
    Only select the columns you need to reduce data transfer and memory usage:

    ```yaml theme={null}
    # Good - only select what you need
    sql: SELECT id, address, value FROM transfers

    # Avoid - selecting everything
    sql: SELECT * FROM transfers
    ```
  </Accordion>

  <Accordion title="2. Filter early">
    Apply filters as early as possible in your pipeline. Chain SQL transforms by
    referencing upstream transform names in the `FROM` clause:

    ```yaml theme={null}
    # Good - filter in the first transform, then enrich downstream
    transforms:
      filtered:
        type: sql
        primary_key: id
        sql: SELECT * FROM source WHERE chain_id = 1

      enriched:
        type: sql
        primary_key: id
        sql: SELECT *, 'ethereum' as network FROM filtered
    ```
  </Accordion>

  <Accordion title="3. Use appropriate data types">
    Cast to the correct types for your use case:

    ```yaml theme={null}
    sql: |
      SELECT
        CAST(block_number AS BIGINT) as block_number,
        CAST(value AS DECIMAL(38, 0)) as value,
        CAST(timestamp AS TIMESTAMP) as timestamp
      FROM source
    ```
  </Accordion>

  <Accordion title="4. _gs_op is auto-propagated">
    `_gs_op` (the change-data-capture operation column used by upsert-aware
    sinks) is automatically carried through SQL transforms — you do not need
    to select it explicitly. You can still include it if you want to project
    or reorder it:

    ```yaml theme={null}
    sql: |
      SELECT
        id,
        address,
        amount,
        _gs_op  -- Optional; already propagated automatically
      FROM transfers
    ```
  </Accordion>

  <Accordion title="5. Use descriptive column aliases">
    Make your transformed data self-documenting:

    ```yaml theme={null}
    sql: |
      SELECT
        id,
        from_address as sender,
        to_address as recipient,
        CAST(value AS DECIMAL) / 1e18 as amount_eth
      FROM transfers
    ```
  </Accordion>
</AccordionGroup>

## Example: Multi-Chain Token Transfers

Here's a complete example processing token transfers from multiple chains:

```yaml theme={null}
name: multi-chain-transfers
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Normalize Ethereum transfers
  eth_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'ethereum' as chain,
        1 as chain_id,
        _gs_op
      FROM ethereum_transfers

  # Normalize Polygon transfers
  polygon_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'polygon' as chain,
        137 as chain_id,
        _gs_op
      FROM polygon_transfers

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM eth_normalized
      UNION ALL
      SELECT * FROM polygon_normalized

  # Filter for high-value transfers
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN raw_value > 1000000000000000000000000 THEN 'whale'
          WHEN raw_value > 1000000000000000000000 THEN 'large'
          ELSE 'normal'
        END as transfer_category
      FROM all_transfers
      WHERE raw_value > 1000000000000000000  -- > 1 token (assuming 18 decimals)

sinks:
  postgres_sink:
    type: postgres
    from: high_value
    schema: public
    table: multi_chain_transfers
    secret_name: MY_POSTGRES
```
