Azure Stream Analytics example (#329)
stack72 committed Jun 28, 2019
2 parents d4a5b29 + 4c2089e commit bfc2ef4
Showing 6 changed files with 230 additions and 0 deletions.
10 changes: 10 additions & 0 deletions azure-ts-stream-analytics/Pulumi.yaml
@@ -0,0 +1,10 @@
name: azure-ts-stream-analytics
runtime: nodejs
description: Uses an Azure Stream Analytics job to transform data in an Event Hub.
template:
  config:
    azure:environment:
      description: The Azure environment to use (`public`, `usgovernment`, `german`, `china`)
      default: public
    azure:location:
      description: The Azure location to deploy to (e.g., `eastus` or `westeurope`)
66 changes: 66 additions & 0 deletions azure-ts-stream-analytics/README.md
@@ -0,0 +1,66 @@
[![Deploy](https://get.pulumi.com/new/button.svg)](https://app.pulumi.com/new)

# Azure Stream Analytics

An example Pulumi program that deploys an Azure Stream Analytics job to transform data in an Event Hub. Raw events are posted to an input Event Hub, the Stream Analytics job aggregates them, and the results are written to an output Event Hub, where an Azure Function logs each batch of aggregated records.

## Running the App

1. Create a new stack:

```
$ pulumi stack init dev
```

1. Log in to the Azure CLI (you will be prompted to do so during deployment if you skip this step):

```
$ az login
```

1. Restore NPM dependencies:

```
$ npm install
```

1. Configure the location to deploy the example to:

```
$ pulumi config set azure:location <location>
```

1. Run `pulumi up` to preview and deploy changes:

```
$ pulumi up
Previewing update (dev):
...
Updating (dev):
...
Resources:
+ 15 created
Update duration: 2m43s
```
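
The deployment exports two stack outputs, `inputEndpoint` and `sasToken`, which the testing steps below rely on; you can inspect them at any time:

```
$ pulumi stack output
```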

1. Use the following sample messages to test the job. The job sums `Sales` per `Make` over one-minute tumbling windows, so the three input events below produce the two aggregated records shown:

```
// Inputs (one event per line):
{"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"}
{"Make":"Kia","Sales":1,"Time":"2019-06-26T10:22:37Z"}
{"Make":"Honda","Sales":1,"Time":"2019-06-26T10:22:38Z"}
// Output:
[{"Make":"Kia","Sales":3};{"Make":"Honda","Sales":1}]
```

You can send a message with a `curl` command:

```
curl -X POST "$(pulumi stack output inputEndpoint)" -H "Authorization: $(pulumi stack output sasToken)" -H 'Content-Type: application/atom+xml;type=entry;charset=utf-8' -d '{"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"}'
```

1. [Start the Stream Analytics job](https://docs.microsoft.com/en-us/azure/stream-analytics/start-job). Once started, the job emits aggregated messages to the output Event Hub once per minute, and the Azure Function `analytics-output` prints those events to its log (open the Function's log console in the Azure portal to see them). The job can also be started from the command line, as shown below.
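
A minimal sketch of starting the job with the Azure CLI, assuming the `stream-analytics` CLI extension is installed (flag names may differ between extension versions); the resource group and job names are placeholders, since Pulumi auto-names both resources:

```
$ az extension add --name stream-analytics
$ az stream-analytics job start --resource-group <resource-group-name> --job-name <job-name>
```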
101 changes: 101 additions & 0 deletions azure-ts-stream-analytics/index.ts
@@ -0,0 +1,101 @@
// Copyright 2016-2019, Pulumi Corporation. All rights reserved.

import * as pulumi from "@pulumi/pulumi";
import * as azure from "@pulumi/azure";
import { createSharedAccessToken } from "./token";

// Create an Azure Resource Group
const resourceGroup = new azure.core.ResourceGroup("streams-rg");

// Define an Event Hub Namespace with two Hubs for an input and an output data streams
const namespace = new azure.eventhub.EventHubNamespace("streams-ns", {
    resourceGroupName: resourceGroup.name,
    sku: "standard",
});

const inputHub = new azure.eventhub.EventHub("inputs", {
    resourceGroupName: resourceGroup.name,
    namespaceName: namespace.name,
    partitionCount: 2,
    messageRetention: 7,
});

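// A dedicated consumer group lets the Stream Analytics job read the input hub independently of any other consumers.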
const consumerGroup = new azure.eventhub.EventHubConsumerGroup("analytics", {
    resourceGroupName: resourceGroup.name,
    namespaceName: namespace.name,
    eventhubName: inputHub.name,
});

const outputHub = new azure.eventhub.EventHub("outputs", {
    resourceGroupName: resourceGroup.name,
    namespaceName: namespace.name,
    partitionCount: 2,
    messageRetention: 7,
});

// Create a Stream Analytics Job that aggregates events per Make and 1-minute intervals
const job = new azure.streamanalytics.Job("job", {
    resourceGroupName: resourceGroup.name,
    compatibilityLevel: "1.1",
    dataLocale: "en-GB",
    eventsLateArrivalMaxDelayInSeconds: 60,
    eventsOutOfOrderMaxDelayInSeconds: 50,
    eventsOutOfOrderPolicy: "Adjust",
    outputErrorPolicy: "Drop",
    streamingUnits: 1,
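    // Sum Sales per Make over one-minute tumbling windows. [MyInput] and [MyOutput]
    // refer to the job input and output defined below by name.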
    transformationQuery: `
        SELECT
            Make,
            SUM(Sales) AS Sales
        INTO
            [MyOutput]
        FROM
            [MyInput] TIMESTAMP BY Time
        GROUP BY
            Make,
            TumblingWindow(minute, 1)
    `,
});

// Input of the job: the Event Hub with raw events
const input = new azure.streamanalytics.StreamInputEventHub("input", {
    name: "MyInput",
    resourceGroupName: resourceGroup.name,
    streamAnalyticsJobName: job.name,
    servicebusNamespace: namespace.name,
    eventhubName: inputHub.name,
    eventhubConsumerGroupName: consumerGroup.name,
    sharedAccessPolicyKey: namespace.defaultPrimaryKey,
    sharedAccessPolicyName: "RootManageSharedAccessKey",
    serialization: {
        type: "Json",
        encoding: "UTF8",
    },
});

// Output of the job: the Event Hub with aggregated data
const output = new azure.streamanalytics.OutputEventHub("output", {
    name: "MyOutput",
    resourceGroupName: resourceGroup.name,
    streamAnalyticsJobName: job.name,
    servicebusNamespace: namespace.name,
    eventhubName: outputHub.name,
    sharedAccessPolicyKey: namespace.defaultPrimaryKey,
    sharedAccessPolicyName: "RootManageSharedAccessKey",
    serialization: {
        type: "Json",
        encoding: "UTF8",
        format: "Array",
    },
});

// Create an Azure Function to subscribe to the output and print all outputs of the job
outputHub.onEvent("analytics-output", {
    callback: async (context, event) => {
        console.log(JSON.stringify(event));
    },
});

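// Export the input hub's HTTPS endpoint and a SAS token so that test events can be posted
// from the command line (see the `curl` example in the README).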
const url = pulumi.interpolate`https://${namespace.name}.servicebus.windows.net`;
export const sasToken = pulumi.all([url, namespace.defaultPrimaryKey]).apply(([u, pk]) => createSharedAccessToken(u, "RootManageSharedAccessKey", pk));
export const inputEndpoint = pulumi.interpolate`${url}/${inputHub.name}/messages?timeout=60&api-version=2014-01`;
14 changes: 14 additions & 0 deletions azure-ts-stream-analytics/package.json
@@ -0,0 +1,14 @@
{
    "name": "azure-ts-stream-analytics",
    "version": "0.1.0",
    "devDependencies": {
        "@types/node": "^10.3.3",
        "@types/utf8": "^2.1.6"
    },
    "dependencies": {
        "@pulumi/azure": "latest",
        "@pulumi/pulumi": "latest",
        "crypto": "^1.0.1",
        "utf8": "^3.0.0"
    }
}
16 changes: 16 additions & 0 deletions azure-ts-stream-analytics/token.ts
@@ -0,0 +1,16 @@
import { encode } from "utf8";
import { createHmac } from "crypto";

// Builds an Event Hubs (Service Bus) Shared Access Signature token that is valid for one week.
export function createSharedAccessToken(uri: string, saName: string, saKey: string): string {
    if (!uri || !saName || !saKey) {
        throw new Error("Missing required parameter");
    }
    const encoded = encodeURIComponent(uri);
    const now = new Date();
    const week = 60 * 60 * 24 * 7;
    const ttl = Math.round(now.getTime() / 1000) + week;
    // Sign the URL-encoded resource URI plus the expiry time with the shared access key.
    const signature = encoded + "\n" + ttl;
    const signatureUTF8 = encode(signature);
    const hash = createHmac("sha256", saKey).update(signatureUTF8).digest("base64");
    return "SharedAccessSignature sr=" + encoded + "&sig=" + encodeURIComponent(hash) + "&se=" + ttl + "&skn=" + saName;
}
23 changes: 23 additions & 0 deletions azure-ts-stream-analytics/tsconfig.json
@@ -0,0 +1,23 @@
{
    "compilerOptions": {
        "outDir": "bin",
        "target": "es6",
        "lib": [
            "es6"
        ],
        "module": "commonjs",
        "moduleResolution": "node",
        "sourceMap": true,
        "experimentalDecorators": true,
        "pretty": true,
        "noFallthroughCasesInSwitch": true,
        "noImplicitAny": true,
        "noImplicitReturns": true,
        "forceConsistentCasingInFileNames": true,
        "strictNullChecks": true
    },
    "files": [
        "index.ts",
        "token.ts"
    ]
}
