diff --git a/arangod/RestServer/CheckVersionFeature.cpp b/arangod/RestServer/CheckVersionFeature.cpp index b920eb925f..a00b984787 100644 --- a/arangod/RestServer/CheckVersionFeature.cpp +++ b/arangod/RestServer/CheckVersionFeature.cpp @@ -99,7 +99,6 @@ void CheckVersionFeature::start() { // and force shutdown server()->beginShutdown(); - LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "checking version on an empty database"; usleep(1 * 1000 * 1000); TRI_EXIT_FUNCTION(EXIT_SUCCESS, nullptr); } @@ -139,6 +138,7 @@ void CheckVersionFeature::checkVersion() { // can do this without a lock as this is the startup DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature("Database"); + // iterate over all databases for (auto& name : databaseFeature->getDatabaseNames()) { TRI_vocbase_t* vocbase = databaseFeature->lookupDatabase(name); @@ -156,9 +156,17 @@ void CheckVersionFeature::checkVersion() { << "'. Please inspect the logs for any errors"; FATAL_ERROR_EXIT(); } else if (status == 3) { + // this is safe to do even if further databases will be checked + // because we will never set the status back to success *_result = 3; + LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Database version check failed for '" + << vocbase->name() << "': upgrade needed"; } else if (status == 2 && *_result == 1) { + // this is safe to do even if further databases will be checked + // because we will never set the status back to success *_result = 2; + LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Database version check failed for '" + << vocbase->name() << "': downgrade needed"; } } } @@ -168,8 +176,14 @@ void CheckVersionFeature::checkVersion() { localContext->Exit(); } } + + LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "final result of version check: " << *_result; if (*_result == 1) { *_result = EXIT_SUCCESS; + } else if (*_result > 1) { + *_result = EXIT_FAILURE; + LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Database version check failed"; + FATAL_ERROR_EXIT(); } } diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index cc3a16b132..5c5c898bec 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -67,6 +67,7 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator( _arrayIterator(VPackSlice::emptyArraySlice()), _bounds(RocksDBKeyBounds::EdgeIndex(0)), _doUpdateBounds(true), + _doUpdateArrayIterator(true), _useCache(useCache), _cache(cache), _cacheValueSize(0) { @@ -100,10 +101,7 @@ StringRef getFromToFromIterator(arangodb::velocypack::ArrayIterator const& it){ } bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { - //LOG_TOPIC(ERR, Logger::FIXME) << "rockdb edge index next"; - std::size_t cacheValueSizeLimit = limit; TRI_ASSERT(_trx->state()->isRunning()); - if (limit == 0 || !_keysIterator.valid()) { // No limit no data, or we are actually done. The last call should have // returned false @@ -111,64 +109,70 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { return false; } + std::size_t cacheValueSizeLimit = 1000; + // acquire RocksDB collection RocksDBToken token; + auto iterateChachedValues = [this,&cb,&limit,&token](){ + //LOG_TOPIC(ERR, Logger::FIXME) << "value found in cache "; + while(_arrayIterator.valid()){ + StringRef edgeKey(_arrayIterator.value()); + if(lookupDocumentAndUseCb(edgeKey, cb, limit, token, true)){ + _arrayIterator++; + return true; // more documents - function will be re-entered + } + _arrayIterator++; + } + + //reset cache iterator before handling next from/to + _arrayBuffer.clear(); + _arrayIterator = VPackArrayIterator(VPackSlice::emptyArraySlice()); + return false; + }; while (_keysIterator.valid()) { TRI_ASSERT(limit > 0); StringRef fromTo = getFromToFromIterator(_keysIterator); bool foundInCache = false; + //LOG_TOPIC(ERR, Logger::FIXME) << "fromTo" << fromTo; - if (_useCache){ - //LOG_TOPIC(ERR, Logger::FIXME) << "using cache"; - // find in cache - foundInCache = false; - // handle resume of next() in cached case - if(!_arrayBuffer.empty()){ - // resume iteration after batch size limit was hit - // do not modify buffer or iterator - foundInCache = true; - } else { - // try to find cached value - auto f = _cache->find(fromTo.data(), (uint32_t)fromTo.size()); - foundInCache = f.found(); - if (foundInCache) { - VPackSlice cachedPrimaryKeys(f.value()->value()); - TRI_ASSERT(cachedPrimaryKeys.isArray()); - - if(cachedPrimaryKeys.length() <= limit){ - // do not copy - _arraySlice = cachedPrimaryKeys; - } else { - // copy data if there are more documents than the batch size limit allows - _arrayBuffer.append(cachedPrimaryKeys.start(),cachedPrimaryKeys.byteSize()); - _arraySlice = VPackSlice(_arrayBuffer.data()); - } - // update iterator - _arrayIterator = VPackArrayIterator(_arraySlice); - } else { - //LOG_TOPIC(ERR, Logger::FIXME) << "not found in cache " << fromTo; - } - } - + if (_useCache && _doUpdateArrayIterator){ + // try to find cached value + auto f = _cache->find(fromTo.data(), (uint32_t)fromTo.size()); + foundInCache = f.found(); if (foundInCache) { - //iterate over cached primary keys - for(VPackSlice const& slice: _arrayIterator){ - StringRef edgeKey(slice); - //LOG_TOPIC(ERR, Logger::FIXME) << "using value form cache " << slice.toJson(); - if(lookupDocumentAndUseCb(edgeKey, cb, limit, token)){ - return true; // more documents - function will be re-entered - } else { - //iterate over keys - } + VPackSlice cachedPrimaryKeys(f.value()->value()); + TRI_ASSERT(cachedPrimaryKeys.isArray()); + + // update arraySlice (and copu Buffer if required) + if(cachedPrimaryKeys.length() <= limit){ + _arraySlice = cachedPrimaryKeys; // do not copy + } else { + // copy data if there are more documents than the batch size limit allows + _arrayBuffer.append(cachedPrimaryKeys.start(),cachedPrimaryKeys.byteSize()); + _arraySlice = VPackSlice(_arrayBuffer.data()); + } + + // update cache value iterator + _arrayIterator = VPackArrayIterator(_arraySlice); + + // iterate until batch size limit is hit + bool continueWithNextBatch = iterateChachedValues(); + if(continueWithNextBatch){ + return true; // exit and continue with next batch + } else { + continue; // advance keys (from/to) iterator } - _arrayIterator.reset(); - _arrayBuffer.clear(); - _keysIterator.next(); // handle next key - continue; // do not use the code below that does a lookup in RocksDB } - } else { - //LOG_TOPIC(ERR, Logger::FIXME) << "not using cache"; + } else if (_useCache && !_doUpdateArrayIterator){ + // resuming old iterator + _doUpdateArrayIterator = true; + bool continueWithNextBatch = iterateChachedValues(); + if(continueWithNextBatch){ + return true; // exit and continue with next batch + } else { + continue; // advance keys (from/to) iterator + } } if(!foundInCache) { @@ -184,7 +188,6 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { updateBounds(fromTo); if(_useCache){ _cacheValueBuilder.openArray(); - _cacheValueSize = cacheValueSizeLimit; } } else { _doUpdateBounds = true; @@ -192,37 +195,41 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { while (_iterator->Valid() && (_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) { StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key()); + + // build cache value for from/to if(_useCache){ - if (_cacheValueSize < cacheValueSizeLimit){ - _cacheValueBuilder.add(VPackValuePair(edgeKey.data(),edgeKey.size())); + if (_cacheValueSize <= cacheValueSizeLimit){ + _cacheValueBuilder.add(VPackValue(std::string(edgeKey.data(),edgeKey.size()))); ++_cacheValueSize; - } else { - _cacheValueBuilder.clear(); - _cacheValueBuilder.openArray(); } } - if(lookupDocumentAndUseCb(edgeKey, cb, limit, token)){ + // lookup real document + bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false); + _iterator->Next(); + + //check batch size limit + if(continueWithNextBatch){ return true; // more documents - function will be re-entered - } else { - // batch size limit not reached continue loop } } - //only add entry if cache is available and entry did not hit size limit - if (_useCache) { - _cacheValueBuilder.close(); - if(!_cacheValueBuilder.isEmpty() && _cacheValueBuilder.slice().length() > 0) { - auto entry = cache::CachedValue::construct( - fromTo.data(), static_cast(fromTo.size()), - _cacheValueBuilder.slice().start(), static_cast(_cacheValueBuilder.slice().byteSize())); - bool cached = _cache->insert(entry); - if (!cached) { - delete entry; - } - } // builder not empty - } // use cache + // insert cache values that are beyond the cacheValueSizeLimit + if (_useCache && _cacheValueSize <= cacheValueSizeLimit){ _cacheValueBuilder.close(); + auto entry = cache::CachedValue::construct( + fromTo.data(), static_cast(fromTo.size()), + _cacheValueBuilder.slice().start(), + static_cast(_cacheValueBuilder.slice().byteSize()) + ); + bool cached = _cache->insert(entry); + if (!cached) { + delete entry; + } + } + //prepare for next key + _cacheValueBuilder.clear(); + _cacheValueSize = 0; } // not found in cache _keysIterator.next(); // handle next key @@ -233,16 +240,19 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { // acquire the document token through the primary index bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb( StringRef primaryKey, TokenCallback const& cb, - size_t& limit, RocksDBToken& token){ + size_t& limit, RocksDBToken& token, bool fromCache){ //we pass the token in as ref to avoid allocations auto rocksColl = toRocksDBCollection(_collection); Result res = rocksColl->lookupDocumentToken(_trx, primaryKey, token); - _iterator->Next(); if (res.ok()) { cb(token); --limit; if (limit == 0) { - _doUpdateBounds=false; //limit hit continue with next batch + if(fromCache) { + _doUpdateArrayIterator=false; //limit hit continue with next batch + } else { + _doUpdateBounds=false; //limit hit continue with next batch + } return true; } } // TODO do we need to handle failed lookups here? @@ -252,6 +262,7 @@ bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb( void RocksDBEdgeIndexIterator::reset() { //rest offsets into iterators _doUpdateBounds = true; + _doUpdateArrayIterator = true; _cacheValueBuilder.clear(); _arrayBuffer.clear(); _arraySlice = VPackSlice::emptyArraySlice(); @@ -268,15 +279,18 @@ RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, : RocksDBIndex(iid ,collection ,std::vector>({{AttributeName(attr, false)}}) - ,false - ,false + ,false // unique + ,false // sparse ,basics::VelocyPackHelper::stringUInt64(info, "objectId") - ,false //!ServerState::instance()->isCoordinator() /*useCache*/ + ,!ServerState::instance()->isCoordinator() // useCache ) , _directionAttr(attr) { TRI_ASSERT(iid != 0); TRI_ASSERT(_objectId != 0); + // if we never hit the assertions we need to remove the + // following code + // FIXME if (_objectId == 0) { //disable cache? _useCache = false; @@ -547,9 +561,6 @@ IndexIterator* RocksDBEdgeIndex::createEqIterator( } keys->close(); - //LOG_TOPIC(ERR, Logger::FIXME) << "_useCache: " << _useCache - // << " _cachePresent: " << _cachePresent - // << " useCache():" << useCache(); return new RocksDBEdgeIndexIterator( _collection, trx, mmdr, this, keys, useCache(), _cache.get()); } @@ -577,7 +588,6 @@ IndexIterator* RocksDBEdgeIndex::createInIterator( } keys->close(); - //LOG_TOPIC(ERR, Logger::FIXME) << "useCache: " << _useCache << useCache(); return new RocksDBEdgeIndexIterator( _collection, trx, mmdr, this, keys, useCache(), _cache.get()); } diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.h b/arangod/RocksDBEngine/RocksDBEdgeIndex.h index fbdfd02018..fa9865b124 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.h +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.h @@ -61,7 +61,7 @@ class RocksDBEdgeIndexIterator final : public IndexIterator { private: void updateBounds(StringRef fromTo); bool lookupDocumentAndUseCb( - StringRef primaryKey, TokenCallback const&, size_t& limit, RocksDBToken&); + StringRef primaryKey, TokenCallback const&, size_t& limit, RocksDBToken&, bool fromCache); std::unique_ptr _keys; arangodb::velocypack::ArrayIterator _keysIterator; RocksDBEdgeIndex const* _index; @@ -74,6 +74,7 @@ class RocksDBEdgeIndexIterator final : public IndexIterator { RocksDBKeyBounds _bounds; bool _doUpdateBounds; + bool _doUpdateArrayIterator; bool _useCache; cache::Cache* _cache; VPackBuilder _cacheValueBuilder;