Streaming data directly to Snowflake would require an always-on compute instance and cost approximately $1,500 per month for the smallest instance available on Snowflake.

As a result, we don’t yet support it as a native sink; and instead outline here how to load periodically from S3, a cost-effective storage sink that is natively supported by Goldsky and by Snowflake.

What you’ll need

This page outlines the core process with illustrative scripts (using python and SQL); you’ll need to schedule these tasks into an orchestration platform such as Dagster or Airflow to run on your desired schedule. You’ll also need:

  1. AWS Key ID and AWS Secret Key to access your S3 bucket.
  2. Snowflake account details (account, username, password, database name, warehouse name)
  3. A Goldsky Mirror pipeline writing the desired data to an S3 bucket.

Write extraction script

1

Install required Python libraries

You will need the pandas library to handle data and the snowflake-connector-python library to connect to Snowflake. Install them using pip:

pip install pandas snowflake-connector-python
2

Define AWS credentials

Define your AWS Key ID and AWS Secret Key as variables. These will be used to access your S3 bucket.

python
AWS_KEY_ID = "INSERT_HERE"
AWS_SECRET_KEY = "INSERT_HERE"
3

Write SQL scripts

Write functions that generate SQL scripts to create a table in Snowflake (if it doesn’t exist) and load data from the S3 bucket into the table.

def get_data_script(chain):
return f"""
create table if not exists YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}data_parquet(DATA_RAW VARIANT);
COPY INTO YOUR_DATABASE.YOUR_SCHEMA.raw{chain}data_parquet
FROM s3://your_bucket/{chain}/data credentials=(AWS_KEY_ID='{AWS_KEY_ID}' AWS_SECRET_KEY='{AWS_SECRET_KEY}')
PATTERN = '..parquet'
FILE_FORMAT = (
TYPE = 'parquet'
);
"""
4

Connect to Snowflake and execute SQL scripts

Connect to Snowflake using the snowflake.connector.connect function. Then, execute the SQL scripts using the cursor.execute method, and then close the connection.

import snowflake.connector

# Connect to Snowflake
con = snowflake.connector.connect(
user='YOUR_USERNAME',
password='YOUR_PASSWORD',
account='YOUR_ACCOUNT',
warehouse='YOUR_WAREHOUSE',
database='YOUR_DATABASE',
schema='YOUR_SCHEMA'
)

# Create a cursor object
cur = con.cursor()

# Execute the SQL script
cur.execute(get_data_script("your_chain"))

# Close the connection
con.close()
5

Load data from staging table to destination table

You know have the raw data from S3 loaded into Snowflake; but because S3 as a sink is not re-org aware, you’ll need an intermediate step to manage duplicate/stale data. This can be done with a QUALIFY statement, depending on your specific situation.

For example:

{{ config(materialized="table") }}
with
    MATERIALIZED_TABLE as (
        select
            *
        from {{ source("DATABASE", "RAW_S3_TABLE") }}
    )
select *
from MATERIALIZED_TABLE
qualify row_number() over (partition by id order by block_timestamp desc) = 1

The final line in the code above contains the key de-duplication logic based on id.

Can't find what you're looking for? Reach out to us at support@goldsky.com for help.

Was this page helpful?