> ## Documentation Index
> Fetch the complete documentation index at: https://docs.goldsky.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

> Publish processed data to Kafka topics with Avro or JSON serialization

## Overview

Publish processed data to Kafka topics with Avro or JSON serialization. Connection and Schema Registry credentials are supplied through a Goldsky [`secret`](/turbo-pipelines/secrets), not the pipeline YAML.

## Configuration

```yaml theme={null}
sinks:
  my_kafka_sink:
    type: kafka
    from: my_transform
    topic: my_output_topic
    data_format: avro
    topic_partitions: 10
    secret_name: MY_KAFKA_SECRET
```

## Parameters

<ParamField path="type" type="string" required>
  Must be `kafka`
</ParamField>

<ParamField path="from" type="string" required>
  The transform or source to read data from
</ParamField>

<ParamField path="topic" type="string" required>
  Kafka topic name to publish to
</ParamField>

<ParamField path="data_format" type="string" required>
  Serialization format. Supported values: `avro` or `json`.
</ParamField>

<ParamField path="secret_name" type="string">
  Name of the Goldsky secret (type `kafka`) containing the broker address, SASL credentials, and Schema Registry URL. Required when publishing to any broker that is not pre-configured for your deployment.
</ParamField>

<ParamField path="topic_partitions" type="number" default="4">
  Number of partitions to use if Goldsky creates the topic. Ignored for topics that already exist (the broker's existing partition count is used).
</ParamField>

<ParamField path="primary_key" type="string">
  Column (or comma-separated list of columns) used as the Kafka message key. Values are joined with `:` to form the key (e.g. `enriched_transaction_v2:0x6a7b...789d:1`). When set and the topic is created by Goldsky, the topic is created with `cleanup.policy=compact`. When omitted, the sink falls back to the upstream transform or source's primary key if one is defined.
</ParamField>

<ParamField path="parallelism" type="number" default="1">
  Number of parallel Kafka producers for increased throughput. Messages are routed to producers by hashing the message key, so per-key ordering is preserved across producers.
</ParamField>

<ParamField path="batch_size" type="number" default="10000">
  Maximum number of messages to batch before sending (maps to librdkafka's `batch.num.messages`).
</ParamField>

<ParamField path="batch_flush_interval" type="string" default="200ms">
  Maximum time to wait before flushing a batch (maps to librdkafka's `linger.ms`). Accepts humantime durations such as `100ms`, `1s`, or `500ms`.
</ParamField>

<ParamField path="message_max_bytes" type="number" default="10485760">
  Maximum size in bytes for a single Kafka protocol request (maps to librdkafka's `message.max.bytes`). Default is 10 MiB.
</ParamField>

## Secret structure

Create a Kafka secret with the Goldsky CLI:

```shell theme={null}
goldsky secret create --name MY_KAFKA_SECRET --value '{
  "type": "kafka",
  "bootstrapServers": "broker-1:9092,broker-2:9092",
  "securityProtocol": "SASL_SSL",
  "saslMechanism": "SCRAM-SHA-512",
  "saslJaasUsername": "...",
  "saslJaasPassword": "...",
  "schemaRegistryUrl": "https://schema-registry:8081",
  "schemaRegistryUsername": "...",
  "schemaRegistryPassword": "..."
}'
```

* `securityProtocol`: `PLAINTEXT`, `SASL_PLAINTEXT`, or `SASL_SSL`.
* `saslMechanism`: `PLAIN`, `SCRAM-SHA-256`, or `SCRAM-SHA-512` (only required for SASL protocols).
* `schemaRegistryUrl`: required when `data_format: avro`, optional for `data_format: json`.

## Features

* **Multiple Formats**: Choose between Avro (binary) or JSON serialization
* **Auto Schema Registration**: Schemas are automatically registered with Schema Registry (Avro only)
* **Avro Encoding**: Efficient binary serialization with schema evolution support
* **JSON Encoding**: Human-readable format without Schema Registry dependency
* **Operation Headers**: The `_gs_op` value for each row is published as the Kafka header `dbz.op` (`c` for insert, `u` for update, `d` for delete)
* **Topic Auto-Creation**: If the topic does not exist, the sink creates it with `retention.ms=-1`, the broker's default replication factor, and `cleanup.policy=compact` when `primary_key` is set

## Delivery and ordering

* **Producer acks**: Producers are configured with `acks=all` and a 10 minute `message.timeout.ms`.
* **Delivery semantics**: At-least-once. Idempotent produce is not enabled, so retries after a timeout can produce duplicates.
* **Backpressure**: When librdkafka's internal queue fills, the sink polls and retries the send rather than dropping messages.
* **Ordering**: Per-key ordering is preserved; global ordering across keys is not guaranteed. With `parallelism > 1`, all rows sharing a key are routed to the same producer (and therefore the same partition).

## Partitioning

The sink sets the Kafka message key to the joined primary-key values (see `primary_key` above). Partition assignment is performed by librdkafka's default partitioner.

**Implications of changing partition count:**

* Records with the same key always go to the same partition at a given point in time, ensuring ordering per key.
* Increasing partitions causes key redistribution — existing keys may map to different partitions after the change.
* Global ordering is not guaranteed; only per-key ordering is maintained.

## Examples

### Avro format

```yaml theme={null}
sinks:
  kafka_output:
    type: kafka
    from: enriched_events
    topic: processed.events
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 10
    primary_key: id
```

### JSON format (no Schema Registry required)

```yaml theme={null}
sinks:
  kafka_output:
    type: kafka
    from: enriched_events
    topic: processed.events
    data_format: json
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 10
```

The Kafka secret for this case does not need to include `schemaRegistryUrl`.

### High-throughput configuration

```yaml theme={null}
sinks:
  kafka_output:
    type: kafka
    from: high_volume_events
    topic: events.stream
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 16
    primary_key: id
    parallelism: 4
    batch_size: 50000
    batch_flush_interval: 500ms
    message_max_bytes: 20000000
```

## Performance tuning

If your Kafka sink is not keeping up with pipeline throughput, tuning `parallelism` is the most effective first step. Each unit of parallelism runs a separate Kafka producer, allowing concurrent writes to the target cluster.

### When to tune

Check the **Kafka flush sink p95** metric in your pipeline's advanced metrics. If flush latency is consistently high, your sink is likely a bottleneck and increasing parallelism can help.

### Choosing a parallelism value

Set `parallelism` to a value that **evenly divides the number of Kafka partitions** in the target topic. This ensures producers are balanced across partitions and avoids uneven load distribution.

For example, with a 12-partition topic, good parallelism values are `2`, `3`, `4`, or `6`. Avoid values like `5` or `7` that don't divide evenly into 12.

<Warning>
  Make sure your Kafka cluster can sustain the additional connections before increasing parallelism. Each parallel producer opens its own connection to the cluster.
</Warning>

### Tuning batching

After adjusting parallelism, you can further optimize throughput by tuning batch settings:

* **`batch_size`**: Increase if you have high message volume and want fewer, larger writes. Larger batches improve throughput but add a small amount of latency.
* **`batch_flush_interval`**: Decrease for lower latency at the cost of smaller batches. Increase if you want to accumulate more messages before flushing. Accepts humantime durations like `50ms` or `1s`.
* **`message_max_bytes`**: Increase if you see errors related to message size limits, especially with large payloads.

### Example: tuning a 12-partition topic

```yaml theme={null}
sinks:
  kafka_output:
    type: kafka
    from: high_volume_events
    topic: events.stream
    data_format: avro
    secret_name: MY_KAFKA_SECRET
    topic_partitions: 12
    primary_key: id
    parallelism: 4          # Divides evenly into 12 partitions
    batch_size: 50000
    batch_flush_interval: 500ms
```
