
vspu-notification-system


Process a stream of data in real time and trigger data-driven actions.

Python | Java | Apache Kafka | Apache Maven | Redis | Docker | iOS


Overview

This is a notification system for the Video Stream Processing Unit (VSPU).
The purpose is to build a solution that can take any data source, process the received data, and finally trigger custom actions. Each part is built in such a way that custom data sources, data processors, and actions may be implemented. A few examples are provided below.

Action system

  • Live plot (matplotlib):
    Plots a stream of incoming data provided by a callable object.
    A limit value can be set (both static and dynamic).
    Based on matplotlib.
  • Email notification (Gmail SMTP):
    Sends an email based on the incoming data stream and either a dynamic or static limit value.
    The system sources data from a Kafka topic and compares it to the current limit.
    If a data value exceeds the limit value, an email is sent (a minimal Python sketch is shown after this list).
  • Database for IPC:
    Redis database for quick inter-process communication.
    Allows variables to be modified by an external user.
    This component is optional.
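
The email-notification action boils down to a threshold check on the aggregated stream. A minimal Python sketch of that check is shown below, assuming the kafka-python package; the consumer group id, sender/recipient addresses, and the Gmail app password are placeholders, not values from this repository.

# Minimal sketch of the email-notification action (assumes kafka-python).
# Group id, addresses, and credentials below are placeholders.
import smtplib
from email.message import EmailMessage

from kafka import KafkaConsumer

LIMIT = 10_000  # static limit; a dynamic limit could be read from Redis instead

consumer = KafkaConsumer(
    "example.001.age.sum",
    bootstrap_servers="127.0.0.1:9092",
    group_id="example.001.age.sum.email.app",  # placeholder group id
    # values are written as 4-byte big-endian integers (see IntegerDeserializer below)
    value_deserializer=lambda b: int.from_bytes(b, "big", signed=True),
)

def send_email(value):
    msg = EmailMessage()
    msg["Subject"] = f"VSPU alert: age sum {value} exceeded limit {LIMIT}"
    msg["From"] = "sender@example.com"       # placeholder
    msg["To"] = "recipient@example.com"      # placeholder
    msg.set_content(f"Current age sum is {value}, limit is {LIMIT}.")
    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
        smtp.login("sender@example.com", "app-password")  # placeholder credentials
        smtp.send_message(msg)

for record in consumer:
    if record.value > LIMIT:
        send_email(record.value)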

Data processor (Kafka Streams)

  • SumTheAge:
    Kafka Streams and Java application.
    Receives data from one Kafka topic, processes it, and passes it to another Kafka topic.
    Values are filtered and then the sum of all age keys is evaluated in real time as data arrives (a rough Python sketch of the equivalent loop follows below).
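
The actual processor is a Java Kafka Streams application (run in the TODO section below). Purely to illustrate the consume-filter-aggregate-produce loop, a rough Python equivalent using kafka-python might look as follows; the group id, output key, and the exact filtering rule are assumptions.

# Rough Python illustration of the sumTheAge logic (the real component is a
# Java Kafka Streams app). Requires kafka-python.
import json

from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "example.001",
    bootstrap_servers="127.0.0.1:9092",
    group_id="example.001.sum-the-age.sketch",  # placeholder group id
    key_deserializer=lambda b: b.decode("utf-8") if b else None,
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    # 4-byte big-endian int, matching the IntegerDeserializer used by the CLI consumer
    value_serializer=lambda v: v.to_bytes(4, "big", signed=True),
)

running_sum = 0
for record in consumer:
    age = record.value.get("age")
    if not isinstance(age, int):  # assumed filter: drop records without a numeric age
        continue
    running_sum += age
    producer.send("example.001.age.sum", key=record.key or "age.sum", value=running_sum)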

Data source

  • Mockup script:
    Python script that pushes data to a Kafka topic. Data is a flat JSON record in the format (string_key, JSON_value); see the sketch below.
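
A minimal sketch of such a producer, assuming kafka-python; the record fields are assumptions, and the constants mirror the example flags (--min -5 --max 7 --sleep 0.2 -n 200) used later in this README.

# Minimal sketch of the data-producer mockup: pushes (string_key, JSON_value)
# records to the example.001 topic. Field names are assumptions.
import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for i in range(200):  # -n 200
    value = {"name": f"person_{i}", "age": random.randint(-5, 7)}  # --min -5 --max 7
    producer.send("example.001", key=str(i), value=value)
    time.sleep(0.2)  # --sleep 0.2

producer.flush()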

Additional

  • Kafka CLI consumer:
    Visualizes the data flowing through the Kafka server.

High-level notification system architecture

visualization of high-level-notification-system-architecture


Runtime visualization

screen-record.gif


Getting Started

1) Prepare Kafka Server

NOTE: It is assumed that the Kafka server is up and running and that its bin directory is added to PATH.

a) Create topic: example.001

kafka-topics.sh --zookeeper 127.0.0.1:2181 \
--topic example.001 \
--create \
--partitions 1 \
--replication-factor 1

b) Create topic: example.001.age.sum

kafka-topics.sh --zookeeper 127.0.0.1:2181 \
--topic example.001.age.sum \
--create \
--partitions 1 \
--replication-factor 1

2) Prepare Redis database

a) Using Docker

docker run -d --name redis-db -p 6379:6379 redis redis-server --requirepass "password"

NOTE: To manipulate Redis data (e.g. change value for 'limit' key to 150):

docker exec -it redis-db bash
redis-cli -a password
SET limit 150
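
A consumer process can read this dynamic limit with the redis Python package. The host, port, password, and key name below match the docker run and redis-cli examples above; the fallback default is an assumption.

# Minimal sketch of reading the dynamic limit from Redis (requires the redis package).
import redis

r = redis.Redis(host="127.0.0.1", port=6379, password="password")

raw = r.get("limit")
limit = int(raw) if raw is not None else 10_000  # assumed static fallback
print(f"Current limit: {limit}")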

3) Clone repository

git clone https://github.com/patryklaskowski/vspu-notification-system.git &&
cd vspu-notification-system

4) Run action system

a) Kafka CLI Consumer (topic: example.001.age.sum, group: example.001.age.sum.vis.app)

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001.age.sum \
--group example.001.age.sum.vis.app \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

5) Run data processor

a) ...

6) Run data producer

a) Kafka CLI Consumer (topic: example.001, group: example.001.vis.app)

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001 \
--group example.001.vis.app \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

TODO


Run software

NOTE: To reset the group offset to the very beginning:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001 \
--group example.001.vis.app \
--reset-offsets --to-earliest \
--execute

2. "sumTheAge" Kafka Streams Application

java -jar kafka-streams/sumTheAge-kafka-streams/target/sumTheAge-kafka-streams-1.0-jar-with-dependencies.jar

NOTE: To reset the group offset to the very beginning:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001.age.sum \
--group example.001.age.sum.vis.app \
--reset-offsets --to-earliest \
--execute

4. Live Plot Python Consumer

cd python-kafka-consumer &&
python3.7 -m venv env &&
source env/bin/activate &&
python3 -m pip install -r requirements.txt &&
python3 live_plot_consumer.py --limit 10000 --window 50 --interval 300
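
The core idea of live_plot_consumer.py is a matplotlib animation over a sliding window of incoming values with a horizontal limit line. Below is a minimal sketch of that idea; the internal names and the stand-in data callable are assumptions, while the constants mirror the --limit, --window and --interval flags above.

# Minimal sketch of the live-plot idea behind live_plot_consumer.py
# (internal names are assumptions; constants mirror --limit / --window / --interval).
import random
from collections import deque

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

LIMIT, WINDOW, INTERVAL = 10_000, 50, 300

def next_value():
    """Stand-in for the callable that returns the latest value from Kafka."""
    return random.randint(0, 12_000)

values = deque(maxlen=WINDOW)  # sliding window of the most recent values
fig, ax = plt.subplots()
(line,) = ax.plot([], [])
ax.axhline(LIMIT, color="red", linestyle="--", label="limit")
ax.legend()

def update(_frame):
    values.append(next_value())
    line.set_data(range(len(values)), list(values))
    ax.relim()
    ax.autoscale_view()
    return (line,)

anim = FuncAnimation(fig, update, interval=INTERVAL)
plt.show()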

5. Email Notification Python Consumer

6. Python Data Producer Mockup

cd python-kafka-producer-mockup &&
python3.7 -m venv env &&
source env/bin/activate &&
python3 -m pip install -r requirements.txt &&
python3 kafka-python-sumTheAge-producer.py --min -5 --max 7 --sleep 0.2 -n 200