Skip to content

Commit

Permalink
Detect quick worker reboots and fail queries
Browse files Browse the repository at this point in the history
If a worker reboots quickly, the engine may think it is the same process and
will wait for results forever.  This change adds the process unique worker
instance id to the task info and if that changes, the query is failed
immediately.

Fixes prestodb#2330
  • Loading branch information
dain committed Feb 14, 2015
1 parent 4260ab3 commit 39202d3
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -49,6 +50,7 @@ public class SqlTask
private static final Logger log = Logger.get(SqlTask.class);

private final TaskId taskId;
private final String nodeInstanceId;
private final URI location;
private final TaskStateMachine taskStateMachine;
private final SharedBuffer sharedBuffer;
Expand All @@ -62,13 +64,15 @@ public class SqlTask

public SqlTask(
TaskId taskId,
String nodeInstanceId,
URI location,
SqlTaskExecutionFactory sqlTaskExecutionFactory,
ExecutorService taskNotificationExecutor,
final Function<SqlTask, ?> onDone,
DataSize maxBufferSize)
{
this.taskId = checkNotNull(taskId, "taskId is null");
this.nodeInstanceId = checkNotNull(nodeInstanceId, "nodeInstanceId is null");
this.location = checkNotNull(location, "location is null");
this.sqlTaskExecutionFactory = checkNotNull(sqlTaskExecutionFactory, "sqlTaskExecutionFactory is null");
checkNotNull(taskNotificationExecutor, "taskNotificationExecutor is null");
Expand Down Expand Up @@ -174,6 +178,7 @@ private TaskInfo createTaskInfo(TaskHolder taskHolder)

return new TaskInfo(
taskStateMachine.getTaskId(),
Optional.of(nodeInstanceId),
versionNumber,
state,
location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -78,8 +79,10 @@ public SqlTaskManager(
final LocationFactory locationFactory,
TaskExecutor taskExecutor,
QueryMonitor queryMonitor,
NodeInfo nodeInfo,
TaskManagerConfig config)
{
checkNotNull(nodeInfo, "nodeInfo is null");
checkNotNull(config, "config is null");
infoCacheTime = config.getInfoMaxAge();
clientTimeout = config.getClientTimeout();
Expand All @@ -102,6 +105,7 @@ public SqlTask load(TaskId taskId)
{
return new SqlTask(
taskId,
nodeInfo.getInstanceId(),
locationFactory.createLocalTaskLocation(taskId),
sqlTaskExecutionFactory,
taskNotificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -50,6 +51,7 @@ public class TaskInfo
public static final long MAX_VERSION = Long.MAX_VALUE;

private final TaskId taskId;
private final Optional<String> nodeInstanceId;
private final long version;
private final TaskState state;
private final URI self;
Expand All @@ -61,6 +63,7 @@ public class TaskInfo

@JsonCreator
public TaskInfo(@JsonProperty("taskId") TaskId taskId,
@JsonProperty("nodeInstanceId") Optional<String> nodeInstanceId,
@JsonProperty("version") long version,
@JsonProperty("state") TaskState state,
@JsonProperty("self") URI self,
Expand All @@ -71,6 +74,7 @@ public TaskInfo(@JsonProperty("taskId") TaskId taskId,
@JsonProperty("failures") List<ExecutionFailureInfo> failures)
{
this.taskId = checkNotNull(taskId, "taskId is null");
this.nodeInstanceId = checkNotNull(nodeInstanceId, "nodeInstanceId is null");
this.version = checkNotNull(version, "version is null");
this.state = checkNotNull(state, "state is null");
this.self = checkNotNull(self, "self is null");
Expand All @@ -93,6 +97,12 @@ public TaskId getTaskId()
return taskId;
}

@JsonProperty
public Optional<String> getNodeInstanceId()
{
return nodeInstanceId;
}

@JsonProperty
public long getVersion()
{
Expand Down Expand Up @@ -143,7 +153,7 @@ public List<ExecutionFailureInfo> getFailures()

public TaskInfo summarize()
{
return new TaskInfo(taskId, version, state, self, lastHeartbeat, outputBuffers, noMoreSplits, stats.summarize(), failures);
return new TaskInfo(taskId, nodeInstanceId, version, state, self, lastHeartbeat, outputBuffers, noMoreSplits, stats.summarize(), failures);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand All @@ -82,7 +83,9 @@

import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;
import static com.facebook.presto.spi.StandardErrorCode.WORKER_RESTARTED;
import static com.facebook.presto.util.Failures.WORKER_NODE_ERROR;
import static com.facebook.presto.util.Failures.WORKER_RESTARTED_ERROR;
import static com.facebook.presto.util.Failures.toFailure;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -188,6 +191,7 @@ public HttpRemoteTask(Session session,

taskInfo = new StateMachine<>("task " + taskId, executor, new TaskInfo(
taskId,
Optional.empty(),
TaskInfo.MIN_VERSION,
TaskState.PLANNED,
location,
Expand Down Expand Up @@ -322,7 +326,14 @@ private synchronized void updateTaskInfo(TaskInfo newValue, List<TaskSource> sou
}

// change to new value if old value is not changed and new value has a newer version
AtomicBoolean workerRestarted = new AtomicBoolean();
taskInfo.setIf(newValue, oldValue -> {
// did the worker restart
if (oldValue.getNodeInstanceId().isPresent() && !oldValue.getNodeInstanceId().equals(newValue.getNodeInstanceId())) {
workerRestarted.set(true);
return false;
}

if (oldValue.getState().isDone()) {
// never update if the task has reached a terminal state
return false;
Expand All @@ -334,6 +345,12 @@ private synchronized void updateTaskInfo(TaskInfo newValue, List<TaskSource> sou
return true;
});

if (workerRestarted.get()) {
PrestoException exception = new PrestoException(WORKER_RESTARTED, format("%s (%s)", WORKER_RESTARTED_ERROR, newValue.getSelf()));
failTask(exception);
abort();
}

// remove acknowledged splits, which frees memory
for (TaskSource source : sources) {
PlanNodeId planNodeId = source.getPlanNodeId();
Expand Down Expand Up @@ -465,6 +482,7 @@ public synchronized void abort()
TaskInfo taskInfo = getTaskInfo();
URI uri = taskInfo.getSelf();
updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
taskInfo.getNodeInstanceId(),
TaskInfo.MAX_VERSION,
TaskState.ABORTED,
uri,
Expand Down Expand Up @@ -521,6 +539,7 @@ private void failTask(Throwable cause)
log.debug(cause, "Remote task failed: %s", taskInfo.getSelf());
}
updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
taskInfo.getNodeInstanceId(),
TaskInfo.MAX_VERSION,
TaskState.FAILED,
taskInfo.getSelf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@

public final class Failures
{
public static final String WORKER_NODE_ERROR = "" +
"Encountered too many errors talking to a worker node. " +
"The node may have crashed or be under too much load. " +
private static final String NODE_CRASHED_ERROR = "The node may have crashed or be under too much load. " +
"This is probably a transient issue, so please retry your query in a few minutes.";

public static final String WORKER_NODE_ERROR = "Encountered too many errors talking to a worker node. " + NODE_CRASHED_ERROR;

public static final String WORKER_RESTARTED_ERROR = "A worker node running your query has restarted. " + NODE_CRASHED_ERROR;

private Failures() {}

public static ExecutionFailureInfo toFailure(Throwable failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public TaskInfo getTaskInfo()

return new TaskInfo(
taskStateMachine.getTaskId(),
Optional.empty(),
nextTaskInfoVersion.getAndIncrement(),
state,
location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ public TaskInfo getTaskInfo()

return new TaskInfo(
taskStateMachine.getTaskId(),
Optional.empty(),
nextTaskInfoVersion.getAndIncrement(),
state,
location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public SqlTask createInitialTask()

return new SqlTask(
taskId,
"test",
location,
sqlTaskExecutionFactory,
taskNotificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public SqlTaskManager createSqlTaskManager(TaskManagerConfig config)
new MockLocationFactory(),
taskExecutor,
new QueryMonitor(new ObjectMapperProvider().get(), new NullEventClient(), new NodeInfo("test"), new NodeVersion("testVersion")),
new NodeInfo("test"),
config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -271,6 +272,7 @@ public TaskInfo getTaskInfo()

return new TaskInfo(
taskStateMachine.getTaskId(),
Optional.empty(),
nextTaskInfoVersion.getAndIncrement(),
state,
location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum StandardErrorCode
NO_NODES_AVAILABLE(0x0001_0005),
REMOTE_TASK_ERROR(0x0001_0006),
COMPILER_ERROR(0x0001_0007),
WORKER_RESTARTED(0x0001_0008),

INSUFFICIENT_RESOURCES(0x0002_0000),
EXCEEDED_MEMORY_LIMIT(0x0002_0001),
Expand Down

0 comments on commit 39202d3

Please sign in to comment.