# Snowflake

> Periodically load Mirror pipeline data into Snowflake from S3 using Dagster, Airflow, or another scheduler.

<Info>
  Streaming data directly to Snowflake would require an always-on compute instance, which costs approximately \$1,500 per month for the smallest instance available on Snowflake.

  As a result, **we don't yet support Snowflake as a native sink**. Instead, this page outlines how to load data periodically from S3, a cost-effective storage sink that is natively supported by both Goldsky and Snowflake.
</Info>

## What you'll need

This page outlines the core process with illustrative scripts (in Python and SQL). You'll need to schedule these tasks in an orchestration platform such as Dagster or Airflow so that they run at your desired interval. You'll also need:

1. An AWS Key ID and AWS Secret Key to access your S3 bucket.
2. Snowflake account details (account, username, password, database name, warehouse name).
3. A Goldsky Mirror pipeline writing the desired data to an [S3 bucket](/mirror/sinks/object-storage).

## Write extraction script

<Steps>
  <Step title="Install required Python libraries">
    You will need the `pandas` library to handle data and the `snowflake-connector-python` library to connect to Snowflake. Install them using pip:

    ```bash theme={null}
    pip install pandas snowflake-connector-python
    ```
  </Step>

  <Step title="Define AWS credentials">
    Define your AWS Key ID and AWS Secret Key as variables. These will be used to access your S3 bucket.

    ```python theme={null}
    AWS_KEY_ID = "INSERT_HERE"
    AWS_SECRET_KEY = "INSERT_HERE"
    ```
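
Hardcoded secrets are easy to leak once a script is committed or run by a scheduler. As a minimal sketch, assuming the credentials are exposed as environment variables named `AWS_KEY_ID` and `AWS_SECRET_KEY` (both the variable names and the `load_aws_credentials` helper are illustrative, not part of Goldsky or Snowflake), you could read them at runtime instead:

```python
import os


def load_aws_credentials(env=os.environ):
    """Return (key_id, secret_key) from the environment, failing fast if either is missing."""
    # Hypothetical variable names; adjust them to match your deployment.
    required = ("AWS_KEY_ID", "AWS_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["AWS_KEY_ID"], env["AWS_SECRET_KEY"]
```

With this in place, the two assignments above could become `AWS_KEY_ID, AWS_SECRET_KEY = load_aws_credentials()`.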
  </Step>

  <Step title="Write SQL scripts">
    Write functions that generate SQL scripts to create a table in Snowflake (if it doesn't exist) and load data from the S3 bucket into the table.

    ```python theme={null}
    def get_data_script(chain):
        """Build the SQL that creates the staging table (if needed) and loads Parquet files from S3."""
        return f"""
        create table if not exists YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet(DATA_RAW VARIANT);
        COPY INTO YOUR_DATABASE.YOUR_SCHEMA.raw_{chain}_data_parquet
        FROM 's3://your_bucket/{chain}/data'
        CREDENTIALS = (AWS_KEY_ID='{AWS_KEY_ID}' AWS_SECRET_KEY='{AWS_SECRET_KEY}')
        PATTERN = '.*[.]parquet'
        FILE_FORMAT = (
            TYPE = 'parquet'
        );
        """
    ```
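
Because `chain` is interpolated directly into a table name and an S3 path, it is worth checking the value before building the script. The sketch below assumes chain names are lowercase letters, digits, and underscores; `validate_chain` is a hypothetical helper, and the allowed pattern is an assumption you should adapt to your own naming:

```python
import re

# Assumed naming rule: starts with a lowercase letter, then letters, digits, or underscores.
_CHAIN_NAME = re.compile(r"[a-z][a-z0-9_]*")


def validate_chain(chain):
    """Return the chain name unchanged, or raise if it is not a safe identifier."""
    if not _CHAIN_NAME.fullmatch(chain):
        raise ValueError(f"Unexpected chain name: {chain!r}")
    return chain
```

You could then call `get_data_script(validate_chain("your_chain"))` so that a malformed value fails in Python rather than producing broken SQL.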
  </Step>

  <Step title="Connect to Snowflake and execute SQL scripts">
    Connect to Snowflake using the `snowflake.connector.connect` function. Because the generated script contains two statements, run it with the connection's `execute_string` method (a single `cursor.execute` call runs only one statement by default), and then close the connection.

    ```python theme={null}
    import snowflake.connector

    # Connect to Snowflake
    con = snowflake.connector.connect(
        user='YOUR_USERNAME',
        password='YOUR_PASSWORD',
        account='YOUR_ACCOUNT',
        warehouse='YOUR_WAREHOUSE',
        database='YOUR_DATABASE',
        schema='YOUR_SCHEMA'
    )

    try:
        # Run both statements in order: create the table, then COPY INTO it
        con.execute_string(get_data_script("your_chain"))
    finally:
        # Close the connection even if the load fails
        con.close()
    ```
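
To run this from Dagster or Airflow, it helps to wrap the load in a single function that the scheduler can call. The following is a sketch rather than a definitive implementation: `load_chains` is a hypothetical helper, and `connect` and `build_script` are callables you supply (for example, a function wrapping `snowflake.connector.connect` and the `get_data_script` function above).

```python
def load_chains(chains, connect, build_script):
    """Run the load script for each chain on one connection and always close it."""
    con = connect()
    loaded = []
    try:
        for chain in chains:
            # execute_string runs each semicolon-separated statement in turn
            con.execute_string(build_script(chain))
            loaded.append(chain)
    finally:
        # Release the connection even if one of the loads raises
        con.close()
    return loaded
```

Passing the connection factory in as an argument keeps the function easy to test with a stand-in connection and leaves credentials handling to the caller.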
  </Step>

  <Step title="Load data from staging table to destination table">
    You now have the raw data from S3 loaded into Snowflake. Because S3 as a sink is not re-org aware, you'll need an intermediate step to manage duplicate or stale data. Depending on your specific situation, this can be done with a `QUALIFY` clause.

    For example, as a dbt model:

    ```SQL theme={null}
    {{ config(materialized="table") }}
    with
        MATERIALIZED_TABLE as (
            select
                *
            from {{ source("DATABASE", "RAW_S3_TABLE") }}
        )
    select *
    from MATERIALIZED_TABLE
    qualify row_number() over (partition by id order by block_timestamp desc) = 1
    ```

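    The final line in the code above contains the key de-duplication logic: for each `id`, it keeps only the row with the latest `block_timestamp`. If your staging table stores each record in a single `VARIANT` column, such as `DATA_RAW` above, reference the fields by path instead (for example, `DATA_RAW:id`).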
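
To make the effect of that `QUALIFY` line concrete, here is the same rule sketched in plain Python. The `latest_per_id` function and the sample rows (including the `block_hash` field) are made up for illustration; in practice the de-duplication runs inside Snowflake.

```python
def latest_per_id(rows):
    """Keep, for each id, the row with the greatest block_timestamp."""
    latest = {}
    for row in rows:
        current = latest.get(row["id"])
        # On a tie the first row seen is kept; SQL row_number() may pick either one.
        if current is None or row["block_timestamp"] > current["block_timestamp"]:
            latest[row["id"]] = row
    return list(latest.values())


# Illustrative data: tx1 appears twice, as it might after a re-org.
rows = [
    {"id": "tx1", "block_timestamp": 100, "block_hash": "0xaaa"},
    {"id": "tx1", "block_timestamp": 105, "block_hash": "0xbbb"},
    {"id": "tx2", "block_timestamp": 101, "block_hash": "0xccc"},
]
deduplicated = latest_per_id(rows)  # keeps the 0xbbb row for tx1 and the only tx2 row
```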
  </Step>
</Steps>

Can't find what you're looking for? Reach out to us at [support@goldsky.com](mailto:support@goldsky.com) for help.
