Overview

In this quickstart, you’ll create a simple Turbo pipeline that:
  1. Reads ERC-20 transfer data from a Goldsky dataset
  2. Writes the results to a blackhole sink
  3. Lets you inspect the data live
  4. [Optional] Writes the data into a PostgreSQL database
This replaces the need for complex Mirror v1 configurations with a simple, declarative YAML file.

Prerequisites

Step 1: Create Your Pipeline

Create a file named erc20-pipeline.yaml:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms: {}

sinks:
  blackhole_sink:
    type: blackhole
    from: base_erc20_transfers
Sources:
  • We’re using the base.erc20_transfers dataset (version 1.2.0)
  • start_at: latest means we’ll only process new transfers going forward
Sinks:
  • The blackhole sink discards the data but allows you to test the pipeline
  • Perfect for development and testing before adding real outputs

Step 2: Deploy Your Pipeline

Apply your pipeline configuration:
goldsky turbo apply erc20-pipeline.yaml
You should see output confirming the deployment:
✓ Pipeline erc20-transfers created

Step 3: Inspect the Data Live

Now that the pipeline is running, you can use either the configuration file (erc20-pipeline.yaml) or the pipeline name (erc20-transfers) in the commands below.
Use the inspect command to see data flowing through your pipeline:
goldsky turbo inspect erc20-pipeline.yaml
Use the logs command to view the running output of the pipeline:
goldsky turbo logs erc20-pipeline.yaml
You should see output showing the pipeline processing ERC-20 transfers in real-time.

Step 4: Add a Filter for USDC Only

Now let’s add a SQL transform to filter only USDC transfers. Update your erc20-pipeline.yaml:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to only USDC transfers
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        sender,
        recipient,
        amount,
        to_timestamp(block_timestamp) as block_time
      FROM base_erc20_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')

sinks:
  blackhole_sink:
    type: blackhole
    from: usdc_transfers
Transforms:
  • Added a SQL transform named usdc_transfers that filters for the USDC contract on Base
  • The contract address 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 is USDC on Base
  • We use lower() to ensure consistent case-insensitive matching
  • Selected only the essential columns: id, sender, recipient, amount, and block_time
  • Converted the Unix timestamp to a human-readable format using to_timestamp()
Sinks:
  • Updated the sink to read from usdc_transfers instead of the raw source
  • Now only USDC transfers will flow through the pipeline
You can achieve the same filtering using a TypeScript transform instead of SQL. Return null to filter out records:
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  usdc_transfers:
    type: script
    primary_key: id
    language: typescript
    from: base_erc20_transfers
    schema:
      id: string
      sender: string
      recipient: string
      amount: string
      block_time: string
    script: |
      function transform(input) {
        const USDC_ADDRESS = '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913';

        // Return null to filter out non-USDC transfers
        if (!input?.address || input.address.toLowerCase() !== USDC_ADDRESS) {
          return null;
        }

        // Return a custom schema with only the fields we need
        return {
          id: input.id,
          sender: input.sender,
          recipient: input.recipient,
          amount: input.amount,
          block_time: new Date(input.block_timestamp * 1000).toISOString()
        };
      }

sinks:
  blackhole_sink:
    type: blackhole
    from: usdc_transfers
Key features:
  • Return null to filter: Records that don’t match your criteria can be filtered out by returning null
  • Custom output schema: Use the schema field to define a different output schema than the input. This lets you reshape data, rename fields, or include only specific columns
  • Flexible transformations: Perform calculations, string manipulation, and conditional logic
TypeScript Benefits:
  • More flexible data transformations and complex logic
  • Familiar syntax for developers
  • Type safety and autocompletion support
  • Can perform calculations, string manipulation, and conditional logic
SQL Benefits:
  • Generally faster for simple filtering and aggregations
  • More concise for straightforward queries
  • Better for set-based operations
Choose TypeScript when you need complex transformations or custom logic. Choose SQL for simple filters and aggregations.
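For example, here is a minimal sketch of a script transform that goes beyond filtering: it converts the raw amount into a human-readable value and tags large transfers. The 6-decimal assumption for USDC and the amount_decimal and size_bucket output fields are illustrative additions, not part of the dataset schema:
function transform(input) {
  const USDC_ADDRESS = '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913';
  const USDC_DECIMALS = 6; // assumption: USDC uses 6 decimals

  // Filter out non-USDC transfers, as in the example above
  if (!input?.address || input.address.toLowerCase() !== USDC_ADDRESS) {
    return null;
  }

  // Convert the raw integer amount into a decimal string using BigInt math
  const raw = BigInt(input.amount);
  const divisor = BigInt(10 ** USDC_DECIMALS);
  const amountDecimal = `${raw / divisor}.${(raw % divisor).toString().padStart(USDC_DECIMALS, '0')}`;

  return {
    id: input.id,
    sender: input.sender,
    recipient: input.recipient,
    amount: input.amount,
    amount_decimal: amountDecimal,
    // Conditional logic like this bucketing is easier to express in TypeScript than in SQL
    size_bucket: raw >= divisor * 10000n ? 'large' : 'regular',
    block_time: new Date(input.block_timestamp * 1000).toISOString()
  };
}
If you add extra output fields like amount_decimal and size_bucket, remember to declare them in the transform's schema block.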

Redeploy and Inspect

Apply the updated configuration:
goldsky turbo apply erc20-pipeline.yaml
Now inspect the output of your new usdc_transfers transform:
# the -n argument allows you to target one or more sources or transforms
goldsky turbo inspect erc20-pipeline.yaml -n usdc_transfers
You should now see only USDC transfers! Compare this to the original source data:
# View the filtered USDC data
goldsky turbo inspect erc20-pipeline.yaml -n usdc_transfers

# View all ERC-20 transfers (before filtering)
goldsky turbo inspect erc20-pipeline.yaml -n base_erc20_transfers

# View multiple stages at once
goldsky turbo inspect erc20-pipeline.yaml -n base_erc20_transfers,usdc_transfers
The -n (--topology_node_keys) parameter lets you inspect data at any point in your pipeline. Use transform names to see filtered/transformed data, or source names to see raw data. You can specify multiple keys separated by commas to view multiple stages simultaneously.

Optional: Write to PostgreSQL

To persist your data to a PostgreSQL database, update your pipeline configuration:

1. Create a Secret

Store your PostgreSQL credentials:
goldsky secret create MY_POSTGRES_SECRET
When prompted, enter your PostgreSQL connection string:
postgres://username:password@host:port/database

2. Update Your Pipeline

Modify erc20-pipeline.yaml to add a PostgreSQL sink:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to only USDC transfers
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        sender,
        recipient,
        amount,
        to_timestamp(block_timestamp) as block_time
      FROM base_erc20_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: usdc_transfers
    secret_name: MY_POSTGRES_SECRET
    from: usdc_transfers
    primary_key: id

3. Redeploy

Apply the updated configuration:
goldsky turbo apply erc20-pipeline.yaml

4. Query Your Data

Connect to PostgreSQL and query the USDC transfers:
SELECT
  sender,
  recipient,
  amount,
  block_time
FROM public.usdc_transfers
ORDER BY block_time DESC
LIMIT 10;
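If you prefer to query from application code, here is a minimal sketch using the node-postgres (pg) client. The connection string is a placeholder for the same credentials stored in MY_POSTGRES_SECRET, and the table name matches the sink configuration above:
import { Client } from 'pg';

async function main() {
  // Placeholder connection string; use the same credentials as MY_POSTGRES_SECRET
  const client = new Client({ connectionString: 'postgres://username:password@host:port/database' });
  await client.connect();

  // Query the table created by the postgres sink
  const { rows } = await client.query(
    `SELECT sender, recipient, amount, block_time
     FROM public.usdc_transfers
     ORDER BY block_time DESC
     LIMIT 10`
  );
  console.table(rows);

  await client.end();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});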

What You’ve Learned

Datasets provide pre-indexed blockchain data without managing Kafka topics. They’re the easiest way to get started with Turbo Pipelines.
The blackhole sink is perfect for testing pipelines and understanding data flow before adding real destinations.
PostgreSQL sinks automatically create tables based on your data schema, reducing manual setup.
You can easily modify and redeploy pipelines by updating your YAML configuration and running goldsky turbo apply.

Next Steps