diff --git a/3rdParty/velocypack/src/Builder.cpp b/3rdParty/velocypack/src/Builder.cpp index 6167aeea5c..3b8c78651f 100644 --- a/3rdParty/velocypack/src/Builder.cpp +++ b/3rdParty/velocypack/src/Builder.cpp @@ -383,7 +383,7 @@ Builder& Builder::close() { _start[tos] = 0x0b; // First determine byte length and its format: - unsigned int offsetSize; + unsigned int offsetSize = 8; // can be 1, 2, 4 or 8 for the byte width of the offsets, // the byte length and the number of subvalues: if (_pos - tos + index.size() - 6 <= 0xff) { @@ -392,16 +392,8 @@ Builder& Builder::close() { // case we would win back 6 bytes but would need one byte per subvalue // for the index table offsetSize = 1; - } else if (_pos - tos + 2 * index.size() <= 0xffff) { - offsetSize = 2; - } else if (_pos - tos + 4 * index.size() <= 0xffffffffu) { - offsetSize = 4; - } else { - offsetSize = 8; - } - // Maybe we need to move down data: - if (offsetSize == 1) { + // Maybe we need to move down data: ValueLength targetPos = 3; if (_pos > (tos + 9)) { ValueLength len = _pos - (tos + 9); @@ -413,15 +405,19 @@ Builder& Builder::close() { for (size_t i = 0; i < n; i++) { index[i] -= diff; } + + // One could move down things in the offsetSize == 2 case as well, + // since we only need 4 bytes in the beginning. However, saving these + // 4 bytes has been sacrificed on the Altar of Performance. + } else if (_pos - tos + 2 * index.size() <= 0xffff) { + offsetSize = 2; + } else if (_pos - tos + 4 * index.size() <= 0xffffffffu) { + offsetSize = 4; } - // One could move down things in the offsetSize == 2 case as well, - // since we only need 4 bytes in the beginning. However, saving these - // 4 bytes has been sacrificed on the Altar of Performance. // Now build the table: - ValueLength tableBase; reserveSpace(offsetSize * index.size() + (offsetSize == 8 ? 8 : 0)); - tableBase = _pos; + ValueLength tableBase = _pos; _pos += offsetSize * index.size(); // Object if (index.size() >= 2) { diff --git a/arangod/Aql/ShortestPathBlock.cpp b/arangod/Aql/ShortestPathBlock.cpp index 3a155ba6e4..ac677fed07 100644 --- a/arangod/Aql/ShortestPathBlock.cpp +++ b/arangod/Aql/ShortestPathBlock.cpp @@ -80,7 +80,7 @@ struct ConstDistanceExpanderLocal { void operator()(VPackSlice const& v, std::vector& resEdges, std::vector& neighbors) { ManagedDocumentResult* mmdr = _block->_mmdr.get(); - std::shared_ptr edgeCursor; + std::unique_ptr edgeCursor; for (auto const& edgeCollection : _block->_collectionInfos) { TRI_ASSERT(edgeCollection != nullptr); if (_isReverse) { diff --git a/arangod/Indexes/EdgeIndex.cpp b/arangod/Indexes/EdgeIndex.cpp index ec89b11264..8e69babf71 100644 --- a/arangod/Indexes/EdgeIndex.cpp +++ b/arangod/Indexes/EdgeIndex.cpp @@ -691,8 +691,15 @@ IndexIterator* EdgeIndex::iteratorForSlice( // Invalid searchValue return nullptr; } - VPackSlice const from = searchValues.at(0); - VPackSlice const to = searchValues.at(1); + + VPackArrayIterator it(searchValues); + TRI_ASSERT(it.valid()); + + VPackSlice const from = it.value(); + + it.next(); + TRI_ASSERT(it.valid()); + VPackSlice const to = it.value(); if (!from.isNull()) { TRI_ASSERT(from.isArray()); diff --git a/arangod/Indexes/PrimaryIndex.cpp b/arangod/Indexes/PrimaryIndex.cpp index 6137eefe01..af7764f856 100644 --- a/arangod/Indexes/PrimaryIndex.cpp +++ b/arangod/Indexes/PrimaryIndex.cpp @@ -344,7 +344,7 @@ int PrimaryIndex::insertKey(arangodb::Transaction* trx, TRI_voc_rid_t revisionId ManagedDocumentResult result(trx); IndexLookupContext context(trx, _collection, &result, 1); SimpleIndexElement element(buildKeyElement(revisionId, doc)); - + return _primaryIndex->insert(&context, element); } diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 21bd11873d..5dadf7f732 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -974,6 +974,7 @@ int InitialSyncer::handleCollectionSync( return TRI_ERROR_REPLICATION_INVALID_RESPONSE; } + if (count.getNumber() <= 0) { // remote collection has no documents. now truncate our local collection SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->cid(), TRI_TRANSACTION_WRITE); @@ -1056,6 +1057,12 @@ int InitialSyncer::handleSyncKeys(arangodb::LogicalCollection* col, TRI_DEFER(col->ditches()->freeDitch(ditch)); + std::unordered_set chunks; + auto cleanupChunks = [&chunks]() { + for (auto& chunk : chunks) { chunk->release(); } + }; + TRI_DEFER(cleanupChunks()); + { SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->cid(), TRI_TRANSACTION_READ); @@ -1087,6 +1094,8 @@ int InitialSyncer::handleSyncKeys(arangodb::LogicalCollection* col, return true; }); + trx.transactionContext()->stealChunks(chunks); + if (checkAborted()) { return TRI_ERROR_REPLICATION_APPLIER_STOPPED; } @@ -1185,7 +1194,7 @@ int InitialSyncer::handleSyncKeys(arangodb::LogicalCollection* col, return TRI_ERROR_REPLICATION_INVALID_RESPONSE; } - + OperationOptions options; options.silent = true; options.ignoreRevs = true; @@ -1338,7 +1347,7 @@ int InitialSyncer::handleSyncKeys(arangodb::LogicalCollection* col, match = false; } } - + if (match) { // match nextStart = localTo + 1; diff --git a/arangod/StorageEngine/MMFilesCollection.cpp b/arangod/StorageEngine/MMFilesCollection.cpp index 68909e179e..56b0bca39e 100644 --- a/arangod/StorageEngine/MMFilesCollection.cpp +++ b/arangod/StorageEngine/MMFilesCollection.cpp @@ -31,6 +31,7 @@ #include "Indexes/PrimaryIndex.h" #include "Logger/Logger.h" #include "RestServer/DatabaseFeature.h" +#include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/MMFilesDocumentPosition.h" #include "StorageEngine/StorageEngine.h" #include "Utils/SingleCollectionTransaction.h" @@ -73,16 +74,20 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m TRI_ASSERT(trx != nullptr); VPackSlice const slice(reinterpret_cast(marker) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT)); + uint8_t const* vpack = slice.begin(); VPackSlice keySlice; TRI_voc_rid_t revisionId; Transaction::extractKeyAndRevFromDocument(slice, keySlice, revisionId); - collection->setRevision(revisionId, false); - VPackValueLength length; - char const* p = keySlice.getString(length); - collection->keyGenerator()->track(p, length); + c->setRevision(revisionId, false); + + if (state->_trackKeys) { + VPackValueLength length; + char const* p = keySlice.getString(length); + collection->keyGenerator()->track(p, length); + } ++state->_documents; @@ -93,19 +98,16 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m state->_dfi = FindDatafileStats(state, fid); } - auto primaryIndex = collection->primaryIndex(); - // no primary index lock required here because we are the only ones reading // from the index ATM - SimpleIndexElement* found = primaryIndex->lookupKeyRef(trx, keySlice, state->_mmdr); + SimpleIndexElement* found = state->_primaryIndex->lookupKeyRef(trx, keySlice, state->_mmdr); // it is a new entry if (found == nullptr || found->revisionId() == 0) { - uint8_t const* vpack = reinterpret_cast(marker) + arangodb::DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT); - c->insertRevision(revisionId, vpack, fid, false); + c->insertRevision(revisionId, vpack, fid, false, false); // insert into primary index - int res = primaryIndex->insertKey(trx, revisionId, VPackSlice(vpack), state->_mmdr); + int res = state->_primaryIndex->insertKey(trx, revisionId, VPackSlice(vpack), state->_mmdr); if (res != TRI_ERROR_NO_ERROR) { c->removeRevision(revisionId, false); @@ -121,10 +123,9 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m // it is an update else { - uint8_t const* vpack = reinterpret_cast(marker) + arangodb::DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT); TRI_voc_rid_t const oldRevisionId = found->revisionId(); // update the revision id in primary index - found->updateRevisionId(revisionId, static_cast(keySlice.begin() - slice.begin())); + found->updateRevisionId(revisionId, static_cast(keySlice.begin() - vpack)); MMFilesDocumentPosition const old = c->lookupRevision(oldRevisionId); @@ -132,7 +133,7 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m c->removeRevision(oldRevisionId, false); // insert new revision - c->insertRevision(revisionId, vpack, fid, false); + c->insertRevision(revisionId, vpack, fid, false, false); // update the datafile info DatafileStatisticsContainer* dfi; @@ -174,10 +175,12 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m Transaction::extractKeyAndRevFromDocument(slice, keySlice, revisionId); - collection->setRevision(revisionId, false); - VPackValueLength length; - char const* p = keySlice.getString(length); - collection->keyGenerator()->track(p, length); + c->setRevision(revisionId, false); + if (state->_trackKeys) { + VPackValueLength length; + char const* p = keySlice.getString(length); + collection->keyGenerator()->track(p, length); + } ++state->_deletions; @@ -189,8 +192,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m // no primary index lock required here because we are the only ones reading // from the index ATM - auto primaryIndex = collection->primaryIndex(); - SimpleIndexElement found = primaryIndex->lookupKey(trx, keySlice, state->_mmdr); + SimpleIndexElement found = state->_primaryIndex->lookupKey(trx, keySlice, state->_mmdr); // it is a new entry, so we missed the create if (!found) { @@ -224,7 +226,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m dfi->sizeDead += DatafileHelper::AlignedSize(size); state->_dfi->numberDeletions++; - primaryIndex->removeKey(trx, oldRevisionId, VPackSlice(vpack), state->_mmdr); + state->_primaryIndex->removeKey(trx, oldRevisionId, VPackSlice(vpack), state->_mmdr); c->removeRevision(oldRevisionId, true); } @@ -1155,10 +1157,10 @@ uint8_t const* MMFilesCollection::lookupRevisionVPackConditional(TRI_voc_rid_t r return vpack; } -void MMFilesCollection::insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) { +void MMFilesCollection::insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) { TRI_ASSERT(revisionId != 0); TRI_ASSERT(dataptr != nullptr); - _revisionsCache.insert(revisionId, dataptr, fid, isInWal); + _revisionsCache.insert(revisionId, dataptr, fid, isInWal, shouldLock); } void MMFilesCollection::updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) { diff --git a/arangod/StorageEngine/MMFilesCollection.h b/arangod/StorageEngine/MMFilesCollection.h index 10c99c2313..679ae67954 100644 --- a/arangod/StorageEngine/MMFilesCollection.h +++ b/arangod/StorageEngine/MMFilesCollection.h @@ -30,6 +30,7 @@ #include "StorageEngine/MMFilesDatafileStatistics.h" #include "StorageEngine/MMFilesRevisionsCache.h" #include "VocBase/Ditch.h" +#include "VocBase/KeyGenerator.h" #include "VocBase/ManagedDocumentResult.h" #include "VocBase/PhysicalCollection.h" @@ -38,6 +39,7 @@ struct TRI_df_marker_t; namespace arangodb { class LogicalCollection; +class PrimaryIndex; class MMFilesCollection final : public PhysicalCollection { friend class MMFilesCompactorThread; @@ -47,6 +49,7 @@ class MMFilesCollection final : public PhysicalCollection { /// @brief state during opening of a collection struct OpenIteratorState { LogicalCollection* _collection; + arangodb::PrimaryIndex* _primaryIndex; TRI_voc_tid_t _tid; TRI_voc_fid_t _fid; std::unordered_map _stats; @@ -57,9 +60,11 @@ class MMFilesCollection final : public PhysicalCollection { uint64_t _deletions; uint64_t _documents; int64_t _initialCount; + bool const _trackKeys; OpenIteratorState(LogicalCollection* collection, arangodb::Transaction* trx) : _collection(collection), + _primaryIndex(collection->primaryIndex()), _tid(0), _fid(0), _stats(), @@ -69,7 +74,8 @@ class MMFilesCollection final : public PhysicalCollection { _context(trx, collection, &_mmdr, 1), _deletions(0), _documents(0), - _initialCount(-1) { + _initialCount(-1), + _trackKeys(collection->keyGenerator()->trackKeys()) { TRI_ASSERT(collection != nullptr); TRI_ASSERT(trx != nullptr); } @@ -191,7 +197,7 @@ class MMFilesCollection final : public PhysicalCollection { uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const override; uint8_t const* lookupRevisionVPackConditional(TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) const override; - void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) override; + void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) override; void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) override; bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) override; void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) override; diff --git a/arangod/StorageEngine/MMFilesRevisionsCache.cpp b/arangod/StorageEngine/MMFilesRevisionsCache.cpp index f2d5c0d4eb..94639b5b93 100644 --- a/arangod/StorageEngine/MMFilesRevisionsCache.cpp +++ b/arangod/StorageEngine/MMFilesRevisionsCache.cpp @@ -77,11 +77,11 @@ void MMFilesRevisionsCache::clear() { _positions.truncate([](MMFilesDocumentPosition&) { return true; }); } -void MMFilesRevisionsCache::insert(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) { +void MMFilesRevisionsCache::insert(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) { TRI_ASSERT(revisionId != 0); TRI_ASSERT(dataptr != nullptr); - WRITE_LOCKER(locker, _lock); + CONDITIONAL_WRITE_LOCKER(locker, _lock, shouldLock); int res = _positions.insert(nullptr, MMFilesDocumentPosition(revisionId, dataptr, fid, isInWal)); if (res != TRI_ERROR_NO_ERROR) { _positions.removeByKey(nullptr, &revisionId); diff --git a/arangod/StorageEngine/MMFilesRevisionsCache.h b/arangod/StorageEngine/MMFilesRevisionsCache.h index de4c566527..02c1037c5d 100644 --- a/arangod/StorageEngine/MMFilesRevisionsCache.h +++ b/arangod/StorageEngine/MMFilesRevisionsCache.h @@ -43,7 +43,7 @@ class MMFilesRevisionsCache { void sizeHint(int64_t hint); void clear(); MMFilesDocumentPosition lookup(TRI_voc_rid_t revisionId) const; - void insert(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal); + void insert(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock); void update(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal); bool updateConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal); void remove(TRI_voc_rid_t revisionId); diff --git a/arangod/Utils/OperationResult.h b/arangod/Utils/OperationResult.h index a86dd50874..871e3df0ac 100644 --- a/arangod/Utils/OperationResult.h +++ b/arangod/Utils/OperationResult.h @@ -71,8 +71,7 @@ struct OperationResult { wasSynchronous(wasSynchronous), countErrorCodes(countErrorCodes) {} - virtual ~OperationResult() { - } + ~OperationResult() {} bool successful() const { return code == TRI_ERROR_NO_ERROR; @@ -82,7 +81,7 @@ struct OperationResult { return !successful(); } - VPackSlice slice() const { + inline VPackSlice slice() const { TRI_ASSERT(buffer != nullptr); return VPackSlice(buffer->data()); } diff --git a/arangod/VocBase/CollectionRevisionsCache.cpp b/arangod/VocBase/CollectionRevisionsCache.cpp index 0770ce2845..d547294c69 100644 --- a/arangod/VocBase/CollectionRevisionsCache.cpp +++ b/arangod/VocBase/CollectionRevisionsCache.cpp @@ -56,7 +56,10 @@ static bool IsEqualElementElement(void*, RevisionCacheEntry const& left, } // namespace CollectionRevisionsCache::CollectionRevisionsCache(LogicalCollection* collection, RevisionCacheChunkAllocator* allocator) - : _revisions(HashKey, HashElement, IsEqualKeyElement, IsEqualElementElement, IsEqualElementElement, 8, [this]() -> std::string { return std::string("revisions for ") + this->_collection->name(); }), _collection(collection), _readCache(allocator, this) {} + : _revisions(HashKey, HashElement, IsEqualKeyElement, IsEqualElementElement, IsEqualElementElement, 8, [this]() -> std::string { return std::string("revisions for ") + this->_collection->name(); }), + _collection(collection), + _readCache(allocator, this), + _allowInvalidation(true) {} CollectionRevisionsCache::~CollectionRevisionsCache() { try { @@ -94,7 +97,7 @@ void CollectionRevisionsCache::sizeHint(int64_t hint) { _revisions.resize(nullptr, static_cast(hint * 1.1)); } } - + // look up a revision bool CollectionRevisionsCache::lookupRevision(Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, bool shouldLock) { TRI_ASSERT(revisionId != 0); @@ -103,7 +106,7 @@ bool CollectionRevisionsCache::lookupRevision(Transaction* trx, ManagedDocumentR return true; } - READ_LOCKER(locker, _lock); + CONDITIONAL_READ_LOCKER(locker, _lock, shouldLock); RevisionCacheEntry found = _revisions.findByKey(nullptr, &revisionId); @@ -119,7 +122,7 @@ bool CollectionRevisionsCache::lookupRevision(Transaction* trx, ManagedDocumentR ChunkProtector protector = _readCache.insertAndLease(revisionId, reinterpret_cast(logfile->data() + found.offset()), result); // must have succeeded (otherwise an exception was thrown) // and insert result into the hash - insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version()); + insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version(), shouldLock); // TODO: handle WAL reference counters return true; } @@ -144,11 +147,11 @@ bool CollectionRevisionsCache::lookupRevision(Transaction* trx, ManagedDocumentR // insert found revision into our hash ChunkProtector protector = _readCache.insertAndLease(revisionId, vpack, result); // insert result into the hash - insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version()); + insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version(), shouldLock); return true; } -bool CollectionRevisionsCache::lookupRevisionConditional(Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) { +bool CollectionRevisionsCache::lookupRevisionConditional(Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal, bool shouldLock) { // fetch document from engine uint8_t const* vpack = readFromEngineConditional(revisionId, maxTick, excludeWal); @@ -159,18 +162,18 @@ bool CollectionRevisionsCache::lookupRevisionConditional(Transaction* trx, Manag // insert found revision into our hash ChunkProtector protector = _readCache.insertAndLease(revisionId, vpack, result); // insert result into the hash - insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version()); + insertRevision(revisionId, protector.chunk(), protector.offset(), protector.version(), shouldLock); return true; } // insert from chunk -void CollectionRevisionsCache::insertRevision(TRI_voc_rid_t revisionId, RevisionCacheChunk* chunk, uint32_t offset, uint32_t version) { +void CollectionRevisionsCache::insertRevision(TRI_voc_rid_t revisionId, RevisionCacheChunk* chunk, uint32_t offset, uint32_t version, bool shouldLock) { TRI_ASSERT(revisionId != 0); TRI_ASSERT(chunk != nullptr); TRI_ASSERT(offset != UINT32_MAX); TRI_ASSERT(version != 0 && version != UINT32_MAX); - WRITE_LOCKER(locker, _lock); + CONDITIONAL_WRITE_LOCKER(locker, _lock, shouldLock); int res = _revisions.insert(nullptr, RevisionCacheEntry(revisionId, chunk, offset, version)); if (res != TRI_ERROR_NO_ERROR) { @@ -181,9 +184,9 @@ void CollectionRevisionsCache::insertRevision(TRI_voc_rid_t revisionId, Revision } // insert from WAL -void CollectionRevisionsCache::insertRevision(TRI_voc_rid_t revisionId, wal::Logfile* logfile, uint32_t offset) { - TRI_ASSERT(false); - WRITE_LOCKER(locker, _lock); +void CollectionRevisionsCache::insertRevision(TRI_voc_rid_t revisionId, wal::Logfile* logfile, uint32_t offset, bool shouldLock) { + CONDITIONAL_WRITE_LOCKER(locker, _lock, shouldLock); + int res = _revisions.insert(nullptr, RevisionCacheEntry(revisionId, logfile, offset)); if (res != TRI_ERROR_NO_ERROR) { diff --git a/arangod/VocBase/CollectionRevisionsCache.h b/arangod/VocBase/CollectionRevisionsCache.h index 54f971c960..5396874866 100644 --- a/arangod/VocBase/CollectionRevisionsCache.h +++ b/arangod/VocBase/CollectionRevisionsCache.h @@ -54,17 +54,25 @@ class CollectionRevisionsCache { void sizeHint(int64_t hint); + bool allowInvalidation() const { + return _allowInvalidation.load(); + } + + void allowInvalidation(bool value) { + _allowInvalidation.store(value); + } + // look up a revision bool lookupRevision(arangodb::Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, bool shouldLock); // conditionally look up a revision - bool lookupRevisionConditional(arangodb::Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal); + bool lookupRevisionConditional(arangodb::Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal, bool shouldLock); // insert from chunk - void insertRevision(TRI_voc_rid_t revisionId, RevisionCacheChunk* chunk, uint32_t offset, uint32_t version); + void insertRevision(TRI_voc_rid_t revisionId, RevisionCacheChunk* chunk, uint32_t offset, uint32_t version, bool shouldLock); // insert from WAL - void insertRevision(TRI_voc_rid_t revisionId, wal::Logfile* logfile, uint32_t offset); + void insertRevision(TRI_voc_rid_t revisionId, wal::Logfile* logfile, uint32_t offset, bool shouldLock); // remove a revision void removeRevision(TRI_voc_rid_t revisionId); @@ -85,6 +93,8 @@ class CollectionRevisionsCache { LogicalCollection* _collection; ReadCache _readCache; + + std::atomic _allowInvalidation; }; } // namespace arangodb diff --git a/arangod/VocBase/KeyGenerator.cpp b/arangod/VocBase/KeyGenerator.cpp index 3d9caad3a1..868493ba30 100644 --- a/arangod/VocBase/KeyGenerator.cpp +++ b/arangod/VocBase/KeyGenerator.cpp @@ -280,11 +280,9 @@ int TraditionalKeyGenerator::validate(std::string const& key, bool isRestore) { /// @brief track usage of a key //////////////////////////////////////////////////////////////////////////////// -void TraditionalKeyGenerator::track(char const*, VPackValueLength) {} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a VPack representation of the generator -//////////////////////////////////////////////////////////////////////////////// +void TraditionalKeyGenerator::track(char const*, VPackValueLength) { + TRI_ASSERT(false); +} //////////////////////////////////////////////////////////////////////////////// /// @brief create a VPack representation of the generator diff --git a/arangod/VocBase/KeyGenerator.h b/arangod/VocBase/KeyGenerator.h index 2a6e3cdc7a..c9d8c32e41 100644 --- a/arangod/VocBase/KeyGenerator.h +++ b/arangod/VocBase/KeyGenerator.h @@ -88,6 +88,8 @@ class KeyGenerator { static KeyGenerator* factory(arangodb::velocypack::Slice const&); public: + virtual bool trackKeys() const = 0; + ////////////////////////////////////////////////////////////////////////////// /// @brief generate a key ////////////////////////////////////////////////////////////////////////////// @@ -160,6 +162,9 @@ class TraditionalKeyGenerator : public KeyGenerator { static bool validateKey(char const* key, size_t len); public: + + bool trackKeys() const override { return false; } + ////////////////////////////////////////////////////////////////////////////// /// @brief generate a key ////////////////////////////////////////////////////////////////////////////// @@ -213,6 +218,9 @@ class AutoIncrementKeyGenerator : public KeyGenerator { static bool validateKey(char const* key, size_t len); public: + + bool trackKeys() const override { return true; } + ////////////////////////////////////////////////////////////////////////////// /// @brief generate a key ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index 5db5a3eb57..4809f088ac 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -332,6 +332,7 @@ LogicalCollection::LogicalCollection( _lastCompactionStatus(nullptr), _lastCompactionStamp(0.0), _uncollectedLogfileEntries(0), + _isInitialIteration(false), _revisionError(false) { _keyGenerator.reset(KeyGenerator::factory(other.keyOptions())); @@ -397,6 +398,7 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase, _lastCompactionStatus(nullptr), _lastCompactionStamp(0.0), _uncollectedLogfileEntries(0), + _isInitialIteration(false), _revisionError(false) { if (!IsAllowedName(info)) { @@ -681,10 +683,6 @@ uint32_t LogicalCollection::internalVersion() const { return _internalVersion; } -TRI_voc_cid_t LogicalCollection::cid() const { - return _cid; -} - std::string LogicalCollection::cid_as_string() const { return basics::StringUtils::itoa(_cid); } @@ -834,10 +832,6 @@ VPackSlice LogicalCollection::keyOptions() const { return VPackSlice(_keyOptions->data()); } -arangodb::KeyGenerator* LogicalCollection::keyGenerator() const { - return _keyGenerator.get(); -} - // SECTION: Indexes uint32_t LogicalCollection::indexBuckets() const { return _indexBuckets; @@ -977,13 +971,18 @@ int LogicalCollection::rename(std::string const& newName) { } int LogicalCollection::close() { - // This was unload + // This was unload() in 3.0 auto primIdx = primaryIndex(); auto idxSize = primIdx->size(); if (!_isDeleted && _physical->initialCount() != static_cast(idxSize)) { _physical->updateCount(idxSize); + + // save new "count" value + StorageEngine* engine = EngineSelectorFeature::ENGINE; + bool const doSync = application_features::ApplicationServer::getFeature("Database")->forceSyncProperties(); + engine->changeCollection(_vocbase, _cid, this, doSync); } // We also have to unload the indexes. @@ -1012,8 +1011,11 @@ void LogicalCollection::drop() { engine->dropCollection(_vocbase, this); _isDeleted = true; - // save some memory by freeing the revisions cache + // save some memory by freeing the revisions cache and indexes + _keyGenerator.reset(); _revisionsCache.reset(); + _indexes.clear(); + _physical.reset(); } void LogicalCollection::setStatus(TRI_vocbase_col_status_e status) { @@ -1104,10 +1106,6 @@ void LogicalCollection::toVelocyPack(VPackBuilder& builder, bool includeIndexes, engine->getCollectionInfo(_vocbase, _cid, builder, includeIndexes, maxTick); } -TRI_vocbase_t* LogicalCollection::vocbase() const { - return _vocbase; -} - void LogicalCollection::increaseInternalVersion() { ++_internalVersion; } @@ -1251,6 +1249,13 @@ void LogicalCollection::open(bool ignoreErrors) { StorageEngine* engine = EngineSelectorFeature::ENGINE; engine->getCollectionInfo(_vocbase, cid(), builder, true, 0); + VPackSlice initialCount = builder.slice().get(std::vector({ "parameters", "count" })); + if (initialCount.isNumber()) { + int64_t count = initialCount.getNumber(); + if (count > 0) { + _physical->updateCount(count); + } + } double start = TRI_microtime(); LOG_TOPIC(TRACE, Logger::PERFORMANCE) @@ -1274,6 +1279,9 @@ void LogicalCollection::open(bool ignoreErrors) { << "iterate-markers { collection: " << _vocbase->name() << "/" << _name << " }"; + _revisionsCache->allowInvalidation(false); + _isInitialIteration = true; + // iterate over all markers of the collection res = getPhysical()->iterateMarkersOnLoad(&trx); @@ -1282,6 +1290,8 @@ void LogicalCollection::open(bool ignoreErrors) { if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_MESSAGE(res, std::string("cannot iterate data of document collection: ") + TRI_errno_string(res)); } + + _isInitialIteration = false; // build the indexes meta-data, but do not fill the indexes yet { @@ -1310,6 +1320,8 @@ void LogicalCollection::open(bool ignoreErrors) { // build the index structures, and fill the indexes fillIndexes(&trx); } + + _revisionsCache->allowInvalidation(true); LOG_TOPIC(TRACE, Logger::PERFORMANCE) << "[timer] " << Logger::FIXED(TRI_microtime() - start) @@ -3520,17 +3532,17 @@ void LogicalCollection::newObjectForRemove( bool LogicalCollection::readRevision(Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId) { TRI_ASSERT(_revisionsCache); - return _revisionsCache->lookupRevision(trx, result, revisionId, _status == TRI_VOC_COL_STATUS_LOADING); + return _revisionsCache->lookupRevision(trx, result, revisionId, !_isInitialIteration); } bool LogicalCollection::readRevisionConditional(Transaction* trx, ManagedDocumentResult& result, TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) { TRI_ASSERT(_revisionsCache); - return _revisionsCache->lookupRevisionConditional(trx, result, revisionId, maxTick, excludeWal); + return _revisionsCache->lookupRevisionConditional(trx, result, revisionId, maxTick, excludeWal, true); } void LogicalCollection::insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) { // note: there is no need to insert into the cache here as the data points to temporary storage - getPhysical()->insertRevision(revisionId, dataptr, fid, isInWal); + getPhysical()->insertRevision(revisionId, dataptr, fid, isInWal, true); } void LogicalCollection::updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) { diff --git a/arangod/VocBase/LogicalCollection.h b/arangod/VocBase/LogicalCollection.h index e831c6a9e4..1ec160ddd6 100644 --- a/arangod/VocBase/LogicalCollection.h +++ b/arangod/VocBase/LogicalCollection.h @@ -105,6 +105,8 @@ class LogicalCollection { static bool IsAllowedName(bool isSystem, std::string const& name); void ensureRevisionsCache(); + + void isInitialIteration(bool value) { _isInitialIteration = value; } // TODO: MOVE TO PHYSICAL? bool isFullyCollected(); @@ -137,7 +139,10 @@ class LogicalCollection { uint32_t internalVersion() const; - TRI_voc_cid_t cid() const; + inline TRI_voc_cid_t cid() const { + return _cid; + } + std::string cid_as_string() const; TRI_voc_cid_t planId() const; @@ -214,7 +219,9 @@ class LogicalCollection { // Get a reference to this KeyGenerator. // Caller is not allowed to free it. - arangodb::KeyGenerator* keyGenerator() const; + inline arangodb::KeyGenerator* keyGenerator() const { + return _keyGenerator.get(); + } PhysicalCollection* getPhysical() const { return _physical.get(); } @@ -261,10 +268,9 @@ class LogicalCollection { /// The builder has to be an opened Type::Object void toVelocyPack(arangodb::velocypack::Builder&, bool, TRI_voc_tick_t); - TRI_vocbase_t* vocbase() const; - - // Only Local - void updateCount(size_t); + inline TRI_vocbase_t* vocbase() const { + return _vocbase; + } // Update this collection. virtual int update(arangodb::velocypack::Slice const&, bool); @@ -575,6 +581,11 @@ class LogicalCollection { double _lastCompactionStamp; std::atomic _uncollectedLogfileEntries; + + /// @brief: flag that is set to true when the documents are + /// initial enumerated and the primary index is built + bool _isInitialIteration; + bool _revisionError; }; diff --git a/arangod/VocBase/ManagedDocumentResult.h b/arangod/VocBase/ManagedDocumentResult.h index cfbd0947b8..8c9ec4a179 100644 --- a/arangod/VocBase/ManagedDocumentResult.h +++ b/arangod/VocBase/ManagedDocumentResult.h @@ -103,8 +103,12 @@ class ManagedDocumentResult { void addExisting(ChunkProtector& protector, TRI_voc_rid_t revisionId); bool hasSeenChunk(RevisionCacheChunk* chunk) const { return _chunkCache.contains(chunk); } - TRI_voc_rid_t lastRevisionId() const { return _lastRevisionId; } - uint8_t const* lastVPack() const { return _vpack; } + inline TRI_voc_rid_t lastRevisionId() const { return _lastRevisionId; } + + inline void setCache(TRI_voc_rid_t revisionId, uint8_t const* vpack) { + _lastRevisionId = revisionId; + _vpack = vpack; + } //void clear(); diff --git a/arangod/VocBase/PhysicalCollection.h b/arangod/VocBase/PhysicalCollection.h index 07862fa630..77c21ef31f 100644 --- a/arangod/VocBase/PhysicalCollection.h +++ b/arangod/VocBase/PhysicalCollection.h @@ -98,7 +98,7 @@ class PhysicalCollection { virtual uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const = 0; virtual uint8_t const* lookupRevisionVPackConditional(TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) const = 0; - virtual void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) = 0; + virtual void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) = 0; virtual void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) = 0; virtual bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) = 0; virtual void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) = 0; diff --git a/arangod/VocBase/ReadCache.h b/arangod/VocBase/ReadCache.h index cda7cdaac9..4b7fa9a1c5 100644 --- a/arangod/VocBase/ReadCache.h +++ b/arangod/VocBase/ReadCache.h @@ -193,7 +193,7 @@ class ReadCache { ChunkProtector readAndLease(RevisionCacheEntry const&, ManagedDocumentResult& result); ChunkProtector insertAndLease(TRI_voc_rid_t revisionId, uint8_t const* vpack, ManagedDocumentResult& result); - + private: RevisionCacheChunkAllocator* _allocator; CollectionRevisionsCache* _collectionCache; diff --git a/arangod/VocBase/RevisionCacheChunk.cpp b/arangod/VocBase/RevisionCacheChunk.cpp index cdf997a994..8729919f72 100644 --- a/arangod/VocBase/RevisionCacheChunk.cpp +++ b/arangod/VocBase/RevisionCacheChunk.cpp @@ -134,7 +134,11 @@ uint32_t RevisionCacheChunk::advanceWritePosition(uint32_t size) { return offset; } -void RevisionCacheChunk::invalidate(std::vector& revisions) { +bool RevisionCacheChunk::invalidate(std::vector& revisions) { + if (!_collectionCache->allowInvalidation()) { + return false; + } + // wait until all writers have finished while (true) { { @@ -147,6 +151,8 @@ void RevisionCacheChunk::invalidate(std::vector& revisions) { } revisions.clear(); + revisions.reserve(8192); + findRevisions(revisions); invalidate(); if (!revisions.empty()) { @@ -154,6 +160,7 @@ void RevisionCacheChunk::invalidate(std::vector& revisions) { } // increase version number once again invalidate(); + return true; } void RevisionCacheChunk::findRevisions(std::vector& revisions) { diff --git a/arangod/VocBase/RevisionCacheChunk.h b/arangod/VocBase/RevisionCacheChunk.h index 81330cc758..266804e792 100644 --- a/arangod/VocBase/RevisionCacheChunk.h +++ b/arangod/VocBase/RevisionCacheChunk.h @@ -103,7 +103,7 @@ class RevisionCacheChunk { bool isUsed() noexcept; - void invalidate(std::vector& revisions); + bool invalidate(std::vector& revisions); void wipeout() { #ifdef TRI_ENABLE_MAINTAINER_MODE diff --git a/arangod/VocBase/RevisionCacheChunkAllocator.cpp b/arangod/VocBase/RevisionCacheChunkAllocator.cpp index 351a7895e7..01bb198f52 100644 --- a/arangod/VocBase/RevisionCacheChunkAllocator.cpp +++ b/arangod/VocBase/RevisionCacheChunkAllocator.cpp @@ -195,22 +195,19 @@ void RevisionCacheChunkAllocator::removeCollection(ReadCache* cache) { bool RevisionCacheChunkAllocator::garbageCollect() { RevisionCacheChunk* chunk = nullptr; + bool hasMemoryPressure; { WRITE_LOCKER(locker, _chunksLock); // LOG(ERR) << "gc: total allocated: " << _totalAllocated << ", target: " << _totalTargetSize; + hasMemoryPressure = (_totalAllocated >= _totalTargetSize); - if (_totalAllocated < _totalTargetSize) { - // nothing to do - // LOG(ERR) << "gc: not necessary"; - return false; - } - - // try a chunk from the freelist first - if (!_freeList.empty()) { + if (hasMemoryPressure && !_freeList.empty()) { + // try a chunk from the freelist first chunk = _freeList.back(); _freeList.pop_back(); - // fix statistics here already + + // fix statistics here already, because we already have the lock TRI_ASSERT(_totalAllocated >= chunk->size()); _totalAllocated -= chunk->size(); } @@ -265,45 +262,41 @@ bool RevisionCacheChunkAllocator::garbageCollect() { MUTEX_LOCKER(locker, _gcLock); auto it = _fullChunks.find(chunkInfo.cache); + if (it != _fullChunks.end()) { + (*it).second.erase(chunk); if ((*it).second.empty()) { _fullChunks.erase(chunkInfo.cache); - } else { - (*it).second.erase(chunk); - } + } } } - // now move chunk into freelist + // now free the chunk (don't move it into freelist so we + // can release the chunk's allocated memory here already) { WRITE_LOCKER(locker, _chunksLock); - try { - _freeList.push_back(chunk); - } catch (...) { - // if movement fails then we simply throw away the chunk - TRI_ASSERT(_totalAllocated >= chunk->size()); - _totalAllocated -= chunk->size(); - deleteChunk(chunk); - throw; - } + + TRI_ASSERT(_totalAllocated >= chunk->size()); + _totalAllocated -= chunk->size(); + deleteChunk(chunk); } return true; } - } else { + } else if (hasMemoryPressure) { // LOG(ERR) << "gc: invalidating chunk " << chunk; - revisions.reserve(8192); - chunk->invalidate(revisions); - // LOG(ERR) << "gc: invalidating chunk " << chunk << " done"; - MUTEX_LOCKER(locker, _gcLock); - - auto it = _fullChunks.find(chunkInfo.cache); + if (chunk->invalidate(revisions)) { + // LOG(ERR) << "gc: invalidating chunk " << chunk << " done"; + MUTEX_LOCKER(locker, _gcLock); + + auto it = _fullChunks.find(chunkInfo.cache); - if (it != _fullChunks.end()) { - auto it2 = (*it).second.find(chunk); - if (it2 != (*it).second.end()) { - // set to invalidated - (*it2).second = true; - worked = true; + if (it != _fullChunks.end()) { + auto it2 = (*it).second.find(chunk); + if (it2 != (*it).second.end()) { + // set to invalidated + (*it2).second = true; + worked = true; + } } } } diff --git a/lib/Basics/AssocMulti.h b/lib/Basics/AssocMulti.h index 958f154ffa..09eca78f24 100644 --- a/lib/Basics/AssocMulti.h +++ b/lib/Basics/AssocMulti.h @@ -547,7 +547,13 @@ class AssocMulti { newBucket[j].writeHashCache(0); } } - empty.emplace_back(newBucket); + try { + // shouldn't fail as enough space was reserved above, but let's be paranoid + empty.emplace_back(newBucket); + } catch (...) { + delete[] newBucket; + throw; + } } size_t i = 0; diff --git a/lib/Basics/AssocUnique.h b/lib/Basics/AssocUnique.h index 06af1df26e..50fec6d73c 100644 --- a/lib/Basics/AssocUnique.h +++ b/lib/Basics/AssocUnique.h @@ -330,7 +330,13 @@ class AssocUnique { for (size_t i = 0; i < _buckets.size(); ++i) { auto newBucket = new Element[static_cast(nrAlloc)](); - empty.emplace_back(newBucket); + try { + // shouldn't fail as enough space was reserved above, but let's be paranoid + empty.emplace_back(newBucket); + } catch (...) { + delete[] newBucket; + throw; + } } size_t i = 0; diff --git a/lib/Basics/StringRef.h b/lib/Basics/StringRef.h index 2cf6c82bc1..a810dc6b70 100644 --- a/lib/Basics/StringRef.h +++ b/lib/Basics/StringRef.h @@ -39,7 +39,7 @@ namespace arangodb { class StringRef { public: /// @brief create an empty StringRef - StringRef() : _data(""), _length(0) {} + constexpr StringRef() : _data(""), _length(0) {} /// @brief create a StringRef from an std::string explicit StringRef(std::string const& str) : _data(str.c_str()), _length(str.size()) {} diff --git a/lib/V8/v8-vpack.cpp b/lib/V8/v8-vpack.cpp index 1f9170536f..cade6cfc71 100644 --- a/lib/V8/v8-vpack.cpp +++ b/lib/V8/v8-vpack.cpp @@ -84,6 +84,7 @@ static v8::Handle ObjectVPackObject(v8::Isolate* isolate, v8::Local sub; if (v.isString()) { char const* p = v.getString(l); + // value of _key, _id, _from, _to, and _rev is ASCII too sub = TRI_V8_ASCII_PAIR_STRING(p, l); } else { sub = TRI_VPackToV8(isolate, v, options, &slice);