From 5821b9da0017b2c13422e9678a28090275a1333c Mon Sep 17 00:00:00 2001 From: James Date: Thu, 23 Oct 2014 21:02:20 +0100 Subject: [PATCH] fixing Distribute/ScatterBlocks. --- arangod/Aql/ExecutionBlock.cpp | 130 ++++++++++++++++++++++++--------- arangod/Aql/ExecutionBlock.h | 49 +++++++++---- 2 files changed, 128 insertions(+), 51 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index d98902e4e0..75b89b20d2 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -3795,7 +3795,8 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine, std::vector const& shardIds) : ExecutionBlock(engine, ep), _nrClients(shardIds.size()), - _initOrShutdown(true) { + _ignoreInitCursor(false), + _ignoreShutdown(false){ _shardIdMap.reserve(_nrClients); for (size_t i = 0; i < _nrClients; i++) { @@ -3803,17 +3804,45 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine, } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief initializeCursor: reset _doneForClient +//////////////////////////////////////////////////////////////////////////////// + +int BlockWithClients::initializeCursor (AqlItemBlock* items, size_t pos) { + ENTER_BLOCK + TRI_ASSERT(! _ignoreInitCursor); + _ignoreInitCursor = true; + + int res = ExecutionBlock::initializeCursor(items, pos); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + _doneForClient.clear(); + _doneForClient.reserve(_nrClients); + + for (size_t i = 0; i < _nrClients; i++) { + _doneForClient.push_back(false); + } + + return TRI_ERROR_NO_ERROR; + + LEAVE_BLOCK +} + //////////////////////////////////////////////////////////////////////////////// /// @brief shutdown //////////////////////////////////////////////////////////////////////////////// int BlockWithClients::shutdown (int errorCode) { ENTER_BLOCK - if (! _initOrShutdown) { - return TRI_ERROR_NO_ERROR; - } - _initOrShutdown = false; + TRI_ASSERT(! _ignoreShutdown); + _ignoreShutdown = true; + + _doneForClient.clear(); + return ExecutionBlock::shutdown(errorCode); LEAVE_BLOCK } @@ -3826,6 +3855,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast, size_t atMost, std::string const& shardId) { ENTER_BLOCK + _ignoreInitCursor = false; size_t skipped = 0; AqlItemBlock* result = nullptr; int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId); @@ -3844,6 +3874,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast, size_t atMost, std::string const& shardId) { ENTER_BLOCK + _ignoreInitCursor = false; size_t skipped = 0; AqlItemBlock* result = nullptr; int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId); @@ -3862,6 +3893,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast, bool BlockWithClients::skipForShard (size_t number, std::string const& shardId) { ENTER_BLOCK + _ignoreInitCursor = false; size_t skipped = skipSomeForShard(number, number, shardId); size_t nr = skipped; while (nr != 0 && skipped < number) { @@ -3897,29 +3929,6 @@ size_t BlockWithClients::getClientId (std::string const& shardId) { LEAVE_BLOCK } -//////////////////////////////////////////////////////////////////////////////// -/// @brief preInitCursor: check if we should really init the cursor, and reset -/// _doneForClient -//////////////////////////////////////////////////////////////////////////////// - -bool BlockWithClients::preInitCursor () { - ENTER_BLOCK - if (! _initOrShutdown) { - return false; - } - - _doneForClient.clear(); - _doneForClient.reserve(_nrClients); - - for (size_t i = 0; i < _nrClients; i++) { - _doneForClient.push_back(false); - } - - _initOrShutdown = false; - return true; - LEAVE_BLOCK -} - // ----------------------------------------------------------------------------- // --SECTION-- class ScatterBlock // ----------------------------------------------------------------------------- @@ -3930,15 +3939,16 @@ bool BlockWithClients::preInitCursor () { int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { ENTER_BLOCK - if (! preInitCursor()) { + if (_ignoreInitCursor) { return TRI_ERROR_NO_ERROR; } - int res = ExecutionBlock::initializeCursor(items, pos); + int res = BlockWithClients::initializeCursor(items, pos); if (res != TRI_ERROR_NO_ERROR) { return res; } + // local clean up _posForClient.clear(); for (size_t i = 0; i < _nrClients; i++) { @@ -3948,6 +3958,28 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { LEAVE_BLOCK } +//////////////////////////////////////////////////////////////////////////////// +/// @brief initializeCursor +//////////////////////////////////////////////////////////////////////////////// + +int ScatterBlock::shutdown (int errorCode) { + ENTER_BLOCK + if (_ignoreShutdown) { + return TRI_ERROR_NO_ERROR; + } + + int res = BlockWithClients::shutdown(errorCode); + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + // local clean up + _posForClient.clear(); + + return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK +} + //////////////////////////////////////////////////////////////////////////////// /// @brief hasMoreForShard: any more for shard ? //////////////////////////////////////////////////////////////////////////////// @@ -3960,13 +3992,15 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) { return false; } + // TODO is this correct? + _ignoreInitCursor = false; + std::pair pos = _posForClient.at(clientId); // (i, j) where i is the position in _buffer, and j is the position in // _buffer.at(i) we are sending to if (pos.first > _buffer.size()) { - _initOrShutdown = true; - if (! getBlock(DefaultBatchSize, DefaultBatchSize)) { + if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) { _doneForClient.at(clientId) = true; return false; } @@ -4026,7 +4060,6 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast, // pull more blocks from dependency if necessary . . . if (pos.first >= _buffer.size()) { - _initOrShutdown = true; if (! getBlock(atLeast, atMost)) { _doneForClient.at(clientId) = true; return TRI_ERROR_NO_ERROR; @@ -4100,15 +4133,16 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine, int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { ENTER_BLOCK - if (! preInitCursor()) { + if (_ignoreInitCursor) { return TRI_ERROR_NO_ERROR; } - int res = ExecutionBlock::initializeCursor(items, pos); + int res = BlockWithClients::initializeCursor(items, pos); if (res != TRI_ERROR_NO_ERROR) { return res; } + // local clean up _distBuffer.clear(); _distBuffer.reserve(_nrClients); for (size_t i = 0; i < _nrClients; i++) { @@ -4119,17 +4153,41 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { LEAVE_BLOCK } +//////////////////////////////////////////////////////////////////////////////// +/// @brief shutdown +//////////////////////////////////////////////////////////////////////////////// + +int DistributeBlock::shutdown (int errorCode) { + ENTER_BLOCK + if (_ignoreShutdown) { + return TRI_ERROR_NO_ERROR; + } + + int res = BlockWithClients::shutdown(errorCode); + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + // local clean up + _distBuffer.clear(); + + return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK +} //////////////////////////////////////////////////////////////////////////////// /// @brief hasMore: any more for any shard? //////////////////////////////////////////////////////////////////////////////// bool DistributeBlock::hasMoreForShard (std::string const& shardId) { ENTER_BLOCK + size_t clientId = getClientId(shardId); - if (_doneForClient.at(clientId)) { return false; } + + // TODO is this correct? + _ignoreInitCursor = false; if (! _distBuffer.at(clientId).empty()) { return true; diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index db988d33f7..feed4c3edd 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1526,10 +1526,16 @@ namespace triagens { public: //////////////////////////////////////////////////////////////////////////////// -/// @brief shutdown +/// @brief initializeCursor //////////////////////////////////////////////////////////////////////////////// - int shutdown (int) override; + int initializeCursor (AqlItemBlock* items, size_t pos); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initializeCursor +//////////////////////////////////////////////////////////////////////////////// + + int shutdown (int); //////////////////////////////////////////////////////////////////////////////// /// @brief getSome: shouldn't be used, use skipSomeForShard @@ -1605,13 +1611,6 @@ namespace triagens { // --SECTION-- BlockWithClients protected methods // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief preInitCursor: check if we should really init the cursor, and reset -/// _doneForClient -//////////////////////////////////////////////////////////////////////////////// - - bool preInitCursor (); - //////////////////////////////////////////////////////////////////////////////// /// @brief getOrSkipSomeForShard //////////////////////////////////////////////////////////////////////////////// @@ -1654,10 +1653,16 @@ namespace triagens { std::vector _doneForClient; //////////////////////////////////////////////////////////////////////////////// -/// @brief _initOrShutdown: should we really initialiseCursor or shutdown? +/// @brief _ignoreInitCursor: should we really initialiseCursor? //////////////////////////////////////////////////////////////////////////////// - bool _initOrShutdown; + bool _ignoreInitCursor; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _shutdown: should we really shutdown? +//////////////////////////////////////////////////////////////////////////////// + + bool _ignoreShutdown; }; @@ -1692,6 +1697,12 @@ namespace triagens { int initializeCursor (AqlItemBlock* items, size_t pos); +//////////////////////////////////////////////////////////////////////////////// +/// @brief shutdown +//////////////////////////////////////////////////////////////////////////////// + + int shutdown (int); + //////////////////////////////////////////////////////////////////////////////// /// @brief hasMoreForShard: any more for shard ? //////////////////////////////////////////////////////////////////////////////// @@ -1704,6 +1715,9 @@ namespace triagens { int64_t remainingForShard (std::string const& shardId); + + private: + //////////////////////////////////////////////////////////////////////////////// /// @brief getOrSkipSomeForShard //////////////////////////////////////////////////////////////////////////////// @@ -1715,8 +1729,6 @@ namespace triagens { size_t& skipped, std::string const& shardId); - private: - //////////////////////////////////////////////////////////////////////////////// /// @brief _posForClient: //////////////////////////////////////////////////////////////////////////////// @@ -1755,6 +1767,12 @@ namespace triagens { int initializeCursor (AqlItemBlock* items, size_t pos); +//////////////////////////////////////////////////////////////////////////////// +/// @brief shutdown +//////////////////////////////////////////////////////////////////////////////// + + int shutdown (int); + //////////////////////////////////////////////////////////////////////////////// /// @brief remainingForShard: remaining for shard ? //////////////////////////////////////////////////////////////////////////////// @@ -1768,7 +1786,10 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// bool hasMoreForShard (std::string const& shardId); + + private: + //////////////////////////////////////////////////////////////////////////////// /// @brief getOrSkipSomeForShard //////////////////////////////////////////////////////////////////////////////// @@ -1780,8 +1801,6 @@ namespace triagens { size_t& skipped, std::string const& shardId); - private: - //////////////////////////////////////////////////////////////////////////////// /// @brief getBlockForClient: try to get at atLeast/atMost pairs into /// _distBuffer.at(clientId).