1
0
Fork 0

make AQL in 3.3 somewhat forward-compatible with 3.4 (#5907)

This commit is contained in:
Jan 2018-07-25 09:13:52 +02:00 committed by GitHub
parent 03933e6e02
commit 761621ec56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 7 deletions

View File

@ -58,7 +58,7 @@ using VelocyPackHelper = arangodb::basics::VelocyPackHelper;
using StringBuffer = arangodb::basics::StringBuffer; using StringBuffer = arangodb::basics::StringBuffer;
GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en) GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en)
: ExecutionBlock(engine, en), : ExecutionBlock(engine, en), LazyInitializeBlock(),
_sortRegisters(), _sortRegisters(),
_isSimple(en->elements().empty()), _isSimple(en->elements().empty()),
_heap(en->_sortmode == 'h' ? new Heap : nullptr) { _heap(en->_sortmode == 'h' ? new Heap : nullptr) {
@ -136,6 +136,8 @@ int GatherBlock::shutdown(int errorCode) {
/// @brief initializeCursor /// @brief initializeCursor
int GatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) { int GatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
setInitialized();
int res = ExecutionBlock::initializeCursor(items, pos); int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
@ -217,6 +219,10 @@ bool GatherBlock::hasMore() {
if (_done || _dependencies.empty()) { if (_done || _dependencies.empty()) {
return false; return false;
} }
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
if (_isSimple) { if (_isSimple) {
for (size_t i = 0; i < _dependencies.size(); i++) { for (size_t i = 0; i < _dependencies.size(); i++) {
@ -254,6 +260,10 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
traceGetSomeEnd(nullptr); traceGetSomeEnd(nullptr);
return nullptr; return nullptr;
} }
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
// the simple case . . . // the simple case . . .
if (_isSimple) { if (_isSimple) {
@ -408,6 +418,10 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) {
if (_done) { if (_done) {
return 0; return 0;
} }
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
// the simple case . . . // the simple case . . .
if (_isSimple) { if (_isSimple) {
@ -544,7 +558,7 @@ bool GatherBlock::OurLessThan::operator()(std::pair<size_t, size_t> const& a,
BlockWithClients::BlockWithClients(ExecutionEngine* engine, BlockWithClients::BlockWithClients(ExecutionEngine* engine,
ExecutionNode const* ep, ExecutionNode const* ep,
std::vector<std::string> const& shardIds) std::vector<std::string> const& shardIds)
: ExecutionBlock(engine, ep), _nrClients(shardIds.size()), _wasShutdown(false) { : ExecutionBlock(engine, ep), LazyInitializeBlock(), _nrClients(shardIds.size()), _wasShutdown(false) {
_shardIdMap.reserve(_nrClients); _shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) { for (size_t i = 0; i < _nrClients; i++) {
_shardIdMap.emplace(std::make_pair(shardIds[i], i)); _shardIdMap.emplace(std::make_pair(shardIds[i], i));
@ -555,6 +569,7 @@ BlockWithClients::BlockWithClients(ExecutionEngine* engine,
int BlockWithClients::initializeCursor(AqlItemBlock* items, size_t pos) { int BlockWithClients::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
setInitialized();
int res = ExecutionBlock::initializeCursor(items, pos); int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
@ -597,7 +612,11 @@ AqlItemBlock* BlockWithClients::getSomeForShard(size_t atLeast, size_t atMost,
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
size_t skipped = 0; size_t skipped = 0;
AqlItemBlock* result = nullptr; AqlItemBlock* result = nullptr;
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
int out = int out =
getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId); getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId);
@ -620,8 +639,14 @@ size_t BlockWithClients::skipSomeForShard(size_t atLeast, size_t atMost,
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
size_t skipped = 0; size_t skipped = 0;
AqlItemBlock* result = nullptr; AqlItemBlock* result = nullptr;
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
int out = int out =
getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId); getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId);
TRI_ASSERT(result == nullptr); TRI_ASSERT(result == nullptr);
if (out != TRI_ERROR_NO_ERROR) { if (out != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(out); THROW_ARANGO_EXCEPTION(out);
@ -674,7 +699,9 @@ size_t BlockWithClients::getClientId(std::string const& shardId) {
int ScatterBlock::initializeCursor(AqlItemBlock* items, size_t pos) { int ScatterBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
setInitialized();
int res = BlockWithClients::initializeCursor(items, pos); int res = BlockWithClients::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
@ -713,9 +740,14 @@ bool ScatterBlock::hasMoreForShard(std::string const& shardId) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
TRI_ASSERT(_nrClients != 0); TRI_ASSERT(_nrClients != 0);
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
size_t clientId = getClientId(shardId); size_t clientId = getClientId(shardId);
TRI_ASSERT(_doneForClient.size() > clientId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
return false; return false;
} }
@ -742,6 +774,7 @@ int64_t ScatterBlock::remainingForShard(std::string const& shardId) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
size_t clientId = getClientId(shardId); size_t clientId = getClientId(shardId);
TRI_ASSERT(_doneForClient.size() > clientId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
return 0; return 0;
} }
@ -774,9 +807,14 @@ int ScatterBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost,
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
TRI_ASSERT(0 < atLeast && atLeast <= atMost); TRI_ASSERT(0 < atLeast && atLeast <= atMost);
TRI_ASSERT(result == nullptr && skipped == 0); TRI_ASSERT(result == nullptr && skipped == 0);
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
size_t clientId = getClientId(shardId); size_t clientId = getClientId(shardId);
TRI_ASSERT(_doneForClient.size() > clientId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -837,7 +875,7 @@ DistributeBlock::DistributeBlock(ExecutionEngine* engine,
DistributeNode const* ep, DistributeNode const* ep,
std::vector<std::string> const& shardIds, std::vector<std::string> const& shardIds,
Collection const* collection) Collection const* collection)
: BlockWithClients(engine, ep, shardIds), : BlockWithClients(engine, ep, shardIds),
_collection(collection), _collection(collection),
_index(0), _index(0),
_regId(ExecutionNode::MaxRegisterId), _regId(ExecutionNode::MaxRegisterId),
@ -870,6 +908,7 @@ DistributeBlock::DistributeBlock(ExecutionEngine* engine,
int DistributeBlock::initializeCursor(AqlItemBlock* items, size_t pos) { int DistributeBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
setInitialized();
int res = BlockWithClients::initializeCursor(items, pos); int res = BlockWithClients::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
@ -910,8 +949,13 @@ int DistributeBlock::shutdown(int errorCode) {
/// @brief hasMore: any more for any shard? /// @brief hasMore: any more for any shard?
bool DistributeBlock::hasMoreForShard(std::string const& shardId) { bool DistributeBlock::hasMoreForShard(std::string const& shardId) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
size_t clientId = getClientId(shardId); size_t clientId = getClientId(shardId);
TRI_ASSERT(_doneForClient.size() > clientId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
return false; return false;
} }
@ -939,8 +983,13 @@ int DistributeBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost,
traceGetSomeBegin(); traceGetSomeBegin();
TRI_ASSERT(0 < atLeast && atLeast <= atMost); TRI_ASSERT(0 < atLeast && atLeast <= atMost);
TRI_ASSERT(result == nullptr && skipped == 0); TRI_ASSERT(result == nullptr && skipped == 0);
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
size_t clientId = getClientId(shardId); size_t clientId = getClientId(shardId);
TRI_ASSERT(_doneForClient.size() > clientId);
if (_doneForClient.at(clientId)) { if (_doneForClient.at(clientId)) {
traceGetSomeEnd(result); traceGetSomeEnd(result);
@ -1015,9 +1064,13 @@ bool DistributeBlock::getBlockForClient(size_t atLeast, size_t atMost,
_index = 0; // position in _buffer _index = 0; // position in _buffer
_pos = 0; // position in _buffer.at(_index) _pos = 0; // position in _buffer.at(_index)
} }
TRI_ASSERT(_doneForClient.size() > clientId);
std::vector<std::deque<std::pair<size_t, size_t>>>& buf = _distBuffer; std::vector<std::deque<std::pair<size_t, size_t>>>& buf = _distBuffer;
// it should be the case that buf.at(clientId) is empty // it should be the case that buf.at(clientId) is empty
TRI_ASSERT(buf.size() > clientId);
while (buf.at(clientId).size() < atLeast) { while (buf.at(clientId).size() < atLeast) {
if (_index == _buffer.size()) { if (_index == _buffer.size()) {
@ -1268,6 +1321,7 @@ RemoteBlock::RemoteBlock(ExecutionEngine* engine, RemoteNode const* en,
std::string const& server, std::string const& ownName, std::string const& server, std::string const& ownName,
std::string const& queryId) std::string const& queryId)
: ExecutionBlock(engine, en), : ExecutionBlock(engine, en),
LazyInitializeBlock(),
_server(server), _server(server),
_ownName(ownName), _ownName(ownName),
_queryId(queryId), _queryId(queryId),
@ -1341,6 +1395,7 @@ int RemoteBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
setInitialized();
if (!_isResponsibleForInitializeCursor) { if (!_isResponsibleForInitializeCursor) {
// do nothing... // do nothing...
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -1462,6 +1517,10 @@ AqlItemBlock* RemoteBlock::getSome(size_t atLeast, size_t atMost) {
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
traceGetSomeBegin(); traceGetSomeBegin();
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
VPackBuilder builder; VPackBuilder builder;
builder.openObject(); builder.openObject();
@ -1498,6 +1557,10 @@ AqlItemBlock* RemoteBlock::getSome(size_t atLeast, size_t atMost) {
size_t RemoteBlock::skipSome(size_t atLeast, size_t atMost) { size_t RemoteBlock::skipSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
VPackBuilder builder; VPackBuilder builder;
builder.openObject(); builder.openObject();
@ -1536,6 +1599,11 @@ size_t RemoteBlock::skipSome(size_t atLeast, size_t atMost) {
/// @brief hasMore /// @brief hasMore
bool RemoteBlock::hasMore() { bool RemoteBlock::hasMore() {
DEBUG_BEGIN_BLOCK(); DEBUG_BEGIN_BLOCK();
if (!wasInitialized()) {
initializeCursor(nullptr, 0);
}
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
std::unique_ptr<ClusterCommResult> res = std::unique_ptr<ClusterCommResult> res =
sendRequest(rest::RequestType::GET, "/_api/aql/hasMore/", std::string()); sendRequest(rest::RequestType::GET, "/_api/aql/hasMore/", std::string());

View File

@ -41,7 +41,35 @@ class AqlItemBlock;
struct Collection; struct Collection;
class ExecutionEngine; class ExecutionEngine;
class GatherBlock : public ExecutionBlock { /// the cluster blocks in 3.3 derive from this trivial
/// base class that makes sure initializeCursor will be
/// called at least once per block.
/// this is required for clusters running in mixed mode
/// (e.g. 3.3 coordinator but a 3.4 DB server that will
/// now send back requests to the 3.3 coordinator).
/// A 3.4 DB server will likely buffer initializeCursor
/// requests, but 3.3 requires them.
/// In order to not fall apart, this class keeps a
/// state for the cluster blocks in 3.3 whether they
/// have called their own initializeCursor method at
/// least once
class LazyInitializeBlock {
public:
LazyInitializeBlock() : _initialized(false) {}
void setInitialized() { _initialized = true; }
bool wasInitialized() {
bool result = _initialized;
_initialized = true;
return result;
}
private:
bool _initialized;
};
class GatherBlock : public ExecutionBlock, public LazyInitializeBlock {
public: public:
GatherBlock(ExecutionEngine*, GatherNode const*); GatherBlock(ExecutionEngine*, GatherNode const*);
@ -120,7 +148,7 @@ class GatherBlock : public ExecutionBlock {
std::unique_ptr<Heap> _heap; std::unique_ptr<Heap> _heap;
}; };
class BlockWithClients : public ExecutionBlock { class BlockWithClients : public ExecutionBlock, public LazyInitializeBlock {
public: public:
BlockWithClients(ExecutionEngine* engine, ExecutionNode const* ep, BlockWithClients(ExecutionEngine* engine, ExecutionNode const* ep,
std::vector<std::string> const& shardIds); std::vector<std::string> const& shardIds);
@ -293,7 +321,7 @@ class DistributeBlock : public BlockWithClients {
bool _allowSpecifiedKeys; bool _allowSpecifiedKeys;
}; };
class RemoteBlock : public ExecutionBlock { class RemoteBlock : public ExecutionBlock, public LazyInitializeBlock {
/// @brief constructors/destructors /// @brief constructors/destructors
public: public:
RemoteBlock(ExecutionEngine* engine, RemoteNode const* en, RemoteBlock(ExecutionEngine* engine, RemoteNode const* en,