1
0
Fork 0

Fix AQL query shutdown in cluster.

This commit is contained in:
Max Neunhoeffer 2015-11-30 14:18:49 +01:00
parent b11e88400b
commit f7e0b53fff
4 changed files with 14 additions and 31 deletions

View File

@ -529,8 +529,7 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine,
ExecutionNode const* ep,
std::vector<std::string> const& shardIds)
: ExecutionBlock(engine, ep),
_nrClients(shardIds.size()),
_ignoreShutdown(false) {
_nrClients(shardIds.size()) {
_shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
@ -570,9 +569,6 @@ int BlockWithClients::initializeCursor (AqlItemBlock* items, size_t pos) {
int BlockWithClients::shutdown (int errorCode) {
ENTER_BLOCK
TRI_ASSERT(! _ignoreShutdown);
_ignoreShutdown = true;
_doneForClient.clear();
return ExecutionBlock::shutdown(errorCode);
@ -587,7 +583,6 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast,
size_t atMost,
std::string const& shardId) {
ENTER_BLOCK
_ignoreShutdown = false;
size_t skipped = 0;
AqlItemBlock* result = nullptr;
@ -613,7 +608,6 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast,
size_t atMost,
std::string const& shardId) {
ENTER_BLOCK
_ignoreShutdown = false;
size_t skipped = 0;
AqlItemBlock* result = nullptr;
int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId);
@ -698,10 +692,6 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
int ScatterBlock::shutdown (int errorCode) {
ENTER_BLOCK
if (_ignoreShutdown) {
return TRI_ERROR_NO_ERROR;
}
int res = BlockWithClients::shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -726,9 +716,6 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) {
return false;
}
// TODO is this correct?
_ignoreShutdown = false;
std::pair<size_t,size_t> pos = _posForClient.at(clientId);
// (i, j) where i is the position in _buffer, and j is the position in
// _buffer.at(i) we are sending to <clientId>
@ -907,10 +894,6 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
int DistributeBlock::shutdown (int errorCode) {
ENTER_BLOCK
if (_ignoreShutdown) {
return TRI_ERROR_NO_ERROR;
}
int res = BlockWithClients::shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -935,9 +918,6 @@ bool DistributeBlock::hasMoreForShard (std::string const& shardId) {
return false;
}
// TODO is this correct?
_ignoreShutdown = false;
if (! _distBuffer.at(clientId).empty()) {
return true;
}
@ -1504,6 +1484,12 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
int RemoteBlock::shutdown (int errorCode) {
ENTER_BLOCK
if (! static_cast<RemoteNode const*>(_exeNode)->isResponsibleForInitCursor()) {
// do nothing...
return TRI_ERROR_NO_ERROR;
}
// For every call we simply forward via HTTP
std::unique_ptr<ClusterCommResult> res;

View File

@ -339,12 +339,6 @@ namespace triagens {
std::vector<bool> _doneForClient;
////////////////////////////////////////////////////////////////////////////////
/// @brief _shutdown: should we really shutdown?
////////////////////////////////////////////////////////////////////////////////
bool _ignoreShutdown;
};
// -----------------------------------------------------------------------------

View File

@ -80,7 +80,8 @@ namespace triagens {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not this node will forward initializeCursor requests
/// @brief whether or not this node will forward initializeCursor or shutDown
/// requests
////////////////////////////////////////////////////////////////////////////////
void isResponsibleForInitCursor (bool value) {
@ -88,7 +89,8 @@ namespace triagens {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not this node will forward initializeCursor requests
/// @brief whether or not this node will forward initializeCursor or shutDown
/// requests
////////////////////////////////////////////////////////////////////////////////
bool isResponsibleForInitCursor () const {
@ -239,7 +241,8 @@ namespace triagens {
std::string _queryId;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not this node will forward initializeCursor requests
/// @brief whether or not this node will forward initializeCursor and shutDown
/// requests
////////////////////////////////////////////////////////////////////////////////
bool _isResponsibleForInitCursor;

View File

@ -493,7 +493,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
static_cast<RemoteNode*>(clone)->server("server:" + triagens::arango::ServerState::instance()->getId());
static_cast<RemoteNode*>(clone)->ownName(shardId);
static_cast<RemoteNode*>(clone)->queryId(connectedId);
// only one of the remote blocks is responsible for forwarding the initializeCursor requests
// only one of the remote blocks is responsible for forwarding the initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more than one
static_cast<RemoteNode*>(clone)->isResponsibleForInitCursor(nr == 0);
}