Filesystem source
Filesystem source allows loading files from remote locations (AWS S3, Google Cloud Storage, Google Drive, Azure Blob Storage, SFTP server) or the local filesystem seamlessly. Filesystem source natively supports CSV, Parquet, and JSONL files and allows customization for loading any type of structured files.
To load unstructured data (PDF, plain text, e-mail), please refer to the unstructured data source.
How filesystem source works
Besides giving you an easy way to load data from both remote and local files, the filesystem source comes with a set of tools that let you customize the loading process to fit your specific needs.
Filesystem source loads data in two steps:
- It accesses the files in your remote or local file storage without actually reading the content yet. At this point, you can filter files by metadata or name. You can also set up incremental loading to load only new files.
- The transformer reads the files' content and yields the records. At this step, you can filter out the actual data, enrich records with metadata from files, or perform incremental loading based on the file content.
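The two steps above can be modeled in plain Python. This is a conceptual sketch only, not dlt's implementation: list_files and read_csv_records are hypothetical helpers, and the dictionary keys merely mimic the file metadata mentioned in this guide.

```python
import csv
import tempfile
from pathlib import Path
from typing import Dict, Iterable, Iterator, List


def list_files(root: Path, pattern: str) -> Iterator[Dict[str, object]]:
    """Step 1: yield file metadata only; no file content is read here."""
    for path in sorted(root.glob(pattern)):
        stat = path.stat()
        yield {
            "file_name": path.name,
            "file_path": str(path),
            "size_in_bytes": stat.st_size,
            "modification_date": stat.st_mtime,
        }


def read_csv_records(files: Iterable[Dict[str, object]]) -> Iterator[Dict[str, str]]:
    """Step 2: open each listed file and yield its rows as records."""
    for item in files:
        with open(str(item["file_path"]), newline="") as handle:
            for row in csv.DictReader(handle):
                # Enrich every record with metadata from its source file.
                yield {**row, "source_file": str(item["file_name"])}


def demo() -> List[Dict[str, str]]:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "berlin.csv").write_text("city,temp\nBerlin,21\n")
        (root / "london.csv").write_text("city,temp\nLondon,18\n")
        # Filter on metadata between the two steps, before any content is read.
        listed = (f for f in list_files(root, "*.csv") if "london" in str(f["file_name"]))
        return list(read_csv_records(listed))


records = demo()
print(records)
```

Because the filter runs on metadata alone, berlin.csv is never opened in this sketch.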
Quick example
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
filesystem_resource = filesystem(
bucket_url="file:///Users/admin/Documents/parquet_files",
file_glob="**/*.parquet"
)
filesystem_pipe = filesystem_resource | read_parquet()
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("modification_date"))
# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
Setup
Prerequisites
Please make sure the dlt library is installed. Refer to the installation guide.
Initialize the filesystem source
To get started with your data pipeline, follow these steps:
Enter the following command:
dlt init filesystem duckdb
The dlt init command will initialize the pipeline example with the filesystem as the source and duckdb as the destination.
If you would like to use a different destination, simply replace duckdb with the name of your preferred destination.
After running this command, a new directory will be created with the necessary files and configuration settings to get started.
Configuration
Get credentials
- AWS S3
- GCS/GDrive
- Azure
- SFTP
- Local filesystem
To get AWS keys for S3 access:
- Access IAM in the AWS Console.
- Select "Users", choose a user, and open "Security credentials".
- Click "Create access key" for AWS ID and Secret Key.
For more info, see AWS official documentation.
To get GCS/GDrive access:
- Log in to console.cloud.google.com.
- Create a service account.
- Enable "Cloud Storage API" / "Google Drive API"; see Google's guide.
- In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > "ADD KEY" > "CREATE" to get a JSON credential file.
- Grant the service account appropriate permissions for cloud storage access.
For more info, see how to create a service account.
To obtain Azure blob storage access:
- Go to the Azure Portal (portal.azure.com).
- Select "Storage accounts" > your storage.
- Click "Settings" > "Access keys".
- View the account name and two keys (primary/secondary). Keep keys confidential.
For more info, see Azure official documentation.
For SFTP, dlt supports several authentication methods:
- Key-based authentication
- SSH Agent-based authentication
- Username/Password authentication
- GSS-API authentication
Learn more about SFTP authentication options in the SFTP section. To obtain credentials, contact your server administrator.
Add credentials to dlt pipeline
To provide credentials to the filesystem source, you can use any method available in dlt.
One of the easiest ways is to use configuration files. The .dlt folder in your working directory contains two files: config.toml and secrets.toml. Sensitive information, like passwords and access tokens, should only be put into secrets.toml, while any other configuration, like the path to a bucket, can be specified in config.toml.
- AWS S3
- GCS/GDrive
- Azure
- SFTP
- Local filesystem
# secrets.toml
[sources.filesystem.credentials]
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"
# config.toml
[sources.filesystem]
bucket_url="s3://<bucket_name>/<path_to_files>/"
# secrets.toml
[sources.filesystem.credentials]
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"
# config.toml
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="az://<container_name>/<path_to_files>/"
# secrets.toml
[sources.filesystem.credentials]
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"
# config.toml
# gdrive
[gdrive_pipeline_name.sources.filesystem]
bucket_url="gdrive://<folder_name>/<subfolder_or_file_path>/"
# config.toml
# Google storage
[gstorage_pipeline_name.sources.filesystem]
bucket_url="gs://<bucket_name>/<path_to_files>/"
Learn how to set up SFTP credentials for each authentication method in the SFTP section. For example, in the case of key-based authentication, you can configure the source the following way:
# secrets.toml
[sources.filesystem.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa" # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
# config.toml
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url = "sftp://[hostname]/[path]"
You can use both native local filesystem paths and the file:// URI. Absolute, relative, and UNC Windows paths are supported.
You could provide an absolute filepath:
# config.toml
[sources.filesystem]
bucket_url='file:///Users/admin/Documents/csv_files'
Or skip the scheme and provide the local path in a format native to your operating system. For example, for Windows:
[sources.filesystem]
bucket_url='~\Documents\csv_files\'
You can also specify the credentials using environment variables. The name of the environment variable differs slightly from the corresponding name in the TOML file: replace dots . with double underscores __ and write the name in upper case. Note that the shell does not allow spaces around =:
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID="Please set me up!"
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY="Please set me up!"
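As a quick illustration of the naming rule, here is a minimal sketch that derives the environment variable name from a TOML section and key. The helper to_env_name is hypothetical and not part of dlt. It assumes names are upper-cased, as in the export lines above.

```python
def to_env_name(section: str, key: str) -> str:
    """Build an environment variable name from a TOML section path and a key."""
    # Join section and key, swap dots for double underscores, then upper-case.
    return f"{section}.{key}".replace(".", "__").upper()


env_name = to_env_name("sources.filesystem.credentials", "aws_access_key_id")
print(env_name)
```

The printed name matches the first export line above.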
dlt supports more ways of authorizing with cloud storage, including identity-based and default credentials. To learn more about adding credentials to your pipeline, please refer to the Configuration and secrets section.
Usage
The filesystem source provides building blocks for loading data from files. First, it iterates over files in the storage and then processes each file to yield the records. Usually, you need two resources:
- The filesystem resource enumerates files in a selected bucket using a glob pattern, returning details as FileItem in customizable page sizes.
- One of the available transformer resources to process each file in a specific transforming function and yield the records.
1. Initialize a filesystem resource
If you use just the filesystem resource, it will only list files in the storage based on glob parameters and yield the files' metadata. The filesystem resource itself does not read or copy files.
All parameters of the resource can be specified directly in code:
from dlt.sources.filesystem import filesystem
filesystem_source = filesystem(
bucket_url="file:///Users/admin/Documents/csv_files",
file_glob="*.csv"
)
or taken from the config:
Python code:
from dlt.sources.filesystem import filesystem
filesystem_source = filesystem()
Configuration file:
[sources.filesystem]
bucket_url="file:///Users/admin/Documents/csv_files"
file_glob="*.csv"
Full list of filesystem resource parameters:
- bucket_url - full URL of the bucket (could be a relative path in the case of the local filesystem).
- credentials - cloud storage credentials or an AbstractFilesystem instance (should be empty for the local filesystem). We recommend not specifying this parameter in the code, but putting it in a secrets file instead.
- file_glob - file filter in glob format. Defaults to listing all non-recursive files in the bucket URL.
- files_per_page - number of files processed at once. The default value is 100.
- extract_content - if true, the content of the file will be read and returned in the resource. The default value is False.
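To make the idea behind files_per_page concrete, the sketch below splits a listing into pages of a fixed size. It is a plain-Python illustration only: paginate is a hypothetical helper, and dlt's internal batching may work differently.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def paginate(items: Iterable[T], page_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists holding at most page_size items each."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    iterator = iter(items)
    while True:
        page = list(islice(iterator, page_size))
        if not page:
            return
        yield page


# 250 hypothetical file names, paged with the default size mentioned above.
file_names = [f"file_{i}.csv" for i in range(250)]
page_lengths = [len(page) for page in paginate(file_names, 100)]
print(page_lengths)
```

A smaller page size would mean more, smaller batches of file metadata handed to the next step at a time.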
2. Choose the right transformer resource
The current implementation of the filesystem source natively supports three file types: CSV, Parquet, and JSONL.
You can apply any of the above or create your own transformer. To apply the selected transformer resource, use pipe notation |:
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(
bucket_url="file:///Users/admin/Documents/csv_files",
file_glob="*.csv"
) | read_csv()
Available transformers
- read_csv() - processes CSV files using pandas
- read_jsonl() - processes JSONL files chunk by chunk
- read_parquet() - processes Parquet files using PyArrow
- read_csv_duckdb() - processes CSV files using DuckDB, which usually shows better performance than pandas
We advise that you give each resource a specific name before loading with pipeline.run. This will ensure that data goes to a table with the name you want and that each pipeline uses a separate state for incremental loading.
3. Create and run a pipeline
import dlt
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(bucket_url="file:///Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
info = pipeline.run(filesystem_pipe)
print(info)
For more information on how to create and run the pipeline, read the Walkthrough: Run a pipeline.
4. Apply hints
import dlt
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(bucket_url="file:///Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# Tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")
# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
5. Incremental loading
Here are a few simple ways to load your data incrementally:
- Load files based on modification date. Only load files that have been updated since the last time dlt processed them. dlt checks the files' metadata (like the modification date) and skips those that haven't changed.
- Load new records based on a specific column. You can load only the new or updated records by looking at a specific column, like updated_at. Unlike the first method, this approach reads all files every time and then filters the records that were updated.
- Combine loading only updated files and records. Finally, you can combine both methods. This is useful if new records can be added to existing files, so you want to filter not only the modified files but also the modified records.
Load files based on modification date
For example, to load only new CSV files with incremental loading, you can use the apply_hints method.
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider new CSV files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# Add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
Load new records based on a specific column
In this example, we load only new records based on the field called updated_at. This method may be useful if you are not able to filter files by modification date because, for example, all files are modified each time a new record appears.
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# We consider all CSV files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# But keep only the updated records
filesystem_pipe = (all_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
Combine loading only updated files and records
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider modified CSV files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
# And in each modified file, we keep only the updated records
filesystem_pipe = (new_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
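The idea behind a cursor such as modification_date or updated_at can be sketched in plain Python. This is a simplified illustration, not dlt's implementation: incremental_filter and the state dictionary are hypothetical, integers stand in for real timestamps, and details such as handling of equal cursor values are ignored.

```python
from typing import Any, Dict, Iterable, Iterator


def incremental_filter(
    items: Iterable[Dict[str, Any]],
    cursor_field: str,
    state: Dict[str, Any],
) -> Iterator[Dict[str, Any]]:
    """Yield items newer than the stored cursor value, then advance the stored value."""
    last_value = state.get(cursor_field)
    newest = last_value
    for item in items:
        value = item[cursor_field]
        if last_value is None or value > last_value:
            yield item
            if newest is None or value > newest:
                newest = value
    # Runs once the caller has consumed all items: remember the highest value seen.
    state[cursor_field] = newest


# Hypothetical listings; the state dictionary survives between the two "runs".
state: Dict[str, Any] = {}
listing = [
    {"file_name": "a.csv", "modification_date": 1},
    {"file_name": "b.csv", "modification_date": 2},
]
first_run = [f["file_name"] for f in incremental_filter(listing, "modification_date", state)]

listing.append({"file_name": "c.csv", "modification_date": 3})
second_run = [f["file_name"] for f in incremental_filter(listing, "modification_date", state)]

print(first_run)
print(second_run)
```

The same filter applied to records with an updated_at field models the record-level variant. Chaining both models the combined approach.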
6. Filter files
If you need to filter out files based on their metadata, you can easily do this using the add_filter method.
Within your filtering function, you'll have access to any field of the FileItem representation.
Filter by name
To keep only files that have London or Berlin in their names, you can do the following:
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# Filter files accessing file_name field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: ("London" in item["file_name"]) or ("Berlin" in item["file_name"]))
filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
You could also use file_glob to filter files by names. It works very well in simple cases, for example, filtering by extension:
from dlt.sources.filesystem import filesystem
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.json")
Filter by size
If for some reason you only want to load small files, you can also do that:
import dlt
from dlt.sources.filesystem import filesystem, read_csv
MAX_SIZE_IN_BYTES = 10
# Filter files accessing size_in_bytes field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: item["size_in_bytes"] < MAX_SIZE_IN_BYTES)
filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
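Filter functions are ordinary predicates, so they can be written and tested without any storage access. The sketch below combines the name and size conditions from the examples above and runs them against hand-written dictionaries. The all_of helper, the sample data, and the size limit are assumptions made for illustration.

```python
from typing import Any, Callable, Dict

FileMeta = Dict[str, Any]
Predicate = Callable[[FileMeta], bool]

MAX_SIZE_IN_BYTES = 1024  # hypothetical limit for this sketch


def name_matches(item: FileMeta) -> bool:
    """Keep files with London or Berlin in the name."""
    return "London" in item["file_name"] or "Berlin" in item["file_name"]


def is_small(item: FileMeta) -> bool:
    """Keep files below the size limit."""
    return item["size_in_bytes"] < MAX_SIZE_IN_BYTES


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates so that an item must satisfy every one of them."""
    return lambda item: all(predicate(item) for predicate in predicates)


# Hand-written stand-ins for file metadata; only two fields are modeled.
sample_files = [
    {"file_name": "London_2024.csv", "size_in_bytes": 512},
    {"file_name": "Berlin_2024.csv", "size_in_bytes": 4096},
    {"file_name": "Paris_2024.csv", "size_in_bytes": 128},
]

keep = all_of(name_matches, is_small)
selected = [item["file_name"] for item in sample_files if keep(item)]
print(selected)
```

A combined predicate like keep could then be passed to add_filter in place of the lambdas shown above.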
Troubleshooting
Access extremely long file paths
Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a FileNotFound exception.
To go over this limit, you can use extended paths, as in the configuration below. Note that Python glob does not work with extended UNC paths, so you will not be able to use those.
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
If you get an empty list of files
If you are running a dlt pipeline with the filesystem source and get zero records, we recommend you check the configuration of the bucket_url and file_glob parameters.
For example, with Azure Blob Storage, people sometimes mistake the account name for the container name. Make sure you've set up a URL as "az://<container name>/".
Also, please refer to the glob function to configure the resource correctly. Use ** to include recursive files. Note that the local filesystem supports full Python glob functionality, while cloud storage supports a restricted fsspec version.
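The difference between a flat and a recursive pattern can be checked locally with Python's own glob support. This sketch uses pathlib on a temporary directory with made-up file names. It only demonstrates Python glob behavior on the local filesystem and says nothing about how cloud storage patterns are matched.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "top.csv").write_text("a\n1\n")
    (root / "nested" / "deep.csv").write_text("a\n2\n")

    # "*.csv" matches only files directly under the root directory.
    flat = sorted(p.relative_to(root).as_posix() for p in root.glob("*.csv"))
    # "**/*.csv" also descends into subdirectories.
    recursive = sorted(p.relative_to(root).as_posix() for p in root.glob("**/*.csv"))

print(flat)
print(recursive)
```

If a pipeline returns no records, running the same pattern against a local copy of the folder structure is a quick way to see whether the glob is the cause.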