1
0
Fork 0

Return offending key for unique constraint violations (#3624)

This commit is contained in:
Dan Larkin 2017-11-10 10:03:10 -05:00 committed by Frank Celler
parent 733f27e997
commit 68bd31ac99
38 changed files with 620 additions and 309 deletions

View File

@ -527,7 +527,7 @@ void Index::batchInsert(
std::vector<std::pair<LocalDocumentId, arangodb::velocypack::Slice>> const& documents,
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
for (auto const& it : documents) {
Result status = insert(trx, it.first, it.second, false);
Result status = insert(trx, it.first, it.second, OperationMode::normal);
if (status.errorNumber() != TRI_ERROR_NO_ERROR) {
queue->setStatus(status.errorNumber());
break;

View File

@ -88,6 +88,13 @@ class Index {
TRI_IDX_TYPE_NO_ACCESS_INDEX
};
// mode to signal how operation should behave
enum OperationMode {
normal,
internal,
rollback
};
public:
/// @brief return the index id
inline TRI_idx_iid_t id() const { return _iid; }
@ -227,7 +234,7 @@ class Index {
/// attribute attribute, a Slice would be more flexible.
double selectivityEstimate(
arangodb::StringRef const* extra = nullptr) const;
virtual double selectivityEstimateLocal(
arangodb::StringRef const* extra) const;
@ -244,10 +251,14 @@ class Index {
virtual void toVelocyPackFigures(arangodb::velocypack::Builder&) const;
std::shared_ptr<arangodb::velocypack::Builder> toVelocyPackFigures() const;
virtual Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) = 0;
virtual Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) = 0;
virtual Result insert(transaction::Methods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&,
OperationMode mode) = 0;
virtual Result remove(transaction::Methods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&,
OperationMode mode) = 0;
virtual void batchInsert(
transaction::Methods*,
@ -324,7 +335,7 @@ class Index {
mutable bool _unique;
mutable bool _sparse;
double _clusterSelectivity;
};
}

View File

@ -59,6 +59,15 @@ class IndexResult : public Result {
}
}
}
IndexResult(int errorNumber, Index const* index, std::string key) :
IndexResult(errorNumber, index) {
// provide conflicting key
if (key.length() > 0) {
_errorMessage.append("; conflicting key: ");
_errorMessage.append(key);
}
}
};
} // namespace arangodb

View File

