1
0
Fork 0

fix resetting of cursors

This commit is contained in:
Jan Steemann 2016-03-11 17:56:23 +01:00
parent 4385bd8a4e
commit ba91761a66
9 changed files with 94 additions and 25 deletions

View File

@ -78,6 +78,7 @@ int SingletonBlock::shutdown(int errorCode) {
int SingletonBlock::getOrSkipSome(size_t, // atLeast,
size_t atMost, bool skipping,
AqlItemBlock*& result, size_t& skipped) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(result == nullptr && skipped == 0);
if (_done) {
@ -126,6 +127,7 @@ int SingletonBlock::getOrSkipSome(size_t, // atLeast,
_done = true;
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}
FilterBlock::FilterBlock(ExecutionEngine* engine, FilterNode const* en)
@ -145,6 +147,7 @@ int FilterBlock::initialize() { return ExecutionBlock::initialize(); }
////////////////////////////////////////////////////////////////////////////////
bool FilterBlock::getBlock(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
while (true) { // will be left by break or return
if (!ExecutionBlock::getBlock(atLeast, atMost)) {
return false;
@ -180,10 +183,12 @@ bool FilterBlock::getBlock(size_t atLeast, size_t atMost) {
}
return true;
DEBUG_END_BLOCK();
}
int FilterBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
AqlItemBlock*& result, size_t& skipped) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(result == nullptr && skipped == 0);
if (_done) {
@ -287,9 +292,11 @@ int FilterBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
}
}
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}
bool FilterBlock::hasMore() {
DEBUG_BEGIN_BLOCK();
if (_done) {
return false;
}
@ -312,6 +319,7 @@ bool FilterBlock::hasMore() {
// in it.
return true;
DEBUG_END_BLOCK();
}
int LimitBlock::initialize() {
@ -335,6 +343,7 @@ int LimitBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
int LimitBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
AqlItemBlock*& result, size_t& skipped) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(result == nullptr && skipped == 0);
if (_state == 2) {
@ -412,9 +421,11 @@ int LimitBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
}
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}
AqlItemBlock* ReturnBlock::getSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
std::unique_ptr<AqlItemBlock> res(
ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost));
@ -466,6 +477,7 @@ AqlItemBlock* ReturnBlock::getSome(size_t atLeast, size_t atMost) {
res.release();
return stripped.release();
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -475,6 +487,7 @@ AqlItemBlock* ReturnBlock::getSome(size_t atLeast, size_t atMost) {
////////////////////////////////////////////////////////////////////////////////
RegisterId ReturnBlock::returnInheritedResults() {
DEBUG_BEGIN_BLOCK();
_returnInheritedResults = true;
auto ep = static_cast<ReturnNode const*>(getPlanNode());
@ -482,6 +495,7 @@ RegisterId ReturnBlock::returnInheritedResults() {
TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end());
return it->second.registerId;
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -110,6 +110,7 @@ void CalculationBlock::fillBlockWithReference(AqlItemBlock* result) {
////////////////////////////////////////////////////////////////////////////////
void CalculationBlock::executeExpression(AqlItemBlock* result) {
DEBUG_BEGIN_BLOCK();
bool const hasCondition = (static_cast<CalculationNode const*>(_exeNode)
->_conditionVariable != nullptr);
@ -141,6 +142,7 @@ void CalculationBlock::executeExpression(AqlItemBlock* result) {
guard.steal(); // itemblock has taken over now
throwIfKilled(); // check if we were aborted
}
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -148,6 +150,7 @@ void CalculationBlock::executeExpression(AqlItemBlock* result) {
////////////////////////////////////////////////////////////////////////////////
void CalculationBlock::doEvaluation(AqlItemBlock* result) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(result != nullptr);
if (_isReference) {
@ -199,9 +202,11 @@ void CalculationBlock::doEvaluation(AqlItemBlock* result) {
// the V8 handle scope and the scope guard
executeExpression(result);
}
DEBUG_END_BLOCK();
}
AqlItemBlock* CalculationBlock::getSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
std::unique_ptr<AqlItemBlock> res(
ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost));
@ -213,4 +218,5 @@ AqlItemBlock* CalculationBlock::getSome(size_t atLeast, size_t atMost) {
// Clear out registers no longer needed later:
clearRegisters(res.get());
return res.release();
DEBUG_END_BLOCK();
}

View File

@ -33,8 +33,8 @@ CollectionScanner::CollectionScanner(arangodb::AqlTransaction* trx,
(readRandom ? Transaction::CursorType::ANY
: Transaction::CursorType::ALL),
"", VPackSlice(), 0, UINT64_MAX, 1000, false)) {
TRI_ASSERT(_cursor.successful());
}
TRI_ASSERT(_cursor.successful());
}
CollectionScanner::~CollectionScanner() {}

View File

@ -54,6 +54,7 @@ EnumerateCollectionBlock::~EnumerateCollectionBlock() { delete _scanner; }
void EnumerateCollectionBlock::initializeDocuments() {
_scanner->reset();
_documents = VPackSlice();
_documentsSize = 0;
_posInDocuments = 0;
}
@ -102,8 +103,10 @@ bool EnumerateCollectionBlock::moreDocuments(size_t hint) {
}
_documents = _scanner->scan(hint);
TRI_ASSERT(_documents.isArray());
VPackValueLength count = _documents.length();
_documentsSize = static_cast<size_t>(count);
if (count == 0) {
_documents = VPackSlice();
@ -112,7 +115,6 @@ bool EnumerateCollectionBlock::moreDocuments(size_t hint) {
_engine->_stats.scannedFull += static_cast<int64_t>(count);
_documentsSize = static_cast<size_t>(count);
_posInDocuments = 0;
return true;
@ -144,6 +146,7 @@ int EnumerateCollectionBlock::initializeCursor(AqlItemBlock* items,
AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
size_t atMost) {
DEBUG_BEGIN_BLOCK();
// Invariants:
// As soon as we notice that _totalCount == 0, we set _done = true.
// Otherwise, outside of this method (or skipSome), _documents is
@ -229,9 +232,11 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
clearRegisters(res.get());
return res.release();
DEBUG_END_BLOCK();
}
size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
size_t skipped = 0;
if (_done) {
@ -285,4 +290,5 @@ size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
}
// We skipped atLeast documents
return skipped;
DEBUG_END_BLOCK();
}

View File

@ -22,7 +22,7 @@
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "Aql/ExecutionBlock.h"
#include "ExecutionBlock.h"
#include "Aql/ExecutionEngine.h"
using namespace arangodb::aql;
@ -73,6 +73,7 @@ bool ExecutionBlock::removeDependency(ExecutionBlock* ep) {
}
int ExecutionBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK();
for (auto& d : _dependencies) {
int res = d->initializeCursor(items, pos);
@ -88,6 +89,7 @@ int ExecutionBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
_done = false;
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -158,10 +160,12 @@ int ExecutionBlock::shutdown(int errorCode) {
////////////////////////////////////////////////////////////////////////////////
AqlItemBlock* ExecutionBlock::getSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
std::unique_ptr<AqlItemBlock> result(
getSomeWithoutRegisterClearout(atLeast, atMost));
clearRegisters(result.get());
return result.release();
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -186,6 +190,7 @@ void ExecutionBlock::returnBlock(AqlItemBlock*& block) {
////////////////////////////////////////////////////////////////////////////////
int ExecutionBlock::resolve(std::string const& input) const {
DEBUG_BEGIN_BLOCK();
char const* handle = input.c_str();
char const* p = strchr(handle, TRI_DOCUMENT_HANDLE_SEPARATOR_CHR);
if (p == nullptr || *p == '\0') {
@ -205,6 +210,7 @@ int ExecutionBlock::resolve(std::string const& input) const {
}
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -215,6 +221,7 @@ int ExecutionBlock::resolve(std::string const& input) const {
void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
AqlItemBlock* dst, size_t srcRow,
size_t dstRow) {
DEBUG_BEGIN_BLOCK();
RegisterId const n = src->getNrRegs();
auto planNode = getPlanNode();
@ -224,15 +231,14 @@ void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
if (!value.isEmpty()) {
AqlValue a = value.clone();
try {
dst->setValue(dstRow, i, a);
} catch (...) {
a.destroy();
throw;
}
AqlValueGuard guard(a, true);
dst->setValue(dstRow, i, a);
guard.steal();
}
}
}
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -242,6 +248,7 @@ void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
AqlItemBlock* dst, size_t row) {
DEBUG_BEGIN_BLOCK();
RegisterId const n = src->getNrRegs();
auto planNode = getPlanNode();
@ -251,19 +258,18 @@ void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
if (!value.isEmpty()) {
AqlValue a = value.clone();
try {
TRI_IF_FAILURE("ExecutionBlock::inheritRegisters") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
dst->setValue(0, i, a);
} catch (...) {
a.destroy();
throw;
AqlValueGuard guard(a, true);
TRI_IF_FAILURE("ExecutionBlock::inheritRegisters") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
dst->setValue(0, i, a);
guard.steal();
}
}
}
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -273,6 +279,7 @@ void ExecutionBlock::inheritRegisters(AqlItemBlock const* src,
////////////////////////////////////////////////////////////////////////////////
bool ExecutionBlock::getBlock(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
throwIfKilled(); // check if we were aborted
std::unique_ptr<AqlItemBlock> docs(
@ -290,6 +297,7 @@ bool ExecutionBlock::getBlock(size_t atLeast, size_t atMost) {
docs.release();
return true;
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -302,6 +310,7 @@ bool ExecutionBlock::getBlock(size_t atLeast, size_t atMost) {
AqlItemBlock* ExecutionBlock::getSomeWithoutRegisterClearout(size_t atLeast,
size_t atMost) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
size_t skipped = 0;
@ -313,16 +322,20 @@ AqlItemBlock* ExecutionBlock::getSomeWithoutRegisterClearout(size_t atLeast,
}
return result;
DEBUG_END_BLOCK();
}
void ExecutionBlock::clearRegisters(AqlItemBlock* result) {
DEBUG_BEGIN_BLOCK();
// Clear out registers not needed later on:
if (result != nullptr) {
result->clearRegisters(getPlanNode()->_regsToClear);
}
DEBUG_END_BLOCK();
}
size_t ExecutionBlock::skipSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
size_t skipped = 0;
@ -336,11 +349,13 @@ size_t ExecutionBlock::skipSome(size_t atLeast, size_t atMost) {
}
return skipped;
DEBUG_END_BLOCK();
}
// skip exactly <number> outputs, returns <true> if _done after
// skipping, and <false> otherwise . . .
bool ExecutionBlock::skip(size_t number) {
DEBUG_BEGIN_BLOCK();
size_t skipped = skipSome(number, number);
size_t nr = skipped;
while (nr != 0 && skipped < number) {
@ -351,6 +366,7 @@ bool ExecutionBlock::skip(size_t number) {
return true;
}
return !hasMore();
DEBUG_END_BLOCK();
}
bool ExecutionBlock::hasMore() {
@ -378,6 +394,7 @@ int64_t ExecutionBlock::remaining() {
int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
AqlItemBlock*& result, size_t& skipped) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(result == nullptr && skipped == 0);
if (_done) {
@ -489,4 +506,5 @@ int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
freeCollector();
return TRI_ERROR_NO_ERROR;
DEBUG_END_BLOCK();
}

View File

@ -24,11 +24,23 @@
#ifndef ARANGOD_AQL_EXECUTION_BLOCK_H
#define ARANGOD_AQL_EXECUTION_BLOCK_H 1
#include "Aql/AqlItemBlock.h"
#include "AqlItemBlock.h"
#include "Aql/ExecutionNode.h"
#include <deque>
#if 0
#define DEBUG_BEGIN_BLOCK() try { //
#define DEBUG_END_BLOCK() } catch (...) { LOG(WARN) << "exception caught in " << __FILE__ << ":" << __LINE__; throw; } //
#else
#define DEBUG_BEGIN_BLOCK() //
#define DEBUG_END_BLOCK() //
#endif
namespace arangodb {
class AqlTransaction;

View File

@ -60,6 +60,7 @@ int SubqueryBlock::initialize() {
////////////////////////////////////////////////////////////////////////////////
AqlItemBlock* SubqueryBlock::getSome(size_t atLeast, size_t atMost) {
DEBUG_BEGIN_BLOCK();
std::unique_ptr<AqlItemBlock> res(
ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost));
@ -122,6 +123,7 @@ AqlItemBlock* SubqueryBlock::getSome(size_t atLeast, size_t atMost) {
// Clear out registers no longer needed later:
clearRegisters(res.get());
return res.release();
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////
@ -143,6 +145,7 @@ int SubqueryBlock::shutdown(int errorCode) {
////////////////////////////////////////////////////////////////////////////////
std::vector<AqlItemBlock*>* SubqueryBlock::executeSubquery() {
DEBUG_BEGIN_BLOCK();
auto results = new std::vector<AqlItemBlock*>;
try {
@ -166,6 +169,7 @@ std::vector<AqlItemBlock*>* SubqueryBlock::executeSubquery() {
destroySubqueryResults(results);
throw;
}
DEBUG_END_BLOCK();
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -28,7 +28,13 @@
using namespace arangodb;
void OperationCursor::reset() {
_indexIterator->reset();
_builder.clear();
if (_indexIterator != nullptr) {
_indexIterator->reset();
_hasMore = true;
_limit = _originalLimit;
}
}
//////////////////////////////////////////////////////////////////////////////

View File

@ -47,16 +47,17 @@ struct OperationCursor : public OperationResult {
arangodb::velocypack::Builder _builder;
bool _hasMore;
uint64_t _limit;
uint64_t _batchSize;
uint64_t const _originalLimit;
uint64_t const _batchSize;
public:
explicit OperationCursor(int code)
: OperationResult(code), _builder(buffer), _hasMore(false), _limit(0), _batchSize(1000) {
: OperationResult(code), _builder(buffer), _hasMore(false), _limit(0), _originalLimit(0), _batchSize(1000) {
}
OperationCursor(int code, std::string const& message)
: OperationResult(code, message), _builder(buffer), _hasMore(false), _limit(0), _batchSize(1000) {
: OperationResult(code, message), _builder(buffer), _hasMore(false), _limit(0), _originalLimit(0), _batchSize(1000) {
}
OperationCursor(std::shared_ptr<VPackBuffer<uint8_t>> buffer,
@ -68,6 +69,7 @@ struct OperationCursor : public OperationResult {
_builder(buffer),
_hasMore(false),
_limit(0),
_originalLimit(0),
_batchSize(1000) {
}
@ -78,7 +80,8 @@ struct OperationCursor : public OperationResult {
_indexIterator(iterator),
_builder(buffer),
_hasMore(true),
_limit(limit),
_limit(limit), // _limit is modified later on
_originalLimit(limit),
_batchSize(batchSize) {
if (_indexIterator == nullptr) {
_hasMore = false;