Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Simon Grätzer 2017-05-12 14:07:34 +02:00
commit 40f174710b
3 changed files with 109 additions and 84 deletions

View File

@@ -99,7 +99,6 @@ void CheckVersionFeature::start() {
// and force shutdown
server()->beginShutdown();
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "checking version on an empty database";
usleep(1 * 1000 * 1000);
TRI_EXIT_FUNCTION(EXIT_SUCCESS, nullptr);
}
@@ -139,6 +138,7 @@ void CheckVersionFeature::checkVersion() {
// can do this without a lock as this is the startup
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
// iterate over all databases
for (auto& name : databaseFeature->getDatabaseNames()) {
TRI_vocbase_t* vocbase = databaseFeature->lookupDatabase(name);
@@ -156,9 +156,17 @@ void CheckVersionFeature::checkVersion() {
<< "'. Please inspect the logs for any errors";
FATAL_ERROR_EXIT();
} else if (status == 3) {
// this is safe to do even if further databases will be checked
// because we will never set the status back to success
*_result = 3;
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Database version check failed for '"
<< vocbase->name() << "': upgrade needed";
} else if (status == 2 && *_result == 1) {
// this is safe to do even if further databases will be checked
// because we will never set the status back to success
*_result = 2;
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Database version check failed for '"
<< vocbase->name() << "': downgrade needed";
}
}
}
@@ -168,8 +176,14 @@ void CheckVersionFeature::checkVersion() {
localContext->Exit();
}
}
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "final result of version check: " << *_result;
if (*_result == 1) {
*_result = EXIT_SUCCESS;
} else if (*_result > 1) {
*_result = EXIT_FAILURE;
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Database version check failed";
FATAL_ERROR_EXIT();
}
}
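
The hunks above change CheckVersionFeature so that the per-database status is folded into a single result (1 = ok, 2 = downgrade needed, 3 = upgrade needed) and only translated into a process exit code once every database has been checked. The following is a minimal sketch of that aggregation rule, assuming a hypothetical free function in place of the feature's member logic:

#include <cstdlib>
#include <vector>

// Sketch only: folds per-database statuses the way checkVersion() does above.
// 1 = ok, 2 = downgrade needed, 3 = upgrade needed; a recorded failure is
// never set back to success by a later database.
int aggregateVersionCheck(std::vector<int> const& statuses) {
  int result = 1;
  for (int status : statuses) {
    if (status == 3) {
      result = 3;                          // upgrade needed takes precedence
    } else if (status == 2 && result == 1) {
      result = 2;                          // downgrade needed, unless 3 was seen
    }
  }
  return (result == 1) ? EXIT_SUCCESS : EXIT_FAILURE;
}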

View File

@@ -67,6 +67,7 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
_arrayIterator(VPackSlice::emptyArraySlice()),
_bounds(RocksDBKeyBounds::EdgeIndex(0)),
_doUpdateBounds(true),
_doUpdateArrayIterator(true),
_useCache(useCache),
_cache(cache),
_cacheValueSize(0) {
@@ -100,10 +101,7 @@ StringRef getFromToFromIterator(arangodb::velocypack::ArrayIterator const& it){
}
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
//LOG_TOPIC(ERR, Logger::FIXME) << "rockdb edge index next";
std::size_t cacheValueSizeLimit = limit;
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_keysIterator.valid()) {
// No limit no data, or we are actually done. The last call should have
// returned false
@@ -111,64 +109,70 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
return false;
}
std::size_t cacheValueSizeLimit = 1000;
// acquire RocksDB collection
RocksDBToken token;
auto iterateCachedValues = [this,&cb,&limit,&token](){
//LOG_TOPIC(ERR, Logger::FIXME) << "value found in cache ";
while(_arrayIterator.valid()){
StringRef edgeKey(_arrayIterator.value());
if(lookupDocumentAndUseCb(edgeKey, cb, limit, token, true)){
_arrayIterator++;
return true; // more documents - function will be re-entered
}
_arrayIterator++;
}
//reset cache iterator before handling next from/to
_arrayBuffer.clear();
_arrayIterator = VPackArrayIterator(VPackSlice::emptyArraySlice());
return false;
};
while (_keysIterator.valid()) {
TRI_ASSERT(limit > 0);
StringRef fromTo = getFromToFromIterator(_keysIterator);
bool foundInCache = false;
//LOG_TOPIC(ERR, Logger::FIXME) << "fromTo" << fromTo;
if (_useCache){
//LOG_TOPIC(ERR, Logger::FIXME) << "using cache";
// find in cache
foundInCache = false;
// handle resume of next() in cached case
if(!_arrayBuffer.empty()){
// resume iteration after batch size limit was hit
// do not modify buffer or iterator
foundInCache = true;
} else {
// try to find cached value
auto f = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
foundInCache = f.found();
if (foundInCache) {
VPackSlice cachedPrimaryKeys(f.value()->value());
TRI_ASSERT(cachedPrimaryKeys.isArray());
if(cachedPrimaryKeys.length() <= limit){
// do not copy
_arraySlice = cachedPrimaryKeys;
} else {
// copy data if there are more documents than the batch size limit allows
_arrayBuffer.append(cachedPrimaryKeys.start(),cachedPrimaryKeys.byteSize());
_arraySlice = VPackSlice(_arrayBuffer.data());
}
// update iterator
_arrayIterator = VPackArrayIterator(_arraySlice);
} else {
//LOG_TOPIC(ERR, Logger::FIXME) << "not found in cache " << fromTo;
}
}
if (_useCache && _doUpdateArrayIterator){
// try to find cached value
auto f = _cache->find(fromTo.data(), (uint32_t)fromTo.size());
foundInCache = f.found();
if (foundInCache) {
//iterate over cached primary keys
for(VPackSlice const& slice: _arrayIterator){
StringRef edgeKey(slice);
//LOG_TOPIC(ERR, Logger::FIXME) << "using value from cache " << slice.toJson();
if(lookupDocumentAndUseCb(edgeKey, cb, limit, token)){
return true; // more documents - function will be re-entered
} else {
//iterate over keys
}
VPackSlice cachedPrimaryKeys(f.value()->value());
TRI_ASSERT(cachedPrimaryKeys.isArray());
// update arraySlice (and copy the buffer if required)
if(cachedPrimaryKeys.length() <= limit){
_arraySlice = cachedPrimaryKeys; // do not copy
} else {
// copy data if there are more documents than the batch size limit allows
_arrayBuffer.append(cachedPrimaryKeys.start(),cachedPrimaryKeys.byteSize());
_arraySlice = VPackSlice(_arrayBuffer.data());
}
// update cache value iterator
_arrayIterator = VPackArrayIterator(_arraySlice);
// iterate until batch size limit is hit
bool continueWithNextBatch = iterateCachedValues();
if(continueWithNextBatch){
return true; // exit and continue with next batch
} else {
continue; // advance keys (from/to) iterator
}
_arrayIterator.reset();
_arrayBuffer.clear();
_keysIterator.next(); // handle next key
continue; // do not use the code below that does a lookup in RocksDB
}
} else {
//LOG_TOPIC(ERR, Logger::FIXME) << "not using cache";
} else if (_useCache && !_doUpdateArrayIterator){
// resuming old iterator
_doUpdateArrayIterator = true;
bool continueWithNextBatch = iterateCachedValues();
if(continueWithNextBatch){
return true; // exit and continue with next batch
} else {
continue; // advance keys (from/to) iterator
}
}
if(!foundInCache) {
@@ -184,7 +188,6 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
updateBounds(fromTo);
if(_useCache){
_cacheValueBuilder.openArray();
_cacheValueSize = cacheValueSizeLimit;
}
} else {
_doUpdateBounds = true;
@@ -192,37 +195,41 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
while (_iterator->Valid() && (_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) {
StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
// build cache value for from/to
if(_useCache){
if (_cacheValueSize < cacheValueSizeLimit){
_cacheValueBuilder.add(VPackValuePair(edgeKey.data(),edgeKey.size()));
if (_cacheValueSize <= cacheValueSizeLimit){
_cacheValueBuilder.add(VPackValue(std::string(edgeKey.data(),edgeKey.size())));
++_cacheValueSize;
} else {
_cacheValueBuilder.clear();
_cacheValueBuilder.openArray();
}
}
if(lookupDocumentAndUseCb(edgeKey, cb, limit, token)){
// lookup real document
bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false);
_iterator->Next();
//check batch size limit
if(continueWithNextBatch){
return true; // more documents - function will be re-entered
} else {
// batch size limit not reached, continue loop
}
}
//only add entry if cache is available and entry did not hit size limit
if (_useCache) {
_cacheValueBuilder.close();
if(!_cacheValueBuilder.isEmpty() && _cacheValueBuilder.slice().length() > 0) {
auto entry = cache::CachedValue::construct(
fromTo.data(), static_cast<uint32_t>(fromTo.size()),
_cacheValueBuilder.slice().start(), static_cast<uint64_t>(_cacheValueBuilder.slice().byteSize()));
bool cached = _cache->insert(entry);
if (!cached) {
delete entry;
}
} // builder not empty
} // use cache
// only insert the cache value if it did not grow beyond the cacheValueSizeLimit
if (_useCache && _cacheValueSize <= cacheValueSizeLimit){
_cacheValueBuilder.close();
auto entry = cache::CachedValue::construct(
fromTo.data(), static_cast<uint32_t>(fromTo.size()),
_cacheValueBuilder.slice().start(),
static_cast<uint64_t>(_cacheValueBuilder.slice().byteSize())
);
bool cached = _cache->insert(entry);
if (!cached) {
delete entry;
}
}
//prepare for next key
_cacheValueBuilder.clear();
_cacheValueSize = 0;
} // not found in cache
_keysIterator.next(); // handle next key
@@ -233,16 +240,19 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
// acquire the document token through the primary index
bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb(
StringRef primaryKey, TokenCallback const& cb,
size_t& limit, RocksDBToken& token){
size_t& limit, RocksDBToken& token, bool fromCache){
//we pass the token in as ref to avoid allocations
auto rocksColl = toRocksDBCollection(_collection);
Result res = rocksColl->lookupDocumentToken(_trx, primaryKey, token);
_iterator->Next();
if (res.ok()) {
cb(token);
--limit;
if (limit == 0) {
_doUpdateBounds=false; // limit hit, continue with next batch
if(fromCache) {
_doUpdateArrayIterator=false; // limit hit, continue with next batch
} else {
_doUpdateBounds=false; // limit hit, continue with next batch
}
return true;
}
} // TODO do we need to handle failed lookups here?
@@ -252,6 +262,7 @@ bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb(
void RocksDBEdgeIndexIterator::reset() {
// reset offsets into the iterators
_doUpdateBounds = true;
_doUpdateArrayIterator = true;
_cacheValueBuilder.clear();
_arrayBuffer.clear();
_arraySlice = VPackSlice::emptyArraySlice();
@@ -268,15 +279,18 @@ RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
: RocksDBIndex(iid
,collection
,std::vector<std::vector<AttributeName>>({{AttributeName(attr, false)}})
,false
,false
,false // unique
,false // sparse
,basics::VelocyPackHelper::stringUInt64(info, "objectId")
,false //!ServerState::instance()->isCoordinator() /*useCache*/
,!ServerState::instance()->isCoordinator() // useCache
)
, _directionAttr(attr)
{
TRI_ASSERT(iid != 0);
TRI_ASSERT(_objectId != 0);
// if these assertions are never hit, the fallback code
// below can be removed
// FIXME
if (_objectId == 0) {
//disable cache?
_useCache = false;
@@ -547,9 +561,6 @@ IndexIterator* RocksDBEdgeIndex::createEqIterator(
}
keys->close();
//LOG_TOPIC(ERR, Logger::FIXME) << "_useCache: " << _useCache
// << " _cachePresent: " << _cachePresent
// << " useCache():" << useCache();
return new RocksDBEdgeIndexIterator(
_collection, trx, mmdr, this, keys, useCache(), _cache.get());
}
@@ -577,7 +588,6 @@ IndexIterator* RocksDBEdgeIndex::createInIterator(
}
keys->close();
//LOG_TOPIC(ERR, Logger::FIXME) << "useCache: " << _useCache << useCache();
return new RocksDBEdgeIndexIterator(
_collection, trx, mmdr, this, keys, useCache(), _cache.get());
}
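
The changes to RocksDBEdgeIndexIterator::next() above make the scan resumable across batches: when the limit is hit mid-iteration, a flag (_doUpdateBounds for the RocksDB range scan, _doUpdateArrayIterator for cached primary keys) is cleared so the next call continues where the previous batch stopped instead of re-seeking from the start. Below is a self-contained sketch of that re-entrant pattern using hypothetical names, not the ArangoDB classes themselves:

#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Sketch only: a batched scan that can be re-entered after hitting its limit.
class ResumableScan {
 public:
  explicit ResumableScan(std::vector<std::string> keys) : _keys(std::move(keys)) {}

  // Deliver at most `limit` keys through `cb`; return true if next() should be
  // called again for another batch, false once the scan is exhausted.
  bool next(std::function<void(std::string const&)> const& cb, std::size_t limit) {
    if (_restart) {
      _pos = 0;            // fresh scan: start at the beginning
    }
    _restart = true;       // assume this batch drains the remaining keys ...
    while (_pos < _keys.size()) {
      if (limit == 0) {
        _restart = false;  // ... unless the limit is hit: resume here next time
        return true;
      }
      cb(_keys[_pos++]);
      --limit;
    }
    return false;
  }

  void reset() {           // analogous to RocksDBEdgeIndexIterator::reset()
    _restart = true;
    _pos = 0;
  }

 private:
  std::vector<std::string> _keys;
  std::size_t _pos = 0;
  bool _restart = true;
};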

View File

@@ -61,7 +61,7 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
private:
void updateBounds(StringRef fromTo);
bool lookupDocumentAndUseCb(
StringRef primaryKey, TokenCallback const&, size_t& limit, RocksDBToken&);
StringRef primaryKey, TokenCallback const&, size_t& limit, RocksDBToken&, bool fromCache);
std::unique_ptr<arangodb::velocypack::Builder> _keys;
arangodb::velocypack::ArrayIterator _keysIterator;
RocksDBEdgeIndex const* _index;
@@ -74,6 +74,7 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
RocksDBKeyBounds _bounds;
bool _doUpdateBounds;
bool _doUpdateArrayIterator;
bool _useCache;
cache::Cache* _cache;
VPackBuilder _cacheValueBuilder;
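
The header adds the _doUpdateArrayIterator member and the fromCache parameter of lookupDocumentAndUseCb() so that, when the batch limit is reached, the flag matching the data source currently being consumed is the one that gets cleared. A condensed, hypothetical view of that bookkeeping (not the actual class):

// Sketch only: which resume flag is cleared depends on whether the key that
// hit the limit came from the in-memory cache or from the RocksDB scan.
struct ResumeFlags {
  bool doUpdateBounds = true;          // true: (re)seek the RocksDB iterator
  bool doUpdateArrayIterator = true;   // true: (re)build the cached-key iterator

  void onLimitHit(bool fromCache) {
    if (fromCache) {
      doUpdateArrayIterator = false;   // resume cached primary-key iteration
    } else {
      doUpdateBounds = false;          // resume the RocksDB range scan
    }
  }

  void reset() {                       // both sources start fresh again
    doUpdateBounds = true;
    doUpdateArrayIterator = true;
  }
};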