1
0
Fork 0

get rid of two pointers in each transaction operation

This commit is contained in:
jsteemann 2016-09-15 08:16:02 +02:00
parent f3a2a91f12
commit a9cb07678d
7 changed files with 51 additions and 59 deletions

View File

@ -985,10 +985,9 @@ void Transaction::extractKeyAndRevFromDocument(VPackSlice slice,
} else if (*p == basics::VelocyPackHelper::RevAttribute) { } else if (*p == basics::VelocyPackHelper::RevAttribute) {
VPackSlice revSlice(p + 1); VPackSlice revSlice(p + 1);
if (revSlice.isString()) { if (revSlice.isString()) {
bool isOld;
VPackValueLength l; VPackValueLength l;
char const* p = revSlice.getString(l); char const* p = revSlice.getString(l);
revisionId = TRI_StringToRid(p, l, isOld); revisionId = TRI_StringToRid(p, l);
} else if (revSlice.isNumber()) { } else if (revSlice.isNumber()) {
revisionId = revSlice.getNumericValue<TRI_voc_rid_t>(); revisionId = revSlice.getNumericValue<TRI_voc_rid_t>();
} }
@ -1006,10 +1005,9 @@ void Transaction::extractKeyAndRevFromDocument(VPackSlice slice,
// fall back to regular lookup // fall back to regular lookup
{ {
keySlice = slice.get(StaticStrings::KeyString); keySlice = slice.get(StaticStrings::KeyString);
bool isOld;
VPackValueLength l; VPackValueLength l;
char const* p = slice.get(StaticStrings::RevString).getString(l); char const* p = slice.get(StaticStrings::RevString).getString(l);
revisionId = TRI_StringToRid(p, l, isOld); revisionId = TRI_StringToRid(p, l);
} }
} }
@ -1028,10 +1026,9 @@ TRI_voc_rid_t Transaction::extractRevFromDocument(VPackSlice slice) {
if (*p == basics::VelocyPackHelper::RevAttribute) { if (*p == basics::VelocyPackHelper::RevAttribute) {
VPackSlice revSlice(p + 1); VPackSlice revSlice(p + 1);
if (revSlice.isString()) { if (revSlice.isString()) {
bool isOld;
VPackValueLength l; VPackValueLength l;
char const* p = revSlice.getString(l); char const* p = revSlice.getString(l);
return TRI_StringToRid(p, l, isOld); return TRI_StringToRid(p, l);
} else if (revSlice.isNumber()) { } else if (revSlice.isNumber()) {
return revSlice.getNumericValue<TRI_voc_rid_t>(); return revSlice.getNumericValue<TRI_voc_rid_t>();
} }

View File

@ -69,7 +69,9 @@ using Helper = arangodb::basics::VelocyPackHelper;
/// forward /// forward
int TRI_AddOperationTransaction(TRI_transaction_t*, int TRI_AddOperationTransaction(TRI_transaction_t*,
arangodb::wal::DocumentOperation&, bool&); arangodb::wal::DocumentOperation&,
arangodb::wal::Marker const* marker,
bool&);
/// @brief helper struct for filling indexes /// @brief helper struct for filling indexes
class IndexFiller { class IndexFiller {
@ -1840,10 +1842,7 @@ int LogicalCollection::insert(Transaction* trx, VPackSlice const slice,
} }
arangodb::wal::DocumentOperation operation( arangodb::wal::DocumentOperation operation(
trx, marker, this, TRI_VOC_DOCUMENT_OPERATION_INSERT); trx, this, TRI_VOC_DOCUMENT_OPERATION_INSERT);
// DocumentOperation has taken over the ownership for the marker
TRI_ASSERT(operation.marker != nullptr);
TRI_IF_FAILURE("InsertDocumentNoHeader") { TRI_IF_FAILURE("InsertDocumentNoHeader") {
// test what happens if no header can be acquired // test what happens if no header can be acquired
@ -1867,7 +1866,7 @@ int LogicalCollection::insert(Transaction* trx, VPackSlice const slice,
} }
// update the header we got // update the header we got
void* mem = operation.marker->vpack(); void* mem = marker->vpack();
TRI_ASSERT(mem != nullptr); TRI_ASSERT(mem != nullptr);
header->setHash(hash); header->setHash(hash);
header->setVPack(mem); // PROTECTED by trx in trxCollection header->setVPack(mem); // PROTECTED by trx in trxCollection
@ -1875,7 +1874,7 @@ int LogicalCollection::insert(Transaction* trx, VPackSlice const slice,
TRI_ASSERT(VPackSlice(header->vpack()).isObject()); TRI_ASSERT(VPackSlice(header->vpack()).isObject());
// insert into indexes // insert into indexes
res = insertDocument(trx, header, operation, mptr, options.waitForSync); res = insertDocument(trx, header, operation, marker, mptr, options.waitForSync);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
operation.revert(); operation.revert();
@ -2005,15 +2004,12 @@ int LogicalCollection::update(Transaction* trx, VPackSlice const newSlice,
} }
arangodb::wal::DocumentOperation operation( arangodb::wal::DocumentOperation operation(
trx, marker, this, TRI_VOC_DOCUMENT_OPERATION_UPDATE); trx, this, TRI_VOC_DOCUMENT_OPERATION_UPDATE);
// DocumentOperation has taken over the ownership for the marker
TRI_ASSERT(operation.marker != nullptr);
operation.header = oldHeader; operation.header = oldHeader;
operation.init(); operation.init();
res = updateDocument(trx, revisionId, oldHeader, operation, mptr, options.waitForSync); res = updateDocument(trx, revisionId, oldHeader, operation, marker, mptr, options.waitForSync);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
operation.revert(); operation.revert();
@ -2147,15 +2143,12 @@ int LogicalCollection::replace(Transaction* trx, VPackSlice const newSlice,
marker = options.recoveryMarker; marker = options.recoveryMarker;
} }
arangodb::wal::DocumentOperation operation(trx, marker, this, TRI_VOC_DOCUMENT_OPERATION_REPLACE); arangodb::wal::DocumentOperation operation(trx, this, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
// DocumentOperation has taken over the ownership for the marker
TRI_ASSERT(operation.marker != nullptr);
operation.header = oldHeader; operation.header = oldHeader;
operation.init(); operation.init();
res = updateDocument(trx, revisionId, oldHeader, operation, mptr, options.waitForSync); res = updateDocument(trx, revisionId, oldHeader, operation, marker, mptr, options.waitForSync);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
operation.revert(); operation.revert();
@ -2232,10 +2225,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
return TRI_ERROR_DEBUG; return TRI_ERROR_DEBUG;
} }
arangodb::wal::DocumentOperation operation(trx, marker, this, TRI_VOC_DOCUMENT_OPERATION_REMOVE); arangodb::wal::DocumentOperation operation(trx, this, TRI_VOC_DOCUMENT_OPERATION_REMOVE);
// DocumentOperation has taken over the ownership for the marker
TRI_ASSERT(operation.marker != nullptr);
VPackSlice key; VPackSlice key;
if (slice.isString()) { if (slice.isString()) {
@ -2297,7 +2287,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
res = TRI_AddOperationTransaction(trx->getInternals(), operation, options.waitForSync); res = TRI_AddOperationTransaction(trx->getInternals(), operation, marker, options.waitForSync);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
operation.revert(); operation.revert();
@ -2801,6 +2791,7 @@ int LogicalCollection::checkRevision(Transaction* trx,
int LogicalCollection::updateDocument( int LogicalCollection::updateDocument(
arangodb::Transaction* trx, TRI_voc_rid_t revisionId, arangodb::Transaction* trx, TRI_voc_rid_t revisionId,
TRI_doc_mptr_t* oldHeader, arangodb::wal::DocumentOperation& operation, TRI_doc_mptr_t* oldHeader, arangodb::wal::DocumentOperation& operation,
arangodb::wal::Marker const* marker,
TRI_doc_mptr_t* mptr, bool& waitForSync) { TRI_doc_mptr_t* mptr, bool& waitForSync) {
// save the old data, remember // save the old data, remember
TRI_doc_mptr_t oldData = *oldHeader; TRI_doc_mptr_t oldData = *oldHeader;
@ -2819,7 +2810,8 @@ int LogicalCollection::updateDocument(
TRI_doc_mptr_t* newHeader = oldHeader; TRI_doc_mptr_t* newHeader = oldHeader;
// update the header. this will modify oldHeader, too !!! // update the header. this will modify oldHeader, too !!!
void* mem = operation.marker->vpack(); TRI_ASSERT(marker != nullptr);
void* mem = marker->vpack();
TRI_ASSERT(mem != nullptr); TRI_ASSERT(mem != nullptr);
newHeader->setVPack(mem); newHeader->setVPack(mem);
@ -2846,7 +2838,7 @@ int LogicalCollection::updateDocument(
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
res = TRI_AddOperationTransaction(trx->getInternals(), operation, waitForSync); res = TRI_AddOperationTransaction(trx->getInternals(), operation, marker, waitForSync);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// write new header into result // write new header into result
@ -2860,8 +2852,8 @@ int LogicalCollection::updateDocument(
/// the caller must make sure the write lock on the collection is held /// the caller must make sure the write lock on the collection is held
int LogicalCollection::insertDocument( int LogicalCollection::insertDocument(
arangodb::Transaction* trx, TRI_doc_mptr_t* header, arangodb::Transaction* trx, TRI_doc_mptr_t* header,
arangodb::wal::DocumentOperation& operation, TRI_doc_mptr_t* mptr, arangodb::wal::DocumentOperation& operation, arangodb::wal::Marker const* marker,
bool& waitForSync) { TRI_doc_mptr_t* mptr, bool& waitForSync) {
TRI_ASSERT(header != nullptr); TRI_ASSERT(header != nullptr);
TRI_ASSERT(mptr != nullptr); TRI_ASSERT(mptr != nullptr);
@ -2896,7 +2888,7 @@ int LogicalCollection::insertDocument(
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
res = TRI_AddOperationTransaction(trx->getInternals(), operation, waitForSync); res = TRI_AddOperationTransaction(trx->getInternals(), operation, marker, waitForSync);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
*mptr = *header; *mptr = *header;

View File

@ -43,6 +43,7 @@ class Slice;
namespace wal { namespace wal {
struct DocumentOperation; struct DocumentOperation;
class Marker;
} }
typedef std::string ServerID; // ID of a server typedef std::string ServerID; // ID of a server
@ -387,11 +388,11 @@ class LogicalCollection {
arangodb::velocypack::Slice const); arangodb::velocypack::Slice const);
int updateDocument(arangodb::Transaction*, TRI_voc_rid_t, TRI_doc_mptr_t*, int updateDocument(arangodb::Transaction*, TRI_voc_rid_t, TRI_doc_mptr_t*,
arangodb::wal::DocumentOperation&, arangodb::wal::DocumentOperation&, arangodb::wal::Marker const*,
TRI_doc_mptr_t*, bool&); TRI_doc_mptr_t*, bool&);
int insertDocument(arangodb::Transaction*, TRI_doc_mptr_t*, int insertDocument(arangodb::Transaction*, TRI_doc_mptr_t*,
arangodb::wal::DocumentOperation&, TRI_doc_mptr_t*, arangodb::wal::DocumentOperation&, arangodb::wal::Marker const*,
bool&); TRI_doc_mptr_t*, bool&);
int insertPrimaryIndex(arangodb::Transaction*, TRI_doc_mptr_t*); int insertPrimaryIndex(arangodb::Transaction*, TRI_doc_mptr_t*);
public: public:

View File

@ -955,6 +955,7 @@ bool TRI_IsContainedCollectionTransaction(TRI_transaction_t* trx,
int TRI_AddOperationTransaction(TRI_transaction_t* trx, int TRI_AddOperationTransaction(TRI_transaction_t* trx,
arangodb::wal::DocumentOperation& operation, arangodb::wal::DocumentOperation& operation,
arangodb::wal::Marker const* marker,
bool& waitForSync) { bool& waitForSync) {
TRI_ASSERT(operation.header != nullptr); TRI_ASSERT(operation.header != nullptr);
@ -990,7 +991,7 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
TRI_voc_fid_t fid = 0; TRI_voc_fid_t fid = 0;
void const* position = nullptr; void const* position = nullptr;
if (operation.marker->fid() == 0) { if (marker->fid() == 0) {
// this is a "real" marker that must be written into the logfiles // this is a "real" marker that must be written into the logfiles
// just append it to the WAL: // just append it to the WAL:
@ -1014,7 +1015,7 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
arangodb::wal::SlotInfoCopy slotInfo = arangodb::wal::SlotInfoCopy slotInfo =
arangodb::wal::LogfileManager::instance()->allocateAndWrite( arangodb::wal::LogfileManager::instance()->allocateAndWrite(
trx->_vocbase->id(), collection->cid(), trx->_vocbase->id(), collection->cid(),
operation.marker, wakeUpSynchronizer, marker, wakeUpSynchronizer,
localWaitForSync, waitForTick); localWaitForSync, waitForTick);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred // some error occurred
@ -1032,8 +1033,8 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
} else { } else {
// this is an envelope marker that has been written to the logfiles before. // this is an envelope marker that has been written to the logfiles before.
// avoid writing it again! // avoid writing it again!
fid = operation.marker->fid(); fid = marker->fid();
position = static_cast<wal::MarkerEnvelope const*>(operation.marker)->mem(); position = static_cast<wal::MarkerEnvelope const*>(marker)->mem();
} }
TRI_ASSERT(fid > 0); TRI_ASSERT(fid > 0);
@ -1105,8 +1106,14 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
trxCollection->_operations->push_back(copy); trxCollection->_operations->push_back(copy);
copy->handle(); copy->handle();
} }
collection->setRevision(operation.rid, false); {
VPackSlice s(static_cast<uint8_t*>(marker->vpack()));
VPackValueLength l;
char const* p = s.get(StaticStrings::RevString).getString(l);
TRI_voc_rid_t rid = TRI_StringToRid(p, l);
collection->setRevision(rid, false);
}
TRI_IF_FAILURE("TransactionOperationAtEnd") { return TRI_ERROR_DEBUG; } TRI_IF_FAILURE("TransactionOperationAtEnd") { return TRI_ERROR_DEBUG; }

View File

@ -68,6 +68,9 @@ std::string TRI_RidToString(TRI_voc_rid_t rid);
/// @brief Convert a string into a revision ID, no check variant /// @brief Convert a string into a revision ID, no check variant
TRI_voc_rid_t TRI_StringToRid(std::string const& ridStr, bool& isOld); TRI_voc_rid_t TRI_StringToRid(std::string const& ridStr, bool& isOld);
/// @brief Convert a string into a revision ID, no check variant
TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len);
/// @brief Convert a string into a revision ID, no check variant /// @brief Convert a string into a revision ID, no check variant
TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len, bool& isOld); TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len, bool& isOld);

View File

@ -1249,10 +1249,9 @@ TRI_voc_rid_t TRI_ExtractRevisionId(VPackSlice slice) {
VPackSlice r(slice.get(StaticStrings::RevString)); VPackSlice r(slice.get(StaticStrings::RevString));
if (r.isString()) { if (r.isString()) {
bool isOld;
VPackValueLength l; VPackValueLength l;
char const* p = r.getString(l); char const* p = r.getString(l);
return TRI_StringToRid(p, l, isOld); return TRI_StringToRid(p, l);
} }
if (r.isInteger()) { if (r.isInteger()) {
return r.getNumber<TRI_voc_rid_t>(); return r.getNumber<TRI_voc_rid_t>();
@ -1322,6 +1321,12 @@ TRI_voc_rid_t TRI_StringToRid(std::string const& ridStr, bool& isOld) {
return TRI_StringToRid(ridStr.c_str(), ridStr.size(), isOld); return TRI_StringToRid(ridStr.c_str(), ridStr.size(), isOld);
} }
/// @brief Convert a string into a revision ID, no check variant
TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len) {
bool isOld;
return TRI_StringToRid(p, len, isOld);
}
/// @brief Convert a string into a revision ID, no check variant /// @brief Convert a string into a revision ID, no check variant
TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len, bool& isOld) { TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len, bool& isOld) {
if (len > 0 && *p >= '1' && *p <= '9') { if (len > 0 && *p >= '1' && *p <= '9') {

View File

@ -7,13 +7,11 @@
#include "VocBase/MasterPointer.h" #include "VocBase/MasterPointer.h"
#include "VocBase/MasterPointers.h" #include "VocBase/MasterPointers.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
#include "Wal/Marker.h"
namespace arangodb { namespace arangodb {
class Transaction; class Transaction;
namespace wal { namespace wal {
class Marker;
struct DocumentOperation { struct DocumentOperation {
enum class StatusType : uint8_t { enum class StatusType : uint8_t {
@ -24,23 +22,15 @@ struct DocumentOperation {
REVERTED REVERTED
}; };
DocumentOperation(arangodb::Transaction* trx, Marker const* marker, DocumentOperation(arangodb::Transaction* trx,
LogicalCollection* collection, LogicalCollection* collection,
TRI_voc_document_operation_e type) TRI_voc_document_operation_e type)
: trx(trx), : trx(trx),
marker(marker),
collection(collection), collection(collection),
header(nullptr), header(nullptr),
tick(0), tick(0),
rid(0),
type(type), type(type),
status(StatusType::CREATED) { status(StatusType::CREATED) {
TRI_ASSERT(marker != nullptr);
VPackSlice s(static_cast<uint8_t*>(marker->vpack()));
bool isOld;
VPackValueLength l;
char const* p = s.get(StaticStrings::RevString).getString(l);
rid = TRI_StringToRid(p, l, isOld);
} }
~DocumentOperation() { ~DocumentOperation() {
@ -55,9 +45,8 @@ struct DocumentOperation {
DocumentOperation* swap() { DocumentOperation* swap() {
DocumentOperation* copy = DocumentOperation* copy =
new DocumentOperation(trx, marker, collection, type); new DocumentOperation(trx, collection, type);
copy->tick = tick; copy->tick = tick;
copy->rid = rid;
copy->header = header; copy->header = header;
copy->oldHeader = oldHeader; copy->oldHeader = oldHeader;
copy->status = status; copy->status = status;
@ -112,12 +101,10 @@ struct DocumentOperation {
} }
arangodb::Transaction* trx; arangodb::Transaction* trx;
Marker const* marker;
LogicalCollection* collection; LogicalCollection* collection;
TRI_doc_mptr_t* header; TRI_doc_mptr_t* header;
TRI_doc_mptr_t oldHeader; TRI_doc_mptr_t oldHeader;
TRI_voc_tick_t tick; TRI_voc_tick_t tick;
TRI_voc_rid_t rid;
TRI_voc_document_operation_e type; TRI_voc_document_operation_e type;
StatusType status; StatusType status;
}; };