1
0
Fork 0

Augmented cache API and fixed a potential deadlock scenario. (#2579)

* Augmented cache API and fixed a potential deadlock scenario.

* The RocksDBEdgeIndex now tries to read up to 10 times form the cache on lock_timeout. Instead of rewriting everything after first attempt
This commit is contained in:
Dan Larkin 2017-06-19 11:28:04 -04:00 committed by Frank Celler
parent 2680a1aaf8
commit a3ae2b7191
19 changed files with 311 additions and 221 deletions

View File

@ -168,8 +168,8 @@ bool Cache::isResizing() {
_metadata.lock();
resizing = _metadata.isSet(State::Flag::resizing);
_metadata.unlock();
_state.unlock();
}
_state.unlock();
return resizing;
}
@ -181,8 +181,8 @@ bool Cache::isMigrating() {
_metadata.lock();
migrating = _metadata.isSet(State::Flag::migrating);
_metadata.unlock();
_state.unlock();
}
_state.unlock();
return migrating;
}

View File

@ -25,6 +25,7 @@
#define ARANGODB_CACHE_CACHE_H
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Cache/CachedValue.h"
#include "Cache/Common.h"
#include "Cache/Finding.h"
@ -78,9 +79,9 @@ class Cache : public std::enable_shared_from_this<Cache> {
// primary functionality; documented in derived classes
virtual Finding find(void const* key, uint32_t keySize) = 0;
virtual bool insert(CachedValue* value) = 0;
virtual bool remove(void const* key, uint32_t keySize) = 0;
virtual bool blacklist(void const* key, uint32_t keySize) = 0;
virtual Result insert(CachedValue* value) = 0;
virtual Result remove(void const* key, uint32_t keySize) = 0;
virtual Result blacklist(void const* key, uint32_t keySize) = 0;
//////////////////////////////////////////////////////////////////////////////
/// @brief Returns the total memory usage for this cache in bytes.
@ -131,8 +132,8 @@ class Cache : public std::enable_shared_from_this<Cache> {
bool isShutdown();
protected:
static constexpr int64_t triesFast = 10000;
static constexpr int64_t triesSlow = 100000;
static constexpr int64_t triesFast = 200;
static constexpr int64_t triesSlow = 10000;
static constexpr int64_t triesGuarantee = -1;
protected:

View File

@ -24,23 +24,32 @@
#include "Cache/Finding.h"
#include "Cache/CachedValue.h"
using namespace arangodb;
using namespace arangodb::cache;
Finding::Finding() : _value(nullptr) {}
Finding::Finding() : _value(nullptr), _result(TRI_ERROR_NO_ERROR) {}
Finding::Finding(CachedValue* v) : _value(v) {
Finding::Finding(CachedValue* v) : _value(v), _result(TRI_ERROR_NO_ERROR) {
if (_value != nullptr) {
_value->lease();
}
}
Finding::Finding(Finding const& other) : _value(other._value) {
Finding::Finding(CachedValue* v, Result const& r) : _value(v), _result(r) {
if (_value != nullptr) {
_value->lease();
}
}
Finding::Finding(Finding&& other) : _value(other._value) {
Finding::Finding(Finding const& other)
: _value(other._value), _result(other._result) {
if (_value != nullptr) {
_value->lease();
}
}
Finding::Finding(Finding&& other)
: _value(other._value), _result(std::move(other._result)) {
other._value = nullptr;
}
@ -58,6 +67,8 @@ Finding& Finding::operator=(Finding const& other) {
_value->lease();
}
_result = other._result;
return *this;
}
@ -73,6 +84,8 @@ Finding& Finding::operator=(Finding&& other) {
_value = other._value;
other._value = nullptr;
_result = std::move(other._result);
return *this;
}
@ -109,6 +122,8 @@ void Finding::reset(CachedValue* v) {
}
}
void Finding::reportError(Result const& r) { _result = r; }
bool Finding::found() const { return (_value != nullptr); }
CachedValue const* Finding::value() const { return _value; }
@ -116,3 +131,5 @@ CachedValue const* Finding::value() const { return _value; }
CachedValue* Finding::copy() const {
return ((_value == nullptr) ? nullptr : _value->copy());
}
Result Finding::result() const { return _result; }

View File

@ -25,6 +25,7 @@
#define ARANGODB_CACHE_FINDING_H
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Cache/CachedValue.h"
namespace arangodb {
@ -43,6 +44,7 @@ class Finding {
public:
Finding();
explicit Finding(CachedValue* v);
explicit Finding(CachedValue* v, Result const& r);
Finding(Finding const& other);
Finding(Finding&& other);
Finding& operator=(Finding const& other);
@ -60,6 +62,11 @@ class Finding {
//////////////////////////////////////////////////////////////////////////////
void set(CachedValue* v);
//////////////////////////////////////////////////////////////////////////////
/// @brief Sets the error Result object.
//////////////////////////////////////////////////////////////////////////////
void reportError(Result const& r);
//////////////////////////////////////////////////////////////////////////////
/// @brief Specifies whether the value was found. If not, value is nullptr.
//////////////////////////////////////////////////////////////////////////////
@ -74,14 +81,20 @@ class Finding {
/// @brief Creates a copy of the underlying value and returns a pointer.
//////////////////////////////////////////////////////////////////////////////
CachedValue* copy() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Releases the finding.
//////////////////////////////////////////////////////////////////////////////
void release();
//////////////////////////////////////////////////////////////////////////////
/// @brief Returns the status result object associated with the lookup.
/////////////////////////////////////////////////////////////////////////////
Result result() const;
private:
CachedValue* _value;
Result _result;
};
}; // end namespace cache

View File

@ -369,10 +369,12 @@ std::pair<bool, Manager::time_point> Manager::requestGrow(
nextRequest = std::chrono::steady_clock::now();
resizeCache(TaskEnvironment::none, cache,
metadata->newLimit()); // unlocks metadata
} else {
metadata->unlock();
}
}
if (!allowed) {
metadata->unlock();
}
}
_state.unlock();
}
@ -620,10 +622,6 @@ void Manager::resizeCache(Manager::TaskEnvironment environment,
if (metadata->usage <= newLimit) {
uint64_t oldLimit = metadata->hardUsageLimit;
/*std::cout << "(" << metadata->softUsageLimit << ", "
<< metadata->hardUsageLimit << ") -> " << newLimit << " ("
<< metadata->deservedSize << ", " << metadata->maxSize << ")"
<< std::endl;*/
bool success = metadata->adjustLimits(newLimit, newLimit);
TRI_ASSERT(success);
metadata->unlock();
@ -635,10 +633,6 @@ void Manager::resizeCache(Manager::TaskEnvironment environment,
return;
}
/*std::cout << "(" << metadata->softUsageLimit << ", "
<< metadata->hardUsageLimit << ") -> " << newLimit << " ("
<< metadata->deservedSize << ", " << metadata->maxSize << ")"
<< std::endl;*/
bool success = metadata->adjustLimits(newLimit, metadata->hardUsageLimit);
TRI_ASSERT(success);
TRI_ASSERT(!metadata->isSet(State::Flag::resizing));

View File

@ -38,6 +38,7 @@
#include <chrono>
#include <list>
using namespace arangodb;
using namespace arangodb::cache;
Finding PlainCache::find(void const* key, uint32_t keySize) {
@ -45,32 +46,33 @@ Finding PlainCache::find(void const* key, uint32_t keySize) {
Finding result;
uint32_t hash = hashKey(key, keySize);
bool ok;
Result status;
PlainBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast);
if (ok) {
if (status.ok()) {
result.set(bucket->find(hash, key, keySize));
recordStat(result.found() ? Stat::findHit : Stat::findMiss);
bucket->unlock();
endOperation();
} else {
result.reportError(status);
}
return result;
}
bool PlainCache::insert(CachedValue* value) {
Result PlainCache::insert(CachedValue* value) {
TRI_ASSERT(value != nullptr);
bool inserted = false;
uint32_t hash = hashKey(value->key(), value->keySize);
bool ok;
Result status;
PlainBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast);
if (ok) {
if (status.ok()) {
bool allowed = true;
bool maybeMigrate = false;
int64_t change = static_cast<int64_t>(value->size());
@ -80,6 +82,7 @@ bool PlainCache::insert(CachedValue* value) {
candidate = bucket->evictionCandidate();
if (candidate == nullptr) {
allowed = false;
status.reset(TRI_ERROR_ARANGO_BUSY);
}
}
@ -100,15 +103,15 @@ bool PlainCache::insert(CachedValue* value) {
eviction = true;
}
freeValue(candidate);
}
}
bucket->insert(hash, value);
inserted = true;
if (!eviction) {
maybeMigrate = source->slotFilled();
}
maybeMigrate |= reportInsert(eviction);
} else {
requestGrow(); // let function do the hard work
status.reset(TRI_ERROR_RESOURCE_LIMIT);
}
}
@ -119,20 +122,19 @@ bool PlainCache::insert(CachedValue* value) {
endOperation();
}
return inserted;
return status;
}
bool PlainCache::remove(void const* key, uint32_t keySize) {
Result PlainCache::remove(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
bool removed = false;
uint32_t hash = hashKey(key, keySize);
bool ok;
Result status;
PlainBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesSlow);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesSlow);
if (ok) {
if (status.ok()) {
bool maybeMigrate = false;
CachedValue* candidate = bucket->remove(hash, key, keySize);
@ -148,7 +150,6 @@ bool PlainCache::remove(void const* key, uint32_t keySize) {
maybeMigrate = source->slotEmptied();
}
removed = true;
bucket->unlock();
if (maybeMigrate) {
requestMigrate(_table->idealSize());
@ -156,10 +157,12 @@ bool PlainCache::remove(void const* key, uint32_t keySize) {
endOperation();
}
return removed;
return status;
}
bool PlainCache::blacklist(void const* key, uint32_t keySize) { return false; }
Result PlainCache::blacklist(void const* key, uint32_t keySize) {
return {TRI_ERROR_NOT_IMPLEMENTED};
}
uint64_t PlainCache::allocationSize(bool enableWindowedStats) {
return sizeof(PlainCache) +
@ -194,13 +197,13 @@ PlainCache::~PlainCache() {
uint64_t PlainCache::freeMemoryFrom(uint32_t hash) {
uint64_t reclaimed = 0;
bool ok;
Result status;
bool maybeMigrate = false;
PlainBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast, false);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast, false);
if (ok) {
if (status.ok()) {
// evict LRU freeable value if exists
CachedValue* candidate = bucket->evictionCandidate();
@ -282,8 +285,9 @@ void PlainCache::migrateBucket(void* sourcePtr,
source->unlock();
}
std::tuple<bool, PlainBucket*, std::shared_ptr<Table>> PlainCache::getBucket(
std::tuple<Result, PlainBucket*, std::shared_ptr<Table>> PlainCache::getBucket(
uint32_t hash, int64_t maxTries, bool singleOperation) {
Result status;
PlainBucket* bucket = nullptr;
std::shared_ptr<Table> source(nullptr);
@ -302,14 +306,21 @@ std::tuple<bool, PlainBucket*, std::shared_ptr<Table>> PlainCache::getBucket(
bucket = reinterpret_cast<PlainBucket*>(pair.first);
source = pair.second;
ok = (bucket != nullptr);
if (!ok) {
status.reset(TRI_ERROR_LOCK_TIMEOUT);
}
} else {
status.reset(TRI_ERROR_SHUTTING_DOWN);
}
if (!ok && started) {
endOperation();
}
_state.unlock();
} else {
status.reset(TRI_ERROR_LOCK_TIMEOUT);
}
return std::make_tuple(ok, bucket, source);
return std::make_tuple(status, bucket, source);
}
Table::BucketClearer PlainCache::bucketClearer(Metadata* metadata) {

View File

@ -68,34 +68,35 @@ class PlainCache final : public Cache {
/// @brief Looks up the given key.
///
/// May report a false negative if it fails to acquire a lock in a timely
/// fashion. Should not block for long.
/// fashion. The Result contained in the return value should report an error
/// code in this case. Should not block for long.
//////////////////////////////////////////////////////////////////////////////
Finding find(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to insert the given value.
///
/// Returns true if inserted, false otherwise. Will not insert value if this
/// Returns ok if inserted, error otherwise. Will not insert value if this
/// would cause the total usage to exceed the limits. May also not insert
/// value if it fails to acquire a lock in a timely fashion. Should not block
/// for long.
//////////////////////////////////////////////////////////////////////////////
bool insert(CachedValue* value);
Result insert(CachedValue* value);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to remove the given key.
///
/// Returns true if the key guaranteed not to be in the cache, false if the
/// Returns ok if the key guaranteed not to be in the cache, error if the
/// key may remain in the cache. May leave the key in the cache if it fails to
/// acquire a lock in a timely fashion. Makes more attempts to acquire a lock
/// before quitting, so may block for longer than find or insert.
//////////////////////////////////////////////////////////////////////////////
bool remove(void const* key, uint32_t keySize);
Result remove(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Does nothing; convenience method inheritance compliance
//////////////////////////////////////////////////////////////////////////////
bool blacklist(void const* key, uint32_t keySize);
Result blacklist(void const* key, uint32_t keySize);
private:
// friend class manager and tasks
@ -115,7 +116,7 @@ class PlainCache final : public Cache {
std::shared_ptr<Table> newTable);
// helpers
std::tuple<bool, PlainBucket*, std::shared_ptr<Table>> getBucket(
std::tuple<Result, PlainBucket*, std::shared_ptr<Table>> getBucket(
uint32_t hash, int64_t maxTries, bool singleOperation = true);
uint32_t getIndex(uint32_t hash, bool useAuxiliary) const;

View File

@ -39,6 +39,7 @@
#include <chrono>
#include <list>
using namespace arangodb;
using namespace arangodb::cache;
Finding TransactionalCache::find(void const* key, uint32_t keySize) {
@ -46,12 +47,12 @@ Finding TransactionalCache::find(void const* key, uint32_t keySize) {
Finding result;
uint32_t hash = hashKey(key, keySize);
bool ok;
Result status;
TransactionalBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast);
if (ok) {
if (status.ok()) {
result.set(bucket->find(hash, key, keySize));
recordStat(result.found() ? Stat::findHit : Stat::findMiss);
bucket->unlock();
@ -61,17 +62,16 @@ Finding TransactionalCache::find(void const* key, uint32_t keySize) {
return result;
}
bool TransactionalCache::insert(CachedValue* value) {
Result TransactionalCache::insert(CachedValue* value) {
TRI_ASSERT(value != nullptr);
bool inserted = false;
uint32_t hash = hashKey(value->key(), value->keySize);
bool ok;
Result status;
TransactionalBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast);
if (ok) {
if (status.ok()) {
bool maybeMigrate = false;
bool allowed = !bucket->isBlacklisted(hash);
if (allowed) {
@ -82,6 +82,7 @@ bool TransactionalCache::insert(CachedValue* value) {
candidate = bucket->evictionCandidate();
if (candidate == nullptr) {
allowed = false;
status.reset(TRI_ERROR_ARANGO_BUSY);
}
}
@ -104,15 +105,17 @@ bool TransactionalCache::insert(CachedValue* value) {
freeValue(candidate);
}
bucket->insert(hash, value);
inserted = true;
if (!eviction) {
maybeMigrate = source->slotFilled();
}
maybeMigrate |= reportInsert(eviction);
} else {
requestGrow(); // let function do the hard work
status.reset(TRI_ERROR_RESOURCE_LIMIT);
}
}
} else {
status.reset(TRI_ERROR_ARANGO_CONFLICT);
}
bucket->unlock();
@ -122,20 +125,19 @@ bool TransactionalCache::insert(CachedValue* value) {
endOperation();
}
return inserted;
return status;
}
bool TransactionalCache::remove(void const* key, uint32_t keySize) {
Result TransactionalCache::remove(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
bool removed = false;
uint32_t hash = hashKey(key, keySize);
bool ok;
Result status;
TransactionalBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesSlow);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesSlow);
if (ok) {
if (status.ok()) {
bool maybeMigrate = false;
CachedValue* candidate = bucket->remove(hash, key, keySize);
@ -151,7 +153,6 @@ bool TransactionalCache::remove(void const* key, uint32_t keySize) {
maybeMigrate = source->slotEmptied();
}
removed = true;
bucket->unlock();
if (maybeMigrate) {
requestMigrate(_table->idealSize());
@ -159,23 +160,21 @@ bool TransactionalCache::remove(void const* key, uint32_t keySize) {
endOperation();
}
return removed;
return status;
}
bool TransactionalCache::blacklist(void const* key, uint32_t keySize) {
Result TransactionalCache::blacklist(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
bool blacklisted = false;
uint32_t hash = hashKey(key, keySize);
bool ok;
Result status;
TransactionalBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesSlow);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesSlow);
if (ok) {
if (status.ok()) {
bool maybeMigrate = false;
CachedValue* candidate = bucket->blacklist(hash, key, keySize);
blacklisted = true;
if (candidate != nullptr) {
int64_t change = -static_cast<int64_t>(candidate->size());
@ -196,7 +195,7 @@ bool TransactionalCache::blacklist(void const* key, uint32_t keySize) {
endOperation();
}
return blacklisted;
return status;
}
uint64_t TransactionalCache::allocationSize(bool enableWindowedStats) {
@ -236,12 +235,12 @@ TransactionalCache::~TransactionalCache() {
uint64_t TransactionalCache::freeMemoryFrom(uint32_t hash) {
uint64_t reclaimed = 0;
bool ok;
Result status;
TransactionalBucket* bucket;
std::shared_ptr<Table> source;
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast, false);
std::tie(status, bucket, source) = getBucket(hash, Cache::triesFast, false);
if (ok) {
if (status.ok()) {
bool maybeMigrate = false;
// evict LRU freeable value if exists
CachedValue* candidate = bucket->evictionCandidate();
@ -369,9 +368,10 @@ void TransactionalCache::migrateBucket(void* sourcePtr,
source->unlock();
}
std::tuple<bool, TransactionalBucket*, std::shared_ptr<Table>>
std::tuple<Result, TransactionalBucket*, std::shared_ptr<Table>>
TransactionalCache::getBucket(uint32_t hash, int64_t maxTries,
bool singleOperation) {
Result status;
TransactionalBucket* bucket = nullptr;
std::shared_ptr<Table> source(nullptr);
@ -393,15 +393,21 @@ TransactionalCache::getBucket(uint32_t hash, int64_t maxTries,
ok = (bucket != nullptr);
if (ok) {
bucket->updateBlacklistTerm(term);
} else {
status.reset(TRI_ERROR_LOCK_TIMEOUT);
}
} else {
status.reset(TRI_ERROR_SHUTTING_DOWN);
}
if (!ok && started) {
endOperation();
}
_state.unlock();
} else {
status.reset(TRI_ERROR_LOCK_TIMEOUT);
}
return std::make_tuple(ok, bucket, source);
return std::make_tuple(status, bucket, source);
}
Table::BucketClearer TransactionalCache::bucketClearer(Metadata* metadata) {

View File

@ -76,42 +76,43 @@ class TransactionalCache final : public Cache {
/// @brief Looks up the given key.
///
/// May report a false negative if it fails to acquire a lock in a timely
/// fashion. Should not block for long.
/// fashion. The Result contained in the return value should report an error
/// code in this case. Should not block for long.
//////////////////////////////////////////////////////////////////////////////
Finding find(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to insert the given value.
///
/// Returns true if inserted, false otherwise. Will not insert if the key is
/// Returns ok if inserted, error otherwise. Will not insert if the key is
/// (or its corresponding hash) is blacklisted. Will not insert value if this
/// would cause the total usage to exceed the limits. May also not insert
/// value if it fails to acquire a lock in a timely fashion. Should not block
/// for long.
//////////////////////////////////////////////////////////////////////////////
bool insert(CachedValue* value);
Result insert(CachedValue* value);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to remove the given key.
///
/// Returns true if the key guaranteed not to be in the cache, false if the
/// Returns ok if the key guaranteed not to be in the cache, error if the
/// key may remain in the cache. May leave the key in the cache if it fails to
/// acquire a lock in a timely fashion. Makes more attempts to acquire a lock
/// before quitting, so may block for longer than find or insert. Client may
/// re-try.
//////////////////////////////////////////////////////////////////////////////
bool remove(void const* key, uint32_t keySize);
Result remove(void const* key, uint32_t keySize);
//////////////////////////////////////////////////////////////////////////////
/// @brief Attempts to blacklist the given key.
///
/// Returns true if the key was blacklisted and is guaranteed not to be in the
/// cache, false otherwise. May not blacklist the key if it fails to
/// Returns ok if the key was blacklisted and is guaranteed not to be in the
/// cache, error otherwise. May not blacklist the key if it fails to
/// acquire a lock in a timely fashion. Makes more attempts to acquire a lock
/// before quitting, so may block for longer than find or insert. Client
/// should re-try.
//////////////////////////////////////////////////////////////////////////////
bool blacklist(void const* key, uint32_t keySize);
Result blacklist(void const* key, uint32_t keySize);
private:
// friend class manager and tasks
@ -131,7 +132,7 @@ class TransactionalCache final : public Cache {
std::shared_ptr<Table> newTable);
// helpers
std::tuple<bool, TransactionalBucket*, std::shared_ptr<Table>> getBucket(
std::tuple<Result, TransactionalBucket*, std::shared_ptr<Table>> getBucket(
uint32_t hash, int64_t maxTries, bool singleOperation = true);
uint32_t getIndex(uint32_t hash, bool useAuxiliary) const;

View File

@ -79,8 +79,8 @@ VPackSlice TraverserDocumentCache::lookupAndCache(StringRef id) {
cache::CachedValue::construct(key, keySize, resVal, resValSize));
if (value) {
bool success = _cache->insert(value.get());
if (!success) {
auto result = _cache->insert(value.get());
if (!result.ok()) {
LOG_TOPIC(DEBUG, Logger::GRAPHS) << "Insert failed";
} else {
// Cache is responsible.
@ -149,8 +149,8 @@ void TraverserDocumentCache::insertDocument(
cache::CachedValue::construct(key, keySize, resVal, resValSize));
if (value) {
bool success = _cache->insert(value.get());
if (!success) {
auto result = _cache->insert(value.get());
if (!result.ok()) {
LOG_TOPIC(DEBUG, Logger::GRAPHS) << "Insert document into cache failed";
} else {
// Cache is responsible.

View File

@ -1543,8 +1543,8 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
auto entry = cache::CachedValue::construct(
key.string().data(), static_cast<uint32_t>(key.string().size()),
value.data(), static_cast<uint64_t>(value.size()));
bool cached = _cache->insert(entry);
if (!cached) {
auto status = _cache->insert(entry);
if (status.fail()) {
delete entry;
}
}
@ -1850,14 +1850,13 @@ void RocksDBCollection::blackListKey(char const* data, std::size_t len) const {
if (useCache()) {
TRI_ASSERT(_cache != nullptr);
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(data, static_cast<uint32_t>(len));
if (attempts++ % 10 == 0) {
if (_cache->isShutdown()) {
disableCache();
break;
}
auto status = _cache->blacklist(data, static_cast<uint32_t>(len));
if (status.ok()) {
blacklisted = true;
} else if (status.errorNumber() == TRI_ERROR_SHUTTING_DOWN) {
disableCache();
break;
}
}
}

View File

@ -150,38 +150,46 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
bool needRocksLookup = true;
if (_cache) {
// Try to read from cache
auto finding = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
if (finding.found()) {
needRocksLookup = false;
// We got sth. in the cache
VPackSlice cachedData(finding.value()->value());
TRI_ASSERT(cachedData.isArray());
if (cachedData.length() / 2 < limit) {
// Directly return it, no need to copy
_builderIterator = VPackArrayIterator(cachedData);
while (_builderIterator.valid()) {
TRI_ASSERT(_builderIterator.value().isNumber());
cb(RocksDBToken{
_builderIterator.value().getNumericValue<uint64_t>()});
limit--;
for (size_t attempts = 0; attempts < 10; ++attempts) {
// Try to read from cache
auto finding = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
if (finding.found()) {
needRocksLookup = false;
// We got sth. in the cache
VPackSlice cachedData(finding.value()->value());
TRI_ASSERT(cachedData.isArray());
if (cachedData.length() / 2 < limit) {
// Directly return it, no need to copy
_builderIterator = VPackArrayIterator(cachedData);
while (_builderIterator.valid()) {
TRI_ASSERT(_builderIterator.value().isNumber());
cb(RocksDBToken{
_builderIterator.value().getNumericValue<uint64_t>()});
limit--;
// Twice advance the iterator
_builderIterator.next();
// We always have <revision,_from> pairs
TRI_ASSERT(_builderIterator.valid());
_builderIterator.next();
// Twice advance the iterator
_builderIterator.next();
// We always have <revision,_from> pairs
TRI_ASSERT(_builderIterator.valid());
_builderIterator.next();
}
_builderIterator = VPackArrayIterator(
arangodb::basics::VelocyPackHelper::EmptyArrayValue());
} else {
// We need to copy it.
// And then we just get back to beginning of the loop
_builder.clear();
_builder.add(cachedData);
TRI_ASSERT(_builder.slice().isArray());
_builderIterator = VPackArrayIterator(_builder.slice());
// Do not set limit
}
_builderIterator = VPackArrayIterator(
arangodb::basics::VelocyPackHelper::EmptyArrayValue());
} else {
// We need to copy it.
// And then we just get back to beginning of the loop
_builder.clear();
_builder.add(cachedData);
TRI_ASSERT(_builder.slice().isArray());
_builderIterator = VPackArrayIterator(_builder.slice());
// Do not set limit
break;
}
if (finding.result().isNot(TRI_ERROR_LOCK_TIMEOUT)) {
// We really have not found an entry.
// Otherwise we do not know yet
break;
}
}
}
@ -261,9 +269,9 @@ bool RocksDBEdgeIndexIterator::nextExtra(ExtraCallback const& cb,
TRI_ASSERT(_builderIterator.value().isNumber());
RocksDBToken tkn{
_builderIterator.value().getNumericValue<uint64_t>()};
_builderIterator.next();
TRI_ASSERT(_builderIterator.valid());
TRI_ASSERT(_builderIterator.value().isString());
cb(tkn, _builderIterator.value());
@ -325,10 +333,20 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
fromTo.data(), static_cast<uint32_t>(fromTo.size()),
_builder.slice().start(),
static_cast<uint64_t>(_builder.slice().byteSize()));
bool cached = cc->insert(entry);
if (!cached) {
bool inserted = false;
for (size_t attempts = 0; attempts < 10; attempts++) {
auto status = cc->insert(entry);
if (status.ok()) {
inserted = true;
break;
}
if (status.errorNumber() != TRI_ERROR_LOCK_TIMEOUT) {
break;
}
}
if (!inserted) {
LOG_TOPIC(DEBUG, arangodb::Logger::CACHE) << "Failed to cache: "
<< fromTo.toString();
<< fromTo.toString();
delete entry;
}
}
@ -640,13 +658,18 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
// First call.
builder.clear();
previous = v.toString();
auto finding =
cc->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else {
needsInsert = true;
builder.openArray(true);
bool shouldTry = true;
while (shouldTry) {
auto finding =
cc->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else if ( // shouldTry if failed lookup was just a lock timeout
finding.result().errorNumber() != TRI_ERROR_LOCK_TIMEOUT) {
shouldTry = false;
needsInsert = true;
builder.openArray(true);
}
}
}
@ -666,7 +689,18 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!cc->insert(entry)) {
bool inserted = false;
for (size_t attempts = 0; attempts < 10; attempts++) {
auto status = cc->insert(entry);
if (status.ok()) {
inserted = true;
break;
}
if (status.errorNumber() != TRI_ERROR_LOCK_TIMEOUT) {
break;
}
}
if (!inserted) {
delete entry;
}
builder.clear();
@ -687,7 +721,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
RocksDBToken token(revisionId);
if (rocksColl->readDocument(trx, token, mmdr)) {
builder.add(VPackValue(token.revisionId()));
VPackSlice doc(mmdr.vpack());
VPackSlice toFrom = _isFromIndex ? transaction::helpers::extractToFromDocument(doc) : transaction::helpers::extractFromFromDocument(doc);
TRI_ASSERT(toFrom.isString());
@ -712,7 +746,18 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!cc->insert(entry)) {
bool inserted = false;
for (size_t attempts = 0; attempts < 10; attempts++) {
auto status = cc->insert(entry);
if (status.ok()) {
inserted = true;
break;
}
if (status.errorNumber() != TRI_ERROR_LOCK_TIMEOUT) {
break;
}
}
if (!inserted) {
delete entry;
}
}

View File

@ -58,7 +58,7 @@ RocksDBIndex::RocksDBIndex(
_useCache(useCache) {
if (_useCache) {
createCache();
}
}
}
RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection,
@ -214,7 +214,7 @@ void RocksDBIndex::truncate(transaction::Methods* trx) {
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
Result r = mthds->Delete(_cf, iter->key());
if (!r.ok()) {
THROW_ARANGO_EXCEPTION(r);
@ -241,14 +241,13 @@ void RocksDBIndex::blackListKey(char const* data, std::size_t len){
if (useCache()) {
TRI_ASSERT(_cache != nullptr);
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(data, (uint32_t)len);
if (attempts++ % 10 == 0) {
if (_cache->isShutdown()) {
disableCache();
break;
}
auto status = _cache->blacklist(data, static_cast<uint32_t>(len));
if (status.ok()) {
blacklisted = true;
} else if (status.errorNumber() == TRI_ERROR_SHUTTING_DOWN) {
disableCache();
break;
}
}
}

View File

@ -204,8 +204,8 @@ RocksDBToken RocksDBPrimaryIndex::lookupKey(transaction::Methods* trx,
auto entry = cache::CachedValue::construct(
key.string().data(), static_cast<uint32_t>(key.string().size()),
value.buffer()->data(), static_cast<uint64_t>(value.buffer()->size()));
bool cached = _cache->insert(entry);
if (!cached) {
auto status = _cache->insert(entry);
if (status.fail()) {
delete entry;
}
}

View File

@ -91,8 +91,8 @@ TEST_CASE("cache::Manager", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
}
@ -123,8 +123,8 @@ TEST_CASE("cache::Manager", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
} else { // lookup something

View File

@ -66,8 +66,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
} else {
@ -79,8 +79,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
uint64_t j = 2 * i;
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &j, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
REQUIRE(0 == memcmp(f.value()->value(), &j, sizeof(uint64_t)));
@ -92,8 +92,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 1024; i < 256 * 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
} else {
@ -113,8 +113,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
REQUIRE(f.value() != nullptr);
@ -135,8 +135,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
// test removal of bogus keys
for (uint64_t i = 1024; i < 2048; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
BOOST_ASSERT(removed);
auto status = cache->remove(&i, sizeof(uint64_t));
REQUIRE(status.ok());
// ensure existing keys not removed
uint64_t found = 0;
for (uint64_t j = 0; j < 1024; j++) {
@ -152,8 +152,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
// remove actual keys
for (uint64_t i = 0; i < 1024; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
REQUIRE(removed);
auto status = cache->remove(&i, sizeof(uint64_t));
REQUIRE(status.ok());
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(!f.found());
}
@ -170,8 +170,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 4 * 1024 * 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (!success) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
}
@ -200,8 +200,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
uint64_t item = lower + i;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
}
@ -230,8 +230,8 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
uint64_t item = ++validUpper;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
} else { // lookup something
@ -280,19 +280,22 @@ TEST_CASE("cache::PlainCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
if (!cacheHit->insert(value)) {
auto status = cacheHit->insert(value);
if (status.fail()) {
delete value;
}
value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
if (!cacheMiss->insert(value)) {
status = cacheMiss->insert(value);
if (status.fail()) {
delete value;
}
value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
if (!cacheMixed->insert(value)) {
status = cacheMixed->insert(value);
if (status.fail()) {
delete value;
}
}

View File

@ -87,8 +87,8 @@ TEST_CASE("cache::Rebalancer", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
}
@ -119,8 +119,8 @@ TEST_CASE("cache::Rebalancer", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
} else { // lookup something
@ -208,8 +208,8 @@ TEST_CASE("cache::Rebalancer", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
}
@ -244,8 +244,8 @@ TEST_CASE("cache::Rebalancer", "[cache][!hide][longRunning]") {
size_t cacheIndex = item % cacheCount;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = caches[cacheIndex]->insert(value);
if (!ok) {
auto status = caches[cacheIndex]->insert(value);
if (status.fail()) {
delete value;
}
} else if (r >= 80) { // blacklist something

View File

@ -69,8 +69,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
} else {
@ -82,8 +82,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
uint64_t j = 2 * i;
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &j, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
REQUIRE(0 == memcmp(f.value()->value(), &j, sizeof(uint64_t)));
@ -95,8 +95,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 1024; i < 256 * 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
} else {
@ -117,8 +117,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
REQUIRE(f.value() != nullptr);
@ -139,8 +139,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
// test removal of bogus keys
for (uint64_t i = 1024; i < 2048; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
REQUIRE(removed);
auto status = cache->remove(&i, sizeof(uint64_t));
REQUIRE(status.ok());
// ensure existing keys not removed
uint64_t found = 0;
for (uint64_t j = 0; j < 1024; j++) {
@ -156,8 +156,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
// remove actual keys
for (uint64_t i = 0; i < 1024; i++) {
bool removed = cache->remove(&i, sizeof(uint64_t));
REQUIRE(removed);
auto status = cache->remove(&i, sizeof(uint64_t));
REQUIRE(status.ok());
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(!f.found());
}
@ -176,8 +176,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
REQUIRE(f.value() != nullptr);
@ -188,8 +188,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
}
for (uint64_t i = 512; i < 1024; i++) {
bool success = cache->blacklist(&i, sizeof(uint64_t));
REQUIRE(success);
auto status = cache->blacklist(&i, sizeof(uint64_t));
REQUIRE(status.ok());
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(!f.found());
}
@ -197,8 +197,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 512; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
REQUIRE(!success);
auto status = cache->insert(value);
REQUIRE(status.fail());
delete value;
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(!f.found());
@ -211,8 +211,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 512; i < 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (success) {
auto status = cache->insert(value);
if (status.ok()) {
reinserted++;
auto f = cache->find(&i, sizeof(uint64_t));
REQUIRE(f.found());
@ -235,8 +235,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
for (uint64_t i = 0; i < 4 * 1024 * 1024; i++) {
CachedValue* value =
CachedValue::construct(&i, sizeof(uint64_t), &i, sizeof(uint64_t));
bool success = cache->insert(value);
if (!success) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
}
@ -267,8 +267,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
uint64_t item = lower + i;
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
}
@ -301,8 +301,8 @@ TEST_CASE("cache::TransactionalCache", "[cache][!hide][longRunning]") {
}
CachedValue* value = CachedValue::construct(&item, sizeof(uint64_t),
&item, sizeof(uint64_t));
bool ok = cache->insert(value);
if (!ok) {
auto status = cache->insert(value);
if (status.fail()) {
delete value;
}
} else if (r >= 80) { // blacklist something

View File

@ -258,8 +258,8 @@ TransactionalStore::Document TransactionalStore::lookup(
memcpy(&result, buffer.data(), sizeof(Document));
CachedValue* value = CachedValue::construct(&key, sizeof(uint64_t),
&result, sizeof(Document));
bool inserted = _cache->insert(value);
if (!inserted) {
auto status = _cache->insert(value);
if (status.fail()) {
delete value;
}
}