IMPALA-491: Improve error message when queries are cancelled due to BE nodes dying.

Change-Id: If9a47d9021b08385743093fbe8054b48119eaff9
Reviewed-on: https://gerrit.ent.cloudera.com:8080/523
Tested-by: jenkins
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>
Alex Behm authored and Henry Robinson committed Jan 8, 2014
1 parent b5ca38c commit 6a1cc58
Showing 10 changed files with 145 additions and 53 deletions.
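Before this change, a query whose fragments were running on a backend that died was reported with the generic CANCELLED status. The diffs below thread a descriptive cause through the cancellation path instead, so the profile's 'Query Status' names the unreachable backend(s). Based on the message built in MembershipCallback() below, the resulting status reads roughly as follows (host names here are hypothetical; 22000 is the default be_port used by the tests):

    Cancelled due to unreachable impalad(s): host-1.example.com:22000, host-2.example.com:22000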
4 changes: 2 additions & 2 deletions be/src/runtime/coordinator.cc
@@ -894,12 +894,12 @@ Status Coordinator::ExecRemoteFragment(void* exec_state_arg) {
return exec_state->status;
}

void Coordinator::Cancel() {
void Coordinator::Cancel(const Status* cause) {
lock_guard<mutex> l(lock_);
// if the query status indicates an error, cancellation has already been initiated
if (!query_status_.ok()) return;
// prevent others from cancelling a second time
query_status_ = Status::CANCELLED;
query_status_ = (cause != NULL) ? *cause : Status::CANCELLED;
CancelInternal();
}

5 changes: 3 additions & 2 deletions be/src/runtime/coordinator.h
@@ -125,8 +125,9 @@ class Coordinator {
Status GetNext(RowBatch** batch, RuntimeState* state);

// Cancel execution of query. This includes the execution of the local plan fragment,
// if any, as well as all plan fragments on remote nodes.
void Cancel();
// if any, as well as all plan fragments on remote nodes. Sets query_status_ to
// the given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
void Cancel(const Status* cause = NULL);

// Updates status and query execution metadata of a particular
// fragment; if 'status' is an error status or if 'done' is true,
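The new optional cause argument is the core of the change: a plain user cancel keeps the old Status::CANCELLED behaviour, while a backend-failure cancel stamps the coordinator's query status with a descriptive error. A minimal, self-contained sketch of that rule using toy stand-ins (not Impala's real Status/Coordinator classes):

#include <cstddef>
#include <iostream>
#include <string>

// Toy stand-in for impala::Status: just an error message.
struct Status {
  std::string msg;
};
static const Status CANCELLED = {"Cancelled"};

// Toy stand-in for the coordinator; only the cause-propagation rule is modelled.
struct Coordinator {
  Status query_status;
  // Mirrors the diff: a NULL cause means a deliberate user cancellation.
  void Cancel(const Status* cause = NULL) {
    query_status = (cause != NULL) ? *cause : CANCELLED;
  }
};

int main() {
  // Backend-failure path: the status carries the unreachable host (hypothetical name).
  Coordinator coord_a;
  Status cause = {"Cancelled due to unreachable impalad(s): host-2:22000"};
  coord_a.Cancel(&cause);
  std::cout << coord_a.query_status.msg << std::endl;

  // User-initiated path: falls back to the generic cancelled status.
  Coordinator coord_b;
  coord_b.Cancel();
  std::cout << coord_b.query_status.msg << std::endl;
  return 0;
}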
22 changes: 4 additions & 18 deletions be/src/service/impala-beeswax-server.cc
@@ -442,28 +442,14 @@ Status ImpalaServer::QueryToTClientRequest(const Query& query,

inline void ImpalaServer::TUniqueIdToQueryHandle(const TUniqueId& query_id,
QueryHandle* handle) {
stringstream stringstream;
stringstream << query_id.hi << " " << query_id.lo;
handle->__set_id(stringstream.str());
handle->__set_log_context(stringstream.str());
string query_id_str = PrintId(query_id);
handle->__set_id(query_id_str);
handle->__set_log_context(query_id_str);
}

inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
TUniqueId* query_id) {
char_separator<char> sep(" ");
tokenizer< char_separator<char> > tokens(handle.id, sep);
int i = 0;
BOOST_FOREACH(const string& t, tokens) {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
int64_t id = StringParser::StringToInt<int64_t>(
t.c_str(), t.length(), &parse_result);
if (i == 0) {
query_id->hi = id;
} else {
query_id->lo = id;
}
++i;
}
ParseId(handle.id, query_id);
}

void ImpalaServer::RaiseBeeswaxException(const string& msg, const char* sql_state) {
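The Beeswax handle conversion now delegates to the shared PrintId()/ParseId() helpers instead of hand-rolled tokenizing. Assuming those helpers are inverse string conversions for TUniqueId (which is what the replaced code implemented by hand), a handle built by the server round-trips back to the same id. A sketch using the two member functions from the hunk above (an in-context fragment, not standalone code; the hi/lo values are arbitrary):

TUniqueId id;
id.hi = 0x1234;
id.lo = 0x5678;

QueryHandle handle;
TUniqueIdToQueryHandle(id, &handle);
// handle.id and handle.log_context both now hold PrintId(id).

TUniqueId round_tripped;
QueryHandleToTUniqueId(handle, &round_tripped);
DCHECK_EQ(round_tripped.hi, id.hi);
DCHECK_EQ(round_tripped.lo, id.lo);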
84 changes: 65 additions & 19 deletions be/src/service/impala-server.cc
@@ -325,6 +325,34 @@ const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
const int ImpalaServer::ASCII_PRECISION = 16; // print 16 digits for double/float

// Work item for ImpalaServer::cancellation_thread_pool_.
class CancellationWork {
public:
CancellationWork(const TUniqueId& query_id, const Status& cause)
: query_id_(query_id), cause_(cause) {
}

CancellationWork() {
}

const TUniqueId& query_id() const { return query_id_; }
const Status& cause() const { return cause_; }

bool operator<(const CancellationWork& other) const {
return query_id_ < other.query_id_;
}

bool operator==(const CancellationWork& other) const {
return query_id_ == other.query_id_;
}

private:
// Id of query to be canceled.
TUniqueId query_id_;

// Error status containing a list of failed impalads causing the cancellation.
Status cause_;
};
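A work item simply pairs the query id with the error that should become the query's status; ordering and equality are keyed on the query id. Usage mirrors the Offer() call further down in MembershipCallback(); a condensed, in-context fragment (the host in the message is hypothetical):

TUniqueId query_id;  // id of a query that was running on the failed backend
Status cause("Cancelled due to unreachable impalad(s): host-2:22000");
cancellation_thread_pool_->Offer(CancellationWork(query_id, cause));
// A pool thread later runs CancelFromThreadPool(thread_id, work), which calls
// CancelInternal(work.query_id(), &work.cause()).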

ImpalaServer::ImpalaServer(ExecEnv* exec_env)
: exec_env_(exec_env) {
@@ -413,7 +441,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
// Initialise the cancellation thread pool with 5 (by default) threads. The max queue
// size is deliberately set so high that it should never fill; if it does the
// cancellations will get ignored and retried on the next statestore heartbeat.
cancellation_thread_pool_.reset(new ThreadPool<TUniqueId>(
cancellation_thread_pool_.reset(new ThreadPool<CancellationWork>(
"impala-server", "cancellation-worker",
FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
bind<void>(&ImpalaServer::CancelFromThreadPool, this, _1, _2)));
@@ -1108,14 +1136,14 @@ Status ImpalaServer::UpdateCatalogMetrics() {
return Status::OK;
}

Status ImpalaServer::CancelInternal(const TUniqueId& query_id) {
Status ImpalaServer::CancelInternal(const TUniqueId& query_id, const Status* cause) {
VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
if (exec_state == NULL) return Status("Invalid or unknown query handle");

lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
// TODO: can we call Coordinator::Cancel() here while holding lock?
exec_state->Cancel();
exec_state->Cancel(cause);
return Status::OK;
}

@@ -1540,11 +1568,13 @@ void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
state->network_address = network_address;
}

void ImpalaServer::CancelFromThreadPool(uint32_t thread_id, const TUniqueId& query_id) {
Status status = CancelInternal(query_id);
void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
const CancellationWork& cancellation_work) {
Status status =
CancelInternal(cancellation_work.query_id(), &cancellation_work.cause());
if (!status.ok()) {
VLOG_QUERY << "Query cancellation (" << query_id << ") did not succeed: "
<< status.GetErrorMsg();
VLOG_QUERY << "Query cancellation (" << cancellation_work.query_id()
<< ") did not succeed: " << status.GetErrorMsg();
}
}

@@ -1587,25 +1617,32 @@ void ImpalaServer::MembershipCallback(
current_membership.insert(backend.second);
}

set<TUniqueId> queries_to_cancel;
// Maps from query id (to be cancelled) to a list of failed Impalads that are
// the cause of the cancellation.
map<TUniqueId, vector<TNetworkAddress> > queries_to_cancel;
{
// Build a list of queries that are running on failed hosts (as evidenced by their
// absence from the membership list).
// TODO: crash-restart failures can give false negatives for failed Impala daemons.
lock_guard<mutex> l(query_locations_lock_);
QueryLocations::const_iterator backend = query_locations_.begin();
while (backend != query_locations_.end()) {
if (current_membership.find(backend->first) == current_membership.end()) {
queries_to_cancel.insert(backend->second.begin(), backend->second.end());
exec_env_->client_cache()->CloseConnections(backend->first);
QueryLocations::const_iterator loc_entry = query_locations_.begin();
while (loc_entry != query_locations_.end()) {
if (current_membership.find(loc_entry->first) == current_membership.end()) {
unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
// Add failed backend locations to all queries that ran on that backend.
for(; query_id != loc_entry->second.end(); ++query_id) {
vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
failed_hosts.push_back(loc_entry->first);
}
exec_env_->client_cache()->CloseConnections(loc_entry->first);
// We can remove the location wholesale once we know the backend's failed. To do so
// safely during iteration, we have to be careful not to invalidate the current
// iterator, so copy the iterator to do the erase(..) and advance the original.
QueryLocations::const_iterator failed_backend = backend;
++backend;
QueryLocations::const_iterator failed_backend = loc_entry;
++loc_entry;
query_locations_.erase(failed_backend);
} else {
++backend;
++loc_entry;
}
}
}
@@ -1619,8 +1656,18 @@ void ImpalaServer::MembershipCallback(
// Since we are the only producer for this pool, we know that this cannot block
// indefinitely since the queue is large enough to accept all new cancellation
// requests.
BOOST_FOREACH(const TUniqueId& query_id, queries_to_cancel) {
cancellation_thread_pool_->Offer(query_id);
map<TUniqueId, vector<TNetworkAddress> >::iterator cancellation_entry;
for (cancellation_entry = queries_to_cancel.begin();
cancellation_entry != queries_to_cancel.end();
++cancellation_entry) {
stringstream cause_msg;
cause_msg << "Cancelled due to unreachable impalad(s): ";
for (int i = 0; i < cancellation_entry->second.size(); ++i) {
cause_msg << cancellation_entry->second[i];
if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
}
cancellation_thread_pool_->Offer(
CancellationWork(cancellation_entry->first, Status(cause_msg.str())));
}
}
}
Expand Down Expand Up @@ -1778,5 +1825,4 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
}
}


}
17 changes: 11 additions & 6 deletions be/src/service/impala-server.h
@@ -46,6 +46,7 @@ namespace impala {

class ExecEnv;
class DataSink;
class CancellationWork;
class Coordinator;
class RowDescriptor;
class TCatalogUpdate;
@@ -337,10 +338,12 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
// Returns true if it found a registered exec_state, otherwise false.
bool UnregisterQuery(const TUniqueId& query_id);

// Initiates query cancellation. Returns OK unless query_id is not found.
// Initiates query cancellation reporting the given cause as the query status.
// Assumes deliberate cancellation by the user if the cause is NULL.
// Returns OK unless query_id is not found.
// Queries still need to be unregistered, usually via Close, after cancellation.
// Caller should not hold any locks when calling this function.
Status CancelInternal(const TUniqueId& query_id);
Status CancelInternal(const TUniqueId& query_id, const Status* cause = NULL);

// Close the session and release all resource used by this session.
// Caller should not hold any locks when calling this function.
@@ -556,9 +559,11 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
QueryStateToTOperationState(const beeswax::QueryState::type& query_state);

// Helper method to process cancellations that result from failed backends, called from
// the cancellation thread pool. Calls CancelInternal directly, but has a signature
// compatible with the thread pool.
void CancelFromThreadPool(uint32_t thread_id, const TUniqueId& query_id);
// the cancellation thread pool. The cancellation_work contains the query id to cancel
// and a cause listing the failed backends that led to cancellation. Calls
// CancelInternal directly, but has a signature compatible with the thread pool.
void CancelFromThreadPool(uint32_t thread_id,
const CancellationWork& cancellation_work);

// For access to GetTableNames and DescribeTable
friend class DdlExecutor;
@@ -596,7 +601,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,

// Thread pool to process cancellation requests that come from failed Impala daemons to
// avoid blocking the statestore callback.
boost::scoped_ptr<ThreadPool<TUniqueId> > cancellation_thread_pool_;
boost::scoped_ptr<ThreadPool<CancellationWork> > cancellation_thread_pool_;

// map from query id to exec state; QueryExecState is owned by us and referenced
// as a shared_ptr to allow asynchronous deletion
4 changes: 2 additions & 2 deletions be/src/service/query-exec-state.cc
@@ -342,14 +342,14 @@ Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector<void*>* r
return Status::OK;
}

void ImpalaServer::QueryExecState::Cancel() {
void ImpalaServer::QueryExecState::Cancel(const Status* cause) {
// If the query is completed, no need to cancel.
if (eos_) return;
// we don't want multiple concurrent cancel calls to end up executing
// Coordinator::Cancel() multiple times
if (query_state_ == QueryState::EXCEPTION) return;
query_state_ = QueryState::EXCEPTION;
if (coord_.get() != NULL) coord_->Cancel();
if (coord_.get() != NULL) coord_->Cancel(cause);
}

Status ImpalaServer::QueryExecState::UpdateMetastore() {
4 changes: 2 additions & 2 deletions be/src/service/query-exec-state.h
@@ -89,10 +89,10 @@ class ImpalaServer::QueryExecState {
// is taken before calling UpdateQueryStatus
Status UpdateQueryStatus(const Status& status);

// Sets state to EXCEPTION and cancels coordinator.
// Sets state to EXCEPTION and cancels coordinator with the given cause.
// Caller needs to hold lock_.
// Does nothing if the query has reached EOS.
void Cancel();
void Cancel(const Status* cause = NULL);

// This is called when the query is done (finished, cancelled, or failed).
// Takes lock_: callers must not hold lock() before calling.
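Taken together, the C++ changes thread the cancellation cause through a single chain, so the failed hosts end up in the coordinator's query status and, from there, in the runtime profile that the new Python test helpers below read back (condensed from the hunks above):

MembershipCallback() -> cancellation_thread_pool_->Offer(CancellationWork(query_id, cause))
CancelFromThreadPool() -> CancelInternal(query_id, &cause)
CancelInternal() -> exec_state->Cancel(cause)
QueryExecState::Cancel() -> coord_->Cancel(cause)
Coordinator::Cancel() -> query_status_ = (cause != NULL) ? *cause : Status::CANCELLED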
6 changes: 5 additions & 1 deletion tests/common/impala_cluster.py
@@ -175,11 +175,15 @@ class ImpaladProcess(BaseImpalaProcess):
def __init__(self, cmd):
super(ImpaladProcess, self).__init__(cmd, socket.gethostname())
self.service = ImpaladService(self.hostname,
self._get_webserver_port(default=25000), self.__get_beeswax_port(default=21000))
self._get_webserver_port(default=25000), self.__get_beeswax_port(default=21000),
self.__get_be_port(default=22000))

def __get_beeswax_port(self, default=None):
return int(self._get_arg_value('beeswax_port', default))

def __get_be_port(self, default=None):
return int(self._get_arg_value('be_port', default))

def start(self, wait_until_ready=True):
"""Starts the impalad and waits until the service is ready to accept connections."""
super(ImpaladProcess, self).start()
31 changes: 30 additions & 1 deletion tests/common/impala_service.py
@@ -80,9 +80,10 @@ def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interva
# Allows for interacting with an Impalad instance to perform operations such as creating
# new connections or accessing the debug webpage.
class ImpaladService(BaseImpalaService):
def __init__(self, hostname, webserver_port=25000, beeswax_port=21000):
def __init__(self, hostname, webserver_port=25000, beeswax_port=21000, be_port=22000):
super(ImpaladService, self).__init__(hostname, webserver_port)
self.beeswax_port = beeswax_port
self.be_port = be_port

def get_num_known_live_backends(self, timeout=30, interval=1):
LOG.info("Getting num_known_live_backends from %s:%s" %\
@@ -108,6 +109,34 @@ def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=
sleep(1)
assert 0, 'num_known_live_backends did not reach expected value in time'

def read_query_profile_page(self, query_id, timeout=10, interval=1):
"""Fetches the raw contents of the query's runtime profile webpage.
Fails an assertion if Impala's webserver is unavailable or the query's
profile page doesn't exist."""
return self.read_debug_webpage("query_profile?query_id=%s&raw" % (query_id))

def get_query_status(self, query_id):
"""Gets the 'Query Status' section of the query's runtime profile."""
page = self.read_query_profile_page(query_id)
status_line =\
next((x for x in page.split('\n') if re.search('Query Status:', x)), None)
return status_line.split('Query Status:')[1].strip()

def wait_for_query_state(self, client, query_handle, target_state,
timeout=10, interval=1):
"""Keeps polling for the query's state using client in the given interval until
the query's state reaches the target state or the given timeout has been reached."""
start_time = time()
while (time() - start_time < timeout):
try:
query_state = client.get_state(query_handle)
except Exception:
query_state = None
if query_state == target_state:
return
sleep(interval)
return

def create_beeswax_client(self, use_kerberos=False):
"""Creates a new beeswax client connection to the impalad"""
client =\
21 changes: 21 additions & 0 deletions tests/experiments/test_process_failures.py
@@ -135,6 +135,9 @@ def test_kill_restart_worker(self, vector):
worker_impalad = self.cluster.get_different_impalad(impalad)
print "Coordinator impalad: %s Worker impalad: %s" % (impalad, worker_impalad)

# Start executing a query. It will be cancelled due to a killed worker.
handle = client.execute_query_async(QUERY)

statestored = self.cluster.statestored
worker_impalad.kill()

@@ -143,6 +146,24 @@ def test_kill_restart_worker(self, vector):
# Wait until the impalad registers another instance went down.
impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE - 1, timeout=30)

# Wait until the in-flight query has been cancelled.
impalad.service.wait_for_query_state(client, handle,\
client.query_states['EXCEPTION'])

# The in-flight query should have been cancelled, reporting a failed worker as the
# cause. The query may have been cancelled because the state store detected a failed
# node, or because a stream sender failed to establish a thrift connection. It is
# non-deterministic which of those paths will initiate cancellation, but in either
# case the query status should include the failed (or unreachable) worker.
assert client.get_state(handle) == client.query_states['EXCEPTION']
query_status = impalad.service.get_query_status(handle.id)
if query_status is None:
assert False, "Could not find 'Query Status' section in profile of "\
"query with id %s:\n%s" % (handle.id)
failed_hostport = "%s:%s" % (worker_impalad.service.hostname,\
worker_impalad.service.be_port)
assert failed_hostport in query_status

# Should work fine even if a worker is down.
self.execute_query_using_client(client, QUERY, vector)

