Skip to main content

REST API generic source

Need help deploying these sources, or figuring out how to run them in your data stack?
Join our Slack community or book a call with our support engineer Violetta.

This is a generic dlt source you can use to extract data from any REST API. It uses declarative configuration to define the API endpoints, their relationships, how to handle pagination, and authentication.

Setup guide

Initialize the verified source

Enter the following command in your terminal:

dlt init rest_api duckdb

dlt init will initialize the pipeline examples for REST API as the source and duckdb as the destination.

Running dlt init creates the following in the current folder:

  • rest_api_pipeline.py file with a sample pipelines definition:
    • GitHub API example
    • Pokemon API example
  • .dlt folder with:
    • secrets.toml file to store your access tokens and other sensitive information
    • config.toml file to store the configuration settings
  • requirements.txt file with the required dependencies

Change the REST API source to your needs by modifying the rest_api_pipeline.py file. See the detailed source configuration section below.

note

For the rest of the guide, we will use the GitHub API and Pokemon API as example sources.

This source is based on the RESTClient class.

Add credentials

In the .dlt folder, you'll find a file called secrets.toml, where you can securely store your access tokens and other sensitive information. It's important to handle this file with care and keep it safe.

The GitHub API requires an access token to access some of its endpoints and to increase the rate limit for the API calls. To get a GitHub token, follow the GitHub documentation on managing your personal access tokens.

After you get the token, add it to the secrets.toml file:

[sources.rest_api.github]
github_token = "your_github_token"

Run the pipeline

  1. Install the required dependencies by running the following command:

    pip install -r requirements.txt
  2. Run the pipeline:

    python rest_api_pipeline.py
  3. Verify that everything loaded correctly by using the following command:

    dlt pipeline rest_api show

Source configuration

Quick example

Let's take a look at the GitHub example in rest_api_pipeline.py file:

from rest_api import RESTAPIConfig, rest_api_resources

@dlt.source
def github_source(github_token=dlt.secrets.value):
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.github.com/repos/dlt-hub/dlt/",
"auth": {
"token": github_token,
},
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"params": {
"per_page": 100,
},
},
},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "issues",
"params": {
"sort": "updated",
"direction": "desc",
"state": "open",
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
},
},
},
},
{
"name": "issue_comments",
"endpoint": {
"path": "issues/{issue_number}/comments",
"params": {
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
},
"include_from_parent": ["id"],
},
],
}

yield from rest_api_resources(config)

def load_github() -> None:
pipeline = dlt.pipeline(
pipeline_name="rest_api_github",
destination="duckdb",
dataset_name="rest_api_data",
)

load_info = pipeline.run(github_source())
print(load_info)

The declarative resource configuration is defined in the config dictionary. It contains the following key components:

  1. client: Defines the base URL and authentication method for the API. In this case it uses token-based authentication. The token is stored in the secrets.toml file.

  2. resource_defaults: Contains default settings for all resources. In this example, we define that all resources:

    • Have id as the primary key
    • Use the merge write disposition to merge the data with the existing data in the destination.
    • Send a per_page query parameter with each request to 100 to get more results per page.
  3. resources: A list of resources to be loaded. Here, we have two resources: issues and issue_comments, which correspond to the GitHub API endpoints for repository issues and issue comments. Note that we need a in issue number to fetch comments for each issue. This number is taken from the issues resource. More on this in the resource relationships section.

Let's break down the configuration in more detail.

Configuration structure

tip

Import the RESTAPIConfig type from the rest_api module to have convenient hints in your editor/IDE and use it to define the configuration object.

from rest_api import RESTAPIConfig

The configuration object passed to the REST API Generic Source has three main elements:

config: RESTAPIConfig = {
"client": {
...
},
"resource_defaults": {
...
},
"resources": [
...
],
}

client

The client configuration is used to connect to the API's endpoints. It includes the following fields:

  • base_url (str): The base URL of the API. This string is prepended to all endpoint paths. For example, if the base URL is https://api.example.com/v1/, and the endpoint path is users, the full URL will be https://api.example.com/v1/users.
  • headers (dict, optional): Additional headers that are sent with each request.
  • auth (optional): Authentication configuration. This can be a simple token, an AuthConfigBase object, or a more complex authentication method.
  • paginator (optional): Configuration for the default pagination used for resources that support pagination. Refer to the pagination section for more details.

resource_defaults (optional)

