mirror of https://gitee.com/bigwinds/arangodb
270 lines
8.9 KiB
C++
270 lines
8.9 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
|
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// @author Max Neunhoeffer
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#include "EnumerateCollectionBlock.h"
|
|
|
|
#include "Aql/AqlItemBlock.h"
|
|
#include "Aql/Collection.h"
|
|
#include "Aql/ExecutionEngine.h"
|
|
#include "Aql/Query.h"
|
|
#include "Basics/Exceptions.h"
|
|
#include "Cluster/FollowerInfo.h"
|
|
#include "Cluster/ServerState.h"
|
|
#include "Transaction/Helpers.h"
|
|
#include "Transaction/Methods.h"
|
|
#include "Utils/OperationCursor.h"
|
|
#include "VocBase/LogicalCollection.h"
|
|
#include "VocBase/vocbase.h"
|
|
|
|
using namespace arangodb::aql;
|
|
|
|
/// @brief constructs the block and opens a cursor on the node's collection
///
/// The cursor is of type ANY when the plan node's `_random` flag is set and
/// of type ALL otherwise. If the server runs in a cluster and the collection
/// is a satellite collection, the constructor additionally polls ClusterInfo
/// until this server's id shows up in the collection's server list, or until
/// the query option `satelliteSyncWait` has elapsed.
///
/// Throws TRI_ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC if the satellite
/// collection is still not reported in sync when the wait time is over.
EnumerateCollectionBlock::EnumerateCollectionBlock(ExecutionEngine* engine,
                                                   EnumerateCollectionNode const* ep)
    : ExecutionBlock(engine, ep),
      DocumentProducingBlock(ep, _trx),
      _collection(ep->collection()),
      _cursor(_trx->indexScan(_collection->name(),
                              (ep->_random ? transaction::Methods::CursorType::ANY
                                           : transaction::Methods::CursorType::ALL))),
      _inflight(0) {
  TRI_ASSERT(_cursor->ok());

  buildCallback();

  if (ServerState::instance()->isRunningInCluster() && _collection->isSatellite()) {
    auto logicalCollection = _collection->getCollection();
    auto cid = logicalCollection->planId();
    auto& dbName = logicalCollection->vocbase().name();
    double maxWait = _engine->getQuery()->queryOptions().satelliteSyncWait;
    bool inSync = false;
    // upper bound for a single sleep between two polls, in microseconds
    unsigned long const maxSleepMicros = 10000;
    double startTime = TRI_microtime();
    double now = startTime;
    double endTime = startTime + maxWait;

    while (!inSync) {
      auto collectionInfoCurrent =
          ClusterInfo::instance()->getCollectionCurrent(dbName, std::to_string(cid));
      auto followers = collectionInfoCurrent->servers(_collection->name());
      inSync = std::find(followers.begin(), followers.end(),
                         ServerState::instance()->getId()) != followers.end();
      if (!inSync) {
        // NOTE(review): TRI_microtime() and satelliteSyncWait are taken to be
        // expressed in seconds, while the sleep below is in microseconds. The
        // remaining time must therefore be converted before it is compared
        // with the poll interval; comparing the raw values would shrink the
        // sleep to (almost) nothing and turn this loop into a busy wait.
        double const remainingMicros = (endTime - now) * 1000000.0;
        unsigned long sleepMicros = maxSleepMicros;
        if (remainingMicros < static_cast<double>(maxSleepMicros)) {
          // never cast a negative value to an unsigned type
          sleepMicros =
              remainingMicros > 0.0 ? static_cast<unsigned long>(remainingMicros) : 0;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(sleepMicros));
      }
      now = TRI_microtime();
      if (now > endTime) {
        // waited long enough, give up
        break;
      }
    }

    if (!inSync) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC,
                                     "collection " + _collection->name() +
                                         " did not come into sync in time (" +
                                         std::to_string(maxWait) + ")");
    }
  }
}
|
|
|
|
/// @brief initializeCursor: delegates to ExecutionBlock::initializeCursor and
/// resets the collection cursor once the base class finished successfully.
/// A WAITING state or an error result is handed back untouched, and the
/// cursor is left as it is in that case.
std::pair<ExecutionState, arangodb::Result> EnumerateCollectionBlock::initializeCursor(
    AqlItemBlock* items, size_t pos) {
  auto baseResult = ExecutionBlock::initializeCursor(items, pos);

  // only rewind the cursor when the base class is done and reported success
  bool const readyToReset =
      baseResult.first != ExecutionState::WAITING && baseResult.second.ok();
  if (readyToReset) {
    _cursor->reset();
  }

  return baseResult;
}
|
|
|
|
/// @brief getSome
|
|
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> EnumerateCollectionBlock::getSome(size_t atMost) {
|
|
traceGetSomeBegin(atMost);
|
|
|
|
TRI_ASSERT(_cursor != nullptr);
|
|
// Invariants:
|
|
// As soon as we notice that _totalCount == 0, we set _done = true.
|
|
// Otherwise, outside of this method (or skipSome), _documents is
|
|
// either empty (at the beginning, with _posInDocuments == 0)
|
|
// or is non-empty and _posInDocuments < _documents.size()
|
|
if (_done) {
|
|
TRI_ASSERT(_inflight == 0);
|
|
TRI_ASSERT(getHasMoreState() == ExecutionState::DONE);
|
|
traceGetSomeEnd(nullptr, ExecutionState::DONE);
|
|
return {ExecutionState::DONE, nullptr};
|
|
}
|
|
|
|
RegisterId const nrInRegs = getNrInputRegisters();
|
|
RegisterId const nrOutRegs = getNrOutputRegisters();
|
|
|
|
std::unique_ptr<AqlItemBlock> res;
|
|
|
|
do {
|
|
size_t toFetch = (std::min)(DefaultBatchSize(), atMost);
|
|
BufferState bufferState = getBlockIfNeeded(toFetch);
|
|
if (bufferState == BufferState::WAITING) {
|
|
return {ExecutionState::WAITING, nullptr};
|
|
}
|
|
if (bufferState == BufferState::NO_MORE_BLOCKS) {
|
|
TRI_ASSERT(_inflight == 0);
|
|
_done = true;
|
|
TRI_ASSERT(getHasMoreState() == ExecutionState::DONE);
|
|
traceGetSomeEnd(nullptr, ExecutionState::DONE);
|
|
return {ExecutionState::DONE, nullptr};
|
|
}
|
|
|
|
// If we get here, we do have _buffer.front()
|
|
AqlItemBlock* cur = _buffer.front();
|
|
|
|
TRI_ASSERT(cur != nullptr);
|
|
TRI_ASSERT(cur->getNrRegs() == nrInRegs);
|
|
|
|
// _cursor->hasMore() should only be false here if the collection is empty
|
|
if (_cursor->hasMore()) {
|
|
res.reset(requestBlock(atMost, nrOutRegs));
|
|
// automatically freed if we throw
|
|
TRI_ASSERT(nrInRegs <= res->getNrRegs());
|
|
|
|
// only copy 1st row of registers inherited from previous frame(s)
|
|
inheritRegisters(cur, res.get(), _pos);
|
|
|
|
throwIfKilled(); // check if we were aborted
|
|
|
|
TRI_IF_FAILURE("EnumerateCollectionBlock::moreDocuments") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
|
|
bool cursorHasMore;
|
|
if (produceResult()) {
|
|
// properly build up results by fetching the actual documents
|
|
// using nextDocument()
|
|
cursorHasMore = _cursor->nextDocument(
|
|
[&](LocalDocumentId const&, VPackSlice slice) {
|
|
_documentProducer(res.get(), slice, nrInRegs, _inflight, 0);
|
|
},
|
|
atMost);
|
|
} else {
|
|
// performance optimization: we do not need the documents at all,
|
|
// so just call next()
|
|
cursorHasMore = _cursor->next(
|
|
[&](LocalDocumentId const&) {
|
|
_documentProducer(res.get(), VPackSlice::nullSlice(), nrInRegs, _inflight, 0);
|
|
},
|
|
atMost);
|
|
}
|
|
if (!cursorHasMore) {
|
|
TRI_ASSERT(!_cursor->hasMore());
|
|
}
|
|
}
|
|
|
|
if (!_cursor->hasMore()) {
|
|
// we have exhausted this cursor
|
|
// re-initialize fetching of documents
|
|
_cursor->reset();
|
|
AqlItemBlock* removedBlock = advanceCursor(1, 0);
|
|
returnBlockUnlessNull(removedBlock);
|
|
}
|
|
} while (_inflight == 0);
|
|
|
|
_engine->_stats.scannedFull += static_cast<int64_t>(_inflight);
|
|
TRI_ASSERT(res != nullptr);
|
|
|
|
if (_inflight < atMost) {
|
|
// The collection did not have enough results
|
|
res->shrink(_inflight);
|
|
}
|
|
|
|
// Clear out registers no longer needed later:
|
|
clearRegisters(res.get());
|
|
|
|
_inflight = 0;
|
|
traceGetSomeEnd(res.get(), getHasMoreState());
|
|
return {getHasMoreState(), std::move(res)};
|
|
}
|
|
|
|
std::pair<ExecutionState, size_t> EnumerateCollectionBlock::skipSome(size_t atMost) {
|
|
traceSkipSomeBegin(atMost);
|
|
if (_done) {
|
|
TRI_ASSERT(_inflight == 0);
|
|
traceSkipSomeEnd(_inflight, ExecutionState::DONE);
|
|
return {ExecutionState::DONE, _inflight};
|
|
}
|
|
|
|
TRI_ASSERT(_cursor != nullptr);
|
|
|
|
while (_inflight < atMost) {
|
|
if (_buffer.empty()) {
|
|
size_t toFetch = (std::min)(DefaultBatchSize(), atMost - _inflight);
|
|
auto upstreamRes = getBlock(toFetch);
|
|
if (upstreamRes.first == ExecutionState::WAITING) {
|
|
traceSkipSomeEnd(0, ExecutionState::WAITING);
|
|
return {ExecutionState::WAITING, 0};
|
|
}
|
|
_upstreamState = upstreamRes.first;
|
|
if (!upstreamRes.second) {
|
|
_done = true;
|
|
size_t skipped = _inflight;
|
|
_inflight = 0;
|
|
traceSkipSomeEnd(skipped, ExecutionState::DONE);
|
|
return {ExecutionState::DONE, skipped};
|
|
}
|
|
_pos = 0; // this is in the first block
|
|
_cursor->reset();
|
|
}
|
|
|
|
// if we get here, then _buffer.front() exists
|
|
AqlItemBlock* cur = _buffer.front();
|
|
uint64_t skippedHere = 0;
|
|
|
|
if (_cursor->hasMore()) {
|
|
_cursor->skip(atMost - _inflight, skippedHere);
|
|
}
|
|
|
|
_inflight += skippedHere;
|
|
|
|
if (_inflight < atMost) {
|
|
TRI_ASSERT(!_cursor->hasMore());
|
|
// not skipped enough re-initialize fetching of documents
|
|
_cursor->reset();
|
|
if (++_pos >= cur->size()) {
|
|
_buffer.pop_front(); // does not throw
|
|
returnBlock(cur);
|
|
_pos = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
_engine->_stats.scannedFull += static_cast<int64_t>(_inflight);
|
|
size_t skipped = _inflight;
|
|
_inflight = 0;
|
|
ExecutionState state = getHasMoreState();
|
|
traceSkipSomeEnd(skipped, state);
|
|
return {state, skipped};
|
|
}
|