SQL transforms let you process streaming data with SQL. They run on
Apache DataFusion, so the dialect is standard
DataFusion SQL — with Turbo-specific extensions for blockchain data. See the
SQL Functions Reference for the full
list of built-in functions.
Streaming SQL Limitations: Joins, aggregations, and window functions are
not supported in streaming mode. Each transform must read from a single base
table. Use dynamic tables for
lookup-style joins and chain SQL transforms for multi-step logic.
The SQL query to execute. Reference sources or upstream transforms by name
directly in the FROM clause — there is no separate from: field for SQL
transforms.
```yaml
transforms:
  normalized_addresses:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) / 1e18 as value_eth,
        block_timestamp
      FROM transfers
```
```yaml
transforms:
  typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        CAST(block_number AS BIGINT) as block_num,
        CAST(value AS VARCHAR) as value_string,
        CAST(timestamp AS TIMESTAMP) as block_time
      FROM source
```
```yaml
transforms:
  string_operations:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(address) as address_lower,
        upper(symbol) as symbol_upper,
        concat('0x', tx_hash) as full_hash,
        substring(address, 1, 10) as short_address,
        length(data) as data_length
      FROM tokens
```
Use CASE statements for conditional transformations:
```yaml
transforms:
  categorized_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN CAST(value AS DECIMAL) > 1000000 THEN 'large'
          WHEN CAST(value AS DECIMAL) > 100000 THEN 'medium'
          ELSE 'small'
        END as transfer_size
      FROM erc20_transfers
```
```yaml
transforms:
  labeled_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as chain,
        'erc20' as token_standard,
        137 as chain_id
      FROM matic_transfers
```
- JSON Functions - Query and construct JSON with json_query(), json_object()
- Encoding - Hex, Base58, and binary conversions
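The sketch below combines both helpers. It assumes json_object() takes alternating key/value arguments and json_query() takes a JSONPath string, which matches common SQL conventions; check the SQL Functions Reference for the exact signatures. The metadata column is illustrative.

```yaml
transforms:
  transfer_json:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        -- Assumed signature: json_object(key, value, ...)
        json_object('from', from_address, 'to', to_address) as transfer_json,
        -- Assumed signature: json_query(json, path); metadata is an illustrative column
        json_query(metadata, '$.decimals') as token_decimals
      FROM transfers
```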
Example: Decode ERC-20 Transfer Events
```yaml
transforms:
  decoded_transfers:
    type: sql
    primary_key: id
    sql: |
      WITH decoded AS (
        SELECT
          id,
          evm_log_decode(
            '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
            topics,
            data
          ) as evt
        FROM ethereum_logs
        WHERE SPLIT_INDEX(topics, ',', 0) = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
      )
      SELECT
        id,
        lower(evt.event_params[1]) as from_address,
        lower(evt.event_params[2]) as to_address,
        evt.event_params[3] as value
      FROM decoded
```
Example: Calculate Token Amounts with U256
```yaml
sql: |
  SELECT
    id,
    sender,
    recipient,
    amount as amount_wei,
    -- Convert wei to ETH using U256
    u256_to_string(
      to_u256(amount) / to_u256('1000000000000000000')
    ) as amount_eth
  FROM erc20_transfers
```
U256/I256 types support the standard operators (+, -, *, /, %), which the SQL preprocessor automatically rewrites to their function equivalents.
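As a minimal sketch of that rewriting (the transform name and the fee column are illustrative):

```yaml
transforms:
  totals:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        -- '+' here operates on U256 values; the preprocessor rewrites it
        -- to the equivalent U256 addition function before execution
        -- (fee is an illustrative column)
        u256_to_string(to_u256(amount) + to_u256(fee)) as total_wei
      FROM erc20_transfers
```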
- Aggregations (GROUP BY, COUNT, SUM, etc.) - Require stateful processing; use postgres aggregate sinks to aggregate at write time
- Window functions - Not supported in a streaming context
Each SQL transform must read from a single base source or upstream transform.
CTEs (WITH ...) and derived subqueries in the FROM clause are supported as
long as they ultimately reference one base table. UNION ALL between queries
on the same base table is also allowed.
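For example, the following sketch (with illustrative column names) fans one base table out with UNION ALL; both branches read from the same transfers table, so the single-base-table rule is satisfied:

```yaml
transforms:
  transfer_legs:
    type: sql
    primary_key: id
    sql: |
      -- Each transfer becomes two rows; ids are suffixed to stay unique
      SELECT concat(id, '-out') as id, from_address as account, value FROM transfers
      UNION ALL
      SELECT concat(id, '-in') as id, to_address as account, value FROM transfers
```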
1. Select only needed columns
Only select the columns you need to reduce data transfer and memory usage:
```yaml
# Good - only select what you need
sql: SELECT id, address, value FROM transfers

# Avoid - selecting everything
sql: SELECT * FROM transfers
```
2. Filter early
Apply filters as early as possible in your pipeline. Chain SQL transforms by
referencing upstream transform names in the FROM clause:
```yaml
# Good - filter in the first transform, then enrich downstream
transforms:
  filtered:
    type: sql
    primary_key: id
    sql: SELECT * FROM source WHERE chain_id = 1
  enriched:
    type: sql
    primary_key: id
    sql: SELECT *, 'ethereum' as network FROM filtered
```
3. Use appropriate data types
Cast to the correct types for your use case:
```yaml
sql: |
  SELECT
    CAST(block_number AS BIGINT) as block_number,
    CAST(value AS DECIMAL(38, 0)) as value,
    CAST(timestamp AS TIMESTAMP) as timestamp
  FROM source
```
4. _gs_op is auto-propagated
_gs_op (the change-data-capture operation column used by upsert-aware
sinks) is automatically carried through SQL transforms — you do not need
to select it explicitly. You can still include it if you want to project or reorder it, as in the sketch below (transform and column names are illustrative):
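```yaml
transforms:
  projected:
    type: sql
    primary_key: id
    sql: |
      -- _gs_op would flow through even if omitted here; selecting it
      -- (illustrative transform and column names) only fixes its
      -- position in the output row
      SELECT id, address, value, _gs_op FROM transfers
```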