1
0
Fork 0

Cache workaround to pass upgrade test.

This commit is contained in:
Dan Larkin 2017-04-05 12:55:16 -04:00
parent 4ca2cf52d3
commit ae84e8ac84
6 changed files with 62 additions and 23 deletions

View File

@ -163,6 +163,15 @@ bool Cache::isResizing() {
return resizing;
}
bool Cache::isShutdown() {
bool shutdown = false;
_state.lock();
shutdown = !isOperational();
_state.unlock();
return shutdown;
}
void Cache::destroy(std::shared_ptr<Cache> cache) {
if (cache.get() != nullptr) {
cache->shutdown();

View File

@ -112,6 +112,11 @@ class Cache : public std::enable_shared_from_this<Cache> {
//////////////////////////////////////////////////////////////////////////////
bool isResizing();
//////////////////////////////////////////////////////////////////////////////
/// @brief Check whether the cache has begin the process of shutting down.
//////////////////////////////////////////////////////////////////////////////
bool isShutdown();
protected:
static constexpr int64_t triesFast = 50;
static constexpr int64_t triesSlow = 10000;

View File

@ -32,6 +32,7 @@
#include "Cache/State.h"
#include "Cache/Table.h"
#include "Cache/TransactionalBucket.h"
#include "Logger/Logger.h"
#include <stdint.h>
#include <atomic>

View File

@ -73,6 +73,14 @@ void RocksDBIndex::createCache() {
}
}
void RocksDBIndex::disableCache() {
TRI_ASSERT(_cacheManager != nullptr);
TRI_ASSERT(_useCache);
TRI_ASSERT(_cache.get() != nullptr);
_useCache = false;
_cacheManager->destroyCache(_cache);
_cache.reset();
}
int RocksDBIndex::drop() {
// Try to drop the cache as well.
if (_useCache && _cache != nullptr) {

View File

@ -48,7 +48,6 @@ class RocksDBIndex : public Index {
arangodb::velocypack::Slice const&);
public:
~RocksDBIndex();
uint64_t objectId() const { return _objectId; }
@ -56,7 +55,7 @@ class RocksDBIndex : public Index {
bool isPersistent() const override final { return true; }
int drop() override;
int unload() override {
// nothing to do here yet
// TODO: free the cache the index uses
@ -64,13 +63,14 @@ class RocksDBIndex : public Index {
}
/// @brief provides a size hint for the index
int sizeHint(transaction::Methods* /*trx*/, size_t /*size*/) override final{
int sizeHint(transaction::Methods* /*trx*/, size_t /*size*/) override final {
// nothing to do here
return TRI_ERROR_NO_ERROR;
}
protected:
void createCache();
void disableCache();
protected:
uint64_t _objectId;

View File

@ -178,19 +178,20 @@ bool RocksDBAllIndexIterator::outOfRange() const {
uint64_t RocksDBAnyIndexIterator::OFFSET = 0;
RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index)
: IndexIterator(collection, trx, mmdr, index),
_cmp(index->_cmp),
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) {
RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index)
: IndexIterator(collection, trx, mmdr, index),
_cmp(index->_cmp),
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) {
// aquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();
auto options = state->readOptions();
_iterator.reset(rtrx->GetIterator(options));
_iterator->Seek(_bounds.start());
// not thread safe by design
uint64_t off = OFFSET++;
while (_iterator->Valid() && --off > 0) {
@ -209,24 +210,22 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false;
}
while (limit > 0) {
RocksDBToken token(RocksDBValue::revisionId(_iterator->value()));
cb(token);
--limit;
_iterator->Next();
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
return true;
}
void RocksDBAnyIndexIterator::reset() {
_iterator->Seek(_bounds.start());
}
void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); }
bool RocksDBAnyIndexIterator::outOfRange() const {
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
@ -350,9 +349,18 @@ int RocksDBPrimaryIndex::insert(transaction::Methods* trx,
if (_useCache) {
// blacklist from cache
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(
key.string().data(), static_cast<uint32_t>(key.string().size()));
attempts++;
if (attempts > 10) {
if (_cache->isShutdown()) {
disableCache();
break;
}
attempts = 0;
}
}
}
@ -376,9 +384,18 @@ int RocksDBPrimaryIndex::remove(transaction::Methods* trx,
if (_useCache) {
// blacklist from cache
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(
key.string().data(), static_cast<uint32_t>(key.string().size()));
attempts++;
if (attempts > 10) {
if (_cache->isShutdown()) {
disableCache();
break;
}
attempts = 0;
}
}
}
@ -468,15 +485,16 @@ IndexIterator* RocksDBPrimaryIndex::allIterator(transaction::Methods* trx,
/// @brief request an iterator over all elements in the index in
/// a sequential order.
IndexIterator* RocksDBPrimaryIndex::anyIterator(transaction::Methods* trx,
ManagedDocumentResult* mmdr) const {
IndexIterator* RocksDBPrimaryIndex::anyIterator(
transaction::Methods* trx, ManagedDocumentResult* mmdr) const {
return new RocksDBAnyIndexIterator(_collection, trx, mmdr, this);
}
void RocksDBPrimaryIndex::invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) {
void RocksDBPrimaryIndex::invokeOnAllElements(
transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) {
ManagedDocumentResult mmdr;
std::unique_ptr<IndexIterator> cursor (allIterator(trx, &mmdr, false));
std::unique_ptr<IndexIterator> cursor(allIterator(trx, &mmdr, false));
bool cnt = true;
auto cb = [&](DocumentIdentifierToken token) {
if (cnt) {
@ -484,11 +502,9 @@ void RocksDBPrimaryIndex::invokeOnAllElements(transaction::Methods* trx,
}
};
while (cursor->next(cb, 1000) && cnt) {
}
}
/// @brief create the iterator, for a single attribute, IN operator
IndexIterator* RocksDBPrimaryIndex::createInIterator(
transaction::Methods* trx, ManagedDocumentResult* mmdr,