Snowflake
Streaming data directly to Snowflake would require an always-on compute instance and cost approximately $1,500 per month for the smallest instance available on Snowflake.
As a result, we don’t yet support it as a native sink. Instead, this page outlines how to load data periodically from S3, a cost-effective storage sink that is natively supported by both Goldsky and Snowflake.
What you’ll need
This page outlines the core process with illustrative scripts (in Python and SQL); you’ll need to schedule these tasks with an orchestration platform such as Dagster or Airflow to run on your desired cadence. You’ll also need:
- AWS Key ID and AWS Secret Key to access your S3 bucket.
- Snowflake account details (account, username, password, database name, warehouse name)
- A Goldsky Mirror pipeline writing the desired data to an S3 bucket.
Write extraction script
Install required Python libraries
You will need the pandas library to handle data and the snowflake-connector-python library to connect to Snowflake. Install them using pip:
pip install pandas snowflake-connector-python
Define AWS credentials
Define your AWS Key ID and AWS Secret Key as variables. These will be used to access your S3 bucket.
AWS_KEY_ID = "INSERT_HERE"
AWS_SECRET_KEY = "INSERT_HERE"
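For anything beyond a quick test, you’ll likely want to read these secrets from the environment rather than hardcoding them. A minimal sketch (the environment variable names here are illustrative; match whatever your secret store exports):

```python
import os

# Read credentials from environment variables instead of hardcoding them.
# Falls back to a placeholder so the script still loads during local testing.
AWS_KEY_ID = os.environ.get("AWS_KEY_ID", "INSERT_HERE")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "INSERT_HERE")
```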
Write SQL scripts
Write functions that generate SQL scripts to create a table in Snowflake (if it doesn’t exist) and load data from the S3 bucket into the table.
def get_data_script(chain):
    return f"""
create table if not exists YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet (DATA_RAW VARIANT);

COPY INTO YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet
FROM 's3://your_bucket/{chain}/data' credentials=(AWS_KEY_ID='{AWS_KEY_ID}' AWS_SECRET_KEY='{AWS_SECRET_KEY}')
PATTERN = '.*\\.parquet'
FILE_FORMAT = (
    TYPE = 'parquet'
);
"""
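Before running the generated SQL against Snowflake, it can help to render it for a chain and check that the pieces you expect are present. This self-contained sketch repeats the generator with placeholder credentials, bucket, and table names:

```python
# Placeholder credentials; in practice these come from your secret store.
AWS_KEY_ID = "INSERT_HERE"
AWS_SECRET_KEY = "INSERT_HERE"

def get_data_script(chain):
    # Stage raw parquet files from S3 into a single VARIANT column.
    return f"""
create table if not exists YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet (DATA_RAW VARIANT);

COPY INTO YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet
FROM 's3://your_bucket/{chain}/data' credentials=(AWS_KEY_ID='{AWS_KEY_ID}' AWS_SECRET_KEY='{AWS_SECRET_KEY}')
PATTERN = '.*\\.parquet'
FILE_FORMAT = (TYPE = 'parquet');
"""

# Render the script for one chain and inspect it.
script = get_data_script("base")
print(script)
```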
Connect to Snowflake and execute SQL scripts
Connect to Snowflake using the snowflake.connector.connect function, execute the SQL scripts using the cursor.execute method, and then close the connection.
import snowflake.connector

# Connect to Snowflake
con = snowflake.connector.connect(
    user='YOUR_USERNAME',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT',
    warehouse='YOUR_WAREHOUSE',
    database='YOUR_DATABASE',
    schema='YOUR_SCHEMA'
)

# Create a cursor object
cur = con.cursor()

# Execute the SQL script
cur.execute(get_data_script("your_chain"))

# Close the connection
con.close()
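If you load data for several chains, you can reuse one connection and guarantee it gets closed even if a COPY fails. A sketch (the function name and shape are illustrative; it accepts any DB-API-style connection, such as the snowflake.connector connection above):

```python
def run_scripts(con, scripts):
    """Execute each SQL script on the given connection, always closing it afterwards.

    `con` is a DB-API-style connection object; `scripts` is an iterable of SQL strings,
    e.g. [get_data_script(chain) for chain in ("ethereum", "base")].
    """
    try:
        cur = con.cursor()
        for script in scripts:
            cur.execute(script)
        cur.close()
    finally:
        # Close the connection even if one of the scripts raised.
        con.close()
```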
Load data from staging table to destination table
You now have the raw data from S3 loaded into Snowflake. However, because S3 as a sink is not re-org aware, you’ll need an intermediate step to manage duplicate/stale data. This can be done with a QUALIFY statement, depending on your specific situation.
For example, as a dbt model:
{{ config(materialized="table") }}

with
    MATERIALIZED_TABLE as (
        select *
        from {{ source("DATABASE", "RAW_S3_TABLE") }}
    )

select *
from MATERIALIZED_TABLE
qualify row_number() over (partition by id order by block_timestamp desc) = 1
The final line in the code above contains the key de-duplication logic: for each id, only the row with the most recent block_timestamp is kept.
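If you want to prototype this de-duplication outside Snowflake, the same keep-latest-row-per-id logic can be expressed with pandas (column names follow the example above; the sample data is illustrative):

```python
import pandas as pd

# Two versions of id=1 (a stale row and its replacement after a re-org) plus one id=2 row.
raw = pd.DataFrame({
    "id": [1, 1, 2],
    "block_timestamp": ["2024-01-01", "2024-01-02", "2024-01-01"],
    "value": ["stale", "fresh", "only"],
})

# Equivalent of:
#   qualify row_number() over (partition by id order by block_timestamp desc) = 1
deduped = (
    raw.sort_values("block_timestamp", ascending=False)
       .drop_duplicates(subset="id", keep="first")
       .sort_values("id")
       .reset_index(drop=True)
)

print(deduped)
```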
Can't find what you're looking for? Reach out to us at support@goldsky.com for help.