Skip to content

Commit

Permalink
Modified MetadataService to not depend on Guava Futures library. Modi…
Browse files Browse the repository at this point in the history
…fied ElasticSearchRefresh to not use the deprecated Guava Futures method. (#436)
  • Loading branch information
ajoymajumdar committed May 11, 2021
1 parent 3d76ba6 commit 3d4d454
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@

package com.netflix.metacat.main.services;

import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.netflix.metacat.common.MetacatRequestContext;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.BaseDto;
Expand All @@ -29,7 +25,6 @@
import com.netflix.metacat.common.server.usermetadata.TagService;
import com.netflix.metacat.common.server.usermetadata.UserMetadataService;
import com.netflix.metacat.common.server.util.MetacatContextManager;
import com.netflix.metacat.common.server.util.ThreadServiceManager;
import com.netflix.spectator.api.Registry;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -151,28 +146,22 @@ public void cleanUpObsoleteDefinitionMetadata() {
List<DefinitionMetadataDto> dtos = null;
int offset = 0;
final int limit = 10000;
final ThreadServiceManager threadServiceManager =
new ThreadServiceManager(registry, 10, 20000, "definition");
final ListeningExecutorService service = threadServiceManager.getExecutor();
int totalDeletes = 0;
while (offset == 0 || dtos.size() == limit) {
dtos = userMetadataService.searchDefinitionMetadata(null, null, null, null,
"id", null, offset, limit);
int deletes = 0;
final List<ListenableFuture<Boolean>> futures = dtos.stream().map(dto ->
service.submit(() -> deleteDefinitionMetadata(dto.getName(), false, metacatRequestContext)))
.collect(Collectors.toList());
try {
deletes = Futures.transform(Futures.successfulAsList(futures),
(Function<List<Boolean>, Integer>) input ->
(int) (input != null ? input.stream().filter(b -> b != null && b).count() : 0)).get();
} catch (Exception e) {
log.warn("Failed deleting obsolete definition metadata for offset {}.", offset, e);
}
final long deletes = dtos.parallelStream().map(dto -> {
try {
return deleteDefinitionMetadata(dto.getName(), false, metacatRequestContext);
} catch (Exception e) {
log.warn("Failed deleting obsolete definition metadata for table {}", dto.getName(), e);
return false;
}
})
.filter(b -> b).count();
totalDeletes += deletes;
offset += limit - deletes;
}
threadServiceManager.stop();
log.info("End deleting obsolete definition metadata. Deleted {} number of definition metadatas", totalDeletes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private ListenableFuture<Void> _processPartitions(final List<QualifiedName> qNam
final List<ListenableFuture<Void>> inputFuturesWithoutNulls = input.stream().filter(NOT_NULL)
.collect(Collectors.toList());
return Futures.transform(Futures.successfulAsList(inputFuturesWithoutNulls),
Functions.constant(null));
Functions.constant(null), defaultService);
}, defaultService);
return Futures.transformAsync(processPartitionsFuture, input -> {
elasticSearchUtil.refresh();
Expand Down
20 changes: 20 additions & 0 deletions metacat-main/src/test/resources/search/mapping/metacat.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@
}
}
}
},
"extendedSchema": {
"type": "object",
"enabled": false
},
"data_dependency": {
"type": "object",
"enabled": false
},
"table_cost": {
"type": "object",
"enabled": false
}
}
},
Expand Down Expand Up @@ -258,6 +270,10 @@
"type": "keyword",
"index": true
},
"jsonType": {
"type": "object",
"enabled": false
},
"source_type": {
"type": "keyword",
"index": true
Expand All @@ -277,6 +293,10 @@
"type": {
"type": "keyword",
"index": true
},
"jsonType": {
"type": "object",
"enabled": false
}
}
}
Expand Down

0 comments on commit 3d4d454

Please sign in to comment.