mirror of https://gitee.com/bigwinds/arangodb
add blacklisting for keys in rocksdb-edge-index on insert/remove
This commit is contained in:
parent
e29db44af6
commit
738989f9a3
|
@ -288,8 +288,11 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
|
||||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||||
VPackSlice fromTo = doc.get(_directionAttr);
|
VPackSlice fromTo = doc.get(_directionAttr);
|
||||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo),
|
auto fromToRef=StringRef(fromTo);
|
||||||
|
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
|
||||||
StringRef(primaryKey));
|
StringRef(primaryKey));
|
||||||
|
//blacklist key in cache
|
||||||
|
cacheBlackListKey(fromToRef.data(),fromToRef.size());
|
||||||
|
|
||||||
// acquire rocksdb transaction
|
// acquire rocksdb transaction
|
||||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||||
|
@ -314,10 +317,14 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
|
||||||
bool isRollback) {
|
bool isRollback) {
|
||||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||||
VPackSlice fromTo = doc.get(_directionAttr);
|
VPackSlice fromTo = doc.get(_directionAttr);
|
||||||
|
auto fromToRef=StringRef(fromTo);
|
||||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo),
|
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
|
||||||
StringRef(primaryKey));
|
StringRef(primaryKey));
|
||||||
|
|
||||||
|
//blacklist key in cache
|
||||||
|
cacheBlackListKey(fromToRef.data(),fromToRef.size());
|
||||||
|
|
||||||
// acquire rocksdb transaction
|
// acquire rocksdb transaction
|
||||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||||
rocksdb::Transaction* rtrx = state->rocksTransaction();
|
rocksdb::Transaction* rtrx = state->rocksTransaction();
|
||||||
|
@ -335,8 +342,11 @@ int RocksDBEdgeIndex::removeRaw(rocksdb::WriteBatch* writeBatch,
|
||||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||||
VPackSlice fromTo = doc.get(_directionAttr);
|
VPackSlice fromTo = doc.get(_directionAttr);
|
||||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo),
|
auto fromToRef=StringRef(fromTo);
|
||||||
|
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
|
||||||
StringRef(primaryKey));
|
StringRef(primaryKey));
|
||||||
|
//blacklist key in cache
|
||||||
|
cacheBlackListKey(fromToRef.data(),fromToRef.size());
|
||||||
writeBatch->Delete(rocksdb::Slice(key.string()));
|
writeBatch->Delete(rocksdb::Slice(key.string()));
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -353,9 +363,11 @@ void RocksDBEdgeIndex::batchInsert(
|
||||||
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
|
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
|
||||||
VPackSlice fromTo = doc.second.get(_directionAttr);
|
VPackSlice fromTo = doc.second.get(_directionAttr);
|
||||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo),
|
auto fromToRef=StringRef(fromTo);
|
||||||
|
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
|
||||||
StringRef(primaryKey));
|
StringRef(primaryKey));
|
||||||
|
|
||||||
|
cacheBlackListKey(fromToRef.data(),fromToRef.size());
|
||||||
rocksdb::Status status =
|
rocksdb::Status status =
|
||||||
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
|
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
|
@ -492,7 +504,7 @@ IndexIterator* RocksDBEdgeIndex::createEqIterator(
|
||||||
keys->close();
|
keys->close();
|
||||||
|
|
||||||
return new RocksDBEdgeIndexIterator(
|
return new RocksDBEdgeIndexIterator(
|
||||||
_collection, trx, mmdr, this, keys, _useCache, _cache.get());
|
_collection, trx, mmdr, this, keys, useCache(), _cache.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief create the iterator
|
/// @brief create the iterator
|
||||||
|
@ -519,7 +531,7 @@ IndexIterator* RocksDBEdgeIndex::createInIterator(
|
||||||
keys->close();
|
keys->close();
|
||||||
|
|
||||||
return new RocksDBEdgeIndexIterator(
|
return new RocksDBEdgeIndexIterator(
|
||||||
_collection, trx, mmdr, this, keys, _useCache, _cache.get());
|
_collection, trx, mmdr, this, keys, useCache(), _cache.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief add a single value node to the iterator's keys
|
/// @brief add a single value node to the iterator's keys
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "RocksDBIndex.h"
|
#include "RocksDBIndex.h"
|
||||||
#include "Basics/VelocyPackHelper.h"
|
#include "Basics/VelocyPackHelper.h"
|
||||||
#include "Cache/CacheManagerFeature.h"
|
#include "Cache/CacheManagerFeature.h"
|
||||||
|
#include "Cache/TransactionalCache.h"
|
||||||
#include "Cache/Common.h"
|
#include "Cache/Common.h"
|
||||||
#include "Cache/Manager.h"
|
#include "Cache/Manager.h"
|
||||||
#include "RocksDBEngine/RocksDBComparator.h"
|
#include "RocksDBEngine/RocksDBComparator.h"
|
||||||
|
@ -138,3 +139,22 @@ int RocksDBIndex::drop() {
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RocksDBIndex::cacheBlackListKey(char const* data, std::size_t len){
|
||||||
|
if (useCache()) {
|
||||||
|
TRI_ASSERT(_cache != nullptr);
|
||||||
|
// blacklist from cache
|
||||||
|
bool blacklisted = false;
|
||||||
|
uint64_t attempts = 0;
|
||||||
|
while (!blacklisted) {
|
||||||
|
blacklisted = _cache->blacklist(data,len);
|
||||||
|
attempts++;
|
||||||
|
if (attempts > 10) {
|
||||||
|
if (_cache->isShutdown()) {
|
||||||
|
disableCache();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
attempts = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -89,6 +89,7 @@ class RocksDBIndex : public Index {
|
||||||
void createCache();
|
void createCache();
|
||||||
void disableCache();
|
void disableCache();
|
||||||
inline bool useCache() const { return (_useCache && _cachePresent); }
|
inline bool useCache() const { return (_useCache && _cachePresent); }
|
||||||
|
void cacheBlackListKey(char const* data, std::size_t len);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
uint64_t _objectId;
|
uint64_t _objectId;
|
||||||
|
|
|
@ -431,24 +431,7 @@ int RocksDBPrimaryIndex::insert(transaction::Methods* trx,
|
||||||
return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
|
return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useCache()) {
|
cacheBlackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
|
||||||
TRI_ASSERT(_cache != nullptr);
|
|
||||||
// blacklist from cache
|
|
||||||
bool blacklisted = false;
|
|
||||||
uint64_t attempts = 0;
|
|
||||||
while (!blacklisted) {
|
|
||||||
blacklisted = _cache->blacklist(
|
|
||||||
key.string().data(), static_cast<uint32_t>(key.string().size()));
|
|
||||||
attempts++;
|
|
||||||
if (attempts > 10) {
|
|
||||||
if (_cache->isShutdown()) {
|
|
||||||
disableCache();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
attempts = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto status = rtrx->Put(key.string(), value.string());
|
auto status = rtrx->Put(key.string(), value.string());
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
|
@ -472,24 +455,7 @@ int RocksDBPrimaryIndex::remove(transaction::Methods* trx,
|
||||||
auto key = RocksDBKey::PrimaryIndexValue(
|
auto key = RocksDBKey::PrimaryIndexValue(
|
||||||
_objectId, StringRef(slice.get(StaticStrings::KeyString)));
|
_objectId, StringRef(slice.get(StaticStrings::KeyString)));
|
||||||
|
|
||||||
if (useCache()) {
|
cacheBlackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
|
||||||
TRI_ASSERT(_cache != nullptr);
|
|
||||||
// blacklist from cache
|
|
||||||
bool blacklisted = false;
|
|
||||||
uint64_t attempts = 0;
|
|
||||||
while (!blacklisted) {
|
|
||||||
blacklisted = _cache->blacklist(
|
|
||||||
key.string().data(), static_cast<uint32_t>(key.string().size()));
|
|
||||||
attempts++;
|
|
||||||
if (attempts > 10) {
|
|
||||||
if (_cache->isShutdown()) {
|
|
||||||
disableCache();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
attempts = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// acquire rocksdb transaction
|
// acquire rocksdb transaction
|
||||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||||
|
|
Loading…
Reference in New Issue