From fad6f371ea2e4b01d6ff53886898c66cfc5eb32e Mon Sep 17 00:00:00 2001 From: Tom Gianos Date: Mon, 5 Dec 2016 13:26:32 -0800 Subject: [PATCH] Adding a notification service with a SNS implementation (#17) --- .editorconfig | 5 +- .../common/monitoring/CounterWrapper.java | 4 +- .../common/server/ArchaiusConfigImpl.java | 32 ++ .../netflix/metacat/common/server/Config.java | 22 ++ .../metacat/common/MetacatRequestContext.java | 6 + .../dto/notifications/package-info.java | 25 ++ .../dto/notifications/sns/SNSMessage.java | 70 +++++ .../notifications/sns/SNSMessageFactory.java | 83 +++++ .../dto/notifications/sns/SNSMessageType.java | 56 ++++ .../sns/messages/AddPartitionMessage.java | 57 ++++ .../sns/messages/CreateTableMessage.java | 57 ++++ .../sns/messages/DeletePartitionMessage.java | 56 ++++ .../sns/messages/DeleteTableMessage.java | 57 ++++ .../sns/messages/UpdateTableMessage.java | 58 ++++ .../UpdateTablePartitionsMessage.java | 55 ++++ .../sns/messages/package-info.java | 25 ++ .../dto/notifications/sns/package-info.java | 25 ++ .../TablePartitionsUpdatePayload.java | 51 +++ .../sns/payloads/UpdatePayload.java | 57 ++++ .../sns/payloads/package-info.java | 25 ++ .../sns/SNSMessageFactorySpec.groovy | 121 ++++++++ .../messages/CreateTableMessageSpec.groovy | 77 +++++ .../sns/payloads/UpdatePayloadSpec.groovy | 72 +++++ .../init/MetacatInitializationService.java | 22 +- .../metacat/main/services/ServicesModule.java | 3 + .../DefaultNotificationServiceImpl.java | 89 ++++++ .../notifications/NotificationService.java | 86 ++++++ .../notifications/NotificationsModule.java | 38 +++ .../services/notifications/package-info.java | 25 ++ .../sns/SNSNotificationServiceImpl.java | 292 ++++++++++++++++++ .../SNSNotificationServiceImplProvider.java | 97 ++++++ .../sns/SNSNotificationsModule.java | 46 +++ .../notifications/sns/package-info.java | 24 ++ .../search/ElasticSearchMetacatRefresh.java | 119 ++++--- ...NotificationServiceImplProviderSpec.groovy | 88 ++++++ .../sns/SNSNotificationServiceImplSpec.groovy | 226 ++++++++++++++ scripts/stop_metacat_test_cluster.sh | 19 +- 37 files changed, 2197 insertions(+), 73 deletions(-) create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java create mode 100644 metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java create mode 100644 metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy create mode 100644 metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy create mode 100644 metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java create mode 100644 metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java create mode 100644 metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy create mode 100644 metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy diff --git a/.editorconfig b/.editorconfig index cf0174ddd..1be7f77f9 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,6 +10,9 @@ indent_style = space [*.java] indent_size = 4 +[*.groovy] +indent_size = 4 + [*.py] indent_size = 4 @@ -23,4 +26,4 @@ indent_size = 2 indent_size = 4 [*.yml] -indent_size = 2 \ No newline at end of file +indent_size = 2 diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java index ed368e2aa..966e0673d 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/monitoring/CounterWrapper.java @@ -49,7 +49,8 @@ private CounterWrapper() { /** * Increments the servo counter. - * @param counterName counter name + * + * @param counterName counter name * @param incrementAmount increment value */ public static void incrementCounter(final String counterName, final long incrementAmount) { @@ -67,6 +68,7 @@ public static void incrementCounter(final String counterName, final long increme /** * Increments the servo counter by 1. + * * @param counterName counter name */ public static void incrementCounter(final String counterName) { diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java index 7e0008d27..580e845b4 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/ArchaiusConfigImpl.java @@ -63,6 +63,9 @@ public class ArchaiusConfigImpl implements Config { private final DynamicBooleanProperty canSoftDeleteDataMetadata; private final DynamicBooleanProperty canCascadeViewsMetadataOnTableDelete; private final DynamicIntProperty userMetadataMaxInClauseItems; + private final DynamicBooleanProperty snsEnabled; + private final DynamicStringProperty snsTopicTableArn; + private final DynamicStringProperty snsTopicPartitionArn; /** * Default constructor. @@ -126,6 +129,11 @@ public ArchaiusConfigImpl(final DynamicPropertyFactory factory) { this.canCascadeViewsMetadataOnTableDelete = factory .getBooleanProperty("metacat.table.delete.cascade.views.metadata", true); this.userMetadataMaxInClauseItems = factory.getIntProperty("metacat.user.metadata.max_in_clause_items", 2500); + this.snsEnabled = factory.getBooleanProperty("metacat.notifications.sns.enabled", false); + this.snsTopicTableArn + = factory.getStringProperty("metacat.notifications.sns.topic.table.arn", null); + this.snsTopicPartitionArn + = factory.getStringProperty("metacat.notifications.sns.topic.partition.arn", null); } private void setQualifiedNamesToElasticSearchRefreshExcludeQualifiedNames() { @@ -315,4 +323,28 @@ public boolean canCascadeViewsMetadataOnTableDelete() { public int getUserMetadataMaxInClauseItems() { return userMetadataMaxInClauseItems.get(); } + + /** + * {@inheritDoc} + */ + @Override + public boolean isSnsNotificationEnabled() { + return this.snsEnabled.get(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getSnsTopicTableArn() { + return this.snsTopicTableArn.get(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getSnsTopicPartitionArn() { + return this.snsTopicPartitionArn.get(); + } } diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java index 70e164bfd..74d16ce84 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/Config.java @@ -176,4 +176,26 @@ public interface Config { * @return Max. number of in clause items in user metadata service queries */ int getUserMetadataMaxInClauseItems(); + + /** + * Whether or not notifications should be published to SNS. If this is enabled the table topic ARN and + * partition topic arn must also exist or SNS won't be enabled. + * + * @return Whether SNS notifications should be enabled + */ + boolean isSnsNotificationEnabled(); + + /** + * Get the AWS ARN for the SNS topic to publish to for table related notifications. + * + * @return The table topic ARN or null if no property set + */ + String getSnsTopicTableArn(); + + /** + * Get the AWS ARN for the SNS topic to publish to for partition related notifications. + * + * @return The partition topic ARN or null if no property set + */ + String getSnsTopicPartitionArn(); } diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java b/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java index 366c77172..686e8616d 100644 --- a/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java +++ b/metacat-common/src/main/java/com/netflix/metacat/common/MetacatRequestContext.java @@ -19,6 +19,9 @@ import lombok.Data; +import java.util.Date; +import java.util.UUID; + /** * The context of the request to Metacat. * @@ -44,6 +47,9 @@ public class MetacatRequestContext { */ public static final String HEADER_KEY_DATA_TYPE_CONTEXT = "X-Netflix.data.type.context"; + private final String id = UUID.randomUUID().toString(); + // TODO: Move to Java 8 and use java.time.Instant + private final long timestamp = new Date().getTime(); private final String userName; private final String clientAppName; private final String clientId; diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java new file mode 100644 index 000000000..738f4c0a5 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/package-info.java @@ -0,0 +1,25 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Data transfer objects related to notification events. + * + * @author tgianos + * @since 0.1.47 + */ +package com.netflix.metacat.common.dto.notifications; diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java new file mode 100644 index 000000000..6f085f74c --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessage.java @@ -0,0 +1,70 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.BaseDto; +import lombok.Getter; +import lombok.ToString; + +/** + * Base SNS notification DTO with shared fields. + * + * @param

The type of payload this notification has + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString +public class SNSMessage

extends BaseDto { + + private final String id; + private final long timestamp; + private final String requestId; + private final String name; + private final SNSMessageType type; + private final P payload; + + /** + * Create a new SNSMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param type The type of notification + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public SNSMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("type") final SNSMessageType type, + @JsonProperty("name") final String name, + @JsonProperty("payload") final P payload + ) { + this.id = id; + this.timestamp = timestamp; + this.requestId = requestId; + this.type = type; + this.name = name; + this.payload = payload; + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java new file mode 100644 index 000000000..a430567a1 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactory.java @@ -0,0 +1,83 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.metacat.common.dto.notifications.sns.messages.AddPartitionMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.CreateTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.DeletePartitionMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.DeleteTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTablePartitionsMessage; + +import java.io.IOException; + +/** + * Create SNSMessage object based on the JSON passed in. + * + * @author tgianos + * @since 0.1.47 + */ +public class SNSMessageFactory { + + private static final String TYPE_FIELD = "type"; + private final ObjectMapper mapper; + + /** + * Constructor. + * + * @param mapper The object mapper to use for deserialization + */ + public SNSMessageFactory(final ObjectMapper mapper) { + this.mapper = mapper; + } + + /** + * Convert a JSON String into a message if possible. + * + * @param json The body of the message to convert back to the original message object from JSON string + * @return The message bound back into a POJO + * @throws IOException When the input isn't valid JSON + */ + public SNSMessage getMessage(final String json) throws IOException { + final JsonNode object = this.mapper.readTree(json); + if (object.has(TYPE_FIELD)) { + final SNSMessageType messageType = SNSMessageType.valueOf(object.get(TYPE_FIELD).asText()); + switch (messageType) { + case TABLE_CREATE: + return this.mapper.readValue(json, CreateTableMessage.class); + case TABLE_DELETE: + return this.mapper.readValue(json, DeleteTableMessage.class); + case TABLE_UPDATE: + return this.mapper.readValue(json, UpdateTableMessage.class); + case TABLE_PARTITIONS_UPDATE: + return this.mapper.readValue(json, UpdateTablePartitionsMessage.class); + case PARTITION_ADD: + return this.mapper.readValue(json, AddPartitionMessage.class); + case PARTITION_DELETE: + return this.mapper.readValue(json, DeletePartitionMessage.class); + default: + throw new UnsupportedOperationException("Unknown type " + messageType); + } + } else { + // won't know how to bind + throw new IOException("Invalid content. No field type field found"); + } + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java new file mode 100644 index 000000000..c45c2ab24 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/SNSMessageType.java @@ -0,0 +1,56 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns; + +/** + * Enumeration of the various types of SNS events there can be. + * + * @author tgianos + * @since 0.1.47 + */ +public enum SNSMessageType { + /** + * When a table is created. + */ + TABLE_CREATE, + + /** + * When a table is deleted. + */ + TABLE_DELETE, + + /** + * When the metadata about a table is updated somehow. + */ + TABLE_UPDATE, + + /** + * When the partitions for a table are either created or deleted. + */ + TABLE_PARTITIONS_UPDATE, + + /** + * When a partition is added. + */ + PARTITION_ADD, + + /** + * When a partition is deleted. + */ + PARTITION_DELETE +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java new file mode 100644 index 000000000..d3c274618 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/AddPartitionMessage.java @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.PartitionDto; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import lombok.Getter; +import lombok.ToString; + +/** + * A message sent when a partition is created. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class AddPartitionMessage extends SNSMessage { + + /** + * Create a new AddPartitionMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public AddPartitionMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final PartitionDto payload + ) { + super(id, timestamp, requestId, SNSMessageType.PARTITION_ADD, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java new file mode 100644 index 000000000..b8ead7316 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessage.java @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.TableDto; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import lombok.Getter; +import lombok.ToString; + +/** + * A message sent when a table is created. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class CreateTableMessage extends SNSMessage { + + /** + * Create a new CreateTableMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public CreateTableMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final TableDto payload + ) { + super(id, timestamp, requestId, SNSMessageType.TABLE_CREATE, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java new file mode 100644 index 000000000..46cdb8d83 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeletePartitionMessage.java @@ -0,0 +1,56 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import lombok.Getter; +import lombok.ToString; + +/** + * A message sent when a partition is deleted. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class DeletePartitionMessage extends SNSMessage { + + /** + * Create a new DeletePartitionMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public DeletePartitionMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final String payload + ) { + super(id, timestamp, requestId, SNSMessageType.PARTITION_DELETE, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java new file mode 100644 index 000000000..1521890d8 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/DeleteTableMessage.java @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.TableDto; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import lombok.Getter; +import lombok.ToString; + +/** + * A message sent when a table is deleted. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class DeleteTableMessage extends SNSMessage { + + /** + * Create a new DeleteTableMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public DeleteTableMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final TableDto payload + ) { + super(id, timestamp, requestId, SNSMessageType.TABLE_DELETE, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java new file mode 100644 index 000000000..f904e71f2 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTableMessage.java @@ -0,0 +1,58 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.TableDto; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload; +import lombok.Getter; +import lombok.ToString; + +/** + * A message sent when a table is updated. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class UpdateTableMessage extends SNSMessage> { + + /** + * Create a new UpdateTableMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + @JsonCreator + public UpdateTableMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final UpdatePayload payload + ) { + super(id, timestamp, requestId, SNSMessageType.TABLE_UPDATE, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java new file mode 100644 index 000000000..584e637a5 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/UpdateTablePartitionsMessage.java @@ -0,0 +1,55 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType; +import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload; +import lombok.Getter; +import lombok.ToString; + +/** + * Message sent when the partitions for a table are updated. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString(callSuper = true) +public class UpdateTablePartitionsMessage extends SNSMessage { + + /** + * Create a new UpdateTablePartitionsMessage. + * + * @param id The unique id of the message + * @param timestamp The number of milliseconds since epoch that this message occurred + * @param requestId The id of the API request that generated this and possibly other messages. Used for grouping + * @param name The qualified name of the resource that this notification is being generated for + * @param payload The payload of the notification + */ + public UpdateTablePartitionsMessage( + @JsonProperty("id") final String id, + @JsonProperty("timestamp") final long timestamp, + @JsonProperty("requestId") final String requestId, + @JsonProperty("name") final String name, + @JsonProperty("payload") final TablePartitionsUpdatePayload payload + ) { + super(id, timestamp, requestId, SNSMessageType.TABLE_PARTITIONS_UPDATE, name, payload); + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java new file mode 100644 index 000000000..7c09f5700 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/messages/package-info.java @@ -0,0 +1,25 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Specific notification messages which extend SNSMessage. + * + * @author tgianos + * @since 0.1.46 + */ +package com.netflix.metacat.common.dto.notifications.sns.messages; diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java new file mode 100644 index 000000000..5166f19aa --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/package-info.java @@ -0,0 +1,25 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Data Transfer Objects (DTO) for AWS SNS notifications from Metacat. + * + * @author tgianos + * @since 0.1.47 + */ +package com.netflix.metacat.common.dto.notifications.sns; diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java new file mode 100644 index 000000000..b1780c737 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/TablePartitionsUpdatePayload.java @@ -0,0 +1,51 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.payloads; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.ToString; + +/** + * Information about how the partitions have changed when a table was updated. + * + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString +public class TablePartitionsUpdatePayload { + private final int numCreatedPartitions; + private final int numDeletedPartitions; + + /** + * Constructor. + * + * @param numCreatedPartitions The number of partitions that were created for the table + * @param numDeletedPartitions The number of partitions that were deleted from the table + */ + @JsonCreator + public TablePartitionsUpdatePayload( + @JsonProperty("numCreatedPartitions") final int numCreatedPartitions, + @JsonProperty("numDeletedPartitions") final int numDeletedPartitions + ) { + this.numCreatedPartitions = numCreatedPartitions; + this.numDeletedPartitions = numDeletedPartitions; + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java new file mode 100644 index 000000000..defeaca8e --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayload.java @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.payloads; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.fge.jsonpatch.JsonPatch; +import lombok.Getter; +import lombok.ToString; + +/** + * Represents the contents of an update payload. + * + * @param The DTO type that was update. e.g. com.netflix.metacat.common.dto.TableDto + * @author tgianos + * @since 0.1.47 + */ +@Getter +@ToString +public class UpdatePayload { + private T previous; + private JsonPatch patch; + private T current; + + /** + * Create a new update payload. + * + * @param previous The previous version of the object that was updated + * @param patch The JSON patch to go from previous to current + * @param current The current version of the object that was updated + */ + @JsonCreator + public UpdatePayload( + @JsonProperty("previous") final T previous, + @JsonProperty("patch") final JsonPatch patch, + @JsonProperty("current") final T current + ) { + this.previous = previous; + this.patch = patch; + this.current = current; + } +} diff --git a/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java new file mode 100644 index 000000000..d1b431e62 --- /dev/null +++ b/metacat-common/src/main/java/com/netflix/metacat/common/dto/notifications/sns/payloads/package-info.java @@ -0,0 +1,25 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Various payload representations for SNS Notifications. + * + * @author tgianos + * @since 0.1.47 + */ +package com.netflix.metacat.common.dto.notifications.sns.payloads; diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy new file mode 100644 index 000000000..5de6801b1 --- /dev/null +++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/SNSMessageFactorySpec.groovy @@ -0,0 +1,121 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns + +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.fge.jsonpatch.diff.JsonDiff +import com.netflix.metacat.common.dto.PartitionDto +import com.netflix.metacat.common.dto.TableDto +import com.netflix.metacat.common.dto.notifications.sns.messages.* +import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload +import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload +import spock.lang.Shared +import spock.lang.Specification +import spock.lang.Unroll + +import java.time.Instant + +/** + * Tests for SNSMessageFactory. + * + * @author tgianos + * @since 0.1.47 + */ +class SNSMessageFactorySpec extends Specification { + @Shared + def mapper = new ObjectMapper() + + def factory = new SNSMessageFactory(this.mapper) + + + @Unroll + def "Can Deserialize #clazz from #message"() { + when: + def obj = this.factory.getMessage(message) + + then: + clazz.isInstance(obj) + + where: + message | clazz + this.mapper.writeValueAsString( + new CreateTableMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + new TableDto() + ) + ) | CreateTableMessage.class + this.mapper.writeValueAsString( + new DeleteTableMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + new TableDto() + ) + ) | DeleteTableMessage.class + this.mapper.writeValueAsString( + new UpdateTableMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + new UpdatePayload( + new TableDto(), + JsonDiff.asJsonPatch( + this.mapper.readTree("{\"a\":\"b\"}"), + this.mapper.readTree("{\"a\":\"c\"}") + ), + new TableDto() + ) + ) + ) | UpdateTableMessage.class + this.mapper.writeValueAsString( + new UpdateTablePartitionsMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + new TablePartitionsUpdatePayload( + 10, + 15 + ) + ) + ) | UpdateTablePartitionsMessage.class + this.mapper.writeValueAsString( + new AddPartitionMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + new PartitionDto() + ) + ) | AddPartitionMessage.class + this.mapper.writeValueAsString( + new DeletePartitionMessage( + UUID.randomUUID().toString(), + Instant.now().toEpochMilli(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString() + ) + ) | DeletePartitionMessage.class + } +} diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy new file mode 100644 index 000000000..7d0ce233d --- /dev/null +++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/messages/CreateTableMessageSpec.groovy @@ -0,0 +1,77 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.messages + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.metacat.common.dto.TableDto +import com.netflix.metacat.common.dto.notifications.sns.SNSMessageType +import spock.lang.Specification + +import java.time.Instant + +/** + * Tests for CreateTableMessage. + * + * @author tgianos + * @since 0.1.47 + */ +class CreateTableMessageSpec extends Specification { + + def mapper = new ObjectMapper() + + def "Can Construct"() { + def id = UUID.randomUUID().toString() + def timestamp = Instant.now().toEpochMilli() + def requestId = UUID.randomUUID().toString() + def name = UUID.randomUUID().toString() + def payload = Mock(TableDto) + + when: + def message = new CreateTableMessage(id, timestamp, requestId, name, payload) + + then: + message != null + message.getId() == id + message.getTimestamp() == timestamp + message.getRequestId() == requestId + message.getName() == name + message.getType() == SNSMessageType.TABLE_CREATE + message.getPayload() == payload + } + + def "Can Serialize and Deserialize"() { + def id = UUID.randomUUID().toString() + def timestamp = Instant.now().toEpochMilli() + def requestId = UUID.randomUUID().toString() + def name = UUID.randomUUID().toString() + def payload = new TableDto() + def message = new CreateTableMessage(id, timestamp, requestId, name, payload) + + when: + def json = this.mapper.writeValueAsString(message) + def message2 = this.mapper.readValue(json, CreateTableMessage.class) + + then: + message.getId() == message2.getId() + message.getTimestamp() == message2.getTimestamp() + message.getRequestId() == message2.getRequestId() + message.getName() == message2.getName() + message.getType() == message2.getType() + message.getPayload() == message2.getPayload() + } +} diff --git a/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy new file mode 100644 index 000000000..07f0e103a --- /dev/null +++ b/metacat-common/src/test/groovy/com/netflix/metacat/common/dto/notifications/sns/payloads/UpdatePayloadSpec.groovy @@ -0,0 +1,72 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.common.dto.notifications.sns.payloads + +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.fge.jsonpatch.diff.JsonDiff +import com.google.common.collect.Lists +import com.netflix.metacat.common.dto.TableDto +import spock.lang.Specification + +/** + * Tests for the UploadPayload class. + * + * @author tgianos + * @since 0.1.47 + */ +class UpdatePayloadSpec extends Specification { + + def mapper = new ObjectMapper() + + def "can construct with variables saved"() { + def previous = Mock(TableDto) + def current = Mock(TableDto) + def patch = JsonDiff.asJsonPatch(this.mapper.readTree("{\"a\":\"b\"}"), this.mapper.readTree("{\"a\":\"c\"}")) + + when: + def updatePayload = new UpdatePayload(previous, patch, current) + + then: + updatePayload != null + updatePayload.getCurrent() == current + updatePayload.getPatch() == patch + updatePayload.getPrevious() == previous + } + + def "can serialize and deserialize with no data loss"() { + def previous = Lists.newArrayList("one", "three") + def current = Lists.newArrayList("one", "two", "three") + def patch = JsonDiff.asJsonPatch(this.mapper.valueToTree(previous), this.mapper.valueToTree(current)) + UpdatePayload> payload = new UpdatePayload<>(previous, patch, current) + + when: "Serialize to JSON and then back to a POJO and back to JSON" + def json = this.mapper.writeValueAsString(payload) + UpdatePayload> payload2 = this.mapper.readValue( + json, + new TypeReference>>() {} + ) + def json2 = this.mapper.writeValueAsString(payload2) + + then: "Make sure all the values are still equal" + payload.getCurrent() == payload2.getCurrent() + payload.getPatch().toString() == payload2.getPatch().toString() + payload.getPrevious() == payload2.getPrevious() + json == json2 + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java b/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java index 2144907b6..dd8721053 100644 --- a/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java +++ b/metacat-main/src/main/java/com/netflix/metacat/main/init/MetacatInitializationService.java @@ -26,6 +26,7 @@ import com.netflix.metacat.common.util.ThreadServiceManager; import com.netflix.metacat.main.manager.PluginManager; import com.netflix.metacat.main.presto.metadata.CatalogManager; +import com.netflix.metacat.main.services.notifications.NotificationService; import com.netflix.metacat.main.services.search.MetacatEventHandlers; import io.airlift.configuration.ConfigurationFactory; import io.airlift.configuration.ConfigurationProvider; @@ -34,6 +35,7 @@ import javax.inject.Inject; import java.util.Map; +import java.util.Set; /** * Metacat initialization service. @@ -45,17 +47,29 @@ public class MetacatInitializationService { /** * Constructor. - * @param injector injector - * @param config config + * + * @param injector injector + * @param config config + * @param eventBus The event bus to use for internal events + * @param notificationServices The notification service implementations to register for receiving events */ @Inject - public MetacatInitializationService(final Injector injector, final Config config) { + public MetacatInitializationService( + final Injector injector, + final Config config, + final MetacatEventBus eventBus, + final Set notificationServices + ) { this.config = config; this.injector = injector; + + // Register all the services to listen for events + notificationServices.forEach(eventBus::register); } /** * Returns the config factory. + * * @return config factory */ public ConfigurationFactory getConfigurationFactory() { @@ -70,6 +84,7 @@ public ConfigurationFactory getConfigurationFactory() { /** * Metacat service initialization. + * * @throws Exception error */ public void start() throws Exception { @@ -100,6 +115,7 @@ public void start() throws Exception { /** * Metcat service shutdown. + * * @throws Exception error */ public void stop() throws Exception { diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java index 0cf1f0de8..cc8fae2dc 100644 --- a/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/ServicesModule.java @@ -20,6 +20,7 @@ import com.netflix.metacat.main.services.impl.MViewServiceImpl; import com.netflix.metacat.main.services.impl.PartitionServiceImpl; import com.netflix.metacat.main.services.impl.TableServiceImpl; +import com.netflix.metacat.main.services.notifications.NotificationsModule; import com.netflix.metacat.main.services.search.ElasticSearchClientProvider; import com.netflix.metacat.main.services.search.ElasticSearchMetacatRefresh; import com.netflix.metacat.main.services.search.ElasticSearchUtil; @@ -34,6 +35,8 @@ public class ServicesModule extends AbstractModule { @Override protected void configure() { + install(new NotificationsModule()); + binder().bind(CatalogService.class).to(CatalogServiceImpl.class).in(Scopes.SINGLETON); binder().bind(DatabaseService.class).to(DatabaseServiceImpl.class).in(Scopes.SINGLETON); binder().bind(TableService.class).to(TableServiceImpl.class).in(Scopes.SINGLETON); diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java new file mode 100644 index 000000000..277b4fc7f --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/DefaultNotificationServiceImpl.java @@ -0,0 +1,89 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications; + +import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent; +import lombok.extern.slf4j.Slf4j; + +import javax.validation.constraints.NotNull; + +/** + * This is a default implementation of the NotificationService interface. It doesn't really do anything other than + * log the event that would have generated some sort of external notification in a real instance. This class exists + * primarily to handle returns from providers when the "plugin" isn't enabled instead of returning null which is + * prohibited by the Provider interface definition. + * + * @author tgianos + * @since 0.1.47 + */ +@Slf4j +public class DefaultNotificationServiceImpl implements NotificationService { + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event) { + log.debug(event.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event) { + log.debug(event.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event) { + log.debug(event.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event) { + log.debug(event.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event) { + log.debug(event.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event) { + log.debug(event.toString()); + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java new file mode 100644 index 000000000..c2fcd50e3 --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationService.java @@ -0,0 +1,86 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications; + +import com.google.common.eventbus.Subscribe; +import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent; + +import javax.validation.constraints.NotNull; + +/** + * Interface for services which will provide external notifications based on internal events. The structure and + * destinations of the notifications are left up to the implementation. + * + * @author tgianos + * @since 0.1.47 + */ +public interface NotificationService { + + /** + * Publish information about a partition being added. + * + * @param event The event passed within the JVM after a partition has been successfully added + */ + @Subscribe + void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event); + + /** + * Publish information about a partition being deleted. + * + * @param event The event passed within the JVM after a partition has been successfully deleted + */ + @Subscribe + void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event); + + /** + * Publish information about a table being created. + * + * @param event The event passed within the JVM after a table has been successfully created + */ + @Subscribe + void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event); + + /** + * Publish information about a table being deleted. + * + * @param event The event passed within the JVM after a table has been successfully deleted + */ + @Subscribe + void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event); + + /** + * Publish information about a table being renamed. + * + * @param event The event passed within the JVM after a table has been successfully renamed + */ + @Subscribe + void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event); + + /** + * Publish information about a table being updated. + * + * @param event The event passed within the JVM after a table has been successfully updated + */ + @Subscribe + void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event); +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java new file mode 100644 index 000000000..c14a7e5b7 --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/NotificationsModule.java @@ -0,0 +1,38 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications; + +import com.google.inject.AbstractModule; +import com.netflix.metacat.main.services.notifications.sns.SNSNotificationsModule; + +/** + * A module to install all the desired Notification implementations. + * + * @author tgianos + * @since 0.1.47 + */ +public class NotificationsModule extends AbstractModule { + + /** + * {@inheritDoc} + */ + @Override + protected void configure() { + install(new SNSNotificationsModule()); + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java new file mode 100644 index 000000000..8f4bc585f --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/package-info.java @@ -0,0 +1,25 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Classes related to sending notifications out of Metacat via implementations of the service interface. + * + * @author tgianos + * @since 0.1.46 + */ +package com.netflix.metacat.main.services.notifications; diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java new file mode 100644 index 000000000..58c7d637c --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImpl.java @@ -0,0 +1,292 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications.sns; + +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.PublishResult; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jsonpatch.JsonPatch; +import com.github.fge.jsonpatch.diff.JsonDiff; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.netflix.metacat.common.dto.PartitionDto; +import com.netflix.metacat.common.dto.TableDto; +import com.netflix.metacat.common.dto.notifications.sns.SNSMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.AddPartitionMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.CreateTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.DeletePartitionMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.DeleteTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTableMessage; +import com.netflix.metacat.common.dto.notifications.sns.messages.UpdateTablePartitionsMessage; +import com.netflix.metacat.common.dto.notifications.sns.payloads.TablePartitionsUpdatePayload; +import com.netflix.metacat.common.dto.notifications.sns.payloads.UpdatePayload; +import com.netflix.metacat.common.monitoring.CounterWrapper; +import com.netflix.metacat.common.server.events.MetacatCreateTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatRenameTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatSaveTablePartitionPostEvent; +import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent; +import com.netflix.metacat.main.services.notifications.NotificationService; +import lombok.extern.slf4j.Slf4j; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +/** + * Implementation of the NotificationService using Amazon SNS. + * + * @author tgianos + * @since 0.1.47 + */ +@Slf4j +public class SNSNotificationServiceImpl implements NotificationService { + + private final AmazonSNSClient client; + private final String tableTopicArn; + private final String partitionTopicArn; + private final ObjectMapper mapper; + private final Retryer retry; + + /** + * Constructor. + * + * @param client The SNS client to use to publish notifications + * @param tableTopicArn The topic to publish table related notifications to + * @param partitionTopicArn The topic to publish partition related notifications to + * @param mapper The object mapper to use to convert objects to JSON strings + * @param retry The retry factory to use. Should already be configured + */ + public SNSNotificationServiceImpl( + @NotNull final AmazonSNSClient client, + @NotNull @Size(min = 1) final String tableTopicArn, + @NotNull @Size(min = 1) final String partitionTopicArn, + @NotNull final ObjectMapper mapper, + @NotNull final Retryer retry + ) { + this.client = client; + this.tableTopicArn = tableTopicArn; + this.partitionTopicArn = partitionTopicArn; + this.mapper = mapper; + this.retry = retry; + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfPartitionAddition(@NotNull final MetacatSaveTablePartitionPostEvent event) { + try { + final String name = event.getName().toString(); + final long timestamp = event.getRequestContext().getTimestamp(); + final String requestId = event.getRequestContext().getId(); + for (final PartitionDto partition : event.getPartitions()) { + final AddPartitionMessage message = new AddPartitionMessage( + UUID.randomUUID().toString(), + timestamp, + requestId, + name, + partition + ); + this.publishNotification(this.partitionTopicArn, message); + log.debug("Published create partition message {} on {}", message, this.partitionTopicArn); + CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.add.succeeded"); + } + + // Publish a global message stating how many partitions were updated for the table to the table topic + final UpdateTablePartitionsMessage message = new UpdateTablePartitionsMessage( + UUID.randomUUID().toString(), + timestamp, + requestId, + name, + new TablePartitionsUpdatePayload( + event.getPartitions().size(), + 0 + ) + ); + this.publishNotification(this.tableTopicArn, message); + // TODO: In ideal world this this be an injected object to the class so we can mock for tests + // swap out implementations etc. + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.addPartitions.succeeded"); + } catch (final ExecutionException | RetryException e) { + log.error("Unable to publish partition creation notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.add.failed"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfPartitionDeletion(@NotNull final MetacatDeleteTablePartitionPostEvent event) { + try { + final String name = event.getName().toString(); + final long timestamp = event.getRequestContext().getTimestamp(); + final String requestId = event.getRequestContext().getId(); + for (final String partitionId : event.getPartitionIds()) { + final DeletePartitionMessage message = new DeletePartitionMessage( + UUID.randomUUID().toString(), + timestamp, + requestId, + name, + partitionId + ); + this.publishNotification(this.partitionTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.delete.succeeded"); + log.debug("Published delete partition message {} on {}", message, this.partitionTopicArn); + } + + // Publish a global message stating how many partitions were updated for the table to the table topic + final UpdateTablePartitionsMessage message = new UpdateTablePartitionsMessage( + UUID.randomUUID().toString(), + timestamp, + requestId, + name, + new TablePartitionsUpdatePayload( + 0, + event.getPartitionIds().size() + ) + ); + this.publishNotification(this.tableTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.deletePartitions.succeeded"); + } catch (final ExecutionException | RetryException e) { + log.error("Unable to publish partition deletion notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.partitions.delete.failed"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableCreation(@NotNull final MetacatCreateTablePostEvent event) { + try { + final CreateTableMessage message = new CreateTableMessage( + UUID.randomUUID().toString(), + event.getRequestContext().getTimestamp(), + event.getRequestContext().getId(), + event.getName().toString(), + event.getTable() + ); + this.publishNotification(this.tableTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.create.succeeded"); + } catch (final ExecutionException | RetryException e) { + log.error("Unable to publish create table notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.create.failed"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableDeletion(@NotNull final MetacatDeleteTablePostEvent event) { + try { + final DeleteTableMessage message = new DeleteTableMessage( + UUID.randomUUID().toString(), + event.getRequestContext().getTimestamp(), + event.getRequestContext().getId(), + event.getName().toString(), + event.getTable() + ); + this.publishNotification(this.tableTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.delete.succeeded"); + } catch (final ExecutionException | RetryException e) { + log.error("Unable to publish delete table notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.delete.failed"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableRename(@NotNull final MetacatRenameTablePostEvent event) { + try { + final UpdateTableMessage message = this.createUpdateTableMessage( + UUID.randomUUID().toString(), + event.getRequestContext().getTimestamp(), + event.getRequestContext().getId(), + event.getName().toString(), + event.getOldTable(), + event.getCurrentTable() + ); + this.publishNotification(this.tableTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.rename.succeeded"); + } catch (final ExecutionException | IOException | RetryException e) { + log.error("Unable to publish rename table notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.rename.failed"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void notifyOfTableUpdate(@NotNull final MetacatUpdateTablePostEvent event) { + try { + final UpdateTableMessage message = this.createUpdateTableMessage( + UUID.randomUUID().toString(), + event.getRequestContext().getTimestamp(), + event.getRequestContext().getId(), + event.getName().toString(), + event.getOldTable(), + event.getCurrentTable() + ); + this.publishNotification(this.tableTopicArn, message); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.update.succeeded"); + } catch (final IOException | ExecutionException | RetryException e) { + log.error("Unable to publish update table notification", e); + CounterWrapper.incrementCounter("metacat.notifications.sns.tables.update.failed"); + } + } + + private UpdateTableMessage createUpdateTableMessage( + final String id, + final long timestamp, + final String requestId, + final String name, + final TableDto oldTable, + final TableDto currentTable + ) throws IOException { + final JsonPatch patch = JsonDiff.asJsonPatch( + this.mapper.valueToTree(oldTable), + this.mapper.valueToTree(currentTable) + ); + return new UpdateTableMessage( + id, + timestamp, + requestId, + name, + new UpdatePayload<>(oldTable, patch, currentTable) + ); + } + + private void publishNotification( + final String arn, + final SNSMessage message + ) throws ExecutionException, RetryException { + final PublishResult result = this.retry.call( + () -> this.client.publish(arn, this.mapper.writeValueAsString(message)) + ); + log.debug("Successfully published message {} to topic {} with id {}", message, arn, result.getMessageId()); + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java new file mode 100644 index 000000000..ebea95d1a --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProvider.java @@ -0,0 +1,97 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications.sns; + +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.InternalErrorException; +import com.amazonaws.services.sns.model.PublishResult; +import com.amazonaws.services.sns.model.ThrottledException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.inject.Provider; +import com.google.inject.ProvisionException; +import com.netflix.metacat.common.server.Config; +import com.netflix.metacat.main.services.notifications.DefaultNotificationServiceImpl; +import com.netflix.metacat.main.services.notifications.NotificationService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import javax.inject.Inject; +import javax.validation.constraints.NotNull; +import java.util.concurrent.TimeUnit; + +/** + * Provides an instance of SNSNotificationServiceImpl if conditions are right. + * + * @author tgianos + * @since 0.1.47 + */ +@Slf4j +public class SNSNotificationServiceImplProvider implements Provider { + + private final Config config; + private final ObjectMapper mapper; + + /** + * Constructor. + * + * @param config The metacat configuration + * @param mapper The JSON object mapper to use + */ + @Inject + public SNSNotificationServiceImplProvider(@NotNull final Config config, @NotNull final ObjectMapper mapper) { + this.config = config; + this.mapper = mapper; + } + + /** + * {@inheritDoc} + */ + @Override + public NotificationService get() { + if (this.config.isSnsNotificationEnabled()) { + final String tableArn = this.config.getSnsTopicTableArn(); + if (StringUtils.isEmpty(tableArn)) { + throw new ProvisionException( + "SNS Notifications are enabled but no table ARN provided. Unable to configure." + ); + } + final String partitionArn = this.config.getSnsTopicPartitionArn(); + if (StringUtils.isEmpty(partitionArn)) { + throw new ProvisionException( + "SNS Notifications are enabled but no partition ARN provided. Unable to configure." + ); + } + + log.info("SNS notifications are enabled. Providing SNSNotificationServiceImpl implementation."); + final Retryer retry = RetryerBuilder.newBuilder() + .retryIfExceptionOfType(InternalErrorException.class) + .retryIfExceptionOfType(ThrottledException.class) + .withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.SECONDS, 30, TimeUnit.SECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + return new SNSNotificationServiceImpl(new AmazonSNSClient(), tableArn, partitionArn, this.mapper, retry); + } else { + log.info("SNS notifications are not enabled. Ignoring and providing default implementation."); + return new DefaultNotificationServiceImpl(); + } + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java new file mode 100644 index 000000000..0f939affe --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationsModule.java @@ -0,0 +1,46 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications.sns; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.multibindings.Multibinder; +import com.netflix.metacat.main.services.notifications.NotificationService; +import lombok.extern.slf4j.Slf4j; + +/** + * Module for configuring notifications to AWS SNS. + * + * @author tgianos + * @since 0.1.47 + */ +@Slf4j +public class SNSNotificationsModule extends AbstractModule { + + /** + * {@inheritDoc} + */ + @Override + protected void configure() { + final Multibinder notificationServices = Multibinder.newSetBinder( + this.binder(), + NotificationService.class + ); + notificationServices.addBinding().toProvider(SNSNotificationServiceImplProvider.class).in(Singleton.class); + } +} diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java new file mode 100644 index 000000000..2bc34ec91 --- /dev/null +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/package-info.java @@ -0,0 +1,24 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * Classes related to setting up and sending SNS Notifications. + * + * @author tgianos + * @since 0.1.47 + */ +package com.netflix.metacat.main.services.notifications.sns; diff --git a/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java b/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java index 3fef0d93a..42eb58541 100644 --- a/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java +++ b/metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchMetacatRefresh.java @@ -23,7 +23,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -40,6 +39,8 @@ import com.netflix.metacat.common.monitoring.CounterWrapper; import com.netflix.metacat.common.monitoring.TimerWrapper; import com.netflix.metacat.common.server.Config; +import com.netflix.metacat.common.server.events.MetacatDeleteTablePostEvent; +import com.netflix.metacat.common.server.events.MetacatEventBus; import com.netflix.metacat.common.usermetadata.TagService; import com.netflix.metacat.common.usermetadata.UserMetadataService; import com.netflix.metacat.common.util.MetacatContextManager; @@ -50,7 +51,6 @@ import lombok.extern.slf4j.Slf4j; import org.joda.time.Instant; -import javax.annotation.Nullable; import javax.inject.Inject; import java.util.List; import java.util.Optional; @@ -66,6 +66,7 @@ /** * This class does a refresh of all the metadata entities from original data sources to elastic search. + * * @author amajumdar */ @Slf4j @@ -90,10 +91,30 @@ public class ElasticSearchMetacatRefresh { private UserMetadataService userMetadataService; @Inject private TagService tagService; + @Inject + private MetacatEventBus eventBus; // Fixed thread pool private ListeningExecutorService service; private ListeningExecutorService esService; + private static ExecutorService newFixedThreadPool(final int nThreads, final String threadFactoryName, + final int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(queueSize), + new ThreadFactoryBuilder() + .setNameFormat(threadFactoryName) + .build(), + (r, executor) -> { + // this will block if the queue is full + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + }); + } + /** * Does a sweep across all catalogs to refresh the same data in elastic search. */ @@ -106,6 +127,7 @@ public void process() { /** * Does a sweep across given catalogs to refresh the same data in elastic search. + * * @param catalogNames catalog anmes */ public void processCatalogs(final List catalogNames) { @@ -116,7 +138,8 @@ public void processCatalogs(final List catalogNames) { /** * Does a sweep across given catalog and databases to refresh the same data in elastic search. - * @param catalogName catalog + * + * @param catalogName catalog * @param databaseNames database names */ public void processDatabases(final String catalogName, final List databaseNames) { @@ -128,6 +151,7 @@ public void processDatabases(final String catalogName, final List databa /** * Does a sweep across all catalogs to refresh the same data in elastic search. + * * @param names qualified names */ public void processPartitions(final List names) { @@ -152,7 +176,7 @@ private ListenableFuture _processPartitions(final List qNam final List> indexFutures = Lists.newArrayList(); int offset = 0; int count; - Sort sort = null; + Sort sort; if ("s3".equals(tableName.getCatalogName()) || "aegisthus".equals(tableName.getCatalogName())) { sort = new Sort("id", SortOrder.ASC); } else { @@ -179,18 +203,13 @@ private ListenableFuture _processPartitions(final List qNam .collect(Collectors.toList()); return Futures.transform(Futures.successfulAsList(inputFuturesWithoutNulls), Functions.constant(null)); }); - return Futures.transformAsync(processPartitionsFuture, new AsyncFunction() { - @Override - public ListenableFuture apply( - @Nullable - final Void input) throws Exception { - elasticSearchUtil.refresh(); - final List> cleanUpFutures = tables.stream() - .map(s -> service - .submit(() -> partitionsCleanUp(QualifiedName.fromString(s, false), excludeQualifiedNames))) - .collect(Collectors.toList()); - return Futures.transform(Futures.successfulAsList(cleanUpFutures), Functions.constant(null)); - } + return Futures.transformAsync(processPartitionsFuture, input -> { + elasticSearchUtil.refresh(); + final List> cleanUpFutures = tables.stream() + .map(s -> service + .submit(() -> partitionsCleanUp(QualifiedName.fromString(s, false), excludeQualifiedNames))) + .collect(Collectors.toList()); + return Futures.transform(Futures.successfulAsList(cleanUpFutures), Functions.constant(null)); }); } @@ -231,27 +250,9 @@ private Void partitionsCleanUp(final QualifiedName tableName, final List(queueSize), - new ThreadFactoryBuilder() - .setNameFormat(threadFactoryName) - .build(), - (r, executor) -> { - // this will block if the queue is full - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - }); - } - @SuppressWarnings("checkstyle:methodname") private void _process(final List qNames, final Supplier> supplier, - final String requestName, final boolean delete, final int queueSize) { + final String requestName, final boolean delete, final int queueSize) { if (isElasticSearchMetacatRefreshAlreadyRunning.compareAndSet(false, true)) { final TimerWrapper timer = TimerWrapper .createStarted("dse.metacat.timer.ElasticSearchMetacatRefresh." + requestName); @@ -259,7 +260,7 @@ private void _process(final List qNames, final Supplier qNames, - final List excludeQualifiedNames) { + final List excludeQualifiedNames) { log.info("Start: Delete unmarked entities"); // // get unmarked qualified names @@ -340,7 +341,8 @@ private void deleteUnmarkedEntities(final List qNames, } } catch (SchemaNotFoundException ignored) { result = true; - } catch (Exception ignored) { } + } catch (Exception ignored) { + } return result; }).collect(Collectors.toList()); log.info("Unmarked databases({}): {}", unmarkedDatabaseNames.size(), unmarkedDatabaseNames); @@ -377,7 +379,8 @@ private void deleteUnmarkedEntities(final List qNames, if (!dto.isPresent()) { result = true; } - } catch (Exception ignored) { } + } catch (Exception ignored) { + } return result; }).collect(Collectors.toList()); log.info("Unmarked tables({}): {}", unmarkedTableNames.size(), unmarkedTableNames); @@ -387,8 +390,17 @@ private void deleteUnmarkedEntities(final List qNames, dto -> dto.getName().toString()).collect(Collectors.toList()); log.info("Deleting tables({}): {}", deleteTableNames.size(), deleteTableNames); userMetadataService.deleteMetadatas("admin", Lists.newArrayList(deleteTableDtos)); - elasticSearchUtil.softDelete("table", deleteTableNames, context); - deleteTableDtos.forEach(tableDto -> tagService.delete(tableDto.getName(), false)); + + // Publish event. Elasticsearch event handler will take care of updating the index already + // TODO: Re-evaluate events vs. direct calls for these types of situations like in Genie + deleteTableDtos.forEach( + tableDto -> { + tagService.delete(tableDto.getName(), false); + this.eventBus.postAsync( + new MetacatDeleteTablePostEvent(tableDto.getName(), context, tableDto) + ); + } + ); } log.info("End: Delete unmarked tables({})", unmarkedTableDtos.size()); } else { @@ -459,13 +471,14 @@ private List getCatalogNamesToRefresh() { /** * Process the list of databases. - * @param catalogName catalog name + * + * @param catalogName catalog name * @param databaseNames database names * @return future */ @SuppressWarnings("checkstyle:methodname") private ListenableFuture _processDatabases(final QualifiedName catalogName, - final List databaseNames) { + final List databaseNames) { ListenableFuture resultFuture = null; log.info("Full refresh of catalog {} for databases({}): {}", catalogName, databaseNames.size(), databaseNames); final List> getDatabaseFutures = databaseNames.stream() @@ -507,8 +520,9 @@ private ListenableFuture _processDatabases(final QualifiedName catalogName /** * Save all databases to index it in elastic search. + * * @param catalogName catalog name - * @param dtos database dtos + * @param dtos database dtos * @return future */ private ListenableFuture indexDatabaseDtos(final QualifiedName catalogName, final List dtos) { @@ -525,12 +539,13 @@ private ListenableFuture indexDatabaseDtos(final QualifiedName catalogName /** * Process the list of tables in batches. + * * @param databaseName database name - * @param tableNames table names + * @param tableNames table names * @return A future containing the tasks */ private ListenableFuture processTables(final QualifiedName databaseName, - final List tableNames) { + final List tableNames) { final List> tableNamesBatches = Lists.partition(tableNames, 500); final List> processTablesBatchFutures = tableNamesBatches.stream().map( subTableNames -> _processTables(databaseName, subTableNames)).collect(Collectors.toList()); @@ -540,7 +555,7 @@ private ListenableFuture processTables(final QualifiedName databaseName, @SuppressWarnings("checkstyle:methodname") private ListenableFuture _processTables(final QualifiedName databaseName, - final List tableNames) { + final List tableNames) { final List>> getTableFutures = tableNames.stream() .map(tableName -> service.submit(() -> { Optional result = null; @@ -549,7 +564,7 @@ private ListenableFuture _processTables(final QualifiedName databaseName, } catch (Exception e) { log.error("Failed to retrieve table: {}", tableName); elasticSearchUtil.log("ElasticSearchMetacatRefresh.getTable", ElasticSearchDoc.Type.table.name(), - tableName.toString(), null, e.getMessage(), e, true); + tableName.toString(), null, e.getMessage(), e, true); } return result; })) @@ -561,12 +576,13 @@ private ListenableFuture _processTables(final QualifiedName databaseName, /** * Save all tables to index it in elastic search. + * * @param databaseName database name - * @param dtos table dtos + * @param dtos table dtos * @return future */ private ListenableFuture indexTableDtos(final QualifiedName databaseName, - final List> dtos) { + final List> dtos) { return esService.submit(() -> { final List docs = dtos.stream().filter(dto -> dto != null && dto.isPresent()).map( tableDtoOptional -> { @@ -582,8 +598,9 @@ private ListenableFuture indexTableDtos(final QualifiedName databaseName, /** * Save all tables to index it in elastic search. + * * @param tableName database name - * @param dtos partition dtos + * @param dtos partition dtos * @return future */ private ListenableFuture indexPartitionDtos(final QualifiedName tableName, final List dtos) { diff --git a/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy new file mode 100644 index 000000000..62928e437 --- /dev/null +++ b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplProviderSpec.groovy @@ -0,0 +1,88 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications.sns + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.inject.ProvisionException +import com.netflix.metacat.common.server.Config +import com.netflix.metacat.main.services.notifications.DefaultNotificationServiceImpl +import spock.lang.Specification + +/** + * Tests for SNSNotificationsServiceImplProvider. + * + * @author tgianos + * @since 0.1.47 + */ +class SNSNotificationServiceImplProviderSpec extends Specification { + + def config = Mock(Config) + def mapper = Mock(ObjectMapper) + + def "Will provide default implementation when SNS is disabled"() { + def provider = new SNSNotificationServiceImplProvider(config, Mock(ObjectMapper)) + + when: "call get" + def service = provider.get() + + then: + service instanceof DefaultNotificationServiceImpl + 1 * config.isSnsNotificationEnabled() >> false + 0 * config.getSnsTopicPartitionArn() + 0 * config.getSnsTopicTableArn() + } + + def "Will provide SNS implementation when SNS is enabled"() { + def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper) + + when: "call get" + def service = provider.get() + + then: "Should return a SNSNotificationServiceImpl implementation" + service instanceof SNSNotificationServiceImpl + 1 * this.config.isSnsNotificationEnabled() >> true + 1 * this.config.getSnsTopicPartitionArn() >> UUID.randomUUID().toString() + 1 * this.config.getSnsTopicTableArn() >> UUID.randomUUID().toString() + } + + def "Will throw exception if partition ARN not set but SNS enabled"() { + def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper) + + when: "call get" + provider.get() + + then: "Will throw a provision exception" + thrown ProvisionException + 1 * this.config.isSnsNotificationEnabled() >> true + 1 * this.config.getSnsTopicPartitionArn() + 1 * this.config.getSnsTopicTableArn() >> UUID.randomUUID().toString() + } + + def "Will throw exception if table ARN not set but SNS enabled"() { + def provider = new SNSNotificationServiceImplProvider(this.config, this.mapper) + + when: "call get" + provider.get() + + then: "will throw a provision exception" + thrown ProvisionException + 1 * this.config.isSnsNotificationEnabled() >> true + 0 * this.config.getSnsTopicPartitionArn() + 1 * this.config.getSnsTopicTableArn() + } +} diff --git a/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy new file mode 100644 index 000000000..0d9e30308 --- /dev/null +++ b/metacat-main/src/test/groovy/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceImplSpec.groovy @@ -0,0 +1,226 @@ +/* + * + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.metacat.main.services.notifications.sns + +import com.amazonaws.services.sns.AmazonSNSClient +import com.amazonaws.services.sns.model.InternalErrorException +import com.amazonaws.services.sns.model.NotFoundException +import com.amazonaws.services.sns.model.PublishResult +import com.amazonaws.services.sns.model.ThrottledException +import com.facebook.presto.hive.$internal.com.google.common.collect.Lists +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.TextNode +import com.github.rholder.retry.RetryerBuilder +import com.github.rholder.retry.StopStrategies +import com.github.rholder.retry.WaitStrategies +import com.netflix.metacat.common.MetacatRequestContext +import com.netflix.metacat.common.QualifiedName +import com.netflix.metacat.common.dto.PartitionDto +import com.netflix.metacat.common.dto.PartitionsSaveResponseDto +import com.netflix.metacat.common.dto.TableDto +import com.netflix.metacat.common.dto.notifications.sns.messages.* +import com.netflix.metacat.common.server.events.* +import spock.lang.Specification + +import java.util.concurrent.TimeUnit + +/** + * Tests for the SNSNotificationServiceImpl. + * + * @author tgianos + * @since 0.1.47 + */ +class SNSNotificationServiceImplSpec extends Specification { + + def client = Mock(AmazonSNSClient) + def qName = QualifiedName.fromString( + UUID.randomUUID().toString() + + "/" + + UUID.randomUUID().toString() + + "/" + + UUID.randomUUID().toString() + + "/" + + UUID.randomUUID().toString() + + "/" + + UUID.randomUUID().toString() + ) + def mapper = Mock(ObjectMapper) + def partitionArn = UUID.randomUUID().toString() + def tableArn = UUID.randomUUID().toString() + def retry = RetryerBuilder. newBuilder() + .retryIfExceptionOfType(InternalErrorException.class) + .retryIfExceptionOfType(ThrottledException.class) + .withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.NANOSECONDS, 30, TimeUnit.NANOSECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + def service = new SNSNotificationServiceImpl(this.client, this.tableArn, this.partitionArn, this.mapper, this.retry) + + def "Will Notify On Partition Creation"() { + def partitions = Lists.newArrayList(new PartitionDto(), new PartitionDto(), new PartitionDto()) + + def event = new MetacatSaveTablePartitionPostEvent( + this.qName, + Mock(MetacatRequestContext), + partitions, + Mock(PartitionsSaveResponseDto) + ) + + when: + this.service.notifyOfPartitionAddition(event) + + then: + 3 * this.mapper.writeValueAsString(_ as AddPartitionMessage) >> UUID.randomUUID().toString() + 1 * this.mapper.writeValueAsString(_ as UpdateTablePartitionsMessage) >> UUID.randomUUID().toString() + 3 * this.client.publish(this.partitionArn, _ as String) >> new PublishResult() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will Notify On Partition Deletion"() { + def partitions = Lists.newArrayList( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString() + ) + + def event = new MetacatDeleteTablePartitionPostEvent( + this.qName, + Mock(MetacatRequestContext), + partitions + ) + + when: + this.service.notifyOfPartitionDeletion(event) + + then: + 5 * this.mapper.writeValueAsString(_ as DeletePartitionMessage) >> UUID.randomUUID().toString() + 1 * this.mapper.writeValueAsString(_ as UpdateTablePartitionsMessage) >> UUID.randomUUID().toString() + 5 * this.client.publish(this.partitionArn, _ as String) >> new PublishResult() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will Notify On Table Creation"() { + def event = new MetacatCreateTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto() + ) + + when: + this.service.notifyOfTableCreation(event) + + then: + 1 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will Notify On Table Deletion"() { + def event = new MetacatDeleteTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto() + ) + + when: + this.service.notifyOfTableDeletion(event) + + then: + 1 * this.mapper.writeValueAsString(_ as DeleteTableMessage) >> UUID.randomUUID().toString() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will Notify On Table Rename"() { + def event = new MetacatRenameTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto(), + new TableDto() + ) + + when: + this.service.notifyOfTableRename(event) + + then: + 2 * this.mapper.valueToTree(_ as TableDto) >> new TextNode(UUID.randomUUID().toString()) + 1 * this.mapper.writeValueAsString(_ as UpdateTableMessage) >> UUID.randomUUID().toString() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will Notify On Table Update"() { + def event = new MetacatUpdateTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto(), + new TableDto() + ) + + when: + this.service.notifyOfTableUpdate(event) + + then: + 2 * this.mapper.valueToTree(_ as TableDto) >> new TextNode(UUID.randomUUID().toString()) + 1 * this.mapper.writeValueAsString(_ as UpdateTableMessage) >> UUID.randomUUID().toString() + 1 * this.client.publish(this.tableArn, _ as String) >> new PublishResult() + } + + def "Will retry on ThrottledException"() { + def event = new MetacatCreateTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto() + ) + + when: + this.service.notifyOfTableCreation(event) + + then: + 3 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString() + 3 * this.client.publish(this.tableArn, _ as String) >> { throw new ThrottledException("Exception") } + } + + def "Will retry on InternalException"() { + def event = new MetacatCreateTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto() + ) + + when: + this.service.notifyOfTableCreation(event) + + then: + 3 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString() + 3 * this.client.publish(this.tableArn, _ as String) >> { throw new InternalErrorException("Exception") } + } + + def "Won't retry on Other Exception"() { + def event = new MetacatCreateTablePostEvent( + this.qName, + Mock(MetacatRequestContext), + new TableDto() + ) + + when: + this.service.notifyOfTableCreation(event) + + then: + 1 * this.mapper.writeValueAsString(_ as CreateTableMessage) >> UUID.randomUUID().toString() + 1 * this.client.publish(this.tableArn, _ as String) >> { throw new NotFoundException("Exception") } + } +} diff --git a/scripts/stop_metacat_test_cluster.sh b/scripts/stop_metacat_test_cluster.sh index 7f6c89d4a..3bdcc522c 100755 --- a/scripts/stop_metacat_test_cluster.sh +++ b/scripts/stop_metacat_test_cluster.sh @@ -5,23 +5,8 @@ set -x # Usage: ./stopCluster.sh docker-compose.yml COMPOSE_FILE=$1 -docker-compose --file $COMPOSE_FILE stop -t 30 +docker-compose --file $COMPOSE_FILE down if [ $? -ne 0 ]; then - echo "Unable to stop docker-compose" + echo "Unable to bring down docker-compose" exit 9 fi - -docker-compose --file $COMPOSE_FILE rm -f -if [ $? -ne 0 ]; then - echo "Unable to remove docker compose containers" - exit 10 -fi - -TEST_CONTAINERS=$(docker ps -a -q --filter "label=com.netflix.metacat.oss.test=true") -if [[ "$TEST_CONTAINER" != "" ]]; then - docker rm -f $TEST_CONTAINERS - if [ $? -ne 0 ]; then - echo "Unable to remove metacat-test docker containers" - exit 11 - fi -fi