1
0
Fork 0

adding load and unload

This commit is contained in:
Simon Grätzer 2017-06-02 14:44:38 +02:00
parent 51ee57a38f
commit 634254b80f
16 changed files with 105 additions and 18 deletions

View File

@ -252,6 +252,7 @@ class Index {
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
virtual int load() = 0;
virtual int unload() = 0;
// a garbage collection function for the index

View File

@ -164,6 +164,8 @@ class MMFilesCollection final : public PhysicalCollection {
/// @brief closes an open collection
int close() override;
void load() override {}
void unload() override {}
/// @brief rotate the active journal - will do nothing if there is no journal
int rotateActiveJournal();

View File

@ -142,6 +142,7 @@ class MMFilesGeoIndex final : public Index {
int remove(transaction::Methods*, TRI_voc_rid_t,
arangodb::velocypack::Slice const&, bool isRollback) override;
int load() override {}
int unload() override;
/// @brief looks up all points within a given radius

View File

@ -61,6 +61,7 @@ class MMFilesPathBasedIndex : public Index {
}
bool implicitlyUnique() const override;
int load() override {}
protected:
/// @brief helper function to insert a document into any index type

View File

@ -157,6 +157,7 @@ class MMFilesPrimaryIndex final : public Index {
int remove(transaction::Methods*, TRI_voc_rid_t,
arangodb::velocypack::Slice const&, bool isRollback) override;
int load() override {}
int unload() override;
MMFilesSimpleIndexElement lookupKey(transaction::Methods*,

View File

@ -170,6 +170,20 @@ int RocksDBCollection::close() {
return TRI_ERROR_NO_ERROR;
}
/// @brief loads the collection: forwards load() to every index so each can
/// set up its in-memory state (e.g. re-create caches).
/// Holds _indexesLock shared while iterating.
void RocksDBCollection::load() {
  READ_LOCKER(guard, _indexesLock);
  // take elements by const reference — copying each entry (a shared_ptr)
  // per iteration would churn the atomic refcount for no benefit
  for (auto const& idx : _indexes) {
    idx->load();
  }
}
/// @brief unloads the collection: forwards unload() to every index so each
/// can release its in-memory state (e.g. drop caches).
/// Holds _indexesLock shared while iterating.
void RocksDBCollection::unload() {
  READ_LOCKER(guard, _indexesLock);
  // const reference avoids a refcount-bumping shared_ptr copy per element
  for (auto const& idx : _indexes) {
    idx->unload();
  }
}
TRI_voc_rid_t RocksDBCollection::revision() const { return _revisionId; }
TRI_voc_rid_t RocksDBCollection::revision(transaction::Methods* trx) const {

View File

@ -77,7 +77,9 @@ class RocksDBCollection final : public PhysicalCollection {
/// @brief closes an open collection
int close() override;
void load() override;
void unload() override;
TRI_voc_rid_t revision() const;
TRI_voc_rid_t revision(arangodb::transaction::Methods* trx) const override;
uint64_t numberDocuments() const;

View File

@ -63,7 +63,7 @@ using namespace arangodb::basics;
RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys, cache::Cache* cache)
std::unique_ptr<VPackBuilder>& keys, std::shared_ptr<cache::Cache> cache)
: IndexIterator(collection, trx, mmdr, index),
_keys(keys.get()),
_keysIterator(_keys->slice()),
@ -173,7 +173,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
StringRef fromTo(fromToSlice);
bool needRocksLookup = true;
if (_cache != nullptr) {
if (_cache) {
// Try to read from cache
auto finding = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
if (finding.found()) {
@ -270,7 +270,7 @@ bool RocksDBEdgeIndexIterator::nextExtra(ExtraCallback const& cb,
StringRef fromTo(fromToSlice);
bool needRocksLookup = true;
if (_cache != nullptr) {
if (_cache) {
// Try to read from cache
auto finding = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
if (finding.found()) {
@ -326,6 +326,7 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
resetInplaceMemory();
rocksdb::Comparator const* cmp = _index->comparator();
cache::Cache *cc = _cache.get();
_builder.openArray();
auto end = _bounds.end();
while (_iterator->Valid() && (cmp->Compare(_iterator->key(), end) < 0)) {
@ -340,7 +341,7 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
_iterator->Next();
}
_builder.close();
if (_cache != nullptr) {
if (cc != nullptr) {
// TODO Add cache retry on next call
// Now we have something in _inplaceMemory.
// It may be an empty array or a filled one, never mind, we cache both
@ -348,7 +349,7 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
fromTo.data(), static_cast<uint32_t>(fromTo.size()),
_builder.slice().start(),
static_cast<uint64_t>(_builder.slice().byteSize()));
bool cached = _cache->insert(entry);
bool cached = cc->insert(entry);
if (!cached) {
LOG_TOPIC(DEBUG, arangodb::Logger::CACHE) << "Failed to cache: "
<< fromTo.toString();
@ -633,7 +634,7 @@ void RocksDBEdgeIndex::expandInSearchValues(VPackSlice const slice,
}
void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
if (_cache == nullptr) {
if (!_useCache || !_cache) {
return;
}
auto rocksColl = toRocksDBCollection(_collection);
@ -649,6 +650,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
ManagedDocumentResult mmdr;
bool needsInsert = false;
cache::Cache* cc = _cache.get();
rocksutils::iterateBounds(
bounds,
[&](rocksdb::Iterator* it) {
@ -659,7 +661,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
builder.clear();
previous = v.toString();
auto finding =
_cache->find(previous.data(), (uint32_t)previous.size());
cc->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else {
@ -674,7 +676,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
// Store what we have.
builder.close();
while (_cache->isResizing() || _cache->isMigrating()) {
while (cc->isResizing() || cc->isMigrating()) {
// We should wait here, the cache will reject
// any inserts anyways.
usleep(10000);
@ -684,7 +686,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!_cache->insert(entry)) {
if (!cc->insert(entry)) {
delete entry;
}
builder.clear();
@ -692,7 +694,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
// Need to store
previous = v.toString();
auto finding =
_cache->find(previous.data(), (uint32_t)previous.size());
cc->find(previous.data(), (uint32_t)previous.size());
if (finding.found()) {
needsInsert = false;
} else {
@ -727,7 +729,7 @@ void RocksDBEdgeIndex::warmup(arangodb::transaction::Methods* trx) {
previous.data(), static_cast<uint32_t>(previous.size()),
builder.slice().start(),
static_cast<uint64_t>(builder.slice().byteSize()));
if (!_cache->insert(entry)) {
if (!cc->insert(entry)) {
delete entry;
}
}
@ -752,7 +754,7 @@ IndexIterator* RocksDBEdgeIndex::createEqIterator(
keys->close();
return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys,
_cache.get());
_cache);
}
/// @brief create the iterator
@ -779,7 +781,7 @@ IndexIterator* RocksDBEdgeIndex::createInIterator(
keys->close();
return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys,
_cache.get());
_cache);
}
/// @brief add a single value node to the iterator's keys

View File

@ -52,7 +52,8 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys, cache::Cache*);
std::unique_ptr<VPackBuilder>& keys,
std::shared_ptr<cache::Cache>);
~RocksDBEdgeIndexIterator();
char const* typeName() const override { return "edge-index-iterator"; }
bool hasExtra() const override { return true; }
@ -76,7 +77,7 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
// the following 2 values are required for correct batch handling
std::unique_ptr<rocksdb::Iterator> _iterator; // iterator position in rocksdb
RocksDBKeyBounds _bounds;
cache::Cache* _cache;
std::shared_ptr<cache::Cache> _cache;
arangodb::velocypack::ArrayIterator _builderIterator;
arangodb::velocypack::Builder _builder;
size_t _copyCounter;

View File

@ -110,6 +110,14 @@ void RocksDBIndex::toVelocyPackFigures(VPackBuilder& builder) const {
}
}
// Prepares the index for use. When caching is enabled for this index,
// (re)creates the in-memory cache and asserts it is present afterwards.
// Always reports TRI_ERROR_NO_ERROR.
int RocksDBIndex::load() {
  if (!_useCache) {
    // caching disabled — nothing to set up
    return TRI_ERROR_NO_ERROR;
  }
  createCache();
  TRI_ASSERT(_cachePresent);
  return TRI_ERROR_NO_ERROR;
}
int RocksDBIndex::unload() {
if (useCache()) {
//LOG_TOPIC(ERR, Logger::FIXME) << "unload cache";

View File

@ -74,6 +74,7 @@ class RocksDBIndex : public Index {
int drop() override;
int load() override;
int unload() override;
virtual void truncate(transaction::Methods*);

View File

@ -75,6 +75,8 @@ class PhysicalCollection {
void figures(std::shared_ptr<arangodb::velocypack::Builder>& builder);
virtual int close() = 0;
virtual void load() = 0;
virtual void unload() = 0;
// @brief Return the number of documents in this collection
virtual uint64_t numberDocuments(transaction::Methods* trx) const = 0;

View File

@ -715,7 +715,13 @@ int LogicalCollection::close() {
return getPhysical()->close();
}
void LogicalCollection::unload() {}
void LogicalCollection::load() {
_physical->load();
}
void LogicalCollection::unload() {
_physical->unload();
}
void LogicalCollection::drop() {
// make sure collection has been closed

View File

@ -204,6 +204,7 @@ class LogicalCollection {
// SECTION: Modification Functions
int rename(std::string const&);
void load();
void unload();
virtual void drop();

View File

@ -522,6 +522,7 @@ int TRI_vocbase_t::loadCollection(arangodb::LogicalCollection* collection,
TRI_ASSERT(collection->status() == TRI_VOC_COL_STATUS_LOADING);
collection->setStatus(TRI_VOC_COL_STATUS_LOADED);
collection->load();
// release the WRITE lock and try again
locker.unlock();

View File

@ -402,8 +402,51 @@ function CollectionSuite () {
// unload is allowed
c.unload();
}
},
testEdgeCacheBehaviour : function() {
var checkIndexes = function (idx) {
}
var cn = "UnitLoadBehaviour123";
db._drop(cn);
var c = db._createEdgeCollection(cn);
c.load();
for(i=0;i<10000;i++) {
c.insert({_from:"c/v"+i, _to:"c/v"+i});
}
// check if edge cache is present
var idxs = db.test.getIndexes(true);
assertEqual("edge", idxs[1].type, idxs);
var inital = [];
for (idx in idxs) {
if (idx.figures.cacheInUse) {
inital.push(idx.figures.cacheSize);
} else {
inital.push(0);
}
}
c.warmup();
// checking if edge cach grew
idxs = db.test.getIndexes(true);
var i = 0;
for (idx in idxs) {
assertTrue(idx.figures.cacheSize > inital[i], idx);
i++;
}
c.unload();
idxs = db.test.getIndexes(true);
for (idx in idxs) {
assertEqual(idx.figures.cacheSize, 0, idx);
}
db._drop(cn);
}
};
}