Skip to content

Commit

Permalink
Track all memory pools in coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Apr 30, 2015
1 parent 1d05622 commit e9fc98a
Show file tree
Hide file tree
Showing 15 changed files with 485 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public interface LocationFactory
URI createLocalTaskLocation(TaskId taskId);

URI createTaskLocation(Node node, TaskId taskId);

URI createMemoryInfoLocation(Node node);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,70 @@
package com.facebook.presto.memory;

import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.http.client.HttpClient;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.ObjectNames;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
import static com.google.common.collect.Sets.difference;
import static java.util.Objects.requireNonNull;

public class ClusterMemoryManager
{
private static final Logger log = Logger.get(ClusterMemoryManager.class);
private final NodeManager nodeManager;
private final LocationFactory locationFactory;
private final HttpClient httpClient;
private final MBeanExporter exporter;
private final JsonCodec<MemoryInfo> memoryInfoCodec;
private final DataSize maxQueryMemory;
private final boolean enabled;
private final AtomicLong clusterMemoryUsageBytes = new AtomicLong();
private final AtomicLong clusterMemoryBytes = new AtomicLong();
private final Map<String, RemoteNodeMemory> nodes = new HashMap<>();

@GuardedBy("this")
private final Map<MemoryPoolId, ClusterMemoryPool> pools = new HashMap<>();

@Inject
public ClusterMemoryManager(MemoryManagerConfig config)
public ClusterMemoryManager(
@ForMemoryManager HttpClient httpClient,
NodeManager nodeManager,
LocationFactory locationFactory,
MBeanExporter exporter,
JsonCodec<MemoryInfo> memoryInfoCodec,
MemoryManagerConfig config)
{
requireNonNull(config, "config is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.exporter = requireNonNull(exporter, "exporter is null");
this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null");
this.maxQueryMemory = config.getMaxQueryMemory();
this.enabled = config.isClusterMemoryManagerEnabled();
}
Expand All @@ -52,11 +96,109 @@ public void process(Iterable<QueryExecution> queries)
}
}
clusterMemoryUsageBytes.set(totalBytes);

updateNodes();
updatePools();
}

@VisibleForTesting
synchronized Map<MemoryPoolId, ClusterMemoryPool> getPools()
{
return ImmutableMap.copyOf(pools);
}

private void updateNodes()
{
Set<Node> activeNodes = nodeManager.getActiveNodes();
ImmutableSet<String> activeNodeIds = activeNodes.stream()
.map(Node::getNodeIdentifier)
.collect(toImmutableSet());

// Remove nodes that don't exist anymore
nodes.keySet().removeAll(difference(nodes.keySet(), activeNodeIds));

// Add new nodes
for (Node node : activeNodes) {
if (!nodes.containsKey(node.getNodeIdentifier())) {
nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(httpClient, memoryInfoCodec, locationFactory.createMemoryInfoLocation(node)));
}
}

// Schedule refresh
for (RemoteNodeMemory node : nodes.values()) {
node.asyncRefresh();
}
}

private synchronized void updatePools()
{
// Update view of cluster memory and pools
List<MemoryInfo> nodeMemoryInfos = nodes.values().stream()
.map(RemoteNodeMemory::getInfo)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());

long totalClusterMemory = nodeMemoryInfos.stream()
.map(MemoryInfo::getTotalNodeMemory)
.mapToLong(DataSize::toBytes)
.sum();
clusterMemoryBytes.set(totalClusterMemory);

Set<MemoryPoolId> activePoolIds = nodeMemoryInfos.stream()
.flatMap(info -> info.getPools().keySet().stream())
.collect(toImmutableSet());

Set<MemoryPoolId> removedPools = difference(pools.keySet(), activePoolIds);
for (MemoryPoolId removed : removedPools) {
unexport(pools.get(removed));
pools.remove(removed);
}
for (MemoryPoolId id : activePoolIds) {
ClusterMemoryPool pool = pools.computeIfAbsent(id, poolId -> {
ClusterMemoryPool newPool = new ClusterMemoryPool(poolId);
String objectName = ObjectNames.builder(ClusterMemoryPool.class, newPool.getId().toString()).build();
try {
exporter.export(objectName, newPool);
}
catch (JmxException e) {
log.error(e, "Error exporting memory pool %s", poolId);
}
return newPool;
});
pool.update(nodeMemoryInfos);
}
}

@PreDestroy
public synchronized void destroy()
{
for (ClusterMemoryPool pool : pools.values()) {
unexport(pool);
}
pools.clear();
}

private void unexport(ClusterMemoryPool pool)
{
try {
String objectName = ObjectNames.builder(ClusterMemoryPool.class, pool.getId().toString()).build();
exporter.unexport(objectName);
}
catch (JmxException e) {
log.error(e, "Failed to unexport pool %s", pool.getId());
}
}

@Managed
public long getClusterMemoryUsageBytes()
{
return clusterMemoryUsageBytes.get();
}

@Managed
public long getClusterMemoryBytes()
{
return clusterMemoryBytes.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import org.weakref.jmx.Managed;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.List;

import static java.util.Objects.requireNonNull;

@ThreadSafe
public class ClusterMemoryPool
{
private final MemoryPoolId id;

@GuardedBy("this")
private long totalDistributedBytes;

@GuardedBy("this")
private long freeDistributedBytes;

@GuardedBy("this")
private int nodes;

@GuardedBy("this")
private int blockedNodes;

public ClusterMemoryPool(MemoryPoolId id)
{
this.id = requireNonNull(id, "id is null");
}

public MemoryPoolId getId()
{
return id;
}

@Managed
public synchronized long getTotalDistributedBytes()
{
return totalDistributedBytes;
}

@Managed
public synchronized long getFreeDistributedBytes()
{
return freeDistributedBytes;
}

@Managed
public synchronized int getNodes()
{
return nodes;
}

@Managed
public synchronized int getBlockedNodes()
{
return blockedNodes;
}

public synchronized void update(List<MemoryInfo> memoryInfos)
{
nodes = 0;
blockedNodes = 0;
totalDistributedBytes = 0;
freeDistributedBytes = 0;

for (MemoryInfo info : memoryInfos) {
MemoryPoolInfo poolInfo = info.getPools().get(id);
if (poolInfo != null) {
nodes++;
if (poolInfo.getFreeBytes() <= 0) {
blockedNodes++;
}
totalDistributedBytes += poolInfo.getMaxBytes();
freeDistributedBytes += poolInfo.getFreeBytes();
}
}
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClusterMemoryPool that = (ClusterMemoryPool) o;
return Objects.equal(id, that.id);
}

@Override
public int hashCode()
{
return Objects.hashCode(id);
}

@Override
public synchronized String toString()
{
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("totalDistributedBytes", totalDistributedBytes)
.add("freeDistributedBytes", freeDistributedBytes)
.add("nodes", nodes)
.add("blockedNodes", blockedNodes)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForMemoryManager
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.util.Map;

import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -51,10 +50,11 @@ public LocalMemoryManager(MemoryManagerConfig config, MBeanExporter exporter)

public MemoryInfo getInfo()
{
return new MemoryInfo(maxMemory,
pools.values().stream()
.map(MemoryPool::getInfo)
.collect(toImmutableList()));
ImmutableMap.Builder<MemoryPoolId, MemoryPoolInfo> builder = ImmutableMap.builder();
for (Map.Entry<MemoryPoolId, MemoryPool> entry : pools.entrySet()) {
builder.put(entry.getKey(), entry.getValue().getInfo());
}
return new MemoryInfo(maxMemory, builder.build());
}

public MemoryPool getPool(MemoryPoolId id)
Expand Down
Loading

0 comments on commit e9fc98a

Please sign in to comment.