Skip to main content
Version: 1.4.0 (latest)

Snowflake

Install dlt with Snowflakeโ€‹

To install the dlt library with Snowflake dependencies, run:

pip install "dlt[snowflake]"

Setup guideโ€‹

1. Initialize a project with a pipeline that loads to Snowflake by running:

dlt init chess snowflake

2. Install the necessary dependencies for Snowflake by running:

pip install -r requirements.txt

This will install dlt with the snowflake extra, which contains the Snowflake Python dbapi client.

3. Create a new database, user, and give dlt access.

Read the next chapter below.

4. Enter your credentials into .dlt/secrets.toml. It should now look like this:

[destination.snowflake.credentials]
database = "dlt_data"
password = "<password>"
username = "loader"
host = "kgiotue-wn98412"
warehouse = "COMPUTE_WH"
role = "DLT_LOADER_ROLE"

In the case of Snowflake, the host is your Account Identifier. You can get it in Admin/Accounts by copying the account URL: https://kgiotue-wn98412.snowflakecomputing.com and extracting the host name (kgiotue-wn98412).

The warehouse and role are optional if you assign defaults to your user. In the example below, we do not do that, so we set them explicitly.

Set up the database user and permissionsโ€‹

The instructions below assume that you use the default account setup that you get after creating a Snowflake account. You should have a default warehouse named COMPUTE_WH and a Snowflake account. Below, we create a new database, user, and assign permissions. The permissions are very generous. A more experienced user can easily reduce dlt permissions to just one schema in the database.

-- create database with standard settings
CREATE DATABASE dlt_data;
-- create new user - set your password here
CREATE USER loader WITH PASSWORD='<password>';
-- we assign all permissions to a role
CREATE ROLE DLT_LOADER_ROLE;
GRANT ROLE DLT_LOADER_ROLE TO USER loader;
-- give database access to new role
GRANT USAGE ON DATABASE dlt_data TO DLT_LOADER_ROLE;
-- allow `dlt` to create new schemas
GRANT CREATE SCHEMA ON DATABASE dlt_data TO ROLE DLT_LOADER_ROLE;
-- allow access to a warehouse named COMPUTE_WH
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO DLT_LOADER_ROLE;
-- grant access to all future schemas and tables in the database
GRANT ALL PRIVILEGES ON FUTURE SCHEMAS IN DATABASE dlt_data TO DLT_LOADER_ROLE;
GRANT ALL PRIVILEGES ON FUTURE TABLES IN DATABASE dlt_data TO DLT_LOADER_ROLE;

Now you can use the user named LOADER to access the database DLT_DATA and log in with the specified password.

You can also decrease the suspend time for your warehouse to 1 minute (Admin/Warehouses in Snowflake UI).

Authentication typesโ€‹

Snowflake destination accepts three authentication types:

The password authentication is not any different from other databases like Postgres or Redshift. dlt follows the same syntax as the SQLAlchemy dialect.

You can also pass credentials as a database connection string. For example:

# Keep it at the top of your TOML file, before any section starts
destination.snowflake.credentials="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?warehouse=COMPUTE_WH&role=DLT_LOADER_ROLE"

In key pair authentication, you replace the password with a private key string that should be in Base64-encoded DER format (dbt also recommends base64-encoded private keys for Snowflake connections). The private key may also be encrypted. In that case, you must provide a passphrase alongside the private key.

[destination.snowflake.credentials]
database = "dlt_data"
username = "loader"
host = "kgiotue-wn98412"
private_key = "LS0tLS1CRUdJTiBFTkNSWVBURUQgUFJJ....Qo="
private_key_passphrase="passphrase"

You can easily get the base64-encoded value of your private key by running base64 -i <path-to-private-key-file>.pem in your terminal

If you pass a passphrase in the connection string, please URL encode it.

# Keep it at the top of your TOML file, before any section starts
destination.snowflake.credentials="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?private_key=<base64 encoded pem>&private_key_passphrase=<url encoded passphrase>"

In OAuth authentication, you can use an OAuth provider like Snowflake, Okta, or an external browser to authenticate. In the case of Snowflake OAuth, you pass your authenticator and refresh token as below:

[destination.snowflake.credentials]
database = "dlt_data"
username = "loader"
authenticator="oauth"
token="..."

or in the connection string as query parameters.

In the case of external authentication, you need to find documentation for your OAuth provider. Refer to Snowflake OAuth for more details.

Additional connection optionsโ€‹

We pass all query parameters to the connect function of the Snowflake Python Connector. For example:

[destination.snowflake.credentials]
database = "dlt_data"
authenticator="oauth"
[destination.snowflake.credentials.query]
timezone="UTC"
# keep session alive beyond 4 hours
client_session_keep_alive=true

This will set the timezone and session keep alive. Mind that if you use TOML, your configuration is typed. The alternative: "snowflake://loader/dlt_data?authenticator=oauth&timezone=UTC&client_session_keep_alive=true" will pass client_session_keep_alive as a string to the connect method (which we didn't verify if it works).

Write dispositionโ€‹

All write dispositions are supported.

If you set the replace strategy to staging-optimized, the destination tables will be dropped and recreated with a clone command from the staging tables.

Data loadingโ€‹

The data is loaded using an internal Snowflake stage. We use the PUT command and per-table built-in stages by default. Stage files are kept by default, unless specified otherwise via the keep_staged_files parameter:

[destination.snowflake]
keep_staged_files = false

Data typesโ€‹

snowflake supports various timestamp types, which can be configured using the column flags timezone and precision in the dlt.resource decorator or the pipeline.run method.

  • Precision: Allows you to specify the number of decimal places for fractional seconds, ranging from 0 to 9. It can be used in combination with the timezone flag.
  • Timezone:
    • Setting timezone=False maps to TIMESTAMP_NTZ.
    • Setting timezone=True (or omitting the flag, which defaults to True) maps to TIMESTAMP_TZ.

Example precision and timezone: TIMESTAMP_NTZ(3)โ€‹

@dlt.resource(
columns={"event_tstamp": {"data_type": "timestamp", "precision": 3, "timezone": False}},
primary_key="event_id",
)
def events():
yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"}]

pipeline = dlt.pipeline(destination="snowflake")
pipeline.run(events())

Supported file formatsโ€‹

When staging is enabled:

caution

When loading from Parquet, Snowflake will store json types (JSON) in VARIANT as a string. Use the JSONL format instead or use PARSE_JSON to update the VARIANT field after loading.

Custom CSV formatsโ€‹

By default, we support the CSV format produced by our writers, which is comma-delimited, with a header, and optionally quoted.

You can configure your own formatting, i.e., when importing external csv files.

[destination.snowflake.csv_format]
delimiter="|"
include_header=false
on_error_continue=true

This will read a | delimited file, without a header, and will continue on errors.

Note that we ignore missing columns ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE and we will insert NULL into them.

Supported column hintsโ€‹

Snowflake supports the following column hints:

  • cluster - Creates a cluster column(s). Many columns per table are supported and only when a new table is created.

Table and column identifiersโ€‹

Snowflake supports both case-sensitive and case-insensitive identifiers. All unquoted and uppercase identifiers resolve case-insensitively in SQL statements. Case-insensitive naming conventions like the default snake_case will generate case-insensitive identifiers. Case-sensitive (like sql_cs_v1) will generate case-sensitive identifiers that must be quoted in SQL statements.

note

Names of tables and columns in schemas are kept in lowercase like for all other destinations. This is the pattern we observed in other tools, i.e., dbt. In the case of dlt, it is, however, trivial to define your own uppercase naming convention.

Staging supportโ€‹

Snowflake supports S3 and GCS as file staging destinations. dlt will upload files in the parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.

Alternatively to parquet files, you can also specify jsonl as the staging file format. For this, set the loader_file_format argument of the run command of the pipeline to jsonl.

Snowflake and Amazon S3โ€‹

Please refer to the S3 documentation to learn how to set up your bucket with the bucket_url and credentials. For S3, the dlt Redshift loader will use the AWS credentials provided for S3 to access the S3 bucket if not specified otherwise (see config options below). Alternatively, you can create a stage for your S3 Bucket by following the instructions provided in the Snowflake S3 documentation. The basic steps are as follows:

  • Create a storage integration linked to GCS and the right bucket.
  • Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
  • Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
  • Also grant access to this stage for the role you are using to load data into Snowflake.
  • Provide the name of your stage (including the namespace) to dlt like so:

To prevent dlt from forwarding the S3 bucket credentials on every command, and set your S3 stage, change these settings:

[destination]
stage_name="PUBLIC.my_s3_stage"

To run Snowflake with S3 as the staging destination:

# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on S3
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='snowflake',
staging='filesystem', # add this to activate the staging location
dataset_name='player_data'
)

