1
0
Fork 0

remove code that potentially throws from ~WBReader (#4282)

This commit is contained in:
Jan Christoph Uhde 2018-01-15 15:14:55 +01:00 committed by Jan
parent 416724ff68
commit 9c2ebb7bfc
8 changed files with 121 additions and 73 deletions

View File

@ -71,5 +71,10 @@ ${Switch} $0
; fill me
${Break}
${Case} 23 # EXIT_RECOVERY
MessageBox MB_ICONEXCLAMATION '$1:$\r$\nrecovery failed'
; Will be returned if the recovery fails
${Break}
${EndSwitch}
FunctionEnd

View File

@ -29,6 +29,8 @@
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/Exceptions.h"
#include "Basics/exitcodes.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
@ -97,9 +99,13 @@ void RocksDBRecoveryManager::start() {
/// parse recent RocksDB WAL entries and notify the
/// DatabaseFeature about the successful recovery
void RocksDBRecoveryManager::runRecovery() {
bool success = parseRocksWAL();
if (!success) {
} // TODO what do we do if not successful?
auto res = parseRocksWAL();
if (res.fail()) {
LOG_TOPIC(FATAL, Logger::ENGINES)
<< "failed during rocksdb WAL recovery: "
<< res.errorMessage();
FATAL_ERROR_EXIT_CODE(TRI_EXIT_RECOVERY);
}
}
bool RocksDBRecoveryManager::inRecovery() const { return _inRecovery; }
@ -122,37 +128,41 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
explicit WBReader(std::unordered_map<uint64_t, rocksdb::SequenceNumber> const& seqs)
: currentSeqNum(0), _seqStart(seqs) {}
~WBReader() {
// update ticks after parsing wal
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick
<< ", last HLC value: " << _maxHLC;
Result shutdownWBReader() {
Result rv;
try {
// update ticks after parsing wal
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick
<< ", last HLC value: " << _maxHLC;
TRI_UpdateTickServer(_maxTick);
TRI_HybridLogicalClock(_maxHLC);
TRI_UpdateTickServer(_maxTick);
TRI_HybridLogicalClock(_maxHLC);
// TODO update generators
auto dbfeature = ApplicationServer::getFeature<DatabaseFeature>("Database");
for (auto gen : _generators) {
if (gen.second > 0) {
auto dbColPair = rocksutils::mapObjectToCollection(gen.first);
if (dbColPair.second == 0 && dbColPair.first == 0) {
// collection with this objectID not known.Skip.
continue;
}
auto vocbase = dbfeature->useDatabase(dbColPair.first);
if (vocbase == nullptr) {
continue;
}
TRI_DEFER(vocbase->release());
// TODO update generators
auto dbfeature = ApplicationServer::getFeature<DatabaseFeature>("Database");
for (auto gen : _generators) {
if (gen.second > 0) {
auto dbColPair = rocksutils::mapObjectToCollection(gen.first);
if (dbColPair.second == 0 && dbColPair.first == 0) {
// collection with this objectID not known.Skip.
continue;
}
auto vocbase = dbfeature->useDatabase(dbColPair.first);
if (vocbase == nullptr) {
continue;
}
TRI_DEFER(vocbase->release());
auto collection = vocbase->lookupCollection(dbColPair.second);
if (collection == nullptr) {
continue;
auto collection = vocbase->lookupCollection(dbColPair.second);
if (collection == nullptr) {
continue;
}
std::string k(basics::StringUtils::itoa(gen.second));
collection->keyGenerator()->track(k.data(), k.size());
}
std::string k(basics::StringUtils::itoa(gen.second));
collection->keyGenerator()->track(k.data(), k.size());
}
}
} CATCH_TO_RESULT(rv,TRI_ERROR_INTERNAL);
return rv;
}
bool shouldHandleDocument(uint32_t column_family_id,
@ -375,49 +385,72 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
};
/// parse the WAL with the above handler parser class
bool RocksDBRecoveryManager::parseRocksWAL() {
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
for (auto helper : engine->recoveryHelpers()) {
helper->prepare();
}
Result RocksDBRecoveryManager::parseRocksWAL() {
Result rv;
std::unique_ptr<WBReader> handler;
// Tell the WriteBatch reader the transaction markers to look for
auto handler =
std::make_unique<WBReader>(engine->settingsManager()->counterSeqs());
auto minTick = std::min(engine->settingsManager()->earliestSeqNeeded(),
engine->releasedTick());
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
rocksdb::Status s = _db->GetUpdatesSince(
minTick, &iterator, rocksdb::TransactionLogIterator::ReadOptions(true));
if (!s.ok()) { // TODO do something?
return false;
}
while (iterator->Valid()) {
s = iterator->status();
if (s.ok()) {
rocksdb::BatchResult batch = iterator->GetBatch();
handler->currentSeqNum = batch.sequence;
s = batch.writeBatchPtr->Iterate(handler.get());
}
if (!s.ok()) {
LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan";
break;
try {
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
for (auto helper : engine->recoveryHelpers()) {
helper->prepare();
}
iterator->Next();
// Tell the WriteBatch reader the transaction markers to look for
handler = std::make_unique<WBReader>(engine->settingsManager()->counterSeqs());
auto minTick = std::min(engine->settingsManager()->earliestSeqNeeded(),
engine->releasedTick());
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
rocksdb::Status s = _db->GetUpdatesSince(
minTick, &iterator, rocksdb::TransactionLogIterator::ReadOptions(true));
rv = rocksutils::convertStatus(s);
if(rv.ok()){
while (iterator->Valid()) {
s = iterator->status();
if (s.ok()) {
rocksdb::BatchResult batch = iterator->GetBatch();
handler->currentSeqNum = batch.sequence;
s = batch.writeBatchPtr->Iterate(handler.get());
}
if (!s.ok()) {
rv = rocksutils::convertStatus(s);
std::string msg = "error during WAL scan: " + rv.errorMessage();
LOG_TOPIC(ERR, Logger::ENGINES) << msg;
rv.reset(rv.errorNumber(), msg); // update message
break;
}
iterator->Next();
}
if(rv.ok()){
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "finished WAL scan with " << handler->deltas.size();
for (std::pair<uint64_t, RocksDBSettingsManager::CounterAdjustment> pair :
handler->deltas) {
engine->settingsManager()->updateCounter(pair.first, pair.second);
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "WAL recovered " << pair.second.added() << " PUTs and "
<< pair.second.removed() << " DELETEs for objectID " << pair.first;
}
}
}
} CATCH_TO_RESULT(rv,TRI_ERROR_INTERNAL);
auto shutdownRv = handler->shutdownWBReader();
if(rv.ok()) {
rv = std::move(shutdownRv);
} else {
if(shutdownRv.fail()){
rv.reset(rv.errorNumber(), rv.errorMessage() + " - " + shutdownRv.errorMessage());
}
}
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "finished WAL scan with " << handler->deltas.size();
for (std::pair<uint64_t, RocksDBSettingsManager::CounterAdjustment> pair :
handler->deltas) {
engine->settingsManager()->updateCounter(pair.first, pair.second);
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "WAL recovered " << pair.second.added() << " PUTs and "
<< pair.second.removed() << " DELETEs for objectID " << pair.first;
}
return handler->deltas.size() > 0;
return rv;
}

View File

@ -49,8 +49,10 @@ class RocksDBRecoveryManager final
void runRecovery();
bool inRecovery() const;
private:
Result parseRocksWAL();
protected:
bool parseRocksWAL();
//////////////////////////////////////////////////////////////////////////////
/// @brief rocksdb instance

View File

@ -19,7 +19,8 @@
"EXIT_VERSION_CHECK_FAILED" : { "code" : 13, "message" : "version check failed" },
"EXIT_ALREADY_RUNNING" : { "code" : 20, "message" : "already running" },
"EXIT_COULD_NOT_BIND_PORT" : { "code" : 21, "message" : "port blocked" },
"EXIT_COULD_NOT_LOCK" : { "code" : 22, "message" : "could not lock - another process could be running" }
"EXIT_COULD_NOT_LOCK" : { "code" : 22, "message" : "could not lock - another process could be running" },
"EXIT_RECOVERY" : { "code" : 23, "message" : "recovery failed" }
};
}());

View File

@ -19,4 +19,5 @@ void TRI_InitializeExitMessages() {
REG_EXIT(EXIT_ALREADY_RUNNING, "already running");
REG_EXIT(EXIT_COULD_NOT_BIND_PORT, "port blocked");
REG_EXIT(EXIT_COULD_NOT_LOCK, "could not lock - another process could be running");
REG_EXIT(EXIT_RECOVERY, "recovery failed");
}

View File

@ -20,6 +20,7 @@ EXIT_VERSION_CHECK_FAILED,13,"version check failed","Will be returned when there
EXIT_ALREADY_RUNNING,20,"already running","Will be returned when arangod is already running according to PID-file"
EXIT_COULD_NOT_BIND_PORT,21,"port blocked","Will be returned when endpoint is taken by another process"
EXIT_COULD_NOT_LOCK,22,"could not lock - another process could be running","fill me"
EXIT_RECOVERY,23,"recovery failed","Will be returned if the recovery fails"
# network
#EXIT_NO_COORDINATOR

View File

@ -66,6 +66,11 @@ constexpr int TRI_EXIT_COULD_NOT_BIND_PORT
/// fill me
constexpr int TRI_EXIT_COULD_NOT_LOCK = 22;
/// 23: EXIT_RECOVERY
/// recovery failed
/// Will be returned if the recovery fails
constexpr int TRI_EXIT_RECOVERY = 23;
/// register all exit codes for ArangoDB
void TRI_InitializeExitMessages();

View File

@ -30,7 +30,7 @@ def genJsFile(errors):
+ " var internal = require(\"internal\");\n"\
+ "\n"\
+ " internal.exitCodes = {\n"
# print individual errors
i = 0
for e in errors:
@ -38,7 +38,7 @@ def genJsFile(errors):
msg = e[2].replace("\n", " ").replace("\\", "").replace("\"", "\\\"")
out = out\
+ " " + name.ljust(30) + " : { \"code\" : " + e[1] + ", \"message\" : \"" + msg + "\" }"
i = i + 1
if i < len(errors):