
Added warmup function for indexes.

Squashed commit of the following:

commit bc0472d212b2efef0d7b1b8a36f2b43e8432ba86
Merge: eb47631 5f87674
Author: Michael Hackstein <michael@arangodb.com>
Date:   Mon May 29 09:39:31 2017 +0200

    Merge branch 'devel' of github.com:arangodb/arangodb into feature/rocks-index-warmup

commit eb476310b0ca6165be10b37b960933886c2757f5
Merge: d725f21 32149d2
Author: Michael Hackstein <michael@arangodb.com>
Date:   Mon May 29 09:00:41 2017 +0200

    Merge branch 'devel' of github.com:arangodb/arangodb into feature/rocks-index-warmup

commit d725f21f7d61f7c79ba31fc0338881e35f4d8f48
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 15:53:23 2017 +0200

    Activated collection warmup in Cluster Mode. Added a test for collection warmup (SingleServerOnly)

commit 132bf4f9565b0dcf4ec9f84e93897b482a67ce7f
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 12:46:20 2017 +0200

    Implemented client-side warmup. It can now be triggered via HTTP and arangosh

commit 78ea449dff86118814a2f87bdb59dc16544d92b6
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 11:25:40 2017 +0200

    Fixed assertion.

commit dae80f6277dde1a52eadda506858cc36e235bd55
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 11:08:46 2017 +0200

    Improved the CuckooIndexEstimator computeEstimate function. It is now much better on collections with many different values

commit 7abf57876511ba369d7a577e1995d4575e98c7c8
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 11:06:43 2017 +0200

    Edge index warmup will now first check whether a document is already stored and, if not, insert it. Furthermore it resizes the cache with an estimate of how many documents will most likely be inserted

commit 890d8ad4cdfd155249f060fedd5c798b9531d556
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 11:04:47 2017 +0200

    Adjusted thresholds in transactional cache. Vastly increased the number of tries to get the lock. Lowered the fill-grade boundaries

commit 60972ab7151a3acb78e1aae4149de11d0da7aceb
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 10:45:38 2017 +0200

    Added new LogTopic CACHE that should be used to log important information regarding caches, like debug output on evictions/resizing, or memory-pressure resizing in progress

commit 2dfacb9aef6e3fde169032514baca386786d059c
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 10:44:21 2017 +0200

    Fixed an assertion in the index iterator. It expected _from/_to to be cached, but we modified the code to cache the entire document

commit f05beccef65853c30eda1859a39c28ea2295bc71
Merge: 13c6abf 5c46430
Author: Michael Hackstein <michael@arangodb.com>
Date:   Fri May 26 07:56:58 2017 +0200

    Merge branch 'devel' of github.com:arangodb/arangodb into feature/rocks-index-warmup

commit 13c6abfbc2ddb451a689011110411df68247435e
Author: Michael Hackstein <michael@arangodb.com>
Date:   Wed May 24 09:52:01 2017 +0200

    Added a getExtra feature for the edgeIndex. It now returns the complete edge as extra (temporary). Modified the EdgeCache to cache token and edge document (temporary). Added a warmup function to collections that is used to warm up the index caches on demand.
This commit is contained in:
Michael Hackstein 2017-05-29 09:40:58 +02:00
parent 5f87674b6f
commit b9d2faa7cc
22 changed files with 616 additions and 116 deletions

View File

@ -131,8 +131,8 @@ class Cache : public std::enable_shared_from_this<Cache> {
bool isShutdown();
protected:
static constexpr int64_t triesFast = 50;
static constexpr int64_t triesSlow = 10000;
static constexpr int64_t triesFast = 200000;
static constexpr int64_t triesSlow = 10000000;
static constexpr int64_t triesGuarantee = -1;
protected:
@ -157,7 +157,7 @@ class Cache : public std::enable_shared_from_this<Cache> {
std::atomic<uint64_t> _insertsTotal;
std::atomic<uint64_t> _insertEvictions;
static constexpr uint64_t _evictionMask = 1023; // check every 1024 insertions
static constexpr uint64_t _evictionThreshold = 32; // if more than 32
static constexpr uint64_t _evictionThreshold = 10; // if more than 10
// evictions in past 1024
// inserts, migrate
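
The constants describe the migration trigger: every 1024 inserts the cache checks how many evictions that window produced, and this commit lowers the threshold from 32 to 10. A minimal JavaScript sketch of that check, for illustration only; the function and argument names are hypothetical and the actual check lives in the C++ cache code, which this diff does not show:

var EVICTION_MASK = 1023;     // check every 1024 insertions
var EVICTION_THRESHOLD = 10;  // lowered from 32 in this commit

function shouldMigrate(insertsTotal, insertEvictions) {
  // on every 1024th insert, compare the eviction count of the window to the threshold
  if ((insertsTotal & EVICTION_MASK) === 0) {
    return insertEvictions > EVICTION_THRESHOLD;
  }
  return false;
}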

View File

@ -39,8 +39,8 @@ namespace cache {
////////////////////////////////////////////////////////////////////////////////
class Table : public std::enable_shared_from_this<Table> {
public:
static constexpr double idealLowerRatio = 0.05;
static constexpr double idealUpperRatio = 0.33;
static constexpr double idealLowerRatio = 0.04;
static constexpr double idealUpperRatio = 0.25;
static const uint32_t minLogSize;
static const uint32_t maxLogSize;
static constexpr uint32_t standardLogSizeAdjustment = 6;

View File

@ -98,8 +98,10 @@ bool TransactionalCache::insert(CachedValue* value) {
bool eviction = false;
if (candidate != nullptr) {
bucket->evict(candidate, true);
if (!candidate->sameKey(value->key(), value->keySize)) {
eviction = true;
}
freeValue(candidate);
eviction = true;
}
bucket->insert(hash, value);
inserted = true;

View File

@ -634,6 +634,50 @@ int revisionOnCoordinator(std::string const& dbname,
// the DBserver could have reported an error.
}
int warmupOnCoordinator(std::string const& dbname,
std::string const& cid) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return TRI_ERROR_SHUTTING_DOWN;
}
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
try {
collinfo = ci->getCollection(dbname, cid);
} catch (...) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_ASSERT(collinfo != nullptr);
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
auto shards = collinfo->shardIds();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
for (auto const& p : *shards) {
auto headers =
std::make_unique<std::unordered_map<std::string, std::string>>();
cc->asyncRequest(
"", coordTransactionID, "shard:" + p.first,
arangodb::rest::RequestType::GET,
"/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" +
StringUtils::urlEncode(p.first) + "/warmup",
std::shared_ptr<std::string const>(), headers, nullptr, 300.0);
}
// Now listen to the results:
// Well actually we don't care...
int count;
for (count = (int)shards->size(); count > 0; count--) {
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -68,6 +68,13 @@ bool shardKeysChanged(std::string const& dbname, std::string const& collname,
int revisionOnCoordinator(std::string const& dbname,
std::string const& collname, TRI_voc_rid_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief Warmup index caches on Shards
////////////////////////////////////////////////////////////////////////////////
int warmupOnCoordinator(std::string const& dbname,
std::string const& cid);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -165,16 +165,26 @@ void SingleServerEdgeCursor::readAll(
auto& cursorSet = _cursors[_currentCursor];
for (auto& cursor : cursorSet) {
LogicalCollection* collection = cursor->collection();
auto cb = [&](DocumentIdentifierToken const& token) {
if (collection->readDocument(_trx, token, *_mmdr)) {
VPackSlice doc(_mmdr->vpack());
if (cursor->hasExtra()) {
auto cb = [&](DocumentIdentifierToken const& token, VPackSlice doc) {
std::string tmpId = _trx->extractIdString(doc);
StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId));
_opts->cache()->insertDocument(edgeId, doc);
callback(edgeId, doc, cursorId);
}
};
cursor->all(cb);
};
cursor->allWithExtra(cb);
} else {
auto cb = [&](DocumentIdentifierToken const& token) {
if (collection->readDocument(_trx, token, *_mmdr)) {
VPackSlice doc(_mmdr->vpack());
std::string tmpId = _trx->extractIdString(doc);
StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId));
_opts->cache()->insertDocument(edgeId, doc);
callback(edgeId, doc, cursorId);
}
};
cursor->all(cb);
}
}
}
}

View File

@ -836,6 +836,11 @@ void Index::expandInSearchValues(VPackSlice const base,
}
}
void Index::warmup(arangodb::transaction::Methods*) {
// Do nothing. If an index needs some warmup
// it has to explicitly implement it.
}
/// @brief append the index description to an output stream
std::ostream& operator<<(std::ostream& stream, arangodb::Index const* index) {
stream << index->context();

View File

@ -294,6 +294,8 @@ class Index {
virtual void expandInSearchValues(arangodb::velocypack::Slice const,
arangodb::velocypack::Builder&) const;
virtual void warmup(arangodb::transaction::Methods* trx);
protected:
static size_t sortWeight(arangodb::aql::AstNode const* node);

View File

@ -28,6 +28,7 @@
#include "Basics/StringRef.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -276,7 +277,7 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> collectionKVPairs(
iterateBounds(bounds, [&rv](rocksdb::Iterator* it) {
rv.emplace_back(RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::Collection, it->value()));
});
}, arangodb::RocksDBColumnFamily::other());
return rv;
}
@ -287,7 +288,7 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> viewKVPairs(
iterateBounds(bounds, [&rv](rocksdb::Iterator* it) {
rv.emplace_back(RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::View, it->value()));
});
}, arangodb::RocksDBColumnFamily::other());
return rv;
}

View File

@ -128,10 +128,11 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> viewKVPairs(
template <typename T> // T is a invokeable that takes a rocksdb::Iterator*
void iterateBounds(
RocksDBKeyBounds const& bounds, T callback,
rocksdb::ColumnFamilyHandle* handle,
rocksdb::ReadOptions options = rocksdb::ReadOptions()) {
rocksdb::Slice const end = bounds.end();
options.iterate_upper_bound = &end; // safe to use on rocksdb::DB directly
std::unique_ptr<rocksdb::Iterator> it(globalRocksDB()->NewIterator(options));
std::unique_ptr<rocksdb::Iterator> it(globalRocksDB()->NewIterator(options, handle));
for (it->Seek(bounds.start()); it->Valid(); it->Next()) {
callback(it.get());
}

View File

@ -301,10 +301,18 @@ class RocksDBCuckooIndexEstimator {
// If we do not have any documents we have a rather constant estimate.
return 1;
}
// _nrUsed; These are known to be distinct values
// _nrCuckood; These are eventually distinct documents with unknown state
return (double)(_nrUsed + ((double)_nrCuckood * 3 * _nrUsed / _nrTotal)) /
_nrTotal;
double total = 0;
for (uint32_t b = 0; b < _size; ++b) {
for (size_t i = 0; i < SlotsPerBucket; ++i) {
uint32_t* c = findCounter(b, i);
total += *c;
}
}
if (total == 0) {
return 1;
}
return _nrUsed / total;
}
bool lookup(Key const& k) const {
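
In words, the new computeEstimate() returns the number of distinct keys currently stored (_nrUsed) divided by the sum of all per-slot counters, i.e. the total number of insertions the estimator has seen. A small JavaScript sketch of the same computation, for illustration only; the flat counters array stands in for the bucket/slot layout that findCounter() walks:

// counters: one insertion counter per slot; nrUsed: number of occupied (distinct) slots
function computeEstimate(counters, nrUsed) {
  var total = 0;
  for (var i = 0; i < counters.length; i++) {
    total += counters[i];
  }
  if (total === 0) {
    // nothing counted yet: fall back to the constant estimate, as the C++ code does
    return 1;
  }
  // distinct values divided by total insertions seen
  return nrUsed / total;
}

For example, 100 distinct keys whose counters sum to 400 insertions give an estimate of 0.25.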

View File

@ -19,6 +19,7 @@
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEdgeIndex.h"
@ -71,12 +72,7 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
rocksutils::toRocksMethods(trx)->NewIterator(index->columnFamily())),
_bounds(RocksDBKeyBounds::EdgeIndex(0)),
_cache(cache),
_posInMemory(0),
_memSize(26),
_inplaceMemory(nullptr) {
// We allocate enough memory for 25 elements + 1 size.
// Maybe adjust this.
_inplaceMemory = new uint64_t[_memSize];
_builderIterator(arangodb::basics::VelocyPackHelper::EmptyArrayValue()) {
keys.release(); // now we have ownership for _keys
TRI_ASSERT(_keys != nullptr);
TRI_ASSERT(_keys->slice().isArray());
@ -84,7 +80,6 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
}
RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
delete[] _inplaceMemory;
if (_keys != nullptr) {
// return the VPackBuilder to the transaction context
_trx->transactionContextPtr()->returnBuilder(_keys.release());
@ -92,6 +87,7 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
}
void RocksDBEdgeIndexIterator::resizeMemory() {
/*
// Increase size by factor of two.
// TODO Adjust this has potential to kill memory...
uint64_t* tmp = new uint64_t[_memSize * 2];
@ -99,9 +95,11 @@ void RocksDBEdgeIndexIterator::resizeMemory() {
_memSize *= 2;
delete[] _inplaceMemory;
_inplaceMemory = tmp;
*/
}
void RocksDBEdgeIndexIterator::reserveInplaceMemory(uint64_t count) {
/*
// NOTE: count the number of cached edges, 1 is the size
if (count + 1 > _memSize) {
// In this case the current memory is too small.
@ -111,26 +109,21 @@ void RocksDBEdgeIndexIterator::reserveInplaceMemory(uint64_t count) {
_memSize = count + 1;
}
// else NOOP, we have enough memory to write to
*/
}
uint64_t RocksDBEdgeIndexIterator::valueLength() const {
return *_inplaceMemory;
return 0;
// return *_inplaceMemory;
}
void RocksDBEdgeIndexIterator::resetInplaceMemory() {
// It is sufficient to only set the first element
// We always have to make sure to only access elements
// at a position lower than valueLength()
*_inplaceMemory = 0;
// Position 0 points to the size.
// This is defined as the invalid position
_posInMemory = 0;
}
void RocksDBEdgeIndexIterator::resetInplaceMemory() { _builder.clear(); }
void RocksDBEdgeIndexIterator::reset() {
resetInplaceMemory();
_keysIterator.reset();
_builderIterator =
VPackArrayIterator(arangodb::basics::VelocyPackHelper::EmptyArrayValue());
}
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
@ -146,20 +139,21 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
#endif
while (limit > 0) {
if (_posInMemory > 0) {
while (_builderIterator.valid()) {
// We still have unreturned edges in our memory.
// Just plainly return those.
size_t atMost = (std::min)(static_cast<uint64_t>(limit),
valueLength() + 1 /*size*/ - _posInMemory);
for (size_t i = 0; i < atMost; ++i) {
cb(RocksDBToken{*(_inplaceMemory + _posInMemory++)});
}
limit -= atMost;
if (_posInMemory == valueLength() + 1) {
// We have returned everything we had in local buffer, reset it
resetInplaceMemory();
} else {
TRI_ASSERT(limit == 0);
TRI_ASSERT(_builderIterator.value().isNumber());
cb(RocksDBToken{_builderIterator.value().getNumericValue<uint64_t>()});
limit--;
// Twice advance the iterator
_builderIterator.next();
// We always have <revision, document> pairs
TRI_ASSERT(_builderIterator.valid());
_builderIterator.next();
if (limit == 0) {
// Limit reached bail out
return true;
}
}
@ -185,24 +179,32 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
if (finding.found()) {
needRocksLookup = false;
// We got sth. in the cache
uint64_t* cachedData = (uint64_t*)finding.value()->value();
uint64_t cachedLength = *cachedData;
if (cachedLength < limit) {
// Save copies, just return it.
for (uint64_t i = 0; i < cachedLength; ++i) {
cb(RocksDBToken{*(cachedData + 1 + i)});
VPackSlice cachedData(finding.value()->value());
TRI_ASSERT(cachedData.isArray());
if (cachedData.length() / 2 < limit) {
// Directly return it, no need to copy
_builderIterator = VPackArrayIterator(cachedData);
while (_builderIterator.valid()) {
TRI_ASSERT(_builderIterator.value().isNumber());
cb(RocksDBToken{
_builderIterator.value().getNumericValue<uint64_t>()});
limit--;
// Twice advance the iterator
_builderIterator.next();
// We always have <revision, document> pairs
TRI_ASSERT(_builderIterator.valid());
_builderIterator.next();
}
limit -= cachedLength;
_builderIterator = VPackArrayIterator(
arangodb::basics::VelocyPackHelper::EmptyArrayValue());
} else {
// We need to copy it.
// And then we just get back to beginning of the loop
reserveInplaceMemory(cachedLength);
// It is now guaranteed that memcpy will succeed
std::memcpy(_inplaceMemory, cachedData,
(cachedLength + 1 /*size*/) * sizeof(uint64_t));
TRI_ASSERT(valueLength() == cachedLength);
// Set to first document.
_posInMemory = 1;
_builder.clear();
_builder.add(cachedData);
TRI_ASSERT(_builder.slice().isArray());
_builderIterator = VPackArrayIterator(_builder.slice());
// Do not set limit
}
}
@ -212,13 +214,109 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
lookupInRocksDB(fromTo);
}
// We cannot be more advanced here
TRI_ASSERT(_posInMemory == 1 || _posInMemory == 0);
_keysIterator.next();
}
TRI_ASSERT(limit == 0);
return _builderIterator.valid() || _keysIterator.valid();
}
bool RocksDBEdgeIndexIterator::nextExtra(ExtraCallback const& cb,
size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
#ifdef USE_MAINTAINER_MODE
TRI_ASSERT(limit > 0); // Someone called with limit == 0. API broken
#else
// Gracefully return in production code
// Nothing bad has happened
if (limit == 0) {
return false;
}
#endif
while (limit > 0) {
while (_builderIterator.valid()) {
// We still have unreturned edges in our memory.
// Just plainly return those.
TRI_ASSERT(_builderIterator.value().isNumber());
RocksDBToken tkn{_builderIterator.value().getNumericValue<uint64_t>()};
_builderIterator.next();
TRI_ASSERT(_builderIterator.valid());
// For now we store complete edges.
TRI_ASSERT(_builderIterator.value().isObject());
cb(tkn, _builderIterator.value());
_builderIterator.next();
limit--;
if (limit == 0) {
// Limit reached bail out
return true;
}
}
if (!_keysIterator.valid()) {
// We are done iterating
return false;
}
// We have exhausted local memory.
// Now fill it again:
VPackSlice fromToSlice = _keysIterator.value();
if (fromToSlice.isObject()) {
fromToSlice = fromToSlice.get(StaticStrings::IndexEq);
}
TRI_ASSERT(fromToSlice.isString());
StringRef fromTo(fromToSlice);
bool needRocksLookup = true;
if (_cache != nullptr) {
// Try to read from cache
auto finding = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
if (finding.found()) {
needRocksLookup = false;
// We got sth. in the cache
VPackSlice cachedData(finding.value()->value());
TRI_ASSERT(cachedData.isArray());
if (cachedData.length() / 2 < limit) {
// Directly return it, no need to copy
_builderIterator = VPackArrayIterator(cachedData);
while (_builderIterator.valid()) {
TRI_ASSERT(_builderIterator.value().isNumber());
RocksDBToken tkn{
_builderIterator.value().getNumericValue<uint64_t>()};
_builderIterator.next();
TRI_ASSERT(_builderIterator.valid());
TRI_ASSERT(_builderIterator.value().isObject());
cb(tkn, _builderIterator.value());
_builderIterator.next();
limit--;
}
_builderIterator = VPackArrayIterator(
arangodb::basics::VelocyPackHelper::EmptyArrayValue());
} else {
_copyCounter++;
// We need to copy it.
// And then we just get back to beginning of the loop
_builder.clear();
_builder.add(cachedData);
TRI_ASSERT(_builder.slice().isArray());
_builderIterator = VPackArrayIterator(_builder.slice());
// Do not set limit
}
}
}
if (needRocksLookup) {
lookupInRocksDB(fromTo);
}
_keysIterator.next();
}
TRI_ASSERT(limit == 0);
return (_posInMemory != 0) || _keysIterator.valid();
return _builderIterator.valid() || _keysIterator.valid();
}
void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
@ -226,10 +324,10 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
_bounds = RocksDBKeyBounds::EdgeIndexVertex(_index->_objectId, fromTo);
_iterator->Seek(_bounds.start());
resetInplaceMemory();
_posInMemory = 1;
RocksDBCollection* rocksColl = toRocksDBCollection(_collection);
rocksdb::Comparator const* cmp = _index->comparator();
_builder.openArray();
RocksDBToken token;
auto end = _bounds.end();
while (_iterator->Valid() &&
@ -237,11 +335,16 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
Result res = rocksColl->lookupDocumentToken(_trx, edgeKey, token);
if (res.ok()) {
*(_inplaceMemory + _posInMemory) = token.revisionId();
_posInMemory++;
(*_inplaceMemory)++; // Increase size
if (_posInMemory == _memSize) {
resizeMemory();
ManagedDocumentResult mmdr;
if (rocksColl->readDocument(_trx, token, mmdr)) {
_builder.add(VPackValue(token.revisionId()));
VPackSlice doc(mmdr.vpack());
TRI_ASSERT(doc.isObject());
_builder.add(doc);
} else {
// Data Inconsistency.
// We have a revision id without a document...
TRI_ASSERT(false);
}
#ifdef USE_MAINTAINER_MODE
} else {
@ -252,19 +355,24 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
}
_iterator->Next();
}
_builder.close();
if (_cache != nullptr) {
// TODO Add cache retry on next call
// Now we have something in _inplaceMemory.
// It may be an empty array or a filled one, never mind, we cache both
auto entry = cache::CachedValue::construct(
fromTo.data(), static_cast<uint32_t>(fromTo.size()), _inplaceMemory,
sizeof(uint64_t) * (valueLength() + 1 /*size*/));
fromTo.data(), static_cast<uint32_t>(fromTo.size()),
_builder.slice().start(),
static_cast<uint64_t>(_builder.slice().byteSize()));
bool cached = _cache->insert(entry);
if (!cached) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Failed to cache: "
<< fromTo.toString();
delete entry;
}
}
_posInMemory = 1;
TRI_ASSERT(_builder.slice().isArray());
_builderIterator = VPackArrayIterator(_builder.slice());
}
// ============================= Index ====================================
@ -535,6 +643,106 @@ void RocksDBEdgeIndex::expandInSearchValues(VPackSlice const slice,
builder.close();
}
void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
if (_cache == nullptr) {
return;
}
auto rocksColl = toRocksDBCollection(_collection);
uint64_t expectedCount = static_cast<uint64_t>(selectivityEstimate() * rocksColl->numberDocuments());
// Prepare the cache to be resized for this amount of objects to be inserted.
_cache->sizeHint(expectedCount);
auto bounds = RocksDBKeyBounds::EdgeIndex(_objectId);
std::string previous = "";
VPackBuilder builder;
ManagedDocumentResult mmdr;
RocksDBToken token;
bool needsInsert = false;
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
auto key = it->key();
StringRef v = RocksDBKey::vertexId(key);
if (previous.empty()) {
// First call.
builder.clear();
previous = v.toString();
auto finding = _cache->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else {
needsInsert = true;
builder.openArray();
}
}
if (v != previous) {
if (needsInsert) {
// Switch to next vertex id.
// Store what we have.
builder.close();
while(_cache->isResizing() || _cache->isMigrating()) {
// We should wait here, the cache will reject
// any inserts anyways.
usleep(10000);
}
auto entry = cache::CachedValue::construct(
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!_cache->insert(entry)) {
delete entry;
}
builder.clear();
}
// Need to store
previous = v.toString();
auto finding = _cache->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else {
needsInsert = true;
builder.openArray();
}
}
if (needsInsert) {
StringRef edgeKey = RocksDBKey::primaryKey(key);
Result res = rocksColl->lookupDocumentToken(trx, edgeKey, token);
if (res.ok() && rocksColl->readDocument(trx, token, mmdr)) {
builder.add(VPackValue(token.revisionId()));
VPackSlice doc(mmdr.vpack());
TRI_ASSERT(doc.isObject());
builder.add(doc);
#ifdef USE_MAINTAINER_MODE
} else {
// Data Inconsistency.
// We have a revision id without a document...
TRI_ASSERT(false);
#endif
}
}
}, RocksDBColumnFamily::edge());
if (!previous.empty() && needsInsert) {
// We still have something to store
builder.close();
auto entry = cache::CachedValue::construct(
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!_cache->insert(entry)) {
delete entry;
}
}
}
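
For every distinct _from/_to value, warmup builds and caches one VelocyPack array of alternating revision ids and complete edge documents, the same layout that lookupInRocksDB() produces and that nextExtra() consumes as <revision, document> pairs. Sketched as a JavaScript literal purely to show the shape of a cached entry; the vertex id, revision ids, and edges are made up:

// hypothetical cached value for the vertex id "source/1"
var cachedEntry = [
  123456789, { "_from": "source/1", "_to": "sink/0" },
  123456790, { "_from": "source/1", "_to": "sink/1" }
];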
// ===================== Helpers ==================
/// @brief create the iterator
@ -642,7 +850,7 @@ void RocksDBEdgeIndex::recalculateEstimates() {
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBEdgeIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}, arangodb::RocksDBColumnFamily::edge());
}
Result RocksDBEdgeIndex::postprocessRemove(transaction::Methods* trx,

View File

@ -55,7 +55,9 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
std::unique_ptr<VPackBuilder>& keys, cache::Cache*);
~RocksDBEdgeIndexIterator();
char const* typeName() const override { return "edge-index-iterator"; }
bool hasExtra() const override { return true; }
bool next(TokenCallback const& cb, size_t limit) override;
bool nextExtra(ExtraCallback const& cb, size_t limit) override;
void reset() override;
private:
@ -75,9 +77,10 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
std::unique_ptr<rocksdb::Iterator> _iterator; // iterator position in rocksdb
RocksDBKeyBounds _bounds;
cache::Cache* _cache;
uint64_t _posInMemory;
uint64_t _memSize;
uint64_t* _inplaceMemory;
arangodb::velocypack::ArrayIterator _builderIterator;
arangodb::velocypack::Builder _builder;
size_t _copyCounter;
size_t _lookupCounter;
};
class RocksDBEdgeIndex final : public RocksDBIndex {
@ -153,6 +156,10 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
/// entries.
void expandInSearchValues(arangodb::velocypack::Slice const,
arangodb::velocypack::Builder&) const override;
/// @brief Warmup the index caches.
void warmup(arangodb::transaction::Methods* trx) override;
int cleanup() override;
void serializeEstimate(std::string& output) const override;
@ -161,6 +168,7 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
void recalculateEstimates() override;
private:
/// @brief create the iterator
IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*,

View File

@ -29,6 +29,7 @@
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
@ -1512,7 +1513,7 @@ void RocksDBVPackIndex::recalculateEstimates() {
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBVPackIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}, arangodb::RocksDBColumnFamily::index());
}
Result RocksDBVPackIndex::postprocessRemove(transaction::Methods* trx,

View File

@ -22,9 +22,9 @@
////////////////////////////////////////////////////////////////////////////////
#include "OperationCursor.h"
#include "OperationResult.h"
#include "Basics/Exceptions.h"
#include "Logger/Logger.h"
#include "OperationResult.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
@ -42,6 +42,8 @@ bool OperationCursor::hasMore() {
return _hasMore;
}
bool OperationCursor::hasExtra() const { return indexIterator()->hasExtra(); }
void OperationCursor::reset() {
code = TRI_ERROR_NO_ERROR;
@ -92,6 +94,52 @@ bool OperationCursor::next(IndexIterator::TokenCallback const& callback, uint64_
return _hasMore;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Calls cb for the next batchSize many elements
/// Uses the getExtra feature of indexes. Can only be called on those
/// who support it.
/// NOTE: This will throw on OUT_OF_MEMORY
//////////////////////////////////////////////////////////////////////////////
bool OperationCursor::nextWithExtra(IndexIterator::ExtraCallback const& callback, uint64_t batchSize) {
TRI_ASSERT(hasExtra());
if (!hasMore()) {
return false;
}
if (batchSize == UINT64_MAX) {
batchSize = _batchSize;
}
size_t atMost = static_cast<size_t>(batchSize > _limit ? _limit : batchSize);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// We add wrapper around Callback that validates that
// the callback has been called at least once.
bool called = false;
auto cb = [&](DocumentIdentifierToken const& token, VPackSlice extra) {
called = true;
callback(token, extra);
};
_hasMore = _indexIterator->nextExtra(cb, atMost);
if (_hasMore) {
// If the index says it has more elements, it needs
// to have called the callback at least once.
// Otherwise progress is not guaranteed.
TRI_ASSERT(called);
}
#else
_hasMore = _indexIterator->nextExtra(callback, atMost);
#endif
if (_hasMore) {
// We got atMost many callbacks
TRI_ASSERT(_limit >= atMost);
_limit -= atMost;
}
return _hasMore;
}
/// @brief Skip the next toSkip many elements.
/// skipped will be increased by the amount of skipped elements afterwards
/// Check hasMore()==true before using this

View File

@ -41,17 +41,15 @@ class LogicalCollection;
struct OperationResult;
struct OperationCursor {
public:
int code;
int code;
private:
std::unique_ptr<IndexIterator> _indexIterator;
bool _hasMore;
uint64_t _limit;
uint64_t const _originalLimit;
uint64_t const _batchSize;
bool _hasMore;
uint64_t _limit;
uint64_t const _originalLimit;
uint64_t const _batchSize;
public:
explicit OperationCursor(int code)
@ -62,7 +60,7 @@ struct OperationCursor {
_batchSize(1000) {}
OperationCursor(IndexIterator* iterator, uint64_t limit, uint64_t batchSize)
: code(TRI_ERROR_NO_ERROR),
: code(TRI_ERROR_NO_ERROR),
_indexIterator(iterator),
_hasMore(true),
_limit(limit), // _limit is modified later on
@ -91,6 +89,8 @@ struct OperationCursor {
return !successful();
}
bool hasExtra() const;
/// @brief Reset the cursor
void reset();
@ -98,11 +98,20 @@ struct OperationCursor {
bool next(IndexIterator::TokenCallback const& callback,
uint64_t batchSize);
/// @brief Calls cb for the next batchSize many elements
bool nextWithExtra(IndexIterator::ExtraCallback const& callback,
uint64_t batchSize);
/// @brief convenience function to retrieve all results
void all(IndexIterator::TokenCallback const& callback) {
while (next(callback, 1000)) {}
}
/// @brief convenience function to retrieve all results with extra
void allWithExtra(IndexIterator::ExtraCallback const& callback) {
while (nextWithExtra(callback, 1000)) {}
}
/// @brief Skip the next toSkip many elements.
/// skipped will be increased by the amount of skipped elements afterwards
/// Check hasMore()==true before using this

View File

@ -37,6 +37,7 @@
#include "Basics/conversions.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Indexes/Index.h"
#include "Cluster/FollowerInfo.h"
#include "Pregel/AggregatorHandler.h"
#include "Pregel/Conductor.h"
@ -3168,6 +3169,62 @@ static void JS_CountVocbaseCol(
TRI_V8_TRY_CATCH_END
}
// .............................................................................
// Warmup Index caches
// .............................................................................
static void JS_WarmupVocbaseCol(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
arangodb::LogicalCollection* collection =
TRI_UnwrapClass<arangodb::LogicalCollection>(args.Holder(),
WRP_VOCBASE_COL_TYPE);
if (ServerState::instance()->isCoordinator()) {
std::string const databaseName(collection->dbName());
std::string const cid = collection->cid_as_string();
int res = warmupOnCoordinator(databaseName, cid);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_V8_RETURN_UNDEFINED();
}
if (collection == nullptr) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
SingleCollectionTransaction trx(
transaction::V8Context::Create(collection->vocbase(), true),
collection->cid(),
AccessMode::Type::READ);
Result trxRes = trx.begin();
if (!trxRes.ok()) {
TRI_V8_THROW_EXCEPTION(trxRes);
}
auto idxs = collection->getIndexes();
for (auto& idx : idxs) {
// multi threading?
idx->warmup(&trx);
}
trxRes = trx.commit();
if (!trxRes.ok()) {
TRI_V8_THROW_EXCEPTION(trxRes);
}
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END
}
// .............................................................................
// generate the arangodb::LogicalCollection template
// .............................................................................
@ -3281,6 +3338,8 @@ void TRI_InitV8Collections(v8::Handle<v8::Context> context,
JS_UpdateVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("version"),
JS_VersionVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("warmup"),
JS_WarmupVocbaseCol);
TRI_InitV8IndexCollection(isolate, rt);

View File

@ -467,6 +467,25 @@ function put_api_collection_load (req, res, collection) {
}
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief was docuBlock JSF_put_api_collection_warmup
// //////////////////////////////////////////////////////////////////////////////
function put_api_collection_warmup (req, res, collection) {
try {
// Warmup the indexes
collection.warmup();
var result = collectionRepresentation(collection);
actions.resultOk(req, res, actions.HTTP_OK, result);
} catch (err) {
actions.resultException(req, res, err, undefined, false);
}
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief was docuBlock JSF_put_api_collection_unload
// //////////////////////////////////////////////////////////////////////////////
@ -613,6 +632,8 @@ function put_api_collection (req, res) {
put_api_collection_rename(req, res, collection);
} else if (sub === 'rotate') {
put_api_collection_rotate(req, res, collection);
} else if (sub === 'warmup') {
put_api_collection_warmup(req, res, collection);
} else {
actions.resultNotFound(req, res, arangodb.ERROR_HTTP_NOT_FOUND,
"expecting one of the actions 'load', 'unload',"
@ -658,6 +679,8 @@ function delete_api_collection (req, res) {
}
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief handles a collection request
// //////////////////////////////////////////////////////////////////////////////

View File

@ -1396,3 +1396,15 @@ ArangoCollection.prototype.removeByKeys = function (keys) {
ignored: requestResult.ignored
};
};
// //////////////////////////////////////////////////////////////////////////////
// / @brief warmup indexes of a collection
// //////////////////////////////////////////////////////////////////////////////
ArangoCollection.prototype.warmup = function () {
this._status = null;
var requestResult = this._database._connection.PUT(this._baseurl('warmup'), '');
this._status = null;
arangosh.checkRequestResult(requestResult);
};
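
A hedged usage sketch from arangosh; the collection name is made up, and the call maps to PUT /_db/<database>/_api/collection/<collection>/warmup with an empty body, as wired up in the actions above:

// arangosh: warm up the index caches of a collection on demand
var col = db._collection("myEdges");  // assumes an existing edge collection, name is hypothetical
col.warmup();

// afterwards the edge index cache figures should reflect the warmed-up cache,
// mirroring what the new index-figures test checks
var figures = col.getIndexes(true)[1].figures;
print(figures.cacheSize);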

View File

@ -54,8 +54,8 @@ function verifyMemory(index){
function verifyCache(index){
if (index.type !== 'primary') {
expect(index.figures.cacheInUse).to.be.true;
expect(index.figures).to.be.ok;
expect(index.figures.cacheInUse).to.be.true;
expect(index.figures.cacheSize).to.be.a('number');
expect(index.figures.cacheLifeTimeHitRate).to.be.a('number');
expect(index.figures.cacheWindowedHitRate).to.be.a('number');
@ -76,39 +76,89 @@ describe('Index figures', function () {
}
});
// edge index ///////////////////////////////////////////////////////////
describe('primar/edge index', function () {
var col;
before('create collection',function(){
col = db._createEdgeCollection("UnitTestEdgar");
for(var i = 0; i < 100; i++){
col.insert({"_from":"source/1","_to":"sink/"+i});
}
const colName = 'UnitTestEdges';
let col;
before('create collection', function(){
db._drop(colName);
col = db._createEdgeCollection(colName);
});
it('verify index types', function() {
var indexes = col.getIndexes(true);
expect(indexes.length).to.be.equal(2);
expect(indexes[0].type).to.be.equal("primary");
expect(indexes[1].type).to.be.equal("edge");
after(function() {
db._drop(colName);
});
it('verify index - memory', function() {
var indexes = col.getIndexes(true);
indexes.forEach((i) => {
verifyMemory(i);
describe('verify statistics', function() {
before('insert documents', function() {
for (let i = 0; i < 100; i++) {
col.insert({"_from": "source/1", "_to": "sink/" + i});
}
});
});
it('verify figures - cache', function() {
if(!isRocksDB){
this.skip();
}
var indexes = col.getIndexes(true);
indexes.forEach((i) => {
verifyCache(i);
it('index types', function() {
var indexes = col.getIndexes(true);
expect(indexes.length).to.be.equal(2);
expect(indexes[0].type).to.be.equal("primary");
expect(indexes[1].type).to.be.equal("edge");
});
it('index - memory', function() {
var indexes = col.getIndexes(true);
indexes.forEach((i) => {
verifyMemory(i);
});
});
it('figures - cache', function() {
if(!isRocksDB){
this.skip();
}
var indexes = col.getIndexes(true);
indexes.forEach((i) => {
verifyCache(i);
});
});
});
after(function(){
db._drop(col);
describe('warmup', function() {
before('insert document', function() {
// We insert enough documents to trigger resizing
// of initial cache size.
for (let i = 0; i < 23000; i++) {
col.insert({"_from": "source/" + i, "_to": "sink/" + i});
}
});
it('should fill the cache', function() {
if(!isRocksDB){
this.skip();
}
// The cache does not expose fill grades and such.
// We can only check memory consumption...
let indexes = col.getIndexes(true);
let edgeIndex = indexes[1];
expect(edgeIndex.type).to.be.equal('edge');
expect(edgeIndex.figures).to.be.ok;
expect(edgeIndex.figures.cacheSize).to.be.a('number');
let oldSize = edgeIndex.figures.cacheSize;
col.warmup();
// Test if the memory consumption goes up
let indexes2 = col.getIndexes(true);
let edgeIndex2 = indexes2[1];
expect(edgeIndex2.type).to.be.equal('edge');
expect(edgeIndex2.figures.cacheSize).to.be.above(oldSize);
});
});
}); // end edge index
// hash index ///////////////////////////////////////////////////////////

View File

@ -41,6 +41,7 @@ std::map<std::string, LogTopic*> LogTopic::_names;
LogTopic Logger::AGENCY("agency", LogLevel::INFO);
LogTopic Logger::AGENCYCOMM("agencycomm", LogLevel::INFO);
LogTopic Logger::CACHE("cache", LogLevel::INFO);
LogTopic Logger::CLUSTER("cluster", LogLevel::INFO);
LogTopic Logger::COLLECTOR("collector");
LogTopic Logger::COMMUNICATION("communication", LogLevel::INFO);

View File

@ -127,6 +127,7 @@ class Logger {
public:
static LogTopic AGENCY;
static LogTopic AGENCYCOMM;
static LogTopic CACHE;
static LogTopic CLUSTER;
static LogTopic COLLECTOR;
static LogTopic COMMUNICATION;