diff --git a/.editorconfig b/.editorconfig
index cf0174ddd..1be7f77f9 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -10,6 +10,9 @@ indent_style = space
[*.java]
indent_size = 4
+[*.groovy]
+indent_size = 4
+
[*.py]
indent_size = 4
@@ -23,4 +26,4 @@ indent_size = 2
indent_size = 4
[*.yml]
-indent_size = 2
\ No newline at end of file
+indent_size = 2
diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java
index ed368e2aa..966e0673d 100644
--- a/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java
+++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java
@@ -49,7 +49,8 @@ private CounterWrapper() {
/**
* Increments the servo counter.
- * @param counterName counter name
+ *
+ * @param counterName counter name
* @param incrementAmount increment value
*/
public static void incrementCounter(final String counterName, final long incrementAmount) {
@@ -67,6 +68,7 @@ public static void incrementCounter(final String counterName, final long increme
/**
* Increments the servo counter by 1.
+ *
* @param counterName counter name
*/
public static void incrementCounter(final String counterName) {
diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java
index 7e0008d27..580e845b4 100644
--- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java
+++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java
@@ -63,6 +63,9 @@ public class ArchaiusConfigImpl implements Config {
private final DynamicBooleanProperty canSoftDeleteDataMetadata;
private final DynamicBooleanProperty canCascadeViewsMetadataOnTableDelete;
private final DynamicIntProperty userMetadataMaxInClauseItems;
+ private final DynamicBooleanProperty snsEnabled;
+ private final DynamicStringProperty snsTopicTableArn;
+ private final DynamicStringProperty snsTopicPartitionArn;
/**
* Default constructor.
@@ -126,6 +129,11 @@ public ArchaiusConfigImpl(final DynamicPropertyFactory factory) {
this.canCascadeViewsMetadataOnTableDelete = factory
.getBooleanProperty("metacat.table.delete.cascade.views.metadata", true);
this.userMetadataMaxInClauseItems = factory.getIntProperty("metacat.user.metadata.max_in_clause_items", 2500);
+ this.snsEnabled = factory.getBooleanProperty("metacat.notifications.sns.enabled", false);
+ this.snsTopicTableArn
+ = factory.getStringProperty("metacat.notifications.sns.topic.table.arn", null);
+ this.snsTopicPartitionArn
+ = factory.getStringProperty("metacat.notifications.sns.topic.partition.arn", null);
}
private void setQualifiedNamesToElasticSearchRefreshExcludeQualifiedNames() {
@@ -315,4 +323,28 @@ public boolean canCascadeViewsMetadataOnTableDelete() {
public int getUserMetadataMaxInClauseItems() {
return userMetadataMaxInClauseItems.get();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isSnsNotificationEnabled() {
+ return this.snsEnabled.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getSnsTopicTableArn() {
+ return this.snsTopicTableArn.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getSnsTopicPartitionArn() {
+ return this.snsTopicPartitionArn.get();
+ }
}
diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java
index 70e164bfd..74d16ce84 100644
--- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java
+++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java
@@ -176,4 +176,26 @@ public interface Config {
* @return Max. number of in clause items in user metadata service queries
*/
int getUserMetadataMaxInClauseItems();
+
+ /**
+ * Whether or not notifications should be published to SNS. If this is enabled the table topic ARN and
+ * partition topic arn must also exist or SNS won't be enabled.
+ *
+ * @return Whether SNS notifications should be enabled
+ */
+ boolean isSnsNotificationEnabled();
+
+ /**
+ * Get the AWS ARN for the SNS topic to publish to for table related notifications.
+ *
+ * @return The table topic ARN or null if no property set
+ */
+ String getSnsTopicTableArn();
+
+ /**
+ * Get the AWS ARN for the SNS topic to publish to for partition related notifications.
+ *
+ * @return The partition topic ARN or null if no property set
+ */
+ String getSnsTopicPartitionArn();
}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java b/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java
index 366c77172..686e8616d 100644
--- a/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java
@@ -19,6 +19,9 @@
import lombok.Data;
+import java.util.Date;
+import java.util.UUID;
+
/**
* The context of the request to Metacat.
*
@@ -44,6 +47,9 @@ public class MetacatRequestContext {
*/
public static final String HEADER_KEY_DATA_TYPE_CONTEXT = "X-Netflix.data.type.context";
+ private final String id = UUID.randomUUID().toString();
+ // TODO: Move to Java 8 and use java.time.Instant
+ private final long timestamp = new Date().getTime();
private final String userName;
private final String clientAppName;
private final String clientId;
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java
new file mode 100644
index 000000000..738f4c0a5
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Data transfer objects related to notification events.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+package com.netflix.metacat.common.dto.notifications;
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java
new file mode 100644
index 000000000..6f085f74c
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.BaseDto;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Base SNS notification DTO with shared fields.
+ *
+ * @param
The type of payload this notification has
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString
+public class SNSMessage
extends BaseDto {
+
+ private final String id;
+ private final long timestamp;
+ private final String requestId;
+ private final String name;
+ private final SNSMessageType type;
+ private final P payload;
+
+ /**
+ * Create a new SNSMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param type The type of notification
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public SNSMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("type") final SNSMessageType type,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final P payload
+ ) {
+ this.id = id;
+ this.timestamp = timestamp;
+ this.requestId = requestId;
+ this.type = type;
+ this.name = name;
+ this.payload = payload;
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java
new file mode 100644
index 000000000..a430567a1
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.metacat.common.dto.notifications.sns.messages.AddPartitionMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.CreateTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.DeletePartitionMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.DeleteTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTablePartitionsMessage;
+
+import java.io.IOException;
+
+/**
+ * Create SNSMessage object based on the JSON passed in.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+public class SNSMessageFactory {
+
+ private static final String TYPE_FIELD = "type";
+ private final ObjectMapper mapper;
+
+ /**
+ * Constructor.
+ *
+ * @param mapper The object mapper to use for deserialization
+ */
+ public SNSMessageFactory(final ObjectMapper mapper) {
+ this.mapper = mapper;
+ }
+
+ /**
+ * Convert a JSON String into a message if possible.
+ *
+ * @param json The body of the message to convert back to the original message object from JSON string
+ * @return The message bound back into a POJO
+ * @throws IOException When the input isn't valid JSON
+ */
+ public SNSMessage> getMessage(final String json) throws IOException {
+ final JsonNode object = this.mapper.readTree(json);
+ if (object.has(TYPE_FIELD)) {
+ final SNSMessageType messageType = SNSMessageType.valueOf(object.get(TYPE_FIELD).asText());
+ switch (messageType) {
+ case TABLE_CREATE:
+ return this.mapper.readValue(json, CreateTableMessage.class);
+ case TABLE_DELETE:
+ return this.mapper.readValue(json, DeleteTableMessage.class);
+ case TABLE_UPDATE:
+ return this.mapper.readValue(json, UpdateTableMessage.class);
+ case TABLE_PARTITIONS_UPDATE:
+ return this.mapper.readValue(json, UpdateTablePartitionsMessage.class);
+ case PARTITION_ADD:
+ return this.mapper.readValue(json, AddPartitionMessage.class);
+ case PARTITION_DELETE:
+ return this.mapper.readValue(json, DeletePartitionMessage.class);
+ default:
+ throw new UnsupportedOperationException("Unknown type " + messageType);
+ }
+ } else {
+ // won't know how to bind
+ throw new IOException("Invalid content. No field type field found");
+ }
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java
new file mode 100644
index 000000000..c45c2ab24
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns;
+
+/**
+ * Enumeration of the various types of SNS events there can be.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+public enum SNSMessageType {
+ /**
+ * When a table is created.
+ */
+ TABLE_CREATE,
+
+ /**
+ * When a table is deleted.
+ */
+ TABLE_DELETE,
+
+ /**
+ * When the metadata about a table is updated somehow.
+ */
+ TABLE_UPDATE,
+
+ /**
+ * When the partitions for a table are either created or deleted.
+ */
+ TABLE_PARTITIONS_UPDATE,
+
+ /**
+ * When a partition is added.
+ */
+ PARTITION_ADD,
+
+ /**
+ * When a partition is deleted.
+ */
+ PARTITION_DELETE
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java
new file mode 100644
index 000000000..d3c274618
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.PartitionDto;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A message sent when a partition is created.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class AddPartitionMessage extends SNSMessage {
+
+ /**
+ * Create a new AddPartitionMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public AddPartitionMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final PartitionDto payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.PARTITION_ADD, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java
new file mode 100644
index 000000000..b8ead7316
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.TableDto;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A message sent when a table is created.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class CreateTableMessage extends SNSMessage {
+
+ /**
+ * Create a new CreateTableMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public CreateTableMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final TableDto payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.TABLE_CREATE, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java
new file mode 100644
index 000000000..46cdb8d83
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A message sent when a partition is deleted.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class DeletePartitionMessage extends SNSMessage {
+
+ /**
+ * Create a new DeletePartitionMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public DeletePartitionMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final String payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.PARTITION_DELETE, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java
new file mode 100644
index 000000000..1521890d8
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.TableDto;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A message sent when a table is deleted.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class DeleteTableMessage extends SNSMessage {
+
+ /**
+ * Create a new DeleteTableMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public DeleteTableMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final TableDto payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.TABLE_DELETE, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java
new file mode 100644
index 000000000..f904e71f2
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.TableDto;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A message sent when a table is updated.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class UpdateTableMessage extends SNSMessage> {
+
+ /**
+ * Create a new UpdateTableMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ @JsonCreator
+ public UpdateTableMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final UpdatePayload payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.TABLE_UPDATE, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java
new file mode 100644
index 000000000..584e637a5
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType;
+import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Message sent when the partitions for a table are updated.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString(callSuper = true)
+public class UpdateTablePartitionsMessage extends SNSMessage {
+
+ /**
+ * Create a new UpdateTablePartitionsMessage.
+ *
+ * @param id The unique id of the message
+ * @param timestamp The number of milliseconds since epoch that this message occurred
+ * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping
+ * @param name The qualified name of the resource that this notification is being generated for
+ * @param payload The payload of the notification
+ */
+ public UpdateTablePartitionsMessage(
+ @JsonProperty("id") final String id,
+ @JsonProperty("timestamp") final long timestamp,
+ @JsonProperty("requestId") final String requestId,
+ @JsonProperty("name") final String name,
+ @JsonProperty("payload") final TablePartitionsUpdatePayload payload
+ ) {
+ super(id, timestamp, requestId, SNSMessageType.TABLE_PARTITIONS_UPDATE, name, payload);
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java
new file mode 100644
index 000000000..7c09f5700
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Specific notification messages which extend SNSMessage.
+ *
+ * @author tgianos
+ * @since 0.1.46
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages;
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java
new file mode 100644
index 000000000..5166f19aa
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Data Transfer Objects (DTO) for AWS SNS notifications from Metacat.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+package com.netflix.metacat.common.dto.notifications.sns;
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java
new file mode 100644
index 000000000..b1780c737
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.payloads;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Information about how the partitions have changed when a table was updated.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString
+public class TablePartitionsUpdatePayload {
+ private final int numCreatedPartitions;
+ private final int numDeletedPartitions;
+
+ /**
+ * Constructor.
+ *
+ * @param numCreatedPartitions The number of partitions that were created for the table
+ * @param numDeletedPartitions The number of partitions that were deleted from the table
+ */
+ @JsonCreator
+ public TablePartitionsUpdatePayload(
+ @JsonProperty("numCreatedPartitions") final int numCreatedPartitions,
+ @JsonProperty("numDeletedPartitions") final int numDeletedPartitions
+ ) {
+ this.numCreatedPartitions = numCreatedPartitions;
+ this.numDeletedPartitions = numDeletedPartitions;
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java
new file mode 100644
index 000000000..defeaca8e
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.payloads;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.jsonpatch.JsonPatch;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Represents the contents of an update payload.
+ *
+ * @param The DTO type that was update. e.g. com.netflix.metacat.common.dto.TableDto
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Getter
+@ToString
+public class UpdatePayload {
+ private T previous;
+ private JsonPatch patch;
+ private T current;
+
+ /**
+ * Create a new update payload.
+ *
+ * @param previous The previous version of the object that was updated
+ * @param patch The JSON patch to go from previous to current
+ * @param current The current version of the object that was updated
+ */
+ @JsonCreator
+ public UpdatePayload(
+ @JsonProperty("previous") final T previous,
+ @JsonProperty("patch") final JsonPatch patch,
+ @JsonProperty("current") final T current
+ ) {
+ this.previous = previous;
+ this.patch = patch;
+ this.current = current;
+ }
+}
diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java
new file mode 100644
index 000000000..d1b431e62
--- /dev/null
+++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Various payload representations for SNS Notifications.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+package com.netflix.metacat.common.dto.notifications.sns.payloads;
diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy
new file mode 100644
index 000000000..5de6801b1
--- /dev/null
+++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.github.fge.jsonpatch.diff.JsonDiff
+import com.netflix.metacat.common.dto.PartitionDto
+import com.netflix.metacat.common.dto.TableDto
+import com.netflix.metacat.common.dto.notifications.sns.messages.*
+import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload
+import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload
+import spock.lang.Shared
+import spock.lang.Specification
+import spock.lang.Unroll
+
+import java.time.Instant
+
+/**
+ * Tests for SNSMessageFactory.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+class SNSMessageFactorySpec extends Specification {
+ @Shared
+ def mapper = new ObjectMapper()
+
+ def factory = new SNSMessageFactory(this.mapper)
+
+
+ @Unroll
+ def "Can Deserialize #clazz from #message"() {
+ when:
+ def obj = this.factory.getMessage(message)
+
+ then:
+ clazz.isInstance(obj)
+
+ where:
+ message | clazz
+ this.mapper.writeValueAsString(
+ new CreateTableMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ new TableDto()
+ )
+ ) | CreateTableMessage.class
+ this.mapper.writeValueAsString(
+ new DeleteTableMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ new TableDto()
+ )
+ ) | DeleteTableMessage.class
+ this.mapper.writeValueAsString(
+ new UpdateTableMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ new UpdatePayload(
+ new TableDto(),
+ JsonDiff.asJsonPatch(
+ this.mapper.readTree("{\"a\":\"b\"}"),
+ this.mapper.readTree("{\"a\":\"c\"}")
+ ),
+ new TableDto()
+ )
+ )
+ ) | UpdateTableMessage.class
+ this.mapper.writeValueAsString(
+ new UpdateTablePartitionsMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ new TablePartitionsUpdatePayload(
+ 10,
+ 15
+ )
+ )
+ ) | UpdateTablePartitionsMessage.class
+ this.mapper.writeValueAsString(
+ new AddPartitionMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ new PartitionDto()
+ )
+ ) | AddPartitionMessage.class
+ this.mapper.writeValueAsString(
+ new DeletePartitionMessage(
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString()
+ )
+ ) | DeletePartitionMessage.class
+ }
+}
diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy
new file mode 100644
index 000000000..7d0ce233d
--- /dev/null
+++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.messages
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.netflix.metacat.common.dto.TableDto
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType
+import spock.lang.Specification
+
+import java.time.Instant
+
+/**
+ * Tests for CreateTableMessage.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+class CreateTableMessageSpec extends Specification {
+
+ def mapper = new ObjectMapper()
+
+ def "Can Construct"() {
+ def id = UUID.randomUUID().toString()
+ def timestamp = Instant.now().toEpochMilli()
+ def requestId = UUID.randomUUID().toString()
+ def name = UUID.randomUUID().toString()
+ def payload = Mock(TableDto)
+
+ when:
+ def message = new CreateTableMessage(id, timestamp, requestId, name, payload)
+
+ then:
+ message != null
+ message.getId() == id
+ message.getTimestamp() == timestamp
+ message.getRequestId() == requestId
+ message.getName() == name
+ message.getType() == SNSMessageType.TABLE_CREATE
+ message.getPayload() == payload
+ }
+
+ def "Can Serialize and Deserialize"() {
+ def id = UUID.randomUUID().toString()
+ def timestamp = Instant.now().toEpochMilli()
+ def requestId = UUID.randomUUID().toString()
+ def name = UUID.randomUUID().toString()
+ def payload = new TableDto()
+ def message = new CreateTableMessage(id, timestamp, requestId, name, payload)
+
+ when:
+ def json = this.mapper.writeValueAsString(message)
+ def message2 = this.mapper.readValue(json, CreateTableMessage.class)
+
+ then:
+ message.getId() == message2.getId()
+ message.getTimestamp() == message2.getTimestamp()
+ message.getRequestId() == message2.getRequestId()
+ message.getName() == message2.getName()
+ message.getType() == message2.getType()
+ message.getPayload() == message2.getPayload()
+ }
+}
diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy
new file mode 100644
index 000000000..07f0e103a
--- /dev/null
+++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.common.dto.notifications.sns.payloads
+
+import com.fasterxml.jackson.core.type.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.github.fge.jsonpatch.diff.JsonDiff
+import com.google.common.collect.Lists
+import com.netflix.metacat.common.dto.TableDto
+import spock.lang.Specification
+
+/**
+ * Tests for the UploadPayload class.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+class UpdatePayloadSpec extends Specification {
+
+ def mapper = new ObjectMapper()
+
+ def "can construct with variables saved"() {
+ def previous = Mock(TableDto)
+ def current = Mock(TableDto)
+ def patch = JsonDiff.asJsonPatch(this.mapper.readTree("{\"a\":\"b\"}"), this.mapper.readTree("{\"a\":\"c\"}"))
+
+ when:
+ def updatePayload = new UpdatePayload(previous, patch, current)
+
+ then:
+ updatePayload != null
+ updatePayload.getCurrent() == current
+ updatePayload.getPatch() == patch
+ updatePayload.getPrevious() == previous
+ }
+
+ def "can serialize and deserialize with no data loss"() {
+ def previous = Lists.newArrayList("one", "three")
+ def current = Lists.newArrayList("one", "two", "three")
+ def patch = JsonDiff.asJsonPatch(this.mapper.valueToTree(previous), this.mapper.valueToTree(current))
+ UpdatePayload> payload = new UpdatePayload<>(previous, patch, current)
+
+ when: "Serialize to JSON and then back to a POJO and back to JSON"
+ def json = this.mapper.writeValueAsString(payload)
+ UpdatePayload> payload2 = this.mapper.readValue(
+ json,
+ new TypeReference>>() {}
+ )
+ def json2 = this.mapper.writeValueAsString(payload2)
+
+ then: "Make sure all the values are still equal"
+ payload.getCurrent() == payload2.getCurrent()
+ payload.getPatch().toString() == payload2.getPatch().toString()
+ payload.getPrevious() == payload2.getPrevious()
+ json == json2
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java b/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java
index 2144907b6..dd8721053 100644
--- a/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java
@@ -26,6 +26,7 @@
import com.netflix.metacat.common.util.ThreadServiceManager;
import com.netflix.metacat.main.manager.PluginManager;
import com.netflix.metacat.main.presto.metadata.CatalogManager;
+import com.netflix.metacat.main.services.notifications.NotificationService;
import com.netflix.metacat.main.services.search.MetacatEventHandlers;
import io.airlift.configuration.ConfigurationFactory;
import io.airlift.configuration.ConfigurationProvider;
@@ -34,6 +35,7 @@
import javax.inject.Inject;
import java.util.Map;
+import java.util.Set;
/**
* Metacat initialization service.
@@ -45,17 +47,29 @@ public class MetacatInitializationService {
/**
* Constructor.
- * @param injector injector
- * @param config config
+ *
+ * @param injector injector
+ * @param config config
+ * @param eventBus The event bus to use for internal events
+ * @param notificationServices The notification service implementations to register for receiving events
*/
@Inject
- public MetacatInitializationService(final Injector injector, final Config config) {
+ public MetacatInitializationService(
+ final Injector injector,
+ final Config config,
+ final MetacatEventBus eventBus,
+ final Set notificationServices
+ ) {
this.config = config;
this.injector = injector;
+
+ // Register all the services to listen for events
+ notificationServices.forEach(eventBus::register);
}
/**
* Returns the config factory.
+ *
* @return config factory
*/
public ConfigurationFactory getConfigurationFactory() {
@@ -70,6 +84,7 @@ public ConfigurationFactory getConfigurationFactory() {
/**
* Metacat service initialization.
+ *
* @throws Exception error
*/
public void start() throws Exception {
@@ -100,6 +115,7 @@ public void start() throws Exception {
/**
* Metcat service shutdown.
+ *
* @throws Exception error
*/
public void stop() throws Exception {
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java
index 0cf1f0de8..cc8fae2dc 100644
--- a/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java
@@ -20,6 +20,7 @@
import com.netflix.metacat.main.services.impl.MViewServiceImpl;
import com.netflix.metacat.main.services.impl.PartitionServiceImpl;
import com.netflix.metacat.main.services.impl.TableServiceImpl;
+import com.netflix.metacat.main.services.notifications.NotificationsModule;
import com.netflix.metacat.main.services.search.ElasticSearchClientProvider;
import com.netflix.metacat.main.services.search.ElasticSearchMetacatRefresh;
import com.netflix.metacat.main.services.search.ElasticSearchUtil;
@@ -34,6 +35,8 @@
public class ServicesModule extends AbstractModule {
@Override
protected void configure() {
+ install(new NotificationsModule());
+
binder().bind(CatalogService.class).to(CatalogServiceImpl.class).in(Scopes.SINGLETON);
binder().bind(DatabaseService.class).to(DatabaseServiceImpl.class).in(Scopes.SINGLETON);
binder().bind(TableService.class).to(TableServiceImpl.class).in(Scopes.SINGLETON);
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java
new file mode 100644
index 000000000..277b4fc7f
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications;
+
+import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * This is a default implementation of the NotificationService interface. It doesn't really do anything other than
+ * log the event that would have generated some sort of external notification in a real instance. This class exists
+ * primarily to handle returns from providers when the "plugin" isn't enabled instead of returning null which is
+ * prohibited by the Provider interface definition.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Slf4j
+public class DefaultNotificationServiceImpl implements NotificationService {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event) {
+ log.debug(event.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event) {
+ log.debug(event.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event) {
+ log.debug(event.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event) {
+ log.debug(event.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event) {
+ log.debug(event.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event) {
+ log.debug(event.toString());
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java
new file mode 100644
index 000000000..c2fcd50e3
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications;
+
+import com.google.common.eventbus.Subscribe;
+import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Interface for services which will provide external notifications based on internal events. The structure and
+ * destinations of the notifications are left up to the implementation.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+public interface NotificationService {
+
+ /**
+ * Publish information about a partition being added.
+ *
+ * @param event The event passed within the JVM after a partition has been successfully added
+ */
+ @Subscribe
+ void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event);
+
+ /**
+ * Publish information about a partition being deleted.
+ *
+ * @param event The event passed within the JVM after a partition has been successfully deleted
+ */
+ @Subscribe
+ void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event);
+
+ /**
+ * Publish information about a table being created.
+ *
+ * @param event The event passed within the JVM after a table has been successfully created
+ */
+ @Subscribe
+ void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event);
+
+ /**
+ * Publish information about a table being deleted.
+ *
+ * @param event The event passed within the JVM after a table has been successfully deleted
+ */
+ @Subscribe
+ void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event);
+
+ /**
+ * Publish information about a table being renamed.
+ *
+ * @param event The event passed within the JVM after a table has been successfully renamed
+ */
+ @Subscribe
+ void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event);
+
+ /**
+ * Publish information about a table being updated.
+ *
+ * @param event The event passed within the JVM after a table has been successfully updated
+ */
+ @Subscribe
+ void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event);
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java
new file mode 100644
index 000000000..c14a7e5b7
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications;
+
+import com.google.inject.AbstractModule;
+import com.netflix.metacat.main.services.notifications.sns.SNSNotificationsModule;
+
+/**
+ * A module to install all the desired Notification implementations.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+public class NotificationsModule extends AbstractModule {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void configure() {
+ install(new SNSNotificationsModule());
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java
new file mode 100644
index 000000000..8f4bc585f
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Classes related to sending notifications out of Metacat via implementations of the service interface.
+ *
+ * @author tgianos
+ * @since 0.1.46
+ */
+package com.netflix.metacat.main.services.notifications;
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java
new file mode 100644
index 000000000..58c7d637c
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications.sns;
+
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.jsonpatch.JsonPatch;
+import com.github.fge.jsonpatch.diff.JsonDiff;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.netflix.metacat.common.dto.PartitionDto;
+import com.netflix.metacat.common.dto.TableDto;
+import com.netflix.metacat.common.dto.notifications.sns.SNSMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.AddPartitionMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.CreateTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.DeletePartitionMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.DeleteTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTableMessage;
+import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTablePartitionsMessage;
+import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload;
+import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload;
+import com.netflix.metacat.common.monitoring.CounterWrapper;
+import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent;
+import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent;
+import com.netflix.metacat.main.services.notifications.NotificationService;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Implementation of the NotificationService using Amazon SNS.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Slf4j
+public class SNSNotificationServiceImpl implements NotificationService {
+
+ private final AmazonSNSClient client;
+ private final String tableTopicArn;
+ private final String partitionTopicArn;
+ private final ObjectMapper mapper;
+ private final Retryer retry;
+
+ /**
+ * Constructor.
+ *
+ * @param client The SNS client to use to publish notifications
+ * @param tableTopicArn The topic to publish table related notifications to
+ * @param partitionTopicArn The topic to publish partition related notifications to
+ * @param mapper The object mapper to use to convert objects to JSON strings
+ * @param retry The retry factory to use. Should already be configured
+ */
+ public SNSNotificationServiceImpl(
+ @NotNull final AmazonSNSClient client,
+ @NotNull @Size(min = 1) final String tableTopicArn,
+ @NotNull @Size(min = 1) final String partitionTopicArn,
+ @NotNull final ObjectMapper mapper,
+ @NotNull final Retryer retry
+ ) {
+ this.client = client;
+ this.tableTopicArn = tableTopicArn;
+ this.partitionTopicArn = partitionTopicArn;
+ this.mapper = mapper;
+ this.retry = retry;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event) {
+ try {
+ final String name = event.getName().toString();
+ final long timestamp = event.getRequestContext().getTimestamp();
+ final String requestId = event.getRequestContext().getId();
+ for (final PartitionDto partition : event.getPartitions()) {
+ final AddPartitionMessage message = new AddPartitionMessage(
+ UUID.randomUUID().toString(),
+ timestamp,
+ requestId,
+ name,
+ partition
+ );
+ this.publishNotification(this.partitionTopicArn, message);
+ log.debug("Published create partition message {} on {}", message, this.partitionTopicArn);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.add.succeeded");
+ }
+
+ // Publish a global message stating how many partitions were updated for the table to the table topic
+ final UpdateTablePartitionsMessage message = new UpdateTablePartitionsMessage(
+ UUID.randomUUID().toString(),
+ timestamp,
+ requestId,
+ name,
+ new TablePartitionsUpdatePayload(
+ event.getPartitions().size(),
+ 0
+ )
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ // TODO: In ideal world this this be an injected object to the class so we can mock for tests
+ // swap out implementations etc.
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.addPartitions.succeeded");
+ } catch (final ExecutionException | RetryException e) {
+ log.error("Unable to publish partition creation notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.add.failed");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event) {
+ try {
+ final String name = event.getName().toString();
+ final long timestamp = event.getRequestContext().getTimestamp();
+ final String requestId = event.getRequestContext().getId();
+ for (final String partitionId : event.getPartitionIds()) {
+ final DeletePartitionMessage message = new DeletePartitionMessage(
+ UUID.randomUUID().toString(),
+ timestamp,
+ requestId,
+ name,
+ partitionId
+ );
+ this.publishNotification(this.partitionTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.delete.succeeded");
+ log.debug("Published delete partition message {} on {}", message, this.partitionTopicArn);
+ }
+
+ // Publish a global message stating how many partitions were updated for the table to the table topic
+ final UpdateTablePartitionsMessage message = new UpdateTablePartitionsMessage(
+ UUID.randomUUID().toString(),
+ timestamp,
+ requestId,
+ name,
+ new TablePartitionsUpdatePayload(
+ 0,
+ event.getPartitionIds().size()
+ )
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.deletePartitions.succeeded");
+ } catch (final ExecutionException | RetryException e) {
+ log.error("Unable to publish partition deletion notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.delete.failed");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event) {
+ try {
+ final CreateTableMessage message = new CreateTableMessage(
+ UUID.randomUUID().toString(),
+ event.getRequestContext().getTimestamp(),
+ event.getRequestContext().getId(),
+ event.getName().toString(),
+ event.getTable()
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.create.succeeded");
+ } catch (final ExecutionException | RetryException e) {
+ log.error("Unable to publish create table notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.create.failed");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event) {
+ try {
+ final DeleteTableMessage message = new DeleteTableMessage(
+ UUID.randomUUID().toString(),
+ event.getRequestContext().getTimestamp(),
+ event.getRequestContext().getId(),
+ event.getName().toString(),
+ event.getTable()
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.delete.succeeded");
+ } catch (final ExecutionException | RetryException e) {
+ log.error("Unable to publish delete table notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.delete.failed");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event) {
+ try {
+ final UpdateTableMessage message = this.createUpdateTableMessage(
+ UUID.randomUUID().toString(),
+ event.getRequestContext().getTimestamp(),
+ event.getRequestContext().getId(),
+ event.getName().toString(),
+ event.getOldTable(),
+ event.getCurrentTable()
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.rename.succeeded");
+ } catch (final ExecutionException | IOException | RetryException e) {
+ log.error("Unable to publish rename table notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.rename.failed");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event) {
+ try {
+ final UpdateTableMessage message = this.createUpdateTableMessage(
+ UUID.randomUUID().toString(),
+ event.getRequestContext().getTimestamp(),
+ event.getRequestContext().getId(),
+ event.getName().toString(),
+ event.getOldTable(),
+ event.getCurrentTable()
+ );
+ this.publishNotification(this.tableTopicArn, message);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.update.succeeded");
+ } catch (final IOException | ExecutionException | RetryException e) {
+ log.error("Unable to publish update table notification", e);
+ CounterWrapper.incrementCounter("metacat.notifications.sns.tables.update.failed");
+ }
+ }
+
+ private UpdateTableMessage createUpdateTableMessage(
+ final String id,
+ final long timestamp,
+ final String requestId,
+ final String name,
+ final TableDto oldTable,
+ final TableDto currentTable
+ ) throws IOException {
+ final JsonPatch patch = JsonDiff.asJsonPatch(
+ this.mapper.valueToTree(oldTable),
+ this.mapper.valueToTree(currentTable)
+ );
+ return new UpdateTableMessage(
+ id,
+ timestamp,
+ requestId,
+ name,
+ new UpdatePayload<>(oldTable, patch, currentTable)
+ );
+ }
+
+ private void publishNotification(
+ final String arn,
+ final SNSMessage> message
+ ) throws ExecutionException, RetryException {
+ final PublishResult result = this.retry.call(
+ () -> this.client.publish(arn, this.mapper.writeValueAsString(message))
+ );
+ log.debug("Successfully published message {} to topic {} with id {}", message, arn, result.getMessageId());
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java
new file mode 100644
index 000000000..ebea95d1a
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications.sns;
+
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.InternalErrorException;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.ThrottledException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.netflix.metacat.common.server.Config;
+import com.netflix.metacat.main.services.notifications.DefaultNotificationServiceImpl;
+import com.netflix.metacat.main.services.notifications.NotificationService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.inject.Inject;
+import javax.validation.constraints.NotNull;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides an instance of SNSNotificationServiceImpl if conditions are right.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Slf4j
+public class SNSNotificationServiceImplProvider implements Provider {
+
+ private final Config config;
+ private final ObjectMapper mapper;
+
+ /**
+ * Constructor.
+ *
+ * @param config The metacat configuration
+ * @param mapper The JSON object mapper to use
+ */
+ @Inject
+ public SNSNotificationServiceImplProvider(@NotNull final Config config, @NotNull final ObjectMapper mapper) {
+ this.config = config;
+ this.mapper = mapper;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NotificationService get() {
+ if (this.config.isSnsNotificationEnabled()) {
+ final String tableArn = this.config.getSnsTopicTableArn();
+ if (StringUtils.isEmpty(tableArn)) {
+ throw new ProvisionException(
+ "SNS Notifications are enabled but no table ARN provided. Unable to configure."
+ );
+ }
+ final String partitionArn = this.config.getSnsTopicPartitionArn();
+ if (StringUtils.isEmpty(partitionArn)) {
+ throw new ProvisionException(
+ "SNS Notifications are enabled but no partition ARN provided. Unable to configure."
+ );
+ }
+
+ log.info("SNS notifications are enabled. Providing SNSNotificationServiceImpl implementation.");
+ final Retryer retry = RetryerBuilder.newBuilder()
+ .retryIfExceptionOfType(InternalErrorException.class)
+ .retryIfExceptionOfType(ThrottledException.class)
+ .withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.SECONDS, 30, TimeUnit.SECONDS))
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+ .build();
+ return new SNSNotificationServiceImpl(new AmazonSNSClient(), tableArn, partitionArn, this.mapper, retry);
+ } else {
+ log.info("SNS notifications are not enabled. Ignoring and providing default implementation.");
+ return new DefaultNotificationServiceImpl();
+ }
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java
new file mode 100644
index 000000000..0f939affe
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications.sns;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
+import com.netflix.metacat.main.services.notifications.NotificationService;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Module for configuring notifications to AWS SNS.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+@Slf4j
+public class SNSNotificationsModule extends AbstractModule {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void configure() {
+ final Multibinder notificationServices = Multibinder.newSetBinder(
+ this.binder(),
+ NotificationService.class
+ );
+ notificationServices.addBinding().toProvider(SNSNotificationServiceImplProvider.class).in(Singleton.class);
+ }
+}
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java
new file mode 100644
index 000000000..2bc34ec91
--- /dev/null
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Classes related to setting up and sending SNS Notifications.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+package com.netflix.metacat.main.services.notifications.sns;
diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java
index 3fef0d93a..42eb58541 100644
--- a/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java
+++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java
@@ -23,7 +23,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -40,6 +39,8 @@
import com.netflix.metacat.common.monitoring.CounterWrapper;
import com.netflix.metacat.common.monitoring.TimerWrapper;
import com.netflix.metacat.common.server.Config;
+import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent;
+import com.netflix.metacat.common.server.events.MetacatEventBus;
import com.netflix.metacat.common.usermetadata.TagService;
import com.netflix.metacat.common.usermetadata.UserMetadataService;
import com.netflix.metacat.common.util.MetacatContextManager;
@@ -50,7 +51,6 @@
import lombok.extern.slf4j.Slf4j;
import org.joda.time.Instant;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.List;
import java.util.Optional;
@@ -66,6 +66,7 @@
/**
* This class does a refresh of all the metadata entities from original data sources to elastic search.
+ *
* @author amajumdar
*/
@Slf4j
@@ -90,10 +91,30 @@ public class ElasticSearchMetacatRefresh {
private UserMetadataService userMetadataService;
@Inject
private TagService tagService;
+ @Inject
+ private MetacatEventBus eventBus;
// Fixed thread pool
private ListeningExecutorService service;
private ListeningExecutorService esService;
+ private static ExecutorService newFixedThreadPool(final int nThreads, final String threadFactoryName,
+ final int queueSize) {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ new ThreadFactoryBuilder()
+ .setNameFormat(threadFactoryName)
+ .build(),
+ (r, executor) -> {
+ // this will block if the queue is full
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ });
+ }
+
/**
* Does a sweep across all catalogs to refresh the same data in elastic search.
*/
@@ -106,6 +127,7 @@ public void process() {
/**
* Does a sweep across given catalogs to refresh the same data in elastic search.
+ *
* @param catalogNames catalog anmes
*/
public void processCatalogs(final List catalogNames) {
@@ -116,7 +138,8 @@ public void processCatalogs(final List catalogNames) {
/**
* Does a sweep across given catalog and databases to refresh the same data in elastic search.
- * @param catalogName catalog
+ *
+ * @param catalogName catalog
* @param databaseNames database names
*/
public void processDatabases(final String catalogName, final List databaseNames) {
@@ -128,6 +151,7 @@ public void processDatabases(final String catalogName, final List databa
/**
* Does a sweep across all catalogs to refresh the same data in elastic search.
+ *
* @param names qualified names
*/
public void processPartitions(final List names) {
@@ -152,7 +176,7 @@ private ListenableFuture _processPartitions(final List qNam
final List> indexFutures = Lists.newArrayList();
int offset = 0;
int count;
- Sort sort = null;
+ Sort sort;
if ("s3".equals(tableName.getCatalogName()) || "aegisthus".equals(tableName.getCatalogName())) {
sort = new Sort("id", SortOrder.ASC);
} else {
@@ -179,18 +203,13 @@ private ListenableFuture _processPartitions(final List qNam
.collect(Collectors.toList());
return Futures.transform(Futures.successfulAsList(inputFuturesWithoutNulls), Functions.constant(null));
});
- return Futures.transformAsync(processPartitionsFuture, new AsyncFunction() {
- @Override
- public ListenableFuture apply(
- @Nullable
- final Void input) throws Exception {
- elasticSearchUtil.refresh();
- final List> cleanUpFutures = tables.stream()
- .map(s -> service
- .submit(() -> partitionsCleanUp(QualifiedName.fromString(s, false), excludeQualifiedNames)))
- .collect(Collectors.toList());
- return Futures.transform(Futures.successfulAsList(cleanUpFutures), Functions.constant(null));
- }
+ return Futures.transformAsync(processPartitionsFuture, input -> {
+ elasticSearchUtil.refresh();
+ final List> cleanUpFutures = tables.stream()
+ .map(s -> service
+ .submit(() -> partitionsCleanUp(QualifiedName.fromString(s, false), excludeQualifiedNames)))
+ .collect(Collectors.toList());
+ return Futures.transform(Futures.successfulAsList(cleanUpFutures), Functions.constant(null));
});
}
@@ -231,27 +250,9 @@ private Void partitionsCleanUp(final QualifiedName tableName, final List(queueSize),
- new ThreadFactoryBuilder()
- .setNameFormat(threadFactoryName)
- .build(),
- (r, executor) -> {
- // this will block if the queue is full
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- });
- }
-
@SuppressWarnings("checkstyle:methodname")
private void _process(final List qNames, final Supplier> supplier,
- final String requestName, final boolean delete, final int queueSize) {
+ final String requestName, final boolean delete, final int queueSize) {
if (isElasticSearchMetacatRefreshAlreadyRunning.compareAndSet(false, true)) {
final TimerWrapper timer = TimerWrapper
.createStarted("dse.metacat.timer.ElasticSearchMetacatRefresh." + requestName);
@@ -259,7 +260,7 @@ private void _process(final List qNames, final Supplier qNames,
- final List excludeQualifiedNames) {
+ final List excludeQualifiedNames) {
log.info("Start: Delete unmarked entities");
//
// get unmarked qualified names
@@ -340,7 +341,8 @@ private void deleteUnmarkedEntities(final List qNames,
}
} catch (SchemaNotFoundException ignored) {
result = true;
- } catch (Exception ignored) { }
+ } catch (Exception ignored) {
+ }
return result;
}).collect(Collectors.toList());
log.info("Unmarked databases({}): {}", unmarkedDatabaseNames.size(), unmarkedDatabaseNames);
@@ -377,7 +379,8 @@ private void deleteUnmarkedEntities(final List qNames,
if (!dto.isPresent()) {
result = true;
}
- } catch (Exception ignored) { }
+ } catch (Exception ignored) {
+ }
return result;
}).collect(Collectors.toList());
log.info("Unmarked tables({}): {}", unmarkedTableNames.size(), unmarkedTableNames);
@@ -387,8 +390,17 @@ private void deleteUnmarkedEntities(final List qNames,
dto -> dto.getName().toString()).collect(Collectors.toList());
log.info("Deleting tables({}): {}", deleteTableNames.size(), deleteTableNames);
userMetadataService.deleteMetadatas("admin", Lists.newArrayList(deleteTableDtos));
- elasticSearchUtil.softDelete("table", deleteTableNames, context);
- deleteTableDtos.forEach(tableDto -> tagService.delete(tableDto.getName(), false));
+
+ // Publish event. Elasticsearch event handler will take care of updating the index already
+ // TODO: Re-evaluate events vs. direct calls for these types of situations like in Genie
+ deleteTableDtos.forEach(
+ tableDto -> {
+ tagService.delete(tableDto.getName(), false);
+ this.eventBus.postAsync(
+ new MetacatDeleteTablePostEvent(tableDto.getName(), context, tableDto)
+ );
+ }
+ );
}
log.info("End: Delete unmarked tables({})", unmarkedTableDtos.size());
} else {
@@ -459,13 +471,14 @@ private List getCatalogNamesToRefresh() {
/**
* Process the list of databases.
- * @param catalogName catalog name
+ *
+ * @param catalogName catalog name
* @param databaseNames database names
* @return future
*/
@SuppressWarnings("checkstyle:methodname")
private ListenableFuture _processDatabases(final QualifiedName catalogName,
- final List databaseNames) {
+ final List databaseNames) {
ListenableFuture resultFuture = null;
log.info("Full refresh of catalog {} for databases({}): {}", catalogName, databaseNames.size(), databaseNames);
final List> getDatabaseFutures = databaseNames.stream()
@@ -507,8 +520,9 @@ private ListenableFuture _processDatabases(final QualifiedName catalogName
/**
* Save all databases to index it in elastic search.
+ *
* @param catalogName catalog name
- * @param dtos database dtos
+ * @param dtos database dtos
* @return future
*/
private ListenableFuture indexDatabaseDtos(final QualifiedName catalogName, final List dtos) {
@@ -525,12 +539,13 @@ private ListenableFuture indexDatabaseDtos(final QualifiedName catalogName
/**
* Process the list of tables in batches.
+ *
* @param databaseName database name
- * @param tableNames table names
+ * @param tableNames table names
* @return A future containing the tasks
*/
private ListenableFuture processTables(final QualifiedName databaseName,
- final List tableNames) {
+ final List tableNames) {
final List> tableNamesBatches = Lists.partition(tableNames, 500);
final List> processTablesBatchFutures = tableNamesBatches.stream().map(
subTableNames -> _processTables(databaseName, subTableNames)).collect(Collectors.toList());
@@ -540,7 +555,7 @@ private ListenableFuture processTables(final QualifiedName databaseName,
@SuppressWarnings("checkstyle:methodname")
private ListenableFuture _processTables(final QualifiedName databaseName,
- final List tableNames) {
+ final List tableNames) {
final List>> getTableFutures = tableNames.stream()
.map(tableName -> service.submit(() -> {
Optional result = null;
@@ -549,7 +564,7 @@ private ListenableFuture _processTables(final QualifiedName databaseName,
} catch (Exception e) {
log.error("Failed to retrieve table: {}", tableName);
elasticSearchUtil.log("ElasticSearchMetacatRefresh.getTable", ElasticSearchDoc.Type.table.name(),
- tableName.toString(), null, e.getMessage(), e, true);
+ tableName.toString(), null, e.getMessage(), e, true);
}
return result;
}))
@@ -561,12 +576,13 @@ private ListenableFuture _processTables(final QualifiedName databaseName,
/**
* Save all tables to index it in elastic search.
+ *
* @param databaseName database name
- * @param dtos table dtos
+ * @param dtos table dtos
* @return future
*/
private ListenableFuture indexTableDtos(final QualifiedName databaseName,
- final List> dtos) {
+ final List> dtos) {
return esService.submit(() -> {
final List docs = dtos.stream().filter(dto -> dto != null && dto.isPresent()).map(
tableDtoOptional -> {
@@ -582,8 +598,9 @@ private ListenableFuture indexTableDtos(final QualifiedName databaseName,
/**
* Save all tables to index it in elastic search.
+ *
* @param tableName database name
- * @param dtos partition dtos
+ * @param dtos partition dtos
* @return future
*/
private ListenableFuture indexPartitionDtos(final QualifiedName tableName, final List dtos) {
diff --git a/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy
new file mode 100644
index 000000000..62928e437
--- /dev/null
+++ b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications.sns
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.inject.ProvisionException
+import com.netflix.metacat.common.server.Config
+import com.netflix.metacat.main.services.notifications.DefaultNotificationServiceImpl
+import spock.lang.Specification
+
+/**
+ * Tests for SNSNotificationsServiceImplProvider.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+class SNSNotificationServiceImplProviderSpec extends Specification {
+
+ def config = Mock(Config)
+ def mapper = Mock(ObjectMapper)
+
+ def "Will provide default implementation when SNS is disabled"() {
+ def provider = new SNSNotificationServiceImplProvider(config, Mock(ObjectMapper))
+
+ when: "call get"
+ def service = provider.get()
+
+ then:
+ service instanceof DefaultNotificationServiceImpl
+ 1 * config.isSnsNotificationEnabled() >> false
+ 0 * config.getSnsTopicPartitionArn()
+ 0 * config.getSnsTopicTableArn()
+ }
+
+ def "Will provide SNS implementation when SNS is enabled"() {
+ def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper)
+
+ when: "call get"
+ def service = provider.get()
+
+ then: "Should return a SNSNotificationServiceImpl implementation"
+ service instanceof SNSNotificationServiceImpl
+ 1 * this.config.isSnsNotificationEnabled() >> true
+ 1 * this.config.getSnsTopicPartitionArn() >> UUID.randomUUID().toString()
+ 1 * this.config.getSnsTopicTableArn() >> UUID.randomUUID().toString()
+ }
+
+ def "Will throw exception if partition ARN not set but SNS enabled"() {
+ def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper)
+
+ when: "call get"
+ provider.get()
+
+ then: "Will throw a provision exception"
+ thrown ProvisionException
+ 1 * this.config.isSnsNotificationEnabled() >> true
+ 1 * this.config.getSnsTopicPartitionArn()
+ 1 * this.config.getSnsTopicTableArn() >> UUID.randomUUID().toString()
+ }
+
+ def "Will throw exception if table ARN not set but SNS enabled"() {
+ def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper)
+
+ when: "call get"
+ provider.get()
+
+ then: "will throw a provision exception"
+ thrown ProvisionException
+ 1 * this.config.isSnsNotificationEnabled() >> true
+ 0 * this.config.getSnsTopicPartitionArn()
+ 1 * this.config.getSnsTopicTableArn()
+ }
+}
diff --git a/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy
new file mode 100644
index 000000000..0d9e30308
--- /dev/null
+++ b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy
@@ -0,0 +1,226 @@
+/*
+ *
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.netflix.metacat.main.services.notifications.sns
+
+import com.amazonaws.services.sns.AmazonSNSClient
+import com.amazonaws.services.sns.model.InternalErrorException
+import com.amazonaws.services.sns.model.NotFoundException
+import com.amazonaws.services.sns.model.PublishResult
+import com.amazonaws.services.sns.model.ThrottledException
+import com.facebook.presto.hive.$internal.com.google.common.collect.Lists
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.TextNode
+import com.github.rholder.retry.RetryerBuilder
+import com.github.rholder.retry.StopStrategies
+import com.github.rholder.retry.WaitStrategies
+import com.netflix.metacat.common.MetacatRequestContext
+import com.netflix.metacat.common.QualifiedName
+import com.netflix.metacat.common.dto.PartitionDto
+import com.netflix.metacat.common.dto.PartitionsSaveResponseDto
+import com.netflix.metacat.common.dto.TableDto
+import com.netflix.metacat.common.dto.notifications.sns.messages.*
+import com.netflix.metacat.common.server.events.*
+import spock.lang.Specification
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Tests for the SNSNotificationServiceImpl.
+ *
+ * @author tgianos
+ * @since 0.1.47
+ */
+class SNSNotificationServiceImplSpec extends Specification {
+
+ def client = Mock(AmazonSNSClient)
+ def qName = QualifiedName.fromString(
+ UUID.randomUUID().toString()
+ + "/"
+ + UUID.randomUUID().toString()
+ + "/"
+ + UUID.randomUUID().toString()
+ + "/"
+ + UUID.randomUUID().toString()
+ + "/"
+ + UUID.randomUUID().toString()
+ )
+ def mapper = Mock(ObjectMapper)
+ def partitionArn = UUID.randomUUID().toString()
+ def tableArn = UUID.randomUUID().toString()
+ def retry = RetryerBuilder. newBuilder()
+ .retryIfExceptionOfType(InternalErrorException.class)
+ .retryIfExceptionOfType(ThrottledException.class)
+ .withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.NANOSECONDS, 30, TimeUnit.NANOSECONDS))
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+ .build();
+ def service = new SNSNotificationServiceImpl(this.client, this.tableArn, this.partitionArn, this.mapper, this.retry)
+
+ def "Will Notify On Partition Creation"() {
+ def partitions = Lists.newArrayList(new PartitionDto(), new PartitionDto(), new PartitionDto())
+
+ def event = new MetacatSaveTablePartitionPostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ partitions,
+ Mock(PartitionsSaveResponseDto)
+ )
+
+ when:
+ this.service.notifyOfPartitionAddition(event)
+
+ then:
+ 3 * this.mapper.writeValueAsString(_ as AddPartitionMessage) >> UUID.randomUUID().toString()
+ 1 * this.mapper.writeValueAsString(_ as UpdateTablePartitionsMessage) >> UUID.randomUUID().toString()
+ 3 * this.client.publish(this.partitionArn, _ as String) >> new PublishResult()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will Notify On Partition Deletion"() {
+ def partitions = Lists.newArrayList(
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString()
+ )
+
+ def event = new MetacatDeleteTablePartitionPostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ partitions
+ )
+
+ when:
+ this.service.notifyOfPartitionDeletion(event)
+
+ then:
+ 5 * this.mapper.writeValueAsString(_ as DeletePartitionMessage) >> UUID.randomUUID().toString()
+ 1 * this.mapper.writeValueAsString(_ as UpdateTablePartitionsMessage) >> UUID.randomUUID().toString()
+ 5 * this.client.publish(this.partitionArn, _ as String) >> new PublishResult()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will Notify On Table Creation"() {
+ def event = new MetacatCreateTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableCreation(event)
+
+ then:
+ 1 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will Notify On Table Deletion"() {
+ def event = new MetacatDeleteTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableDeletion(event)
+
+ then:
+ 1 * this.mapper.writeValueAsString(_ as DeleteTableMessage) >> UUID.randomUUID().toString()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will Notify On Table Rename"() {
+ def event = new MetacatRenameTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto(),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableRename(event)
+
+ then:
+ 2 * this.mapper.valueToTree(_ as TableDto) >> new TextNode(UUID.randomUUID().toString())
+ 1 * this.mapper.writeValueAsString(_ as UpdateTableMessage) >> UUID.randomUUID().toString()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will Notify On Table Update"() {
+ def event = new MetacatUpdateTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto(),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableUpdate(event)
+
+ then:
+ 2 * this.mapper.valueToTree(_ as TableDto) >> new TextNode(UUID.randomUUID().toString())
+ 1 * this.mapper.writeValueAsString(_ as UpdateTableMessage) >> UUID.randomUUID().toString()
+ 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult()
+ }
+
+ def "Will retry on ThrottledException"() {
+ def event = new MetacatCreateTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableCreation(event)
+
+ then:
+ 3 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString()
+ 3 * this.client.publish(this.tableArn, _ as String) >> { throw new ThrottledException("Exception") }
+ }
+
+ def "Will retry on InternalException"() {
+ def event = new MetacatCreateTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableCreation(event)
+
+ then:
+ 3 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString()
+ 3 * this.client.publish(this.tableArn, _ as String) >> { throw new InternalErrorException("Exception") }
+ }
+
+ def "Won't retry on Other Exception"() {
+ def event = new MetacatCreateTablePostEvent(
+ this.qName,
+ Mock(MetacatRequestContext),
+ new TableDto()
+ )
+
+ when:
+ this.service.notifyOfTableCreation(event)
+
+ then:
+ 1 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString()
+ 1 * this.client.publish(this.tableArn, _ as String) >> { throw new NotFoundException("Exception") }
+ }
+}
diff --git a/scripts/stop_metacat_test_cluster.sh b/scripts/stop_metacat_test_cluster.sh
index 7f6c89d4a..3bdcc522c 100755
--- a/scripts/stop_metacat_test_cluster.sh
+++ b/scripts/stop_metacat_test_cluster.sh
@@ -5,23 +5,8 @@ set -x
# Usage: ./stopCluster.sh docker-compose.yml
COMPOSE_FILE=$1
-docker-compose --file $COMPOSE_FILE stop -t 30
+docker-compose --file $COMPOSE_FILE down
if [ $? -ne 0 ]; then
- echo "Unable to stop docker-compose"
+ echo "Unable to bring down docker-compose"
exit 9
fi
-
-docker-compose --file $COMPOSE_FILE rm -f
-if [ $? -ne 0 ]; then
- echo "Unable to remove docker compose containers"
- exit 10
-fi
-
-TEST_CONTAINERS=$(docker ps -a -q --filter "label=com.netflix.metacat.oss.test=true")
-if [[ "$TEST_CONTAINER" != "" ]]; then
- docker rm -f $TEST_CONTAINERS
- if [ $? -ne 0 ]; then
- echo "Unable to remove metacat-test docker containers"
- exit 11
- fi
-fi