From db2cabf8445f07a29a9c9bbf6afdb4f941cfb1e7 Mon Sep 17 00:00:00 2001 From: Dan Larkin Date: Mon, 6 Mar 2017 10:01:24 -0500 Subject: [PATCH] Completed implementation of transactional cache. --- UnitTests/CMakeLists.txt | 9 +- UnitTests/Cache/FrequencyBuffer.cpp | 68 ++- UnitTests/Cache/Manager.cpp | 20 +- UnitTests/Cache/Metadata.cpp | 34 +- UnitTests/Cache/PlainCache.cpp | 2 +- UnitTests/Cache/Rebalancer.cpp | 147 ++++- ...ctionWindow.cpp => TransactionManager.cpp} | 58 +- UnitTests/Cache/TransactionalBucket.cpp | 2 +- UnitTests/Cache/TransactionalCache.cpp | 430 +++++++++++++++ UnitTests/Cache/TransactionalStore.cpp | 277 ++++++++++ UnitTests/Cache/TransactionalStore.h | 98 ++++ .../Cache/TransactionsWithBackingStore.cpp | 487 +++++++++++++++++ arangod/CMakeLists.txt | 3 +- arangod/Cache/Cache.cpp | 93 ++-- arangod/Cache/Cache.h | 31 +- arangod/Cache/FrequencyBuffer.h | 48 +- arangod/Cache/Manager.cpp | 185 ++++--- arangod/Cache/Manager.h | 36 +- arangod/Cache/Metadata.cpp | 15 +- arangod/Cache/Metadata.h | 8 +- arangod/Cache/PlainBucket.cpp | 10 +- arangod/Cache/PlainBucket.h | 9 +- arangod/Cache/PlainCache.cpp | 111 ++-- arangod/Cache/PlainCache.h | 22 +- ...{TransactionWindow.cpp => Transaction.cpp} | 20 +- arangod/Cache/Transaction.h | 47 ++ arangod/Cache/TransactionManager.cpp | 83 +++ ...ansactionWindow.h => TransactionManager.h} | 32 +- arangod/Cache/TransactionalBucket.cpp | 57 +- arangod/Cache/TransactionalBucket.h | 133 ++++- arangod/Cache/TransactionalCache.cpp | 504 ++++++++++++++++-- arangod/Cache/TransactionalCache.h | 80 ++- 32 files changed, 2757 insertions(+), 402 deletions(-) rename UnitTests/Cache/{TransactionWindow.cpp => TransactionManager.cpp} (64%) create mode 100644 UnitTests/Cache/TransactionalCache.cpp create mode 100644 UnitTests/Cache/TransactionalStore.cpp create mode 100644 UnitTests/Cache/TransactionalStore.h create mode 100644 UnitTests/Cache/TransactionsWithBackingStore.cpp rename arangod/Cache/{TransactionWindow.cpp => 
Transaction.cpp} (75%) create mode 100644 arangod/Cache/Transaction.h create mode 100644 arangod/Cache/TransactionManager.cpp rename arangod/Cache/{TransactionWindow.h => TransactionManager.h} (69%) diff --git a/UnitTests/CMakeLists.txt b/UnitTests/CMakeLists.txt index a71f02f0d1..fe2b490055 100644 --- a/UnitTests/CMakeLists.txt +++ b/UnitTests/CMakeLists.txt @@ -66,7 +66,10 @@ add_executable(${TEST_CACHE_SUITE} Cache/Rebalancer.cpp Cache/State.cpp Cache/TransactionalBucket.cpp - Cache/TransactionWindow.cpp + Cache/TransactionalCache.cpp + Cache/TransactionalStore.cpp + Cache/TransactionManager.cpp + Cache/TransactionsWithBackingStore.cpp ../lib/Basics/WorkMonitorDummy.cpp ../arangod/Cache/Cache.cpp ../arangod/Cache/CacheManagerFeatureThreads.cpp @@ -78,9 +81,10 @@ add_executable(${TEST_CACHE_SUITE} ../arangod/Cache/PlainCache.cpp ../arangod/Cache/Rebalancer.cpp ../arangod/Cache/State.cpp + ../arangod/Cache/Transaction.cpp ../arangod/Cache/TransactionalBucket.cpp ../arangod/Cache/TransactionalCache.cpp - ../arangod/Cache/TransactionWindow.cpp + ../arangod/Cache/TransactionManager.cpp ) include_directories(${TEST_CACHE_SUITE} @@ -90,6 +94,7 @@ include_directories(${TEST_CACHE_SUITE} target_link_libraries(${TEST_CACHE_SUITE} ${LIB_ARANGO} ${MSVC_LIBS} + ${ROCKSDB_LIBS} boost_system boost_boost ${SYSTEM_LIBRARIES} diff --git a/UnitTests/Cache/FrequencyBuffer.cpp b/UnitTests/Cache/FrequencyBuffer.cpp index 25808bd8e6..54f9510367 100644 --- a/UnitTests/Cache/FrequencyBuffer.cpp +++ b/UnitTests/Cache/FrequencyBuffer.cpp @@ -33,6 +33,9 @@ #include "Cache/FrequencyBuffer.h" #include +#include + +#include using namespace arangodb::cache; @@ -70,7 +73,9 @@ BOOST_AUTO_TEST_CASE(tst_uint8_t) { BOOST_CHECK(uint8_t() == zero); FrequencyBuffer buffer(8); - BOOST_CHECK_EQUAL(buffer.memoryUsage(), sizeof(FrequencyBuffer) + 8); + BOOST_CHECK_EQUAL( + buffer.memoryUsage(), + sizeof(FrequencyBuffer) + sizeof(std::vector) + 8); for (size_t i = 0; i < 4; i++) { buffer.insertRecord(two); 
@@ -80,52 +85,73 @@ BOOST_AUTO_TEST_CASE(tst_uint8_t) { } auto frequencies = buffer.getFrequencies(); - BOOST_CHECK_EQUAL(2ULL, frequencies->size()); + BOOST_CHECK_EQUAL(static_cast(2), frequencies->size()); BOOST_CHECK_EQUAL(one, (*frequencies)[0].first); - BOOST_CHECK_EQUAL(2ULL, (*frequencies)[0].second); + BOOST_CHECK_EQUAL(static_cast(2), (*frequencies)[0].second); BOOST_CHECK_EQUAL(two, (*frequencies)[1].first); - BOOST_CHECK_EQUAL(4ULL, (*frequencies)[1].second); + BOOST_CHECK_EQUAL(static_cast(4), (*frequencies)[1].second); for (size_t i = 0; i < 8; i++) { buffer.insertRecord(one); } frequencies = buffer.getFrequencies(); - BOOST_CHECK_EQUAL(1ULL, frequencies->size()); + BOOST_CHECK_EQUAL(static_cast(1), frequencies->size()); BOOST_CHECK_EQUAL(one, (*frequencies)[0].first); - BOOST_CHECK_EQUAL(8ULL, (*frequencies)[0].second); + BOOST_CHECK_EQUAL(static_cast(8), (*frequencies)[0].second); } //////////////////////////////////////////////////////////////////////////////// -/// @brief test behavior with pointers +/// @brief test behavior with shared_ptr //////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(tst_pointers) { - uint8_t* zero = nullptr; - uint8_t one = 1; - uint8_t two = 2; + struct cmp_weak_ptr { + bool operator()(std::weak_ptr const& left, + std::weak_ptr const& right) const { + return !left.owner_before(right) && !right.owner_before(left); + } + }; + + struct hash_weak_ptr { + size_t operator()(std::weak_ptr const& wp) const { + auto sp = wp.lock(); + return std::hash()(sp); + } + }; + + typedef FrequencyBuffer, cmp_weak_ptr, hash_weak_ptr> + BufferType; + + std::shared_ptr p0(nullptr); // check that default construction is as expected - typedef uint8_t* smallptr; - BOOST_CHECK(smallptr() == zero); + BOOST_CHECK(std::shared_ptr() == p0); - FrequencyBuffer buffer(8); + std::shared_ptr p1(new int()); + *p1 = static_cast(1); + std::shared_ptr p2(new int()); + *p2 = static_cast(2); + + BufferType 
buffer(8); BOOST_CHECK_EQUAL(buffer.memoryUsage(), - sizeof(FrequencyBuffer) + (8 * sizeof(uint8_t*))); + sizeof(BufferType) + + sizeof(std::vector>) + + (8 * sizeof(std::weak_ptr))); for (size_t i = 0; i < 4; i++) { - buffer.insertRecord(&two); + buffer.insertRecord(p1); } for (size_t i = 0; i < 2; i++) { - buffer.insertRecord(&one); + buffer.insertRecord(p2); } auto frequencies = buffer.getFrequencies(); - BOOST_CHECK_EQUAL(2ULL, frequencies->size()); - BOOST_CHECK_EQUAL(&one, (*frequencies)[0].first); - BOOST_CHECK_EQUAL(2ULL, (*frequencies)[0].second); - BOOST_CHECK_EQUAL(&two, (*frequencies)[1].first); - BOOST_CHECK_EQUAL(4ULL, (*frequencies)[1].second); + BOOST_CHECK_EQUAL(static_cast(2), frequencies->size()); + BOOST_CHECK(p2 == (*frequencies)[0].first.lock()); + BOOST_CHECK_EQUAL(static_cast(2), (*frequencies)[0].second); + BOOST_CHECK(p1 == (*frequencies)[1].first.lock()); + BOOST_CHECK_EQUAL(static_cast(4), (*frequencies)[1].second); } //////////////////////////////////////////////////////////////////////////////// diff --git a/UnitTests/Cache/Manager.cpp b/UnitTests/Cache/Manager.cpp index 4d91af4025..3f6e68da3b 100644 --- a/UnitTests/Cache/Manager.cpp +++ b/UnitTests/Cache/Manager.cpp @@ -102,8 +102,13 @@ BOOST_AUTO_TEST_CASE(tst_mixed_load) { size_t threadCount = 4; std::vector> caches; for (size_t i = 0; i < cacheCount; i++) { - caches.emplace_back( - manager.createCache(Manager::CacheType::Plain, initialSize, true)); + auto res = + /*manager.createCache(((i % 2 == 0) ? 
Manager::CacheType::Plain + : Manager::CacheType::Transactional), + initialSize, true);*/ + manager.createCache(Manager::CacheType::Plain, initialSize, true); + TRI_ASSERT(res); + caches.emplace_back(res); } uint64_t chunkSize = 4 * 1024 * 1024; @@ -211,11 +216,16 @@ BOOST_AUTO_TEST_CASE(tst_lifecycle_chaos) { std::queue> caches; for (uint64_t i = 0; i < operationCount; i++) { - uint32_t r = RandomGenerator::interval(static_cast(1UL)); + uint32_t r = RandomGenerator::interval(static_cast(1)); switch (r) { case 0: { - caches.emplace(manager.createCache(Manager::CacheType::Plain, - initialSize, true)); + auto res = manager.createCache( + (i % 2 == 0) ? Manager::CacheType::Plain + : Manager::CacheType::Transactional, + initialSize, true); + if (res) { + caches.emplace(res); + } } case 1: default: { diff --git a/UnitTests/Cache/Metadata.cpp b/UnitTests/Cache/Metadata.cpp index 47a2337dc1..f111952c93 100644 --- a/UnitTests/Cache/Metadata.cpp +++ b/UnitTests/Cache/Metadata.cpp @@ -61,14 +61,8 @@ BOOST_FIXTURE_TEST_SUITE(CCacheMetadataTest, CCacheMetadataSetup) //////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(tst_constructor) { - uint64_t dummy; - std::shared_ptr dummyCache(reinterpret_cast(&dummy), - [](Cache* p) -> void {}); - uint8_t dummyTable; - uint32_t logSize = 1; uint64_t limit = 1024; - - Metadata metadata(dummyCache, limit, &dummyTable, logSize); + Metadata metadata(limit); } //////////////////////////////////////////////////////////////////////////////// @@ -79,26 +73,19 @@ BOOST_AUTO_TEST_CASE(tst_getters) { uint64_t dummy; std::shared_ptr dummyCache(reinterpret_cast(&dummy), [](Cache* p) -> void {}); - uint8_t dummyTable; - uint32_t logSize = 1; uint64_t limit = 1024; - Metadata metadata(dummyCache, limit, &dummyTable, logSize); + Metadata metadata(limit); + metadata.link(dummyCache); metadata.lock(); BOOST_CHECK(dummyCache == metadata.cache()); - BOOST_CHECK_EQUAL(logSize, metadata.logSize()); - 
BOOST_CHECK_EQUAL(0UL, metadata.auxiliaryLogSize()); - BOOST_CHECK_EQUAL(limit, metadata.softLimit()); BOOST_CHECK_EQUAL(limit, metadata.hardLimit()); BOOST_CHECK_EQUAL(0UL, metadata.usage()); - BOOST_CHECK(&dummyTable == metadata.table()); - BOOST_CHECK(nullptr == metadata.auxiliaryTable()); - metadata.unlock(); } @@ -107,14 +94,9 @@ BOOST_AUTO_TEST_CASE(tst_getters) { //////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(tst_usage_limits) { - uint64_t dummy; - std::shared_ptr dummyCache(reinterpret_cast(&dummy), - [](Cache* p) -> void {}); - uint8_t dummyTable; - uint32_t logSize = 1; bool success; - Metadata metadata(dummyCache, 1024ULL, &dummyTable, logSize); + Metadata metadata(1024ULL); metadata.lock(); @@ -158,19 +140,19 @@ BOOST_AUTO_TEST_CASE(tst_usage_limits) { //////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(tst_migration) { - uint64_t dummy; - std::shared_ptr dummyCache(reinterpret_cast(&dummy), - [](Cache* p) -> void {}); uint8_t dummyTable; uint8_t dummyAuxiliaryTable; uint32_t logSize = 1; uint32_t auxiliaryLogSize = 2; uint64_t limit = 1024; - Metadata metadata(dummyCache, limit, &dummyTable, logSize); + Metadata metadata(limit); metadata.lock(); + metadata.grantAuxiliaryTable(&dummyTable, logSize); + metadata.swapTables(); + metadata.grantAuxiliaryTable(&dummyAuxiliaryTable, auxiliaryLogSize); BOOST_CHECK_EQUAL(auxiliaryLogSize, metadata.auxiliaryLogSize()); BOOST_CHECK(&dummyAuxiliaryTable == metadata.auxiliaryTable()); diff --git a/UnitTests/Cache/PlainCache.cpp b/UnitTests/Cache/PlainCache.cpp index fc48146b81..f76c6711a8 100644 --- a/UnitTests/Cache/PlainCache.cpp +++ b/UnitTests/Cache/PlainCache.cpp @@ -1,5 +1,5 @@ //////////////////////////////////////////////////////////////////////////////// -/// @brief test suite for arangodb::cache::PlainBucket +/// @brief test suite for arangodb::cache::PlainCache /// /// @file /// diff --git 
a/UnitTests/Cache/Rebalancer.cpp b/UnitTests/Cache/Rebalancer.cpp index 65eff5478b..3a8ee0371f 100644 --- a/UnitTests/Cache/Rebalancer.cpp +++ b/UnitTests/Cache/Rebalancer.cpp @@ -1,5 +1,5 @@ //////////////////////////////////////////////////////////////////////////////// -/// @brief test suite for arangodb::cache::Manager +/// @brief test suite for arangodb::cache::Rebalancer /// /// @file /// @@ -34,6 +34,8 @@ #include "Cache/Manager.h" #include "Cache/PlainCache.h" #include "Cache/Rebalancer.h" +#include "Cache/Transaction.h" +#include "Cache/TransactionalCache.h" #include "MockScheduler.h" @@ -66,10 +68,10 @@ struct CCacheRebalancerSetup { BOOST_FIXTURE_TEST_SUITE(CCacheRebalancerTest, CCacheRebalancerSetup) //////////////////////////////////////////////////////////////////////////////// -/// @brief test rebalancing (multi-threaded) +/// @brief test rebalancing plain caches (multi-threaded) //////////////////////////////////////////////////////////////////////////////// -BOOST_AUTO_TEST_CASE(tst_rebalancing) { +BOOST_AUTO_TEST_CASE(tst_rebalancing_plain) { uint64_t initialSize = 16ULL * 1024ULL; RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); MockScheduler scheduler(4); @@ -190,6 +192,145 @@ BOOST_AUTO_TEST_CASE(tst_rebalancing) { RandomGenerator::shutdown(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief test rebalancing transactional caches (multi-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_rebalancing_transactional) { + uint64_t initialSize = 16ULL * 1024ULL; + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 128ULL * 1024ULL * 1024ULL); + Rebalancer rebalancer(&manager); + + size_t cacheCount = 4; + size_t threadCount = 4; + std::vector> caches; + for (size_t i = 0; i < cacheCount; i++) { + 
caches.emplace_back(manager.createCache(Manager::CacheType::Transactional, + initialSize, true)); + } + + bool doneRebalancing = false; + auto rebalanceWorker = [&rebalancer, &doneRebalancing]() -> void { + while (!doneRebalancing) { + bool rebalanced = rebalancer.rebalance(); + if (rebalanced) { + usleep(500 * 1000); + } else { + usleep(100); + } + } + }; + auto rebalancerThread = new std::thread(rebalanceWorker); + + uint64_t chunkSize = 4 * 1024 * 1024; + uint64_t initialInserts = 1 * 1024 * 1024; + uint64_t operationCount = 4 * 1024 * 1024; + std::atomic hitCount(0); + std::atomic missCount(0); + auto worker = [&manager, &caches, cacheCount, initialInserts, operationCount, + &hitCount, + &missCount](uint64_t lower, uint64_t upper) -> void { + Transaction* tx = manager.beginTransaction(false); + // fill with some initial data + for (uint64_t i = 0; i < initialInserts; i++) { + uint64_t item = lower + i; + size_t cacheIndex = item % cacheCount; + CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t), + &item, sizeof(uint64_t)); + bool ok = caches[cacheIndex]->insert(value); + if (!ok) { + delete value; + } + } + + // initialize valid range for keys that *might* be in cache + uint64_t validLower = lower; + uint64_t validUpper = lower + initialInserts - 1; + uint64_t blacklistUpper = validUpper; + + // commence mixed workload + for (uint64_t i = 0; i < operationCount; i++) { + uint32_t r = RandomGenerator::interval(static_cast(99UL)); + + if (r >= 99) { // remove something + if (validLower == validUpper) { + continue; // removed too much + } + + uint64_t item = validLower++; + size_t cacheIndex = item % cacheCount; + + caches[cacheIndex]->remove(&item, sizeof(uint64_t)); + } else if (r >= 90) { // insert something + if (validUpper == upper) { + continue; // already maxed out range + } + + uint64_t item = ++validUpper; + if (validUpper > blacklistUpper) { + blacklistUpper = validUpper; + } + size_t cacheIndex = item % cacheCount; + CachedValue* value = 
CachedValue::construct(&item, sizeof(uint64_t), + &item, sizeof(uint64_t)); + bool ok = caches[cacheIndex]->insert(value); + if (!ok) { + delete value; + } + } else if (r >= 80) { // blacklist something + if (blacklistUpper == upper) { + continue; // already maxed out range + } + + uint64_t item = ++blacklistUpper; + size_t cacheIndex = item % cacheCount; + caches[cacheIndex]->blacklist(&item, sizeof(uint64_t)); + } else { // lookup something + uint64_t item = RandomGenerator::interval( + static_cast(validLower), static_cast(validUpper)); + size_t cacheIndex = item % cacheCount; + + Cache::Finding f = caches[cacheIndex]->find(&item, sizeof(uint64_t)); + if (f.found()) { + hitCount++; + TRI_ASSERT(f.value() != nullptr); + TRI_ASSERT(f.value()->sameKey(&item, sizeof(uint64_t))); + } else { + missCount++; + TRI_ASSERT(f.value() == nullptr); + } + } + } + manager.endTransaction(tx); + }; + + std::vector threads; + // dispatch threads + for (size_t i = 0; i < threadCount; i++) { + uint64_t lower = i * chunkSize; + uint64_t upper = ((i + 1) * chunkSize) - 1; + threads.push_back(new std::thread(worker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + doneRebalancing = true; + rebalancerThread->join(); + delete rebalancerThread; + + for (auto cache : caches) { + manager.destroyCache(cache); + } + + RandomGenerator::shutdown(); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief generate tests //////////////////////////////////////////////////////////////////////////////// diff --git a/UnitTests/Cache/TransactionWindow.cpp b/UnitTests/Cache/TransactionManager.cpp similarity index 64% rename from UnitTests/Cache/TransactionWindow.cpp rename to UnitTests/Cache/TransactionManager.cpp index 94f85acab1..21c021c9a3 100644 --- a/UnitTests/Cache/TransactionWindow.cpp +++ b/UnitTests/Cache/TransactionManager.cpp @@ -1,5 +1,5 @@ 
//////////////////////////////////////////////////////////////////////////////// -/// @brief test suite for arangodb::cache::TransactionWindow +/// @brief test suite for arangodb::cache::TransactionManager /// /// @file /// @@ -30,7 +30,8 @@ #define BOOST_TEST_INCLUDED #include -#include "Cache/TransactionWindow.h" +#include "Cache/Transaction.h" +#include "Cache/TransactionManager.h" #include #include @@ -41,13 +42,13 @@ using namespace arangodb::cache; // --SECTION-- setup / tear-down // ----------------------------------------------------------------------------- -struct CCacheTransactionWindowSetup { - CCacheTransactionWindowSetup() { - BOOST_TEST_MESSAGE("setup TransactionWindow"); +struct CCacheTransactionManagerSetup { + CCacheTransactionManagerSetup() { + BOOST_TEST_MESSAGE("setup TransactionManager"); } - ~CCacheTransactionWindowSetup() { - BOOST_TEST_MESSAGE("tear-down TransactionWindow"); + ~CCacheTransactionManagerSetup() { + BOOST_TEST_MESSAGE("tear-down TransactionManager"); } }; // ----------------------------------------------------------------------------- @@ -58,31 +59,56 @@ struct CCacheTransactionWindowSetup { /// @brief setup //////////////////////////////////////////////////////////////////////////////// -BOOST_FIXTURE_TEST_SUITE(CCacheTransactionWindowTest, - CCacheTransactionWindowSetup) +BOOST_FIXTURE_TEST_SUITE(CCacheTransactionManagerTest, + CCacheTransactionManagerSetup) //////////////////////////////////////////////////////////////////////////////// /// @brief test transaction term management //////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(tst_transaction_term) { - TransactionWindow transactions; + TransactionManager transactions; + Transaction* tx1; + Transaction* tx2; + Transaction* tx3; BOOST_CHECK_EQUAL(0ULL, transactions.term()); - transactions.start(); + tx1 = transactions.begin(false); BOOST_CHECK_EQUAL(1ULL, transactions.term()); - transactions.end(); + transactions.end(tx1); 
BOOST_CHECK_EQUAL(2ULL, transactions.term()); - transactions.start(); + tx1 = transactions.begin(false); BOOST_CHECK_EQUAL(3ULL, transactions.term()); - transactions.start(); + tx2 = transactions.begin(false); BOOST_CHECK_EQUAL(3ULL, transactions.term()); - transactions.end(); + transactions.end(tx1); BOOST_CHECK_EQUAL(3ULL, transactions.term()); - transactions.end(); + transactions.end(tx2); BOOST_CHECK_EQUAL(4ULL, transactions.term()); + + tx1 = transactions.begin(true); + BOOST_CHECK_EQUAL(4ULL, transactions.term()); + tx2 = transactions.begin(false); + BOOST_CHECK_EQUAL(5ULL, transactions.term()); + transactions.end(tx2); + BOOST_CHECK_EQUAL(5ULL, transactions.term()); + transactions.end(tx1); + BOOST_CHECK_EQUAL(6ULL, transactions.term()); + + tx1 = transactions.begin(true); + BOOST_CHECK_EQUAL(6ULL, transactions.term()); + tx2 = transactions.begin(false); + BOOST_CHECK_EQUAL(7ULL, transactions.term()); + transactions.end(tx2); + BOOST_CHECK_EQUAL(7ULL, transactions.term()); + tx3 = transactions.begin(true); + BOOST_CHECK_EQUAL(7ULL, transactions.term()); + transactions.end(tx1); + BOOST_CHECK_EQUAL(8ULL, transactions.term()); + transactions.end(tx3); + BOOST_CHECK_EQUAL(8ULL, transactions.term()); } //////////////////////////////////////////////////////////////////////////////// diff --git a/UnitTests/Cache/TransactionalBucket.cpp b/UnitTests/Cache/TransactionalBucket.cpp index 590a4b1e85..eea9a197b6 100644 --- a/UnitTests/Cache/TransactionalBucket.cpp +++ b/UnitTests/Cache/TransactionalBucket.cpp @@ -276,7 +276,7 @@ BOOST_AUTO_TEST_CASE(tst_blacklist) { sizeof(uint64_t)); } - success = bucket.lock(0, -1LL); + success = bucket.lock(1ULL, -1LL); BOOST_CHECK(success); // insert three to fill diff --git a/UnitTests/Cache/TransactionalCache.cpp b/UnitTests/Cache/TransactionalCache.cpp new file mode 100644 index 0000000000..2420d0cd44 --- /dev/null +++ b/UnitTests/Cache/TransactionalCache.cpp @@ -0,0 +1,430 @@ 
+//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite for arangodb::cache::TransactionalCache +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2017 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#include "Basics/Common.h" +#include "Random/RandomGenerator.h" + +#define BOOST_TEST_INCLUDED +#include + +#include "Cache/Manager.h" +#include "Cache/Transaction.h" +#include "Cache/TransactionalCache.h" + +#include "MockScheduler.h" + +#include +#include +#include +#include + +#include + +using namespace arangodb; +using namespace arangodb::cache; + +// ----------------------------------------------------------------------------- +// --SECTION-- setup / tear-down +// ----------------------------------------------------------------------------- + +struct CCacheTransactionalCacheSetup { + CCacheTransactionalCacheSetup() { + BOOST_TEST_MESSAGE("setup TransactionalCache"); + } + + ~CCacheTransactionalCacheSetup() { + BOOST_TEST_MESSAGE("tear-down TransactionalCache"); + } +}; +// ----------------------------------------------------------------------------- +// --SECTION-- test suite +// 
----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief setup +//////////////////////////////////////////////////////////////////////////////// + +BOOST_FIXTURE_TEST_SUITE(CCacheTransactionalCacheTest, + CCacheTransactionalCacheSetup) + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test construction (single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_construction) { + Manager manager(nullptr, 1024ULL * 1024ULL); + auto cache1 = manager.createCache(Manager::CacheType::Transactional, + 256ULL * 1024ULL, false); + auto cache2 = manager.createCache(Manager::CacheType::Transactional, + 512ULL * 1024ULL, false); + + BOOST_CHECK_EQUAL(0ULL, cache1->usage()); + BOOST_CHECK_EQUAL(256ULL * 1024ULL, cache1->limit()); + BOOST_CHECK_EQUAL(0ULL, cache2->usage()); + BOOST_CHECK(512ULL * 1024ULL > cache2->limit()); + + manager.destroyCache(cache1); + manager.destroyCache(cache2); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test insertion (single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_insertion) { + uint64_t cacheLimit = 256ULL * 1024ULL; + Manager manager(nullptr, 4ULL * cacheLimit); + auto cache = + manager.createCache(Manager::CacheType::Transactional, cacheLimit, false); + + for (uint64_t i = 0; i < 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + } + + for (uint64_t i = 0; i < 1024; i++) { + uint64_t j = 2 * i; + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &j, sizeof(uint64_t)); + bool 
success = cache->insert(value); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + BOOST_CHECK(0 == memcmp(f.value()->value(), &j, sizeof(uint64_t))); + } + + uint64_t notInserted = 0; + for (uint64_t i = 1024; i < 128 * 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + if (success) { + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + } else { + delete value; + notInserted++; + } + } + BOOST_CHECK(notInserted > 0); + + manager.destroyCache(cache); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test removal (single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_removal) { + uint64_t cacheLimit = 256ULL * 1024ULL; + Manager manager(nullptr, 4ULL * cacheLimit); + auto cache = + manager.createCache(Manager::CacheType::Transactional, cacheLimit, false); + + for (uint64_t i = 0; i < 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + BOOST_CHECK(f.value() != nullptr); + BOOST_CHECK(f.value()->sameKey(&i, sizeof(uint64_t))); + } + + // test removal of bogus keys + for (uint64_t i = 1024; i < 2048; i++) { + bool removed = cache->remove(&i, sizeof(uint64_t)); + BOOST_ASSERT(removed); + // ensure existing keys not removed + for (uint64_t j = 0; j < 1024; j++) { + auto f = cache->find(&j, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + BOOST_CHECK(f.value() != nullptr); + BOOST_CHECK(f.value()->sameKey(&j, sizeof(uint64_t))); + } + } + + // remove actual keys + for (uint64_t i = 0; i < 1024; i++) { + bool removed = cache->remove(&i, sizeof(uint64_t)); + BOOST_CHECK(removed); + auto f = 
cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(!f.found()); + } + + manager.destroyCache(cache); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test blacklisting (single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_blacklist) { + uint64_t cacheLimit = 256ULL * 1024ULL; + Manager manager(nullptr, 4ULL * cacheLimit); + auto cache = + manager.createCache(Manager::CacheType::Transactional, cacheLimit, false); + + Transaction* tx = manager.beginTransaction(false); + + for (uint64_t i = 0; i < 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + BOOST_CHECK(f.value() != nullptr); + BOOST_CHECK(f.value()->sameKey(&i, sizeof(uint64_t))); + } + + for (uint64_t i = 512; i < 1024; i++) { + bool success = cache->blacklist(&i, sizeof(uint64_t)); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(!f.found()); + } + + for (uint64_t i = 512; i < 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + BOOST_CHECK(!success); + delete value; + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(!f.found()); + } + + manager.endTransaction(tx); + tx = manager.beginTransaction(false); + + for (uint64_t i = 512; i < 1024; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + BOOST_CHECK(success); + auto f = cache->find(&i, sizeof(uint64_t)); + BOOST_CHECK(f.found()); + } + + manager.endTransaction(tx); + manager.destroyCache(cache); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test growth behavior 
(single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_growth) { + uint64_t initialSize = 16ULL * 1024ULL; + uint64_t minimumSize = 64ULL * initialSize; + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL); + auto cache = + manager.createCache(Manager::CacheType::Transactional, initialSize, true); + + for (uint64_t i = 0; i < 4ULL * 1024ULL * 1024ULL; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + if (!success) { + delete value; + } + } + + BOOST_CHECK(cache->usage() > minimumSize); + + manager.destroyCache(cache); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test shrink behavior (single-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_st_shrink) { + uint64_t initialSize = 16ULL * 1024ULL; + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL); + auto cache = + manager.createCache(Manager::CacheType::Transactional, initialSize, true); + + for (uint64_t i = 0; i < 16ULL * 1024ULL * 1024ULL; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + if (!success) { + delete value; + } + } + + cache->disableGrowth(); + uint64_t target = cache->usage() / 2; + while (!cache->resize(target)) { + }; + + for (uint64_t i = 0; i < 16ULL * 1024ULL * 1024ULL; i++) { + CachedValue* value = + CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t)); + bool success = cache->insert(value); + if (!success) { + delete value; + } + } + + while (cache->isResizing()) { + } + BOOST_CHECK_MESSAGE(cache->usage() <= target, + cache->usage() << " !<= " 
<< target); + + manager.destroyCache(cache); + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test mixed load behavior (multi-threaded) +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_mt_mixed_load) { + uint64_t initialSize = 16ULL * 1024ULL; + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL); + size_t threadCount = 4; + std::shared_ptr cache = + manager.createCache(Manager::CacheType::Transactional, initialSize, true); + + uint64_t chunkSize = 16 * 1024 * 1024; + uint64_t initialInserts = 4 * 1024 * 1024; + uint64_t operationCount = 16 * 1024 * 1024; + std::atomic hitCount(0); + std::atomic missCount(0); + auto worker = [&manager, &cache, initialInserts, operationCount, &hitCount, + &missCount](uint64_t lower, uint64_t upper) -> void { + Transaction* tx = manager.beginTransaction(false); + // fill with some initial data + for (uint64_t i = 0; i < initialInserts; i++) { + uint64_t item = lower + i; + CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t), + &item, sizeof(uint64_t)); + bool ok = cache->insert(value); + if (!ok) { + delete value; + } + } + + // initialize valid range for keys that *might* be in cache + uint64_t validLower = lower; + uint64_t validUpper = lower + initialInserts - 1; + uint64_t blacklistUpper = validUpper; + + // commence mixed workload + for (uint64_t i = 0; i < operationCount; i++) { + uint32_t r = RandomGenerator::interval(static_cast(99UL)); + + if (r >= 99) { // remove something + if (validLower == validUpper) { + continue; // removed too much + } + + uint64_t item = validLower++; + + cache->remove(&item, sizeof(uint64_t)); + } else if (r >= 90) { // insert something + if (validUpper == upper) { + continue; // already maxed out range + } + + uint64_t item = 
++validUpper; + if (validUpper > blacklistUpper) { + blacklistUpper = validUpper; + } + CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t), + &item, sizeof(uint64_t)); + bool ok = cache->insert(value); + if (!ok) { + delete value; + } + } else if (r >= 80) { // blacklist something + if (blacklistUpper == upper) { + continue; // already maxed out range + } + + uint64_t item = ++blacklistUpper; + cache->blacklist(&item, sizeof(uint64_t)); + } else { // lookup something + uint64_t item = RandomGenerator::interval( + static_cast(validLower), static_cast(validUpper)); + + Cache::Finding f = cache->find(&item, sizeof(uint64_t)); + if (f.found()) { + hitCount++; + TRI_ASSERT(f.value() != nullptr); + TRI_ASSERT(f.value()->sameKey(&item, sizeof(uint64_t))); + } else { + missCount++; + TRI_ASSERT(f.value() == nullptr); + } + } + } + manager.endTransaction(tx); + }; + + std::vector threads; + // dispatch threads + for (size_t i = 0; i < threadCount; i++) { + uint64_t lower = i * chunkSize; + uint64_t upper = ((i + 1) * chunkSize) - 1; + threads.push_back(new std::thread(worker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + manager.destroyCache(cache); + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief generate tests +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_SUITE_END() + +// Local Variables: +// mode: outline-minor +// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// +// --SECTION--\\|/// @\\}\\)" +// End: diff --git a/UnitTests/Cache/TransactionalStore.cpp b/UnitTests/Cache/TransactionalStore.cpp new file mode 100644 index 0000000000..5729ac5ed7 --- /dev/null +++ b/UnitTests/Cache/TransactionalStore.cpp @@ -0,0 +1,277 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief helper for cache suite +/// 
+/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2017 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#include "TransactionalStore.h" +#include "Basics/Common.h" +#include "Basics/StringBuffer.h" +#include "Basics/files.h" +#include "Cache/Manager.h" +#include "Cache/TransactionalCache.h" + +#include +#include +#include +#include + +#include +#include + +using namespace arangodb::cache; + +std::atomic TransactionalStore::_sequence(0); + +TransactionalStore::Document::Document() : Document(0) {} + +TransactionalStore::Document::Document(uint64_t k) + : key(k), + timestamp(static_cast( + std::chrono::steady_clock::now().time_since_epoch().count())), + sequence(0) {} + +void TransactionalStore::Document::advance() { + timestamp = static_cast( + std::chrono::steady_clock::now().time_since_epoch().count()); + sequence++; +} + +void TransactionalStore::Document::clear() { + memset(this, 0, sizeof(Document)); +} + +bool TransactionalStore::Document::empty() const { return (key == 0); } + +TransactionalStore::Transaction::Transaction(arangodb::cache::Transaction* c, + rocksdb::Transaction* r) + : cache(c), rocks(r) {} + +TransactionalStore::TransactionalStore(Manager* manager) + : 
_manager(manager), + _directory(TRI_UNKNOWN_MEM_ZONE), + _id(++_sequence), + _readOptions(rocksdb::ReadOptions()), + _writeOptions(rocksdb::WriteOptions()), + _txOptions(rocksdb::TransactionOptions()) { + TRI_ASSERT(manager != nullptr); + _cache = manager->createCache(Manager::CacheType::Transactional, 1024 * 1024, + true, true); + TRI_ASSERT(_cache.get() != nullptr); + + _directory.appendText(TRI_GetTempPath()); + _directory.appendChar(TRI_DIR_SEPARATOR_CHAR); + _directory.appendText("cache-test-transactional-store-"); + _directory.appendText(std::to_string(_id)); + + rocksdb::Options options; + options.create_if_missing = true; + options.max_open_files = 128; + + auto status = rocksdb::TransactionDB::Open( + options, rocksdb::TransactionDBOptions(), _directory.c_str(), &_db); + if (!status.ok()) { + std::cerr << status.ToString() << std::endl; + } + TRI_ASSERT(status.ok()); +} + +TransactionalStore::~TransactionalStore() { + delete _db; + TRI_ASSERT(_directory.length() > 20); + TRI_RemoveDirectory(_directory.c_str()); + _manager->destroyCache(_cache); +} + +Cache* TransactionalStore::cache() { return _cache.get(); } + +TransactionalStore::Transaction* TransactionalStore::beginTransaction( + bool readOnly) { + auto cache = _manager->beginTransaction(readOnly); + auto rocks = _db->BeginTransaction(_writeOptions, _txOptions); + rocks->SetSnapshot(); + return new Transaction(cache, rocks); +} + +bool TransactionalStore::commit(TransactionalStore::Transaction* tx) { + if (tx != nullptr) { + auto status = tx->rocks->Commit(); + if (status.ok()) { + _manager->endTransaction(tx->cache); + delete tx->rocks; + delete tx; + return true; + } + } + return false; +} + +bool TransactionalStore::rollback(TransactionalStore::Transaction* tx) { + if (tx != nullptr) { + tx->rocks->Rollback(); + _manager->endTransaction(tx->cache); + delete tx->rocks; + delete tx; + return true; + } + return false; +} + +bool TransactionalStore::insert(TransactionalStore::Transaction* tx, + 
TransactionalStore::Document const& document) { + bool useInternalTransaction = (tx == nullptr); + if (useInternalTransaction) { + tx = beginTransaction(false); + } + + bool inserted = false; + Document d = lookup(tx, document.key); + if (d.empty()) { // ensure document with this key does not exist + // blacklist in cache first + _cache->blacklist(&(document.key), sizeof(uint64_t)); + + // now write to rocksdb + rocksdb::Slice kSlice(reinterpret_cast(&(document.key)), + sizeof(uint64_t)); + rocksdb::Slice vSlice(reinterpret_cast(&document), + sizeof(Document)); + auto status = tx->rocks->Put(kSlice, vSlice); + inserted = status.ok(); + } + + if (useInternalTransaction) { + bool ok = commit(tx); + if (!ok) { + rollback(tx); + inserted = false; + } + } + + return inserted; +} + +bool TransactionalStore::update(TransactionalStore::Transaction* tx, + TransactionalStore::Document const& document) { + bool useInternalTransaction = (tx == nullptr); + if (useInternalTransaction) { + tx = beginTransaction(false); + } + + bool updated = false; + Document d = lookup(tx, document.key); + if (!d.empty()) { // ensure document with this key exists + // blacklist in cache first + _cache->blacklist(&(document.key), sizeof(uint64_t)); + + // now write to rocksdb + rocksdb::Slice kSlice(reinterpret_cast(&(document.key)), + sizeof(uint64_t)); + rocksdb::Slice vSlice(reinterpret_cast(&document), + sizeof(Document)); + auto status = tx->rocks->Put(kSlice, vSlice); + updated = status.ok(); + } + + if (useInternalTransaction) { + bool ok = commit(tx); + if (!ok) { + rollback(tx); + updated = false; + } + } + + return updated; +} + +bool TransactionalStore::remove(TransactionalStore::Transaction* tx, + uint64_t key) { + bool useInternalTransaction = (tx == nullptr); + if (useInternalTransaction) { + tx = beginTransaction(false); + } + + bool removed = false; + Document d = lookup(tx, key); + if (!d.empty()) { // ensure document with this key exists + // blacklist in cache first + 
_cache->blacklist(&key, sizeof(uint64_t)); + + // now write to rocksdb + rocksdb::Slice kSlice(reinterpret_cast(&key), sizeof(uint64_t)); + auto status = tx->rocks->Delete(kSlice); + removed = status.ok(); + } + + if (useInternalTransaction) { + bool ok = commit(tx); + if (!ok) { + rollback(tx); + removed = false; + } + } + + return removed; +} + +TransactionalStore::Document TransactionalStore::lookup( + TransactionalStore::Transaction* tx, uint64_t key) { + bool useInternalTransaction = (tx == nullptr); + if (useInternalTransaction) { + tx = beginTransaction(true); + } + + Document result; + { + Cache::Finding f = _cache->find(&key, sizeof(uint64_t)); + if (f.found()) { + CachedValue const* cv = f.value(); + memcpy(&result, cv->value(), sizeof(Document)); + } + } + if (result.empty()) { + auto readOptions = rocksdb::ReadOptions(); + readOptions.snapshot = tx->rocks->GetSnapshot(); + rocksdb::Slice kSlice(reinterpret_cast(&key), sizeof(uint64_t)); + std::string buffer; + auto status = tx->rocks->Get(readOptions, kSlice, &buffer); + if (status.ok()) { + memcpy(&result, buffer.data(), sizeof(Document)); + CachedValue* value = CachedValue::construct(&key, sizeof(uint64_t), + &result, sizeof(Document)); + bool inserted = _cache->insert(value); + if (!inserted) { + delete value; + } + } + } + + if (useInternalTransaction) { + bool ok = commit(tx); + if (!ok) { + rollback(tx); + } + } + + return result; +} diff --git a/UnitTests/Cache/TransactionalStore.h b/UnitTests/Cache/TransactionalStore.h new file mode 100644 index 0000000000..152f21d578 --- /dev/null +++ b/UnitTests/Cache/TransactionalStore.h @@ -0,0 +1,98 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief helper for cache suite +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2017 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#ifndef UNITTESTS_CACHE_TRANSACTIONAL_STORE_H +#define UNITTESTS_CACHE_TRANSACTIONAL_STORE_H + +#include "Basics/Common.h" +#include "Basics/StringBuffer.h" +#include "Cache/Manager.h" +#include "Cache/TransactionalCache.h" + +#include +#include +#include +#include + +#include + +namespace arangodb { +namespace cache { + +class TransactionalStore { + public: + struct Document { + uint64_t key; + uint64_t timestamp; + uint64_t sequence; + + Document(); + Document(uint64_t k); + void advance(); + void clear(); + bool empty() const; + }; + + struct Transaction { + arangodb::cache::Transaction* cache; + rocksdb::Transaction* rocks; + + Transaction(arangodb::cache::Transaction* c, rocksdb::Transaction* r); + }; + + public: + TransactionalStore(Manager* manager); + ~TransactionalStore(); + + Cache* cache(); + + TransactionalStore::Transaction* beginTransaction(bool readOnly); + bool commit(TransactionalStore::Transaction* tx); + bool rollback(TransactionalStore::Transaction* tx); + + bool insert(TransactionalStore::Transaction* tx, Document const& document); + bool update(TransactionalStore::Transaction* tx, Document const& document); + bool remove(TransactionalStore::Transaction* tx, uint64_t key); + Document lookup(TransactionalStore::Transaction* tx, uint64_t key); + + private: + static std::atomic 
_sequence; + Manager* _manager; + std::shared_ptr _cache; + + arangodb::basics::StringBuffer _directory; + uint32_t _id; + rocksdb::TransactionDB* _db; + rocksdb::ReadOptions _readOptions; + rocksdb::WriteOptions _writeOptions; + rocksdb::TransactionOptions _txOptions; +}; + +}; // end namespace cache +}; // end namespace arangodb + +#endif diff --git a/UnitTests/Cache/TransactionsWithBackingStore.cpp b/UnitTests/Cache/TransactionsWithBackingStore.cpp new file mode 100644 index 0000000000..0d265f7d21 --- /dev/null +++ b/UnitTests/Cache/TransactionsWithBackingStore.cpp @@ -0,0 +1,487 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite for arangodb::cache::TransactionalCache with backing store +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2017 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Daniel H. 
Larkin +/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#include "Basics/Common.h" + +#define BOOST_TEST_INCLUDED +#include + +#include "Cache/Manager.h" +#include "Cache/Rebalancer.h" +#include "Random/RandomGenerator.h" + +#include "MockScheduler.h" +#include "TransactionalStore.h" + +#include +#include +#include +#include +#include + +using namespace arangodb; +using namespace arangodb::cache; + +// ----------------------------------------------------------------------------- +// --SECTION-- setup / tear-down +// ----------------------------------------------------------------------------- + +struct CCacheTransactionsWithBackingStoreSetup { + CCacheTransactionsWithBackingStoreSetup() { + BOOST_TEST_MESSAGE("setup TransactionsWithBackingStore"); + } + + ~CCacheTransactionsWithBackingStoreSetup() { + BOOST_TEST_MESSAGE("tear-down TransactionsWithBackingStore"); + } +}; +// ----------------------------------------------------------------------------- +// --SECTION-- test suite +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief setup +//////////////////////////////////////////////////////////////////////////////// + +BOOST_FIXTURE_TEST_SUITE(CCacheTransactionsWithBackingStoreTest, + CCacheTransactionsWithBackingStoreSetup) + +/* +Planned Tests +============= + +All tests begin by filling the database with a set number of documents. After +that, all writes consist of updates via the Document::advance() API to both keep +things simple and to provide a reliable way to test what version of a document a +reader gets. 
+ + 1) Single store; Read-only; hotset access pattern + - Test for hit rate + + 2) Single store; Simultaneous read, write threads, part 1 + - Have writers sleep a while between transactions + - Have readers read single documents with only internal transactions + - Test for hit rate + + 3) Single store; Simultaneous read, write threads, part 2 + - Have writers sleep a while between transactions + - Have readers read a set of documents within a transaction + - Test for snapshot isolation to the extent possible + - Test for hit rate + + 4) Multiple stores with rebalancing; Simultaneous read, write threads + - Use small global limit to provide memory pressure + - Vary store-access bias over time to check that rebalancing helps + - Have writers sleep a while between transactions + - Have readers read a set of documents within a transaction +*/ + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test hit rate for read-only hotset workload +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_single_readonly_hotset) { + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 16 * 1024 * 1024); + TransactionalStore store(&manager); + uint64_t totalDocuments = 1000000; + uint64_t hotsetSize = 50000; + size_t threadCount = 4; + uint64_t lookupsPerThread = 1000000; + + // initial fill + for (uint64_t i = 1; i <= totalDocuments; i++) { + store.insert(nullptr, TransactionalStore::Document(i)); + } + + auto worker = [&store, hotsetSize, totalDocuments, + lookupsPerThread]() -> void { + for (uint64_t i = 0; i < lookupsPerThread; i++) { + uint32_t r = RandomGenerator::interval(static_cast(99)); + uint64_t choice = (r >= 90) ? 
RandomGenerator::interval(totalDocuments) + : RandomGenerator::interval(hotsetSize); + if (choice == 0) { + choice = 1; + } + + auto d = store.lookup(nullptr, choice); + TRI_ASSERT(!d.empty()); + } + }; + + std::vector threads; + // dispatch threads + for (size_t i = 0; i < threadCount; i++) { + threads.push_back(new std::thread(worker)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + auto hitRates = manager.globalHitRates(); + BOOST_CHECK(hitRates.first >= 65.0); + BOOST_CHECK(hitRates.second >= 85.0); + + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test hit rate for mixed workload +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_single_mixed_hitrate) { + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 256 * 1024 * 1024); + TransactionalStore store(&manager); + uint64_t totalDocuments = 1000000; + uint64_t batchSize = 1000; + size_t readerCount = 4; + size_t writerCount = 2; + std::atomic writersDone(0); + auto writeWaitInterval = std::chrono::milliseconds(10); + + // initial fill + for (uint64_t i = 1; i <= totalDocuments; i++) { + store.insert(nullptr, TransactionalStore::Document(i)); + } + + auto readWorker = [&store, &writersDone, writerCount, + totalDocuments]() -> void { + while (writersDone.load() < writerCount) { + uint64_t choice = RandomGenerator::interval(totalDocuments); + if (choice == 0) { + choice = 1; + } + + auto d = store.lookup(nullptr, choice); + TRI_ASSERT(!d.empty()); + } + }; + + auto writeWorker = [&store, &writersDone, writerCount, totalDocuments, + batchSize, &writeWaitInterval](uint64_t lower, + uint64_t upper) -> void { + uint64_t batches = (upper + 1 - lower) / batchSize; + uint64_t choice = lower; + for (uint64_t batch = 0; batch < batches; batch++) { + auto tx = 
store.beginTransaction(false); + for (uint64_t i = 0; i < batchSize; i++) { + auto d = store.lookup(tx, choice); + TRI_ASSERT(!d.empty()); + d.advance(); + bool ok = store.update(tx, d); + TRI_ASSERT(ok); + choice++; + } + bool ok = store.commit(tx); + TRI_ASSERT(ok); + std::this_thread::sleep_for(writeWaitInterval); + } + writersDone++; + }; + + std::vector threads; + // dispatch reader threads + for (size_t i = 0; i < readerCount; i++) { + threads.push_back(new std::thread(readWorker)); + } + // dispatch writer threads + uint64_t chunkSize = totalDocuments / writerCount; + for (size_t i = 0; i < writerCount; i++) { + uint64_t lower = (i * chunkSize) + 1; + uint64_t upper = ((i + 1) * chunkSize); + threads.push_back(new std::thread(writeWorker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + auto hitRates = manager.globalHitRates(); + BOOST_CHECK(hitRates.first >= 40.0); + BOOST_CHECK(hitRates.second >= 60.0); + + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test transactionality for mixed workload +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_single_mixed_transactionality) { + RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 256 * 1024 * 1024); + TransactionalStore store(&manager); + uint64_t totalDocuments = 1000000; + uint64_t writeBatchSize = 1000; + uint64_t readBatchSize = 10000; + size_t readerCount = 4; + size_t writerCount = 2; + std::atomic writersDone(0); + auto writeWaitInterval = std::chrono::milliseconds(10); + + // initial fill + for (uint64_t i = 1; i <= totalDocuments; i++) { + store.insert(nullptr, TransactionalStore::Document(i)); + } + + auto readWorker = [&store, &writersDone, writerCount, totalDocuments, + readBatchSize]() -> void { + while 
(writersDone.load() < writerCount) { + auto tx = store.beginTransaction(true); + uint64_t start = static_cast( + std::chrono::steady_clock::now().time_since_epoch().count()); + for (uint64_t i = 0; i < readBatchSize; i++) { + uint64_t choice = RandomGenerator::interval(totalDocuments); + if (choice == 0) { + choice = 1; + } + + auto d = store.lookup(tx, choice); + TRI_ASSERT(!d.empty()); + TRI_ASSERT(d.timestamp <= start); // transactionality + } + bool ok = store.commit(tx); + TRI_ASSERT(ok); + } + }; + + auto writeWorker = [&store, &writersDone, writerCount, totalDocuments, + writeBatchSize, &writeWaitInterval]( + uint64_t lower, uint64_t upper) -> void { + uint64_t batches = (upper + 1 - lower) / writeBatchSize; + uint64_t choice = lower; + for (uint64_t batch = 0; batch < batches; batch++) { + auto tx = store.beginTransaction(false); + for (uint64_t i = 0; i < writeBatchSize; i++) { + auto d = store.lookup(tx, choice); + TRI_ASSERT(!d.empty()); + d.advance(); + bool ok = store.update(tx, d); + TRI_ASSERT(ok); + choice++; + } + bool ok = store.commit(tx); + TRI_ASSERT(ok); + std::this_thread::sleep_for(writeWaitInterval); + } + writersDone++; + }; + + std::vector threads; + // dispatch reader threads + for (size_t i = 0; i < readerCount; i++) { + threads.push_back(new std::thread(readWorker)); + } + // dispatch writer threads + uint64_t chunkSize = totalDocuments / writerCount; + for (size_t i = 0; i < writerCount; i++) { + uint64_t lower = (i * chunkSize) + 1; + uint64_t upper = ((i + 1) * chunkSize); + threads.push_back(new std::thread(writeWorker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test rebalancing in the wild +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(tst_multi_rebalancing) { + 
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE); + MockScheduler scheduler(4); + Manager manager(scheduler.ioService(), 16 * 1024 * 1024); + Rebalancer rebalancer(&manager); + TransactionalStore store1(&manager); + TransactionalStore store2(&manager); + uint64_t totalDocuments = 1000000; + uint64_t writeBatchSize = 1000; + uint64_t readBatchSize = 100; + size_t readerCount = 4; + size_t writerCount = 2; + std::atomic writersDone(0); + auto writeWaitInterval = std::chrono::milliseconds(50); + uint32_t storeBias; + + bool doneRebalancing = false; + auto rebalanceWorker = [&rebalancer, &doneRebalancing]() -> void { + while (!doneRebalancing) { + bool rebalanced = rebalancer.rebalance(); + if (rebalanced) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } else { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + }; + auto rebalancerThread = new std::thread(rebalanceWorker); + + // initial fill + for (uint64_t i = 1; i <= totalDocuments; i++) { + store1.insert(nullptr, TransactionalStore::Document(i)); + store2.insert(nullptr, TransactionalStore::Document(i)); + } + + auto readWorker = [&store1, &store2, &storeBias, &writersDone, writerCount, + totalDocuments, readBatchSize]() -> void { + while (writersDone.load() < writerCount) { + uint32_t r = RandomGenerator::interval(99UL); + TransactionalStore* store = (r <= storeBias) ? 
&store1 : &store2; + auto tx = store->beginTransaction(true); + uint64_t start = static_cast( + std::chrono::steady_clock::now().time_since_epoch().count()); + for (uint64_t i = 0; i < readBatchSize; i++) { + uint64_t choice = RandomGenerator::interval(totalDocuments); + if (choice == 0) { + choice = 1; + } + + auto d = store->lookup(tx, choice); + TRI_ASSERT(!d.empty()); + TRI_ASSERT(d.timestamp <= start); // transactionality + } + bool ok = store->commit(tx); + TRI_ASSERT(ok); + } + }; + + auto writeWorker = [&store1, &store2, &storeBias, &writersDone, writerCount, + totalDocuments, writeBatchSize, &writeWaitInterval]( + uint64_t lower, uint64_t upper) -> void { + uint64_t batches = (upper + 1 - lower) / writeBatchSize; + uint64_t choice = lower; + for (uint64_t batch = 0; batch < batches; batch++) { + uint32_t r = RandomGenerator::interval(99UL); + TransactionalStore* store = (r <= storeBias) ? &store1 : &store2; + auto tx = store->beginTransaction(false); + for (uint64_t i = 0; i < writeBatchSize; i++) { + auto d = store->lookup(tx, choice); + TRI_ASSERT(!d.empty()); + d.advance(); + bool ok = store->update(tx, d); + TRI_ASSERT(ok); + choice++; + } + bool ok = store->commit(tx); + TRI_ASSERT(ok); + std::this_thread::sleep_for(writeWaitInterval); + } + writersDone++; + }; + + std::vector threads; + + // bias toward first store + storeBias = 80; + + // dispatch reader threads + for (size_t i = 0; i < readerCount; i++) { + threads.push_back(new std::thread(readWorker)); + } + // dispatch writer threads + uint64_t chunkSize = totalDocuments / writerCount; + for (size_t i = 0; i < writerCount; i++) { + uint64_t lower = (i * chunkSize) + 1; + uint64_t upper = ((i + 1) * chunkSize); + threads.push_back(new std::thread(writeWorker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + while (store1.cache()->isResizing() || store2.cache()->isResizing()) { + std::this_thread::yield(); + } + /*BOOST_CHECK_MESSAGE(2 * 
store1.cache()->limit() > store2.cache()->limit(), + 2 * store1.cache()->limit() << " !> " + << store2.cache()->limit()); + */ + threads.clear(); + + // bias toward second store + storeBias = 20; + + // dispatch reader threads + for (size_t i = 0; i < readerCount; i++) { + threads.push_back(new std::thread(readWorker)); + } + // dispatch writer threads + for (size_t i = 0; i < writerCount; i++) { + uint64_t lower = (i * chunkSize) + 1; + uint64_t upper = ((i + 1) * chunkSize); + threads.push_back(new std::thread(writeWorker, lower, upper)); + } + + // join threads + for (auto t : threads) { + t->join(); + delete t; + } + + while (store1.cache()->isResizing() || store2.cache()->isResizing()) { + std::this_thread::yield(); + } + /*BOOST_CHECK_MESSAGE(store1.cache()->limit() < 2 * store2.cache()->limit(), + store1.cache()->limit() << " !< " + << 2 * store2.cache()->limit()); + */ + doneRebalancing = true; + rebalancerThread->join(); + delete rebalancerThread; + + RandomGenerator::shutdown(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief generate tests +//////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_SUITE_END() + +// Local Variables: +// mode: outline-minor +// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// +// --SECTION--\\|/// @\\}\\)" +// End: diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 03e099a0f0..59c99a24f1 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -180,9 +180,10 @@ SET(ARANGOD_SOURCES Cache/PlainCache.cpp Cache/Rebalancer.cpp Cache/State.cpp + Cache/Transaction.cpp Cache/TransactionalBucket.cpp Cache/TransactionalCache.cpp - Cache/TransactionWindow.cpp + Cache/TransactionManager.cpp Cluster/AgencyCallback.cpp Cluster/AgencyCallbackRegistry.cpp Cluster/ClusterComm.cpp diff --git a/arangod/Cache/Cache.cpp b/arangod/Cache/Cache.cpp index a3ab5439c1..49ac44fcbf 100644 --- 
a/arangod/Cache/Cache.cpp +++ b/arangod/Cache/Cache.cpp @@ -38,6 +38,11 @@ using namespace arangodb::cache; +uint64_t Cache::_evictionStatsCapacity = 1024; +uint64_t Cache::_findStatsCapacity = 16384; + +Cache::ConstructionGuard::ConstructionGuard() {} + Cache::Finding::Finding(CachedValue* v) : _value(v) { if (_value != nullptr) { _value->lease(); @@ -111,9 +116,30 @@ CachedValue* Cache::Finding::copy() const { return ((_value == nullptr) ? nullptr : _value->copy()); } -void Cache::destroy(std::shared_ptr cache) { - if (cache.get() != nullptr) { - cache->shutdown(); +Cache::Cache(ConstructionGuard guard, Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, + bool enableWindowedStats) + : _state(), + _allowGrowth(allowGrowth), + _evictionStats(_evictionStatsCapacity), + _insertionCount(0), + _enableWindowedStats(enableWindowedStats), + _findStats(nullptr), + _findHits(0), + _findMisses(0), + _manager(manager), + _metadata(metadata), + _openOperations(0), + _migrateRequestTime(std::chrono::steady_clock::now()), + _resizeRequestTime(std::chrono::steady_clock::now()), + _lastResizeRequestStatus(true) { + if (_enableWindowedStats) { + try { + _findStats.reset(new StatBuffer(_findStatsCapacity)); + } catch (std::bad_alloc) { + _findStats.reset(nullptr); + _enableWindowedStats = false; + } } } @@ -142,12 +168,15 @@ uint64_t Cache::usage() { } std::pair Cache::hitRates() { + double lifetimeRate = std::nan(""); double windowedRate = std::nan(""); uint64_t currentMisses = _findMisses.load(); uint64_t currentHits = _findHits.load(); - double lifetimeRate = 100 * (static_cast(currentHits) / - static_cast(currentHits + currentMisses)); + if (currentMisses + currentHits > 0) { + lifetimeRate = 100 * (static_cast(currentHits) / + static_cast(currentHits + currentMisses)); + } if (_enableWindowedStats && _findStats.get() != nullptr) { auto stats = _findStats->getFrequencies(); @@ -165,8 +194,10 @@ std::pair Cache::hitRates() { currentHits = (*stats)[1].second; 
currentMisses = (*stats)[0].second; } - windowedRate = 100 * (static_cast(currentHits) / - static_cast(currentHits + currentMisses)); + if (currentHits + currentMisses > 0) { + windowedRate = 100 * (static_cast(currentHits) / + static_cast(currentHits + currentMisses)); + } } } @@ -223,38 +254,9 @@ bool Cache::isResizing() { return resizing; } -Cache::Cache(Manager* manager, uint64_t requestedLimit, bool allowGrowth, - bool enableWindowedStats, std::function deleter, - uint64_t size) - : _state(), - _allowGrowth(allowGrowth), - _evictionStats(1024), - _insertionCount(0), - _enableWindowedStats(enableWindowedStats), - _findStats(nullptr), - _findHits(0), - _findMisses(0), - _manager(manager), - _openOperations(0), - _migrateRequestTime(std::chrono::steady_clock::now()), - _resizeRequestTime(std::chrono::steady_clock::now()) { - try { - uint64_t fullSize = - size + _evictionStats.memoryUsage() + - ((_findStats.get() == nullptr) ? 0 : _findStats->memoryUsage()); - _metadata = - _manager->registerCache(this, requestedLimit, deleter, fullSize); - } catch (std::bad_alloc) { - // could not register, mark as non-operational - if (!_state.isSet(State::Flag::shutdown)) { - _state.toggleFlag(State::Flag::shutdown); - } - } - try { - _findStats.reset(new StatBuffer(16384)); - } catch (std::bad_alloc) { - _findStats.reset(nullptr); - _enableWindowedStats = false; +void Cache::destroy(std::shared_ptr cache) { + if (cache.get() != nullptr) { + cache->shutdown(); } } @@ -282,7 +284,13 @@ bool Cache::requestResize(uint64_t requestedLimit, bool internal) { _resizeRequestTime))) { _metadata->lock(); uint64_t newLimit = - (requestedLimit > 0) ? requestedLimit : (_metadata->hardLimit() * 2); + (requestedLimit > 0) + ? requestedLimit + : (_lastResizeRequestStatus + ? 
(_metadata->hardLimit() * 2) + : (static_cast( + static_cast(_metadata->hardLimit()) * + 1.25))); _metadata->unlock(); auto result = _manager->requestResize(_metadata, newLimit); _resizeRequestTime = result.second; @@ -384,6 +392,11 @@ void Cache::beginShutdown() { void Cache::shutdown() { _state.lock(); + _metadata->lock(); + auto handle = _metadata->cache(); // hold onto self-reference to prevent + // pre-mature shared_ptr destruction + TRI_ASSERT(handle.get() == this); + _metadata->unlock(); if (!_state.isSet(State::Flag::shutdown)) { if (!_state.isSet(State::Flag::shuttingDown)) { _state.toggleFlag(State::Flag::shuttingDown); diff --git a/arangod/Cache/Cache.h b/arangod/Cache/Cache.h index 57676ed6a4..8d1511f7db 100644 --- a/arangod/Cache/Cache.h +++ b/arangod/Cache/Cache.h @@ -39,13 +39,27 @@ namespace arangodb { namespace cache { +class PlainCache; // forward declaration +class TransactionalCache; // forward declaration + //////////////////////////////////////////////////////////////////////////////// /// @brief The common structure of all caches managed by Manager. /// /// Any pure virtual functions are documented in derived classes implementing /// them. //////////////////////////////////////////////////////////////////////////////// -class Cache { +class Cache : public std::enable_shared_from_this { + protected: + ////////////////////////////////////////////////////////////////////////////// + /// @brief A dummy class to restrict constructor access. 
+ ////////////////////////////////////////////////////////////////////////////// + class ConstructionGuard { + private: + ConstructionGuard(); + friend class PlainCache; + friend class TransactionalCache; + }; + public: typedef FrequencyBuffer StatBuffer; @@ -93,10 +107,16 @@ class Cache { }; public: + Cache(ConstructionGuard guard, Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, + bool enableWindowedStats); + virtual ~Cache() = default; + // primary functionality; documented in derived classes virtual Finding find(void const* key, uint32_t keySize) = 0; virtual bool insert(CachedValue* value) = 0; virtual bool remove(void const* key, uint32_t keySize) = 0; + virtual bool blacklist(void const* key, uint32_t keySize) = 0; ////////////////////////////////////////////////////////////////////////////// /// @brief Returns the limit on memory usage for this cache in bytes. @@ -157,8 +177,10 @@ class Cache { insertEviction = 3, insertNoEviction = 4 }; + static uint64_t _evictionStatsCapacity; StatBuffer _evictionStats; std::atomic _insertionCount; + static uint64_t _findStatsCapacity; bool _enableWindowedStats; std::unique_ptr _findStats; std::atomic _findHits; @@ -174,6 +196,7 @@ class Cache { // times to wait until requesting is allowed again Manager::time_point _migrateRequestTime; Manager::time_point _resizeRequestTime; + bool _lastResizeRequestStatus; // friend class manager and tasks friend class FreeMemoryTask; @@ -184,12 +207,6 @@ class Cache { // shutdown cache and let its memory be reclaimed static void destroy(std::shared_ptr cache); - Cache(Manager* manager, uint64_t requestedLimit, bool allowGrowth, - bool enableWindowedStats, std::function deleter, - uint64_t size); - - virtual ~Cache() = default; - bool isOperational() const; void startOperation(); void endOperation(); diff --git a/arangod/Cache/FrequencyBuffer.h b/arangod/Cache/FrequencyBuffer.h index e7cb748ea3..66c0cca8e8 100644 --- a/arangod/Cache/FrequencyBuffer.h +++ 
b/arangod/Cache/FrequencyBuffer.h @@ -46,7 +46,8 @@ namespace cache { /// which over-writes itself after it fills up (thus only maintaining a recent /// window on the records). //////////////////////////////////////////////////////////////////////////////// -template +template , + class Hasher = std::hash> class FrequencyBuffer { public: typedef std::vector> stats_t; @@ -55,43 +56,54 @@ class FrequencyBuffer { std::atomic _current; uint64_t _capacity; uint64_t _mask; - std::unique_ptr _buffer; + std::unique_ptr> _buffer; + Comparator _cmp; + T _empty; public: ////////////////////////////////////////////////////////////////////////////// /// @brief Initialize with the given capacity. ////////////////////////////////////////////////////////////////////////////// - FrequencyBuffer(uint64_t capacity) : _current(0) { - size_t i = 0; - for (; (1ULL << i) < capacity; i++) { + FrequencyBuffer(uint64_t capacity) : _current(0), _cmp(), _empty() { + uint64_t i = 0; + for (; (static_cast(1) << i) < capacity; i++) { } - _capacity = (1ULL << i); + _capacity = (static_cast(1) << i); _mask = _capacity - 1; - _buffer.reset(new T[_capacity]()); + _buffer.reset(new std::vector(_capacity)); + TRI_ASSERT(_buffer->capacity() == _capacity); + TRI_ASSERT(_buffer->size() == _capacity); + } + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Reports the hidden allocation size (not captured by sizeof). + ////////////////////////////////////////////////////////////////////////////// + static uint64_t allocationSize(uint64_t capacity) { + return sizeof(std::vector) + (capacity * sizeof(T)); } ////////////////////////////////////////////////////////////////////////////// /// @brief Reports the memory usage in bytes. 
////////////////////////////////////////////////////////////////////////////// uint64_t memoryUsage() { - return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer)); + return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer) + + sizeof(std::vector)); } ////////////////////////////////////////////////////////////////////////////// /// @brief Insert an individual event record. ////////////////////////////////////////////////////////////////////////////// - void insertRecord(T const& record) { - ++_current; - _buffer[_current & _mask] = record; + void insertRecord(T record) { + (*_buffer)[_current++ & _mask] = record; } ////////////////////////////////////////////////////////////////////////////// /// @brief Remove all occurrences of the specified event record. ////////////////////////////////////////////////////////////////////////////// - void purgeRecord(T const& record) { + void purgeRecord(T record) { for (size_t i = 0; i < _capacity; i++) { - if (_buffer[i] == record) { - _buffer[i] = T(); + if (_cmp((*_buffer)[i], record)) { + (*_buffer)[i] = _empty; } } } @@ -102,10 +114,10 @@ class FrequencyBuffer { ////////////////////////////////////////////////////////////////////////////// std::shared_ptr getFrequencies() const { // calculate frequencies - std::unordered_map frequencies; + std::unordered_map frequencies; for (size_t i = 0; i < _capacity; i++) { - T entry = _buffer[i]; - if (entry != T()) { + T const entry = (*_buffer)[i]; + if (!_cmp(entry, _empty)) { frequencies[entry]++; } } @@ -129,7 +141,7 @@ class FrequencyBuffer { ////////////////////////////////////////////////////////////////////////////// void clear() { for (size_t i = 0; i < _capacity; i++) { - _buffer[i] = T(); + (*_buffer)[i] = T(); } } }; diff --git a/arangod/Cache/Manager.cpp b/arangod/Cache/Manager.cpp index b986c364e2..1e18aff2c4 100644 --- a/arangod/Cache/Manager.cpp +++ b/arangod/Cache/Manager.cpp @@ -31,6 +31,7 @@ #include "Cache/Metadata.h" #include "Cache/PlainCache.h" #include 
"Cache/State.h" +#include "Cache/Transaction.h" #include "Cache/TransactionalCache.h" #include @@ -57,6 +58,17 @@ static constexpr uint64_t CACHE_RECORD_OVERHEAD = sizeof(Metadata) + 16; static constexpr uint64_t TABLE_LISTS_OVERHEAD = 32 * 16 * 8; static constexpr int64_t TRIES_FAST = 100; +bool Manager::cmp_weak_ptr::operator()( + std::weak_ptr const& left, std::weak_ptr const& right) const { + return !left.owner_before(right) && !right.owner_before(left); +} + +size_t Manager::hash_weak_ptr::operator()( + const std::weak_ptr& wp) const { + auto sp = wp.lock(); + return std::hash()(sp); +} + Manager::Manager(boost::asio::io_service* ioService, uint64_t globalLimit, bool enableWindowedStats) : _state(), @@ -98,21 +110,38 @@ std::shared_ptr Manager::createCache(Manager::CacheType type, std::shared_ptr result(nullptr); _state.lock(); bool allowed = isOperational(); + MetadataItr metadata = _caches.end(); _state.unlock(); + if (allowed) { + uint64_t fixedSize = 0; + switch (type) { + case CacheType::Plain: + fixedSize = PlainCache::allocationSize(enableWindowedStats); + break; + case CacheType::Transactional: + fixedSize = TransactionalCache::allocationSize(enableWindowedStats); + break; + default: + break; + } + std::tie(allowed, metadata) = registerCache(requestedLimit, fixedSize); + } + if (allowed) { switch (type) { case CacheType::Plain: - result = PlainCache::create(this, requestedLimit, allowGrowth, + result = PlainCache::create(this, metadata, allowGrowth, enableWindowedStats); break; case CacheType::Transactional: - result = TransactionalCache::create(this, requestedLimit, allowGrowth, + result = TransactionalCache::create(this, metadata, allowGrowth, enableWindowedStats); break; default: break; } + metadata->link(result); } return result; @@ -197,11 +226,15 @@ uint64_t Manager::globalAllocation() { } std::pair Manager::globalHitRates() { + double lifetimeRate = std::nan(""); double windowedRate = std::nan(""); - uint64_t currentMisses = _findMisses.load(); 
+ uint64_t currentHits = _findHits.load(); - double lifetimeRate = 100 * (static_cast(currentHits) / - static_cast(currentHits + currentMisses)); + uint64_t currentMisses = _findMisses.load(); + if (currentHits + currentMisses > 0) { + lifetimeRate = 100 * (static_cast(currentHits) / + static_cast(currentHits + currentMisses)); + } if (_enableWindowedStats && _findStats.get() != nullptr) { auto stats = _findStats->getFrequencies(); @@ -219,22 +252,25 @@ std::pair Manager::globalHitRates() { currentHits = (*stats)[1].second; currentMisses = (*stats)[0].second; } - windowedRate = 100 * (static_cast(currentHits) / - static_cast(currentHits + currentMisses)); + if (currentHits + currentMisses > 0) { + windowedRate = 100 * (static_cast(currentHits) / + static_cast(currentHits + currentMisses)); + } } } return std::pair(lifetimeRate, windowedRate); } -void Manager::startTransaction() { _transactions.start(); } +Transaction* Manager::beginTransaction(bool readOnly) { + return _transactions.begin(readOnly); +} -void Manager::endTransaction() { _transactions.end(); } +void Manager::endTransaction(Transaction* tx) { _transactions.end(tx); } -Manager::MetadataItr Manager::registerCache(Cache* cache, - uint64_t requestedLimit, - std::function deleter, - uint64_t fixedSize) { +std::pair Manager::registerCache( + uint64_t requestedLimit, uint64_t fixedSize) { + bool ok = true; uint32_t logSize = 0; uint32_t tableLogSize = MIN_TABLE_LOG_SIZE; for (; (1ULL << logSize) < requestedLimit; logSize++) { @@ -246,39 +282,42 @@ Manager::MetadataItr Manager::registerCache(Cache* cache, _state.lock(); if (!isOperational()) { - _state.unlock(); - throw std::bad_alloc(); + ok = false; } - while (logSize >= MIN_LOG_SIZE) { - uint64_t tableAllocation = - _tables[tableLogSize].empty() ? 
tableSize(tableLogSize) : 0; - if (increaseAllowed(grantedLimit + tableAllocation + CACHE_RECORD_OVERHEAD + - fixedSize)) { - break; + if (ok) { + while (logSize >= MIN_LOG_SIZE) { + uint64_t tableAllocation = + _tables[tableLogSize].empty() ? tableSize(tableLogSize) : 0; + if (increaseAllowed(grantedLimit + tableAllocation + + CACHE_RECORD_OVERHEAD + fixedSize)) { + break; + } + + grantedLimit >>= 1U; + logSize--; + if (tableLogSize > MIN_TABLE_LOG_SIZE) { + tableLogSize--; + } } - grantedLimit >>= 1U; - logSize--; - if (tableLogSize > MIN_TABLE_LOG_SIZE) { - tableLogSize--; + if (logSize < MIN_LOG_SIZE) { + ok = false; } } - if (logSize < MIN_LOG_SIZE) { - _state.unlock(); - throw std::bad_alloc(); + MetadataItr metadata = _caches.end(); + if (ok) { + _globalAllocation += (grantedLimit + CACHE_RECORD_OVERHEAD + fixedSize); + _caches.emplace_front(grantedLimit); + metadata = _caches.begin(); + metadata->lock(); + leaseTable(metadata, tableLogSize); + metadata->unlock(); } - - _globalAllocation += (grantedLimit + CACHE_RECORD_OVERHEAD + fixedSize); - _caches.emplace_front(std::shared_ptr(cache, deleter), grantedLimit); - MetadataItr metadata = _caches.begin(); - metadata->lock(); - leaseTable(metadata, tableLogSize); - metadata->unlock(); _state.unlock(); - return metadata; + return std::pair(ok, metadata); } void Manager::unregisterCache(Manager::MetadataItr& metadata) { @@ -302,7 +341,7 @@ void Manager::unregisterCache(Manager::MetadataItr& metadata) { std::pair Manager::requestResize( Manager::MetadataItr& metadata, uint64_t requestedLimit) { - Manager::time_point nextRequest = futureTime(30); + Manager::time_point nextRequest = futureTime(100); bool allowed = false; bool ok = _state.lock(TRIES_FAST); @@ -332,7 +371,7 @@ std::pair Manager::requestResize( std::pair Manager::requestMigrate( Manager::MetadataItr& metadata, uint32_t requestedLogSize) { - Manager::time_point nextRequest = futureTime(30); + Manager::time_point nextRequest = futureTime(100); bool 
allowed = false; bool ok = _state.lock(TRIES_FAST); @@ -361,9 +400,10 @@ std::pair Manager::requestMigrate( } void Manager::reportAccess(std::shared_ptr cache) { - if (((++_accessCounter) & 0x7FULL) == 0) { // record 1 in 128 - _accessStats.insertRecord(cache); - } + // if (((++_accessCounter) & static_cast(7)) == 0) { // record 1 in + // 8 + _accessStats.insertRecord(cache); + //} } void Manager::recordHitStat(Manager::Stat stat) { @@ -455,26 +495,35 @@ bool Manager::rebalance() { // allow background tasks if more than 7/8ths full bool allowTasks = - _globalAllocation > (_globalHardLimit - (_globalHardLimit >> 3)); + _globalAllocation > + static_cast(0.875 * static_cast(_globalHardLimit)); // be aggressive if more than 3/4ths full bool beAggressive = - _globalAllocation > (_globalHardLimit - (_globalHardLimit >> 2)); + _globalAllocation > + static_cast(0.75 * static_cast(_globalHardLimit)); - // aim for 1/4th with background tasks, 1/8th if no tasks but aggressive, no + // aim for 3/8th with background tasks, 1/4th if no tasks but aggressive, no // goal otherwise - uint64_t goal = beAggressive ? (allowTasks ? (_globalAllocation >> 2) - : (_globalAllocation >> 3)) - : 0; + uint64_t goal = + beAggressive + ? (allowTasks ? 
static_cast( + 0.375 * static_cast(_globalHardLimit)) + : static_cast( + 0.25 * static_cast(_globalHardLimit))) + : 0; - // get stats on cache access to prioritize freeing from less frequently used - // caches first, so more frequently used ones stay large - std::shared_ptr cacheList = priorityList(); + if (goal > 0) { + // get stats on cache access to prioritize freeing from less frequently used + // caches first, so more frequently used ones stay large + std::shared_ptr cacheList = priorityList(); - // just adjust limits - uint64_t reclaimed = resizeAllCaches(TaskEnvironment::rebalancing, cacheList, - allowTasks, beAggressive, goal); - _globalAllocation -= reclaimed; + // just adjust limits + uint64_t reclaimed = + resizeAllCaches(TaskEnvironment::rebalancing, cacheList, allowTasks, + beAggressive, goal); + _globalAllocation -= reclaimed; + } if (_rebalancingTasks.load() == 0) { _state.toggleFlag(State::Flag::rebalancing); @@ -518,8 +567,9 @@ void Manager::internalResize(uint64_t newGlobalLimit, bool firstAttempt) { cacheList = priorityList(); // first just adjust limits down to usage - uint64_t reclaimed = resizeAllCaches(TaskEnvironment::resizing, cacheList, true, - true, _globalAllocation - _globalSoftLimit); + uint64_t reclaimed = + resizeAllCaches(TaskEnvironment::resizing, cacheList, true, true, + _globalAllocation - _globalSoftLimit); _globalAllocation -= reclaimed; done = adjustGlobalLimitsIfAllowed(newGlobalLimit); } @@ -561,11 +611,10 @@ uint64_t Manager::resizeAllCaches(Manager::TaskEnvironment environment, if (aggressive) { newLimit = (noTasks ? metadata->usage() - : (std::min)(metadata->usage(), metadata->hardLimit() / 4)); - } else { - newLimit = - (noTasks ? 
(std::max)(metadata->usage(), metadata->hardLimit() / 2) : (std::min)(metadata->usage(), metadata->hardLimit() / 2)); + } else { + newLimit = (std::max)(metadata->usage(), + (metadata->hardLimit() + metadata->usage()) / 2); } newLimit = (std::max)(newLimit, MIN_CACHE_SIZE); @@ -761,9 +810,11 @@ std::shared_ptr Manager::priorityList() { // catalog accessed caches auto stats = _accessStats.getFrequencies(); - std::set accessed; + std::set> accessed; for (auto s : *stats) { - accessed.emplace(s.first.get()); + if (auto cache = s.first.lock()) { + accessed.emplace(cache); + } } // gather all unaccessed caches at beginning of list @@ -772,7 +823,7 @@ std::shared_ptr Manager::priorityList() { std::shared_ptr cache = m->cache(); m->unlock(); - auto found = accessed.find(cache.get()); + auto found = accessed.find(cache); if (found == accessed.end()) { list->emplace_back(cache); } @@ -780,13 +831,15 @@ std::shared_ptr Manager::priorityList() { // gather all accessed caches in order for (auto s : *stats) { - list->emplace_back(s.first); + if (auto cache = s.first.lock()) { + list->emplace_back(cache); + } } return list; } -Manager::time_point Manager::futureTime(uint64_t secondsFromNow) { +Manager::time_point Manager::futureTime(uint64_t millisecondsFromNow) { return (std::chrono::steady_clock::now() + - std::chrono::seconds(secondsFromNow)); + std::chrono::milliseconds(millisecondsFromNow)); } diff --git a/arangod/Cache/Manager.h b/arangod/Cache/Manager.h index ef2c177a32..a1fc824b0b 100644 --- a/arangod/Cache/Manager.h +++ b/arangod/Cache/Manager.h @@ -30,7 +30,8 @@ #include "Cache/FrequencyBuffer.h" #include "Cache/Metadata.h" #include "Cache/State.h" -#include "Cache/TransactionWindow.h" +#include "Cache/Transaction.h" +#include "Cache/TransactionManager.h" #include #include @@ -67,9 +68,19 @@ class Rebalancer; // forward declaration /// need a different instance. 
//////////////////////////////////////////////////////////////////////////////// class Manager { + protected: + struct cmp_weak_ptr { + bool operator()(std::weak_ptr const& left, + std::weak_ptr const& right) const; + }; + struct hash_weak_ptr { + size_t operator()(const std::weak_ptr& wp) const; + }; + public: static uint64_t MINIMUM_SIZE; - typedef FrequencyBuffer> AccessStatBuffer; + typedef FrequencyBuffer, cmp_weak_ptr, hash_weak_ptr> + AccessStatBuffer; typedef FrequencyBuffer FindStatBuffer; typedef std::vector> PriorityList; typedef std::chrono::time_point time_point; @@ -144,14 +155,18 @@ class Manager { std::pair globalHitRates(); ////////////////////////////////////////////////////////////////////////////// - /// @brief Signal the beginning of a transaction. + /// @brief Open a new transaction. + /// + /// The transaction is considered read-only if it is guaranteed not to write + /// to the backing store. A read-only transaction may, however, write to the + /// cache. ////////////////////////////////////////////////////////////////////////////// - void startTransaction(); + Transaction* beginTransaction(bool readOnly); ////////////////////////////////////////////////////////////////////////////// - /// @brief Signal the end of a transaction. + /// @brief Signal the end of a transaction. Deletes the passed Transaction. 
////////////////////////////////////////////////////////////////////////////// - void endTransaction(); + void endTransaction(Transaction* tx); private: // simple state variable for locking and other purposes @@ -180,7 +195,7 @@ class Manager { uint64_t _globalAllocation; // transaction management - TransactionWindow _transactions; + TransactionManager _transactions; // task management enum TaskEnvironment { none, rebalancing, resizing }; @@ -200,9 +215,8 @@ class Manager { private: // used by caches // register and unregister individual caches - Manager::MetadataItr registerCache(Cache* cache, uint64_t requestedLimit, - std::function deleter, - uint64_t fixedSize); + std::pair registerCache(uint64_t requestedLimit, + uint64_t fixedSize); void unregisterCache(Manager::MetadataItr& metadata); // allow individual caches to request changes to their allocations @@ -259,7 +273,7 @@ class Manager { std::shared_ptr priorityList(); // helper for wait times - Manager::time_point futureTime(uint64_t secondsFromNow); + Manager::time_point futureTime(uint64_t millisecondsFromNow); }; }; // end namespace cache diff --git a/arangod/Cache/Metadata.cpp b/arangod/Cache/Metadata.cpp index f5dfed471f..8d4b9e75af 100644 --- a/arangod/Cache/Metadata.cpp +++ b/arangod/Cache/Metadata.cpp @@ -30,16 +30,15 @@ using namespace arangodb::cache; -Metadata::Metadata(std::shared_ptr cache, uint64_t limit, uint8_t* table, - uint32_t logSize) +Metadata::Metadata(uint64_t limit) : _state(), - _cache(cache), + _cache(nullptr), _usage(0), _softLimit(limit), _hardLimit(limit), - _table(table), + _table(nullptr), _auxiliaryTable(nullptr), - _logSize(logSize), + _logSize(0), _auxiliaryLogSize(0) {} Metadata::Metadata(Metadata const& other) @@ -53,6 +52,12 @@ Metadata::Metadata(Metadata const& other) _logSize(other._logSize), _auxiliaryLogSize(other._auxiliaryLogSize) {} +void Metadata::link(std::shared_ptr cache) { + lock(); + _cache = cache; + unlock(); +} + void Metadata::lock() { _state.lock(); } 
void Metadata::unlock() { diff --git a/arangod/Cache/Metadata.h b/arangod/Cache/Metadata.h index 0ad3814fc4..2e16ed4ae2 100644 --- a/arangod/Cache/Metadata.h +++ b/arangod/Cache/Metadata.h @@ -44,14 +44,18 @@ class Metadata { ////////////////////////////////////////////////////////////////////////////// /// @brief Initializes record with given information. ////////////////////////////////////////////////////////////////////////////// - Metadata(std::shared_ptr cache, uint64_t limit, - uint8_t* table = nullptr, uint32_t logSize = 0); + Metadata(uint64_t limit); ////////////////////////////////////////////////////////////////////////////// /// @brief Initializes record from an existing record. ////////////////////////////////////////////////////////////////////////////// Metadata(Metadata const& other); + ////////////////////////////////////////////////////////////////////////////// + /// @brief Links the metadata object to an actual cache. + ////////////////////////////////////////////////////////////////////////////// + void link(std::shared_ptr cache); + ////////////////////////////////////////////////////////////////////////////// /// @brief Locks the record. 
////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cache/PlainBucket.cpp b/arangod/Cache/PlainBucket.cpp index 03faf08ab3..b7a65d7888 100644 --- a/arangod/Cache/PlainBucket.cpp +++ b/arangod/Cache/PlainBucket.cpp @@ -114,14 +114,14 @@ CachedValue* PlainBucket::remove(uint32_t hash, void const* key, return value; } -CachedValue* PlainBucket::evictionCandidate() const { +CachedValue* PlainBucket::evictionCandidate(bool ignoreRefCount) const { TRI_ASSERT(isLocked()); for (size_t i = 0; i < SLOTS_DATA; i++) { size_t slot = SLOTS_DATA - (i + 1); if (_cachedHashes[slot] == 0) { continue; } - if (_cachedData[slot]->isFreeable()) { + if (ignoreRefCount || _cachedData[slot]->isFreeable()) { return _cachedData[slot]; } } @@ -166,8 +166,6 @@ void PlainBucket::moveSlot(size_t slot, bool moveToFront) { _cachedData[i] = _cachedData[i + 1]; } } - if (i != slot) { - _cachedHashes[i] = hash; - _cachedData[i] = value; - } + _cachedHashes[i] = hash; + _cachedData[i] = value; } diff --git a/arangod/Cache/PlainBucket.h b/arangod/Cache/PlainBucket.h index 6ead5417a6..0fc90d1500 100644 --- a/arangod/Cache/PlainBucket.h +++ b/arangod/Cache/PlainBucket.h @@ -128,11 +128,12 @@ struct alignas(64) PlainBucket { /// @brief Searches for the best candidate in the bucket to evict. Requires /// state to be locked. /// - /// Returns a pointer to least recently used freeable value. If the bucket - /// contains no values or all have outstanding references, then it returns - /// nullptr. + /// Usually returns a pointer to least recently used freeable value. If the + /// bucket contains no values or all have outstanding references, then it + /// returns nullptr. In the case that ignoreRefCount is set to true, then it + /// simply returns the least recently used value, regardless of freeability. 
////////////////////////////////////////////////////////////////////////////// - CachedValue* evictionCandidate() const; + CachedValue* evictionCandidate(bool ignoreRefCount = false) const; ////////////////////////////////////////////////////////////////////////////// /// @brief Evicts the given value from the bucket. Requires state to be diff --git a/arangod/Cache/PlainCache.cpp b/arangod/Cache/PlainCache.cpp index 82751db167..2061257184 100644 --- a/arangod/Cache/PlainCache.cpp +++ b/arangod/Cache/PlainCache.cpp @@ -71,36 +71,46 @@ bool PlainCache::insert(CachedValue* value) { std::tie(ok, bucket) = getBucket(hash, TRIES_FAST); if (ok) { + bool allowed = true; + bool eviction = false; int64_t change = value->size(); CachedValue* candidate = bucket->find(hash, value->key(), value->keySize); if (candidate == nullptr && bucket->isFull()) { candidate = bucket->evictionCandidate(); + if (candidate == nullptr) { + allowed = false; + } else { + eviction = true; + } } - if (candidate != nullptr) { - change -= candidate->size(); - } - - _metadata->lock(); - bool allowed = _metadata->adjustUsageIfAllowed(change); - _metadata->unlock(); if (allowed) { if (candidate != nullptr) { - bucket->evict(candidate, true); - freeValue(candidate); - recordStat(Stat::insertEviction); - } else { - recordStat(Stat::insertNoEviction); + change -= candidate->size(); + } + + _metadata->lock(); + allowed = _metadata->adjustUsageIfAllowed(change); + _metadata->unlock(); + + if (allowed) { + if (candidate != nullptr) { + bucket->evict(candidate, true); + freeValue(candidate); + } + recordStat(eviction ? 
Stat::insertEviction : Stat::insertNoEviction); + bucket->insert(hash, value); + inserted = true; + } else { + requestResize(); // let function do the hard work } - bucket->insert(hash, value); - inserted = true; - } else { - requestResize(); // let function do the hard work } bucket->unlock(); - requestMigrate(); // let function do the hard work + if (inserted) { + requestMigrate(); // let function do the hard work + } endOperation(); } @@ -138,29 +148,29 @@ bool PlainCache::remove(void const* key, uint32_t keySize) { return removed; } -std::shared_ptr PlainCache::create(Manager* manager, - uint64_t requestedSize, - bool allowGrowth, - bool enableWindowedStats) { - PlainCache* cache = - new PlainCache(manager, requestedSize, allowGrowth, enableWindowedStats); +bool PlainCache::blacklist(void const* key, uint32_t keySize) { return false; } - if (cache == nullptr) { - return std::shared_ptr(nullptr); - } - - cache->metadata()->lock(); - std::shared_ptr result = cache->metadata()->cache(); - cache->metadata()->unlock(); - - return result; +uint64_t PlainCache::allocationSize(bool enableWindowedStats) { + return sizeof(PlainCache) + + StatBuffer::allocationSize(_evictionStatsCapacity) + + (enableWindowedStats ? 
(sizeof(StatBuffer) + + StatBuffer::allocationSize(_findStatsCapacity)) + : 0); } -PlainCache::PlainCache(Manager* manager, uint64_t requestedLimit, - bool allowGrowth, bool enableWindowedStats) - : Cache(manager, requestedLimit, allowGrowth, enableWindowedStats, - [](Cache* p) -> void { delete reinterpret_cast(p); }, - sizeof(PlainCache)), +std::shared_ptr PlainCache::create(Manager* manager, + Manager::MetadataItr metadata, + bool allowGrowth, + bool enableWindowedStats) { + return std::make_shared(Cache::ConstructionGuard(), manager, + metadata, allowGrowth, + enableWindowedStats); +} + +PlainCache::PlainCache(Cache::ConstructionGuard guard, Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, + bool enableWindowedStats) + : Cache(guard, manager, metadata, allowGrowth, enableWindowedStats), _table(nullptr), _logSize(0), _tableSize(1), @@ -186,7 +196,7 @@ PlainCache::PlainCache(Manager* manager, uint64_t requestedLimit, PlainCache::~PlainCache() { _state.lock(); - if (isOperational()) { + if (!_state.isSet(State::Flag::shutdown)) { _state.unlock(); shutdown(); } @@ -295,8 +305,8 @@ bool PlainCache::migrate() { for (size_t j = 0; j < PlainBucket::SLOTS_DATA; j++) { size_t k = PlainBucket::SLOTS_DATA - (j + 1); if ((*bucket)._cachedHashes[k] != 0) { - uint32_t hash = (*bucket)._cachedHashes[k]; - CachedValue* value = (*bucket)._cachedData[k]; + uint32_t hash = bucket->_cachedHashes[k]; + CachedValue* value = bucket->_cachedData[k]; uint32_t targetIndex = (hash & _auxiliaryBucketMask) >> _auxiliaryMaskShift; @@ -316,11 +326,13 @@ bool PlainCache::migrate() { if (haveSpace) { targetBucket->insert(hash, value); } else { + uint64_t size = value->size(); freeValue(value); + reclaimMemory(size); } - (*bucket)._cachedHashes[k] = 0; - (*bucket)._cachedData[k] = nullptr; + bucket->_cachedHashes[k] = 0; + bucket->_cachedData[k] = nullptr; } } @@ -418,15 +430,12 @@ void PlainCache::clearTable(PlainBucket* table, uint64_t tableSize) { for (uint64_t i = 0; i < 
tableSize; i++) { PlainBucket* bucket = &(table[i]); bucket->lock(-1LL); - CachedValue* value = bucket->evictionCandidate(); - while (value != nullptr) { - bucket->evict(value); - _metadata->lock(); - _metadata->adjustUsageIfAllowed(-static_cast(value->size())); - _metadata->unlock(); - freeValue(value); - - value = bucket->evictionCandidate(); + for (size_t j = 0; j < PlainBucket::SLOTS_DATA; j++) { + if (bucket->_cachedData[j] != nullptr) { + uint64_t size = bucket->_cachedData[j]->size(); + freeValue(bucket->_cachedData[j]); + reclaimMemory(size); + } } bucket->clear(); } diff --git a/arangod/Cache/PlainCache.h b/arangod/Cache/PlainCache.h index dddff9ae3d..78bad48e3d 100644 --- a/arangod/Cache/PlainCache.h +++ b/arangod/Cache/PlainCache.h @@ -42,8 +42,6 @@ namespace arangodb { namespace cache { -class Manager; // forward declaration - //////////////////////////////////////////////////////////////////////////////// /// @brief A simple, LRU-ish cache. /// @@ -53,6 +51,11 @@ class Manager; // forward declaration //////////////////////////////////////////////////////////////////////////////// class PlainCache final : public Cache { public: + PlainCache(Cache::ConstructionGuard guard, Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, + bool enableWindowedStats); + ~PlainCache(); + PlainCache() = delete; PlainCache(PlainCache const&) = delete; PlainCache& operator=(PlainCache const&) = delete; @@ -86,6 +89,11 @@ class PlainCache final : public Cache { ////////////////////////////////////////////////////////////////////////////// bool remove(void const* key, uint32_t keySize); + ////////////////////////////////////////////////////////////////////////////// + /// @brief Does nothing; convenience method inheritance compliance + ////////////////////////////////////////////////////////////////////////////// + bool blacklist(void const* key, uint32_t keySize); + private: // main table info PlainBucket* _table; @@ -107,15 +115,11 @@ class PlainCache final 
: public Cache { friend class MigrateTask; private: - // creator -- do not use constructor explicitly - static std::shared_ptr create(Manager* manager, uint64_t requestedSize, + static uint64_t allocationSize(bool enableWindowedStats); + static std::shared_ptr create(Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, bool enableWindowedStats); - - PlainCache(Manager* manager, uint64_t requestedLimit, bool allowGrowth, - bool enableWindowedStats); - ~PlainCache(); - // management bool freeMemory(); bool migrate(); diff --git a/arangod/Cache/TransactionWindow.cpp b/arangod/Cache/Transaction.cpp similarity index 75% rename from arangod/Cache/TransactionWindow.cpp rename to arangod/Cache/Transaction.cpp index 72ccf9a847..82d171de34 100644 --- a/arangod/Cache/TransactionWindow.cpp +++ b/arangod/Cache/Transaction.cpp @@ -21,25 +21,13 @@ /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// -#include "Cache/TransactionWindow.h" +#include "Cache/Transaction.h" #include -#include using namespace arangodb::cache; -TransactionWindow::TransactionWindow() : _open(0), _term(0) {} +Transaction::Transaction() : term(0), readOnly(true), sensitive(false) {} -void TransactionWindow::start() { - if (++_open == 1) { - _term++; - } -} - -void TransactionWindow::end() { - if (--_open == 0) { - _term++; - } -} - -uint64_t TransactionWindow::term() { return _term.load(); } +Transaction::Transaction(bool ro) + : term(0), readOnly(ro), sensitive(!readOnly) {} diff --git a/arangod/Cache/Transaction.h b/arangod/Cache/Transaction.h new file mode 100644 index 0000000000..8a3502257f --- /dev/null +++ b/arangod/Cache/Transaction.h @@ -0,0 +1,47 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the 
"License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_CACHE_TRANSACTION_H +#define ARANGODB_CACHE_TRANSACTION_H + +#include "Basics/Common.h" + +#include +namespace arangodb { +namespace cache { + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Structure to maintain information about an individual transaction. +struct Transaction { + uint64_t term; + bool readOnly; + bool sensitive; + + Transaction(); + Transaction(bool ro); +}; + +}; // end namespace cache +}; // end namespace arangodb + +#endif diff --git a/arangod/Cache/TransactionManager.cpp b/arangod/Cache/TransactionManager.cpp new file mode 100644 index 0000000000..9c78c2928f --- /dev/null +++ b/arangod/Cache/TransactionManager.cpp @@ -0,0 +1,83 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#include "Cache/TransactionManager.h" +#include "Cache/State.h" +#include "Cache/Transaction.h" + +#include +#include + +using namespace arangodb::cache; + +TransactionManager::TransactionManager() + : _state(), _openReads(0), _openSensitive(0), _openWrites(0), _term(0) {} + +Transaction* TransactionManager::begin(bool readOnly) { + _state.lock(); + Transaction* tx = new Transaction(readOnly); + + if (readOnly) { + _openReads++; + if (_openWrites.load() > 0) { + tx->sensitive = true; + _openSensitive++; + } + } else { + tx->sensitive = true; + _openWrites++; + if (++_openSensitive == 1) { + _term++; + _openSensitive += _openReads.load(); + } + } + tx->term = _term; + _state.unlock(); + + return tx; +} + +void TransactionManager::end(Transaction* tx) { + TRI_ASSERT(tx != nullptr); + _state.lock(); + // if currently in sensitive phase, and transaction term is old, it was + // upgraded to sensitive status + if (((_term & static_cast(1)) > 0) && (_term > tx->term)) { + tx->sensitive = true; + } + + if (tx->readOnly) { + _openReads--; + } else { + _openWrites--; + } + + if (tx->sensitive && (--_openSensitive == 0)) { + _term++; + } + + _state.unlock(); + delete tx; +} + +uint64_t TransactionManager::term() { return _term.load(); } diff --git a/arangod/Cache/TransactionWindow.h b/arangod/Cache/TransactionManager.h similarity index 69% rename from 
arangod/Cache/TransactionWindow.h rename to arangod/Cache/TransactionManager.h index 085fc5da85..f2692a9992 100644 --- a/arangod/Cache/TransactionWindow.h +++ b/arangod/Cache/TransactionManager.h @@ -25,6 +25,8 @@ #define ARANGODB_CACHE_TRANSACTION_WINDOW_H #include "Basics/Common.h" +#include "Cache/State.h" +#include "Cache/Transaction.h" #include #include @@ -33,28 +35,35 @@ namespace arangodb { namespace cache { //////////////////////////////////////////////////////////////////////////////// -/// @brief Manage windows in time when there are either no ongoing transactions -/// or some. +/// @brief Manage global cache transactions. /// /// Allows clients to start a transaction, end a transaction, and query an -/// identifier for the current window. +/// identifier for the current window. If the identifier is even, there are no +/// ongoing sensitive transactions, and it is safe to store any values retrieved +/// from the backing store to transactional caches. If the identifier is odd, +/// then some values may be blacklisted by transactional caches (if they have +/// been written to the backing store in the current window). //////////////////////////////////////////////////////////////////////////////// -class TransactionWindow { +class TransactionManager { public: ////////////////////////////////////////////////////////////////////////////// /// @brief Initialize state with no open transactions. ////////////////////////////////////////////////////////////////////////////// - TransactionWindow(); + TransactionManager(); ////////////////////////////////////////////////////////////////////////////// - /// @brief Signal the beginning of a transaction. + /// @brief Open a new transaction. + /// + /// The transaction is considered read-only if it is guaranteed not to write + /// to the backing store. A read-only transaction may, however, write to the + /// cache. 
////////////////////////////////////////////////////////////////////////////// - void start(); + Transaction* begin(bool readOnly); ////////////////////////////////////////////////////////////////////////////// - /// @brief Signal the end of a transaction. + /// @brief Signal the end of a transaction. Deletes the passed Transaction. ////////////////////////////////////////////////////////////////////////////// - void end(); + void end(Transaction* tx); ////////////////////////////////////////////////////////////////////////////// /// @brief Return the current window identifier. @@ -62,7 +71,10 @@ class TransactionWindow { uint64_t term(); private: - std::atomic _open; + State _state; + std::atomic _openReads; + std::atomic _openSensitive; + std::atomic _openWrites; std::atomic _term; }; diff --git a/arangod/Cache/TransactionalBucket.cpp b/arangod/Cache/TransactionalBucket.cpp index 8b1d089ba5..d35f4d232c 100644 --- a/arangod/Cache/TransactionalBucket.cpp +++ b/arangod/Cache/TransactionalBucket.cpp @@ -35,12 +35,14 @@ size_t TransactionalBucket::SLOTS_DATA = 3; size_t TransactionalBucket::SLOTS_BLACKLIST = 4; TransactionalBucket::TransactionalBucket() { - memset(this, 0, sizeof(TransactionalBucket)); + _state.lock(); + clear(); } bool TransactionalBucket::lock(uint64_t transactionTerm, int64_t maxTries) { - return _state.lock(maxTries, - [&]() -> void { updateBlacklistTerm(transactionTerm); }); + return _state.lock(maxTries, [this, transactionTerm]() -> void { + updateBlacklistTerm(transactionTerm); + }); } void TransactionalBucket::unlock() { @@ -52,12 +54,12 @@ bool TransactionalBucket::isLocked() const { return _state.isLocked(); } bool TransactionalBucket::isMigrated() const { TRI_ASSERT(isLocked()); - return _state.isSet(State::Flag::blacklisted); + return _state.isSet(State::Flag::migrated); } bool TransactionalBucket::isFullyBlacklisted() const { TRI_ASSERT(isLocked()); - return _state.isSet(State::Flag::blacklisted); + return (haveOpenTransaction() && 
_state.isSet(State::Flag::blacklisted)); } bool TransactionalBucket::isFull() const { @@ -125,30 +127,39 @@ CachedValue* TransactionalBucket::remove(uint32_t hash, void const* key, return value; } -void TransactionalBucket::blacklist(uint32_t hash, void const* key, - uint32_t keySize) { +CachedValue* TransactionalBucket::blacklist(uint32_t hash, void const* key, + uint32_t keySize) { TRI_ASSERT(isLocked()); - // remove key if it's here - remove(hash, key, keySize); + if (!haveOpenTransaction()) { + return nullptr; + } - if (isFullyBlacklisted()) { - return; + // remove key if it's here + CachedValue* value = (keySize == 0) ? nullptr : remove(hash, key, keySize); + + if (isBlacklisted(hash)) { + return value; } for (size_t i = 0; i < SLOTS_BLACKLIST; i++) { if (_blacklistHashes[i] == 0) { // found an empty slot _blacklistHashes[i] = hash; - return; + return value; } } // no empty slot found, fully blacklist _state.toggleFlag(State::Flag::blacklisted); + return value; } bool TransactionalBucket::isBlacklisted(uint32_t hash) const { TRI_ASSERT(isLocked()); + if (!haveOpenTransaction()) { + return false; + } + if (isFullyBlacklisted()) { return true; } @@ -164,14 +175,14 @@ bool TransactionalBucket::isBlacklisted(uint32_t hash) const { return blacklisted; } -CachedValue* TransactionalBucket::evictionCandidate() const { +CachedValue* TransactionalBucket::evictionCandidate(bool ignoreRefCount) const { TRI_ASSERT(isLocked()); for (size_t i = 0; i < SLOTS_DATA; i++) { size_t slot = SLOTS_DATA - (i + 1); if (_cachedHashes[slot] == 0) { continue; } - if (_cachedData[slot]->isFreeable()) { + if (ignoreRefCount || _cachedData[slot]->isFreeable()) { return _cachedData[slot]; } } @@ -193,6 +204,11 @@ void TransactionalBucket::evict(CachedValue* value, bool optimizeForInsertion) { } } +void TransactionalBucket::clear() { + TRI_ASSERT(isLocked()); + memset(this, 0, sizeof(TransactionalBucket)); +} + void TransactionalBucket::updateBlacklistTerm(uint64_t term) { if (term > 
_blacklistTerm) { _blacklistTerm = term; @@ -206,6 +222,7 @@ void TransactionalBucket::updateBlacklistTerm(uint64_t term) { } void TransactionalBucket::moveSlot(size_t slot, bool moveToFront) { + TRI_ASSERT(isLocked()); uint32_t hash = _cachedHashes[slot]; CachedValue* value = _cachedData[slot]; size_t i = slot; @@ -222,8 +239,12 @@ void TransactionalBucket::moveSlot(size_t slot, bool moveToFront) { _cachedData[i] = _cachedData[i + 1]; } } - if (i != slot) { - _cachedHashes[i] = hash; - _cachedData[i] = value; - } + _cachedHashes[i] = hash; + _cachedData[i] = value; +} + +bool TransactionalBucket::haveOpenTransaction() const { + TRI_ASSERT(isLocked()); + // only have open transactions if term is odd + return ((_blacklistTerm & 1ULL) > 0); } diff --git a/arangod/Cache/TransactionalBucket.h b/arangod/Cache/TransactionalBucket.h index c646611994..face1691d0 100644 --- a/arangod/Cache/TransactionalBucket.h +++ b/arangod/Cache/TransactionalBucket.h @@ -34,6 +34,16 @@ namespace arangodb { namespace cache { +//////////////////////////////////////////////////////////////////////////////// +/// @brief Bucket structure for TransactionalCache. +/// +/// Contains, a State variable, three slots each for hashes and data pointers, +/// four slots for blacklisted hashes, and the applicable transaction term. Most +/// querying and manipulation can be handled via the exposed methods. Bucket +/// must be locked before doing anything else to ensure proper synchronization. +/// Data entries are carefully laid out to ensure the structure fits in a single +/// cacheline. +//////////////////////////////////////////////////////////////////////////////// struct alignas(64) TransactionalBucket { State _state; @@ -52,33 +62,136 @@ struct alignas(64) TransactionalBucket { uint32_t _padding[3]; #endif + ////////////////////////////////////////////////////////////////////////////// + /// @brief Initialize an empty bucket. 
+ ////////////////////////////////////////////////////////////////////////////// TransactionalBucket(); - // must lock before using any other operations + ////////////////////////////////////////////////////////////////////////////// + /// @brief Attempt to lock bucket (failing after maxTries attempts). + /// + /// If the bucket is successfully locked, the transaction term is updated. + ////////////////////////////////////////////////////////////////////////////// bool lock(uint64_t transactionTerm, int64_t maxTries); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Unlock the bucket. Requires bucket to be locked. + ////////////////////////////////////////////////////////////////////////////// void unlock(); - // state checkers + ////////////////////////////////////////////////////////////////////////////// + /// @brief Checks whether the bucket is locked. + ////////////////////////////////////////////////////////////////////////////// bool isLocked() const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Checks whether the bucket has been migrated. Requires state to be + /// locked. + ////////////////////////////////////////////////////////////////////////////// bool isMigrated() const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Checks whether bucket has been fully blacklisted. Requires state to + /// be locked. + ////////////////////////////////////////////////////////////////////////////// bool isFullyBlacklisted() const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Checks whether bucket is full. Requires state to be locked. 
+ ////////////////////////////////////////////////////////////////////////////// bool isFull() const; - // primary functions + ////////////////////////////////////////////////////////////////////////////// + /// @brief Looks up a given key and returns associated value. Requires state + /// to be locked. + /// + /// Takes an input hash and key (specified by pointer and size), and searches + /// the bucket for a matching entry. If a matching entry is found, it is + /// returned. By default, a matching entry will be moved to the front of the + /// bucket to allow basic LRU semantics. If no matching entry is found, + /// nothing will be changed and a nullptr will be returned. + ////////////////////////////////////////////////////////////////////////////// CachedValue* find(uint32_t hash, void const* key, uint32_t keySize, bool moveToFront = true); - void insert(uint32_t hash, CachedValue* value); - CachedValue* remove(uint32_t hash, void const* key, uint32_t keySize); - void blacklist(uint32_t hash, void const* key, uint32_t keySize); - // auxiliary functions + ////////////////////////////////////////////////////////////////////////////// + /// @brief Inserts a given value if it is not blacklisted. Requires state to + /// be locked. + /// + /// Requires that the bucket is not full and does not already contain an item + /// with the same key. If it is full, the item will not be inserted. If an + /// item with the same key exists, this is not detected but it is likely to + /// produce bugs later on down the line. If the item's hash has been + /// blacklisted, or the bucket is fully blacklisted, insertion will simply do + /// nothing. When inserting, the item is put into the first empty slot, then + /// moved to the front. If attempting to insert and the bucket is full, the + /// user should evict an item and specify the optimizeForInsertion flag to be + /// true. 
+ ////////////////////////////////////////////////////////////////////////////// + void insert(uint32_t hash, CachedValue* value); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Removes an item with the given key if one exists. Requires state to + /// be locked. + /// + /// Search for a matching key. If none exists, do nothing and return a + /// nullptr. If one exists, remove it from the bucket and return the pointer + /// to the value. Upon removal, the empty slot generated is moved to the back + /// of the bucket (to remove the gap). + ////////////////////////////////////////////////////////////////////////////// + CachedValue* remove(uint32_t hash, void const* key, uint32_t keySize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Blacklists a key and removes it if it exists. Requires state to + /// be locked. + /// + /// Search for a matching key. If one exists, remove it. Then blacklist the + /// hash associated with the key. If there are no empty blacklist slots, fully + /// blacklist the bucket. + ////////////////////////////////////////////////////////////////////////////// + CachedValue* blacklist(uint32_t hash, void const* key, uint32_t keySize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Checks whether a given hash is blacklisted. Requires state to be + /// locked. + ////////////////////////////////////////////////////////////////////////////// bool isBlacklisted(uint32_t hash) const; - CachedValue* evictionCandidate() const; - void evict(CachedValue* value, bool optimizeForInsertion); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Searches for the best candidate in the bucket to evict. Requires + /// state to be locked. + /// + /// Usually returns a pointer to least recently used freeable value. 
If the + /// bucket contains no values or all have outstanding references, then it + /// returns nullptr. In the case that ignoreRefCount is set to true, then it + /// simply returns the least recently used value, regardless of freeability. + ////////////////////////////////////////////////////////////////////////////// + CachedValue* evictionCandidate(bool ignoreRefCount = false) const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Evicts the given value from the bucket. Requires state to be + /// locked. + /// + /// By default, it will move the empty slot to the back of the bucket. If + /// preparing an empty slot for insertion, specify the second parameter to be + /// true. This will move the empty slot to the front instead. + ////////////////////////////////////////////////////////////////////////////// + void evict(CachedValue* value, bool optimizeForInsertion = false); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Updates the bucket's blacklist term. Requires state to be locked. + ////////////////////////////////////////////////////////////////////////////// + void updateBlacklistTerm(uint64_t term); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Reinitializes a bucket to be completely empty and unlocked. + /// Requires state to be locked. 
+ ////////////////////////////////////////////////////////////////////////////// + void clear(); private: - void updateBlacklistTerm(uint64_t term); void moveSlot(size_t slot, bool moveToFront); + bool haveOpenTransaction() const; }; }; // end namespace cache diff --git a/arangod/Cache/TransactionalCache.cpp b/arangod/Cache/TransactionalCache.cpp index ac019aa5a6..f0e348a475 100644 --- a/arangod/Cache/TransactionalCache.cpp +++ b/arangod/Cache/TransactionalCache.cpp @@ -27,7 +27,9 @@ #include "Cache/CachedValue.h" #include "Cache/FrequencyBuffer.h" #include "Cache/Metadata.h" +#include "Cache/State.h" #include "Cache/TransactionalBucket.h" +#include "Random/RandomGenerator.h" #include #include @@ -36,69 +38,485 @@ using namespace arangodb::cache; +static constexpr int64_t TRIES_FAST = 50LL; +static constexpr int64_t TRIES_SLOW = 10000LL; +static constexpr int64_t TRIES_GUARANTEE = -1LL; + Cache::Finding TransactionalCache::find(void const* key, uint32_t keySize) { - // TODO: implement this; - return Cache::Finding(nullptr); -} + TRI_ASSERT(key != nullptr); + Finding result(nullptr); + uint32_t hash = hashKey(key, keySize); -bool TransactionalCache::insert(CachedValue* value) { - // TODO: implement this - return false; -} + bool ok; + TransactionalBucket* bucket; + std::tie(ok, bucket) = getBucket(hash, TRIES_FAST); -bool TransactionalCache::remove(void const* key, uint32_t keySize) { - // TODO: implement this - return false; -} - -void TransactionalCache::blackList(void const* key, uint32_t keySize) { - // TODO: implement this -} - -std::shared_ptr TransactionalCache::create(Manager* manager, - uint64_t requestedSize, - bool allowGrowth, - bool enableWindowedStats) { - TransactionalCache* cache = new TransactionalCache( - manager, requestedSize, allowGrowth, enableWindowedStats); - - if (cache == nullptr) { - return std::shared_ptr(nullptr); + if (ok) { + result.reset(bucket->find(hash, key, keySize)); + recordStat(result.found() ? 
Stat::findHit : Stat::findMiss); + bucket->unlock(); + endOperation(); } - cache->metadata()->lock(); - auto result = cache->metadata()->cache(); - cache->metadata()->unlock(); - return result; } -TransactionalCache::TransactionalCache(Manager* manager, - uint64_t requestedLimit, +bool TransactionalCache::insert(CachedValue* value) { + TRI_ASSERT(value != nullptr); + bool inserted = false; + uint32_t hash = hashKey(value->key(), value->keySize); + + bool ok; + TransactionalBucket* bucket; + std::tie(ok, bucket) = getBucket(hash, TRIES_FAST); + + if (ok) { + bool allowed = !bucket->isBlacklisted(hash); + if (allowed) { + bool eviction = false; + int64_t change = value->size(); + CachedValue* candidate = bucket->find(hash, value->key(), value->keySize); + + if (candidate == nullptr && bucket->isFull()) { + candidate = bucket->evictionCandidate(); + if (candidate == nullptr) { + allowed = false; + } else { + eviction = true; + } + } + + if (allowed) { + if (candidate != nullptr) { + change -= candidate->size(); + } + + _metadata->lock(); + allowed = _metadata->adjustUsageIfAllowed(change); + _metadata->unlock(); + + if (allowed) { + if (candidate != nullptr) { + bucket->evict(candidate, true); + freeValue(candidate); + } + recordStat(eviction ? 
Stat::insertEviction : Stat::insertNoEviction); + bucket->insert(hash, value); + inserted = true; + } else { + requestResize(); // let function do the hard work + } + } + } + + bucket->unlock(); + if (inserted) { + requestMigrate(); // let function do the hard work + } + endOperation(); + } + + return inserted; +} + +bool TransactionalCache::remove(void const* key, uint32_t keySize) { + TRI_ASSERT(key != nullptr); + bool removed = false; + uint32_t hash = hashKey(key, keySize); + + bool ok; + TransactionalBucket* bucket; + std::tie(ok, bucket) = getBucket(hash, TRIES_SLOW); + + if (ok) { + CachedValue* candidate = bucket->remove(hash, key, keySize); + + if (candidate != nullptr) { + int64_t change = -static_cast(candidate->size()); + + _metadata->lock(); + bool allowed = _metadata->adjustUsageIfAllowed(change); + TRI_ASSERT(allowed); + _metadata->unlock(); + + freeValue(candidate); + } + + removed = true; + bucket->unlock(); + endOperation(); + } + + return removed; +} + +bool TransactionalCache::blacklist(void const* key, uint32_t keySize) { + TRI_ASSERT(key != nullptr); + bool blacklisted = false; + uint32_t hash = hashKey(key, keySize); + + bool ok; + TransactionalBucket* bucket; + std::tie(ok, bucket) = getBucket(hash, TRIES_SLOW); + + if (ok) { + CachedValue* candidate = bucket->blacklist(hash, key, keySize); + blacklisted = true; + + if (candidate != nullptr) { + int64_t change = -static_cast(candidate->size()); + + _metadata->lock(); + bool allowed = _metadata->adjustUsageIfAllowed(change); + TRI_ASSERT(allowed); + _metadata->unlock(); + + freeValue(candidate); + } + + bucket->unlock(); + endOperation(); + } + + return blacklisted; +} + +uint64_t TransactionalCache::allocationSize(bool enableWindowedStats) { + return sizeof(TransactionalCache) + + StatBuffer::allocationSize(_evictionStatsCapacity) + + (enableWindowedStats ? 
(sizeof(StatBuffer) + + StatBuffer::allocationSize(_findStatsCapacity)) + : 0); +} + +std::shared_ptr TransactionalCache::create(Manager* manager, + Manager::MetadataItr metadata, + bool allowGrowth, + bool enableWindowedStats) { + return std::make_shared(Cache::ConstructionGuard(), + manager, metadata, allowGrowth, + enableWindowedStats); +} + +TransactionalCache::TransactionalCache(Cache::ConstructionGuard guard, + Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, bool enableWindowedStats) - : Cache(manager, requestedLimit, allowGrowth, enableWindowedStats, - [](Cache* p) -> void { - delete reinterpret_cast(p); - }, - sizeof(TransactionalCache)) { - // TODO: implement this + : Cache(guard, manager, metadata, allowGrowth, enableWindowedStats), + _table(nullptr), + _logSize(0), + _tableSize(1), + _maskShift(32), + _bucketMask(0), + _auxiliaryTable(nullptr), + _auxiliaryLogSize(0), + _auxiliaryTableSize(1), + _auxiliaryMaskShift(32), + _auxiliaryBucketMask(0) { + _state.lock(); + if (isOperational()) { + _metadata->lock(); + _table = reinterpret_cast(_metadata->table()); + _logSize = _metadata->logSize(); + _tableSize = (1ULL << _logSize); + _maskShift = 32 - _logSize; + _bucketMask = (_tableSize - 1) << _maskShift; + _metadata->unlock(); + } + _state.unlock(); } TransactionalCache::~TransactionalCache() { - // TODO: implement this + _state.lock(); + if (!_state.isSet(State::Flag::shutdown)) { + _state.unlock(); + shutdown(); + } + if (_state.isLocked()) { + _state.unlock(); + } } bool TransactionalCache::freeMemory() { - // TODO: implement this - return false; + _state.lock(); + if (!isOperational()) { + _state.unlock(); + return false; + } + startOperation(); + _state.unlock(); + + bool underLimit = reclaimMemory(0ULL); + uint64_t failures = 0; + while (!underLimit) { + // pick a random bucket + uint32_t randomHash = RandomGenerator::interval(UINT32_MAX); + bool ok; + TransactionalBucket* bucket; + std::tie(ok, bucket) = getBucket(randomHash, 
TRIES_FAST, false); + + if (ok) { + failures = 0; + // evict LRU freeable value if exists + CachedValue* candidate = bucket->evictionCandidate(); + + if (candidate != nullptr) { + uint64_t size = candidate->size(); + bucket->evict(candidate); + freeValue(candidate); + + underLimit = reclaimMemory(size); + } + + bucket->unlock(); + } else { + failures++; + if (failures > 100) { + _state.lock(); + bool shouldQuit = !isOperational(); + _state.unlock(); + + if (shouldQuit) { + break; + } else { + failures = 0; + } + } + } + } + + endOperation(); + return true; } bool TransactionalCache::migrate() { - // TODO: implement this - return false; + _state.lock(); + if (!isOperational()) { + _state.unlock(); + return false; + } + startOperation(); + _metadata->lock(); + if (_metadata->table() == nullptr || _metadata->auxiliaryTable() == nullptr) { + _metadata->unlock(); + _state.unlock(); + endOperation(); + return false; + } + _auxiliaryTable = + reinterpret_cast(_metadata->auxiliaryTable()); + _auxiliaryLogSize = _metadata->auxiliaryLogSize(); + _auxiliaryTableSize = (1ULL << _auxiliaryLogSize); + _auxiliaryMaskShift = (32 - _auxiliaryLogSize); + _auxiliaryBucketMask = (_auxiliaryTableSize - 1) << _auxiliaryMaskShift; + _metadata->unlock(); + _state.toggleFlag(State::Flag::migrating); + _state.unlock(); + + uint64_t term = _manager->_transactions.term(); + + for (uint32_t i = 0; i < _tableSize; i++) { + // lock current bucket + TransactionalBucket* bucket = &(_table[i]); + bucket->lock(term, -1LL); + term = std::max(term, bucket->_blacklistTerm); + + // collect target bucket(s) + std::vector targets; + if (_logSize > _auxiliaryLogSize) { + uint32_t targetIndex = (i << _maskShift) >> _auxiliaryMaskShift; + targets.emplace_back(&(_auxiliaryTable[targetIndex])); + } else { + uint32_t baseIndex = (i << _maskShift) >> _auxiliaryMaskShift; + for (size_t j = 0; j < (1U << (_auxiliaryLogSize - _logSize)); j++) { + uint32_t targetIndex = baseIndex + j; + 
targets.emplace_back(&(_auxiliaryTable[targetIndex])); + } + } + // lock target bucket(s) + for (TransactionalBucket* targetBucket : targets) { + targetBucket->lock(term, TRIES_GUARANTEE); + term = std::max(term, targetBucket->_blacklistTerm); + } + + // update all buckets to maximum term found (guaranteed at most the current) + bucket->updateBlacklistTerm(term); + for (TransactionalBucket* targetBucket : targets) { + targetBucket->updateBlacklistTerm(term); + } + // now actually migrate any relevant blacklist terms + if (bucket->isFullyBlacklisted()) { + for (TransactionalBucket* targetBucket : targets) { + if (!targetBucket->isFullyBlacklisted()) { + (*targetBucket)._state.toggleFlag(State::Flag::blacklisted); + } + } + } else { + for (size_t j = 0; j < TransactionalBucket::SLOTS_BLACKLIST; j++) { + uint32_t hash = bucket->_blacklistHashes[j]; + if (hash == 0) { + break; + } + uint32_t targetIndex = getIndex(hash, true); + TransactionalBucket* targetBucket = &(_auxiliaryTable[targetIndex]); + CachedValue* candidate = targetBucket->blacklist(hash, nullptr, 0); + TRI_ASSERT(candidate == nullptr); + bucket->_blacklistHashes[j] = 0; + } + } + + // migrate actual values + for (size_t j = 0; j < TransactionalBucket::SLOTS_DATA; j++) { + size_t k = TransactionalBucket::SLOTS_DATA - (j + 1); + if (bucket->_cachedHashes[k] != 0) { + uint32_t hash = bucket->_cachedHashes[k]; + CachedValue* value = bucket->_cachedData[k]; + + uint32_t targetIndex = getIndex(hash, true); + TransactionalBucket* targetBucket = &(_auxiliaryTable[targetIndex]); + if (targetBucket->isBlacklisted(hash)) { + uint64_t size = value->size(); + freeValue(value); + reclaimMemory(size); + } else { + bool haveSpace = true; + if (targetBucket->isFull()) { + CachedValue* candidate = targetBucket->evictionCandidate(); + if (candidate != nullptr) { + targetBucket->evict(candidate, true); + uint64_t size = candidate->size(); + freeValue(candidate); + reclaimMemory(size); + } else { + haveSpace = false; + } + } 
+ if (haveSpace) { + targetBucket->insert(hash, value); + } else { + uint64_t size = value->size(); + freeValue(value); + reclaimMemory(size); + } + } + + bucket->_cachedHashes[k] = 0; + bucket->_cachedData[k] = nullptr; + } + } + + // unlock targets + for (TransactionalBucket* targetBucket : targets) { + targetBucket->unlock(); + } + + // finish up this bucket's migration + bucket->_state.toggleFlag(State::Flag::migrated); + bucket->unlock(); + } + + // swap tables and unmark local migrating flag + _state.lock(); + std::swap(_table, _auxiliaryTable); + std::swap(_logSize, _auxiliaryLogSize); + std::swap(_tableSize, _auxiliaryTableSize); + std::swap(_maskShift, _auxiliaryMaskShift); + std::swap(_bucketMask, _auxiliaryBucketMask); + _state.toggleFlag(State::Flag::migrating); + _state.unlock(); + + // clear out old table + clearTable(_auxiliaryTable, _auxiliaryTableSize); + + // release references to old table + _state.lock(); + _auxiliaryTable = nullptr; + _auxiliaryLogSize = 0; + _auxiliaryTableSize = 1; + _auxiliaryMaskShift = 32; + _auxiliaryBucketMask = 0; + _state.unlock(); + + // swap table in metadata + _metadata->lock(); + _metadata->swapTables(); + _metadata->unlock(); + + endOperation(); + return true; } void TransactionalCache::clearTables() { - // TODO: implement this + if (_table != nullptr) { + clearTable(_table, _tableSize); + } + if (_auxiliaryTable != nullptr) { + clearTable(_auxiliaryTable, _auxiliaryTableSize); + } +} + +std::pair TransactionalCache::getBucket( + uint32_t hash, int64_t maxTries, bool singleOperation) { + TransactionalBucket* bucket = nullptr; + + bool ok = _state.lock(maxTries); + if (ok) { + bool started = false; + ok = isOperational(); + if (ok) { + if (singleOperation) { + startOperation(); + started = true; + _metadata->lock(); + _manager->reportAccess(_metadata->cache()); + _metadata->unlock(); + } + + uint64_t term = _manager->_transactions.term(); + + bucket = &(_table[getIndex(hash, false)]); + ok = bucket->lock(term, 
maxTries); + if (ok && + bucket->isMigrated()) { // get bucket from auxiliary table instead + bucket->unlock(); + bucket = &(_auxiliaryTable[getIndex(hash, true)]); + ok = bucket->lock(term, maxTries); + if (ok && bucket->isMigrated()) { + ok = false; + bucket->unlock(); + } + } + } + if (!ok && started) { + endOperation(); + } + _state.unlock(); + } + + return std::pair(ok, bucket); +} + +void TransactionalCache::clearTable(TransactionalBucket* table, + uint64_t tableSize) { + for (uint64_t i = 0; i < tableSize; i++) { + TransactionalBucket* bucket = &(table[i]); + bucket->lock(0, -1LL); // term doesn't actually matter here + for (size_t j = 0; j < TransactionalBucket::SLOTS_DATA; j++) { + if (bucket->_cachedData[j] != nullptr) { + uint64_t size = bucket->_cachedData[j]->size(); + freeValue(bucket->_cachedData[j]); + reclaimMemory(size); + } + } + bucket->clear(); + } +} + +uint32_t TransactionalCache::getIndex(uint32_t hash, bool useAuxiliary) const { + if (useAuxiliary) { + return ((hash & _auxiliaryBucketMask) >> _auxiliaryMaskShift); + } + + return ((hash & _bucketMask) >> _maskShift); } diff --git a/arangod/Cache/TransactionalCache.h b/arangod/Cache/TransactionalCache.h index a4db4f34f7..be7f3ef54f 100644 --- a/arangod/Cache/TransactionalCache.h +++ b/arangod/Cache/TransactionalCache.h @@ -28,7 +28,10 @@ #include "Cache/Cache.h" #include "Cache/CachedValue.h" #include "Cache/FrequencyBuffer.h" +#include "Cache/Manager.h" +#include "Cache/ManagerTasks.h" #include "Cache/Metadata.h" +#include "Cache/State.h" #include "Cache/TransactionalBucket.h" #include @@ -39,19 +42,73 @@ namespace arangodb { namespace cache { -class Manager; // forward declaration - +//////////////////////////////////////////////////////////////////////////////// +/// @brief A transactional, LRU-ish cache. +/// +/// To create a cache, see Manager class. Once created, the class has a simple +/// API mostly following that of the base Cache class. 
For any non-pure-virtual +/// functions, see Cache.h for documentation. The only additional functions +/// exposed on the API of the transactional cache are those dealing with the +/// blacklisting of keys. +/// +/// To operate correctly, whenever a key is about to be written to the backing +/// store, it must be blacklisted in any corresponding transactional caches. +/// This will prevent the cache from serving stale or potentially incorrect +/// values and allow for clients to fall through to the backing transactional +/// store. +//////////////////////////////////////////////////////////////////////////////// class TransactionalCache final : public Cache { public: + TransactionalCache(Cache::ConstructionGuard guard, Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, + bool enableWindowedStats); + ~TransactionalCache(); + TransactionalCache() = delete; TransactionalCache(TransactionalCache const&) = delete; TransactionalCache& operator=(TransactionalCache const&) = delete; public: + ////////////////////////////////////////////////////////////////////////////// + /// @brief Looks up the given key. + /// + /// May report a false negative if it fails to acquire a lock in a timely + /// fashion. Should not block for long. + ////////////////////////////////////////////////////////////////////////////// Cache::Finding find(void const* key, uint32_t keySize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Attempts to insert the given value. + /// + /// Returns true if inserted, false otherwise. Will not insert if the key + /// (or its corresponding hash) is blacklisted. Will not insert value if this + /// would cause the total usage to exceed the limits. May also not insert + /// value if it fails to acquire a lock in a timely fashion. Should not block + /// for long. 
+ ////////////////////////////////////////////////////////////////////////////// bool insert(CachedValue* value); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Attempts to remove the given key. + /// + /// Returns true if the key is guaranteed not to be in the cache, false if the + /// key may remain in the cache. May leave the key in the cache if it fails to + /// acquire a lock in a timely fashion. Makes more attempts to acquire a lock + /// before quitting, so may block for longer than find or insert. Client may + /// re-try. + ////////////////////////////////////////////////////////////////////////////// bool remove(void const* key, uint32_t keySize); - void blackList(void const* key, uint32_t keySize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Attempts to blacklist the given key. + /// + /// Returns true if the key was blacklisted and is guaranteed not to be in the + /// cache, false otherwise. May not blacklist the key if it fails to + /// acquire a lock in a timely fashion. Makes more attempts to acquire a lock + /// before quitting, so may block for longer than find or insert. Client + /// should re-try. 
+ ////////////////////////////////////////////////////////////////////////////// + bool blacklist(void const* key, uint32_t keySize); private: // main table info @@ -74,19 +131,22 @@ class TransactionalCache final : public Cache { friend class MigrateTask; private: - // creator -- do not use constructor explicitly - static std::shared_ptr create(Manager* manager, uint64_t requestedSize, + static uint64_t allocationSize(bool enableWindowedStats); + static std::shared_ptr create(Manager* manager, + Manager::MetadataItr metadata, bool allowGrowth, bool enableWindowedStats); - - TransactionalCache(Manager* manager, uint64_t requestedLimit, - bool allowGrowth, bool enableWindowedStats); - ~TransactionalCache(); - // management bool freeMemory(); bool migrate(); void clearTables(); + + // helpers + std::pair getBucket(uint32_t hash, + int64_t maxTries, + bool singleOperation = true); + void clearTable(TransactionalBucket* table, uint64_t tableSize); + uint32_t getIndex(uint32_t hash, bool useAuxiliary) const; }; }; // end namespace cache