1
0
Fork 0
This commit is contained in:
Jan Steemann 2014-05-27 18:42:40 +02:00
parent 86cecdb6a6
commit e52f6f9d05
11 changed files with 568 additions and 570 deletions

View File

@ -168,6 +168,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline int readRandom (TRI_doc_mptr_t* mptr, TRI_barrier_t** barrier) {
assert(mptr != nullptr);
return this->readAny(this->trxCollection(), mptr, barrier);
}
@ -176,6 +177,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline int read (TRI_doc_mptr_t* mptr, const string& key) {
assert(mptr != nullptr);
return this->readSingle(this->trxCollection(), mptr, key);
}

View File

@ -145,6 +145,8 @@ namespace triagens {
if (_numWrites++ > N) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->create(this->trxCollection(),
TRI_DOC_MARKER_KEY_DOCUMENT,
@ -166,6 +168,8 @@ namespace triagens {
if (_numWrites++ > N) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->create(this->trxCollection(),
TRI_DOC_MARKER_KEY_EDGE,
@ -186,6 +190,8 @@ namespace triagens {
if (_numWrites++ > N) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->create(this->trxCollection(),
key,
@ -210,6 +216,8 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->create(this->trxCollection(),
key,
0,
@ -235,6 +243,8 @@ namespace triagens {
if (_numWrites++ > N) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->update(this->trxCollection(),
key,
@ -262,6 +272,8 @@ namespace triagens {
if (_numWrites++ > N) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
assert(mptr != nullptr);
return this->update(this->trxCollection(),
key,

View File

@ -535,9 +535,10 @@ namespace triagens {
int readSingle (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* mptr,
const string& key) {
assert(mptr != nullptr);
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
memset(&mptr, 0, sizeof(TRI_doc_mptr_t));
int res = primary->readDocument(trxCollection,
(TRI_voc_key_t) key.c_str(),

View File

@ -47,10 +47,17 @@
#include "VocBase/server.h"
#include "VocBase/update-policy.h"
#include "VocBase/voc-shaper.h"
#include "Wal/DocumentOperation.h"
#include "Wal/LogfileManager.h"
#include "Wal/Marker.h"
#include "Wal/Slots.h"
////////////////////////////////////////////////////////////////////////////////
/// @brief add a WAL operation for a transaction collection
////////////////////////////////////////////////////////////////////////////////
int TRI_AddOperationTransaction (triagens::wal::DocumentOperation&, bool);
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
@ -153,9 +160,9 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document,
TRI_doc_mptr_t* found;
int res;
TRI_ASSERT_MAINTAINER(document != NULL);
TRI_ASSERT_MAINTAINER(header != NULL);
TRI_ASSERT_MAINTAINER(header->_key != NULL);
TRI_ASSERT_MAINTAINER(document != nullptr);
TRI_ASSERT_MAINTAINER(header != nullptr);
TRI_ASSERT_MAINTAINER(header->_key != nullptr);
primary = &document->base;
@ -166,7 +173,7 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document,
return res;
}
if (found == NULL) {
if (found == nullptr) {
// success
IncreaseDocumentCount(primary);
@ -190,18 +197,12 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document,
static int InsertSecondaryIndexes (TRI_document_collection_t* document,
TRI_doc_mptr_t const* header,
const bool isRollback) {
size_t i, n;
int result;
int result = TRI_ERROR_NO_ERROR;
size_t const n = document->_allIndexes._length;
result = TRI_ERROR_NO_ERROR;
n = document->_allIndexes._length;
for (i = 0; i < n; ++i) {
TRI_index_t* idx;
int res;
idx = static_cast<TRI_index_t*>(document->_allIndexes._buffer[i]);
res = idx->insert(idx, header, isRollback);
for (size_t i = 0; i < n; ++i) {
TRI_index_t* idx = static_cast<TRI_index_t*>(document->_allIndexes._buffer[i]);
int res = idx->insert(idx, header, isRollback);
// in case of no-memory, return immediately
if (res == TRI_ERROR_OUT_OF_MEMORY) {
@ -233,7 +234,7 @@ static int DeletePrimaryIndex (TRI_document_collection_t* document,
TRI_primary_collection_t* primary = &document->base;
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_RemoveKeyAssociativePointer(&primary->_primaryIndex, header->_key));
if (found == NULL) {
if (found == nullptr) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
@ -249,13 +250,10 @@ static int DeletePrimaryIndex (TRI_document_collection_t* document,
static int DeleteSecondaryIndexes (TRI_document_collection_t* document,
TRI_doc_mptr_t const* header,
const bool isRollback) {
size_t i, n;
int result;
size_t const n = document->_allIndexes._length;
int result = TRI_ERROR_NO_ERROR;
n = document->_allIndexes._length;
result = TRI_ERROR_NO_ERROR;
for (i = 0; i < n; ++i) {
for (size_t i = 0; i < n; ++i) {
TRI_index_t* idx = static_cast<TRI_index_t*>(document->_allIndexes._buffer[i]);
int res = idx->remove(idx, header, isRollback);
@ -358,23 +356,13 @@ static int RotateJournal (TRI_document_collection_t* document) {
////////////////////////////////////////////////////////////////////////////////
static int RollbackInsert (TRI_document_collection_t* document,
TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t* oldHeader) {
int res;
// there is no old header
assert(oldHeader == NULL);
TRI_doc_mptr_t* header) {
// ignore any errors we're getting from this
DeletePrimaryIndex(document, newHeader, true);
DeleteSecondaryIndexes(document, newHeader, true);
DeletePrimaryIndex(document, header, true);
DeleteSecondaryIndexes(document, header, true);
// release the header. nobody else should point to it now
document->_headers->release(document->_headers, newHeader, true);
res = TRI_ERROR_NO_ERROR;
return res;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
@ -414,18 +402,12 @@ static int RollbackUpdate (TRI_document_collection_t* document,
////////////////////////////////////////////////////////////////////////////////
static int RollbackRemove (TRI_document_collection_t* document,
TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t* oldHeader,
TRI_doc_mptr_t* header,
bool adjustHeader) {
int res;
// there is no new header
assert(newHeader == NULL);
res = InsertPrimaryIndex(document, oldHeader, true);
int res = InsertPrimaryIndex(document, header, true);
if (res == TRI_ERROR_NO_ERROR) {
res = InsertSecondaryIndexes(document, oldHeader, true);
res = InsertSecondaryIndexes(document, header, true);
}
else {
LOG_ERROR("error rolling back remove operation");
@ -433,7 +415,7 @@ static int RollbackRemove (TRI_document_collection_t* document,
if (adjustHeader) {
// put back the header into its old position
document->_headers->relink(document->_headers, oldHeader, oldHeader);
document->_headers->relink(document->_headers, header, header);
}
return res;
@ -596,35 +578,6 @@ static void DebugHeadersDocumentCollection (TRI_document_collection_t* collectio
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a WAL marker into indexes
////////////////////////////////////////////////////////////////////////////////
static int InsertIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* header) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
// insert into primary index first
int res = InsertPrimaryIndex(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// insert has failed
return res;
}
// insert into secondary indexes
res = InsertSecondaryIndexes(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// insertion into secondary indexes failed
DeleteSecondaryIndexes(document, header, true);
DeletePrimaryIndex(document, header, true);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief post-insert operation
////////////////////////////////////////////////////////////////////////////////
@ -647,6 +600,63 @@ static int PostInsertIndexes (TRI_transaction_collection_t* trxCollection,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates a new revision id if not yet set
////////////////////////////////////////////////////////////////////////////////
static inline TRI_voc_rid_t GetRevisionId (TRI_voc_rid_t previous) {
if (previous != 0) {
return previous;
}
// generate new revision id
return static_cast<TRI_voc_rid_t>(TRI_NewTickServer());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a document
////////////////////////////////////////////////////////////////////////////////
static int InsertDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* header,
triagens::wal::DocumentOperation& operation,
TRI_doc_mptr_t* mptr,
bool syncRequested) {
assert(header != nullptr);
assert(mptr != nullptr);
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
// .............................................................................
// insert into indexes
// .............................................................................
// insert into primary index first
int res = InsertPrimaryIndex(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// insert has failed
return res;
}
// insert into secondary indexes
res = InsertSecondaryIndexes(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
res = TRI_AddOperationTransaction(operation, syncRequested);
if (res == TRI_ERROR_NO_ERROR) {
*mptr = *header;
res = PostInsertIndexes(trxCollection, header);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a shaped-json document (or edge)
/// note: key might be NULL. in this case, a key is auto-generated
@ -664,16 +674,12 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
bool isRestore) {
// TODO: isRestore is not used yet!
TRI_voc_tick_t tick;
assert(mptr != nullptr);
mptr->_data = nullptr;
mptr->_key = nullptr;
if (rid == 0) {
// generate new revision id
tick = TRI_NewTickServer();
rid = static_cast<TRI_voc_rid_t>(tick);
}
else {
tick = static_cast<TRI_voc_tick_t>(rid);
}
rid = GetRevisionId(rid);
TRI_voc_tick_t tick = static_cast<TRI_voc_tick_t>(rid);
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
TRI_key_generator_t* keyGenerator = static_cast<TRI_key_generator_t*>(primary->_keyGenerator);
@ -707,37 +713,32 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
return res;
}
triagens::wal::SlotInfo slotInfo;
triagens::wal::Marker* marker = nullptr;
if (markerType == TRI_DOC_MARKER_KEY_DOCUMENT) {
// document
assert(edge == nullptr);
triagens::wal::DocumentMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
keyString,
legend,
shaped);
slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
marker = new triagens::wal::DocumentMarker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
keyString,
legend,
shaped);
}
else if (markerType == TRI_DOC_MARKER_KEY_EDGE) {
// edge
assert(edge != nullptr);
triagens::wal::EdgeMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
keyString,
edge,
legend,
shaped);
slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
marker = new triagens::wal::EdgeMarker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
keyString,
edge,
legend,
shaped);
}
else {
// invalid marker type
@ -745,71 +746,72 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
}
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;
}
assert(marker != nullptr);
// now insert into indexes
{
triagens::arango::CollectionWriteLocker collectionLocker(primary, lock);
triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_INSERT, rid);
// create a new header
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
TRI_doc_mptr_t* header = document->_headers->request(document->_headers, slotInfo.size);
TRI_doc_mptr_t* header = operation.header = document->_headers->request(document->_headers, marker->size());
if (header == nullptr) {
// out of memory. no harm done here. just return the error
return TRI_ERROR_OUT_OF_MEMORY;
}
// update the header we got
triagens::wal::document_marker_t const* m = static_cast<triagens::wal::document_marker_t const*>(slotInfo.mem);
void* mem = operation.marker->mem();
triagens::wal::document_marker_t const* m = static_cast<triagens::wal::document_marker_t const*>(mem);
header->_rid = rid;
header->_fid = 0; // TODO: use WAL fid
// let header point to WAL location
header->_data = slotInfo.mem;
header->_key = static_cast<char*>(const_cast<void*>(slotInfo.mem)) + m->_offsetKey;
header->_data = mem;
header->_key = static_cast<char*>(const_cast<void*>(mem)) + m->_offsetKey;
// insert into indexes
res = InsertIndexes(trxCollection, header);
if (res != TRI_ERROR_NO_ERROR) {
// release the header
document->_headers->release(document->_headers, header, true);
return res;
}
// add the operation to the running transaction
res = TRI_AddOperationTransaction(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_INSERT,
header,
nullptr,
nullptr,
forceSync);
res = InsertDocument(trxCollection, header, operation, mptr, forceSync);
if (res != TRI_ERROR_NO_ERROR) {
// release the header. nobody else should point to it now
assert(mptr->_data == nullptr);
assert(mptr->_key == nullptr);
// something has failed.... now delete from the indexes again
RollbackInsert(document, header, nullptr);
// TODO: do we need to release the header here?
return res;
RollbackInsert(document, header);
}
// execute a post-insert
res = PostInsertIndexes(trxCollection, header);
// TODO: do we need to release the header if something goes wrong?
// TODO: post-insert will never return an error
if (res == TRI_ERROR_NO_ERROR) {
TRI_SetRevisionDocumentCollection(document, header->_rid, false);
*mptr = *header;
else {
assert(mptr->_data != nullptr);
assert(mptr->_key != nullptr);
}
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief looks up a document by key
/// the caller must make sure the read lock on the collection is helt
////////////////////////////////////////////////////////////////////////////////
static int LookupDocument (TRI_primary_collection_t* primary,
TRI_voc_key_t key,
TRI_doc_update_policy_t const* policy,
TRI_doc_mptr_t*& header) {
header = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key));
if (! IsVisible(header)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
if (policy != nullptr) {
return TRI_CheckUpdatePolicy(policy, header->_rid);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief reads an element from the document collection
////////////////////////////////////////////////////////////////////////////////
@ -818,15 +820,19 @@ static int ReadDocumentShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
bool lock) {
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
assert(mptr != nullptr);
mptr->_data = nullptr;
mptr->_key = nullptr;
{
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
triagens::arango::CollectionReadLocker collectionLocker(primary, lock);
TRI_doc_mptr_t const* header = static_cast<TRI_doc_mptr_t const*>(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key));
TRI_doc_mptr_t* header;
int res = LookupDocument(primary, key, nullptr, header);
if (! IsVisible(header)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// we found a document, now copy it over
@ -841,15 +847,26 @@ static int ReadDocumentShapedJson (TRI_transaction_collection_t* trxCollection,
}
////////////////////////////////////////////////////////////////////////////////
/// @brief updates a document in the indexes
/// @brief updates an existing document
////////////////////////////////////////////////////////////////////////////////
static int UpdateIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* oldHeader,
TRI_doc_mptr_t* newHeader) {
static int UpdateDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* oldHeader,
triagens::wal::DocumentOperation& operation,
TRI_doc_mptr_t* mptr,
bool syncRequested) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
// save the old data, remember
TRI_doc_mptr_t oldData = *oldHeader;
// .............................................................................
// update indexes
// .............................................................................
// remove old document from secondary indexes
// (it will stay in the primary index as the key won't change)
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
int res = DeleteSecondaryIndexes(document, oldHeader, false);
if (res != TRI_ERROR_NO_ERROR) {
@ -858,6 +875,16 @@ static int UpdateIndexes (TRI_transaction_collection_t* trxCollection,
return res;
}
// .............................................................................
// update header
// .............................................................................
TRI_doc_mptr_t* newHeader = oldHeader;
// update the header. this will modify oldHeader, too !!!
newHeader->_rid = operation.rid;
newHeader->_data = operation.marker->mem();
// insert new document into secondary indexes
res = InsertSecondaryIndexes(document, newHeader, false);
@ -865,7 +892,23 @@ static int UpdateIndexes (TRI_transaction_collection_t* trxCollection,
if (res != TRI_ERROR_NO_ERROR) {
// rollback
DeleteSecondaryIndexes(document, newHeader, true);
// copy back old header data
*oldHeader = oldData;
InsertSecondaryIndexes(document, oldHeader, true);
return res;
}
res = TRI_AddOperationTransaction(operation, syncRequested);
if (res == TRI_ERROR_NO_ERROR) {
// write new header into result
*mptr = *((TRI_doc_mptr_t*) newHeader);
}
else {
RollbackUpdate(document, newHeader, &oldData, false); // TODO: check whether "false" is correct!
}
return res;
@ -884,184 +927,89 @@ static int UpdateDocumentShapedJson (TRI_transaction_collection_t* trxCollection
bool lock,
bool forceSync) {
if (rid == 0) {
// generate new revision id
rid = static_cast<TRI_voc_rid_t>(TRI_NewTickServer());
}
rid = GetRevisionId(rid);
assert(key != nullptr);
// initialise the result
assert(mptr != nullptr);
mptr->_key = nullptr;
mptr->_data = nullptr;
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
// create legend
triagens::basics::JsonLegend legend(primary->_shaper);
int res = legend.addShape(shaped->_sid, &shaped->_data);
int res;
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
{
triagens::arango::CollectionWriteLocker collectionLocker(primary, lock);
// get the header pointer of the previous revision
assert(key != nullptr);
TRI_doc_mptr_t* oldHeader = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key));
if (IsVisible(oldHeader)) {
// document found, now check revision
res = TRI_CheckUpdatePolicy(policy, oldHeader->_rid);
}
else {
// document not found
res = TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
TRI_doc_mptr_t* oldHeader;
res = LookupDocument(primary, key, policy, oldHeader);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
triagens::basics::JsonLegend legend(primary->_shaper);
res = legend.addShape(shaped->_sid, &shaped->_data);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
triagens::wal::SlotInfo slotInfo;
triagens::wal::Marker* marker = nullptr;
TRI_df_marker_t const* original = static_cast<TRI_df_marker_t const*>(oldHeader->_data);
if (original->_type == TRI_WAL_MARKER_DOCUMENT ||
original->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
triagens::wal::DocumentMarker marker = triagens::wal::DocumentMarker::clone(original,
primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
legend,
shaped);
slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
// create a WAL document marker
marker = triagens::wal::DocumentMarker::clone(original,
primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
legend,
shaped);
}
else if (original->_type == TRI_WAL_MARKER_EDGE ||
original->_type == TRI_DOC_MARKER_KEY_EDGE) {
original->_type == TRI_DOC_MARKER_KEY_EDGE) {
// create a WAL edge marker
triagens::wal::EdgeMarker marker = triagens::wal::EdgeMarker::clone(original,
primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
legend,
shaped);
slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
marker = triagens::wal::EdgeMarker::clone(original,
primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
legend,
shaped);
}
else {
// invalid marker type
assert(false);
return TRI_ERROR_INTERNAL;
}
triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_UPDATE, rid);
operation.header = oldHeader;
operation.init();
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;
}
triagens::wal::document_marker_t const* m = static_cast<triagens::wal::document_marker_t const*>(slotInfo.mem);
TRI_doc_mptr_t newHeader;
newHeader._rid = rid;
newHeader._fid = 0; // TODO
newHeader._data = slotInfo.mem;
newHeader._key = static_cast<char*>(const_cast<void*>(slotInfo.mem)) + m->_offsetKey;
res = UpdateIndexes(trxCollection, oldHeader, &newHeader);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
res = TRI_AddOperationTransaction(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_UPDATE,
&newHeader,
oldHeader,
oldHeader,
forceSync);
if (res != TRI_ERROR_NO_ERROR) {
RollbackUpdate(document, &newHeader, oldHeader, false);
return res;
}
TRI_SetRevisionDocumentCollection(document, rid, false);
// update the old header in place
oldHeader->_rid = newHeader._rid;
oldHeader->_fid = newHeader._fid;
oldHeader->_data = newHeader._data;
oldHeader->_key = newHeader._key;
// return value
*mptr = newHeader;
res = UpdateDocument(trxCollection, oldHeader, operation, mptr, forceSync);
}
if (res == TRI_ERROR_NO_ERROR) {
assert(mptr->_key != nullptr);
assert(mptr->_data != nullptr);
assert(mptr->_rid > 0);
}
else {
assert(mptr->_key == nullptr);
assert(mptr->_data == nullptr);
assert(mptr->_rid == 0);
}
assert(mptr->_key != nullptr);
assert(mptr->_data != nullptr);
assert(mptr->_rid > 0);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a document from the indexes
////////////////////////////////////////////////////////////////////////////////
static int RemoveIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_update_policy_t const* policy,
TRI_voc_key_t key,
TRI_doc_mptr_t** mptr) {
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
// get the existing header pointer
TRI_doc_mptr_t* header = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key));
if (! IsVisible(header)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
*mptr = header;
// .............................................................................
// check the revision
// .............................................................................
int res = TRI_CheckUpdatePolicy(policy, header->_rid);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// delete from indexes
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
res = DeleteSecondaryIndexes(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
InsertSecondaryIndexes(document, header, true);
return res;
}
res = DeletePrimaryIndex(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
InsertSecondaryIndexes(document, header, true);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a shaped-json document (or edge)
////////////////////////////////////////////////////////////////////////////////
@ -1073,55 +1021,50 @@ static int RemoveDocumentShapedJson (TRI_transaction_collection_t* trxCollection
bool lock,
bool forceSync) {
assert(key != nullptr);
if (rid == 0) {
// generate new revision id
rid = static_cast<TRI_voc_rid_t>(TRI_NewTickServer());
}
rid = GetRevisionId(rid);
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
triagens::wal::RemoveMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
std::string(key));
triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;
}
triagens::wal::Marker* marker = new triagens::wal::RemoveMarker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
std::string(key));
TRI_doc_mptr_t* header;
int res;
{
triagens::arango::CollectionWriteLocker collectionLocker(primary, lock);
triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_REMOVE, rid);
res = RemoveIndexes(trxCollection, policy, key, &header);
res = LookupDocument(primary, key, policy, header);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// we found a document to remove
assert(header != nullptr);
operation.header = header;
res = TRI_AddOperationTransaction(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_REMOVE,
nullptr,
header,
header,
forceSync);
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
// delete from indexes
res = DeleteSecondaryIndexes(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
InsertSecondaryIndexes(document, header, true);
return res;
}
res = TRI_AddOperationTransaction(operation, forceSync);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
RollbackRemove(document, nullptr, header, true); // TODO: check if "true" is correct!
}
else {
TRI_SetRevisionDocumentCollection(document, rid, false);
document->_headers->unlink(document->_headers, header);
RollbackRemove(document, header, false);
}
}
@ -2575,13 +2518,13 @@ int TRI_RollbackOperationDocumentCollection (TRI_document_collection_t* document
int res;
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
res = RollbackInsert(document, newHeader, NULL);
res = RollbackInsert(document, newHeader);
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
res = RollbackUpdate(document, newHeader, oldData, true);
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
res = RollbackRemove(document, NULL, oldHeader, true);
res = RollbackRemove(document, oldHeader, true);
}
else {
res = TRI_ERROR_INTERNAL;

View File

@ -103,7 +103,7 @@ static void ClearHeader (TRI_headers_t* h,
TRI_doc_mptr_t* header) {
simple_headers_t* headers = (simple_headers_t*) h;
TRI_ASSERT_MAINTAINER(header != NULL);
TRI_ASSERT_MAINTAINER(header != nullptr);
memset(header, 0, sizeof(TRI_doc_mptr_t));
@ -123,7 +123,7 @@ static void MoveBackHeader (TRI_headers_t* h,
int64_t oldSize;
int64_t newSize;
if (header == NULL) {
if (header == nullptr) {
return;
}
@ -132,13 +132,13 @@ static void MoveBackHeader (TRI_headers_t* h,
TRI_ASSERT_MAINTAINER(headers->_totalSize > 0);
// we have at least one element in the list
TRI_ASSERT_MAINTAINER(headers->_begin != NULL);
TRI_ASSERT_MAINTAINER(headers->_end != NULL);
TRI_ASSERT_MAINTAINER(headers->_begin != nullptr);
TRI_ASSERT_MAINTAINER(headers->_end != nullptr);
TRI_ASSERT_MAINTAINER(header->_prev != header);
TRI_ASSERT_MAINTAINER(header->_next != header);
TRI_ASSERT_MAINTAINER(old != NULL);
TRI_ASSERT_MAINTAINER(old->_data != NULL);
TRI_ASSERT_MAINTAINER(old != nullptr);
TRI_ASSERT_MAINTAINER(old->_data != nullptr);
newSize = (int64_t) (((TRI_df_marker_t*) header->_data)->_size);
oldSize = (int64_t) (((TRI_df_marker_t*) old->_data)->_size);
@ -149,32 +149,32 @@ static void MoveBackHeader (TRI_headers_t* h,
if (headers->_end == header) {
// header is already at the end
TRI_ASSERT_MAINTAINER(header->_next == NULL);
TRI_ASSERT_MAINTAINER(header->_next == nullptr);
return;
}
TRI_ASSERT_MAINTAINER(headers->_begin != headers->_end);
// unlink the element
if (header->_prev != NULL) {
if (header->_prev != nullptr) {
header->_prev->_next = header->_next;
}
if (header->_next != NULL) {
if (header->_next != nullptr) {
header->_next->_prev = header->_prev;
}
if (headers->_begin == header) {
TRI_ASSERT_MAINTAINER(header->_next != NULL);
TRI_ASSERT_MAINTAINER(header->_next != nullptr);
headers->_begin = header->_next;
}
header->_prev = headers->_end;
header->_next = NULL;
header->_next = nullptr;
headers->_end = header;
header->_prev->_next = header;
TRI_ASSERT_MAINTAINER(headers->_begin != NULL);
TRI_ASSERT_MAINTAINER(headers->_end != NULL);
TRI_ASSERT_MAINTAINER(headers->_begin != nullptr);
TRI_ASSERT_MAINTAINER(headers->_end != nullptr);
TRI_ASSERT_MAINTAINER(header->_prev != header);
TRI_ASSERT_MAINTAINER(header->_next != header);
@ -190,8 +190,8 @@ static void UnlinkHeader (TRI_headers_t* h,
simple_headers_t* headers = (simple_headers_t*) h;
int64_t size;
TRI_ASSERT_MAINTAINER(header != NULL);
TRI_ASSERT_MAINTAINER(header->_data != NULL);
TRI_ASSERT_MAINTAINER(header != nullptr);
TRI_ASSERT_MAINTAINER(header->_data != nullptr);
TRI_ASSERT_MAINTAINER(header->_prev != header);
TRI_ASSERT_MAINTAINER(header->_next != header);
@ -199,11 +199,11 @@ static void UnlinkHeader (TRI_headers_t* h,
TRI_ASSERT_MAINTAINER(size > 0);
// unlink the header
if (header->_prev != NULL) {
if (header->_prev != nullptr) {
header->_prev->_next = header->_next;
}
if (header->_next != NULL) {
if (header->_next != nullptr) {
header->_next->_prev = header->_prev;
}
@ -224,13 +224,13 @@ static void UnlinkHeader (TRI_headers_t* h,
headers->_totalSize -= TRI_DF_ALIGN_BLOCK(size);
if (headers->_nrLinked == 0) {
TRI_ASSERT_MAINTAINER(headers->_begin == NULL);
TRI_ASSERT_MAINTAINER(headers->_end == NULL);
TRI_ASSERT_MAINTAINER(headers->_begin == nullptr);
TRI_ASSERT_MAINTAINER(headers->_end == nullptr);
TRI_ASSERT_MAINTAINER(headers->_totalSize == 0);
}
else {
TRI_ASSERT_MAINTAINER(headers->_begin != NULL);
TRI_ASSERT_MAINTAINER(headers->_end != NULL);
TRI_ASSERT_MAINTAINER(headers->_begin != nullptr);
TRI_ASSERT_MAINTAINER(headers->_end != nullptr);
TRI_ASSERT_MAINTAINER(headers->_totalSize > 0);
}
@ -250,17 +250,17 @@ static void MoveHeader (TRI_headers_t* h,
int64_t newSize;
int64_t oldSize;
if (header == NULL) {
if (header == nullptr) {
return;
}
TRI_ASSERT_MAINTAINER(headers->_nrAllocated > 0);
TRI_ASSERT_MAINTAINER(header->_prev != header);
TRI_ASSERT_MAINTAINER(header->_next != header);
TRI_ASSERT_MAINTAINER(header->_data != NULL);
TRI_ASSERT_MAINTAINER(header->_data != nullptr);
TRI_ASSERT_MAINTAINER(((TRI_df_marker_t*) header->_data)->_size > 0);
TRI_ASSERT_MAINTAINER(old != NULL);
TRI_ASSERT_MAINTAINER(old->_data != NULL);
TRI_ASSERT_MAINTAINER(old != nullptr);
TRI_ASSERT_MAINTAINER(old->_data != nullptr);
newSize = (int64_t) (((TRI_df_marker_t*) header->_data)->_size);
oldSize = (int64_t) (((TRI_df_marker_t*) old->_data)->_size);
@ -269,56 +269,56 @@ static void MoveHeader (TRI_headers_t* h,
headers->_totalSize += TRI_DF_ALIGN_BLOCK(oldSize);
// adjust list start and end pointers
if (old->_prev == NULL) {
if (old->_prev == nullptr) {
headers->_begin = header;
}
else if (headers->_begin == header) {
headers->_begin = header->_next;
}
if (old->_next == NULL) {
if (old->_next == nullptr) {
headers->_end = header;
}
else if (headers->_end == header) {
headers->_end = header->_prev;
}
if (header->_prev != NULL) {
if (header->_prev != nullptr) {
if (header->_prev == old->_next) {
header->_prev->_next = NULL;
header->_prev->_next = nullptr;
}
else {
header->_prev->_next = header->_next;
}
}
if (header->_next != NULL) {
if (header->_next != nullptr) {
if (header->_next == old->_prev) {
header->_next->_prev = NULL;
header->_next->_prev = nullptr;
}
else {
header->_next->_prev = header->_prev;
}
}
if (old->_prev != NULL) {
if (old->_prev != nullptr) {
old->_prev->_next = header;
header->_prev = old->_prev;
}
else {
header->_prev = NULL;
header->_prev = nullptr;
}
if (old->_next != NULL) {
if (old->_next != nullptr) {
old->_next->_prev = header;
header->_next = old->_next;
}
else {
header->_next = NULL;
header->_next = nullptr;
}
TRI_ASSERT_MAINTAINER(headers->_begin != NULL);
TRI_ASSERT_MAINTAINER(headers->_end != NULL);
TRI_ASSERT_MAINTAINER(headers->_begin != nullptr);
TRI_ASSERT_MAINTAINER(headers->_end != nullptr);
TRI_ASSERT_MAINTAINER(header->_prev != header);
TRI_ASSERT_MAINTAINER(header->_next != header);
}
@ -334,11 +334,11 @@ static void RelinkHeader (TRI_headers_t* h,
simple_headers_t* headers = (simple_headers_t*) h;
int64_t size;
if (header == NULL) {
if (header == nullptr) {
return;
}
TRI_ASSERT_MAINTAINER(header->_data != NULL);
TRI_ASSERT_MAINTAINER(header->_data != nullptr);
size = (int64_t) ((TRI_df_marker_t*) header->_data)->_size;
TRI_ASSERT_MAINTAINER(size > 0);
@ -366,7 +366,7 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h,
assert(size > 0);
if (headers->_freelist == NULL) {
if (headers->_freelist == nullptr) {
size_t blockSize = GetBlockSize(headers->_blocks._length);
TRI_ASSERT_MAINTAINER(blockSize > 0);
@ -374,36 +374,36 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h,
char* begin = static_cast<char*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, blockSize * sizeof(TRI_doc_mptr_t), true));
// out of memory
if (begin == NULL) {
if (begin == nullptr) {
TRI_set_errno(TRI_ERROR_OUT_OF_MEMORY);
return NULL;
return nullptr;
}
char* ptr = begin + sizeof(TRI_doc_mptr_t) * (blockSize - 1);
header = NULL;
header = nullptr;
for (; begin <= ptr; ptr -= sizeof(TRI_doc_mptr_t)) {
((TRI_doc_mptr_t*) ptr)->_data = header;
header = ptr;
}
TRI_ASSERT_MAINTAINER(headers != NULL);
TRI_ASSERT_MAINTAINER(headers != nullptr);
headers->_freelist = (TRI_doc_mptr_t*) header;
TRI_PushBackVectorPointer(&headers->_blocks, begin);
}
TRI_ASSERT_MAINTAINER(headers->_freelist != NULL);
TRI_ASSERT_MAINTAINER(headers->_freelist != nullptr);
TRI_doc_mptr_t* result = const_cast<TRI_doc_mptr_t*>(headers->_freelist);
TRI_ASSERT_MAINTAINER(result != NULL);
TRI_ASSERT_MAINTAINER(result != nullptr);
headers->_freelist = static_cast<TRI_doc_mptr_t const*>(result->_data);
result->_data = NULL;
result->_data = nullptr;
// put new header at the end of the list
if (headers->_begin == NULL) {
if (headers->_begin == nullptr) {
// list of headers is empty
TRI_ASSERT_MAINTAINER(headers->_nrLinked == 0);
TRI_ASSERT_MAINTAINER(headers->_totalSize == 0);
@ -411,20 +411,20 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h,
headers->_begin = result;
headers->_end = result;
result->_prev = NULL;
result->_next = NULL;
result->_prev = nullptr;
result->_next = nullptr;
}
else {
// list is not empty
TRI_ASSERT_MAINTAINER(headers->_nrLinked > 0);
TRI_ASSERT_MAINTAINER(headers->_totalSize > 0);
TRI_ASSERT_MAINTAINER(headers->_nrAllocated > 0);
TRI_ASSERT_MAINTAINER(headers->_begin != NULL);
TRI_ASSERT_MAINTAINER(headers->_end != NULL);
TRI_ASSERT_MAINTAINER(headers->_begin != nullptr);
TRI_ASSERT_MAINTAINER(headers->_end != nullptr);
headers->_end->_next = result;
result->_prev = headers->_end;
result->_next = NULL;
result->_next = nullptr;
headers->_end = result;
}
@ -444,7 +444,7 @@ static void ReleaseHeader (TRI_headers_t* h,
bool unlinkHeader) {
simple_headers_t* headers = (simple_headers_t*) h;
if (header == NULL) {
if (header == nullptr) {
return;
}
@ -461,7 +461,7 @@ static void ReleaseHeader (TRI_headers_t* h,
////////////////////////////////////////////////////////////////////////////////
/// @brief return the element at the head of the list
///
/// note: the element returned might be NULL
/// note: the element returned might be nullptr
////////////////////////////////////////////////////////////////////////////////
static TRI_doc_mptr_t* FrontHeaders (TRI_headers_t const* h) {
@ -473,7 +473,7 @@ static TRI_doc_mptr_t* FrontHeaders (TRI_headers_t const* h) {
////////////////////////////////////////////////////////////////////////////////
/// @brief return the element at the tail of the list
///
/// note: the element returned might be NULL
/// note: the element returned might be nullptr
////////////////////////////////////////////////////////////////////////////////
static TRI_doc_mptr_t* BackHeaders (TRI_headers_t const* h) {
@ -520,7 +520,7 @@ static void DumpHeaders (TRI_headers_t const* h) {
printf("begin ptr: %p\n", headers->_begin);
printf("end ptr: %p\n", headers->_end);
while (next != NULL) {
while (next != nullptr) {
printf("- header #%lu: ptr: %p, prev: %p, next: %p, key: %s\n",
(unsigned long) i,
next,
@ -529,7 +529,7 @@ static void DumpHeaders (TRI_headers_t const* h) {
next->_key);
i++;
if (next->_next == NULL) {
if (next->_next == nullptr) {
TRI_ASSERT_MAINTAINER(next == headers->_end);
}
@ -561,15 +561,14 @@ static void DumpHeaders (TRI_headers_t const* h) {
TRI_headers_t* TRI_CreateSimpleHeaders () {
simple_headers_t* headers = static_cast<simple_headers_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(simple_headers_t), false));
if (headers == NULL) {
if (headers == nullptr) {
TRI_set_errno(TRI_ERROR_OUT_OF_MEMORY);
return NULL;
return nullptr;
}
headers->_freelist = NULL;
headers->_begin = NULL;
headers->_end = NULL;
headers->_freelist = nullptr;
headers->_begin = nullptr;
headers->_end = nullptr;
headers->_nrAllocated = 0;
headers->_nrLinked = 0;
headers->_totalSize = 0;

View File

@ -37,6 +37,7 @@
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/vocbase.h"
#include "Wal/DocumentOperation.h"
#include "Wal/LogfileManager.h"
#define LOG_TRX(trx, level, format, ...) \
@ -125,45 +126,6 @@ static int InitCollectionOperations (TRI_transaction_collection_t* trxCollection
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a WAL-based operation for a collection
////////////////////////////////////////////////////////////////////////////////
static int AddOperation (TRI_transaction_collection_t* trxCollection,
TRI_voc_document_operation_e type,
TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t* oldHeader,
TRI_doc_mptr_t* oldData) {
TRI_DEBUG_INTENTIONAL_FAIL_IF("AddOperation-OOM") {
return TRI_ERROR_DEBUG;
}
int res;
if (trxCollection->_operations == nullptr) {
res = InitCollectionOperations(trxCollection);
if (res != TRI_ERROR_NO_ERROR) {
return TRI_ERROR_OUT_OF_MEMORY;
}
}
TRI_transaction_operation_t trxOperation;
trxOperation._type = type;
trxOperation._newHeader = newHeader;
trxOperation._oldHeader = oldHeader;
if (oldData != nullptr) {
trxOperation._oldData = *oldData;
}
else {
memset(&trxOperation._oldData, 0, sizeof(TRI_doc_mptr_t));
}
return TRI_PushBackVector(trxCollection->_operations, &trxOperation);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief rollback all operations for a collection
////////////////////////////////////////////////////////////////////////////////
@ -969,49 +931,56 @@ int TRI_AddIdFailedTransaction (TRI_vector_t* vector,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief reserve room in the transactions operations vector
////////////////////////////////////////////////////////////////////////////////
int TRI_ReserveOperationTransaction (TRI_transaction_collection_t* trxCollection) {
if (trxCollection->_operations == nullptr) {
return InitCollectionOperations(trxCollection);
}
return TRI_ReserveVector(trxCollection->_operations, 1);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a WAL operation for a transaction collection
////////////////////////////////////////////////////////////////////////////////
int TRI_AddOperationTransaction (TRI_transaction_collection_t* trxCollection,
TRI_voc_document_operation_e type,
TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t* oldHeader,
TRI_doc_mptr_t* oldData,
int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
bool syncRequested) {
TRI_transaction_collection_t* trxCollection = operation.trxCollection;
TRI_transaction_t* trx = trxCollection->_transaction;
// TODO: re-add replication!!
int res = AddOperation(trxCollection, type, newHeader, oldHeader, oldData);
assert(operation.header != nullptr);
if (res == TRI_ERROR_NO_ERROR) {
// if everything went well, this will ensure we don't double free etc. headers
trx->_hasOperations = true;
// update waitForSync for the transaction
bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx);
// default is false
bool waitForSync = false;
if (isSingleOperationTransaction) {
waitForSync = syncRequested || trxCollection->_waitForSync;
}
else {
// upgrade the info for the transaction
if (syncRequested || trxCollection->_waitForSync) {
trx->_waitForSync = true;
}
}
else {
TRI_ASSERT_MAINTAINER(res == TRI_ERROR_OUT_OF_MEMORY || res == TRI_ERROR_DEBUG);
}
return res;
triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(operation.marker->mem(), operation.marker->size(), waitForSync);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
operation.revert(isSingleOperationTransaction);
return slotInfo.errorCode;
}
if (operation.type == TRI_VOC_DOCUMENT_OPERATION_INSERT ||
operation.type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
// adjust the data position in the header
operation.header->_data = slotInfo.mem;
triagens::wal::document_marker_t const* m = static_cast<triagens::wal::document_marker_t const*>(operation.header->_data);
operation.header->_key = static_cast<char*>(const_cast<void*>(slotInfo.mem)) + m->_offsetKey;
}
operation.handled(isSingleOperationTransaction);
if (operation.rid > 0) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
TRI_SetRevisionDocumentCollection(document, operation.rid, false);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -250,23 +250,6 @@ bool TRI_IsLockedCollectionTransaction (TRI_transaction_collection_t*,
int TRI_AddIdFailedTransaction (TRI_vector_t*,
TRI_voc_tid_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief reserve room in the transactions operations vector
////////////////////////////////////////////////////////////////////////////////
int TRI_ReserveOperationTransaction (TRI_transaction_collection_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief add a WAL operation fora transaction collection
////////////////////////////////////////////////////////////////////////////////
int TRI_AddOperationTransaction (TRI_transaction_collection_t*,
TRI_voc_document_operation_e,
struct TRI_doc_mptr_s*,
struct TRI_doc_mptr_s*,
struct TRI_doc_mptr_s*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a transaction's id
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,98 @@
#ifndef TRIAGENS_VOC_BASE_DOCUMENT_OPERATION_H
#define TRIAGENS_VOC_BASE_DOCUMENT_OPERATION_H 1
#include "Basics/Common.h"
#include "VocBase/voc-types.h"
#include "VocBase/document-collection.h"
#include "Wal/Marker.h"
struct TRI_transaction_collection_s;
namespace triagens {
namespace wal {
class Marker;
struct DocumentOperation {
DocumentOperation (Marker* marker,
struct TRI_transaction_collection_s* trxCollection,
TRI_voc_document_operation_e type,
TRI_voc_rid_t rid)
: marker(marker),
trxCollection(trxCollection),
header(nullptr),
type(type),
rid(rid) {
assert(marker != nullptr);
}
~DocumentOperation () {
if (marker != nullptr) {
delete marker;
}
}
void init () {
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
// copy the old header into a safe area
assert(header != nullptr);
oldHeader = *header;
}
}
void handled (bool isSingleOperationTransaction) {
assert(header != nullptr);
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
// nothing to do for insert
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
// move header to the end of the list
document->_headers->moveBack(document->_headers, header, &oldHeader);
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
// free or unlink the header
if (isSingleOperationTransaction) {
document->_headers->release(document->_headers, header, true);
}
else {
document->_headers->unlink(document->_headers, header);
}
}
// free the local marker buffer
delete[] marker->steal();
}
void revert (bool isSingleOperationTransaction) {
assert(header != nullptr);
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
document->_headers->release(document->_headers, header, true);
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
document->_headers->move(document->_headers, header, &oldHeader);
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
document->_headers->relink(document->_headers, header, header);
}
}
Marker* marker;
struct TRI_transaction_collection_s* trxCollection;
TRI_doc_mptr_t* header;
TRI_doc_mptr_t oldHeader;
TRI_voc_document_operation_e const type;
TRI_voc_rid_t const rid;
};
}
}
#endif

View File

@ -38,28 +38,6 @@ using namespace triagens::wal;
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief create a copy of a marker
////////////////////////////////////////////////////////////////////////////////
Marker::Marker (Marker&& other)
: _buffer(other._buffer),
_size(other._size) {
other._buffer = nullptr;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a copy of a marker
////////////////////////////////////////////////////////////////////////////////
Marker::Marker (Marker const& other)
: _buffer(new char[other._size]),
_size(other._size) {
memcpy(_buffer, other._buffer, other._size);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create marker with a sized buffer
////////////////////////////////////////////////////////////////////////////////
@ -70,10 +48,14 @@ Marker::Marker (TRI_df_marker_type_e type,
_size(static_cast<uint32_t>(size)) {
TRI_df_marker_t* m = reinterpret_cast<TRI_df_marker_t*>(begin());
memset(m, 0, size); // to shut up valgrind
m->_type = type;
m->_size = static_cast<TRI_voc_size_t>(size);
m->_crc = 0;
m->_tick = 0;
std::cout << "CREATED A MARKER OF SIZE: " << size << ", buffer: " << (void*) _buffer << "\n";
}
////////////////////////////////////////////////////////////////////////////////
@ -82,7 +64,7 @@ Marker::Marker (TRI_df_marker_type_e type,
Marker::~Marker () {
if (_buffer != nullptr) {
delete _buffer;
delete[] _buffer;
}
}
@ -351,25 +333,25 @@ void DocumentMarker::dump () const {
/// @brief clone a marker from another marker
////////////////////////////////////////////////////////////////////////////////
DocumentMarker DocumentMarker::clone (TRI_df_marker_t const* other,
TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_voc_rid_t revisionId,
TRI_voc_tid_t transactionId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson) {
DocumentMarker* DocumentMarker::clone (TRI_df_marker_t const* other,
TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_voc_rid_t revisionId,
TRI_voc_tid_t transactionId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson) {
char const* base = reinterpret_cast<char const*>(other);
if (other->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
TRI_doc_document_key_marker_t const* original = reinterpret_cast<TRI_doc_document_key_marker_t const*>(other);
return DocumentMarker(databaseId,
collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
legend,
shapedJson);
return new DocumentMarker(databaseId,
collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
legend,
shapedJson);
}
else {
assert(other->_type == TRI_WAL_MARKER_DOCUMENT);
@ -379,13 +361,13 @@ DocumentMarker DocumentMarker::clone (TRI_df_marker_t const* other,
assert(original->_databaseId == databaseId);
assert(original->_collectionId == collectionId);
return DocumentMarker(original->_databaseId,
original->_collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
legend,
shapedJson);
return new DocumentMarker(original->_databaseId,
original->_collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
legend,
shapedJson);
}
}
@ -489,13 +471,13 @@ void EdgeMarker::dump () const {
/// @brief clone a marker from another marker
////////////////////////////////////////////////////////////////////////////////
EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other,
TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_voc_rid_t revisionId,
TRI_voc_tid_t transactionId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson) {
EdgeMarker* EdgeMarker::clone (TRI_df_marker_t const* other,
TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_voc_rid_t revisionId,
TRI_voc_tid_t transactionId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson) {
char const* base = reinterpret_cast<char const*>(other);
if (other->_type == TRI_DOC_MARKER_KEY_EDGE) {
@ -507,14 +489,14 @@ EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other,
edge._toKey = (TRI_voc_key_t) base + original->_offsetToKey;
edge._fromKey = (TRI_voc_key_t) base + original->_offsetFromKey;
return EdgeMarker(databaseId,
collectionId,
revisionId,
transactionId,
std::string(base + original->base._offsetKey),
&edge,
legend,
shapedJson);
return new EdgeMarker(databaseId,
collectionId,
revisionId,
transactionId,
std::string(base + original->base._offsetKey),
&edge,
legend,
shapedJson);
}
else {
assert(other->_type == TRI_WAL_MARKER_EDGE);
@ -530,14 +512,14 @@ EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other,
edge._toKey = (TRI_voc_key_t) base + original->_offsetToKey;
edge._fromKey = (TRI_voc_key_t) base + original->_offsetFromKey;
return EdgeMarker(original->_databaseId,
original->_collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
&edge,
legend,
shapedJson);
return new EdgeMarker(original->_databaseId,
original->_collectionId,
revisionId,
transactionId,
std::string(base + original->_offsetKey),
&edge,
legend,
shapedJson);
}
}
@ -591,6 +573,8 @@ void RemoveMarker::dump () const {
<< ", KEY: " << key()
<< "\n";
std::cout << "BEGIN: " << begin() << ", SIZE: " << size() << "\n";
std::cout << "BINARY: '" << stringifyPart(begin(), size()) << "'\n";
}

View File

@ -147,20 +147,26 @@ namespace triagens {
Marker& operator= (Marker const&) = delete;
Marker (Marker&&);
Marker (Marker&&) = delete;
Marker (Marker const&);
Marker (Marker const&) = delete;
Marker (TRI_df_marker_type_e,
size_t);
virtual ~Marker ();
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
virtual ~Marker ();
inline char* steal () {
char* buffer = _buffer;
_buffer = nullptr;
return buffer;
}
static inline size_t alignedSize (size_t size) {
return TRI_DF_ALIGN_BLOCK(size);
@ -379,13 +385,13 @@ namespace triagens {
void dump () const;
static DocumentMarker clone (TRI_df_marker_t const*,
TRI_voc_tick_t,
TRI_voc_cid_t,
TRI_voc_rid_t,
TRI_voc_tid_t,
triagens::basics::JsonLegend&,
TRI_shaped_json_t const*);
static DocumentMarker* clone (TRI_df_marker_t const*,
TRI_voc_tick_t,
TRI_voc_cid_t,
TRI_voc_rid_t,
TRI_voc_tid_t,
triagens::basics::JsonLegend&,
TRI_shaped_json_t const*);
};
// -----------------------------------------------------------------------------
@ -458,13 +464,13 @@ namespace triagens {
void dump () const;
static EdgeMarker clone (TRI_df_marker_t const*,
TRI_voc_tick_t,
TRI_voc_cid_t,
TRI_voc_rid_t,
TRI_voc_tid_t,
triagens::basics::JsonLegend&,
TRI_shaped_json_t const*);
static EdgeMarker* clone (TRI_df_marker_t const*,
TRI_voc_tick_t,
TRI_voc_cid_t,
TRI_voc_rid_t,
TRI_voc_tid_t,
triagens::basics::JsonLegend&,
TRI_shaped_json_t const*);
};
// -----------------------------------------------------------------------------

View File

@ -89,7 +89,8 @@ std::string Slot::statusText () const {
void Slot::fill (void* src,
size_t size) {
assert(size == _size);
assert(size == _size);
assert(src != nullptr);
TRI_df_marker_t* marker = static_cast<TRI_df_marker_t*>(src);