diff --git a/arangod/Utils/Transaction.h b/arangod/Utils/Transaction.h index fee09f2ff8..4d0dc001b1 100644 --- a/arangod/Utils/Transaction.h +++ b/arangod/Utils/Transaction.h @@ -982,12 +982,12 @@ namespace triagens { TRI_primary_collection_t* primary = primaryCollection(trxCollection); - int res = primary->remove(trxCollection, - (TRI_voc_key_t) key.c_str(), - rid, - &updatePolicy, - ! isLocked(trxCollection, TRI_TRANSACTION_WRITE), - forceSync); + int res = primary->removeDocument(trxCollection, + (TRI_voc_key_t) key.c_str(), + rid, + &updatePolicy, + ! isLocked(trxCollection, TRI_TRANSACTION_WRITE), + forceSync); return res; } @@ -1023,12 +1023,12 @@ namespace triagens { for (size_t i = 0; i < n; ++i) { const string& id = ids[i]; - res = primary->remove(trxCollection, - (const TRI_voc_key_t) id.c_str(), - 0, - 0, // policy - false, - forceSync); + res = primary->removeDocument(trxCollection, + (TRI_voc_key_t) id.c_str(), + 0, + nullptr, // policy + false, + forceSync); if (res != TRI_ERROR_NO_ERROR) { diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 29cb71b382..b20ca463fd 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -1514,8 +1514,7 @@ static void DebugHeadersDocumentCollection (TRI_document_collection_t* collectio //////////////////////////////////////////////////////////////////////////////// static int InsertIndexes (TRI_transaction_collection_t* trxCollection, - TRI_doc_mptr_t* header, - bool waitForSync) { + TRI_doc_mptr_t* header) { TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; @@ -1524,7 +1523,6 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection, if (res != TRI_ERROR_NO_ERROR) { // insert has failed - document->_headers->release(document->_headers, header, true); return res; } @@ -1535,31 +1533,19 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection, // insertion into secondary indexes failed DeleteSecondaryIndexes(document, header, true); DeletePrimaryIndex(document, header, true); - document->_headers->release(document->_headers, header, true); - - return res; } - - res = TRI_AddOperationTransaction(trxCollection, - TRI_VOC_DOCUMENT_OPERATION_INSERT, - header, - NULL, - NULL, - header->_data, - header->_rid, - waitForSync); + + return res; +} - if (res != TRI_ERROR_NO_ERROR) { - // something has failed.... now delete from the indexes again - RollbackInsert(document, header, NULL); +//////////////////////////////////////////////////////////////////////////////// +/// @brief post-insert operation +//////////////////////////////////////////////////////////////////////////////// - return res; - } - - // ............................................................................. - // post process insert - // ............................................................................. +static int PostInsertIndexes (TRI_transaction_collection_t* trxCollection, + TRI_doc_mptr_t* header) { + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; size_t const n = document->_allIndexes._length; for (size_t i = 0; i < n; ++i) { @@ -1570,11 +1556,37 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection, } } + // TODO: post-insert will never return an error return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// -/// @brief insert a shaped-json document (or edge) into the WAL +/// @brief add an operation to a transaction +//////////////////////////////////////////////////////////////////////////////// + +static int AddTransactionOperation (TRI_transaction_collection_t* trxCollection, + TRI_voc_document_operation_e type, + TRI_doc_mptr_t* newHeader, + TRI_doc_mptr_t* oldHeader, + TRI_doc_mptr_t* oldData, + void const* marker, + TRI_voc_rid_t rid, + bool syncRequested) { + + int res = TRI_AddOperationTransaction(trxCollection, + type, + newHeader, + oldHeader, + oldData, + marker, + rid, + syncRequested); + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief insert a shaped-json document (or edge) /// note: key might be NULL. in this case, a key is auto-generated //////////////////////////////////////////////////////////////////////////////// @@ -1595,6 +1607,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection if (rid == 0) { // generate new revision id tick = TRI_NewTickServer(); + rid = static_cast(tick); } else { tick = static_cast(rid); @@ -1643,7 +1656,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection triagens::wal::DocumentMarker marker(primary->base._vocbase->_id, primary->base._info._cid, rid, - TRI_GetMarkerIdTransaction(trxCollection->_transaction), + trxCollection->_transaction->_id, keyString, legend, shaped); @@ -1657,7 +1670,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection triagens::wal::EdgeMarker marker(primary->base._vocbase->_id, primary->base._info._cid, rid, - TRI_GetMarkerIdTransaction(trxCollection->_transaction), + trxCollection->_transaction->_id, keyString, edge, legend, @@ -1688,15 +1701,47 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection return TRI_ERROR_OUT_OF_MEMORY; } + // update the header we got triagens::wal::document_marker_t const* m = static_cast(slotInfo.mem); - header->_rid = rid; header->_fid = 0; // TODO: use WAL fid // let header point to WAL location - header->_data = (void*) m; - header->_key = (char*) m + m->_offsetKey; + header->_data = slotInfo.mem; + header->_key = static_cast(const_cast(slotInfo.mem)) + m->_offsetKey; - res = InsertIndexes(trxCollection, header, forceSync); + // insert into indexes + res = InsertIndexes(trxCollection, header); + + if (res != TRI_ERROR_NO_ERROR) { + // release the header + document->_headers->release(document->_headers, header, true); + + return res; + } + + // add the operation to the running transaction + res = AddTransactionOperation(trxCollection, + TRI_VOC_DOCUMENT_OPERATION_INSERT, + header, + nullptr, + nullptr, + header->_data, + header->_rid, + forceSync); + + if (res != TRI_ERROR_NO_ERROR) { + // something has failed.... now delete from the indexes again + RollbackInsert(document, header, nullptr); + // TODO: do we need to release the header here? + + return res; + } + + // execute a post-insert + res = PostInsertIndexes(trxCollection, header); + + // TODO: do we need to release the header if something goes wrong? + // TODO: post-insert will never return an error // eager unlock collectionLocker.unlock(); @@ -1919,6 +1964,105 @@ static int UpdateShapedJson (TRI_transaction_collection_t* trxCollection, return res; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief removes a document from the indexes +//////////////////////////////////////////////////////////////////////////////// + +static int RemoveIndexes (TRI_transaction_collection_t* trxCollection, + TRI_doc_update_policy_t const* policy, + triagens::wal::RemoveMarker const& marker, + const bool forceSync) { + + TRI_primary_collection_t* primary = trxCollection->_collection->_collection; + + // get the existing header pointer + TRI_doc_mptr_t* header = static_cast(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, marker.key())); + + if (! IsVisible(header)) { + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; + } + + // ............................................................................. + // check the revision + // ............................................................................. + + int res = TRI_CheckUpdatePolicy(policy, header->_rid); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + // delete from indexes + TRI_document_collection_t* document = (TRI_document_collection_t*) primary; + res = DeleteSecondaryIndexes(document, header, false); + + if (res != TRI_ERROR_NO_ERROR) { + // deletion failed. roll back + InsertSecondaryIndexes(document, header, true); + + return res; + } + + res = DeletePrimaryIndex(document, header, false); + + if (res != TRI_ERROR_NO_ERROR) { + // deletion failed. roll back + InsertSecondaryIndexes(document, header, true); + + return res; + } + + res = AddTransactionOperation(trxCollection, + TRI_VOC_DOCUMENT_OPERATION_REMOVE, + nullptr, + header, + header, + marker.mem(), + marker.rid(), + forceSync); + + if (res != TRI_ERROR_NO_ERROR) { + // deletion failed. roll back + RollbackRemove(document, nullptr, header, true); // TODO: check if "true" is correct! + } + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief removes a shaped-json document (or edge) +//////////////////////////////////////////////////////////////////////////////// + +static int RemoveDocumentShapedJson (TRI_transaction_collection_t* trxCollection, + TRI_voc_key_t key, + TRI_voc_rid_t rid, + TRI_doc_update_policy_t const* policy, + bool lock, + bool forceSync) { + assert(key != nullptr); + + TRI_primary_collection_t* primary = trxCollection->_collection->_collection; + + triagens::wal::RemoveMarker marker(primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + std::string(key)); + + triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); + + if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { + // some error occurred + return slotInfo.errorCode; + } + + triagens::arango::CollectionWriteLocker collectionLocker(primary, lock); + + int res = RemoveIndexes(trxCollection, policy, marker, forceSync); + + return res; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief deletes a json document given the identifier //////////////////////////////////////////////////////////////////////////////// @@ -3140,6 +3284,7 @@ static bool InitDocumentCollection (TRI_document_collection_t* document, document->base.read = ReadShapedJson; document->base.update = UpdateShapedJson; document->base.remove = RemoveShapedJson; + document->base.removeDocument = RemoveDocumentShapedJson; // we do not require an initial journal document->_requestedJournalSize = 0; diff --git a/arangod/VocBase/primary-collection.h b/arangod/VocBase/primary-collection.h index 126689ebde..60bc8abd1d 100644 --- a/arangod/VocBase/primary-collection.h +++ b/arangod/VocBase/primary-collection.h @@ -337,6 +337,7 @@ typedef struct TRI_primary_collection_s { int (*update) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_voc_rid_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool); int (*remove) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_voc_rid_t, struct TRI_doc_update_policy_s const*, const bool, const bool); + int (*removeDocument) (struct TRI_transaction_collection_s*, TRI_voc_key_t, TRI_voc_rid_t, struct TRI_doc_update_policy_s const*, bool, bool); TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection); TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection); diff --git a/arangod/Wal/Marker.cpp b/arangod/Wal/Marker.cpp index 48c681de41..59761fbb2b 100644 --- a/arangod/Wal/Marker.cpp +++ b/arangod/Wal/Marker.cpp @@ -76,14 +76,12 @@ Marker::~Marker () { void Marker::storeSizedString (size_t offset, char const* value, size_t length) { - // init key buffer char* p = static_cast(base()) + offset; - memset(p, '\0', (1 + ((length + 1) / 8)) * 8); - // store length of key - *p = (uint8_t) length; // store actual key - memcpy(p + 1, value, length); + memcpy(p, value, length); + // append NUL byte + p[length] = '\0'; } // ----------------------------------------------------------------------------- @@ -196,7 +194,7 @@ DocumentMarker::DocumentMarker (TRI_voc_tick_t databaseId, triagens::basics::JsonLegend& legend, TRI_shaped_json_t const* shapedJson) : Marker(TRI_WAL_MARKER_DOCUMENT, - sizeof(document_marker_t) + alignedSize(key.size() + 2) + legend.getSize() + shapedJson->_data.length) { + sizeof(document_marker_t) + alignedSize(key.size() + 1) + legend.getSize() + shapedJson->_data.length) { document_marker_t* m = reinterpret_cast(base()); m->_databaseId = databaseId; m->_collectionId = collectionId; @@ -204,7 +202,7 @@ DocumentMarker::DocumentMarker (TRI_voc_tick_t databaseId, m->_tid = transactionId; m->_shape = shapedJson->_sid; m->_offsetKey = sizeof(document_marker_t); // start position of key - m->_offsetLegend = m->_offsetKey + alignedSize(key.size() + 2); + m->_offsetLegend = m->_offsetKey + alignedSize(key.size() + 1); m->_offsetJson = m->_offsetLegend + alignedSize(legend.getSize()); storeSizedString(m->_offsetKey, key.c_str(), key.size()); @@ -250,7 +248,7 @@ EdgeMarker::EdgeMarker (TRI_voc_tick_t databaseId, triagens::basics::JsonLegend& legend, TRI_shaped_json_t const* shapedJson) : Marker(TRI_WAL_MARKER_EDGE, - sizeof(edge_marker_t) + alignedSize(key.size() + 2) + alignedSize(strlen(edge->_fromKey) + 2) + alignedSize(strlen(edge->_toKey) + 2) + legend.getSize() + shapedJson->_data.length) { + sizeof(edge_marker_t) + alignedSize(key.size() + 1) + alignedSize(strlen(edge->_fromKey) + 1) + alignedSize(strlen(edge->_toKey) + 1) + legend.getSize() + shapedJson->_data.length) { document_marker_t* m = reinterpret_cast(base()); edge_marker_t* e = reinterpret_cast(base()); @@ -263,9 +261,9 @@ EdgeMarker::EdgeMarker (TRI_voc_tick_t databaseId, m->_offsetKey = sizeof(edge_marker_t); // start position of key e->_toCid = edge->_toCid; e->_fromCid = edge->_fromCid; - e->_offsetToKey = m->_offsetKey + alignedSize(key.size() + 2); - e->_offsetFromKey = e->_offsetToKey + alignedSize(strlen(edge->_toKey) + 2); - m->_offsetLegend = e->_offsetFromKey + alignedSize(strlen(edge->_fromKey) + 2); + e->_offsetToKey = m->_offsetKey + alignedSize(key.size() + 1); + e->_offsetFromKey = e->_offsetToKey + alignedSize(strlen(edge->_toKey) + 1); + m->_offsetLegend = e->_offsetFromKey + alignedSize(strlen(edge->_fromKey) + 1); m->_offsetJson = m->_offsetLegend + alignedSize(legend.getSize()); // store keys @@ -311,7 +309,7 @@ RemoveMarker::RemoveMarker (TRI_voc_tick_t databaseId, TRI_voc_tid_t transactionId, std::string const& key) : Marker(TRI_WAL_MARKER_REMOVE, - sizeof(remove_marker_t) + alignedSize(key.size() + 2)) { + sizeof(remove_marker_t) + alignedSize(key.size() + 1)) { remove_marker_t* m = reinterpret_cast(base()); m->_databaseId = databaseId; m->_collectionId = collectionId; diff --git a/arangod/Wal/Marker.h b/arangod/Wal/Marker.h index a2bc1993db..48dc54237d 100644 --- a/arangod/Wal/Marker.h +++ b/arangod/Wal/Marker.h @@ -284,6 +284,17 @@ namespace triagens { std::string const&); ~RemoveMarker (); + + public: + + inline char const* key () const { + return base() + sizeof(remove_marker_t) + 1; + } + + inline TRI_voc_rid_t rid () const { + remove_marker_t const* m = reinterpret_cast(base()); + return m->_rid; + } }; }