
Overview

This tool is a Kafka producer written in Python using the confluent-kafka-python client library. It produces records from the data cubes created in a SQL data warehouse of TPC-DS data to Kafka as JSON records (each table row becomes one JSON record in Kafka). The tool works with any SQLAlchemy-compatible SQL database; in my case I used PostgreSQL.
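The README does not show how a row is turned into JSON. Here is a minimal sketch that uses only the standard library and assumes each row is available as a mapping from column name to value. The function name `row_to_json` and the sample column names are hypothetical. Rendering `Decimal` and `date` values (common in TPC-DS columns) as strings through `default=str` is also an assumption, not necessarily what sql_to_kafka.py does.

```python
import json
from datetime import date
from decimal import Decimal
from typing import Any, Mapping


def row_to_json(row: Mapping[str, Any]) -> bytes:
    """Serialize one table row (column name -> value) as a flat JSON object.

    Assumption: values json cannot encode natively, such as Decimal and
    date, are converted with str() through the `default` hook.
    """
    return json.dumps(dict(row), default=str).encode("utf-8")


if __name__ == "__main__":
    # Hypothetical sample row; real column names depend on the cube view.
    sample = {
        "ws_item_sk": 1,
        "ws_sales_price": Decimal("12.50"),
        "d_date": date(2001, 1, 2),
        "ws_coupon_amt": None,
    }
    print(row_to_json(sample).decode("utf-8"))
```

Because every column becomes a top-level key, the resulting record is flat. A consumer can read it with `json.loads` without knowing the table schema in advance.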

Prerequisites

Python

To use the Kafka producer, you are advised to create a Python virtual environment and install the required libraries into it.

Here is an example of creating the Python environment with Anaconda; you can use any package manager you prefer:

conda create --name sql_kafka_producer python=3.10
conda activate sql_kafka_producer
pip install -r requirements.txt

Environment

You will also need to create a .env file containing the environment variables required to connect to the TPC-DS SQL database and the sink Kafka broker. You can make a copy of the provided .env.example file to create yours:

DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASS=postgres
DB_NAME=tpcd2

KAFKA_BROKERS=localhost:9093
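Here is a hedged sketch of how these variables might be consumed. It parses the `KEY=VALUE` lines and builds a SQLAlchemy connection URL plus a confluent-kafka configuration dictionary. The helper names are hypothetical, and the real script may load the file with a library such as python-dotenv (`load_dotenv`) instead. The `postgresql://` prefix assumes PostgreSQL, so another SQLAlchemy dialect would need a different prefix. `bootstrap.servers` is the standard librdkafka setting that confluent-kafka-python accepts for the broker list.

```python
from typing import Dict, Iterable, Mapping
from urllib.parse import quote_plus


def parse_env_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    env: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)  # values may themselves contain '='
        env[key.strip()] = value.strip()
    return env


def build_db_url(env: Mapping[str, str]) -> str:
    """Build a SQLAlchemy URL (assumed PostgreSQL) from the DB_* variables."""
    user = quote_plus(env["DB_USER"])      # escape special characters
    password = quote_plus(env["DB_PASS"])
    return (
        f"postgresql://{user}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )


def build_kafka_config(env: Mapping[str, str]) -> Dict[str, str]:
    """Map KAFKA_BROKERS onto librdkafka's bootstrap.servers setting."""
    return {"bootstrap.servers": env["KAFKA_BROKERS"]}
```

With the values above, `build_db_url` yields `postgresql://postgres:postgres@localhost:5432/tpcd2`, which could be passed to `sqlalchemy.create_engine`. `build_kafka_config` yields a dictionary that `confluent_kafka.Producer` accepts.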

Run

To produce data from SQL tables to a Kafka topic, use the sql_to_kafka.py script. Given a table, the script produces each of its records as a flat JSON record in the Kafka topic. Its CLI provides options for the source table, the destination topic, a limit on the number of records, etc.

To review all the available options of the CLI tool, run the following command:

python sql_to_kafka.py -h
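The options used in the example scenario (`--table_name`, `--topic_name`, `--batch_size`, `--limit`) suggest a CLI along the lines of the following argparse sketch. The help texts and default values are assumptions. The authoritative list is whatever `-h` prints for the real script. argparse generates `-h`/`--help` automatically.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the flags shown in the example scenario."""
    parser = argparse.ArgumentParser(
        description="Produce rows of a SQL table or view to a Kafka topic as JSON."
    )
    parser.add_argument("--table_name", required=True,
                        help="source table or view")
    parser.add_argument("--topic_name", required=True,
                        help="destination Kafka topic")
    parser.add_argument("--batch_size", type=int, default=1000,
                        help="rows fetched per round trip (assumed meaning and default)")
    parser.add_argument("--limit", type=int, default=None,
                        help="maximum number of rows to produce (default: all rows)")
    return parser


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse argv, or sys.argv[1:] when argv is None."""
    return build_parser().parse_args(argv)
```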

Example scenario

For example, to produce 5000 records from the table/view web_sales_cube_nonull to the topic tpcds2 (the definition of this view can be found in the dsdgen/dw/web_sales_cube_nonull.sql file):

python sql_to_kafka.py --table_name web_sales_cube_nonull --topic_name tpcds2 --batch_size 10000 --limit 5000
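The README does not define what `--batch_size` controls. One reading is that it sets the number of rows fetched from the database per round trip, with `--limit` capping the total. Under that assumption, the command above would issue a single fetch of 5000 rows. The sketch below illustrates this reading.

- `iter_rows` pulls batches through a `fetch_many(n)` callable. With SQLAlchemy this could wrap `Result.fetchmany(n)`, e.g. `lambda n: [dict(r._mapping) for r in result.fetchmany(n)]`.
- `stream_to_kafka` produces each row as a JSON value using the `produce`, `poll` and `flush` methods that `confluent_kafka.Producer` provides.

All function names are hypothetical, as is `InMemoryProducer`, a stand-in that lets the sketch run without a broker.

```python
import json
from itertools import islice
from typing import Any, Callable, Iterator, List, Mapping, Optional, Sequence, Tuple

Row = Mapping[str, Any]


def iter_rows(fetch_many: Callable[[int], Sequence[Row]],
              batch_size: int,
              limit: Optional[int] = None) -> Iterator[Row]:
    """Yield rows batch by batch, stopping once `limit` rows (if given) are read."""
    read = 0
    while limit is None or read < limit:
        # Never ask for more rows than the remaining limit allows.
        size = batch_size if limit is None else min(batch_size, limit - read)
        batch = fetch_many(size)
        if not batch:  # source exhausted before the limit was reached
            break
        yield from batch
        read += len(batch)


def stream_to_kafka(producer: Any, topic: str, rows: Iterator[Row]) -> int:
    """Produce every row as a flat JSON value and return how many were sent."""
    sent = 0
    for row in rows:
        value = json.dumps(dict(row), default=str).encode("utf-8")
        producer.produce(topic, value=value)
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # wait until every queued message has been delivered
    return sent


class InMemoryProducer:
    """Hypothetical stand-in exposing the produce/poll/flush calls used above."""

    def __init__(self) -> None:
        self.messages: List[Tuple[str, Optional[bytes]]] = []
        self.flushed = False

    def produce(self, topic: str, value: Optional[bytes] = None) -> None:
        self.messages.append((topic, value))

    def poll(self, timeout: float = 0) -> int:
        return 0

    def flush(self, timeout: Optional[float] = None) -> int:
        self.flushed = True
        return 0


if __name__ == "__main__":
    # Simulate a view with 6000 rows, read with batch_size=10000 and limit=5000.
    source = iter([{"ws_item_sk": i} for i in range(6000)])
    producer = InMemoryProducer()
    rows = iter_rows(lambda n: list(islice(source, n)), batch_size=10000, limit=5000)
    print(stream_to_kafka(producer, "tpcds2", rows))  # 5000
```

To target a real broker, replace `InMemoryProducer()` with `confluent_kafka.Producer({"bootstrap.servers": "localhost:9093"})`. A production version would also handle the `BufferError` that `produce` raises when the local queue is full, typically by calling `poll` and retrying.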