# HTTP Handler

> Enrich streaming data by calling external HTTP APIs

## Overview

The HTTP Handler transform allows you to enrich streaming data by calling external HTTP endpoints. This is useful for:

* Enriching blockchain data with off-chain information
* Calling ML models for predictions or classifications
* Integrating with third-party APIs for additional context
* Custom business logic hosted in external services

## Configuration

```yaml theme={null}
transforms:
  my_http_handler:
    type: handler
    from: <source-or-transform>
    url: <endpoint-url>
    primary_key: <column-name>
    secret_name: <secret-name>
    one_row_per_request: true | false # defaults to false
    headers:
      X-Custom-Header: value
```

### Parameters

<ParamField path="type" type="string" required>
  Must be `handler`
</ParamField>

<ParamField path="from" type="string" required>
  The source or transform to read data from
</ParamField>

<ParamField path="url" type="string" required>
  The HTTP endpoint URL to call. Must be a fully-qualified URL (e.g.,
  `https://api.example.com/enrich`). Requests are always sent as HTTP `POST`
  with a JSON body.
</ParamField>

<ParamField path="primary_key" type="string" required>
  The column that uniquely identifies each row
</ParamField>

<ParamField path="secret_name" type="string">
  The name of a Goldsky `httpauth` secret containing an authentication header
  to include with each request. Create one with `goldsky secret create` and
  select `httpauth` as the type.
</ParamField>

<ParamField path="headers" type="object">
  Additional HTTP headers sent with every request, as a map of header name to
  value. `Content-Type: application/json` is set automatically if not
  provided. Headers from `secret_name` are merged in on top of this map.
</ParamField>
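The precedence described above can be modeled as a small merge function. This is a sketch of the documented rules, not Goldsky's implementation. `effective_headers` is a hypothetical name, and treating header names case-insensitively is an assumption based on HTTP semantics.

```python
def effective_headers(headers, secret_headers):
    """Model the documented order: default Content-Type, then `headers`, then secret headers."""
    merged = dict(headers)
    # Content-Type is only added when the user did not set it (in any casing)
    if not any(name.lower() == "content-type" for name in merged):
        merged["Content-Type"] = "application/json"
    for name, value in secret_headers.items():
        # Secret headers win; drop any existing header with the same name in another casing
        for existing in [k for k in merged if k.lower() == name.lower()]:
            del merged[existing]
        merged[name] = value
    return merged


print(effective_headers({"X-Custom-Header": "value"}, {"Authorization": "Bearer example-token"}))
# {'X-Custom-Header': 'value', 'Content-Type': 'application/json', 'Authorization': 'Bearer example-token'}
```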

<ParamField path="one_row_per_request" default="false" type="boolean">
  * `true`: Send each row individually as a single JSON object
  * `false` (default): Send multiple rows as a JSON array (batched, up to
    2000 rows per request)
</ParamField>

<ParamField path="payload_version" default="0" type="integer">
  Payload envelope version. `0` (default) sends raw row JSON. `1` wraps
  each row in `{"metadata": {"op": "<i|u|d>"}, "data": {...}}` so the
  receiver can distinguish inserts, updates, and deletes. Responses must
  match the same envelope version.
</ParamField>
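With `payload_version: 1`, an endpoint can read `metadata.op` to treat deletes differently. A minimal sketch, assuming batch mode (a list of envelopes), an `address` input column, and a hypothetical `wallet_label` output column declared in `schema_override`. Skipping the lookup for deletes is an illustrative choice, not a requirement.

```python
# Hypothetical lookup table standing in for your real enrichment source
LABELS = {"0x1234567890abcdef1234567890abcdef12345678": "Exchange Deposit"}


def enrich_v1(envelopes):
    """Enrich rows sent with payload_version: 1, echoing each envelope's metadata back."""
    out = []
    for envelope in envelopes:
        row = envelope["data"]
        if envelope["metadata"]["op"] == "d":
            # Deletes: skip the lookup but keep the column so every row has the same shape
            label = None
        else:  # "i" (insert) or "u" (update)
            label = LABELS.get(row.get("address", ""), "Unknown")
        out.append({"metadata": envelope["metadata"], "data": {**row, "wallet_label": label}})
    return out
```

In single-row mode the body is a single envelope, so wrap it in a list before calling `enrich_v1` and return the first element.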

<ParamField path="schema_override" type="object">
  Map of column name to Arrow data type string used to reshape the output
  schema. Add a new column by mapping a new name to a type (e.g.
  `risk_score: Float64`), change a column's type by mapping an existing name
  to a new type, or drop a column by mapping its name to `null`. Without this
  field, the output schema must match the input schema exactly.
</ParamField>
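The three kinds of override combine as in the sketch below, which models the rules above in plain Python (it is not Goldsky's code). YAML `null` corresponds to `None` here, and the `Utf8` and `Int64` spellings are assumed Arrow type names used only for illustration.

```python
def output_columns(input_schema, schema_override=None):
    """Apply schema_override to an input schema given as {column name: Arrow type string}."""
    columns = dict(input_schema)
    for name, arrow_type in (schema_override or {}).items():
        if arrow_type is None:
            columns.pop(name, None)     # mapped to null: drop the column
        else:
            columns[name] = arrow_type  # new name adds a column; existing name retypes it
    return columns


print(output_columns(
    {"id": "Utf8", "address": "Utf8", "block_number": "Int64"},
    {"risk_score": "Float64", "block_number": None},
))
# {'id': 'Utf8', 'address': 'Utf8', 'risk_score': 'Float64'}
```

In pipeline YAML, the same override would read `schema_override: {risk_score: Float64, block_number: null}`.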

## Request Format

### Single Row Mode (`one_row_per_request: true`)

When enabled, each row is sent as an individual HTTP POST request with a JSON body:

```json theme={null}
{
  "id": "abc123",
  "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
  "value": "1000000000000000000",
  "block_number": 12345678
}
```

**Use when:**

* Your API doesn't support batch processing
* Each request requires significant processing time
* You need real-time, row-by-row processing
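To exercise an endpoint locally with the same shape of request, you can build one by hand. A minimal sketch using only the standard library. The localhost URL assumes the Flask example later on this page is running on port 8080, `build_single_row_request` is a hypothetical helper, and the 300-second timeout mirrors the handler's default request timeout.

```python
import json
import urllib.request


def build_single_row_request(url, row, extra_headers=None):
    """Build a POST request shaped like a single-row handler call, for local testing."""
    body = json.dumps(row).encode("utf-8")
    request = urllib.request.Request(url, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    for name, value in (extra_headers or {}).items():
        request.add_header(name, value)
    return request


row = {
    "id": "abc123",
    "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
    "value": "1000000000000000000",
    "block_number": 12345678,
}
req = build_single_row_request("http://localhost:8080/wallet-enrichment", row)
# Uncomment once your endpoint is running locally:
# with urllib.request.urlopen(req, timeout=300) as resp:
#     print(json.loads(resp.read()))
```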

### Batch Mode (`one_row_per_request: false`)

When disabled, multiple rows are sent as a JSON array:

```json theme={null}
[
  {
    "id": "abc123",
    "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
    "value": "1000000000000000000"
  },
  {
    "id": "def456",
    "address": "0x1234567890abcdef1234567890abcdef12345678",
    "value": "2000000000000000000"
  }
]
```

Batches contain up to 2000 rows per request. Larger input batches are
split into multiple sequential HTTP requests.
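The splitting rule behaves like the sketch below, which models the documented behavior rather than reproducing the handler's source: a 4,500-row input batch becomes three requests.

```python
def split_into_requests(rows, max_rows=2000):
    """Yield consecutive slices of at most max_rows rows, preserving input order."""
    for start in range(0, len(rows), max_rows):
        yield rows[start:start + max_rows]


rows = [{"id": str(i)} for i in range(4500)]
print([len(chunk) for chunk in split_into_requests(rows)])  # [2000, 2000, 500]
```

Because one request can carry 2000 rows and each request has a 300-second timeout, a full batch leaves your endpoint about 300 / 2000 = 0.15 seconds (150 ms) per row in the worst case.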

**Use when:**

* Your API supports batch processing
* You want to reduce network overhead
* Higher throughput is needed

## Response Format

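Your HTTP endpoint must return JSON with the same structure as the request (an object in single-row mode, an array in batch mode). Each returned row keeps every input field, and any new fields you add must be declared in `schema_override`.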

### Single Row Response

```json theme={null}
{
  "id": "abc123",
  "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
  "value": "1000000000000000000",
  "block_number": 12345678,
  "enriched_data": {
    "wallet_label": "Whale Wallet #42",
    "risk_score": 0.23,
    "is_exchange": false
  }
}
```

### Batch Response

```json theme={null}
[
  {
    "id": "abc123",
    "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
    "value": "1000000000000000000",
    "enriched_data": {
      "wallet_label": "Whale Wallet #42"
    }
  },
  {
    "id": "def456",
    "address": "0x1234567890abcdef1234567890abcdef12345678",
    "value": "2000000000000000000",
    "enriched_data": {
      "wallet_label": "Exchange Deposit"
    }
  }
]
```

<Note>
  By default, the response must include every column from the input schema
  with matching types. Any column you add, remove, or retype must be declared
  in `schema_override`.
</Note>
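Before returning a batch, an endpoint can check its output against these rules. A minimal sketch, assuming one output row per input row (implied by the ordering requirement under Limitations), `id` as the primary key, and that "matching types" can be approximated by comparing JSON value types. `check_response` is a hypothetical helper, not part of Goldsky.

```python
def check_response(input_rows, output_rows, declared_new=(), primary_key="id"):
    """List problems that would likely break the schema or ordering rules for a batch response."""
    problems = []
    if len(output_rows) != len(input_rows):
        problems.append(f"expected {len(input_rows)} rows, got {len(output_rows)}")
    for i, (src, out) in enumerate(zip(input_rows, output_rows)):
        if out.get(primary_key) != src.get(primary_key):
            problems.append(f"row {i}: primary key out of order")
        for col, value in src.items():
            if col not in out:
                problems.append(f"row {i}: missing input column {col!r}")
            elif None not in (value, out[col]) and type(out[col]) is not type(value):
                problems.append(f"row {i}: column {col!r} changed JSON type")
        # Any key the endpoint added must appear in schema_override
        for col in sorted(out.keys() - src.keys() - set(declared_new)):
            problems.append(f"row {i}: column {col!r} not declared in schema_override")
    return problems
```

Logging the returned list during development catches most schema-mismatch errors before they reach the pipeline.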

## Example: Enrich Transfers with Wallet Labels

```yaml theme={null}
name: enriched-transfers
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.0.0
    start_at: latest

transforms:
  # First, filter to high-value transfers
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) as value,
        contract_address,
        block_timestamp,
        _gs_op
      FROM ethereum_transfers
      WHERE CAST(value AS DECIMAL) > 1000000000000000000000

  # Enrich with external API
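  enriched_transfers:
    type: handler
    from: high_value
    url: https://api.example.com/wallet-enrichment
    primary_key: id
    secret_name: WALLET_API_SECRET
    one_row_per_request: false
    # Declare the columns added by the example endpoint below
    schema_override:
      from_wallet_label: Utf8
      from_wallet_type: Utf8
      to_wallet_label: Utf8
      to_wallet_type: Utf8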

sinks:
  postgres_enriched:
    type: postgres
    from: enriched_transfers
    schema: public
    table: enriched_transfers
    secret_name: MY_POSTGRES
    primary_key: id
```

## Example API Implementation

Here's a simple example of an HTTP endpoint that enriches wallet data:

```python theme={null}
from flask import Flask, request, jsonify

app = Flask(__name__)

# Mock wallet database
WALLET_LABELS = {
    "0x742d35cc6634c0532925a3b844bc9e7595f0beb": {
        "label": "Binance Hot Wallet",
        "type": "exchange",
        "risk_score": 0.1
    },
    # ... more wallets
}

@app.route('/wallet-enrichment', methods=['POST'])
def enrich_wallets():
    data = request.json

    # Handle both single row and batch
    is_batch = isinstance(data, list)
    rows = data if is_batch else [data]

    enriched = []
    for row in rows:
        # Add enrichment data
        from_label = WALLET_LABELS.get(row.get('from_address', ''), {})
        to_label = WALLET_LABELS.get(row.get('to_address', ''), {})

        row['from_wallet_label'] = from_label.get('label', 'Unknown')
        row['from_wallet_type'] = from_label.get('type', 'unknown')
        row['to_wallet_label'] = to_label.get('label', 'Unknown')
        row['to_wallet_type'] = to_label.get('type', 'unknown')

        enriched.append(row)

    return jsonify(enriched if is_batch else enriched[0])

if __name__ == '__main__':
    app.run(port=8080)
```

## Example: ML Model Integration

Call a machine learning model to classify transactions:

```yaml theme={null}
transforms:
  suspicious_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        from_address,
        to_address,
        CAST(value AS DECIMAL) as value,
        block_timestamp
      FROM ethereum_transfers

  ml_classified:
    type: handler
    from: suspicious_transfers
    url: https://ml-api.example.com/fraud-detection
    primary_key: id
    one_row_per_request: true # ML models often process one at a time
```

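The ML endpoint might return the following. Because `fraud_probability`, `fraud_indicators`, and `recommended_action` are not in the input schema, `ml_classified` would also need to declare them in `schema_override`: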

```json theme={null}
{
  "id": "abc123",
  "from_address": "0x...",
  "to_address": "0x...",
  "value": "1000000000000000000",
  "block_timestamp": "2024-01-01T00:00:00Z",
  "fraud_probability": 0.87,
  "fraud_indicators": ["high_value", "new_wallet", "rapid_succession"],
  "recommended_action": "flag_for_review"
}
```

## Error Handling and Retries

The HTTP handler includes built-in retry logic:

* **Transient errors** (network errors, request timeouts, `408`, `429`, and
  all `5xx` responses): Retried indefinitely with exponential backoff. The
  pipeline blocks on the failing batch until the endpoint recovers.
* **Permanent errors** (other `4xx` responses, invalid JSON response): The
  pipeline fails immediately with no retries.
* **Request timeout**: Each request has a 300-second timeout by default,
  configured globally and not tunable per-transform.
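When deciding which status code your endpoint should return, it helps to see the rules above as a lookup. The sketch below only restates this page (it is not the handler's code); the `unspecified` bucket covers status codes the page does not describe.

```python
def classify_status(status):
    """Map an HTTP status to the handler behavior described above."""
    if 200 <= status < 300:
        return "success"
    if status in (408, 429) or 500 <= status < 600:
        return "transient"    # retried indefinitely with exponential backoff
    if 400 <= status < 500:
        return "permanent"    # the pipeline fails immediately
    return "unspecified"      # e.g. 1xx/3xx: not covered by this page


for code in (200, 400, 404, 408, 429, 503):
    print(code, classify_status(code))
```

In practice, return `503` when a downstream dependency is briefly unavailable so the handler waits and retries, and reserve other `4xx` codes for requests that can never succeed, since those stop the pipeline.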

<Tip>
  Ensure your endpoint can handle retries idempotently. The same request may be
  sent multiple times if there are transient failures.
</Tip>
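Pure lookups are naturally safe to repeat; the risk comes from side effects. A minimal sketch, assuming your endpoint also maintains a hypothetical per-wallet transfer count: deduplicating on the primary key (`id`) means a resent row is only counted once.

```python
class TransferCounter:
    """Count transfers per sender without double-counting rows re-sent on retry."""

    def __init__(self):
        self.seen_ids = set()   # primary keys already applied
        self.counts = {}

    def record(self, row):
        if row["id"] in self.seen_ids:
            return False        # a retry delivered this row again; skip the side effect
        self.seen_ids.add(row["id"])
        sender = row["from_address"]
        self.counts[sender] = self.counts.get(sender, 0) + 1
        return True


counter = TransferCounter()
row = {"id": "abc123", "from_address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb"}
counter.record(row)
counter.record(row)  # same row again, as after a retried request
print(counter.counts)  # {'0x742d35cc6634c0532925a3b844bc9e7595f0beb': 1}
```

This in-memory set grows without bound and is lost on restart; a production service would typically get the same guarantee from a unique constraint or an upsert keyed on the primary key. Updated rows reuse their primary key, so this exact pattern suits insert-only data.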

## Performance Considerations

<AccordionGroup>
  <Accordion title="Latency Impact">
    HTTP handlers add latency to your pipeline:

    * Each request costs at least one network round trip plus your endpoint's processing time
    * Use batching (`one_row_per_request: false`) to spread that overhead across many rows
    * Consider caching frequently requested data in your API
  </Accordion>

  <Accordion title="Throughput">
    To maximize throughput:

    * Use batch mode when possible. Each request carries up to 2000 rows, so make sure your endpoint can process a full batch well within the 300-second timeout
    * Ensure your API can handle concurrent requests
    * Scale your API horizontally if it becomes a bottleneck
    * Monitor API response times in your pipeline logs
  </Accordion>

  <Accordion title="Backpressure">
    If your HTTP endpoint is slow:

    * The entire pipeline will slow down to match
    * This prevents data loss and memory overflow
    * Scale your API or optimize its response time
    * Monitor logs for HTTP handler performance metrics
  </Accordion>
</AccordionGroup>
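For the caching suggestion under "Latency Impact", a small time-bounded cache is often enough when the same wallets appear repeatedly. A minimal sketch using the standard library; the 60-second TTL and `slow_label_lookup` are hypothetical, and the cache is per-process and unbounded, so a multi-instance deployment would usually use a shared cache instead.

```python
import time


class TTLCache:
    """In-process cache whose entries expire ttl_seconds after they are loaded."""

    def __init__(self, ttl_seconds=300.0, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}  # key -> (value, loaded_at)

    def get_or_load(self, key, load):
        now = self.clock()
        cached = self.entries.get(key)
        if cached is not None and now - cached[1] < self.ttl:
            return cached[0]
        value = load(key)  # cache miss or expired entry: call the slow source
        self.entries[key] = (value, now)
        return value


def slow_label_lookup(address):
    """Hypothetical stand-in for a database or third-party API call."""
    time.sleep(0.05)
    return "Unknown"


labels = TTLCache(ttl_seconds=60)
labels.get_or_load("0xabc", slow_label_lookup)  # slow: calls the lookup
labels.get_or_load("0xabc", slow_label_lookup)  # fast: served from the cache
```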

## Security Best Practices

<Steps>
  <Step title="Use HTTPS">
    Always use HTTPS endpoints to encrypt data in transit:

    ```yaml theme={null}
    url: https://api.example.com/enrich  # ✓ Good
    url: http://api.example.com/enrich   # ✗ Avoid
    ```
  </Step>

  <Step title="Use secret_name for authentication">
    Use the `secret_name` parameter with an `httpauth` secret to securely authenticate with your endpoint. This avoids exposing credentials in your pipeline configuration:

    ```yaml theme={null}
    transforms:
      enriched:
        type: handler
        from: my_source
        url: https://api.example.com/enrich
        primary_key: id
        secret_name: MY_API_SECRET
    ```

    Create the secret with `goldsky secret create` and select `httpauth` as the type.
  </Step>

  <Step title="Validate the authentication header">
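    Verify the secret header in your endpoint. `X-API-Key` is only an example
    name; check whichever header your `httpauth` secret actually sends:

    ```python theme={null}
    import hmac
    import os

    from flask import Flask, jsonify, request

    app = Flask(__name__)

    @app.route('/enrich', methods=['POST'])
    def enrich():
        api_key = request.headers.get('X-API-Key', '')
        # compare_digest avoids leaking the key through timing differences
        if not hmac.compare_digest(api_key, os.environ['EXPECTED_API_KEY']):
            return jsonify({'error': 'Unauthorized'}), 401
        # ... process request
    ```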
  </Step>

  <Step title="Validate Input">
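    Always validate incoming data in your endpoint. A `400` response is a
    permanent error that fails the pipeline, so reserve it for requests that
    can never succeed:

    ```python theme={null}
    def enrich():
        # silent=True returns None instead of raising on a missing or malformed JSON body
        data = request.get_json(silent=True)
        if not isinstance(data, (dict, list)):
            return jsonify({'error': 'Invalid input'}), 400
        # ... process request
    ```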
  </Step>

  <Step title="Rate Limiting">
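    Implement rate limiting to prevent abuse. The handler treats `429` as a
    transient error and retries it with backoff, so limits set below your
    pipeline's normal request rate slow the pipeline down rather than drop rows:

    ```python theme={null}
    from flask import Flask
    from flask_limiter import Limiter
    from flask_limiter.util import get_remote_address

    app = Flask(__name__)

    # Flask-Limiter 3.x takes the key function as its first argument
    limiter = Limiter(get_remote_address, app=app, default_limits=["1000 per hour"])

    @app.route('/enrich', methods=['POST'])
    @limiter.limit("100 per minute")
    def enrich():
        ...  # process request
    ```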
  </Step>
</Steps>

## Limitations

<Warning>
  HTTP Handler transforms have some limitations to be aware of:

  * **Schema changes require `schema_override`**: By default, the response
    schema must match the input. To add, remove, or retype columns, declare
    them explicitly in `schema_override`.
  * **Response size**: Very large responses (over 10 MB) may cause issues. A
    batch can hold up to 2000 rows, so keep the fields you add compact.
  * **Timeout**: Requests that exceed 300 seconds are canceled and retried as
    transient errors.
  * **Order**: In batch mode, responses must be returned in the same order as
    the input rows.
  * **Retriable failures block the pipeline**: The handler retries `5xx`,
    `408`, `429`, and network errors forever, so a persistently failing
    endpoint will stall the pipeline rather than drop rows.
</Warning>
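If your endpoint speeds up a batch by running lookups concurrently, it still has to return rows in input order. A minimal sketch, assuming I/O-bound lookups, a hypothetical `lookup` function, and a hypothetical `wallet_label` column. `Executor.map` yields results in input order, so no re-sorting is needed.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def enrich_batch(rows, lookup, max_workers=16):
    """Run per-row lookups concurrently and return enriched rows in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of its inputs, even when
        # later lookups finish first
        labels = list(pool.map(lookup, rows))
    return [{**row, "wallet_label": label} for row, label in zip(rows, labels)]


def slow_lookup(row):
    """Hypothetical lookup; later rows deliberately finish first."""
    time.sleep(0.01 * (3 - int(row["id"])))
    return f"label-{row['id']}"


result = enrich_batch([{"id": str(i)} for i in range(3)], slow_lookup)
print([r["id"] for r in result])  # ['0', '1', '2']
```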

## Debugging

View logs to debug HTTP handler issues:

```bash theme={null}
goldsky turbo logs my-pipeline
```

Look for:

* HTTP status codes (200 = success, 4xx/5xx = errors)
* Response times
* Retry attempts
* Error messages from your endpoint
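To put numbers on response times, you can compute percentiles from timings your endpoint logs itself. A minimal sketch with the standard library; `latency_percentiles` is a hypothetical helper and the sample data (1 to 100 ms) is synthetic.

```python
import statistics


def latency_percentiles(samples_ms):
    """Return p50/p95/p99 of response times in milliseconds (use at least two samples)."""
    cut_points = statistics.quantiles(samples_ms, n=100, method="inclusive")
    # cut_points[k - 1] is the k-th percentile
    return {"p50": cut_points[49], "p95": cut_points[94], "p99": cut_points[98]}


print(latency_percentiles(list(range(1, 101))))
# {'p50': 50.5, 'p95': 95.05, 'p99': 99.01}
```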

Common issues:

* **"Connection refused"**: Your endpoint is not reachable
* **"Timeout"**: Your endpoint took longer than 300 seconds to respond.
  Optimize it, or filter rows upstream so that fewer rows reach the handler
* **"Schema mismatch"**: The response is missing input columns, changes their
  types, or adds columns that aren't declared in `schema_override`
* **"Invalid JSON"**: Your endpoint returned malformed JSON

## Best Practices

<AccordionGroup>
  <Accordion title="1. Filter before enriching">
    Only send rows that need enrichment to reduce API calls:

    ```yaml theme={null}
    transforms:
      # Filter first
      needs_enrichment:
        type: sql
        primary_key: id
        sql: SELECT * FROM source WHERE value > 1000000

      # Then enrich
      enriched:
        type: handler
        from: needs_enrichment
        url: https://api.example.com/enrich
        primary_key: id
    ```
  </Accordion>

  <Accordion title="2. Use batching when possible">
    Batch mode reduces network overhead:

    ```yaml theme={null}
    enriched:
      type: handler
      one_row_per_request: false  # Batch mode
    ```
  </Accordion>

  <Accordion title="3. Keep endpoints fast">
    Aim for under 100ms response times:

    * Cache frequently accessed data
    * Use database indexes
    * Optimize expensive computations
    * Consider async processing for slow operations
  </Accordion>

  <Accordion title="4. Monitor your API">
    Track metrics like:

    * Request rate
    * Response times (p50, p95, p99)
    * Error rates
    * Resource usage (CPU, memory)
  </Accordion>

  <Accordion title="5. Handle failures gracefully">
    Make your endpoint resilient:

    * Return a result for every input row even when some lookups fail, instead of failing the whole batch
    * Log errors for debugging
    * Implement circuit breakers for downstream dependencies
    * Provide fallback values when enrichment fails
  </Accordion>
</AccordionGroup>
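The circuit-breaker and fallback suggestions in "Handle failures gracefully" can be combined in a few lines. A minimal sketch, assuming a hypothetical downstream `fetch_label` call; the threshold and cool-down values are illustrative. After `reset_after` seconds the breaker lets one trial call through, and a single further failure reopens it.

```python
import time


class CircuitBreaker:
    """Skip a failing dependency for a cool-down period and return a fallback instead."""

    def __init__(self, failure_threshold=5, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # time the breaker tripped, or None while closed

    def call(self, fn, *args, fallback=None):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                return fallback  # open: don't touch the dependency at all
            self.opened_at = None  # cool-down over: allow a trial call
        try:
            result = fn(*args)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            return fallback
        self.failures = 0  # success closes the breaker fully
        return result


def fetch_label(address):
    """Hypothetical downstream call that is currently failing."""
    raise ConnectionError("label service unavailable")


breaker = CircuitBreaker(failure_threshold=3, reset_after=30.0)
labels = [breaker.call(fetch_label, "0xabc", fallback="Unknown") for _ in range(5)]
print(labels)  # ['Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown']
```

Returning a fallback keeps the response schema intact, so the pipeline keeps moving with placeholder values. If rows must not be written without enrichment, returning `503` instead makes the handler back off and retry, per the retry rules above.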
