1
0
Fork 0

make the replication applier auto-start for the RocksDB engine if con… (#3647)

This commit is contained in:
Jan 2017-11-15 12:02:37 +01:00 committed by Frank Celler
parent 2d7b58fcb8
commit 3b0a8a9cdf
9 changed files with 103 additions and 14 deletions

View File

@ -46,6 +46,7 @@ ReplicationFeature::ReplicationFeature(ApplicationServer* server)
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("Database");
startsAfter("ServerId");
startsAfter("StorageEngine");
}

View File

@ -62,6 +62,7 @@ set(ROCKSDB_SOURCES
RocksDBEngine/RocksDBMethods.cpp
RocksDBEngine/RocksDBOptimizerRules.cpp
RocksDBEngine/RocksDBPrimaryIndex.cpp
RocksDBEngine/RocksDBRecoveryFinalizer.cpp
RocksDBEngine/RocksDBReplicationCommon.cpp
RocksDBEngine/RocksDBReplicationContext.cpp
RocksDBEngine/RocksDBReplicationManager.cpp

View File

@ -97,10 +97,6 @@ void RocksDBCounterManager::runRecovery() {
sync(false);
}
}
// notify everyone that recovery is now done
auto databaseFeature = ApplicationServer::getFeature<DatabaseFeature>("Database");
databaseFeature->recoveryDone();
}
RocksDBCounterManager::CounterAdjustment RocksDBCounterManager::loadCounter(

View File

@ -59,6 +59,7 @@
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBOptimizerRules.h"
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include "RocksDBEngine/RocksDBRecoveryFinalizer.h"
#include "RocksDBEngine/RocksDBReplicationManager.h"
#include "RocksDBEngine/RocksDBReplicationTailing.h"
#include "RocksDBEngine/RocksDBRestHandlers.h"
@ -128,6 +129,8 @@ RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
// inherits order from StorageEngine but requires "RocksDBOption" that is used
// to configure this engine and the MMFiles PersistentIndexFeature
startsAfter("RocksDBOption");
server->addFeature(new RocksDBRecoveryFinalizer(server));
}
RocksDBEngine::~RocksDBEngine() { delete _db; }
@ -513,11 +516,11 @@ void RocksDBEngine::start() {
key.string(), &oldVersion);
if (dbExisted) {
if (s.IsNotFound() || oldVersion.data()[0] < version) {
LOG_TOPIC(ERR, Logger::ENGINES)
LOG_TOPIC(FATAL, Logger::ENGINES)
<< "Your db directory is in an old format. Please delete the directory.";
FATAL_ERROR_EXIT();
} else if (oldVersion.data()[0] > version) {
LOG_TOPIC(ERR, Logger::ENGINES)
LOG_TOPIC(FATAL, Logger::ENGINES)
<< "You are using an old version of ArangoDB, please update "
<< "before opening this dir.";
FATAL_ERROR_EXIT();
@ -537,9 +540,9 @@ void RocksDBEngine::start() {
_counterManager->runRecovery();
double const counter_sync_seconds = 2.5;
double const counterSyncSeconds = 2.5;
_backgroundThread.reset(
new RocksDBBackgroundThread(this, counter_sync_seconds));
new RocksDBBackgroundThread(this, counterSyncSeconds));
if (!_backgroundThread->start()) {
LOG_TOPIC(FATAL, Logger::ENGINES)
<< "could not start rocksdb counter manager";

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBRecoveryFinalizer.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "RestServer/DatabaseFeature.h"
using namespace arangodb;
using namespace arangodb::application_features;
RocksDBRecoveryFinalizer::RocksDBRecoveryFinalizer(ApplicationServer* server)
: ApplicationFeature(server, "RocksDBRecoveryFinalizer") {
setOptional(true);
requiresElevatedPrivileges(false);
startsAfter("Database");
startsAfter("RocksDBEngine");
startsAfter("StorageEngine");
startsAfter("ServerId");
onlyEnabledWith("RocksDBEngine");
}
void RocksDBRecoveryFinalizer::start() {
if (!isEnabled()) {
return;
}
// notify everyone that recovery is now done
auto databaseFeature = ApplicationServer::getFeature<DatabaseFeature>("Database");
databaseFeature->recoveryDone();
}

View File

@ -0,0 +1,44 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ROCKSDB_RECOVERY_FINALIZER_H
#define ROCKSDB_RECOVERY_FINALIZER_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
// a small glue feature that only establishes the dependencies between the RocksDBEngine
// feature and the DatabaseFeature
// its start method will be run after both the RocksDBEngine and the DatabaseFeature
// have started and all databases have been established
// it will then call the DatabaseFeature's recoveryDone() method, which will start
// replication in all databases if nececessary
class RocksDBRecoveryFinalizer final : public application_features::ApplicationFeature {
public:
explicit RocksDBRecoveryFinalizer(application_features::ApplicationServer* server);
public:
void start() override final;
};
}
#endif

View File

@ -50,9 +50,6 @@ using namespace arangodb::velocypack;
double const RocksDBReplicationContext::DefaultTTL = 300.0; // seconds
RocksDBReplicationContext::RocksDBReplicationContext()
: RocksDBReplicationContext(DefaultTTL) {}
RocksDBReplicationContext::RocksDBReplicationContext(double ttl)
: _id(TRI_NewTickServer()),
_lastTick(0),

View File

@ -52,9 +52,7 @@ class RocksDBReplicationContext {
RocksDBReplicationContext(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext();
explicit RocksDBReplicationContext(double ttl);
~RocksDBReplicationContext();
TRI_voc_tick_t id() const; //batchId

View File

@ -161,7 +161,7 @@ class StorageEngine : public application_features::ApplicationFeature {
/// @brief opens a database
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0;
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade) {
int status;
TRI_vocbase_t* rv = openDatabase(args, isUpgrade, status);
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);