resource_defaults contains the default values to configure the dlt resources. This configuration is applied to all resources unless overridden by the resource-specific configuration.

For example, you can set the primary key, write disposition, and other default settings here:

config = {
"client": {
# ...
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"params": {
"per_page": 100,
},
},
},
"resources": [
"resource1",
{
"resource2": {
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
},
},
}
}
],
}

Above, all resources will have primary_key set to id, resource1 will have write_disposition set to merge, and resource2 will override the default write_disposition with append. Both resource1 and resource2 will have the per_page parameter set to 100.

resources

This is a list of resource configurations that define the API endpoints to be loaded. Each resource configuration can be:

  • a dictionary with the resource configuration.
  • a string. In this case, the string is used as the both as the endpoint path and the resource name, and the resource configuration is taken from the resource_defaults configuration if it exists.

Resource configuration

A resource configuration is used to define a dlt resource for the data to be loaded from an API endpoint. It contains the following key fields:

  • endpoint: The endpoint configuration for the resource. It can be a string or a dict representing the endpoint settings. See the endpoint configuration section for more details.
  • write_disposition: The write disposition for the resource.
  • primary_key: The primary key for the resource.
  • include_from_parent: A list of fields from the parent resource to be included in the resource output. See the resource relationships section for more details.
  • selected: A flag to indicate if the resource is selected for loading. This could be useful when you want to load data only from child resources and not from the parent resource.

You can also pass additional resource parameters that will be used to configure the dlt resource. See dlt resource API reference for more details.

Endpoint configuration

The endpoint configuration defines how to query the API endpoint. Quick example:

{
"path": "issues",
"method": "GET",
"params": {
"sort": "updated",
"direction": "desc",
"state": "open",
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
},
},
"data_selector": "results",
}

The fields in the endpoint configuration are:

  • path: The path to the API endpoint.
  • method: The HTTP method to be used. Default is GET.
  • params: Query parameters to be sent with each request. For example, sort to order the results or since to specify incremental loading. This is also used to define resource relationships.
  • json: The JSON payload to be sent with the request (for POST and PUT requests).
  • paginator: Pagination configuration for the endpoint. See the pagination section for more details.
  • data_selector: A JSONPath to select the data from the response. See the data selection section for more details.
  • response_actions: A list of actions that define how to process the response data. See the response actions section for more details.
  • incremental: Configuration for incremental loading.

Pagination

The REST API source will try to automatically handle pagination for you. This works by detecting the pagination details from the first API response.

In some special cases, you may need to specify the pagination configuration explicitly.

To specify the pagination configuration, use the paginator field in the client or endpoint configurations. You may either use a dictionary with a string alias in the type field along with the required parameters, or use a paginator class instance.

Example

Suppose the API response for https://api.example.com/posts contains a next field with the URL to the next page:

{
"data": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
],
"pagination": {
"next": "https://api.example.com/posts?page=2"
}
}

You can configure the pagination for the posts resource like this:

{
"path": "posts",
"paginator": {
"type": "json_response",
"next_url_path": "pagination.next",
}
}

Alternatively, you can use the paginator instance directly:

from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator

# ...

{
"path": "posts",
"paginator": JSONResponsePaginator(
next_url_path="pagination.next"
),
}
note

Currently pagination is supported only for GET requests. To handle POST requests with pagination, you need to implement a custom paginator.

These are the available paginators:

typePaginator classDescription
json_responseJSONResponsePaginatorThe link to the next page is in the body (JSON) of the response.
Parameters:
  • next_url_path (str) - the JSONPath to the next page URL
header_linkHeaderLinkPaginatorThe links to the next page are in the response headers.
Parameters:
  • link_header (str) - the name of the header containing the links. Default is "next".
offsetOffsetPaginatorThe pagination is based on an offset parameter. With total items count either in the response body or explicitly provided.
Parameters:
  • limit (int) - the maximum number of items to retrieve in each request
  • offset (int) - the initial offset for the first request. Defaults to 0
  • offset_param (str) - the name of the query parameter used to specify the offset. Defaults to "offset"
  • limit_param (str) - the name of the query parameter used to specify the limit. Defaults to "limit"
  • total_path (str) - a JSONPath expression for the total number of items. If not provided, pagination is controlled by maximum_offset
  • maximum_offset (int) - optional maximum offset value. Limits pagination even without total count
page_numberPageNumberPaginatorThe pagination is based on a page number parameter. With total pages count either in the response body or explicitly provided.
Parameters:
  • initial_page (int) - the starting page number. Defaults to 0
  • page_param (str) - the query parameter name for the page number. Defaults to "page"
  • total_path (str) - a JSONPath expression for the total number of pages. If not provided, pagination is controlled by maximum_page
  • maximum_page (int) - optional maximum page number. Stops pagination once this page is reached
cursorJSONResponseCursorPaginatorThe pagination is based on a cursor parameter. The value of the cursor is in the response body (JSON).
Parameters:
  • cursor_path (str) - the JSONPath to the cursor value. Defaults to "cursors.next"
  • cursor_param (str) - the query parameter name for the cursor. Defaults to "after"
single_pageSinglePagePaginatorThe response will be interpreted as a single-page response, ignoring possible pagination metadata.
autoNoneExplicitly specify that the source should automatically detect the pagination method.

For more complex pagination methods, you can implement a custom paginator, instantiate it, and use it in the configuration.

Data selection

The data_selector field in the endpoint configuration allows you to specify a JSONPath to select the data from the response. By default, the source will try to detect locations of the data automatically.

Use this field when you need to specify the location of the data in the response explicitly.

For example, if the API response looks like this:

{
"posts": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
]
}

You can use the following endpoint configuration:

{
"path": "posts",
"data_selector": "posts",
}

For a nested structure like this:

{
"results": {
"posts": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
]
}
}

You can use the following endpoint configuration:

{
"path": "posts",
"data_selector": "results.posts",
}

Read more about JSONPath syntax to learn how to write selectors.

Authentication

For APIs that require authentication to access their endpoints, the REST API source supports various authentication methods, including token-based authentication, query parameters, basic authentication, and custom authentication. The authentication configuration is specified in the auth field of the client either as a dictionary or as an instance of the authentication class.

Quick example

One of the most common methods is token-based authentication (also known as Bearer token authentication). To authenticate using this method, you can use the following shortcut:

{
"client": {
# ...
"auth": {
"token": dlt.secrets["your_api_token"],
},
# ...
},
}

The full version of the configuration would also include the authentication type (bearer) explicitly:

{
"client": {
# ...
"auth": {
"type": "bearer",
"token": dlt.secrets["your_api_token"],
},
# ...
},
}

Alternatively, you can use the authentication class directly:

from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

config = {
"client": {
"auth": BearTokenAuth(dlt.secrets["your_api_token"]),
},
# ...
}
danger

Make sure to store your access tokens and other sensitive information in the secrets.toml file and never commit it to the version control system.

Available authentication types:

typeAuthentication classDescription
bearerBearTokenAuthBearer token authentication.
Parameters:
  • token (str)
http_basicHTTPBasicAuthBasic HTTP authentication.
Parameters:
  • username (str)
  • password (str)
api_keyAPIKeyAuthAPI key authentication with key defined in the query parameters or in the headers.
Parameters:
  • name (str) - the name of the query parameter or header
  • api_key (str) - the API key value
  • location (str, optional) - the location of the API key in the request. Can be query or header. Default is header

For more complex authentication methods, you can implement a custom authentication class and use it in the configuration.

Define resource relationships

When you have a resource that depends on another resource, you can define the relationship using the resolve configuration. With it you link a path parameter in the child resource to a field in the parent resource's data.

In the GitHub example, the issue_comments resource depends on the issues resource. The issue_number parameter in the issue_comments endpoint configuration is resolved from the number field of the issues resource:

{
"resources": [
{
"name": "issues",
"endpoint": {
"path": "issues",
# ...
},
},
{
"name": "issue_comments",
"endpoint": {
"path": "issues/{issue_number}/comments",
"params": {
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
},
"include_from_parent": ["id"],
},
],
}

This configuration tells the source to get issue numbers from the issues resource and use them to fetch comments for each issue. So if the issues resource yields the following data:

[
{"id": 1, "number": 123},
{"id": 2, "number": 124},
{"id": 3, "number": 125}
]

The issue_comments resource will make requests to the following endpoints:

  • issues/123/comments
  • issues/124/comments
  • issues/125/comments

The syntax for the resolve field in parameter configuration is:

{
"<parameter_name>": {
"type": "resolve",
"resource": "<parent_resource_name>",
"field": "<parent_resource_field_name_or_jsonpath>",
}
}

The field value can be specified as a JSONPath to select a nested field in the parent resource data. For example: "field": "items[0].id".

Under the hood, dlt handles this by using a transformer resource.

Include fields from the parent resource

You can include data from the parent resource in the child resource by using the include_from_parent field in the resource configuration. For example:

{
"name": "issue_comments",
"endpoint": {
...
},
"include_from_parent": ["id", "title", "created_at"],
}

This will include the id, title, and created_at fields from the issues resource in the issue_comments resource data. The name of the included fields will be prefixed with the parent resource name and an underscore (_) like so: _issues_id, _issues_title, _issues_created_at.

Incremental loading

Some APIs provide a way to fetch only new or changed data (most often by using a timestamp field like updated_at, created_at, or incremental IDs). This is called incremental loading and is very useful as it allows you to reduce the load time and the amount of data transferred.

When the API endpoint supports incremental loading, you can configure dlt to load only the new or changed data using these two methods:

  1. Defining a special parameter in the params section of the endpoint configuration.
  2. Specifying the incremental field in the endpoint configuration.

Let's start with the first method.

Incremental loading in params

Imagine we have the following endpoint https://api.example.com/posts and it:

  1. Accepts a created_since query parameter to fetch posts created after a certain date.
  2. Returns a list of posts with the created_at field for each post.

For example, if we query the endpoint with https://api.example.com/posts?created_since=2024-01-25, we get the following response:

{
"results": [
{"id": 1, "title": "Post 1", "created_at": "2024-01-26"},
{"id": 2, "title": "Post 2", "created_at": "2024-01-27"},
{"id": 3, "title": "Post 3", "created_at": "2024-01-28"}
]
}

To enable the incremental loading for this endpoint, you can use the following endpoint configuration:

{
"path": "posts",
"data_selector": "results", # Optional JSONPath to select the list of posts
"params": {
"created_since": {
"type": "incremental",
"cursor_path": "created_at", # The JSONPath to the field we want to track in each post
"initial_value": "2024-01-25",
},
},
}

After you run the pipeline, dlt will keep track of the last created_at from all the posts fetched and use it as the created_since parameter in the next request. So in our case, the next request will be made to https://api.example.com/posts?created_since=2024-01-28 to fetch only the new posts created after 2024-01-28.

Let's break down the configuration.

  1. We explicitly set data_selector to "results" to select the list of posts from the response. This is optional, if not set, dlt will try to auto-detect the data location.
  2. We define the created_since parameter as an incremental parameter with the following fields:
{
"created_since": {
"type": "incremental",
"cursor_path": "created_at",
"initial_value": "2024-01-25",
},
}
  • type: The type of the parameter definition. In this case, it must be set to incremental.
  • cursor_path: The JSONPath to the field within each item in the list. The value of this field will be used in the next request. In the example above our items look like {"id": 1, "title": "Post 1", "created_at": "2024-01-26"} so to track the created time we set cursor_path to "created_at". Note that the JSONPath starts from the root of the item (dict) and not from the root of the response.
  • initial_value: The initial value for the cursor. This is the value that will initialize the state of incremental loading. In this case, it's 2024-01-25. The value type should match the type of the field in the data item.

Incremental loading using the incremental field

The alternative method is to use the incremental field in the endpoint configuration. This method is more flexible and allows you to specify the start and end conditions for the incremental loading.

Let's take the same example as above and configure it using the incremental field:

{
"path": "posts",
"data_selector": "results",
"incremental": {
"start_param": "created_since",
"cursor_path": "created_at",
"initial_value": "2024-01-25",
},
}

Note that we specify the query parameter name created_since in the start_param field and not in the params section.

The full available configuration for the incremental field is:

{
"incremental": {
"start_param": "<start_parameter_name>",
"end_param": "<end_parameter_name>",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
"end_value": "<end_value>",
}
}

The fields are:

  • start_param (str): The name of the query parameter to be used as the start condition. If we use the example above, it would be "created_since".
  • end_param (str): The name of the query parameter to be used as the end condition. This is optional and can be omitted if you only need to track the start condition. This is useful when you need to fetch data within a specific range and the API supports end conditions (like created_before query parameter).
  • cursor_path (str): The JSONPath to the field within each item in the list. This is the field that will be used to track the incremental loading. In the example above, it's "created_at".
  • initial_value (str): The initial value for the cursor. This is the value that will initialize the state of incremental loading.
  • end_value (str): The end value for the cursor to stop the incremental loading. This is optional and can be omitted if you only need to track the start condition. If you set this field, initial_value needs to be set as well.

See the incremental loading guide for more details.

If you encounter issues with incremental loading, see the troubleshooting section in the incremental loading guide.

Advanced configuration

rest_api_source() function creates the dlt source and lets you configure the following parameters:

  • config: The REST API configuration dictionary.
  • name: An optional name for the source.
  • section: An optional section name in the configuration file.
  • max_table_nesting: Sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.
  • root_key (bool): Enables merging on all resources by propagating root foreign key to child tables. This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False.
  • schema_contract: Schema contract settings that will be applied to this resource.
  • spec: A specification of configuration and secret values required by the source.

Response actions

The response_actions field in the endpoint configuration allows you to specify how to handle specific responses from the API based on status codes or content substrings. This is useful for handling edge cases like ignoring responses on specific conditions.

Experimental Feature

This is an experimental feature and may change in future releases.

Example

{
"path": "issues",
"response_actions": [
{"status_code": 404, "action": "ignore"},
{"content": "Not found", "action": "ignore"},
{"status_code": 200, "content": "some text", "action": "ignore"},
],
}

In this example, the source will ignore responses with a status code of 404, responses with the content "Not found", and responses with a status code of 200 and content "some text".

Fields:

  • status_code (int, optional): The HTTP status code to match.
  • content (str, optional): A substring to search for in the response content.
  • action (str): The action to take when the condition is met. Currently supported actions:
    • ignore: Ignore the response.

Troubleshooting

If you encounter issues while running the pipeline, enable logging for detailed information about the execution:

RUNTIME__LOG_LEVEL=INFO python my_script.py

This also provides details on the HTTP requests.

Configuration issues

Getting validation errors

When you running the pipeline and getting a DictValidationException, it means that the source configuration is incorrect. The error message provides details on the issue including the path to the field and the expected type.

For example, if you have a source configuration like this:

config: RESTAPIConfig = {
"client": {
# ...
},
"resources": [
{
"name": "issues",
"params": { # <- Wrong: this should be inside
"sort": "updated", # the endpoint field below
},
"endpoint": {
"path": "issues",
# "params": { # <- Correct configuration
# "sort": "updated",
# },
},
},
# ...
],
}

You will get an error like this:

dlt.common.exceptions.DictValidationException: In path .: field 'resources[0]'
expects the following types: str, EndpointResource. Provided value {'name': 'issues', 'params': {'sort': 'updated'},
'endpoint': {'path': 'issues', ... }} with type 'dict' is invalid with the following errors:
For EndpointResource: In path ./resources[0]: following fields are unexpected {'params'}

It means that in the first resource configuration (resources[0]), the params field should be inside the endpoint field.

tip

Import the RESTAPIConfig type from the rest_api module to have convenient hints in your editor/IDE and use it to define the configuration object.

from rest_api import RESTAPIConfig

Getting wrong data or no data

If incorrect data is received from an endpoint, check the data_selector field in the endpoint configuration. Ensure the JSONPath is accurate and points to the correct data in the response body. rest_api attempts to auto-detect the data location, which may not always succeed. See the data selection section for more details.

Getting insufficient data or incorrect pagination

Check the paginator field in the configuration. When not explicitly specified, the source tries to auto-detect the pagination method. If auto-detection fails, or the system is unsure, a warning is logged. For production environments, we recommend to specify an explicit paginator in the configuration. See the pagination section for more details. Some APIs may have non-standard pagination methods, and you may need to implement a custom paginator.

Incremental loading not working

See the troubleshooting guide for incremental loading issues.

Getting HTTP 404 errors

Some API may return 404 errors for resources that do not exist or have no data. Manage these responses by configuring the ignore action in response actions.

Authentication issues

If experiencing 401 (Unauthorized) errors, this could indicate:

  • Incorrect authorization credentials. Verify credentials in the secrets.toml. Refer to Secret and configs for more information.
  • An incorrect authentication type. Consult the API documentation for the proper method. See the authentication section for details. For some APIs, a custom authentication method may be required.

General guidelines

The rest_api source uses the RESTClient class for HTTP requests. Refer to the RESTClient troubleshooting guide for debugging tips.

For further assistance, join our Slack community. We're here to help!

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.