Skip to main content
Version: 1.5.0 (latest)

Pipedrive

Need help deploying these sources or figuring out how to run them in your data stack?
Join our Slack community or Book a call with a dltHub Solutions Engineer.

Pipedrive is a cloud-based sales Customer Relationship Management (CRM) tool designed to help businesses manage leads and deals, track communication, and automate sales processes.

This Pipedrive dlt verified source and pipeline example load data using the “Pipedrive API” to the destination of your choice.

Sources and resources that can be loaded using this verified source are:

NameDescription
activityRefers to scheduled events or tasks associated with deals, contacts, or organizations
organizationCompany or entity with which you have potential or existing business dealings
personIndividual contact or lead with whom sales deals can be associated
productGoods or services that a company sells, which can be associated with deals
dealPotential sale or transaction that you can track through various stages
pipelineVisual representation of your sales process, displaying the stages and deals in each stage
stageSpecific step in a sales process where a deal resides based on its progress
userIndividual with a unique login credential who can access and use the platform

Setup guide

Grab API token

  1. Set up a Pipedrive account.
  2. In Pipedrive, go to your name (in the top right corner).
  3. Select company settings.
  4. Go to personal preferences.
  5. Select the API tab.
  6. Copy your API token (to be used in the dlt configuration).

Note: The Pipedrive UI, which is described here, might change. The full guide is available at this link.

Initialize the verified source

To get started with your data pipeline, follow these steps:

  1. Enter the following command:

    dlt init pipedrive duckdb

    This command will initialize the pipeline example with Pipedrive as the source and duckdb as the destination.

  2. If you'd like to use a different destination, simply replace duckdb with the name of your preferred destination.

  3. After running this command, a new directory will be created with the necessary files and configuration settings to get started.

For more information, read the guide on how to add a verified source.

Add credentials

  1. In the .dlt folder, there's a file called secrets.toml. It's where you store sensitive information securely, like access tokens. Keep this file safe.

    Here's what the file looks like:

    [sources.pipedrive.credentials]
    # Note: Do not share this file and do not push it to GitHub!
    pipedrive_api_key = "PIPEDRIVE_API_TOKEN" # please set me up!
  2. Replace PIPEDRIVE_API_TOKEN with the API token you copied above.

  3. Finally, enter credentials for your chosen destination as per the docs.

For more information, read the General Usage: Credentials.

Run the pipeline

  1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
    pip install -r requirements.txt
  2. You're now ready to run the pipeline! To get started, run the following command:
    python pipedrive_pipeline.py
  3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:
    dlt pipeline <pipeline_name> show
    For example, the pipeline_name for the above pipeline example is pipedrive, but you may also use any custom name instead.

For more information, read the guide on how to run a pipeline.

Sources and resources

dlt works on the principle of sources and resources.

Default endpoints

You can write your own pipelines to load data to a destination using this verified source. However, it is important to note the complete list of the default endpoints given in pipedrive/settings.py.

In the file "settings.py" you'll find "ENTITY_MAPPINGS" and "RECENTS_ENTITIES".

  • ENTITY_MAPPING: is a list detailing entities in Pipedrive. Each tuple contains the entity's name, its associated custom fields, and any additional configuration. Provides dynamic access to Pipedrive entities and custom fields.

  • RECENTS_ENTITIES: is a dictionary that provides a mapping between singular entity names and their plural forms.

In summary, both "ENTITY_MAPPINGS" and "RECENTS_ENTITIES" standardize interactions with the Pipedrive API.

Source pipedrive_source

This function returns a list of resources including activities, deals, custom_fields_mapping, and other resources data from the Pipedrive API.

@dlt.source(name="pipedrive")
def pipedrive_source(
pipedrive_api_key: str = dlt.secrets.value,
since_timestamp: Optional[Union[pendulum.DateTime, str]] = dlt.config.value,
) -> Iterator[DltResource]:
...

pipedrive_api_key: Authentication token for Pipedrive, configured in ".dlt/secrets.toml".

since_timestamp: Starting timestamp for incremental loading. By default, the complete history is loaded on the first run, and new data in subsequent runs.

Note: Incremental loading can be enabled or disabled depending on user preferences.

Resource iterator RECENTS_ENTITIES

This code generates resources for each entity in RECENTS_ENTITIES, stores them in endpoints_resources, and then loads data from each endpoint to the destination.

endpoints_resources = {}
for entity, resource_name in RECENTS_ENTITIES.items():
endpoints_resources[resource_name] = dlt.resource(
get_recent_items_incremental,
name=resource_name,
primary_key="id",
write_disposition="merge",
)(entity, **resource_kwargs)

# yields endpoint_resources.values

entity and resource_name: Key-value pairs from RECENTS_ENTITIES.

get_recent_items_incremental: Function given to dlt.resource to generate data.

name: Sets the resource's name.

primary_key: Designates "id" as the resource's primary key.

write_disposition: New data merges with existing data in the destination.

Transformer deals_participants

This function gets the participants of deals from the Pipedrive API and yields the result.

def pipedrive_source(args):
# Rest of function
yield endpoints_resources["deals"] | dlt.transformer(
name="deals_participants",
write_disposition="merge",
primary_key="id"
)(_get_deals_participants)(pipedrive_api_key)

name: Names the transformer as "deals_participants".

write_disposition: Sets the transformer to merge new data with existing data in the destination.

Similar to the transformer function "deals_participants," another transformer function named "deals_flow" gets the flow of deals from the Pipedrive API and then yields the result for further processing or loading.

Resource create_state

This function preserves the mapping of custom fields across different pipeline runs. It is used to create and store a mapping of custom fields for different entities in the source state.

@dlt.resource(selected=False)
def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
...
yield _get_pages_for_rename("", "", "")

It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields entity exists. This updated state is then saved for future pipeline runs.

Other functions

Similar to the above functions, there are the following:

custom_fields_mapping: Transformer function that parses and yields custom fields' mapping in order to be stored in the destination by dlt.

leads: Resource function that incrementally loads Pipedrive leads by update_time.

Customization

Create your own pipeline

If you wish to create your own pipelines, you can leverage source and resource methods from this verified source.

  1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

    pipeline = dlt.pipeline(
    pipeline_name="pipedrive", # Use a custom name if desired
    destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post)
    dataset_name="pipedrive_data" # Use a custom name if desired
    )

    To read more about pipeline configuration, please refer to our documentation.

  2. To print source info:

    pipedrive_data = pipedrive_source()
    #print source info
    print(pipedrive_data)
    # list resource names
    print(pipedrive_data.resources.keys())
    # print `persons` resource info
    print(pipedrive_data.resources["persons"])
  3. To load all the data in Pipedrive:

    load_data = pipedrive_source() # calls the source function
    load_info = pipeline.run(load_data) #runs the pipeline with selected source configuration
    print(load_info)
  4. To load data from selected resources:

    #To load custom fields, include custom_fields_mapping for hash to name mapping.
    load_data = pipedrive_source().with_resources("products", "deals", "deals_participants", "custom_fields_mapping")
    load_info = pipeline.run(load_data) #runs the pipeline loading selected data
    print(load_info)
  5. To load data from a start date:

    # Configure a source for 'activities' starting from the specified date.
    # The 'custom_fields_mapping' is incorporated to convert custom field hashes into their respective names.
    activities_source = pipedrive_source(
    since_timestamp="2023-03-01 00:00:00Z"
    ).with_resources("activities", "custom_fields_mapping")

    # Run the pipeline with both sources
    load_info = pipeline.run(activities_source)
    print(load_info)

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.