1
0
Fork 0

Moved EngineSpecific logic of update() from LogicalCollection to MMFilesCollection.

This commit is contained in:
Michael Hackstein 2017-02-13 17:45:33 +01:00
parent ae12fc0133
commit 2824c349bb
7 changed files with 416 additions and 328 deletions

View File

@ -348,6 +348,7 @@ SET(ARANGOD_SOURCES
VocBase/KeyGenerator.cpp
VocBase/LogicalCollection.cpp
VocBase/PathEnumerator.cpp
VocBase/PhysicalCollection.cpp
VocBase/SingleServerTraverser.cpp
VocBase/TransactionManager.cpp
VocBase/Traverser.cpp

View File

@ -29,6 +29,7 @@
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/process-utils.h"
#include "Cluster/ClusterMethods.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
@ -39,6 +40,7 @@
#include "MMFiles/MMFilesPrimaryIndex.h"
#include "MMFiles/MMFilesTransactionState.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/CollectionWriteLocker.h"
#include "Utils/OperationOptions.h"
#include "Utils/SingleCollectionTransaction.h"
@ -1481,6 +1483,137 @@ int MMFilesCollection::insertDocument(arangodb::transaction::Methods* trx,
return static_cast<MMFilesTransactionState*>(trx->state())->addOperation(revisionId, operation, marker, waitForSync);
}
int MMFilesCollection::update(arangodb::transaction::Methods* trx,
VPackSlice const newSlice,
ManagedDocumentResult& result,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_rid_t& prevRev,
ManagedDocumentResult& previous,
TRI_voc_rid_t const& revisionId,
VPackSlice const key) {
bool const isEdgeCollection =
(_logicalCollection->type() == TRI_COL_TYPE_EDGE);
TRI_IF_FAILURE("UpdateDocumentNoLock") { return TRI_ERROR_DEBUG; }
bool const useDeadlockDetector =
(lock && !trx->isSingleOperationTransaction());
arangodb::CollectionWriteLocker collectionLocker(_logicalCollection,
useDeadlockDetector, lock);
// get the previous revision
int res = _logicalCollection->lookupDocument(trx, key, previous);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
uint8_t const* vpack = previous.vpack();
VPackSlice oldDoc(vpack);
TRI_voc_rid_t oldRevisionId =
transaction::Methods::extractRevFromDocument(oldDoc);
prevRev = oldRevisionId;
TRI_IF_FAILURE("UpdateDocumentNoMarker") {
// test what happens when no marker can be created
return TRI_ERROR_DEBUG;
}
TRI_IF_FAILURE("UpdateDocumentNoMarkerExcept") {
// test what happens when no marker can be created
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
// Check old revision:
if (!options.ignoreRevs) {
TRI_voc_rid_t expectedRev = 0;
if (newSlice.isObject()) {
expectedRev = TRI_ExtractRevisionId(newSlice);
}
int res = _logicalCollection->checkRevision(trx, expectedRev, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
if (newSlice.length() <= 1) {
// no need to do anything
result = previous;
return TRI_ERROR_NO_ERROR;
}
// merge old and new values
TransactionBuilderLeaser builder(trx);
if (options.recoveryMarker == nullptr) {
mergeObjectsForUpdate(trx, oldDoc, newSlice, isEdgeCollection,
TRI_RidToString(revisionId), options.mergeObjects,
options.keepNull, *builder.get());
if (ServerState::isDBServer(trx->serverRole())) {
// Need to check that no sharding keys have changed:
if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
trx->resolver()->getCollectionNameCluster(
_logicalCollection->planId()),
oldDoc, builder->slice(), false)) {
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
}
}
}
// create marker
MMFilesCrudMarker updateMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
static_cast<MMFilesTransactionState*>(trx->state())->idForMarker(), builder->slice());
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {
marker = &updateMarker;
} else {
marker = options.recoveryMarker;
}
VPackSlice const newDoc(marker->vpack());
MMFilesDocumentOperation operation(_logicalCollection,
TRI_VOC_DOCUMENT_OPERATION_UPDATE);
try {
insertRevision(revisionId, marker->vpack(), 0, true, true);
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
DocumentDescriptor(revisionId, newDoc.begin()));
if (oldRevisionId == revisionId) {
// update with same revision id => can happen if isRestore = true
result.clear();
}
res = _logicalCollection->updateDocument(trx, oldRevisionId, oldDoc,
revisionId, newDoc, operation,
marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
res = TRI_ERROR_OUT_OF_MEMORY;
} catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
operation.revert(trx);
} else {
_logicalCollection->readRevision(trx, result, revisionId);
if (options.waitForSync) {
// store the tick that was used for writing the new document
resultMarkerTick = operation.tick();
}
}
return res;
}
int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice const slice,
ManagedDocumentResult& previous,
OperationOptions& options,
@ -1709,5 +1842,4 @@ int MMFilesCollection::removeFastPath(arangodb::transaction::Methods* trx,
}
return res;
}