Snowflake and Google Cloud Storageโ€‹

Please refer to the Google Storage filesystem documentation to learn how to set up your bucket with the bucket_url and credentials. For GCS, you can define a stage in Snowflake and provide the stage identifier in the configuration (see config options below). Please consult the Snowflake Documentation on how to create a stage for your GCS Bucket. The basic steps are as follows:

  • Create a storage integration linked to GCS and the right bucket.
  • Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
  • Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
  • Also grant access to this stage for the role you are using to load data into Snowflake.
  • Provide the name of your stage (including the namespace) to dlt like so:
[destination]
stage_name="PUBLIC.my_gcs_stage"

To run Snowflake with GCS as the staging destination:

# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on GCS
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='snowflake',
staging='filesystem', # add this to activate the staging location
dataset_name='player_data'
)

Snowflake and Azure Blob Storageโ€‹

Please refer to the Azure Blob Storage filesystem documentation to learn how to set up your bucket with the bucket_url and credentials. For Azure, the Snowflake loader will use the filesystem credentials for your Azure Blob Storage container if not specified otherwise (see config options below). Alternatively, you can define an external stage in Snowflake and provide the stage identifier. Please consult the Snowflake Documentation on how to create a stage for your Azure Blob Storage Container. The basic steps are as follows:

  • Create a storage integration linked to Azure Blob Storage and the right container.
  • Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
  • Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
  • Also, grant access to this stage for the role you are using to load data into Snowflake.
  • Provide the name of your stage (including the namespace) to dlt like so:
[destination]
stage_name="PUBLIC.my_azure_stage"

To run Snowflake with Azure as the staging destination:

# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on Azure
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='snowflake',
staging='filesystem', # add this to activate the staging location
dataset_name='player_data'
)

Additional destination optionsโ€‹

You can define your own stage to PUT files and disable the removal of the staged files after loading.

[destination.snowflake]
# Use an existing named stage instead of the default. Default uses the implicit table stage per table
stage_name="DLT_STAGE"
# Whether to keep or delete the staged files after COPY INTO succeeds
keep_staged_files=true

Setting up CSV formatโ€‹

You can provide non-default csv settings via configuration file or explicitly.

[destination.snowflake.csv_format]
delimiter="|"
include_header=false
on_error_continue=true

or

from dlt.destinations import snowflake
from dlt.common.data_writers.configuration import CsvFormatConfiguration

csv_format = CsvFormatConfiguration(delimiter="|", include_header=False, on_error_continue=True)

dest_ = snowflake(csv_format=csv_format)

Above, we set the CSV file format without a header, with | as a separator, and we request to ignore lines with errors.

tip

You'll need these settings when importing external files.

Query taggingโ€‹

dlt tags sessions that execute loading jobs with the following job properties:

  • source - name of the source (identical with the name of the dlt schema)
  • resource - name of the resource (if known, else empty string)
  • table - name of the table loaded by the job
  • load_id - load id of the job
  • pipeline_name - name of the active pipeline (or empty string if not found)

You can define a query tag by defining a query tag placeholder in Snowflake credentials:

[destination.snowflake]
query_tag='{{"source":"{source}", "resource":"{resource}", "table": "{table}", "load_id":"{load_id}", "pipeline_name":"{pipeline_name}"}}'

which contains Python named formatters corresponding to tag names i.e., {source} will assume the name of the dlt source.

note
  1. Query tagging is off by default. The query_tag configuration field is None by default and must be set to enable tagging.
  2. Only sessions associated with a job are tagged. Sessions that migrate schemas remain untagged.
  3. Jobs processing table chains (i.e., SQL merge jobs) will use the top-level table as table.

dbt supportโ€‹

This destination integrates with dbt via dbt-snowflake. Both password and key pair authentication are supported and shared with dbt runners.

Syncing of dlt stateโ€‹

This destination fully supports dlt state sync.

Snowflake connection identifierโ€‹

We enable Snowflake to identify that the connection is created by dlt. Snowflake will use this identifier to better understand the usage patterns associated with dlt integration. The connection identifier is dltHub_dlt.

Additional Setup guidesโ€‹

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub โ€“ it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.