From d9f34fd88e19cce40fb635b95791c7d36af277ff Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 31 Oct 2012 18:04:12 +0100 Subject: [PATCH] refactored and simplified document CRUD methods --- arangod/RestHandler/RestDocumentHandler.cpp | 7 +- arangod/V8Server/v8-vocbase.cpp | 33 +- arangod/VocBase/document-collection.c | 1111 +++++++++---------- arangod/VocBase/primary-collection.c | 36 +- arangod/VocBase/primary-collection.h | 27 +- 5 files changed, 581 insertions(+), 633 deletions(-) diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index 31ebcfa03f..9b5fe0d949 100644 --- a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -1021,8 +1021,13 @@ bool RestDocumentHandler::deleteDocument () { WriteTransaction trx(&ca); + TRI_doc_operation_context_t context; + TRI_InitContextPrimaryCollection(&context, trx.primary(), policy, forceSync); + context._expectedRid = revision; + context._previousRid = &rid; + // unlocking is performed in destroy() - res = trx.primary()->destroy(trx.primary(), (TRI_voc_key_t) key.c_str(), revision, &rid, policy, false, forceSync); + res = trx.primary()->destroy(&context, (TRI_voc_key_t) key.c_str()); trx.end(); diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 39331be726..662ca96784 100755 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -722,15 +722,24 @@ static v8::Handle ReplaceVocbaseCol (TRI_vocbase_t* vocbase, forceSync = TRI_ObjectToBoolean(argv[3]); } + TRI_voc_rid_t oldRid = 0; + + TRI_doc_operation_context_t context; + TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); + context._expectedRid = rid; + context._previousRid = &oldRid; + context._release = true; + // ............................................................................. // inside a write transaction // ............................................................................. primary->beginWrite(primary); - TRI_voc_rid_t oldRid = 0; - TRI_doc_mptr_t mptr = primary->update(primary, shaped, key, rid, &oldRid, policy, true, forceSync); - if (key) TRI_FreeString(TRI_CORE_MEM_ZONE, key); + TRI_doc_mptr_t mptr = primary->update(&context, shaped, key); + if (key) { + TRI_FreeString(TRI_CORE_MEM_ZONE, key); + } // ............................................................................. // outside a write transaction @@ -1240,9 +1249,15 @@ static v8::Handle DeleteVocbaseCol (TRI_vocbase_t* vocbase, TRI_primary_collection_t* primary = collection->_collection; TRI_voc_rid_t oldRid; + + TRI_doc_operation_context_t context; + TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); + context._release = true; + context._expectedRid = rid; + context._previousRid = &oldRid; primary->beginWrite(primary); - int res = primary->destroy(primary, key, rid, &oldRid, policy, true, forceSync); + int res = primary->destroy(&context, key); if (key) { TRI_FreeString(TRI_CORE_MEM_ZONE, key); } @@ -4785,6 +4800,9 @@ static v8::Handle JS_TruncateVocbaseCol (v8::Arguments const& argv) { TRI_doc_mptr_t const** ptr; TRI_doc_mptr_t const** end; + + TRI_doc_operation_context_t context; + TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); primary->beginWrite(collection->_collection); @@ -4797,12 +4815,15 @@ static v8::Handle JS_TruncateVocbaseCol (v8::Arguments const& argv) { TRI_PushBackVectorPointer(&documents, (void*) *ptr); } } - + // now delete all documents. this will modify the index as well for (size_t i = 0; i < documents._length; ++i) { TRI_doc_mptr_t const* d = (TRI_doc_mptr_t const*) documents._buffer[i]; + + context._expectedRid = d->_rid; + context._previousRid = &oldRid; - int res = primary->destroy(primary, d->_key, d->_rid, &oldRid, policy, false, forceSync); + int res = primary->destroy(&context, d->_key); if (res != TRI_ERROR_NO_ERROR) { // an error occurred, but we simply go on because truncate should remove all documents } diff --git a/arangod/VocBase/document-collection.c b/arangod/VocBase/document-collection.c index 46f3724766..ff85f78f88 100644 --- a/arangod/VocBase/document-collection.c +++ b/arangod/VocBase/document-collection.c @@ -41,74 +41,58 @@ // --SECTION-- forward declarations // ----------------------------------------------------------------------------- -static int CreateImmediateIndexes (TRI_document_collection_t* collection, - TRI_doc_mptr_t* header); +static int CreateImmediateIndexes (TRI_document_collection_t*, + TRI_doc_mptr_t*); -static int UpdateImmediateIndexes (TRI_document_collection_t* collection, - TRI_doc_mptr_t const* header, - TRI_doc_mptr_t const* update); +static int UpdateImmediateIndexes (TRI_document_collection_t*, + TRI_doc_mptr_t const*, + TRI_doc_mptr_t const*); -static int DeleteImmediateIndexes (TRI_document_collection_t* collection, - TRI_doc_mptr_t const* header, +static int DeleteImmediateIndexes (TRI_document_collection_t*, + TRI_doc_mptr_t const*, TRI_voc_tick_t); -static TRI_doc_mptr_t UpdateDocument (TRI_document_collection_t* collection, - TRI_doc_mptr_t const* header, - TRI_doc_document_key_marker_t* marker, - TRI_voc_size_t markerSize, - void const* keyBody, - TRI_voc_size_t keyBodySize, - void const* body, - TRI_voc_size_t bodySize, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - TRI_df_marker_t** result, - bool release, - bool allowRollback, - bool forceSync); +static TRI_doc_mptr_t UpdateDocument (TRI_doc_operation_context_t*, + TRI_doc_mptr_t const*, + TRI_doc_document_key_marker_t*, + TRI_voc_size_t, + void const*, + TRI_voc_size_t, + void const*, + TRI_voc_size_t, + TRI_df_marker_t**); -static int DeleteDocument (TRI_document_collection_t* collection, - TRI_doc_deletion_key_marker_t* marker, - void const* keyBody, - TRI_voc_size_t keyBodySize, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - bool release, - bool forceSync); +static int DeleteDocument (TRI_doc_operation_context_t*, + TRI_doc_deletion_key_marker_t*, + void const*, + TRI_voc_size_t); -static int DeleteShapedJson (TRI_primary_collection_t* doc, - TRI_voc_key_t key, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - bool release, - bool forceSync); +static int DeleteShapedJson (TRI_doc_operation_context_t*, + TRI_voc_key_t); -static int CapConstraintFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, +static int CapConstraintFromJson (TRI_document_collection_t*, + TRI_json_t*, TRI_idx_iid_t); -static int BitarrayIndexFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, +static int BitarrayIndexFromJson (TRI_document_collection_t*, + TRI_json_t*, TRI_idx_iid_t); -static int GeoIndexFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, - TRI_idx_iid_t iid); +static int GeoIndexFromJson (TRI_document_collection_t*, + TRI_json_t*, + TRI_idx_iid_t); -static int HashIndexFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, - TRI_idx_iid_t iid); +static int HashIndexFromJson (TRI_document_collection_t*, + TRI_json_t*, + TRI_idx_iid_t); -static int SkiplistIndexFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, - TRI_idx_iid_t iid); +static int SkiplistIndexFromJson (TRI_document_collection_t*, + TRI_json_t*, + TRI_idx_iid_t); -static int PriorityQueueFromJson (TRI_document_collection_t* sim, - TRI_json_t* definition, - TRI_idx_iid_t iid); +static int PriorityQueueFromJson (TRI_document_collection_t*, + TRI_json_t*, + TRI_idx_iid_t); // ----------------------------------------------------------------------------- // --SECTION-- JOURNALS @@ -130,48 +114,53 @@ static int PriorityQueueFromJson (TRI_document_collection_t* sim, /// to allow the gc to start when waiting for a journal to appear. //////////////////////////////////////////////////////////////////////////////// -static TRI_datafile_t* SelectJournal (TRI_document_collection_t* sim, +static TRI_datafile_t* SelectJournal (TRI_document_collection_t* document, TRI_voc_size_t size, TRI_df_marker_t** result) { TRI_datafile_t* datafile; + TRI_collection_t* base; int res; size_t i; size_t n; + + base = &document->base.base; - TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); - if (sim->base.base._maximumMarkerSize < size) { - sim->base.base._maximumMarkerSize = size; + if (base->_maximumMarkerSize < size) { + base->_maximumMarkerSize = size; } - while (sim->base.base._state == TRI_COL_STATE_WRITE) { - n = sim->base.base._journals._length; + while (base->_state == TRI_COL_STATE_WRITE) { + n = base->_journals._length; for (i = 0; i < n; ++i) { // select datafile - datafile = sim->base.base._journals._buffer[i]; + datafile = base->_journals._buffer[i]; // try to reserve space res = TRI_ReserveElementDatafile(datafile, size, result); // in case of full datafile, try next if (res == TRI_ERROR_NO_ERROR) { - TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + return datafile; } else if (res != TRI_ERROR_ARANGO_DATAFILE_FULL) { - TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + return NULL; } } - TRI_INC_SYNCHRONISER_WAITER_VOC_BASE(sim->base.base._vocbase); - TRI_WAIT_JOURNAL_ENTRIES_DOC_COLLECTION(sim); - TRI_DEC_SYNCHRONISER_WAITER_VOC_BASE(sim->base.base._vocbase); + TRI_INC_SYNCHRONISER_WAITER_VOC_BASE(base->_vocbase); + TRI_WAIT_JOURNAL_ENTRIES_DOC_COLLECTION(document); + TRI_DEC_SYNCHRONISER_WAITER_VOC_BASE(base->_vocbase); } - TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); return NULL; } @@ -184,22 +173,17 @@ static TRI_datafile_t* SelectJournal (TRI_document_collection_t* sim, /// datafile and has been synced. //////////////////////////////////////////////////////////////////////////////// -static void WaitSync (TRI_document_collection_t* sim, +static void WaitSync (TRI_document_collection_t* document, TRI_datafile_t* journal, - char const* position, - bool forceSync) { + char const* position) { TRI_collection_t* base; - base = &sim->base.base; + base = &document->base.base; // no condition at all. Do NOT acquire a lock, in the worst // case we will miss a parameter change. - if (! base->_waitForSync && ! forceSync) { - return; - } - - TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); // wait until the sync condition is fullfilled while (true) { @@ -221,19 +205,19 @@ static void WaitSync (TRI_document_collection_t* sim, // we have to wait a bit longer // signal the synchroniser that there is work to do - TRI_INC_SYNCHRONISER_WAITER_VOC_BASE(sim->base.base._vocbase); - TRI_WAIT_JOURNAL_ENTRIES_DOC_COLLECTION(sim); - TRI_DEC_SYNCHRONISER_WAITER_VOC_BASE(sim->base.base._vocbase); + TRI_INC_SYNCHRONISER_WAITER_VOC_BASE(base->_vocbase); + TRI_WAIT_JOURNAL_ENTRIES_DOC_COLLECTION(document); + TRI_DEC_SYNCHRONISER_WAITER_VOC_BASE(base->_vocbase); } - TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); } //////////////////////////////////////////////////////////////////////////////// /// @brief writes data to the journal and updates the barriers //////////////////////////////////////////////////////////////////////////////// -static int WriteElement (TRI_document_collection_t* sim, +static int WriteElement (TRI_document_collection_t* document, TRI_datafile_t* journal, TRI_df_marker_t* marker, TRI_voc_size_t markerSize, @@ -258,12 +242,12 @@ static int WriteElement (TRI_document_collection_t* sim, return res; } - TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); journal->_written = ((char*) result) + marker->_size; journal->_nWritten++; - TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(sim); + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); return TRI_ERROR_NO_ERROR; } @@ -285,6 +269,53 @@ static int WriteElement (TRI_document_collection_t* sim, /// @{ //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief compare revision of found document with revision specified in policy +/// this will also store the actual revision id found in the database in the +/// context variable _previousRid, but only if this is not NULL +//////////////////////////////////////////////////////////////////////////////// + +static int RevisionCheck (const TRI_doc_operation_context_t* const context, + const TRI_voc_rid_t actualRid) { + + // store previous revision + if (context->_previousRid != NULL) { + *(context->_previousRid) = actualRid; + } + + // check policy + switch (context->_policy) { + case TRI_DOC_UPDATE_ERROR: + if (context->_expectedRid != 0 && context->_expectedRid != actualRid) { + return TRI_ERROR_ARANGO_CONFLICT; + } + break; + + case TRI_DOC_UPDATE_CONFLICT: + return TRI_ERROR_NOT_IMPLEMENTED; + + case TRI_DOC_UPDATE_ILLEGAL: + return TRI_ERROR_INTERNAL; + + case TRI_DOC_UPDATE_LAST_WRITE: + return TRI_ERROR_NO_ERROR; + } + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief write-unlock the collection if it was write-locked +//////////////////////////////////////////////////////////////////////////////// + +static void Unlock (const TRI_doc_operation_context_t* const context) { + if (context->_release) { + TRI_primary_collection_t* primary = context->_collection; + + primary->endWrite(primary); + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a new header //////////////////////////////////////////////////////////////////////////////// @@ -310,49 +341,50 @@ static void CreateHeader (TRI_primary_collection_t* c, /// @brief creates a new document splitted into marker and body to file //////////////////////////////////////////////////////////////////////////////// -static void CreateDocument (TRI_document_collection_t* sim, - TRI_doc_document_key_marker_t* marker, - size_t markerSize, - void const* keyBody, - TRI_voc_size_t keyBodySize, - void const* body, - TRI_voc_size_t bodySize, - TRI_df_marker_t** result, - void const* additional, - TRI_voc_key_t key, - TRI_voc_rid_t rid, - bool release, - TRI_doc_mptr_t *mptr, - bool forceSync) { +static int CreateDocument (TRI_doc_operation_context_t* context, + TRI_doc_document_key_marker_t* marker, + size_t markerSize, + void const* keyBody, + TRI_voc_size_t keyBodySize, + void const* body, + TRI_voc_size_t bodySize, + TRI_df_marker_t** result, + void const* additional, + TRI_voc_key_t key, + TRI_voc_rid_t rid, + TRI_doc_mptr_t *mptr) { TRI_datafile_t* journal; TRI_primary_collection_t* primary; + TRI_document_collection_t* document; TRI_doc_mptr_t* header; TRI_voc_size_t total; TRI_doc_datafile_info_t* dfi; int res; - primary = &sim->base; + primary = context->_collection; + document = (TRI_document_collection_t*) primary; // ............................................................................. // create header // ............................................................................. // get a new header pointer - header = sim->_headers->request(sim->_headers); - // TODO: header might be NULL and must be checked + header = document->_headers->request(document->_headers); + if (header == NULL) { + Unlock(context); + + return TRI_ERROR_INTERNAL; + } // find and select a journal total = markerSize + keyBodySize + bodySize; - journal = SelectJournal(sim, total, result); + journal = SelectJournal(document, total, result); if (journal == NULL) { - if (release) { - primary->endWrite(primary); - } + Unlock(context); - memset(mptr, 0, sizeof(TRI_doc_mptr_t)); - return; + return TRI_ERROR_INTERNAL; } // ............................................................................. @@ -360,124 +392,113 @@ static void CreateDocument (TRI_document_collection_t* sim, // ............................................................................. // verify the header pointer - header = sim->_headers->verify(sim->_headers, header); + header = document->_headers->verify(document->_headers, header); // generate crc TRI_FillCrcKeyMarkerDatafile(&marker->base, markerSize, keyBody, keyBodySize, body, bodySize); // and write marker and blob - res = WriteElement(sim, journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize, *result); + res = WriteElement(document, journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize, *result); + if (res != TRI_ERROR_NO_ERROR) { + Unlock(context); + + LOG_ERROR("cannot write element: %s", TRI_last_error()); + + return res; + } + // ............................................................................. // update indexes // ............................................................................. - // generate create header - if (res == TRI_ERROR_NO_ERROR) { - - // fill the header - CreateHeader(primary, journal, *result, markerSize, header, 0); - - // update the datafile info - dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); - if (dfi != NULL) { - dfi->_numberAlive += 1; - dfi->_sizeAlive += TRI_LengthDataMasterPointer(header); - } - - // update immediate indexes - res = CreateImmediateIndexes(sim, header); - - // check for constraint error, rollback if necessary - if (res != TRI_ERROR_NO_ERROR) { - int resRollback; - - LOG_DEBUG("encountered index violation during create, deleting newly created document"); - - // rollback, ignore any additional errors - resRollback = DeleteShapedJson(primary, key, rid, 0, TRI_DOC_UPDATE_LAST_WRITE, false, false); - - if (resRollback != TRI_ERROR_NO_ERROR) { - LOG_ERROR("encountered error '%s' during rollback of create", TRI_last_error()); - } - - TRI_set_errno(res); - } - - // ............................................................................. - // create result - // ............................................................................. - - if (res == TRI_ERROR_NO_ERROR) { - *mptr = *header; - - // check cap constraint - if (primary->_capConstraint != NULL) { - while (primary->_capConstraint->_size < primary->_capConstraint->_array._array._nrUsed) { - TRI_doc_mptr_t const* oldest; - int remRes; - - oldest = TRI_PopFrontLinkedArray(&primary->_capConstraint->_array); - - if (oldest == NULL) { - LOG_WARNING("cap collection is empty, but collection '%ld' contains elements", - (unsigned long) primary->base._cid); - break; - } - - LOG_DEBUG("removing document '%s' because of cap constraint", - (char*) oldest->_key); - - remRes = DeleteShapedJson(primary, oldest->_key, 0, NULL, TRI_DOC_UPDATE_LAST_WRITE, false, false); - - if (remRes != TRI_ERROR_NO_ERROR) { - LOG_WARNING("cannot cap collection: %s", TRI_last_error()); - break; - } - } - } - - // release lock, header might be invalid after this - if (release) { - primary->endWrite(primary); - } - - // wait for sync - WaitSync(sim, journal, ((char const*) *result) + markerSize + keyBodySize + bodySize, forceSync); - - // and return - return; - } - else { - if (release) { - primary->endWrite(primary); - } - - mptr->_key = 0; - return; - } + // fill the header + CreateHeader(primary, journal, *result, markerSize, header, 0); + // update the datafile info + dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); + if (dfi != NULL) { + dfi->_numberAlive += 1; + dfi->_sizeAlive += TRI_LengthDataMasterPointer(header); } - else { - if (release) { - primary->endWrite(primary); + + // update immediate indexes + res = CreateImmediateIndexes(document, header); + + // check for constraint error, rollback if necessary + if (res != TRI_ERROR_NO_ERROR) { + TRI_doc_operation_context_t rollbackContext; + int resRollback; + + LOG_DEBUG("encountered index violation during create, deleting newly created document"); + + // rollback, ignore any additional errors + TRI_InitContextPrimaryCollection(&rollbackContext, primary, TRI_DOC_UPDATE_LAST_WRITE, false); + rollbackContext._expectedRid = rid; + resRollback = DeleteShapedJson(&rollbackContext, key); + + if (resRollback != TRI_ERROR_NO_ERROR) { + LOG_ERROR("encountered error '%s' during rollback of create", TRI_last_error()); } - LOG_ERROR("cannot write element: %s", TRI_last_error()); + Unlock(context); + TRI_set_errno(res); - mptr->_key = 0; - return; + return res; } + + // ............................................................................. + // create result + // ............................................................................. + + assert(res == TRI_ERROR_NO_ERROR); + + *mptr = *header; + + // check cap constraint + if (primary->_capConstraint != NULL) { + while (primary->_capConstraint->_size < primary->_capConstraint->_array._array._nrUsed) { + TRI_doc_operation_context_t rollbackContext; + TRI_doc_mptr_t const* oldest; + int resRem; + + oldest = TRI_PopFrontLinkedArray(&primary->_capConstraint->_array); + + if (oldest == NULL) { + LOG_WARNING("cap collection is empty, but collection '%ld' contains elements", + (unsigned long) primary->base._cid); + break; + } + + LOG_DEBUG("removing document '%s' because of cap constraint", (char*) oldest->_key); + + TRI_InitContextPrimaryCollection(&rollbackContext, primary, TRI_DOC_UPDATE_LAST_WRITE, false); + resRem = DeleteShapedJson(&rollbackContext, oldest->_key); + + if (resRem != TRI_ERROR_NO_ERROR) { + LOG_WARNING("cannot cap collection: %s", TRI_last_error()); + break; + } + } + } + + Unlock(context); + + // wait for sync + if (context->_sync) { + WaitSync(document, journal, ((char const*) *result) + markerSize + keyBodySize + bodySize); + } + + // and return + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief updates an existing header //////////////////////////////////////////////////////////////////////////////// -static void UpdateHeader (TRI_primary_collection_t* c, - TRI_datafile_t* datafile, +static void UpdateHeader (TRI_datafile_t* datafile, TRI_df_marker_t const* m, - size_t markerSize, TRI_doc_mptr_t const* header, TRI_doc_mptr_t* update) { TRI_doc_document_key_marker_t const* marker; @@ -494,7 +515,7 @@ static void UpdateHeader (TRI_primary_collection_t* c, /// @brief rolls back an update //////////////////////////////////////////////////////////////////////////////// -static TRI_doc_mptr_t RollbackUpdate (TRI_document_collection_t* sim, +static TRI_doc_mptr_t RollbackUpdate (TRI_primary_collection_t* primary, TRI_doc_mptr_t const* header, TRI_df_marker_t const* originalMarker, TRI_df_marker_t** result) { @@ -502,9 +523,24 @@ static TRI_doc_mptr_t RollbackUpdate (TRI_document_collection_t* sim, char* data; TRI_voc_size_t dataLength; TRI_voc_size_t markerLength; + TRI_doc_operation_context_t rollbackContext; char* keyData = NULL; TRI_voc_size_t keyDataLength = 0; + + if (originalMarker->_type != TRI_DOC_MARKER_KEY_DOCUMENT && + originalMarker->_type != TRI_DOC_MARKER_KEY_EDGE) { + // invalid marker type + TRI_doc_mptr_t mptr; + + memset(&mptr, 0, sizeof(mptr)); + TRI_set_errno(TRI_ERROR_INTERNAL); + LOG_WARNING("rollback operation called for unexpected marker type"); + + return mptr; + } + + if (originalMarker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { TRI_doc_document_key_marker_t documentUpdate; TRI_doc_document_key_marker_t* o = (TRI_doc_document_key_marker_t*) originalMarker; @@ -533,15 +569,13 @@ static TRI_doc_mptr_t RollbackUpdate (TRI_document_collection_t* sim, data = ((char*) originalMarker) + o->base._offsetJson; dataLength = originalMarker->_size - o->base._offsetJson; } - else { - TRI_doc_mptr_t mptr; - TRI_set_errno(TRI_ERROR_INTERNAL); - memset(&mptr, 0, sizeof(mptr)); - return mptr; - } + // create a rollback context that does not rollback itself + TRI_InitContextPrimaryCollection(&rollbackContext, primary, TRI_DOC_UPDATE_LAST_WRITE, false); + rollbackContext._expectedRid = header->_rid; + rollbackContext._allowRollback = false; - return UpdateDocument(sim, + return UpdateDocument(&rollbackContext, header, marker, markerLength, @@ -549,20 +583,14 @@ static TRI_doc_mptr_t RollbackUpdate (TRI_document_collection_t* sim, keyDataLength, data, dataLength, - header->_rid, - NULL, - TRI_DOC_UPDATE_LAST_WRITE, - result, - false, - false, - false); + result); } //////////////////////////////////////////////////////////////////////////////// /// @brief updates an existing document splitted into marker and body to file //////////////////////////////////////////////////////////////////////////////// -static TRI_doc_mptr_t UpdateDocument (TRI_document_collection_t* collection, +static TRI_doc_mptr_t UpdateDocument (TRI_doc_operation_context_t* context, TRI_doc_mptr_t const* header, TRI_doc_document_key_marker_t* marker, TRI_voc_size_t markerSize, @@ -570,69 +598,33 @@ static TRI_doc_mptr_t UpdateDocument (TRI_document_collection_t* collection, TRI_voc_size_t keyBodySize, void const* body, TRI_voc_size_t bodySize, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - TRI_df_marker_t** result, - bool release, - bool allowRollback, - bool forceSync) { + TRI_df_marker_t** result) { TRI_doc_mptr_t mptr; + TRI_doc_mptr_t update; TRI_primary_collection_t* primary; + TRI_document_collection_t* document; TRI_datafile_t* journal; TRI_df_marker_t const* originalMarker; - TRI_doc_mptr_t resUpd; + TRI_doc_datafile_info_t* dfi; TRI_voc_size_t total; int res; - primary = &collection->base; + primary = context->_collection; + document = (TRI_document_collection_t*) primary; originalMarker = header->_data; + memset(&mptr, 0, sizeof(mptr)); // ............................................................................. // check the revision // ............................................................................. - if (oldRid != NULL) { - *oldRid = header->_rid; - } + res = RevisionCheck(context, header->_rid); + if (res != TRI_ERROR_NO_ERROR) { + Unlock(context); + TRI_set_errno(res); - switch (policy) { - case TRI_DOC_UPDATE_ERROR: - if (rid != 0) { - if (rid != header->_rid) { - if (release) { - primary->endWrite(primary); - } - - TRI_set_errno(TRI_ERROR_ARANGO_CONFLICT); - memset(&mptr, 0, sizeof(mptr)); - return mptr; - } - } - - break; - - case TRI_DOC_UPDATE_LAST_WRITE: - break; - - case TRI_DOC_UPDATE_CONFLICT: - if (release) { - primary->endWrite(primary); - } - - TRI_set_errno(TRI_ERROR_NOT_IMPLEMENTED); - mptr._key = 0; - return mptr; - - case TRI_DOC_UPDATE_ILLEGAL: - if (release) { - primary->endWrite(primary); - } - - TRI_set_errno(TRI_ERROR_INTERNAL); - mptr._key = 0; - return mptr; + return mptr; } // ............................................................................. @@ -644,16 +636,12 @@ static TRI_doc_mptr_t UpdateDocument (TRI_document_collection_t* collection, // find and select a journal total = markerSize + keyBodySize + bodySize; - journal = SelectJournal(collection, total, result); + journal = SelectJournal(document, total, result); if (journal == NULL) { + Unlock(context); primary->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_NO_JOURNAL); - if (release) { - primary->endWrite(primary); - } - - mptr._key = 0; return mptr; } @@ -666,161 +654,116 @@ static TRI_doc_mptr_t UpdateDocument (TRI_document_collection_t* collection, // and write marker and blob //TODO: update - res = WriteElement(collection, journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize, *result); + res = WriteElement(document, journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize, *result); + + if (res != TRI_ERROR_NO_ERROR) { + Unlock(context); + LOG_ERROR("cannot write element"); + + return mptr; + } // ............................................................................. // update indexes // ............................................................................. // update the header - if (res == TRI_ERROR_NO_ERROR) { - TRI_doc_mptr_t update; - TRI_doc_datafile_info_t* dfi; + UpdateHeader(journal, *result, header, &update); - // update the header - UpdateHeader(primary, journal, *result, markerSize, header, &update); - - // update the datafile info - dfi = TRI_FindDatafileInfoPrimaryCollection(primary, header->_fid); - if (dfi != NULL) { - size_t length = TRI_LengthDataMasterPointer(header); - - dfi->_numberAlive -= 1; - dfi->_sizeAlive -= length; - - dfi->_numberDead += 1; - dfi->_sizeDead += length; - } - - dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); - if (dfi != NULL) { - dfi->_numberAlive += 1; - dfi->_sizeAlive += TRI_LengthDataMasterPointer(&update); - } - - // update immediate indexes - res = UpdateImmediateIndexes(collection, header, &update); - - // check for constraint error - if (allowRollback && res != TRI_ERROR_NO_ERROR) { - LOG_DEBUG("encountered index violating during update, rolling back"); - - resUpd = RollbackUpdate(collection, header, originalMarker, result); - - if (resUpd._key == 0) { - LOG_ERROR("encountered error '%s' during rollback of update", TRI_last_error()); - } - - TRI_set_errno(res); - } - - // ............................................................................. - // create result - // ............................................................................. - - if (res == TRI_ERROR_NO_ERROR) { - mptr = *header; - - // release lock, header might be invalid after this - if (release) { - primary->endWrite(primary); - } - - // wait for sync - WaitSync(collection, journal, ((char const*) *result) + markerSize + bodySize, forceSync); - - // and return - return mptr; - } - else { - if (release) { - primary->endWrite(primary); - } - - mptr._key = 0; - return mptr; - } + // update the datafile info + dfi = TRI_FindDatafileInfoPrimaryCollection(primary, header->_fid); + if (dfi != NULL) { + size_t length = TRI_LengthDataMasterPointer(header); + dfi->_numberAlive -= 1; + dfi->_sizeAlive -= length; + dfi->_numberDead += 1; + dfi->_sizeDead += length; } - else { - if (release) { - primary->endWrite(primary); - } - LOG_ERROR("cannot write element"); - mptr._key = 0; + dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); + if (dfi != NULL) { + dfi->_numberAlive += 1; + dfi->_sizeAlive += TRI_LengthDataMasterPointer(&update); + } + + // update immediate indexes + res = UpdateImmediateIndexes(document, header, &update); + + // check for constraint error + if (context->_allowRollback && res != TRI_ERROR_NO_ERROR) { + TRI_doc_mptr_t resUpd; + + LOG_DEBUG("encountered index violating during update, rolling back"); + + resUpd = RollbackUpdate(primary, header, originalMarker, result); + if (resUpd._key == 0) { + LOG_ERROR("encountered error '%s' during rollback of update", TRI_last_error()); + } + TRI_set_errno(res); + } + + // ............................................................................. + // create result + // ............................................................................. + + if (res == TRI_ERROR_NO_ERROR) { + mptr = *header; + + Unlock(context); + + // wait for sync + if (context->_sync) { + WaitSync(document, journal, ((char const*) *result) + markerSize + bodySize); + } + + // and return return mptr; } + + // error case + assert(res != TRI_ERROR_NO_ERROR); + + Unlock(context); + mptr._key = 0; + + return mptr; } //////////////////////////////////////////////////////////////////////////////// /// @brief deletes an element and removes it from the index //////////////////////////////////////////////////////////////////////////////// -static int DeleteDocument (TRI_document_collection_t* collection, +static int DeleteDocument (TRI_doc_operation_context_t* context, TRI_doc_deletion_key_marker_t* marker, void const* keyBody, - TRI_voc_size_t keyBodySize, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - bool release, - bool forceSync) { + TRI_voc_size_t keyBodySize) { TRI_datafile_t* journal; TRI_df_marker_t* result; TRI_doc_mptr_t const* header; TRI_primary_collection_t* primary; + TRI_document_collection_t* document; + TRI_doc_datafile_info_t* dfi; TRI_voc_size_t total; int res; - primary = &collection->base; + primary = context->_collection; + document = (TRI_document_collection_t*) primary; // get an existing header pointer header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, keyBody); if (header == NULL || header->_deletion != 0) { - if (release) { - primary->endWrite(primary); - } + Unlock(context); return TRI_set_errno(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); } - // check the revision - if (oldRid != NULL) { - *oldRid = header->_rid; - } + res = RevisionCheck(context, header->_rid); + if (res != TRI_ERROR_NO_ERROR) { + Unlock(context); - switch (policy) { - case TRI_DOC_UPDATE_ERROR: - if (rid != 0) { - if (rid != header->_rid) { - if (release) { - primary->endWrite(primary); - } - - return TRI_set_errno(TRI_ERROR_ARANGO_CONFLICT); - } - } - - break; - - case TRI_DOC_UPDATE_LAST_WRITE: - break; - - case TRI_DOC_UPDATE_CONFLICT: - if (release) { - primary->endWrite(primary); - } - - return TRI_set_errno(TRI_ERROR_NOT_IMPLEMENTED); - - case TRI_DOC_UPDATE_ILLEGAL: - if (release) { - primary->endWrite(primary); - } - - return TRI_set_errno(TRI_ERROR_INTERNAL); + return res; } // generate a new tick @@ -828,14 +771,12 @@ static int DeleteDocument (TRI_document_collection_t* collection, // find and select a journal total = sizeof(TRI_doc_deletion_key_marker_t) + keyBodySize; - journal = SelectJournal(collection, total, &result); + journal = SelectJournal(document, total, &result); if (journal == NULL) { - collection->base.base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_NO_JOURNAL); + Unlock(context); - if (release) { - collection->base.endWrite(&collection->base); - } + primary->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_NO_JOURNAL); return TRI_ERROR_ARANGO_NO_JOURNAL; } @@ -844,49 +785,46 @@ static int DeleteDocument (TRI_document_collection_t* collection, TRI_FillCrcMarkerDatafile(&marker->base, sizeof(TRI_doc_deletion_key_marker_t), keyBody, keyBodySize, 0, 0); // and write marker and blob - res = WriteElement(collection, journal, &marker->base, sizeof(TRI_doc_deletion_key_marker_t), keyBody, keyBodySize, 0, 0, result); + res = WriteElement(document, journal, &marker->base, sizeof(TRI_doc_deletion_key_marker_t), keyBody, keyBodySize, 0, 0, result); - // update the header - if (res == TRI_ERROR_NO_ERROR) { - TRI_doc_datafile_info_t* dfi; - - // update the datafile info - dfi = TRI_FindDatafileInfoPrimaryCollection(primary, header->_fid); - if (dfi != NULL) { - size_t length = TRI_LengthDataMasterPointer(header); - - dfi->_numberAlive -= 1; - dfi->_sizeAlive -= length; - - dfi->_numberDead += 1; - dfi->_sizeDead += length; - } - - dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); - if (dfi != NULL) { - dfi->_numberDeletion += 1; - } - - // update immediate indexes - DeleteImmediateIndexes(collection, header, marker->base._tick); - - // release lock - if (release) { - primary->endWrite(primary); - } - - // wait for sync - WaitSync(collection, journal, ((char const*) result) + sizeof(TRI_doc_deletion_key_marker_t) + keyBodySize, forceSync); - } - else { - if (release) { - primary->endWrite(primary); - } + if (res != TRI_ERROR_NO_ERROR) { + Unlock(context); LOG_ERROR("cannot delete element"); + + return res; } - return res; + assert(res == TRI_ERROR_NO_ERROR); + + // update the datafile info + dfi = TRI_FindDatafileInfoPrimaryCollection(primary, header->_fid); + if (dfi != NULL) { + size_t length = TRI_LengthDataMasterPointer(header); + + dfi->_numberAlive -= 1; + dfi->_sizeAlive -= length; + + dfi->_numberDead += 1; + dfi->_sizeDead += length; + } + + dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); + if (dfi != NULL) { + dfi->_numberDeletion += 1; + } + + // update immediate indexes + DeleteImmediateIndexes(document, header, marker->base._tick); + + Unlock(context); + + // wait for sync + if (context->_sync) { + WaitSync(document, journal, ((char const*) result) + sizeof(TRI_doc_deletion_key_marker_t) + keyBodySize); + } + + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// @@ -898,17 +836,19 @@ static TRI_doc_mptr_t CreateJson (TRI_doc_operation_context_t* context, TRI_json_t const* json, void const* data) { TRI_shaped_json_t* shaped; - TRI_primary_collection_t* collection; + TRI_primary_collection_t* primary; TRI_doc_mptr_t result; TRI_voc_key_t key = 0; - collection = context->_collection; + primary = context->_collection; - shaped = TRI_ShapedJsonJson(collection->_shaper, json); + shaped = TRI_ShapedJsonJson(primary->_shaper, json); if (shaped == 0) { - collection->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_SHAPER_FAILED); + Unlock(context); + primary->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_SHAPER_FAILED); memset(&result, 0, sizeof(result)); + return result; } @@ -919,9 +859,9 @@ static TRI_doc_mptr_t CreateJson (TRI_doc_operation_context_t* context, } } - result = collection->create(context, type, shaped, data, key); + result = primary->create(context, type, shaped, data, key); - TRI_FreeShapedJson(collection->_shaper, shaped); + TRI_FreeShapedJson(primary->_shaper, shaped); return result; } @@ -935,21 +875,23 @@ static TRI_doc_mptr_t UpdateJson (TRI_doc_operation_context_t* context, TRI_voc_key_t key) { TRI_shaped_json_t* shaped; TRI_doc_mptr_t result; - TRI_primary_collection_t* collection; + TRI_primary_collection_t* primary; - collection = context->_collection; + primary = context->_collection; - shaped = TRI_ShapedJsonJson(collection->_shaper, json); + shaped = TRI_ShapedJsonJson(primary->_shaper, json); if (shaped == 0) { - collection->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_SHAPER_FAILED); + Unlock(context); + primary->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_SHAPER_FAILED); memset(&result, 0, sizeof(result)); + return result; } - result = collection->update(collection, shaped, key, context->_expectedRid, context->_previousRid, context->_policy, context->_release, context->_sync); + result = primary->update(context, shaped, key); - TRI_FreeShapedJson(collection->_shaper, shaped); + TRI_FreeShapedJson(primary->_shaper, shaped); return result; } @@ -1058,6 +1000,25 @@ static void DebugHeaderDocumentCollection (TRI_document_collection_t* collection } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialise a document marker with common attributes +//////////////////////////////////////////////////////////////////////////////// + +static void InitDocumentMarker (TRI_doc_document_key_marker_t* marker, + const TRI_df_marker_type_t type, + TRI_shaped_json_t const* json, + const bool generateRid) { + marker->base._type = type; + + // generate a new tick + if (generateRid) { + marker->_rid = marker->base._tick = TRI_NewTickVocBase(); + } + + marker->_sid = 0; + marker->_shape = json->_sid; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a new document in the collection from shaped json //////////////////////////////////////////////////////////////////////////////// @@ -1069,7 +1030,7 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, char* key) { TRI_df_marker_t* result; TRI_primary_collection_t* primary; - TRI_document_collection_t* collection; + TRI_document_collection_t* document; size_t keySize = 0; char* keyBody = 0; TRI_voc_size_t keyBodySize = 0; @@ -1078,28 +1039,43 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, size_t toSize = 0; TRI_doc_mptr_t mptr; + // initialise the result + memset(&mptr, 0, sizeof(mptr)); + + if (type != TRI_DOC_MARKER_KEY_DOCUMENT && + type != TRI_DOC_MARKER_KEY_EDGE) { + // invalid marker type + Unlock(context); + + LOG_FATAL("unknown marker type %lu", (unsigned long) type); + TRI_FlushLogging(); + exit(EXIT_FAILURE); + } + + primary = context->_collection; - collection = (TRI_document_collection_t*) primary; - if (key) { + document = (TRI_document_collection_t*) primary; + // check key - if (regexec(&collection->DocumentKeyRegex, key, 0, NULL, 0) != 0 || strlen(key) > collection->keyLength) { - collection->base.base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD); - memset(&mptr, 0, sizeof(mptr)); - primary->endWrite(primary); + if (regexec(&document->DocumentKeyRegex, key, 0, NULL, 0) != 0 || strlen(key) > document->keyLength) { + Unlock(context); + primary->base._lastError = TRI_set_errno(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD); + return mptr; } } + // type & key are valid + if (type == TRI_DOC_MARKER_KEY_DOCUMENT) { + // create a document TRI_doc_document_key_marker_t marker; memset(&marker, 0, sizeof(marker)); - - // generate a new tick - marker._rid = marker.base._tick = TRI_NewTickVocBase(); - + InitDocumentMarker(&marker, TRI_DOC_MARKER_KEY_DOCUMENT, json, true); + if (key) { // we have a key! keySize = strlen(key) + 1; @@ -1120,39 +1096,29 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, marker._offsetJson = sizeof(marker) + keyBodySize; marker.base._size = sizeof(marker) + json->_data.length + keyBodySize; - marker.base._type = TRI_DOC_MARKER_KEY_DOCUMENT; - marker._sid = 0; - marker._shape = json->_sid; - - CreateDocument(collection, - &marker, - sizeof(marker), - keyBody, - keyBodySize, - json->_data.data, - json->_data.length, - &result, - data, - keyBody, - marker._rid, - context->_release, - &mptr, - context->_sync); - TRI_FreeString(TRI_CORE_MEM_ZONE, keyBody); + CreateDocument(context, + &marker, + sizeof(marker), + keyBody, + keyBodySize, + json->_data.data, + json->_data.length, + &result, + data, + keyBody, + marker._rid, + &mptr); } - else if (type == TRI_DOC_MARKER_KEY_EDGE) { + else { + // create an edge TRI_doc_edge_key_marker_t marker; TRI_document_edge_t const* edge; edge = data; memset(&marker, 0, sizeof(marker)); - - marker.base.base._type = TRI_DOC_MARKER_KEY_EDGE; - - marker.base._sid = 0; - marker.base._shape = json->_sid; + InitDocumentMarker(&marker.base, TRI_DOC_MARKER_KEY_EDGE, json, true); marker._fromCid = edge->_fromCid; marker._toCid = edge->_toCid; @@ -1161,9 +1127,6 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, fromSize = strlen(edge->_fromKey) + 1; toSize = strlen(edge->_toKey) + 1; - // generate a new tick - marker.base._rid = marker.base.base._tick = TRI_NewTickVocBase(); - if (key) { // we have a key! keySize = strlen(key)+1; @@ -1184,34 +1147,28 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, TRI_CopyString((keyBody + keySize + toSize), edge->_fromKey, fromSize); marker.base._offsetKey = sizeof(marker); + marker.base._offsetJson = sizeof(marker) + keyBodySize; marker._offsetToKey = marker.base._offsetKey + keySize; marker._offsetFromKey = marker._offsetToKey + toSize; - marker.base._offsetJson = sizeof(marker) + keyBodySize; marker.base.base._size = sizeof(marker) + keyBodySize + json->_data.length; - CreateDocument(collection, - &marker.base, - sizeof(marker), - keyBody, - keyBodySize, - json->_data.data, - json->_data.length, - &result, - data, - keyBody, - marker.base._rid, - context->_release, - &mptr, - context->_sync); - TRI_FreeString(TRI_CORE_MEM_ZONE, keyBody); - } - else { - LOG_FATAL("unknown marker type %lu", (unsigned long) type); - TRI_FlushLogging(); - exit(EXIT_FAILURE); + CreateDocument(context, + &marker.base, + sizeof(marker), + keyBody, + keyBodySize, + json->_data.data, + json->_data.length, + &result, + data, + keyBody, + marker.base._rid, + &mptr); } + TRI_FreeString(TRI_CORE_MEM_ZONE, keyBody); + return mptr; } @@ -1221,70 +1178,73 @@ static TRI_doc_mptr_t CreateShapedJson (TRI_doc_operation_context_t* context, static TRI_doc_mptr_t ReadShapedJson (TRI_primary_collection_t* primary, TRI_voc_key_t key) { - TRI_doc_mptr_t result; TRI_doc_mptr_t const* header; header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key); if (header == NULL || header->_deletion != 0) { + TRI_doc_mptr_t result; + memset(&result, 0, sizeof(result)); + return result; } - else { - return *header; - } + + return *header; } //////////////////////////////////////////////////////////////////////////////// /// @brief updates a document in the collection from shaped json //////////////////////////////////////////////////////////////////////////////// -static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, +static TRI_doc_mptr_t UpdateShapedJson (TRI_doc_operation_context_t* context, TRI_shaped_json_t const* json, - TRI_voc_key_t key, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - bool release, - bool forceSync) { + TRI_voc_key_t key) { TRI_df_marker_t const* original; TRI_df_marker_t* result; - TRI_document_collection_t* collection; + TRI_primary_collection_t* primary; TRI_doc_mptr_t mptr; TRI_doc_mptr_t const* header; char* keyBody = NULL; - size_t keyBodyLength = 0; + size_t keyBodyLength = 0; + + // initialise the result + memset(&mptr, 0, sizeof(mptr)); - collection = (TRI_document_collection_t*) primary; + primary = context->_collection; // get an existing header pointer header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key); if (header == NULL || header->_deletion != 0) { - if (release) { - primary->endWrite(primary); - } - + Unlock(context); TRI_set_errno(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); - memset(&mptr, 0, sizeof(mptr)); + return mptr; } original = header->_data; - // the original is an document + if (original->_type != TRI_DOC_MARKER_KEY_DOCUMENT && + original->_type != TRI_DOC_MARKER_KEY_EDGE) { + // invalid marker type + Unlock(context); + + LOG_FATAL("unknown marker type %lu", (unsigned long) original->_type); + TRI_FlushLogging(); + exit(EXIT_FAILURE); + } + + if (original->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { + // the original is a document TRI_doc_document_key_marker_t marker; TRI_doc_document_key_marker_t const* o; o = header->_data; // create an update memset(&marker, 0, sizeof(marker)); - - marker.base._type = o->base._type; - - marker._sid = 0; - marker._shape = json->_sid; + InitDocumentMarker(&marker, o->base._type, json, false); keyBody = ((char*) original) + o->_offsetKey; keyBodyLength = o->_offsetJson - o->_offsetKey; @@ -1294,7 +1254,7 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, marker.base._size = sizeof(marker) + keyBodyLength + json->_data.length; - return UpdateDocument(collection, + return UpdateDocument(context, header, &marker, sizeof(marker), @@ -1302,17 +1262,11 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, keyBodyLength, json->_data.data, json->_data.length, - rid, - oldRid, - policy, - &result, - release, - true, - forceSync); + &result); } // the original is an edge - else if (original->_type == TRI_DOC_MARKER_KEY_EDGE) { + else { TRI_doc_edge_key_marker_t marker; TRI_doc_edge_key_marker_t const* o; @@ -1320,11 +1274,7 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, // create an update memset(&marker, 0, sizeof(marker)); - - marker.base.base._type = o->base.base._type; - - marker.base._sid = 0; - marker.base._shape = json->_sid; + InitDocumentMarker(&marker.base, o->base.base._type, json, false); marker._fromCid = o->_fromCid; marker._toCid = o->_toCid; @@ -1340,7 +1290,7 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, marker.base.base._size = sizeof(marker) + keyBodyLength + json->_data.length; - return UpdateDocument(collection, + return UpdateDocument(context, header, &marker.base, sizeof(marker), @@ -1348,24 +1298,7 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, keyBodyLength, json->_data.data, json->_data.length, - rid, - oldRid, - policy, - &result, - release, - true, - forceSync); - } - - // do not know - else { - if (release) { - primary->endWrite(primary); - } - - LOG_FATAL("unknown marker type %lu", (unsigned long) original->_type); - TRI_FlushLogging(); - exit(EXIT_FAILURE); + &result); } } @@ -1373,35 +1306,23 @@ static TRI_doc_mptr_t UpdateShapedJson (TRI_primary_collection_t* primary, /// @brief deletes a json document given the identifier //////////////////////////////////////////////////////////////////////////////// -static int DeleteShapedJson (TRI_primary_collection_t* primary, - TRI_voc_key_t key, - TRI_voc_rid_t rid, - TRI_voc_rid_t* oldRid, - TRI_doc_update_policy_e policy, - bool release, - bool forceSync) { - TRI_document_collection_t* document; +static int DeleteShapedJson (TRI_doc_operation_context_t* context, + TRI_voc_key_t key) { TRI_doc_deletion_key_marker_t marker; TRI_voc_size_t keyBodySize = 0; - - document = (TRI_document_collection_t*) primary; memset(&marker, 0, sizeof(marker)); - marker.base._type = TRI_DOC_MARKER_KEY_DELETION; - marker._sid = 0; if (key) { - //keyBodySize = ((strlen(key) + TRI_DF_BLOCK_ALIGN) / TRI_DF_BLOCK_ALIGN) * TRI_DF_BLOCK_ALIGN; keyBodySize = strlen(key) + 1; } marker._offsetKey = sizeof(marker); - marker.base._size = sizeof(marker) + keyBodySize; - return DeleteDocument(document, &marker, key, keyBodySize, rid, oldRid, policy, release, forceSync); + return DeleteDocument(context, &marker, key, keyBodySize); } //////////////////////////////////////////////////////////////////////////////// @@ -1444,37 +1365,6 @@ static int EndWrite (TRI_primary_collection_t* primary) { return TRI_ERROR_NO_ERROR; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief size of a simple collection -//////////////////////////////////////////////////////////////////////////////// - -static TRI_voc_size_t SizeDocumentCollection (TRI_primary_collection_t* primary) { - TRI_doc_mptr_t const* mptr; - TRI_voc_size_t result; - void** end; - void** ptr; - - TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); - - ptr = primary->_primaryIndex._table; - end = ptr + primary->_primaryIndex._nrAlloc; - result = 0; - - for (; ptr < end; ++ptr) { - if (*ptr != NULL) { - mptr = *ptr; - - if (mptr->_deletion == 0) { - ++result; - } - } - } - - TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); - - return result; -} - //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -1579,7 +1469,7 @@ static bool OpenIterator (TRI_df_marker_t const* marker, void* data, TRI_datafil TRI_doc_mptr_t update; // update the header info - UpdateHeader(primary, datafile, marker, markerSize, found, &update); + UpdateHeader(datafile, marker, found, &update); // update the datafile info dfi = TRI_FindDatafileInfoPrimaryCollection(primary, found->_fid); @@ -1874,8 +1764,6 @@ static bool InitDocumentCollection (TRI_document_collection_t* collection, collection->base.beginWrite = BeginWrite; collection->base.endWrite = EndWrite; - collection->base.size = SizeDocumentCollection; - collection->base.create = CreateShapedJson; collection->base.createJson = CreateJson; collection->base.read = ReadShapedJson; @@ -2241,7 +2129,6 @@ static TRI_json_t* ExtractFields (TRI_json_t* json, size_t* fieldCount, TRI_idx_ return fld; } - //////////////////////////////////////////////////////////////////////////////// /// @brief returns the list of attribute/value pairs /// diff --git a/arangod/VocBase/primary-collection.c b/arangod/VocBase/primary-collection.c index 2cc7298ec8..0322e4765b 100644 --- a/arangod/VocBase/primary-collection.c +++ b/arangod/VocBase/primary-collection.c @@ -119,6 +119,37 @@ static TRI_doc_collection_info_t* Figures (TRI_primary_collection_t* primary) { return info; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief size of a primary collection +//////////////////////////////////////////////////////////////////////////////// + +static TRI_voc_size_t Count (TRI_primary_collection_t* primary) { + TRI_doc_mptr_t const* mptr; + TRI_voc_size_t result; + void** end; + void** ptr; + + TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); + + ptr = primary->_primaryIndex._table; + end = ptr + primary->_primaryIndex._nrAlloc; + result = 0; + + for (; ptr < end; ++ptr) { + if (*ptr != NULL) { + mptr = *ptr; + + if (mptr->_deletion == 0) { + ++result; + } + } + } + + TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); + + return result; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief hashs a datafile identifier //////////////////////////////////////////////////////////////////////////////// @@ -389,6 +420,7 @@ void TRI_InitPrimaryCollection (TRI_primary_collection_t* collection, collection->_capConstraint = NULL; collection->figures = Figures; + collection->size = Count; TRI_InitBarrierList(&collection->_barrierList, collection); @@ -560,11 +592,11 @@ void TRI_InitContextPrimaryCollection (TRI_doc_operation_context_t* const contex context->_collection = collection; context->_policy = policy; context->_expectedRid = 0; - context->_previousRid = 0; + context->_previousRid = NULL; context->_lock = false; context->_release = false; context->_sync = forceSync || collection->base._waitForSync; - context->_allowRollback = false; + context->_allowRollback = true; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/primary-collection.h b/arangod/VocBase/primary-collection.h index e932bd7986..6e78118a33 100644 --- a/arangod/VocBase/primary-collection.h +++ b/arangod/VocBase/primary-collection.h @@ -108,18 +108,22 @@ typedef enum { TRI_doc_update_policy_e; //////////////////////////////////////////////////////////////////////////////// -/// @brief typedef for arbitrary operation parameters +/// @brief typedef for arbitrary collection operation parameters +/// the context controls behavior such as revision check, locking/unlocking +/// +/// the context struct needs to be passed as a parameter for CRUD operations +/// this makes function signatures a lot easier //////////////////////////////////////////////////////////////////////////////// typedef struct TRI_doc_operation_context_s { - struct TRI_primary_collection_s* _collection; - TRI_doc_update_policy_e _policy; - TRI_voc_rid_t _expectedRid; - TRI_voc_rid_t* _previousRid; - bool _lock : 1; - bool _release : 1; - bool _sync : 1; - bool _allowRollback : 1; + struct TRI_primary_collection_s* _collection; // collection to be used + TRI_doc_update_policy_e _policy; // the update policy + TRI_voc_rid_t _expectedRid; // the expected revision id of a document. only used if set and for update/delete + TRI_voc_rid_t* _previousRid; // a variable that the previous revsion id found in the database will be pushed into. only used if set and for update/delete + bool _release : 1; // release the write lock after the operation + bool _sync : 1; // force syncing to disk after successful operation + bool _allowRollback : 1; // allow rollback of operation. this is normally true except for contexts created by rollback operations + bool _lock : 1; // currently unused } TRI_doc_operation_context_t; @@ -337,11 +341,10 @@ typedef struct TRI_primary_collection_s { TRI_doc_mptr_t (*createJson) (struct TRI_doc_operation_context_s*, TRI_df_marker_type_e, TRI_json_t const*, void const*); TRI_doc_mptr_t (*read) (struct TRI_primary_collection_s*, TRI_voc_key_t); - TRI_doc_mptr_t (*update) (struct TRI_primary_collection_s*, TRI_shaped_json_t const*, TRI_voc_key_t, TRI_voc_rid_t, TRI_voc_rid_t*, TRI_doc_update_policy_e, bool, bool); + TRI_doc_mptr_t (*update) (struct TRI_doc_operation_context_s*, TRI_shaped_json_t const*, TRI_voc_key_t); TRI_doc_mptr_t (*updateJson) (struct TRI_doc_operation_context_s*, TRI_json_t const*, TRI_voc_key_t); - // , TRI_voc_rid_t, TRI_voc_rid_t*, TRI_doc_update_policy_e, bool, bool); - int (*destroy) (struct TRI_primary_collection_s* collection, TRI_voc_key_t, TRI_voc_rid_t, TRI_voc_rid_t*, TRI_doc_update_policy_e, bool, bool); + int (*destroy) (struct TRI_doc_operation_context_s*, TRI_voc_key_t); TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection); TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection);