Overview

The HTTP Handler transform allows you to enrich streaming data by calling external HTTP endpoints. This is useful for:
  • Enriching blockchain data with off-chain information
  • Calling ML models for predictions or classifications
  • Integrating with third-party APIs for additional context
  • Custom business logic hosted in external services

Configuration

transforms:
  my_http_handler:
    type: handler
    from: <source-or-transform>
    url: <endpoint-url>
    primary_key: <column-name>
    one_row_per_request: true | false # defaults to false

Parameters

type (string, required)
Must be handler

from (string, required)
The source or transform to read data from

url (string, required)
The HTTP endpoint URL to call. Must be a fully-qualified URL (e.g., https://api.example.com/enrich)

primary_key (string, required)
The column that uniquely identifies each row

one_row_per_request (boolean, default: false)
  • true: Send each row individually as a single JSON object
  • false (default): Send multiple rows as a JSON array (batched)

Request Format

Single Row Mode (one_row_per_request: true)

When enabled, each row is sent as an individual HTTP POST request with a JSON body:
{
  "id": "abc123",
  "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
  "value": "1000000000000000000",
  "block_number": 12345678
}
Use when:
  • Your API doesn’t support batch processing
  • Each request requires significant processing time
  • You need real-time, row-by-row processing

Batch Mode (one_row_per_request: false)

When disabled, multiple rows are sent as a JSON array:
[
  {
    "id": "abc123",
    "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
    "value": "1000000000000000000"
  },
  {
    "id": "def456",
    "address": "0x1234567890abcdef1234567890abcdef12345678",
    "value": "2000000000000000000"
  }
]
Use when:
  • Your API supports batch processing
  • You want to reduce network overhead
  • Higher throughput is needed

Response Format

Your HTTP endpoint must return JSON with the same structure as the input, plus any additional fields you want to add.

Single Row Response

{
  "id": "abc123",
  "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
  "value": "1000000000000000000",
  "block_number": 12345678,
  "enriched_data": {
    "wallet_label": "Whale Wallet #42",
    "risk_score": 0.23,
    "is_exchange": false
  }
}

Batch Response

[
  {
    "id": "abc123",
    "address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
    "value": "1000000000000000000",
    "enriched_data": {
      "wallet_label": "Whale Wallet #42"
    }
  },
  {
    "id": "def456",
    "address": "0x1234567890abcdef1234567890abcdef12345678",
    "value": "2000000000000000000",
    "enriched_data": {
      "wallet_label": "Exchange Deposit"
    }
  }
]
The response must include all original fields plus any new enriched fields. Missing original fields will cause errors.
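When building your endpoint, a small self-check that the response still carries every input field can catch schema mismatches before the pipeline does. This is a sketch, not part of the handler itself; `check_response` is a hypothetical helper you would call before returning each row:

```python
def check_response(request_row, response_row):
    """Raise if the response dropped any field from the original request;
    a missing original field would fail the pipeline with a schema error."""
    missing = set(request_row) - set(response_row)
    if missing:
        raise ValueError(f"response missing original fields: {sorted(missing)}")
    return response_row
```

Run it against each enriched row before serializing the response; extra fields are fine, dropped ones are not.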

Example: Enrich Transfers with Wallet Labels

name: enriched-transfers
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.0.0
    start_at: latest

transforms:
  # First, filter to high-value transfers
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(from_address) as from_address,
        lower(to_address) as to_address,
        CAST(value AS DECIMAL) as value,
        contract_address,
        block_timestamp,
        _gs_op
      FROM ethereum_transfers
      WHERE CAST(value AS DECIMAL) > 1000000000000000000000

  # Enrich with external API
  enriched_transfers:
    type: handler
    from: high_value
    url: https://api.example.com/wallet-enrichment
    primary_key: id
    one_row_per_request: false

sinks:
  postgres_enriched:
    type: postgres
    from: enriched_transfers
    schema: public
    table: enriched_transfers
    secret_name: MY_POSTGRES
    primary_key: id

Example API Implementation

Here’s a simple example of an HTTP endpoint that enriches wallet data:
from flask import Flask, request, jsonify

app = Flask(__name__)

# Mock wallet database
WALLET_LABELS = {
    "0x742d35cc6634c0532925a3b844bc9e7595f0beb": {
        "label": "Binance Hot Wallet",
        "type": "exchange",
        "risk_score": 0.1
    },
    # ... more wallets
}

@app.route('/wallet-enrichment', methods=['POST'])
def enrich_wallets():
    data = request.json

    # Handle both single row and batch
    is_batch = isinstance(data, list)
    rows = data if is_batch else [data]

    enriched = []
    for row in rows:
        # Add enrichment data
        from_label = WALLET_LABELS.get(row.get('from_address', ''), {})
        to_label = WALLET_LABELS.get(row.get('to_address', ''), {})

        row['from_wallet_label'] = from_label.get('label', 'Unknown')
        row['from_wallet_type'] = from_label.get('type', 'unknown')
        row['to_wallet_label'] = to_label.get('label', 'Unknown')
        row['to_wallet_type'] = to_label.get('type', 'unknown')

        enriched.append(row)

    return jsonify(enriched if is_batch else enriched[0])

if __name__ == '__main__':
    app.run(port=8080)

Example: ML Model Integration

Call a machine learning model to classify transactions:
transforms:
  suspicious_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        from_address,
        to_address,
        CAST(value AS DECIMAL) as value,
        block_timestamp
      FROM ethereum_transfers

  ml_classified:
    type: handler
    from: suspicious_transfers
    url: https://ml-api.example.com/fraud-detection
    primary_key: id
    one_row_per_request: true  # ML models often process one at a time
The ML endpoint might return:
{
  "id": "abc123",
  "from_address": "0x...",
  "to_address": "0x...",
  "value": "1000000000000000000",
  "block_timestamp": "2024-01-01T00:00:00Z",
  "fraud_probability": 0.87,
  "fraud_indicators": ["high_value", "new_wallet", "rapid_succession"],
  "recommended_action": "flag_for_review"
}
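The scoring logic behind such an endpoint might look like the sketch below. The threshold, weights, and indicator names are invented for illustration; a real deployment would call a trained model rather than these heuristics:

```python
def score_transfer(row):
    """Toy fraud scorer. The 1-token threshold (10**18 wei) and the
    0.1 + 0.2-per-indicator weighting are made-up values for this sketch;
    a real endpoint would invoke an actual model here."""
    indicators = []
    if int(row["value"]) >= 10**18:  # >= 1 token at 18 decimals
        indicators.append("high_value")

    probability = min(0.99, 0.1 + 0.2 * len(indicators))
    return {
        **row,  # echo every original field back, as the handler requires
        "fraud_probability": probability,
        "fraud_indicators": indicators,
        "recommended_action": "flag_for_review" if probability > 0.5 else "allow",
    }
```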

Error Handling and Retries

The HTTP handler includes built-in retry logic:
  • Transient errors (network timeouts, 5xx responses): Retried with exponential backoff
  • Permanent errors (4xx responses, invalid JSON): Pipeline fails after max retries
  • Timeout: Configurable timeout per request (default: 30 seconds)
Ensure your endpoint can handle retries idempotently. The same request may be sent multiple times if there are transient failures.
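One way to make retries idempotent is to memoize results by the row's primary key, so a retried request is answered identically without re-running side effects. A minimal sketch; in production the dict would be a shared store such as Redis, and `enrich_fn` stands in for your actual enrichment logic:

```python
# In-memory result cache; a production service would use Redis or similar
# so results survive restarts and are shared across workers.
_result_cache = {}

def enrich_idempotent(row, enrich_fn):
    """Return the cached result for a row we have already processed, so a
    retried request gets the same answer and side effects run only once."""
    key = row["id"]  # the pipeline's primary_key column
    if key not in _result_cache:
        _result_cache[key] = enrich_fn(row)
    return _result_cache[key]
```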

Performance Considerations

HTTP handlers add latency to your pipeline:
  • Each request takes at least the network round-trip time plus your endpoint's processing time
  • Use batching (one_row_per_request: false) to reduce overhead
  • Consider caching frequently requested data in your API
To maximize throughput:
  • Use batch mode when possible (10-100 rows per batch works well)
  • Ensure your API can handle concurrent requests
  • Scale your API horizontally if it becomes a bottleneck
  • Monitor API response times in your pipeline logs
If your HTTP endpoint is slow:
  • The entire pipeline slows down to match; this backpressure prevents data loss and memory overflow
  • Scale your API or optimize its response time
  • Monitor logs for HTTP handler performance metrics
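Caching frequently requested data can be as simple as memoizing the slow lookup inside your endpoint. In this sketch, `lookup_address` is a stand-in for whatever expensive database or third-party query your API performs:

```python
from functools import lru_cache

@lru_cache(maxsize=10_000)
def lookup_address(address):
    """Stand-in for a slow database or third-party lookup. Repeated
    requests for the same address are served from memory instead of
    re-running the expensive query."""
    # Imagine an expensive query here; the sketch returns a static record.
    return {"label": "Unknown", "risk_score": 0.0}
```

Note that `lru_cache` requires hashable arguments and returns the same object on every hit, so treat cached records as read-only.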

Security Best Practices

1. Use HTTPS

Always use HTTPS endpoints to encrypt data in transit:
url: https://api.example.com/enrich  # ✓ Good
url: http://api.example.com/enrich   # ✗ Avoid
2. Implement Authentication

Use API keys or tokens in your endpoint:
import os

@app.route('/enrich', methods=['POST'])
def enrich():
    api_key = request.headers.get('X-API-Key')
    if api_key != os.getenv('EXPECTED_API_KEY'):
        return jsonify({'error': 'Unauthorized'}), 401
    # ... process request
3. Validate Input

Always validate incoming data in your endpoint:
@app.route('/enrich', methods=['POST'])
def enrich():
    data = request.json
    if not isinstance(data, (dict, list)):
        return jsonify({'error': 'Invalid input'}), 400
    # ... process request
4. Rate Limiting

Implement rate limiting to prevent abuse:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(get_remote_address, app=app, default_limits=["1000 per hour"])

@app.route('/enrich', methods=['POST'])
@limiter.limit("100 per minute")
def enrich():
    # ... process request

Limitations

HTTP Handler transforms have some limitations to be aware of:
  • No schema changes: You can add fields but cannot remove or change types of existing fields
  • Response size: Very large responses (>10MB) may cause issues
  • Timeout: Requests that take longer than the timeout will fail and retry
  • Order: Responses must be in the same order as requests for batch mode
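The ordering requirement means a batch endpoint must build its response by walking the input list in order, even if it fans enrichment out concurrently. One way to get both, sketched here with a hypothetical `enrich_fn`: `ThreadPoolExecutor.map` runs calls in parallel but yields results in submission order:

```python
from concurrent.futures import ThreadPoolExecutor

def enrich_batch(rows, enrich_fn, max_workers=8):
    """Enrich rows concurrently while preserving input order: executor.map
    yields results in submission order, so the i-th response element still
    corresponds to the i-th request row."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(enrich_fn, rows))
```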

Debugging

View logs to debug HTTP handler issues:
goldsky turbo logs my-pipeline
Look for:
  • HTTP status codes (200 = success, 4xx/5xx = errors)
  • Response times
  • Retry attempts
  • Error messages from your endpoint
Common issues:
  • “Connection refused”: Your endpoint is not reachable
  • “Timeout”: Your endpoint is too slow; optimize it or increase the timeout
  • “Schema mismatch”: The response doesn’t include all original fields
  • “Invalid JSON”: Your endpoint returned malformed JSON

Best Practices

Only send rows that need enrichment to reduce API calls:
transforms:
  # Filter first
  needs_enrichment:
    type: sql
    sql: SELECT * FROM source WHERE value > 1000000

  # Then enrich
  enriched:
    type: handler
    from: needs_enrichment
    url: https://api.example.com/enrich
Batch mode reduces network overhead:
enriched:
  type: handler
  one_row_per_request: false  # Batch mode
Aim for under 100ms response times:
  • Cache frequently accessed data
  • Use database indexes
  • Optimize expensive computations
  • Consider async processing for slow operations
Track metrics like:
  • Request rate
  • Response times (p50, p95, p99)
  • Error rates
  • Resource usage (CPU, memory)
Make your endpoint resilient:
  • Return partial results on partial failures
  • Log errors for debugging
  • Implement circuit breakers for downstream dependencies
  • Provide fallback values when enrichment fails
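Fallback values can be as simple as wrapping the lookup and substituting neutral defaults, so one flaky dependency doesn't fail the whole request. A sketch whose field names mirror the earlier wallet-label example; `lookup` stands in for your downstream call:

```python
FALLBACK = {"label": "Unknown", "type": "unknown"}

def enrich_with_fallback(row, lookup):
    """Attach wallet metadata, degrading to neutral defaults when the
    downstream lookup fails instead of erroring the whole request."""
    try:
        meta = lookup(row["from_address"])
    except Exception:
        meta = FALLBACK  # log the error in a real service before falling back
    return {
        **row,  # keep every original field, as the handler requires
        "from_wallet_label": meta.get("label", "Unknown"),
        "from_wallet_type": meta.get("type", "unknown"),
    }
```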

Next Steps