1
0
Fork 0

Bug fix/simplify things (#6516)

This commit is contained in:
Jan 2018-09-18 17:47:01 +02:00 committed by GitHub
parent 78fc6aa758
commit c38051519e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 338 additions and 551 deletions

View File

@ -839,7 +839,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
ClusterCommTimeout timeout, size_t& nrDone, ClusterCommTimeout timeout, size_t& nrDone,
arangodb::LogTopic const& logTopic, arangodb::LogTopic const& logTopic,
bool retryOnCollNotFound) { bool retryOnCollNotFound) {
if (requests.size() == 0) { if (requests.empty()) {
nrDone = 0; nrDone = 0;
return 0; return 0;
} }

View File

@ -43,6 +43,7 @@
#include "Utils/OperationOptions.h" #include "Utils/OperationOptions.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"

View File

@ -30,7 +30,6 @@
#include "ClusterEngine/Common.h" #include "ClusterEngine/Common.h"
#include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/PhysicalCollection.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
namespace rocksdb { namespace rocksdb {
class Transaction; class Transaction;

View File

@ -65,11 +65,55 @@
#include "VocBase/KeyGenerator.h" #include "VocBase/KeyGenerator.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
using namespace arangodb; using namespace arangodb;
using Helper = arangodb::basics::VelocyPackHelper; using Helper = arangodb::basics::VelocyPackHelper;
/// @brief state during opening of a collection
namespace arangodb {
/// @brief mutable bookkeeping shared by the marker handlers while the
/// datafiles of a collection are iterated on open.
///
/// The object owns the statistics containers stored in `_stats` (they are
/// deleted in the destructor) and `_context` is constructed with the address
/// of the `_mdr` member. Copying would therefore double-delete the containers
/// and leave the copied context referring to the source object's `_mdr`, so
/// copy construction and copy assignment are disabled. Callers are expected to
/// pass the state around by pointer or reference.
struct OpenIteratorState {
  /// collection being opened (asserted non-null in the constructor)
  LogicalCollection* _collection;
  /// primary index of the collection's physical MMFilesCollection
  arangodb::MMFilesPrimaryIndex* _primaryIndex;
  // NOTE(review): presumably the id of the transaction currently being
  // replayed; starts at 0 -- confirm against the marker handlers
  TRI_voc_tid_t _tid;
  // NOTE(review): presumably the id of the datafile `_dfi` refers to;
  // starts at 0 -- confirm against the marker handlers
  TRI_voc_fid_t _fid;
  /// per-datafile statistics, keyed by file id; the mapped containers are
  /// owned by this object and released in the destructor
  std::unordered_map<TRI_voc_fid_t, MMFilesDatafileStatisticsContainer*>
      _stats;
  /// non-owning pointer to a statistics container; starts as nullptr
  MMFilesDatafileStatisticsContainer* _dfi;
  /// transaction used for index operations (asserted non-null)
  transaction::Methods* _trx;
  /// scratch document result handed to the primary index lookups;
  /// declared before `_context`, which receives its address
  ManagedDocumentResult _mdr;
  /// lookup context built from (trx, collection, &_mdr, 1)
  IndexLookupContext _context;
  /// counters, both start at 0
  uint64_t _deletions;
  uint64_t _documents;
  // NOTE(review): initialised to -1, presumably meaning "not yet known"
  int64_t _initialCount;

  OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx)
      : _collection(collection),
        _primaryIndex(
            static_cast<MMFilesCollection*>(collection->getPhysical())
                ->primaryIndex()),
        _tid(0),
        _fid(0),
        _stats(),
        _dfi(nullptr),
        _trx(trx),
        _context(trx, collection, &_mdr, 1),
        _deletions(0),
        _documents(0),
        _initialCount(-1) {
    TRI_ASSERT(collection != nullptr);
    TRI_ASSERT(trx != nullptr);
  }

  // owning raw pointers + self-referencing context: copying is never safe
  OpenIteratorState(OpenIteratorState const&) = delete;
  OpenIteratorState& operator=(OpenIteratorState const&) = delete;

  ~OpenIteratorState() {
    // release the statistics containers owned via `_stats`
    for (auto& it : _stats) {
      delete it.second;
    }
  }
};
}
namespace { namespace {
/// @brief helper class for filling indexes /// @brief helper class for filling indexes
@ -100,7 +144,7 @@ class MMFilesIndexFillerTask : public basics::LocalTask {
/// @brief find a statistics container for a given file id /// @brief find a statistics container for a given file id
static MMFilesDatafileStatisticsContainer* FindDatafileStats( static MMFilesDatafileStatisticsContainer* FindDatafileStats(
MMFilesCollection::OpenIteratorState* state, TRI_voc_fid_t fid) { OpenIteratorState* state, TRI_voc_fid_t fid) {
auto it = state->_stats.find(fid); auto it = state->_stats.find(fid);
if (it != state->_stats.end()) { if (it != state->_stats.end()) {
@ -216,7 +260,7 @@ PhysicalCollection* MMFilesCollection::clone(LogicalCollection& logical) const {
/// @brief process a document (or edge) marker when opening a collection /// @brief process a document (or edge) marker when opening a collection
int MMFilesCollection::OpenIteratorHandleDocumentMarker( int MMFilesCollection::OpenIteratorHandleDocumentMarker(
MMFilesMarker const* marker, MMFilesDatafile* datafile, MMFilesMarker const* marker, MMFilesDatafile* datafile,
MMFilesCollection::OpenIteratorState* state) { OpenIteratorState* state) {
LogicalCollection* collection = state->_collection; LogicalCollection* collection = state->_collection;
TRI_ASSERT(collection != nullptr); TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical()); auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
@ -265,7 +309,7 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(
// no primary index lock required here because we are the only ones reading // no primary index lock required here because we are the only ones reading
// from the index ATM // from the index ATM
MMFilesSimpleIndexElement* found = MMFilesSimpleIndexElement* found =
state->_primaryIndex->lookupKeyRef(trx, keySlice, state->_mmdr); state->_primaryIndex->lookupKeyRef(trx, keySlice, state->_mdr);
// it is a new entry // it is a new entry
if (found == nullptr || !found->isSet()) { if (found == nullptr || !found->isSet()) {
@ -274,7 +318,7 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(
// insert into primary index // insert into primary index
Result res = state->_primaryIndex->insertKey(trx, localDocumentId, Result res = state->_primaryIndex->insertKey(trx, localDocumentId,
VPackSlice(vpack), VPackSlice(vpack),
state->_mmdr, state->_mdr,
Index::OperationMode::normal); Index::OperationMode::normal);
if (res.fail()) { if (res.fail()) {
@ -336,7 +380,7 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(
/// @brief process a deletion marker when opening a collection /// @brief process a deletion marker when opening a collection
int MMFilesCollection::OpenIteratorHandleDeletionMarker( int MMFilesCollection::OpenIteratorHandleDeletionMarker(
MMFilesMarker const* marker, MMFilesDatafile* datafile, MMFilesMarker const* marker, MMFilesDatafile* datafile,
MMFilesCollection::OpenIteratorState* state) { OpenIteratorState* state) {
LogicalCollection* collection = state->_collection; LogicalCollection* collection = state->_collection;
TRI_ASSERT(collection != nullptr); TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical()); auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
@ -371,7 +415,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(
// no primary index lock required here because we are the only ones reading // no primary index lock required here because we are the only ones reading
// from the index ATM // from the index ATM
MMFilesSimpleIndexElement found = MMFilesSimpleIndexElement found =
state->_primaryIndex->lookupKey(trx, keySlice, state->_mmdr); state->_primaryIndex->lookupKey(trx, keySlice, state->_mdr);
// it is a new entry, so we missed the create // it is a new entry, so we missed the create
if (!found) { if (!found) {
@ -407,7 +451,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(
state->_dfi->numberDeletions++; state->_dfi->numberDeletions++;
state->_primaryIndex->removeKey(trx, oldLocalDocumentId, VPackSlice(vpack), state->_primaryIndex->removeKey(trx, oldLocalDocumentId, VPackSlice(vpack),
state->_mmdr, Index::OperationMode::normal); state->_mdr, Index::OperationMode::normal);
physical->removeLocalDocumentId(oldLocalDocumentId, true); physical->removeLocalDocumentId(oldLocalDocumentId, true);
} }
@ -417,7 +461,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(
/// @brief iterator for open /// @brief iterator for open
bool MMFilesCollection::OpenIterator(MMFilesMarker const* marker, bool MMFilesCollection::OpenIterator(MMFilesMarker const* marker,
MMFilesCollection::OpenIteratorState* data, OpenIteratorState* data,
MMFilesDatafile* datafile) { MMFilesDatafile* datafile) {
TRI_voc_tick_t const tick = marker->getTick(); TRI_voc_tick_t const tick = marker->getTick();
MMFilesMarkerType const type = marker->getType(); MMFilesMarkerType const type = marker->getType();
@ -434,15 +478,8 @@ bool MMFilesCollection::OpenIterator(MMFilesMarker const* marker,
if (tick > datafile->_dataMax) { if (tick > datafile->_dataMax) {
datafile->_dataMax = tick; datafile->_dataMax = tick;
} }
if (++data->_operations % 1024 == 0) {
data->_mmdr.reset();
}
} else if (type == TRI_DF_MARKER_VPACK_REMOVE) { } else if (type == TRI_DF_MARKER_VPACK_REMOVE) {
res = OpenIteratorHandleDeletionMarker(marker, datafile, data); res = OpenIteratorHandleDeletionMarker(marker, datafile, data);
if (++data->_operations % 1024 == 0) {
data->_mmdr.reset();
}
} else { } else {
if (type == TRI_DF_MARKER_HEADER) { if (type == TRI_DF_MARKER_HEADER) {
// ensure there is a datafile info entry for each datafile of the // ensure there is a datafile info entry for each datafile of the
@ -3290,7 +3327,7 @@ Result MMFilesCollection::update(
if (newSlice.length() <= 1) { if (newSlice.length() <= 1) {
// no need to do anything // no need to do anything
result = std::move(previous); result = previous;
if (_logicalCollection.waitForSync()) { if (_logicalCollection.waitForSync()) {
options.waitForSync = true; options.waitForSync = true;

View File

@ -38,19 +38,17 @@
#include "VocBase/KeyGenerator.h" #include "VocBase/KeyGenerator.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
struct MMFilesDatafile; struct MMFilesDatafile;
struct MMFilesMarker; struct MMFilesMarker;
namespace arangodb { namespace arangodb {
class LogicalCollection; class LogicalCollection;
class ManagedDocumentResult; class ManagedDocumentResult;
struct MMFilesDocumentOperation; struct MMFilesDocumentOperation;
class MMFilesPrimaryIndex; class MMFilesPrimaryIndex;
class MMFilesWalMarker; class MMFilesWalMarker;
struct OpenIteratorState;
class Result; class Result;
class TransactionState; class TransactionState;
@ -73,50 +71,6 @@ class MMFilesCollection final : public PhysicalCollection {
return toMMFilesCollection(phys); return toMMFilesCollection(phys);
} }
/// @brief state during opening of a collection
// Bookkeeping shared by the marker handlers while the datafiles of a
// collection are iterated on open. Owns the statistics containers stored in
// `_stats`; they are deleted in the destructor.
struct OpenIteratorState {
// collection being opened (asserted non-null in the constructor)
LogicalCollection* _collection;
// primary index of the collection's physical MMFilesCollection
arangodb::MMFilesPrimaryIndex* _primaryIndex;
// NOTE(review): presumably the transaction id currently replayed -- confirm
TRI_voc_tid_t _tid;
// NOTE(review): presumably the file id `_dfi` belongs to -- confirm
TRI_voc_fid_t _fid;
// per-datafile statistics keyed by file id; mapped pointers are owned here
std::unordered_map<TRI_voc_fid_t, MMFilesDatafileStatisticsContainer*>
_stats;
// non-owning pointer to a statistics container; starts as nullptr
MMFilesDatafileStatisticsContainer* _dfi;
// transaction used for index operations (asserted non-null)
transaction::Methods* _trx;
// scratch document result passed to the primary index lookups; declared
// before `_context`, which receives its address
ManagedDocumentResult _mmdr;
// lookup context built from (trx, collection, &_mmdr, 1)
IndexLookupContext _context;
// counters, all start at 0
uint64_t _deletions;
uint64_t _documents;
// incremented by OpenIterator per handled document/remove marker; every
// 1024th operation triggers a reset of `_mmdr`
uint64_t _operations;
// NOTE(review): initialised to -1, presumably meaning "not yet known"
int64_t _initialCount;
// Binds the state to `collection` and `trx`; both must be non-null.
OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx)
: _collection(collection),
_primaryIndex(
static_cast<MMFilesCollection*>(collection->getPhysical())
->primaryIndex()),
_tid(0),
_fid(0),
_stats(),
_dfi(nullptr),
_trx(trx),
_mmdr(),
_context(trx, collection, &_mmdr, 1),
_deletions(0),
_documents(0),
_operations(0),
_initialCount(-1) {
TRI_ASSERT(collection != nullptr);
TRI_ASSERT(trx != nullptr);
}
// Releases the statistics containers owned via `_stats`.
~OpenIteratorState() {
for (auto& it : _stats) {
delete it.second;
}
}
};
struct DatafileDescription { struct DatafileDescription {
MMFilesDatafile const* _data; MMFilesDatafile const* _data;
TRI_voc_tick_t _dataMin; TRI_voc_tick_t _dataMin;

View File

@ -35,6 +35,7 @@
#include "Utils/SingleCollectionTransaction.h" #include "Utils/SingleCollectionTransaction.h"
#include "Transaction/StandaloneContext.h" #include "Transaction/StandaloneContext.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include <velocypack/Builder.h> #include <velocypack/Builder.h>
@ -110,13 +111,13 @@ void MMFilesCollectionKeys::create(TRI_voc_tick_t maxTick) {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
MMFilesCollection *mmColl = MMFilesCollection::toMMFilesCollection(_collection); MMFilesCollection* mmColl = MMFilesCollection::toMMFilesCollection(_collection);
trx.invokeOnAllElements( trx.invokeOnAllElements(
_collection->name(), [this, &trx, &maxTick, &mmdr, &mmColl](LocalDocumentId const& token) { _collection->name(), [this, &trx, &maxTick, &mdr, &mmColl](LocalDocumentId const& token) {
if (mmColl->readDocumentConditional(&trx, token, maxTick, mmdr)) { if (mmColl->readDocumentConditional(&trx, token, maxTick, mdr)) {
_vpack.emplace_back(mmdr.vpack()); _vpack.emplace_back(mdr.vpack());
} }
return true; return true;
}); });

View File

@ -40,6 +40,7 @@
#include "Transaction/Methods.h" #include "Transaction/Methods.h"
#include "Utils/CollectionNameResolver.h" #include "Utils/CollectionNameResolver.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
@ -55,12 +56,12 @@ static std::vector<std::vector<arangodb::basics::AttributeName>> const
MMFilesEdgeIndexIterator::MMFilesEdgeIndexIterator( MMFilesEdgeIndexIterator::MMFilesEdgeIndexIterator(
LogicalCollection* collection, transaction::Methods* trx, LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, arangodb::MMFilesEdgeIndex const* index, ManagedDocumentResult* mdr, arangodb::MMFilesEdgeIndex const* index,
TRI_MMFilesEdgeIndexHash_t const* indexImpl, TRI_MMFilesEdgeIndexHash_t const* indexImpl,
std::unique_ptr<VPackBuilder> keys) std::unique_ptr<VPackBuilder> keys)
: IndexIterator(collection, trx), : IndexIterator(collection, trx),
_index(indexImpl), _index(indexImpl),
_context(trx, collection, mmdr, index->fields().size()), _context(trx, collection, mdr, index->fields().size()),
_keys(std::move(keys)), _keys(std::move(keys)),
_iterator(_keys->slice()), _iterator(_keys->slice()),
_posInBuffer(0), _posInBuffer(0),
@ -419,7 +420,7 @@ bool MMFilesEdgeIndex::supportsFilterCondition(
/// @brief creates an IndexIterator for the given Condition /// @brief creates an IndexIterator for the given Condition
IndexIterator* MMFilesEdgeIndex::iteratorForCondition( IndexIterator* MMFilesEdgeIndex::iteratorForCondition(
transaction::Methods* trx, ManagedDocumentResult* mmdr, transaction::Methods* trx, ManagedDocumentResult* mdr,
arangodb::aql::AstNode const* node, arangodb::aql::AstNode const* node,
arangodb::aql::Variable const* reference, arangodb::aql::Variable const* reference,
IndexIteratorOptions const& opts) { IndexIteratorOptions const& opts) {
@ -442,7 +443,7 @@ IndexIterator* MMFilesEdgeIndex::iteratorForCondition(
if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ) { if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ) {
// a.b == value // a.b == value
return createEqIterator(trx, mmdr, attrNode, valNode); return createEqIterator(trx, mdr, attrNode, valNode);
} }
if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) { if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
@ -452,7 +453,7 @@ IndexIterator* MMFilesEdgeIndex::iteratorForCondition(
return new EmptyIndexIterator(&_collection, trx); return new EmptyIndexIterator(&_collection, trx);
} }
return createInIterator(trx, mmdr, attrNode, valNode); return createInIterator(trx, mdr, attrNode, valNode);
} }
// operator type unsupported // operator type unsupported
@ -469,7 +470,7 @@ arangodb::aql::AstNode* MMFilesEdgeIndex::specializeCondition(
/// @brief create the iterator /// @brief create the iterator
IndexIterator* MMFilesEdgeIndex::createEqIterator( IndexIterator* MMFilesEdgeIndex::createEqIterator(
transaction::Methods* trx, ManagedDocumentResult* mmdr, transaction::Methods* trx, ManagedDocumentResult* mdr,
arangodb::aql::AstNode const* attrNode, arangodb::aql::AstNode const* attrNode,
arangodb::aql::AstNode const* valNode) const { arangodb::aql::AstNode const* valNode) const {
// lease builder, but immediately pass it to the unique_ptr so we don't leak // lease builder, but immediately pass it to the unique_ptr so we don't leak
@ -489,7 +490,7 @@ IndexIterator* MMFilesEdgeIndex::createEqIterator(
return new MMFilesEdgeIndexIterator( return new MMFilesEdgeIndexIterator(
&_collection, &_collection,
trx, trx,
mmdr, mdr,
this, this,
isFrom ? _edgesFrom.get() : _edgesTo.get(), isFrom ? _edgesFrom.get() : _edgesTo.get(),
std::move(keys) std::move(keys)
@ -498,7 +499,7 @@ IndexIterator* MMFilesEdgeIndex::createEqIterator(
/// @brief create the iterator /// @brief create the iterator
IndexIterator* MMFilesEdgeIndex::createInIterator( IndexIterator* MMFilesEdgeIndex::createInIterator(
transaction::Methods* trx, ManagedDocumentResult* mmdr, transaction::Methods* trx, ManagedDocumentResult* mdr,
arangodb::aql::AstNode const* attrNode, arangodb::aql::AstNode const* attrNode,
arangodb::aql::AstNode const* valNode) const { arangodb::aql::AstNode const* valNode) const {
// lease builder, but immediately pass it to the unique_ptr so we don't leak // lease builder, but immediately pass it to the unique_ptr so we don't leak
@ -525,7 +526,7 @@ IndexIterator* MMFilesEdgeIndex::createInIterator(
return new MMFilesEdgeIndexIterator( return new MMFilesEdgeIndexIterator(
&_collection, &_collection,
trx, trx,
mmdr, mdr,
this, this,
isFrom ? _edgesFrom.get() : _edgesTo.get(), isFrom ? _edgesFrom.get() : _edgesTo.get(),
std::move(keys) std::move(keys)

View File

@ -38,6 +38,7 @@
#include "Transaction/Context.h" #include "Transaction/Context.h"
#include "Transaction/Helpers.h" #include "Transaction/Helpers.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>

View File

@ -40,6 +40,7 @@
#include "Utils/OperationOptions.h" #include "Utils/OperationOptions.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include <velocypack/Builder.h> #include <velocypack/Builder.h>
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
@ -179,12 +180,12 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
markers.reserve(trx.documentCollection()->numberDocuments(&trx, transaction::CountType::Normal)); markers.reserve(trx.documentCollection()->numberDocuments(&trx, transaction::CountType::Normal));
uint64_t iterations = 0; uint64_t iterations = 0;
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
trx.invokeOnAllElements( trx.invokeOnAllElements(
trx.name(), [&syncer, &trx, &mmdr, &markers, trx.name(), [&syncer, &trx, &mdr, &markers,
&iterations](LocalDocumentId const& token) { &iterations](LocalDocumentId const& token) {
if (trx.documentCollection()->readDocument(&trx, token, mmdr)) { if (trx.documentCollection()->readDocument(&trx, token, mdr)) {
markers.emplace_back(mmdr.vpack()); markers.emplace_back(mdr.vpack());
if (++iterations % 10000 == 0) { if (++iterations % 10000 == 0) {
if (syncer.isAborted()) { if (syncer.isAborted()) {
@ -413,7 +414,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
// The LogicalCollection is protected by trx. // The LogicalCollection is protected by trx.
// Neither it nor its indexes can be invalidated // Neither it nor its indexes can be invalidated
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
auto physical = static_cast<MMFilesCollection*>( auto physical = static_cast<MMFilesCollection*>(
trx.documentCollection()->getPhysical()); trx.documentCollection()->getPhysical());
@ -620,9 +621,9 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
toFetch.emplace_back(i); toFetch.emplace_back(i);
} else { } else {
TRI_voc_rid_t currentRevisionId = 0; TRI_voc_rid_t currentRevisionId = 0;
if (physical->readDocument(&trx, element.localDocumentId(), mmdr)) { if (physical->readDocument(&trx, element.localDocumentId(), mdr)) {
currentRevisionId = transaction::helpers::extractRevFromDocument( currentRevisionId = transaction::helpers::extractRevFromDocument(
VPackSlice(mmdr.vpack())); VPackSlice(mdr.vpack()));
} }
if (TRI_RidToString(currentRevisionId) != pair.at(1).copyString()) { if (TRI_RidToString(currentRevisionId) != pair.at(1).copyString()) {
@ -765,10 +766,10 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
LocalDocumentId conflictId = LocalDocumentId conflictId =
physical->lookupKey(&trx, conflict.slice()); physical->lookupKey(&trx, conflict.slice());
if (conflictId.isSet()) { if (conflictId.isSet()) {
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
bool success = physical->readDocument(&trx, conflictId, mmdr); bool success = physical->readDocument(&trx, conflictId, mdr);
if (success) { if (success) {
VPackSlice conflictingKey(mmdr.vpack()); VPackSlice conflictingKey(mdr.vpack());
return trx.remove(coll->name(), conflictingKey, options); return trx.remove(coll->name(), conflictingKey, options);
} }
} }

View File

@ -40,6 +40,7 @@
#include "Transaction/Helpers.h" #include "Transaction/Helpers.h"
#include "Transaction/Methods.h" #include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include <rocksdb/utilities/optimistic_transaction_db.h> #include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h> #include <rocksdb/utilities/transaction.h>

View File

@ -37,6 +37,7 @@
#include "Transaction/Helpers.h" #include "Transaction/Helpers.h"
#include "Transaction/Methods.h" #include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#ifdef USE_ENTERPRISE #ifdef USE_ENTERPRISE
#include "Enterprise/VocBase/VirtualCollection.h" #include "Enterprise/VocBase/VirtualCollection.h"
@ -80,7 +81,7 @@ bool MMFilesPrimaryIndexIterator::next(LocalDocumentIdCallback const& cb, size_t
return false; return false;
} }
while (_iterator.valid() && limit > 0) { while (_iterator.valid() && limit > 0) {
// TODO: use version that hands in an existing mmdr // TODO: use version that hands in an existing mdr
MMFilesSimpleIndexElement result = MMFilesSimpleIndexElement result =
_index->lookupKey(_trx, _iterator.value()); _index->lookupKey(_trx, _iterator.value());
_iterator.next(); _iterator.next();
@ -273,8 +274,8 @@ void MMFilesPrimaryIndex::unload() {
/// @brief looks up an element given a key /// @brief looks up an element given a key
MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupKey( MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupKey(
transaction::Methods* trx, VPackSlice const& key) const { transaction::Methods* trx, VPackSlice const& key) const {
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
IndexLookupContext context(trx, &_collection, &mmdr, 1); IndexLookupContext context(trx, &_collection, &mdr, 1);
TRI_ASSERT(key.isString()); TRI_ASSERT(key.isString());
return _primaryIndex->findByKey(&context, key.begin()); return _primaryIndex->findByKey(&context, key.begin());
@ -283,8 +284,8 @@ MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupKey(
/// @brief looks up an element given a key /// @brief looks up an element given a key
MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupKey( MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupKey(
transaction::Methods* trx, VPackSlice const& key, transaction::Methods* trx, VPackSlice const& key,
ManagedDocumentResult& mmdr) const { ManagedDocumentResult& mdr) const {
IndexLookupContext context(trx, &_collection, &mmdr, 1); IndexLookupContext context(trx, &_collection, &mdr, 1);
TRI_ASSERT(key.isString()); TRI_ASSERT(key.isString());
return _primaryIndex->findByKey(&context, key.begin()); return _primaryIndex->findByKey(&context, key.begin());
@ -310,8 +311,8 @@ MMFilesSimpleIndexElement* MMFilesPrimaryIndex::lookupKeyRef(
/// @brief looks up an element given a key /// @brief looks up an element given a key
MMFilesSimpleIndexElement* MMFilesPrimaryIndex::lookupKeyRef( MMFilesSimpleIndexElement* MMFilesPrimaryIndex::lookupKeyRef(
transaction::Methods* trx, VPackSlice const& key, transaction::Methods* trx, VPackSlice const& key,
ManagedDocumentResult& mmdr) const { ManagedDocumentResult& mdr) const {
IndexLookupContext context(trx, &_collection, &mmdr, 1); IndexLookupContext context(trx, &_collection, &mdr, 1);
TRI_ASSERT(key.isString()); TRI_ASSERT(key.isString());
MMFilesSimpleIndexElement* element = MMFilesSimpleIndexElement* element =
_primaryIndex->findByKeyRef(&context, key.begin()); _primaryIndex->findByKeyRef(&context, key.begin());
@ -373,16 +374,16 @@ Result MMFilesPrimaryIndex::insertKey(transaction::Methods* trx,
LocalDocumentId const& documentId, LocalDocumentId const& documentId,
VPackSlice const& doc, VPackSlice const& doc,
OperationMode mode) { OperationMode mode) {
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
return insertKey(trx, documentId, doc, mmdr, mode); return insertKey(trx, documentId, doc, mdr, mode);
} }
Result MMFilesPrimaryIndex::insertKey(transaction::Methods* trx, Result MMFilesPrimaryIndex::insertKey(transaction::Methods* trx,
LocalDocumentId const& documentId, LocalDocumentId const& documentId,
VPackSlice const& doc, VPackSlice const& doc,
ManagedDocumentResult& mmdr, ManagedDocumentResult& mdr,
OperationMode mode) { OperationMode mode) {
IndexLookupContext context(trx, &_collection, &mmdr, 1); IndexLookupContext context(trx, &_collection, &mdr, 1);
MMFilesSimpleIndexElement element(buildKeyElement(documentId, doc)); MMFilesSimpleIndexElement element(buildKeyElement(documentId, doc));
// TODO: we can pass in a special IndexLookupContext which has some more on the information // TODO: we can pass in a special IndexLookupContext which has some more on the information
@ -407,16 +408,16 @@ Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx,
LocalDocumentId const& documentId, LocalDocumentId const& documentId,
VPackSlice const& doc, VPackSlice const& doc,
OperationMode mode) { OperationMode mode) {
ManagedDocumentResult mmdr; ManagedDocumentResult mdr;
return removeKey(trx, documentId, doc, mmdr, mode); return removeKey(trx, documentId, doc, mdr, mode);
} }
Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx, Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx,
LocalDocumentId const&, LocalDocumentId const&,
VPackSlice const& doc, VPackSlice const& doc,
ManagedDocumentResult& mmdr, ManagedDocumentResult& mdr,
OperationMode mode) { OperationMode mode) {
IndexLookupContext context(trx, &_collection, &mmdr, 1); IndexLookupContext context(trx, &_collection, &mdr, 1);
VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(doc)); VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(doc));
MMFilesSimpleIndexElement found = MMFilesSimpleIndexElement found =
_primaryIndex->removeByKey(&context, keySlice.begin()); _primaryIndex->removeByKey(&context, keySlice.begin());

View File

@ -502,7 +502,7 @@ void MMFilesSkiplistInLookupBuilder::buildSearchValues() {
MMFilesSkiplistIterator::MMFilesSkiplistIterator( MMFilesSkiplistIterator::MMFilesSkiplistIterator(
LogicalCollection* collection, transaction::Methods* trx, LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, arangodb::MMFilesSkiplistIndex const* index, ManagedDocumentResult* mdr, arangodb::MMFilesSkiplistIndex const* index,
TRI_Skiplist const* skiplist, size_t numPaths, TRI_Skiplist const* skiplist, size_t numPaths,
std::function<int(void*, MMFilesSkiplistIndexElement const*, std::function<int(void*, MMFilesSkiplistIndexElement const*,
MMFilesSkiplistIndexElement const*, MMFilesSkiplistIndexElement const*,
@ -510,7 +510,7 @@ MMFilesSkiplistIterator::MMFilesSkiplistIterator(
bool reverse, MMFilesBaseSkiplistLookupBuilder* builder) bool reverse, MMFilesBaseSkiplistLookupBuilder* builder)
: IndexIterator(collection, trx), : IndexIterator(collection, trx),
_skiplistIndex(skiplist), _skiplistIndex(skiplist),
_context(trx, collection, mmdr, index->fields().size()), _context(trx, collection, mdr, index->fields().size()),
_numPaths(numPaths), _numPaths(numPaths),
_reverse(reverse), _reverse(reverse),
_cursor(nullptr), _cursor(nullptr),
@ -1176,7 +1176,7 @@ bool MMFilesSkiplistIndex::findMatchingConditions(
} }
IndexIterator* MMFilesSkiplistIndex::iteratorForCondition( IndexIterator* MMFilesSkiplistIndex::iteratorForCondition(
transaction::Methods* trx, ManagedDocumentResult* mmdr, transaction::Methods* trx, ManagedDocumentResult* mdr,
arangodb::aql::AstNode const* node, arangodb::aql::AstNode const* node,
arangodb::aql::Variable const* reference, arangodb::aql::Variable const* reference,
IndexIteratorOptions const& opts) { IndexIteratorOptions const& opts) {
@ -1207,7 +1207,7 @@ IndexIterator* MMFilesSkiplistIndex::iteratorForCondition(
return new MMFilesSkiplistIterator( return new MMFilesSkiplistIterator(
&_collection, &_collection,
trx, trx,
mmdr, mdr,
this, this,
_skiplistIndex, _skiplistIndex,
numPaths(), numPaths(),
@ -1223,7 +1223,7 @@ IndexIterator* MMFilesSkiplistIndex::iteratorForCondition(
return new MMFilesSkiplistIterator( return new MMFilesSkiplistIterator(
&_collection, &_collection,
trx, trx,
mmdr, mdr,
this, this,
_skiplistIndex, _skiplistIndex,
numPaths(), numPaths(),

View File

@ -61,6 +61,7 @@
#include "VocBase/KeyGenerator.h" #include "VocBase/KeyGenerator.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
@ -883,7 +884,7 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
if (res.ok()) { if (res.ok()) {
trackWaitForSync(trx, options); trackWaitForSync(trx, options);
if (options.silent) { if (options.silent) {
mdr.reset(); mdr.clear();
} else { } else {
mdr.setManaged(newSlice.begin(), documentId); mdr.setManaged(newSlice.begin(), documentId);
TRI_ASSERT(!mdr.empty()); TRI_ASSERT(!mdr.empty());
@ -950,8 +951,7 @@ Result RocksDBCollection::update(arangodb::transaction::Methods* trx,
if (newSlice.length() <= 1) { if (newSlice.length() <= 1) {
// shortcut. no need to do anything // shortcut. no need to do anything
previous.clone(mdr); mdr = previous;
TRI_ASSERT(!mdr.empty()); TRI_ASSERT(!mdr.empty());
trackWaitForSync(trx, options); trackWaitForSync(trx, options);
@ -997,7 +997,7 @@ Result RocksDBCollection::update(arangodb::transaction::Methods* trx,
trackWaitForSync(trx, options); trackWaitForSync(trx, options);
if (options.silent) { if (options.silent) {
mdr.reset(); mdr.clear();
} else { } else {
mdr.setManaged(newDoc.begin(), documentId); mdr.setManaged(newDoc.begin(), documentId);
TRI_ASSERT(!mdr.empty()); TRI_ASSERT(!mdr.empty());
@ -1107,7 +1107,7 @@ Result RocksDBCollection::replace(transaction::Methods* trx,
trackWaitForSync(trx, options); trackWaitForSync(trx, options);
if (options.silent) { if (options.silent) {
mdr.reset(); mdr.clear();
} else { } else {
mdr.setManaged(newDoc.begin(), documentId); mdr.setManaged(newDoc.begin(), documentId);
TRI_ASSERT(!mdr.empty()); TRI_ASSERT(!mdr.empty());
@ -1532,7 +1532,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(
auto f = _cache->find(key->string().data(), auto f = _cache->find(key->string().data(),
static_cast<uint32_t>(key->string().size())); static_cast<uint32_t>(key->string().size()));
if (f.found()) { if (f.found()) {
std::string* value = mdr.prepareStringUsage(); std::string* value = mdr.string();
value->append(reinterpret_cast<char const*>(f.value()->value()), value->append(reinterpret_cast<char const*>(f.value()->value()),
f.value()->valueSize()); f.value()->valueSize());
mdr.setManagedAfterStringUsage(documentId); mdr.setManagedAfterStringUsage(documentId);
@ -1545,7 +1545,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(
} }
RocksDBMethods* mthd = RocksDBTransactionState::toMethods(trx); RocksDBMethods* mthd = RocksDBTransactionState::toMethods(trx);
std::string* value = mdr.prepareStringUsage(); std::string* value = mdr.string();
Result res = mthd->Get(RocksDBColumnFamily::documents(), key.ref(), value); Result res = mthd->Get(RocksDBColumnFamily::documents(), key.ref(), value);
if (res.ok()) { if (res.ok()) {
@ -1577,7 +1577,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(
<< "NOT FOUND rev: " << documentId.id() << " trx: " << trx->state()->id() << "NOT FOUND rev: " << documentId.id() << " trx: " << trx->state()->id()
<< " seq: " << mthd->sequenceNumber() << " seq: " << mthd->sequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection.name(); << " objectID " << _objectId << " name: " << _logicalCollection.name();
mdr.reset(); mdr.clear();
} }
return res; return res;

View File

@ -30,7 +30,6 @@
#include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBCommon.h"
#include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/PhysicalCollection.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
namespace rocksdb { namespace rocksdb {
class Transaction; class Transaction;

View File

@ -38,6 +38,7 @@
#include "Utils/OperationOptions.h" #include "Utils/OperationOptions.h"
#include "VocBase/LocalDocumentId.h" #include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include <velocypack/Builder.h> #include <velocypack/Builder.h>
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>

View File

@ -1724,111 +1724,18 @@ OperationResult transaction::Methods::insertLocal(
} }
if (res.ok() && replicationType == ReplicationType::LEADER) { if (res.ok() && replicationType == ReplicationType::LEADER) {
TRI_ASSERT(followers != nullptr);
// Now replicate the same operation on all followers: // Now replicate the same operation on all followers:
Result r = replicateOperations(
collection,
value,
resultBuilder,
followers,
arangodb::rest::RequestType::POST,
"&" + StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false")
);
// In the multi babies case res is always TRI_ERROR_NO_ERROR if we if (r.fail()) {
// get here, in the single document case, we do not try to replicate return OperationResult(r);
// in case of an error.
// Now replicate the good operations on all followers:
std::string path =
"/_db/" + arangodb::basics::StringUtils::urlEncode(vocbase().name()) +
"/_api/document/" + arangodb::basics::StringUtils::urlEncode(collection->name()) +
"?isRestore=true&isSynchronousReplication=" + ServerState::instance()->getId() +
"&" + StaticStrings::SilentString + "=true" +
"&" + StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false");
transaction::BuilderLeaser payload(this);
auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) {
VPackObjectBuilder guard(payload.get());
VPackSlice s = result.get(StaticStrings::KeyString);
payload->add(StaticStrings::KeyString, s);
s = result.get(StaticStrings::RevString);
payload->add(StaticStrings::RevString, s);
TRI_SanitizeObject(doc, *payload.get());
};
VPackSlice ourResult = resultBuilder.slice();
size_t count = 0;
if (value.isArray()) {
VPackArrayBuilder guard(payload.get());
VPackArrayIterator itValue(value);
VPackArrayIterator itResult(ourResult);
while (itValue.valid() && itResult.valid()) {
TRI_ASSERT((*itResult).isObject());
if (!(*itResult).hasKey(StaticStrings::Error)) {
doOneDoc(itValue.value(), itResult.value());
count++;
}
itValue.next();
itResult.next();
}
} else {
doOneDoc(value, ourResult);
count++;
}
if (count > 0) {
auto body = std::make_shared<std::string>();
*body = payload->slice().toJson();
// Now prepare the requests:
std::vector<ClusterCommRequest> requests;
requests.reserve(followers->size());
for (auto const& f : *followers) {
requests.emplace_back("server:" + f, arangodb::rest::RequestType::POST,
path, body);
}
auto cc = arangodb::ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
// new leader in the meantime, in this case we must not allow
// this operation to succeed, we simply return with a refusal
// error (note that we use the follower version, since we have
// lost leadership):
if (findRefusal(requests)) {
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
}
// Otherwise we drop all followers that were not successful:
for (size_t i = 0; i < followers->size(); ++i) {
bool replicationWorked =
requests[i].done &&
requests[i].result.status == CL_COMM_RECEIVED &&
(requests[i].result.answer_code ==
rest::ResponseCode::ACCEPTED ||
requests[i].result.answer_code == rest::ResponseCode::CREATED);
if (replicationWorked) {
bool found;
requests[i].result.answer->header(StaticStrings::ErrorCodes,
found);
replicationWorked = !found;
}
if (!replicationWorked) {
auto const& followerInfo = collection->followers();
if (followerInfo->remove((*followers)[i])) {
LOG_TOPIC(WARN, Logger::REPLICATION)
<< "insertLocal: dropping follower " << (*followers)[i]
<< " for shard " << collectionName;
} else {
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "insertLocal: could not drop follower "
<< (*followers)[i] << " for shard " << collectionName;
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
}
}
}
}
}
} }
} }
@ -2095,118 +2002,19 @@ OperationResult transaction::Methods::modifyLocal(
res = workForOneDocument(newValue, false); res = workForOneDocument(newValue, false);
} }
// Now see whether or not we have to do synchronous replication:
if (res.ok() && replicationType == ReplicationType::LEADER) { if (res.ok() && replicationType == ReplicationType::LEADER) {
TRI_ASSERT(followers != nullptr);
// Now replicate the same operation on all followers: // Now replicate the same operation on all followers:
Result r = replicateOperations(
collection,
newValue,
resultBuilder,
followers,
(operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ? arangodb::rest::RequestType::PUT : arangodb::rest::RequestType::PATCH),
""
);
// In the multi babies case res is always TRI_ERROR_NO_ERROR if we if (r.fail()) {
// get here, in the single document case, we do not try to replicate return OperationResult(r);
// in case of an error.
// Now replicate the good operations on all followers:
auto cc = arangodb::ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
std::string path =
"/_db/" +
arangodb::basics::StringUtils::urlEncode(vocbase().name()) +
"/_api/document/" +
arangodb::basics::StringUtils::urlEncode(collection->name()) +
"?isRestore=true&isSynchronousReplication=" +
ServerState::instance()->getId() + "&" + StaticStrings::SilentString + "=true";
transaction::BuilderLeaser payload(this);
auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) {
VPackObjectBuilder guard(payload.get());
VPackSlice s = result.get(StaticStrings::KeyString);
payload->add(StaticStrings::KeyString, s);
s = result.get(StaticStrings::RevString);
payload->add(StaticStrings::RevString, s);
TRI_SanitizeObject(doc, *payload.get());
};
VPackSlice ourResult = resultBuilder.slice();
size_t count = 0;
if (multiCase) {
VPackArrayBuilder guard(payload.get());
VPackArrayIterator itValue(newValue);
VPackArrayIterator itResult(ourResult);
while (itValue.valid() && itResult.valid()) {
TRI_ASSERT((*itResult).isObject());
if (!(*itResult).hasKey(StaticStrings::Error)) {
doOneDoc(itValue.value(), itResult.value());
count++;
}
itValue.next();
itResult.next();
}
} else {
VPackArrayBuilder guard(payload.get());
doOneDoc(newValue, ourResult);
count++;
}
if (count > 0) {
auto body = std::make_shared<std::string>();
*body = payload->slice().toJson();
// Now prepare the requests:
std::vector<ClusterCommRequest> requests;
requests.reserve(followers->size());
for (auto const& f : *followers) {
requests.emplace_back("server:" + f,
operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE
? arangodb::rest::RequestType::PUT
: arangodb::rest::RequestType::PATCH,
path, body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
// new leader in the meantime, in this case we must not allow
// this operation to succeed, we simply return with a refusal
// error (note that we use the follower version, since we have
// lost leadership):
if (findRefusal(requests)) {
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
}
// Otherwise we drop all followers that were not successful:
for (size_t i = 0; i < followers->size(); ++i) {
bool replicationWorked =
requests[i].done &&
requests[i].result.status == CL_COMM_RECEIVED &&
(requests[i].result.answer_code ==
rest::ResponseCode::ACCEPTED ||
requests[i].result.answer_code == rest::ResponseCode::OK);
if (replicationWorked) {
bool found;
requests[i].result.answer->header(StaticStrings::ErrorCodes,
found);
replicationWorked = !found;
}
if (!replicationWorked) {
auto const& followerInfo = collection->followers();
if (followerInfo->remove((*followers)[i])) {
LOG_TOPIC(WARN, Logger::REPLICATION)
<< "modifyLocal: dropping follower " << (*followers)[i]
<< " for shard " << collectionName;
} else {
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "modifyLocal: could not drop follower "
<< (*followers)[i] << " for shard " << collectionName;
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
}
}
}
}
}
} }
} }
@ -2392,116 +2200,19 @@ OperationResult transaction::Methods::removeLocal(
res = workForOneDocument(value, false); res = workForOneDocument(value, false);
} }
// Now see whether or not we have to do synchronous replication:
if (res.ok() && replicationType == ReplicationType::LEADER) { if (res.ok() && replicationType == ReplicationType::LEADER) {
TRI_ASSERT(followers != nullptr);
// Now replicate the same operation on all followers: // Now replicate the same operation on all followers:
Result r = replicateOperations(
collection,
value,
resultBuilder,
followers,
arangodb::rest::RequestType::DELETE_REQ,
""
);
// In the multi babies case res is always TRI_ERROR_NO_ERROR if we if (r.fail()) {
// get here, in the single document case, we do not try to replicate return OperationResult(r);
// in case of an error.
// Now replicate the good operations on all followers:
auto cc = arangodb::ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controled shutdown
std::string path =
"/_db/" +
arangodb::basics::StringUtils::urlEncode(vocbase().name()) +
"/_api/document/" +
arangodb::basics::StringUtils::urlEncode(collection->name()) +
"?isRestore=true&isSynchronousReplication=" +
ServerState::instance()->getId() + "&" + StaticStrings::SilentString + "=true";
transaction::BuilderLeaser payload(this);
auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) {
VPackObjectBuilder guard(payload.get());
VPackSlice s = result.get(StaticStrings::KeyString);
payload->add(StaticStrings::KeyString, s);
s = result.get(StaticStrings::RevString);
payload->add(StaticStrings::RevString, s);
};
VPackSlice ourResult = resultBuilder.slice();
size_t count = 0;
if (value.isArray()) {
VPackArrayBuilder guard(payload.get());
VPackArrayIterator itValue(value);
VPackArrayIterator itResult(ourResult);
while (itValue.valid() && itResult.valid()) {
TRI_ASSERT((*itResult).isObject());
if (!(*itResult).hasKey(StaticStrings::Error)) {
doOneDoc(itValue.value(), itResult.value());
count++;
}
itValue.next();
itResult.next();
}
} else {
VPackArrayBuilder guard(payload.get());
doOneDoc(value, ourResult);
count++;
}
if (count > 0) {
auto body = std::make_shared<std::string>();
*body = payload->slice().toJson();
// Now prepare the requests:
std::vector<ClusterCommRequest> requests;
requests.reserve(followers->size());
for (auto const& f : *followers) {
requests.emplace_back("server:" + f,
arangodb::rest::RequestType::DELETE_REQ, path,
body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
// new leader in the meantime, in this case we must not allow
// this operation to succeed, we simply return with a refusal
// error (note that we use the follower version, since we have
// lost leadership):
if (findRefusal(requests)) {
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
}
// we drop all followers that were not successful:
for (size_t i = 0; i < followers->size(); ++i) {
bool replicationWorked =
requests[i].done &&
requests[i].result.status == CL_COMM_RECEIVED &&
(requests[i].result.answer_code ==
rest::ResponseCode::ACCEPTED ||
requests[i].result.answer_code == rest::ResponseCode::OK);
if (replicationWorked) {
bool found;
requests[i].result.answer->header(StaticStrings::ErrorCodes,
found);
replicationWorked = !found;
}
if (!replicationWorked) {
auto const& followerInfo = collection->followers();
if (followerInfo->remove((*followers)[i])) {
LOG_TOPIC(WARN, Logger::REPLICATION)
<< "removeLocal: dropping follower " << (*followers)[i]
<< " for shard " << collectionName;
} else {
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "removeLocal: could not drop follower "
<< (*followers)[i] << " for shard " << collectionName;
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
}
}
}
}
}
} }
} }
@ -3473,3 +3184,133 @@ Result transaction::Methods::resolveId(char const* handle, size_t length,
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
/// @brief replicates the successful parts of a local write operation from
/// the leader to all given followers
///
/// The payload sent to the followers is built from `inputValue` and the
/// local results in `resultBuilder`: for every document the key and revision
/// attributes are taken from the local result, and (except for remove
/// operations) the sanitized document data is added. For array input, entries
/// whose local result carries an error attribute are skipped.
///
/// @param collection    collection to replicate for; provides the database
///                      name, the collection name and the follower info.
///                      Must not be a nullptr
/// @param inputValue    input of the local operation, a single value or an
///                      array of values
/// @param resultBuilder results of the local operation, parallel to
///                      `inputValue`
/// @param followers     ids of the followers to replicate to. Must not be a
///                      nullptr
/// @param requestType   HTTP request type to use for the follower requests
/// @param pathAppendix  string appended verbatim to the request URL
/// @return an ok Result if there was nothing to replicate, if all followers
///         confirmed the operation, or if all failed followers could be
///         dropped; TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED if a follower
///         refused to follow
/// @throws TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER if a follower for which
///         replication failed could not be dropped
Result Methods::replicateOperations(LogicalCollection* collection,
                                    VPackSlice const& inputValue,
                                    VPackBuilder const& resultBuilder,
                                    std::shared_ptr<std::vector<std::string> const>& followers,
                                    arangodb::rest::RequestType requestType,
                                    std::string const& pathAppendix) {
  TRI_ASSERT(collection != nullptr);
  TRI_ASSERT(followers != nullptr);

  if (followers->empty()) {
    // nobody to replicate to. without this shortcut we would build and
    // serialize the complete payload only to send zero requests, which is
    // then reported as success (0 good requests == 0 followers) anyway
    return Result();
  }

  // Now replicate the good operations on all followers:
  auto cc = arangodb::ClusterComm::instance();
  if (cc == nullptr) {
    // nullptr happens only on controlled shutdown
    return Result();
  }

  transaction::BuilderLeaser payload(this);

  // adds one object to the payload, consisting of key and revision from the
  // local result plus the document data (if required by the operation)
  auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) {
    VPackObjectBuilder guard(payload.get());
    VPackSlice s = result.get(StaticStrings::KeyString);
    payload->add(StaticStrings::KeyString, s);
    s = result.get(StaticStrings::RevString);
    payload->add(StaticStrings::RevString, s);
    if (requestType != arangodb::rest::RequestType::DELETE_REQ) {
      // no need to add document data for remove operations
      TRI_SanitizeObject(doc, *payload.get());
    }
  };

  VPackSlice ourResult = resultBuilder.slice();
  size_t count = 0;
  if (inputValue.isArray()) {
    VPackArrayBuilder guard(payload.get());
    VPackArrayIterator itValue(inputValue);
    VPackArrayIterator itResult(ourResult);
    while (itValue.valid() && itResult.valid()) {
      TRI_ASSERT((*itResult).isObject());
      // only replicate the operations that worked locally
      if (!(*itResult).hasKey(StaticStrings::Error)) {
        doOneDoc(itValue.value(), itResult.value());
        count++;
      }
      itValue.next();
      itResult.next();
    }
  } else {
    if (requestType == arangodb::rest::RequestType::POST) {
      // single document insert: the payload is the plain object
      doOneDoc(inputValue, ourResult);
    } else {
      // all other single document operations: the payload is an array
      // with exactly one member
      VPackArrayBuilder guard(payload.get());
      doOneDoc(inputValue, ourResult);
    }
    count++;
  }

  if (count == 0) {
    // nothing to do
    return Result();
  }

  std::string path =
      "/_db/" + arangodb::basics::StringUtils::urlEncode(collection->vocbase().name()) +
      "/_api/document/" + arangodb::basics::StringUtils::urlEncode(collection->name()) +
      "?isRestore=true&isSynchronousReplication=" + ServerState::instance()->getId() +
      "&" + StaticStrings::SilentString + "=true" + pathAppendix;

  // the same body is shared by the requests to all followers
  auto body = std::make_shared<std::string>(payload->slice().toJson());

  size_t const numFollowers = followers->size();

  // Now prepare the requests:
  std::vector<ClusterCommRequest> requests;
  requests.reserve(numFollowers);

  for (auto const& f : *followers) {
    requests.emplace_back("server:" + f, requestType, path, body);
  }

  double const timeout = chooseTimeout(count, body->size() * numFollowers);

  size_t nrDone = 0;
  size_t nrGood = cc->performRequests(requests, timeout, nrDone, Logger::REPLICATION, false);

  if (nrGood == numFollowers) {
    // all followers confirmed the operation
    return Result();
  }

  // If any would-be-follower refused to follow there must be a
  // new leader in the meantime, in this case we must not allow
  // this operation to succeed, we simply return with a refusal
  // error (note that we use the follower version, since we have
  // lost leadership):
  if (findRefusal(requests)) {
    return Result(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
  }

  // Otherwise we drop all followers that were not successful:
  for (size_t i = 0; i < numFollowers; ++i) {
    bool replicationWorked =
        requests[i].done &&
        requests[i].result.status == CL_COMM_RECEIVED &&
        (requests[i].result.answer_code == rest::ResponseCode::ACCEPTED ||
         requests[i].result.answer_code == rest::ResponseCode::CREATED ||
         requests[i].result.answer_code == rest::ResponseCode::OK);
    if (replicationWorked) {
      // a response that carries the error codes header counts as a failure,
      // regardless of its response code
      bool found = false;
      requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
      replicationWorked = !found;
    }
    if (!replicationWorked) {
      auto const& followerInfo = collection->followers();
      if (followerInfo->remove((*followers)[i])) {
        LOG_TOPIC(WARN, Logger::REPLICATION)
            << "synchronous replication: dropping follower " << (*followers)[i]
            << " for shard " << collection->name();
      } else {
        LOG_TOPIC(ERR, Logger::REPLICATION)
            << "synchronous replication: could not drop follower "
            << (*followers)[i] << " for shard " << collection->name();
        THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
      }
    }
  }

  // we return "ok" here still.
  return Result();
}

View File

@ -28,11 +28,12 @@
#include "Basics/Exceptions.h" #include "Basics/Exceptions.h"
#include "Basics/StringRef.h" #include "Basics/StringRef.h"
#include "Basics/Result.h" #include "Basics/Result.h"
#include "Utils/OperationResult.h" #include "Rest/CommonDefines.h"
#include "Transaction/CountCache.h" #include "Transaction/CountCache.h"
#include "Transaction/Hints.h" #include "Transaction/Hints.h"
#include "Transaction/Options.h" #include "Transaction/Options.h"
#include "Transaction/Status.h" #include "Transaction/Status.h"
#include "Utils/OperationResult.h"
#include "VocBase/AccessMode.h" #include "VocBase/AccessMode.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
@ -551,8 +552,15 @@ class Methods {
ENTERPRISE_VIRT Result unlockRecursive(TRI_voc_cid_t, AccessMode::Type); ENTERPRISE_VIRT Result unlockRecursive(TRI_voc_cid_t, AccessMode::Type);
private: private:
/// @brief replicates operations from leader to follower(s)
///
/// Builds one request body from the input and the local operation results
/// (array entries whose local result has an error attribute are left out)
/// and sends it to every follower. Followers for which the request did not
/// succeed are dropped from the collection's follower info.
///
/// @param collection    collection to replicate for (database name,
///                      collection name and follower info are read from it);
///                      must not be a nullptr
/// @param inputValue    input of the local operation, a single value or an
///                      array
/// @param resultBuilder results of the local operation; key and revision of
///                      each document are taken from here
/// @param followers     ids of the followers to send the requests to; must
///                      not be a nullptr
/// @param requestType   request type used for the follower requests; document
///                      data is omitted from the body for DELETE_REQ
/// @param pathAppendix  string appended verbatim to the end of the request
///                      URL
/// @return an ok Result if nothing had to be replicated, all followers
///         succeeded or all failed followers were dropped;
///         TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED if a follower refused
/// @throws TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER if a failed follower
///         could not be dropped
Result replicateOperations(LogicalCollection* collection,
arangodb::velocypack::Slice const& inputValue,
arangodb::velocypack::Builder const& resultBuilder,
std::shared_ptr<std::vector<std::string> const>& followers,
arangodb::rest::RequestType requestType,
std::string const& pathAppendix);
/// @brief Helper create a Cluster Communication document /// @brief Helper create a Cluster Communication document
OperationResult clusterResultDocument( OperationResult clusterResultDocument(
rest::ResponseCode const& responseCode, rest::ResponseCode const& responseCode,
std::shared_ptr<arangodb::velocypack::Builder> const& resultBody, std::shared_ptr<arangodb::velocypack::Builder> const& resultBody,

View File

@ -73,16 +73,12 @@ class LocalDocumentId {
void clear() { _id = 0; } void clear() { _id = 0; }
/// @brief create a not-set document id /// @brief create a not-set document id
// clang does not like:
// static constexpr LocalDocumentId none() { return LocalDocumentId(0); }
static LocalDocumentId none() { return LocalDocumentId(0); } static LocalDocumentId none() { return LocalDocumentId(0); }
/// @brief create a new document id /// @brief create a new document id
static LocalDocumentId create() { return LocalDocumentId(TRI_HybridLogicalClock()); } static LocalDocumentId create() { return LocalDocumentId(TRI_HybridLogicalClock()); }
/// @brief create a document id from an existing id /// @brief create a document id from an existing id
// clang does not like:
// static constexpr LocalDocumentId create(BaseType id) { return LocalDocumentId(id); }
static LocalDocumentId create(BaseType id) { return LocalDocumentId(id); } static LocalDocumentId create(BaseType id) { return LocalDocumentId(id); }
private: private:

View File

@ -29,83 +29,36 @@
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
using namespace arangodb; using namespace arangodb;
using namespace arangodb::aql;
void ManagedDocumentResult::clone(ManagedDocumentResult& cloned) const {
cloned.reset();
if (_useString) {
cloned._useString = true;
cloned._string = _string;
cloned._localDocumentId = _localDocumentId;
cloned._vpack = reinterpret_cast<uint8_t*>(const_cast<char*>(cloned._string.data()));
} else if (_managed) {
cloned.setManaged(_vpack, _localDocumentId);
} else {
cloned.setUnmanaged(_vpack, _localDocumentId);
}
}
//add unmanaged vpack
void ManagedDocumentResult::setUnmanaged(uint8_t const* vpack, LocalDocumentId const& documentId) { void ManagedDocumentResult::setUnmanaged(uint8_t const* vpack, LocalDocumentId const& documentId) {
if(_managed || _useString) { _string.clear();
reset();
}
TRI_ASSERT(_length == 0);
_vpack = const_cast<uint8_t*>(vpack); _vpack = const_cast<uint8_t*>(vpack);
_localDocumentId = documentId; _localDocumentId = documentId;
_managed = false;
} }
void ManagedDocumentResult::setManaged(uint8_t const* vpack, LocalDocumentId const& documentId) { void ManagedDocumentResult::setManaged(uint8_t const* vpack, LocalDocumentId const& documentId) {
VPackSlice slice(vpack); _string.assign(reinterpret_cast<char const*>(vpack), VPackSlice(vpack).byteSize());
auto newLen = slice.byteSize(); _vpack = nullptr;
if (_length >= newLen && _managed){
std::memcpy(_vpack, vpack, newLen);
} else {
reset();
_vpack = new uint8_t[newLen];
std::memcpy(_vpack, vpack, newLen);
_length=newLen;
}
_localDocumentId = documentId; _localDocumentId = documentId;
_managed = true; _managed = true;
} }
void ManagedDocumentResult::setManagedAfterStringUsage(LocalDocumentId const& documentId) { void ManagedDocumentResult::setManagedAfterStringUsage(LocalDocumentId const& documentId) {
TRI_ASSERT(!_string.empty());
TRI_ASSERT(_useString);
_vpack = reinterpret_cast<uint8_t*>(const_cast<char*>(_string.data()));
_localDocumentId = documentId;
_useString = true;
}
void ManagedDocumentResult::setManaged(std::string&& str, LocalDocumentId const& documentId) {
reset();
_string = std::move(str);
_vpack = reinterpret_cast<uint8_t*>(const_cast<char*>(_string.data()));
_localDocumentId = documentId;
_useString = true;
}
void ManagedDocumentResult::reset() noexcept {
if (_managed) {
delete[] _vpack;
}
_managed = false;
_length = 0;
if (_useString) {
_string.clear();
_useString = false;
}
_localDocumentId.clear();
_vpack = nullptr; _vpack = nullptr;
_localDocumentId = documentId;
_managed = true;
} }
void ManagedDocumentResult::addToBuilder(velocypack::Builder& builder, bool allowExternals) const { void ManagedDocumentResult::addToBuilder(velocypack::Builder& builder, bool allowExternals) const {
TRI_ASSERT(!empty()); uint8_t const* vpack;
auto slice = velocypack::Slice(_vpack); if (_managed) {
vpack = reinterpret_cast<uint8_t const*>(_string.data());
} else {
vpack = _vpack;
}
TRI_ASSERT(vpack != nullptr);
auto slice = velocypack::Slice(vpack);
TRI_ASSERT(!slice.isExternal()); TRI_ASSERT(!slice.isExternal());
if (allowExternals && canUseInExternal()) { if (allowExternals && canUseInExternal()) {
builder.addExternal(slice.begin()); builder.addExternal(slice.begin());

View File

@ -33,83 +33,75 @@ namespace velocypack {
class Builder; class Builder;
} }
namespace aql {
struct AqlValue;
}
class ManagedDocumentResult { class ManagedDocumentResult {
public: public:
ManagedDocumentResult() : ManagedDocumentResult() :
_length(0),
_localDocumentId(),
_vpack(nullptr), _vpack(nullptr),
_managed(false), _managed(false) {}
_useString(false) {}
~ManagedDocumentResult() { reset(); }
ManagedDocumentResult(ManagedDocumentResult const& other) = delete;
ManagedDocumentResult& operator=(ManagedDocumentResult const& other) = delete;
ManagedDocumentResult& operator=(ManagedDocumentResult&& other) { ManagedDocumentResult(ManagedDocumentResult const& other) = default;
if (other._useString) { ManagedDocumentResult& operator=(ManagedDocumentResult const& other) = default;
setManaged(std::move(other._string), other._localDocumentId);
other._managed = false; ManagedDocumentResult& operator=(ManagedDocumentResult&& other) noexcept {
other.reset(); _string = std::move(other._string);
} else if (other._managed) { _vpack = other._vpack;
reset(); _localDocumentId = other._localDocumentId;
_vpack = other._vpack; _managed = other._managed;
_length = other._length;
_localDocumentId = other._localDocumentId; other.clear();
_managed = true;
other._managed = false;
other.reset();
} else {
setUnmanaged(other._vpack, other._localDocumentId);
}
return *this; return *this;
} }
ManagedDocumentResult(ManagedDocumentResult&& other) = delete; ManagedDocumentResult(ManagedDocumentResult&& other) noexcept
: _string(std::move(other._string)),
_vpack(other._vpack),
_localDocumentId(other._localDocumentId),
_managed(other._managed) {
other.clear();
}
void clone(ManagedDocumentResult& cloned) const;
//add unmanaged vpack
void setUnmanaged(uint8_t const* vpack, LocalDocumentId const& documentId); void setUnmanaged(uint8_t const* vpack, LocalDocumentId const& documentId);
void setManaged(uint8_t const* vpack, LocalDocumentId const& documentId); void setManaged(uint8_t const* vpack, LocalDocumentId const& documentId);
void setManaged(std::string&& str, LocalDocumentId const& documentId);
inline LocalDocumentId localDocumentId() const { return _localDocumentId; }
void reset() noexcept;
std::string* prepareStringUsage() {
reset();
_useString = true;
return &_string;
}
void setManagedAfterStringUsage(LocalDocumentId const& documentId); void setManagedAfterStringUsage(LocalDocumentId const& documentId);
inline LocalDocumentId localDocumentId() const { return _localDocumentId; }
void clear() noexcept {
_string.clear();
_vpack = nullptr;
_localDocumentId.clear();
_managed = false;
}
std::string* string() {
return &_string;
}
inline uint8_t const* vpack() const { inline uint8_t const* vpack() const {
if (_managed) {
return reinterpret_cast<uint8_t const*>(_string.data());
}
TRI_ASSERT(_vpack != nullptr); TRI_ASSERT(_vpack != nullptr);
return _vpack; return _vpack;
} }
inline bool empty() const { return _vpack == nullptr; } inline bool empty() const {
return (!_managed && _vpack == nullptr);
}
inline bool canUseInExternal() const { inline bool canUseInExternal() const {
return (!_managed && !_useString); return !_managed;
} }
void addToBuilder(velocypack::Builder& builder, bool allowExternals) const; void addToBuilder(velocypack::Builder& builder, bool allowExternals) const;
private: private:
uint64_t _length;
LocalDocumentId _localDocumentId;
uint8_t* _vpack;
std::string _string; std::string _string;
uint8_t* _vpack;
LocalDocumentId _localDocumentId;
bool _managed; bool _managed;
bool _useString;
}; };
} }