Pipedrive
Pipedrive is a cloud-based sales Customer Relationship Management (CRM) tool designed to help businesses manage leads and deals, track communication, and automate sales processes.
This Pipedrive dlt
verified source and
pipeline example
load data using the “Pipedrive API” to the destination of your choice.
Sources and resources that can be loaded using this verified source are:
Name | Description |
---|---|
activity | Refers to scheduled events or tasks associated with deals, contacts, or organizations |
organization | Company or entity with which you have potential or existing business dealings |
person | Individual contact or lead with whom sales deals can be associated |
product | Goods or services that a company sells, which can be associated with deals |
deal | Potential sale or transaction that you can track through various stages |
pipeline | Visual representation of your sales process, displaying the stages and deals in each stage |
stage | Specific step in a sales process where a deal resides based on its progress |
user | Individual with a unique login credential who can access and use the platform |
Setup Guide
Grab API token
- Set up a Pipedrive account.
- In Pipedrive, go to your name (in the top right corner).
- Select company settings.
- Go to personal preferences.
- Select the API tab.
- Copy your API token (to be used in the dlt configuration).
Note: The Pipedrive UI, which is described here, might change. The full guide is available at this link.
Initialize the verified source
To get started with your data pipeline, follow these steps:
Enter the following command:
dlt init pipedrive duckdb
This command will initialize the pipeline example with Pipedrive as the source and duckdb as the destination.
If you'd like to use a different destination, simply replace
duckdb
with the name of your preferred destination.After running this command, a new directory will be created with the necessary files and configuration settings to get started.
For more information, read the guide on how to add a verified source.
Add credentials
In the
.dlt
folder, there's a file calledsecrets.toml
. It's where you store sensitive information securely, like access tokens. Keep this file safe.Here's what the file looks like:
[sources.pipedrive.credentials]
# Note: Do not share this file and do not push it to GitHub!
pipedrive_api_key = "PIPEDRIVE_API_TOKEN" # please set me up !Replace
PIPEDRIVE_API_TOKEN
with the API token you copied above.Finally, enter credentials for your chosen destination as per the docs.
For more information, read the General Usage: Credentials.
Run the pipeline
- Before running the pipeline, ensure that you have installed all the necessary dependencies by
running the command:
pip install -r requirements.txt
- You're now ready to run the pipeline! To get started, run the following command:
python pipedrive_pipeline.py
- Once the pipeline has finished running, you can verify that everything loaded correctly by using
the following command:For example, the
dlt pipeline <pipeline_name> show
pipeline_name
for the above pipeline example ispipedrive
, but you may also use any custom name instead.
For more information, read the guide on how to run a pipeline.
Sources and resources
dlt
works on the principle of sources and
resources.
Default endpoints
You can write your own pipelines to load data to a destination using this verified source. However, it is important to note the complete list of the default endpoints given in pipedrive/settings.py.
In the file "settings.py" you'll find "ENTITY_MAPPINGS" and "RECENTS_ENTITIES".
ENTITY_MAPPING
: is a list detailing entities in Pipedrive. Each tuple contains the entity's name, its associated custom fields, and any additional configuration. Provides dynamic access to Pipedrive entities and custom fields.RECENTS_ENTITIES
: is a dictionary that provides a mapping between singular entity names and their plural forms.
In summary, both "ENTITY_MAPPINGS" and "RECENTS_ENTITIES" standardize interactions with the Pipedrive API.
Source pipedrive_source
This function returns a list of resources including activities, deals, custom_fields_mapping and other resources data from Pipedrive API.
@dlt.source(name="pipedrive")
def pipedrive_source(
pipedrive_api_key: str = dlt.secrets.value,
since_timestamp: Optional[Union[pendulum.DateTime, str]] = dlt.config.value,
) -> Iterator[DltResource]:
...
pipedrive_api_key
: Authentication token for Pipedrive, configured in ".dlt/secrets.toml".
since_timestamp
: Starting timestamp for incremental loading. By default, complete history is loaded
on the first run. And new data in subsequent runs.
Note: Incremental loading can be enabled or disabled depending on user preferences.
Resource iterator RECENTS_ENTITIES
This code generates resources for each entity in RECENTS_ENTITIES, stores them in endpoints_resources, and then loads data from each endpoint to the destination.
endpoints_resources = {}
for entity, resource_name in RECENTS_ENTITIES.items():
endpoints_resources[resource_name] = dlt.resource(
get_recent_items_incremental,
name=resource_name,
primary_key="id",
write_disposition="merge",
)(entity, **resource_kwargs)
#yields endpoint_resources.values
entity and resource_name
: Key-value pairs from RECENTS_ENTITIES.
get_recent_items_incremental
: Function given to dlt.resource to generate data.
name
: Sets the resource's name.
primary_key
: Designates "id" as the resource's primary key.
write_disposition
: New data merges with existing data in the destination.
Transformer deals_participants
This function gets the participants of deals from the Pipedrive API and yields the result.
def pipedrive_source(args):
# Rest of function
yield endpoints_resources["deals"] | dlt.transformer(
name="deals_participants",
write_disposition="merge",
primary_key="id"
)(_get_deals_participants)(pipedrive_api_key)
name
: Names the transformer as "deals_participants".
write_disposition
: Sets the transformer to merge new data with existing data in the destination.
Similar to the transformer function "deals_participants" is another transformer function named "deals_flow" that gets the flow of deals from the Pipedrive API, and then yields the result for further processing or loading.
Resource create_state
This function preserves the mapping of custom fields across different pipeline runs. It is used to create and store a mapping of custom fields for different entities in the source state.
@dlt.resource(selected=False)
def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
...
yield _get_pages_for_rename("", "", "")
It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields entity exists. This updated state is then saved for future pipeline runs.
Other functions
Similar to the above functions, there are the following:
custom_fields_mapping
: Transformer function that parses and yields custom fields' mapping in order
to be stored in destination by dlt.
leads
: Resource function that incrementally loads Pipedrive leads by update_time.
Customization
Create your own pipeline
If you wish to create your own pipelines, you can leverage source and resource methods from this verified source.
Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:
pipeline = dlt.pipeline(
pipeline_name="pipedrive", # Use a custom name if desired
destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post)
dataset_name="pipedrive_data" # Use a custom name if desired
)To read more about pipeline configuration, please refer to our documentation.
To print source info:
pipedrive_data = pipedrive_source()
#print source info
print(pipedrive_data)
# list resource names
print(pipedrive_data.resources.keys())
# print `persons` resource info
print(pipedrive_data.resources["persons"])To load all the data in Pipedrive:
load_data = pipedrive_source() # calls the source function
load_info = pipeline.run(load_data) #runs the pipeline with selected source configuration
print(load_info)To load data from selected resources:
#To load custom fields, include custom_fields_mapping for hash to name mapping.
load_data = pipedrive_source().with_resources("products", "deals", "deals_participants", "custom_fields_mapping")
load_info = pipeline.run(load_data) #runs the pipeline loading selected data
print(load_info)To load data from a start date:
# Configure a source for 'activities' starting from the specified date.
# The 'custom_fields_mapping' is incorporated to convert custom field hashes into their respective names.
activities_source = pipedrive_source(
since_timestamp="2023-03-01 00:00:00Z"
).with_resources("activities", "custom_fields_mapping")
# Run the pipeline with both sources
load_info = pipeline.run(activities_source)
print(load_info)
Additional Setup guides
- Load data from Pipedrive to Microsoft SQL Server in python with dlt
- Load data from Pipedrive to MotherDuck in python with dlt
- Load data from Pipedrive to Snowflake in python with dlt
- Load data from Pipedrive to ClickHouse in python with dlt
- Load data from Pipedrive to AWS Athena in python with dlt
- Load data from Pipedrive to BigQuery in python with dlt
- Load data from Pipedrive to Neon Serverless Postgres in python with dlt
- Load data from Pipedrive to Timescale in python with dlt
- Load data from Pipedrive to PostgreSQL in python with dlt
- Load data from Pipedrive to YugabyteDB in python with dlt