This package provides a mechanism to trigger OpenWhisk actions when messages are received on an MQTT topic in the Watson IoT Platform. It sets up a Cloud Foundry application that can be configured to listen to one or more topics on a persistent connection, then invokes the registered actions as a feed action.
This package targets OpenWhisk developers who are building serverless applications and need integration with the Watson implementation of MQTT to trigger actions. You can add this package to your existing OpenWhisk namespace alongside your other actions, triggers, and rules.
This package can also be run in a multi-tenant fashion by third parties that wish to provide a hosted implementation of the event provider with other OpenWhisk developers.
The code is based on the generic MQTT package originally created by James Thomas. It differs in that it incorporates the credentials needed to connect to the Watson IoT platform, and in that it sets up the event provider as a Cloud Foundry application rather than a Docker container.
openwhisk-package-mqtt-watson/
├── actions
│ └── handler.js
├── CONTRIBUTING.md
├── feeds
│ └── feed-action.js
├── install.sh
├── LICENSE.txt
├── README.md
├── MqttWatsonEventProvider
│ ├── index.js
│ ├── manifest.yml
│ ├── package.json
│ └── lib
│ ├── feed_controller.js
│ ├── mqtt_subscription_manager.js
│ └── trigger_store.js
└── uninstall.sh
Entity | Type | Description |
---|---|---|
/namespace/mqtt-watson |
package | OpenWhisk Package for Watson IoT MQTT |
/namespace/mqtt-watson/feed-action.js |
action | Subscribes to a Watson IoT MQTT topic |
The main feed action in this package is feed-action.js
. When a trigger is associated to this action, it causes the action to subscribe to an MQTT topic as an application (not a device) so that it can receive messages that will in turn trigger your custom actions.
Parameter | Type | Required | Description | Example |
---|---|---|---|---|
url | string | yes | URL to Watson IoT MQTT feed | "ssl:https://a-123xyz.messaging.internetofthings.ibmcloud.com:8883" |
topic | string | yes | Topic subscription | "iot-2/type/+/id/+/evt/+/fmt/json" |
username | string | yes | Watson IoT API key | "a-123xyz" |
password | string | yes | Watson IoT API token | "+-derpbog" |
client | string | yes | Application client id | "a:12e45g:mqttapp" |
In order to support integration with the Watson IoT environment we need an event feed that fires a trigger in the OpenWhisk environment when messages are received. This service connects to a particular MQTT topic and then invokes the triggers that are registered for a given topic. It has its own Cloudant database to persist the topic-to-trigger subscription information. You will need to initialize this database prior to using this service.
This service can be hosted as a Cloud Foundry application. To deploy on IBM Cloud:
-
Create a Cloudant service, name it
cloudant-mqtt-watson
and create a database namedtopic_listeners
. -
Create a view for that Cloudant database, by creating a new design document with the following content. It provides a way to query for the subscriptions.
{
"_id": "_design/subscriptions",
"views": {
"host_topic_counts": {
"reduce": "_sum",
"map": "function (doc) {\n emit(doc.url + '#' + doc.topic, 1);\n}"
},
"host_topic_triggers": {
"map": "function (doc) {\n emit(doc.url + '#' + doc.topic, {trigger: doc._id, username: doc.username, password: doc.password});\n}"
},
"all": {
"map": "function (doc) {\n emit(doc._id, doc.url + '#' + doc.topic);\n}"
},
"host_triggers": {
"map": "function (doc) {\n emit(doc.url, {trigger: doc._id, username: doc.username, password: doc.password});\n}"
}
}
}
- Clone this repository
git clone https://github.com/kkbankol-ibm/openwhisk-package-mqtt-watson
and change to the MqttWatsonEventProvider
directory.
-
Change the name and host fields as necessary in
manifest.yml
. -
Run
cf push
and take note of the generated hostname route.
First, the IBM Cloud CLI (bx
) will need to be installed. This can be done by following the instructions at https://new-console.ng.bluemix.net/openwhisk/cli
The bluemix plugin for cloud-functions
will also need to be installed with
bx plugin install Cloud-Functions -r Bluemix
This step assumes you've already deployed the Event Provider application. If not, see the section above.
./install.sh openwhisk.ng.bluemix.net $AUTH_KEY $WSK_CLI $PROVIDER_ENDPOINT
Where:
- $AUTH_KEY is the OpenWhisk authentication key (Run
bx wsk property get
to obtain it) - $WSK_CLI is the path of OpenWhisk command interface binary, which should be
bx wsk
- $PROVIDER_ENDPOINT is the endpoint of the event provider service running as a Cloud Foundry application on Bluemix. It's a fully qualified URL including the path to the resource. i.e. http:https://mqtt-watson-${random-word}.mybluemix.net/mqtt-watson. This url can be found by running
bx cf apps
To uninstall the feed, run:
./uninstall.sh openwhisk.ng.bluemix.net $AUTH_KEY $WSK_CLI
To use this trigger feed, you need to pass all of the required parameters (refer to the table above)
$WSK_CLI trigger create mqttMsgReceived \
--feed mqtt-watson/feed-action \
--param topic "iot-2/type/${IOT_DEVICE_TYPE}/id/${IOT_DEVICE_ID}/evt/fromClient/fmt/json" \
--param url "ssl:https://${IOT_ORG}.messaging.internetofthings.ibmcloud.com:8883" \
--param username "${IOT_API_KEY}" \
--param password "${IOT_AUTH_TOKEN}" \
--param client "a:${IOT_ORG}:wskmqttsub_$(date +%s)"
For example:
$WSK_CLI trigger create mqttMsgReceived \
--feed mqtt-watson/feed-action \
--param topic "iot-2/type/+/id/+/evt/+/fmt/json" \
--param url "ssl:https://12e45g.messaging.internetofthings.ibmcloud.com:8883" \
--param username "a-123xyz" \
--param password "+-derpbog" \
--param client "a:12e45g:mqttapp"
-
Create a new trigger, using the example above.
-
Bind an action to the trigger via a "Rule"
bx wsk rule create mqttRule mqttMsgReceived translateText
- Post a message to the MQTT topic that triggers events you have subscribed to:
{
"d" : {
"sourceLanguage" : "en",
"payload" : "test",
"client": "client1"
}
}
- Verify the action was invoked by checking the Cloud Functions monitor or by running:
bx wsk activation poll
To delete the rule, trigger, and action:
bx wsk rule disable mqttRule
bx wsk rule delete mqttRule
bx wsk trigger delete mqttMsgReceived
bx wsk action delete translateText
Please refer to contribution guidelines.