//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer /// @author Daniel Larkin-York //////////////////////////////////////////////////////////////////////////////// #include "RocksDBRecoveryManager.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/NumberUtils.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" #include "Basics/Exceptions.h" #include "Basics/exitcodes.h" #include "Logger/Logger.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBColumnFamily.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBCuckooIndexEstimator.h" #include "RocksDBEngine/RocksDBEdgeIndex.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" #include "RocksDBEngine/RocksDBRecoveryHelper.h" #include "RocksDBEngine/RocksDBSettingsManager.h" #include "RocksDBEngine/RocksDBVPackIndex.h" #include "RocksDBEngine/RocksDBValue.h" #include "StorageEngine/EngineSelectorFeature.h" #include "VocBase/KeyGenerator.h" #include "VocBase/ticks.h" #include #include #include #include #include #include #include using namespace arangodb; using namespace arangodb::application_features; RocksDBRecoveryManager* RocksDBRecoveryManager::instance() { return ApplicationServer::getFeature(featureName()); } /// Constructor needs to be called synchrunously, /// will load counts from the db and scan the WAL RocksDBRecoveryManager::RocksDBRecoveryManager( application_features::ApplicationServer* server) : ApplicationFeature(server, featureName()), _db(nullptr), _inRecovery(true) { setOptional(true); requiresElevatedPrivileges(false); startsAfter("Database"); startsAfter("RocksDBEngine"); startsAfter("StorageEngine"); startsAfter("ServerId"); onlyEnabledWith("RocksDBEngine"); } void RocksDBRecoveryManager::start() { if (!isEnabled()) { return; } _db = ApplicationServer::getFeature("RocksDBEngine")->db(); runRecovery(); _inRecovery = false; // notify everyone that recovery is now done auto databaseFeature = ApplicationServer::getFeature("Database"); databaseFeature->recoveryDone(); } /// parse recent RocksDB WAL entries and notify the /// DatabaseFeature about the successful recovery void RocksDBRecoveryManager::runRecovery() { auto res = parseRocksWAL(); if (res.fail()) { LOG_TOPIC(FATAL, Logger::ENGINES) << "failed during rocksdb WAL recovery: " << res.errorMessage(); FATAL_ERROR_EXIT_CODE(TRI_EXIT_RECOVERY); } } bool RocksDBRecoveryManager::inRecovery() const { return _inRecovery; } class WBReader final : public rocksdb::WriteBatch::Handler { public: std::unordered_map deltas; rocksdb::SequenceNumber currentSeqNum; private: // must be retrieved from settings manager std::unordered_map _seqStart; std::unordered_map _generators; uint64_t _maxTick = 0; uint64_t _maxHLC = 0; public: explicit WBReader(std::unordered_map const& seqs) : currentSeqNum(0), _seqStart(seqs) {} Result shutdownWBReader() { Result rv = basics::catchVoidToResult([&]() -> void { // update ticks after parsing wal LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick << ", last HLC value: " << _maxHLC; TRI_UpdateTickServer(_maxTick); TRI_HybridLogicalClock(_maxHLC); // TODO update generators auto dbfeature = ApplicationServer::getFeature("Database"); for (auto gen : _generators) { if (gen.second > 0) { auto dbColPair = rocksutils::mapObjectToCollection(gen.first); if (dbColPair.second == 0 && dbColPair.first == 0) { // collection with this objectID not known.Skip. continue; } auto vocbase = dbfeature->useDatabase(dbColPair.first); if (vocbase == nullptr) { continue; } TRI_DEFER(vocbase->release()); auto collection = vocbase->lookupCollection(dbColPair.second); if (collection == nullptr) { continue; } std::string k(basics::StringUtils::itoa(gen.second)); collection->keyGenerator()->track(k.data(), k.size()); } } }); return rv; } bool shouldHandleDocument(uint32_t column_family_id, const rocksdb::Slice& key) { if (column_family_id == RocksDBColumnFamily::documents()->GetID()) { uint64_t objectId = RocksDBKey::objectId(key); auto const& it = _seqStart.find(objectId); if (it != _seqStart.end()) { if (deltas.find(objectId) == deltas.end()) { deltas.emplace(objectId, RocksDBSettingsManager::CounterAdjustment()); } return it->second <= currentSeqNum; } } return false; } void storeMaxHLC(uint64_t hlc) { if (hlc > _maxHLC) { _maxHLC = hlc; } } void storeMaxTick(uint64_t tick) { if (tick > _maxTick) { _maxTick = tick; } } void storeLastKeyValue(uint64_t objectId, uint64_t keyValue) { if (keyValue == 0) { return; } auto it = _generators.find(objectId); if (it == _generators.end()) { try { _generators.emplace(objectId, keyValue); } catch (...) { } return; } if (keyValue > (*it).second) { (*it).second = keyValue; } } RocksDBCuckooIndexEstimator* findEstimator(uint64_t objectId) { RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); RocksDBEngine::IndexTriple triple = engine->mapObjectToIndex(objectId); if (std::get<0>(triple) == 0 && std::get<1>(triple) == 0) { return nullptr; } DatabaseFeature* df = DatabaseFeature::DATABASE; TRI_vocbase_t* vb = df->useDatabase(std::get<0>(triple)); if (vb == nullptr) { return nullptr; } TRI_DEFER(vb->release()); auto coll = vb->lookupCollection(std::get<1>(triple)); if (coll == nullptr) { return nullptr; } std::shared_ptr index = coll->lookupIndex(std::get<2>(triple)); if (index == nullptr) { return nullptr; } return static_cast(index.get())->estimator(); } void updateMaxTick(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) { // RETURN (side-effect): update _maxTick // // extract max tick from Markers and store them as side-effect in // _maxTick member variable that can be used later (dtor) to call // TRI_UpdateTickServer (ticks.h) // Markers: - collections (id,objectid) as tick and max tick in indexes // array // - documents - _rev (revision as maxtick) // - databases if (column_family_id == RocksDBColumnFamily::documents()->GetID()) { storeMaxHLC(RocksDBKey::documentId(RocksDBEntryType::Document, key).id()); storeLastKeyValue(RocksDBKey::objectId(key), RocksDBValue::keyValue(value)); } else if (column_family_id == RocksDBColumnFamily::primary()->GetID()) { // document key StringRef ref = RocksDBKey::primaryKey(key); TRI_ASSERT(!ref.empty()); // check if the key is numeric if (ref[0] >= '1' && ref[0] <= '9') { // numeric start byte. looks good bool valid; uint64_t tick = NumberUtils::atoi(ref.data(), ref.data() + ref.size(), valid); if (valid) { // if no previous _maxTick set or the numeric value found is // "near" our previous _maxTick, then we update it if (tick > _maxTick && (_maxTick == 0 || tick - _maxTick < 2048)) { storeMaxTick(tick); } } // else we got a non-numeric key. simply ignore it } } else if (column_family_id == RocksDBColumnFamily::definitions()->GetID()) { auto const type = RocksDBKey::type(key); if (type == RocksDBEntryType::Collection) { storeMaxTick(RocksDBKey::collectionId(key)); auto slice = RocksDBValue::data(value); storeMaxTick(basics::VelocyPackHelper::stringUInt64(slice, "objectId")); VPackSlice indexes = slice.get("indexes"); for (VPackSlice const& idx : VPackArrayIterator(indexes)) { storeMaxTick( std::max(basics::VelocyPackHelper::stringUInt64(idx, "objectId"), basics::VelocyPackHelper::stringUInt64(idx, "id"))); } } else if (type == RocksDBEntryType::Database) { storeMaxTick(RocksDBKey::databaseId(key)); } else if (type == RocksDBEntryType::View) { storeMaxTick( std::max(RocksDBKey::databaseId(key), RocksDBKey::viewId(key))); } } } rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override { updateMaxTick(column_family_id, key, value); if (shouldHandleDocument(column_family_id, key)) { uint64_t objectId = RocksDBKey::objectId(key); LocalDocumentId docId = RocksDBKey::documentId(RocksDBEntryType::Document, key); auto const& it = deltas.find(objectId); if (it != deltas.end()) { it->second._sequenceNum = currentSeqNum; it->second._added++; it->second._revisionId = docId.id(); } } else { // We have to adjust the estimate with an insert uint64_t hash = 0; if (column_family_id == RocksDBColumnFamily::vpack()->GetID()) { hash = RocksDBVPackIndex::HashForKey(key); } else if (column_family_id == RocksDBColumnFamily::edge()->GetID()) { hash = RocksDBEdgeIndex::HashForKey(key); } if (hash != 0) { uint64_t objectId = RocksDBKey::objectId(key); auto est = findEstimator(objectId); if (est != nullptr && est->commitSeq() < currentSeqNum) { // We track estimates for this index est->insert(hash); } } } RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); for (auto helper : engine->recoveryHelpers()) { helper->PutCF(column_family_id, key, value); } return rocksdb::Status(); } rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { if (shouldHandleDocument(column_family_id, key)) { uint64_t objectId = RocksDBKey::objectId(key); LocalDocumentId docId = RocksDBKey::documentId(RocksDBEntryType::Document, key); auto const& it = deltas.find(objectId); if (it != deltas.end()) { it->second._sequenceNum = currentSeqNum; it->second._removed++; it->second._revisionId = docId.id(); } } else { // We have to adjust the estimate with an insert uint64_t hash = 0; if (column_family_id == RocksDBColumnFamily::vpack()->GetID()) { hash = RocksDBVPackIndex::HashForKey(key); } else if (column_family_id == RocksDBColumnFamily::edge()->GetID()) { hash = RocksDBEdgeIndex::HashForKey(key); } if (hash != 0) { uint64_t objectId = RocksDBKey::objectId(key); auto est = findEstimator(objectId); if (est != nullptr && est->commitSeq() < currentSeqNum) { // We track estimates for this index est->remove(hash); } } } RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); for (auto helper : engine->recoveryHelpers()) { helper->DeleteCF(column_family_id, key); } return rocksdb::Status(); } rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); for (auto helper : engine->recoveryHelpers()) { helper->SingleDeleteCF(column_family_id, key); } return rocksdb::Status(); } void LogData(const rocksdb::Slice& blob) override { RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); for (auto helper : engine->recoveryHelpers()) { helper->LogData(blob); } } }; /// parse the WAL with the above handler parser class Result RocksDBRecoveryManager::parseRocksWAL() { Result shutdownRv; Result res = basics::catchToResult([&]() -> Result { Result rv; RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); for (auto& helper : engine->recoveryHelpers()) { helper->prepare(); } // Tell the WriteBatch reader the transaction markers to look for WBReader handler(engine->settingsManager()->counterSeqs()); auto minTick = std::min(engine->settingsManager()->earliestSeqNeeded(), engine->releasedTick()); std::unique_ptr iterator; // reader(); rocksdb::Status s = _db->GetUpdatesSince( minTick, &iterator, rocksdb::TransactionLogIterator::ReadOptions(true)); rv = rocksutils::convertStatus(s); if (rv.ok()) { while (iterator->Valid()) { s = iterator->status(); if (s.ok()) { rocksdb::BatchResult batch = iterator->GetBatch(); handler.currentSeqNum = batch.sequence; s = batch.writeBatchPtr->Iterate(&handler); } if (!s.ok()) { rv = rocksutils::convertStatus(s); std::string msg = "error during WAL scan: " + rv.errorMessage(); LOG_TOPIC(ERR, Logger::ENGINES) << msg; rv.reset(rv.errorNumber(), std::move(msg)); // update message break; } iterator->Next(); } if (rv.ok()) { LOG_TOPIC(TRACE, Logger::ENGINES) << "finished WAL scan with " << handler.deltas.size(); for (auto& pair : handler.deltas) { engine->settingsManager()->updateCounter(pair.first, pair.second); LOG_TOPIC(TRACE, Logger::ENGINES) << "WAL recovered " << pair.second.added() << " PUTs and " << pair.second.removed() << " DELETEs for objectID " << pair.first; } } } shutdownRv = handler.shutdownWBReader(); return rv; }); if (res.ok()) { res = std::move(shutdownRv); } else { if (shutdownRv.fail()){ res.reset(res.errorNumber(), res.errorMessage() + " - " + shutdownRv.errorMessage()); } } return res; }