Skip to content

Commit

Permalink
Adding a notification service with a SNS implementation (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgianos committed Dec 5, 2016
1 parent a0174fe commit fad6f37
Show file tree
Hide file tree
Showing 37 changed files with 2,197 additions and 73 deletions.
5 changes: 4 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ indent_style = space
[*.java]
indent_size = 4

[*.groovy]
indent_size = 4

[*.py]
indent_size = 4

Expand All @@ -23,4 +26,4 @@ indent_size = 2
indent_size = 4

[*.yml]
indent_size = 2
indent_size = 2
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ private CounterWrapper() {

/**
* Increments the servo counter.
* @param counterName counter name
*
* @param counterName counter name
* @param incrementAmount increment value
*/
public static void incrementCounter(final String counterName, final long incrementAmount) {
Expand All @@ -67,6 +68,7 @@ public static void incrementCounter(final String counterName, final long increme

/**
* Increments the servo counter by 1.
*
* @param counterName counter name
*/
public static void incrementCounter(final String counterName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class ArchaiusConfigImpl implements Config {
private final DynamicBooleanProperty canSoftDeleteDataMetadata;
private final DynamicBooleanProperty canCascadeViewsMetadataOnTableDelete;
private final DynamicIntProperty userMetadataMaxInClauseItems;
private final DynamicBooleanProperty snsEnabled;
private final DynamicStringProperty snsTopicTableArn;
private final DynamicStringProperty snsTopicPartitionArn;

/**
* Default constructor.
Expand Down Expand Up @@ -126,6 +129,11 @@ public ArchaiusConfigImpl(final DynamicPropertyFactory factory) {
this.canCascadeViewsMetadataOnTableDelete = factory
.getBooleanProperty("metacat.table.delete.cascade.views.metadata", true);
this.userMetadataMaxInClauseItems = factory.getIntProperty("metacat.user.metadata.max_in_clause_items", 2500);
this.snsEnabled = factory.getBooleanProperty("metacat.notifications.sns.enabled", false);
this.snsTopicTableArn
= factory.getStringProperty("metacat.notifications.sns.topic.table.arn", null);
this.snsTopicPartitionArn
= factory.getStringProperty("metacat.notifications.sns.topic.partition.arn", null);
}

private void setQualifiedNamesToElasticSearchRefreshExcludeQualifiedNames() {
Expand Down Expand Up @@ -315,4 +323,28 @@ public boolean canCascadeViewsMetadataOnTableDelete() {
public int getUserMetadataMaxInClauseItems() {
return userMetadataMaxInClauseItems.get();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isSnsNotificationEnabled() {
return this.snsEnabled.get();
}

/**
* {@inheritDoc}
*/
@Override
public String getSnsTopicTableArn() {
return this.snsTopicTableArn.get();
}

/**
* {@inheritDoc}
*/
@Override
public String getSnsTopicPartitionArn() {
return this.snsTopicPartitionArn.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,26 @@ public interface Config {
* @return Max. number of in clause items in user metadata service queries
*/
int getUserMetadataMaxInClauseItems();

/**
* Whether or not notifications should be published to SNS. If this is enabled the table topic ARN and
* partition topic arn must also exist or SNS won't be enabled.
*
* @return Whether SNS notifications should be enabled
*/
boolean isSnsNotificationEnabled();

/**
* Get the AWS ARN for the SNS topic to publish to for table related notifications.
*
* @return The table topic ARN or null if no property set
*/
String getSnsTopicTableArn();

/**
* Get the AWS ARN for the SNS topic to publish to for partition related notifications.
*
* @return The partition topic ARN or null if no property set
*/
String getSnsTopicPartitionArn();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import lombok.Data;

import java.util.Date;
import java.util.UUID;

/**
* The context of the request to Metacat.
*
Expand All @@ -44,6 +47,9 @@ public class MetacatRequestContext {
*/
public static final String HEADER_KEY_DATA_TYPE_CONTEXT = "X-Netflix.data.type.context";

private final String id = UUID.randomUUID().toString();
// TODO: Move to Java 8 and use java.time.Instant
private final long timestamp = new Date().getTime();
private final String userName;
private final String clientAppName;
private final String clientId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/**
* Data transfer objects related to notification events.
*
* @author tgianos
* @since 0.1.47
*/
package com.netflix.metacat.common.dto.notifications;
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.metacat.common.dto.notifications.sns;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.netflix.metacat.common.dto.BaseDto;
import lombok.Getter;
import lombok.ToString;

/**
* Base SNS notification DTO with shared fields.
*
* @param <P> The type of payload this notification has
* @author tgianos
* @since 0.1.47
*/
@Getter
@ToString
public class SNSMessage<P> extends BaseDto {

private final String id;
private final long timestamp;
private final String requestId;
private final String name;
private final SNSMessageType type;
private final P payload;

/**
* Create a new SNSMessage.
*
* @param id The unique id of the message
* @param timestamp The number of milliseconds since epoch that this message occurred
* @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
* @param type The type of notification
* @param name The qualified name of the resource that this notification is being generated for
* @param payload The payload of the notification
*/
@JsonCreator
public SNSMessage(
@JsonProperty("id") final String id,
@JsonProperty("timestamp") final long timestamp,
@JsonProperty("requestId") final String requestId,
@JsonProperty("type") final SNSMessageType type,
@JsonProperty("name") final String name,
@JsonProperty("payload") final P payload
) {
this.id = id;
this.timestamp = timestamp;
this.requestId = requestId;
this.type = type;
this.name = name;
this.payload = payload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.metacat.common.dto.notifications.sns;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.metacat.common.dto.notifications.sns.messages.AddPartitionMessage;
import com.netflix.metacat.common.dto.notifications.sns.messages.CreateTableMessage;
import com.netflix.metacat.common.dto.notifications.sns.messages.DeletePartitionMessage;
import com.netflix.metacat.common.dto.notifications.sns.messages.DeleteTableMessage;
import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTableMessage;
import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTablePartitionsMessage;

import java.io.IOException;

/**
* Create SNSMessage object based on the JSON passed in.
*
* @author tgianos
* @since 0.1.47
*/
public class SNSMessageFactory {

private static final String TYPE_FIELD = "type";
private final ObjectMapper mapper;

/**
* Constructor.
*
* @param mapper The object mapper to use for deserialization
*/
public SNSMessageFactory(final ObjectMapper mapper) {
this.mapper = mapper;
}

/**
* Convert a JSON String into a message if possible.
*
* @param json The body of the message to convert back to the original message object from JSON string
* @return The message bound back into a POJO
* @throws IOException When the input isn't valid JSON
*/
public SNSMessage<?> getMessage(final String json) throws IOException {
final JsonNode object = this.mapper.readTree(json);
if (object.has(TYPE_FIELD)) {
final SNSMessageType messageType = SNSMessageType.valueOf(object.get(TYPE_FIELD).asText());
switch (messageType) {
case TABLE_CREATE:
return this.mapper.readValue(json, CreateTableMessage.class);
case TABLE_DELETE:
return this.mapper.readValue(json, DeleteTableMessage.class);
case TABLE_UPDATE:
return this.mapper.readValue(json, UpdateTableMessage.class);
case TABLE_PARTITIONS_UPDATE:
return this.mapper.readValue(json, UpdateTablePartitionsMessage.class);
case PARTITION_ADD:
return this.mapper.readValue(json, AddPartitionMessage.class);
case PARTITION_DELETE:
return this.mapper.readValue(json, DeletePartitionMessage.class);
default:
throw new UnsupportedOperationException("Unknown type " + messageType);
}
} else {
// won't know how to bind
throw new IOException("Invalid content. No field type field found");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.metacat.common.dto.notifications.sns;

/**
* Enumeration of the various types of SNS events there can be.
*
* @author tgianos
* @since 0.1.47
*/
public enum SNSMessageType {
/**
* When a table is created.
*/
TABLE_CREATE,

/**
* When a table is deleted.
*/
TABLE_DELETE,

/**
* When the metadata about a table is updated somehow.
*/
TABLE_UPDATE,

/**
* When the partitions for a table are either created or deleted.
*/
TABLE_PARTITIONS_UPDATE,

/**
* When a partition is added.
*/
PARTITION_ADD,

/**
* When a partition is deleted.
*/
PARTITION_DELETE
}
Loading

0 comments on commit fad6f37

Please sign in to comment.