Kafka
Kafka is an open-source distributed event streaming platform, organized
in the form of a log with message publishers and subscribers.
The Kafka dlt verified source uses the Confluent Kafka API to load data from Kafka topics to the destination of your choice.
See a pipeline example.
The resource that can be loaded:
Name | Description |
---|---|
kafka_consumer | Extracts messages from Kafka topics |
Setup guide
Grab Kafka cluster credentials
- Follow the Kafka Setup to configure a project.
- Follow the Configuration to get the project credentials.
Initialize the verified source
To get started with your data pipeline, follow these steps:
Enter the following command:
dlt init kafka duckdb
This command will initialize the pipeline example with Kafka as the source and duckdb as the destination.
If you'd like to use a different destination, simply replace duckdb with the name of your preferred destination.
After running this command, a new directory will be created with the necessary files and configuration settings to get started.
For more information, read the Walkthrough: Add a verified source.
Add credentials
In the .dlt folder, there's a file called secrets.toml. It's where you store sensitive information securely, like access tokens. Keep this file safe.
Use the following format for service account authentication:
[sources.kafka.credentials]
bootstrap_servers="web.address.gcp.confluent.cloud:9092"
group_id="test_group"
security_protocol="SASL_SSL"
sasl_mechanisms="PLAIN"
sasl_username="example_username"
sasl_password="example_secret"
- Enter credentials for your chosen destination as per the docs.
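The field names in the credentials block above mirror the dotted property names that librdkafka-based clients accept (bootstrap.servers, group.id, security.protocol, and so on). The following is a minimal sketch of that correspondence; the helper to_consumer_config is hypothetical, and whether dlt performs exactly this renaming internally is an assumption.

```python
from typing import Dict


def to_consumer_config(credentials: Dict[str, str]) -> Dict[str, str]:
    """Rename snake_case credential fields to dotted librdkafka-style keys.

    Hypothetical helper for illustration; not part of dlt.
    """
    return {key.replace("_", "."): value for key, value in credentials.items()}


# Same placeholder values as in the secrets.toml example.
credentials = {
    "bootstrap_servers": "web.address.gcp.confluent.cloud:9092",
    "group_id": "test_group",
    "security_protocol": "SASL_SSL",
    "sasl_mechanisms": "PLAIN",
    "sasl_username": "example_username",
    "sasl_password": "example_secret",
}

config = to_consumer_config(credentials)
```

A dictionary in this dotted form is what a Confluent Kafka consumer takes as its configuration, which can help when comparing the secrets file against Kafka client documentation.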
Run the pipeline
Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
pip install -r requirements.txt
You're now ready to run the pipeline! To get started, run the following command:
python kafka_pipeline.py
Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:
dlt pipeline <pipeline_name> show
For more information, read the Walkthrough: Run a pipeline.
If you create a topic and start reading from it immediately, the brokers may not yet be synchronized, and the offset from which dlt reads messages may become invalid. In this case, the resource will return no messages. Pending messages will be received on the next run (or when the brokers synchronize).
Sources and resources
dlt works on the principle of sources and resources.
Resource kafka_consumer
This function retrieves messages from the given Kafka topics.
@dlt.resource(name="kafka_messages", table_name=lambda msg: msg["_kafka"]["topic"], standalone=True)
def kafka_consumer(
    topics: Union[str, List[str]],
    credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
    msg_processor: Optional[Callable[[Message], Dict[str, Any]]] = default_msg_processor,
    batch_size: Optional[int] = 3000,
    batch_timeout: Optional[int] = 3,
    start_from: Optional[TAnyDateTime] = None,
) -> Iterable[TDataItem]:
    ...
topics: A single Kafka topic name or a list of topic names to extract.
credentials: By default, initialized with the data from secrets.toml. It can also be passed explicitly as an initialized Kafka Consumer object.
msg_processor: A function that processes every message read from the given topics before it is saved in the destination. Pass it explicitly to use a custom processor. See the default processor as an example of how to implement processors.
batch_size: The number of messages to extract from the cluster at once. It can be set to tune performance.
batch_timeout: The maximum timeout (in seconds) for a single batch reading operation. It can be set to tune performance.
start_from: A timestamp from which messages must be read. When passed, dlt asks the Kafka cluster for the offset that corresponds to the given timestamp and starts reading messages from this offset.
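To make the roles of batch_size, batch_timeout, and the table_name dispatch from the signature more concrete, here is a minimal simulation that uses only the standard library. FakeConsumer, read_in_batches, and the sample messages are hypothetical stand-ins rather than dlt or Kafka code, and the stop-on-empty-batch loop is an assumption about how such a reader could work, not a description of dlt's internals.

```python
from collections import defaultdict
from typing import Any, Dict, Iterator, List


class FakeConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer."""

    def __init__(self, messages: List[Dict[str, Any]]) -> None:
        self._messages = list(messages)

    def consume(self, num_messages: int, timeout: float) -> List[Dict[str, Any]]:
        # A real consumer would wait up to `timeout` seconds for messages;
        # this stand-in returns immediately with whatever is left.
        batch = self._messages[:num_messages]
        self._messages = self._messages[num_messages:]
        return batch


def read_in_batches(
    consumer: FakeConsumer, batch_size: int = 3000, batch_timeout: int = 3
) -> Iterator[List[Dict[str, Any]]]:
    """Yield batches of at most `batch_size` items until a read comes back empty."""
    while True:
        batch = consumer.consume(batch_size, timeout=batch_timeout)
        if not batch:
            break
        yield batch


def table_name(item: Dict[str, Any]) -> str:
    # Same rule as the lambda in the resource decorator: one table per topic.
    return item["_kafka"]["topic"]


# Five already-processed messages alternating between two topics.
messages = [
    {"_kafka": {"topic": f"topic{i % 2 + 1}"}, "data": str(i)} for i in range(5)
]

batch_sizes: List[int] = []
tables: Dict[str, List[str]] = defaultdict(list)
for batch in read_in_batches(FakeConsumer(messages), batch_size=2, batch_timeout=1):
    batch_sizes.append(len(batch))
    for item in batch:
        tables[table_name(item)].append(item["data"])
```

In this sketch, a smaller batch_size means more round trips for the same number of messages, which is the trade-off the two tuning parameters control.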
Customization
Create your own pipeline
Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:
pipeline = dlt.pipeline(
    pipeline_name="kafka",  # Use a custom name if desired
    destination="duckdb",  # Choose the appropriate destination (e.g., duckdb, redshift, post)
    dataset_name="kafka_data",  # Use a custom name if desired
)
To extract several topics:
topics = ["topic1", "topic2", "topic3"]
resource = kafka_consumer(topics)
pipeline.run(resource, write_disposition="replace")
To extract messages and process them in a custom way:
from typing import Any, Dict
import confluent_kafka
def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]:
    key = msg.key()  # the key is None for messages published without one
    return {
        "_kafka": {
            "topic": msg.topic(),  # required field
            "key": key.decode("utf-8") if key is not None else None,
            "partition": msg.partition(),
        },
        "data": msg.value().decode("utf-8"),
    }
resource = kafka_consumer("topic", msg_processor=custom_msg_processor)
pipeline.run(resource)
To extract messages, starting from a timestamp:
resource = kafka_consumer("topic", start_from=pendulum.DateTime(2023, 12, 15))
pipeline.run(resource)
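Kafka expresses message timestamps, and timestamp-based offset lookups, in milliseconds since the Unix epoch. The sketch below shows what a start_from value such as the one above corresponds to on that scale. The helper to_kafka_timestamp_ms is hypothetical, and treating a datetime without a time zone as UTC is an assumption made for this example.

```python
from datetime import datetime, timezone


def to_kafka_timestamp_ms(moment: datetime) -> int:
    """Convert a datetime to milliseconds since the Unix epoch.

    Hypothetical helper for illustration; naive datetimes are assumed to be UTC.
    """
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return int(moment.timestamp() * 1000)


# The same date as in the start_from example above.
start_ms = to_kafka_timestamp_ms(datetime(2023, 12, 15))
print(start_ms)  # 1702598400000
```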