
2 posts tagged with "Unstructured Data"


· 11 min read
Zaeem Athar
info

TL;DR: In this blog post, we'll build data pipelines using dlt and orchestrate them using Dagster.

dlt is an open-source Python library that allows you to declaratively load messy data sources into well-structured tables or datasets, through automatic schema inference and evolution. It simplifies building data pipelines by providing functionality to support the entire extract and load process.

It does so in a scalable way, enabling you to run it on both micro workers and in highly parallelized setups. dlt also makes extraction robust by providing state management for incremental extraction, a drop-in requests replacement with retries, and many other helpers for common and uncommon extraction cases.

To start with dlt, you can install it using pip: pip install dlt. Afterward, import dlt in your Python script and start building your data pipeline. There's no need to start any backends or containers.
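To make that concrete, here is a minimal sketch of a first pipeline. It loads a couple of in-memory records into DuckDB; the pipeline name, destination, and sample rows are illustrative assumptions, not part of the GitHub project we build below.

import dlt

# a minimal sketch: load two in-memory records into a local DuckDB file
# (requires `pip install "dlt[duckdb]"`; the names and rows below are illustrative)
pipeline = dlt.pipeline(pipeline_name="quickstart", destination="duckdb", dataset_name="demo")
load_info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
)
print(load_info)  # summarizes what was loaded and where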

Project Overview:

In this example, we will ingest GitHub issue data from a repository and store the data in BigQuery. We will use dlt to create a data pipeline and orchestrate it using Dagster.

Initially, we will start by creating a simple data pipeline using dlt. We will then orchestrate the pipeline using Dagster. Finally, we will add more features to this pipeline by using the dlt schema evolution and Dagster asset metadata to educate the users about their data pipeline.

The project code is available on GitHub.

Project Overview

As we will be ingesting data into BigQuery, we first need to create service account credentials for BigQuery. You can find more info on setting up a service account in the dlt docs.

Once we have the credentials, we are ready to begin. Let's first install Dagster and dlt; the commands below install both.

pip install dlt
pip install dagster dagster-webserver

Simple dlt Pipeline:

As a first step, we will create the GitHub issues pipeline using dlt.

dlt init github_issues bigquery

This will generate a template for us to create a new pipeline. Under .dlt/secrets.toml, add the service account credentials for BigQuery. Then, in github_issues.py, delete the generated code and add the following:

import time

import dlt
import requests

@dlt.resource(write_disposition="append")
def github_issues_resource(api_secret_key=dlt.secrets.value):
    owner = 'dlt-hub'
    repo = 'dlt'
    url = f"https://api.github.com/repos/{owner}/{repo}/issues"
    headers = {"Accept": "application/vnd.github.raw+json"}

    while url:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raise an exception on an invalid response
        issues = response.json()
        yield issues

        if 'link' in response.headers:
            if 'rel="next"' not in response.headers['link']:
                break

            url = response.links['next']['url']  # fetch the next page of issues
        else:
            break
        time.sleep(2)  # sleep for 2 seconds to respect rate limits


if __name__ == "__main__":
    # configure the pipeline with your destination details
    pipeline = dlt.pipeline(
        pipeline_name='github_issues', destination='bigquery', dataset_name='github_issues_data'
    )

    # run the pipeline with your parameters
    load_info = pipeline.run(github_issues_resource())

    # print information on the data that was loaded
    print(load_info)

The above code creates a simple github_issues pipeline that gets the issues data from the defined repository and loads it into BigQuery. The dlt.resource yields the data, while the dlt.pipeline normalizes the nested data and loads it into the defined destination. To read more about the technical details, refer to the dlt docs.

To run the pipeline execute the below commands:

pip install -r requirements.txt
python github_issues.py

We now have a running pipeline and are ready to orchestrate it using Dagster.

Orchestrating using Dagster:

We will need to adjust our pipeline a bit to orchestrate it using Dagster.

Step 1: Create a Dagster project

  • Create a new directory for your Dagster project and scaffold the basic structure:
mkdir dagster_github_issues
cd dagster_github_issues
dagster project scaffold --name github-issues

This will generate the default files for Dagster that we will use as a starting point for our data pipeline.

Step 2: Set up the directory structure

  • Inside the github-issues/github_issues directory create the following folders: assets, resources, and dlt.
.
├── README.md
├── github_issues
│   ├── __init__.py
│   ├── assets
│   │   ├── __init__.py
│   ├── dlt
│   │   ├── __init__.py
│   └── resources
│       ├── __init__.py
├── github_issues_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
└── setup.py

Step 3: Add dlt Resources and environment variables

  • Copy the previously created github_issues_resource code into dlt/__init__.py. Remove the dlt.secrets.value parameter, as we'll pass the credentials through a .env file.
  • Create a .env file in the root directory. This is the directory where the pyproject.toml file exists. Copy the credentials into the .env and follow the correct naming convention; an illustrative sketch follows this list. For more info on setting up the .env file, have a look at the docs.
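As an illustration, a .env for the BigQuery destination might look like the sketch below. The variable names follow dlt's convention of upper-casing the secrets.toml sections and joining them with double underscores; the values are placeholders you replace with your own service account details.

DESTINATION__BIGQUERY__LOCATION="US"
DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID="<project_id>"
DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY="<private_key>"
DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL="<client_email>"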

Step 4: Add configurable resources and define the asset

  • Define a DDltResource class in resources/__init__.py as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
from dagster import ConfigurableResource

import dlt

class DDltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def create_pipeline(self, resource_data, table_name):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=self.pipeline_name, destination=self.destination, dataset_name=self.dataset_name
        )

        # run the pipeline with your parameters
        load_info = pipeline.run(resource_data, table_name=table_name)

        return load_info
  • Define the asset, issues_pipeline, in assets/__init__.py. This asset uses the configurable resource to create a dlt pipeline and ingests data into BigQuery.
from dagster import asset, get_dagster_logger

from ..resources import DDltResource
from ..dlt import github_issues_resource

@asset
def issues_pipeline(pipeline: DDltResource):

    logger = get_dagster_logger()
    results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues')
    logger.info(results)

The defined asset (issues_pipeline) takes the configurable resource (DDltResource) as input. Inside the asset, we call the resource's create_pipeline method and pass it the dlt resource (github_issues_resource) along with a table name. create_pipeline then normalizes the data and ingests it into BigQuery.

Step 5: Handle Schema Evolution

dlt provides schema evolution, which monitors changes in the defined table schema. Suppose GitHub adds a new column or changes the datatype of an existing one; such a small change can break pipelines and transformations. The schema evolution feature works amazingly well with Dagster.

  • Add the schema evolution code to the asset to make our pipelines more resilient to changes.
from dagster import AssetExecutionContext, MetadataValue

@asset
def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource):
    ...
    md_content = ""
    for package in results.load_packages:
        for table_name, table in package.schema_update.items():
            for column_name, column in table["columns"].items():
                md_content += f"\tTable updated: {table_name}: Column changed: {column_name}: {column['data_type']}\n"

    # Attach the Markdown content as metadata to the asset
    context.add_output_metadata(metadata={"Updates": MetadataValue.md(md_content)})

Step 6: Define Definitions

  • In the __init__.py under the github_issues folder, add the definitions:
from dagster import Definitions, define_asset_job, load_assets_from_modules

from . import assets
from .resources import DDltResource

all_assets = load_assets_from_modules([assets])
simple_pipeline = define_asset_job(name="simple_pipeline", selection=['issues_pipeline'])

defs = Definitions(
    assets=all_assets,
    jobs=[simple_pipeline],
    resources={
        "pipeline": DDltResource(
            pipeline_name="github_issues",
            dataset_name="dagster_github_issues",
            destination="bigquery",
        ),
    }
)

Step 7: Run the Web Server and materialize the asset

  • In the root directory (github-issues), run the dagster dev command to start the web server and materialize the asset.

GitHub Asset

Step 8: View the populated Metadata and ingested data in BigQuery

Once the asset has been successfully materialized, go to the Assets tab at the top and select issues_pipeline. Under Metadata, you can see the tables, columns, and data types that have been updated. In this case, the changes relate to internal dlt tables.

Any subsequent changes in the GitHub issues schema can be tracked from the metadata. You can set up Slack notifications to be alerted to schema changes.
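For example, a small helper along these lines could post the same schema-update summary to a Slack incoming webhook after each run. This is only a sketch, not part of the project above: the SLACK_WEBHOOK_URL environment variable and the notify_schema_change helper are assumptions, and it uses the plain requests library rather than any dlt- or Dagster-specific integration.

import os

import requests

def notify_schema_change(load_info) -> None:
    # a sketch: post detected schema updates to a Slack incoming webhook
    hook_url = os.environ["SLACK_WEBHOOK_URL"]  # assumed to be set in the environment
    for package in load_info.load_packages:
        for table_name, table in package.schema_update.items():
            for column_name, column in table["columns"].items():
                message = f"Table updated: {table_name}: Column changed: {column_name}: {column['data_type']}"
                requests.post(hook_url, json={"text": message})  # Slack incoming-webhook payload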

Metadata loaded in Asset

Let's finally have a look in BigQuery to view the ingested data.

Data Loaded in BigQuery

The github_issues is the parent table that contains the data from the root level of the JSON returned by the GitHub API. The subsequent table github_issues_assignees is a child table that was nested in the original JSON. dlt normalizes nested data by populating it into separate tables and creating relationships between them. To learn more about how dlt creates these relationships, refer to the docs.

Orchestrating a verified dlt source using Dagster:

dlt provides verified sources that can be initialized to fast-track the pipeline-building process. You can find the list of available sources in the dlt docs.

One of the main strengths of dlt lies in its ability to extract, normalize, and ingest unstructured and semi-structured data from various sources. One of the most commonly used verified sources is MongoDB. Let's quickly look at how we can orchestrate the MongoDB source using Dagster.

Step 1: Setting up a Dagster project

  • Start by creating a new Dagster project scaffold:
dagster project scaffold --name mongodb-dlt
  • Follow the steps mentioned earlier and create assets and resources directories under mongodb-dlt/mongodb_dlt.
  • Initialize a dlt MongoDB pipeline in the same directory:
dlt init mongodb bigquery

This will create a template with all the necessary logic implemented for extracting data from MongoDB. After running the command your directory structure should be as follows:

.
├── README.md
├── mongodb_dlt
│   ├── __init__.py
│   ├── assets
│   │   ├── __init__.py
│   │   └── assets.py
│   ├── mongodb
│   │   ├── README.md
│   │   ├── __init__.py
│   │   └── helpers.py
│   ├── mongodb_pipeline.py
│   ├── requirements.txt
│   └── resources
│       ├── __init__.py
├── mongodb_dlt_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
└── setup.py

Step 2: Configuring MongoDB Atlas and Credentials

For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atlas and use the test Movie Flix Dataset. You can find detailed information on setting up the credentials in the MongoDB verified sources documentation.

Next, create a .env file and add the BigQuery and MongoDB credentials to the file. The .env file should reside in the root directory.
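For reference, the MongoDB side of such a .env might look like the sketch below. The values are placeholders, the names follow the same double-underscore convention as before, and the BigQuery variables stay the same as in the previous project.

SOURCES__MONGODB__CONNECTION__URL="mongodb+srv://<user>:<password>@<cluster_name>.mongodb.net"
SOURCES__MONGODB__DATABASE="sample_mflix"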

Step 3: Adding the DDltResource

Create the DDltResource under the resources directory. Add the following code to its __init__.py:

from dagster import ConfigurableResource

import dlt

class DDltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def load_collection(self, resource_data, database):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=f"{database}_{self.pipeline_name}", destination=self.destination, dataset_name=f"{self.dataset_name}_{database}"
        )

        load_info = pipeline.run(resource_data, write_disposition="replace")

        return load_info

Step 4: Defining an Asset Factory

The structure of data in MongoDB is such that under each database you will find multiple collections. When writing a data pipeline it is important to separate the data loading for each collection.

Dagster provides the @multi_asset declaration, which allows us to convert each collection under a database into a separate asset. This makes our pipeline easier to debug in case of failure and keeps the collections independent of each other.

In the mongodb_pipeline.py file, locate the load_select_collection_hint_db function. We will use this function to create the asset factory.

In the __init__.py file under the assets directory, define the dlt_asset_factory:

from dagster import multi_asset, AssetOut, OpExecutionContext, get_dagster_logger

from ..mongodb import mongodb
from ..resources import DDltResource

import dlt
import os

URL = os.getenv('SOURCES__MONGODB__CONNECTION__URL')

DATABASE_COLLECTIONS = {
    "sample_mflix": [
        "comments",
        "embedded_movies",
    ],
}

def dlt_asset_factory(collection_list):
    multi_assets = []

    for db, collection_name in collection_list.items():

        @multi_asset(
            name=db,
            group_name=db,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{db}'])
                for stream in collection_name
            }
        )
        def collections_asset(context: OpExecutionContext, pipeline: DDltResource):

            # get the data from MongoDB
            data = mongodb(URL, db).with_resources(*collection_name)

            logger = get_dagster_logger()
            results = pipeline.load_collection(data, db)
            logger.info(results)

            return tuple([None for _ in context.selected_output_names])

        multi_assets.append(collections_asset)

    return multi_assets


dlt_assets = dlt_asset_factory(DATABASE_COLLECTIONS)

Step 5: Definitions and Running the Web Server

Add the definitions in the __init__.py in the root directory:

from dagster import Definitions

from .assets import dlt_assets
from .resources import DDltResource

defs = Definitions(
    assets=dlt_assets,
    resources={
        "pipeline": DDltResource(
            pipeline_name="mongo",
            dataset_name="dagster_mongo",
            destination="bigquery"
        ),
    }
)

Run the dagster dev command to start the web server. Each collection is converted into a separate asset by Dagster, and we can materialize the assets to ingest the data into BigQuery.

Asset Factory

The resulting data in BigQuery:

Data Ingestion in BigQuery from MongoDB

Conclusion:

In this demo, we looked at how to orchestrate dlt pipelines using Dagster. We started off by creating a simple dlt pipeline and then converted the pipeline into an asset and resource before orchestrating.

We also looked at how we can orchestrate the dlt MongoDB verified source using Dagster. We utilized the Dagster @multi_asset feature to create a dlt_asset_factory, which converts each collection under a database into a separate asset, allowing us to create more robust data pipelines.

Both dlt and Dagster can be easily run on local machines. By combining the two we can build data pipelines at great speed and rigorously test them before shipping to production.

· 19 min read
Zaeem Athar
info

TL;DR: A modern analytics stack with dlt and Holistics to transform and ingest unstructured production data from MongoDB to flat tables in BigQuery for self-service analytics.

If you're a CTO, then you probably love MongoDB: it's scalable, production-ready, and a great dump for unstructured and semi-structured data. If, however, you're a data scientist or data analyst and you need to run analytics on top of MongoDB data dumps, then you're probably not a fan. The data in MongoDB needs to be transformed and stored in a data warehouse before it is ready for analytics. The process of transforming and storing the data can become quite tedious due to the unstructured nature of the data.

In this blog, we will show you how you can combine dlt and Holistics to create a modern data stack that makes extracting unstructured data from MongoDB and running self-service analytics on it simple and straightforward. We will use dlt to ingest the Movie Flix Dataset from MongoDB into BigQuery and use Holistics to transform the data and run self-service analytics.

An Overview of the MongoDB Modern Analytics Stack

Diagram illustrating the inner workings of our Modern Analytics Stack

Tool | Layer | Why it's awesome
MongoDB | Production | Sometimes used as a data dump by CTOs. Often stores unstructured, semi-structured production data that stakeholders want to access.
dlt | Data Ingestion | Mongo is great, but then others struggle to analyze the data. dlt extracts data from MongoDB, creates a schema in BigQuery, and loads normalized MongoDB data into BigQuery.
BigQuery | Data Warehouse | Because of its pricing model, it's a good data warehouse choice to store structured MongoDB data so it can be used by BI tools like Holistics for self-service analytics.
Holistics | Data Modeling for Self-Service Analytics | Holistics makes it easy for data teams to set up and govern an end-user self-service analytics platform using DevOps best practices.

In our stack, dlt resides in the data ingestion layer. It takes in unstructured data from MongoDB, normalizes it, and loads it into BigQuery.

In the data modeling layer, Holistics accesses the data from BigQuery, builds relationships, transforms the data, and creates datasets that expose those transformations. In the reporting layer, Holistics allows stakeholders to self-service their data by utilizing the created datasets to build reports and create visualizations.

MongoDB is loved by CTOs, but its usage creates issues for stakeholders.

NoSQL databases such as MongoDB have gained widespread popularity due to their capacity to store data in formats that align more seamlessly with application usage, necessitating fewer data transformations during storage and retrieval.

MongoDB is optimized for performance and uses BSON (Binary JSON) under the hood, as compared to JSON. This allows MongoDB to support custom and more complex data types, such as geospatial data, dates, and regex. Additionally, BSON supports character encodings.

All these benefits enable MongoDB to be a faster and better database, but the advantages of the flexibility offered by MongoDB are sometimes abused by developers and CTOs who use it as a dump for all types of unstructured and semi-structured data. This makes this data inaccessible to stakeholders and unfit for analytics purposes.

Moreover, the unique nature of MongoDB with its BSON types and its usage as a data dump in current times mean that additional hurdles must be crossed before data from MongoDB can be moved elsewhere.

How does our Modern data stack solve the MongoDB problem?

In the data ingestion layer, dlt utilizes the MongoDB verified source to ingest data into BigQuery. Initializing the MongoDB verified source sets up the default code needed to run the pipeline. We just have to set up the credentials and specify which MongoDB collections to ingest into BigQuery. Once the pipeline is run, dlt takes care of all the steps, from extracting unstructured data from MongoDB and normalizing it to creating the schema and populating the data into BigQuery.

Getting your data cleaned and ingested into a data warehouse is just one part of the analytics pipeline puzzle. Before the data is ready to be used by the entire organization, the data team must model the data and document its context. This means defining the relationships between tables, adding column descriptions, and implementing the necessary transformations. This is where Holistics shines. With analytics-as-code as a first-class citizen, Holistics allows data teams to adopt software engineering best practices in their analytics development workflows. This helps data teams govern a centralized, curated set of semantic datasets that any business user can use to extract data from the data warehouse.

Why is dlt useful when you want to ingest data from a production database such as MongoDB?

Writing a Python-based data ingestion pipeline for sources such as MongoDB is quite a tedious task as it involves a lot of overhead to set up. The data needs to be cleaned before it is ready for ingestion. Moreover, MongoDB is a NoSQL database meaning it stores data in a JSON-like data structure. So if you want to query it with SQL natively, you will need to transform this JSON-like data structure into flat tables. Let's look at how this transformation and cleaning can be done:

  • Create a Data Model based on the MongoDB data we intend to ingest.
  • Create tables in the data warehouse based on the defined Data Model.
  • Extract the data from MongoDB and perform necessary transformations such as Data Type conversion (BSON to JSON), and flattening of nested data.
  • Insert the transformed data into the corresponding SQL tables.
  • Define relationships between tables by setting up primary and foreign keys.

Using the dlt MongoDB verified source we can forgo the above-mentioned steps. dlt takes care of all the steps from transforming the JSON data into relational data, to creating the schema in the SQL database.

To get started with dlt, we only need to set some basic configuration; everything else is automated. dlt takes care of all the steps, from creating the schema to transforming the JSON data into relational data. The workflow for creating such a data pipeline in dlt looks something like this:

  • Initialize a MongoDB source to copy the default code.
  • Set up the credentials for the source and destination.
  • Define the MongoDB collection to ingest (or default to all).
  • Optionally configure incremental loading based on source logic.
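To illustrate the last point, incremental loading in dlt is driven by dlt.sources.incremental. The sketch below uses a toy in-memory "collection" rather than the MongoDB source, and the cursor field, sample rows, and DuckDB destination are assumptions for demonstration only.

import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def comments(updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")):
    # a toy in-memory "collection"; a real pipeline would read from MongoDB instead
    rows = [
        {"id": 1, "text": "first", "updated_at": "2024-02-01T00:00:00Z"},
        {"id": 2, "text": "second", "updated_at": "2024-03-01T00:00:00Z"},
    ]
    # only yield rows newer than the stored cursor; dlt persists the cursor between runs
    yield from (row for row in rows if row["updated_at"] >= updated_at.start_value)

pipeline = dlt.pipeline(pipeline_name="incremental_demo", destination="duckdb", dataset_name="demo")
print(pipeline.run(comments()))  # requires `pip install "dlt[duckdb]"`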

What is useful about Holistics in this project?

Holistics is a Business Intelligence platform with the goal of enabling self-service analytics for entire organizations. Holistics works by connecting to an SQL data warehouse. This allows it to build SQL queries and execute them against the data warehouse. In essence, Holistics utilizes the storage and processing capabilities of the data warehouse and the data never leaves the data warehouse.

To enable self-service Holistics introduces a modeling layer. The data teams use this layer to define table relationships, data transformations, metrics, and data logic. The entire organization can utilize these metrics and data logic defined in this layer to self-service their data needs.

In addition to the transformation layer, Holistics provides advanced features such as defining models using code through Holistics’ analytics-as-code languages (AMQL) and utilizing Git version control systems to manage code changes. Moreover, data teams can integrate with dbt to streamline the data transformations.

The overall Holistics workflow looks something like this:

Holistics Overview

  • Connect Holistics to an existing SQL data warehouse.
  • Data teams use Holistics Data Modeling to model and transform analytics data. This model layer is reusable across reports & datasets.
  • Non-technical users can self-service explore data based on predefined datasets prepared by data teams. They can save their explorations into dashboards for future use.
  • Dashboards can be shared with others, or pushed to other platforms (email, Slack, webhooks, etc.).

Code Walkthrough

In this section, we walk through how to set up a MongoDB data pipeline using dlt. We will be using the MongoDB verified source you can find here.

1. Setting up the dlt pipeline

Use the command below to install dlt.

pip3 install -U dlt

Consider setting up a virtual environment for your project and installing the project-related libraries and dependencies inside it. For best installation practices, visit the dlt installation guide.
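One common way to do this (a sketch; adjust the path and Python version to your setup):

python3 -m venv ./env
source ./env/bin/activate
pip3 install -U dlt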

Once we have dlt installed, we can go ahead and initialize a verified MongoDB pipeline with the destination set to Google BigQuery. First, create a project directory and then execute the command below:

dlt init mongodb bigquery

The above command will create a local ready-made pipeline that we can customize to our needs. After executing the command your project directory will look as follows:

.
├── .dlt
│ ├── config.toml
│ └── secrets.toml
├── mongodb
│ ├── README.md
│ ├── __init__.py
│ └── helpers.py
├── mongodb_pipeline.py
└── requirements.txt

The __init__.py file in the mongodb directory contains the dlt functions, called resources, that yield the data from MongoDB. The yielded data is passed to a dlt.pipeline, which normalizes the data and forms the connection to move it to your destination. To get a better intuition of the different dlt concepts, have a look at the docs.

As the next step, we set up the credentials for MongoDB. You can find detailed information on setting up the credentials in the MongoDB verified sources documentation.

We also need to set up the GCP service account credentials to get permissions to BigQuery. You can find detailed explanations on setting up the service account in the dlt docs under Destination Google BigQuery.

Once all the credentials are set add them to the secrets.toml file. Your file should look something like this:

# put your secret values and credentials here. do not share this file and do not push it to github
[sources.mongodb]
connection_url = "mongodb+srv://<user>:<password>@<cluster_name>.cvanypn.mongodb.net" # please set me up!
database = "sample_mflix"

[destination.bigquery]
location = "US"
[destination.bigquery.credentials]
project_id = "analytics" # please set me up!
private_key = "very secret can't show"
client_email = "<org_name>@analytics.iam.gserviceaccount.com" # please set me up!

The mongodb_pipeline.py at the root of your project directory is the script that runs the pipeline. It contains many functions that provide different ways of loading the data. The selection of the function depends on your specific use case, but for this demo, we try to keep it simple and use the load_entire_database function.

def load_entire_database(pipeline: Pipeline = None) -> LoadInfo:
    """Use the mongo source to completely load all collections in a database"""
    if pipeline is None:
        # Create a pipeline
        pipeline = dlt.pipeline(
            pipeline_name="local_mongo",
            destination='bigquery',
            dataset_name="mongo_database",
        )

    # By default the mongo source reflects all collections in the database
    source = mongodb()

    # Run the pipeline. For a large db this may take a while
    info = pipeline.run(source, write_disposition="replace")

    return info
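If you do not want to load the whole database, the same source can be narrowed down to specific collections. A sketch, assuming the sample_mflix collection names and the same BigQuery destination (the function and dataset names below are illustrative, not part of the generated template):

def load_selected_collections(pipeline: Pipeline = None) -> LoadInfo:
    """A sketch: load only the movies and comments collections"""
    if pipeline is None:
        pipeline = dlt.pipeline(
            pipeline_name="local_mongo",
            destination='bigquery',
            dataset_name="mongo_select",
        )

    # narrow the source down to two collections
    source = mongodb().with_resources("movies", "comments")

    return pipeline.run(source, write_disposition="replace")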

Before we execute the pipeline script, let's install its dependencies from the requirements.txt file.

pip install -r requirements.txt

Finally, we are ready to execute the script. In the main block, uncomment the load_entire_database function call and run the script.
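With that call uncommented, the main block should look roughly like this (the exact contents of the generated template may differ):

if __name__ == "__main__":
    load_info = load_entire_database()
    print(load_info)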

python mongodb_pipeline.py

If you followed the instructions correctly, the pipeline should run successfully and the data should be loaded into Google BigQuery.

2. The result: Comparing MongoDB data with the data loaded in BigQuery.

To get a sense of what we accomplished let's examine what the unstructured data looked like in MongoDB against what is loaded in BigQuery. Below you can see the sample document in MongoDB.

{
  "_id": {
    "$oid": "573a1390f29313caabcd42e8"
  },
  "plot": "A group of bandits stage a brazen train hold-up, only to find a determined posse hot on their heels.",
  "genres": [
    "Short",
    "Western"
  ],
  "runtime": {
    "$numberInt": "11"
  },
  "cast": [
    "A.C. Abadie",
    "Gilbert M. 'Broncho Billy' Anderson",
    "George Barnes",
    "Justus D. Barnes"
  ],
  "poster": "https://m.media-amazon.com/images/M/MV5BMTU3NjE5NzYtYTYyNS00MDVmLWIwYjgtMmYwYWIxZDYyNzU2XkEyXkFqcGdeQXVyNzQzNzQxNzI@._V1_SY1000_SX677_AL_.jpg",
  "title": "The Great Train Robbery",
  "fullplot": "Among the earliest existing films in American cinema - notable as the first film that presented a narrative story to tell - it depicts a group of cowboy outlaws who hold up a train and rob the passengers. They are then pursued by a Sheriff's posse. Several scenes have color included - all hand tinted.",
  "languages": [
    "English"
  ],
  "released": {
    "$date": {
      "$numberLong": "-2085523200000"
    }
  },
  "directors": [
    "Edwin S. Porter"
  ],
  "rated": "TV-G",
  "awards": {
    "wins": {
      "$numberInt": "1"
    },
    "nominations": {
      "$numberInt": "0"
    },
    "text": "1 win."
  },
  "lastupdated": "2015-08-13 00:27:59.177000000",
  "year": {
    "$numberInt": "1903"
  },
  "imdb": {
    "rating": {
      "$numberDouble": "7.4"
    },
    "votes": {
      "$numberInt": "9847"
    },
    "id": {
      "$numberInt": "439"
    }
  },
  "countries": [
    "USA"
  ],
  "type": "movie",
  "tomatoes": {
    "viewer": {
      "rating": {
        "$numberDouble": "3.7"
      },
      "numReviews": {
        "$numberInt": "2559"
      },
      "meter": {
        "$numberInt": "75"
      }
    },
    "fresh": {
      "$numberInt": "6"
    },
    "critic": {
      "rating": {
        "$numberDouble": "7.6"
      },
      "numReviews": {
        "$numberInt": "6"
      },
      "meter": {
        "$numberInt": "100"
      }
    },
    "rotten": {
      "$numberInt": "0"
    },
    "lastUpdated": {
      "$date": {
        "$numberLong": "1439061370000"
      }
    }
  },
  "num_mflix_comments": {
    "$numberInt": "0"
  }
}

This is a typical way data is structured in a NoSQL database. The data is in a JSON-like format and contains nested data. Now, let's look at what is loaded in BigQuery. Below you can see the same data in BigQuery.

BigQuery Data Overview

The DDL (data definition language) for the movies table in BigQuery can be seen below:

CREATE TABLE `dlthub-analytics.mongo_database.movies`
(
  _id STRING NOT NULL,
  plot STRING,
  runtime INT64,
  poster STRING,
  title STRING,
  fullplot STRING,
  released TIMESTAMP,
  rated STRING,
  awards__wins INT64,
  awards__nominations INT64,
  awards__text STRING,
  lastupdated TIMESTAMP,
  year INT64,
  imdb__rating FLOAT64,
  imdb__votes INT64,
  imdb__id INT64,
  type STRING,
  tomatoes__viewer__rating FLOAT64,
  tomatoes__viewer__num_reviews INT64,
  tomatoes__viewer__meter INT64,
  tomatoes__fresh INT64,
  tomatoes__critic__rating FLOAT64,
  tomatoes__critic__num_reviews INT64,
  tomatoes__critic__meter INT64,
  tomatoes__rotten INT64,
  tomatoes__last_updated TIMESTAMP,
  num_mflix_comments INT64,
  _dlt_load_id STRING NOT NULL,
  _dlt_id STRING NOT NULL,
  tomatoes__dvd TIMESTAMP,
  tomatoes__website STRING,
  tomatoes__production STRING,
  tomatoes__consensus STRING,
  metacritic INT64,
  tomatoes__box_office STRING,
  imdb__rating__v_text STRING,
  imdb__votes__v_text STRING,
  year__v_text STRING
);

If you compare the DDL against the sample document in MongoDB, you will notice that nested arrays such as cast are missing from the DDL in BigQuery. This is because of how dlt handles nested arrays. If we look at our database in BigQuery, we can see that cast is loaded as a separate table.

BigQuery Table Overview

dlt normalizes nested data by populating it into separate tables and creating relationships between the tables, so they can be combined using normal SQL joins. All of this is taken care of by dlt, and we need not worry about how the transformations are handled. In short, the transformation steps we discussed in Why is dlt useful when you want to ingest data from a production database such as MongoDB? are taken care of by dlt, making the data analyst's life easier.

To better understand how dlt does this transformation, refer to the docs.

3. Self-service analytics for MongoDB with Holistics.

After dlt ingests the data into your data warehouse, you can connect Holistics to the data warehouse and model, govern, and set up your self-service analytics platform for end-user consumption.

By combining dlt with Holistics we get the best of both worlds. The flexibility of an open source library for data ingestion that we can customize based on changing data needs, and a self-service BI tool in Holistics that can not only be used for analytics but also introduces a data modeling layer where metrics and data logic can be defined. Holistics also has support for Git version control to track code changes and can integrate with dbt for streamlining data transformations.

We took care of the data ingestion step in the previous section. We can now connect to our SQL data warehouse, and start transforming the data using the modeling layer in Holistics. We will be using the newest version of Holistics, Holistics 4.0 for this purpose.

To add a new data source in Holistics, click the plus sign (+) on the top menu and then select Connect Data Sources. Select New Data Sources and, as the database type, select Google BigQuery. We need to provide the service account credentials that were generated above when we connected dlt to BigQuery. For more detailed instructions on connecting BigQuery to Holistics, refer to this guide.

Once the BigQuery source is added, we are ready to import the schemas from BigQuery into Holistics. The schema (dataset_name) under which dlt loaded the MongoDB data is defined in the load_entire_database function when we create the MongoDB pipeline.

# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="local_mongo",
    destination='bigquery',
    dataset_name="mongo_database",  # Schema Name
)

4. Modeling the Data and Relationships with Holistics.

To use the data, we will define a data model and the join paths that Holistics can use to build the semantic datasets.

A data model is an abstract view on top of a physical database table that you may manipulate without directly affecting the underlying data. It allows you to store additional metadata that may enrich the underlying data in the data table.

In Holistics, go to the Modelling 4.0 section from the top bar. We will be greeted with the Start page, as we have created no models or datasets. Turn on development mode from the top left corner; development mode allows you to experiment with the data without affecting the production datasets and reporting. To keep things organized, let's create two folders called Models and Datasets.

Adding Holistics Data Model(s):

Under the Models folder, let's add the MongoDB data from BigQuery as Table Models. Hover over the Models folder and click on the (+) sign then select Add Table Model. In the Data Sources select the BigQuery Source we created before and then select the relevant table models to import into Holistics. In this case, we are importing the movies, movies_cast and movies_directors tables.

Holistics Add Model

Adding Holistics Dataset(s) and Relationships:

After the Data Models have been added, we can create a Dataset with these models and use them for reporting.

info

A Dataset is a "container" holding several Data Models together so they can be explored together, dictating which join path is used in a particular analytics use case.

Datasets work like data marts, except that they exist only in the semantic layer. You publish these datasets to your business users to let them build dashboards or explore existing data.

Hover over the Datasets folder, click on the (+) sign, and then select Add Datasets. Select the previously created Table Models under this dataset, and Create Dataset.

Holistics Create Dataset

We will then be asked to create relationships between the models. We create a Many-to-one (n - 1) relationship between the cast and the movies models.

Add Relationship between Models

The resulting relationship can be seen as code using the Holistics 4.0 Analytics as Code feature. To activate this feature, click on the newly created dataset and select the View as Code option at the top right. For more detailed instructions on setting up relationships between models, refer to the model relationship guide.

Previously, we created the relationship between the cast and the movies tables using the GUI; now let's add the relationship between the directors and movies tables using the Analytics as Code feature. In the dataset.aml file, append the following line of code to the relationships block:

relationship(model__mongo_database_movies_directors.dlt_parent_id > model__mongo_database_movies.dlt_id, true)

After the change, the dataset.aml file should look like this:

import '../Models/mongo_database_movies.model.aml' {
  mongo_database_movies as model__mongo_database_movies
}
import '../Models/mongo_database_movies_cast.model.aml' {
  mongo_database_movies_cast as model__mongo_database_movies_cast
}
import '../Models/mongo_database_movies_directors.model.aml' {
  mongo_database_movies_directors as model__mongo_database_movies_directors
}

Dataset movies {
  label: 'Movies'
  description: ''
  data_source_name: 'bigquery_mongo'
  models: [
    model__mongo_database_movies,
    model__mongo_database_movies_cast,
    model__mongo_database_movies_directors
  ]
  relationships: [
    relationship(model__mongo_database_movies_cast.dlt_parent_id > model__mongo_database_movies.dlt_id, true),
    relationship(model__mongo_database_movies_directors.dlt_parent_id > model__mongo_database_movies.dlt_id, true)
  ]
  owner: 'zaeem@dlthub.com'
}

The corresponding view for the dataset.aml file in the GUI looks like this:

Add Relationship GUI

Once the relationships between the tables have been defined, we are all set to create some visualizations. We can select the Preview option next to the View as Code toggle to create visualizations in development mode. This comes in handy if we have connected an external Git repository to track our changes: we can test the dataset in preview mode before committing, pushing changes, and deploying the dataset to production.

In the current scenario, we will just directly deploy the dataset to production as we have not integrated a Git Repository. For more information on connecting a Git Repository refer to the Holistics docs.

The Movies dataset should now be available in the Reporting section. We will create a simple visualization that shows the workload of the cast and directors. In simple words: how many movies did an actor or director work on in a single year?

Visualization and Self-Service Analytics with Holistics:

The visualization part is pretty self-explanatory and is mostly drag and drop, as we took the time to define the relationships between the tables. Below we create a simple table in Holistics that shows the actors that have appeared in the most movies since the year 2000.

Holistics Create Visualization

Similarly, we can add other reports and combine them into a dashboard. The resulting dashboard can be seen below:

Holistics Dashboard

Conclusion

In this blog, we have introduced a modern data stack that uses dlt and Holistics to address the MongoDB data accessibility issue.

We leverage dlt to extract, normalize, create schemas, and load data into BigQuery, making it more structured and accessible. Additionally, Holistics provides the means to transform and model this data, adding relationships between the various datasets and ultimately enabling self-service analytics for a broader range of stakeholders in the organization.

This modern data stack offers an efficient and effective way to bridge the gap between MongoDB's unstructured data storage capabilities and the diverse needs of business, operations, and data science professionals, thereby unlocking the full potential of the data within MongoDB for the entire company.

