Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Stream Analytics example #329

Merged
merged 3 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions azure-ts-stream-analytics/Pulumi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: azure-ts-stream-analytics
runtime: nodejs
description: Uses an Azure Stream Analytics job to transform data in an Event Hub.
template:
config:
azure:environment:
description: The Azure environment to use (`public`, `usgovernment`, `german`, `china`)
default: public
azure:location:
description: The Azure location to deploy to (e.g., `eastus` or `westeurope`)
66 changes: 66 additions & 0 deletions azure-ts-stream-analytics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[![Deploy](https://get.pulumi.com/new/button.svg)](https://app.pulumi.com/new)

# Azure Stream Analytics

An example Pulumi program that deploys an Azure Stream Analytics job to transform data in an Event Hub.

## Running the App

1. Create a new stack:

```
$ pulumi stack init dev
```

1. Login to Azure CLI (you will be prompted to do this during deployment if you forget this step):

```
$ az login
```

1. Restore NPM dependencies:

```
$ npm install
```

1. Configure the location to deploy the example to:

```
$ pulumi config set azure:location <location>
```

1. Run `pulumi up` to preview and deploy changes:

```
$ pulumi up
Previewing update (dev):
...

Updating (dev):
...
Resources:
+ 15 created
Update duration: 2m43s
```

1. Use the following sample messages for testing:

```
// Inputs (1 line - 1 event):
{"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"}
{"Make":"Kia","Sales":1,"Time":"2019-06-26T10:22:37Z"}
{"Make":"Honda","Sales":1,"Time":"2019-06-26T10:22:38Z"}

// Output:
[{"Make":"Kia","Sales":3};{"Make":"Honda","Sales":1}]

```

You can send a message with a `curl` command:

```
curl -X POST '$(pulumi stack output inputEndpoint)' -H 'Authorization: $(pulumi stack output sasToken)' -H 'Content-Type: application/atom+xml;type=entry;charset=utf-8' -d '{"Make":"Kia","Sales":2,"Time":"2019-06-26T10:22:36Z"}'
```

1. [Start the Stream Analytics job](https://docs.microsoft.com/en-us/azure/stream-analytics/start-job). The job will start emitting messages to the output Event Hub once per minute. The Azure Function `analytics-output` will start printing those events into the console (you'd have to open the function console in the Azure portal to see them).
101 changes: 101 additions & 0 deletions azure-ts-stream-analytics/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2016-2019, Pulumi Corporation. All rights reserved.

import * as pulumi from "@pulumi/pulumi";
import * as azure from "@pulumi/azure";
import { createSharedAccessToken } from "./token";

// Create an Azure Resource Group
const resourceGroup = new azure.core.ResourceGroup("streams-rg");

// Define an Event Hub Namespace with two Hubs for an input and an output data streams
const namespace = new azure.eventhub.EventHubNamespace("streams-ns", {
resourceGroupName: resourceGroup.name,
sku: "standard",
});

const inputHub = new azure.eventhub.EventHub("inputs", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
partitionCount: 2,
messageRetention: 7,
});

const consumerGroup = new azure.eventhub.EventHubConsumerGroup("analytics", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
eventhubName: inputHub.name,
});

const outputHub = new azure.eventhub.EventHub("outputs", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
partitionCount: 2,
messageRetention: 7,
});

// Create a Stream Analytics Job that aggregates events per Make and 1-minute intervals
const job = new azure.streamanalytics.Job("job", {
resourceGroupName: resourceGroup.name,
compatibilityLevel: "1.1",
dataLocale: "en-GB",
eventsLateArrivalMaxDelayInSeconds: 60,
eventsOutOfOrderMaxDelayInSeconds: 50,
eventsOutOfOrderPolicy: "Adjust",
outputErrorPolicy: "Drop",
streamingUnits: 1,
transformationQuery: `
SELECT
Make,
SUM(Sales) AS Sales
INTO
[MyOutput]
FROM
[MyInput] TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(minute, 1)
`
});

// Input of the job: the Event Hub with raw events
const input = new azure.streamanalytics.StreamInputEventHub("input", {
name: "MyInput",
resourceGroupName: resourceGroup.name,
streamAnalyticsJobName: job.name,
servicebusNamespace: namespace.name,
eventhubName: inputHub.name,
eventhubConsumerGroupName: consumerGroup.name,
sharedAccessPolicyKey: namespace.defaultPrimaryKey,
sharedAccessPolicyName: "RootManageSharedAccessKey",
serialization: {
type: "Json",
encoding: "UTF8",
},
});

// Output of the job: the Event Hub with aggregated data
const output = new azure.streamanalytics.OutputEventHub("output", {
name: "MyOutput",
resourceGroupName: resourceGroup.name,
streamAnalyticsJobName: job.name,
servicebusNamespace: namespace.name,
eventhubName: outputHub.name,
sharedAccessPolicyKey: namespace.defaultPrimaryKey,
sharedAccessPolicyName: "RootManageSharedAccessKey",
serialization: {
type: "Json",
encoding: "UTF8",
format: "Array",
},
});

// Create an Azure Function to subscribe to the output and print all outputs of the job
outputHub.onEvent("analytics-output", {
callback: async (context, event) => {
console.log(JSON.stringify(event));
},
});

const url = pulumi.interpolate`https://${namespace.name}.servicebus.windows.net`;
export const sasToken = pulumi.all([url, namespace.defaultPrimaryKey]).apply(([u, pk]) => createSharedAccessToken(u, "RootManageSharedAccessKey", pk));
export const inputEndpoint = pulumi.interpolate`${url}/${inputHub.name}/messages?timeout=60&api-version=2014-01`;
14 changes: 14 additions & 0 deletions azure-ts-stream-analytics/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "azure-ts-stream-analytics",
"version": "0.1.0",
"devDependencies": {
"@types/node": "^10.3.3",
"@types/utf8": "^2.1.6"
},
"dependencies": {
"@pulumi/azure": "latest",
"@pulumi/pulumi": "latest",
"crypto": "^1.0.1",
"utf8": "^3.0.0"
}
}
16 changes: 16 additions & 0 deletions azure-ts-stream-analytics/token.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { encode } from "utf8";
import { createHmac } from "crypto";

export function createSharedAccessToken(uri: string, saName: string, saKey: string): string {
if (!uri || !saName || !saKey) {
throw "Missing required parameter";
}
var encoded = encodeURIComponent(uri);
var now = new Date();
var week = 60 * 60 * 24 * 7;
var ttl = Math.round(now.getTime() / 1000) + week;
var signature = encoded + '\n' + ttl;
var signatureUTF8 = encode(signature);
var hash = createHmac('sha256', saKey).update(signatureUTF8).digest('base64');
return 'SharedAccessSignature sr=' + encoded + '&sig=' + encodeURIComponent(hash) + '&se=' + ttl + '&skn=' + saName;
}
23 changes: 23 additions & 0 deletions azure-ts-stream-analytics/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"compilerOptions": {
"outDir": "bin",
"target": "es6",
"lib": [
"es6"
],
"module": "commonjs",
"moduleResolution": "node",
"sourceMap": true,
"experimentalDecorators": true,
"pretty": true,
"noFallthroughCasesInSwitch": true,
"noImplicitAny": true,
"noImplicitReturns": true,
"forceConsistentCasingInFileNames": true,
"strictNullChecks": true
},
"files": [
"index.ts",
"token.ts"
]
}