
Completed implementation of transactional cache.

Dan Larkin 2017-03-06 10:01:24 -05:00
parent f7b819579d
commit db2cabf844
32 changed files with 2757 additions and 402 deletions
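For orientation, here is a minimal sketch of how the transactional cache introduced by this commit is driven, distilled from the tst_st_blacklist test added below (it assumes the same headers and call signatures that test uses):

#include "Cache/Manager.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionalCache.h"

#include <stdint.h>

void example(arangodb::cache::Manager& manager) {
  using namespace arangodb::cache;
  // create a transactional cache with a 256KiB limit and growth disabled
  auto cache = manager.createCache(Manager::CacheType::Transactional,
                                   256ULL * 1024ULL, false);
  Transaction* tx = manager.beginTransaction(false);  // read-write transaction
  uint64_t key = 42;
  CachedValue* value =
      CachedValue::construct(&key, sizeof(uint64_t), &key, sizeof(uint64_t));
  if (!cache->insert(value)) {
    delete value;  // caller keeps ownership when an insert is refused
  }
  cache->blacklist(&key, sizeof(uint64_t));      // key becomes unreadable and
  auto f = cache->find(&key, sizeof(uint64_t));  // un-insertable until the
  bool hit = f.found();                          // transaction window closes
  (void)hit;                                     // false while blacklisted
  manager.endTransaction(tx);
  manager.destroyCache(cache);
}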

View File

@@ -66,7 +66,10 @@ add_executable(${TEST_CACHE_SUITE}
Cache/Rebalancer.cpp
Cache/State.cpp
Cache/TransactionalBucket.cpp
Cache/TransactionWindow.cpp
Cache/TransactionalCache.cpp
Cache/TransactionalStore.cpp
Cache/TransactionManager.cpp
Cache/TransactionsWithBackingStore.cpp
../lib/Basics/WorkMonitorDummy.cpp
../arangod/Cache/Cache.cpp
../arangod/Cache/CacheManagerFeatureThreads.cpp
@@ -78,9 +81,10 @@ add_executable(${TEST_CACHE_SUITE}
../arangod/Cache/PlainCache.cpp
../arangod/Cache/Rebalancer.cpp
../arangod/Cache/State.cpp
../arangod/Cache/Transaction.cpp
../arangod/Cache/TransactionalBucket.cpp
../arangod/Cache/TransactionalCache.cpp
../arangod/Cache/TransactionWindow.cpp
../arangod/Cache/TransactionManager.cpp
)
include_directories(${TEST_CACHE_SUITE}
@@ -90,6 +94,7 @@ include_directories(${TEST_CACHE_SUITE}
target_link_libraries(${TEST_CACHE_SUITE}
${LIB_ARANGO}
${MSVC_LIBS}
${ROCKSDB_LIBS}
boost_system
boost_boost
${SYSTEM_LIBRARIES}

View File

@@ -33,6 +33,9 @@
#include "Cache/FrequencyBuffer.h"
#include <stdint.h>
#include <memory>
#include <iostream>
using namespace arangodb::cache;
@@ -70,7 +73,9 @@ BOOST_AUTO_TEST_CASE(tst_uint8_t) {
BOOST_CHECK(uint8_t() == zero);
FrequencyBuffer<uint8_t> buffer(8);
BOOST_CHECK_EQUAL(buffer.memoryUsage(), sizeof(FrequencyBuffer<uint8_t>) + 8);
BOOST_CHECK_EQUAL(
buffer.memoryUsage(),
sizeof(FrequencyBuffer<uint8_t>) + sizeof(std::vector<uint8_t>) + 8);
for (size_t i = 0; i < 4; i++) {
buffer.insertRecord(two);
@@ -80,52 +85,73 @@ BOOST_AUTO_TEST_CASE(tst_uint8_t) {
}
auto frequencies = buffer.getFrequencies();
BOOST_CHECK_EQUAL(2ULL, frequencies->size());
BOOST_CHECK_EQUAL(static_cast<uint64_t>(2), frequencies->size());
BOOST_CHECK_EQUAL(one, (*frequencies)[0].first);
BOOST_CHECK_EQUAL(2ULL, (*frequencies)[0].second);
BOOST_CHECK_EQUAL(static_cast<uint64_t>(2), (*frequencies)[0].second);
BOOST_CHECK_EQUAL(two, (*frequencies)[1].first);
BOOST_CHECK_EQUAL(4ULL, (*frequencies)[1].second);
BOOST_CHECK_EQUAL(static_cast<uint64_t>(4), (*frequencies)[1].second);
for (size_t i = 0; i < 8; i++) {
buffer.insertRecord(one);
}
frequencies = buffer.getFrequencies();
BOOST_CHECK_EQUAL(1ULL, frequencies->size());
BOOST_CHECK_EQUAL(static_cast<size_t>(1), frequencies->size());
BOOST_CHECK_EQUAL(one, (*frequencies)[0].first);
BOOST_CHECK_EQUAL(8ULL, (*frequencies)[0].second);
BOOST_CHECK_EQUAL(static_cast<uint64_t>(8), (*frequencies)[0].second);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test behavior with pointers
/// @brief test behavior with shared_ptr
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_pointers) {
uint8_t* zero = nullptr;
uint8_t one = 1;
uint8_t two = 2;
struct cmp_weak_ptr {
bool operator()(std::weak_ptr<int> const& left,
std::weak_ptr<int> const& right) const {
return !left.owner_before(right) && !right.owner_before(left);
}
};
struct hash_weak_ptr {
size_t operator()(std::weak_ptr<int> const& wp) const {
auto sp = wp.lock();
return std::hash<decltype(sp)>()(sp);
}
};
typedef FrequencyBuffer<std::weak_ptr<int>, cmp_weak_ptr, hash_weak_ptr>
BufferType;
std::shared_ptr<int> p0(nullptr);
// check that default construction is as expected
typedef uint8_t* smallptr;
BOOST_CHECK(smallptr() == zero);
BOOST_CHECK(std::shared_ptr<int>() == p0);
FrequencyBuffer<uint8_t*> buffer(8);
std::shared_ptr<int> p1(new int());
*p1 = static_cast<int>(1);
std::shared_ptr<int> p2(new int());
*p2 = static_cast<int>(2);
BufferType buffer(8);
BOOST_CHECK_EQUAL(buffer.memoryUsage(),
sizeof(FrequencyBuffer<uint8_t*>) + (8 * sizeof(uint8_t*)));
sizeof(BufferType) +
sizeof(std::vector<std::weak_ptr<int>>) +
(8 * sizeof(std::weak_ptr<int>)));
for (size_t i = 0; i < 4; i++) {
buffer.insertRecord(&two);
buffer.insertRecord(p1);
}
for (size_t i = 0; i < 2; i++) {
buffer.insertRecord(&one);
buffer.insertRecord(p2);
}
auto frequencies = buffer.getFrequencies();
BOOST_CHECK_EQUAL(2ULL, frequencies->size());
BOOST_CHECK_EQUAL(&one, (*frequencies)[0].first);
BOOST_CHECK_EQUAL(2ULL, (*frequencies)[0].second);
BOOST_CHECK_EQUAL(&two, (*frequencies)[1].first);
BOOST_CHECK_EQUAL(4ULL, (*frequencies)[1].second);
BOOST_CHECK_EQUAL(static_cast<uint64_t>(2), frequencies->size());
BOOST_CHECK(p2 == (*frequencies)[0].first.lock());
BOOST_CHECK_EQUAL(static_cast<uint64_t>(2), (*frequencies)[0].second);
BOOST_CHECK(p1 == (*frequencies)[1].first.lock());
BOOST_CHECK_EQUAL(static_cast<uint64_t>(4), (*frequencies)[1].second);
}
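// Side note (illustrative, not part of the committed test): std::weak_ptr has no
// operator==, so cmp_weak_ptr above declares two weak_ptrs equivalent when neither
// owner_before() the other, i.e. when they share a control block (or both have none).
// A standalone example of that equivalence, assuming only <memory>:
//
//   std::shared_ptr<int> sp = std::make_shared<int>(7);
//   std::weak_ptr<int> a = sp;
//   std::weak_ptr<int> b = sp;
//   bool equivalent = !a.owner_before(b) && !b.owner_before(a);  // true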
////////////////////////////////////////////////////////////////////////////////

View File

@@ -102,8 +102,13 @@ BOOST_AUTO_TEST_CASE(tst_mixed_load) {
size_t threadCount = 4;
std::vector<std::shared_ptr<Cache>> caches;
for (size_t i = 0; i < cacheCount; i++) {
caches.emplace_back(
manager.createCache(Manager::CacheType::Plain, initialSize, true));
auto res =
/*manager.createCache(((i % 2 == 0) ? Manager::CacheType::Plain
: Manager::CacheType::Transactional),
initialSize, true);*/
manager.createCache(Manager::CacheType::Plain, initialSize, true);
TRI_ASSERT(res);
caches.emplace_back(res);
}
uint64_t chunkSize = 4 * 1024 * 1024;
@@ -211,11 +216,16 @@ BOOST_AUTO_TEST_CASE(tst_lifecycle_chaos) {
std::queue<std::shared_ptr<Cache>> caches;
for (uint64_t i = 0; i < operationCount; i++) {
uint32_t r = RandomGenerator::interval(static_cast<uint32_t>(1UL));
uint32_t r = RandomGenerator::interval(static_cast<uint32_t>(1));
switch (r) {
case 0: {
caches.emplace(manager.createCache(Manager::CacheType::Plain,
initialSize, true));
auto res = manager.createCache(
(i % 2 == 0) ? Manager::CacheType::Plain
: Manager::CacheType::Transactional,
initialSize, true);
if (res) {
caches.emplace(res);
}
}
case 1:
default: {

View File

@@ -61,14 +61,8 @@ BOOST_FIXTURE_TEST_SUITE(CCacheMetadataTest, CCacheMetadataSetup)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_constructor) {
uint64_t dummy;
std::shared_ptr<Cache> dummyCache(reinterpret_cast<Cache*>(&dummy),
[](Cache* p) -> void {});
uint8_t dummyTable;
uint32_t logSize = 1;
uint64_t limit = 1024;
Metadata metadata(dummyCache, limit, &dummyTable, logSize);
Metadata metadata(limit);
}
////////////////////////////////////////////////////////////////////////////////
@@ -79,26 +73,19 @@ BOOST_AUTO_TEST_CASE(tst_getters) {
uint64_t dummy;
std::shared_ptr<Cache> dummyCache(reinterpret_cast<Cache*>(&dummy),
[](Cache* p) -> void {});
uint8_t dummyTable;
uint32_t logSize = 1;
uint64_t limit = 1024;
Metadata metadata(dummyCache, limit, &dummyTable, logSize);
Metadata metadata(limit);
metadata.link(dummyCache);
metadata.lock();
BOOST_CHECK(dummyCache == metadata.cache());
BOOST_CHECK_EQUAL(logSize, metadata.logSize());
BOOST_CHECK_EQUAL(0UL, metadata.auxiliaryLogSize());
BOOST_CHECK_EQUAL(limit, metadata.softLimit());
BOOST_CHECK_EQUAL(limit, metadata.hardLimit());
BOOST_CHECK_EQUAL(0UL, metadata.usage());
BOOST_CHECK(&dummyTable == metadata.table());
BOOST_CHECK(nullptr == metadata.auxiliaryTable());
metadata.unlock();
}
@@ -107,14 +94,9 @@ BOOST_AUTO_TEST_CASE(tst_getters) {
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_usage_limits) {
uint64_t dummy;
std::shared_ptr<Cache> dummyCache(reinterpret_cast<Cache*>(&dummy),
[](Cache* p) -> void {});
uint8_t dummyTable;
uint32_t logSize = 1;
bool success;
Metadata metadata(dummyCache, 1024ULL, &dummyTable, logSize);
Metadata metadata(1024ULL);
metadata.lock();
@@ -158,19 +140,19 @@ BOOST_AUTO_TEST_CASE(tst_usage_limits) {
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_migration) {
uint64_t dummy;
std::shared_ptr<Cache> dummyCache(reinterpret_cast<Cache*>(&dummy),
[](Cache* p) -> void {});
uint8_t dummyTable;
uint8_t dummyAuxiliaryTable;
uint32_t logSize = 1;
uint32_t auxiliaryLogSize = 2;
uint64_t limit = 1024;
Metadata metadata(dummyCache, limit, &dummyTable, logSize);
Metadata metadata(limit);
metadata.lock();
metadata.grantAuxiliaryTable(&dummyTable, logSize);
metadata.swapTables();
metadata.grantAuxiliaryTable(&dummyAuxiliaryTable, auxiliaryLogSize);
BOOST_CHECK_EQUAL(auxiliaryLogSize, metadata.auxiliaryLogSize());
BOOST_CHECK(&dummyAuxiliaryTable == metadata.auxiliaryTable());
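The updated assertions above reflect the reworked Metadata API: a Metadata record is now constructed from a limit alone, linked to its owning cache afterwards, and hash tables are granted and swapped explicitly. A condensed sketch using only the calls visible in these hunks (variable names are placeholders):

Metadata metadata(1024ULL);                        // limit only
metadata.link(cachePtr);                           // attach the owning std::shared_ptr<Cache>
metadata.lock();
metadata.grantAuxiliaryTable(tablePtr, logSize);   // stage a table ...
metadata.swapTables();                             // ... and promote it to primary
metadata.grantAuxiliaryTable(auxPtr, auxLogSize);  // stage the next table
metadata.unlock();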

View File

@@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for arangodb::cache::PlainBucket
/// @brief test suite for arangodb::cache::PlainCache
///
/// @file
///

View File

@@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for arangodb::cache::Manager
/// @brief test suite for arangodb::cache::Rebalancer
///
/// @file
///
@@ -34,6 +34,8 @@
#include "Cache/Manager.h"
#include "Cache/PlainCache.h"
#include "Cache/Rebalancer.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionalCache.h"
#include "MockScheduler.h"
@@ -66,10 +68,10 @@ struct CCacheRebalancerSetup {
BOOST_FIXTURE_TEST_SUITE(CCacheRebalancerTest, CCacheRebalancerSetup)
////////////////////////////////////////////////////////////////////////////////
/// @brief test rebalancing (multi-threaded)
/// @brief test rebalancing plain caches (multi-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_rebalancing) {
BOOST_AUTO_TEST_CASE(tst_rebalancing_plain) {
uint64_t initialSize = 16ULL * 1024ULL;
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
@@ -190,6 +192,145 @@ BOOST_AUTO_TEST_CASE(tst_rebalancing) {
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test rebalancing transactional caches (multi-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_rebalancing_transactional) {
uint64_t initialSize = 16ULL * 1024ULL;
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 128ULL * 1024ULL * 1024ULL);
Rebalancer rebalancer(&manager);
size_t cacheCount = 4;
size_t threadCount = 4;
std::vector<std::shared_ptr<Cache>> caches;
for (size_t i = 0; i < cacheCount; i++) {
caches.emplace_back(manager.createCache(Manager::CacheType::Transactional,
initialSize, true));
}
bool doneRebalancing = false;
auto rebalanceWorker = [&rebalancer, &doneRebalancing]() -> void {
while (!doneRebalancing) {
bool rebalanced = rebalancer.rebalance();
if (rebalanced) {
usleep(500 * 1000);
} else {
usleep(100);
}
}
};
auto rebalancerThread = new std::thread(rebalanceWorker);
uint64_t chunkSize = 4 * 1024 * 1024;
uint64_t initialInserts = 1 * 1024 * 1024;
uint64_t operationCount = 4 * 1024 * 1024;
std::atomic<uint64_t> hitCount(0);
std::atomic<uint64_t> missCount(0);
auto worker = [&manager, &caches, cacheCount, initialInserts, operationCount,
&hitCount,
&missCount](uint64_t lower, uint64_t upper) -> void {
Transaction* tx = manager.beginTransaction(false);
// fill with some initial data
for (uint64_t i = 0; i < initialInserts; i++) {
uint64_t item = lower + i;
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
delete value;
}
}
// initialize valid range for keys that *might* be in cache
uint64_t validLower = lower;
uint64_t validUpper = lower + initialInserts - 1;
uint64_t blacklistUpper = validUpper;
// commence mixed workload
for (uint64_t i = 0; i < operationCount; i++) {
uint32_t r = RandomGenerator::interval(static_cast<uint32_t>(99UL));
if (r >= 99) { // remove something
if (validLower == validUpper) {
continue; // removed too much
}
uint64_t item = validLower++;
size_t cacheIndex = item % cacheCount;
caches[cacheIndex]->remove(&item, sizeof(uint64_t));
} else if (r >= 90) { // insert something
if (validUpper == upper) {
continue; // already maxed out range
}
uint64_t item = ++validUpper;
if (validUpper > blacklistUpper) {
blacklistUpper = validUpper;
}
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
delete value;
}
} else if (r >= 80) { // blacklist something
if (blacklistUpper == upper) {
continue; // already maxed out range
}
uint64_t item = ++blacklistUpper;
size_t cacheIndex = item % cacheCount;
caches[cacheIndex]->blacklist(&item, sizeof(uint64_t));
} else { // lookup something
uint64_t item = RandomGenerator::interval(
static_cast<int64_t>(validLower), static_cast<int64_t>(validUpper));
size_t cacheIndex = item % cacheCount;
Cache::Finding f = caches[cacheIndex]->find(&item, sizeof(uint64_t));
if (f.found()) {
hitCount++;
TRI_ASSERT(f.value() != nullptr);
TRI_ASSERT(f.value()->sameKey(&item, sizeof(uint64_t)));
} else {
missCount++;
TRI_ASSERT(f.value() == nullptr);
}
}
}
manager.endTransaction(tx);
};
std::vector<std::thread*> threads;
// dispatch threads
for (size_t i = 0; i < threadCount; i++) {
uint64_t lower = i * chunkSize;
uint64_t upper = ((i + 1) * chunkSize) - 1;
threads.push_back(new std::thread(worker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
doneRebalancing = true;
rebalancerThread->join();
delete rebalancerThread;
for (auto cache : caches) {
manager.destroyCache(cache);
}
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generate tests
////////////////////////////////////////////////////////////////////////////////

View File

@@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for arangodb::cache::TransactionWindow
/// @brief test suite for arangodb::cache::TransactionManager
///
/// @file
///
@@ -30,7 +30,8 @@
#define BOOST_TEST_INCLUDED
#include <boost/test/unit_test.hpp>
#include "Cache/TransactionWindow.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionManager.h"
#include <stdint.h>
#include <iostream>
@@ -41,13 +42,13 @@ using namespace arangodb::cache;
// --SECTION-- setup / tear-down
// -----------------------------------------------------------------------------
struct CCacheTransactionWindowSetup {
CCacheTransactionWindowSetup() {
BOOST_TEST_MESSAGE("setup TransactionWindow");
struct CCacheTransactionManagerSetup {
CCacheTransactionManagerSetup() {
BOOST_TEST_MESSAGE("setup TransactionManager");
}
~CCacheTransactionWindowSetup() {
BOOST_TEST_MESSAGE("tear-down TransactionWindow");
~CCacheTransactionManagerSetup() {
BOOST_TEST_MESSAGE("tear-down TransactionManager");
}
};
// -----------------------------------------------------------------------------
@@ -58,31 +59,56 @@ struct CCacheTransactionWindowSetup {
/// @brief setup
////////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_SUITE(CCacheTransactionWindowTest,
CCacheTransactionWindowSetup)
BOOST_FIXTURE_TEST_SUITE(CCacheTransactionManagerTest,
CCacheTransactionManagerSetup)
////////////////////////////////////////////////////////////////////////////////
/// @brief test transaction term management
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_transaction_term) {
TransactionWindow transactions;
TransactionManager transactions;
Transaction* tx1;
Transaction* tx2;
Transaction* tx3;
BOOST_CHECK_EQUAL(0ULL, transactions.term());
transactions.start();
tx1 = transactions.begin(false);
BOOST_CHECK_EQUAL(1ULL, transactions.term());
transactions.end();
transactions.end(tx1);
BOOST_CHECK_EQUAL(2ULL, transactions.term());
transactions.start();
tx1 = transactions.begin(false);
BOOST_CHECK_EQUAL(3ULL, transactions.term());
transactions.start();
tx2 = transactions.begin(false);
BOOST_CHECK_EQUAL(3ULL, transactions.term());
transactions.end();
transactions.end(tx1);
BOOST_CHECK_EQUAL(3ULL, transactions.term());
transactions.end();
transactions.end(tx2);
BOOST_CHECK_EQUAL(4ULL, transactions.term());
tx1 = transactions.begin(true);
BOOST_CHECK_EQUAL(4ULL, transactions.term());
tx2 = transactions.begin(false);
BOOST_CHECK_EQUAL(5ULL, transactions.term());
transactions.end(tx2);
BOOST_CHECK_EQUAL(5ULL, transactions.term());
transactions.end(tx1);
BOOST_CHECK_EQUAL(6ULL, transactions.term());
tx1 = transactions.begin(true);
BOOST_CHECK_EQUAL(6ULL, transactions.term());
tx2 = transactions.begin(false);
BOOST_CHECK_EQUAL(7ULL, transactions.term());
transactions.end(tx2);
BOOST_CHECK_EQUAL(7ULL, transactions.term());
tx3 = transactions.begin(true);
BOOST_CHECK_EQUAL(7ULL, transactions.term());
transactions.end(tx1);
BOOST_CHECK_EQUAL(8ULL, transactions.term());
transactions.end(tx3);
BOOST_CHECK_EQUAL(8ULL, transactions.term());
}
////////////////////////////////////////////////////////////////////////////////

View File

@@ -276,7 +276,7 @@ BOOST_AUTO_TEST_CASE(tst_blacklist) {
sizeof(uint64_t));
}
success = bucket.lock(0, -1LL);
success = bucket.lock(1ULL, -1LL);
BOOST_CHECK(success);
// insert three to fill

View File

@@ -0,0 +1,430 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for arangodb::cache::TransactionalCache
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2017 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Basics/Common.h"
#include "Random/RandomGenerator.h"
#define BOOST_TEST_INCLUDED
#include <boost/test/unit_test.hpp>
#include "Cache/Manager.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionalCache.h"
#include "MockScheduler.h"
#include <stdint.h>
#include <string>
#include <thread>
#include <vector>
#include <iostream>
using namespace arangodb;
using namespace arangodb::cache;
// -----------------------------------------------------------------------------
// --SECTION-- setup / tear-down
// -----------------------------------------------------------------------------
struct CCacheTransactionalCacheSetup {
CCacheTransactionalCacheSetup() {
BOOST_TEST_MESSAGE("setup TransactionalCache");
}
~CCacheTransactionalCacheSetup() {
BOOST_TEST_MESSAGE("tear-down TransactionalCache");
}
};
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief setup
////////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_SUITE(CCacheTransactionalCacheTest,
CCacheTransactionalCacheSetup)
////////////////////////////////////////////////////////////////////////////////
/// @brief test construction (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_construction) {
Manager manager(nullptr, 1024ULL * 1024ULL);
auto cache1 = manager.createCache(Manager::CacheType::Transactional,
256ULL * 1024ULL, false);
auto cache2 = manager.createCache(Manager::CacheType::Transactional,
512ULL * 1024ULL, false);
BOOST_CHECK_EQUAL(0ULL, cache1->usage());
BOOST_CHECK_EQUAL(256ULL * 1024ULL, cache1->limit());
BOOST_CHECK_EQUAL(0ULL, cache2->usage());
BOOST_CHECK(512ULL * 1024ULL > cache2->limit());
manager.destroyCache(cache1);
manager.destroyCache(cache2);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test insertion (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_insertion) {
uint64_t cacheLimit = 256ULL * 1024ULL;
Manager manager(nullptr, 4ULL * cacheLimit);
auto cache =
manager.createCache(Manager::CacheType::Transactional, cacheLimit, false);
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
}
for (uint64_t i = 0; i < 1024; i++) {
uint64_t j = 2 * i;
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &j, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
BOOST_CHECK(0 == memcmp(f.value()->value(), &j, sizeof(uint64_t)));
}
uint64_t notInserted = 0;
for (uint64_t i = 1024; i < 128 * 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
} else {
delete value;
notInserted++;
}
}
BOOST_CHECK(notInserted > 0);
manager.destroyCache(cache);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test removal (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_removal) {
uint64_t cacheLimit = 256ULL * 1024ULL;
Manager manager(nullptr, 4ULL * cacheLimit);
auto cache =
manager.createCache(Manager::CacheType::Transactional, cacheLimit, false);
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
BOOST_CHECK(f.value() != nullptr);
BOOST_CHECK(f.value()->sameKey(&i, sizeof(uint64_t)));
}
// test removal of bogus keys
for (uint64_t i = 1024; i < 2048; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
BOOST_ASSERT(removed);
// ensure existing keys not removed
for (uint64_t j = 0; j < 1024; j++) {
auto f = cache->find(&j, sizeof(uint64_t));
BOOST_CHECK(f.found());
BOOST_CHECK(f.value() != nullptr);
BOOST_CHECK(f.value()->sameKey(&j, sizeof(uint64_t)));
}
}
// remove actual keys
for (uint64_t i = 0; i < 1024; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
BOOST_CHECK(removed);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(!f.found());
}
manager.destroyCache(cache);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test blacklisting (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_blacklist) {
uint64_t cacheLimit = 256ULL * 1024ULL;
Manager manager(nullptr, 4ULL * cacheLimit);
auto cache =
manager.createCache(Manager::CacheType::Transactional, cacheLimit, false);
Transaction* tx = manager.beginTransaction(false);
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
BOOST_CHECK(f.value() != nullptr);
BOOST_CHECK(f.value()->sameKey(&i, sizeof(uint64_t)));
}
for (uint64_t i = 512; i < 1024; i++) {
bool success = cache->blacklist(&i, sizeof(uint64_t));
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(!f.found());
}
for (uint64_t i = 512; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(!success);
delete value;
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(!f.found());
}
manager.endTransaction(tx);
tx = manager.beginTransaction(false);
for (uint64_t i = 512; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
BOOST_CHECK(success);
auto f = cache->find(&i, sizeof(uint64_t));
BOOST_CHECK(f.found());
}
manager.endTransaction(tx);
manager.destroyCache(cache);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test growth behavior (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_growth) {
uint64_t initialSize = 16ULL * 1024ULL;
uint64_t minimumSize = 64ULL * initialSize;
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL);
auto cache =
manager.createCache(Manager::CacheType::Transactional, initialSize, true);
for (uint64_t i = 0; i < 4ULL * 1024ULL * 1024ULL; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (!success) {
delete value;
}
}
BOOST_CHECK(cache->usage() > minimumSize);
manager.destroyCache(cache);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test shrink behavior (single-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_st_shrink) {
uint64_t initialSize = 16ULL * 1024ULL;
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL);
auto cache =
manager.createCache(Manager::CacheType::Transactional, initialSize, true);
for (uint64_t i = 0; i < 16ULL * 1024ULL * 1024ULL; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (!success) {
delete value;
}
}
cache->disableGrowth();
uint64_t target = cache->usage() / 2;
while (!cache->resize(target)) {
};
for (uint64_t i = 0; i < 16ULL * 1024ULL * 1024ULL; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (!success) {
delete value;
}
}
while (cache->isResizing()) {
}
BOOST_CHECK_MESSAGE(cache->usage() <= target,
cache->usage() << " !<= " << target);
manager.destroyCache(cache);
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test mixed load behavior (multi-threaded)
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_mt_mixed_load) {
uint64_t initialSize = 16ULL * 1024ULL;
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 1024ULL * 1024ULL * 1024ULL);
size_t threadCount = 4;
std::shared_ptr<Cache> cache =
manager.createCache(Manager::CacheType::Transactional, initialSize, true);
uint64_t chunkSize = 16 * 1024 * 1024;
uint64_t initialInserts = 4 * 1024 * 1024;
uint64_t operationCount = 16 * 1024 * 1024;
std::atomic<uint64_t> hitCount(0);
std::atomic<uint64_t> missCount(0);
auto worker = [&manager, &cache, initialInserts, operationCount, &hitCount,
&missCount](uint64_t lower, uint64_t upper) -> void {
Transaction* tx = manager.beginTransaction(false);
// fill with some initial data
for (uint64_t i = 0; i < initialInserts; i++) {
uint64_t item = lower + i;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
delete value;
}
}
// initialize valid range for keys that *might* be in cache
uint64_t validLower = lower;
uint64_t validUpper = lower + initialInserts - 1;
uint64_t blacklistUpper = validUpper;
// commence mixed workload
for (uint64_t i = 0; i < operationCount; i++) {
uint32_t r = RandomGenerator::interval(static_cast<uint32_t>(99UL));
if (r >= 99) { // remove something
if (validLower == validUpper) {
continue; // removed too much
}
uint64_t item = validLower++;
cache->remove(&item, sizeof(uint64_t));
} else if (r >= 90) { // insert something
if (validUpper == upper) {
continue; // already maxed out range
}
uint64_t item = ++validUpper;
if (validUpper > blacklistUpper) {
blacklistUpper = validUpper;
}
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
delete value;
}
} else if (r >= 80) { // blacklist something
if (blacklistUpper == upper) {
continue; // already maxed out range
}
uint64_t item = ++blacklistUpper;
cache->blacklist(&item, sizeof(uint64_t));
} else { // lookup something
uint64_t item = RandomGenerator::interval(
static_cast<int64_t>(validLower), static_cast<int64_t>(validUpper));
Cache::Finding f = cache->find(&item, sizeof(uint64_t));
if (f.found()) {
hitCount++;
TRI_ASSERT(f.value() != nullptr);
TRI_ASSERT(f.value()->sameKey(&item, sizeof(uint64_t)));
} else {
missCount++;
TRI_ASSERT(f.value() == nullptr);
}
}
}
manager.endTransaction(tx);
};
std::vector<std::thread*> threads;
// dispatch threads
for (size_t i = 0; i < threadCount; i++) {
uint64_t lower = i * chunkSize;
uint64_t upper = ((i + 1) * chunkSize) - 1;
threads.push_back(new std::thread(worker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
manager.destroyCache(cache);
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generate tests
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_SUITE_END()
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|//
// --SECTION--\\|/// @\\}\\)"
// End:
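Beyond blacklisting, the tst_st_shrink test above also documents the resize protocol used throughout these tests: growth is switched off, resize() is retried until the request is accepted, and isResizing() is polled until the cache settles. Condensed, with the same member functions the test calls:

cache->disableGrowth();                  // keep the manager from growing it back
uint64_t target = cache->usage() / 2;
while (!cache->resize(target)) {         // retry until the request is accepted
}
// ... keep inserting; the cache evicts entries while it shrinks ...
while (cache->isResizing()) {            // wait until the resize has completed
}
// cache->usage() should now be at or below target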

View File

@@ -0,0 +1,277 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief helper for cache suite
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2017 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "TransactionalStore.h"
#include "Basics/Common.h"
#include "Basics/StringBuffer.h"
#include "Basics/files.h"
#include "Cache/Manager.h"
#include "Cache/TransactionalCache.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/transaction_db.h>
#include <chrono>
#include <iostream>
using namespace arangodb::cache;
std::atomic<uint32_t> TransactionalStore::_sequence(0);
TransactionalStore::Document::Document() : Document(0) {}
TransactionalStore::Document::Document(uint64_t k)
: key(k),
timestamp(static_cast<uint64_t>(
std::chrono::steady_clock::now().time_since_epoch().count())),
sequence(0) {}
void TransactionalStore::Document::advance() {
timestamp = static_cast<uint64_t>(
std::chrono::steady_clock::now().time_since_epoch().count());
sequence++;
}
void TransactionalStore::Document::clear() {
memset(this, 0, sizeof(Document));
}
bool TransactionalStore::Document::empty() const { return (key == 0); }
TransactionalStore::Transaction::Transaction(arangodb::cache::Transaction* c,
rocksdb::Transaction* r)
: cache(c), rocks(r) {}
TransactionalStore::TransactionalStore(Manager* manager)
: _manager(manager),
_directory(TRI_UNKNOWN_MEM_ZONE),
_id(++_sequence),
_readOptions(rocksdb::ReadOptions()),
_writeOptions(rocksdb::WriteOptions()),
_txOptions(rocksdb::TransactionOptions()) {
TRI_ASSERT(manager != nullptr);
_cache = manager->createCache(Manager::CacheType::Transactional, 1024 * 1024,
true, true);
TRI_ASSERT(_cache.get() != nullptr);
_directory.appendText(TRI_GetTempPath());
_directory.appendChar(TRI_DIR_SEPARATOR_CHAR);
_directory.appendText("cache-test-transactional-store-");
_directory.appendText(std::to_string(_id));
rocksdb::Options options;
options.create_if_missing = true;
options.max_open_files = 128;
auto status = rocksdb::TransactionDB::Open(
options, rocksdb::TransactionDBOptions(), _directory.c_str(), &_db);
if (!status.ok()) {
std::cerr << status.ToString() << std::endl;
}
TRI_ASSERT(status.ok());
}
TransactionalStore::~TransactionalStore() {
delete _db;
TRI_ASSERT(_directory.length() > 20);
TRI_RemoveDirectory(_directory.c_str());
_manager->destroyCache(_cache);
}
Cache* TransactionalStore::cache() { return _cache.get(); }
TransactionalStore::Transaction* TransactionalStore::beginTransaction(
bool readOnly) {
auto cache = _manager->beginTransaction(readOnly);
auto rocks = _db->BeginTransaction(_writeOptions, _txOptions);
rocks->SetSnapshot();
return new Transaction(cache, rocks);
}
bool TransactionalStore::commit(TransactionalStore::Transaction* tx) {
if (tx != nullptr) {
auto status = tx->rocks->Commit();
if (status.ok()) {
_manager->endTransaction(tx->cache);
delete tx->rocks;
delete tx;
return true;
}
}
return false;
}
bool TransactionalStore::rollback(TransactionalStore::Transaction* tx) {
if (tx != nullptr) {
tx->rocks->Rollback();
_manager->endTransaction(tx->cache);
delete tx->rocks;
delete tx;
return true;
}
return false;
}
bool TransactionalStore::insert(TransactionalStore::Transaction* tx,
TransactionalStore::Document const& document) {
bool useInternalTransaction = (tx == nullptr);
if (useInternalTransaction) {
tx = beginTransaction(false);
}
bool inserted = false;
Document d = lookup(tx, document.key);
if (d.empty()) { // ensure document with this key does not exist
// blacklist in cache first
_cache->blacklist(&(document.key), sizeof(uint64_t));
// now write to rocksdb
rocksdb::Slice kSlice(reinterpret_cast<char const*>(&(document.key)),
sizeof(uint64_t));
rocksdb::Slice vSlice(reinterpret_cast<char const*>(&document),
sizeof(Document));
auto status = tx->rocks->Put(kSlice, vSlice);
inserted = status.ok();
}
if (useInternalTransaction) {
bool ok = commit(tx);
if (!ok) {
rollback(tx);
inserted = false;
}
}
return inserted;
}
bool TransactionalStore::update(TransactionalStore::Transaction* tx,
TransactionalStore::Document const& document) {
bool useInternalTransaction = (tx == nullptr);
if (useInternalTransaction) {
tx = beginTransaction(false);
}
bool updated = false;
Document d = lookup(tx, document.key);
if (!d.empty()) { // ensure document with this key exists
// blacklist in cache first
_cache->blacklist(&(document.key), sizeof(uint64_t));
// now write to rocksdb
rocksdb::Slice kSlice(reinterpret_cast<char const*>(&(document.key)),
sizeof(uint64_t));
rocksdb::Slice vSlice(reinterpret_cast<char const*>(&document),
sizeof(Document));
auto status = tx->rocks->Put(kSlice, vSlice);
updated = status.ok();
}
if (useInternalTransaction) {
bool ok = commit(tx);
if (!ok) {
rollback(tx);
updated = false;
}
}
return updated;
}
bool TransactionalStore::remove(TransactionalStore::Transaction* tx,
uint64_t key) {
bool useInternalTransaction = (tx == nullptr);
if (useInternalTransaction) {
tx = beginTransaction(false);
}
bool removed = false;
Document d = lookup(tx, key);
if (!d.empty()) { // ensure document with this key exists
// blacklist in cache first
_cache->blacklist(&key, sizeof(uint64_t));
// now write to rocksdb
rocksdb::Slice kSlice(reinterpret_cast<char*>(&key), sizeof(uint64_t));
auto status = tx->rocks->Delete(kSlice);
removed = status.ok();
}
if (useInternalTransaction) {
bool ok = commit(tx);
if (!ok) {
rollback(tx);
removed = false;
}
}
return removed;
}
TransactionalStore::Document TransactionalStore::lookup(
TransactionalStore::Transaction* tx, uint64_t key) {
bool useInternalTransaction = (tx == nullptr);
if (useInternalTransaction) {
tx = beginTransaction(true);
}
Document result;
{
Cache::Finding f = _cache->find(&key, sizeof(uint64_t));
if (f.found()) {
CachedValue const* cv = f.value();
memcpy(&result, cv->value(), sizeof(Document));
}
}
if (result.empty()) {
auto readOptions = rocksdb::ReadOptions();
readOptions.snapshot = tx->rocks->GetSnapshot();
rocksdb::Slice kSlice(reinterpret_cast<char*>(&key), sizeof(uint64_t));
std::string buffer;
auto status = tx->rocks->Get(readOptions, kSlice, &buffer);
if (status.ok()) {
memcpy(&result, buffer.data(), sizeof(Document));
CachedValue* value = CachedValue::construct(&key, sizeof(uint64_t),
&result, sizeof(Document));
bool inserted = _cache->insert(value);
if (!inserted) {
delete value;
}
}
}
if (useInternalTransaction) {
bool ok = commit(tx);
if (!ok) {
rollback(tx);
}
}
return result;
}
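TransactionalStore models a RocksDB-backed collection sitting behind the cache: every write first blacklists the key in the cache and then writes to RocksDB inside the same paired transaction, while lookup() consults the cache first and, on a miss, reads from the RocksDB snapshot and re-inserts the value. The write path, condensed from insert()/update() above:

// inside a paired transaction: tx->cache (cache window) + tx->rocks (RocksDB)
_cache->blacklist(&(document.key), sizeof(uint64_t));  // 1. hide any stale cache entry
rocksdb::Slice kSlice(reinterpret_cast<char const*>(&(document.key)), sizeof(uint64_t));
rocksdb::Slice vSlice(reinterpret_cast<char const*>(&document), sizeof(Document));
auto status = tx->rocks->Put(kSlice, vSlice);          // 2. write to the backing store
// readers that miss the cache fall through to the snapshot and repopulate the cache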

View File

@@ -0,0 +1,98 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief helper for cache suite
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2017 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef UNITTESTS_CACHE_TRANSACTIONAL_STORE_H
#define UNITTESTS_CACHE_TRANSACTIONAL_STORE_H
#include "Basics/Common.h"
#include "Basics/StringBuffer.h"
#include "Cache/Manager.h"
#include "Cache/TransactionalCache.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/transaction_db.h>
#include <chrono>
namespace arangodb {
namespace cache {
class TransactionalStore {
public:
struct Document {
uint64_t key;
uint64_t timestamp;
uint64_t sequence;
Document();
Document(uint64_t k);
void advance();
void clear();
bool empty() const;
};
struct Transaction {
arangodb::cache::Transaction* cache;
rocksdb::Transaction* rocks;
Transaction(arangodb::cache::Transaction* c, rocksdb::Transaction* r);
};
public:
TransactionalStore(Manager* manager);
~TransactionalStore();
Cache* cache();
TransactionalStore::Transaction* beginTransaction(bool readOnly);
bool commit(TransactionalStore::Transaction* tx);
bool rollback(TransactionalStore::Transaction* tx);
bool insert(TransactionalStore::Transaction* tx, Document const& document);
bool update(TransactionalStore::Transaction* tx, Document const& document);
bool remove(TransactionalStore::Transaction* tx, uint64_t key);
Document lookup(TransactionalStore::Transaction* tx, uint64_t key);
private:
static std::atomic<uint32_t> _sequence;
Manager* _manager;
std::shared_ptr<Cache> _cache;
arangodb::basics::StringBuffer _directory;
uint32_t _id;
rocksdb::TransactionDB* _db;
rocksdb::ReadOptions _readOptions;
rocksdb::WriteOptions _writeOptions;
rocksdb::TransactionOptions _txOptions;
};
}; // end namespace cache
}; // end namespace arangodb
#endif
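A minimal driver for this helper, following the pattern of the tests in the next file (names and calls taken from those tests):

TransactionalStore store(&manager);
store.insert(nullptr, TransactionalStore::Document(1));  // nullptr = use an internal transaction
auto tx = store.beginTransaction(false);
auto d = store.lookup(tx, 1);
d.advance();             // bump sequence and timestamp, as the write workers do
store.update(tx, d);
store.commit(tx);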

View File

@@ -0,0 +1,487 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for arangodb::cache::TransactionalCache with backing store
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2017 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Basics/Common.h"
#define BOOST_TEST_INCLUDED
#include <boost/test/unit_test.hpp>
#include "Cache/Manager.h"
#include "Cache/Rebalancer.h"
#include "Random/RandomGenerator.h"
#include "MockScheduler.h"
#include "TransactionalStore.h"
#include <stdint.h>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>
using namespace arangodb;
using namespace arangodb::cache;
// -----------------------------------------------------------------------------
// --SECTION-- setup / tear-down
// -----------------------------------------------------------------------------
struct CCacheTransactionsWithBackingStoreSetup {
CCacheTransactionsWithBackingStoreSetup() {
BOOST_TEST_MESSAGE("setup TransactionsWithBackingStore");
}
~CCacheTransactionsWithBackingStoreSetup() {
BOOST_TEST_MESSAGE("tear-down TransactionsWithBackingStore");
}
};
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief setup
////////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_SUITE(CCacheTransactionsWithBackingStoreTest,
CCacheTransactionsWithBackingStoreSetup)
/*
Planned Tests
=============
All tests begin by filling the database with a set number of documents. After
that, all writes consist of updates via the Document::advance() API to both keep
things simple and to provide a reliable way to test what version of a document a
reader gets.
1) Single store; Read-only; hotset access pattern
- Test for hit rate
2) Single store; Simultaneous read, write threads, part 1
- Have writers sleep a while between transactions
- Have readers read single documents with only internal transactions
- Test for hit rate
3) Single store; Simultaneous read, write threads, part 2
- Have writers sleep a while between transactions
- Have readers read a set of documents within a transaction
- Test for snapshot isolation to the extent possible
- Test for hit rate
4) Multiple stores with rebalancing; Simultaneous read, write threads
- Use small global limit to provide memory pressure
- Vary store-access bias over time to check that rebalancing helps
- Have writers sleep a while between transactions
- Have readers read a set of documents within a transaction
*/
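// Illustration (not part of the committed file): the transactionality assertions in
// the readers below hinge on Document::timestamp. A reader records the clock when its
// transaction begins and then requires that every document it sees predates that mark,
// i.e. it must never observe a version written after its own transaction started:
//
//   uint64_t start = static_cast<uint64_t>(
//       std::chrono::steady_clock::now().time_since_epoch().count());
//   auto d = store.lookup(tx, choice);
//   TRI_ASSERT(d.timestamp <= start);  // snapshot isolation, to the extent testable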
////////////////////////////////////////////////////////////////////////////////
/// @brief test hit rate for read-only hotset workload
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_single_readonly_hotset) {
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 16 * 1024 * 1024);
TransactionalStore store(&manager);
uint64_t totalDocuments = 1000000;
uint64_t hotsetSize = 50000;
size_t threadCount = 4;
uint64_t lookupsPerThread = 1000000;
// initial fill
for (uint64_t i = 1; i <= totalDocuments; i++) {
store.insert(nullptr, TransactionalStore::Document(i));
}
auto worker = [&store, hotsetSize, totalDocuments,
lookupsPerThread]() -> void {
for (uint64_t i = 0; i < lookupsPerThread; i++) {
uint32_t r = RandomGenerator::interval(static_cast<uint32_t>(99));
uint64_t choice = (r >= 90) ? RandomGenerator::interval(totalDocuments)
: RandomGenerator::interval(hotsetSize);
if (choice == 0) {
choice = 1;
}
auto d = store.lookup(nullptr, choice);
TRI_ASSERT(!d.empty());
}
};
std::vector<std::thread*> threads;
// dispatch threads
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(new std::thread(worker));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
auto hitRates = manager.globalHitRates();
BOOST_CHECK(hitRates.first >= 65.0);
BOOST_CHECK(hitRates.second >= 85.0);
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test hit rate for mixed workload
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_single_mixed_hitrate) {
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 256 * 1024 * 1024);
TransactionalStore store(&manager);
uint64_t totalDocuments = 1000000;
uint64_t batchSize = 1000;
size_t readerCount = 4;
size_t writerCount = 2;
std::atomic<size_t> writersDone(0);
auto writeWaitInterval = std::chrono::milliseconds(10);
// initial fill
for (uint64_t i = 1; i <= totalDocuments; i++) {
store.insert(nullptr, TransactionalStore::Document(i));
}
auto readWorker = [&store, &writersDone, writerCount,
totalDocuments]() -> void {
while (writersDone.load() < writerCount) {
uint64_t choice = RandomGenerator::interval(totalDocuments);
if (choice == 0) {
choice = 1;
}
auto d = store.lookup(nullptr, choice);
TRI_ASSERT(!d.empty());
}
};
auto writeWorker = [&store, &writersDone, writerCount, totalDocuments,
batchSize, &writeWaitInterval](uint64_t lower,
uint64_t upper) -> void {
uint64_t batches = (upper + 1 - lower) / batchSize;
uint64_t choice = lower;
for (uint64_t batch = 0; batch < batches; batch++) {
auto tx = store.beginTransaction(false);
for (uint64_t i = 0; i < batchSize; i++) {
auto d = store.lookup(tx, choice);
TRI_ASSERT(!d.empty());
d.advance();
bool ok = store.update(tx, d);
TRI_ASSERT(ok);
choice++;
}
bool ok = store.commit(tx);
TRI_ASSERT(ok);
std::this_thread::sleep_for(writeWaitInterval);
}
writersDone++;
};
std::vector<std::thread*> threads;
// dispatch reader threads
for (size_t i = 0; i < readerCount; i++) {
threads.push_back(new std::thread(readWorker));
}
// dispatch writer threads
uint64_t chunkSize = totalDocuments / writerCount;
for (size_t i = 0; i < writerCount; i++) {
uint64_t lower = (i * chunkSize) + 1;
uint64_t upper = ((i + 1) * chunkSize);
threads.push_back(new std::thread(writeWorker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
auto hitRates = manager.globalHitRates();
BOOST_CHECK(hitRates.first >= 40.0);
BOOST_CHECK(hitRates.second >= 60.0);
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test transactionality for mixed workload
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_single_mixed_transactionality) {
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 256 * 1024 * 1024);
TransactionalStore store(&manager);
uint64_t totalDocuments = 1000000;
uint64_t writeBatchSize = 1000;
uint64_t readBatchSize = 10000;
size_t readerCount = 4;
size_t writerCount = 2;
std::atomic<size_t> writersDone(0);
auto writeWaitInterval = std::chrono::milliseconds(10);
// initial fill
for (uint64_t i = 1; i <= totalDocuments; i++) {
store.insert(nullptr, TransactionalStore::Document(i));
}
auto readWorker = [&store, &writersDone, writerCount, totalDocuments,
readBatchSize]() -> void {
while (writersDone.load() < writerCount) {
auto tx = store.beginTransaction(true);
uint64_t start = static_cast<uint64_t>(
std::chrono::steady_clock::now().time_since_epoch().count());
for (uint64_t i = 0; i < readBatchSize; i++) {
uint64_t choice = RandomGenerator::interval(totalDocuments);
if (choice == 0) {
choice = 1;
}
auto d = store.lookup(tx, choice);
TRI_ASSERT(!d.empty());
TRI_ASSERT(d.timestamp <= start); // transactionality
}
bool ok = store.commit(tx);
TRI_ASSERT(ok);
}
};
auto writeWorker = [&store, &writersDone, writerCount, totalDocuments,
writeBatchSize, &writeWaitInterval](
uint64_t lower, uint64_t upper) -> void {
uint64_t batches = (upper + 1 - lower) / writeBatchSize;
uint64_t choice = lower;
for (uint64_t batch = 0; batch < batches; batch++) {
auto tx = store.beginTransaction(false);
for (uint64_t i = 0; i < writeBatchSize; i++) {
auto d = store.lookup(tx, choice);
TRI_ASSERT(!d.empty());
d.advance();
bool ok = store.update(tx, d);
TRI_ASSERT(ok);
choice++;
}
bool ok = store.commit(tx);
TRI_ASSERT(ok);
std::this_thread::sleep_for(writeWaitInterval);
}
writersDone++;
};
std::vector<std::thread*> threads;
// dispatch reader threads
for (size_t i = 0; i < readerCount; i++) {
threads.push_back(new std::thread(readWorker));
}
// dispatch writer threads
uint64_t chunkSize = totalDocuments / writerCount;
for (size_t i = 0; i < writerCount; i++) {
uint64_t lower = (i * chunkSize) + 1;
uint64_t upper = ((i + 1) * chunkSize);
threads.push_back(new std::thread(writeWorker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test rebalancing in the wild
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(tst_multi_rebalancing) {
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
MockScheduler scheduler(4);
Manager manager(scheduler.ioService(), 16 * 1024 * 1024);
Rebalancer rebalancer(&manager);
TransactionalStore store1(&manager);
TransactionalStore store2(&manager);
uint64_t totalDocuments = 1000000;
uint64_t writeBatchSize = 1000;
uint64_t readBatchSize = 100;
size_t readerCount = 4;
size_t writerCount = 2;
std::atomic<size_t> writersDone(0);
auto writeWaitInterval = std::chrono::milliseconds(50);
uint32_t storeBias;
bool doneRebalancing = false;
auto rebalanceWorker = [&rebalancer, &doneRebalancing]() -> void {
while (!doneRebalancing) {
bool rebalanced = rebalancer.rebalance();
if (rebalanced) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} else {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
};
auto rebalancerThread = new std::thread(rebalanceWorker);
// initial fill
for (uint64_t i = 1; i <= totalDocuments; i++) {
store1.insert(nullptr, TransactionalStore::Document(i));
store2.insert(nullptr, TransactionalStore::Document(i));
}
auto readWorker = [&store1, &store2, &storeBias, &writersDone, writerCount,
totalDocuments, readBatchSize]() -> void {
while (writersDone.load() < writerCount) {
uint32_t r = RandomGenerator::interval(99UL);
TransactionalStore* store = (r <= storeBias) ? &store1 : &store2;
auto tx = store->beginTransaction(true);
uint64_t start = static_cast<uint64_t>(
std::chrono::steady_clock::now().time_since_epoch().count());
for (uint64_t i = 0; i < readBatchSize; i++) {
uint64_t choice = RandomGenerator::interval(totalDocuments);
if (choice == 0) {
choice = 1;
}
auto d = store->lookup(tx, choice);
TRI_ASSERT(!d.empty());
TRI_ASSERT(d.timestamp <= start); // transactionality
}
bool ok = store->commit(tx);
TRI_ASSERT(ok);
}
};
auto writeWorker = [&store1, &store2, &storeBias, &writersDone, writerCount,
totalDocuments, writeBatchSize, &writeWaitInterval](
uint64_t lower, uint64_t upper) -> void {
uint64_t batches = (upper + 1 - lower) / writeBatchSize;
uint64_t choice = lower;
for (uint64_t batch = 0; batch < batches; batch++) {
uint32_t r = RandomGenerator::interval(99UL);
TransactionalStore* store = (r <= storeBias) ? &store1 : &store2;
auto tx = store->beginTransaction(false);
for (uint64_t i = 0; i < writeBatchSize; i++) {
auto d = store->lookup(tx, choice);
TRI_ASSERT(!d.empty());
d.advance();
bool ok = store->update(tx, d);
TRI_ASSERT(ok);
choice++;
}
bool ok = store->commit(tx);
TRI_ASSERT(ok);
std::this_thread::sleep_for(writeWaitInterval);
}
writersDone++;
};
std::vector<std::thread*> threads;
// bias toward first store
storeBias = 80;
// dispatch reader threads
for (size_t i = 0; i < readerCount; i++) {
threads.push_back(new std::thread(readWorker));
}
// dispatch writer threads
uint64_t chunkSize = totalDocuments / writerCount;
for (size_t i = 0; i < writerCount; i++) {
uint64_t lower = (i * chunkSize) + 1;
uint64_t upper = ((i + 1) * chunkSize);
threads.push_back(new std::thread(writeWorker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
while (store1.cache()->isResizing() || store2.cache()->isResizing()) {
std::this_thread::yield();
}
/*BOOST_CHECK_MESSAGE(2 * store1.cache()->limit() > store2.cache()->limit(),
2 * store1.cache()->limit() << " !> "
<< store2.cache()->limit());
*/
threads.clear();
// bias toward second store
storeBias = 20;
// dispatch reader threads
for (size_t i = 0; i < readerCount; i++) {
threads.push_back(new std::thread(readWorker));
}
// dispatch writer threads
for (size_t i = 0; i < writerCount; i++) {
uint64_t lower = (i * chunkSize) + 1;
uint64_t upper = ((i + 1) * chunkSize);
threads.push_back(new std::thread(writeWorker, lower, upper));
}
// join threads
for (auto t : threads) {
t->join();
delete t;
}
while (store1.cache()->isResizing() || store2.cache()->isResizing()) {
std::this_thread::yield();
}
/*BOOST_CHECK_MESSAGE(store1.cache()->limit() < 2 * store2.cache()->limit(),
store1.cache()->limit() << " !< "
<< 2 * store2.cache()->limit());
*/
doneRebalancing = true;
rebalancerThread->join();
delete rebalancerThread;
RandomGenerator::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generate tests
////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_SUITE_END()
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|//
// --SECTION--\\|/// @\\}\\)"
// End:

View File

@@ -180,9 +180,10 @@ SET(ARANGOD_SOURCES
Cache/PlainCache.cpp
Cache/Rebalancer.cpp
Cache/State.cpp
Cache/Transaction.cpp
Cache/TransactionalBucket.cpp
Cache/TransactionalCache.cpp
Cache/TransactionWindow.cpp
Cache/TransactionManager.cpp
Cluster/AgencyCallback.cpp
Cluster/AgencyCallbackRegistry.cpp
Cluster/ClusterComm.cpp

View File

@@ -38,6 +38,11 @@
using namespace arangodb::cache;
uint64_t Cache::_evictionStatsCapacity = 1024;
uint64_t Cache::_findStatsCapacity = 16384;
Cache::ConstructionGuard::ConstructionGuard() {}
Cache::Finding::Finding(CachedValue* v) : _value(v) {
if (_value != nullptr) {
_value->lease();
@@ -111,9 +116,30 @@ CachedValue* Cache::Finding::copy() const {
return ((_value == nullptr) ? nullptr : _value->copy());
}
void Cache::destroy(std::shared_ptr<Cache> cache) {
if (cache.get() != nullptr) {
cache->shutdown();
Cache::Cache(ConstructionGuard guard, Manager* manager,
Manager::MetadataItr metadata, bool allowGrowth,
bool enableWindowedStats)
: _state(),
_allowGrowth(allowGrowth),
_evictionStats(_evictionStatsCapacity),
_insertionCount(0),
_enableWindowedStats(enableWindowedStats),
_findStats(nullptr),
_findHits(0),
_findMisses(0),
_manager(manager),
_metadata(metadata),
_openOperations(0),
_migrateRequestTime(std::chrono::steady_clock::now()),
_resizeRequestTime(std::chrono::steady_clock::now()),
_lastResizeRequestStatus(true) {
if (_enableWindowedStats) {
try {
_findStats.reset(new StatBuffer(_findStatsCapacity));
} catch (std::bad_alloc) {
_findStats.reset(nullptr);
_enableWindowedStats = false;
}
}
}
@@ -142,12 +168,15 @@ uint64_t Cache::usage() {
}
std::pair<double, double> Cache::hitRates() {
double lifetimeRate = std::nan("");
double windowedRate = std::nan("");
uint64_t currentMisses = _findMisses.load();
uint64_t currentHits = _findHits.load();
double lifetimeRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
if (currentMisses + currentHits > 0) {
lifetimeRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
}
if (_enableWindowedStats && _findStats.get() != nullptr) {
auto stats = _findStats->getFrequencies();
@ -165,8 +194,10 @@ std::pair<double, double> Cache::hitRates() {
currentHits = (*stats)[1].second;
currentMisses = (*stats)[0].second;
}
windowedRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
if (currentHits + currentMisses > 0) {
windowedRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
}
}
}
@ -223,38 +254,9 @@ bool Cache::isResizing() {
return resizing;
}
Cache::Cache(Manager* manager, uint64_t requestedLimit, bool allowGrowth,
bool enableWindowedStats, std::function<void(Cache*)> deleter,
uint64_t size)
: _state(),
_allowGrowth(allowGrowth),
_evictionStats(1024),
_insertionCount(0),
_enableWindowedStats(enableWindowedStats),
_findStats(nullptr),
_findHits(0),
_findMisses(0),
_manager(manager),
_openOperations(0),
_migrateRequestTime(std::chrono::steady_clock::now()),
_resizeRequestTime(std::chrono::steady_clock::now()) {
try {
uint64_t fullSize =
size + _evictionStats.memoryUsage() +
((_findStats.get() == nullptr) ? 0 : _findStats->memoryUsage());
_metadata =
_manager->registerCache(this, requestedLimit, deleter, fullSize);
} catch (std::bad_alloc) {
// could not register, mark as non-operational
if (!_state.isSet(State::Flag::shutdown)) {
_state.toggleFlag(State::Flag::shutdown);
}
}
try {
_findStats.reset(new StatBuffer(16384));
} catch (std::bad_alloc) {
_findStats.reset(nullptr);
_enableWindowedStats = false;
void Cache::destroy(std::shared_ptr<Cache> cache) {
if (cache.get() != nullptr) {
cache->shutdown();
}
}
@ -282,7 +284,13 @@ bool Cache::requestResize(uint64_t requestedLimit, bool internal) {
_resizeRequestTime))) {
_metadata->lock();
uint64_t newLimit =
(requestedLimit > 0) ? requestedLimit : (_metadata->hardLimit() * 2);
(requestedLimit > 0)
? requestedLimit
: (_lastResizeRequestStatus
? (_metadata->hardLimit() * 2)
: (static_cast<uint64_t>(
static_cast<double>(_metadata->hardLimit()) *
1.25)));
_metadata->unlock();
auto result = _manager->requestResize(_metadata, newLimit);
_resizeRequestTime = result.second;
@ -384,6 +392,11 @@ void Cache::beginShutdown() {
void Cache::shutdown() {
_state.lock();
_metadata->lock();
auto handle = _metadata->cache(); // hold onto self-reference to prevent
// premature shared_ptr destruction
TRI_ASSERT(handle.get() == this);
_metadata->unlock();
if (!_state.isSet(State::Flag::shutdown)) {
if (!_state.isSet(State::Flag::shuttingDown)) {
_state.toggleFlag(State::Flag::shuttingDown);

View File

@ -39,13 +39,27 @@
namespace arangodb {
namespace cache {
class PlainCache; // forward declaration
class TransactionalCache; // forward declaration
////////////////////////////////////////////////////////////////////////////////
/// @brief The common structure of all caches managed by Manager.
///
/// Any pure virtual functions are documented in derived classes implementing
/// them.
////////////////////////////////////////////////////////////////////////////////
class Cache {
class Cache : public std::enable_shared_from_this<Cache> {
protected:
//////////////////////////////////////////////////////////////////////////////
/// @brief A dummy class to restrict constructor access.
//////////////////////////////////////////////////////////////////////////////
class ConstructionGuard {
private:
ConstructionGuard();
friend class PlainCache;
friend class TransactionalCache;
};
public:
typedef FrequencyBuffer<uint8_t> StatBuffer;
@ -93,10 +107,16 @@ class Cache {
};
public:
Cache(ConstructionGuard guard, Manager* manager,
Manager::MetadataItr metadata, bool allowGrowth,
bool enableWindowedStats);
virtual ~Cache() = default;
// primary functionality; documented in derived classes
virtual Finding find(void const* key, uint32_t keySize) = 0;
virtual bool insert(CachedValue* value) = 0;
virtual bool remove(void const* key, uint32_t keySize) = 0;
virtual bool blacklist(void const* key, uint32_t keySize) = 0;
//////////////////////////////////////////////////////////////////////////////
/// @brief Returns the limit on memory usage for this cache in bytes.
@ -157,8 +177,10 @@ class Cache {
insertEviction = 3,
insertNoEviction = 4
};
static uint64_t _evictionStatsCapacity;
StatBuffer _evictionStats;
std::atomic<uint64_t> _insertionCount;
static uint64_t _findStatsCapacity;
bool _enableWindowedStats;
std::unique_ptr<StatBuffer> _findStats;
std::atomic<uint64_t> _findHits;
@ -174,6 +196,7 @@ class Cache {
// times to wait until requesting is allowed again
Manager::time_point _migrateRequestTime;
Manager::time_point _resizeRequestTime;
bool _lastResizeRequestStatus;
// friend class manager and tasks
friend class FreeMemoryTask;
@ -184,12 +207,6 @@ class Cache {
// shutdown cache and let its memory be reclaimed
static void destroy(std::shared_ptr<Cache> cache);
Cache(Manager* manager, uint64_t requestedLimit, bool allowGrowth,
bool enableWindowedStats, std::function<void(Cache*)> deleter,
uint64_t size);
virtual ~Cache() = default;
bool isOperational() const;
void startOperation();
void endOperation();
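
The ConstructionGuard above is an instance of the passkey idiom: the constructor stays public so that std::make_shared can call it, but only the befriended derived caches can actually produce a guard object. A minimal standalone sketch of the idiom follows; the Base/Derived names are hypothetical and not part of this codebase.

#include <memory>

// Minimal sketch of the construction-guard (passkey) idiom used above.
class Derived;

class Base {
 protected:
  class ConstructionGuard {
   private:
    ConstructionGuard() {}
    friend class Derived;  // only Derived may mint a guard
  };

 public:
  // Public so std::make_shared can call it, yet unusable without a guard,
  // which outside code cannot construct.
  explicit Base(ConstructionGuard) {}
  virtual ~Base() = default;
};

class Derived : public Base {
 public:
  explicit Derived(Base::ConstructionGuard guard) : Base(guard) {}

  static std::shared_ptr<Derived> create() {
    // make_shared needs a public constructor; the guard keeps it effectively
    // restricted to this factory.
    return std::make_shared<Derived>(Base::ConstructionGuard());
  }
};

int main() {
  auto fine = Derived::create();
  // auto bad = std::make_shared<Derived>(Base::ConstructionGuard());
  // ...would not compile here: ConstructionGuard is protected in Base and its
  // constructor is private outside Derived.
  (void)fine;
  return 0;
}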

View File

@ -46,7 +46,8 @@ namespace cache {
/// which over-writes itself after it fills up (thus only maintaining a recent
/// window on the records).
////////////////////////////////////////////////////////////////////////////////
template <class T>
template <class T, class Comparator = std::equal_to<T>,
class Hasher = std::hash<T>>
class FrequencyBuffer {
public:
typedef std::vector<std::pair<T, uint64_t>> stats_t;
@ -55,43 +56,54 @@ class FrequencyBuffer {
std::atomic<uint64_t> _current;
uint64_t _capacity;
uint64_t _mask;
std::unique_ptr<T[]> _buffer;
std::unique_ptr<std::vector<T>> _buffer;
Comparator _cmp;
T _empty;
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief Initialize with the given capacity.
//////////////////////////////////////////////////////////////////////////////
FrequencyBuffer(uint64_t capacity) : _current(0) {
size_t i = 0;
for (; (1ULL << i) < capacity; i++) {
FrequencyBuffer(uint64_t capacity) : _current(0), _cmp(), _empty() {
uint64_t i = 0;
for (; (static_cast<uint64_t>(1) << i) < capacity; i++) {
}
_capacity = (1ULL << i);
_capacity = (static_cast<uint64_t>(1) << i);
_mask = _capacity - 1;
_buffer.reset(new T[_capacity]());
_buffer.reset(new std::vector<T>(_capacity));
TRI_ASSERT(_buffer->capacity() == _capacity);
TRI_ASSERT(_buffer->size() == _capacity);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Reports the hidden allocation size (not captured by sizeof).
//////////////////////////////////////////////////////////////////////////////
static uint64_t allocationSize(uint64_t capacity) {
return sizeof(std::vector<T>) + (capacity * sizeof(T));
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Reports the memory usage in bytes.
//////////////////////////////////////////////////////////////////////////////
uint64_t memoryUsage() {
return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer<T>));
return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer<T>) +
sizeof(std::vector<T>));
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Insert an individual event record.
//////////////////////////////////////////////////////////////////////////////
void insertRecord(T const& record) {
++_current;
_buffer[_current & _mask] = record;
void insertRecord(T record) {
(*_buffer)[_current++ & _mask] = record;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Remove all occurrences of the specified event record.
//////////////////////////////////////////////////////////////////////////////
void purgeRecord(T const& record) {
void purgeRecord(T record) {
for (size_t i = 0; i < _capacity; i++) {
if (_buffer[i] == record) {
_buffer[i] = T();
if (_cmp((*_buffer)[i], record)) {
(*_buffer)[i] = _empty;
}
}
}
@ -102,10 +114,10 @@ class FrequencyBuffer {
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<typename FrequencyBuffer::stats_t> getFrequencies() const {
// calculate frequencies
std::unordered_map<T, uint64_t> frequencies;
std::unordered_map<T, uint64_t, Hasher, Comparator> frequencies;
for (size_t i = 0; i < _capacity; i++) {
T entry = _buffer[i];
if (entry != T()) {
T const entry = (*_buffer)[i];
if (!_cmp(entry, _empty)) {
frequencies[entry]++;
}
}
@ -129,7 +141,7 @@ class FrequencyBuffer {
//////////////////////////////////////////////////////////////////////////////
void clear() {
for (size_t i = 0; i < _capacity; i++) {
_buffer[i] = T();
(*_buffer)[i] = T();
}
}
};
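
A small usage sketch of the buffer as generalized above, assuming the arangodb::cache headers are on the include path; the string records and counts are purely illustrative. Ordinary types can rely on the default std::equal_to/std::hash parameters, while types without built-in equality or hashing (such as std::weak_ptr in the Manager's access statistics) supply their own.

#include "Cache/FrequencyBuffer.h"

#include <iostream>
#include <string>

using arangodb::cache::FrequencyBuffer;

int main() {
  // capacity is rounded up to a power of two internally
  FrequencyBuffer<std::string> buffer(8);

  for (int i = 0; i < 3; i++) {
    buffer.insertRecord(std::string("hot-key"));
  }
  buffer.insertRecord(std::string("cold-key"));

  // pairs of (record, count) for the distinct records currently in the window
  auto frequencies = buffer.getFrequencies();
  for (auto const& entry : *frequencies) {
    std::cout << entry.first << ": " << entry.second << std::endl;
  }

  // drop every occurrence of one record without touching the others
  buffer.purgeRecord(std::string("cold-key"));

  return 0;
}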

View File

@ -31,6 +31,7 @@
#include "Cache/Metadata.h"
#include "Cache/PlainCache.h"
#include "Cache/State.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionalCache.h"
#include <stdint.h>
@ -57,6 +58,17 @@ static constexpr uint64_t CACHE_RECORD_OVERHEAD = sizeof(Metadata) + 16;
static constexpr uint64_t TABLE_LISTS_OVERHEAD = 32 * 16 * 8;
static constexpr int64_t TRIES_FAST = 100;
bool Manager::cmp_weak_ptr::operator()(
std::weak_ptr<Cache> const& left, std::weak_ptr<Cache> const& right) const {
return !left.owner_before(right) && !right.owner_before(left);
}
size_t Manager::hash_weak_ptr::operator()(
const std::weak_ptr<Cache>& wp) const {
auto sp = wp.lock();
return std::hash<decltype(sp)>()(sp);
}
Manager::Manager(boost::asio::io_service* ioService, uint64_t globalLimit,
bool enableWindowedStats)
: _state(),
@ -98,21 +110,38 @@ std::shared_ptr<Cache> Manager::createCache(Manager::CacheType type,
std::shared_ptr<Cache> result(nullptr);
_state.lock();
bool allowed = isOperational();
MetadataItr metadata = _caches.end();
_state.unlock();
if (allowed) {
uint64_t fixedSize = 0;
switch (type) {
case CacheType::Plain:
fixedSize = PlainCache::allocationSize(enableWindowedStats);
break;
case CacheType::Transactional:
fixedSize = TransactionalCache::allocationSize(enableWindowedStats);
break;
default:
break;
}
std::tie(allowed, metadata) = registerCache(requestedLimit, fixedSize);
}
if (allowed) {
switch (type) {
case CacheType::Plain:
result = PlainCache::create(this, requestedLimit, allowGrowth,
result = PlainCache::create(this, metadata, allowGrowth,
enableWindowedStats);
break;
case CacheType::Transactional:
result = TransactionalCache::create(this, requestedLimit, allowGrowth,
result = TransactionalCache::create(this, metadata, allowGrowth,
enableWindowedStats);
break;
default:
break;
}
metadata->link(result);
}
return result;
@ -197,11 +226,15 @@ uint64_t Manager::globalAllocation() {
}
std::pair<double, double> Manager::globalHitRates() {
double lifetimeRate = std::nan("");
double windowedRate = std::nan("");
uint64_t currentMisses = _findMisses.load();
uint64_t currentHits = _findHits.load();
double lifetimeRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
uint64_t currentMisses = _findMisses.load();
if (currentHits + currentMisses > 0) {
lifetimeRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
}
if (_enableWindowedStats && _findStats.get() != nullptr) {
auto stats = _findStats->getFrequencies();
@ -219,22 +252,25 @@ std::pair<double, double> Manager::globalHitRates() {
currentHits = (*stats)[1].second;
currentMisses = (*stats)[0].second;
}
windowedRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
if (currentHits + currentMisses > 0) {
windowedRate = 100 * (static_cast<double>(currentHits) /
static_cast<double>(currentHits + currentMisses));
}
}
}
return std::pair<double, double>(lifetimeRate, windowedRate);
}
void Manager::startTransaction() { _transactions.start(); }
Transaction* Manager::beginTransaction(bool readOnly) {
return _transactions.begin(readOnly);
}
void Manager::endTransaction() { _transactions.end(); }
void Manager::endTransaction(Transaction* tx) { _transactions.end(tx); }
Manager::MetadataItr Manager::registerCache(Cache* cache,
uint64_t requestedLimit,
std::function<void(Cache*)> deleter,
uint64_t fixedSize) {
std::pair<bool, Manager::MetadataItr> Manager::registerCache(
uint64_t requestedLimit, uint64_t fixedSize) {
bool ok = true;
uint32_t logSize = 0;
uint32_t tableLogSize = MIN_TABLE_LOG_SIZE;
for (; (1ULL << logSize) < requestedLimit; logSize++) {
@ -246,39 +282,42 @@ Manager::MetadataItr Manager::registerCache(Cache* cache,
_state.lock();
if (!isOperational()) {
_state.unlock();
throw std::bad_alloc();
ok = false;
}
while (logSize >= MIN_LOG_SIZE) {
uint64_t tableAllocation =
_tables[tableLogSize].empty() ? tableSize(tableLogSize) : 0;
if (increaseAllowed(grantedLimit + tableAllocation + CACHE_RECORD_OVERHEAD +
fixedSize)) {
break;
if (ok) {
while (logSize >= MIN_LOG_SIZE) {
uint64_t tableAllocation =
_tables[tableLogSize].empty() ? tableSize(tableLogSize) : 0;
if (increaseAllowed(grantedLimit + tableAllocation +
CACHE_RECORD_OVERHEAD + fixedSize)) {
break;
}
grantedLimit >>= 1U;
logSize--;
if (tableLogSize > MIN_TABLE_LOG_SIZE) {
tableLogSize--;
}
}
grantedLimit >>= 1U;
logSize--;
if (tableLogSize > MIN_TABLE_LOG_SIZE) {
tableLogSize--;
if (logSize < MIN_LOG_SIZE) {
ok = false;
}
}
if (logSize < MIN_LOG_SIZE) {
_state.unlock();
throw std::bad_alloc();
MetadataItr metadata = _caches.end();
if (ok) {
_globalAllocation += (grantedLimit + CACHE_RECORD_OVERHEAD + fixedSize);
_caches.emplace_front(grantedLimit);
metadata = _caches.begin();
metadata->lock();
leaseTable(metadata, tableLogSize);
metadata->unlock();
}
_globalAllocation += (grantedLimit + CACHE_RECORD_OVERHEAD + fixedSize);
_caches.emplace_front(std::shared_ptr<Cache>(cache, deleter), grantedLimit);
MetadataItr metadata = _caches.begin();
metadata->lock();
leaseTable(metadata, tableLogSize);
metadata->unlock();
_state.unlock();
return metadata;
return std::pair<bool, MetadataItr>(ok, metadata);
}
void Manager::unregisterCache(Manager::MetadataItr& metadata) {
@ -302,7 +341,7 @@ void Manager::unregisterCache(Manager::MetadataItr& metadata) {
std::pair<bool, Manager::time_point> Manager::requestResize(
Manager::MetadataItr& metadata, uint64_t requestedLimit) {
Manager::time_point nextRequest = futureTime(30);
Manager::time_point nextRequest = futureTime(100);
bool allowed = false;
bool ok = _state.lock(TRIES_FAST);
@ -332,7 +371,7 @@ std::pair<bool, Manager::time_point> Manager::requestResize(
std::pair<bool, Manager::time_point> Manager::requestMigrate(
Manager::MetadataItr& metadata, uint32_t requestedLogSize) {
Manager::time_point nextRequest = futureTime(30);
Manager::time_point nextRequest = futureTime(100);
bool allowed = false;
bool ok = _state.lock(TRIES_FAST);
@ -361,9 +400,10 @@ std::pair<bool, Manager::time_point> Manager::requestMigrate(
}
void Manager::reportAccess(std::shared_ptr<Cache> cache) {
if (((++_accessCounter) & 0x7FULL) == 0) { // record 1 in 128
_accessStats.insertRecord(cache);
}
// if (((++_accessCounter) & static_cast<uint64_t>(7)) == 0) { // record 1 in
// 8
_accessStats.insertRecord(cache);
//}
}
void Manager::recordHitStat(Manager::Stat stat) {
@ -455,26 +495,35 @@ bool Manager::rebalance() {
// allow background tasks if more than 7/8ths full
bool allowTasks =
_globalAllocation > (_globalHardLimit - (_globalHardLimit >> 3));
_globalAllocation >
static_cast<uint64_t>(0.875 * static_cast<double>(_globalHardLimit));
// be aggressive if more than 3/4ths full
bool beAggressive =
_globalAllocation > (_globalHardLimit - (_globalHardLimit >> 2));
_globalAllocation >
static_cast<uint64_t>(0.75 * static_cast<double>(_globalHardLimit));
// aim for 1/4th with background tasks, 1/8th if no tasks but aggressive, no
// aim for 3/8th with background tasks, 1/4th if no tasks but aggressive, no
// goal otherwise
uint64_t goal = beAggressive ? (allowTasks ? (_globalAllocation >> 2)
: (_globalAllocation >> 3))
: 0;
uint64_t goal =
beAggressive
? (allowTasks ? static_cast<uint64_t>(
0.375 * static_cast<double>(_globalHardLimit))
: static_cast<uint64_t>(
0.25 * static_cast<double>(_globalHardLimit)))
: 0;
// get stats on cache access to prioritize freeing from less frequently used
// caches first, so more frequently used ones stay large
std::shared_ptr<PriorityList> cacheList = priorityList();
if (goal > 0) {
// get stats on cache access to prioritize freeing from less frequently used
// caches first, so more frequently used ones stay large
std::shared_ptr<PriorityList> cacheList = priorityList();
// just adjust limits
uint64_t reclaimed = resizeAllCaches(TaskEnvironment::rebalancing, cacheList,
allowTasks, beAggressive, goal);
_globalAllocation -= reclaimed;
// just adjust limits
uint64_t reclaimed =
resizeAllCaches(TaskEnvironment::rebalancing, cacheList, allowTasks,
beAggressive, goal);
_globalAllocation -= reclaimed;
}
if (_rebalancingTasks.load() == 0) {
_state.toggleFlag(State::Flag::rebalancing);
@ -518,8 +567,9 @@ void Manager::internalResize(uint64_t newGlobalLimit, bool firstAttempt) {
cacheList = priorityList();
// first just adjust limits down to usage
uint64_t reclaimed = resizeAllCaches(TaskEnvironment::resizing, cacheList, true,
true, _globalAllocation - _globalSoftLimit);
uint64_t reclaimed =
resizeAllCaches(TaskEnvironment::resizing, cacheList, true, true,
_globalAllocation - _globalSoftLimit);
_globalAllocation -= reclaimed;
done = adjustGlobalLimitsIfAllowed(newGlobalLimit);
}
@ -561,11 +611,10 @@ uint64_t Manager::resizeAllCaches(Manager::TaskEnvironment environment,
if (aggressive) {
newLimit =
(noTasks ? metadata->usage()
: (std::min)(metadata->usage(), metadata->hardLimit() / 4));
} else {
newLimit =
(noTasks ? (std::max)(metadata->usage(), metadata->hardLimit() / 2)
: (std::min)(metadata->usage(), metadata->hardLimit() / 2));
} else {
newLimit = (std::max)(metadata->usage(),
(metadata->hardLimit() + metadata->usage()) / 2);
}
newLimit = (std::max)(newLimit, MIN_CACHE_SIZE);
@ -761,9 +810,11 @@ std::shared_ptr<Manager::PriorityList> Manager::priorityList() {
// catalog accessed caches
auto stats = _accessStats.getFrequencies();
std::set<Cache*> accessed;
std::set<std::shared_ptr<Cache>> accessed;
for (auto s : *stats) {
accessed.emplace(s.first.get());
if (auto cache = s.first.lock()) {
accessed.emplace(cache);
}
}
// gather all unaccessed caches at beginning of list
@ -772,7 +823,7 @@ std::shared_ptr<Manager::PriorityList> Manager::priorityList() {
std::shared_ptr<Cache> cache = m->cache();
m->unlock();
auto found = accessed.find(cache.get());
auto found = accessed.find(cache);
if (found == accessed.end()) {
list->emplace_back(cache);
}
@ -780,13 +831,15 @@ std::shared_ptr<Manager::PriorityList> Manager::priorityList() {
// gather all accessed caches in order
for (auto s : *stats) {
list->emplace_back(s.first);
if (auto cache = s.first.lock()) {
list->emplace_back(cache);
}
}
return list;
}
Manager::time_point Manager::futureTime(uint64_t secondsFromNow) {
Manager::time_point Manager::futureTime(uint64_t millisecondsFromNow) {
return (std::chrono::steady_clock::now() +
std::chrono::seconds(secondsFromNow));
std::chrono::milliseconds(millisecondsFromNow));
}
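
To make the rebalancing thresholds above concrete, here is a small arithmetic sketch that mirrors the fractions used in rebalance(); the 256 MiB hard limit and the current allocation are made-up figures for illustration only.

#include <cstdint>
#include <iostream>

int main() {
  // hypothetical figures, chosen only to illustrate the fractions in rebalance()
  uint64_t hardLimit = 256ULL << 20;   // 256 MiB global hard limit
  uint64_t allocation = 230ULL << 20;  // 230 MiB currently allocated

  // background tasks are allowed above 7/8 of the hard limit (224 MiB here)
  bool allowTasks =
      allocation > static_cast<uint64_t>(0.875 * static_cast<double>(hardLimit));
  // rebalancing turns aggressive above 3/4 of the hard limit (192 MiB here)
  bool beAggressive =
      allocation > static_cast<uint64_t>(0.75 * static_cast<double>(hardLimit));

  // target 3/8 of the hard limit with tasks (96 MiB), 1/4 without (64 MiB)
  uint64_t goal =
      beAggressive
          ? (allowTasks
                 ? static_cast<uint64_t>(0.375 * static_cast<double>(hardLimit))
                 : static_cast<uint64_t>(0.25 * static_cast<double>(hardLimit)))
          : 0;

  std::cout << allowTasks << " " << beAggressive << " " << goal << std::endl;
  return 0;
}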

View File

@ -30,7 +30,8 @@
#include "Cache/FrequencyBuffer.h"
#include "Cache/Metadata.h"
#include "Cache/State.h"
#include "Cache/TransactionWindow.h"
#include "Cache/Transaction.h"
#include "Cache/TransactionManager.h"
#include <stdint.h>
#include <atomic>
@ -67,9 +68,19 @@ class Rebalancer; // forward declaration
/// need a different instance.
////////////////////////////////////////////////////////////////////////////////
class Manager {
protected:
struct cmp_weak_ptr {
bool operator()(std::weak_ptr<Cache> const& left,
std::weak_ptr<Cache> const& right) const;
};
struct hash_weak_ptr {
size_t operator()(const std::weak_ptr<Cache>& wp) const;
};
public:
static uint64_t MINIMUM_SIZE;
typedef FrequencyBuffer<std::shared_ptr<Cache>> AccessStatBuffer;
typedef FrequencyBuffer<std::weak_ptr<Cache>, cmp_weak_ptr, hash_weak_ptr>
AccessStatBuffer;
typedef FrequencyBuffer<uint8_t> FindStatBuffer;
typedef std::vector<std::shared_ptr<Cache>> PriorityList;
typedef std::chrono::time_point<std::chrono::steady_clock> time_point;
@ -144,14 +155,18 @@ class Manager {
std::pair<double, double> globalHitRates();
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the beginning of a transaction.
/// @brief Open a new transaction.
///
/// The transaction is considered read-only if it is guaranteed not to write
/// to the backing store. A read-only transaction may, however, write to the
/// cache.
//////////////////////////////////////////////////////////////////////////////
void startTransaction();
Transaction* beginTransaction(bool readOnly);
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the end of a transaction.
/// @brief Signal the end of a transaction. Deletes the passed Transaction.
//////////////////////////////////////////////////////////////////////////////
void endTransaction();
void endTransaction(Transaction* tx);
private:
// simple state variable for locking and other purposes
@ -180,7 +195,7 @@ class Manager {
uint64_t _globalAllocation;
// transaction management
TransactionWindow _transactions;
TransactionManager _transactions;
// task management
enum TaskEnvironment { none, rebalancing, resizing };
@ -200,9 +215,8 @@ class Manager {
private: // used by caches
// register and unregister individual caches
Manager::MetadataItr registerCache(Cache* cache, uint64_t requestedLimit,
std::function<void(Cache*)> deleter,
uint64_t fixedSize);
std::pair<bool, Manager::MetadataItr> registerCache(uint64_t requestedLimit,
uint64_t fixedSize);
void unregisterCache(Manager::MetadataItr& metadata);
// allow individual caches to request changes to their allocations
@ -259,7 +273,7 @@ class Manager {
std::shared_ptr<PriorityList> priorityList();
// helper for wait times
Manager::time_point futureTime(uint64_t secondsFromNow);
Manager::time_point futureTime(uint64_t millisecondsFromNow);
};
}; // end namespace cache

View File

@ -30,16 +30,15 @@
using namespace arangodb::cache;
Metadata::Metadata(std::shared_ptr<Cache> cache, uint64_t limit, uint8_t* table,
uint32_t logSize)
Metadata::Metadata(uint64_t limit)
: _state(),
_cache(cache),
_cache(nullptr),
_usage(0),
_softLimit(limit),
_hardLimit(limit),
_table(table),
_table(nullptr),
_auxiliaryTable(nullptr),
_logSize(logSize),
_logSize(0),
_auxiliaryLogSize(0) {}
Metadata::Metadata(Metadata const& other)
@ -53,6 +52,12 @@ Metadata::Metadata(Metadata const& other)
_logSize(other._logSize),
_auxiliaryLogSize(other._auxiliaryLogSize) {}
void Metadata::link(std::shared_ptr<Cache> cache) {
lock();
_cache = cache;
unlock();
}
void Metadata::lock() { _state.lock(); }
void Metadata::unlock() {

View File

@ -44,14 +44,18 @@ class Metadata {
//////////////////////////////////////////////////////////////////////////////
/// @brief Initializes record with given information.
//////////////////////////////////////////////////////////////////////////////
Metadata(std::shared_ptr<Cache> cache, uint64_t limit,
uint8_t* table = nullptr, uint32_t logSize = 0);
Metadata(uint64_t limit);
//////////////////////////////////////////////////////////////////////////////
/// @brief Initializes record from an existing record.
//////////////////////////////////////////////////////////////////////////////
Metadata(Metadata const& other);
//////////////////////////////////////////////////////////////////////////////
/// @brief Links the metadata object to an actual cache.
//////////////////////////////////////////////////////////////////////////////
void link(std::shared_ptr<Cache> cache);
//////////////////////////////////////////////////////////////////////////////
/// @brief Locks the record.
//////////////////////////////////////////////////////////////////////////////

View File

@ -114,14 +114,14 @@ CachedValue* PlainBucket::remove(uint32_t hash, void const* key,
return value;
}
CachedValue* PlainBucket::evictionCandidate() const {
CachedValue* PlainBucket::evictionCandidate(bool ignoreRefCount) const {
TRI_ASSERT(isLocked());
for (size_t i = 0; i < SLOTS_DATA; i++) {
size_t slot = SLOTS_DATA - (i + 1);
if (_cachedHashes[slot] == 0) {
continue;
}
if (_cachedData[slot]->isFreeable()) {
if (ignoreRefCount || _cachedData[slot]->isFreeable()) {
return _cachedData[slot];
}
}
@ -166,8 +166,6 @@ void PlainBucket::moveSlot(size_t slot, bool moveToFront) {
_cachedData[i] = _cachedData[i + 1];
}
}
if (i != slot) {
_cachedHashes[i] = hash;
_cachedData[i] = value;
}
_cachedHashes[i] = hash;
_cachedData[i] = value;
}

View File

@ -128,11 +128,12 @@ struct alignas(64) PlainBucket {
/// @brief Searches for the best candidate in the bucket to evict. Requires
/// state to be locked.
///
/// Returns a pointer to least recently used freeable value. If the bucket
/// contains no values or all have outstanding references, then it returns
/// nullptr.
/// Usually returns a pointer to the least recently used freeable value. If the
/// bucket contains no values or all have outstanding references, then it
/// returns nullptr. In the case that ignoreRefCount is set to true, then it
/// simply returns the least recently used value, regardless of freeability.
//////////////////////////////////////////////////////////////////////////////
CachedValue* evictionCandidate() const;
CachedValue* evictionCandidate(bool ignoreRefCount = false) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Evicts the given value from the bucket. Requires state to be

View File

@ -71,36 +71,46 @@ bool PlainCache::insert(CachedValue* value) {
std::tie(ok, bucket) = getBucket(hash, TRIES_FAST);
if (ok) {
bool allowed = true;
bool eviction = false;
int64_t change = value->size();
CachedValue* candidate = bucket->find(hash, value->key(), value->keySize);
if (candidate == nullptr && bucket->isFull()) {
candidate = bucket->evictionCandidate();
if (candidate == nullptr) {
allowed = false;
} else {
eviction = true;
}
}
if (candidate != nullptr) {
change -= candidate->size();
}
_metadata->lock();
bool allowed = _metadata->adjustUsageIfAllowed(change);
_metadata->unlock();
if (allowed) {
if (candidate != nullptr) {
bucket->evict(candidate, true);
freeValue(candidate);
recordStat(Stat::insertEviction);
} else {
recordStat(Stat::insertNoEviction);
change -= candidate->size();
}
_metadata->lock();
allowed = _metadata->adjustUsageIfAllowed(change);
_metadata->unlock();
if (allowed) {
if (candidate != nullptr) {
bucket->evict(candidate, true);
freeValue(candidate);
}
recordStat(eviction ? Stat::insertEviction : Stat::insertNoEviction);
bucket->insert(hash, value);
inserted = true;
} else {
requestResize(); // let function do the hard work
}
bucket->insert(hash, value);
inserted = true;
} else {
requestResize(); // let function do the hard work
}
bucket->unlock();
requestMigrate(); // let function do the hard work
if (inserted) {
requestMigrate(); // let function do the hard work
}
endOperation();
}
@ -138,29 +148,29 @@ bool PlainCache::remove(void const* key, uint32_t keySize) {
return removed;
}
std::shared_ptr<Cache> PlainCache::create(Manager* manager,
uint64_t requestedSize,
bool allowGrowth,
bool enableWindowedStats) {
PlainCache* cache =
new PlainCache(manager, requestedSize, allowGrowth, enableWindowedStats);
bool PlainCache::blacklist(void const* key, uint32_t keySize) { return false; }
if (cache == nullptr) {
return std::shared_ptr<Cache>(nullptr);
}
cache->metadata()->lock();
std::shared_ptr<Cache> result = cache->metadata()->cache();
cache->metadata()->unlock();
return result;
uint64_t PlainCache::allocationSize(bool enableWindowedStats) {
return sizeof(PlainCache) +
StatBuffer::allocationSize(_evictionStatsCapacity) +
(enableWindowedStats ? (sizeof(StatBuffer) +
StatBuffer::allocationSize(_findStatsCapacity))
: 0);
}
PlainCache::PlainCache(Manager* manager, uint64_t requestedLimit,
bool allowGrowth, bool enableWindowedStats)
: Cache(manager, requestedLimit, allowGrowth, enableWindowedStats,
[](Cache* p) -> void { delete reinterpret_cast<PlainCache*>(p); },
sizeof(PlainCache)),
std::shared_ptr<Cache> PlainCache::create(Manager* manager,
Manager::MetadataItr metadata,
bool allowGrowth,
bool enableWindowedStats) {
return std::make_shared<PlainCache>(Cache::ConstructionGuard(), manager,
metadata, allowGrowth,
enableWindowedStats);
}
PlainCache::PlainCache(Cache::ConstructionGuard guard, Manager* manager,
Manager::MetadataItr metadata, bool allowGrowth,
bool enableWindowedStats)
: Cache(guard, manager, metadata, allowGrowth, enableWindowedStats),
_table(nullptr),
_logSize(0),
_tableSize(1),
@ -186,7 +196,7 @@ PlainCache::PlainCache(Manager* manager, uint64_t requestedLimit,
PlainCache::~PlainCache() {
_state.lock();
if (isOperational()) {
if (!_state.isSet(State::Flag::shutdown)) {
_state.unlock();
shutdown();
}
@ -295,8 +305,8 @@ bool PlainCache::migrate() {
for (size_t j = 0; j < PlainBucket::SLOTS_DATA; j++) {
size_t k = PlainBucket::SLOTS_DATA - (j + 1);
if ((*bucket)._cachedHashes[k] != 0) {
uint32_t hash = (*bucket)._cachedHashes[k];
CachedValue* value = (*bucket)._cachedData[k];
uint32_t hash = bucket->_cachedHashes[k];
CachedValue* value = bucket->_cachedData[k];
uint32_t targetIndex =
(hash & _auxiliaryBucketMask) >> _auxiliaryMaskShift;
@ -316,11 +326,13 @@ bool PlainCache::migrate() {
if (haveSpace) {
targetBucket->insert(hash, value);
} else {
uint64_t size = value->size();
freeValue(value);
reclaimMemory(size);
}
(*bucket)._cachedHashes[k] = 0;
(*bucket)._cachedData[k] = nullptr;
bucket->_cachedHashes[k] = 0;
bucket->_cachedData[k] = nullptr;
}
}
@ -418,15 +430,12 @@ void PlainCache::clearTable(PlainBucket* table, uint64_t tableSize) {
for (uint64_t i = 0; i < tableSize; i++) {
PlainBucket* bucket = &(table[i]);
bucket->lock(-1LL);
CachedValue* value = bucket->evictionCandidate();
while (value != nullptr) {
bucket->evict(value);
_metadata->lock();
_metadata->adjustUsageIfAllowed(-static_cast<int64_t>(value->size()));
_metadata->unlock();
freeValue(value);
value = bucket->evictionCandidate();
for (size_t j = 0; j < PlainBucket::SLOTS_DATA; j++) {
if (bucket->_cachedData[j] != nullptr) {
uint64_t size = bucket->_cachedData[j]->size();
freeValue(bucket->_cachedData[j]);
reclaimMemory(size);
}
}
bucket->clear();
}

View File

@ -42,8 +42,6 @@
namespace arangodb {
namespace cache {
class Manager; // forward declaration
////////////////////////////////////////////////////////////////////////////////
/// @brief A simple, LRU-ish cache.
///
@ -53,6 +51,11 @@ class Manager; // forward declaration
////////////////////////////////////////////////////////////////////////////////
class PlainCache final : public Cache {
public:
PlainCache(Cache::ConstructionGuard guard, Manager* manager,
Manager::MetadataItr metadata, bool allowGrowth,
bool enableWindowedStats);
~PlainCache();
PlainCache() = delete;
PlainCache(PlainCache const&) = delete;
PlainCache& operator=(PlainCache const&) = delete;
@ -86,6 +89,11 @@ class PlainCache final : public Cache {
//////////////////////////////////////////////////////////////////////////////
bool remove(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Does nothing; convenience method provided for interface compliance
//////////////////////////////////////////////////////////////////////////////
bool blacklist(void const* key, uint32_t keySize);
private:
// main table info
PlainBucket* _table;
@ -107,15 +115,11 @@ class PlainCache final : public Cache {
friend class MigrateTask;
private:
// creator -- do not use constructor explicitly
static std::shared_ptr<Cache> create(Manager* manager, uint64_t requestedSize,
static uint64_t allocationSize(bool enableWindowedStats);
static std::shared_ptr<Cache> create(Manager* manager,
Manager::MetadataItr metadata,
bool allowGrowth,
bool enableWindowedStats);
PlainCache(Manager* manager, uint64_t requestedLimit, bool allowGrowth,
bool enableWindowedStats);
~PlainCache();
// management
bool freeMemory();
bool migrate();

View File

@ -21,25 +21,13 @@
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#include "Cache/TransactionWindow.h"
#include "Cache/Transaction.h"
#include <stdint.h>
#include <atomic>
using namespace arangodb::cache;
TransactionWindow::TransactionWindow() : _open(0), _term(0) {}
Transaction::Transaction() : term(0), readOnly(true), sensitive(false) {}
void TransactionWindow::start() {
if (++_open == 1) {
_term++;
}
}
void TransactionWindow::end() {
if (--_open == 0) {
_term++;
}
}
uint64_t TransactionWindow::term() { return _term.load(); }
Transaction::Transaction(bool ro)
: term(0), readOnly(ro), sensitive(!readOnly) {}

View File

@ -0,0 +1,47 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_CACHE_TRANSACTION_H
#define ARANGODB_CACHE_TRANSACTION_H
#include "Basics/Common.h"
#include <stdint.h>
namespace arangodb {
namespace cache {
////////////////////////////////////////////////////////////////////////////////
/// @brief Structure to maintain information about an individual transaction.
////////////////////////////////////////////////////////////////////////////////
struct Transaction {
uint64_t term;
bool readOnly;
bool sensitive;
Transaction();
Transaction(bool ro);
};
}; // end namespace cache
}; // end namespace arangodb
#endif

View File

@ -0,0 +1,83 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#include "Cache/TransactionManager.h"
#include "Cache/State.h"
#include "Cache/Transaction.h"
#include <stdint.h>
#include <atomic>
using namespace arangodb::cache;
TransactionManager::TransactionManager()
: _state(), _openReads(0), _openSensitive(0), _openWrites(0), _term(0) {}
Transaction* TransactionManager::begin(bool readOnly) {
_state.lock();
Transaction* tx = new Transaction(readOnly);
if (readOnly) {
_openReads++;
if (_openWrites.load() > 0) {
tx->sensitive = true;
_openSensitive++;
}
} else {
tx->sensitive = true;
_openWrites++;
if (++_openSensitive == 1) {
_term++;
_openSensitive += _openReads.load();
}
}
tx->term = _term;
_state.unlock();
return tx;
}
void TransactionManager::end(Transaction* tx) {
TRI_ASSERT(tx != nullptr);
_state.lock();
// if currently in sensitive phase, and transaction term is old, it was
// upgraded to sensitive status
if (((_term & static_cast<uint64_t>(1)) > 0) && (_term > tx->term)) {
tx->sensitive = true;
}
if (tx->readOnly) {
_openReads--;
} else {
_openWrites--;
}
if (tx->sensitive && (--_openSensitive == 0)) {
_term++;
}
_state.unlock();
delete tx;
}
uint64_t TransactionManager::term() { return _term.load(); }

View File

@ -25,6 +25,8 @@
#define ARANGODB_CACHE_TRANSACTION_WINDOW_H
#include "Basics/Common.h"
#include "Cache/State.h"
#include "Cache/Transaction.h"
#include <stdint.h>
#include <atomic>
@ -33,28 +35,35 @@ namespace arangodb {
namespace cache {
////////////////////////////////////////////////////////////////////////////////
/// @brief Manage windows in time when there are either no ongoing transactions
/// or some.
/// @brief Manage global cache transactions.
///
/// Allows clients to start a transaction, end a transaction, and query an
/// identifier for the current window.
/// identifier for the current window. If the identifier is even, there are no
/// ongoing sensitive transactions, and it is safe to store any values retrieved
/// from the backing store to transactional caches. If the identifier is odd,
/// then some values may be blacklisted by transactional caches (if they have
/// been written to the backing store in the current window).
////////////////////////////////////////////////////////////////////////////////
class TransactionWindow {
class TransactionManager {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief Initialize state with no open transactions.
//////////////////////////////////////////////////////////////////////////////
TransactionWindow();
TransactionManager();
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the beginning of a transaction.
/// @brief Open a new transaction.
///
/// The transaction is considered read-only if it is guaranteed not to write
/// to the backing store. A read-only transaction may, however, write to the
/// cache.
//////////////////////////////////////////////////////////////////////////////
void start();
Transaction* begin(bool readOnly);
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the end of a transaction.
/// @brief Signal the end of a transaction. Deletes the passed Transaction.
//////////////////////////////////////////////////////////////////////////////
void end();
void end(Transaction* tx);
//////////////////////////////////////////////////////////////////////////////
/// @brief Return the current window identifier.
@ -62,7 +71,10 @@ class TransactionWindow {
uint64_t term();
private:
std::atomic<uint64_t> _open;
State _state;
std::atomic<uint64_t> _openReads;
std::atomic<uint64_t> _openSensitive;
std::atomic<uint64_t> _openWrites;
std::atomic<uint64_t> _term;
};
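
A brief usage sketch of the even/odd term convention documented above, assuming the arangodb::cache headers are available; the particular begin/end sequence is illustrative only.

#include "Cache/Transaction.h"
#include "Cache/TransactionManager.h"

#include <cassert>

using namespace arangodb::cache;

int main() {
  TransactionManager transactions;
  assert((transactions.term() & 1ULL) == 0);  // even: no sensitive window open

  // A read-only transaction by itself does not open a sensitive window...
  Transaction* reader = transactions.begin(true);
  assert((transactions.term() & 1ULL) == 0);

  // ...but a writing transaction does, and the term becomes odd.
  Transaction* writer = transactions.begin(false);
  assert((transactions.term() & 1ULL) == 1);

  // The sensitive window stays open (odd term) until every transaction that
  // overlapped it has ended, including the read-only one. Ending a transaction
  // also deletes the passed Transaction object.
  transactions.end(writer);
  assert((transactions.term() & 1ULL) == 1);

  transactions.end(reader);
  assert((transactions.term() & 1ULL) == 0);  // back to an even, safe window

  return 0;
}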

View File

@ -35,12 +35,14 @@ size_t TransactionalBucket::SLOTS_DATA = 3;
size_t TransactionalBucket::SLOTS_BLACKLIST = 4;
TransactionalBucket::TransactionalBucket() {
memset(this, 0, sizeof(TransactionalBucket));
_state.lock();
clear();
}
bool TransactionalBucket::lock(uint64_t transactionTerm, int64_t maxTries) {
return _state.lock(maxTries,
[&]() -> void { updateBlacklistTerm(transactionTerm); });
return _state.lock(maxTries, [this, transactionTerm]() -> void {
updateBlacklistTerm(transactionTerm);
});
}
void TransactionalBucket::unlock() {
@ -52,12 +54,12 @@ bool TransactionalBucket::isLocked() const { return _state.isLocked(); }
bool TransactionalBucket::isMigrated() const {
TRI_ASSERT(isLocked());
return _state.isSet(State::Flag::blacklisted);
return _state.isSet(State::Flag::migrated);
}
bool TransactionalBucket::isFullyBlacklisted() const {
TRI_ASSERT(isLocked());
return _state.isSet(State::Flag::blacklisted);
return (haveOpenTransaction() && _state.isSet(State::Flag::blacklisted));
}
bool TransactionalBucket::isFull() const {
@ -125,30 +127,39 @@ CachedValue* TransactionalBucket::remove(uint32_t hash, void const* key,
return value;
}
void TransactionalBucket::blacklist(uint32_t hash, void const* key,
uint32_t keySize) {
CachedValue* TransactionalBucket::blacklist(uint32_t hash, void const* key,
uint32_t keySize) {
TRI_ASSERT(isLocked());
// remove key if it's here
remove(hash, key, keySize);
if (!haveOpenTransaction()) {
return nullptr;
}
if (isFullyBlacklisted()) {
return;
// remove key if it's here
CachedValue* value = (keySize == 0) ? nullptr : remove(hash, key, keySize);
if (isBlacklisted(hash)) {
return value;
}
for (size_t i = 0; i < SLOTS_BLACKLIST; i++) {
if (_blacklistHashes[i] == 0) {
// found an empty slot
_blacklistHashes[i] = hash;
return;
return value;
}
}
// no empty slot found, fully blacklist
_state.toggleFlag(State::Flag::blacklisted);
return value;
}
bool TransactionalBucket::isBlacklisted(uint32_t hash) const {
TRI_ASSERT(isLocked());
if (!haveOpenTransaction()) {
return false;
}
if (isFullyBlacklisted()) {
return true;
}
@ -164,14 +175,14 @@ bool TransactionalBucket::isBlacklisted(uint32_t hash) const {
return blacklisted;
}
CachedValue* TransactionalBucket::evictionCandidate() const {
CachedValue* TransactionalBucket::evictionCandidate(bool ignoreRefCount) const {
TRI_ASSERT(isLocked());
for (size_t i = 0; i < SLOTS_DATA; i++) {
size_t slot = SLOTS_DATA - (i + 1);
if (_cachedHashes[slot] == 0) {
continue;
}
if (_cachedData[slot]->isFreeable()) {
if (ignoreRefCount || _cachedData[slot]->isFreeable()) {
return _cachedData[slot];
}
}
@ -193,6 +204,11 @@ void TransactionalBucket::evict(CachedValue* value, bool optimizeForInsertion) {
}
}
void TransactionalBucket::clear() {
TRI_ASSERT(isLocked());
memset(this, 0, sizeof(TransactionalBucket));
}
void TransactionalBucket::updateBlacklistTerm(uint64_t term) {
if (term > _blacklistTerm) {
_blacklistTerm = term;
@ -206,6 +222,7 @@ void TransactionalBucket::updateBlacklistTerm(uint64_t term) {
}
void TransactionalBucket::moveSlot(size_t slot, bool moveToFront) {
TRI_ASSERT(isLocked());
uint32_t hash = _cachedHashes[slot];
CachedValue* value = _cachedData[slot];
size_t i = slot;
@ -222,8 +239,12 @@ void TransactionalBucket::moveSlot(size_t slot, bool moveToFront) {
_cachedData[i] = _cachedData[i + 1];
}
}
if (i != slot) {
_cachedHashes[i] = hash;
_cachedData[i] = value;
}
_cachedHashes[i] = hash;
_cachedData[i] = value;
}
bool TransactionalBucket::haveOpenTransaction() const {
TRI_ASSERT(isLocked());
// only have open transactions if term is odd
return ((_blacklistTerm & 1ULL) > 0);
}

View File

@ -34,6 +34,16 @@
namespace arangodb {
namespace cache {
////////////////////////////////////////////////////////////////////////////////
/// @brief Bucket structure for TransactionalCache.
///
/// Contains a State variable, three slots each for hashes and data pointers,
/// four slots for blacklisted hashes, and the applicable transaction term. Most
/// querying and manipulation can be handled via the exposed methods. Bucket
/// must be locked before doing anything else to ensure proper synchronization.
/// Data entries are carefully laid out to ensure the structure fits in a single
/// cacheline.
////////////////////////////////////////////////////////////////////////////////
struct alignas(64) TransactionalBucket {
State _state;
@ -52,33 +62,136 @@ struct alignas(64) TransactionalBucket {
uint32_t _padding[3];
#endif
//////////////////////////////////////////////////////////////////////////////
/// @brief Initialize an empty bucket.
//////////////////////////////////////////////////////////////////////////////
TransactionalBucket();
// must lock before using any other operations
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempt to lock bucket (failing after maxTries attempts).
///
/// If the bucket is successfully locked, the transaction term is updated.
//////////////////////////////////////////////////////////////////////////////
bool lock(uint64_t transactionTerm, int64_t maxTries);
//////////////////////////////////////////////////////////////////////////////
/// @brief Unlock the bucket. Requires bucket to be locked.
//////////////////////////////////////////////////////////////////////////////
void unlock();
// state checkers
//////////////////////////////////////////////////////////////////////////////
/// @brief Checks whether the bucket is locked.
//////////////////////////////////////////////////////////////////////////////
bool isLocked() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Checks whether the bucket has been migrated. Requires state to be
/// locked.
//////////////////////////////////////////////////////////////////////////////
bool isMigrated() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Checks whether bucket has been fully blacklisted. Requires state to
/// be locked.
//////////////////////////////////////////////////////////////////////////////
bool isFullyBlacklisted() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Checks whether bucket is full. Requires state to be locked.
//////////////////////////////////////////////////////////////////////////////
bool isFull() const;
// primary functions
//////////////////////////////////////////////////////////////////////////////
/// @brief Looks up a given key and returns associated value. Requires state
/// to be locked.
///
/// Takes an input hash and key (specified by pointer and size), and searches
/// the bucket for a matching entry. If a matching entry is found, it is
/// returned. By default, a matching entry will be moved to the front of the
/// bucket to allow basic LRU semantics. If no matching entry is found,
/// nothing will be changed and a nullptr will be returned.
//////////////////////////////////////////////////////////////////////////////
CachedValue* find(uint32_t hash, void const* key, uint32_t keySize,
bool moveToFront = true);
void insert(uint32_t hash, CachedValue* value);
CachedValue* remove(uint32_t hash, void const* key, uint32_t keySize);
void blacklist(uint32_t hash, void const* key, uint32_t keySize);
// auxiliary functions
//////////////////////////////////////////////////////////////////////////////
/// @brief Inserts a given value if it is not blacklisted. Requires state to
/// be locked.
///
/// Requires that the bucket is not full and does not already contain an item
/// with the same key. If it is full, the item will not be inserted. If an
/// item with the same key exists, this is not detected, but it is likely to
/// produce bugs down the line. If the item's hash has been
/// blacklisted, or the bucket is fully blacklisted, insertion will simply do
/// nothing. When inserting, the item is put into the first empty slot, then
/// moved to the front. If attempting to insert and the bucket is full, the
/// user should evict an item and specify the optimizeForInsertion flag to be
/// true.
//////////////////////////////////////////////////////////////////////////////
void insert(uint32_t hash, CachedValue* value);
//////////////////////////////////////////////////////////////////////////////
/// @brief Removes an item with the given key if one exists. Requires state to
/// be locked.
///
/// Search for a matching key. If none exists, do nothing and return a
/// nullptr. If one exists, remove it from the bucket and return the pointer
/// to the value. Upon removal, the empty slot generated is moved to the back
/// of the bucket (to remove the gap).
//////////////////////////////////////////////////////////////////////////////
CachedValue* remove(uint32_t hash, void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Blacklists a key and removes it if it exists. Requires state to
/// be locked.
///
/// Search for a matching key. If one exists, remove it. Then blacklist the
/// hash associated with the key. If there are no empty blacklist slots, fully
/// blacklist the bucket.
//////////////////////////////////////////////////////////////////////////////
CachedValue* blacklist(uint32_t hash, void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Checks whether a given hash is blacklisted. Requires state to be
/// locked.
//////////////////////////////////////////////////////////////////////////////
bool isBlacklisted(uint32_t hash) const;
CachedValue* evictionCandidate() const;
void evict(CachedValue* value, bool optimizeForInsertion);
//////////////////////////////////////////////////////////////////////////////
/// @brief Searches for the best candidate in the bucket to evict. Requires
/// state to be locked.
///
/// Usually returns a pointer to the least recently used freeable value. If the
/// bucket contains no values or all have outstanding references, then it
/// returns nullptr. In the case that ignoreRefCount is set to true, then it
/// simply returns the least recently used value, regardless of freeability.
//////////////////////////////////////////////////////////////////////////////
CachedValue* evictionCandidate(bool ignoreRefCount = false) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Evicts the given value from the bucket. Requires state to be
/// locked.
///
/// By default, it will move the empty slot to the back of the bucket. If
/// preparing an empty slot for insertion, specify the second parameter to be
/// true. This will move the empty slot to the front instead.
//////////////////////////////////////////////////////////////////////////////
void evict(CachedValue* value, bool optimizeForInsertion = false);
//////////////////////////////////////////////////////////////////////////////
/// @brief Updates the bucket's blacklist term. Requires state to be locked.
//////////////////////////////////////////////////////////////////////////////
void updateBlacklistTerm(uint64_t term);
//////////////////////////////////////////////////////////////////////////////
/// @brief Reinitializes a bucket to be completely empty and unlocked.
/// Requires state to be locked.
//////////////////////////////////////////////////////////////////////////////
void clear();
private:
void updateBlacklistTerm(uint64_t term);
void moveSlot(size_t slot, bool moveToFront);
bool haveOpenTransaction() const;
};
}; // end namespace cache
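
A short sketch of the blacklisting behavior documented above, assuming the arangodb::cache headers are available; the hash and key values are made up for illustration.

#include "Cache/TransactionalBucket.h"

#include <cassert>
#include <cstdint>

using namespace arangodb::cache;

int main() {
  TransactionalBucket bucket;

  // The hash and key below are made-up values; normally the hash comes from
  // hashing an actual cache key.
  uint32_t hash = 0xdeadbeef;
  uint64_t key = 42;

  // Lock with an odd term: a sensitive transaction window is open, so
  // blacklisting is honored.
  bucket.lock(1ULL, -1LL);
  bucket.blacklist(hash, &key, static_cast<uint32_t>(sizeof(key)));
  assert(bucket.isBlacklisted(hash));
  assert(!bucket.isFullyBlacklisted());  // only one of the four slots is used
  bucket.unlock();

  // Re-locking with a newer, even term means no sensitive window is open, so
  // the hash is no longer reported as blacklisted and values for it may be
  // cached again.
  bucket.lock(2ULL, -1LL);
  assert(!bucket.isBlacklisted(hash));
  bucket.unlock();

  return 0;
}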

View File

@ -27,7 +27,9 @@
#include "Cache/CachedValue.h"
#include "Cache/FrequencyBuffer.h"
#include "Cache/Metadata.h"
#include "Cache/State.h"
#include "Cache/TransactionalBucket.h"
#include "Random/RandomGenerator.h"
#include <stdint.h>
#include <atomic>
@ -36,69 +38,485 @@
using namespace arangodb::cache;
static constexpr int64_t TRIES_FAST = 50LL;
static constexpr int64_t TRIES_SLOW = 10000LL;
static constexpr int64_t TRIES_GUARANTEE = -1LL;
Cache::Finding TransactionalCache::find(void const* key, uint32_t keySize) {
// TODO: implement this;
return Cache::Finding(nullptr);
}
TRI_ASSERT(key != nullptr);
Finding result(nullptr);
uint32_t hash = hashKey(key, keySize);
bool TransactionalCache::insert(CachedValue* value) {
// TODO: implement this
return false;
}
bool ok;
TransactionalBucket* bucket;
std::tie(ok, bucket) = getBucket(hash, TRIES_FAST);
bool TransactionalCache::remove(void const* key, uint32_t keySize) {
// TODO: implement this
return false;
}
void TransactionalCache::blackList(void const* key, uint32_t keySize) {
// TODO: implement this
}
std::shared_ptr<Cache> TransactionalCache::create(Manager* manager,
uint64_t requestedSize,
bool allowGrowth,
bool enableWindowedStats) {
TransactionalCache* cache = new TransactionalCache(
manager, requestedSize, allowGrowth, enableWindowedStats);
if (cache == nullptr) {
return std::shared_ptr<Cache>(nullptr);
if (ok) {
result.reset(bucket->find(hash, key, keySize));
recordStat(result.found() ? Stat::findHit : Stat::findMiss);
bucket->unlock();
endOperation();
}
cache->metadata()->lock();
auto result = cache->metadata()->cache();
cache->metadata()->unlock();
return result;
}
TransactionalCache::TransactionalCache(Manager* manager,
uint64_t requestedLimit,
bool TransactionalCache::insert(CachedValue* value) {
TRI_ASSERT(value != nullptr);
bool inserted = false;
uint32_t hash = hashKey(value->key(), value->keySize);
bool ok;
TransactionalBucket* bucket;
std::tie(ok, bucket) = getBucket(hash, TRIES_FAST);
if (ok) {
bool allowed = !bucket->isBlacklisted(hash);
if (allowed) {
bool eviction = false;
int64_t change = value->size();
CachedValue* candidate = bucket->find(hash, value->key(), value->keySize);
if (candidate == nullptr && bucket->isFull()) {
candidate = bucket->evictionCandidate();
if (candidate == nullptr) {
allowed = false;
} else {
eviction = true;
}
}
if (allowed) {
if (candidate != nullptr) {
change -= candidate->size();
}
_metadata->lock();
allowed = _metadata->adjustUsageIfAllowed(change);
_metadata->unlock();
if (allowed) {
if (candidate != nullptr) {
bucket->evict(candidate, true);
freeValue(candidate);
}
recordStat(eviction ? Stat::insertEviction : Stat::insertNoEviction);
bucket->insert(hash, value);
inserted = true;
} else {
requestResize(); // let function do the hard work
}
}
}
bucket->unlock();
if (inserted) {
requestMigrate(); // let function do the hard work
}
endOperation();
}
return inserted;
}
bool TransactionalCache::remove(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
bool removed = false;
uint32_t hash = hashKey(key, keySize);
bool ok;
TransactionalBucket* bucket;
std::tie(ok, bucket) = getBucket(hash, TRIES_SLOW);
if (ok) {
CachedValue* candidate = bucket->remove(hash, key, keySize);
if (candidate != nullptr) {
int64_t change = -static_cast<int64_t>(candidate->size());
_metadata->lock();
bool allowed = _metadata->adjustUsageIfAllowed(change);
TRI_ASSERT(allowed);
_metadata->unlock();
freeValue(candidate);
}
removed = true;
bucket->unlock();
endOperation();
}
return removed;
}
bool TransactionalCache::blacklist(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
bool blacklisted = false;
uint32_t hash = hashKey(key, keySize);
bool ok;
TransactionalBucket* bucket;
std::tie(ok, bucket) = getBucket(hash, TRIES_SLOW);
if (ok) {
CachedValue* candidate = bucket->blacklist(hash, key, keySize);
blacklisted = true;
if (candidate != nullptr) {
int64_t change = -static_cast<int64_t>(candidate->size());
_metadata->lock();
bool allowed = _metadata->adjustUsageIfAllowed(change);
TRI_ASSERT(allowed);
_metadata->unlock();
freeValue(candidate);
}
bucket->unlock();
endOperation();
}
return blacklisted;
}
uint64_t TransactionalCache::allocationSize(bool enableWindowedStats) {
return sizeof(TransactionalCache) +
StatBuffer::allocationSize(_evictionStatsCapacity) +
(enableWindowedStats ? (sizeof(StatBuffer) +
StatBuffer::allocationSize(_findStatsCapacity))
: 0);
}
std::shared_ptr<Cache> TransactionalCache::create(Manager* manager,
Manager::MetadataItr metadata,
bool allowGrowth,
bool enableWindowedStats) {
return std::make_shared<TransactionalCache>(Cache::ConstructionGuard(),
manager, metadata, allowGrowth,
enableWindowedStats);
}
TransactionalCache::TransactionalCache(Cache::ConstructionGuard guard,
Manager* manager,
Manager::MetadataItr metadata,
bool allowGrowth,
bool enableWindowedStats)
: Cache(manager, requestedLimit, allowGrowth, enableWindowedStats,
[](Cache* p) -> void {
delete reinterpret_cast<TransactionalCache*>(p);
},
sizeof(TransactionalCache)) {
// TODO: implement this
: Cache(guard, manager, metadata, allowGrowth, enableWindowedStats),
_table(nullptr),
_logSize(0),
_tableSize(1),
_maskShift(32),
_bucketMask(0),
_auxiliaryTable(nullptr),
_auxiliaryLogSize(0),
_auxiliaryTableSize(1),
_auxiliaryMaskShift(32),
_auxiliaryBucketMask(0) {
_state.lock();
if (isOperational()) {
_metadata->lock();
_table = reinterpret_cast<TransactionalBucket*>(_metadata->table());
_logSize = _metadata->logSize();
_tableSize = (1ULL << _logSize);
_maskShift = 32 - _logSize;
_bucketMask = (_tableSize - 1) << _maskShift;
_metadata->unlock();
}
_state.unlock();
}
TransactionalCache::~TransactionalCache() {
// TODO: implement this
_state.lock();
if (!_state.isSet(State::Flag::shutdown)) {
_state.unlock();
shutdown();
}
if (_state.isLocked()) {
_state.unlock();
}
}
bool TransactionalCache::freeMemory() {
// TODO: implement this
return false;
_state.lock();
if (!isOperational()) {
_state.unlock();
return false;
}
startOperation();
_state.unlock();
bool underLimit = reclaimMemory(0ULL);
uint64_t failures = 0;
while (!underLimit) {
// pick a random bucket
uint32_t randomHash = RandomGenerator::interval(UINT32_MAX);
bool ok;
TransactionalBucket* bucket;
std::tie(ok, bucket) = getBucket(randomHash, TRIES_FAST, false);
if (ok) {
failures = 0;
// evict LRU freeable value if exists
CachedValue* candidate = bucket->evictionCandidate();
if (candidate != nullptr) {
uint64_t size = candidate->size();
bucket->evict(candidate);
freeValue(candidate);
underLimit = reclaimMemory(size);
}
bucket->unlock();
} else {
failures++;
if (failures > 100) {
_state.lock();
bool shouldQuit = !isOperational();
_state.unlock();
if (shouldQuit) {
break;
} else {
failures = 0;
}
}
}
}
endOperation();
return true;
}
bool TransactionalCache::migrate() {
_state.lock();
if (!isOperational()) {
_state.unlock();
return false;
}
startOperation();
_metadata->lock();
if (_metadata->table() == nullptr || _metadata->auxiliaryTable() == nullptr) {
_metadata->unlock();
_state.unlock();
endOperation();
return false;
}
_auxiliaryTable =
reinterpret_cast<TransactionalBucket*>(_metadata->auxiliaryTable());
_auxiliaryLogSize = _metadata->auxiliaryLogSize();
_auxiliaryTableSize = (1ULL << _auxiliaryLogSize);
_auxiliaryMaskShift = (32 - _auxiliaryLogSize);
_auxiliaryBucketMask = (_auxiliaryTableSize - 1) << _auxiliaryMaskShift;
_metadata->unlock();
_state.toggleFlag(State::Flag::migrating);
_state.unlock();
uint64_t term = _manager->_transactions.term();
for (uint32_t i = 0; i < _tableSize; i++) {
// lock current bucket
TransactionalBucket* bucket = &(_table[i]);
bucket->lock(term, -1LL);
term = std::max(term, bucket->_blacklistTerm);
// collect target bucket(s)
std::vector<TransactionalBucket*> targets;
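    // If the new table is smaller (_logSize > _auxiliaryLogSize), this bucket
    // collapses into a single target; if it is larger, this bucket fans out to
    // 2^(_auxiliaryLogSize - _logSize) consecutive targets. Either way the
    // target index is recovered by re-shifting the bucket's hash prefix.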
if (_logSize > _auxiliaryLogSize) {
uint32_t targetIndex = (i << _maskShift) >> _auxiliaryMaskShift;
targets.emplace_back(&(_auxiliaryTable[targetIndex]));
} else {
uint32_t baseIndex = (i << _maskShift) >> _auxiliaryMaskShift;
for (size_t j = 0; j < (1U << (_auxiliaryLogSize - _logSize)); j++) {
uint32_t targetIndex = baseIndex + j;
targets.emplace_back(&(_auxiliaryTable[targetIndex]));
}
}
// lock target bucket(s)
for (TransactionalBucket* targetBucket : targets) {
targetBucket->lock(term, TRIES_GUARANTEE);
term = std::max(term, targetBucket->_blacklistTerm);
}
// update all buckets to maximum term found (guaranteed at most the current)
bucket->updateBlacklistTerm(term);
for (TransactionalBucket* targetBucket : targets) {
targetBucket->updateBlacklistTerm(term);
}
// now actually migrate any relevant blacklist terms
if (bucket->isFullyBlacklisted()) {
for (TransactionalBucket* targetBucket : targets) {
if (!targetBucket->isFullyBlacklisted()) {
(*targetBucket)._state.toggleFlag(State::Flag::blacklisted);
}
}
} else {
for (size_t j = 0; j < TransactionalBucket::SLOTS_BLACKLIST; j++) {
uint32_t hash = bucket->_blacklistHashes[j];
if (hash == 0) {
break;
}
uint32_t targetIndex = getIndex(hash, true);
TransactionalBucket* targetBucket = &(_auxiliaryTable[targetIndex]);
CachedValue* candidate = targetBucket->blacklist(hash, nullptr, 0);
TRI_ASSERT(candidate == nullptr);
bucket->_blacklistHashes[j] = 0;
}
}
// migrate actual values
for (size_t j = 0; j < TransactionalBucket::SLOTS_DATA; j++) {
size_t k = TransactionalBucket::SLOTS_DATA - (j + 1);
if (bucket->_cachedHashes[k] != 0) {
uint32_t hash = bucket->_cachedHashes[k];
CachedValue* value = bucket->_cachedData[k];
uint32_t targetIndex = getIndex(hash, true);
TransactionalBucket* targetBucket = &(_auxiliaryTable[targetIndex]);
if (targetBucket->isBlacklisted(hash)) {
uint64_t size = value->size();
freeValue(value);
reclaimMemory(size);
} else {
bool haveSpace = true;
if (targetBucket->isFull()) {
CachedValue* candidate = targetBucket->evictionCandidate();
if (candidate != nullptr) {
targetBucket->evict(candidate, true);
uint64_t size = candidate->size();
freeValue(candidate);
reclaimMemory(size);
} else {
haveSpace = false;
}
}
if (haveSpace) {
targetBucket->insert(hash, value);
} else {
uint64_t size = value->size();
freeValue(value);
reclaimMemory(size);
}
}
bucket->_cachedHashes[k] = 0;
bucket->_cachedData[k] = nullptr;
}
}
// unlock targets
for (TransactionalBucket* targetBucket : targets) {
targetBucket->unlock();
}
// finish up this bucket's migration
bucket->_state.toggleFlag(State::Flag::migrated);
bucket->unlock();
}
// swap tables and unmark local migrating flag
_state.lock();
std::swap(_table, _auxiliaryTable);
std::swap(_logSize, _auxiliaryLogSize);
std::swap(_tableSize, _auxiliaryTableSize);
std::swap(_maskShift, _auxiliaryMaskShift);
std::swap(_bucketMask, _auxiliaryBucketMask);
_state.toggleFlag(State::Flag::migrating);
_state.unlock();
// clear out old table
clearTable(_auxiliaryTable, _auxiliaryTableSize);
// release references to old table
_state.lock();
_auxiliaryTable = nullptr;
_auxiliaryLogSize = 0;
_auxiliaryTableSize = 1;
_auxiliaryMaskShift = 32;
_auxiliaryBucketMask = 0;
_state.unlock();
// swap table in metadata
_metadata->lock();
_metadata->swapTables();
_metadata->unlock();
endOperation();
return true;
}
void TransactionalCache::clearTables() {
if (_table != nullptr) {
clearTable(_table, _tableSize);
}
if (_auxiliaryTable != nullptr) {
clearTable(_auxiliaryTable, _auxiliaryTableSize);
}
}
std::pair<bool, TransactionalBucket*> TransactionalCache::getBucket(
uint32_t hash, int64_t maxTries, bool singleOperation) {
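  // Locks and returns the bucket responsible for the given hash, falling
  // through to the auxiliary table if a concurrent migration has already
  // moved the primary bucket. On success the bucket is returned locked under
  // the current transaction term and the caller must unlock it.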
TransactionalBucket* bucket = nullptr;
bool ok = _state.lock(maxTries);
if (ok) {
bool started = false;
ok = isOperational();
if (ok) {
if (singleOperation) {
startOperation();
started = true;
_metadata->lock();
_manager->reportAccess(_metadata->cache());
_metadata->unlock();
}
uint64_t term = _manager->_transactions.term();
bucket = &(_table[getIndex(hash, false)]);
ok = bucket->lock(term, maxTries);
if (ok &&
bucket->isMigrated()) { // get bucket from auxiliary table instead
bucket->unlock();
bucket = &(_auxiliaryTable[getIndex(hash, true)]);
ok = bucket->lock(term, maxTries);
if (ok && bucket->isMigrated()) {
ok = false;
bucket->unlock();
}
}
}
if (!ok && started) {
endOperation();
}
_state.unlock();
}
return std::pair<bool, TransactionalBucket*>(ok, bucket);
}
void TransactionalCache::clearTable(TransactionalBucket* table,
uint64_t tableSize) {
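  // Frees every cached value in the given table, returns the reclaimed memory
  // to the manager, and resets each bucket to a clean state.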
for (uint64_t i = 0; i < tableSize; i++) {
TransactionalBucket* bucket = &(table[i]);
bucket->lock(0, -1LL); // term doesn't actually matter here
for (size_t j = 0; j < TransactionalBucket::SLOTS_DATA; j++) {
if (bucket->_cachedData[j] != nullptr) {
uint64_t size = bucket->_cachedData[j]->size();
freeValue(bucket->_cachedData[j]);
reclaimMemory(size);
}
}
bucket->clear();
}
}
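// Maps a hash to a bucket index by keeping only its top logSize bits. For
// example, with a logSize of 8: maskShift == 24 and bucketMask == 0xFF000000,
// so the index is the hash's top byte, i.e. a value in [0, 255].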
uint32_t TransactionalCache::getIndex(uint32_t hash, bool useAuxiliary) const {
if (useAuxiliary) {
return ((hash & _auxiliaryBucketMask) >> _auxiliaryMaskShift);
}
return ((hash & _bucketMask) >> _maskShift);
}

View File

@ -28,7 +28,10 @@
#include "Cache/Cache.h"
#include "Cache/CachedValue.h"
#include "Cache/FrequencyBuffer.h"
#include "Cache/Manager.h"
#include "Cache/ManagerTasks.h"
#include "Cache/Metadata.h"
#include "Cache/State.h"
#include "Cache/TransactionalBucket.h"
#include <stdint.h>
@ -39,19 +42,73 @@
namespace arangodb {
namespace cache {
class Manager; // forward declaration
////////////////////////////////////////////////////////////////////////////////
/// @brief A transactional, LRU-ish cache.
///
/// To create a cache, see the Manager class. Once created, the cache exposes a
/// simple API mostly following that of the base Cache class. For any non-pure-virtual
/// functions, see Cache.h for documentation. The only additional functions
/// exposed on the API of the transactional cache are those dealing with the
/// blacklisting of keys.
///
/// To operate correctly, whenever a key is about to be written to the backing
/// store, it must be blacklisted in any corresponding transactional caches.
/// This will prevent the cache from serving stale or potentially incorrect
/// values and allow clients to fall through to the backing transactional
/// store.
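///
/// A minimal sketch of the intended write path (the backing store handle
/// `store`, its `write()` method, and the `buildCachedValue()` helper are
/// illustrative assumptions, not part of this API):
///
/// @code
///   cache->blacklist(key, keySize);        // invalidate before the write
///   store->write(key, keySize, newValue);  // hypothetical backing store
///   // once the relevant transactions have ended and the blacklist term has
///   // expired, readers that miss in the cache may repopulate it:
///   CachedValue* value = buildCachedValue(key, keySize, newValue);
///   cache->insert(value);                  // may return false; callers must
///                                          // tolerate a failed insert
/// @endcode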
////////////////////////////////////////////////////////////////////////////////
class TransactionalCache final : public Cache {
public:
TransactionalCache(Cache::ConstructionGuard guard, Manager* manager,
Manager::MetadataItr metadata, bool allowGrowth,
bool enableWindowedStats);
~TransactionalCache();
TransactionalCache() = delete;
TransactionalCache(TransactionalCache const&) = delete;
TransactionalCache& operator=(TransactionalCache const&) = delete;
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief Looks up the given key.
///
/// May report a false negative if it fails to acquire a lock in a timely
/// fashion. Should not block for long.
//////////////////////////////////////////////////////////////////////////////
Cache::Finding find(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to insert the given value.
///
/// Returns true if inserted, false otherwise. Will not insert if the key
/// (or its corresponding hash) is blacklisted. Will not insert the value if
/// doing so would cause total usage to exceed the limits. May also fail to
/// insert if it cannot acquire a lock in a timely fashion. Should not block
/// for long.
//////////////////////////////////////////////////////////////////////////////
bool insert(CachedValue* value);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to remove the given key.
///
/// Returns true if the key is guaranteed not to be in the cache, false if the
/// key may remain in the cache. May leave the key in the cache if it fails to
/// acquire a lock in a timely fashion. Makes more attempts to acquire a lock
/// before quitting, so may block for longer than find or insert. Client may
/// re-try.
//////////////////////////////////////////////////////////////////////////////
bool remove(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to blacklist the given key.
///
/// Returns true if the key was blacklisted and is guaranteed not to be in the
/// cache, false otherwise. May not blacklist the key if it fails to
/// acquire a lock in a timely fashion. Makes more attempts to acquire a lock
/// before quitting, so may block for longer than find or insert. Client
/// should re-try.
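///
/// A false return means the key is not guaranteed to be blacklisted, so a
/// caller that must invalidate the key can simply retry (sketch):
///
/// @code
///   while (!cache->blacklist(key, keySize)) {
///     // spin or yield until the blacklist is confirmed
///   }
/// @endcode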
//////////////////////////////////////////////////////////////////////////////
bool blacklist(void const* key, uint32_t keySize);
private:
// main table info
@ -74,19 +131,22 @@ class TransactionalCache final : public Cache {
friend class MigrateTask;
private:
// creator -- do not use constructor explicitly
static uint64_t allocationSize(bool enableWindowedStats);
static std::shared_ptr<Cache> create(Manager* manager,
Manager::MetadataItr metadata,
bool allowGrowth,
bool enableWindowedStats);
// management
bool freeMemory();
bool migrate();
void clearTables();
// helpers
std::pair<bool, TransactionalBucket*> getBucket(uint32_t hash,
int64_t maxTries,
bool singleOperation = true);
void clearTable(TransactionalBucket* table, uint64_t tableSize);
uint32_t getIndex(uint32_t hash, bool useAuxiliary) const;
};
}; // end namespace cache