The POC on this project focused on finding a technology that can centralize the data from our Cassandra database into our data lake, which is a MariaDB database.
Below are the technologies and third-party vendors that were evaluated:
- Confluent Kafka source connector
- DataStax Kafka source connector
- Cdata source for Cassandra
- Impetus professional services
- InstaClustr Inc.
- Debezium Kafka producer
- Spark (DSE Analytics)
Because of limitations with the other options, Debezium and DSE Analytics turned out to be the best choices.
The Debezium Cassandra connector is a single JVM process that is intended to reside on each Cassandra node and publishes events to Kafka via a Kafka producer.
The connector reads the commit logs from the cdc_raw directory and publishes the messages to the Kafka broker.
The write path of Cassandra starts with the immediate logging of a change into its commit log. The commit log resides locally on each node, recording every write made to that node.
Cassandra 3.0 introduced a change data capture (CDC) feature. CDC can be enabled at the table level by setting the table property cdc=true, after which any commit log segment containing data for a CDC-enabled table will be moved to the CDC directory specified in cassandra.yaml when it is discarded.
The Cassandra connector resides on each Cassandra node and monitors the cdc_raw directory for changes (configurable via cassandra.yaml).
- It processes all local commit log segments as they are detected
- Produces a change event for every row-level insert, update, and delete operation in the commit log
- Publishes the change events for each table to a separate Kafka topic
- Finally deletes the commit log from the cdc_raw directory.
This last step is important because once CDC is enabled, Cassandra itself cannot purge the commit logs. If the space allocated via cdc_total_space_in_mb fills up, writes to CDC-enabled tables will be rejected.
The connector is tolerant of failures. As the connector reads commit logs and produces events, it records each commit log segment’s filename and position along with each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the commit log where it last left off. This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
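The read-process-delete loop with offset tracking described above can be sketched as follows. This is an illustrative simplification only, not the connector's actual implementation: the directory layout, offset file, and `publish` callback are assumptions, and real commit log segments are binary mutation records rather than lines of text.

```python
import json
import os

def process_cdc_dir(cdc_raw_dir, offset_path, publish):
    """Process commit log segments in cdc_raw, resuming from saved offsets."""
    # Load the last recorded offsets: {segment filename: byte position}
    offsets = {}
    if os.path.exists(offset_path):
        with open(offset_path) as f:
            offsets = json.load(f)
    for segment in sorted(os.listdir(cdc_raw_dir)):
        path = os.path.join(cdc_raw_dir, segment)
        position = offsets.get(segment, 0)
        with open(path, "rb") as f:
            f.seek(position)                # resume where we last left off
            for record in f:               # pretend each line is one mutation
                publish(segment, record)   # hand the event to a Kafka producer
                position += len(record)
                offsets[segment] = position
        os.remove(path)                    # connector purges processed segments
        offsets.pop(segment, None)
    with open(offset_path, "w") as f:
        json.dump(offsets, f)              # persist offsets for crash recovery
```

Because the offsets are persisted alongside the events, a restart after a crash re-enters the loop at the recorded position instead of reprocessing whole segments.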
- Debezium connector on Cassandra nodes
- Kafka broker
- Custom Kafka consumer for Cassandra
- Apache Airflow
Node Level:
- cdc_enabled: Enables or disables CDC operations node-wide
cdc_enabled: true
- cdc_raw_directory: Determines the destination for commit log segments to be moved after all corresponding memtables are flushed
cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw
- cdc_total_space_in_mb: The maximum disk space allocated to stored commit log segments; defaults to the minimum of 4096 MB and 1/8 of the volume space where cdc_raw_directory resides.
cdc_total_space_in_mb: 4096
- cdc_free_space_check_interval_ms: Frequency with which we re-calculate the space taken up by cdc_raw_directory to prevent burning CPU cycles unnecessarily when at capacity
cdc_free_space_check_interval_ms: 250
- commitlog_total_space_in_mb: Disk usage threshold for commit logs before triggering the database to flush memtables to disk. If the total space used by all commit logs exceeds this threshold, the database flushes memtables to disk for the oldest commitlog segments to reclaim disk space by removing those log segments from the log. If the commitlog_total_space_in_mb is small, the result is more flush activity on less-active tables.
commitlog_total_space_in_mb: 100
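The default for cdc_total_space_in_mb described above can be expressed as a one-line calculation; this helper is just a sketch of that rule, not code from Cassandra itself.

```python
def default_cdc_total_space_in_mb(volume_space_mb: int) -> int:
    """Default CDC capacity: min of 4096 MB and 1/8 of the volume holding cdc_raw_directory."""
    return min(4096, volume_space_mb // 8)
```

For example, on a 16 GB volume the default works out to 2048 MB, while any volume of 32 GB or larger hits the 4096 MB cap.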
Table Level: New tables should be created with cdc=true, and all existing tables should be altered to set the cdc=true property.
- Create table
CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
- Alter table
ALTER TABLE foo WITH cdc=true;
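Since every existing table has to be altered, a small helper can generate the required statements in bulk. This is a hypothetical convenience sketch; the table names passed in are examples, not our real schema.

```python
def enable_cdc_statements(tables):
    """Generate the ALTER TABLE statements that turn on CDC for each table."""
    return [f"ALTER TABLE {table} WITH cdc=true;" for table in tables]
```

The resulting statements can then be run through cqlsh, e.g. `enable_cdc_statements(["ks.foo", "ks.bar"])`.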
First Half (Cassandra to Kafka)
Second Half (Kafka to MariaDB)
Yet to be decided
- CDC
- Commit logs on an individual Cassandra node do not reflect all writes to the cluster; they only reflect writes stored on that node. Therefore, it is necessary to monitor changes on all nodes in a Cassandra cluster.
- Due to replication factor, it is necessary for downstream consumers of these events to handle deduplication.
- A commit log segment only arrives in the cdc_raw directory once it is full and has been flushed/discarded. This implies there is a delay between when an event is logged and when it is captured.
- Schema changes of tables are not recorded in commit logs, only data changes are recorded.
- Debezium
- If the connector goes down for any reason, commit log files accumulate in the cdc_raw directory and bloat disk storage, because Cassandra itself cannot purge the commit logs. If the space allocated via cdc_total_space_in_mb fills up, writes to CDC-enabled tables will be rejected.
- Unable to identify snitching properties: the connector could not identify snitching properties other than SimpleSnitch, which is a known bug (https://issues.redhat.com/browse/DBZ-1987).
Resolution: Create a separate cassandra.yaml with the SimpleSnitch property as the gossip method and provide its path in the config.properties file of the Debezium connector.
- Connector couldn't identify the replication strategy EverywhereStrategy.
Resolution: Convert all keyspaces configured with EverywhereStrategy to NetworkTopologyStrategy.
Note: Both of the above problems were reported internally here: https://digicertinc.atlassian.net/browse/DATA-3399
- Downstream Sinks
- Schema changes of tables are not recorded in commit logs; only data changes are recorded. Therefore, changes in schema are detected on a best-effort basis. To avoid data loss, it is recommended to pause writes to the table during a schema change.
- There is no direct 1-to-1 mapping between Cassandra's datatypes and MySQL/MariaDB datatypes.
- Cassandra's size limits are often more relaxed than MySQL/MariaDB's. For example, Cassandra's limit on rowkey length is about 2 GB, while MySQL limits unique key length to about 1.5 KB.
- Unsupported datatypes, such as UDTs and other complex datatypes.
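The deduplication burden noted under the CDC limitations can be sketched as follows. With replication factor N, each logical write is captured once per replica, so a downstream consumer sees up to N copies of the same event. The event fields used as the identity key here (keyspace, table, primary key, write timestamp) are assumptions about the payload shape, not the connector's actual schema.

```python
def deduplicate(events, seen=None):
    """Drop replica-duplicated events, keeping the first copy of each logical write."""
    seen = set() if seen is None else seen
    unique = []
    for event in events:
        # Identity of a logical write: where it landed plus when it was written
        key = (event["keyspace"], event["table"], event["pk"], event["ts"])
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique
```

In a real consumer the `seen` set would have to be bounded (e.g. windowed by timestamp or backed by an external store), since it grows without limit here.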
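The datatype mismatch above is the kind of thing a sink would have to paper over with a mapping table. The sketch below is illustrative and deliberately non-exhaustive; the chosen MariaDB column types are one reasonable set of approximations, not an official mapping.

```python
# Illustrative Cassandra -> MariaDB type mapping; several Cassandra types have
# no exact equivalent and are approximated or serialized instead.
CASSANDRA_TO_MARIADB = {
    "int": "INT",
    "bigint": "BIGINT",
    "text": "TEXT",
    "boolean": "TINYINT(1)",
    "timestamp": "DATETIME(3)",
    "uuid": "CHAR(36)",   # stored in string form
    "list": "JSON",       # collections serialized to JSON
    "map": "JSON",
}

def map_type(cassandra_type):
    # Anything without a direct equivalent (UDTs, tuples, ...) falls back to JSON
    return CASSANDRA_TO_MARIADB.get(cassandra_type, "JSON")
```

The JSON fallback is a design choice that trades queryability in MariaDB for lossless capture of complex types.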
- Git clone the Debezium POC repo
  git clone https://github.com/anisriva/debezium-dse.git
- Start the docker-compose
  docker-compose -f docker-compose.yaml up -d
- Run the startup script to:
  - Move the connector binaries and config to $DEBEZIUM_HOME
  - Change the EverywhereStrategy to NetworkTopologyStrategy
  - Create a dummy schema in the Cassandra db
  - Start the Debezium process
  - Insert a large number of records into the customers table
  docker container exec -it cassandra sh //home//startup-script.sh
- Check the Debezium logs:
  docker container exec -it cassandra-dse cat //opt//dse//resources//debezium//debezium.stderr.log
- Check Kafka Manager to see whether the topics are being created in the cluster.
The above setup covers only the first half of the pipeline; the second half requires a new phase of design and development. A better approach to this problem would be a system that can:
- Support the wide variety of datatypes that need to be incorporated.
- Remove the duplicates that will be generated due to the replication factor.
Because of the above limitations and problems, I decided to move towards the DSE Analytics route. The next section discusses the DSE Analytics workload and Spark.
Apache Spark is a high-performance engine for large-scale analytics and data processing. While Apache Spark provides advanced analytics capabilities, it requires a fast, distributed backend data store, and Apache Cassandra is a modern, reliable, and scalable choice for that store. Spark, when fully integrated with Cassandra's key components, provides the resilience and scale required for big data analytics.
Cassandra is a highly scalable NoSQL database that provides a very efficient write path but has limitations in the way data is read. Cassandra exposes CQL, a SQL-like interface for fetching data, but CQL lacks common SQL functionality such as joins, grouping, and complex aggregations on top of the data.
DSE Analytics provides an isolated layer on top of the transactional system for analytical workloads and data needs. This requires a creation of a separate datacenter within the same cluster with additional hardware resources.
This setup can be implemented on top of the current datacenter as well, but it is highly recommended to run the Spark jobs in a separate, isolated workload: they are resource intensive, and we don't want our applications to be affected by them.
Spark supports a rich set of higher-level tools including:
- Spark SQL
- MLlib
- GraphX
- Spark Streaming
- Cassandra
- Spark
- Spark SQL
- Scala or PySpark
- Airflow
To set up the analytical workload, the following operations are required:
- Allocate the analytical nodes.
- Join the nodes as a separate datacenter into the cluster.
- Configure the keyspaces to ensure they replicate in the analytical DC
- Perform node repair on the impacted keyspaces.
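The keyspace-configuration step above boils down to issuing ALTER KEYSPACE statements that add replicas in the new analytics datacenter. The helper below is a sketch for generating those statements; the datacenter names and replica counts are placeholders, not our real topology.

```python
def replicate_to_analytics_dc(keyspace, dc_replicas):
    """Build the ALTER KEYSPACE CQL for a NetworkTopologyStrategy replication map."""
    options = ", ".join(f"'{dc}': {rf}" for dc, rf in sorted(dc_replicas.items()))
    return (
        f"ALTER KEYSPACE {keyspace} WITH replication = "
        f"{{'class': 'NetworkTopologyStrategy', {options}}};"
    )
```

For example, `replicate_to_analytics_dc("my_ks", {"Transactional": 3, "Analytics": 3})` yields a statement replicating my_ks into both datacenters; a node repair on the keyspace would then stream the data across, as in the final step above.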
Data Per Node | CPU/Chipset | Memory | Disk | Network
---|---|---|---|---
817 GB per node +/- 295 GB | 28 CPU cores | 128 GB | 3.2 TB | 1.6 TB solr_data

We have an 8-node transactional datacenter and are planning to go for either a 3-node or a 6-node analytics datacenter.
Data Per Node | CPU/Chipset | Memory | Disk | Network
---|---|---|---|---
~2.5 TB per node | 28 CPU cores or higher if available | 320 GB | 5 TB | 1.6 TB solr_data

Data Per Node | CPU/Chipset | Memory | Disk | Network
---|---|---|---|---
~1.5 TB per node | 28 CPU cores or higher if available | 256 GB | 4 TB | 1.6 TB solr_data
The Spark SQL Thriftserver uses JDBC and ODBC interfaces for client connections to the database.
The AlwaysOn SQL service is a high-availability service built on top of the Spark SQL Thriftserver. The Spark SQL Thriftserver is started manually on a single node in an Analytics datacenter, and will not failover to another node. Both AlwaysOn SQL and the Spark SQL Thriftserver provide JDBC and ODBC interfaces to DSE, and share many configuration settings.
The in-house setup includes the Thriftserver, so no additional work is required. For more information, please see the documentation:
https://docs.datastax.com/en/dse/6.8/dse-dev/datastax_enterprise/spark/sparkSqlThriftServer.html
NOTE: AlwaysOn SQL is only available in DSE 6.x; therefore we will use only the Thriftserver in this setup.
- Git clone the dse-analytics-spark repository from GitHub.
  git clone https://github.com/anisriva/dse-analytics-spark.git
- Enter the directory
  cd dse-analytics-spark
- Use the spark script to run the setup
  sh start.sh
- Run DSE Studio using the link below: DS-Studio
- Run Jupyter notebook by following the steps below: Jupyter
- Use the Spark shell:
  - Spark SQL
    docker container exec -it analytics-seed dse spark-sql
  - Spark Scala
    docker container exec -it analytics-seed dse spark
  - PySpark
    docker container exec -it analytics-seed dse pyspark
- Set up Spark SQL on a SQL client like DBeaver:
  - Install DBeaver through the link below: https://dbeaver.com/download/lite/
  - Click on Database in the menu bar and click on New Database Connection.
  - Select Apache Hive
  - Click Next and set up the JDBC parameters as shown in the image below
  - Now we are all set to start running HQL queries
  Note: Port 10000 should be exposed for the Docker container; for the username and password, use "dse".