//////////////////////////////////////////////////////////////////////////////// /// @brief base transaction wrapper /// /// @file /// /// DISCLAIMER /// /// Copyright 2004-2012 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is triAGENS GmbH, Cologne, Germany /// /// @author Jan Steemann /// @author Copyright 2011-2012, triAGENS GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// #ifndef TRIAGENS_UTILS_TRANSACTION_H #define TRIAGENS_UTILS_TRANSACTION_H 1 #include "VocBase/barrier.h" #include "VocBase/transaction.h" #include "VocBase/vocbase.h" #include "Logger/Logger.h" #include "Utils/CollectionReadLock.h" #include "Utils/CollectionWriteLock.h" #include "Utils/TransactionCollectionsList.h" #include "Utils/TransactionCollection.h" #if TRI_ENABLE_TRX //#define TRX_LOG LOGGER_INFO << __FUNCTION__ << ":" << __LINE__ << " " #define TRX_LOG if (false) std::cout #else #define TRX_LOG if (false) std::cout #endif namespace triagens { namespace arango { // ----------------------------------------------------------------------------- // --SECTION-- class Transaction // ----------------------------------------------------------------------------- template class Transaction : public T { //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// /// @brief Transaction //////////////////////////////////////////////////////////////////////////////// private: Transaction (const Transaction&); Transaction& operator= (const Transaction&); //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// public: //////////////////////////////////////////////////////////////////////////////// /// @brief create the transaction //////////////////////////////////////////////////////////////////////////////// Transaction (TRI_vocbase_t* const vocbase, TransactionCollectionsList* collections) : T(), _vocbase(vocbase), _collections(collections), _hints(0), _setupError(TRI_ERROR_NO_ERROR) { assert(_vocbase != 0); TRX_LOG << "creating transaction"; int res = _collections->getError(); if (res == TRI_ERROR_NO_ERROR) { res = createTransaction(); } if (res != TRI_ERROR_NO_ERROR) { // setting up the transaction failed TRX_LOG << "creating transaction failed"; this->_setupError = res; } } //////////////////////////////////////////////////////////////////////////////// /// @brief destroy the transaction //////////////////////////////////////////////////////////////////////////////// virtual ~Transaction () { TRX_LOG << "destroying transaction"; if (this->_trx != 0 && ! this->isEmbedded()) { if (this->status() == TRI_TRANSACTION_RUNNING) { // auto abort this->abort(); } freeTransaction(); } delete _collections; } //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- public methods // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// public: //////////////////////////////////////////////////////////////////////////////// /// @brief begin the transaction //////////////////////////////////////////////////////////////////////////////// int begin () { TRX_LOG << "beginning transaction"; if (this->_setupError != TRI_ERROR_NO_ERROR) { return this->_setupError; } if (this->_trx == 0) { return TRI_ERROR_TRANSACTION_INVALID_STATE; } int res; if (this->isEmbedded()) { res = this->checkCollections(); return res; } // ! this->isEmbedded() if (status() != TRI_TRANSACTION_CREATED) { return TRI_ERROR_TRANSACTION_INVALID_STATE; } // register usage of the underlying collections res = this->addCollections(); if (res != TRI_ERROR_NO_ERROR) { return res; } TRX_LOG << "calling transaction start"; return TRI_StartTransaction(this->_trx, _hints); } //////////////////////////////////////////////////////////////////////////////// /// @brief commit / finish the transaction //////////////////////////////////////////////////////////////////////////////// int commit () { TRX_LOG << "committing transaction"; if (this->_trx == 0 || this->status() != TRI_TRANSACTION_RUNNING) { // transaction not created or not running return TRI_ERROR_TRANSACTION_INVALID_STATE; } if (this->isEmbedded()) { // return instantly if the transaction is embedded return TRI_ERROR_NO_ERROR; } int res; if (this->_trx->_type == TRI_TRANSACTION_READ) { // a read transaction just finishes TRX_LOG << "commit - finishing read transaction"; res = TRI_FinishTransaction(this->_trx); } else { // a write transaction commits TRX_LOG << "commit - committing write transaction"; res = TRI_CommitTransaction(this->_trx); } return res; } //////////////////////////////////////////////////////////////////////////////// /// @brief abort the transaction //////////////////////////////////////////////////////////////////////////////// int abort () { TRX_LOG << "aborting transaction"; if (this->_trx == 0) { // transaction not created return TRI_ERROR_TRANSACTION_INVALID_STATE; } if (this->status() != TRI_TRANSACTION_RUNNING) { return TRI_ERROR_TRANSACTION_INVALID_STATE; } int res = TRI_AbortTransaction(this->_trx); return res; } //////////////////////////////////////////////////////////////////////////////// /// @brief finish a transaction, based on the previous state //////////////////////////////////////////////////////////////////////////////// int finish (const int errorNumber) { if (this->_trx == 0) { // transaction already not created return TRI_ERROR_TRANSACTION_INVALID_STATE; } if (this->status() != TRI_TRANSACTION_RUNNING) { return TRI_ERROR_TRANSACTION_INVALID_STATE; } int res; if (errorNumber == TRI_ERROR_NO_ERROR) { // there was no previous error, so we'll commit res = this->commit(); } else { // there was a previous error, so we'll abort this->abort(); // return original error number res = errorNumber; } return res; } //////////////////////////////////////////////////////////////////////////////// /// @brief get the "vocbase" //////////////////////////////////////////////////////////////////////////////// inline TRI_vocbase_t* vocbase () const { return this->_vocbase; } //////////////////////////////////////////////////////////////////////////////// /// @brief get the status of the transaction //////////////////////////////////////////////////////////////////////////////// inline TRI_transaction_status_e status () const { if (this->_trx != 0) { return this->_trx->_status; } return TRI_TRANSACTION_UNDEFINED; } //////////////////////////////////////////////////////////////////////////////// /// @brief dump the transaction //////////////////////////////////////////////////////////////////////////////// void dump () { if (this->_trx != 0) { TRI_DumpTransaction(this->_trx); } } //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- protected methods // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// protected: //////////////////////////////////////////////////////////////////////////////// /// @brief add a transaction hint //////////////////////////////////////////////////////////////////////////////// void addHint (const TRI_transaction_hint_e hint) { this->_hints |= (TRI_transaction_hint_t) hint; } //////////////////////////////////////////////////////////////////////////////// /// @brief read a single document, identified by key //////////////////////////////////////////////////////////////////////////////// int readCollectionDocument (TRI_primary_collection_t* const primary, TRI_doc_mptr_t** mptr, const string& key) { TRI_doc_operation_context_t context; TRI_InitReadContextPrimaryCollection(&context, primary); CollectionReadLock lock(primary); return primary->read(&context, mptr, (TRI_voc_key_t) key.c_str()); } //////////////////////////////////////////////////////////////////////////////// /// @brief read all documents //////////////////////////////////////////////////////////////////////////////// int readCollectionDocuments (TRI_primary_collection_t* const primary, vector& ids) { TRI_doc_operation_context_t context; TRI_InitReadContextPrimaryCollection(&context, primary); CollectionReadLock lock(primary); if (primary->_primaryIndex._nrUsed > 0) { void** ptr = primary->_primaryIndex._table; void** end = ptr + primary->_primaryIndex._nrAlloc; for (; ptr < end; ++ptr) { if (*ptr) { TRI_doc_mptr_t const* d = (TRI_doc_mptr_t const*) *ptr; if (d->_validTo == 0) { ids.push_back(d->_key); } } } } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief read all master pointers, using skip and limit //////////////////////////////////////////////////////////////////////////////// int readCollectionPointers (TRI_primary_collection_t* const primary, vector& docs, TRI_barrier_t** barrier, TRI_voc_ssize_t skip, TRI_voc_size_t limit, uint32_t* total) { TRI_doc_operation_context_t context; TRI_InitReadContextPrimaryCollection(&context, primary); if (limit == 0) { // nothing to do return TRI_ERROR_NO_ERROR; } CollectionReadLock lock(primary); if (primary->_primaryIndex._nrUsed == 0) { // nothing to do return TRI_ERROR_NO_ERROR; } *barrier = TRI_CreateBarrierElement(&primary->_barrierList); if (*barrier == 0) { return TRI_ERROR_OUT_OF_MEMORY; } void** beg = primary->_primaryIndex._table; void** ptr = beg; void** end = ptr + primary->_primaryIndex._nrAlloc; uint32_t count = 0; // TODO: this is not valid in MVCC context *total = (uint32_t) primary->_primaryIndex._nrUsed; // apply skip if (skip > 0) { // skip from the beginning for (; ptr < end && 0 < skip; ++ptr) { if (*ptr) { TRI_doc_mptr_t const* d = (TRI_doc_mptr_t const*) *ptr; if (d->_validTo == 0) { --skip; } } } } else if (skip < 0) { // skip from the end ptr = end - 1; for (; beg <= ptr; --ptr) { if (*ptr) { TRI_doc_mptr_t const* d = (TRI_doc_mptr_t const*) *ptr; if (d->_validTo == 0) { ++skip; if (skip == 0) { break; } } } } if (ptr < beg) { ptr = beg; } } // fetch documents, taking limit into account for (; ptr < end && count < limit; ++ptr) { if (*ptr) { TRI_doc_mptr_t* d = (TRI_doc_mptr_t*) *ptr; if (d->_validTo == 0) { docs.push_back(d); ++count; } } } if (count == 0) { // barrier not needed, kill it TRI_FreeBarrier(*barrier); } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief create a single document, using JSON //////////////////////////////////////////////////////////////////////////////// int createCollectionDocument (TRI_primary_collection_t* const primary, const TRI_df_marker_type_e markerType, TRI_doc_mptr_t** mptr, TRI_json_t const* json, void const* data, const bool forceSync) { TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, TRI_DOC_UPDATE_ERROR, forceSync); CollectionWriteLock lock(primary); return primary->createJson(&context, markerType, mptr, json, data); } //////////////////////////////////////////////////////////////////////////////// /// @brief create a single document, using shaped json //////////////////////////////////////////////////////////////////////////////// int createCollectionShaped (TRI_primary_collection_t* const primary, const TRI_df_marker_type_e markerType, TRI_voc_key_t key, TRI_doc_mptr_t** mptr, TRI_shaped_json_t const* shaped, void const* data, const bool forceSync) { TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, TRI_DOC_UPDATE_ERROR, forceSync); CollectionWriteLock lock(primary); return primary->create(&context, markerType, mptr, shaped, data, key); } //////////////////////////////////////////////////////////////////////////////// /// @brief update a single document, using JSON //////////////////////////////////////////////////////////////////////////////// int updateCollectionDocument (TRI_primary_collection_t* const primary, const string& key, TRI_doc_mptr_t** mptr, TRI_json_t* const json, const TRI_doc_update_policy_e policy, const TRI_voc_rid_t expectedRevision, TRI_voc_rid_t* actualRevision, const bool forceSync) { TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); context._expectedRid = expectedRevision; context._previousRid = actualRevision; CollectionWriteLock lock(primary); return primary->updateJson(&context, mptr, json, (TRI_voc_key_t) key.c_str()); } //////////////////////////////////////////////////////////////////////////////// /// @brief update a single document, using shaped json //////////////////////////////////////////////////////////////////////////////// int updateCollectionShaped (TRI_primary_collection_t* const primary, const string& key, TRI_doc_mptr_t** mptr, TRI_shaped_json_t* const shaped, const TRI_doc_update_policy_e policy, const TRI_voc_rid_t expectedRevision, TRI_voc_rid_t* actualRevision, const bool forceSync) { TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); context._expectedRid = expectedRevision; context._previousRid = actualRevision; CollectionWriteLock lock(primary); return primary->update(&context, mptr, shaped, (TRI_voc_key_t) key.c_str()); } //////////////////////////////////////////////////////////////////////////////// /// @brief delete a single document //////////////////////////////////////////////////////////////////////////////// int deleteCollectionDocument (TRI_primary_collection_t* const primary, const string& key, const TRI_doc_update_policy_e policy, const TRI_voc_rid_t expectedRevision, TRI_voc_rid_t* actualRevision, const bool forceSync) { TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); context._expectedRid = expectedRevision; context._previousRid = actualRevision; CollectionWriteLock lock(primary); return primary->destroy(&context, (TRI_voc_key_t) key.c_str()); } //////////////////////////////////////////////////////////////////////////////// /// @brief truncate a collection //////////////////////////////////////////////////////////////////////////////// int truncateCollection (TRI_primary_collection_t* const primary, const bool forceSync) { vector ids; int res = readCollectionDocuments(primary, ids); if (res != TRI_ERROR_NO_ERROR) { return res; } TRI_doc_operation_context_t context; TRI_InitContextPrimaryCollection(&context, primary, TRI_DOC_UPDATE_LAST_WRITE, forceSync); size_t n = ids.size(); res = TRI_ERROR_NO_ERROR; CollectionWriteLock lock(primary); for (size_t i = 0; i < n; ++i) { const string& id = ids[i]; res = primary->destroy(&context, (TRI_voc_key_t) id.c_str()); if (res != TRI_ERROR_NO_ERROR) { // halt on first error break; } } return res; } //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// private: //////////////////////////////////////////////////////////////////////////////// /// @brief check all collections to the transaction /// /// this method is called in case the transaction is embedded. its purpose is /// to validate if all collections have been previously registered in the /// current transaction //////////////////////////////////////////////////////////////////////////////// int checkCollections () { assert(this->isEmbedded()); TRX_LOG << "checking collections"; const vector& collections = _collections->getCollections(); for (size_t i = 0; i < collections.size(); ++i) { TransactionCollection* c = collections[i]; TRI_vocbase_col_t* collection = TRI_CheckCollectionTransaction(this->_trx, c->getName().c_str(), c->getAccessType()); if (collection == 0) { return TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION; } } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief add all collections to the transaction /// /// this method is called in case the transaction is not embedded. its purpose is /// to validate if all collections have been previously registered in the /// current transaction //////////////////////////////////////////////////////////////////////////////// int addCollections () { assert(! this->isEmbedded()); TRX_LOG << "adding collections"; const vector& collections = _collections->getCollections(); for (size_t i = 0; i < collections.size(); ++i) { TransactionCollection* c = collections[i]; TRX_LOG << "adding collection " << c->getName(); int res = TRI_AddCollectionTransaction(this->_trx, c->getName().c_str(), c->getAccessType()); if (res != TRI_ERROR_NO_ERROR) { return res; } } TRX_LOG << "all collections added"; return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief create transaction //////////////////////////////////////////////////////////////////////////////// int createTransaction () { if (this->isEmbedded()) { if (! this->isEmbeddable()) { return TRI_ERROR_TRANSACTION_NESTED; } TRX_LOG << "creating embedded transaction"; this->_trx = this->getParent(); if (this->_trx == 0) { TRX_LOG << "creating embedded transaction failed"; return TRI_ERROR_TRANSACTION_INVALID_STATE; } return TRI_ERROR_NO_ERROR; } TRX_LOG << "creating standalone transaction"; this->_trx = TRI_CreateTransaction(_vocbase->_transactionContext, TRI_TRANSACTION_READ_REPEATABLE); if (this->_trx == 0) { return TRI_ERROR_OUT_OF_MEMORY; } this->registerTransaction(this->_trx); return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief free transaction //////////////////////////////////////////////////////////////////////////////// int freeTransaction () { assert(! this->isEmbedded()); TRX_LOG << "freeing standalone transaction"; if (this->_trx != 0) { this->unregisterTransaction(); TRI_FreeTransaction(this->_trx); this->_trx = 0; } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- protected variables // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// protected: //////////////////////////////////////////////////////////////////////////////// /// @brief the vocbase //////////////////////////////////////////////////////////////////////////////// TRI_vocbase_t* const _vocbase; //////////////////////////////////////////////////////////////////////////////// /// @brief the collections participating in the transaction //////////////////////////////////////////////////////////////////////////////// TransactionCollectionsList* _collections; //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // ----------------------------------------------------------------------------- // --SECTION-- private variables // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// /// @addtogroup ArangoDB /// @{ //////////////////////////////////////////////////////////////////////////////// private: //////////////////////////////////////////////////////////////////////////////// /// @brief transaction hints //////////////////////////////////////////////////////////////////////////////// TRI_transaction_hint_t _hints; //////////////////////////////////////////////////////////////////////////////// /// @brief error that occurred on transaction initialisation (before begin()) //////////////////////////////////////////////////////////////////////////////// int _setupError; //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// }; } } #endif // Local Variables: // mode: outline-minor // outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}\\)" // End: