mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'spdvpk' of ssh://github.com/ArangoDB/ArangoDB into spdvpk
This commit is contained in:
commit
1398eea35a
|
@ -134,6 +134,16 @@ class AqlItemBlock {
|
|||
v._type = AqlValue::SHAPED;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief fill a slot in the item block with an external VelocyPack value.
|
||||
/// This value should not be freed.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void setExternal(size_t index, RegisterId varNr,
|
||||
arangodb::velocypack::Slice external) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief eraseValue, erase the current value of a register and freeing it
|
||||
/// if this was the last reference to the value
|
||||
|
|
|
@ -25,64 +25,31 @@
|
|||
|
||||
using namespace arangodb::aql;
|
||||
|
||||
CollectionScanner::CollectionScanner(
|
||||
arangodb::AqlTransaction* trx, TRI_transaction_collection_t* trxCollection)
|
||||
: trx(trx), trxCollection(trxCollection), totalCount(0) {}
|
||||
CollectionScanner::CollectionScanner(arangodb::AqlTransaction* trx,
|
||||
std::string const& collection,
|
||||
bool readRandom)
|
||||
: _cursor(trx->indexScan(collection,
|
||||
(readRandom ? Transaction::CursorType::ANY
|
||||
: Transaction::CursorType::ALL),
|
||||
"", VPackSlice(), 0, UINT64_MAX, 1000, false)) {
|
||||
TRI_ASSERT(_cursor.successful());
|
||||
}
|
||||
|
||||
CollectionScanner::~CollectionScanner() {}
|
||||
|
||||
RandomCollectionScanner::RandomCollectionScanner(
|
||||
arangodb::AqlTransaction* trx, TRI_transaction_collection_t* trxCollection)
|
||||
: CollectionScanner(trx, trxCollection), step(0) {}
|
||||
|
||||
int RandomCollectionScanner::scan(std::vector<TRI_doc_mptr_t>& docs,
|
||||
size_t batchSize) {
|
||||
return trx->any(trxCollection, docs, initialPosition, position,
|
||||
static_cast<uint64_t>(batchSize), step, totalCount);
|
||||
VPackSlice CollectionScanner::scan(size_t batchSize) {
|
||||
if (!_cursor.hasMore()) {
|
||||
VPackSlice empty;
|
||||
return empty;
|
||||
}
|
||||
_cursor.getMore(batchSize, true);
|
||||
return _cursor.slice();
|
||||
}
|
||||
|
||||
int RandomCollectionScanner::forward(size_t batchSize, size_t& skipped) {
|
||||
// Basic implementation, no gain
|
||||
std::vector<TRI_doc_mptr_t> unusedDocs;
|
||||
unusedDocs.reserve(batchSize);
|
||||
int res = scan(unusedDocs, batchSize);
|
||||
skipped += unusedDocs.size();
|
||||
// TRI_doc_mptr_t is never freed
|
||||
unusedDocs.clear();
|
||||
return res;
|
||||
int CollectionScanner::forward(size_t batchSize, uint64_t& skipped) {
|
||||
return _cursor.skip(batchSize, skipped);
|
||||
}
|
||||
|
||||
void RandomCollectionScanner::reset() {
|
||||
initialPosition.reset();
|
||||
position.reset();
|
||||
step = 0;
|
||||
void CollectionScanner::reset() {
|
||||
_cursor.reset();
|
||||
}
|
||||
|
||||
LinearCollectionScanner::LinearCollectionScanner(
|
||||
arangodb::AqlTransaction* trx, TRI_transaction_collection_t* trxCollection)
|
||||
: CollectionScanner(trx, trxCollection) {}
|
||||
|
||||
int LinearCollectionScanner::scan(std::vector<TRI_doc_mptr_t>& docs,
|
||||
size_t batchSize) {
|
||||
uint64_t skip = 0;
|
||||
return trx->readIncremental(trxCollection, docs, position,
|
||||
static_cast<uint64_t>(batchSize), skip,
|
||||
UINT64_MAX, totalCount);
|
||||
}
|
||||
|
||||
int LinearCollectionScanner::forward(size_t batchSize, size_t& skipped) {
|
||||
// Basic implementation, no gain
|
||||
std::vector<TRI_doc_mptr_t> unusedDocs;
|
||||
uint64_t toSkip = static_cast<uint64_t>(batchSize);
|
||||
|
||||
int res = trx->readIncremental(trxCollection, unusedDocs, position, 0,
|
||||
toSkip, // Will be modified. Will reach 0 if
|
||||
// batchSize many docs have been skipped
|
||||
UINT64_MAX, totalCount);
|
||||
uint64_t reallySkipped = static_cast<uint64_t>(batchSize) - toSkip;
|
||||
skipped += static_cast<size_t>(reallySkipped);
|
||||
TRI_ASSERT(unusedDocs.empty());
|
||||
return res;
|
||||
}
|
||||
|
||||
void LinearCollectionScanner::reset() { position.reset(); }
|
||||
|
|
|
@ -26,21 +26,20 @@
|
|||
|
||||
#include "Basics/Common.h"
|
||||
#include "Utils/AqlTransaction.h"
|
||||
#include "VocBase/document-collection.h"
|
||||
#include "VocBase/transaction.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
|
||||
namespace arangodb {
|
||||
namespace aql {
|
||||
|
||||
struct CollectionScanner {
|
||||
CollectionScanner(arangodb::AqlTransaction*, TRI_transaction_collection_t*);
|
||||
class CollectionScanner {
|
||||
public:
|
||||
CollectionScanner(arangodb::AqlTransaction*, std::string const&, bool);
|
||||
|
||||
virtual ~CollectionScanner();
|
||||
~CollectionScanner();
|
||||
|
||||
virtual int scan(std::vector<TRI_doc_mptr_t>&, size_t) = 0;
|
||||
arangodb::velocypack::Slice scan(size_t);
|
||||
|
||||
virtual void reset() = 0;
|
||||
void reset();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief forwards the cursor n elements. Does not read the data.
|
||||
|
@ -49,37 +48,10 @@ struct CollectionScanner {
|
|||
/// really skipped
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual int forward(size_t, size_t&) = 0;
|
||||
int forward(size_t, uint64_t&);
|
||||
|
||||
arangodb::AqlTransaction* trx;
|
||||
TRI_transaction_collection_t* trxCollection;
|
||||
uint64_t totalCount;
|
||||
arangodb::basics::BucketPosition position;
|
||||
};
|
||||
|
||||
struct RandomCollectionScanner final : public CollectionScanner {
|
||||
RandomCollectionScanner(arangodb::AqlTransaction*,
|
||||
TRI_transaction_collection_t*);
|
||||
|
||||
int scan(std::vector<TRI_doc_mptr_t>&, size_t) override;
|
||||
|
||||
void reset() override;
|
||||
|
||||
int forward(size_t, size_t&) override;
|
||||
|
||||
arangodb::basics::BucketPosition initialPosition;
|
||||
uint64_t step;
|
||||
};
|
||||
|
||||
struct LinearCollectionScanner final : public CollectionScanner {
|
||||
LinearCollectionScanner(arangodb::AqlTransaction*,
|
||||
TRI_transaction_collection_t*);
|
||||
|
||||
int scan(std::vector<TRI_doc_mptr_t>&, size_t) override;
|
||||
|
||||
void reset() override;
|
||||
|
||||
int forward(size_t, size_t&) override;
|
||||
private:
|
||||
OperationCursor _cursor;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,18 +41,7 @@ EnumerateCollectionBlock::EnumerateCollectionBlock(
|
|||
_posInDocuments(0),
|
||||
_random(ep->_random),
|
||||
_mustStoreResult(true) {
|
||||
auto trxCollection = _trx->trxCollection(_collection->cid());
|
||||
if (trxCollection != nullptr) {
|
||||
_trx->orderDitch(trxCollection);
|
||||
}
|
||||
|
||||
if (_random) {
|
||||
// random scan
|
||||
_scanner = new RandomCollectionScanner(_trx, trxCollection);
|
||||
} else {
|
||||
// default: linear scan
|
||||
_scanner = new LinearCollectionScanner(_trx, trxCollection);
|
||||
}
|
||||
_scanner = new CollectionScanner(_trx, _collection->getName(), _random);
|
||||
}
|
||||
|
||||
EnumerateCollectionBlock::~EnumerateCollectionBlock() { delete _scanner; }
|
||||
|
@ -63,7 +52,8 @@ EnumerateCollectionBlock::~EnumerateCollectionBlock() { delete _scanner; }
|
|||
|
||||
void EnumerateCollectionBlock::initializeDocuments() {
|
||||
_scanner->reset();
|
||||
_documents.clear();
|
||||
VPackSlice none;
|
||||
_documents = none;
|
||||
_posInDocuments = 0;
|
||||
}
|
||||
|
||||
|
@ -73,7 +63,7 @@ void EnumerateCollectionBlock::initializeDocuments() {
|
|||
|
||||
bool EnumerateCollectionBlock::skipDocuments(size_t toSkip, size_t& skipped) {
|
||||
throwIfKilled(); // check if we were aborted
|
||||
size_t skippedHere = 0;
|
||||
uint64_t skippedHere = 0;
|
||||
|
||||
int res = _scanner->forward(toSkip, skippedHere);
|
||||
|
||||
|
@ -83,7 +73,8 @@ bool EnumerateCollectionBlock::skipDocuments(size_t toSkip, size_t& skipped) {
|
|||
|
||||
skipped += skippedHere;
|
||||
|
||||
_documents.clear();
|
||||
VPackSlice none;
|
||||
_documents = none;
|
||||
_posInDocuments = 0;
|
||||
|
||||
_engine->_stats.scannedFull += static_cast<int64_t>(skippedHere);
|
||||
|
@ -111,22 +102,19 @@ bool EnumerateCollectionBlock::moreDocuments(size_t hint) {
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
std::vector<TRI_doc_mptr_t> newDocs;
|
||||
newDocs.reserve(hint);
|
||||
_documents = _scanner->scan(hint);
|
||||
TRI_ASSERT(_documents.isArray());
|
||||
VPackValueLength count = _documents.length();
|
||||
|
||||
int res = _scanner->scan(newDocs, hint);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
|
||||
if (newDocs.empty()) {
|
||||
if (count == 0) {
|
||||
VPackSlice none;
|
||||
_documents = none;
|
||||
return false;
|
||||
}
|
||||
|
||||
_engine->_stats.scannedFull += static_cast<int64_t>(newDocs.size());
|
||||
_engine->_stats.scannedFull += static_cast<int64_t>(count);
|
||||
|
||||
_documents.swap(newDocs);
|
||||
_documentsSize = static_cast<size_t>(count);
|
||||
_posInDocuments = 0;
|
||||
|
||||
return true;
|
||||
|
@ -182,14 +170,14 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
size_t const curRegs = cur->getNrRegs();
|
||||
|
||||
// Get more documents from collection if _documents is empty:
|
||||
if (_posInDocuments >= _documents.size()) {
|
||||
if (_posInDocuments >= _documentsSize) {
|
||||
if (!moreDocuments(atMost)) {
|
||||
_done = true;
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
size_t available = _documents.size() - _posInDocuments;
|
||||
size_t available = _documentsSize - _posInDocuments;
|
||||
size_t toSend = (std::min)(atMost, available);
|
||||
RegisterId nrRegs =
|
||||
getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()];
|
||||
|
@ -219,9 +207,8 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
// The result is in the first variable of this depth,
|
||||
// we do not need to do a lookup in getPlanNode()->_registerPlan->varInfo,
|
||||
// but can just take cur->getNrRegs() as registerId:
|
||||
res->setShaped(j, static_cast<arangodb::aql::RegisterId>(curRegs),
|
||||
reinterpret_cast<TRI_df_marker_t const*>(
|
||||
_documents[_posInDocuments].getDataPtr()));
|
||||
res->setExternal(j, static_cast<arangodb::aql::RegisterId>(curRegs),
|
||||
_documents.at(_posInDocuments));
|
||||
// No harm done, if the setValue throws!
|
||||
}
|
||||
|
||||
|
@ -229,7 +216,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
}
|
||||
|
||||
// Advance read position:
|
||||
if (_posInDocuments >= _documents.size()) {
|
||||
if (_posInDocuments >= _documentsSize) {
|
||||
// we have exhausted our local documents buffer
|
||||
// fetch more documents into our buffer
|
||||
if (!moreDocuments(atMost)) {
|
||||
|
@ -257,18 +244,19 @@ size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
|
|||
return skipped;
|
||||
}
|
||||
|
||||
if (!_documents.empty()) {
|
||||
if (_posInDocuments < _documents.size()) {
|
||||
if (!_documents.isNone()) {
|
||||
if (_posInDocuments < _documentsSize) {
|
||||
// We still have unread documents in the _documents buffer
|
||||
// Just skip them
|
||||
size_t couldSkip = _documents.size() - _posInDocuments;
|
||||
size_t couldSkip = _documentsSize - _posInDocuments;
|
||||
if (atMost <= couldSkip) {
|
||||
// More in buffer then to skip.
|
||||
_posInDocuments += atMost;
|
||||
return atMost;
|
||||
}
|
||||
// Skip entire buffer
|
||||
_documents.clear();
|
||||
VPackSlice none;
|
||||
_documents = none;
|
||||
_posInDocuments = 0;
|
||||
skipped += couldSkip;
|
||||
}
|
||||
|
@ -276,7 +264,7 @@ size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
|
|||
|
||||
// No _documents buffer. But could Skip more
|
||||
// Fastforward the _scanner
|
||||
TRI_ASSERT(_documents.empty());
|
||||
TRI_ASSERT(_documents.isNone());
|
||||
|
||||
while (skipped < atLeast) {
|
||||
if (_buffer.empty()) {
|
||||
|
|
|
@ -104,7 +104,13 @@ class EnumerateCollectionBlock : public ExecutionBlock {
|
|||
/// @brief document buffer
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::vector<TRI_doc_mptr_t> _documents;
|
||||
arangodb::velocypack::Slice _documents;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief length of documents
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t _documentsSize;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief current position in _documents
|
||||
|
|
|
@ -103,11 +103,12 @@ void IndexIterator::reset() {}
|
|||
/// @brief default implementation for skip
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void IndexIterator::skip(uint64_t count) {
|
||||
void IndexIterator::skip(uint64_t count, uint64_t& skipped) {
|
||||
// Skip the first count-many entries
|
||||
// TODO: Can be improved
|
||||
while (count > 0 && next() != nullptr) {
|
||||
--count;
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ class IndexIterator {
|
|||
|
||||
virtual void reset();
|
||||
|
||||
virtual void skip(uint64_t count);
|
||||
virtual void skip(uint64_t count, uint64_t& skipped);
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -100,13 +100,14 @@ void CollectionExport::run(uint64_t maxWaitTime, size_t limit) {
|
|||
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_document->_vocbase),
|
||||
_name, TRI_TRANSACTION_READ);
|
||||
|
||||
trx.addHint(TRI_TRANSACTION_HINT_NO_USAGE_LOCK,
|
||||
true); // already locked by guard above
|
||||
// already locked by guard above
|
||||
trx.addHint(TRI_TRANSACTION_HINT_NO_USAGE_LOCK, true);
|
||||
int res = trx.begin();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
|
||||
auto idx = _document->primaryIndex();
|
||||
size_t maxDocuments = idx->size();
|
||||
if (limit > 0 && limit < maxDocuments) {
|
||||
|
|
|
@ -26,14 +26,15 @@
|
|||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Basics/VPackStringBufferAdapter.h"
|
||||
#include "Utils/CollectionExport.h"
|
||||
#include "Utils/CollectionNameResolver.h"
|
||||
#include "Utils/TransactionContext.h"
|
||||
#include "VocBase/document-collection.h"
|
||||
#include "VocBase/shaped-json.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include "VocBase/VocShaper.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Dumper.h>
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/Options.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
|
@ -276,13 +277,20 @@ static bool IncludeAttribute(
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ExportCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
||||
TRI_ASSERT(_ex != nullptr);
|
||||
auto resolver = std::make_unique<CollectionNameResolver>(_vocbase);
|
||||
std::unique_ptr<VPackCustomTypeHandler> customTypeHandler(TransactionContext::createCustomTypeHandler(_vocbase, resolver.get()));
|
||||
|
||||
auto shaper = _ex->_document->getShaper();
|
||||
TRI_ASSERT(_ex != nullptr);
|
||||
auto const restrictionType = _ex->_restrictions.type;
|
||||
|
||||
buffer.appendText("\"result\":[");
|
||||
|
||||
// copy original options
|
||||
VPackOptions options = VPackOptions::Defaults;
|
||||
options.customTypeHandler = customTypeHandler.get();
|
||||
|
||||
VPackBuilder result;
|
||||
|
||||
size_t const n = batchSize();
|
||||
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
|
@ -294,31 +302,17 @@ void ExportCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
|||
buffer.appendChar(',');
|
||||
}
|
||||
|
||||
auto marker =
|
||||
static_cast<TRI_df_marker_t const*>(_ex->_documents->at(_position++));
|
||||
char const* p = reinterpret_cast<char const*>(_ex->_documents->at(_position++));
|
||||
VPackSlice const slice(p + DatafileHelper::VPackOffset(TRI_WAL_MARKER_VPACK_DOCUMENT));
|
||||
|
||||
TRI_shaped_json_t shaped;
|
||||
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, marker);
|
||||
// Only Temporary wait for Shaped ==> VPack
|
||||
std::unique_ptr<TRI_json_t> tmp(TRI_JsonShapedJson(shaper, &shaped));
|
||||
std::shared_ptr<VPackBuilder> builder = arangodb::basics::JsonHelper::toVelocyPack(tmp.get());
|
||||
|
||||
if (builder == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
VPackSlice const shapedSlice = builder->slice();
|
||||
VPackBuilder result;
|
||||
{
|
||||
result.clear();
|
||||
|
||||
VPackObjectBuilder b(&result);
|
||||
// Copy over shaped values
|
||||
for (auto const& entry : VPackObjectIterator(shapedSlice)) {
|
||||
std::string key = entry.key.copyString();
|
||||
if (key == TRI_VOC_ATTRIBUTE_ID || key == TRI_VOC_ATTRIBUTE_KEY ||
|
||||
key == TRI_VOC_ATTRIBUTE_FROM || key == TRI_VOC_ATTRIBUTE_TO ||
|
||||
key == TRI_VOC_ATTRIBUTE_REV) {
|
||||
// This if excludes all internal values. Just to make sure they are not present when added later.
|
||||
continue;
|
||||
}
|
||||
for (auto const& entry : VPackObjectIterator(slice)) {
|
||||
std::string key(entry.key.copyString());
|
||||
|
||||
if (!IncludeAttribute(restrictionType, _ex->_restrictions.fields, key)) {
|
||||
// Ignore everything that should be excluded or not included
|
||||
continue;
|
||||
|
@ -327,51 +321,12 @@ void ExportCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
|||
result.add(key, entry.value);
|
||||
}
|
||||
// append the internal attributes
|
||||
|
||||
// _id, _key, _rev
|
||||
char const* key = TRI_EXTRACT_MARKER_KEY(marker);
|
||||
if (IncludeAttribute(restrictionType, _ex->_restrictions.fields, TRI_VOC_ATTRIBUTE_ID)) {
|
||||
std::string id(
|
||||
_ex->_resolver.getCollectionName(_ex->_document->_info.id()));
|
||||
id.push_back('/');
|
||||
id.append(key);
|
||||
result.add(TRI_VOC_ATTRIBUTE_ID, VPackValue(id));
|
||||
}
|
||||
if (IncludeAttribute(restrictionType, _ex->_restrictions.fields, TRI_VOC_ATTRIBUTE_KEY)) {
|
||||
result.add(TRI_VOC_ATTRIBUTE_KEY, VPackValue(key));
|
||||
}
|
||||
if (IncludeAttribute(restrictionType, _ex->_restrictions.fields, TRI_VOC_ATTRIBUTE_REV)) {
|
||||
std::string rev = std::to_string(TRI_EXTRACT_MARKER_RID(marker));
|
||||
result.add(TRI_VOC_ATTRIBUTE_REV, VPackValue(rev));
|
||||
}
|
||||
|
||||
#if 0
|
||||
// TODO
|
||||
if (TRI_IS_EDGE_MARKER(marker)) {
|
||||
if (IncludeAttribute(restrictionType, _ex->_restrictions.fields, TRI_VOC_ATTRIBUTE_FROM)) {
|
||||
// _from
|
||||
std::string from(_ex->_resolver.getCollectionNameCluster(
|
||||
TRI_EXTRACT_MARKER_FROM_CID(marker)));
|
||||
from.push_back('/');
|
||||
from.append(TRI_EXTRACT_MARKER_FROM_KEY(marker));
|
||||
result.add(TRI_VOC_ATTRIBUTE_FROM, VPackValue(from));
|
||||
}
|
||||
|
||||
if (IncludeAttribute(restrictionType, _ex->_restrictions.fields, TRI_VOC_ATTRIBUTE_TO)) {
|
||||
// _to
|
||||
std::string to(_ex->_resolver.getCollectionNameCluster(
|
||||
TRI_EXTRACT_MARKER_TO_CID(marker)));
|
||||
to.push_back('/');
|
||||
to.append(TRI_EXTRACT_MARKER_TO_KEY(marker));
|
||||
result.add(TRI_VOC_ATTRIBUTE_FROM, VPackValue(to));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
arangodb::basics::VPackStringBufferAdapter bufferAdapter(
|
||||
buffer.stringBuffer());
|
||||
VPackDumper dumper(&bufferAdapter);
|
||||
|
||||
arangodb::basics::VPackStringBufferAdapter bufferAdapter(buffer.stringBuffer());
|
||||
|
||||
try {
|
||||
VPackDumper dumper(&bufferAdapter, &options);
|
||||
dumper.dump(result.slice());
|
||||
} catch (...) {
|
||||
/// TODO correct error Handling!
|
||||
|
|
|
@ -25,6 +25,11 @@
|
|||
|
||||
using namespace arangodb;
|
||||
|
||||
|
||||
void OperationCursor::reset() {
|
||||
_indexIterator->reset();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get next default batchSize many elements.
|
||||
/// Check hasMore()==true before using this
|
||||
|
@ -38,10 +43,12 @@ int OperationCursor::getMore() {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get next batchSize many elements.
|
||||
/// Check hasMore()==true before using this
|
||||
/// If useExternals is set to true all elements in the vpack are
|
||||
/// externals. Otherwise they are inlined.
|
||||
/// NOTE: This will throw on OUT_OF_MEMORY
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int OperationCursor::getMore(uint64_t batchSize) {
|
||||
int OperationCursor::getMore(uint64_t batchSize, bool useExternals) {
|
||||
// This may throw out of memory
|
||||
if (!hasMore()) {
|
||||
TRI_ASSERT(false);
|
||||
|
@ -56,7 +63,11 @@ int OperationCursor::getMore(uint64_t batchSize) {
|
|||
while (batchSize > 0 && _limit > 0 && (mptr = _indexIterator->next()) != nullptr) {
|
||||
--batchSize;
|
||||
--_limit;
|
||||
_builder.add(VPackSlice(mptr->vpack()));
|
||||
if (useExternals) {
|
||||
_builder.add(VPackValue(mptr->vpack(), VPackValueType::External));
|
||||
} else {
|
||||
_builder.add(VPackSlice(mptr->vpack()));
|
||||
}
|
||||
}
|
||||
if (batchSize > 0 || _limit == 0) {
|
||||
// Iterator empty, there is no more
|
||||
|
@ -64,3 +75,32 @@ int OperationCursor::getMore(uint64_t batchSize) {
|
|||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Skip the next toSkip many elements.
|
||||
/// skipped will be increased by the amount of skipped elements afterwards
|
||||
/// Check hasMore()==true before using this
|
||||
/// NOTE: This will throw on OUT_OF_MEMORY
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int OperationCursor::skip(uint64_t toSkip, uint64_t& skipped) {
|
||||
if (!hasMore()) {
|
||||
TRI_ASSERT(false);
|
||||
// You requested more even if you should have checked it before.
|
||||
return TRI_ERROR_FORBIDDEN;
|
||||
}
|
||||
|
||||
if (toSkip > _limit) {
|
||||
// Short-cut, we jump to the end
|
||||
_limit = 0;
|
||||
_hasMore = false;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
_indexIterator->skip(toSkip, skipped);
|
||||
_limit -= skipped;
|
||||
if (skipped != toSkip || _limit == 0) {
|
||||
_hasMore = false;
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
|
|
@ -94,13 +94,19 @@ struct OperationCursor : public OperationResult {
|
|||
return _hasMore;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Reset the cursor
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void reset();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get next batchSize many elements.
|
||||
/// Check hasMore()==true before using this
|
||||
/// NOTE: This will throw on OUT_OF_MEMORY
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int getMore(uint64_t batchSize);
|
||||
int getMore(uint64_t, bool useExternals = false);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get next default batchSize many elements.
|
||||
|
@ -109,6 +115,15 @@ struct OperationCursor : public OperationResult {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int getMore();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Skip the next toSkip many elements.
|
||||
/// skipped will be increased by the amount of skipped elements afterwards
|
||||
/// Check hasMore()==true before using this
|
||||
/// NOTE: This will throw on OUT_OF_MEMORY
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int skip(uint64_t, uint64_t&);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -1776,7 +1776,8 @@ OperationCursor Transaction::indexScan(
|
|||
return OperationCursor(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
iterator->skip(skip);
|
||||
uint64_t unused = 0;
|
||||
iterator->skip(skip, unused);
|
||||
|
||||
return OperationCursor(transactionContext()->orderCustomTypeHandler(),
|
||||
iterator.release(), limit, batchSize);
|
||||
|
|
|
@ -134,6 +134,15 @@ TransactionContext::~TransactionContext() {
|
|||
_resolver = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief factory to create a custom type handler, not managed
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
VPackCustomTypeHandler* TransactionContext::createCustomTypeHandler(TRI_vocbase_t* vocbase,
|
||||
CollectionNameResolver const* resolver) {
|
||||
return new CustomTypeHandler(vocbase, resolver);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief order a document ditch for the collection
|
||||
|
@ -141,7 +150,7 @@ TransactionContext::~TransactionContext() {
|
|||
|
||||
std::shared_ptr<VPackCustomTypeHandler> TransactionContext::orderCustomTypeHandler() {
|
||||
if (_customTypeHandler == nullptr) {
|
||||
_customTypeHandler.reset(new CustomTypeHandler(_vocbase, getResolver()));
|
||||
_customTypeHandler.reset(TransactionContext::createCustomTypeHandler(_vocbase, getResolver()));
|
||||
}
|
||||
|
||||
TRI_ASSERT(_customTypeHandler != nullptr);
|
||||
|
|
|
@ -37,6 +37,7 @@ namespace velocypack {
|
|||
struct CustomTypeHandler;
|
||||
}
|
||||
|
||||
class CollectionNameResolver;
|
||||
class DocumentDitch;
|
||||
|
||||
class TransactionContext {
|
||||
|
@ -45,6 +46,7 @@ class TransactionContext {
|
|||
TransactionContext& operator=(TransactionContext const&) = delete;
|
||||
|
||||
protected:
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create the context
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -52,12 +54,21 @@ class TransactionContext {
|
|||
explicit TransactionContext(TRI_vocbase_t* vocbase);
|
||||
|
||||
public:
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destroy the context
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual ~TransactionContext();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief factory to create a custom type handler, not managed
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static arangodb::velocypack::CustomTypeHandler* createCustomTypeHandler(
|
||||
TRI_vocbase_t*,
|
||||
arangodb::CollectionNameResolver const*);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the vocbase
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue