SQL transforms let you process streaming data with standard SQL syntax. They're powered by Apache DataFusion, which provides efficient query execution over columnar data.
Streaming SQL Limitations: Joins, aggregations, and window functions are not supported in streaming mode. Use dynamic tables for lookup-style joins and transform chaining for complex logic.
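For example, logic that might otherwise call for a join or subquery can often be expressed as a chain of transforms, where each step reads from the previous one. A minimal sketch (the transform and column names here are illustrative):

```yaml
transforms:
  # Step 1: reduce the stream to the rows of interest
  large_transfers:
    type: sql
    primary_key: id
    sql: SELECT id, from_address, value FROM transfers WHERE CAST(value AS DECIMAL) > 1000000
  # Step 2: build on the previous transform's output instead of joining
  large_transfers_normalized:
    type: sql
    primary_key: id
    from: large_transfers
    sql: SELECT id, lower(from_address) as from_address, value FROM large_transfers
```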
```yaml
transforms:
  normalized_addresses:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) / 1e18 as value_eth,
        block_timestamp
      FROM transfers
```
```yaml
transforms:
  typed_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        CAST(block_number AS BIGINT) as block_num,
        CAST(value AS VARCHAR) as value_string,
        CAST(timestamp AS TIMESTAMP) as block_time
      FROM source
```
```yaml
transforms:
  string_operations:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(address) as address_lower,
        upper(symbol) as symbol_upper,
        concat('0x', tx_hash) as full_hash,
        substring(address, 1, 10) as short_address,
        length(data) as data_length
      FROM tokens
```
Use CASE statements for conditional transformations:
```yaml
transforms:
  categorized_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN CAST(value AS DECIMAL) > 1000000 THEN 'large'
          WHEN CAST(value AS DECIMAL) > 100000 THEN 'medium'
          ELSE 'small'
        END as transfer_size
      FROM erc20_transfers
```
Add literal values to label or tag your data:

```yaml
transforms:
  labeled_data:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as chain,
        'erc20' as token_standard,
        137 as chain_id
      FROM matic_transfers
```
JSON Functions - Query and construct JSON with json_query() and json_object() (see the sketch after this list)
Encoding - Hex, Base58, and binary conversions
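A minimal sketch of the JSON helpers named above. The argument shapes assumed here (key/value pairs for json_object(), a JSON path for json_query()) follow common SQL/JSON conventions; check the function reference for the exact signatures:

```yaml
transforms:
  token_metadata:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        -- Construct a JSON object from columns (assumed key/value argument pairs)
        json_object('from', from_address, 'to', to_address) as parties,
        -- Extract a nested field by path (assumed '$...' path syntax)
        json_query(metadata, '$.token.symbol') as token_symbol
      FROM transfers
```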
Example: Decode ERC-20 Transfer Events
```yaml
transforms:
  decoded_transfers:
    type: sql
    primary_key: id
    sql: |
      WITH decoded AS (
        SELECT
          id,
          evm_log_decode(
            '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
            topics,
            data
          ) as evt
        FROM ethereum_logs
        WHERE topics[1] = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
      )
      SELECT
        id,
        lower(evt.event_params[1]) as from_address,
        lower(evt.event_params[2]) as to_address,
        evt.event_params[3] as value
      FROM decoded
```
Example: Calculate Token Amounts with U256
```yaml
sql: |
  SELECT
    id,
    sender,
    recipient,
    value as value_wei,
    -- Convert wei to ETH using U256
    u256_to_string(
      to_u256(value) / to_u256('1000000000000000000')
    ) as value_eth
  FROM erc20_transfers
```
U256/I256 types support the standard operators (+, -, *, /, %), which the SQL preprocessor automatically rewrites to their function equivalents.
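As an illustration, the two queries below should behave identically after preprocessing. The function name u256_div is an assumption for illustration only; consult the U256 function reference for the exact operator equivalents:

```yaml
# Operator form - the preprocessor rewrites '/' to its function equivalent
sql: SELECT u256_to_string(to_u256(value) / to_u256('2')) as half FROM transfers

# Assumed function form after rewriting (u256_div is an illustrative name)
sql: SELECT u256_to_string(u256_div(to_u256(value), to_u256('2'))) as half FROM transfers
```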
1. Select only needed columns
Only select the columns you need, to reduce data transfer and memory usage:
```yaml
# Good - only select what you need
sql: SELECT id, address, value FROM transfers

# Avoid - selecting everything
sql: SELECT * FROM transfers
```
2. Filter early
Apply filters as early as possible in your pipeline:
```yaml
# Good - filter in the first transform
transforms:
  filtered:
    type: sql
    sql: SELECT * FROM source WHERE chain_id = 1
  enriched:
    type: sql
    from: filtered
    sql: SELECT *, 'ethereum' as network FROM filtered
```
3. Use appropriate data types
Cast to the correct types for your use case:
```yaml
sql: |
  SELECT
    CAST(block_number AS BIGINT) as block_number,
    CAST(value AS DECIMAL(38, 0)) as value,
    CAST(timestamp AS TIMESTAMP) as timestamp
  FROM source
```
4. Preserve _gs_op for upserts
If your sink needs upsert semantics, include _gs_op:
```yaml
sql: |
  SELECT
    id,
    address,
    value,
    _gs_op  -- Include for proper upsert behavior
  FROM transfers
```
5. Use descriptive column aliases
Make your transformed data self-documenting:
```yaml
sql: |
  SELECT
    id,
    from_address as sender,
    to_address as recipient,
    CAST(value AS DECIMAL) / 1e18 as amount_eth
  FROM transfers
```