mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel
This commit is contained in:
commit
ee1a04d830
|
@ -19,8 +19,12 @@ install:
|
|||
- if [ "$CXX" == "clang++" ]; then sudo apt-get install --allow-unauthenticated -qq clang-3.4; fi
|
||||
- if [ "$CXX" == "clang++" ]; then export CXX="clang++-3.4"; fi
|
||||
|
||||
# gdb
|
||||
- sudo apt-get -y install gdb
|
||||
|
||||
before_script: "bash -c Installation/travisCI/before_script.sh"
|
||||
script: "bash -c Installation/travisCI/script.sh"
|
||||
after_failure: "bash -c Installation/travisCI/after_failure.sh"
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
|
@ -31,5 +35,6 @@ branches:
|
|||
- 1.4
|
||||
- 2.0
|
||||
- 2.1
|
||||
- 2.2
|
||||
- devel
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
#!/bin/bash
|
||||
|
||||
echo "$0: checking for core file"
|
||||
if [[ -f core ]]; then
|
||||
echo "$0: found a core file"
|
||||
echo "thread apply all bt full" | gdb -c core bin/arangod
|
||||
fi
|
||||
|
||||
echo
|
||||
echo "$0: compiling ArangoDB"
|
||||
|
||||
make -j2 || exit 1
|
||||
|
||||
echo
|
||||
echo "$0: testing ArangoDB"
|
||||
|
||||
make jslint unittests-shell-server unittests-shell-server-ahuacatl unittests-http-server SKIP_RANGES=1 || exit 1
|
||||
|
||||
echo
|
||||
echo "$0: done"
|
|
@ -23,6 +23,7 @@ make -j2 || exit 1
|
|||
echo
|
||||
echo "$0: testing ArangoDB"
|
||||
|
||||
ulimit -c unlimited # enable core files
|
||||
make jslint unittests-shell-server unittests-shell-server-ahuacatl unittests-http-server SKIP_RANGES=1 || exit 1
|
||||
|
||||
echo
|
||||
|
|
|
@ -44,6 +44,14 @@ using Json = triagens::basics::Json;
|
|||
using JsonHelper = triagens::basics::JsonHelper;
|
||||
using StringBuffer = triagens::basics::StringBuffer;
|
||||
|
||||
#ifdef TRI_ENABLE_MAINTAINER_MODE
|
||||
#define ENTER_BLOCK try { (void) 0;
|
||||
#define LEAVE_BLOCK } catch (...) { std::cout << "caught an exception in " << __FUNCTION__ << ", " << __FILE__ << ":" << __LINE__ << "!\n"; throw; }
|
||||
#else
|
||||
#define ENTER_BLOCK
|
||||
#define LEAVE_BLOCK
|
||||
#endif
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- struct AggregatorGroup
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -234,9 +242,8 @@ int ExecutionBlock::initialize () {
|
|||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int ExecutionBlock::shutdown () {
|
||||
int ExecutionBlock::shutdown (int errorCode) {
|
||||
int ret = TRI_ERROR_NO_ERROR;
|
||||
int res;
|
||||
|
||||
for (auto it = _buffer.begin(); it != _buffer.end(); ++it) {
|
||||
delete *it;
|
||||
|
@ -244,8 +251,9 @@ int ExecutionBlock::shutdown () {
|
|||
_buffer.clear();
|
||||
|
||||
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
|
||||
int res;
|
||||
try {
|
||||
res = (*it)->shutdown();
|
||||
res = (*it)->shutdown(errorCode);
|
||||
}
|
||||
catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
|
@ -558,12 +566,18 @@ int SingletonBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
int SingletonBlock::shutdown () {
|
||||
int res = ExecutionBlock::shutdown();
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shutdown the singleton block
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int SingletonBlock::shutdown (int errorCode) {
|
||||
int res = ExecutionBlock::shutdown(errorCode);
|
||||
|
||||
if (_inputRegisterValues != nullptr) {
|
||||
delete _inputRegisterValues;
|
||||
_inputRegisterValues = nullptr;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -936,8 +950,21 @@ bool IndexRangeBlock::readIndex () {
|
|||
// must have a V8 context here to protect Expression::execute()
|
||||
auto engine = _engine;
|
||||
triagens::basics::ScopeGuard guard{
|
||||
[&engine]() -> void { engine->getQuery()->enterContext(); },
|
||||
[&engine]() -> void { engine->getQuery()->exitContext(); }
|
||||
[&engine]() -> void {
|
||||
engine->getQuery()->enterContext();
|
||||
},
|
||||
[&]() -> void {
|
||||
|
||||
// must invalidate the expression now as we might be called from
|
||||
// different threads
|
||||
if (ExecutionEngine::isDBServer()) {
|
||||
for (auto e : _allVariableBoundExpressions) {
|
||||
e->invalidate();
|
||||
}
|
||||
}
|
||||
|
||||
engine->getQuery()->exitContext();
|
||||
}
|
||||
};
|
||||
|
||||
v8::HandleScope scope; // do not delete this!
|
||||
|
@ -970,8 +997,12 @@ bool IndexRangeBlock::readIndex () {
|
|||
"AQL: computed a variable bound and got non-JSON");
|
||||
}
|
||||
}
|
||||
|
||||
for (auto h : r._highs) {
|
||||
Expression* e = _allVariableBoundExpressions[posInExpressions];
|
||||
|
||||
TRI_ASSERT(e != nullptr);
|
||||
|
||||
AqlValue a = e->execute(_trx, docColls, data, nrRegs * _pos,
|
||||
_inVars[posInExpressions],
|
||||
_inRegs[posInExpressions]);
|
||||
|
@ -1873,11 +1904,22 @@ void CalculationBlock::doEvaluation (AqlItemBlock* result) {
|
|||
RegisterId nrRegs = result->getNrRegs();
|
||||
result->setDocumentCollection(_outReg, nullptr);
|
||||
|
||||
TRI_ASSERT(_expression != nullptr);
|
||||
|
||||
// must have a V8 context here to protect Expression::execute()
|
||||
auto engine = _engine;
|
||||
triagens::basics::ScopeGuard guard{
|
||||
[&engine]() -> void { engine->getQuery()->enterContext(); },
|
||||
[&engine]() -> void { engine->getQuery()->exitContext(); }
|
||||
[&engine]() -> void {
|
||||
engine->getQuery()->enterContext();
|
||||
},
|
||||
[&]() -> void {
|
||||
// must invalidate the expression now as we might be called from
|
||||
// different threads
|
||||
if (ExecutionEngine::isDBServer()) {
|
||||
_expression->invalidate();
|
||||
}
|
||||
engine->getQuery()->exitContext();
|
||||
}
|
||||
};
|
||||
|
||||
v8::HandleScope scope; // do not delete this!
|
||||
|
@ -2330,6 +2372,7 @@ int AggregateBlock::getOrSkipSome (size_t atLeast,
|
|||
_pos = 0;
|
||||
|
||||
bool hasMore = ! _buffer.empty();
|
||||
|
||||
if (! hasMore) {
|
||||
hasMore = ExecutionBlock::getBlock(atLeast, atMost);
|
||||
}
|
||||
|
@ -3308,6 +3351,7 @@ GatherBlock::GatherBlock (ExecutionEngine* engine,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
GatherBlock::~GatherBlock () {
|
||||
ENTER_BLOCK
|
||||
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) {
|
||||
for (AqlItemBlock* y: x) {
|
||||
delete y;
|
||||
|
@ -3315,6 +3359,7 @@ GatherBlock::~GatherBlock () {
|
|||
x.clear();
|
||||
}
|
||||
_gatherBlockBuffer.clear();
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3322,6 +3367,7 @@ GatherBlock::~GatherBlock () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int GatherBlock::initialize () {
|
||||
ENTER_BLOCK
|
||||
auto res = ExecutionBlock::initialize();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -3329,32 +3375,38 @@ int GatherBlock::initialize () {
|
|||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shutdown: need our own method since our _buffer is different
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int GatherBlock::shutdown () {
|
||||
int GatherBlock::shutdown (int errorCode) {
|
||||
ENTER_BLOCK
|
||||
// don't call default shutdown method since it does the wrong thing to
|
||||
// _gatherBlockBuffer
|
||||
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
|
||||
int res = (*it)->shutdown();
|
||||
int res = (*it)->shutdown(errorCode);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) {
|
||||
for (AqlItemBlock* y: x) {
|
||||
delete y;
|
||||
}
|
||||
x.clear();
|
||||
}
|
||||
_gatherBlockBuffer.clear();
|
||||
|
||||
if (! _isSimple) {
|
||||
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) {
|
||||
for (AqlItemBlock* y: x) {
|
||||
delete y;
|
||||
}
|
||||
x.clear();
|
||||
}
|
||||
_gatherBlockBuffer.clear();
|
||||
_gatherBlockPos.clear();
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3362,6 +3414,7 @@ int GatherBlock::shutdown () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||
ENTER_BLOCK
|
||||
int res = ExecutionBlock::initializeCursor(items, pos);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -3388,6 +3441,7 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
|
||||
_done = false;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3396,6 +3450,7 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t GatherBlock::count () const {
|
||||
ENTER_BLOCK
|
||||
int64_t sum = 0;
|
||||
for (auto x: _dependencies) {
|
||||
if (x->count() == -1) {
|
||||
|
@ -3404,6 +3459,7 @@ int64_t GatherBlock::count () const {
|
|||
sum += x->count();
|
||||
}
|
||||
return sum;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3412,6 +3468,7 @@ int64_t GatherBlock::count () const {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t GatherBlock::remaining () {
|
||||
ENTER_BLOCK
|
||||
int64_t sum = 0;
|
||||
for (auto x : _dependencies) {
|
||||
if (x->remaining() == -1) {
|
||||
|
@ -3420,6 +3477,7 @@ int64_t GatherBlock::remaining () {
|
|||
sum += x->remaining();
|
||||
}
|
||||
return sum;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3428,6 +3486,7 @@ int64_t GatherBlock::remaining () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool GatherBlock::hasMore () {
|
||||
ENTER_BLOCK
|
||||
if (_done) {
|
||||
return false;
|
||||
}
|
||||
|
@ -3452,6 +3511,7 @@ bool GatherBlock::hasMore () {
|
|||
}
|
||||
_done = true;
|
||||
return false;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3459,6 +3519,7 @@ bool GatherBlock::hasMore () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||
ENTER_BLOCK
|
||||
if (_done) {
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -3572,6 +3633,7 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
|||
}
|
||||
|
||||
return res.release();
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3579,6 +3641,7 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||
ENTER_BLOCK
|
||||
if (_done) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -3655,6 +3718,7 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
}
|
||||
|
||||
return skipped;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3663,7 +3727,9 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) {
|
||||
ENTER_BLOCK
|
||||
TRI_ASSERT(0 <= i && i < _dependencies.size());
|
||||
TRI_ASSERT(! _isSimple);
|
||||
AqlItemBlock* docs = _dependencies.at(i)->getSome(atLeast, atMost);
|
||||
if (docs != nullptr) {
|
||||
try {
|
||||
|
@ -3677,6 +3743,7 @@ bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) {
|
|||
}
|
||||
|
||||
return false;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3725,10 +3792,11 @@ bool GatherBlock::OurLessThan::operator() (std::pair<size_t, size_t> const& a,
|
|||
|
||||
BlockWithClients::BlockWithClients (ExecutionEngine* engine,
|
||||
ExecutionNode const* ep,
|
||||
std::vector<std::string> const& shardIds) :
|
||||
ExecutionBlock(engine, ep),
|
||||
_nrClients(shardIds.size()),
|
||||
_initOrShutdown(true) {
|
||||
std::vector<std::string> const& shardIds)
|
||||
: ExecutionBlock(engine, ep),
|
||||
_nrClients(shardIds.size()),
|
||||
_initOrShutdown(true) {
|
||||
|
||||
_shardIdMap.reserve(_nrClients);
|
||||
for (size_t i = 0; i < _nrClients; i++) {
|
||||
_shardIdMap.emplace(std::make_pair(shardIds[i], i));
|
||||
|
@ -3739,12 +3807,15 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine,
|
|||
/// @brief shutdown
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int BlockWithClients::shutdown () {
|
||||
if (!_initOrShutdown) {
|
||||
int BlockWithClients::shutdown (int errorCode) {
|
||||
ENTER_BLOCK
|
||||
if (! _initOrShutdown) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
_initOrShutdown = false;
|
||||
return ExecutionBlock::shutdown();
|
||||
return ExecutionBlock::shutdown(errorCode);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3754,6 +3825,7 @@ int BlockWithClients::shutdown () {
|
|||
AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast,
|
||||
size_t atMost,
|
||||
std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t skipped = 0;
|
||||
AqlItemBlock* result = nullptr;
|
||||
int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId);
|
||||
|
@ -3761,6 +3833,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast,
|
|||
THROW_ARANGO_EXCEPTION(out);
|
||||
}
|
||||
return result;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3770,6 +3843,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast,
|
|||
size_t BlockWithClients::skipSomeForShard (size_t atLeast,
|
||||
size_t atMost,
|
||||
std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t skipped = 0;
|
||||
AqlItemBlock* result = nullptr;
|
||||
int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId);
|
||||
|
@ -3778,6 +3852,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast,
|
|||
THROW_ARANGO_EXCEPTION(out);
|
||||
}
|
||||
return skipped;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3786,6 +3861,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast,
|
|||
|
||||
bool BlockWithClients::skipForShard (size_t number,
|
||||
std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t skipped = skipSomeForShard(number, number, shardId);
|
||||
size_t nr = skipped;
|
||||
while (nr != 0 && skipped < number) {
|
||||
|
@ -3796,6 +3872,7 @@ bool BlockWithClients::skipForShard (size_t number,
|
|||
return true;
|
||||
}
|
||||
return ! hasMoreForShard(shardId);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3804,6 +3881,7 @@ bool BlockWithClients::skipForShard (size_t number,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t BlockWithClients::getClientId (std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
if (shardId.empty()) {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "got empty shard id");
|
||||
|
@ -3816,6 +3894,7 @@ size_t BlockWithClients::getClientId (std::string const& shardId) {
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
|
||||
}
|
||||
return ((*it).second);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3824,6 +3903,7 @@ size_t BlockWithClients::getClientId (std::string const& shardId) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool BlockWithClients::preInitCursor () {
|
||||
ENTER_BLOCK
|
||||
if (! _initOrShutdown) {
|
||||
return false;
|
||||
}
|
||||
|
@ -3837,6 +3917,7 @@ bool BlockWithClients::preInitCursor () {
|
|||
|
||||
_initOrShutdown = false;
|
||||
return true;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -3848,6 +3929,7 @@ bool BlockWithClients::preInitCursor () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||
ENTER_BLOCK
|
||||
if (! preInitCursor()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -3863,6 +3945,7 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
_posForClient.push_back(std::make_pair(0, 0));
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3870,6 +3953,7 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool ScatterBlock::hasMoreForShard (std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t clientId = getClientId(shardId);
|
||||
|
||||
if (_doneForClient.at(clientId)) {
|
||||
|
@ -3888,6 +3972,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) {
|
|||
}
|
||||
}
|
||||
return true;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3896,6 +3981,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t ScatterBlock::remainingForShard (std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t clientId = getClientId(shardId);
|
||||
if (_doneForClient.at(clientId)) {
|
||||
return 0;
|
||||
|
@ -3916,6 +4002,7 @@ int64_t ScatterBlock::remainingForShard (std::string const& shardId) {
|
|||
}
|
||||
|
||||
return sum;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3925,7 +4012,7 @@ int64_t ScatterBlock::remainingForShard (std::string const& shardId) {
|
|||
int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
|
||||
size_t atMost, bool skipping, AqlItemBlock*& result,
|
||||
size_t& skipped, std::string const& shardId) {
|
||||
|
||||
ENTER_BLOCK
|
||||
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
|
||||
TRI_ASSERT(result == nullptr && skipped == 0);
|
||||
|
||||
|
@ -3984,6 +4071,7 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
|
|||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -4011,6 +4099,7 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||
ENTER_BLOCK
|
||||
if (! preInitCursor()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -4027,6 +4116,7 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4034,6 +4124,7 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool DistributeBlock::hasMoreForShard (std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
size_t clientId = getClientId(shardId);
|
||||
|
||||
if (_doneForClient.at(clientId)) {
|
||||
|
@ -4049,6 +4140,7 @@ bool DistributeBlock::hasMoreForShard (std::string const& shardId) {
|
|||
return false;
|
||||
}
|
||||
return true;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4061,6 +4153,7 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast,
|
|||
AqlItemBlock*& result,
|
||||
size_t& skipped,
|
||||
std::string const& shardId) {
|
||||
ENTER_BLOCK
|
||||
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
|
||||
TRI_ASSERT(result == nullptr && skipped == 0);
|
||||
|
||||
|
@ -4136,36 +4229,10 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast,
|
|||
|
||||
freeCollector();
|
||||
|
||||
// check if we can pop from the front of _buffer
|
||||
size_t smallestIndex = 0;
|
||||
|
||||
for (size_t i = 0; i < _nrClients; i++) {
|
||||
if (! _distBuffer.at(i).empty()) {
|
||||
size_t index = _distBuffer.at(i).at(0).first;
|
||||
if (index == 0) {
|
||||
return TRI_ERROR_NO_ERROR; // don't have to do any clean-up
|
||||
}
|
||||
else {
|
||||
smallestIndex = (std::min)(index, smallestIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pop from _buffer
|
||||
for (size_t i = 0; i < smallestIndex; i++) {
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
delete cur;
|
||||
_buffer.pop_front();
|
||||
}
|
||||
|
||||
// reset first coord of pairs in _distBuffer
|
||||
for (size_t i = 0; i < _nrClients; i++) {
|
||||
for (size_t j = 0; j < _distBuffer.at(i).size(); j++) {
|
||||
_distBuffer.at(i).at(j).first -= smallestIndex;
|
||||
}
|
||||
}
|
||||
// _buffer is left intact, deleted and cleared at shutdown
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4179,6 +4246,7 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast,
|
|||
bool DistributeBlock::getBlockForClient (size_t atLeast,
|
||||
size_t atMost,
|
||||
size_t clientId) {
|
||||
ENTER_BLOCK
|
||||
if (_buffer.empty()) {
|
||||
_index = 0; // position in _buffer
|
||||
_pos = 0; // position in _buffer.at(_index)
|
||||
|
@ -4215,6 +4283,7 @@ bool DistributeBlock::getBlockForClient (size_t atLeast,
|
|||
}
|
||||
|
||||
return true;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4224,6 +4293,7 @@ bool DistributeBlock::getBlockForClient (size_t atLeast,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t DistributeBlock::sendToClient (AqlValue val) {
|
||||
ENTER_BLOCK
|
||||
TRI_json_t const* json;
|
||||
if (val._type == AqlValue::JSON) {
|
||||
json = val._json->json();
|
||||
|
@ -4259,6 +4329,7 @@ size_t DistributeBlock::sendToClient (AqlValue val) {
|
|||
TRI_ASSERT(!shardId.empty());
|
||||
|
||||
return getClientId(shardId);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -4271,9 +4342,9 @@ size_t DistributeBlock::sendToClient (AqlValue val) {
|
|||
|
||||
static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
|
||||
bool isShutdown) {
|
||||
ENTER_BLOCK
|
||||
if (res->status == CL_COMM_TIMEOUT) {
|
||||
std::string errorMessage;
|
||||
errorMessage += std::string("Timeout in communication with shard '") +
|
||||
std::string errorMessage = std::string("Timeout in communication with shard '") +
|
||||
std::string(res->shardID) +
|
||||
std::string("' on cluster node '") +
|
||||
std::string(res->serverID) +
|
||||
|
@ -4293,7 +4364,7 @@ static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
|
|||
std::string(res->shardID) +
|
||||
std::string("' on cluster node '") +
|
||||
std::string(res->serverID) +
|
||||
std::string("' failed.");
|
||||
std::string("'");
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST,
|
||||
errorMessage);
|
||||
}
|
||||
|
@ -4306,8 +4377,7 @@ static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
|
|||
|
||||
if (JsonHelper::getBooleanValue(json, "error", true)) {
|
||||
errorNum = TRI_ERROR_INTERNAL;
|
||||
|
||||
errorMessage += std::string("Error message received from shard '") +
|
||||
std::string errorMessage = std::string("Error message received from shard '") +
|
||||
std::string(res->shardID) +
|
||||
std::string("' on cluster node '") +
|
||||
std::string(res->serverID) +
|
||||
|
@ -4330,11 +4400,11 @@ static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
|
|||
errorMessage += std::string(v->_value._string.data, v->_value._string.length - 1);
|
||||
}
|
||||
else {
|
||||
errorMessage += std::string("(No valid error in response)");
|
||||
errorMessage += std::string("(no valid error in response)");
|
||||
}
|
||||
}
|
||||
else {
|
||||
errorMessage += std::string("(No valid response)");
|
||||
errorMessage += std::string("(no valid response)");
|
||||
}
|
||||
|
||||
if (json != nullptr) {
|
||||
|
@ -4358,6 +4428,7 @@ static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
|
|||
}
|
||||
|
||||
return false;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4394,8 +4465,9 @@ RemoteBlock::~RemoteBlock () {
|
|||
|
||||
ClusterCommResult* RemoteBlock::sendRequest (
|
||||
triagens::rest::HttpRequest::HttpRequestType type,
|
||||
std::string urlPart,
|
||||
std::string const& urlPart,
|
||||
std::string const& body) const {
|
||||
ENTER_BLOCK
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
|
||||
// Later, we probably want to set these sensibly:
|
||||
|
@ -4418,6 +4490,7 @@ ClusterCommResult* RemoteBlock::sendRequest (
|
|||
defaultTimeOut);
|
||||
|
||||
return result;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4425,6 +4498,7 @@ ClusterCommResult* RemoteBlock::sendRequest (
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int RemoteBlock::initialize () {
|
||||
ENTER_BLOCK
|
||||
int res = ExecutionBlock::initialize();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -4432,6 +4506,7 @@ int RemoteBlock::initialize () {
|
|||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4439,6 +4514,7 @@ int RemoteBlock::initialize () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
|
||||
Json body(Json::Array, 2);
|
||||
|
@ -4468,19 +4544,21 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
|||
responseBodyBuf.begin()));
|
||||
return JsonHelper::getNumericValue<int>
|
||||
(responseBodyJson.json(), "code", TRI_ERROR_INTERNAL);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int RemoteBlock::shutdown () {
|
||||
int RemoteBlock::shutdown (int errorCode) {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_PUT,
|
||||
"/_api/aql/shutdown/",
|
||||
string()));
|
||||
string("{\"code\":" + std::to_string(errorCode) + "}")));
|
||||
if (throwExceptionAfterBadSyncRequest(res.get(), true)) {
|
||||
// artificially ignore error in case query was not found during shutdown
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -4495,6 +4573,7 @@ int RemoteBlock::shutdown () {
|
|||
|
||||
return JsonHelper::getNumericValue<int>
|
||||
(responseBodyJson.json(), "code", TRI_ERROR_INTERNAL);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4503,6 +4582,7 @@ int RemoteBlock::shutdown () {
|
|||
|
||||
AqlItemBlock* RemoteBlock::getSome (size_t atLeast,
|
||||
size_t atMost) {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
|
||||
Json body(Json::Array, 2);
|
||||
|
@ -4522,18 +4602,20 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast,
|
|||
Json responseBodyJson(TRI_UNKNOWN_MEM_ZONE,
|
||||
TRI_JsonString(TRI_UNKNOWN_MEM_ZONE,
|
||||
responseBodyBuf.begin()));
|
||||
if (JsonHelper::getBooleanValue(responseBodyJson.json(), "exhausted", true)) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto items = new triagens::aql::AqlItemBlock(responseBodyJson);
|
||||
|
||||
ExecutionStats newStats(responseBodyJson.get("stats"));
|
||||
|
||||
_engine->_stats.addDelta(_deltaStats, newStats);
|
||||
_deltaStats = newStats;
|
||||
|
||||
if (JsonHelper::getBooleanValue(responseBodyJson.json(), "exhausted", true)) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto items = new triagens::aql::AqlItemBlock(responseBodyJson);
|
||||
|
||||
return items;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4541,6 +4623,7 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
|
||||
Json body(Json::Array, 2);
|
||||
|
@ -4566,6 +4649,7 @@ size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
size_t skipped = JsonHelper::getNumericValue<size_t>(responseBodyJson.json(),
|
||||
"skipped", 0);
|
||||
return skipped;
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4573,6 +4657,7 @@ size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RemoteBlock::hasMore () {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET,
|
||||
|
@ -4590,6 +4675,7 @@ bool RemoteBlock::hasMore () {
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION);
|
||||
}
|
||||
return JsonHelper::getBooleanValue(responseBodyJson.json(), "hasMore", true);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4597,6 +4683,7 @@ bool RemoteBlock::hasMore () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t RemoteBlock::count () const {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET,
|
||||
|
@ -4615,6 +4702,7 @@ int64_t RemoteBlock::count () const {
|
|||
}
|
||||
return JsonHelper::getNumericValue<int64_t>
|
||||
(responseBodyJson.json(), "count", 0);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -4622,6 +4710,7 @@ int64_t RemoteBlock::count () const {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t RemoteBlock::remaining () {
|
||||
ENTER_BLOCK
|
||||
// For every call we simply forward via HTTP
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET,
|
||||
|
@ -4640,6 +4729,7 @@ int64_t RemoteBlock::remaining () {
|
|||
}
|
||||
return JsonHelper::getNumericValue<int64_t>
|
||||
(responseBodyJson.json(), "remaining", 0);
|
||||
LEAVE_BLOCK
|
||||
}
|
||||
|
||||
// Local Variables:
|
||||
|
|
|
@ -186,7 +186,7 @@ namespace triagens {
|
|||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual int shutdown ();
|
||||
virtual int shutdown (int);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getOne, gets one more item
|
||||
|
@ -279,7 +279,7 @@ namespace triagens {
|
|||
|
||||
virtual int64_t remaining ();
|
||||
|
||||
ExecutionNode const* getPlanNode () {
|
||||
ExecutionNode const* getPlanNode () const {
|
||||
return _exeNode;
|
||||
}
|
||||
|
||||
|
@ -378,7 +378,7 @@ namespace triagens {
|
|||
}
|
||||
}
|
||||
|
||||
int initialize () {
|
||||
int initialize () override {
|
||||
_inputRegisterValues = nullptr; // just in case
|
||||
return ExecutionBlock::initialize();
|
||||
}
|
||||
|
@ -387,19 +387,19 @@ namespace triagens {
|
|||
/// @brief initializeCursor, store a copy of the register values coming from above
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos) override;
|
||||
|
||||
int shutdown ();
|
||||
int shutdown (int) override final;
|
||||
|
||||
bool hasMore () {
|
||||
bool hasMore () override final {
|
||||
return ! _done;
|
||||
}
|
||||
|
||||
int64_t count () const {
|
||||
int64_t count () const override final {
|
||||
return 1;
|
||||
}
|
||||
|
||||
int64_t remaining () {
|
||||
int64_t remaining () override final {
|
||||
return _done ? 0 : 1;
|
||||
}
|
||||
|
||||
|
@ -457,15 +457,15 @@ namespace triagens {
|
|||
/// @brief initialize, here we fetch all docs from the database
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initCursor, here we release our docs from this collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos) override;
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost) override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// skip between atLeast and atMost, returns the number actually skipped . . .
|
||||
|
@ -473,7 +473,7 @@ namespace triagens {
|
|||
// things to skip overall.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost);
|
||||
size_t skipSome (size_t atLeast, size_t atMost) override final;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private variables
|
||||
|
@ -530,15 +530,15 @@ namespace triagens {
|
|||
/// @brief initialize, here we fetch all docs from the database
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initializeCursor, here we release our docs from this collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos) override;
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost) override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// skip between atLeast and atMost, returns the number actually skipped . . .
|
||||
|
@ -546,7 +546,7 @@ namespace triagens {
|
|||
// things to skip overall.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual size_t skipSome (size_t atLeast, size_t atMost);
|
||||
size_t skipSome (size_t atLeast, size_t atMost) override final;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private methods
|
||||
|
@ -651,15 +651,15 @@ namespace triagens {
|
|||
|
||||
~EnumerateListBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initializeCursor, here we release our docs from this collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos) override;
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost) override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// skip between atLeast and atMost returns the number actually skipped . . .
|
||||
|
@ -667,7 +667,7 @@ namespace triagens {
|
|||
// things to skip overall.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost);
|
||||
size_t skipSome (size_t atLeast, size_t atMost) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create an AqlValue from the inVariable using the current _index
|
||||
|
@ -728,7 +728,7 @@ namespace triagens {
|
|||
|
||||
~CalculationBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief doEvaluation, private helper to do the work
|
||||
|
@ -744,8 +744,8 @@ namespace triagens {
|
|||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost) override;
|
||||
|
||||
private:
|
||||
|
||||
|
@ -795,10 +795,10 @@ namespace triagens {
|
|||
|
||||
~SubqueryBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
virtual AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost) override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getter for the pointer to the subquery
|
||||
|
@ -836,7 +836,7 @@ namespace triagens {
|
|||
|
||||
~FilterBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief internal function to actually decide
|
||||
|
@ -861,13 +861,13 @@ namespace triagens {
|
|||
AqlItemBlock*& result,
|
||||
size_t& skipped);
|
||||
|
||||
bool hasMore ();
|
||||
bool hasMore () override final;
|
||||
|
||||
int64_t count () const {
|
||||
int64_t count () const override final {
|
||||
return -1; // refuse to work
|
||||
}
|
||||
|
||||
int64_t remaining () {
|
||||
int64_t remaining () override final {
|
||||
return -1; // refuse to work
|
||||
}
|
||||
|
||||
|
@ -901,7 +901,7 @@ namespace triagens {
|
|||
|
||||
~AggregateBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
private:
|
||||
|
||||
|
@ -961,7 +961,7 @@ namespace triagens {
|
|||
|
||||
~SortBlock ();
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
virtual int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
|
||||
|
@ -1033,7 +1033,7 @@ namespace triagens {
|
|||
~LimitBlock () {
|
||||
}
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
|
||||
|
@ -1097,8 +1097,8 @@ namespace triagens {
|
|||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost) override;
|
||||
|
||||
};
|
||||
|
||||
|
@ -1127,8 +1127,8 @@ namespace triagens {
|
|||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost);
|
||||
AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost) override final;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- protected methods
|
||||
|
@ -1328,7 +1328,7 @@ namespace triagens {
|
|||
~NoResultsBlock () {
|
||||
}
|
||||
|
||||
int initialize () {
|
||||
int initialize () override {
|
||||
return ExecutionBlock::initialize();
|
||||
}
|
||||
|
||||
|
@ -1338,15 +1338,15 @@ namespace triagens {
|
|||
|
||||
int initializeCursor (AqlItemBlock* items, size_t pos);
|
||||
|
||||
bool hasMore () {
|
||||
bool hasMore () override final {
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t count () const {
|
||||
int64_t count () const override final {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t remaining () {
|
||||
int64_t remaining () override final {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1385,13 +1385,13 @@ namespace triagens {
|
|||
/// @brief initialize
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initialize ();
|
||||
int initialize () override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shutdown: need our own method since our _buffer is different
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int shutdown ();
|
||||
int shutdown (int) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initializeCursor
|
||||
|
@ -1404,41 +1404,34 @@ namespace triagens {
|
|||
/// dependency has count -1
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t count () const;
|
||||
int64_t count () const override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remaining: the sum of the remaining() of the dependencies or -1 (if
|
||||
/// any dependency has remaining -1
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t remaining ();
|
||||
int64_t remaining () override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief hasMore: true if any position of _buffer hasMore and false
|
||||
/// otherwise.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool hasMore ();
|
||||
bool hasMore () override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* getSome (size_t, size_t);
|
||||
AqlItemBlock* getSome (size_t, size_t) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief skipSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t, size_t);
|
||||
size_t skipSome (size_t, size_t) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _gatherBlockPos: pairs (i, _pos in _buffer.at(i)), i.e. the same as
|
||||
/// the usual _pos but one pair per dependency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::vector<std::pair<size_t, size_t>> _gatherBlockPos;
|
||||
|
||||
protected:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1457,6 +1450,13 @@ namespace triagens {
|
|||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _gatherBlockPos: pairs (i, _pos in _buffer.at(i)), i.e. the same as
|
||||
/// the usual _pos but one pair per dependency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::vector<std::pair<size_t, size_t>> _gatherBlockPos;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _atDep: currently pulling blocks from _dependencies.at(_atDep),
|
||||
/// simple case only
|
||||
|
@ -1518,7 +1518,6 @@ namespace triagens {
|
|||
std::vector<std::string> const& shardIds);
|
||||
|
||||
virtual ~BlockWithClients () {}
|
||||
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- BlockWithClients public methods
|
||||
|
@ -1530,13 +1529,13 @@ namespace triagens {
|
|||
/// @brief shutdown
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int shutdown ();
|
||||
int shutdown (int) override;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getSome: shouldn't be used, use skipSomeForShard
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost) {
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost) override final {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -1545,7 +1544,7 @@ namespace triagens {
|
|||
/// @brief skipSome: shouldn't be used, use skipSomeForShard
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost) {
|
||||
size_t skipSome (size_t atLeast, size_t atMost) override final {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -1554,7 +1553,7 @@ namespace triagens {
|
|||
/// @brief remaining
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t remaining () {
|
||||
int64_t remaining () override final {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -1563,16 +1562,7 @@ namespace triagens {
|
|||
/// @brief hasMore
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool hasMore () {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief skip
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t skip () {
|
||||
bool hasMore () override final {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -1866,7 +1856,7 @@ namespace triagens {
|
|||
/// @brief initialize
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initialize () final;
|
||||
int initialize () override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initializeCursor, could be called multiple times
|
||||
|
@ -1878,38 +1868,38 @@ namespace triagens {
|
|||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int shutdown () final;
|
||||
int shutdown (int) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast,
|
||||
size_t atMost) final;
|
||||
size_t atMost) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief skipSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost) final;
|
||||
size_t skipSome (size_t atLeast, size_t atMost) override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief hasMore
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool hasMore () final;
|
||||
bool hasMore () override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief count
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t count () const final;
|
||||
int64_t count () const override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remaining
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int64_t remaining () final;
|
||||
int64_t remaining () override final;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief internal method to send a request
|
||||
|
@ -1919,7 +1909,7 @@ namespace triagens {
|
|||
|
||||
triagens::arango::ClusterCommResult* sendRequest (
|
||||
rest::HttpRequest::HttpRequestType type,
|
||||
std::string urlPart,
|
||||
std::string const& urlPart,
|
||||
std::string const& body) const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -161,7 +161,8 @@ ExecutionEngine::ExecutionEngine (Query* query)
|
|||
: _stats(),
|
||||
_blocks(),
|
||||
_root(nullptr),
|
||||
_query(query) {
|
||||
_query(query),
|
||||
_wasShutdown(false) {
|
||||
|
||||
_blocks.reserve(8);
|
||||
}
|
||||
|
@ -171,8 +172,11 @@ ExecutionEngine::ExecutionEngine (Query* query)
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ExecutionEngine::~ExecutionEngine () {
|
||||
if (_root != nullptr) {
|
||||
_root->shutdown();
|
||||
try {
|
||||
shutdown(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
catch (...) {
|
||||
// shutdown can throw - ignore it in the destructor
|
||||
}
|
||||
|
||||
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
|
||||
|
@ -333,39 +337,25 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
if ((*it).location == COORDINATOR) {
|
||||
// create a coordinator-based engine
|
||||
engine = buildEngineCoordinator((*it), queryIds);
|
||||
|
||||
TRI_ASSERT(engine != nullptr);
|
||||
|
||||
auto plan = engine->getQuery()->plan();
|
||||
TRI_ASSERT(plan != nullptr);
|
||||
TRI_ASSERT(plan->empty());
|
||||
|
||||
|
||||
if ((*it).id > 0) {
|
||||
Query* otherQuery = query->clone(PART_DEPENDENT);
|
||||
otherQuery->engine(engine);
|
||||
|
||||
int res = otherQuery->trx()->begin();
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(res, "could not begin transaction");
|
||||
}
|
||||
|
||||
auto* newPlan = new ExecutionPlan(otherQuery->ast());
|
||||
otherQuery->setPlan(newPlan);
|
||||
|
||||
// clone all variables
|
||||
for (auto it2 : query->ast()->variables()->variables(true)) {
|
||||
auto var = query->ast()->variables()->getVariable(it2.first);
|
||||
TRI_ASSERT(var != nullptr);
|
||||
otherQuery->ast()->variables()->createVariable(var);
|
||||
}
|
||||
|
||||
ExecutionNode const* current = (*it).nodes.front();
|
||||
ExecutionNode* previous = nullptr;
|
||||
|
||||
// TODO: fix instanciation here as in DBserver case
|
||||
while (current != nullptr) {
|
||||
auto clone = current->clone(newPlan, false, true);
|
||||
newPlan->registerNode(clone);
|
||||
auto clone = current->clone(plan, false, true);
|
||||
plan->registerNode(clone);
|
||||
|
||||
if (previous == nullptr) {
|
||||
// set the root node
|
||||
newPlan->root(clone);
|
||||
plan->root(clone);
|
||||
}
|
||||
else {
|
||||
previous->addDependency(clone);
|
||||
|
@ -380,16 +370,13 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
current = deps[0];
|
||||
}
|
||||
|
||||
// TODO: test if this is necessary or does harm
|
||||
// newPlan->setVarUsageComputed();
|
||||
|
||||
// we need to instanciate this engine in the registry
|
||||
|
||||
// create a remote id for the engine that we can pass to
|
||||
// the plans to be created for the DBServers
|
||||
id = TRI_NewTickServer();
|
||||
|
||||
queryRegistry->insert(otherQuery->vocbase(), id, otherQuery, 3600.0);
|
||||
queryRegistry->insert(id, engine->getQuery(), 3600.0);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
@ -576,7 +563,11 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
|
||||
ExecutionEngine* buildEngineCoordinator (EngineInfo& info,
|
||||
std::unordered_map<std::string, std::string> const& queryIds) {
|
||||
std::unique_ptr<ExecutionEngine> engine(new ExecutionEngine(query));
|
||||
// need a new query instance on the coordinator
|
||||
auto clone = query->clone(PART_DEPENDENT, false);
|
||||
|
||||
std::unique_ptr<ExecutionEngine> engine(new ExecutionEngine(clone));
|
||||
clone->engine(engine.get());
|
||||
|
||||
std::unordered_map<ExecutionNode*, ExecutionBlock*> cache;
|
||||
RemoteNode* remoteNode = nullptr;
|
||||
|
@ -615,19 +606,21 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
}
|
||||
}
|
||||
|
||||
if (nodeType == ExecutionNode::GATHER ||
|
||||
nodeType == ExecutionNode::DISTRIBUTE) {
|
||||
// we found a gather or distribute node
|
||||
if (nodeType == ExecutionNode::GATHER /* ||
|
||||
nodeType == ExecutionNode::DISTRIBUTE */) {
|
||||
// we found a gather node
|
||||
TRI_ASSERT(remoteNode != nullptr);
|
||||
|
||||
// now we'll create a remote node for each shard and add it to the gather|distribute node
|
||||
// now we'll create a remote node for each shard and add it to the gather node
|
||||
Collection const* collection = nullptr;
|
||||
if (nodeType == ExecutionNode::GATHER) {
|
||||
collection = static_cast<GatherNode const*>((*en))->collection();
|
||||
}
|
||||
/* TODO: do we need to handle distribute nodes here, too??
|
||||
else if (nodeType == ExecutionNode::DISTRIBUTE) {
|
||||
collection = static_cast<DistributeNode const*>((*en))->collection();
|
||||
}
|
||||
*/
|
||||
else {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
|
|
@ -131,10 +131,15 @@ namespace triagens {
|
|||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int shutdown () {
|
||||
if (_root != nullptr) {
|
||||
return _root->shutdown();
|
||||
int shutdown (int errorCode) {
|
||||
if (_root != nullptr && ! _wasShutdown) {
|
||||
// prevent a duplicate shutdown
|
||||
int res = _root->shutdown(errorCode);
|
||||
_wasShutdown = true;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
@ -236,6 +241,12 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Query* _query;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief whether or not shutdown() was executed
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool _wasShutdown;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -107,6 +107,14 @@ namespace triagens {
|
|||
TRI_memory_zone_t* zone,
|
||||
bool verbose) const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief check if the plan is empty
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
inline bool empty () const {
|
||||
return (_root == nullptr);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief note that an optimizer rule was applied
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -51,6 +51,17 @@ Json ExecutionStats::toJson () const {
|
|||
return json;
|
||||
}
|
||||
|
||||
Json ExecutionStats::toJsonStatic () {
|
||||
Json json(Json::Array);
|
||||
json.set("writesExecuted", Json(static_cast<double>(0)));
|
||||
json.set("writesIgnored", Json(static_cast<double>(0)));
|
||||
json.set("scannedFull", Json(static_cast<double>(0)));
|
||||
json.set("scannedIndex", Json(static_cast<double>(0)));
|
||||
json.set("static", Json(static_cast<double>(0)));
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
ExecutionStats::ExecutionStats()
|
||||
:writesExecuted(0),
|
||||
writesIgnored(0),
|
||||
|
|
|
@ -52,6 +52,12 @@ namespace triagens {
|
|||
|
||||
triagens::basics::Json toJson () const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create empty statistics for JSON
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static triagens::basics::Json toJsonStatic ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief sumarize two sets of ExecutionStats
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -162,7 +162,7 @@ void Expression::replaceVariables (std::unordered_map<VariableId, Variable const
|
|||
|
||||
_ast->replaceVariables(const_cast<AstNode*>(_node), replacements);
|
||||
|
||||
invalidateExpression();
|
||||
invalidate();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -172,7 +172,7 @@ void Expression::replaceVariables (std::unordered_map<VariableId, Variable const
|
|||
/// multiple V8 contexts, it must be invalidated in between
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void Expression::invalidateExpression () {
|
||||
void Expression::invalidate () {
|
||||
if (_type == V8) {
|
||||
delete _func;
|
||||
_type = UNPROCESSED;
|
||||
|
|
|
@ -208,7 +208,7 @@ namespace triagens {
|
|||
/// multiple V8 contexts, it must be invalidated in between
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void invalidateExpression ();
|
||||
void invalidate ();
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private functions
|
||||
|
|
|
@ -1748,7 +1748,7 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
|
|||
if (nodeType == ExecutionNode::REMOVE) {
|
||||
// check if collection shard keys are only _key
|
||||
std::vector<std::string> shardKeys = collection->shardKeys();
|
||||
if (shardKeys.size() != 1 || shardKeys[0] != "_key") {
|
||||
if (shardKeys.size() != 1 || shardKeys[0] != TRI_VOC_ATTRIBUTE_KEY) {
|
||||
opt->addPlan(plan, rule->level, wasModified);
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -2078,7 +2078,7 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
|
|||
if (_setter->getType() == EN::CALCULATION) {
|
||||
// this should be an attribute access for _key
|
||||
auto cn = static_cast<CalculationNode*>(_setter);
|
||||
if (!(cn->expression()->isAttributeAccess())) {
|
||||
if (! cn->expression()->isAttributeAccess()) {
|
||||
break; // abort . . .
|
||||
}
|
||||
// check the variable is the same as the remove variable
|
||||
|
@ -2114,6 +2114,7 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
|
|||
_lastNode = en;
|
||||
return false; // continue . . .
|
||||
}
|
||||
case EN::DISTRIBUTE:
|
||||
case EN::SCATTER: {
|
||||
if (_scatter) { // met more than one scatter node
|
||||
break; // abort . . .
|
||||
|
@ -2152,8 +2153,7 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
|
|||
// ever happen?
|
||||
|
||||
// check these are a Calc-Filter pair
|
||||
if (cn->getVariablesSetHere()[0]->id
|
||||
!= fn->getVariablesUsedHere()[0]->id) {
|
||||
if (cn->getVariablesSetHere()[0]->id != fn->getVariablesUsedHere()[0]->id) {
|
||||
break; // abort . . .
|
||||
}
|
||||
|
||||
|
@ -2186,7 +2186,6 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
|
|||
case EN::INSERT:
|
||||
case EN::REPLACE:
|
||||
case EN::UPDATE:
|
||||
case EN::DISTRIBUTE:
|
||||
case EN::RETURN:
|
||||
case EN::NORESULTS:
|
||||
case EN::ILLEGAL:
|
||||
|
@ -2218,7 +2217,7 @@ int triagens::aql::undistributeRemoveAfterEnumColl (Optimizer* opt,
|
|||
}
|
||||
|
||||
bool modified = false;
|
||||
if (!toUnlink.empty()) {
|
||||
if (! toUnlink.empty()) {
|
||||
plan->unlinkNodes(toUnlink);
|
||||
plan->findVarUsage();
|
||||
modified = true;
|
||||
|
|
|
@ -210,7 +210,7 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Query::~Query () {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_INTERNAL); // abort the transaction
|
||||
|
||||
if (_profile != nullptr) {
|
||||
delete _profile;
|
||||
|
@ -258,9 +258,12 @@ Query::~Query () {
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief clone a query
|
||||
/// note: as a side-effect, this will also create and start a transaction for
|
||||
/// the query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Query* Query::clone (QueryPart part) {
|
||||
Query* Query::clone (QueryPart part,
|
||||
bool withPlan) {
|
||||
TRI_json_t* options = nullptr;
|
||||
|
||||
if (_options != nullptr) {
|
||||
|
@ -287,7 +290,10 @@ Query* Query::clone (QueryPart part) {
|
|||
}
|
||||
|
||||
if (_plan != nullptr) {
|
||||
clone->_plan = _plan->clone(*clone);
|
||||
if (withPlan) {
|
||||
// clone the existing plan
|
||||
clone->setPlan(_plan->clone(*clone));
|
||||
}
|
||||
|
||||
// clone all variables
|
||||
for (auto it : _ast->variables()->variables(true)) {
|
||||
|
@ -296,11 +302,23 @@ Query* Query::clone (QueryPart part) {
|
|||
clone->ast()->variables()->createVariable(var);
|
||||
}
|
||||
}
|
||||
|
||||
if (clone->_plan == nullptr) {
|
||||
// initialize an empty plan
|
||||
clone->setPlan(new ExecutionPlan(ast()));
|
||||
}
|
||||
|
||||
TRI_ASSERT(clone->_trx == nullptr);
|
||||
|
||||
clone->_trx = _trx->clone(); // A daughter transaction which does not
|
||||
// actually lock the collections
|
||||
|
||||
int res = clone->_trx->begin();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(res, "could not begin transaction");
|
||||
}
|
||||
|
||||
return clone.release();
|
||||
}
|
||||
|
||||
|
@ -454,6 +472,7 @@ QueryResult Query::prepare (QueryRegistry* registry) {
|
|||
parser->ast()->variables()->fromJson(_queryJson);
|
||||
// creating the plan may have produced some collections
|
||||
// we need to add them to the transaction now (otherwise the query will fail)
|
||||
|
||||
int res = _trx->addCollectionList(_collections.collections());
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
|
@ -506,19 +525,19 @@ QueryResult Query::prepare (QueryRegistry* registry) {
|
|||
return QueryResult();
|
||||
}
|
||||
catch (triagens::arango::Exception const& ex) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(ex.code());
|
||||
return QueryResult(ex.code(), getStateString() + ex.message());
|
||||
}
|
||||
catch (std::bad_alloc const&) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_OUT_OF_MEMORY);
|
||||
return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
|
||||
}
|
||||
catch (std::exception const& ex) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_INTERNAL);
|
||||
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what());
|
||||
}
|
||||
catch (...) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_INTERNAL);
|
||||
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL));
|
||||
}
|
||||
}
|
||||
|
@ -560,7 +579,7 @@ QueryResult Query::execute (QueryRegistry* registry) {
|
|||
|
||||
_trx->commit();
|
||||
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
|
||||
|
||||
enterState(FINALIZATION);
|
||||
|
||||
|
@ -575,19 +594,19 @@ QueryResult Query::execute (QueryRegistry* registry) {
|
|||
return result;
|
||||
}
|
||||
catch (triagens::arango::Exception const& ex) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(ex.code());
|
||||
return QueryResult(ex.code(), getStateString() + ex.message());
|
||||
}
|
||||
catch (std::bad_alloc const&) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_OUT_OF_MEMORY);
|
||||
return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
|
||||
}
|
||||
catch (std::exception const& ex) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_INTERNAL);
|
||||
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what());
|
||||
}
|
||||
catch (...) {
|
||||
cleanupPlanAndEngine();
|
||||
cleanupPlanAndEngine(TRI_ERROR_INTERNAL);
|
||||
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL));
|
||||
}
|
||||
}
|
||||
|
@ -845,7 +864,12 @@ void Query::exitContext () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
triagens::basics::Json Query::getStats() {
|
||||
return _engine->_stats.toJson();
|
||||
if (_engine) {
|
||||
return _engine->_stats.toJson();
|
||||
}
|
||||
else {
|
||||
return ExecutionStats::toJsonStatic();
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -955,16 +979,22 @@ void Query::enterState (ExecutionState state) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string Query::getStateString () const {
|
||||
return "in state " + StateNames[_state] + ": ";
|
||||
return "in state '" + StateNames[_state] + "': ";
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cleanup plan and engine for current query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void Query::cleanupPlanAndEngine () {
|
||||
void Query::cleanupPlanAndEngine (int errorCode) {
|
||||
if (_engine != nullptr) {
|
||||
_engine->shutdown();
|
||||
try {
|
||||
_engine->shutdown(errorCode);
|
||||
}
|
||||
catch (...) {
|
||||
// shutdown may fail but we must not throw here
|
||||
// (we're also called from the destructor)
|
||||
}
|
||||
delete _engine;
|
||||
_engine = nullptr;
|
||||
}
|
||||
|
|
|
@ -146,7 +146,13 @@ namespace triagens {
|
|||
|
||||
~Query ();
|
||||
|
||||
Query* clone (QueryPart);
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief clone a query
|
||||
/// note: as a side-effect, this will also create and start a transaction for
|
||||
/// the query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Query* clone (QueryPart, bool);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- public methods
|
||||
|
@ -349,16 +355,11 @@ namespace triagens {
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief set the transaction for the query
|
||||
/// @brief get the plan for the query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void setTrx (triagens::arango::AqlTransaction* trx) {
|
||||
TRI_ASSERT(_trx == nullptr);
|
||||
_trx = trx;
|
||||
}
|
||||
|
||||
triagens::arango::AqlTransaction* getTrx () {
|
||||
return _trx;
|
||||
ExecutionPlan* plan () const {
|
||||
return _plan;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -431,7 +432,7 @@ namespace triagens {
|
|||
/// @brief cleanup plan and engine for current query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void cleanupPlanAndEngine ();
|
||||
void cleanupPlanAndEngine (int);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private variables
|
||||
|
|
|
@ -45,21 +45,32 @@ using namespace triagens::aql;
|
|||
|
||||
QueryRegistry::~QueryRegistry () {
|
||||
std::vector<std::pair<std::string, QueryId>> toDelete;
|
||||
|
||||
{
|
||||
WRITE_LOCKER(_lock);
|
||||
for (auto& x : _queries) {
|
||||
// x.first is a TRI_vocbase_t* and
|
||||
// x.second is a std::unordered_map<QueryId, QueryInfo*>
|
||||
for (auto& y : x.second) {
|
||||
// y.first is a QueryId and
|
||||
// y.second is a QueryInfo*
|
||||
toDelete.emplace_back(x.first, y.first);
|
||||
|
||||
try {
|
||||
for (auto& x : _queries) {
|
||||
// x.first is a TRI_vocbase_t* and
|
||||
// x.second is a std::unordered_map<QueryId, QueryInfo*>
|
||||
for (auto& y : x.second) {
|
||||
// y.first is a QueryId and
|
||||
// y.second is a QueryInfo*
|
||||
toDelete.emplace_back(x.first, y.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...) {
|
||||
// the emplace_back() above might fail
|
||||
// prevent throwing exceptions in the destructor
|
||||
}
|
||||
}
|
||||
|
||||
// note: destroy() will acquire _lock itself, so it must be called without
|
||||
// holding the lock
|
||||
for (auto& p : toDelete) {
|
||||
try { // just in case
|
||||
destroy(p.first, p.second);
|
||||
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED);
|
||||
}
|
||||
catch (...) {
|
||||
}
|
||||
|
@ -70,12 +81,13 @@ QueryRegistry::~QueryRegistry () {
|
|||
/// @brief insert
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void QueryRegistry::insert (TRI_vocbase_t* vocbase,
|
||||
QueryId id,
|
||||
void QueryRegistry::insert (QueryId id,
|
||||
Query* query,
|
||||
double ttl) {
|
||||
|
||||
TRI_ASSERT(query != nullptr);
|
||||
TRI_ASSERT(query->trx() != nullptr);
|
||||
auto vocbase = query->vocbase();
|
||||
|
||||
WRITE_LOCKER(_lock);
|
||||
|
||||
|
@ -171,7 +183,9 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
|
|||
/// @brief destroy
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void QueryRegistry::destroy (std::string const& vocbase, QueryId id) {
|
||||
void QueryRegistry::destroy (std::string const& vocbase,
|
||||
QueryId id,
|
||||
int errorCode) {
|
||||
WRITE_LOCKER(_lock);
|
||||
|
||||
auto m = _queries.find(vocbase);
|
||||
|
@ -193,6 +207,11 @@ void QueryRegistry::destroy (std::string const& vocbase, QueryId id) {
|
|||
triagens::arango::TransactionBase::increaseNumbers(1, 1);
|
||||
}
|
||||
|
||||
if (errorCode == TRI_ERROR_NO_ERROR) {
|
||||
// commit the operation
|
||||
qi->_query->trx()->commit();
|
||||
}
|
||||
|
||||
// Now we can delete it:
|
||||
delete qi->_query;
|
||||
delete qi;
|
||||
|
@ -205,8 +224,10 @@ void QueryRegistry::destroy (std::string const& vocbase, QueryId id) {
|
|||
/// @brief destroy
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) {
|
||||
destroy(vocbase->_name, id);
|
||||
void QueryRegistry::destroy (TRI_vocbase_t* vocbase,
|
||||
QueryId id,
|
||||
int errorCode) {
|
||||
destroy(vocbase->_name, id, errorCode);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -232,8 +253,8 @@ void QueryRegistry::expireQueries () {
|
|||
}
|
||||
}
|
||||
for (auto& p : toDelete) {
|
||||
try { // just in case
|
||||
destroy(p.first, p.second);
|
||||
try { // just in case
|
||||
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED);
|
||||
}
|
||||
catch (...) {
|
||||
}
|
||||
|
|
|
@ -65,8 +65,7 @@ namespace triagens {
|
|||
/// query will be deleted if it is not opened for that amount of time.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void insert (TRI_vocbase_t* vocbase,
|
||||
QueryId id,
|
||||
void insert (QueryId id,
|
||||
Query* query,
|
||||
double ttl = 3600.0);
|
||||
|
||||
|
@ -99,9 +98,9 @@ namespace triagens {
|
|||
/// from the same thread that has opened it!
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void destroy (std::string const& vocbase, QueryId id);
|
||||
void destroy (std::string const& vocbase, QueryId id, int errorCode);
|
||||
|
||||
void destroy (TRI_vocbase_t* vocbase, QueryId id);
|
||||
void destroy (TRI_vocbase_t* vocbase, QueryId id, int errorCode);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief expireQueries, this deletes all expired queries from the registry
|
||||
|
|
|
@ -151,7 +151,7 @@ void RestAqlHandler::createQueryFromJson () {
|
|||
_qId = TRI_NewTickServer();
|
||||
|
||||
try {
|
||||
_queryRegistry->insert(_vocbase, _qId, query, ttl);
|
||||
_queryRegistry->insert(_qId, query, ttl);
|
||||
}
|
||||
catch (...) {
|
||||
LOG_ERROR("could not keep query in registry");
|
||||
|
@ -336,7 +336,7 @@ void RestAqlHandler::createQueryFromString () {
|
|||
|
||||
_qId = TRI_NewTickServer();
|
||||
try {
|
||||
_queryRegistry->insert(_vocbase, _qId, query, ttl);
|
||||
_queryRegistry->insert(_qId, query, ttl);
|
||||
}
|
||||
catch (...) {
|
||||
LOG_ERROR("could not keep query in registry");
|
||||
|
@ -416,14 +416,10 @@ void RestAqlHandler::useQuery (std::string const& operation,
|
|||
TRI_ASSERT(_qId > 0);
|
||||
TRI_ASSERT(query->engine() != nullptr);
|
||||
|
||||
Json queryJson;
|
||||
if (operation != "shutdown") {
|
||||
// /shutdown does not require a body
|
||||
queryJson = Json(TRI_UNKNOWN_MEM_ZONE, parseJsonBody());
|
||||
if (queryJson.isEmpty()) {
|
||||
_queryRegistry->close(_vocbase, _qId);
|
||||
return;
|
||||
}
|
||||
Json queryJson = Json(TRI_UNKNOWN_MEM_ZONE, parseJsonBody());
|
||||
if (queryJson.isEmpty()) {
|
||||
_queryRegistry->close(_vocbase, _qId);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -721,7 +717,7 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
shardId = shardIdCharP;
|
||||
}
|
||||
|
||||
Json answerBody(Json::Array, 2);
|
||||
Json answerBody(Json::Array, 3);
|
||||
|
||||
if (operation == "getSome") {
|
||||
auto atLeast = JsonHelper::getNumericValue<uint64_t>(queryJson.json(),
|
||||
|
@ -742,7 +738,8 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
}
|
||||
if (items.get() == nullptr) {
|
||||
answerBody("exhausted", Json(true))
|
||||
("error", Json(false));
|
||||
("error", Json(false))
|
||||
("stats", query->getStats());
|
||||
}
|
||||
else {
|
||||
try {
|
||||
|
@ -785,6 +782,7 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
}
|
||||
answerBody("skipped", Json(static_cast<double>(skipped)))
|
||||
("error", Json(false));
|
||||
answerBody.set("stats", query->getStats());
|
||||
}
|
||||
else if (operation == "skip") {
|
||||
auto number = JsonHelper::getNumericValue<uint64_t>(queryJson.json(),
|
||||
|
@ -805,6 +803,7 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
|
||||
answerBody("exhausted", Json(exhausted))
|
||||
("error", Json(false));
|
||||
answerBody.set("stats", query->getStats());
|
||||
}
|
||||
catch (...) {
|
||||
LOG_ERROR("skip lead to an exception");
|
||||
|
@ -835,12 +834,15 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
}
|
||||
answerBody("error", Json(res != TRI_ERROR_NO_ERROR))
|
||||
("code", Json(static_cast<double>(res)));
|
||||
answerBody.set("stats", query->getStats());
|
||||
}
|
||||
else if (operation == "shutdown") {
|
||||
int res = TRI_ERROR_INTERNAL;
|
||||
int errorCode = JsonHelper::getNumericValue<int>(queryJson.json(), "code", TRI_ERROR_INTERNAL);
|
||||
|
||||
try {
|
||||
res = query->engine()->shutdown();
|
||||
_queryRegistry->destroy(_vocbase, _qId);
|
||||
res = query->engine()->shutdown(errorCode); // pass errorCode to shutdown
|
||||
_queryRegistry->destroy(_vocbase, _qId, errorCode);
|
||||
}
|
||||
catch (...) {
|
||||
LOG_ERROR("shutdown lead to an exception");
|
||||
|
@ -850,6 +852,7 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
}
|
||||
answerBody("error", res == TRI_ERROR_NO_ERROR ? Json(false) : Json(true))
|
||||
("code", Json(static_cast<double>(res)));
|
||||
answerBody.set("stats", query->getStats());
|
||||
}
|
||||
else {
|
||||
LOG_ERROR("Unknown operation!");
|
||||
|
|
|
@ -1192,7 +1192,7 @@ void ArangoServer::closeDatabases () {
|
|||
// stop the replication appliers so all replication transactions can end
|
||||
TRI_StopReplicationAppliersServer(_server);
|
||||
|
||||
// enfore logfile manager shutdown so we are sure no one else will
|
||||
// enforce logfile manager shutdown so we are sure no one else will
|
||||
// write to the logs
|
||||
wal::LogfileManager::instance()->stop();
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ using namespace triagens::arango;
|
|||
/// @brief ArangoDB server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static ArangoServer* ArangoInstance = 0;
|
||||
static ArangoServer* ArangoInstance = nullptr;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief running flag
|
||||
|
@ -174,7 +174,7 @@ static void InstallServiceCommand (string command) {
|
|||
SC_HANDLE schSCManager = OpenSCManager(NULL, SERVICES_ACTIVE_DATABASE, SC_MANAGER_ALL_ACCESS);
|
||||
|
||||
if (schSCManager == 0) {
|
||||
cout << "FATAL: OpenSCManager failed with " << GetLastError() << endl;
|
||||
cerr << "FATAL: OpenSCManager failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -196,7 +196,7 @@ static void InstallServiceCommand (string command) {
|
|||
CloseServiceHandle(schSCManager);
|
||||
|
||||
if (schService == 0) {
|
||||
cout << "FATAL: CreateServiceA failed with " << GetLastError() << endl;
|
||||
cerr << "FATAL: CreateServiceA failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -220,7 +220,7 @@ static void InstallService (int argc, char* argv[]) {
|
|||
CHAR path[MAX_PATH];
|
||||
|
||||
if(! GetModuleFileNameA(NULL, path, MAX_PATH)) {
|
||||
cout << "FATAL: GetModuleFileNameA failed" << endl;
|
||||
cerr << "FATAL: GetModuleFileNameA failed" << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -251,7 +251,7 @@ static void DeleteService (int argc, char* argv[]) {
|
|||
SC_HANDLE schSCManager = OpenSCManager(NULL, SERVICES_ACTIVE_DATABASE, SC_MANAGER_ALL_ACCESS);
|
||||
|
||||
if (schSCManager == 0) {
|
||||
cout << "FATAL: OpenSCManager failed with " << GetLastError() << endl;
|
||||
cerr << "FATAL: OpenSCManager failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -263,12 +263,12 @@ static void DeleteService (int argc, char* argv[]) {
|
|||
CloseServiceHandle(schSCManager);
|
||||
|
||||
if (schService == 0) {
|
||||
cout << "FATAL: OpenServiceA failed with " << GetLastError() << endl;
|
||||
cerr << "FATAL: OpenServiceA failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
if (! DeleteService(schService)) {
|
||||
cout << "FATAL: DeleteService failed with " << GetLastError() << endl;
|
||||
cerr << "FATAL: DeleteService failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -309,7 +309,7 @@ static void SetServiceStatus (DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD
|
|||
ss.dwControlsAccepted = 0;
|
||||
SetServiceStatus(ServiceStatus, &ss);
|
||||
|
||||
if (ArangoInstance != 0) {
|
||||
if (ArangoInstance != nullptr) {
|
||||
ArangoInstance->beginShutdown();
|
||||
}
|
||||
|
||||
|
@ -346,7 +346,7 @@ static void WINAPI ServiceCtrl (DWORD dwCtrlCode) {
|
|||
if (dwCtrlCode == SERVICE_CONTROL_STOP || dwCtrlCode == SERVICE_CONTROL_SHUTDOWN) {
|
||||
SetServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 0, 0);
|
||||
|
||||
if (ArangoInstance != 0) {
|
||||
if (ArangoInstance != nullptr) {
|
||||
ArangoInstance->beginShutdown();
|
||||
|
||||
while (IsRunning) {
|
||||
|
@ -402,9 +402,9 @@ static void WINAPI ServiceMain (DWORD dwArgc, LPSTR *lpszArgv) {
|
|||
|
||||
int main (int argc, char* argv[]) {
|
||||
int res = 0;
|
||||
bool startAsService = false;
|
||||
|
||||
#ifdef _WIN32
|
||||
bool startAsService = false;
|
||||
|
||||
if (1 < argc) {
|
||||
if (TRI_EqualString(argv[1], "--install-service")) {
|
||||
|
@ -440,30 +440,36 @@ int main (int argc, char* argv[]) {
|
|||
ARGV = argv;
|
||||
|
||||
if (! StartServiceCtrlDispatcher(ste)) {
|
||||
cout << "FATAL: StartServiceCtrlDispatcher has failed with " << GetLastError() << endl;
|
||||
exit(EXIT_SUCCESS);
|
||||
cerr << "FATAL: StartServiceCtrlDispatcher has failed with " << GetLastError() << endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
||||
#endif
|
||||
|
||||
if (! startAsService) {
|
||||
ArangoInstance = new ArangoServer(argc, argv);
|
||||
res = ArangoInstance->start();
|
||||
}
|
||||
|
||||
#else
|
||||
if (ArangoInstance != nullptr) {
|
||||
try {
|
||||
delete ArangoInstance;
|
||||
}
|
||||
catch (...) {
|
||||
// caught an error during shutdown
|
||||
res = EXIT_FAILURE;
|
||||
|
||||
ArangoInstance = new ArangoServer(argc, argv);
|
||||
res = ArangoInstance->start();
|
||||
|
||||
#endif
|
||||
|
||||
if (ArangoInstance != 0) {
|
||||
delete ArangoInstance;
|
||||
ArangoInstance = 0;
|
||||
#ifdef TRI_ENABLE_MAINTAINER_MODE
|
||||
cerr << "Caught an exception during shutdown";
|
||||
#endif
|
||||
}
|
||||
ArangoInstance = nullptr;
|
||||
}
|
||||
|
||||
// shutdown sub-systems
|
||||
TRIAGENS_REST_SHUTDOWN;
|
||||
TRI_GlobalExitFunction(res, NULL);
|
||||
TRI_GlobalExitFunction(res, nullptr);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -75,8 +75,8 @@ namespace triagens {
|
|||
this->addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
|
||||
}
|
||||
|
||||
for (auto it = collections->begin(); it != collections->end(); ++it) {
|
||||
if (processCollection((*it).second) != TRI_ERROR_NO_ERROR) {
|
||||
for (auto it : *collections) {
|
||||
if (processCollection(it.second) != TRI_ERROR_NO_ERROR) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -95,8 +95,8 @@ namespace triagens {
|
|||
|
||||
int addCollectionList (std::map<std::string, triagens::aql::Collection*>* collections) {
|
||||
int ret = TRI_ERROR_NO_ERROR;
|
||||
for (auto it = collections->begin(); it != collections->end(); ++it) {
|
||||
ret = processCollection((*it).second);
|
||||
for (auto it : *collections) {
|
||||
ret = processCollection(it.second);
|
||||
if (ret != TRI_ERROR_NO_ERROR) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -356,6 +356,7 @@ ApplicationV8::V8Context* ApplicationV8::enterContext (std::string const& name,
|
|||
}
|
||||
|
||||
LOG_TRACE("found unused V8 context");
|
||||
TRI_ASSERT(! _freeContexts[name].empty());
|
||||
|
||||
V8Context* context = _freeContexts[name].back();
|
||||
|
||||
|
@ -369,6 +370,9 @@ ApplicationV8::V8Context* ApplicationV8::enterContext (std::string const& name,
|
|||
context->_isolate->Enter();
|
||||
context->_context->Enter();
|
||||
|
||||
TRI_ASSERT(context->_locker->IsLocked(context->_isolate));
|
||||
TRI_ASSERT(v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
// set the current database
|
||||
v8::HandleScope scope;
|
||||
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(context->_isolate->GetData());
|
||||
|
@ -405,6 +409,9 @@ void ApplicationV8::exitContext (V8Context* context) {
|
|||
double lastGc = gc->getLastGcStamp();
|
||||
|
||||
CONDITION_LOCKER(guard, _contextCondition);
|
||||
|
||||
TRI_ASSERT(context->_locker->IsLocked(context->_isolate));
|
||||
TRI_ASSERT(v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
// update data for later garbage collection
|
||||
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(context->_isolate->GetData());
|
||||
|
@ -445,6 +452,9 @@ void ApplicationV8::exitContext (V8Context* context) {
|
|||
context->_isolate->Enter();
|
||||
context->_context->Enter();
|
||||
|
||||
TRI_ASSERT(context->_locker->IsLocked(context->_isolate));
|
||||
TRI_ASSERT(v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
context->handleGlobalContextMethods();
|
||||
|
||||
context->_context->Exit();
|
||||
|
@ -480,6 +490,7 @@ void ApplicationV8::exitContext (V8Context* context) {
|
|||
_busyContexts[name].erase(context);
|
||||
|
||||
delete context->_locker;
|
||||
TRI_ASSERT(! v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
guard.broadcast();
|
||||
}
|
||||
|
@ -502,6 +513,9 @@ void ApplicationV8::exitContext (V8Context* context) {
|
|||
|
||||
context->_isolate->Enter();
|
||||
context->_context->Enter();
|
||||
|
||||
TRI_ASSERT(context->_locker->IsLocked(context->_isolate));
|
||||
TRI_ASSERT(v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
v8::V8::LowMemoryNotification();
|
||||
while (! v8::V8::IdleNotification()) {
|
||||
|
@ -608,6 +622,9 @@ void ApplicationV8::collectGarbage () {
|
|||
context->_locker = new v8::Locker(context->_isolate);
|
||||
context->_isolate->Enter();
|
||||
context->_context->Enter();
|
||||
|
||||
TRI_ASSERT(context->_locker->IsLocked(context->_isolate));
|
||||
TRI_ASSERT(v8::Locker::IsLocked(context->_isolate));
|
||||
|
||||
v8::V8::LowMemoryNotification();
|
||||
while (! v8::V8::IdleNotification()) {
|
||||
|
@ -1343,6 +1360,7 @@ void ApplicationV8::shutdownV8Instance (const string& name, size_t i) {
|
|||
if (v8g != nullptr) {
|
||||
if (v8g->_transactionContext != nullptr) {
|
||||
delete static_cast<V8TransactionContext*>(v8g->_transactionContext);
|
||||
v8g->_transactionContext = nullptr;
|
||||
}
|
||||
delete v8g;
|
||||
}
|
||||
|
|
|
@ -1988,15 +1988,13 @@ int TRI_InitDatabasesServer (TRI_server_t* server) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int TRI_StopServer (TRI_server_t* server) {
|
||||
int res;
|
||||
|
||||
// set shutdown flag
|
||||
TRI_LockMutex(&server->_createLock);
|
||||
server->_shutdown = true;
|
||||
TRI_UnlockMutex(&server->_createLock);
|
||||
|
||||
// stop dbm thread
|
||||
res = TRI_JoinThread(&server->_databaseManager);
|
||||
int res = TRI_JoinThread(&server->_databaseManager);
|
||||
|
||||
CloseDatabases(server);
|
||||
|
||||
|
|
|
@ -1417,7 +1417,13 @@ function UnitTest (which, options) {
|
|||
results.all_ok = allok;
|
||||
}
|
||||
results.all_ok = allok;
|
||||
cleanupDBDirectories(options);
|
||||
if (allok) {
|
||||
cleanupDBDirectories(options);
|
||||
}
|
||||
else {
|
||||
print("since some tests weren't successfully, not cleaning up: ");
|
||||
print(cleanupDirectories);
|
||||
}
|
||||
if (jsonReply === true ) {
|
||||
return results;
|
||||
}
|
||||
|
@ -1445,7 +1451,14 @@ function UnitTest (which, options) {
|
|||
}
|
||||
r.ok = ok;
|
||||
results.all_ok = ok;
|
||||
cleanupDBDirectories(options);
|
||||
|
||||
if (allok) {
|
||||
cleanupDBDirectories(options);
|
||||
}
|
||||
else {
|
||||
print("since some tests weren't successfully, not cleaning up: ");
|
||||
print(cleanupDirectories);
|
||||
}
|
||||
if (jsonReply === true ) {
|
||||
return results;
|
||||
}
|
||||
|
|
|
@ -146,7 +146,7 @@ namespace triagens {
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief handles CORS options
|
||||
/// @brief handles request
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void processRequest (uint32_t compatibility) {
|
||||
|
@ -216,8 +216,6 @@ namespace triagens {
|
|||
|
||||
// synchronous request
|
||||
else {
|
||||
this->RequestStatisticsAgent::transfer(handler);
|
||||
|
||||
ok = this->_server->handleRequest(this, handler);
|
||||
}
|
||||
|
||||
|
@ -349,7 +347,6 @@ namespace triagens {
|
|||
this->_sinceCompactification++;
|
||||
}
|
||||
|
||||
|
||||
const char * ptr = this->_readBuffer->c_str() + this->_readPosition;
|
||||
const char * end = this->_readBuffer->end() - 3;
|
||||
|
||||
|
@ -569,8 +566,10 @@ namespace triagens {
|
|||
}
|
||||
}
|
||||
else {
|
||||
if (this->_readBuffer->c_str() < end) {
|
||||
this->_readPosition = end - this->_readBuffer->c_str();
|
||||
size_t l = (this->_readBuffer->end() - this->_readBuffer->c_str());
|
||||
|
||||
if (this->_startPosition + 4 <= l) {
|
||||
this->_readPosition = l - 4;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ AnyServer::AnyServer ()
|
|||
_supervisorMode(false),
|
||||
_pidFile(""),
|
||||
_workingDirectory(""),
|
||||
_applicationServer(0) {
|
||||
_applicationServer(nullptr) {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -249,7 +249,7 @@ AnyServer::AnyServer ()
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AnyServer::~AnyServer () {
|
||||
if (_applicationServer != 0) {
|
||||
if (_applicationServer != nullptr) {
|
||||
delete _applicationServer;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue