diff --git a/arangod/Aql/ModificationBlocks.h b/arangod/Aql/ModificationBlocks.h index 92959d2c4d..19e33b7eb5 100644 --- a/arangod/Aql/ModificationBlocks.h +++ b/arangod/Aql/ModificationBlocks.h @@ -31,7 +31,7 @@ #include "Utils/AqlTransaction.h" #include "VocBase/shaped-json.h" -struct TRI_df_marker_s; +struct TRI_df_marker_t; struct TRI_doc_mptr_t; struct TRI_json_t; @@ -71,7 +71,7 @@ class ModificationBlock : public ExecutionBlock { /// @brief constructs a master pointer from the marker passed ////////////////////////////////////////////////////////////////////////////// - void constructMptr(TRI_doc_mptr_t*, TRI_df_marker_s const*) const; + void constructMptr(TRI_doc_mptr_t*, TRI_df_marker_t const*) const; ////////////////////////////////////////////////////////////////////////////// /// @brief check whether a shard key value has changed diff --git a/arangod/V8Server/V8VPackWrapper.h b/arangod/V8Server/V8VPackWrapper.h index c125677137..87445c5e07 100644 --- a/arangod/V8Server/V8VPackWrapper.h +++ b/arangod/V8Server/V8VPackWrapper.h @@ -30,7 +30,7 @@ #include -struct TRI_df_marker_s; +struct TRI_df_marker_t; struct TRI_doc_mptr_t; struct TRI_document_collection_t; @@ -46,7 +46,7 @@ namespace V8VPackWrapper { v8::Handle wrap(v8::Isolate*, arangodb::Transaction*, TRI_voc_cid_t cid, arangodb::DocumentDitch* ditch, - struct TRI_df_marker_s const*); + struct TRI_df_marker_t const*); //////////////////////////////////////////////////////////////////////////////// /// @brief wraps a VPackSlice diff --git a/arangod/VocBase/cleanup.cpp b/arangod/VocBase/cleanup.cpp index 925071f631..13a4da4ba0 100644 --- a/arangod/VocBase/cleanup.cpp +++ b/arangod/VocBase/cleanup.cpp @@ -24,7 +24,7 @@ #include "cleanup.h" #include "Basics/files.h" #include "Basics/Logger.h" -#include "Basics/tri-strings.h" +#include "Basics/ReadLocker.h" #include "Utils/CursorRepository.h" #include "VocBase/compactor.h" #include "VocBase/Ditch.h" diff --git a/arangod/VocBase/datafile.cpp 
b/arangod/VocBase/datafile.cpp index 31dd928b85..f74e9ca55a 100644 --- a/arangod/VocBase/datafile.cpp +++ b/arangod/VocBase/datafile.cpp @@ -1407,6 +1407,8 @@ char const* TRI_NameMarkerDatafile(TRI_df_marker_t const* marker) { return "header"; case TRI_DF_MARKER_FOOTER: return "footer"; + case TRI_DF_MARKER_PROLOGUE: + return "prologue"; // datafile markers case TRI_DOC_MARKER_KEY_DOCUMENT: diff --git a/arangod/VocBase/datafile.h b/arangod/VocBase/datafile.h index 679fc1637f..13b1ef52ae 100644 --- a/arangod/VocBase/datafile.h +++ b/arangod/VocBase/datafile.h @@ -107,25 +107,26 @@ struct TRI_datafile_t; /// @brief state of the datafile //////////////////////////////////////////////////////////////////////////////// -typedef enum { +enum TRI_df_state_e { TRI_DF_STATE_CLOSED = 1, // datafile is closed TRI_DF_STATE_READ = 2, // datafile is opened read only TRI_DF_STATE_WRITE = 3, // datafile is opened read/append TRI_DF_STATE_OPEN_ERROR = 4, // an error has occurred while opening TRI_DF_STATE_WRITE_ERROR = 5, // an error has occurred while writing TRI_DF_STATE_RENAME_ERROR = 6 // an error has occurred while renaming -} TRI_df_state_e; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief type of the marker //////////////////////////////////////////////////////////////////////////////// -typedef enum { +enum TRI_df_marker_type_e { TRI_MARKER_MIN = 999, // not a real marker type, // but used for bounds checking TRI_DF_MARKER_HEADER = 1000, TRI_DF_MARKER_FOOTER = 1001, + TRI_DF_MARKER_PROLOGUE = 1002, TRI_DF_MARKER_BLANK = 1100, @@ -164,7 +165,7 @@ typedef enum { TRI_MARKER_MAX // again, this is not a real // marker, but we use it for // bounds checking -} TRI_df_marker_type_e; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief storage type of the marker @@ -182,7 +183,7 @@ typedef uint32_t TRI_df_version_t; /// @brief scan result 
//////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_df_scan_s { +struct TRI_df_scan_t { TRI_voc_size_t _currentSize; TRI_voc_size_t _maximalSize; TRI_voc_size_t _endPosition; @@ -192,7 +193,7 @@ typedef struct TRI_df_scan_s { uint32_t _status; bool _isSealed; -} TRI_df_scan_t; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief scan result entry @@ -205,7 +206,7 @@ typedef struct TRI_df_scan_s { /// 5 - CRC failed //////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_df_scan_entry_s { +struct TRI_df_scan_entry_t { TRI_voc_size_t _position; TRI_voc_size_t _size; TRI_voc_size_t _realSize; @@ -217,7 +218,7 @@ typedef struct TRI_df_scan_entry_s { char* _diagnosis; char* _key; char const* _typeName; -} TRI_df_scan_entry_t; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief datafile @@ -310,7 +311,7 @@ struct TRI_datafile_t { /// and _crc the second. 
//////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_df_marker_s { +struct TRI_df_marker_t { TRI_voc_size_t _size; // 4 bytes, must be supplied TRI_voc_crc_t _crc; // 4 bytes, will be generated @@ -321,7 +322,7 @@ typedef struct TRI_df_marker_s { #endif TRI_voc_tick_t _tick; // 8 bytes, will be generated -} TRI_df_marker_t; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief datafile header marker @@ -353,13 +354,24 @@ typedef struct TRI_df_marker_s { /// //////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_df_header_marker_s { +struct TRI_df_header_marker_t { TRI_df_marker_t base; // 24 bytes TRI_df_version_t _version; // 4 bytes TRI_voc_size_t _maximalSize; // 4 bytes TRI_voc_tick_t _fid; // 8 bytes -} TRI_df_header_marker_t; +}; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief datafile prologue marker +//////////////////////////////////////////////////////////////////////////////// + +struct TRI_df_prologue_marker_t { + TRI_df_marker_t base; // 24 bytes + + TRI_voc_tick_t _databaseId; // 8 bytes + TRI_voc_cid_t _collectionId; // 8 bytes +}; //////////////////////////////////////////////////////////////////////////////// /// @brief datafile footer marker @@ -387,18 +399,18 @@ typedef struct TRI_df_header_marker_s { /// contains a footer is sealed and read-only. 
//////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_df_footer_marker_s { +struct TRI_df_footer_marker_t { TRI_df_marker_t base; // 24 bytes TRI_voc_size_t _maximalSize; // 4 bytes TRI_voc_size_t _totalSize; // 4 bytes -} TRI_df_footer_marker_t; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief document datafile header marker //////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_col_header_marker_s { +struct TRI_col_header_marker_t { TRI_df_marker_t base; // 24 bytes TRI_col_type_t _type; // 4 bytes @@ -408,28 +420,7 @@ typedef struct TRI_col_header_marker_s { #endif TRI_voc_cid_t _cid; // 8 bytes -} TRI_col_header_marker_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief datafile attribute marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_df_attribute_marker_s { - TRI_df_marker_t base; - - TRI_shape_aid_t _aid; - TRI_shape_size_t _size; - - // char name[] -} TRI_df_attribute_marker_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief datafile shape marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_df_shape_marker_s { - TRI_df_marker_t base; -} TRI_df_shape_marker_t; +}; //////////////////////////////////////////////////////////////////////////////// /// @brief document datafile marker with key @@ -482,50 +473,6 @@ typedef struct TRI_doc_deletion_key_marker_s { uint16_t _offsetKey; } TRI_doc_deletion_key_marker_t; -//////////////////////////////////////////////////////////////////////////////// -/// @brief begin transaction marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_doc_begin_transaction_marker_s { - TRI_df_marker_t base; - - TRI_voc_tid_t _tid; - 
uint32_t _numCollections; -#ifdef TRI_PADDING_32 - char _padding_begin_marker[4]; -#endif -} TRI_doc_begin_transaction_marker_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief commit transaction marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_doc_commit_transaction_marker_s { - TRI_df_marker_t base; - - TRI_voc_tid_t _tid; -} TRI_doc_commit_transaction_marker_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief abort transaction marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_doc_abort_transaction_marker_s { - TRI_df_marker_t base; - - TRI_voc_tid_t _tid; -} TRI_doc_abort_transaction_marker_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief prepare transaction marker -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_doc_prepare_transaction_marker_s { - TRI_df_marker_t base; - - TRI_voc_tid_t _tid; -} TRI_doc_prepare_transaction_marker_t; - //////////////////////////////////////////////////////////////////////////////// /// @brief creates a new datafile /// diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 2892fb6073..cad52c5c06 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -32,6 +32,7 @@ #include "Basics/Logger.h" #include "Basics/tri-strings.h" #include "Basics/ThreadPool.h" +#include "Basics/WriteLocker.h" #include "Cluster/ServerState.h" #include "FulltextIndex/fulltext-index.h" #include "Indexes/EdgeIndex.h" diff --git a/arangod/VocBase/replication-dump.cpp b/arangod/VocBase/replication-dump.cpp index 7b5b3e2881..c65061ec85 100644 --- a/arangod/VocBase/replication-dump.cpp +++ b/arangod/VocBase/replication-dump.cpp @@ -677,7 +677,7 @@ 
static TRI_voc_cid_t GetCollectionFromWalMarker(TRI_df_marker_t const* marker) { if (!slice.isObject()) { return 0; } - VPackSlice const id = slice.get("id"); + VPackSlice const id = slice.get("cid"); if (id.isString()) { return std::stoull(id.copyString()); } diff --git a/arangod/VocBase/transaction.cpp b/arangod/VocBase/transaction.cpp index c70f9d8876..98000ed0ce 100644 --- a/arangod/VocBase/transaction.cpp +++ b/arangod/VocBase/transaction.cpp @@ -1056,17 +1056,18 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx, bool& waitForSync) { TRI_ASSERT(operation.header != nullptr); + TRI_document_collection_t* document = operation.document; bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx); // upgrade the info for the transaction - if (waitForSync || operation.document->_info.waitForSync()) { + if (waitForSync || document->_info.waitForSync()) { trx->_waitForSync = true; } // default is false waitForSync = false; if (isSingleOperationTransaction) { - waitForSync |= operation.document->_info.waitForSync(); + waitForSync |= document->_info.waitForSync(); } TRI_IF_FAILURE("TransactionOperationNoSlot") { return TRI_ERROR_DEBUG; } @@ -1086,13 +1087,12 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx, TRI_voc_fid_t fid = 0; void const* position = nullptr; - TRI_document_collection_t* document = operation.document; - if (operation.marker->fid() == 0) { // this is a "real" marker that must be written into the logfiles - // No document or edge marker, just append it to the WAL: + // just append it to the WAL: arangodb::wal::SlotInfoCopy slotInfo = arangodb::wal::LogfileManager::instance()->allocateAndWrite( + trx->_vocbase->_id, document->_info.id(), operation.marker->mem(), operation.marker->size(), false); if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { // some error occurred @@ -1114,8 +1114,7 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx, if (operation.type == TRI_VOC_DOCUMENT_OPERATION_INSERT || operation.type == 
TRI_VOC_DOCUMENT_OPERATION_UPDATE) { // adjust the data position in the header - operation.header->setDataPtr( - position); // PROTECTED by ongoing trx from operation + operation.header->setDataPtr(position); } TRI_IF_FAILURE("TransactionOperationAfterAdjust") { return TRI_ERROR_DEBUG; } diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index 2b1ed5c5c9..79a82ba83a 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -40,6 +40,7 @@ #include "Basics/threads.h" #include "Basics/Exceptions.h" #include "Basics/FileUtils.h" +#include "Basics/WriteLocker.h" #include "Utils/CollectionKeysRepository.h" #include "Utils/CursorRepository.h" #include "Utils/transactions.h" diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index 90dec74309..96aa262dd0 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -38,7 +38,6 @@ #include "VocBase/DatafileStatistics.h" #include "VocBase/document-collection.h" #include "VocBase/server.h" -#include "VocBase/VocShaper.h" #include "Wal/Logfile.h" #include "Wal/LogfileManager.h" @@ -211,19 +210,6 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data, break; } - case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: - case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: { - break; - } - - case TRI_WAL_MARKER_ABORT_REMOTE_TRANSACTION: { - transaction_remote_abort_marker_t const* m = - reinterpret_cast(marker); - // note which abort markers we found - state->handledTransactions.emplace(m->_transactionId); - break; - } - case TRI_WAL_MARKER_CREATE_COLLECTION: { VPackSlice const slice(p + sizeof(TRI_df_marker_t)); TRI_voc_tid_t cid = NumericValue(slice, "cid"); @@ -246,7 +232,7 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data, case TRI_WAL_MARKER_CREATE_DATABASE: { VPackSlice const slice(p + sizeof(TRI_df_marker_t)); - TRI_voc_tick_t id = NumericValue(slice, "id"); + TRI_voc_tick_t id = NumericValue(slice, "database"); // 
note that the database is now considered not dropped state->droppedDatabases.erase(id); break; @@ -254,7 +240,7 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data, case TRI_WAL_MARKER_DROP_DATABASE: { VPackSlice const slice(p + sizeof(TRI_df_marker_t)); - TRI_voc_tick_t id = NumericValue(slice, "id"); + TRI_voc_tick_t id = NumericValue(slice, "database"); // note that the database was dropped and doesn't need to be collected state->droppedDatabases.emplace(id); @@ -273,6 +259,20 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data, } break; } + + case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: + case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: { + break; + } + + case TRI_WAL_MARKER_ABORT_REMOTE_TRANSACTION: { + transaction_remote_abort_marker_t const* m = + reinterpret_cast(marker); + // note which abort markers we found + state->handledTransactions.emplace(m->_transactionId); + break; + } + default: { // do nothing intentionally @@ -1301,7 +1301,7 @@ void CollectorThread::initMarker(TRI_df_marker_t* marker, TRI_ASSERT(marker != nullptr); marker->_size = size; - marker->_type = (TRI_df_marker_type_t)type; + marker->_type = static_cast(type); marker->_crc = 0; marker->_tick = 0; } diff --git a/arangod/Wal/CollectorThread.h b/arangod/Wal/CollectorThread.h index 90cb5f1030..26748dc2e4 100644 --- a/arangod/Wal/CollectorThread.h +++ b/arangod/Wal/CollectorThread.h @@ -37,7 +37,7 @@ #include "Wal/Logfile.h" struct TRI_datafile_t; -struct TRI_df_marker_s; +struct TRI_df_marker_t; struct TRI_document_collection_t; struct TRI_server_t; @@ -182,14 +182,14 @@ class CollectorThread : public Thread { /// @brief typedef key => document marker ////////////////////////////////////////////////////////////////////////////// - typedef std::unordered_map + typedef std::unordered_map DocumentOperationsType; ////////////////////////////////////////////////////////////////////////////// /// @brief typedef for structural operation (attributes, shapes) markers 
////////////////////////////////////////////////////////////////////////////// - typedef std::vector OperationsType; + typedef std::vector OperationsType; public: void beginShutdown() override final; @@ -310,7 +310,7 @@ class CollectorThread : public Thread { /// @brief initialize a marker ////////////////////////////////////////////////////////////////////////////// - void initMarker(struct TRI_df_marker_s*, TRI_df_marker_type_e, + void initMarker(struct TRI_df_marker_t*, TRI_df_marker_type_e, TRI_voc_size_t); ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/Logfile.cpp b/arangod/Wal/Logfile.cpp index b8e35ef7f3..f98943c5bf 100644 --- a/arangod/Wal/Logfile.cpp +++ b/arangod/Wal/Logfile.cpp @@ -178,6 +178,21 @@ TRI_df_header_marker_t Logfile::getHeaderMarker() const { return header; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief create a prologue marker +//////////////////////////////////////////////////////////////////////////////// + +TRI_df_prologue_marker_t Logfile::getPrologueMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const { + TRI_df_prologue_marker_t header; + size_t const size = sizeof(TRI_df_prologue_marker_t); + TRI_InitMarkerDatafile((char*)&header, TRI_DF_MARKER_PROLOGUE, size); + + header._databaseId = databaseId; + header._collectionId = collectionId; + + return header; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief create a footer marker //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/Logfile.h b/arangod/Wal/Logfile.h index 3e44e34bb0..46bcce2f9a 100644 --- a/arangod/Wal/Logfile.h +++ b/arangod/Wal/Logfile.h @@ -25,23 +25,11 @@ #define ARANGOD_WAL_LOGFILE_H 1 #include "Basics/Common.h" -#include "Basics/ReadWriteLock.h" -#include "Basics/ReadLocker.h" -#include "Basics/WriteLocker.h" #include "Basics/Logger.h" #include 
"VocBase/datafile.h" -#include "VocBase/shaped-json.h" #include "VocBase/voc-types.h" #include "Wal/Marker.h" -//////////////////////////////////////////////////////////////////////////////// -/// @brief number of cache buckets -/// TODO: convert to a C++11 constexpr once Visual Studio supports it -/// (Visual Studio 2015?) -//////////////////////////////////////////////////////////////////////////////// - -#define LOGFILE_LEGEND_CACHE_BUCKETS 8 - namespace arangodb { namespace wal { @@ -309,6 +297,12 @@ class Logfile { ////////////////////////////////////////////////////////////////////////////// TRI_df_header_marker_t getHeaderMarker() const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief create a prologue marker + ////////////////////////////////////////////////////////////////////////////// + + TRI_df_prologue_marker_t getPrologueMarker(TRI_voc_tick_t, TRI_voc_cid_t) const; ////////////////////////////////////////////////////////////////////////////// /// @brief create a footer marker @@ -345,43 +339,6 @@ class Logfile { --_users; } - ////////////////////////////////////////////////////////////////////////////// - /// @brief lookup a legend in the cache - ////////////////////////////////////////////////////////////////////////////// - - void* lookupLegend(TRI_voc_cid_t cid, TRI_shape_sid_t sid) { - CidSid cs(cid, sid); - - size_t const i = cs.hash() % LOGFILE_LEGEND_CACHE_BUCKETS; - - READ_LOCKER(readLocker, _legendCacheLock[i]); - - auto it = _legendCache[i].find(cs); - - if (it != _legendCache[i].end()) { - return it->second; - } - return nullptr; - } - - ////////////////////////////////////////////////////////////////////////////// - /// @brief cache a legend - ////////////////////////////////////////////////////////////////////////////// - - void cacheLegend(TRI_voc_cid_t cid, TRI_shape_sid_t sid, void* l) { - CidSid cs(cid, sid); - - size_t const i = cs.hash() % LOGFILE_LEGEND_CACHE_BUCKETS; - - 
WRITE_LOCKER(writeLocker, _legendCacheLock[i]); - - auto it = _legendCache[i].find(cs); - - if (it == _legendCache[i].end()) { - _legendCache[i].emplace(cs, l); - } - } - ////////////////////////////////////////////////////////////////////////////// /// @brief the logfile id ////////////////////////////////////////////////////////////////////////////// @@ -411,51 +368,6 @@ class Logfile { ////////////////////////////////////////////////////////////////////////////// std::atomic _collectQueueSize; - - private: - ////////////////////////////////////////////////////////////////////////////// - /// @brief legend cache, key type with hash function - ////////////////////////////////////////////////////////////////////////////// - - struct CidSid { - TRI_voc_cid_t cid; - TRI_shape_sid_t sid; - CidSid(TRI_voc_cid_t c, TRI_shape_sid_t s) : cid(c), sid(s) {} - - size_t hash() const { - CidSidHash h; - return h(this); - } - - bool operator==(CidSid const& a) const { - return this->cid == a.cid && this->sid == a.sid; - } - }; - - struct CidSidHash { - size_t operator()(CidSid const& cs) const { - return std::hash()(cs.cid) ^ - std::hash()(cs.sid); - } - - size_t operator()(CidSid const* cs) const { - return std::hash()(cs->cid) ^ - std::hash()(cs->sid); - } - }; - - ////////////////////////////////////////////////////////////////////////////// - /// @brief legend cache, split into buckets - ////////////////////////////////////////////////////////////////////////////// - - std::unordered_map - _legendCache[LOGFILE_LEGEND_CACHE_BUCKETS]; - - ////////////////////////////////////////////////////////////////////////////// - /// @brief legend cache, locks, split into buckets - ////////////////////////////////////////////////////////////////////////////// - - basics::ReadWriteLock _legendCacheLock[LOGFILE_LEGEND_CACHE_BUCKETS]; }; } } diff --git a/arangod/Wal/LogfileManager.cpp b/arangod/Wal/LogfileManager.cpp index 2a1c238459..826d80c420 100644 --- a/arangod/Wal/LogfileManager.cpp 
+++ b/arangod/Wal/LogfileManager.cpp @@ -116,7 +116,6 @@ LogfileManager::LogfileManager(TRI_server_t* server, std::string* databasePath) _allowOversizeEntries(true), _ignoreLogfileErrors(false), _ignoreRecoveryErrors(false), - _suppressShapeInformation(false), _allowWrites(false), // start in read-only mode _hasFoundLastTick(false), _inRecovery(true), @@ -215,9 +214,6 @@ void LogfileManager::setupOptions( "wal.reserve-logfiles", &_reserveLogfiles, "maximum number of reserve logfiles to maintain")( "wal.slots", &_numberOfSlots, "number of logfile slots to use")( - "wal.suppress-shape-information", &_suppressShapeInformation, - "do not write shape information for markers (saves a lot of disk space, " - "but effectively disables using the write-ahead log for replication)")( "wal.sync-interval", &_syncInterval, "interval for automatic, non-requested disk syncs (in milliseconds)")( "wal.throttle-when-pending", &_throttleWhenPending, @@ -238,7 +234,7 @@ bool LogfileManager::prepare() { if (_directory.empty()) { // use global configuration variable - _directory = (*_databasePath); + _directory = *_databasePath; if (!basics::FileUtils::isDirectory(_directory)) { std::string systemErrorStr; @@ -751,21 +747,13 @@ SlotInfo LogfileManager::allocate(uint32_t size) { } //////////////////////////////////////////////////////////////////////////////// -/// @brief allocate space in a logfile for later writing, version for legends -/// -/// See explanations about legends in the corresponding allocateAndWrite -/// convenience function. 
+/// @brief allocate space in a logfile for later writing //////////////////////////////////////////////////////////////////////////////// -SlotInfo LogfileManager::allocate(uint32_t size, TRI_voc_cid_t cid, - TRI_shape_sid_t sid, uint32_t legendOffset, - void*& oldLegend) { +SlotInfo LogfileManager::allocate(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + uint32_t size) { if (!_allowWrites) { -// no writes allowed -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - TRI_ASSERT(false); -#endif - + // no writes allowed return SlotInfo(TRI_ERROR_ARANGO_READ_ONLY); } @@ -779,25 +767,19 @@ SlotInfo LogfileManager::allocate(uint32_t size, TRI_voc_cid_t cid, return SlotInfo(TRI_ERROR_ARANGO_DOCUMENT_TOO_LARGE); } - return _slots->nextUnused(size, cid, sid, legendOffset, oldLegend); + return _slots->nextUnused(databaseId, collectionId, size); } //////////////////////////////////////////////////////////////////////////////// -/// @brief finalize a log entry -//////////////////////////////////////////////////////////////////////////////// - -void LogfileManager::finalize(SlotInfo& slotInfo, bool waitForSync) { - _slots->returnUsed(slotInfo, waitForSync); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief write data into the logfile +/// @brief write data into the logfile, using database id and collection id /// this is a convenience function that combines allocate, memcpy and finalize //////////////////////////////////////////////////////////////////////////////// -SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size, +SlotInfoCopy LogfileManager::allocateAndWrite(TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId, + void* src, uint32_t size, bool waitForSync) { - SlotInfo slotInfo = allocate(size); + SlotInfo slotInfo = allocate(databaseId, collectionId, size); if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { return SlotInfoCopy(slotInfo.errorCode); @@ -827,11 +809,51 @@ SlotInfoCopy 
LogfileManager::allocateAndWrite(void* src, uint32_t size, /// this is a convenience function that combines allocate, memcpy and finalize //////////////////////////////////////////////////////////////////////////////// +SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size, + bool waitForSync) { + SlotInfo slotInfo = allocate(size); + + if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { + return SlotInfoCopy(slotInfo.errorCode); + } + + TRI_ASSERT(slotInfo.slot != nullptr); + + try { + slotInfo.slot->fill(src, size); + + // we must copy the slotinfo because finalize() will set its internals to 0 + // again + SlotInfoCopy copy(slotInfo.slot); + + finalize(slotInfo, waitForSync); + return copy; + } catch (...) { + // if we don't return the slot we'll run into serious problems later + finalize(slotInfo, false); + + return SlotInfoCopy(TRI_ERROR_INTERNAL); + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief write data into the logfile +/// this is a convenience function that combines allocate, memcpy and finalize +//////////////////////////////////////////////////////////////////////////////// + SlotInfoCopy LogfileManager::allocateAndWrite(Marker const& marker, bool waitForSync) { return allocateAndWrite(marker.mem(), marker.size(), waitForSync); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief finalize a log entry +//////////////////////////////////////////////////////////////////////////////// + +void LogfileManager::finalize(SlotInfo& slotInfo, bool waitForSync) { + _slots->returnUsed(slotInfo, waitForSync); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief wait for the collector queue to get cleared for the given collection //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/LogfileManager.h b/arangod/Wal/LogfileManager.h index 51522929d1..87a139a360 
100644 --- a/arangod/Wal/LogfileManager.h +++ b/arangod/Wal/LogfileManager.h @@ -356,19 +356,25 @@ class LogfileManager : public rest::ApplicationFeature { ////////////////////////////////////////////////////////////////////////////// SlotInfo allocate(uint32_t); - + ////////////////////////////////////////////////////////////////////////////// - /// @brief reserve space in a logfile, version for legends + /// @brief reserve space in a logfile ////////////////////////////////////////////////////////////////////////////// - SlotInfo allocate(uint32_t, TRI_voc_cid_t cid, TRI_shape_sid_t sid, - uint32_t legendOffset, void*& oldLegend); + SlotInfo allocate(TRI_voc_tick_t, TRI_voc_cid_t, uint32_t); ////////////////////////////////////////////////////////////////////////////// /// @brief finalize a log entry ////////////////////////////////////////////////////////////////////////////// void finalize(SlotInfo&, bool); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief write data into the logfile, using database id and collection id + /// this is a convenience function that combines allocate, memcpy and finalize + ////////////////////////////////////////////////////////////////////////////// + + SlotInfoCopy allocateAndWrite(TRI_voc_tick_t, TRI_voc_cid_t, void*, uint32_t, bool); ////////////////////////////////////////////////////////////////////////////// /// @brief write data into the logfile diff --git a/arangod/Wal/RecoverState.cpp b/arangod/Wal/RecoverState.cpp index 3d28bf3205..3322dd910e 100644 --- a/arangod/Wal/RecoverState.cpp +++ b/arangod/Wal/RecoverState.cpp @@ -46,6 +46,10 @@ using namespace arangodb::wal; template static inline T NumericValue(VPackSlice const& slice, char const* attribute) { + if (!slice.isObject()) { + LOG(ERR) << "invalid value type when looking for attribute '" << attribute << "': expecting object"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } VPackSlice v = slice.get(attribute); if 
(v.isString()) { return static_cast(std::stoull(v.copyString())); @@ -53,7 +57,9 @@ static inline T NumericValue(VPackSlice const& slice, char const* attribute) { if (v.isNumber()) { return v.getNumber(); } - return 0; + + LOG(ERR) << "invalid value for attribute '" << attribute << "'"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/Slots.cpp b/arangod/Wal/Slots.cpp index 5cee78ebf2..2a5a157aae 100644 --- a/arangod/Wal/Slots.cpp +++ b/arangod/Wal/Slots.cpp @@ -50,7 +50,9 @@ Slots::Slots(LogfileManager* logfileManager, size_t numberOfSlots, _lastAssignedTick(0), _lastCommittedTick(0), _lastCommittedDataTick(0), - _numEvents(0) { + _numEvents(0), + _lastDatabaseId(0), + _lastCollectionId(0) { _slots = new Slot[numberOfSlots]; } @@ -117,131 +119,22 @@ Slot::TickType Slots::lastCommittedTick() { //////////////////////////////////////////////////////////////////////////////// SlotInfo Slots::nextUnused(uint32_t size) { - // we need to use the aligned size for writing - uint32_t alignedSize = TRI_DF_ALIGN_BLOCK(size); - int iterations = 0; - bool hasWaited = false; - - TRI_ASSERT(size > 0); - - while (++iterations < 1000) { - { - MUTEX_LOCKER(mutexLocker, _lock); - - Slot* slot = &_slots[_handoutIndex]; - TRI_ASSERT(slot != nullptr); - - if (slot->isUnused()) { - if (hasWaited) { - CONDITION_LOCKER(guard, _condition); - TRI_ASSERT(_waiting > 0); - --_waiting; - } - - // cycle until we have a valid logfile - while (_logfile == nullptr || - _logfile->freeSize() < static_cast(alignedSize)) { - if (_logfile != nullptr) { - // seal existing logfile by creating a footer marker - int res = writeFooter(slot); - - if (res != TRI_ERROR_NO_ERROR) { - return SlotInfo(res); - } - - // advance to next slot - slot = &_slots[_handoutIndex]; - _logfileManager->setLogfileSealRequested(_logfile); - - _logfile = nullptr; - } - - TRI_IF_FAILURE("LogfileManagerGetWriteableLogfile") { - 
return SlotInfo(TRI_ERROR_ARANGO_NO_JOURNAL); - } - - // fetch the next free logfile (this may create a new one) - Logfile::StatusType status; - int res = newLogfile(alignedSize, status); - - if (res != TRI_ERROR_NO_ERROR) { - if (res != TRI_ERROR_ARANGO_NO_JOURNAL) { - return SlotInfo(res); - } - - usleep(10 * 1000); - // try again in next iteration - } else { - TRI_ASSERT(_logfile != nullptr); - - if (status == Logfile::StatusType::EMPTY) { - // initialize the empty logfile by writing a header marker - int res = writeHeader(slot); - - if (res != TRI_ERROR_NO_ERROR) { - return SlotInfo(res); - } - - // advance to next slot - slot = &_slots[_handoutIndex]; - _logfileManager->setLogfileOpen(_logfile); - } else { - TRI_ASSERT(status == Logfile::StatusType::OPEN); - } - } - } - - // if we get here, we got a free slot for the actual data... - - char* mem = _logfile->reserve(alignedSize); - - if (mem == nullptr) { - return SlotInfo(TRI_ERROR_INTERNAL); - } - - // only in this case we return a valid slot - slot->setUsed(static_cast(mem), size, _logfile->id(), handout()); - - return SlotInfo(slot); - } - } - - // if we get here, all slots are busy - CONDITION_LOCKER(guard, _condition); - if (!hasWaited) { - ++_waiting; - hasWaited = true; - } - - bool mustWait; - { - MUTEX_LOCKER(mutexLocker, _lock); - mustWait = (_freeSlots == 0); - } - - if (mustWait) { - guard.wait(10 * 1000); - } - } - - return SlotInfo(TRI_ERROR_ARANGO_NO_JOURNAL); + return nextUnused(0, 0, size); } //////////////////////////////////////////////////////////////////////////////// -/// @brief return the next unused slot, version for legends -/// -/// See explanations in arangod/Wal/LogfileManager.cpp in the -/// corresponding allocateAndWrite method. 
+/// @brief return the next unused slot //////////////////////////////////////////////////////////////////////////////// -SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid, - TRI_shape_sid_t sid, uint32_t legendOffset, - void*& oldLegend) { - // legendOffset 0 means no legend included +SlotInfo Slots::nextUnused(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + uint32_t size) { + static size_t const PrologueSize = sizeof(TRI_df_prologue_marker_t); + // we need to use the aligned size for writing uint32_t alignedSize = TRI_DF_ALIGN_BLOCK(size); int iterations = 0; bool hasWaited = false; + bool mustWritePrologue = false; TRI_ASSERT(size > 0); @@ -259,6 +152,20 @@ SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid, --_waiting; } + if (databaseId == 0 && collectionId == 0) { + _lastDatabaseId = 0; + _lastCollectionId = 0; + } + else if (!mustWritePrologue && + databaseId > 0 && + collectionId > 0 && + _lastDatabaseId != databaseId && + _lastCollectionId != collectionId) { + // write a prologue + alignedSize += PrologueSize; + mustWritePrologue = true; + } + // cycle until we have a valid logfile while (_logfile == nullptr || _logfile->freeSize() < static_cast(alignedSize)) { @@ -314,25 +221,31 @@ SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid, // if we get here, we got a free slot for the actual data... - // Now sort out the legend business: - if (legendOffset == 0) { - void* legend = _logfile->lookupLegend(cid, sid); - if (nullptr == legend) { - // Bad, we would need a legend for this marker - return SlotInfo(TRI_ERROR_LEGEND_NOT_IN_WAL_FILE); - } - oldLegend = legend; - } - char* mem = _logfile->reserve(alignedSize); if (mem == nullptr) { return SlotInfo(TRI_ERROR_INTERNAL); } - if (legendOffset != 0) { - void* legend = static_cast(mem + legendOffset); - _logfile->cacheLegend(cid, sid, legend); + if (mustWritePrologue) { + // write prologue... 
+ + // hand out the prologue slot and directly fill it + int res = writePrologue(slot, databaseId, collectionId); + + if (res != TRI_ERROR_NO_ERROR) { + return SlotInfo(res); + } + + // now return the slot + mem += PrologueSize; // advance memory pointer + + // use following slot for the actual data + slot = &_slots[_handoutIndex]; + + // note database and collection id for next time + _lastDatabaseId = databaseId; + _lastCollectionId = collectionId; } // only in this case we return a valid slot @@ -673,7 +586,7 @@ int Slots::closeLogfile(Slot::TickType& lastCommittedTick, bool& worked) { //////////////////////////////////////////////////////////////////////////////// int Slots::writeHeader(Slot* slot) { - TRI_df_header_marker_t&& header = _logfile->getHeaderMarker(); + TRI_df_header_marker_t header = _logfile->getHeaderMarker(); size_t const size = header.base._size; TRI_df_marker_t* mem = @@ -683,7 +596,31 @@ int Slots::writeHeader(Slot* slot) { slot->setUsed(static_cast(mem), static_cast(size), _logfile->id(), handout()); slot->fill(&header.base, size); - slot->setReturned(false); // sync + slot->setReturned(false); // no sync + + // reset values for next write + _lastDatabaseId = 0; + _lastCollectionId = 0; + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief write a prologue marker +//////////////////////////////////////////////////////////////////////////////// + +int Slots::writePrologue(Slot* slot, TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) { + TRI_df_header_marker_t header = _logfile->getHeaderMarker(); + size_t const size = header.base._size; + + TRI_df_marker_t* mem = + reinterpret_cast(_logfile->reserve(size)); + TRI_ASSERT(mem != nullptr); + + slot->setUsed(static_cast(mem), static_cast(size), + _logfile->id(), handout()); + slot->fill(&header.base, size); + slot->setReturned(false); // no sync return TRI_ERROR_NO_ERROR; } @@ -695,7 +632,7 @@ int 
Slots::writeHeader(Slot* slot) { int Slots::writeFooter(Slot* slot) { TRI_ASSERT(_logfile != nullptr); - TRI_df_footer_marker_t&& footer = _logfile->getFooterMarker(); + TRI_df_footer_marker_t footer = _logfile->getFooterMarker(); size_t const size = footer.base._size; TRI_df_marker_t* mem = @@ -706,6 +643,10 @@ int Slots::writeFooter(Slot* slot) { _logfile->id(), handout()); slot->fill(&footer.base, size); slot->setReturned(true); // sync + + // reset values for next write + _lastDatabaseId = 0; + _lastCollectionId = 0; return TRI_ERROR_NO_ERROR; } diff --git a/arangod/Wal/Slots.h b/arangod/Wal/Slots.h index b23c67bc75..a4f955f2fd 100644 --- a/arangod/Wal/Slots.h +++ b/arangod/Wal/Slots.h @@ -114,13 +114,8 @@ class Slots { ////////////////////////////////////////////////////////////////////////////// SlotInfo nextUnused(uint32_t); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief return the next unused slot, version for legends - ////////////////////////////////////////////////////////////////////////////// - - SlotInfo nextUnused(uint32_t size, TRI_voc_cid_t cid, TRI_shape_sid_t sid, - uint32_t legendIncluded, void*& oldLegend); + + SlotInfo nextUnused(TRI_voc_tick_t, TRI_voc_cid_t, uint32_t); ////////////////////////////////////////////////////////////////////////////// /// @brief return a used slot, allowing its synchronization @@ -165,6 +160,12 @@ class Slots { ////////////////////////////////////////////////////////////////////////////// int writeHeader(Slot*); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief writes a prologue for a document/remove marker + ////////////////////////////////////////////////////////////////////////////// + + int writePrologue(Slot*, TRI_voc_tick_t, TRI_voc_cid_t); ////////////////////////////////////////////////////////////////////////////// /// @brief write a footer marker @@ -275,6 +276,18 @@ class Slots { 
////////////////////////////////////////////////////////////////////////////// uint64_t _numEvents; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief last written database id (in prologue marker) + ////////////////////////////////////////////////////////////////////////////// + + TRI_voc_tick_t _lastDatabaseId; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief last written collection id (in prologue marker) + ////////////////////////////////////////////////////////////////////////////// + + TRI_voc_cid_t _lastCollectionId; }; } }