From e9fc98a7576475eb7e7fb9ed84773843707b1917 Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Tue, 28 Apr 2015 18:18:26 -0700 Subject: [PATCH] Track all memory pools in coordinator --- .../presto/execution/LocationFactory.java | 2 + .../presto/memory/ClusterMemoryManager.java | 144 +++++++++++++++++- .../presto/memory/ClusterMemoryPool.java | 128 ++++++++++++++++ .../presto/memory/ForMemoryManager.java | 31 ++++ .../presto/memory/LocalMemoryManager.java | 10 +- .../facebook/presto/memory/MemoryInfo.java | 12 +- .../facebook/presto/memory/MemoryPool.java | 4 +- .../facebook/presto/memory/MemoryPoolId.java | 6 +- .../presto/memory/MemoryPoolInfo.java | 15 +- .../presto/memory/RemoteNodeMemory.java | 103 +++++++++++++ .../presto/server/HttpLocationFactory.java | 9 ++ .../presto/server/ServerMainModule.java | 8 + .../server/testing/TestingPrestoServer.java | 8 + .../presto/execution/TestSqlTaskManager.java | 6 + .../{tests => memory}/TestMemoryManager.java | 32 +++- 15 files changed, 485 insertions(+), 33 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryPool.java create mode 100644 presto-main/src/main/java/com/facebook/presto/memory/ForMemoryManager.java create mode 100644 presto-main/src/main/java/com/facebook/presto/memory/RemoteNodeMemory.java rename presto-tests/src/test/java/com/facebook/presto/{tests => memory}/TestMemoryManager.java (71%) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java index 054f62f158da..8f8e13c2efd2 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java @@ -26,4 +26,6 @@ public interface LocationFactory URI createLocalTaskLocation(TaskId taskId); URI createTaskLocation(Node node, TaskId taskId); + + URI createMemoryInfoLocation(Node node); } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java b/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java index a5c9f3327ff1..af1598474892 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java @@ -14,26 +14,70 @@ package com.facebook.presto.memory; import com.facebook.presto.ExceededMemoryLimitException; +import com.facebook.presto.execution.LocationFactory; import com.facebook.presto.execution.QueryExecution; +import com.facebook.presto.spi.Node; +import com.facebook.presto.spi.NodeManager; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.http.client.HttpClient; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; import io.airlift.units.DataSize; +import org.weakref.jmx.JmxException; +import org.weakref.jmx.MBeanExporter; import org.weakref.jmx.Managed; +import org.weakref.jmx.ObjectNames; +import javax.annotation.PreDestroy; +import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import static com.facebook.presto.util.ImmutableCollectors.toImmutableList; +import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet; +import static com.google.common.collect.Sets.difference; import static java.util.Objects.requireNonNull; public class ClusterMemoryManager { + private static final Logger log = Logger.get(ClusterMemoryManager.class); + private final NodeManager nodeManager; + private final LocationFactory locationFactory; + private final HttpClient httpClient; + private final MBeanExporter exporter; + private final JsonCodec memoryInfoCodec; private final DataSize maxQueryMemory; private final boolean enabled; private final AtomicLong clusterMemoryUsageBytes = new AtomicLong(); + private final AtomicLong clusterMemoryBytes = new AtomicLong(); + private final Map nodes = new HashMap<>(); + + @GuardedBy("this") + private final Map pools = new HashMap<>(); @Inject - public ClusterMemoryManager(MemoryManagerConfig config) + public ClusterMemoryManager( + @ForMemoryManager HttpClient httpClient, + NodeManager nodeManager, + LocationFactory locationFactory, + MBeanExporter exporter, + JsonCodec memoryInfoCodec, + MemoryManagerConfig config) { requireNonNull(config, "config is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.locationFactory = requireNonNull(locationFactory, "locationFactory is null"); + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.exporter = requireNonNull(exporter, "exporter is null"); + this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null"); this.maxQueryMemory = config.getMaxQueryMemory(); this.enabled = config.isClusterMemoryManagerEnabled(); } @@ -52,6 +96,98 @@ public void process(Iterable queries) } } clusterMemoryUsageBytes.set(totalBytes); + + updateNodes(); + updatePools(); + } + + @VisibleForTesting + synchronized Map getPools() + { + return ImmutableMap.copyOf(pools); + } + + private void updateNodes() + { + Set activeNodes = nodeManager.getActiveNodes(); + ImmutableSet activeNodeIds = activeNodes.stream() + .map(Node::getNodeIdentifier) + .collect(toImmutableSet()); + + // Remove nodes that don't exist anymore + nodes.keySet().removeAll(difference(nodes.keySet(), activeNodeIds)); + + // Add new nodes + for (Node node : activeNodes) { + if (!nodes.containsKey(node.getNodeIdentifier())) { + nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(httpClient, memoryInfoCodec, locationFactory.createMemoryInfoLocation(node))); + } + } + + // Schedule refresh + for (RemoteNodeMemory node : nodes.values()) { + node.asyncRefresh(); + } + } + + private synchronized void updatePools() + { + // Update view of cluster memory and pools + List nodeMemoryInfos = nodes.values().stream() + .map(RemoteNodeMemory::getInfo) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()); + + long totalClusterMemory = nodeMemoryInfos.stream() + .map(MemoryInfo::getTotalNodeMemory) + .mapToLong(DataSize::toBytes) + .sum(); + clusterMemoryBytes.set(totalClusterMemory); + + Set activePoolIds = nodeMemoryInfos.stream() + .flatMap(info -> info.getPools().keySet().stream()) + .collect(toImmutableSet()); + + Set removedPools = difference(pools.keySet(), activePoolIds); + for (MemoryPoolId removed : removedPools) { + unexport(pools.get(removed)); + pools.remove(removed); + } + for (MemoryPoolId id : activePoolIds) { + ClusterMemoryPool pool = pools.computeIfAbsent(id, poolId -> { + ClusterMemoryPool newPool = new ClusterMemoryPool(poolId); + String objectName = ObjectNames.builder(ClusterMemoryPool.class, newPool.getId().toString()).build(); + try { + exporter.export(objectName, newPool); + } + catch (JmxException e) { + log.error(e, "Error exporting memory pool %s", poolId); + } + return newPool; + }); + pool.update(nodeMemoryInfos); + } + } + + @PreDestroy + public synchronized void destroy() + { + for (ClusterMemoryPool pool : pools.values()) { + unexport(pool); + } + pools.clear(); + } + + private void unexport(ClusterMemoryPool pool) + { + try { + String objectName = ObjectNames.builder(ClusterMemoryPool.class, pool.getId().toString()).build(); + exporter.unexport(objectName); + } + catch (JmxException e) { + log.error(e, "Failed to unexport pool %s", pool.getId()); + } } @Managed @@ -59,4 +195,10 @@ public long getClusterMemoryUsageBytes() { return clusterMemoryUsageBytes.get(); } + + @Managed + public long getClusterMemoryBytes() + { + return clusterMemoryBytes.get(); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryPool.java b/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryPool.java new file mode 100644 index 000000000000..fbf979a19b7c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryPool.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.memory; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import org.weakref.jmx.Managed; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +@ThreadSafe +public class ClusterMemoryPool +{ + private final MemoryPoolId id; + + @GuardedBy("this") + private long totalDistributedBytes; + + @GuardedBy("this") + private long freeDistributedBytes; + + @GuardedBy("this") + private int nodes; + + @GuardedBy("this") + private int blockedNodes; + + public ClusterMemoryPool(MemoryPoolId id) + { + this.id = requireNonNull(id, "id is null"); + } + + public MemoryPoolId getId() + { + return id; + } + + @Managed + public synchronized long getTotalDistributedBytes() + { + return totalDistributedBytes; + } + + @Managed + public synchronized long getFreeDistributedBytes() + { + return freeDistributedBytes; + } + + @Managed + public synchronized int getNodes() + { + return nodes; + } + + @Managed + public synchronized int getBlockedNodes() + { + return blockedNodes; + } + + public synchronized void update(List memoryInfos) + { + nodes = 0; + blockedNodes = 0; + totalDistributedBytes = 0; + freeDistributedBytes = 0; + + for (MemoryInfo info : memoryInfos) { + MemoryPoolInfo poolInfo = info.getPools().get(id); + if (poolInfo != null) { + nodes++; + if (poolInfo.getFreeBytes() <= 0) { + blockedNodes++; + } + totalDistributedBytes += poolInfo.getMaxBytes(); + freeDistributedBytes += poolInfo.getFreeBytes(); + } + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClusterMemoryPool that = (ClusterMemoryPool) o; + return Objects.equal(id, that.id); + } + + @Override + public int hashCode() + { + return Objects.hashCode(id); + } + + @Override + public synchronized String toString() + { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("totalDistributedBytes", totalDistributedBytes) + .add("freeDistributedBytes", freeDistributedBytes) + .add("nodes", nodes) + .add("blockedNodes", blockedNodes) + .toString(); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/memory/ForMemoryManager.java b/presto-main/src/main/java/com/facebook/presto/memory/ForMemoryManager.java new file mode 100644 index 000000000000..469a13312882 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/ForMemoryManager.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.memory; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForMemoryManager +{ +} diff --git a/presto-main/src/main/java/com/facebook/presto/memory/LocalMemoryManager.java b/presto-main/src/main/java/com/facebook/presto/memory/LocalMemoryManager.java index 9c8a89578526..5f2934951242 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/LocalMemoryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/LocalMemoryManager.java @@ -23,7 +23,6 @@ import java.util.Map; -import static com.facebook.presto.util.ImmutableCollectors.toImmutableList; import static io.airlift.units.DataSize.Unit.BYTE; import static java.util.Objects.requireNonNull; @@ -51,10 +50,11 @@ public LocalMemoryManager(MemoryManagerConfig config, MBeanExporter exporter) public MemoryInfo getInfo() { - return new MemoryInfo(maxMemory, - pools.values().stream() - .map(MemoryPool::getInfo) - .collect(toImmutableList())); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Map.Entry entry : pools.entrySet()) { + builder.put(entry.getKey(), entry.getValue().getInfo()); + } + return new MemoryInfo(maxMemory, builder.build()); } public MemoryPool getPool(MemoryPoolId id) diff --git a/presto-main/src/main/java/com/facebook/presto/memory/MemoryInfo.java b/presto-main/src/main/java/com/facebook/presto/memory/MemoryInfo.java index 5b6746e4960a..6df69720f4b6 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/MemoryInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/MemoryInfo.java @@ -16,23 +16,23 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; -import java.util.List; +import java.util.Map; import static java.util.Objects.requireNonNull; public class MemoryInfo { private final DataSize totalNodeMemory; - private final List pools; + private final Map pools; @JsonCreator - public MemoryInfo(@JsonProperty("totalNodeMemory") DataSize totalNodeMemory, @JsonProperty("pools") List pools) + public MemoryInfo(@JsonProperty("totalNodeMemory") DataSize totalNodeMemory, @JsonProperty("pools") Map pools) { this.totalNodeMemory = requireNonNull(totalNodeMemory, "totalNodeMemory is null"); - this.pools = ImmutableList.copyOf(requireNonNull(pools, "pools is null")); + this.pools = ImmutableMap.copyOf(requireNonNull(pools, "pools is null")); } @JsonProperty @@ -42,7 +42,7 @@ public DataSize getTotalNodeMemory() } @JsonProperty - public List getPools() + public Map getPools() { return pools; } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPool.java b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPool.java index 6cb58058d173..766d1b5e1de1 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPool.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPool.java @@ -56,7 +56,7 @@ public MemoryPoolId getId() public synchronized MemoryPoolInfo getInfo() { - return new MemoryPoolInfo(id, maxBytes, freeBytes); + return new MemoryPoolInfo(maxBytes, freeBytes); } /** @@ -117,7 +117,7 @@ public synchronized long getMaxBytes() } @Override - public String toString() + public synchronized String toString() { return MoreObjects.toStringHelper(this) .add("id", id) diff --git a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolId.java b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolId.java index 55882ad937f1..4f4ec81add9d 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolId.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolId.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import static com.google.common.base.Preconditions.checkArgument; @@ -61,8 +60,7 @@ public int hashCode() @Override public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .toString(); + // Return id here, because Jackson uses toString() when MemoryPoolId is the key of a Map + return id; } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolInfo.java b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolInfo.java index c7bbf6b8742e..cf00ad8fc7f1 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/MemoryPoolInfo.java @@ -17,30 +17,18 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects; -import static java.util.Objects.requireNonNull; - public class MemoryPoolInfo { - private final MemoryPoolId id; private final long maxBytes; private final long freeBytes; @JsonCreator - public MemoryPoolInfo(@JsonProperty("id") MemoryPoolId id, - @JsonProperty("maxBytes") long maxBytes, - @JsonProperty("freeBytes") long freeBytes) + public MemoryPoolInfo(@JsonProperty("maxBytes") long maxBytes, @JsonProperty("freeBytes") long freeBytes) { - this.id = requireNonNull(id, "id is null"); this.maxBytes = maxBytes; this.freeBytes = freeBytes; } - @JsonProperty - public MemoryPoolId getId() - { - return id; - } - @JsonProperty public long getMaxBytes() { @@ -57,7 +45,6 @@ public long getFreeBytes() public String toString() { return MoreObjects.toStringHelper(this) - .add("id", id) .add("maxBytes", maxBytes) .add("freeBytes", freeBytes) .toString(); diff --git a/presto-main/src/main/java/com/facebook/presto/memory/RemoteNodeMemory.java b/presto-main/src/main/java/com/facebook/presto/memory/RemoteNodeMemory.java new file mode 100644 index 000000000000..a34235235cc4 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/RemoteNodeMemory.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.memory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import io.airlift.http.client.FullJsonResponseHandler.JsonResponse; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpClient.HttpResponseFuture; +import io.airlift.http.client.Request; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; +import io.airlift.units.Duration; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.units.Duration.nanosSince; +import static java.util.Objects.requireNonNull; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static java.util.concurrent.TimeUnit.SECONDS; + +@ThreadSafe +public class RemoteNodeMemory +{ + private static final Logger log = Logger.get(RemoteNodeMemory.class); + + private final HttpClient httpClient; + private final URI memoryInfoUri; + private final JsonCodec memoryInfoCodec; + private final AtomicReference> memoryInfo = new AtomicReference<>(empty()); + private final AtomicReference> future = new AtomicReference<>(); + private final AtomicLong lastUpdateNanos = new AtomicLong(); + private final AtomicLong lastWarningLogged = new AtomicLong(); + + public RemoteNodeMemory(HttpClient httpClient, JsonCodec memoryInfoCodec, URI memoryInfoUri) + { + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.memoryInfoUri = requireNonNull(memoryInfoUri, "memoryInfoUri is null"); + this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null"); + } + + public Optional getInfo() + { + return memoryInfo.get(); + } + + public void asyncRefresh() + { + Duration sinceUpdate = nanosSince(lastUpdateNanos.get()); + if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 && + sinceUpdate.toMillis() > 10_000 && + future.get() != null) { + log.warn("Memory info update request to %s has not returned in %s", memoryInfoUri, sinceUpdate.toString(SECONDS)); + lastWarningLogged.set(System.nanoTime()); + } + if (sinceUpdate.toMillis() > 1_000 && future.get() == null) { + Request request = prepareGet().setUri(memoryInfoUri).build(); + HttpResponseFuture> responseFuture = httpClient.executeAsync(request, createFullJsonResponseHandler(memoryInfoCodec)); + future.set(responseFuture); + + Futures.addCallback(responseFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable JsonResponse result) + { + if (result != null && result.hasValue()) { + memoryInfo.set(ofNullable(result.getValue())); + } + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + } + + @Override + public void onFailure(Throwable t) + { + log.warn(t, "Error fetching memory info from %s", memoryInfoUri); + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + } + }); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/HttpLocationFactory.java b/presto-main/src/main/java/com/facebook/presto/server/HttpLocationFactory.java index 9e14c510b804..fa3171328744 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/HttpLocationFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/server/HttpLocationFactory.java @@ -27,6 +27,7 @@ import java.net.URI; import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static java.util.Objects.requireNonNull; public class HttpLocationFactory implements LocationFactory @@ -82,4 +83,12 @@ public URI createTaskLocation(Node node, TaskId taskId) .appendPath(taskId.toString()) .build(); } + + @Override + public URI createMemoryInfoLocation(Node node) + { + requireNonNull(node, "node is null"); + return uriBuilderFrom(node.getHttpUri()) + .appendPath("/v1/memory").build(); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 59b5a47f45d2..9676263f43e6 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -36,8 +36,11 @@ import com.facebook.presto.failureDetector.FailureDetectorModule; import com.facebook.presto.index.IndexManager; import com.facebook.presto.memory.ClusterMemoryManager; +import com.facebook.presto.memory.ForMemoryManager; import com.facebook.presto.memory.LocalMemoryManager; +import com.facebook.presto.memory.MemoryInfo; import com.facebook.presto.memory.MemoryManagerConfig; +import com.facebook.presto.memory.MemoryResource; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.CatalogManagerConfig; import com.facebook.presto.metadata.HandleJsonModule; @@ -176,6 +179,11 @@ protected void setup(Binder binder) newExporter(binder).export(RemoteTaskFactory.class).withGeneratedName(); httpClientBinder(binder).bindHttpClient("scheduler", ForScheduler.class).withTracing(); + // memory manager + jaxrsBinder(binder).bind(MemoryResource.class); + httpClientBinder(binder).bindHttpClient("memoryManager", ForMemoryManager.class).withTracing(); + jsonCodecBinder(binder).bindJsonCodec(MemoryInfo.class); + // data stream provider binder.bind(PageSourceManager.class).in(Scopes.SINGLETON); binder.bind(PageSourceProvider.class).to(PageSourceManager.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 5d7d145028e3..c439809d09b5 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -15,6 +15,7 @@ import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.execution.QueryManager; +import com.facebook.presto.memory.ClusterMemoryManager; import com.facebook.presto.metadata.AllNodes; import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.Metadata; @@ -76,6 +77,7 @@ public class TestingPrestoServer private final ConnectorManager connectorManager; private final TestingHttpServer server; private final Metadata metadata; + private final ClusterMemoryManager clusterMemoryManager; private final InternalNodeManager nodeManager; private final ServiceSelectorManager serviceSelectorManager; private final Announcer announcer; @@ -157,6 +159,7 @@ public TestingPrestoServer(boolean coordinator, Map properties, server = injector.getInstance(TestingHttpServer.class); metadata = injector.getInstance(Metadata.class); + clusterMemoryManager = injector.getInstance(ClusterMemoryManager.class); nodeManager = injector.getInstance(InternalNodeManager.class); serviceSelectorManager = injector.getInstance(ServiceSelectorManager.class); announcer = injector.getInstance(Announcer.class); @@ -228,6 +231,11 @@ public Metadata getMetadata() return metadata; } + public ClusterMemoryManager getClusterMemoryManager() + { + return clusterMemoryManager; + } + public final AllNodes refreshNodes() { serviceSelectorManager.forceRefresh(); diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java index b24926597bfb..8d5701036a41 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java @@ -294,5 +294,11 @@ public URI createTaskLocation(Node node, TaskId taskId) { return URI.create("fake://task/" + node.getNodeIdentifier() + "/" + taskId); } + + @Override + public URI createMemoryInfoLocation(Node node) + { + return URI.create("fake://" + node.getNodeIdentifier() + "/memory"); + } } } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestMemoryManager.java b/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java similarity index 71% rename from presto-tests/src/test/java/com/facebook/presto/tests/TestMemoryManager.java rename to presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java index d478f650f2f7..99c1e9691fc4 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestMemoryManager.java +++ b/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java @@ -11,18 +11,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.facebook.presto.tests; +package com.facebook.presto.memory; import com.facebook.presto.Session; import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.DistributedQueryRunner; import com.facebook.presto.tpch.TpchPlugin; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; import java.util.Map; +import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL; import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY; import static java.util.Locale.ENGLISH; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) public class TestMemoryManager @@ -37,6 +41,32 @@ public class TestMemoryManager .setLocale(ENGLISH) .build(); + @Test(timeOut = 240_000) + public void testClusterPools() + throws Exception + { + Map properties = ImmutableMap.builder() + .put("experimental.cluster-memory-manager-enabled", "true") + .build(); + try (DistributedQueryRunner queryRunner = createQueryRunner(SESSION, properties)) { + ClusterMemoryManager memoryManager = queryRunner.getCoordinator().getClusterMemoryManager(); + ClusterMemoryPool reservedPool = null; + while (reservedPool == null) { + reservedPool = memoryManager.getPools().get(RESERVED_POOL); + MILLISECONDS.sleep(100); + } + + while (reservedPool.getNodes() == 0) { + MILLISECONDS.sleep(100); + } + + assertTrue(reservedPool.getNodes() > 0); + assertTrue(reservedPool.getBlockedNodes() == 0); + assertTrue(reservedPool.getTotalDistributedBytes() > 0); + assertTrue(reservedPool.getFreeDistributedBytes() > 0); + } + } + @Test(timeOut = 240_000, expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Query exceeded max memory size of 1kB.*") public void testQueryMemoryLimit() throws Exception