

Overview

Publish processed data to Kafka topics with Avro or JSON serialization. Connection and Schema Registry credentials are supplied through a Goldsky secret, not the pipeline YAML.

Configuration

sinks:
  my_kafka_sink:
    type: kafka
    from: my_transform
    topic: my_output_topic
    data_format: avro
    topic_partitions: 10
    secret_name: MY_KAFKA_SECRET

Parameters

type (string, required)
  Must be kafka.
from (string, required)
  The transform or source to read data from.
topic (string, required)
  Kafka topic name to publish to.
data_format (string, required)
  Serialization format. Supported values: avro or json.
secret_name (string)
  Name of the Goldsky secret (type kafka) containing the broker address, SASL credentials, and Schema Registry URL. Required when publishing to any broker that is not pre-configured for your deployment.
topic_partitions (number, default: 4)
  Number of partitions to use if Goldsky creates the topic. Ignored for topics that already exist (the broker’s existing partition count is used).
primary_key (string)
  Column (or comma-separated list of columns) used as the Kafka message key. Values are joined with : to form the key (e.g. enriched_transaction_v2:0x6a7b...789d:1). When set and the topic is created by Goldsky, the topic is created with cleanup.policy=compact. When omitted, the sink falls back to the upstream transform or source’s primary key if one is defined.
parallelism (number, default: 1)
  Number of parallel Kafka producers for increased throughput. Messages are routed to producers by hashing the message key, so per-key ordering is preserved across producers.
batch_size (number, default: 10000)
  Maximum number of messages to batch before sending (maps to librdkafka’s batch.num.messages).
batch_flush_interval (string, default: 200ms)
  Maximum time to wait before flushing a batch (maps to librdkafka’s linger.ms). Accepts humantime durations such as 100ms, 1s, or 500ms.
message_max_bytes (number, default: 10485760)
  Maximum size in bytes for a single Kafka protocol request (maps to librdkafka’s message.max.bytes). Default is 10 MiB.
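To make the primary_key joining rule concrete, here is a small sketch of how column values become a single message key. The function name and row shape are illustrative only; the sink performs this internally:

```python
def kafka_message_key(row, key_columns):
    """Join the primary-key column values with ':' to form the Kafka message key."""
    return ":".join(str(row[col]) for col in key_columns)

# Hypothetical row with a three-column primary key.
row = {"table": "enriched_transaction_v2", "tx_hash": "0x6a7b", "log_index": 1}
key = kafka_message_key(row, ["table", "tx_hash", "log_index"])
# key == "enriched_transaction_v2:0x6a7b:1"
```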

Secret structure

Create a Kafka secret with the Goldsky CLI:
goldsky secret create --name MY_KAFKA_SECRET --value '{
  "type": "kafka",
  "bootstrapServers": "broker-1:9092,broker-2:9092",
  "securityProtocol": "SASL_SSL",
  "saslMechanism": "SCRAM-SHA-512",
  "saslJaasUsername": "...",
  "saslJaasPassword": "...",
  "schemaRegistryUrl": "https://schema-registry:8081",
  "schemaRegistryUsername": "...",
  "schemaRegistryPassword": "..."
}'
  • securityProtocol: PLAINTEXT, SASL_PLAINTEXT, or SASL_SSL.
  • saslMechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 (only required for SASL protocols).
  • schemaRegistryUrl: required when data_format: avro, optional for data_format: json.
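A malformed secret surfaces only when the pipeline starts, so it can be worth sanity-checking the payload first. The checker below is not part of the Goldsky CLI; it simply encodes the field rules listed above:

```python
import json

def check_kafka_secret(payload, data_format):
    """Return a list of problems with a Kafka secret JSON, per the rules above."""
    s = json.loads(payload)
    problems = []
    if s.get("type") != "kafka":
        problems.append("type must be 'kafka'")
    if not s.get("bootstrapServers"):
        problems.append("bootstrapServers is required")
    proto = s.get("securityProtocol")
    if proto not in ("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL"):
        problems.append("unknown securityProtocol")
    if proto in ("SASL_PLAINTEXT", "SASL_SSL"):
        if s.get("saslMechanism") not in ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"):
            problems.append("SASL protocols require a valid saslMechanism")
    if data_format == "avro" and not s.get("schemaRegistryUrl"):
        problems.append("schemaRegistryUrl is required for avro")
    return problems
```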

Features

  • Multiple Formats: Choose between Avro (binary) or JSON serialization
  • Auto Schema Registration: Schemas are automatically registered with Schema Registry (Avro only)
  • Avro Encoding: Efficient binary serialization with schema evolution support
  • JSON Encoding: Human-readable format without Schema Registry dependency
  • Operation Headers: The _gs_op value for each row is published as the Kafka header dbz.op (c for insert, u for update, d for delete)
  • Topic Auto-Creation: If the topic does not exist, the sink creates it with retention.ms=-1, the broker’s default replication factor, and cleanup.policy=compact when primary_key is set
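The topic auto-creation rules above amount to the following configuration, shown here as a plain dict for clarity (the sink applies it via the broker admin API; the function is illustrative):

```python
def auto_create_topic_config(primary_key=None):
    """Topic config the sink applies when it creates a missing topic."""
    config = {"retention.ms": "-1"}  # retain messages indefinitely
    if primary_key:
        # With a message key set, the topic is log-compacted: the broker keeps
        # the latest record per key rather than a time-bounded history.
        config["cleanup.policy"] = "compact"
    return config
```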

Delivery and ordering

  • Producer acks: Producers are configured with acks=all and a 10 minute message.timeout.ms.
  • Delivery semantics: At-least-once. Idempotent produce is not enabled, so retries after a timeout can produce duplicates.
  • Backpressure: When librdkafka’s internal queue fills, the sink polls and retries the send rather than dropping messages.
  • Ordering: Per-key ordering is preserved; global ordering across keys is not guaranteed. With parallelism > 1, all rows sharing a key are routed to the same producer (and therefore the same partition).

Partitioning

The sink sets the Kafka message key to the joined primary-key values (see primary_key above). Partition assignment is performed by librdkafka’s default partitioner. Implications of changing partition count:
  • Records with the same key always go to the same partition at a given point in time, ensuring ordering per key.
  • Increasing partitions causes key redistribution — existing keys may map to different partitions after the change.
  • Global ordering is not guaranteed; only per-key ordering is maintained.
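The key-redistribution point can be demonstrated with any stable hash taken modulo the partition count. The sketch below uses CRC32 purely for illustration; librdkafka's actual partitioner differs in detail but has the same stability property:

```python
import zlib

def partition_for(key, num_partitions):
    """Illustrative partition assignment: stable hash of the key modulo
    the partition count (not librdkafka's exact algorithm)."""
    return zlib.crc32(key.encode()) % num_partitions

keys = [f"tx-{i}" for i in range(8)]
before = {k: partition_for(k, 10) for k in keys}   # 10-partition topic
after = {k: partition_for(k, 16) for k in keys}    # after growing to 16 partitions
moved = [k for k in keys if before[k] != after[k]] # keys that changed partition
```

The same key always lands on the same partition while the count is fixed, but growing the topic remaps some keys, which is why ordering guarantees only hold between repartitioning events.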

Examples

Avro format

sinks:
  kafka_output:
    type: kafka
    from: enriched_events
    topic: processed.events
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 10
    primary_key: id

JSON format (no Schema Registry required)

sinks:
  kafka_output:
    type: kafka
    from: enriched_events
    topic: processed.events
    data_format: json
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 10
The Kafka secret for this case does not need to include schemaRegistryUrl.

High-throughput configuration

sinks:
  kafka_output:
    type: kafka
    from: high_volume_events
    topic: events.stream
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 16
    primary_key: id
    parallelism: 4
    batch_size: 50000
    batch_flush_interval: 500ms
    message_max_bytes: 20000000

Performance tuning

If your Kafka sink is not keeping up with pipeline throughput, tuning parallelism is the most effective first step. Each unit of parallelism runs a separate Kafka producer, allowing concurrent writes to the target cluster.

When to tune

Check the Kafka flush sink p95 metric in your pipeline’s advanced metrics. If flush latency is consistently high, your sink is likely a bottleneck and increasing parallelism can help.

Choosing a parallelism value

Set parallelism to a value that evenly divides the number of Kafka partitions in the target topic. This ensures producers are balanced across partitions and avoids uneven load distribution. For example, with a 12-partition topic, good parallelism values are 2, 3, 4, or 6. Avoid values like 5 or 7 that don’t divide evenly into 12.
Make sure your Kafka cluster can sustain the additional connections before increasing parallelism. Each parallel producer opens its own connection to the cluster.
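A quick way to enumerate the balanced choices for a given topic is to list the divisors of its partition count. This helper is hypothetical, not part of the Goldsky CLI:

```python
def balanced_parallelism_options(partitions):
    """Parallelism values that divide the partition count evenly."""
    return [p for p in range(1, partitions + 1) if partitions % p == 0]

balanced_parallelism_options(12)  # → [1, 2, 3, 4, 6, 12]
```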

Tuning batching

After adjusting parallelism, you can further optimize throughput by tuning batch settings:
  • batch_size: Increase if you have high message volume and want fewer, larger writes. Larger batches improve throughput but add a small amount of latency.
  • batch_flush_interval: Decrease for lower latency at the cost of smaller batches. Increase if you want to accumulate more messages before flushing. Accepts humantime durations like 50ms or 1s.
  • message_max_bytes: Increase if you see errors related to message size limits, especially with large payloads.
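When reasoning about the batch_size / batch_flush_interval trade-off, note that a batch is sent when whichever limit is hit first. A rough back-of-envelope, assuming a steady message rate:

```python
def expected_batch_size(msgs_per_sec, batch_size, flush_interval_ms):
    """Messages accumulated per flush: capped either by batch_size
    or by what arrives within one flush interval, whichever is smaller."""
    return min(batch_size, msgs_per_sec * flush_interval_ms / 1000.0)

# At 10k msgs/s with batch_size=50000 and a 500ms interval, the interval
# is the binding limit: batches carry ~5000 messages each.
expected_batch_size(10_000, 50_000, 500)  # → 5000.0
```

If the estimate is far below batch_size, raising batch_size alone will not help; the flush interval (or the upstream rate) is what bounds batch size.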

Example: tuning a 12-partition topic

sinks:
  kafka_output:
    type: kafka
    from: high_volume_events
    topic: events.stream
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 12
    primary_key: id
    parallelism: 4          # Divides evenly into 12 partitions
    batch_size: 50000
    batch_flush_interval: 500ms