View File

@ -161,17 +161,7 @@ class MMFilesCollection final : public PhysicalCollection {
Ditches* ditches() const override { return &_ditches; }
////////////////////////////////////
// -- SECTION DML Operations --
///////////////////////////////////
int insert(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const newSlice,
arangodb::ManagedDocumentResult& result,
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
bool lock) override;
/// @brief iterate all markers of a collection on load
/// @brief iterate all markers of a collection on load
int iterateMarkersOnLoad(arangodb::transaction::Methods* trx) override;
virtual bool isFullyCollected() const override;
@ -191,13 +181,33 @@ class MMFilesCollection final : public PhysicalCollection {
}
}
int remove(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const slice,
////////////////////////////////////
// -- SECTION DML Operations --
///////////////////////////////////
int insert(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const newSlice,
arangodb::ManagedDocumentResult& result,
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
bool lock) override;
int update(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const newSlice,
arangodb::ManagedDocumentResult& result, OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
TRI_voc_rid_t const& revisionId,
arangodb::velocypack::Slice const key) override;
int remove(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const slice,
arangodb::ManagedDocumentResult& previous,
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
bool lock, TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev,
arangodb::velocypack::Slice const toRemove) override;
int removeFastPath(arangodb::transaction::Methods* trx, TRI_voc_rid_t oldRevisionId,
int removeFastPath(arangodb::transaction::Methods* trx,
TRI_voc_rid_t oldRevisionId,
arangodb::velocypack::Slice const oldDoc,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
@ -211,85 +221,97 @@ class MMFilesCollection final : public PhysicalCollection {
static int OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* marker,
MMFilesDatafile* datafile,
OpenIteratorState* state);
static bool OpenIterator(TRI_df_marker_t const* marker, OpenIteratorState* data, MMFilesDatafile* datafile);
static bool OpenIterator(TRI_df_marker_t const* marker,
OpenIteratorState* data, MMFilesDatafile* datafile);
/// @brief create statistics for a datafile, using the stats provided
void createStats(TRI_voc_fid_t fid, DatafileStatisticsContainer const& values) {
void createStats(TRI_voc_fid_t fid,
DatafileStatisticsContainer const& values) {
_datafileStatistics.create(fid, values);
}
/// @brief iterates over a collection
bool iterateDatafiles(std::function<bool(TRI_df_marker_t const*, MMFilesDatafile*)> const& cb);
/// @brief creates a datafile
MMFilesDatafile* createDatafile(TRI_voc_fid_t fid,
TRI_voc_size_t journalSize,
bool isCompactor);
}
/// @brief iterate over a vector of datafiles and pick those with a specific
/// data range
std::vector<DatafileDescription> datafilesInRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax);
/// @brief closes the datafiles passed in the vector
bool closeDatafiles(std::vector<MMFilesDatafile*> const& files);
/// @brief iterates over a collection
bool iterateDatafiles(
std::function<bool(TRI_df_marker_t const*, MMFilesDatafile*)> const&
cb);
bool iterateDatafilesVector(std::vector<MMFilesDatafile*> const& files,
std::function<bool(TRI_df_marker_t const*, MMFilesDatafile*)> const& cb);
/// @brief creates a datafile
MMFilesDatafile* createDatafile(
TRI_voc_fid_t fid, TRI_voc_size_t journalSize, bool isCompactor);
MMFilesDocumentPosition lookupRevision(TRI_voc_rid_t revisionId) const;
/// @brief iterate over a vector of datafiles and pick those with a specific
/// data range
std::vector<DatafileDescription> datafilesInRange(TRI_voc_tick_t dataMin,
TRI_voc_tick_t dataMax);
uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const override;
uint8_t const* lookupRevisionVPackConditional(TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) const override;
void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) override;
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) override;
bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) override;
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) override;
/// @brief closes the datafiles passed in the vector
bool closeDatafiles(std::vector<MMFilesDatafile*> const& files);
int insertDocument(arangodb::transaction::Methods* trx,
TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc,
MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync);
bool iterateDatafilesVector(
std::vector<MMFilesDatafile*> const& files,
std::function<bool(TRI_df_marker_t const*, MMFilesDatafile*)> const&
cb);
private:
// SECTION: Index storage
MMFilesDocumentPosition lookupRevision(TRI_voc_rid_t revisionId) const;
int insertIndexes(transaction::Methods* trx, TRI_voc_rid_t revisionId,
velocypack::Slice const& doc);
uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const override;
uint8_t const* lookupRevisionVPackConditional(
TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal)
const override;
void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal, bool shouldLock)
override;
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal) override;
bool updateRevisionConditional(TRI_voc_rid_t revisionId,
TRI_df_marker_t const* oldPosition,
TRI_df_marker_t const* newPosition,
TRI_voc_fid_t newFid, bool isInWal) override;
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) override;
int insertPrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);
int insertDocument(arangodb::transaction::Methods * trx,
TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc,
MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync);
int deletePrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);
private:
// SECTION: Index storage
int insertSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&,
bool isRollback);
int insertIndexes(transaction::Methods * trx, TRI_voc_rid_t revisionId,
velocypack::Slice const& doc);
int deleteSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&,
bool isRollback);
private:
mutable arangodb::Ditches _ditches;
int insertPrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);
arangodb::basics::ReadWriteLock _filesLock;
std::vector<MMFilesDatafile*> _datafiles; // all datafiles
std::vector<MMFilesDatafile*> _journals; // all journals
std::vector<MMFilesDatafile*> _compactors; // all compactor files
int deletePrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);
arangodb::basics::ReadWriteLock _compactionLock;
int insertSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&, bool isRollback);
int64_t _initialCount;
int deleteSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&, bool isRollback);
MMFilesDatafileStatistics _datafileStatistics;
private:
mutable arangodb::Ditches _ditches;
TRI_voc_rid_t _lastRevision;
arangodb::basics::ReadWriteLock _filesLock;
std::vector<MMFilesDatafile*> _datafiles; // all datafiles
std::vector<MMFilesDatafile*> _journals; // all journals
std::vector<MMFilesDatafile*> _compactors; // all compactor files
MMFilesRevisionsCache _revisionsCache;
std::atomic<int64_t> _uncollectedLogfileEntries;
arangodb::basics::ReadWriteLock _compactionLock;
int64_t _initialCount;
MMFilesDatafileStatistics _datafileStatistics;
TRI_voc_rid_t _lastRevision;
MMFilesRevisionsCache _revisionsCache;
std::atomic<int64_t> _uncollectedLogfileEntries;
};

