Overview

SQL transforms let you process streaming data with SQL. They run on Apache DataFusion, so the dialect is standard DataFusion SQL — with Turbo-specific extensions for blockchain data. See the SQL Functions Reference for the full list of built-in functions.
Streaming SQL Limitations: Joins, aggregations, and window functions are not supported in streaming mode. Each transform must read from a single base table. Use dynamic tables for lookup-style joins and chain SQL transforms for multi-step logic.

Basic Configuration

transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT column1, column2
      FROM source_name
      WHERE condition = true

Parameters

type (string, required)
Must be sql.

primary_key (string, required)
The column that uniquely identifies each row (e.g., id, transaction_hash, log_index).

sql (string, required)
The SQL query to execute. Reference sources or upstream transforms by name directly in the FROM clause — there is no separate from: field for SQL transforms.

Supported SQL Features

SELECT and Projections

Select specific columns from your data:
transforms:
  selected_columns:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT
        transaction_hash,
        block_number,
        block_timestamp,
        from_address,
        to_address,
        value
      FROM ethereum_transactions

WHERE Clauses (Filtering)

Filter rows based on conditions:
transforms:
  high_value_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE CAST(amount AS DECIMAL) > 1000000

Column Transformations

Transform column values using functions:
transforms:
  normalized_addresses:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) / 1e18 as value_eth,
        block_timestamp
      FROM transfers

CAST and Type Conversions

Convert between data types:
transforms:
  typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        CAST(block_number AS BIGINT) as block_num,
        CAST(value AS VARCHAR) as value_string,
        CAST(timestamp AS TIMESTAMP) as block_time
      FROM source

String Functions

Common string operations:
transforms:
  string_operations:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(address) as address_lower,
        upper(symbol) as symbol_upper,
        concat('0x', tx_hash) as full_hash,
        substring(address, 1, 10) as short_address,
        length(data) as data_length
      FROM tokens

Date and Time Functions

Work with timestamps:
transforms:
  time_analysis:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        to_timestamp_micros(blockTime * 1000000) as block_timestamp,
        date_part('epoch', current_timestamp()) - blockTime as block_age_seconds,
        date_part('hour', to_timestamp(blockTime)) as hour_of_day
      FROM solana_blocks

Array Functions

Process array columns:
transforms:
  array_ops:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        array_length(transactions) as transaction_count,
        transactions[1] as first_transaction
      FROM solana_blocks

Conditional Logic (CASE)

Use CASE statements for conditional transformations:
transforms:
  categorized_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN CAST(value AS DECIMAL) > 1000000 THEN 'large'
          WHEN CAST(value AS DECIMAL) > 100000 THEN 'medium'
          ELSE 'small'
        END as transfer_size
      FROM erc20_transfers

Adding Literal Columns

Add constant values or labels:
transforms:
  labeled_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as chain,
        'erc20' as token_standard,
        137 as chain_id
      FROM matic_transfers

Custom SQL Functions

Turbo pipelines include 100+ custom SQL functions for blockchain data processing:
  • EVM Functions - Decode logs with evm_log_decode(), hash with _gs_keccak256()
  • Solana Functions - Decode instructions, analyze transactions, track balances
  • Large Number Arithmetic - U256/I256 functions for precise token calculations
  • Array Processing - array_filter(), array_enumerate(), zip_arrays()
  • JSON Functions - Query and construct JSON with json_query(), json_object()
  • Encoding - Hex, Base58, and binary conversions
Example: Decode ERC-20 Transfer Events
transforms:
  decoded_transfers:
    type: sql
    primary_key: id
    sql: |
      WITH decoded AS (
        SELECT
          id,
          evm_log_decode(
            '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
            topics,
            data
          ) as evt
        FROM ethereum_logs
        WHERE SPLIT_INDEX(topics, ',', 0) = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
      )
      SELECT
        id,
        lower(evt.event_params[1]) as from_address,
        lower(evt.event_params[2]) as to_address,
        evt.event_params[3] as value
      FROM decoded
Example: Calculate Token Amounts with U256
sql: |
  SELECT
    id,
    sender,
    recipient,
    amount as amount_wei,
    -- Convert wei to ETH using U256
    u256_to_string(
      to_u256(amount) / to_u256('1000000000000000000')
    ) as amount_eth
  FROM erc20_transfers
U256/I256 types support standard operators (+, -, *, /, %) which are automatically rewritten by the SQL preprocessor to their function equivalents.
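As a sketch of the operator rewriting, the multiplication and division below are rewritten by the preprocessor to their U256 function equivalents before execution (the 0.3% fee calculation and column names are illustrative assumptions, not part of any real dataset):
sql: |
  SELECT
    id,
    amount as amount_wei,
    -- *, /, and the other arithmetic operators on U256 values are
    -- rewritten to function calls by the SQL preprocessor
    u256_to_string(
      to_u256(amount) * to_u256('3') / to_u256('1000')
    ) as fee_estimate_wei
  FROM erc20_transfers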
See the SQL Functions Reference for the complete list of functions with detailed examples.

Dynamic Table Integration

Use dynamic_table_check() to filter based on values in a dynamic table:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES

  filtered_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM ethereum_logs
      WHERE dynamic_table_check('tracked_contracts', address)
See the Dynamic Tables documentation for more details.

Limitations

The following SQL features are not supported in streaming mode:
  • Joins - Use dynamic tables for lookup-style joins
  • Aggregations (GROUP BY, COUNT, SUM, etc.) - Require stateful processing; use postgres aggregate sinks for aggregations at write time
  • Window functions - Not supported in streaming context
Each SQL transform must read from a single base source or upstream transform. CTEs (WITH ...) and derived subqueries in the FROM clause are supported as long as they ultimately reference one base table. UNION ALL between queries on the same base table is also allowed.
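A minimal sketch of both patterns together — a CTE plus a UNION ALL whose branches all read the same base table (the erc20_transfers columns and the per-leg id construction are assumptions for illustration):
transforms:
  transfer_legs:
    type: sql
    primary_key: id
    sql: |
      -- The CTE and both UNION ALL branches reference only erc20_transfers,
      -- so this still counts as a single base table
      WITH base AS (
        SELECT id, from_address, to_address, value
        FROM erc20_transfers
      )
      SELECT concat(id, '-out') as id, from_address as address, value FROM base
      UNION ALL
      SELECT concat(id, '-in') as id, to_address as address, value FROM base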

Best Practices

Only select the columns you need to reduce data transfer and memory usage:
# Good - only select what you need
sql: SELECT id, address, value FROM transfers

# Avoid - selecting everything
sql: SELECT * FROM transfers
Apply filters as early as possible in your pipeline. Chain SQL transforms by referencing upstream transform names in the FROM clause:
# Good - filter in the first transform, then enrich downstream
transforms:
  filtered:
    type: sql
    primary_key: id
    sql: SELECT * FROM source WHERE chain_id = 1

  enriched:
    type: sql
    primary_key: id
    sql: SELECT *, 'ethereum' as network FROM filtered
Cast to the correct types for your use case:
sql: |
  SELECT
    CAST(block_number AS BIGINT) as block_number,
    CAST(value AS DECIMAL(38, 0)) as value,
    CAST(timestamp AS TIMESTAMP) as timestamp
  FROM source
_gs_op (the change-data-capture operation column used by upsert-aware sinks) is automatically carried through SQL transforms — you do not need to select it explicitly. You can still include it if you want to project or reorder it:
sql: |
  SELECT
    id,
    address,
    amount,
    _gs_op  -- Optional; already propagated automatically
  FROM transfers
Make your transformed data self-documenting:
sql: |
  SELECT
    id,
    from_address as sender,
    to_address as recipient,
    CAST(value AS DECIMAL) / 1e18 as amount_eth
  FROM transfers

Example: Multi-Chain Token Transfers

Here’s a complete example processing token transfers from multiple chains:
name: multi-chain-transfers
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Normalize Ethereum transfers
  eth_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'ethereum' as chain,
        1 as chain_id,
        _gs_op
      FROM ethereum_transfers

  # Normalize Polygon transfers
  polygon_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'polygon' as chain,
        137 as chain_id,
        _gs_op
      FROM polygon_transfers

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM eth_normalized
      UNION ALL
      SELECT * FROM polygon_normalized

  # Filter for high-value transfers
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN raw_value > 1000000000000000000000000 THEN 'whale'
          WHEN raw_value > 1000000000000000000000 THEN 'large'
          ELSE 'normal'
        END as transfer_category
      FROM all_transfers
      WHERE raw_value > 1000000000000000000  -- > 1 token (assuming 18 decimals)

sinks:
  postgres_sink:
    type: postgres
    from: high_value
    schema: public
    table: multi_chain_transfers
    secret_name: MY_POSTGRES