From 1fc3480a6b4df92a79cccbebb213aec5ff1d5270 Mon Sep 17 00:00:00 2001 From: Dan Larkin Date: Thu, 11 May 2017 14:39:00 -0400 Subject: [PATCH] Added document cache. --- arangod/RocksDBEngine/RocksDBCollection.cpp | 112 +++++++++++++++++- arangod/RocksDBEngine/RocksDBCollection.h | 14 +++ arangod/RocksDBEngine/RocksDBIndex.cpp | 2 +- arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp | 9 +- 4 files changed, 126 insertions(+), 11 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 535a755e9f..2cc8ab8474 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -27,6 +27,10 @@ #include "Basics/StaticStrings.h" #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" +#include "Cache/CacheManagerFeature.h" +#include "Cache/TransactionalCache.h" +#include "Cache/Common.h" +#include "Cache/Manager.h" #include "Cluster/ClusterMethods.h" #include "Cluster/CollectionLockState.h" #include "Indexes/Index.h" @@ -82,9 +86,15 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection, _objectId(basics::VelocyPackHelper::stringUInt64(info, "objectId")), _numberDocuments(0), _revisionId(0), - _hasGeoIndex(false) { + _hasGeoIndex(false), + _cache(nullptr), + _cachePresent(false), + _useCache(true) { addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(), _logicalCollection->cid()); + if (_useCache) { + createCache(); + } } RocksDBCollection::RocksDBCollection(LogicalCollection* collection, @@ -93,12 +103,27 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection, _objectId(static_cast(physical)->_objectId), _numberDocuments(0), _revisionId(0), - _hasGeoIndex(false) { + _hasGeoIndex(false), + _cache(nullptr), + _cachePresent(false), + _useCache(true) { addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(), _logicalCollection->cid()); + if (_useCache) { + createCache(); + } } -RocksDBCollection::~RocksDBCollection() {} +RocksDBCollection::~RocksDBCollection() { + if (useCache()) { + try { + TRI_ASSERT(_cache != nullptr); + TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr); + CacheManagerFeature::MANAGER->destroyCache(_cache); + } catch (...) { + } + } +} std::string const& RocksDBCollection::path() const { return Empty; // we do not have any path @@ -611,6 +636,8 @@ void RocksDBCollection::truncate(transaction::Methods* trx, VPackSlice(iter->value().data()).get(StaticStrings::KeyString); TRI_ASSERT(key.isString()); + blackListKey(iter->key().data(), static_cast(iter->key().size())); + // add possible log statement state->prepareOperation(cid, revisionId, StringRef(key), TRI_VOC_DOCUMENT_OPERATION_REMOVE); @@ -1370,6 +1397,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument( RocksDBKey key(RocksDBKey::Document(_objectId, revisionId)); RocksDBValue value(RocksDBValue::Document(doc)); + blackListKey(key.string().data(), static_cast(key.string().size())); + rocksdb::Transaction* rtrx = rocksTransaction(trx); rocksdb::Status status = rtrx->Put(key.string(), value.string()); @@ -1432,6 +1461,8 @@ RocksDBOperationResult RocksDBCollection::removeDocument( auto key = RocksDBKey::Document(_objectId, revisionId); + blackListKey(key.string().data(), static_cast(key.string().size())); + rocksdb::Transaction* rtrx = rocksTransaction(trx); rtrx->PutLogData(RocksDBLogValue::DocumentRemove( @@ -1540,12 +1571,38 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack( auto key = RocksDBKey::Document(_objectId, revisionId); std::string value; + + if (useCache()) { + TRI_ASSERT(_cache != nullptr); + // check cache first for fast path + auto f = _cache->find(key.string().data(), + static_cast(key.string().size())); + if (f.found()) { + value.append(reinterpret_cast(f.value()->value()), + static_cast(f.value()->valueSize)); + mdr.setManaged(std::move(value), revisionId); + return {TRI_ERROR_NO_ERROR}; + } + } + auto* state = toRocksTransactionState(trx); rocksdb::Status status = state->rocksTransaction()->Get(state->readOptions(), key.string(), &value); TRI_ASSERT(value.data()); auto result = convertStatus(status); if (result.ok()) { + if (useCache()) { + TRI_ASSERT(_cache != nullptr); + // write entry back to cache + auto entry = cache::CachedValue::construct( + key.string().data(), static_cast(key.string().size()), + value.data(), static_cast(value.size())); + bool cached = _cache->insert(entry); + if (!cached) { + delete entry; + } + } + mdr.setManaged(std::move(value), revisionId); } else { mdr.reset(); @@ -1732,3 +1789,52 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) { builder.add("total", VPackValue(total)); builder.close(); } + +void RocksDBCollection::createCache() const { + if (!_useCache || _cachePresent) { + // we leave this if we do not need the cache + // or if cache already created + return; + } + + TRI_ASSERT(_useCache); + TRI_ASSERT(_cache.get() == nullptr); + TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr); + _cache = CacheManagerFeature::MANAGER->createCache( + cache::CacheType::Transactional); + _cachePresent = (_cache.get() != nullptr); + TRI_ASSERT(_useCache); +} + +void RocksDBCollection::disableCache() const { + if (!_cachePresent) { + return; + } + TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr); + // must have a cache... + TRI_ASSERT(_useCache); + TRI_ASSERT(_cachePresent); + TRI_ASSERT(_cache.get() != nullptr); + CacheManagerFeature::MANAGER->destroyCache(_cache); + _cache.reset(); + _cachePresent = false; + TRI_ASSERT(_useCache); +} + +// blacklist given key from transactional cache +void RocksDBCollection::blackListKey(char const* data, std::size_t len) const { + if (useCache()) { + TRI_ASSERT(_cache != nullptr); + bool blacklisted = false; + uint64_t attempts = 0; + while (!blacklisted) { + blacklisted = _cache->blacklist(data,len); + if (attempts++ % 10 == 0) { + if (_cache->isShutdown()) { + disableCache(); + break; + } + } + } + } +} diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index b6050aa6da..4fb62c03b3 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -33,6 +33,9 @@ #include "VocBase/ManagedDocumentResult.h" namespace arangodb { +namespace cache { + class Cache; +} class LogicalCollection; class ManagedDocumentResult; class Result; @@ -217,6 +220,11 @@ class RocksDBCollection final : public PhysicalCollection { arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*, arangodb::ManagedDocumentResult&) const; + void createCache() const; + void disableCache() const; + inline bool useCache() const { return (_useCache && _cachePresent); } + void blackListKey(char const* data, std::size_t len) const; + private: uint64_t const _objectId; // rocksdb-specific object id for collection std::atomic _numberDocuments; @@ -225,6 +233,12 @@ class RocksDBCollection final : public PhysicalCollection { /// upgrade write locks to exclusive locks if this flag is set bool _hasGeoIndex; basics::ReadWriteLock _exclusiveLock; + + mutable std::shared_ptr _cache; + // we use this boolean for testing whether _cache is set. + // it's quicker than accessing the shared_ptr each time + mutable bool _cachePresent; + bool _useCache; }; inline RocksDBCollection* toRocksDBCollection(PhysicalCollection* physical) { diff --git a/arangod/RocksDBEngine/RocksDBIndex.cpp b/arangod/RocksDBEngine/RocksDBIndex.cpp index efa2821cc3..7e9aaedc34 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBIndex.cpp @@ -58,7 +58,7 @@ RocksDBIndex::RocksDBIndex( } RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection, - VPackSlice const& info,bool useCache) + VPackSlice const& info, bool useCache) : Index(id, collection, info), _objectId(basics::VelocyPackHelper::stringUInt64(info.get("objectId"))), _cmp(static_cast(EngineSelectorFeature::ENGINE)->cmp()), diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index 5dda4199a4..7b101e47df 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -297,14 +297,9 @@ RocksDBPrimaryIndex::RocksDBPrimaryIndex( {{arangodb::basics::AttributeName( StaticStrings::KeyString, false)}}), true, false, - basics::VelocyPackHelper::stringUInt64(info, "objectId") - ,!ServerState::instance()->isCoordinator() /*useCache*/ - ) { + basics::VelocyPackHelper::stringUInt64(info, "objectId"), + !ServerState::instance()->isCoordinator() /*useCache*/) { TRI_ASSERT(_objectId != 0); - if (_objectId == 0 ) { - //disableCache - _useCache = false; - } } RocksDBPrimaryIndex::~RocksDBPrimaryIndex() {}