View File

@ -2055,122 +2055,8 @@ int LogicalCollection::update(transaction::Methods* trx, VPackSlice const newSli
return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
}
bool const isEdgeCollection = (type() == TRI_COL_TYPE_EDGE);
TRI_IF_FAILURE("UpdateDocumentNoLock") { return TRI_ERROR_DEBUG; }
bool const useDeadlockDetector =
(lock && !trx->isSingleOperationTransaction());
arangodb::CollectionWriteLocker collectionLocker(this, useDeadlockDetector,
lock);
// get the previous revision
int res = lookupDocument(trx, key, previous);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
uint8_t const* vpack = previous.vpack();
VPackSlice oldDoc(vpack);
TRI_voc_rid_t oldRevisionId = transaction::Methods::extractRevFromDocument(oldDoc);
prevRev = oldRevisionId;
TRI_IF_FAILURE("UpdateDocumentNoMarker") {
// test what happens when no marker can be created
return TRI_ERROR_DEBUG;
}
TRI_IF_FAILURE("UpdateDocumentNoMarkerExcept") {
// test what happens when no marker can be created
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
// Check old revision:
if (!options.ignoreRevs) {
TRI_voc_rid_t expectedRev = 0;
if (newSlice.isObject()) {
expectedRev = TRI_ExtractRevisionId(newSlice);
}
int res = checkRevision(trx, expectedRev, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
if (newSlice.length() <= 1) {
// no need to do anything
result = previous;
return TRI_ERROR_NO_ERROR;
}
// merge old and new values
TransactionBuilderLeaser builder(trx);
if (options.recoveryMarker == nullptr) {
mergeObjectsForUpdate(trx, oldDoc, newSlice, isEdgeCollection,
TRI_RidToString(revisionId), options.mergeObjects,
options.keepNull, *builder.get());
if (ServerState::isDBServer(trx->serverRole())) {
// Need to check that no sharding keys have changed:
if (arangodb::shardKeysChanged(
_vocbase->name(),
trx->resolver()->getCollectionNameCluster(planId()), oldDoc,
builder->slice(), false)) {
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
}
}
}
// create marker
MMFilesCrudMarker updateMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
static_cast<MMFilesTransactionState*>(trx->state())->idForMarker(), builder->slice());
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {
marker = &updateMarker;
} else {
marker = options.recoveryMarker;
}
VPackSlice const newDoc(marker->vpack());
MMFilesDocumentOperation operation(this, TRI_VOC_DOCUMENT_OPERATION_UPDATE);
try {
insertRevision(revisionId, marker->vpack(), 0, true);
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
DocumentDescriptor(revisionId, newDoc.begin()));
if (oldRevisionId == revisionId) {
// update with same revision id => can happen if isRestore = true
result.clear();
}
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
res = TRI_ERROR_OUT_OF_MEMORY;
} catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
operation.revert(trx);
} else {
readRevision(trx, result, revisionId);
if (options.waitForSync) {
// store the tick that was used for writing the new document
resultMarkerTick = operation.tick();
}
}
return res;
return getPhysical()->update(trx, newSlice, result, options, resultMarkerTick,
lock, prevRev, previous, revisionId, key);
}
/// @brief replaces a document or edge in a collection
@ -3047,136 +2933,6 @@ void LogicalCollection::newObjectForReplace(
builder.close();
}
/// @brief merge two objects for update, oldValue must have correctly set
/// _key and _id attributes
void LogicalCollection::mergeObjectsForUpdate(
transaction::Methods* trx, VPackSlice const& oldValue,
VPackSlice const& newValue, bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull, VPackBuilder& b) {
b.openObject();
VPackSlice keySlice = oldValue.get(StaticStrings::KeyString);
VPackSlice idSlice = oldValue.get(StaticStrings::IdString);
TRI_ASSERT(!keySlice.isNone());
TRI_ASSERT(!idSlice.isNone());
// Find the attributes in the newValue object:
VPackSlice fromSlice;
VPackSlice toSlice;
std::unordered_map<std::string, VPackSlice> newValues;
{
VPackObjectIterator it(newValue, true);
while (it.valid()) {
std::string key = it.key().copyString();
if (!key.empty() && key[0] == '_' &&
(key == StaticStrings::KeyString || key == StaticStrings::IdString ||
key == StaticStrings::RevString ||
key == StaticStrings::FromString ||
key == StaticStrings::ToString)) {
// note _from and _to and ignore _id, _key and _rev
if (key == StaticStrings::FromString) {
fromSlice = it.value();
} else if (key == StaticStrings::ToString) {
toSlice = it.value();
} // else do nothing
} else {
// regular attribute
newValues.emplace(std::move(key), it.value());
}
it.next();
}
}
if (isEdgeCollection) {
if (fromSlice.isNone()) {
fromSlice = oldValue.get(StaticStrings::FromString);
}
if (toSlice.isNone()) {
toSlice = oldValue.get(StaticStrings::ToString);
}
}
// add system attributes first, in this order:
// _key, _id, _from, _to, _rev
// _key
b.add(StaticStrings::KeyString, keySlice);
// _id
b.add(StaticStrings::IdString, idSlice);
// _from, _to
if (isEdgeCollection) {
TRI_ASSERT(!fromSlice.isNone());
TRI_ASSERT(!toSlice.isNone());
b.add(StaticStrings::FromString, fromSlice);
b.add(StaticStrings::ToString, toSlice);
}
// _rev
b.add(StaticStrings::RevString, VPackValue(rev));
// add other attributes after the system attributes
{
VPackObjectIterator it(oldValue, true);
while (it.valid()) {
std::string key = it.key().copyString();
// exclude system attributes in old value now
if (!key.empty() && key[0] == '_' &&
(key == StaticStrings::KeyString || key == StaticStrings::IdString ||
key == StaticStrings::RevString ||
key == StaticStrings::FromString ||
key == StaticStrings::ToString)) {
it.next();
continue;
}
auto found = newValues.find(key);
if (found == newValues.end()) {
// use old value
b.add(key, it.value());
} else if (mergeObjects && it.value().isObject() &&
(*found).second.isObject()) {
// merge both values
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
VPackBuilder sub =
VPackCollection::merge(it.value(), value, true, !keepNull);
b.add(key, sub.slice());
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
} else {
// use new value
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
b.add(key, value);
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
}
it.next();
}
}
// add remaining values that were only in new object
for (auto const& it : newValues) {
VPackSlice const& s = it.second;
if (s.isNone()) {
continue;
}
if (!keepNull && s.isNull()) {
continue;
}
b.add(it.first, s);
}
b.close();
}
/// @brief new object for remove, must have _key set
void LogicalCollection::newObjectForRemove(transaction::Methods* trx,
VPackSlice const& oldValue,

View File

@ -456,7 +456,6 @@ class LogicalCollection {
int checkRevision(transaction::Methods*, TRI_voc_rid_t expected,
TRI_voc_rid_t found);
private:
int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId,
velocypack::Slice const& oldDoc,
TRI_voc_rid_t newRevisionId,
@ -464,6 +463,7 @@ class LogicalCollection {
MMFilesDocumentOperation&, MMFilesWalMarker const*,
bool& waitForSync);
private:
// TODO REMOVE HERE is now in SE Collection
int insertPrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);
@ -500,14 +500,6 @@ class LogicalCollection {
bool isEdgeCollection, std::string const& rev,
velocypack::Builder& builder);
/// @brief merge two objects for update
void mergeObjectsForUpdate(transaction::Methods* trx,
velocypack::Slice const& oldValue,
velocypack::Slice const& newValue,
bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull,
velocypack::Builder& b);
/// @brief new object for remove, must have _key set
void newObjectForRemove(transaction::Methods* trx,
velocypack::Slice const& oldValue,

View File

@ -0,0 +1,167 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "PhysicalCollection.h"
#include "Basics/StaticStrings.h"
#include <velocypack/Builder.h>
#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
/// @brief merge two objects for update, oldValue must have correctly set
/// _key and _id attributes
void PhysicalCollection::mergeObjectsForUpdate(
transaction::Methods* trx, VPackSlice const& oldValue,
VPackSlice const& newValue, bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull, VPackBuilder& b) {
b.openObject();
VPackSlice keySlice = oldValue.get(StaticStrings::KeyString);
VPackSlice idSlice = oldValue.get(StaticStrings::IdString);
TRI_ASSERT(!keySlice.isNone());
TRI_ASSERT(!idSlice.isNone());
// Find the attributes in the newValue object:
VPackSlice fromSlice;
VPackSlice toSlice;
std::unordered_map<std::string, VPackSlice> newValues;
{
VPackObjectIterator it(newValue, true);
while (it.valid()) {
std::string key = it.key().copyString();
if (!key.empty() && key[0] == '_' &&
(key == StaticStrings::KeyString || key == StaticStrings::IdString ||
key == StaticStrings::RevString ||
key == StaticStrings::FromString ||
key == StaticStrings::ToString)) {
// note _from and _to and ignore _id, _key and _rev
if (key == StaticStrings::FromString) {
fromSlice = it.value();
} else if (key == StaticStrings::ToString) {
toSlice = it.value();
} // else do nothing
} else {
// regular attribute
newValues.emplace(std::move(key), it.value());
}
it.next();
}
}
if (isEdgeCollection) {
if (fromSlice.isNone()) {
fromSlice = oldValue.get(StaticStrings::FromString);
}
if (toSlice.isNone()) {
toSlice = oldValue.get(StaticStrings::ToString);
}
}
// add system attributes first, in this order:
// _key, _id, _from, _to, _rev
// _key
b.add(StaticStrings::KeyString, keySlice);
// _id
b.add(StaticStrings::IdString, idSlice);
// _from, _to
if (isEdgeCollection) {
TRI_ASSERT(!fromSlice.isNone());
TRI_ASSERT(!toSlice.isNone());
b.add(StaticStrings::FromString, fromSlice);
b.add(StaticStrings::ToString, toSlice);
}
// _rev
b.add(StaticStrings::RevString, VPackValue(rev));
// add other attributes after the system attributes
{
VPackObjectIterator it(oldValue, true);
while (it.valid()) {
std::string key = it.key().copyString();
// exclude system attributes in old value now
if (!key.empty() && key[0] == '_' &&
(key == StaticStrings::KeyString || key == StaticStrings::IdString ||
key == StaticStrings::RevString ||
key == StaticStrings::FromString ||
key == StaticStrings::ToString)) {
it.next();
continue;
}
auto found = newValues.find(key);
if (found == newValues.end()) {
// use old value
b.add(key, it.value());
} else if (mergeObjects && it.value().isObject() &&
(*found).second.isObject()) {
// merge both values
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
VPackBuilder sub =
VPackCollection::merge(it.value(), value, true, !keepNull);
b.add(key, sub.slice());
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
} else {
// use new value
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
b.add(key, value);
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
}
it.next();
}
}
// add remaining values that were only in new object
for (auto const& it : newValues) {
VPackSlice const& s = it.second;
if (s.isNone()) {
continue;
}
if (!keepNull && s.isNull()) {
continue;
}
b.add(it.first, s);
}
b.close();
}

View File

@ -115,6 +115,14 @@ class PhysicalCollection {
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock) = 0;
virtual int update(arangodb::transaction::Methods* trx,
VPackSlice const newSlice, ManagedDocumentResult& result,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
TRI_voc_rid_t const& revisionId,
arangodb::velocypack::Slice const key) = 0;
virtual int remove(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const slice,
arangodb::ManagedDocumentResult& previous,
@ -131,6 +139,16 @@ class PhysicalCollection {
TRI_voc_rid_t const& revisionId,
arangodb::velocypack::Slice const toRemove) = 0;
protected:
/// @brief merge two objects for update
void mergeObjectsForUpdate(transaction::Methods* trx,
velocypack::Slice const& oldValue,
velocypack::Slice const& newValue,
bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull,
velocypack::Builder& b);
protected:
LogicalCollection* _logicalCollection;
};