From 9cfa2f16f846dfd47d16416e7fe1352f1ff9a040 Mon Sep 17 00:00:00 2001 From: Mikhail Shilkov Date: Thu, 27 Jun 2019 09:07:58 +0200 Subject: [PATCH 1/2] Azure Stream Analytics example --- azure-ts-stream-analytics/Pulumi.yaml | 10 +++ azure-ts-stream-analytics/README.md | 58 +++++++++++++++ azure-ts-stream-analytics/index.ts | 95 +++++++++++++++++++++++++ azure-ts-stream-analytics/package.json | 11 +++ azure-ts-stream-analytics/tsconfig.json | 22 ++++++ 5 files changed, 196 insertions(+) create mode 100644 azure-ts-stream-analytics/Pulumi.yaml create mode 100644 azure-ts-stream-analytics/README.md create mode 100644 azure-ts-stream-analytics/index.ts create mode 100644 azure-ts-stream-analytics/package.json create mode 100644 azure-ts-stream-analytics/tsconfig.json diff --git a/azure-ts-stream-analytics/Pulumi.yaml b/azure-ts-stream-analytics/Pulumi.yaml new file mode 100644 index 000000000..e0cee0b55 --- /dev/null +++ b/azure-ts-stream-analytics/Pulumi.yaml @@ -0,0 +1,10 @@ +name: azure-ts-stream-analytics +runtime: nodejs +description: Uses an Azure Stream Analytics job to transform data in an Event Hub. +template: + config: + azure:environment: + description: The Azure environment to use (`public`, `usgovernment`, `german`, `china`) + default: public + azure:location: + description: The Azure location to deploy to (e.g., `eastus` or `westeurope`) diff --git a/azure-ts-stream-analytics/README.md b/azure-ts-stream-analytics/README.md new file mode 100644 index 000000000..216eea880 --- /dev/null +++ b/azure-ts-stream-analytics/README.md @@ -0,0 +1,58 @@ +[![Deploy](https://get.pulumi.com/new/button.svg)](https://app.pulumi.com/new) + +# Azure Stream Analytics + +An example Pulumi program that deploys an Azure Stream Analytics job to transform data in an Event Hub. + +## Running the App + +1. Create a new stack: + + ``` + $ pulumi stack init dev + ``` + +1. Login to Azure CLI (you will be prompted to do this during deployment if you forget this step): + + ``` + $ az login + ``` + +1. Restore NPM dependencies: + + ``` + $ npm install + ``` + +1. Configure the location to deploy the example to: + + ``` + $ pulumi config set azure:location + ``` + +1. Run `pulumi up` to preview and deploy changes: + + ``` + $ pulumi up + Previewing update (dev): + ... + + Updating (dev): + ... + Resources: + + 15 created + Update duration: 2m43s + ``` + +1. Use the following sample messages for testing: + + ``` + // Inputs (1 line - 1 event): + {"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"} + {"Make":"Kia","Sales":1,"Time":"2019-06-26T10:22:37Z"} + {"Make":"Honda","Sales":1,"Time":"2019-06-26T10:22:38Z"} + + // Output: + [{"Make":"Kia","Sales":3};{"Make":"Honda","Sales":1}] + + ``` diff --git a/azure-ts-stream-analytics/index.ts b/azure-ts-stream-analytics/index.ts new file mode 100644 index 000000000..920e3b5c9 --- /dev/null +++ b/azure-ts-stream-analytics/index.ts @@ -0,0 +1,95 @@ +// Copyright 2016-2019, Pulumi Corporation. All rights reserved. + +import * as azure from "@pulumi/azure"; + +// Create an Azure Resource Group +const resourceGroup = new azure.core.ResourceGroup("streams-rg"); + +// Define an Event Hub Namespace with two Hubs for an input and an output data streams +const namespace = new azure.eventhub.EventHubNamespace("streams-ns", { + resourceGroupName: resourceGroup.name, + sku: "standard", +}); + +const inputHub = new azure.eventhub.EventHub("inputs", { + resourceGroupName: resourceGroup.name, + namespaceName: namespace.name, + partitionCount: 2, + messageRetention: 7, +}); + +const consumerGroup = new azure.eventhub.EventHubConsumerGroup("analytics", { + resourceGroupName: resourceGroup.name, + namespaceName: namespace.name, + eventhubName: inputHub.name, +}); + +const outputHub = new azure.eventhub.EventHub("outputs", { + resourceGroupName: resourceGroup.name, + namespaceName: namespace.name, + partitionCount: 2, + messageRetention: 7, +}); + +// Create a Stream Analytics Job that aggregates events per Make and 1-minute intervals +const job = new azure.streamanalytics.Job("job", { + resourceGroupName: resourceGroup.name, + compatibilityLevel: "1.1", + dataLocale: "en-GB", + eventsLateArrivalMaxDelayInSeconds: 60, + eventsOutOfOrderMaxDelayInSeconds: 50, + eventsOutOfOrderPolicy: "Adjust", + outputErrorPolicy: "Drop", + streamingUnits: 1, + transformationQuery: ` +SELECT + Make, + SUM(Sales) AS Sales +INTO + [MyOutput] +FROM + [MyInput] TIMESTAMP BY Time +GROUP BY + Make, + TumblingWindow(minute, 1) +` +}); + +// Input of the job: the Event Hub with raw events +const input = new azure.streamanalytics.StreamInputEventHub("input", { + name: "MyInput", + resourceGroupName: resourceGroup.name, + streamAnalyticsJobName: job.name, + servicebusNamespace: namespace.name, + eventhubName: inputHub.name, + eventhubConsumerGroupName: consumerGroup.name, + sharedAccessPolicyKey: namespace.defaultPrimaryKey, + sharedAccessPolicyName: "RootManageSharedAccessKey", + serialization: { + type: "Json", + encoding: "UTF8", + }, +}); + +// Output of the job: the Event Hub with aggregated data +const output = new azure.streamanalytics.OutputEventHub("output", { + name: "MyOutput", + resourceGroupName: resourceGroup.name, + streamAnalyticsJobName: job.name, + servicebusNamespace: namespace.name, + eventhubName: outputHub.name, + sharedAccessPolicyKey: namespace.defaultPrimaryKey, + sharedAccessPolicyName: "RootManageSharedAccessKey", + serialization: { + type: "Json", + encoding: "UTF8", + format: "Array", + }, +}); + +// Create an Azure Function to subscribe to the output and print all outputs of the job +outputHub.onEvent("analytics-output", { + callback: async (context, event) => { + console.log(JSON.stringify(event)); + }, +}); diff --git a/azure-ts-stream-analytics/package.json b/azure-ts-stream-analytics/package.json new file mode 100644 index 000000000..d2dc4fc1c --- /dev/null +++ b/azure-ts-stream-analytics/package.json @@ -0,0 +1,11 @@ +{ + "name": "azure-ts-stream-analytics", + "version": "0.1.0", + "devDependencies": { + "@types/node": "^10.3.3" + }, + "dependencies": { + "@pulumi/azure": "latest", + "@pulumi/pulumi": "latest" + } +} diff --git a/azure-ts-stream-analytics/tsconfig.json b/azure-ts-stream-analytics/tsconfig.json new file mode 100644 index 000000000..16caa48c2 --- /dev/null +++ b/azure-ts-stream-analytics/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "outDir": "bin", + "target": "es6", + "lib": [ + "es6" + ], + "module": "commonjs", + "moduleResolution": "node", + "sourceMap": true, + "experimentalDecorators": true, + "pretty": true, + "noFallthroughCasesInSwitch": true, + "noImplicitAny": true, + "noImplicitReturns": true, + "forceConsistentCasingInFileNames": true, + "strictNullChecks": true + }, + "files": [ + "index.ts" + ] +} From 4c2089e9b88fe523ad081dc8882e816628149553 Mon Sep 17 00:00:00 2001 From: Mikhail Shilkov Date: Fri, 28 Jun 2019 15:47:13 +0200 Subject: [PATCH 2/2] Add a way to send events to the Event Hub --- azure-ts-stream-analytics/README.md | 8 ++++++++ azure-ts-stream-analytics/index.ts | 6 ++++++ azure-ts-stream-analytics/package.json | 9 ++++++--- azure-ts-stream-analytics/token.ts | 16 ++++++++++++++++ azure-ts-stream-analytics/tsconfig.json | 5 +++-- 5 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 azure-ts-stream-analytics/token.ts diff --git a/azure-ts-stream-analytics/README.md b/azure-ts-stream-analytics/README.md index 216eea880..b79971687 100644 --- a/azure-ts-stream-analytics/README.md +++ b/azure-ts-stream-analytics/README.md @@ -56,3 +56,11 @@ An example Pulumi program that deploys an Azure Stream Analytics job to transfor [{"Make":"Kia","Sales":3};{"Make":"Honda","Sales":1}] ``` + + You can send a message with a `curl` command: + + ``` + curl -X POST '$(pulumi stack output inputEndpoint)' -H 'Authorization: $(pulumi stack output sasToken)' -H 'Content-Type: application/atom+xml;type=entry;charset=utf-8' -d '{"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"}' + ``` + +1. [Start the Stream Analytics job](https://docs.microsoft.com/en-us/azure/stream-analytics/start-job). The job will start emitting messages to the output Event Hub once per minute. The Azure Function `analytics-output` will start printing those events into the console (you'd have to open the function console in the Azure portal to see them). diff --git a/azure-ts-stream-analytics/index.ts b/azure-ts-stream-analytics/index.ts index 920e3b5c9..43bb84560 100644 --- a/azure-ts-stream-analytics/index.ts +++ b/azure-ts-stream-analytics/index.ts @@ -1,6 +1,8 @@ // Copyright 2016-2019, Pulumi Corporation. All rights reserved. +import * as pulumi from "@pulumi/pulumi"; import * as azure from "@pulumi/azure"; +import { createSharedAccessToken } from "./token"; // Create an Azure Resource Group const resourceGroup = new azure.core.ResourceGroup("streams-rg"); @@ -93,3 +95,7 @@ outputHub.onEvent("analytics-output", { console.log(JSON.stringify(event)); }, }); + +const url = pulumi.interpolate`https://${namespace.name}.servicebus.windows.net`; +export const sasToken = pulumi.all([url, namespace.defaultPrimaryKey]).apply(([u, pk]) => createSharedAccessToken(u, "RootManageSharedAccessKey", pk)); +export const inputEndpoint = pulumi.interpolate`${url}/${inputHub.name}/messages?timeout=60&api-version=2014-01`; diff --git a/azure-ts-stream-analytics/package.json b/azure-ts-stream-analytics/package.json index d2dc4fc1c..4a6d9617c 100644 --- a/azure-ts-stream-analytics/package.json +++ b/azure-ts-stream-analytics/package.json @@ -2,10 +2,13 @@ "name": "azure-ts-stream-analytics", "version": "0.1.0", "devDependencies": { - "@types/node": "^10.3.3" + "@types/node": "^10.3.3", + "@types/utf8": "^2.1.6" }, "dependencies": { "@pulumi/azure": "latest", - "@pulumi/pulumi": "latest" + "@pulumi/pulumi": "latest", + "crypto": "^1.0.1", + "utf8": "^3.0.0" } -} +} \ No newline at end of file diff --git a/azure-ts-stream-analytics/token.ts b/azure-ts-stream-analytics/token.ts new file mode 100644 index 000000000..b22eb00d2 --- /dev/null +++ b/azure-ts-stream-analytics/token.ts @@ -0,0 +1,16 @@ +import { encode } from "utf8"; +import { createHmac } from "crypto"; + +export function createSharedAccessToken(uri: string, saName: string, saKey: string): string { + if (!uri || !saName || !saKey) { + throw "Missing required parameter"; + } + var encoded = encodeURIComponent(uri); + var now = new Date(); + var week = 60 * 60 * 24 * 7; + var ttl = Math.round(now.getTime() / 1000) + week; + var signature = encoded + '\n' + ttl; + var signatureUTF8 = encode(signature); + var hash = createHmac('sha256', saKey).update(signatureUTF8).digest('base64'); + return 'SharedAccessSignature sr=' + encoded + '&sig=' + encodeURIComponent(hash) + '&se=' + ttl + '&skn=' + saName; +} diff --git a/azure-ts-stream-analytics/tsconfig.json b/azure-ts-stream-analytics/tsconfig.json index 16caa48c2..5476c7b49 100644 --- a/azure-ts-stream-analytics/tsconfig.json +++ b/azure-ts-stream-analytics/tsconfig.json @@ -17,6 +17,7 @@ "strictNullChecks": true }, "files": [ - "index.ts" + "index.ts", + "token.ts" ] -} +} \ No newline at end of file