1
0
Fork 0

Moved pre-commit document modifications out of logical-collection into the physical collection. They are only relevant when actually writing to disc.

This commit is contained in:
Michael Hackstein 2017-02-17 16:07:02 +01:00
parent 5366f09ec8
commit b84b05244f
8 changed files with 200 additions and 192 deletions

View File

@ -88,8 +88,6 @@ class IndexIterator {
virtual bool next(TokenCallback const& callback, size_t limit) = 0;
virtual bool nextExtra(ExtraCallback const& callback, size_t limit);
// virtual DocumentIdentifierToken next();
virtual void reset();
virtual void skip(uint64_t count, uint64_t& skipped);

View File

@ -26,6 +26,7 @@
#include "Basics/FileUtils.h"
#include "Basics/ReadLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/Timers.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/encoding.h"
@ -1983,7 +1984,8 @@ void MMFilesCollection::truncate(transaction::Methods* trx, OperationOptions& op
if (vpack != nullptr) {
builder->clear();
VPackSlice oldDoc(vpack);
_logicalCollection->newObjectForRemove(trx, oldDoc, TRI_RidToString(oldRevisionId), *builder.get());
newObjectForRemove(trx, oldDoc, TRI_RidToString(oldRevisionId),
*builder.get());
TRI_voc_rid_t revisionId = TRI_HybridLogicalClock();
int res = removeFastPath(trx, oldRevisionId, VPackSlice(vpack), options,
@ -2000,10 +2002,61 @@ void MMFilesCollection::truncate(transaction::Methods* trx, OperationOptions& op
}
int MMFilesCollection::insert(transaction::Methods* trx,
VPackSlice const newSlice,
VPackSlice const slice,
ManagedDocumentResult& result,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock) {
VPackSlice fromSlice;
VPackSlice toSlice;
bool const isEdgeCollection =
(_logicalCollection->type() == TRI_COL_TYPE_EDGE);
if (isEdgeCollection) {
// _from:
fromSlice = slice.get(StaticStrings::FromString);
if (!fromSlice.isString()) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
VPackValueLength len;
char const* docId = fromSlice.getString(len);
size_t split;
if (!TRI_ValidateDocumentIdKeyGenerator(docId, static_cast<size_t>(len),
&split)) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
// _to:
toSlice = slice.get(StaticStrings::ToString);
if (!toSlice.isString()) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
docId = toSlice.getString(len);
if (!TRI_ValidateDocumentIdKeyGenerator(docId, static_cast<size_t>(len),
&split)) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
}
transaction::BuilderLeaser builder(trx);
VPackSlice newSlice;
int res = TRI_ERROR_NO_ERROR;
if (options.recoveryMarker == nullptr) {
TIMER_START(TRANSACTION_NEW_OBJECT_FOR_INSERT);
res = newObjectForInsert(trx, slice, fromSlice, toSlice, isEdgeCollection,
*builder.get(), options.isRestore);
TIMER_STOP(TRANSACTION_NEW_OBJECT_FOR_INSERT);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
newSlice = builder->slice();
} else {
TRI_ASSERT(slice.isObject());
// we can get away with the fast hash function here, as key values are
// restricted to strings
newSlice = slice;
}
// create marker
MMFilesCrudMarker insertMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
@ -2052,7 +2105,7 @@ int MMFilesCollection::insert(transaction::Methods* trx,
return TRI_ERROR_INTERNAL;
}
int res = TRI_ERROR_NO_ERROR;
res = TRI_ERROR_NO_ERROR;
{
// use lock?
bool const useDeadlockDetector =
@ -2598,10 +2651,12 @@ int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice co
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_rid_t const& revisionId,
TRI_voc_rid_t& prevRev,
VPackSlice const toRemove) {
TRI_voc_rid_t& prevRev) {
prevRev = 0;
transaction::BuilderLeaser builder(trx);
newObjectForRemove(trx, slice, TRI_RidToString(revisionId), *builder.get());
TRI_IF_FAILURE("RemoveDocumentNoMarker") {
// test what happens when no marker can be created
return TRI_ERROR_DEBUG;
@ -2616,7 +2671,7 @@ int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice co
MMFilesCrudMarker removeMarker(
TRI_DF_MARKER_VPACK_REMOVE,
static_cast<MMFilesTransactionState*>(trx->state())->idForMarker(),
toRemove);
builder->slice());
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {

View File

@ -280,8 +280,8 @@ class MMFilesCollection final : public PhysicalCollection {
arangodb::velocypack::Slice const slice,
arangodb::ManagedDocumentResult& previous,
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
bool lock, TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev,
arangodb::velocypack::Slice const toRemove) override;
bool lock, TRI_voc_rid_t const& revisionId,
TRI_voc_rid_t& prevRev) override;
int rollbackOperation(transaction::Methods*, TRI_voc_document_operation_e,
TRI_voc_rid_t oldRevisionId,

View File

@ -29,6 +29,7 @@
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/IndexLookupContext.h"
#include "MMFiles/MMFilesCollection.h"
#include "MMFiles/MMFilesIndexElement.h"
#include "MMFiles/MMFilesPrimaryIndex.h"
#include "MMFiles/MMFilesPersistentIndexFeature.h"

View File

@ -1474,59 +1474,8 @@ int LogicalCollection::insert(transaction::Methods* trx, VPackSlice const slice,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock) {
resultMarkerTick = 0;
VPackSlice fromSlice;
VPackSlice toSlice;
bool const isEdgeCollection = (_type == TRI_COL_TYPE_EDGE);
if (isEdgeCollection) {
// _from:
fromSlice = slice.get(StaticStrings::FromString);
if (!fromSlice.isString()) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
VPackValueLength len;
char const* docId = fromSlice.getString(len);
size_t split;
if (!TRI_ValidateDocumentIdKeyGenerator(docId, static_cast<size_t>(len),
&split)) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
// _to:
toSlice = slice.get(StaticStrings::ToString);
if (!toSlice.isString()) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
docId = toSlice.getString(len);
if (!TRI_ValidateDocumentIdKeyGenerator(docId, static_cast<size_t>(len),
&split)) {
return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
}
}
transaction::BuilderLeaser builder(trx);
VPackSlice newSlice;
int res = TRI_ERROR_NO_ERROR;
if (options.recoveryMarker == nullptr) {
TIMER_START(TRANSACTION_NEW_OBJECT_FOR_INSERT);
res = newObjectForInsert(trx, slice, fromSlice, toSlice, isEdgeCollection,
*builder.get(), options.isRestore);
TIMER_STOP(TRANSACTION_NEW_OBJECT_FOR_INSERT);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
newSlice = builder->slice();
} else {
TRI_ASSERT(slice.isObject());
// we can get away with the fast hash function here, as key values are
// restricted to strings
newSlice = slice;
}
res = getPhysical()->insert(trx, newSlice, result, options, resultMarkerTick,
lock);
return res;
return getPhysical()->insert(trx, slice, result, options, resultMarkerTick,
lock);
}
/// @brief updates a document or edge in a collection
@ -1652,11 +1601,8 @@ int LogicalCollection::remove(transaction::Methods* trx,
revisionId = TRI_HybridLogicalClock();
}
transaction::BuilderLeaser builder(trx);
newObjectForRemove(trx, slice, TRI_RidToString(revisionId), *builder.get());
return getPhysical()->remove(trx, slice, previous, options, resultMarkerTick,
lock, revisionId, prevRev, builder->slice());
lock, revisionId, prevRev);
}
void LogicalCollection::sizeHint(transaction::Methods* trx, int64_t hint) {
@ -1671,111 +1617,6 @@ void LogicalCollection::sizeHint(transaction::Methods* trx, int64_t hint) {
}
}
/// @brief new object for insert, computes the hash of the key
int LogicalCollection::newObjectForInsert(
transaction::Methods* trx, VPackSlice const& value, VPackSlice const& fromSlice,
VPackSlice const& toSlice, bool isEdgeCollection, VPackBuilder& builder,
bool isRestore) {
TRI_voc_tick_t newRev = 0;
builder.openObject();
// add system attributes first, in this order:
// _key, _id, _from, _to, _rev
// _key
VPackSlice s = value.get(StaticStrings::KeyString);
if (s.isNone()) {
TRI_ASSERT(!isRestore); // need key in case of restore
newRev = TRI_HybridLogicalClock();
std::string keyString = _keyGenerator->generate(TRI_NewTickServer());
if (keyString.empty()) {
return TRI_ERROR_ARANGO_OUT_OF_KEYS;
}
uint8_t* where =
builder.add(StaticStrings::KeyString, VPackValue(keyString));
s = VPackSlice(where); // point to newly built value, the string
} else if (!s.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;
} else {
std::string keyString = s.copyString();
int res = _keyGenerator->validate(keyString, isRestore);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
builder.add(StaticStrings::KeyString, s);
}
// _id
uint8_t* p = builder.add(StaticStrings::IdString,
VPackValuePair(9ULL, VPackValueType::Custom));
*p++ = 0xf3; // custom type for _id
if (trx->state()->isDBServer() && !_isSystem) {
// db server in cluster, note: the local collections _statistics,
// _statisticsRaw and _statistics15 (which are the only system
// collections)
// must not be treated as shards but as local collections
encoding::storeNumber<uint64_t>(p, _planId, sizeof(uint64_t));
} else {
// local server
encoding::storeNumber<uint64_t>(p, _cid, sizeof(uint64_t));
}
// _from and _to
if (isEdgeCollection) {
TRI_ASSERT(!fromSlice.isNone());
TRI_ASSERT(!toSlice.isNone());
builder.add(StaticStrings::FromString, fromSlice);
builder.add(StaticStrings::ToString, toSlice);
}
// _rev
std::string newRevSt;
if (isRestore) {
VPackSlice oldRev = TRI_ExtractRevisionIdAsSlice(value);
if (!oldRev.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_REV_BAD;
}
bool isOld;
VPackValueLength l;
char const* p = oldRev.getString(l);
TRI_voc_rid_t oldRevision = TRI_StringToRid(p, l, isOld, false);
if (isOld || oldRevision == UINT64_MAX) {
oldRevision = TRI_HybridLogicalClock();
}
newRevSt = TRI_RidToString(oldRevision);
} else {
if (newRev == 0) {
newRev = TRI_HybridLogicalClock();
}
newRevSt = TRI_RidToString(newRev);
}
builder.add(StaticStrings::RevString, VPackValue(newRevSt));
// add other attributes after the system attributes
TRI_SanitizeObjectWithEdges(value, builder);
builder.close();
return TRI_ERROR_NO_ERROR;
}
/// @brief new object for remove, must have _key set
void LogicalCollection::newObjectForRemove(transaction::Methods* trx,
VPackSlice const& oldValue,
std::string const& rev,
VPackBuilder& builder) {
// create an object consisting of _key and _rev (in this order)
builder.openObject();
if (oldValue.isString()) {
builder.add(StaticStrings::KeyString, oldValue);
} else {
VPackSlice s = oldValue.get(StaticStrings::KeyString);
TRI_ASSERT(s.isString());
builder.add(StaticStrings::KeyString, s);
}
builder.add(StaticStrings::RevString, VPackValue(rev));
builder.close();
}
bool LogicalCollection::readRevision(transaction::Methods* trx,
ManagedDocumentResult& result,
TRI_voc_rid_t revisionId) {

View File

@ -356,24 +356,7 @@ class LogicalCollection {
// @brief create index with the given definition.
bool openIndex(velocypack::Slice const&, transaction::Methods*);
// SECTION: Document pre commit preperation (only local)
/// @brief new object for insert, value must have _key set correctly.
int newObjectForInsert(transaction::Methods* trx,
velocypack::Slice const& value,
velocypack::Slice const& fromSlice,
velocypack::Slice const& toSlice,
bool isEdgeCollection,
velocypack::Builder& builder,
bool isRestore);
public: // TODO FIXME
/// @brief new object for remove, must have _key set
void newObjectForRemove(transaction::Methods* trx,
velocypack::Slice const& oldValue,
std::string const& rev,
velocypack::Builder& builder);
private:
private:
void increaseInternalVersion();
protected:

View File

@ -23,7 +23,13 @@
#include "PhysicalCollection.h"
#include "Basics/encoding.h"
#include "Basics/StaticStrings.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h>
@ -165,6 +171,114 @@ void PhysicalCollection::mergeObjectsForUpdate(
b.close();
}
/// @brief new object for insert, computes the hash of the key
int PhysicalCollection::newObjectForInsert(
transaction::Methods* trx, VPackSlice const& value,
VPackSlice const& fromSlice, VPackSlice const& toSlice,
bool isEdgeCollection, VPackBuilder& builder, bool isRestore) const {
TRI_voc_tick_t newRev = 0;
builder.openObject();
// add system attributes first, in this order:
// _key, _id, _from, _to, _rev
// _key
VPackSlice s = value.get(StaticStrings::KeyString);
if (s.isNone()) {
TRI_ASSERT(!isRestore); // need key in case of restore
newRev = TRI_HybridLogicalClock();
std::string keyString =
_logicalCollection->keyGenerator()->generate(TRI_NewTickServer());
if (keyString.empty()) {
return TRI_ERROR_ARANGO_OUT_OF_KEYS;
}
uint8_t* where =
builder.add(StaticStrings::KeyString, VPackValue(keyString));
s = VPackSlice(where); // point to newly built value, the string
} else if (!s.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;
} else {
std::string keyString = s.copyString();
int res =
_logicalCollection->keyGenerator()->validate(keyString, isRestore);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
builder.add(StaticStrings::KeyString, s);
}
// _id
uint8_t* p = builder.add(StaticStrings::IdString,
VPackValuePair(9ULL, VPackValueType::Custom));
*p++ = 0xf3; // custom type for _id
if (trx->state()->isDBServer() && !_logicalCollection->isSystem()) {
// db server in cluster, note: the local collections _statistics,
// _statisticsRaw and _statistics15 (which are the only system
// collections)
// must not be treated as shards but as local collections
encoding::storeNumber<uint64_t>(p, _logicalCollection->planId(), sizeof(uint64_t));
} else {
// local server
encoding::storeNumber<uint64_t>(p, _logicalCollection->cid(), sizeof(uint64_t));
}
// _from and _to
if (isEdgeCollection) {
TRI_ASSERT(!fromSlice.isNone());
TRI_ASSERT(!toSlice.isNone());
builder.add(StaticStrings::FromString, fromSlice);
builder.add(StaticStrings::ToString, toSlice);
}
// _rev
std::string newRevSt;
if (isRestore) {
VPackSlice oldRev = TRI_ExtractRevisionIdAsSlice(value);
if (!oldRev.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_REV_BAD;
}
bool isOld;
VPackValueLength l;
char const* p = oldRev.getString(l);
TRI_voc_rid_t oldRevision = TRI_StringToRid(p, l, isOld, false);
if (isOld || oldRevision == UINT64_MAX) {
oldRevision = TRI_HybridLogicalClock();
}
newRevSt = TRI_RidToString(oldRevision);
} else {
if (newRev == 0) {
newRev = TRI_HybridLogicalClock();
}
newRevSt = TRI_RidToString(newRev);
}
builder.add(StaticStrings::RevString, VPackValue(newRevSt));
// add other attributes after the system attributes
TRI_SanitizeObjectWithEdges(value, builder);
builder.close();
return TRI_ERROR_NO_ERROR;
}
/// @brief new object for remove, must have _key set
void PhysicalCollection::newObjectForRemove(transaction::Methods* trx,
VPackSlice const& oldValue,
std::string const& rev,
VPackBuilder& builder) const {
// create an object consisting of _key and _rev (in this order)
builder.openObject();
if (oldValue.isString()) {
builder.add(StaticStrings::KeyString, oldValue);
} else {
VPackSlice s = oldValue.get(StaticStrings::KeyString);
TRI_ASSERT(s.isString());
builder.add(StaticStrings::KeyString, s);
}
builder.add(StaticStrings::RevString, VPackValue(rev));
builder.close();
}
/// @brief new object for replace, oldValue must have _key and _id correctly
/// set
void PhysicalCollection::newObjectForReplace(

View File

@ -138,11 +138,27 @@ class PhysicalCollection {
arangodb::ManagedDocumentResult& previous,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev,
arangodb::velocypack::Slice const toRemove) = 0;
TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev) = 0;
protected:
// SECTION: Document pre commit preperation
/// @brief new object for insert, value must have _key set correctly.
int newObjectForInsert(transaction::Methods* trx,
velocypack::Slice const& value,
velocypack::Slice const& fromSlice,
velocypack::Slice const& toSlice,
bool isEdgeCollection,
velocypack::Builder& builder,
bool isRestore) const;
/// @brief new object for remove, must have _key set
void newObjectForRemove(transaction::Methods* trx,
velocypack::Slice const& oldValue,
std::string const& rev, velocypack::Builder& builder) const;
/// @brief merge two objects for update
void mergeObjectsForUpdate(transaction::Methods* trx,
velocypack::Slice const& oldValue,