1
0
Fork 0

Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Jan Steemann 2014-10-23 22:04:18 +02:00
commit ed07ba2fc0
2 changed files with 128 additions and 51 deletions

View File

@ -3795,7 +3795,8 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine,
std::vector<std::string> const& shardIds) std::vector<std::string> const& shardIds)
: ExecutionBlock(engine, ep), : ExecutionBlock(engine, ep),
_nrClients(shardIds.size()), _nrClients(shardIds.size()),
_initOrShutdown(true) { _ignoreInitCursor(false),
_ignoreShutdown(false){
_shardIdMap.reserve(_nrClients); _shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) { for (size_t i = 0; i < _nrClients; i++) {
@ -3803,17 +3804,45 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine,
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief initializeCursor: reset _doneForClient
////////////////////////////////////////////////////////////////////////////////
int BlockWithClients::initializeCursor (AqlItemBlock* items, size_t pos) {
ENTER_BLOCK
TRI_ASSERT(! _ignoreInitCursor);
_ignoreInitCursor = true;
int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
_doneForClient.clear();
_doneForClient.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_doneForClient.push_back(false);
}
return TRI_ERROR_NO_ERROR;
LEAVE_BLOCK
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown /// @brief shutdown
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int BlockWithClients::shutdown (int errorCode) { int BlockWithClients::shutdown (int errorCode) {
ENTER_BLOCK ENTER_BLOCK
if (! _initOrShutdown) {
return TRI_ERROR_NO_ERROR;
}
_initOrShutdown = false; TRI_ASSERT(! _ignoreShutdown);
_ignoreShutdown = true;
_doneForClient.clear();
return ExecutionBlock::shutdown(errorCode); return ExecutionBlock::shutdown(errorCode);
LEAVE_BLOCK LEAVE_BLOCK
} }
@ -3826,6 +3855,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast,
size_t atMost, size_t atMost,
std::string const& shardId) { std::string const& shardId) {
ENTER_BLOCK ENTER_BLOCK
_ignoreInitCursor = false;
size_t skipped = 0; size_t skipped = 0;
AqlItemBlock* result = nullptr; AqlItemBlock* result = nullptr;
int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId); int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId);
@ -3844,6 +3874,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast,
size_t atMost, size_t atMost,
std::string const& shardId) { std::string const& shardId) {
ENTER_BLOCK ENTER_BLOCK
_ignoreInitCursor = false;
size_t skipped = 0; size_t skipped = 0;
AqlItemBlock* result = nullptr; AqlItemBlock* result = nullptr;
int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId); int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId);
@ -3862,6 +3893,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast,
bool BlockWithClients::skipForShard (size_t number, bool BlockWithClients::skipForShard (size_t number,
std::string const& shardId) { std::string const& shardId) {
ENTER_BLOCK ENTER_BLOCK
_ignoreInitCursor = false;
size_t skipped = skipSomeForShard(number, number, shardId); size_t skipped = skipSomeForShard(number, number, shardId);
size_t nr = skipped; size_t nr = skipped;
while (nr != 0 && skipped < number) { while (nr != 0 && skipped < number) {
@ -3897,29 +3929,6 @@ size_t BlockWithClients::getClientId (std::string const& shardId) {
LEAVE_BLOCK LEAVE_BLOCK
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief preInitCursor: check if we should really init the cursor, and reset
/// _doneForClient
////////////////////////////////////////////////////////////////////////////////
bool BlockWithClients::preInitCursor () {
ENTER_BLOCK
if (! _initOrShutdown) {
return false;
}
_doneForClient.clear();
_doneForClient.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_doneForClient.push_back(false);
}
_initOrShutdown = false;
return true;
LEAVE_BLOCK
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- class ScatterBlock // --SECTION-- class ScatterBlock
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -3930,15 +3939,16 @@ bool BlockWithClients::preInitCursor () {
int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
ENTER_BLOCK ENTER_BLOCK
if (! preInitCursor()) { if (_ignoreInitCursor) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
int res = ExecutionBlock::initializeCursor(items, pos); int res = BlockWithClients::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
// local clean up
_posForClient.clear(); _posForClient.clear();
for (size_t i = 0; i < _nrClients; i++) { for (size_t i = 0; i < _nrClients; i++) {
@ -3948,6 +3958,28 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
LEAVE_BLOCK LEAVE_BLOCK
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief initializeCursor
////////////////////////////////////////////////////////////////////////////////
int ScatterBlock::shutdown (int errorCode) {
ENTER_BLOCK
if (_ignoreShutdown) {
return TRI_ERROR_NO_ERROR;
}
int res = BlockWithClients::shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// local clean up
_posForClient.clear();
return TRI_ERROR_NO_ERROR;
LEAVE_BLOCK
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief hasMoreForShard: any more for shard <shardId>? /// @brief hasMoreForShard: any more for shard <shardId>?
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -3960,13 +3992,15 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) {
return false; return false;
} }
// TODO is this correct?
_ignoreInitCursor = false;
std::pair<size_t,size_t> pos = _posForClient.at(clientId); std::pair<size_t,size_t> pos = _posForClient.at(clientId);
// (i, j) where i is the position in _buffer, and j is the position in // (i, j) where i is the position in _buffer, and j is the position in
// _buffer.at(i) we are sending to <clientId> // _buffer.at(i) we are sending to <clientId>
if (pos.first > _buffer.size()) { if (pos.first > _buffer.size()) {
_initOrShutdown = true; if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) {
if (! getBlock(DefaultBatchSize, DefaultBatchSize)) {
_doneForClient.at(clientId) = true; _doneForClient.at(clientId) = true;
return false; return false;
} }
@ -4026,7 +4060,6 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
// pull more blocks from dependency if necessary . . . // pull more blocks from dependency if necessary . . .
if (pos.first >= _buffer.size()) { if (pos.first >= _buffer.size()) {
_initOrShutdown = true;
if (! getBlock(atLeast, atMost)) { if (! getBlock(atLeast, atMost)) {
_doneForClient.at(clientId) = true; _doneForClient.at(clientId) = true;
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -4100,15 +4133,16 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine,
int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
ENTER_BLOCK ENTER_BLOCK
if (! preInitCursor()) { if (_ignoreInitCursor) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
int res = ExecutionBlock::initializeCursor(items, pos); int res = BlockWithClients::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
// local clean up
_distBuffer.clear(); _distBuffer.clear();
_distBuffer.reserve(_nrClients); _distBuffer.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) { for (size_t i = 0; i < _nrClients; i++) {
@ -4119,18 +4153,42 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
LEAVE_BLOCK LEAVE_BLOCK
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown
////////////////////////////////////////////////////////////////////////////////
int DistributeBlock::shutdown (int errorCode) {
ENTER_BLOCK
if (_ignoreShutdown) {
return TRI_ERROR_NO_ERROR;
}
int res = BlockWithClients::shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// local clean up
_distBuffer.clear();
return TRI_ERROR_NO_ERROR;
LEAVE_BLOCK
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief hasMore: any more for any shard? /// @brief hasMore: any more for any shard?
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool DistributeBlock::hasMoreForShard (std::string const& shardId) { bool DistributeBlock::hasMoreForShard (std::string const& shardId) {
ENTER_BLOCK ENTER_BLOCK
size_t clientId = getClientId(shardId);
size_t clientId = getClientId(shardId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
return false; return false;
} }
// TODO is this correct?
_ignoreInitCursor = false;
if (! _distBuffer.at(clientId).empty()) { if (! _distBuffer.at(clientId).empty()) {
return true; return true;
} }

View File

@ -1526,10 +1526,16 @@ namespace triagens {
public: public:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown /// @brief initializeCursor
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int shutdown (int) override; int initializeCursor (AqlItemBlock* items, size_t pos);
////////////////////////////////////////////////////////////////////////////////
/// @brief initializeCursor
////////////////////////////////////////////////////////////////////////////////
int shutdown (int);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getSome: shouldn't be used, use skipSomeForShard /// @brief getSome: shouldn't be used, use skipSomeForShard
@ -1605,13 +1611,6 @@ namespace triagens {
// --SECTION-- BlockWithClients protected methods // --SECTION-- BlockWithClients protected methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief preInitCursor: check if we should really init the cursor, and reset
/// _doneForClient
////////////////////////////////////////////////////////////////////////////////
bool preInitCursor ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getOrSkipSomeForShard /// @brief getOrSkipSomeForShard
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1654,10 +1653,16 @@ namespace triagens {
std::vector<bool> _doneForClient; std::vector<bool> _doneForClient;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief _initOrShutdown: should we really initialiseCursor or shutdown? /// @brief _ignoreInitCursor: should we really initialiseCursor?
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool _initOrShutdown; bool _ignoreInitCursor;
////////////////////////////////////////////////////////////////////////////////
/// @brief _shutdown: should we really shutdown?
////////////////////////////////////////////////////////////////////////////////
bool _ignoreShutdown;
}; };
@ -1692,6 +1697,12 @@ namespace triagens {
int initializeCursor (AqlItemBlock* items, size_t pos); int initializeCursor (AqlItemBlock* items, size_t pos);
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown
////////////////////////////////////////////////////////////////////////////////
int shutdown (int);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief hasMoreForShard: any more for shard <shardId>? /// @brief hasMoreForShard: any more for shard <shardId>?
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1704,6 +1715,9 @@ namespace triagens {
int64_t remainingForShard (std::string const& shardId); int64_t remainingForShard (std::string const& shardId);
private:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getOrSkipSomeForShard /// @brief getOrSkipSomeForShard
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1715,8 +1729,6 @@ namespace triagens {
size_t& skipped, size_t& skipped,
std::string const& shardId); std::string const& shardId);
private:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief _posForClient: /// @brief _posForClient:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1755,6 +1767,12 @@ namespace triagens {
int initializeCursor (AqlItemBlock* items, size_t pos); int initializeCursor (AqlItemBlock* items, size_t pos);
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown
////////////////////////////////////////////////////////////////////////////////
int shutdown (int);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief remainingForShard: remaining for shard <shardId>? /// @brief remainingForShard: remaining for shard <shardId>?
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1769,6 +1787,9 @@ namespace triagens {
bool hasMoreForShard (std::string const& shardId); bool hasMoreForShard (std::string const& shardId);
private:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getOrSkipSomeForShard /// @brief getOrSkipSomeForShard
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1780,8 +1801,6 @@ namespace triagens {
size_t& skipped, size_t& skipped,
std::string const& shardId); std::string const& shardId);
private:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getBlockForClient: try to get at atLeast/atMost pairs into /// @brief getBlockForClient: try to get at atLeast/atMost pairs into
/// _distBuffer.at(clientId). /// _distBuffer.at(clientId).