
End-to-end AI solution with PySpark & real time data processing with Apache NiFi

This project demonstrates, in a first phase, how to collect data in real time via the NGSI-LD Context Broker and how to transform and persist it using DRACO (based on Apache NiFi). In a second phase, it shows how to run Apache Spark and Jupyter Notebooks on a Google Cloud Dataproc cluster.

Further information about NiFi and PySpark configuration can be found within their respective folders.

Agenda

General architecture

AI service reference architecture

Technologies used

  • NGSI-LD Context Broker: A FIWARE generic enabler to manage context data in real time. It allows you to manage the entire lifecycle of context information including updates, queries, registrations and subscriptions.

  • DRACO: A FIWARE generic enabler for managing the history of context data. Internally, Draco is based on Apache NiFi, a dataflow system built on the concepts of flow-based programming. NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic, and was built to automate the flow of data between systems. A sketch of how the Context Broker can be subscribed to notify Draco follows this list.

  • Dataproc: A managed Spark and Hadoop service that lets you take advantage of open-source data tools for batch processing, querying, streaming, and machine learning.

  • PySpark: An interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.

  • Spark MLlib: Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs that help users create and tune practical machine learning pipelines.

  • Jupyter Notebook: An open-source web application that allows you to create and share documents that contain live code, equations, visualizations and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more.
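
To illustrate how the first phase is wired together, the following minimal sketch (in Python, using the requests library) registers an NGSI-LD subscription so that the Context Broker notifies Draco whenever matching entities change. The broker address, the WeatherObserved entity type and the Draco notification endpoint are assumptions for this example, not values taken from this repository; adapt them to your own deployment.

import requests

# Assumed endpoints for this sketch: an NGSI-LD Context Broker on localhost:1026
# and a Draco/NiFi HTTP listener receiving notifications on port 5050.
BROKER = "http://localhost:1026/ngsi-ld/v1/subscriptions"
DRACO_NOTIFY = "http://draco:5050/ld/notify"

subscription = {
    "type": "Subscription",
    "description": "Notify Draco on every change of WeatherObserved entities",
    "entities": [{"type": "WeatherObserved"}],   # hypothetical entity type
    "notification": {
        "endpoint": {
            "uri": DRACO_NOTIFY,
            "accept": "application/json"
        }
    }
}

# Posting application/json means the broker applies the default NGSI-LD core
# @context; pass your own @context via a Link header if your data model needs it.
resp = requests.post(BROKER, json=subscription)
resp.raise_for_status()
print("Subscription created:", resp.headers.get("Location"))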

Setting up the Cloud environment

As shown in the general architecture above, this project uses the Google Cloud Platform. The following steps demonstrate how to set up the Cloud environment used within this project:

  • Create a Google cloud project

  • Create a Google Cloud Storage bucket

  • Create a Dataproc Cluster with Jupyter and Component Gateway

  • Access the JupyterLab web UI on Dataproc

  • Create a Python Notebook for the AI solution based on PySpark

  • Run a Spark job and plot the results

Step 1: Create a Google Cloud project

Sign in to the Google Cloud Platform console at console.cloud.google.com and create a new project.

In the Cloud Shell, run the following commands to enable the Dataproc, Compute Engine and Cloud Storage APIs:

 $ gcloud config set project <project_id>

Then run:

$ gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com

Step 2: Create a Google Cloud Storage bucket

Create a Google Cloud Storage bucket in the region closest to your data and give it a unique name. This will be used for the Dataproc cluster.

To do that, we need to follow two steps (a scripted alternative is sketched after them):

  1. Open the Cloud Storage browser in the Google Cloud Console.

[Image: bucket]

  2. Click Create bucket to open the bucket creation form.

[Image: bucket form]
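
If you prefer to script this step rather than click through the console, the sketch below creates the bucket with the google-cloud-storage Python client. The project ID, bucket name and region are placeholders, and the client must be authenticated (for example with gcloud auth application-default login).

from google.cloud import storage

# Placeholder values: replace with your own project ID, bucket name and region.
client = storage.Client(project="my-project-id")
bucket = client.create_bucket("my-dataproc-bucket", location="europe-west1")
print(f"Created bucket {bucket.name} in {bucket.location}")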

Step 3: Create your Dataproc cluster with Jupyter & Component Gateway

The cluster can be created through the Dataproc service GUI or from the Cloud Shell.

In the Cloud Shell, run this gcloud command to create your cluster with all the components needed to work with Jupyter:

$ gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway

PS:

  • The --master-machine-type and --worker-machine-type flags set the machine types for your Dataproc cluster; a list of available machine types is given in the Compute Engine documentation.

  • By default, 1 master node and 2 worker nodes are created if you do not set the --num-workers flag.

It should take about 90 seconds to create the cluster; once it is ready, you will be able to access it from the Dataproc Cloud console UI.

[Image: cluster]

Step 4: Accessing the JupyterLab web interface

Once the cluster is ready, you can find the Component Gateway link to the JupyterLab web interface by going to Dataproc > Clusters in the Cloud console, clicking on the cluster you created, and opening the Web Interfaces tab.

[Image: WebUI]

Step 5: Create a Python Notebook for the AI solution based on PySpark

It is recommended to use a "Python 3" notebook because it has access to all the data science packages installed with Anaconda as well as PySpark, and it also allows you to configure the SparkSession in the notebook and to call other Google Cloud APIs (e.g. the BigQuery Storage API), as sketched below the screenshot.

[Image: python]
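
As a minimal starting point for such a notebook, the sketch below configures a SparkSession and reads a CSV file from the Cloud Storage bucket created earlier. The application name, bucket name and file path are placeholders for this example, not the project's actual dataset.

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("pyspark-ai-service")  # hypothetical application name
    .getOrCreate()
)

# Placeholder path: Dataproc clusters can read gs:// paths directly
# through the built-in Cloud Storage connector.
df = spark.read.csv(
    "gs://my-dataproc-bucket/data/dataset.csv",
    header=True,
    inferSchema=True,
)

df.printSchema()
df.show(5)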

Before creating a new Jupyter notebook, we need to create a folder in our Google Cloud Storage bucket (GCS in this example) where the notebook will be saved.

More information about the AI solution modeling and the dataset can be found in the PySpark folder.
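
While the actual modeling lives in the PySpark folder, the following self-contained sketch illustrates the general shape of the Spark MLlib pipelines mentioned above (feature assembly followed by a regression model). The tiny in-memory DataFrame, the column names and the choice of LinearRegression are illustrative assumptions, not the project's actual model.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.getOrCreate()

# Tiny in-memory DataFrame standing in for the real dataset (illustrative only).
df = spark.createDataFrame(
    [(1.0, 2.0, 5.1), (2.0, 1.0, 4.2), (3.0, 4.0, 9.8), (4.0, 3.0, 8.9)],
    ["feature1", "feature2", "label"],
)

# Assemble the raw columns into a single feature vector, then fit a regression model.
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
lr = LinearRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(df)
model.transform(df).select("features", "label", "prediction").show()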
