Turbo pipelines are defined using YAML configuration files that specify sources, transforms, and sinks. This guide covers the complete configuration syntax.
```yaml
name: <pipeline-name>
resource_size: s | m | l
description: <optional-description>
job: false  # Optional: set to true for one-time jobs
sources:
  <source-config>
transforms:
  <transform-config>
sinks:
  <sink-config>
```
Run the pipeline as a one-time job instead of a long-running stream. Jobs run to completion and auto-delete after 1 hour. See the Job Mode guide for details.
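For example, a minimal one-time job might look like the sketch below (the pipeline name and placeholders are illustrative, not a required layout):

```yaml
name: nightly-export   # illustrative name for a one-time job
resource_size: s
job: true              # run to completion, then auto-delete after 1 hour
sources:
  # <source-config>
transforms:
  # <transform-config>
sinks:
  # <sink-config>
```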
```yaml
transforms:
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(contract_address) as token,
        CAST(value AS DECIMAL) as amount
      FROM polygon_transfers
      WHERE CAST(value AS DECIMAL) > 1000000
```
Here’s a complete pipeline that demonstrates multiple features:
```yaml
name: multi-chain-token-tracker
resource_size: m
description: Track ERC-20 transfers across multiple chains for specific wallets

sources:
  # Ethereum transfers
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  # Polygon transfers
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for tracked wallets
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Combine both chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *, 'ethereum' as chain, 1 as chain_id FROM ethereum_transfers
      UNION ALL
      SELECT *, 'polygon' as chain, 137 as chain_id FROM polygon_transfers

  # Add transfer direction
  final_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM all_transfers
      WHERE dynamic_table_check('tracked_wallets', sender)
         OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  # Store in PostgreSQL
  postgres_archive:
    type: postgres
    from: final_transfers
    schema: public
    table: wallet_transfers
    secret_name: MY_POSTGRES
    primary_key: id

  # Send alerts for high-value transfers
  webhook_alerts:
    type: webhook
    from: final_transfers
    url: https://api.example.com/transfer-alert

  # Publish to analytics Kafka topic
  kafka_analytics:
    type: kafka
    from: final_transfers
    topic: wallet.transfers.analytics
    topic_partitions: 10
    data_format: avro
```
Throughout your pipeline, you reference sources and transforms by their configured names:
```yaml
sources:
  my_source:          # Reference name
    type: dataset
    # ...
transforms:
  transform_1:        # Reference name
    type: sql
    sql: SELECT * FROM my_source   # Use source reference name
  transform_2:        # Reference name
    type: sql
    from: transform_1              # Use transform reference name
    sql: SELECT * FROM transform_1
sinks:
  my_sink:
    type: postgres
    from: transform_2              # Use transform reference name
```
Naming Guidelines:
Use descriptive, lowercase names with underscores or hyphens
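For example, a name like the one in the sketch below describes the data flowing through the transform (the transform and source names here are illustrative):

```yaml
transforms:
  # Good: lowercase, underscore-separated, describes the data
  large_usdc_transfers:
    type: sql
    sql: SELECT * FROM transfers   # assumes a source named "transfers"
  # Avoid vague or inconsistently cased names such as Transform1 or MyData
```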
```yaml
transforms:
  # Filter to only USDC transfers over $10,000
  large_usdc_transfers:
    type: sql
    sql: |
      SELECT * FROM transfers
      WHERE contract = lower('0x...')
        AND CAST(value AS DECIMAL) > 10000000000  -- $10k in 6 decimals
```
3. Start with small resource sizes
Begin with `resource_size: s` and scale up if needed:
```yaml
name: my-pipeline
resource_size: s  # Start small, monitor performance
```
4. Use `start_at: latest` for new pipelines
Unless you need historical data, start from the latest:
```yaml
sources:
  my_source:
    start_at: latest  # Only process new data
```