Overview

Sinks are the final destination for data in your Turbo pipelines. They write processed data to external systems like databases, data warehouses, or HTTP endpoints.

Available Sinks

This page covers the following sinks: PostgreSQL, ClickHouse, Webhook, Kafka, S3, Print, and Blackhole.

Common Parameters

All sinks share these common parameters:
  • type (string, required): The sink type (postgres, clickhouse, webhook, etc.)
  • from (string, required): The transform or source to read data from
  • secret_name (string): Name of the secret containing connection credentials (required for database sinks)
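
For reference, a skeleton showing where the shared parameters sit. The names below are placeholders, and each sink type adds its own parameters, covered in the sections that follow:

sinks:
  my_sink:
    type: postgres             # or clickhouse, webhook, kafka, ...
    from: my_transform         # transform or source to read from
    secret_name: MY_DB_SECRET  # database sinks only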

PostgreSQL Sink

Write data to PostgreSQL databases with automatic table creation and upsert support.

Configuration

sinks:
  my_postgres_sink:
    type: postgres
    from: my_transform
    schema: public
    table: my_table
    secret_name: MY_POSTGRES_SECRET
    primary_key: id  # Optional - for upsert behavior

Parameters

  • schema (string, required): PostgreSQL schema name (e.g., public, analytics)
  • table (string, required): Table name to write to. Will be created automatically if it doesn't exist.
  • primary_key (string, optional): Column to use for upserts. If specified, existing rows will be updated instead of inserted.

Secret Format

The secret should contain a PostgreSQL connection string:
postgres://username:password@host:port/database
Create it with:
goldsky secret create MY_POSTGRES_SECRET
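
For example, a hypothetical connection string for a user pipeline_user writing to a database named analytics (host, port, and credentials here are placeholders):

postgres://pipeline_user:s3cr3t@db.example.com:5432/analytics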

Features

  • Auto Table Creation: Tables are created automatically based on your data schema
  • Upsert Support: Use primary_key to update existing rows
  • Type Handling: Automatic type conversion from Arrow to PostgreSQL types
  • Large Numbers: U256/I256 types are stored as NUMERIC(78,0)
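
As a rough illustration of auto table creation, a transfers table with id, address, and a U256 value column might end up with DDL along these lines. The actual DDL is generated by the sink from your data schema, so the column names and types here are assumptions:

CREATE TABLE public.erc20_transfers (
  id              TEXT PRIMARY KEY,  -- matches primary_key: id, enabling upserts
  address         TEXT,
  value           NUMERIC(78,0),     -- U256 amounts map to NUMERIC(78,0)
  block_timestamp TIMESTAMP
);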

Example

sinks:
  postgres_transfers:
    type: postgres
    from: filtered_transfers
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES
    primary_key: id

ClickHouse Sink

Write data to ClickHouse for high-performance analytical queries.

Configuration

sinks:
  my_clickhouse_sink:
    type: clickhouse
    from: my_transform
    table: my_table
    secret_name: MY_CLICKHOUSE_SECRET
    primary_key: id

Parameters

  • table (string, required): ClickHouse table name
  • primary_key (string, required): Primary key column for the table

Secret Format

The secret should contain ClickHouse connection details:
https://username:password@host:port/database
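
The secret is created the same way as for PostgreSQL. A hypothetical value, with host, port, and credentials as placeholders:

https://pipeline_user:s3cr3t@clickhouse.example.com:8443/analytics

Create it with:
goldsky secret create MY_CLICKHOUSE_SECRET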

Example: Solana Blocks to ClickHouse

name: solana-blocks-analytics
resource_size: m

sources:
  solana_blocks:
    type: solana_source
    start_block: "312229952"
    num_workers: "50"

transforms:
  parsed_blocks:
    type: sql
    primary_key: slot
    sql: |
      SELECT
        slot,
        to_timestamp_micros(blockTime * 1000000) as block_timestamp,
        array_length(transactions) as transaction_count,
        blockhash,
        _gs_op
      FROM solana_blocks

sinks:
  clickhouse_blocks:
    type: clickhouse
    from: parsed_blocks
    table: solana_blocks
    primary_key: slot
    secret_name: MY_CLICKHOUSE
ClickHouse is ideal for high-volume data and analytical queries. Use it when you need to run aggregations and analytics on your blockchain data.
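
For example, once the pipeline above is running, you could aggregate per-hour transaction counts directly in ClickHouse. This sketch assumes the solana_blocks table and columns produced by the example above:

SELECT
  toStartOfHour(block_timestamp) AS hour,
  sum(transaction_count) AS transactions
FROM solana_blocks
GROUP BY hour
ORDER BY hour;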

Webhook Sink

Send data to HTTP endpoints as JSON payloads.

Configuration

sinks:
  my_webhook_sink:
    type: webhook
    from: my_transform
    url: https://api.example.com/endpoint
    one_row_per_request: true | false

Parameters

  • url (string, required): The HTTP endpoint URL to send data to
  • one_row_per_request (boolean, default: false): If true, sends each row individually. If false, sends batches of rows.

Request Format

Data is sent as JSON.
Single row (one_row_per_request: true):
{
  "id": "abc123",
  "address": "0x...",
  "value": "1000000",
  "timestamp": "2024-01-01T00:00:00Z"
}
Batch (one_row_per_request: false):
[
  {
    "id": "abc123",
    "address": "0x...",
    "value": "1000000"
  },
  {
    "id": "def456",
    "address": "0x...",
    "value": "2000000"
  }
]

Example: Send High-Value Transfers to API

transforms:
  high_value_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE CAST(value AS DECIMAL) > 1000000000000000000000

sinks:
  webhook_alerts:
    type: webhook
    from: high_value_transfers
    url: https://alerts.example.com/high-value-transfer
    one_row_per_request: true
The webhook sink includes retry logic with exponential backoff for transient failures. Configure timeouts on your endpoint accordingly.

Kafka Sink

Publish processed data back to Kafka topics with Avro serialization.

Configuration

sinks:
  my_kafka_sink:
    type: kafka
    from: my_transform
    topic: my_output_topic
    topic_partitions: 10
    data_format: avro

Parameters

  • topic (string, required): Kafka topic name to publish to
  • topic_partitions (number): Number of partitions for the topic (created if it doesn't exist)
  • data_format (string, default: avro): Serialization format. Currently only avro is supported.

Features

  • Auto Schema Registration: Schemas are automatically registered with Schema Registry
  • Avro Encoding: Efficient binary serialization
  • Operation Headers: _gs_op column is included as a message header (dbz.op)

Example

sinks:
  kafka_output:
    type: kafka
    from: enriched_events
    topic: processed.events
    topic_partitions: 10
    data_format: avro

S3 Sink

Write data to S3-compatible object storage services.

Configuration

sinks:
  my_s3_sink:
    type: s3_sink
    from: my_transform
    endpoint: https://s3.amazonaws.com
    access_key_id: <your-access-key>
    secret_access_key: <your-secret-key>
    region: us-east-1
    bucket: my-bucket
    prefix: my-prefix  # Optional

Parameters

  • endpoint (string, required): S3-compatible endpoint URL (e.g., https://s3.amazonaws.com, https://t3.storage.dev)
  • access_key_id (string, required): Access key ID for authentication
  • secret_access_key (string, required): Secret access key for authentication
  • region (string, required): AWS region, or auto for S3-compatible services
  • bucket (string, required): Target bucket name
  • prefix (string, optional): Path prefix for objects within the bucket
The S3 sink works with any S3-compatible storage service, including AWS S3, MinIO, Cloudflare R2, and others. Set region: auto for non-AWS services.
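
For instance, a sketch pointing the sink at a non-AWS, S3-compatible endpoint. The endpoint, bucket, and prefix below are placeholders:

sinks:
  object_storage_sink:
    type: s3_sink
    from: my_transform
    endpoint: https://storage.example.com  # any S3-compatible endpoint
    access_key_id: <your-access-key>
    secret_access_key: <your-secret-key>
    region: auto                           # non-AWS services
    bucket: pipeline-output
    prefix: turbo/exports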

Print Sink

Output data to stdout in JSON format. Useful for debugging and development.

Configuration

sinks:
  my_print_sink:
    type: print
    from: my_transform

Output Format

Each row is printed as JSON with a row kind prefix:
[INSERT] {"id": "abc123", "address": "0x...", "value": "1000"}
[UPDATE] {"id": "def456", "address": "0x...", "value": "2000"}
[DELETE] {"id": "ghi789", "address": "0x...", "value": "3000"}

Example

name: debug-pipeline
sources:
  test_data:
    type: dataset
    dataset_name: ethereum.blocks
    version: 1.0.0
    start_at: latest

transforms:
  sample:
    type: sql
    primary_key: number
    sql: SELECT number, hash, timestamp FROM test_data LIMIT 10

sinks:
  debug_output:
    type: print
    from: sample
Print sinks should only be used for development and debugging. They’re not suitable for production pipelines.

Blackhole Sink

Discard all data without performing any operation. Useful for testing pipeline performance without I/O overhead.

Configuration

sinks:
  my_blackhole_sink:
    type: blackhole
    from: my_transform

Use Cases

  • Performance Testing: Measure transform performance without sink overhead
  • Pipeline Validation: Test that data flows correctly without writing anywhere
  • Development: Temporarily disable output while working on transforms

Example

sinks:
  perf_test:
    type: blackhole
    from: complex_transform

Multiple Sinks

You can write the same data to multiple destinations:
transforms:
  processed_data:
    type: sql
    primary_key: id
    sql: SELECT * FROM source

sinks:
  # Write to PostgreSQL
  postgres_archive:
    type: postgres
    from: processed_data
    schema: public
    table: archive
    secret_name: MY_POSTGRES

  # Send to webhook
  webhook_notification:
    type: webhook
    from: processed_data
    url: https://api.example.com/notify

  # Publish to Kafka
  kafka_downstream:
    type: kafka
    from: processed_data
    topic: processed.events
Each sink operates independently - failures in one don’t affect others.

Sink Behavior

Checkpointing

All sinks participate in Turbo’s checkpointing system:
  • Data is buffered until a checkpoint completes
  • Only acknowledged data is committed to the sink
  • Ensures exactly-once delivery semantics

Backpressure

Sinks apply backpressure to the pipeline:
  • If a sink can’t keep up, the entire pipeline slows down
  • Prevents data loss and memory overflow
  • Monitor sink performance to identify bottlenecks

Error Handling

Sink errors are handled with retries:
  • Transient errors (network issues) are retried with exponential backoff
  • Permanent errors (invalid data, schema mismatches) fail the pipeline
  • Check logs for detailed error messages

Best Practices

Choose the sink that matches your workload:
  • PostgreSQL: Transactional data, updates, relational queries
  • ClickHouse: High-volume analytics, aggregations, time-series
  • Webhook: Real-time notifications, integrations with external systems
  • Kafka: Downstream processing, event sourcing, decoupling systems
For upsert behavior in databases, choose a stable primary key:
sinks:
  postgres_sink:
    primary_key: id  # Use a unique, stable identifier
Use logs and metrics to track:
  • Write throughput
  • Error rates
  • Latency
goldsky turbo logs my-pipeline
Always use secrets for database credentials:
# Create secret
goldsky secret create MY_DB_SECRET

# Reference in pipeline
sinks:
  my_sink:
    secret_name: MY_DB_SECRET
