# REST API source
This is a dlt source you can use to extract data from any REST API. It uses declarative configuration to define the API endpoints, their relationships, how to handle pagination, and authentication.
## Quick example
Here's an example of how to configure the REST API source to load posts and related comments from a hypothetical blog API:
```py
import dlt
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {
        "base_url": "https://api.example.com/",
        "auth": {
            "token": dlt.secrets["your_api_token"],
        },
        "paginator": {
            "type": "json_link",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        # "posts" will be used as the endpoint path, the resource name,
        # and the table name in the destination. The HTTP client will send
        # a request to "https://api.example.com/posts".
        "posts",
        # The explicit configuration allows you to link resources
        # and define parameters.
        {
            "name": "comments",
            "endpoint": {
                "path": "posts/{post_id}/comments",
                "params": {
                    "post_id": {
                        "type": "resolve",
                        "resource": "posts",
                        "field": "id",
                    },
                    "sort": "created_at",
                },
            },
        },
    ],
})

pipeline = dlt.pipeline(
    pipeline_name="rest_api_example",
    destination="duckdb",
    dataset_name="rest_api_data",
)

load_info = pipeline.run(source)
```
Running this pipeline will create two tables in DuckDB: `posts` and `comments`, with the data from the respective API endpoints. The `comments` resource will fetch comments for each post by using the `id` field from the `posts` resource.
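To verify the load, you can query the DuckDB file directly. A minimal sketch, assuming the default file name `rest_api_example.duckdb` (dlt derives it from the pipeline name) and the `rest_api_data` dataset from the example above:

```py
import duckdb

# dlt's duckdb destination stores data in "<pipeline_name>.duckdb" by default
conn = duckdb.connect("rest_api_example.duckdb")

# the tables live in the dataset schema configured above
print(conn.sql("SELECT count(*) FROM rest_api_data.posts"))
print(conn.sql("SELECT count(*) FROM rest_api_data.comments"))
```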
## Setup

### Prerequisites
Please make sure the `dlt` library is installed. Refer to the installation guide.
### Initialize the REST API source

Enter the following command in your terminal:

```sh
dlt init rest_api duckdb
```
`dlt init` will initialize the pipeline examples for REST API as the source and duckdb as the destination.

Running `dlt init` creates the following in the current folder:

- `rest_api_pipeline.py` file with sample pipeline definitions:
  - GitHub API example
  - Pokemon API example
- `.dlt` folder with:
  - `secrets.toml` file to store your access tokens and other sensitive information
  - `config.toml` file to store the configuration settings
- `requirements.txt` file with the required dependencies
Change the REST API source to your needs by modifying the `rest_api_pipeline.py` file. See the detailed source configuration section below.
For the rest of the guide, we will use the GitHub API and Pokemon API as example sources.
This source is based on the RESTClient class.
### Add credentials

In the `.dlt` folder, you'll find a file called `secrets.toml`, where you can securely store your access tokens and other sensitive information. It's important to handle this file with care and keep it safe.
The GitHub API requires an access token to access some of its endpoints and to increase the rate limit for the API calls. To get a GitHub token, follow the GitHub documentation on managing your personal access tokens.
After you get the token, add it to the `secrets.toml` file:

```toml
[sources.rest_api_pipeline.github_source]
github_token = "your_github_token"
```
### Run the pipeline

Install the required dependencies by running the following command:

```sh
pip install -r requirements.txt
```

Run the pipeline:

```sh
python rest_api_pipeline.py
```

Verify that everything loaded correctly by using the following command:

```sh
dlt pipeline rest_api show
```
## Source configuration

### Quick example

Let's take a look at the GitHub example in the `rest_api_pipeline.py` file:
```py
import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources


@dlt.source
def github_source(github_token=dlt.secrets.value):
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
            "auth": {
                "token": github_token,
            },
        },
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "merge",
            "endpoint": {
                "params": {
                    "per_page": 100,
                },
            },
        },
        "resources": [
            {
                "name": "issues",
                "endpoint": {
                    "path": "issues",
                    "params": {
                        "sort": "updated",
                        "direction": "desc",
                        "state": "open",
                        "since": {
                            "type": "incremental",
                            "cursor_path": "updated_at",
                            "initial_value": "2024-01-25T11:21:28Z",
                        },
                    },
                },
            },
            {
                "name": "issue_comments",
                "endpoint": {
                    "path": "issues/{issue_number}/comments",
                    "params": {
                        "issue_number": {
                            "type": "resolve",
                            "resource": "issues",
                            "field": "number",
                        }
                    },
                },
                "include_from_parent": ["id"],
            },
        ],
    }

    yield from rest_api_resources(config)


def load_github() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_github",
        destination="duckdb",
        dataset_name="rest_api_data",
    )
    load_info = pipeline.run(github_source())
    print(load_info)
```
The declarative resource configuration is defined in the `config` dictionary. It contains the following key components:

- `client`: Defines the base URL and authentication method for the API. In this case, it uses token-based authentication. The token is stored in the `secrets.toml` file.
- `resource_defaults`: Contains default settings for all resources. In this example, we define that all resources:
  - have `id` as the primary key,
  - use the `merge` write disposition to merge the data with the existing data in the destination,
  - send a `per_page=100` query parameter with each request to get more results per page.
- `resources`: A list of resources to be loaded. Here, we have two resources: `issues` and `issue_comments`, which correspond to the GitHub API endpoints for repository issues and issue comments. Note that we need an issue number to fetch comments for each issue. This number is taken from the `issues` resource. More on this in the resource relationships section.
Let's break down the configuration in more detail.
### Configuration structure

Import the `RESTAPIConfig` type from the `rest_api` module to have convenient hints in your editor/IDE and use it to define the configuration object.

```py
from dlt.sources.rest_api import RESTAPIConfig
```
The configuration object passed to the REST API Generic Source has three main elements:
```py
config: RESTAPIConfig = {
    "client": {
        # ...
    },
    "resource_defaults": {
        # ...
    },
    "resources": [
        # ...
    ],
}
```
### client

The `client` configuration is used to connect to the API's endpoints. It includes the following fields:

- `base_url` (str): The base URL of the API. This string is prepended to all endpoint paths. For example, if the base URL is `https://api.example.com/v1/` and the endpoint path is `users`, the full URL will be `https://api.example.com/v1/users`.
- `headers` (dict, optional): Additional headers that are sent with each request.
- `auth` (optional): Authentication configuration. This can be a simple token, an `AuthConfigBase` object, or a more complex authentication method.
- `paginator` (optional): Configuration for the default pagination used for resources that support pagination. Refer to the pagination section for more details.
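Putting these fields together, a complete `client` section might look like the following sketch (the header name and value are hypothetical; the `bearer` auth type and the `header_link` paginator are described later on this page):

```py
{
    "client": {
        "base_url": "https://api.example.com/v1/",
        # sent with every request (hypothetical header)
        "headers": {
            "x-api-version": "2024-01-01",
        },
        # token-based ("bearer") authentication
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["your_api_token"],
        },
        # default paginator for all resources unless overridden per endpoint
        "paginator": {
            "type": "header_link",
        },
    },
}
```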
### resource_defaults (optional)

`resource_defaults` contains the default values to configure the dlt resources. This configuration is applied to all resources unless overridden by the resource-specific configuration.
For example, you can set the primary key, write disposition, and other default settings here:
```py
config = {
    "client": {
        # ...
    },
    "resource_defaults": {
        "primary_key": "id",
        "write_disposition": "merge",
        "endpoint": {
            "params": {
                "per_page": 100,
            },
        },
    },
    "resources": [
        "resource1",
        {
            "name": "resource2_name",
            "write_disposition": "append",
            "endpoint": {
                "params": {
                    "param1": "value1",
                },
            },
        }
    ],
}
```
Above, all resources will have `primary_key` set to `id`, `resource1` will have `write_disposition` set to `merge`, and `resource2` will override the default `write_disposition` with `append`. Both `resource1` and `resource2` will have the `per_page` parameter set to 100.
### resources

This is a list of resource configurations that define the API endpoints to be loaded. Each resource configuration can be:

- a dictionary with the resource configuration;
- a string. In this case, the string is used as both the endpoint path and the resource name, and the resource configuration is taken from the `resource_defaults` configuration if it exists.
### Resource configuration

A resource configuration is used to define a dlt resource for the data to be loaded from an API endpoint. It contains the following key fields:

- `endpoint`: The endpoint configuration for the resource. It can be a string or a dict representing the endpoint settings. See the endpoint configuration section for more details.
- `write_disposition`: The write disposition for the resource.
- `primary_key`: The primary key for the resource.
- `include_from_parent`: A list of fields from the parent resource to be included in the resource output. See the resource relationships section for more details.
- `processing_steps`: A list of processing steps to filter and transform your data.
- `selected`: A flag to indicate if the resource is selected for loading. This can be useful when you want to load data only from child resources and not from the parent resource.
- `auth`: An optional `AuthConfig` instance. If passed, it is used instead of the one defined in the client configuration. Example:
```py
from dlt.sources.helpers.rest_client.auth import HttpBasicAuth

config = {
    "client": {
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["your_api_token"],
        }
    },
    "resources": [
        "resource-using-bearer-auth",
        {
            "name": "my-resource-with-special-auth",
            "endpoint": {
                # ...
                "auth": HttpBasicAuth("user", dlt.secrets["your_basic_auth_password"])
            },
            # ...
        }
    ]
    # ...
}
```
This would use `Bearer` auth as defined in the `client` for `resource-using-bearer-auth` and HTTP Basic auth for `my-resource-with-special-auth`.
You can also pass additional resource parameters that will be used to configure the dlt resource. See dlt resource API reference for more details.
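For example, standard dlt resource arguments such as `max_table_nesting` can be set per resource. A sketch (the endpoint is hypothetical):

```py
{
    "name": "posts",
    "endpoint": "posts",  # hypothetical endpoint
    # a standard dlt resource argument: nested data below this depth
    # is kept as JSON instead of being unpacked into child tables
    "max_table_nesting": 2,
}
```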
### Endpoint configuration
The endpoint configuration defines how to query the API endpoint. Quick example:
```py
{
    "path": "issues",
    "method": "GET",
    "params": {
        "sort": "updated",
        "direction": "desc",
        "state": "open",
        "since": {
            "type": "incremental",
            "cursor_path": "updated_at",
            "initial_value": "2024-01-25T11:21:28Z",
        },
    },
    "data_selector": "results",
}
```
The fields in the endpoint configuration are:

- `path`: The path to the API endpoint. By default, this path is appended to the given `base_url`. If this is a fully qualified URL starting with `http:` or `https:`, it will be used as-is and `base_url` will be ignored.
- `method`: The HTTP method to be used. The default is `GET`.
- `params`: Query parameters to be sent with each request. For example, `sort` to order the results or `since` to specify incremental loading. This is also used to define resource relationships.
- `json`: The JSON payload to be sent with the request (for POST and PUT requests).
- `paginator`: Pagination configuration for the endpoint. See the pagination section for more details.
- `data_selector`: A JSONPath to select the data from the response. See the data selection section for more details.
- `response_actions`: A list of actions that define how to process the response data. See the response actions section for more details.
- `incremental`: Configuration for incremental loading.
### Pagination
The REST API source will try to automatically handle pagination for you. This works by detecting the pagination details from the first API response.
In some special cases, you may need to specify the pagination configuration explicitly.
To specify the pagination configuration, use the `paginator` field in the client or endpoint configurations. You may either use a dictionary with a string alias in the `type` field along with the required parameters, or use a paginator class instance.
#### Example

Suppose the API response for `https://api.example.com/posts` contains a `next` field with the URL to the next page:
```json
{
    "data": [
        {"id": 1, "title": "Post 1"},
        {"id": 2, "title": "Post 2"},
        {"id": 3, "title": "Post 3"}
    ],
    "pagination": {
        "next": "https://api.example.com/posts?page=2"
    }
}
```
You can configure the pagination for the `posts` resource like this:

```py
{
    "path": "posts",
    "paginator": {
        "type": "json_link",
        "next_url_path": "pagination.next",
    }
}
```
Alternatively, you can use the paginator instance directly:

```py
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

# ...

{
    "path": "posts",
    "paginator": JSONLinkPaginator(
        next_url_path="pagination.next"
    ),
}
```
Currently, pagination is supported only for GET requests. To handle POST requests with pagination, you need to implement a custom paginator.
These are the available paginators:
| `type` | Paginator class | Description |
| --- | --- | --- |
| `json_link` | `JSONLinkPaginator` | The link to the next page is in the body (JSON) of the response. Parameters: `next_url_path` (str) – the JSONPath to the next page URL. |
| `header_link` | `HeaderLinkPaginator` | The links to the next page are in the response headers. Parameters: `links_next_key` (str) – the name of the header link, defaults to `"next"`. |
| `offset` | `OffsetPaginator` | The pagination is based on an offset parameter, with the total items count either in the response body or explicitly provided. Parameters: `limit` (int), `offset` (int), `offset_param` (str), `limit_param` (str), `total_path` (str), `maximum_offset` (int). |
| `page_number` | `PageNumberPaginator` | The pagination is based on a page number parameter, with the total pages count either in the response body or explicitly provided. Parameters: `base_page` (int), `page_param` (str), `total_path` (str), `maximum_page` (int). |
| `cursor` | `JSONResponseCursorPaginator` | The pagination is based on a cursor parameter, with the value of the cursor in the response body (JSON). Parameters: `cursor_path` (str), `cursor_param` (str). |
| `single_page` | `SinglePagePaginator` | The response will be interpreted as a single-page response, ignoring possible pagination metadata. |
| `auto` | None | Explicitly specify that the source should automatically detect the pagination method. |
For more complex pagination methods, you can implement a custom paginator, instantiate it, and use it in the configuration. Alternatively, you can use the dictionary configuration syntax for custom paginators as well. For this, you need to register your custom paginator:
```py
from dlt.sources.helpers.rest_client.paginators import SinglePagePaginator
from dlt.sources.rest_api.config_setup import register_paginator


class CustomPaginator(SinglePagePaginator):
    # custom implementation of SinglePagePaginator
    pass


register_paginator("custom_paginator", CustomPaginator)
```

You can then reference the paginator by its registered alias:

```py
{
    # ...
    "paginator": {
        "type": "custom_paginator",
        "next_url_path": "paging.nextLink",
    }
}
```
### Data selection

The `data_selector` field in the endpoint configuration allows you to specify a JSONPath to select the data from the response. By default, the source will try to detect the location of the data automatically.

Use this field when you need to specify the location of the data in the response explicitly.
For example, if the API response looks like this:
```json
{
    "posts": [
        {"id": 1, "title": "Post 1"},
        {"id": 2, "title": "Post 2"},
        {"id": 3, "title": "Post 3"}
    ]
}
```
You can use the following endpoint configuration:
```py
{
    "path": "posts",
    "data_selector": "posts",
}
```
For a nested structure like this:
```json
{
    "results": {
        "posts": [
            {"id": 1, "title": "Post 1"},
            {"id": 2, "title": "Post 2"},
            {"id": 3, "title": "Post 3"}
        ]
    }
}
```
You can use the following endpoint configuration:
```py
{
    "path": "posts",
    "data_selector": "results.posts",
}
```
Read more about JSONPath syntax to learn how to write selectors.
### Authentication

For APIs that require authentication to access their endpoints, the REST API source supports various authentication methods, including token-based authentication, query parameters, basic authentication, and custom authentication. The authentication configuration is specified in the `auth` field of the client, either as a dictionary or as an instance of the authentication class.
#### Quick example
One of the most common methods is token-based authentication (also known as Bearer token authentication). To authenticate using this method, you can use the following shortcut:
```py
{
    "client": {
        # ...
        "auth": {
            "token": dlt.secrets["your_api_token"],
        },
        # ...
    },
}
```
Make sure to store your access tokens and other sensitive information in the `secrets.toml` file and never commit it to the version control system.
To specify the authentication configuration, use the `auth` field in the client configuration:

```py
{
    "client": {
        # ...
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["your_api_token"],
        },
        # ...
    },
}
```
Alternatively, you can use the authentication class directly:
```py
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

config = {
    "client": {
        "auth": BearerTokenAuth(dlt.secrets["your_api_token"]),
    },
    "resources": [
        # ...
    ],
    # ...
}
```
Available authentication types:
| `type` | Authentication class | Description |
| --- | --- | --- |
| `bearer` | `BearerTokenAuth` | Bearer token authentication. Parameters: `token` (str). |
| `http_basic` | `HTTPBasicAuth` | Basic HTTP authentication. Parameters: `username` (str), `password` (str). |
| `api_key` | `APIKeyAuth` | API key authentication with the key defined in the query parameters or in the headers. Parameters: `name` (str) – the name of the header or query parameter, `api_key` (str) – the key value, `location` (str, optional) – where to place the key. |
| `oauth2_client_credentials` | `OAuth2ClientCredentials` | OAuth 2.0 Client Credentials authorization for server-to-server communication without user consent. Parameters: `access_token_url` (str), `client_id` (str), `client_secret` (str), `access_token_request_data` (dict, optional). |
For more complex authentication methods, you can implement a custom authentication class and use it in the configuration. You can also use the dictionary configuration syntax for custom authentication classes after registering them as follows:
```py
from dlt.sources.helpers.rest_client.auth import AuthConfigBase
from dlt.sources.rest_api.config_setup import register_auth


class CustomAuth(AuthConfigBase):
    # custom authentication implementation
    pass


register_auth("custom_auth", CustomAuth)
```

You can then reference the authentication class by its registered alias:

```py
{
    # ...
    "auth": {
        "type": "custom_auth",
        "api_key": dlt.secrets["sources.my_source.my_api_key"],
    }
}
```
### Define resource relationships

When you have a resource that depends on another resource, you can define the relationship using the `resolve` configuration. This allows you to link one or more path parameters in the child resource to fields in the parent resource's data.

In the GitHub example, the `issue_comments` resource depends on the `issues` resource. The `issue_number` parameter in the `issue_comments` endpoint configuration is resolved from the `number` field of the `issues` resource:
```py
{
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                # ...
            },
        },
        {
            "name": "issue_comments",
            "endpoint": {
                "path": "issues/{issue_number}/comments",
                "params": {
                    "issue_number": {
                        "type": "resolve",
                        "resource": "issues",
                        "field": "number",
                    }
                },
            },
            "include_from_parent": ["id"],
        },
    ],
}
```
This configuration tells the source to get issue numbers from the `issues` resource and use them to fetch comments for each issue. So if the `issues` resource yields the following data:

```json
[
    {"id": 1, "number": 123},
    {"id": 2, "number": 124},
    {"id": 3, "number": 125}
]
```
The `issue_comments` resource will make requests to the following endpoints:

- `issues/123/comments`
- `issues/124/comments`
- `issues/125/comments`
The syntax for the `resolve` field in the parameter configuration is:

```py
{
    "<parameter_name>": {
        "type": "resolve",
        "resource": "<parent_resource_name>",
        "field": "<parent_resource_field_name_or_jsonpath>",
    }
}
```
The `field` value can be specified as a JSONPath to select a nested field in the parent resource data. For example: `"field": "items[0].id"`.
Under the hood, dlt handles this by using a transformer resource.
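To illustrate what that means, here is a rough hand-written equivalent of the `issues`/`issue_comments` pair; `fetch_comments` is a hypothetical helper, not part of the source:

```py
import dlt


@dlt.resource
def issues():
    # stands in for the parent endpoint
    yield [{"id": 1, "number": 123}, {"id": 2, "number": 124}]


@dlt.transformer(data_from=issues)
def issue_comments(issue):
    # called once per parent item; the resolved field fills the path parameter
    yield from fetch_comments(f"issues/{issue['number']}/comments")  # hypothetical helper
```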
#### Resolving multiple path parameters from a parent resource

When a child resource depends on multiple fields from a single parent resource, you can define multiple `resolve` parameters in the endpoint configuration. For example:
```py
{
    "resources": [
        "groups",
        {
            "name": "users",
            "endpoint": {
                "path": "groups/{group_id}/users",
                "params": {
                    "group_id": {
                        "type": "resolve",
                        "resource": "groups",
                        "field": "id",
                    },
                },
            },
        },
        {
            "name": "user_details",
            "endpoint": {
                "path": "groups/{group_id}/users/{user_id}/details",
                "params": {
                    "group_id": {
                        "type": "resolve",
                        "resource": "users",
                        "field": "group_id",
                    },
                    "user_id": {
                        "type": "resolve",
                        "resource": "users",
                        "field": "id",
                    },
                },
            },
        },
    ],
}
```
In the configuration above:

- The `users` resource depends on the `groups` resource, resolving the `group_id` parameter from the `id` field in `groups`.
- The `user_details` resource depends on the `users` resource, resolving both `group_id` and `user_id` parameters from fields in `users`.
#### Include fields from the parent resource

You can include data from the parent resource in the child resource by using the `include_from_parent` field in the resource configuration. For example:
```py
{
    "name": "issue_comments",
    "endpoint": {
        # ...
    },
    "include_from_parent": ["id", "title", "created_at"],
}
```
This will include the `id`, `title`, and `created_at` fields from the `issues` resource in the `issue_comments` resource data. The names of the included fields are prefixed with the parent resource name and an underscore (`_`), like so: `_issues_id`, `_issues_title`, `_issues_created_at`.
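For example, a comment record fetched as `{"id": 501, "body": "Thanks!"}` (illustrative values) would arrive in the destination with the parent fields attached:

```json
{
    "id": 501,
    "body": "Thanks!",
    "_issues_id": 1,
    "_issues_title": "Some issue title",
    "_issues_created_at": "2024-01-26"
}
```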
### Define a resource which is not a REST endpoint

Sometimes, we want to request endpoints with specific values that are not returned by another endpoint. Thus, you can also include arbitrary dlt resources in your `RESTAPIConfig` instead of defining a resource for every path!

In the following example, we want to load the issues belonging to three repositories. Instead of defining three different issues resources, one for each of the paths `dlt-hub/dlt/issues/`, `dlt-hub/verified-sources/issues/`, and `dlt-hub/dlthub-education/issues/`, we have a resource `repositories` which yields a list of repository names that will be fetched by the dependent resource `issues`.
```py
from typing import Any, Dict, Generator, List

import dlt
from dlt.sources.rest_api import RESTAPIConfig


@dlt.resource()
def repositories() -> Generator[List[Dict[str, Any]], Any, Any]:
    """A seed list of repositories to fetch"""
    yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]


config: RESTAPIConfig = {
    "client": {"base_url": "https://github.com/api/v2"},
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "dlt-hub/{repository}/issues/",
                "params": {
                    "repository": {
                        "type": "resolve",
                        "resource": "repositories",
                        "field": "name",
                    },
                },
            },
        },
        repositories(),
    ],
}
```
Be careful that the parent resource needs to return `Generator[List[Dict[str, Any]]]`. Thus, the following will NOT work:
```py
@dlt.resource
def repositories() -> Generator[Dict[str, Any], Any, Any]:
    """Not working seed list of repositories to fetch"""
    yield from [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]
```
### Processing steps: filter and transform data

The `processing_steps` field in the resource configuration allows you to apply transformations to the data fetched from the API before it is loaded into your destination. This is useful when you need to filter out certain records, modify the data structure, or anonymize sensitive information.

Each processing step is a dictionary specifying the type of operation (`filter` or `map`) and the function to apply. Steps apply in the order they are listed.
#### Quick example

```py
from dlt.sources.rest_api import RESTAPIConfig


def lower_title(record):
    record["title"] = record["title"].lower()
    return record


config: RESTAPIConfig = {
    "client": {
        "base_url": "https://api.example.com",
    },
    "resources": [
        {
            "name": "posts",
            "processing_steps": [
                {"filter": lambda x: x["id"] < 10},
                {"map": lower_title},
            ],
        },
    ],
}
```
In the example above:

- First, the `filter` step uses a lambda function to include only records where `id` is less than 10.
- Thereafter, the `map` step applies the `lower_title` function to each remaining record.
#### Using `filter`

The `filter` step allows you to exclude records that do not meet certain criteria. The provided function should return `True` to keep the record or `False` to exclude it:
```py
{
    "name": "posts",
    "endpoint": "posts",
    "processing_steps": [
        {"filter": lambda x: x["id"] in [10, 20, 30]},
    ],
}
```
In this example, only records with `id` equal to 10, 20, or 30 will be included.
#### Using `map`

The `map` step allows you to modify the records fetched from the API. The provided function should take a record as an argument and return the modified record. For example, to anonymize the `email` field:
```py
def anonymize_email(record):
    record["email"] = "REDACTED"
    return record


config: RESTAPIConfig = {
    "client": {
        "base_url": "https://api.example.com",
    },
    "resources": [
        {
            "name": "users",
            "processing_steps": [
                {"map": anonymize_email},
            ],
        },
    ],
}
```
#### Combining `filter` and `map`

You can combine multiple processing steps to achieve complex transformations:
```py
{
    "name": "posts",
    "endpoint": "posts",
    "processing_steps": [
        {"filter": lambda x: x["id"] < 10},
        {"map": lower_title},
        {"filter": lambda x: "important" in x["title"]},
    ],
}
```
#### Best practices

- Order matters: Processing steps are applied in the order they are listed. Be mindful of the sequence, especially when combining `map` and `filter`.
- Function definition: Define your filter and map functions separately for clarity and reuse.
- Use `filter` to exclude records early in the process to reduce the amount of data that needs to be processed.
- Combine consecutive `map` steps into a single function for faster execution, as shown in the sketch below.
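For example, two consecutive `map` steps can be merged into one function. A sketch reusing the `lower_title` and `anonymize_email` logic from above:

```py
def clean_record(record):
    # one pass over each record instead of two separate map steps
    record["title"] = record["title"].lower()
    record["email"] = "REDACTED"
    return record


{
    "name": "users",
    "processing_steps": [
        {"map": clean_record},
    ],
}
```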
## Incremental loading

Some APIs provide a way to fetch only new or changed data (most often by using a timestamp field like `updated_at`, `created_at`, or incremental IDs). This is called incremental loading and is very useful as it allows you to reduce the load time and the amount of data transferred.
When the API endpoint supports incremental loading, you can configure dlt to load only the new or changed data using one of these two methods:

- Defining a special parameter in the `params` section of the endpoint configuration.
- Specifying the `incremental` field in the endpoint configuration.

Let's start with the first method.
### Incremental loading in `params`

Imagine we have the endpoint `https://api.example.com/posts`, which:

- accepts a `created_since` query parameter to fetch posts created after a certain date;
- returns a list of posts with the `created_at` field for each post.

For example, if we query the endpoint with `https://api.example.com/posts?created_since=2024-01-25`, we get the following response:
```json
{
    "results": [
        {"id": 1, "title": "Post 1", "created_at": "2024-01-26"},
        {"id": 2, "title": "Post 2", "created_at": "2024-01-27"},
        {"id": 3, "title": "Post 3", "created_at": "2024-01-28"}
    ]
}
```
To enable incremental loading for this endpoint, you can use the following endpoint configuration:
```py
{
    "path": "posts",
    "data_selector": "results",  # Optional JSONPath to select the list of posts
    "params": {
        "created_since": {
            "type": "incremental",
            "cursor_path": "created_at",  # The JSONPath to the field we want to track in each post
            "initial_value": "2024-01-25",
        },
    },
}
```
After you run the pipeline, dlt will keep track of the last `created_at` value from all the posts fetched and use it as the `created_since` parameter in the next request. So in our case, the next request will be made to `https://api.example.com/posts?created_since=2024-01-28` to fetch only the new posts created after `2024-01-28`.
Let's break down the configuration.

- We explicitly set `data_selector` to `"results"` to select the list of posts from the response. This is optional; if not set, dlt will try to auto-detect the data location.
- We define the `created_since` parameter as an incremental parameter with the following fields:
```py
{
    "created_since": {
        "type": "incremental",
        "cursor_path": "created_at",
        "initial_value": "2024-01-25",
    },
}
```
- `type`: The type of the parameter definition. In this case, it must be set to `incremental`.
- `cursor_path`: The JSONPath to the field within each item in the list. The value of this field will be used in the next request. In the example above, our items look like `{"id": 1, "title": "Post 1", "created_at": "2024-01-26"}`, so to track the created time, we set `cursor_path` to `"created_at"`. Note that the JSONPath starts from the root of the item (dict), not from the root of the response.
- `initial_value`: The initial value for the cursor. This is the value that will initialize the state of incremental loading. In this case, it's `2024-01-25`. The value type should match the type of the field in the data item.
### Incremental loading using the `incremental` field

The alternative method is to use the `incremental` field in the endpoint configuration. This configuration is more powerful than the method shown above because, in addition to the start parameter and value, it also allows you to specify the end parameter and value for incremental loading.

Let's take the same example as above and configure it using the `incremental` field:
```py
{
    "path": "posts",
    "data_selector": "results",
    "incremental": {
        "start_param": "created_since",
        "cursor_path": "created_at",
        "initial_value": "2024-01-25",
    },
}
```
Note that we specify the query parameter name `created_since` in the `start_param` field, not in the `params` section.

The full available configuration for the `incremental` field is:
```py
{
    "incremental": {
        "start_param": "<start_parameter_name>",
        "end_param": "<end_parameter_name>",
        "cursor_path": "<path_to_cursor_field>",
        "initial_value": "<initial_value>",
        "end_value": "<end_value>",
        "convert": my_callable,
    }
}
```
The fields are:

- `start_param` (str): The name of the query parameter to be used as the start condition. In the example above, it would be `"created_since"`.
- `end_param` (str): The name of the query parameter to be used as the end condition. This is optional and can be omitted if you only need to track the start condition. It is useful when you need to fetch data within a specific range and the API supports end conditions (like a `created_before` query parameter).
- `cursor_path` (str): The JSONPath to the field within each item in the list. This is the field that will be used to track the incremental loading. In the example above, it's `"created_at"`.
- `initial_value` (str): The initial value for the cursor. This is the value that will initialize the state of incremental loading.
- `end_value` (str): The end value for the cursor to stop the incremental loading. This is optional and can be omitted if you only need to track the start condition. If you set this field, `initial_value` needs to be set as well.
- `convert` (callable): A callable that converts the cursor value into the format that the query parameter requires. For example, a UNIX timestamp can be converted into an ISO 8601 date, or a date can be converted into `created_at+gt+{date}`.
See the incremental loading guide for more details.
If you encounter issues with incremental loading, see the troubleshooting section in the incremental loading guide.
### Convert the incremental value before calling the API

If you need to transform the values in the cursor field before passing them to the API endpoint, you can specify a callable under the `convert` key. For example, the API might return UNIX epoch timestamps but expect to be queried with an ISO 8601 date. To achieve that, we can specify a function that converts from the date format returned by the API to the date format required for API requests.

In the following examples, `1704067200` is returned from the API in the field `updated_at`, but the API will be called with `?created_since=2024-01-01`.
Incremental loading using the `params` field:

```py
{
    "created_since": {
        "type": "incremental",
        "cursor_path": "updated_at",
        "initial_value": "1704067200",
        "convert": lambda epoch: pendulum.from_timestamp(int(epoch)).to_date_string(),
    }
}
```
Incremental loading using the `incremental` field:

```py
{
    "path": "posts",
    "data_selector": "results",
    "incremental": {
        "start_param": "created_since",
        "cursor_path": "updated_at",
        "initial_value": "1704067200",
        "convert": lambda epoch: pendulum.from_timestamp(int(epoch)).to_date_string(),
    },
}
```
## Troubleshooting

If you encounter issues while running the pipeline, enable logging for detailed information about the execution:

```sh
RUNTIME__LOG_LEVEL=INFO python my_script.py
```

This also provides details on the HTTP requests.
### Configuration issues

#### Getting validation errors

When you run the pipeline and get a `DictValidationException`, it means that the source configuration is incorrect. The error message provides details on the issue, including the path to the field and the expected type.

For example, if you have a source configuration like this:
```py
config: RESTAPIConfig = {
    "client": {
        # ...
    },
    "resources": [
        {
            "name": "issues",
            "params": {              # <- Wrong: this should be inside
                "sort": "updated",   #    the endpoint field below
            },
            "endpoint": {
                "path": "issues",
                # "params": {        # <- Correct configuration
                #     "sort": "updated",
                # },
            },
        },
        # ...
    ],
}
```
You will get an error like this:
```text
dlt.common.exceptions.DictValidationException: In path .: field 'resources[0]'
expects the following types: str, EndpointResource. Provided value {'name': 'issues', 'params': {'sort': 'updated'},
'endpoint': {'path': 'issues', ... }} with type 'dict' is invalid with the following errors:
For EndpointResource: In path ./resources[0]: following fields are unexpected {'params'}
```
It means that in the first resource configuration (`resources[0]`), the `params` field should be inside the `endpoint` field.
Import the `RESTAPIConfig` type from the `rest_api` module to have convenient hints in your editor/IDE and use it to define the configuration object:

```py
from dlt.sources.rest_api import RESTAPIConfig
```
#### Getting wrong data or no data

If incorrect data is received from an endpoint, check the `data_selector` field in the endpoint configuration. Ensure the JSONPath is accurate and points to the correct data in the response body. `rest_api` attempts to auto-detect the data location, which may not always succeed. See the data selection section for more details.
#### Getting insufficient data or incorrect pagination

Check the `paginator` field in the configuration. When it is not explicitly specified, the source tries to auto-detect the pagination method. If auto-detection fails, or the system is unsure, a warning is logged. For production environments, we recommend specifying an explicit paginator in the configuration. See the pagination section for more details. Some APIs may have non-standard pagination methods, and you may need to implement a custom paginator.
#### Incremental loading not working

See the troubleshooting guide for incremental loading issues.
#### Getting HTTP 404 errors

Some APIs may return 404 errors for resources that do not exist or have no data. You can manage these responses by configuring the `ignore` action in response actions.
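For example (a minimal sketch; the endpoint path is hypothetical):

```py
{
    "path": "posts/{post_id}/comments",  # hypothetical endpoint
    "response_actions": [
        # treat 404 responses as empty results instead of failing the pipeline
        {"status_code": 404, "action": "ignore"},
    ],
}
```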
### Authentication issues

If you are experiencing 401 (Unauthorized) errors, this could indicate:

- Incorrect authorization credentials. Verify the credentials in the `secrets.toml` file. Refer to Secret and configs for more information.
- An incorrect authentication type. Consult the API documentation for the proper method. See the authentication section for details. For some APIs, a custom authentication method may be required.
### General guidelines

The `rest_api` source uses the RESTClient class for HTTP requests. Refer to the RESTClient troubleshooting guide for debugging tips.

For further assistance, join our Slack community. We're here to help!