1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-05-11 20:58:49 +02:00
commit 68611a0a6a
4 changed files with 126 additions and 11 deletions

View File

@ -27,6 +27,10 @@
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Cache/CacheManagerFeature.h"
#include "Cache/TransactionalCache.h"
#include "Cache/Common.h"
#include "Cache/Manager.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/CollectionLockState.h"
#include "Indexes/Index.h"
@ -82,9 +86,15 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
_objectId(basics::VelocyPackHelper::stringUInt64(info, "objectId")),
_numberDocuments(0),
_revisionId(0),
_hasGeoIndex(false) {
_hasGeoIndex(false),
_cache(nullptr),
_cachePresent(false),
_useCache(true) {
addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(),
_logicalCollection->cid());
if (_useCache) {
createCache();
}
}
RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
@ -93,12 +103,27 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
_objectId(static_cast<RocksDBCollection*>(physical)->_objectId),
_numberDocuments(0),
_revisionId(0),
_hasGeoIndex(false) {
_hasGeoIndex(false),
_cache(nullptr),
_cachePresent(false),
_useCache(true) {
addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(),
_logicalCollection->cid());
if (_useCache) {
createCache();
}
}
RocksDBCollection::~RocksDBCollection() {}
RocksDBCollection::~RocksDBCollection() {
if (useCache()) {
try {
TRI_ASSERT(_cache != nullptr);
TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr);
CacheManagerFeature::MANAGER->destroyCache(_cache);
} catch (...) {
}
}
}
std::string const& RocksDBCollection::path() const {
return Empty; // we do not have any path
@ -617,6 +642,8 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
TRI_ASSERT(key.isString());
blackListKey(iter->key().data(), static_cast<uint32_t>(iter->key().size()));
// add possible log statement
state->prepareOperation(cid, revisionId, StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE);
@ -1388,6 +1415,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
RocksDBKey key(RocksDBKey::Document(_objectId, revisionId));
RocksDBValue value(RocksDBValue::Document(doc));
blackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
rocksdb::Transaction* rtrx = rocksTransaction(trx);
rocksdb::Status status = rtrx->Put(key.string(), value.string());
@ -1447,6 +1476,8 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
auto key = RocksDBKey::Document(_objectId, revisionId);
blackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
rocksdb::Transaction* rtrx = rocksTransaction(trx);
rtrx->PutLogData(RocksDBLogValue::DocumentRemove(
@ -1551,12 +1582,38 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
auto key = RocksDBKey::Document(_objectId, revisionId);
std::string value;
if (useCache()) {
TRI_ASSERT(_cache != nullptr);
// check cache first for fast path
auto f = _cache->find(key.string().data(),
static_cast<uint32_t>(key.string().size()));
if (f.found()) {
value.append(reinterpret_cast<char const*>(f.value()->value()),
static_cast<size_t>(f.value()->valueSize));
mdr.setManaged(std::move(value), revisionId);
return {TRI_ERROR_NO_ERROR};
}
}
auto* state = toRocksTransactionState(trx);
rocksdb::Status status = state->rocksTransaction()->Get(state->readOptions(),
key.string(), &value);
TRI_ASSERT(value.data());
auto result = convertStatus(status);
if (result.ok()) {
if (useCache()) {
TRI_ASSERT(_cache != nullptr);
// write entry back to cache
auto entry = cache::CachedValue::construct(
key.string().data(), static_cast<uint32_t>(key.string().size()),
value.data(), static_cast<uint64_t>(value.size()));
bool cached = _cache->insert(entry);
if (!cached) {
delete entry;
}
}
mdr.setManaged(std::move(value), revisionId);
} else {
mdr.reset();
@ -1745,3 +1802,52 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) {
builder.add("total", VPackValue(total));
builder.close();
}
void RocksDBCollection::createCache() const {
if (!_useCache || _cachePresent) {
// we leave this if we do not need the cache
// or if cache already created
return;
}
TRI_ASSERT(_useCache);
TRI_ASSERT(_cache.get() == nullptr);
TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr);
_cache = CacheManagerFeature::MANAGER->createCache(
cache::CacheType::Transactional);
_cachePresent = (_cache.get() != nullptr);
TRI_ASSERT(_useCache);
}
void RocksDBCollection::disableCache() const {
if (!_cachePresent) {
return;
}
TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr);
// must have a cache...
TRI_ASSERT(_useCache);
TRI_ASSERT(_cachePresent);
TRI_ASSERT(_cache.get() != nullptr);
CacheManagerFeature::MANAGER->destroyCache(_cache);
_cache.reset();
_cachePresent = false;
TRI_ASSERT(_useCache);
}
// blacklist given key from transactional cache
void RocksDBCollection::blackListKey(char const* data, std::size_t len) const {
if (useCache()) {
TRI_ASSERT(_cache != nullptr);
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(data,len);
if (attempts++ % 10 == 0) {
if (_cache->isShutdown()) {
disableCache();
break;
}
}
}
}
}

View File

@ -33,6 +33,9 @@
#include "VocBase/ManagedDocumentResult.h"
namespace arangodb {
namespace cache {
class Cache;
}
class LogicalCollection;
class ManagedDocumentResult;
class Result;
@ -217,6 +220,11 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*,
arangodb::ManagedDocumentResult&) const;
void createCache() const;
void disableCache() const;
inline bool useCache() const { return (_useCache && _cachePresent); }
void blackListKey(char const* data, std::size_t len) const;
private:
uint64_t const _objectId; // rocksdb-specific object id for collection
std::atomic<uint64_t> _numberDocuments;
@ -225,6 +233,12 @@ class RocksDBCollection final : public PhysicalCollection {
/// upgrade write locks to exclusive locks if this flag is set
bool _hasGeoIndex;
basics::ReadWriteLock _exclusiveLock;
mutable std::shared_ptr<cache::Cache> _cache;
// we use this boolean for testing whether _cache is set.
// it's quicker than accessing the shared_ptr each time
mutable bool _cachePresent;
bool _useCache;
};
inline RocksDBCollection* toRocksDBCollection(PhysicalCollection* physical) {

View File

@ -58,7 +58,7 @@ RocksDBIndex::RocksDBIndex(
}
RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection,
VPackSlice const& info,bool useCache)
VPackSlice const& info, bool useCache)
: Index(id, collection, info),
_objectId(basics::VelocyPackHelper::stringUInt64(info.get("objectId"))),
_cmp(static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->cmp()),

View File

@ -297,14 +297,9 @@ RocksDBPrimaryIndex::RocksDBPrimaryIndex(
{{arangodb::basics::AttributeName(
StaticStrings::KeyString, false)}}),
true, false,
basics::VelocyPackHelper::stringUInt64(info, "objectId")
,!ServerState::instance()->isCoordinator() /*useCache*/
) {
basics::VelocyPackHelper::stringUInt64(info, "objectId"),
!ServerState::instance()->isCoordinator() /*useCache*/) {
TRI_ASSERT(_objectId != 0);
if (_objectId == 0 ) {
//disableCache
_useCache = false;
}
}
RocksDBPrimaryIndex::~RocksDBPrimaryIndex() {}