mirror of https://gitee.com/bigwinds/arangodb
Moved engine specific logic of Document Remove from Logical To Physical Collection
This commit is contained in:
parent
b666254d4b
commit
c8757efc8e
|
@ -1614,6 +1614,132 @@ int MMFilesCollection::update(arangodb::transaction::Methods* trx,
|
|||
|
||||
}
|
||||
|
||||
int MMFilesCollection::replace(
|
||||
transaction::Methods* trx, VPackSlice const newSlice,
|
||||
ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev,
|
||||
ManagedDocumentResult& previous, TRI_voc_rid_t const revisionId,
|
||||
VPackSlice const fromSlice, VPackSlice const toSlice) {
|
||||
bool const isEdgeCollection = (_logicalCollection->type() == TRI_COL_TYPE_EDGE);
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoLock") { return TRI_ERROR_DEBUG; }
|
||||
|
||||
// get the previous revision
|
||||
VPackSlice key = newSlice.get(StaticStrings::KeyString);
|
||||
if (key.isNone()) {
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
|
||||
}
|
||||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
arangodb::CollectionWriteLocker collectionLocker(_logicalCollection, useDeadlockDetector,
|
||||
lock);
|
||||
|
||||
// get the previous revision
|
||||
int res = _logicalCollection->lookupDocument(trx, key, previous);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoMarker") {
|
||||
// test what happens when no marker can be created
|
||||
return TRI_ERROR_DEBUG;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoMarkerExcept") {
|
||||
// test what happens when no marker can be created
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
uint8_t const* vpack = previous.vpack();
|
||||
VPackSlice oldDoc(vpack);
|
||||
TRI_voc_rid_t oldRevisionId = transaction::Methods::extractRevFromDocument(oldDoc);
|
||||
prevRev = oldRevisionId;
|
||||
|
||||
// Check old revision:
|
||||
if (!options.ignoreRevs) {
|
||||
TRI_voc_rid_t expectedRev = 0;
|
||||
if (newSlice.isObject()) {
|
||||
expectedRev = TRI_ExtractRevisionId(newSlice);
|
||||
}
|
||||
int res = _logicalCollection->checkRevision(trx, expectedRev, prevRev);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// merge old and new values
|
||||
TransactionBuilderLeaser builder(trx);
|
||||
newObjectForReplace(trx, oldDoc, newSlice, fromSlice, toSlice,
|
||||
isEdgeCollection, TRI_RidToString(revisionId),
|
||||
*builder.get());
|
||||
|
||||
if (ServerState::isDBServer(trx->serverRole())) {
|
||||
// Need to check that no sharding keys have changed:
|
||||
if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
|
||||
trx->resolver()->getCollectionNameCluster(
|
||||
_logicalCollection->planId()),
|
||||
oldDoc, builder->slice(), false)) {
|
||||
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
|
||||
}
|
||||
}
|
||||
|
||||
// create marker
|
||||
MMFilesCrudMarker replaceMarker(
|
||||
TRI_DF_MARKER_VPACK_DOCUMENT,
|
||||
static_cast<MMFilesTransactionState*>(trx->state())->idForMarker(), builder->slice());
|
||||
|
||||
MMFilesWalMarker const* marker;
|
||||
if (options.recoveryMarker == nullptr) {
|
||||
marker = &replaceMarker;
|
||||
} else {
|
||||
marker = options.recoveryMarker;
|
||||
}
|
||||
|
||||
VPackSlice const newDoc(marker->vpack());
|
||||
|
||||
MMFilesDocumentOperation operation(_logicalCollection, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
|
||||
|
||||
try {
|
||||
insertRevision(revisionId, marker->vpack(), 0, true, true);
|
||||
|
||||
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
|
||||
DocumentDescriptor(revisionId, newDoc.begin()));
|
||||
|
||||
if (oldRevisionId == revisionId) {
|
||||
// update with same revision id => can happen if isRestore = true
|
||||
result.clear();
|
||||
}
|
||||
|
||||
res = _logicalCollection->updateDocument(trx, oldRevisionId, oldDoc,
|
||||
revisionId, newDoc, operation,
|
||||
marker, options.waitForSync);
|
||||
} catch (basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (std::bad_alloc const&) {
|
||||
res = TRI_ERROR_OUT_OF_MEMORY;
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
operation.revert(trx);
|
||||
} else {
|
||||
if (oldRevisionId == revisionId) {
|
||||
// update with same revision id => can happen if isRestore = true
|
||||
result.clear();
|
||||
}
|
||||
_logicalCollection->readRevision(trx, result, revisionId);
|
||||
|
||||
if (options.waitForSync) {
|
||||
// store the tick that was used for writing the new document
|
||||
resultMarkerTick = operation.tick();
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice const slice,
|
||||
ManagedDocumentResult& previous,
|
||||
OperationOptions& options,
|
||||
|
|
|
@ -199,6 +199,15 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
TRI_voc_rid_t const& revisionId,
|
||||
arangodb::velocypack::Slice const key) override;
|
||||
|
||||
int replace(transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice const newSlice,
|
||||
ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
|
||||
TRI_voc_rid_t const revisionId,
|
||||
arangodb::velocypack::Slice const fromSlice,
|
||||
arangodb::velocypack::Slice const toSlice) override;
|
||||
|
||||
int remove(arangodb::transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice const slice,
|
||||
arangodb::ManagedDocumentResult& previous,
|
||||
|
|
|
@ -43,7 +43,6 @@
|
|||
#include "Scheduler/SchedulerFeature.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "MMFiles/MMFilesDocumentOperation.h"
|
||||
//#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/MMFilesCollection.h" //remove
|
||||
#include "MMFiles/MMFilesPrimaryIndex.h"
|
||||
#include "MMFiles/MMFilesIndexElement.h"
|
||||
|
@ -2060,7 +2059,8 @@ int LogicalCollection::update(transaction::Methods* trx, VPackSlice const newSli
|
|||
}
|
||||
|
||||
/// @brief replaces a document or edge in a collection
|
||||
int LogicalCollection::replace(transaction::Methods* trx, VPackSlice const newSlice,
|
||||
int LogicalCollection::replace(
|
||||
transaction::Methods* trx, VPackSlice const newSlice,
|
||||
ManagedDocumentResult& result,
|
||||
OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
|
@ -2076,9 +2076,8 @@ int LogicalCollection::replace(transaction::Methods* trx, VPackSlice const newSl
|
|||
VPackSlice fromSlice;
|
||||
VPackSlice toSlice;
|
||||
|
||||
bool const isEdgeCollection = (type() == TRI_COL_TYPE_EDGE);
|
||||
|
||||
if (isEdgeCollection) {
|
||||
if (type() == TRI_COL_TYPE_EDGE) {
|
||||
fromSlice = newSlice.get(StaticStrings::FromString);
|
||||
if (!fromSlice.isString()) {
|
||||
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
|
||||
|
@ -2107,122 +2106,9 @@ int LogicalCollection::replace(transaction::Methods* trx, VPackSlice const newSl
|
|||
revisionId = TRI_HybridLogicalClock();
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoLock") { return TRI_ERROR_DEBUG; }
|
||||
|
||||
// get the previous revision
|
||||
VPackSlice key = newSlice.get(StaticStrings::KeyString);
|
||||
if (key.isNone()) {
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
|
||||
}
|
||||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
arangodb::CollectionWriteLocker collectionLocker(this, useDeadlockDetector,
|
||||
lock);
|
||||
|
||||
// get the previous revision
|
||||
int res = lookupDocument(trx, key, previous);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoMarker") {
|
||||
// test what happens when no marker can be created
|
||||
return TRI_ERROR_DEBUG;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("ReplaceDocumentNoMarkerExcept") {
|
||||
// test what happens when no marker can be created
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
uint8_t const* vpack = previous.vpack();
|
||||
VPackSlice oldDoc(vpack);
|
||||
TRI_voc_rid_t oldRevisionId = transaction::Methods::extractRevFromDocument(oldDoc);
|
||||
prevRev = oldRevisionId;
|
||||
|
||||
// Check old revision:
|
||||
if (!options.ignoreRevs) {
|
||||
TRI_voc_rid_t expectedRev = 0;
|
||||
if (newSlice.isObject()) {
|
||||
expectedRev = TRI_ExtractRevisionId(newSlice);
|
||||
}
|
||||
int res = checkRevision(trx, expectedRev, prevRev);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// merge old and new values
|
||||
TransactionBuilderLeaser builder(trx);
|
||||
newObjectForReplace(trx, oldDoc, newSlice, fromSlice, toSlice,
|
||||
isEdgeCollection, TRI_RidToString(revisionId),
|
||||
*builder.get());
|
||||
|
||||
if (ServerState::isDBServer(trx->serverRole())) {
|
||||
// Need to check that no sharding keys have changed:
|
||||
if (arangodb::shardKeysChanged(
|
||||
_vocbase->name(),
|
||||
trx->resolver()->getCollectionNameCluster(_planId), oldDoc,
|
||||
builder->slice(), false)) {
|
||||
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
|
||||
}
|
||||
}
|
||||
|
||||
// create marker
|
||||
MMFilesCrudMarker replaceMarker(
|
||||
TRI_DF_MARKER_VPACK_DOCUMENT,
|
||||
static_cast<MMFilesTransactionState*>(trx->state())->idForMarker(), builder->slice());
|
||||
|
||||
MMFilesWalMarker const* marker;
|
||||
if (options.recoveryMarker == nullptr) {
|
||||
marker = &replaceMarker;
|
||||
} else {
|
||||
marker = options.recoveryMarker;
|
||||
}
|
||||
|
||||
VPackSlice const newDoc(marker->vpack());
|
||||
|
||||
MMFilesDocumentOperation operation(this, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
|
||||
|
||||
try {
|
||||
insertRevision(revisionId, marker->vpack(), 0, true);
|
||||
|
||||
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
|
||||
DocumentDescriptor(revisionId, newDoc.begin()));
|
||||
|
||||
if (oldRevisionId == revisionId) {
|
||||
// update with same revision id => can happen if isRestore = true
|
||||
result.clear();
|
||||
}
|
||||
|
||||
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
|
||||
operation, marker, options.waitForSync);
|
||||
} catch (basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (std::bad_alloc const&) {
|
||||
res = TRI_ERROR_OUT_OF_MEMORY;
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
operation.revert(trx);
|
||||
} else {
|
||||
if (oldRevisionId == revisionId) {
|
||||
// update with same revision id => can happen if isRestore = true
|
||||
result.clear();
|
||||
}
|
||||
readRevision(trx, result, revisionId);
|
||||
|
||||
if (options.waitForSync) {
|
||||
// store the tick that was used for writing the new document
|
||||
resultMarkerTick = operation.tick();
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
return getPhysical()->replace(trx, newSlice, result, options,
|
||||
resultMarkerTick, lock, prevRev, previous,
|
||||
revisionId, fromSlice, toSlice);
|
||||
}
|
||||
|
||||
/// @brief removes a document or edge
|
||||
|
@ -2895,44 +2781,6 @@ int LogicalCollection::newObjectForInsert(
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief new object for replace, oldValue must have _key and _id correctly
|
||||
/// set
|
||||
void LogicalCollection::newObjectForReplace(
|
||||
transaction::Methods* trx, VPackSlice const& oldValue, VPackSlice const& newValue,
|
||||
VPackSlice const& fromSlice, VPackSlice const& toSlice,
|
||||
bool isEdgeCollection, std::string const& rev, VPackBuilder& builder) {
|
||||
builder.openObject();
|
||||
|
||||
// add system attributes first, in this order:
|
||||
// _key, _id, _from, _to, _rev
|
||||
|
||||
// _key
|
||||
VPackSlice s = oldValue.get(StaticStrings::KeyString);
|
||||
TRI_ASSERT(!s.isNone());
|
||||
builder.add(StaticStrings::KeyString, s);
|
||||
|
||||
// _id
|
||||
s = oldValue.get(StaticStrings::IdString);
|
||||
TRI_ASSERT(!s.isNone());
|
||||
builder.add(StaticStrings::IdString, s);
|
||||
|
||||
// _from and _to here
|
||||
if (isEdgeCollection) {
|
||||
TRI_ASSERT(!fromSlice.isNone());
|
||||
TRI_ASSERT(!toSlice.isNone());
|
||||
builder.add(StaticStrings::FromString, fromSlice);
|
||||
builder.add(StaticStrings::ToString, toSlice);
|
||||
}
|
||||
|
||||
// _rev
|
||||
builder.add(StaticStrings::RevString, VPackValue(rev));
|
||||
|
||||
// add other attributes after the system attributes
|
||||
TRI_SanitizeObjectWithEdges(newValue, builder);
|
||||
|
||||
builder.close();
|
||||
}
|
||||
|
||||
/// @brief new object for remove, must have _key set
|
||||
void LogicalCollection::newObjectForRemove(transaction::Methods* trx,
|
||||
VPackSlice const& oldValue,
|
||||
|
|
|
@ -491,16 +491,7 @@ class LogicalCollection {
|
|||
velocypack::Builder& builder,
|
||||
bool isRestore);
|
||||
|
||||
/// @brief new object for replace
|
||||
void newObjectForReplace(transaction::Methods* trx,
|
||||
velocypack::Slice const& oldValue,
|
||||
velocypack::Slice const& newValue,
|
||||
velocypack::Slice const& fromSlice,
|
||||
velocypack::Slice const& toSlice,
|
||||
bool isEdgeCollection, std::string const& rev,
|
||||
velocypack::Builder& builder);
|
||||
|
||||
/// @brief new object for remove, must have _key set
|
||||
/// @brief new object for remove, must have _key set
|
||||
void newObjectForRemove(transaction::Methods* trx,
|
||||
velocypack::Slice const& oldValue,
|
||||
std::string const& rev,
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "PhysicalCollection.h"
|
||||
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Collection.h>
|
||||
|
@ -164,4 +165,41 @@ void PhysicalCollection::mergeObjectsForUpdate(
|
|||
b.close();
|
||||
}
|
||||
|
||||
/// @brief new object for replace, oldValue must have _key and _id correctly
|
||||
/// set
|
||||
void PhysicalCollection::newObjectForReplace(
|
||||
transaction::Methods* trx, VPackSlice const& oldValue,
|
||||
VPackSlice const& newValue, VPackSlice const& fromSlice,
|
||||
VPackSlice const& toSlice, bool isEdgeCollection, std::string const& rev,
|
||||
VPackBuilder& builder) {
|
||||
builder.openObject();
|
||||
|
||||
// add system attributes first, in this order:
|
||||
// _key, _id, _from, _to, _rev
|
||||
|
||||
// _key
|
||||
VPackSlice s = oldValue.get(StaticStrings::KeyString);
|
||||
TRI_ASSERT(!s.isNone());
|
||||
builder.add(StaticStrings::KeyString, s);
|
||||
|
||||
// _id
|
||||
s = oldValue.get(StaticStrings::IdString);
|
||||
TRI_ASSERT(!s.isNone());
|
||||
builder.add(StaticStrings::IdString, s);
|
||||
|
||||
// _from and _to here
|
||||
if (isEdgeCollection) {
|
||||
TRI_ASSERT(!fromSlice.isNone());
|
||||
TRI_ASSERT(!toSlice.isNone());
|
||||
builder.add(StaticStrings::FromString, fromSlice);
|
||||
builder.add(StaticStrings::ToString, toSlice);
|
||||
}
|
||||
|
||||
// _rev
|
||||
builder.add(StaticStrings::RevString, VPackValue(rev));
|
||||
|
||||
// add other attributes after the system attributes
|
||||
TRI_SanitizeObjectWithEdges(newValue, builder);
|
||||
|
||||
builder.close();
|
||||
}
|
||||
|
|
|
@ -123,6 +123,15 @@ class PhysicalCollection {
|
|||
TRI_voc_rid_t const& revisionId,
|
||||
arangodb::velocypack::Slice const key) = 0;
|
||||
|
||||
virtual int replace(transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice const newSlice,
|
||||
ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
|
||||
TRI_voc_rid_t const revisionId,
|
||||
arangodb::velocypack::Slice const fromSlice,
|
||||
arangodb::velocypack::Slice const toSlice) = 0;
|
||||
|
||||
virtual int remove(arangodb::transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice const slice,
|
||||
arangodb::ManagedDocumentResult& previous,
|
||||
|
@ -147,8 +156,17 @@ class PhysicalCollection {
|
|||
velocypack::Slice const& newValue,
|
||||
bool isEdgeCollection, std::string const& rev,
|
||||
bool mergeObjects, bool keepNull,
|
||||
velocypack::Builder& b);
|
||||
velocypack::Builder& builder);
|
||||
|
||||
/// @brief new object for replace
|
||||
void newObjectForReplace(transaction::Methods* trx,
|
||||
velocypack::Slice const& oldValue,
|
||||
velocypack::Slice const& newValue,
|
||||
velocypack::Slice const& fromSlice,
|
||||
velocypack::Slice const& toSlice,
|
||||
bool isEdgeCollection, std::string const& rev,
|
||||
velocypack::Builder& builder);
|
||||
|
||||
protected:
|
||||
LogicalCollection* _logicalCollection;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue