
Overview

SQL transforms allow you to use standard SQL syntax to process streaming data. They’re powered by Apache DataFusion, providing efficient query execution on columnar data.
Streaming SQL Limitations: Joins, aggregations, and window functions are not supported in streaming mode. Use dynamic tables for lookup-style joins and transform chaining for complex logic.

Basic Configuration

transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT column1, column2
      FROM source_name
      WHERE condition = true

Parameters

  • type (string, required) - Must be sql
  • primary_key (string, required) - The column that uniquely identifies each row (e.g., id, transaction_hash, log_index)
  • sql (string, required) - The SQL query to execute. Can reference sources or other transforms by their reference name.
  • from (string, optional) - Specifies which transform or source to read from. If omitted, the transform reads directly from the source.
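For example, here is a minimal sketch of chaining two transforms with from (the base_logs and labeled_logs names are illustrative):
transforms:
  base_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT id, address, data
      FROM ethereum_logs

  labeled_logs:
    type: sql
    from: base_logs
    primary_key: id
    sql: |
      SELECT *, 'mainnet' as network
      FROM base_logs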

Supported SQL Features

SELECT and Projections

Select specific columns from your data:
transforms:
  selected_columns:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT
        transaction_hash,
        block_number,
        block_timestamp,
        from_address,
        to_address,
        value
      FROM ethereum_transactions

WHERE Clauses (Filtering)

Filter rows based on conditions:
transforms:
  high_value_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE CAST(amount AS DECIMAL) > 1000000

Column Transformations

Transform column values using functions:
transforms:
  normalized_addresses:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) / 1e18 as value_eth,
        block_timestamp
      FROM transfers

CAST and Type Conversions

Convert between data types:
transforms:
  typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        CAST(block_number AS BIGINT) as block_num,
        CAST(value AS VARCHAR) as value_string,
        CAST(timestamp AS TIMESTAMP) as block_time
      FROM source

String Functions

Common string operations:
transforms:
  string_operations:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(address) as address_lower,
        upper(symbol) as symbol_upper,
        concat('0x', tx_hash) as full_hash,
        substring(address, 1, 10) as short_address,
        length(data) as data_length
      FROM tokens

Date and Time Functions

Work with timestamps:
transforms:
  time_analysis:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        to_timestamp_micros(blockTime * 1000000) as block_timestamp,
        date_part('epoch', current_timestamp()) - blockTime as block_age_seconds,
        date_part('hour', to_timestamp(blockTime)) as hour_of_day
      FROM solana_blocks

Array Functions

Process array columns:
transforms:
  array_ops:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        array_length(transactions) as transaction_count,
        transactions[1] as first_transaction
      FROM solana_blocks

Conditional Logic (CASE)

Use CASE statements for conditional transformations:
transforms:
  categorized_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN CAST(value AS DECIMAL) > 1000000 THEN 'large'
          WHEN CAST(value AS DECIMAL) > 100000 THEN 'medium'
          ELSE 'small'
        END as transfer_size
      FROM erc20_transfers

Adding Literal Columns

Add constant values or labels:
transforms:
  labeled_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as chain,
        'erc20' as token_standard,
        137 as chain_id
      FROM matic_transfers

Custom SQL Functions

Turbo pipelines include 100+ custom SQL functions for blockchain data processing:
  • EVM Functions - Decode logs with evm_log_decode(), hash with _gs_keccak256()
  • Solana Functions - Decode instructions, analyze transactions, track balances
  • Large Number Arithmetic - U256/I256 functions for precise token calculations
  • Array Processing - array_filter(), array_enumerate(), zip_arrays()
  • JSON Functions - Query and construct JSON with json_query(), json_object()
  • Encoding - Hex, Base58, and binary conversions
Example: Decode ERC-20 Transfer Events
transforms:
  decoded_transfers:
    type: sql
    primary_key: id
    sql: |
      WITH decoded AS (
        SELECT
          id,
          evm_log_decode(
            '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
            topics,
            data
          ) as evt
        FROM ethereum_logs
        WHERE topics[1] = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
      )
      SELECT
        id,
        lower(evt.event_params[1]) as from_address,
        lower(evt.event_params[2]) as to_address,
        evt.event_params[3] as value
      FROM decoded
Example: Calculate Token Amounts with U256
sql: |
  SELECT
    id,
    sender,
    recipient,
    value as value_wei,
    -- Convert wei to ETH using U256
    u256_to_string(
      to_u256(value) / to_u256('1000000000000000000')
    ) as value_eth
  FROM erc20_transfers
U256/I256 types support standard operators (+, -, *, /, %) which are automatically rewritten by the SQL preprocessor to their function equivalents.
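As a minimal sketch of the operator form (the swap_events table and its amount_in and fee_amount columns are hypothetical):
sql: |
  SELECT
    id,
    -- '+' on U256 values is rewritten to its function equivalent by the preprocessor
    u256_to_string(to_u256(amount_in) + to_u256(fee_amount)) as total_in_raw
  FROM swap_events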
See the SQL Functions Reference for the complete list of functions with detailed examples.

Dynamic Table Integration

Use dynamic_table_check() to filter based on values in a dynamic table:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES

  filtered_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM ethereum_logs
      WHERE dynamic_table_check('tracked_contracts', address)
See the Dynamic Tables documentation for more details.

Limitations

The following SQL features are not supported in streaming mode:
  • Joins - Use dynamic tables for lookup-style joins
  • Aggregations (GROUP BY, COUNT, SUM, etc.) - Requires stateful processing
  • Window functions - Not supported in streaming context
  • Subqueries - Use transform chaining instead
These limitations ensure predictable streaming performance.
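For example, a query that would otherwise wrap a filtering subquery can usually be expressed as two chained transforms (the transform names and columns below are illustrative):
transforms:
  # Inner query: filter first
  nonzero_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE CAST(amount AS DECIMAL) > 0

  # Outer query: reshape the filtered rows
  nonzero_transfers_normalized:
    type: sql
    from: nonzero_transfers
    primary_key: id
    sql: |
      SELECT
        id,
        lower(sender) as sender,
        lower(recipient) as recipient,
        CAST(amount AS DECIMAL) / 1e18 as amount_tokens
      FROM nonzero_transfers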

Best Practices

Only select the columns you need to reduce data transfer and memory usage:
# Good - only select what you need
sql: SELECT id, address, value FROM transfers

# Avoid - selecting everything
sql: SELECT * FROM transfers
Apply filters as early as possible in your pipeline:
# Good - filter in the first transform
transforms:
  filtered:
    type: sql
    primary_key: id
    sql: SELECT * FROM source WHERE chain_id = 1

  enriched:
    type: sql
    from: filtered
    primary_key: id
    sql: SELECT *, 'ethereum' as network FROM filtered
Cast to the correct types for your use case:
sql: |
  SELECT
    CAST(block_number AS BIGINT) as block_number,
    CAST(value AS DECIMAL(38, 0)) as value,
    CAST(timestamp AS TIMESTAMP) as timestamp
  FROM source
If your sink needs upsert semantics, include _gs_op:
sql: |
  SELECT
    id,
    address,
    value,
    _gs_op  -- Include for proper upsert behavior
  FROM transfers
Make your transformed data self-documenting:
sql: |
  SELECT
    id,
    from_address as sender,
    to_address as recipient,
    CAST(value AS DECIMAL) / 1e18 as amount_eth
  FROM transfers

Example: Multi-Chain Token Transfers

Here’s a complete example processing token transfers from multiple chains:
name: multi-chain-transfers
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Normalize Ethereum transfers
  eth_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'ethereum' as chain,
        1 as chain_id,
        _gs_op
      FROM ethereum_transfers

  # Normalize Polygon transfers
  polygon_normalized:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token_address,
        sender,
        recipient,
        CAST(amount AS DECIMAL(38, 0)) as raw_value,
        block_number,
        block_timestamp,
        'polygon' as chain,
        137 as chain_id,
        _gs_op
      FROM polygon_transfers

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM eth_normalized
      UNION ALL
      SELECT * FROM polygon_normalized

  # Filter for high-value transfers
  high_value:
    type: sql
    from: all_transfers
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN raw_value > 1000000000000000000000000 THEN 'whale'
          WHEN raw_value > 1000000000000000000000 THEN 'large'
          ELSE 'normal'
        END as transfer_category
      FROM all_transfers
      WHERE raw_value > 1000000000000000000  -- > 1 token (assuming 18 decimals)

sinks:
  postgres_sink:
    type: postgres
    from: high_value
    schema: public
    table: multi_chain_transfers
    secret_name: MY_POSTGRES

Next Steps