@ -259,7 +259,8 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(
// insert into primary index
Result res = state->_primaryIndex->insertKey(trx, localDocumentId,
VPackSlice(vpack),
state->_mmdr);
state->_mmdr,
Index::OperationMode::normal);
if (res.errorNumber() != TRI_ERROR_NO_ERROR) {
physical->removeLocalDocumentId(localDocumentId, false);
@ -394,7 +395,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(
state->_dfi->numberDeletions++;
state->_primaryIndex->removeKey(trx, oldLocalDocumentId, VPackSlice(vpack),
state->_mmdr);
state->_mmdr, Index::OperationMode::normal);
physical->removeLocalDocumentId(oldLocalDocumentId, true);
}
@ -2747,7 +2748,7 @@ void MMFilesCollection::truncate(transaction::Methods* trx,
if (vpack != nullptr) {
builder->clear();
VPackSlice oldDoc(vpack);
LocalDocumentId const documentId = LocalDocumentId::create();
TRI_voc_rid_t revisionId;
newObjectForRemove(trx, oldDoc, documentId, *builder.get(), options.isRestore, revisionId);
@ -2780,7 +2781,7 @@ Result MMFilesCollection::insert(transaction::Methods* trx,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_tick_t& revisionId) {
VPackSlice fromSlice;
VPackSlice toSlice;
@ -2907,8 +2908,8 @@ Result MMFilesCollection::insert(transaction::Methods* trx,
try {
// insert into indexes
res = insertDocument(trx, documentId, revisionId, doc, operation, marker,
options.waitForSync);
res = insertDocument(trx, documentId, revisionId, doc, operation,
marker, options, options.waitForSync);
} catch (basics::Exception const& ex) {
res = Result(ex.code());
} catch (std::bad_alloc const&) {
@ -3056,26 +3057,27 @@ void MMFilesCollection::removeLocalDocumentId(LocalDocumentId const& documentId,
/// @brief creates a new entry in the primary index
Result MMFilesCollection::insertPrimaryIndex(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationOptions& options) {
TRI_IF_FAILURE("InsertPrimaryIndex") { return Result(TRI_ERROR_DEBUG); }
// insert into primary index
return primaryIndex()->insertKey(trx, documentId, doc);
return primaryIndex()->insertKey(trx, documentId, doc, options.indexOpMode);
}
/// @brief deletes an entry from the primary index
Result MMFilesCollection::deletePrimaryIndex(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc, OperationOptions& options) {
TRI_IF_FAILURE("DeletePrimaryIndex") { return Result(TRI_ERROR_DEBUG); }
return primaryIndex()->removeKey(trx, documentId, doc);
return primaryIndex()->removeKey(trx, documentId, doc, options.indexOpMode);
}
/// @brief creates a new entry in the secondary indexes
Result MMFilesCollection::insertSecondaryIndexes(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, Index::OperationMode mode) {
// Coordinator doesn't know index internals
TRI_ASSERT(!ServerState::instance()->isCoordinator());
TRI_IF_FAILURE("InsertSecondaryIndexes") { return Result(TRI_ERROR_DEBUG); }
@ -3098,7 +3100,7 @@ Result MMFilesCollection::insertSecondaryIndexes(
continue;
}
Result res = idx->insert(trx, documentId, doc, isRollback);
Result res = idx->insert(trx, documentId, doc, mode);
// in case of no-memory, return immediately
if (res.errorNumber() == TRI_ERROR_OUT_OF_MEMORY) {
@ -3119,7 +3121,7 @@ Result MMFilesCollection::insertSecondaryIndexes(
/// @brief deletes an entry from the secondary indexes
Result MMFilesCollection::deleteSecondaryIndexes(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, Index::OperationMode mode) {
// Coordintor doesn't know index internals
TRI_ASSERT(!ServerState::instance()->isCoordinator());
@ -3144,7 +3146,7 @@ Result MMFilesCollection::deleteSecondaryIndexes(
continue;
}
Result res = idx->remove(trx, documentId, doc, isRollback);
Result res = idx->remove(trx, documentId, doc, mode);
if (res.fail()) {
// an error occurred
@ -3184,9 +3186,10 @@ int MMFilesCollection::detectIndexes(transaction::Methods* trx) {
/// If it returns an error no documents are inserted
Result MMFilesCollection::insertIndexes(arangodb::transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationOptions& options) {
// insert into primary index first
Result res = insertPrimaryIndex(trx, documentId, doc);
Result res = insertPrimaryIndex(trx, documentId, doc, options);
if (res.fail()) {
// insert has failed
@ -3194,11 +3197,12 @@ Result MMFilesCollection::insertIndexes(arangodb::transaction::Methods* trx,
}
// insert into secondary indexes
res = insertSecondaryIndexes(trx, documentId, doc, false);
res = insertSecondaryIndexes(trx, documentId, doc, options.indexOpMode);
if (res.fail()) {
deleteSecondaryIndexes(trx, documentId, doc, true);
deletePrimaryIndex(trx, documentId, doc);
deleteSecondaryIndexes(trx, documentId, doc,
Index::OperationMode::rollback);
deletePrimaryIndex(trx, documentId, doc, options);
}
return res;
}
@ -3211,8 +3215,9 @@ Result MMFilesCollection::insertDocument(arangodb::transaction::Methods* trx,
VPackSlice const& doc,
MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker,
OperationOptions& options,
bool& waitForSync) {
Result res = insertIndexes(trx, documentId, doc);
Result res = insertIndexes(trx, documentId, doc, options);
if (res.fail()) {
return res;
}
@ -3338,8 +3343,9 @@ Result MMFilesCollection::update(
result.reset();
}
res = updateDocument(trx, revisionId, oldDocumentId, oldDoc, documentId, newDoc,
operation, marker, options.waitForSync);
res = updateDocument(trx, revisionId, oldDocumentId, oldDoc, documentId,
newDoc, operation, marker, options,
options.waitForSync);
} catch (basics::Exception const& ex) {
res = Result(ex.code());
} catch (std::bad_alloc const&) {
@ -3370,9 +3376,9 @@ Result MMFilesCollection::replace(
transaction::Methods* trx, VPackSlice const newSlice,
ManagedDocumentResult& result, OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev,
ManagedDocumentResult& previous,
ManagedDocumentResult& previous,
VPackSlice const fromSlice, VPackSlice const toSlice) {
LocalDocumentId const documentId = LocalDocumentId::create();
bool const isEdgeCollection =
(_logicalCollection->type() == TRI_COL_TYPE_EDGE);
@ -3472,8 +3478,9 @@ Result MMFilesCollection::replace(
result.reset();
}
res = updateDocument(trx, revisionId, oldDocumentId, oldDoc, documentId, newDoc,
operation, marker, options.waitForSync);
res = updateDocument(trx, revisionId, oldDocumentId, oldDoc, documentId,
newDoc, operation, marker, options,
options.waitForSync);
} catch (basics::Exception const& ex) {
res = Result(ex.code());
} catch (std::bad_alloc const&) {
@ -3591,17 +3598,20 @@ Result MMFilesCollection::remove(arangodb::transaction::Methods* trx,
MMFilesDocumentDescriptor());
// delete from indexes
res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc, false);
res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc,
options.indexOpMode);
if (res.fail()) {
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
THROW_ARANGO_EXCEPTION(res);
}
res = deletePrimaryIndex(trx, oldDocumentId, oldDoc);
res = deletePrimaryIndex(trx, oldDocumentId, oldDoc, options);
if (res.fail()) {
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
THROW_ARANGO_EXCEPTION(res);
}
@ -3654,12 +3664,13 @@ void MMFilesCollection::deferDropCollection(
}
/// @brief rolls back a document operation
Result MMFilesCollection::rollbackOperation(transaction::Methods* trx,
TRI_voc_document_operation_e type,
LocalDocumentId const& oldDocumentId,
VPackSlice const& oldDoc,
LocalDocumentId const& newDocumentId,
VPackSlice const& newDoc) {
Result MMFilesCollection::rollbackOperation(
transaction::Methods* trx, TRI_voc_document_operation_e type,
LocalDocumentId const& oldDocumentId, VPackSlice const& oldDoc,
LocalDocumentId const& newDocumentId, VPackSlice const& newDoc) {
OperationOptions options;
options.indexOpMode= Index::OperationMode::rollback;
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
TRI_ASSERT(oldDocumentId.empty());
TRI_ASSERT(oldDoc.isNone());
@ -3667,8 +3678,9 @@ Result MMFilesCollection::rollbackOperation(transaction::Methods* trx,
TRI_ASSERT(!newDoc.isNone());
// ignore any errors we're getting from this
deletePrimaryIndex(trx, newDocumentId, newDoc);
deleteSecondaryIndexes(trx, newDocumentId, newDoc, true);
deletePrimaryIndex(trx, newDocumentId, newDoc, options);
deleteSecondaryIndexes(trx, newDocumentId, newDoc,
Index::OperationMode::rollback);
return TRI_ERROR_NO_ERROR;
}
@ -3680,9 +3692,11 @@ Result MMFilesCollection::rollbackOperation(transaction::Methods* trx,
TRI_ASSERT(!newDoc.isNone());
// remove the current values from the indexes
deleteSecondaryIndexes(trx, newDocumentId, newDoc, true);
deleteSecondaryIndexes(trx, newDocumentId, newDoc,
Index::OperationMode::rollback);
// re-insert old state
return insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
return insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
}
if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
@ -3692,10 +3706,11 @@ Result MMFilesCollection::rollbackOperation(transaction::Methods* trx,
TRI_ASSERT(newDocumentId.empty());
TRI_ASSERT(newDoc.isNone());
Result res = insertPrimaryIndex(trx, oldDocumentId, oldDoc);
Result res = insertPrimaryIndex(trx, oldDocumentId, oldDoc, options);
if (res.ok()) {
res = insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
res = insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
} else {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "error rolling back remove operation";
@ -3754,17 +3769,20 @@ Result MMFilesCollection::removeFastPath(arangodb::transaction::Methods* trx,
// delete from indexes
Result res;
try {
res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc, false);
res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc,
options.indexOpMode);
if (res.fail()) {
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
THROW_ARANGO_EXCEPTION(res.errorNumber());
}
res = deletePrimaryIndex(trx, oldDocumentId, oldDoc);
res = deletePrimaryIndex(trx, oldDocumentId, oldDoc, options);
if (res.fail()) {
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
THROW_ARANGO_EXCEPTION(res.errorNumber());
}
@ -3831,27 +3849,33 @@ Result MMFilesCollection::lookupDocument(transaction::Methods* trx,
/// the caller must make sure the write lock on the collection is held
Result MMFilesCollection::updateDocument(
transaction::Methods* trx,TRI_voc_rid_t revisionId,
LocalDocumentId const& oldDocumentId,
LocalDocumentId const& oldDocumentId,
VPackSlice const& oldDoc, LocalDocumentId const& newDocumentId,
VPackSlice const& newDoc, MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync) {
MMFilesWalMarker const* marker, OperationOptions& options,
bool& waitForSync) {
// remove old document from secondary indexes
// (it will stay in the primary index as the key won't change)
Result res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc, false);
Result res = deleteSecondaryIndexes(trx, oldDocumentId, oldDoc,
options.indexOpMode);
if (res.fail()) {
// re-enter the document in case of failure, ignore errors during rollback
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
return res;
}
// insert new document into secondary indexes
res = insertSecondaryIndexes(trx, newDocumentId, newDoc, false);
res = insertSecondaryIndexes(trx, newDocumentId, newDoc,
options.indexOpMode);
if (res.fail()) {
// rollback
deleteSecondaryIndexes(trx, newDocumentId, newDoc, true);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc, true);
deleteSecondaryIndexes(trx, newDocumentId, newDoc,
Index::OperationMode::rollback);
insertSecondaryIndexes(trx, oldDocumentId, oldDoc,
Index::OperationMode::rollback);
return res;
}

View File

@ -129,7 +129,7 @@ class MMFilesCollection final : public PhysicalCollection {
MMFilesCollection(LogicalCollection*, PhysicalCollection const*); // use in cluster only!!!!!
~MMFilesCollection();
static constexpr uint32_t defaultIndexBuckets = 8;
static constexpr double defaultLockTimeout = 10.0 * 60.0;
@ -333,18 +333,18 @@ class MMFilesCollection final : public PhysicalCollection {
Result read(transaction::Methods*, arangodb::StringRef const& key,
ManagedDocumentResult& result, bool) override;
Result read(transaction::Methods*, arangodb::velocypack::Slice const& key,
ManagedDocumentResult& result, bool) override;
bool readDocument(transaction::Methods* trx,
LocalDocumentId const& documentId,
ManagedDocumentResult& result) override;
bool readDocumentWithCallback(transaction::Methods* trx,
LocalDocumentId const& documentId,
IndexIterator::DocumentCallback const& cb) override;
size_t readDocumentWithCallback(transaction::Methods* trx,
std::vector<std::pair<LocalDocumentId, uint8_t const*>>& documentIds,
IndexIterator::DocumentCallback const& cb);
@ -481,7 +481,8 @@ class MMFilesCollection final : public PhysicalCollection {
TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc,
MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync);
MMFilesWalMarker const* marker,
OperationOptions& options, bool& waitForSync);
private:
uint8_t const* lookupDocumentVPack(LocalDocumentId const& documentId) const;
@ -508,20 +509,21 @@ class MMFilesCollection final : public PhysicalCollection {
/// @brief Detect all indexes form file
int detectIndexes(transaction::Methods* trx);
Result insertIndexes(transaction::Methods* trx, LocalDocumentId const& documentId,
velocypack::Slice const& doc);
Result insertIndexes(transaction::Methods* trx, LocalDocumentId const& documentId, velocypack::Slice const& doc, OperationOptions& options);
Result insertPrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId,
velocypack::Slice const&);
Result insertPrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, OperationOptions& options);
Result deletePrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId,
velocypack::Slice const&);
Result deletePrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, OperationOptions& options);
Result insertSecondaryIndexes(transaction::Methods*, LocalDocumentId const& documentId,
velocypack::Slice const&, bool isRollback);
Result insertSecondaryIndexes(transaction::Methods*,
LocalDocumentId const& documentId,
velocypack::Slice const&,
Index::OperationMode mode);
Result deleteSecondaryIndexes(transaction::Methods*, LocalDocumentId const& documentId,
velocypack::Slice const&, bool isRollback);
Result deleteSecondaryIndexes(transaction::Methods*,
LocalDocumentId const& documentId,
velocypack::Slice const&,
Index::OperationMode mode);
Result lookupDocument(transaction::Methods*, velocypack::Slice,
ManagedDocumentResult& result);
@ -532,7 +534,8 @@ class MMFilesCollection final : public PhysicalCollection {
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc,
MMFilesDocumentOperation&,
MMFilesWalMarker const*, bool& waitForSync);
MMFilesWalMarker const*, OperationOptions& options,
bool& waitForSync);
private:
mutable arangodb::MMFilesDitches _ditches;

View File

@ -218,17 +218,20 @@ void MMFilesEdgeIndex::toVelocyPackFigures(VPackBuilder& builder) const {
}
Result MMFilesEdgeIndex::insert(transaction::Methods* trx,
LocalDocumentId const& documentId, VPackSlice const& doc,
bool isRollback) {
LocalDocumentId const& documentId,
VPackSlice const& doc,
OperationMode mode) {
MMFilesSimpleIndexElement fromElement(buildFromElement(documentId, doc));
MMFilesSimpleIndexElement toElement(buildToElement(documentId, doc));
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, 1);
_edgesFrom->insert(&context, fromElement, true, isRollback);
_edgesFrom->insert(&context, fromElement, true,
mode == OperationMode::rollback);
try {
_edgesTo->insert(&context, toElement, true, isRollback);
_edgesTo->insert(&context, toElement, true,
mode == OperationMode::rollback);
} catch (std::bad_alloc const&) {
// roll back partial insert
_edgesFrom->remove(&context, fromElement);
@ -243,8 +246,9 @@ Result MMFilesEdgeIndex::insert(transaction::Methods* trx,
}
Result MMFilesEdgeIndex::remove(transaction::Methods* trx,
LocalDocumentId const& documentId, VPackSlice const& doc,
bool isRollback) {
LocalDocumentId const& documentId,
VPackSlice const& doc,
OperationMode mode) {
MMFilesSimpleIndexElement fromElement(buildFromElement(documentId, doc));
MMFilesSimpleIndexElement toElement(buildToElement(documentId, doc));
@ -256,7 +260,7 @@ Result MMFilesEdgeIndex::remove(transaction::Methods* trx,
_edgesTo->remove(&context, toElement);
return Result(TRI_ERROR_NO_ERROR);
} catch (...) {
if (isRollback) {
if (mode == OperationMode::rollback) {
return Result(TRI_ERROR_NO_ERROR);
}
return IndexResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, this);

View File

@ -165,10 +165,10 @@ class MMFilesEdgeIndex final : public MMFilesIndex {
void toVelocyPackFigures(VPackBuilder&) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&, OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&, OperationMode mode) override;
void batchInsert(transaction::Methods*,
std::vector<std::pair<LocalDocumentId, VPackSlice>> const&,

View File

@ -200,7 +200,7 @@ bool MMFilesFulltextIndex::matchesDefinition(VPackSlice const& info) const {
Result MMFilesFulltextIndex::insert(transaction::Methods*,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, OperationMode mode) {
int res = TRI_ERROR_NO_ERROR;
std::set<std::string> words = wordlist(doc);
@ -212,7 +212,7 @@ Result MMFilesFulltextIndex::insert(transaction::Methods*,
Result MMFilesFulltextIndex::remove(transaction::Methods*,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, OperationMode mode) {
int res = TRI_ERROR_NO_ERROR;
std::set<std::string> words = wordlist(doc);

View File

@ -66,10 +66,12 @@ class MMFilesFulltextIndex final : public MMFilesIndex {
bool matchesDefinition(VPackSlice const&) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void load() override {}
void unload() override;

View File

@ -376,8 +376,9 @@ bool MMFilesGeoIndex::matchesDefinition(VPackSlice const& info) const {
return true;
}
Result MMFilesGeoIndex::insert(transaction::Methods*, LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
Result MMFilesGeoIndex::insert(transaction::Methods*,
LocalDocumentId const& documentId,
VPackSlice const& doc, OperationMode mode) {
double latitude;
double longitude;
@ -445,8 +446,9 @@ Result MMFilesGeoIndex::insert(transaction::Methods*, LocalDocumentId const& doc
return IndexResult();
}
Result MMFilesGeoIndex::remove(transaction::Methods*, LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
Result MMFilesGeoIndex::remove(transaction::Methods*,
LocalDocumentId const& documentId,
VPackSlice const& doc, OperationMode mode) {
double latitude = 0.0;
double longitude = 0.0;
bool ok = true;

View File

@ -134,10 +134,11 @@ class MMFilesGeoIndex final : public MMFilesIndex {
bool matchesDefinition(VPackSlice const& info) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&, OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void load() override {}
void unload() override;

View File

@ -28,6 +28,7 @@
#include "Basics/Exceptions.h"
#include "Basics/FixedSizeAllocator.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/IndexLookupContext.h"
#include "Indexes/IndexResult.h"
@ -350,7 +351,7 @@ MMFilesHashIndex::MMFilesHashIndex(TRI_idx_iid_t iid,
if (_unique) {
auto array = std::make_unique<TRI_HashArray_t>(
MMFilesUniqueHashIndexHelper(_paths.size(), _useExpansion),
MMFilesUniqueHashIndexHelper(_paths.size(), _useExpansion),
indexBuckets,
[this]() -> std::string { return this->context(); });
@ -359,7 +360,7 @@ MMFilesHashIndex::MMFilesHashIndex(TRI_idx_iid_t iid,
_multiArray = nullptr;
auto array = std::make_unique<TRI_HashArrayMulti_t>(
MMFilesMultiHashIndexHelper(_paths.size(), _useExpansion),
MMFilesMultiHashIndexHelper(_paths.size(), _useExpansion),
indexBuckets, 64,
[this]() -> std::string { return this->context(); });
@ -472,19 +473,21 @@ bool MMFilesHashIndex::matchesDefinition(VPackSlice const& info) const {
}
Result MMFilesHashIndex::insert(transaction::Methods* trx,
LocalDocumentId const& documentId, VPackSlice const& doc,
bool isRollback) {
LocalDocumentId const& documentId,
VPackSlice const& doc,
OperationMode mode) {
if (_unique) {
return IndexResult(insertUnique(trx, documentId, doc, isRollback), this);
return insertUnique(trx, documentId, doc, mode);
}
return IndexResult(insertMulti(trx, documentId, doc, isRollback), this);
return IndexResult(insertMulti(trx, documentId, doc, mode), this);
}
/// @brief removes an entry from the hash array part of the hash index
Result MMFilesHashIndex::remove(transaction::Methods* trx,
LocalDocumentId const& documentId, VPackSlice const& doc,
bool isRollback) {
LocalDocumentId const& documentId,
VPackSlice const& doc,
OperationMode mode) {
std::vector<MMFilesHashIndexElement*> elements;
int res = fillElement<MMFilesHashIndexElement>(elements, documentId, doc);
@ -498,9 +501,9 @@ Result MMFilesHashIndex::remove(transaction::Methods* trx,
for (auto& hashElement : elements) {
int result;
if (_unique) {
result = removeUniqueElement(trx, hashElement, isRollback);
result = removeUniqueElement(trx, hashElement, mode);
} else {
result = removeMultiElement(trx, hashElement, isRollback);
result = removeMultiElement(trx, hashElement, mode);
}
// we may be looping through this multiple times, and if an error
@ -589,9 +592,10 @@ int MMFilesHashIndex::lookup(
return TRI_ERROR_NO_ERROR;
}
int MMFilesHashIndex::insertUnique(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
Result MMFilesHashIndex::insertUnique(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc,
OperationMode mode) {
std::vector<MMFilesHashIndexElement*> elements;
int res = fillElement<MMFilesHashIndexElement>(elements, documentId, doc);
@ -601,13 +605,14 @@ int MMFilesHashIndex::insertUnique(transaction::Methods* trx,
_allocator->deallocate(it);
}
return res;
return IndexResult(res, this);
}
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, numPaths());
auto work = [this, &context](MMFilesHashIndexElement* element, bool) -> int {
auto work = [this, &context](MMFilesHashIndexElement* element,
OperationMode) -> int {
TRI_IF_FAILURE("InsertHashIndex") { return TRI_ERROR_DEBUG; }
return _uniqueArray->_hashArray->insert(&context, element);
};
@ -616,19 +621,33 @@ int MMFilesHashIndex::insertUnique(transaction::Methods* trx,
for (size_t i = 0; i < n; ++i) {
auto hashElement = elements[i];
res = work(hashElement, isRollback);
res = work(hashElement, mode);
if (res != TRI_ERROR_NO_ERROR) {
IndexResult error(res, this);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
LocalDocumentId rev(_uniqueArray->_hashArray->find(&context, hashElement)->localDocumentId());
ManagedDocumentResult mmdr;
_collection->getPhysical()->readDocument(trx, rev, mmdr);
std::string existingId(
VPackSlice(mmdr.vpack()).get(StaticStrings::KeyString).copyString());
if (mode == OperationMode::internal) {
error = IndexResult(res, existingId);
} else {
error = IndexResult(res, this, existingId);
}
}
for (size_t j = i; j < n; ++j) {
// Free all elements that are not yet in the index
_allocator->deallocate(elements[j]);
}
// Already indexed elements will be removed by the rollback
break;
return error;
}
}
return res;
return IndexResult(res, this);
}
void MMFilesHashIndex::batchInsertUnique(
@ -691,7 +710,7 @@ void MMFilesHashIndex::batchInsertUnique(
int MMFilesHashIndex::insertMulti(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, OperationMode mode) {
std::vector<MMFilesHashIndexElement*> elements;
int res = fillElement<MMFilesHashIndexElement>(elements, documentId, doc);
@ -705,7 +724,8 @@ int MMFilesHashIndex::insertMulti(transaction::Methods* trx,
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, numPaths());
auto work = [this, &context](MMFilesHashIndexElement*& element, bool) {
auto work = [this, &context](MMFilesHashIndexElement*& element,
OperationMode) {
TRI_IF_FAILURE("InsertHashIndex") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
@ -725,7 +745,7 @@ int MMFilesHashIndex::insertMulti(transaction::Methods* trx,
auto hashElement = elements[i];
try {
work(hashElement, isRollback);
work(hashElement, mode);
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
@ -742,7 +762,7 @@ int MMFilesHashIndex::insertMulti(transaction::Methods* trx,
for (size_t j = 0; j < i; ++j) {
// Remove all already indexed elements and free them
if (elements[j] != nullptr) {
removeMultiElement(trx, elements[j], isRollback);
removeMultiElement(trx, elements[j], mode);
}
}
@ -813,7 +833,7 @@ void MMFilesHashIndex::batchInsertMulti(
int MMFilesHashIndex::removeUniqueElement(transaction::Methods* trx,
MMFilesHashIndexElement* element,
bool isRollback) {
OperationMode mode) {
TRI_IF_FAILURE("RemoveHashIndex") { return TRI_ERROR_DEBUG; }
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, numPaths());
@ -822,7 +842,8 @@ int MMFilesHashIndex::removeUniqueElement(transaction::Methods* trx,
if (old == nullptr) {
// not found
if (isRollback) { // ignore in this case, because it can happen
if (mode == OperationMode::rollback) { // ignore in this case, because it
// can happen
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_INTERNAL;
@ -834,7 +855,7 @@ int MMFilesHashIndex::removeUniqueElement(transaction::Methods* trx,
int MMFilesHashIndex::removeMultiElement(transaction::Methods* trx,
MMFilesHashIndexElement* element,
bool isRollback) {
OperationMode mode) {
TRI_IF_FAILURE("RemoveHashIndex") { return TRI_ERROR_DEBUG; }
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, numPaths());
@ -843,7 +864,8 @@ int MMFilesHashIndex::removeMultiElement(transaction::Methods* trx,
if (old == nullptr) {
// not found
if (isRollback) { // ignore in this case, because it can happen
if (mode == OperationMode::rollback) { // ignore in this case, because it
// can happen
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_INTERNAL;

View File

@ -93,7 +93,7 @@ struct MMFilesHashIndexHelper {
return true;
}
inline bool IsEqualElementElementByKey(void* userData,
inline bool IsEqualElementElementByKey(void* userData,
MMFilesHashIndexElement const* left,
MMFilesHashIndexElement const* right) const {
TRI_ASSERT(left->isSet());
@ -119,14 +119,14 @@ struct MMFilesHashIndexHelper {
return true;
}
size_t const _numFields;
bool const _allowExpansion;
};
struct MMFilesUniqueHashIndexHelper : public MMFilesHashIndexHelper {
MMFilesUniqueHashIndexHelper(size_t n, bool allowExpansion) : MMFilesHashIndexHelper(n, allowExpansion) {}
/// @brief determines if two elements are equal
inline bool IsEqualElementElement(void*,
MMFilesHashIndexElement const* left,
@ -171,7 +171,7 @@ struct MMFilesMultiHashIndexHelper : public MMFilesHashIndexHelper {
return true;
}
};
/// @brief Class to build Slice lookups out of AST Conditions
class MMFilesHashIndexLookupBuilder {
private:
@ -287,10 +287,12 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
bool matchesDefinition(VPackSlice const& info) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void batchInsert(
transaction::Methods*,
@ -321,8 +323,8 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
int lookup(transaction::Methods*, arangodb::velocypack::Slice,
std::vector<MMFilesHashIndexElement*>&) const;
int insertUnique(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback);
Result insertUnique(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, OperationMode mode);
void batchInsertUnique(
transaction::Methods*,
@ -330,7 +332,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
int insertMulti(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback);
arangodb::velocypack::Slice const&, OperationMode mode);
void batchInsertMulti(
transaction::Methods*,
@ -338,9 +340,10 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
int removeUniqueElement(transaction::Methods*, MMFilesHashIndexElement*,
bool);
OperationMode mode);
int removeMultiElement(transaction::Methods*, MMFilesHashIndexElement*, bool);
int removeMultiElement(transaction::Methods*, MMFilesHashIndexElement*,
OperationMode mode);
bool accessFitsIndex(arangodb::aql::AstNode const* access,
arangodb::aql::AstNode const* other,

View File

@ -275,6 +275,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
options.indexOpMode = Index::OperationMode::internal;
if (!syncer._leaderId.empty()) {
options.isSynchronousReplicationFrom = syncer._leaderId;
}
@ -594,11 +595,11 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
keysBuilder.add(VPackValue(it));
}
keysBuilder.close();
std::string const keyJsonString(keysBuilder.slice().toJson());
size_t offsetInChunk = 0;
while (true) {
std::string url = baseUrl + "/" + keysId +
"?type=docs&chunk=" + std::to_string(currentChunkId) +
@ -660,23 +661,56 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice);
auto removeConflict = [&](std::string conflictingKey) -> OperationResult {
VPackBuilder conflict;
conflict.add(VPackValue(conflictingKey));
LocalDocumentId conflictId = physical->lookupKey(&trx, conflict.slice());
if (conflictId.isSet()) {
ManagedDocumentResult mmdr;
bool success = physical->readDocument(&trx, conflictId, mmdr);
if (success) {
VPackSlice conflictingKey(mmdr.vpack());
return trx.remove(collectionName, conflictingKey, options);
}
}
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
};
if (!element) {
// INSERT
OperationResult opRes = trx.insert(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
if (opRes.errorMessage.empty()) {
return Result(opRes.code);
}
return Result(opRes.code, opRes.errorMessage);
if (opRes.code == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && opRes.errorMessage > keySlice.copyString()) {
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage);
if (inner.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
opRes = trx.insert(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
} else {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
}
} else {
// UPDATE
OperationResult opRes = trx.replace(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
if (opRes.errorMessage.empty()) {
return Result(opRes.code);
if (opRes.code == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && opRes.errorMessage > keySlice.copyString()) {
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage);
if (inner.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
opRes = trx.update(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
} else {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
return Result(opRes.code, opRes.errorMessage);
}
}
}

View File

@ -219,7 +219,8 @@ size_t MMFilesPersistentIndex::memory() const {
/// @brief inserts a document into the index
Result MMFilesPersistentIndex::insert(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc,
OperationMode mode) {
std::vector<MMFilesSkiplistIndexElement*> elements;
int res;
@ -322,6 +323,7 @@ Result MMFilesPersistentIndex::insert(transaction::Methods* trx,
rocksdb::ReadOptions readOptions;
size_t const count = elements.size();
std::string existingId;
for (size_t i = 0; i < count; ++i) {
if (_unique) {
bool uniqueConstraintViolated = false;
@ -338,6 +340,10 @@ Result MMFilesPersistentIndex::insert(transaction::Methods* trx,
if (res <= 0) {
uniqueConstraintViolated = true;
VPackSlice slice(comparator->extractKeySlice(iterator->key()));
uint64_t length = slice.length();
TRI_ASSERT(length > 0);
existingId = slice.at(length - 1).copyString();
}
}
@ -378,13 +384,21 @@ Result MMFilesPersistentIndex::insert(transaction::Methods* trx,
}
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
if (mode == OperationMode::internal) {
return IndexResult(res, existingId);
}
return IndexResult(res, this, existingId);
}
return IndexResult(res, this);
}
/// @brief removes a document from the index
Result MMFilesPersistentIndex::remove(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc,
OperationMode mode) {
std::vector<MMFilesSkiplistIndexElement*> elements;
int res;

View File

@ -162,10 +162,12 @@ class MMFilesPersistentIndex final : public MMFilesPathBasedIndex {
}
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void unload() override {}

View File

@ -245,8 +245,9 @@ void MMFilesPrimaryIndex::toVelocyPackFigures(VPackBuilder& builder) const {
_primaryIndex->appendToVelocyPack(builder);
}
Result MMFilesPrimaryIndex::insert(transaction::Methods*, LocalDocumentId const&,
VPackSlice const&, bool) {
Result MMFilesPrimaryIndex::insert(transaction::Methods*,
LocalDocumentId const&,
VPackSlice const&, OperationMode) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "insert() called for primary index";
@ -255,8 +256,9 @@ Result MMFilesPrimaryIndex::insert(transaction::Methods*, LocalDocumentId const&
"insert() called for primary index");
}
Result MMFilesPrimaryIndex::remove(transaction::Methods*, LocalDocumentId const&,
VPackSlice const&, bool) {
Result MMFilesPrimaryIndex::remove(transaction::Methods*,
LocalDocumentId const&,
VPackSlice const&, OperationMode) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "remove() called for primary index";
@ -366,28 +368,51 @@ MMFilesSimpleIndexElement MMFilesPrimaryIndex::lookupSequentialReverse(
/// returns a status code, and *found will contain a found element (if any)
Result MMFilesPrimaryIndex::insertKey(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, 1);
MMFilesSimpleIndexElement element(buildKeyElement(documentId, doc));
return IndexResult(_primaryIndex->insert(&context, element), this);
int res = _primaryIndex->insert(&context, element);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
std::string existingId(doc.get(StaticStrings::KeyString).copyString());
if (mode == OperationMode::internal) {
return IndexResult(res, existingId);
}
return IndexResult(res, this, existingId);
}
return IndexResult(res, this);
}
Result MMFilesPrimaryIndex::insertKey(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc,
ManagedDocumentResult& mmdr) {
ManagedDocumentResult& mmdr,
OperationMode mode) {
IndexLookupContext context(trx, _collection, &mmdr, 1);
MMFilesSimpleIndexElement element(buildKeyElement(documentId, doc));
return IndexResult(_primaryIndex->insert(&context, element), this);
int res = _primaryIndex->insert(&context, element);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
std::string existingId(doc.get(StaticStrings::KeyString).copyString());
if (mode == OperationMode::internal) {
return IndexResult(res, existingId);
}
return IndexResult(res, this, existingId);
}
return IndexResult(res, this);
}
/// @brief removes an key/element from the index
Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx,
LocalDocumentId const&,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
ManagedDocumentResult result;
IndexLookupContext context(trx, _collection, &result, 1);
@ -405,7 +430,8 @@ Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx,
Result MMFilesPrimaryIndex::removeKey(transaction::Methods* trx,
LocalDocumentId const&,
VPackSlice const& doc,
ManagedDocumentResult& mmdr) {
ManagedDocumentResult& mmdr,
OperationMode mode) {
IndexLookupContext context(trx, _collection, &mmdr, 1);
VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(doc));

View File

@ -201,10 +201,12 @@ class MMFilesPrimaryIndex final : public MMFilesIndex {
void toVelocyPackFigures(VPackBuilder&) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void load() override {}
void unload() override;
@ -248,14 +250,16 @@ class MMFilesPrimaryIndex final : public MMFilesIndex {
transaction::Methods*, arangodb::basics::BucketPosition& position);
Result insertKey(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&);
arangodb::velocypack::Slice const&, OperationMode mode);
Result insertKey(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, ManagedDocumentResult&);
arangodb::velocypack::Slice const&, ManagedDocumentResult&,
OperationMode mode);
Result removeKey(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&);
arangodb::velocypack::Slice const&, OperationMode mode);
Result removeKey(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, ManagedDocumentResult&);
arangodb::velocypack::Slice const&, ManagedDocumentResult&,
OperationMode mode);
int resize(transaction::Methods*, size_t);

View File

@ -32,6 +32,7 @@
#include "Indexes/IndexLookupContext.h"
#include "Indexes/IndexResult.h"
#include "Indexes/SimpleAttributeEqualityMatcher.h"
#include "StorageEngine/PhysicalCollection.h"
#include "Transaction/Helpers.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
@ -710,7 +711,7 @@ void MMFilesSkiplistIndex::toVelocyPackFigures(VPackBuilder& builder) const {
/// @brief inserts a document into a skiplist index
Result MMFilesSkiplistIndex::insert(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, OperationMode mode) {
std::vector<MMFilesSkiplistIndexElement*> elements;
int res;
@ -739,10 +740,13 @@ Result MMFilesSkiplistIndex::insert(transaction::Methods* trx,
// by the index
size_t const count = elements.size();
int badIndex = 0;
for (size_t i = 0; i < count; ++i) {
res = _skiplistIndex->insert(&context, elements[i]);
if (res != TRI_ERROR_NO_ERROR) {
badIndex = i;
// Note: this element is freed already
for (size_t j = i; j < count; ++j) {
_allocator->deallocate(elements[j]);
@ -753,20 +757,63 @@ Result MMFilesSkiplistIndex::insert(transaction::Methods* trx,
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && !_unique) {
// We ignore unique_constraint violated if we are not unique
res = TRI_ERROR_NO_ERROR;
// We ignore unique_constraint violated if we are not unique
res = TRI_ERROR_NO_ERROR;
}
break;
}
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
elements.clear();
// need to rebuild elements, find conflicting key to return error,
// and then free elements again
int innerRes = TRI_ERROR_NO_ERROR;
try {
innerRes = fillElement<MMFilesSkiplistIndexElement>(elements, documentId, doc);
} catch (basics::Exception const& ex) {
innerRes = ex.code();
} catch (std::bad_alloc const&) {
innerRes = TRI_ERROR_OUT_OF_MEMORY;
} catch (...) {
innerRes = TRI_ERROR_INTERNAL;
}
auto cleanup = [this, &elements] {
for (auto& element : elements) {
// free all elements to prevent leak
_allocator->deallocate(element);
}
};
TRI_DEFER(cleanup());
if (innerRes != TRI_ERROR_NO_ERROR) {
return IndexResult(innerRes, this);
}
auto found = _skiplistIndex->rightLookup(&context, elements[badIndex]);
TRI_ASSERT(found);
LocalDocumentId rev(found->document()->localDocumentId());
ManagedDocumentResult mmdr;
_collection->getPhysical()->readDocument(trx, rev, mmdr);
std::string existingId(VPackSlice(mmdr.vpack())
.get(StaticStrings::KeyString)
.copyString());
if (mode == OperationMode::internal) {
return IndexResult(res, existingId);
}
return IndexResult(res, this, existingId);
}
return IndexResult(res, this);
}
/// @brief removes a document from a skiplist index
Result MMFilesSkiplistIndex::remove(transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc, bool isRollback) {
VPackSlice const& doc, OperationMode mode) {
std::vector<MMFilesSkiplistIndexElement*> elements;
int res;

View File

@ -285,10 +285,12 @@ class MMFilesSkiplistIndex final : public MMFilesPathBasedIndex {
void toVelocyPackFigures(VPackBuilder&) const override;
Result insert(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result remove(transaction::Methods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&, bool isRollback) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
void unload() override;

View File

@ -152,7 +152,7 @@ Result RocksDBCollection::updateProperties(VPackSlice const& slice, bool doSync)
primaryIndex()->createCache();
} else if (useCache()) {
destroyCache();
primaryIndex()->destroyCache();
primaryIndex()->destroyCache();
}
// nothing else to do
@ -757,7 +757,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
rindex->truncate(trx);
}
_needToPersistIndexEstimates = true;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// check if documents have been deleted
if (mthd->countInBounds(documentBounds, true)) {
@ -873,7 +873,7 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
TRI_VOC_DOCUMENT_OPERATION_INSERT);
res = insertDocument(trx, documentId, newSlice, options.waitForSync);
res = insertDocument(trx, documentId, newSlice, options, options.waitForSync);
if (res.ok()) {
Result lookupResult = lookupDocumentVPack(documentId, trx, mdr, false);
@ -978,7 +978,7 @@ Result RocksDBCollection::update(arangodb::transaction::Methods* trx,
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
TRI_VOC_DOCUMENT_OPERATION_UPDATE);
res = updateDocument(trx, oldDocumentId, oldDoc, documentId, newDoc,
options.waitForSync);
options, options.waitForSync);
if (res.ok()) {
mdr.setManaged(newDoc.begin(), documentId);
@ -1075,7 +1075,8 @@ Result RocksDBCollection::replace(
TRI_VOC_DOCUMENT_OPERATION_REPLACE);
RocksDBOperationResult opResult = updateDocument(
trx, oldDocumentId, oldDoc, documentId, newDoc, options.waitForSync);
trx, oldDocumentId, oldDoc, documentId, newDoc, options,
options.waitForSync);
if (opResult.ok()) {
mdr.setManaged(newDoc.begin(), documentId);
TRI_ASSERT(!mdr.empty());
@ -1102,7 +1103,7 @@ Result RocksDBCollection::remove(arangodb::transaction::Methods* trx,
arangodb::ManagedDocumentResult& previous,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick,
bool /*lock*/,
bool /*lock*/,
TRI_voc_rid_t& prevRev,
TRI_voc_rid_t& revisionId) {
// store the tick that was used for writing the document
@ -1153,9 +1154,10 @@ Result RocksDBCollection::remove(arangodb::transaction::Methods* trx,
[&state]() { state->resetLogState(); });
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), documentId.id(), StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE);
res = removeDocument(trx, oldDocumentId, oldDoc, false, options.waitForSync);
state->prepareOperation(_logicalCollection->cid(), documentId.id(),
StringRef(key),TRI_VOC_DOCUMENT_OPERATION_REMOVE);
res = removeDocument(trx, oldDocumentId, oldDoc, options, false,
options.waitForSync);
if (res.ok()) {
// report key size
res = state->addOperation(_logicalCollection->cid(), documentId.id(),
@ -1315,7 +1317,8 @@ arangodb::Result RocksDBCollection::fillIndexes(
arangodb::Result res;
auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) {
if (res.ok()) {
res = ridx->insertInternal(trx, &batched, documentId, slice);
res = ridx->insertInternal(trx, &batched, documentId, slice,
Index::OperationMode::normal);
if (res.ok()) {
numDocsWritten++;
}
@ -1352,7 +1355,8 @@ arangodb::Result RocksDBCollection::fillIndexes(
this->readDocument(trx, token, mmdr)) {
// we need to remove already inserted documents up to numDocsWritten
res2 = ridx->removeInternal(trx, &batched, mmdr.localDocumentId(),
VPackSlice(mmdr.vpack()));
VPackSlice(mmdr.vpack()),
Index::OperationMode::rollback);
if (res2.ok()) {
numDocsWritten--;
}
@ -1375,7 +1379,7 @@ arangodb::Result RocksDBCollection::fillIndexes(
RocksDBOperationResult RocksDBCollection::insertDocument(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
VPackSlice const& doc, bool& waitForSync) const {
VPackSlice const& doc, OperationOptions& options, bool& waitForSync) const {
RocksDBOperationResult res;
// Coordinator doesn't know index internals
TRI_ASSERT(!ServerState::instance()->isCoordinator());
@ -1399,7 +1403,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
RocksDBIndex* rIdx = static_cast<RocksDBIndex*>(idx.get());
Result tmpres = rIdx->insertInternal(trx, mthd, documentId, doc);
Result tmpres = rIdx->insertInternal(trx, mthd, documentId, doc,
options.indexOpMode);
if (!tmpres.ok()) {
if (tmpres.is(TRI_ERROR_OUT_OF_MEMORY)) {
// in case of OOM return immediately
@ -1428,7 +1433,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
RocksDBOperationResult RocksDBCollection::removeDocument(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
VPackSlice const& doc, bool isUpdate, bool& waitForSync) const {
VPackSlice const& doc, OperationOptions& options, bool isUpdate,
bool& waitForSync) const {
// Coordinator doesn't know index internals
TRI_ASSERT(!ServerState::instance()->isCoordinator());
TRI_ASSERT(trx->state()->isRunning());
@ -1460,7 +1466,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
RocksDBOperationResult resInner;
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
Result tmpres = idx->remove(trx, documentId, doc, false);
Result tmpres = idx->remove(trx, documentId, doc, options.indexOpMode);
if (!tmpres.ok()) {
if (tmpres.is(TRI_ERROR_OUT_OF_MEMORY)) {
// in case of OOM return immediately
@ -1505,7 +1511,8 @@ RocksDBOperationResult RocksDBCollection::lookupDocument(
RocksDBOperationResult RocksDBCollection::updateDocument(
transaction::Methods* trx, LocalDocumentId const& oldDocumentId,
VPackSlice const& oldDoc, LocalDocumentId const& newDocumentId,
VPackSlice const& newDoc, bool& waitForSync) const {
VPackSlice const& newDoc, OperationOptions& options,
bool& waitForSync) const {
// keysize in return value is set by insertDocument
// Coordinator doesn't know index internals
@ -1546,7 +1553,8 @@ RocksDBOperationResult RocksDBCollection::updateDocument(
for (std::shared_ptr<Index> const& idx : _indexes) {
RocksDBIndex* rIdx = static_cast<RocksDBIndex*>(idx.get());
Result tmpres = rIdx->updateInternal(trx, mthd, oldDocumentId, oldDoc,
newDocumentId, newDoc);
newDocumentId, newDoc,
options.indexOpMode);
if (!tmpres.ok()) {
if (tmpres.is(TRI_ERROR_OUT_OF_MEMORY)) {
// in case of OOM return immediately

View File

@ -229,12 +229,13 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::RocksDBOperationResult insertDocument(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc, bool& waitForSync) const;
arangodb::velocypack::Slice const& doc, OperationOptions& options,
bool& waitForSync) const;
arangodb::RocksDBOperationResult removeDocument(
arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc, bool isUpdate,
bool& waitForSync) const;
arangodb::velocypack::Slice const& doc, OperationOptions& options,
bool isUpdate, bool& waitForSync) const;
arangodb::RocksDBOperationResult lookupDocument(
transaction::Methods* trx, arangodb::velocypack::Slice const& key,
@ -242,10 +243,12 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::RocksDBOperationResult updateDocument(
transaction::Methods* trx, LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc, LocalDocumentId const& newDocumentId,
arangodb::velocypack::Slice const& newDoc, bool& waitForSync) const;
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
arangodb::velocypack::Slice const& newDoc, OperationOptions& options,
bool& waitForSync) const;
arangodb::Result lookupDocumentVPack(LocalDocumentId const& documentId,
arangodb::Result lookupDocumentVPack(LocalDocumentId const& documentId,
transaction::Methods*,
arangodb::ManagedDocumentResult&,
bool withCache) const;

View File

@ -447,7 +447,8 @@ void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder, bool withFigures,
Result RocksDBEdgeIndex::insertInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
VPackSlice fromTo = doc.get(_directionAttr);
TRI_ASSERT(fromTo.isString());
auto fromToRef = StringRef(fromTo);
@ -478,7 +479,8 @@ Result RocksDBEdgeIndex::insertInternal(transaction::Methods* trx,
Result RocksDBEdgeIndex::removeInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
// VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.get(_directionAttr);
auto fromToRef = StringRef(fromTo);

View File

@ -163,13 +163,15 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
void recalculateEstimates() override;
Result insertInternal(transaction::Methods*, RocksDBMethods*,
Result insertInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result removeInternal(transaction::Methods*, RocksDBMethods*,
Result removeInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
protected:
Result postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key,
@ -188,10 +190,10 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
/// @brief add a single value node to the iterator's keys
void handleValNode(VPackBuilder* keys,
arangodb::aql::AstNode const* valNode) const;
void warmupInternal(transaction::Methods* trx,
rocksdb::Slice const& lower, rocksdb::Slice const& upper);
private:
std::string _directionAttr;

View File

@ -50,7 +50,7 @@ RocksDBFulltextIndex::RocksDBFulltextIndex(
: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::fulltext(), false),
_minWordLength(TRI_FULLTEXT_MIN_WORD_LENGTH_DEFAULT) {
TRI_ASSERT(iid != 0);
TRI_ASSERT(_cf == RocksDBColumnFamily::fulltext());
TRI_ASSERT(_cf == RocksDBColumnFamily::fulltext());
VPackSlice const value = info.get("minLength");
@ -171,7 +171,8 @@ bool RocksDBFulltextIndex::matchesDefinition(VPackSlice const& info) const {
Result RocksDBFulltextIndex::insertInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
std::set<std::string> words = wordlist(doc);
if (words.empty()) {
return TRI_ERROR_NO_ERROR;
@ -199,7 +200,8 @@ Result RocksDBFulltextIndex::insertInternal(transaction::Methods* trx,
Result RocksDBFulltextIndex::removeInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
std::set<std::string> words = wordlist(doc);
if (words.empty()) {
return IndexResult();

View File

@ -107,11 +107,14 @@ class RocksDBFulltextIndex final : public RocksDBIndex {
/// insert index elements into the specified write batch.
Result insertInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
/// remove index elements and put it in the specified write batch.
Result removeInternal(transaction::Methods*, RocksDBMethods*, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
Result removeInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&,
OperationMode mode) override;
private:
std::set<std::string> wordlist(arangodb::velocypack::Slice const&);

View File

@ -413,7 +413,8 @@ bool RocksDBGeoIndex::matchesDefinition(VPackSlice const& info) const {
Result RocksDBGeoIndex::insertInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
velocypack::Slice const& doc) {
velocypack::Slice const& doc,
OperationMode mode) {
// GeoIndex is always exclusively write-locked with rocksdb
double latitude;
double longitude;
@ -483,7 +484,8 @@ Result RocksDBGeoIndex::insertInternal(transaction::Methods* trx,
Result RocksDBGeoIndex::removeInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc) {
arangodb::velocypack::Slice const& doc,
OperationMode mode) {
// GeoIndex is always exclusively write-locked with rocksdb
double latitude = 0.0;
double longitude = 0.0;

View File

@ -130,7 +130,7 @@ class RocksDBGeoIndex final : public RocksDBIndex {
bool isSorted() const override { return true; }
bool hasSelectivityEstimate() const override { return false; }
void toVelocyPack(VPackBuilder&, bool, bool) const override;
// Uses default toVelocyPackFigures
@ -163,12 +163,14 @@ class RocksDBGeoIndex final : public RocksDBIndex {
/// insert index elements into the specified write batch.
Result insertInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
/// remove index elements and put it in the specified write batch.
Result removeInternal(transaction::Methods*, RocksDBMethods*,
Result removeInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
private:
/// internal insert function, set batch or trx before calling

View File

@ -42,7 +42,7 @@
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
SingleCollectionTransaction* trx,
std::string const& keysId, uint64_t chunkId, std::string const& lowString,
std::string const& highString,
@ -55,6 +55,7 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
options.indexOpMode = Index::OperationMode::internal;
if (!syncer._leaderId.empty()) {
options.isSynchronousReplicationFrom = syncer._leaderId;
}
@ -200,7 +201,7 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
}
++nextStart;
}
if (toFetch.empty()) {
// nothing to do
return Result();
@ -215,21 +216,21 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
keysBuilder.add(VPackValue(it));
}
keysBuilder.close();
std::string const keyJsonString(keysBuilder.slice().toJson());
size_t offsetInChunk = 0;
while (true) {
std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
std::to_string(chunkId) + "&chunkSize=" +
std::to_string(chunkSize) + "&low=" + lowString +
std::to_string(chunkSize) + "&low=" + lowString +
"&offset=" + std::to_string(offsetInChunk);
progress = "fetching documents chunk " + std::to_string(chunkId) +
" for collection '" + collectionName + "' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::PUT, url,
keyJsonString.c_str(),
@ -282,23 +283,56 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
LocalDocumentId const documentId = physical->lookupKey(trx, keySlice);
auto removeConflict = [&](std::string conflictingKey) -> OperationResult {
VPackBuilder conflict;
conflict.add(VPackValue(conflictingKey));
LocalDocumentId conflictId = physical->lookupKey(trx, conflict.slice());
if (conflictId.isSet()) {
ManagedDocumentResult mmdr;
bool success = physical->readDocument(trx, conflictId, mmdr);
if (success) {
VPackSlice conflictingKey(mmdr.vpack());
return trx->remove(collectionName, conflictingKey, options);
}
}
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
};
if (!documentId.isSet()) {
// INSERT
OperationResult opRes = trx->insert(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
if (opRes.errorMessage.empty()) {
return Result(opRes.code);
if (opRes.code == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && opRes.errorMessage > keySlice.copyString()) {
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage);
if (inner.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
opRes = trx->insert(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
} else {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
return Result(opRes.code, opRes.errorMessage);
}
} else {
// UPDATE
OperationResult opRes = trx->update(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
if (opRes.errorMessage.empty()) {
return Result(opRes.code);
if (opRes.code == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && opRes.errorMessage > keySlice.copyString()) {
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage);
if (inner.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
opRes = trx->update(collectionName, it, options);
if (opRes.code != TRI_ERROR_NO_ERROR) {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
} else {
return opRes.errorMessage.empty() ? Result(opRes.code) : Result(opRes.code, opRes.errorMessage);
}
return Result(opRes.code, opRes.errorMessage);
}
}
}
@ -310,7 +344,7 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer,
// try again in next round
offsetInChunk = foundLength;
}
return Result();
}
@ -425,7 +459,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
UINT64_MAX);
res = trx.commit();
if (!res.ok()) {
return res;
}

View File

@ -198,7 +198,7 @@ int RocksDBIndex::drop() {
bool prefix_same_as_start = this->type() != Index::TRI_IDX_TYPE_EDGE_INDEX;
arangodb::Result r = rocksutils::removeLargeRange(
rocksutils::globalRocksDB(), this->getBounds(), prefix_same_as_start);
// Try to drop the cache as well.
if (_cachePresent) {
try {
@ -210,19 +210,19 @@ int RocksDBIndex::drop() {
} catch (...) {
}
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
//check if documents have been deleted
size_t numDocs = rocksutils::countKeyRange(rocksutils::globalRocksDB(),
this->getBounds(), prefix_same_as_start);
if (numDocs > 0) {
std::string errorMsg("deletion check in index drop failed - not all documents in the index have been deleted. remaining: ");
errorMsg.append(std::to_string(numDocs));
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, errorMsg);
}
#endif
return r.errorNumber();
}
@ -240,12 +240,13 @@ Result RocksDBIndex::updateInternal(transaction::Methods* trx, RocksDBMethods* m
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
arangodb::velocypack::Slice const& newDoc) {
Result res = removeInternal(trx, mthd, oldDocumentId, oldDoc);
arangodb::velocypack::Slice const& newDoc,
OperationMode mode) {
Result res = removeInternal(trx, mthd, oldDocumentId, oldDoc, mode);
if (!res.ok()) {
return res;
}
return insertInternal(trx, mthd, newDocumentId, newDoc);
return insertInternal(trx, mthd, newDocumentId, newDoc, mode);
}
void RocksDBIndex::truncate(transaction::Methods* trx) {
@ -291,7 +292,7 @@ void RocksDBIndex::truncate(transaction::Methods* trx) {
iter->Next();
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
//check if index entries have been deleted
if (type() != TRI_IDX_TYPE_GEO1_INDEX && type() != TRI_IDX_TYPE_GEO2_INDEX) {

View File

@ -66,7 +66,7 @@ class RocksDBIndex : public Index {
public:
~RocksDBIndex();
void toVelocyPackFigures(VPackBuilder& builder) const override;
/// @brief return a VelocyPack representation of the index
void toVelocyPack(velocypack::Builder& builder, bool withFigures,
bool forPersistence) const override;
@ -94,15 +94,16 @@ class RocksDBIndex : public Index {
}
Result insert(transaction::Methods* trx, LocalDocumentId const& documentId,
velocypack::Slice const& doc, bool) override {
velocypack::Slice const& doc, OperationMode mode) override {
auto mthds = RocksDBTransactionState::toMethods(trx);
return insertInternal(trx, mthds, documentId, doc);
return insertInternal(trx, mthds, documentId, doc, mode);
}
Result remove(transaction::Methods* trx, LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc, bool) override {
arangodb::velocypack::Slice const& doc,
OperationMode mode) override {
auto mthds = RocksDBTransactionState::toMethods(trx);
return removeInternal(trx, mthds, documentId, doc);
return removeInternal(trx, mthds, documentId, doc, mode);
}
void setCacheEnabled(bool enable) {
@ -121,23 +122,26 @@ class RocksDBIndex : public Index {
/// insert index elements into the specified write batch.
virtual Result insertInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) = 0;
arangodb::velocypack::Slice const&,
OperationMode mode) = 0;
virtual Result updateInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc);
velocypack::Slice const& newDoc,
OperationMode mode);
/// remove index elements and put it in the specified write batch.
virtual Result removeInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) = 0;
arangodb::velocypack::Slice const&,
OperationMode mode) = 0;
rocksdb::ColumnFamilyHandle* columnFamily() const { return _cf; }
rocksdb::Comparator const* comparator() const;
RocksDBKeyBounds getBounds() const {
return RocksDBIndex::getBounds(type(), _objectId, _unique);
};

View File

@ -219,14 +219,22 @@ LocalDocumentId RocksDBPrimaryIndex::lookupKey(transaction::Methods* trx,
Result RocksDBPrimaryIndex::insertInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& slice) {
VPackSlice const& slice,
OperationMode mode) {
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(slice);
RocksDBKeyLeaser key(trx);
key->constructPrimaryIndexValue(_objectId, StringRef(keySlice));
auto value = RocksDBValue::PrimaryIndexValue(documentId.id());
if (mthd->Exists(_cf, key.ref())) {
return IndexResult(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, this);
std::string existingId(slice.get(StaticStrings::KeyString).copyString());
if (mode == OperationMode::internal) {
return IndexResult(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED,
existingId);
}
return IndexResult(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, this,
existingId);
}
blackListKey(key->string().data(), static_cast<uint32_t>(key->string().size()));
@ -240,7 +248,8 @@ Result RocksDBPrimaryIndex::updateInternal(transaction::Methods* trx,
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc) {
velocypack::Slice const& newDoc,
OperationMode mode) {
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(oldDoc);
TRI_ASSERT(keySlice == oldDoc.get(StaticStrings::KeyString));
RocksDBKeyLeaser key(trx);
@ -257,7 +266,8 @@ Result RocksDBPrimaryIndex::updateInternal(transaction::Methods* trx,
Result RocksDBPrimaryIndex::removeInternal(transaction::Methods* trx,
RocksDBMethods* mthd,
LocalDocumentId const& documentId,
VPackSlice const& slice) {
VPackSlice const& slice,
OperationMode mode) {
// TODO: deal with matching revisions?
RocksDBKeyLeaser key(trx);
key->constructPrimaryIndexValue(

View File

@ -100,7 +100,7 @@ class RocksDBPrimaryIndex final : public RocksDBIndex {
arangodb::StringRef const* = nullptr) const override {
return 1.0;
}
void load() override;
void toVelocyPack(VPackBuilder&, bool, bool) const override;
@ -128,18 +128,21 @@ class RocksDBPrimaryIndex final : public RocksDBIndex {
/// insert index elements into the specified write batch.
Result insertInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result updateInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc) override;
velocypack::Slice const& newDoc,
OperationMode mode) override;
/// remove index elements and put it in the specified write batch.
Result removeInternal(transaction::Methods*, RocksDBMethods*,
Result removeInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
protected:
Result postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key,
@ -159,7 +162,7 @@ class RocksDBPrimaryIndex final : public RocksDBIndex {
/// @brief add a single value node to the iterator's keys
void handleValNode(transaction::Methods* trx, VPackBuilder* keys,
arangodb::aql::AstNode const* valNode, bool isId) const;
private:
bool const _isRunningInCluster;
};

View File

@ -105,7 +105,7 @@ bool RocksDBVPackUniqueIndexIterator::next(LocalDocumentIdCallback const& cb, si
// already looked up something
return false;
}
_done = true;
auto value = RocksDBValue::Empty(RocksDBEntryType::PrimaryIndexValue);
@ -295,7 +295,7 @@ bool RocksDBVPackIndex::implicitlyUnique() const {
/// @brief helper function to insert a document into any index type
/// Should result in an elements vector filled with the new index entries
/// uses the _unique field to determine the kind of key structure
int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
LocalDocumentId const& documentId,
VPackSlice const& doc,
std::vector<RocksDBKey>& elements,
@ -375,7 +375,7 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
return TRI_ERROR_NO_ERROR;
}
void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
LocalDocumentId const& documentId,
VPackSlice const& document,
std::vector<RocksDBKey>& elements,
@ -554,7 +554,8 @@ void RocksDBVPackIndex::fillPaths(std::vector<std::vector<std::string>>& paths,
Result RocksDBVPackIndex::insertInternal(transaction::Methods* trx,
RocksDBMethods* mthds,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res = TRI_ERROR_NO_ERROR;
@ -573,13 +574,15 @@ Result RocksDBVPackIndex::insertInternal(transaction::Methods* trx,
: RocksDBValue::VPackIndexValue();
size_t const count = elements.size();
RocksDBValue existing =
RocksDBValue::Empty(RocksDBEntryType::UniqueVPackIndexValue);
for (size_t i = 0; i < count; ++i) {
RocksDBKey& key = elements[i];
if (_unique) {
RocksDBValue existing =
RocksDBValue::Empty(RocksDBEntryType::UniqueVPackIndexValue);
if (mthds->Exists(_cf, key)) {
res = TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
auto found = mthds->Get(_cf, key, existing.buffer());
TRI_ASSERT(found.ok());
}
}
@ -613,6 +616,19 @@ Result RocksDBVPackIndex::insertInternal(transaction::Methods* trx,
}
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
LocalDocumentId rev(RocksDBValue::revisionId(existing));
ManagedDocumentResult mmdr;
bool success = _collection->getPhysical()->readDocument(trx, rev, mmdr);
TRI_ASSERT(success);
std::string existingKey(
VPackSlice(mmdr.vpack()).get(StaticStrings::KeyString).copyString());
if (mode == OperationMode::internal) {
return IndexResult(res, existingKey);
}
return IndexResult(res, this, existingKey);
}
return IndexResult(res, this);
}
@ -621,15 +637,16 @@ Result RocksDBVPackIndex::updateInternal(transaction::Methods* trx,
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc) {
velocypack::Slice const& newDoc,
OperationMode mode) {
if (!_unique || _useExpansion) {
// only unique index supports in-place updates
// lets also not handle the complex case of expanded arrays
return RocksDBIndex::updateInternal(trx, mthds, oldDocumentId, oldDoc,
newDocumentId, newDoc);
newDocumentId, newDoc, mode);
} else {
bool equal = true;
for (size_t i = 0; i < _paths.size(); ++i) {
TRI_ASSERT(!_paths[i].empty());
@ -651,9 +668,9 @@ Result RocksDBVPackIndex::updateInternal(transaction::Methods* trx,
if (!equal) {
// we can only use in-place updates if no indexed attributes changed
return RocksDBIndex::updateInternal(trx, mthds, oldDocumentId, oldDoc,
newDocumentId, newDoc);
newDocumentId, newDoc, mode);
}
// more expansive method to
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
@ -666,7 +683,7 @@ Result RocksDBVPackIndex::updateInternal(transaction::Methods* trx,
if (res != TRI_ERROR_NO_ERROR) {
return IndexResult(res, this);
}
RocksDBValue value = RocksDBValue::UniqueVPackIndexValue(newDocumentId.id());
size_t const count = elements.size();
for (size_t i = 0; i < count; ++i) {
@ -686,7 +703,7 @@ Result RocksDBVPackIndex::updateInternal(transaction::Methods* trx,
break;
}
}
return res;
}
}
@ -695,7 +712,8 @@ Result RocksDBVPackIndex::updateInternal(transaction::Methods* trx,
Result RocksDBVPackIndex::removeInternal(transaction::Methods* trx,
RocksDBMethods* mthds,
LocalDocumentId const& documentId,
VPackSlice const& doc) {
VPackSlice const& doc,
OperationMode mode) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res = TRI_ERROR_NO_ERROR;
@ -708,7 +726,7 @@ Result RocksDBVPackIndex::removeInternal(transaction::Methods* trx,
return IndexResult(res, this);
}
size_t const count = elements.size();
for (size_t i = 0; i < count; ++i) {
arangodb::Result r = mthds->Delete(_cf, elements[i]);
@ -750,16 +768,16 @@ IndexIterator* RocksDBVPackIndex::lookup(
}
leftSearch.add(eq);
}
if (lastNonEq.isNone() && _unique && searchValues.length() == _fields.size()) {
leftSearch.close();
return new RocksDBVPackUniqueIndexIterator(_collection, trx, mmdr, this, leftSearch.slice());
}
VPackSlice leftBorder;
VPackSlice rightBorder;
VPackBuilder rightSearch;
if (lastNonEq.isNone()) {

View File

@ -56,7 +56,7 @@ namespace transaction {
class Methods;
}
/// @brief Iterator structure for RocksDB unique index.
/// @brief Iterator structure for RocksDB unique index.
/// This iterator can be used only for equality lookups that use all
/// index attributes. It uses a point lookup and no seeks
class RocksDBVPackUniqueIndexIterator final : public IndexIterator {
@ -100,7 +100,7 @@ class RocksDBVPackIndexIterator final : public IndexIterator {
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
arangodb::RocksDBVPackIndex const* index,
bool reverse,
bool reverse,
RocksDBKeyBounds&& bounds);
~RocksDBVPackIndexIterator() = default;
@ -197,17 +197,20 @@ class RocksDBVPackIndex : public RocksDBIndex {
protected:
Result insertInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result updateInternal(transaction::Methods* trx, RocksDBMethods*,
LocalDocumentId const& oldDocumentId,
arangodb::velocypack::Slice const& oldDoc,
LocalDocumentId const& newDocumentId,
velocypack::Slice const& newDoc) override;
velocypack::Slice const& newDoc,
OperationMode mode) override;
Result removeInternal(transaction::Methods*, RocksDBMethods*,
Result removeInternal(transaction::Methods*, RocksDBMethods*,
LocalDocumentId const& documentId,
arangodb::velocypack::Slice const&) override;
arangodb::velocypack::Slice const&,
OperationMode mode) override;
Result postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key,
rocksdb::Slice const& value) override;

View File

@ -478,7 +478,7 @@ std::pair<bool, bool> transaction::Methods::findIndexHandleForAndNode(
// enable the following line to see index candidates considered with their
// abilities and scores
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", supportsSort: " << supportsSort << ", filterCost: " << filterCost << ", sortCost: " << sortCost << ", totalCost: " << (filterCost + sortCost) << ", isOnlyAttributeAccess: " << isOnlyAttributeAccess << ", isUnidirectional: " << sortCondition->isUnidirectional() << ", isOnlyEqualityMatch: " << node->isOnlyEqualityMatch() << ", itemsInIndex: " << itemsInIndex;
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", supportsSort: " << supportsSort << ", filterCost: " << filterCost << ", sortCost: " << sortCost << ", totalCost: " << (filterCost + sortCost) << ", isOnlyAttributeAccess: " << isOnlyAttributeAccess << ", isUnidirectional: " << sortCondition->isUnidirectional() << ", isOnlyEqualityMatch: " << node->isOnlyEqualityMatch() << ", itemsInIndex: " << itemsInIndex;
if (!supportsFilter && !supportsSort) {
continue;
@ -520,17 +520,17 @@ bool transaction::Methods::findIndexHandleForAndNode(
size_t estimatedItems;
bool supportsFilter = idx->supportsFilterCondition(node, reference, itemsInIndex,
estimatedItems, estimatedCost);
// enable the following line to see index candidates considered with their
// abilities and scores
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", estimatedCost: " << estimatedCost << ", estimatedItems: " << estimatedItems << ", itemsInIndex: " << itemsInIndex << ", selectivity: " << (idx->hasSelectivityEstimate() ? idx->selectivityEstimate() : -1.0) << ", node: " << node;
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", estimatedCost: " << estimatedCost << ", estimatedItems: " << estimatedItems << ", itemsInIndex: " << itemsInIndex << ", selectivity: " << (idx->hasSelectivityEstimate() ? idx->selectivityEstimate() : -1.0) << ", node: " << node;
if (!supportsFilter) {
continue;
}
// index supports the filter condition
// this reduces the number of items left
itemsInIndex = estimatedItems;
@ -1925,8 +1925,8 @@ OperationResult transaction::Methods::modifyLocal(
resultBuilder.clear();
}
return OperationResult(resultBuilder.steal(), nullptr, "", res.errorNumber(),
options.waitForSync, errorCounter);
return OperationResult(resultBuilder.steal(), nullptr, res.errorMessage(),
res.errorNumber(), options.waitForSync, errorCounter);
}
/// @brief remove one or multiple documents in a collection

View File

@ -25,14 +25,16 @@
#define ARANGOD_UTILS_OPERATION_OPTIONS_H 1
#include "Basics/Common.h"
#include "Indexes/Index.h"
namespace arangodb {
// a struct for keeping document modification operations in transactions
struct OperationOptions {
OperationOptions()
OperationOptions()
: recoveryData(nullptr), waitForSync(false), keepNull(true),
mergeObjects(true), silent(false), ignoreRevs(true),
returnOld(false), returnNew(false), isRestore(false) {}
returnOld(false), returnNew(false), isRestore(false),
indexOpMode(Index::OperationMode::normal) {}
// original marker, set by an engine's recovery procedure only!
void* recoveryData;
@ -67,6 +69,8 @@ struct OperationOptions {
// operation if we are merely a follower. Finally, we must deny replications
// from the wrong leader.
std::string isSynchronousReplicationFrom;
Index::OperationMode indexOpMode;
};
}

View File

@ -133,7 +133,7 @@ function BaseTestConfig() {
},
true
);
connectToSlave();
assertEqual(5000, collectionCount(cn));
@ -141,16 +141,16 @@ function BaseTestConfig() {
// remove some random documents
for (var i = 0; i < 50; ++i) {
c.remove(c.any());
}
}
assertEqual(4950, collectionCount(cn));
// and sync again
// and sync again
var syncResult = replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: true
});
assertEqual(st.count, collectionCount(cn));
assertEqual(st.checksum, collectionChecksum(cn));
},
@ -189,24 +189,24 @@ function BaseTestConfig() {
},
true
);
connectToSlave();
var c = db._collection(cn);
// insert some random documents
for (var i = 0; i < 100; ++i) {
c.insert({ foo: "bar" + i });
}
}
assertEqual(5100, collectionCount(cn));
// and sync again
// and sync again
var syncResult = replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: true
});
assertEqual(st.count, collectionCount(cn));
assertEqual(st.checksum, collectionChecksum(cn));
},
@ -1068,7 +1068,7 @@ function BaseTestConfig() {
true
);
}
};
}
@ -1198,7 +1198,7 @@ function ReplicationIncrementalKeyConflict() {
c.insert({ _key: "x", value: 1 });
c.insert({ _key: "y", value: 2 });
c.insert({ _key: "z", value: 3 });
connectToSlave();
var syncResult = replication.syncCollection(cn, {
endpoint: masterEndpoint,
@ -1220,12 +1220,12 @@ function ReplicationIncrementalKeyConflict() {
c = db._collection(cn);
c.remove("z");
c.insert({ _key: "w", value: 3 });
assertEqual(3, c.count());
assertEqual(3, c.document("w").value);
assertEqual(1, c.document("x").value);
assertEqual(2, c.document("y").value);
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
@ -1234,13 +1234,13 @@ function ReplicationIncrementalKeyConflict() {
});
db._flushCache();
c = db._collection(cn);
assertEqual(3, c.count());
assertEqual(3, c.document("w").value);
assertEqual(1, c.document("x").value);
assertEqual(2, c.document("y").value);
assertEqual("hash", c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
},
@ -1252,7 +1252,7 @@ function ReplicationIncrementalKeyConflict() {
for (i = 0; i < 10000; ++i) {
c.insert({ _key: "test" + i, value: i });
}
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
@ -1279,9 +1279,9 @@ function ReplicationIncrementalKeyConflict() {
c.insert({ _key: "test1", value: 9998 });
c.insert({ _key: "test9998", value: 1 });
c.insert({ _key: "test9999", value: 0 });
assertEqual(10000, c.count());
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
@ -1290,10 +1290,10 @@ function ReplicationIncrementalKeyConflict() {
});
db._flushCache();
c = db._collection(cn);
assertEqual(10000, c.count());
assertEqual("hash", c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
}
@ -1307,6 +1307,6 @@ function ReplicationIncrementalKeyConflict() {
jsunity.run(ReplicationSuite);
jsunity.run(ReplicationOtherDBSuite);
// TODO: activate this test once it works
// jsunity.run(ReplicationIncrementalKeyConflict);
jsunity.run(ReplicationIncrementalKeyConflict);
return jsunity.done();