diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index c1c6139233..8296454d12 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -3617,7 +3617,8 @@ ScatterBlock::ScatterBlock (ExecutionEngine* engine, ScatterNode const* ep, std::vector const& shardIds) : ExecutionBlock(engine, ep), - _nrClients(shardIds.size()) { + _nrClients(shardIds.size()), + _initOrShutdown(true){ _shardIdMap.reserve(_nrClients); for (size_t i = 0; i < _nrClients; i++) { _shardIdMap.emplace(std::make_pair(shardIds[i], i)); @@ -3629,20 +3630,36 @@ ScatterBlock::ScatterBlock (ExecutionEngine* engine, //////////////////////////////////////////////////////////////////////////////// int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { + + if (!_initOrShutdown) { + return TRI_ERROR_NO_ERROR; + } + int res = ExecutionBlock::initializeCursor(items, pos); if (res != TRI_ERROR_NO_ERROR) { return res; } + _posForClient.clear(); + _doneForClient.clear(); _doneForClient.reserve(_nrClients); for (size_t i = 0; i < _nrClients; i++) { _posForClient.push_back(std::make_pair(0, 0)); _doneForClient.push_back(false); } - + _initOrShutdown = false; return TRI_ERROR_NO_ERROR; + +} + +int ScatterBlock::shutdown () { + + if (!_initOrShutdown) { + return TRI_ERROR_NO_ERROR; + } + return ExecutionBlock::shutdown(); } //////////////////////////////////////////////////////////////////////////////// @@ -3707,6 +3724,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId){ // _buffer.at(i) we are sending to if (pos.first > _buffer.size()) { + _initOrShutdown = true; if (! getBlock(DefaultBatchSize, DefaultBatchSize)) { _doneForClient.at(clientId) = true; return false; @@ -3737,6 +3755,7 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast, // pull more blocks from dependency if necessary . . . if (pos.first >= _buffer.size()) { + _initOrShutdown = true; if (! getBlock(atLeast, atMost)) { _doneForClient.at(clientId) = true; return TRI_ERROR_NO_ERROR; diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index 4760965951..a62d262468 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1540,6 +1540,12 @@ namespace triagens { int initializeCursor (AqlItemBlock* items, size_t pos); +//////////////////////////////////////////////////////////////////////////////// +/// @brief initializeCursor +//////////////////////////////////////////////////////////////////////////////// + + int shutdown (); + //////////////////////////////////////////////////////////////////////////////// /// @brief remaining //////////////////////////////////////////////////////////////////////////////// @@ -1649,6 +1655,12 @@ namespace triagens { size_t _nrClients; +//////////////////////////////////////////////////////////////////////////////// +/// @brief _reinit: should we really initialiseCursor or shutdown? +//////////////////////////////////////////////////////////////////////////////// + + bool _initOrShutdown; + }; // -----------------------------------------------------------------------------