
2 posts tagged with "Pub/Sub"


· 9 min read
Zaeem Athar
info

TL;DR: This blog post introduces a cost-effective solution for event streaming that results in up to 18x savings. The solution leverages Cloud Pub/Sub and dlt to build an efficient event streaming pipeline.

The Segment Problem

Event tracking is a complicated problem for which there exist many solutions. One such solution is Segment, which offers ample startup credits to organizations looking to set up event ingestion pipelines. Segment is used for a variety of purposes, including web analytics.

note

💡 With Segment, you pay 1-1.2 cents for every tracked user.

Let's take a back-of-napkin example: for 100,000 users, ingesting their event data would cost roughly $1,000 (see the quick check after the tier list below).

The bill:

  • Minimum 10,000 monthly tracked users (0-10K): $120.
  • Additional monthly tracked users (10K-25K): + $12 per 1,000 users.
  • Additional monthly tracked users (25K-100K): + $11 per 1,000 users.
  • Additional monthly tracked users (100K+): + $10 per 1,000 users.
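
Here is that quick check: summing the tiers above for 100,000 monthly tracked users (a rough sketch based only on the prices listed above, not on Segment's current rate card):

# Back-of-napkin Segment bill for 100,000 monthly tracked users,
# using the tier prices listed above.
base = 120                      # first 10,000 users
tier_10k_25k = 15 * 12          # 15,000 users at $12 per 1,000
tier_25k_100k = 75 * 11         # 75,000 users at $11 per 1,000
total = base + tier_10k_25k + tier_25k_100k
print(total)                    # 1125 -> roughly $1,000/month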

The price of $1,000/month for 100k tracked users doesn't seem excessive, given the complexity of the task at hand.

However, similar results can be achieved on GCP by combining different services. If those 100k users produce 1-2m events, those costs would stay in the $10-60 range.

In the following sections, we will look at which GCP services can be combined to create a cost-effective event ingestion pipeline that doesn't break the bank.


The Solution to the Segment Problem

Our proposed solution to replace Segment involves using dlt with Cloud Pub/Sub to create a simple, scalable event streaming pipeline. The pipeline's overall architecture is as follows:

[Diagram: the Pub/Sub and dlt event streaming pipeline architecture]

In this architecture, a publisher initiates the process by pushing events to a Pub/Sub topic. Specifically, in the context of dlt, the library acts as the publisher, directing user telemetry data to a designated topic within Pub/Sub.

A subscriber is attached to the topic. Pub/Sub offers a push-based subscriber that proactively receives messages from the topic and writes them to Cloud Storage. The subscriber is configured to aggregate all messages received within a 10-minute window and then forward them to a designated storage bucket.

Once the data is written to Cloud Storage, a Cloud Function is triggered. The Cloud Function reads the data from the storage bucket and uses dlt to ingest it into BigQuery.
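
For illustration, here is a minimal sketch of what such a Cloud Function might look like, assuming the bucket objects contain newline-delimited JSON events and a storage-triggered background function signature. The actual implementation lives in cloud_functions/main.py and may differ:

# Hypothetical sketch, not the repository's actual cloud_functions/main.py.
import json

import dlt
from google.cloud import storage


def storage_to_bigquery(event, context):
    """Triggered when the push subscriber writes a new object to the bucket."""
    client = storage.Client()
    blob = client.bucket(event["bucket"]).blob(event["name"])

    # Assumes each line of the object is one JSON-encoded telemetry event.
    rows = [json.loads(line) for line in blob.download_as_text().splitlines() if line]

    pipeline = dlt.pipeline(
        pipeline_name="telemetry",
        destination="bigquery",
        dataset_name="telemetry_data",
    )
    load_info = pipeline.run(rows, table_name="events")
    print(load_info)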

Code Walkthrough

This section dives into a comprehensive code walkthrough that illustrates the step-by-step process of implementing our proposed event streaming pipeline.

Implementing the pipeline requires the setup of various resources, including storage buckets and serverless functions. To streamline the procurement of these resources, we'll leverage Terraform, an Infrastructure as Code (IaC) tool.

Prerequisites

Before we embark on setting up the pipeline, there are essential tools that need to be installed to ensure a smooth implementation process.

Permissions

Next, we focus on establishing the necessary permissions for our pipeline. A crucial step involves creating service account credentials, enabling Terraform to create and manage resources within Google Cloud seamlessly.

Please refer to the Google Cloud documentation here to set up a service account. Once it is created, assign it the necessary permissions; the project README lists them. Finally, generate a key for the service account, download the JSON file, and export the path to the credentials as an environment variable from the project root directory:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"

Setting Up The Event Streaming Pipeline

To set up our pipeline, start by cloning the GitHub Repository. The repository contains all the necessary components, structured as follows:

.
├── README.md
├── cloud_functions
│   ├── main.py
│   └── requirements.txt
├── publisher.py
├── requirement.txt
├── terraform
│   ├── backend.tf
│   ├── cloud_functions.tf
│   ├── main.tf
│   ├── provider.tf
│   ├── pubsub.tf
│   ├── storage_buckets.tf
│   └── variables.tf

Within this structure, the Terraform directory houses all the Terraform code required to set up the necessary resources on Google Cloud.

Meanwhile, the cloud_functions folder includes the code for the Cloud Function that will be deployed. This function reads the data from storage and uses dlt to ingest it into BigQuery. The code for the function can be found in the cloud_functions/main.py file.

Step 1: Configure Service Account Credentials

To begin, integrate the service account credentials with Terraform to enable authorization and resource management on Google Cloud. Edit the terraform/main.tf file to include the path to your service account's credentials file as follows:

provider "google" {
credentials = file("./../credentials.json")
project = var.project_id
region = var.region
}

Step 2: Define Required Variables

Next, in terraform/variables.tf, define the required variables. These variables correspond to details within your credentials.json file and include your project's ID, the region for resource deployment, and any other parameters required by your Terraform configuration:

variable "project_id" {
type = string
default = "Add Project ID"
}

variable "region" {
type = string
default = "Add Region"
}

variable "service_account_email" {
type = string
default = "Add Service Account Email"
}

Step 3: Procure Cloud Resources

We are now ready to set up some cloud resources. To get started, navigate into the terraform directory and run terraform init. This command initializes the working directory containing the Terraform configuration files.

With the initialization complete, you're ready to proceed with the creation of your cloud resources. To do this, run the following Terraform commands in sequence. These commands instruct Terraform to plan and apply the configurations defined in your .tf files, setting up the infrastructure on Google Cloud as specified.

terraform plan
terraform apply

The terraform plan command previews the actions Terraform intends to take based on your configuration files. It's a good practice to review this output to ensure the planned actions align with your expectations.

After reviewing the plan, execute the terraform apply command. This command prompts Terraform to create or update resources on Google Cloud according to your configurations.

The following resources are created on Google Cloud once the terraform apply command is executed:

Name                | Type               | Description
tel_storage         | Bucket             | Bucket for storage of telemetry data.
pubsub_cfunctions   | Bucket             | Bucket for storage of the Cloud Function source code.
storage_bigquery    | Cloud Function     | The Cloud Function that runs dlt to ingest data into BigQuery.
telemetry_data_tera | Pub/Sub Topic      | Pub/Sub topic for telemetry data.
push_sub_storage    | Pub/Sub Subscriber | Pub/Sub subscriber that pushes data to Cloud Storage.

Step 4: Run the Publisher

Now that our cloud infrastructure is in place, it's time to activate the event publisher. Look for the publisher.py file in the project root directory. You'll need to provide specific details to enable the publisher to send events to the correct Pub/Sub topic. Update the file with the following:

# TODO(developer)
project_id = "Add GCP Project ID"
topic_id = "telemetry_data_tera"

The publisher.py script generates dummy events that simulate real-world data and sends them to the specified Pub/Sub topic. This lets us test the end-to-end functionality of the event streaming pipeline, ensuring that data flows from the source (the publisher) to the intended destination (BigQuery, via the Cloud Function and dlt). To run the publisher, execute the following command:

python publisher.py
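
For reference, a dummy-event publisher along these lines can be as simple as the following sketch, using the google-cloud-pubsub client library. The event fields below are made up for illustration; the actual publisher.py may differ:

# Hypothetical sketch of a dummy-event publisher; see publisher.py for the real script.
import json
import random
import time

from google.cloud import pubsub_v1

project_id = "Add GCP Project ID"  # TODO(developer)
topic_id = "telemetry_data_tera"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for _ in range(100):
    # Simulate a telemetry event with made-up fields.
    event = {"user_id": random.randint(1, 1000), "event": "page_view", "ts": time.time()}
    future = publisher.publish(topic_path, json.dumps(event).encode("utf-8"))
    print(f"Published message {future.result()}")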

Step 5: Results

Once the publisher sends events to the Pub/Sub Topic, the pipeline is activated. These are asynchronous calls, so there's a delay between message publication and their appearance in BigQuery.

The average completion time of the pipeline is approximately 12 minutes, accounting for the 10-minute interval after which the subscriber pushes data to storage, plus the Cloud Function execution time. The push interval of the subscriber can be adjusted by changing max_duration in pubsub.tf:

cloud_storage_config {
  bucket          = google_storage_bucket.tel_bucket_storage.name
  filename_prefix = "telemetry-"
  max_duration    = "600s"
}

Our Cost Estimation

On average, the costs for our proposed pipeline are as follows:

  • 100k users tracked on Segment would cost $1,000.
  • Ingesting 1 million events via our setup costs $37.
  • Our web tracking user:event ratio is 1:15, so ingesting the events of 100k users with our setup would cost about $55.
  • Our telemetry device:event ratio is 1:60, so the equivalent for 100k devices would be about $220.

So with our setup, as long as we keep the events-to-user ratio under roughly 270, we save money compared to Segment. In reality, it gets even better: GCP offers a very generous free tier that resets every month, whereas Segment still charges at low volumes.

GCP Cost Calculation: Currently, our telemetry tracks 50,000 anonymized devices each month on a 1:60 device-to-event ratio. Based on these data volumes we can estimate the cost of our proposed pipeline.

Cloud Functions is by far the most expensive service used by our pipeline. It is billed based on the vCPU / memory, compute time, and number of invocations.

note

💡 The cost of compute for a 512MB / .333vCPU machine running for 1000ms is as follows:

Metric              | Unit Price
GB-seconds (Memory) | $0.000925
GHz-seconds (vCPU)  | $0.001295
Invocation          | $0.0000004
Total               | ~$0.0022

This puts the monthly cost of ingesting 1 million events with Cloud Functions at:

  • (1 million / 60) * $0.0022 ≈ $37
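
To make the arithmetic explicit, here is the same back-of-napkin math as a small script; the division by 60 simply mirrors the formula above, and the unit price is the ~$0.0022 per second of compute from the table:

# Back-of-napkin Cloud Functions cost, mirroring the formula above.
unit_price = 0.0022                      # ~$ per 1000ms of 512MB / .333vCPU compute (incl. invocation fee)
monthly_events = 1_000_000
compute_seconds = monthly_events / 60    # the post's formula divides events by 60

monthly_cost = compute_seconds * unit_price
print(f"${monthly_cost:.2f}")            # ~$36.67, i.e. about $37/month

# Break-even versus Segment's ~$1,000 for 100k users:
events_per_user = (1000 / (monthly_cost / monthly_events)) / 100_000
print(f"~{events_per_user:.0f} events per user")  # ~273, i.e. the ~270 quoted above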

In Conclusion

Event streaming pipelines don't need to be expensive. In this demo, we present an alternative to Segment that offers up to 18x in savings in practice. Our proposed solution leverages Cloud Pub/Sub and dlt to deliver a cost-effective streaming pipeline.

Following this demo requires knowledge of the publisher-subscriber model, dlt, and GCP. Setting up the pipeline from scratch took us about 4 hours, but we have gone through that trouble for you: the provided Terraform scripts procure the infrastructure.

Use terraform apply to set up the needed infrastructure for running the pipeline. This can be done in 30 minutes, allowing you to evaluate the proposed solution's efficacy without spending extra time on setup. Please do share your feedback.

P.S.: We will soon be migrating from Segment. Stay tuned for future posts where we document the migration process and provide a detailed analysis of the associated human and financial costs.

· 7 min read
William Laroche
info

TL;DR: William, a GCP data consultant, shares an article about the work he did with dlt and GCP to create a secure, scalable, lightweight, and powerful high-volume event ingestion engine.

He explores several alternatives before offering a solution, and he benchmarks the solution after a few weeks of running.

Read the original post here: dataroc.ca blog. Or find/hire William on LinkedIn.

In the ever-evolving landscape of cloud computing, optimizing data workflows is paramount for achieving efficiency and scalability. Even though Google Cloud Platform offers the powerful Dataflow service to process data at scale, sometimes the simplest solution is worth a shot.

In cases with a relatively high Pub/Sub volume (>10 messages per second), a pull subscription with a continuously running worker is more cost-efficient and quicker than a push subscription. Using a combination of Docker, Instance Templates and Instance Groups, it is pretty simple to set up an auto-scaling group of instances that will process Pub/Sub messages.

This guide will walk you through the process of configuring GCP infrastructure that efficiently pulls JSON messages from a Pub/Sub subscription, infers schema, and inserts them directly into a Cloud SQL PostgreSQL database using micro-batch processing.

The issue at hand

In my current role at WishRoll, I was faced with the issue of processing a high volume of events and storing them directly in the production database.

Imagine the scene: the server application produces analytics-style events such as "user logged-in" and "task X was completed" (among others). Eventually we want to run analytics queries on those events, for example to count how many times a user logs in, so we can better tailor their experience.

A. The trivial solution: synchronous insert

The trivial solution is to synchronously insert these events directly in the database. A simple implementation would mean that each event fired results in a single insert to the database. This comes with 2 main drawbacks:

  • Every API call that produces an event becomes slower; e.g., the /login endpoint now also needs to insert a record into the database
  • The database is now hit with a very high amount of insert queries

With our most basic need of 2 event types, we were looking at about 200 to 500 events per second. I concluded this solution would not be scalable. To make it so, 2 things would be necessary: (1) make the event firing mechanism asynchronous and (2) bulk events together before insertion.

B. The serverless asynchronous solution

A second solution is to use a Pub/Sub push subscription to trigger an HTTP endpoint when a message comes in. This would've been easy in my case because we already have a worker-style autoscaled App Engine service that could've hosted this. However, this only solves the 1st problem of the trivial solution; the events still come in one at a time to the HTTP service.

Although it's possible to implement some sort of bulking mechanism in a push endpoint, it's much easier to have a worker pull many messages at once instead.
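
As a rough illustration of that point, the official Python client lets a worker pull and acknowledge a whole batch in one round trip. This is a minimal sketch with hypothetical project and subscription names; the post's actual worker uses the streaming pull wrapper shown later:

# Minimal sketch of synchronous, batched pulling; names are hypothetical.
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

# Pull up to 500 messages in one request instead of handling them one by one.
response = subscriber.pull(subscription=subscription_path, max_messages=500, timeout=5.0)

# Process the whole batch at once (e.g. a single bulk insert), then ack.
records = [msg.message.data for msg in response.received_messages]
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
    subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)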

C. The serverless, fully-managed Dataflow solution

This led me to implement a complete streaming pipeline using GCP's streaming service: Dataflow. Spoiler: this was way overkill and led to weird bugs with DLT (data load tool). If you're curious, I've open-sourced that code too.

This solved both issues of the trivial solution, but proved pretty expensive and hard to debug and monitor.

D. An autoscaled asynchronous pull worker

Disclaimer: I had never considered standalone machines from cloud providers (AWS EC2, GCP Compute Engine) to be a viable solution to my cloud problems. In my head, they seemed like outdated, manually provisioned services that could instead be replaced by managed services.

But here I was, with a need to have a continuously running worker. I decided to bite the bullet and try my luck with GCP Compute Engine. What I realized, to my surprise, is that by using instance templates and instance groups, you can easily set up a cluster of workers that autoscales.

The code is simple: run a loop forever that pulls messages from a Pub/Sub subscription, batches them together, and then inserts them into the database. Repeat.

Then deploy that code as an instance group that auto-scales based on the need to process messages.

Code walkthrough

The complete source code is available here.

In summary, the code consists of 2 main parts:

  • The pulling and batching logic to accumulate and group messages from Pub/Sub based on their destination table
  • The load logic to infer the schema and bulk insert the records into the database. This part leverages DLT for destination compatibility and schema inference

Main loop

By using this micro-batch architecture, we strive to maintain a balance of database insert efficiency (by writing multiple records at a time) with near real-time insertion (by keeping the window size around 5 seconds).


pipeline = dlt.pipeline(
    pipeline_name="pubsub_dlt",
    destination=DESTINATION_NAME,
    dataset_name=DATASET_NAME,
)

pull = StreamingPull(PUBSUB_INPUT_SUBCRIPTION)
pull.start()

try:
    while pull.is_running:
        bundle = pull.bundle(timeout=WINDOW_SIZE_SECS)
        if len(bundle):
            load_info = pipeline.run(bundle.dlt_source())
            bundle.ack_bundle()
            # pretty print the information on data that was loaded
            print(load_info)
        else:
            print(f"No messages received in the last {WINDOW_SIZE_SECS} seconds")

finally:
    pull.stop()

How to deploy

The GitHub repo explains how to deploy the project as an instance group.

Database concerns

Using DLT has the major advantage of inferring the schema of your JSON data automatically. This also comes with some caveats:

  • The output schema of these analytics tables might change based on events
  • If your events have a lot of possible properties, the resulting tables could become very wide (lots of columns) which is not something desirable in an OLTP database

Given these caveats, I make sure that all events fired by our app are fully typed and limited in scope. Moreover, using the table_name_data_key configuration of the code I wrote, it's possible to separate different events with different schemas into different tables.

See this README section for an example of application code and the resulting table.
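
As a purely hypothetical illustration of that idea (the key name and fields below are invented; the actual payload shape is documented in the README section linked above), an application event might carry its destination table inside the payload:

# Hypothetical event payload; the worker's table_name_data_key config (assumed
# here to be "event_type") decides which table the record lands in.
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "events-topic")  # hypothetical names

login_event = {
    "event_type": "user_logged_in",  # routes this record to its own table
    "user_id": 42,
    "platform": "ios",
}
publisher.publish(topic_path, json.dumps(login_event).encode("utf-8"))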

Performance and cost

After running this code and doing backfills for a couple of weeks, I was able to benchmark the overall efficiency and cost of this solution.

Throughput capacity

[Chart: the pull worker's performance, shown by the Pub/Sub subscription metrics. Message throughput ranges between 200 and 300 messages per second, while the oldest message is usually between 5 and 8 seconds old, with occasional spikes.]

I am running a preemptible (SPOT) instance group of n1-standard-1 machines that auto-scales between 2 and 10 instances. In normal operation, a single worker can handle our load easily. However, because of the preemptible nature of the instances, I set the minimum number to 2 to avoid periods where no worker is running.

Maximum capacity

When deploying the solution with a backlog of messages to process (15 hours' worth of messages), 10 instances were spawned and cleared the backlog in about 25 minutes.

[Chart: the Pub/Sub subscription throughput metrics while the 15-hour backlog was cleared. The instance group gradually reached 10 instances at about 10:30 AM, then cleared the backlog by 10:50 AM.]

Between 7000 and 10000 messages per second were processed on average by these 10 instances, resulting in a minimum throughput capacity of 700 messages/s per worker.

Cost

Using n1-standard-1 spot machines, this cluster costs $8.03/month per active machine. With a minimum cluster size of 2, this means $16.06 per month.

Conclusion

Using more "primitive" GCP services around Compute Engine provides a straightforward and cost-effective way to process a high throughput of Pub/Sub messages from a pull subscription.

info

PS from dlt team:

  • We just added data contracts, which enable you to manage schema evolution behavior.
  • Are you on AWS? Check out this AWS SAM & Lambda event ingestion pipeline here.

