- Lab 1
- Access cluster
- Lab 2 - Getting started with NiFi
- Consuming the Meetup RSVP stream
- Extracting JSON elements we are interested in
- Splitting JSON into smaller fragments
- Writing JSON to File System
- Lab 3 - Kafka Basics
- Creating a topic
- Producing data
- Consuming data
- Lab 4 - Integrating Kafka with NiFi
- Creating the Kafka topic
- Adding the Kafka producer processor
- Verifying the data is flowing
- Lab 5 - Integrating the Schema Registry
- Creating the Kafka topic
- Adding the Meetup Avro Schema
- Sending Avro data to Kafka
Credentials will be provided for these services by the instructor:
- SSH
- Ambari
NOTE: The following instructions are for PuTTY. You can also use other popular SSH tools such as MobaXterm or SmarTTY
- Right click to download this ppk key > Save link as > save to Downloads folder
- Use PuTTY to connect to your node using the ppk key:
- Create a new session called hdf-workshop
- For the Host Name use: centos@IP_ADDRESS_OF_EC2_NODE
- Click "Save" on the session page before logging in
- SSH into your EC2 node using the steps below:
- Right click to download this pem key > Save link as > save to Downloads folder
- Copy the pem key to your ~/.ssh directory and correct its permissions:
cp ~/Downloads/hdf-workshop.pem ~/.ssh/
chmod 400 ~/.ssh/hdf-workshop.pem
- Log in to the EC2 node you have been assigned by replacing IP_ADDRESS_OF_EC2_NODE below with the EC2 node IP address (your instructor will provide this):
ssh -i ~/.ssh/hdf-workshop.pem centos@IP_ADDRESS_OF_EC2_NODE
- To change user to root you can run:
sudo su -
- Open the Ambari web UI at https://{YOUR_IP}:8080 and log in with admin/StrongPassword
- You will see a list of Hadoop components running on your node on the left side of the page
- They should all show a green (i.e. started) status. If not, start them from Ambari via the 'Service Actions' menu for that service
- NiFi is installed at: /usr/hdf/current/nifi
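If you want to confirm NiFi is up from the command line, the stock nifi.sh script that ships with NiFi includes a status subcommand (the path matches the install location above):
cd /usr/hdf/current/nifi
# Reports whether the NiFi process is currently running
bin/nifi.sh status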
In this lab, we will learn how to:
- Consume the Meetup RSVP stream
- Extract the JSON elements we are interested in
- Split the JSON into smaller fragments
- Write the JSON to the file system
To get started we need to consume the data from the Meetup RSVP stream, extract what we need, split the content and save it to a file:
- Consume Meetup RSVP stream
- Extract the JSON elements we are interested in
- Split the JSON into smaller fragments
- Write the JSON files to disk
Our final flow for this lab will look like the following:
A template for this flow can be found here
- Step 1: Add a ConnectWebSocket processor to the canvas
- In case you are using a downloaded template, the ControllerService will be prepopulated. You will need to enable the ControllerService. Double-click the processor and follow the arrow next to the JettyWebSocketClient
- Set WebSocket Client ID to your favorite number.
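If you want a quick sanity check that your node can reach the Meetup stream before wiring up the flow, you can hit the HTTP flavour of the feed from the shell. The endpoint below is an assumption based on the public Meetup streaming API; use whatever URI your JettyWebSocketClient is actually configured with:
# Assumed endpoint for the Meetup RSVP stream - adjust to match your controller service's WebSocket URI
curl --max-time 5 -s http://stream.meetup.com/2/rsvps | head -c 300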
- Step 2: Add an UpdateAttribute processor
- Configure it to have a custom property called mime.type with the value of application/json
- Step 3: Add an EvaluateJsonPath processor and configure it as shown below:
The properties to add are:
event.name      $.event.event_name
event.url       $.event.event_url
group.city      $.group.group_city
group.state     $.group.group_state
group.country   $.group.group_country
group.name      $.group.group_name
venue.lat       $.venue.lat
venue.lon       $.venue.lon
venue.name      $.venue.venue_name
- Step 4: Add a SplitJson processor and configure the JsonPath Expression to be $.group.group_topics
- Step 5: Add a ReplaceText processor and configure the Search Value to be ([{])([\S\s]+)([}]) and the Replacement Value to be:
{ "event_name": "${event.name}", "event_url": "${event.url}", "venue" : { "lat": "${venue.lat}", "lon": "${venue.lon}", "name": "${venue.name}" }, "group" : { "group_city" : "${group.city}", "group_country" : "${group.country}", "group_name" : "${group.name}", "group_state" : "${group.state}", $2 } }
- Step 6: Add a PutFile processor to the canvas and configure it to write the data out to /tmp/rsvp-data
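To confirm files are landing, list the output directory on the EC2 node and pretty-print one of the files (this assumes the node's stock Python is available for json.tool):
ls -l /tmp/rsvp-data
# Pretty-print the first file in the directory to inspect the JSON
python -m json.tool "$(ls -d /tmp/rsvp-data/* | head -n 1)"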
- What does a full RSVP JSON object look like?
- How many output files do you end up with?
- How can you change the file name that the JSON is saved as by PutFile?
- Why do you think we are splitting out the RSVPs by group?
- Why are we using the UpdateAttribute processor to add a mime.type?
- How can you change the flow to get the member photo from the JSON and download it?
In this lab we are going to explore creating, writing to and consuming Kafka topics. This will come in handy when we later integrate Kafka with NiFi and Streaming Analytics Manager.
- Creating a topic
- Step 1: Open an SSH connection to your EC2 Node.
- Step 2: Navigate to the Kafka directory (/usr/hdf/current/kafka-broker), where Kafka is installed; we will use the utilities located in the bin directory.
cd /usr/hdf/current/kafka-broker/
- Step 3: Create a topic using the kafka-topics.sh script:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic first-topic
NOTE: Because of how Kafka reports metrics, topic names containing a period ('.') or underscore ('_') may collide with metric names and should be avoided. If they cannot be avoided, use only one of the two characters, not both.
- Step 4: Ensure the topic was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
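In addition to --list, the same script supports a --describe option if you want to see the partition and replica assignment for the new topic:
# Shows partition count, replication factor and leader/replica assignment
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic first-topic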
- Testing Producers and Consumers
- Step 1: Open a second terminal to your EC2 node and navigate to the Kafka directory
- In one shell window connect a consumer:
bin/kafka-console-consumer.sh --bootstrap-server demo.hortonworks.com:6667 --from-beginning --topic first-topic
Note: using --from-beginning will tell the broker we want to consume from the first message in the topic. Otherwise it will be from the latest offset.
- In the second shell window connect a producer:
bin/kafka-console-producer.sh --broker-list demo.hortonworks.com:6667 --topic first-topic
- Sending messages: now that the producer is connected we can type messages. (A non-interactive alternative using a pipe is sketched after this list.)
- Type a message in the producer window
- Messages should appear in the consumer window.
- Close the consumer (ctrl-c) and reconnect using the default offset of latest. You will now see only new messages typed in the producer window.
- As you type messages in the producer window they should appear in the consumer window.
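As mentioned above, the console producer also reads from stdin, so you can send test messages non-interactively by piping them in:
# Each line piped to the producer becomes one Kafka message on first-topic
echo "hello from the workshop" | bin/kafka-console-producer.sh --broker-list demo.hortonworks.com:6667 --topic first-topic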
- Creating the topic
- Step 1: Open an SSH connection to your EC2 Node.
- Step 2: Navigate to the Kafka directory (/usr/hdf/current/kafka-broker), where Kafka is installed; we will use the utilities located in the bin directory.
cd /usr/hdf/current/kafka-broker/
- Step 3: Create a topic using the kafka-topics.sh script:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic meetup_rsvp_raw
NOTE: Because of how Kafka reports metrics, topic names containing a period ('.') or underscore ('_') may collide with metric names and should be avoided. If they cannot be avoided, use only one of the two characters, not both.
- Step 4: Ensure the topic was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
- Integrating NiFi
- Step 1: Add a PublishKafka_1_0 processor to the canvas.
- Step 2: Add a routing for the success relationship of the ReplaceText processor to the PublishKafka_1_0 processor added in Step 1, as shown below:
- Step 3: Configure the topic and broker for the PublishKafka_1_0 processor, where the topic is meetup_rsvp_raw and the broker is demo.hortonworks.com:6667.
- Start the NiFi flow
- In a terminal window on your EC2 node, navigate to the Kafka directory and connect a consumer to the meetup_rsvp_raw topic:
bin/kafka-console-consumer.sh --bootstrap-server demo.hortonworks.com:6667 --from-beginning --topic meetup_rsvp_raw
- Messages should now appear in the consumer window.
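If you want to confirm how many messages have landed on the topic (independent of what the console consumer prints), Kafka ships an offset tool you can invoke via kafka-run-class.sh from the same directory:
# --time -1 returns the latest offset per partition, i.e. the current message count for this single-partition topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list demo.hortonworks.com:6667 --topic meetup_rsvp_raw --time -1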
- Creating the topic
- Step 1: Open an SSH connection to your EC2 Node.
- Step 2: Navigate to the Kafka directory (/usr/hdf/current/kafka-broker), where Kafka is installed; we will use the utilities located in the bin directory.
cd /usr/hdf/current/kafka-broker/
- Step 3: Create a topic using the kafka-topics.sh script:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic meetup_rsvp_avro
NOTE: Because of how Kafka reports metrics, topic names containing a period ('.') or underscore ('_') may collide with metric names and should be avoided. If they cannot be avoided, use only one of the two characters, not both.
- Step 4: Ensure the topic was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
- Adding the Schema to the Schema Registry
- Step 1: Open a browser and navigate to the Schema Registry UI. You can get to this either from the Quick Links drop-down in Ambari, as shown below, or by going to https://<EC2_NODE>:17788
- Step 2: Create the Meetup RSVP schema in the Schema Registry
- Click the “+” button to add a new schema. A window called “Add New Schema” will appear.
- Fill in the fields of the Add Schema dialog as follows. For the Schema Text, you can download it here and either copy and paste it or upload the file.
Once the schema information fields have been filled in and the schema has been uploaded, click Save.
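If you would like to verify the schema outside the UI, the registry also exposes a REST API. The path below follows the Hortonworks Schema Registry convention and assumes the API is served on the same port as the UI; adjust it if your cluster differs:
# Lists registered schemas; -k skips certificate verification for the self-signed workshop cert
curl -sk "https://<EC2_NODE>:17788/api/v1/schemaregistry/schemas"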
- We are now ready to integrate the schema with NiFi
- Step 0: Remove the PutFile and PublishKafka_1_0 processors from the canvas; we will not need them for this section.
- Step 1: Add an UpdateAttribute processor to the canvas.
- Step 2: Add a routing for the success relationship of the ReplaceText processor to the UpdateAttribute processor added in Step 1.
- Step 3: Configure the UpdateAttribute processor as shown below:
- Step 4: Add a JoltTransformJSON processor to the canvas.
- Step 5: Add a routing for the success relationship of the UpdateAttribute processor to the JoltTransformJSON processor added in Step 4.
- Step 6: Configure the JoltTransformJSON processor as shown below:
The JSON used in the 'Jolt Specification' property is as follows:
{ "venue": { "lat": ["=toDouble", 0.0], "lon": ["=toDouble", 0.0] } }
- Step 7: Add a LogAttribute processor to the canvas.
- Step 8: Add a routing for the failure relationship of the JoltTransformJSON processor to the LogAttribute processor added in Step 7.
- Step 9: Add a PublishKafkaRecord_1_0 processor to the canvas.
- Step 10: Add a routing for the success relationship of the JoltTransformJSON processor to the PublishKafkaRecord_1_0 processor added in Step 9.
- Step 11: Configure the PublishKafkaRecord_1_0 processor to look like the following:
- Step 12: When you configure the JsonTreeReader and AvroRecordSetWriter, you will first need to configure a schema registry controller service. The schema registry controller service we are going to use is the 'HWX Schema Registry'; it should be configured as shown below:
- Step 13: Configure the JsonTreeReader as shown below:
- Step 14: Configure the AvroRecordSetWriter as shown below:
After following the above steps this section of your flow should look like the following:
- Start the NiFi flow
- In a terminal window on your EC2 node, navigate to the Kafka directory and connect a consumer to the meetup_rsvp_avro topic:
bin/kafka-console-consumer.sh --bootstrap-server demo.hortonworks.com:6667 --from-beginning --topic meetup_rsvp_avro
- Messages should now appear in the consumer window. Note that because the records are now Avro-encoded, the console consumer will display them as mostly unreadable binary rather than readable JSON.