1
0
Fork 0

Bug fix/fix cluster upgrade (#10411)

This commit is contained in:
Jan 2019-11-15 10:42:04 +01:00 committed by GitHub
parent 298fe7f27c
commit 1476ddbd0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 544 additions and 318 deletions

View File

@ -808,7 +808,7 @@ set_property(CACHE USE_IPO PROPERTY STRINGS AUTO ON OFF)
set(IPO_ENABLED False)
# Determine value if IPO_ENABLED from USE_IPO and CMAKE_BUILD_TYPE
if(USE_IPO STREQUAL "AUTO")
if (USE_IPO STREQUAL "AUTO")
# When USE_IPO=AUTO, enable IPO for optimized / release builds.
# But to work around a g++ segfault triggered by using both -flto and
# -fno-devirtualize-functions, we disable IPO when using google tests, because
@ -825,7 +825,7 @@ if(USE_IPO STREQUAL "AUTO")
else()
set(IPO_ENABLED False)
endif ()
elseif(USE_IPO)
elseif (USE_IPO)
set(IPO_ENABLED True)
else()
set(IPO_ENABLED False)
@ -833,6 +833,10 @@ endif()
message(STATUS "IPO_ENABLED: ${IPO_ENABLED}")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ${IPO_ENABLED})
if (IPO_ENABLED)
add_definitions("-DARANGODB_USE_IPO=1")
endif()
################################################################################
## LIBRARY RESOLV

View File

@ -52,7 +52,6 @@ add_library(arango_iresearch
IResearch/ApplicationServerHelper.h IResearch/ApplicationServerHelper.cpp
IResearch/Containers.cpp IResearch/Containers.h
IResearch/IResearchAnalyzerFeature.cpp IResearch/IResearchAnalyzerFeature.h
IResearch/IResearchAnalyzerCollectionFeature.cpp
IResearch/IResearchCommon.cpp IResearch/IResearchCommon.h
IResearch/IResearchKludge.cpp IResearch/IResearchKludge.h
IResearch/IResearchLink.cpp IResearch/IResearchLink.h
@ -543,6 +542,7 @@ set(LIB_ARANGOSERVER_SOURCES
Cluster/ClusterInfo.cpp
Cluster/ClusterRepairDistributeShardsLike.cpp
Cluster/ClusterRepairOperations.cpp
Cluster/ClusterUpgradeFeature.cpp
Cluster/ClusterTrxMethods.cpp
Cluster/ClusterTypes.cpp
Cluster/CreateCollection.cpp

View File

@ -547,22 +547,7 @@ void ClusterFeature::start() {
void ClusterFeature::beginShutdown() { ClusterComm::instance()->disable(); }
void ClusterFeature::stop() {
if (_heartbeatThread != nullptr) {
_heartbeatThread->beginShutdown();
}
if (_heartbeatThread != nullptr) {
int counter = 0;
while (_heartbeatThread->isRunning()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// emit warning after 5 seconds
if (++counter == 10 * 5) {
LOG_TOPIC("acaa9", WARN, arangodb::Logger::CLUSTER)
<< "waiting for heartbeat thread to finish";
}
}
}
shutdownHeartbeatThread();
ClusterComm::instance()->stopBackgroundThreads();
}
@ -572,9 +557,7 @@ void ClusterFeature::unprepare() {
return;
}
if (_heartbeatThread != nullptr) {
_heartbeatThread->beginShutdown();
}
shutdownHeartbeatThread();
// change into shutdown state
ServerState::instance()->setState(ServerState::STATE_SHUTDOWN);
@ -667,7 +650,7 @@ void ClusterFeature::setUnregisterOnShutdown(bool unregisterOnShutdown) {
/// @brief common routine to start heartbeat with or without cluster active
void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
uint64_t interval_ms, uint64_t maxFailsBeforeWarning,
const std::string& endpoints) {
std::string const& endpoints) {
_heartbeatThread =
std::make_shared<HeartbeatThread>(server(), agencyCallbackRegistry,
std::chrono::microseconds(interval_ms * 1000),
@ -686,6 +669,24 @@ void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallback
}
}
void ClusterFeature::shutdownHeartbeatThread() {
if (_heartbeatThread == nullptr) {
return;
}
_heartbeatThread->beginShutdown();
int counter = 0;
while (_heartbeatThread->isRunning()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// emit warning after 5 seconds
if (++counter == 10 * 5) {
LOG_TOPIC("acaa9", WARN, arangodb::Logger::CLUSTER)
<< "waiting for heartbeat thread to finish";
}
}
}
void ClusterFeature::syncDBServerStatusQuo() {
if (_heartbeatThread != nullptr) {
_heartbeatThread->syncDBServerStatusQuo(true);

View File

@ -87,7 +87,9 @@ class ClusterFeature : public application_features::ApplicationFeature {
protected:
void startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
uint64_t interval_ms, uint64_t maxFailsBeforeWarning,
const std::string& endpoints);
std::string const& endpoints);
void shutdownHeartbeatThread();
private:
void reportRole(ServerState::RoleEnum);

View File

@ -0,0 +1,223 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "Cluster/ClusterUpgradeFeature.h"
#include "Agency/AgencyComm.h"
#include "Agency/AgencyFeature.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ScopeGuard.h"
#include "Cluster/ServerState.h"
#include "FeaturePhases/FinalFeaturePhase.h"
#include "Logger/LogMacros.h"
#include "ProgramOptions/ProgramOptions.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "VocBase/vocbase.h"
#include "VocBase/Methods/Upgrade.h"
#include "VocBase/Methods/Version.h"
using namespace arangodb;
using namespace arangodb::options;
namespace {
static std::string const upgradeVersionKey = "ClusterUpgradeVersion";
static std::string const upgradeExecutedByKey = "ClusterUpgradeExecutedBy";
}
ClusterUpgradeFeature::ClusterUpgradeFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, "ClusterUpgrade"),
_upgradeMode("auto") {
startsAfter<application_features::FinalFeaturePhase>();
}
void ClusterUpgradeFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addOption("--cluster.upgrade",
"perform a cluster upgrade if necessary (auto = perform upgrade and shut down only if `--database.auto-upgrade true` is set, disable = never perform upgrade, force = always perform an upgrade and shut down, online = always perform an upgrade but don't shut down)",
new DiscreteValuesParameter<StringParameter>(&_upgradeMode, std::unordered_set<std::string>{"auto", "disable", "force", "online"}));
}
void ClusterUpgradeFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
auto& databaseFeature = server().getFeature<arangodb::DatabaseFeature>();
if (_upgradeMode == "force") {
// always perform an upgrade, regardless of the value of `--database.auto-upgrade`.
// after the upgrade, shut down the server
databaseFeature.enableUpgrade();
} else if (_upgradeMode == "disable") {
// never perform an upgrade, regardless of the value of `--database.auto-upgrade`.
// don't shut down the server
databaseFeature.disableUpgrade();
} else if (_upgradeMode == "online") {
// perform an upgrade, but stay online and don't shut down the server.
// disabling the upgrade functionality in the database feature is required for this.
databaseFeature.disableUpgrade();
}
}
void ClusterUpgradeFeature::start() {
if (!ServerState::instance()->isCoordinator()) {
return;
}
// this feature is doing something meaning only in a coordinator, and only
// if the server was started with the option `--database.auto-upgrade true`.
auto& databaseFeature = server().getFeature<arangodb::DatabaseFeature>();
if (_upgradeMode == "disable" || (!databaseFeature.upgrade() && (_upgradeMode != "online" && _upgradeMode != "force"))) {
return;
}
tryClusterUpgrade();
if (_upgradeMode != "online") {
LOG_TOPIC("d6047", INFO, arangodb::Logger::STARTUP) << "server will now shut down due to upgrade.";
server().beginShutdown();
}
}
void ClusterUpgradeFeature::setBootstrapVersion() {
// it is not a fundamental problem if the setValue fails. if it fails, we can't
// store the version number in the agency, so an upgrade we will run all the
// (idempotent) upgrade tasks for the same version again.
VPackBuilder builder;
builder.add(VPackValue(arangodb::methods::Version::current()));
AgencyComm agency;
agency.setValue(::upgradeVersionKey, builder.slice(), 0);
}
void ClusterUpgradeFeature::tryClusterUpgrade() {
TRI_ASSERT(ServerState::instance()->isCoordinator());
AgencyComm agency;
AgencyCommResult result = agency.getValues(::upgradeVersionKey);
if (!result.successful()) {
LOG_TOPIC("26104", ERR, arangodb::Logger::CLUSTER) << "unable to fetch cluster upgrade version from agency: " << result.errorMessage();
return;
}
uint64_t latestUpgradeVersion = 0;
VPackSlice value = result.slice()[0].get(
std::vector<std::string>({AgencyCommManager::path(), ::upgradeVersionKey}));
if (value.isNumber()) {
latestUpgradeVersion = value.getNumber<uint64_t>();
LOG_TOPIC("54f69", DEBUG, arangodb::Logger::CLUSTER) << "found previous cluster upgrade version in agency: " << latestUpgradeVersion;
} else {
// key not there yet.
LOG_TOPIC("5b00d", DEBUG, arangodb::Logger::CLUSTER) << "did not find previous cluster upgrade version in agency";
}
if (arangodb::methods::Version::current() <= latestUpgradeVersion) {
// nothing to do
return;
}
std::vector<AgencyPrecondition> precs;
if (latestUpgradeVersion == 0) {
precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::EMPTY, true);
} else {
precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::VALUE, latestUpgradeVersion);
}
// there must be no other coordinator that performs an upgrade at the same time
precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::EMPTY, true);
// try to register ourselves as responsible for the upgrade
AgencyOperation operation(::upgradeExecutedByKey, AgencyValueOperationType::SET, ServerState::instance()->getId());
// make the key expire automatically in case we crash
// operation._ttl = TRI_microtime() + 1800.0;
AgencyWriteTransaction transaction(operation, precs);
result = agency.sendTransactionWithFailover(transaction);
if (result.successful()) {
// we are responsible for the upgrade!
LOG_TOPIC("15ac4", INFO, arangodb::Logger::CLUSTER)
<< "running cluster upgrade from "
<< (latestUpgradeVersion == 0 ? std::string("an unknown version") : std::string("version ") + std::to_string(latestUpgradeVersion))
<< " to version " << arangodb::methods::Version::current() << "...";
bool success = false;
try {
success = upgradeCoordinator();
} catch (std::exception const& ex) {
LOG_TOPIC("f2a84", ERR, Logger::CLUSTER) << "caught exception during cluster upgrade: " << ex.what();
TRI_ASSERT(!success);
}
// now finally remove the upgrading key and store the new version number
std::vector<AgencyPrecondition> precs;
precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::VALUE, ServerState::instance()->getId());
std::vector<AgencyOperation> operations;
if (success) {
// upgrade successful - store our current version number
operations.emplace_back(::upgradeVersionKey, AgencyValueOperationType::SET, arangodb::methods::Version::current());
}
// remove the key that locks out other coordinators from upgrading
operations.emplace_back(::upgradeExecutedByKey, AgencySimpleOperationType::DELETE_OP);
AgencyWriteTransaction transaction(operations, precs);
result = agency.sendTransactionWithFailover(transaction);
if (result.successful()) {
LOG_TOPIC("853de", INFO, arangodb::Logger::CLUSTER)
<< "cluster upgrade to version " << arangodb::methods::Version::current()
<< " completed successfully";
} else {
LOG_TOPIC("a0b4f", ERR, arangodb::Logger::CLUSTER) << "unable to store cluster upgrade information in agency: " << result.errorMessage();
}
} else if (result.httpCode() != (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
LOG_TOPIC("482a3", WARN, arangodb::Logger::CLUSTER) << "unable to fetch upgrade information: " << result.errorMessage();
} else {
// someone else is performing the upgrade
LOG_TOPIC("ab6eb", DEBUG, arangodb::Logger::CLUSTER) << "someone else is running the cluster upgrade right now";
}
}
bool ClusterUpgradeFeature::upgradeCoordinator() {
LOG_TOPIC("a2d65", TRACE, arangodb::Logger::FIXME) << "starting coordinator upgrade";
bool success = true;
DatabaseFeature& databaseFeature = server().getFeature<DatabaseFeature>();
for (auto& name : databaseFeature.getDatabaseNames()) {
TRI_vocbase_t* vocbase = databaseFeature.useDatabase(name);
if (vocbase == nullptr) {
// probably deleted in the meantime... so we can ignore it here
continue;
}
auto guard = scopeGuard([&vocbase]() { vocbase->release(); });
auto res = methods::Upgrade::startupCoordinator(*vocbase);
if (res.fail()) {
LOG_TOPIC("f51b1", ERR, arangodb::Logger::FIXME)
<< "Database '" << vocbase->name() << "' upgrade failed ("
<< res.errorMessage() << "). "
<< "Please inspect the logs from the upgrade procedure"
<< " and try starting the server again.";
success = false;
}
}
LOG_TOPIC("efd49", TRACE, arangodb::Logger::FIXME) << "finished coordinator upgrade";
return success;
}

View File

@ -0,0 +1,55 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef APPLICATION_FEATURES_CLUSTER_UPGRADE_FEATURE_H
#define APPLICATION_FEATURES_CLUSTER_UPGRADE_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
// this feature is responsible for performing a cluster upgrade.
// it is only doing something in a coordinator, and only if the server was started
// with the option `--database.auto-upgrade true`. The feature is late in the
// startup sequence, so it can use the full cluster functionality when run.
// after the feature has executed the upgrade, it will shut down the server.
class ClusterUpgradeFeature final : public application_features::ApplicationFeature {
public:
explicit ClusterUpgradeFeature(application_features::ApplicationServer& server);
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void start() override final;
void setBootstrapVersion();
private:
void tryClusterUpgrade();
bool upgradeCoordinator();
private:
std::string _upgradeMode;
};
} // namespace arangodb
#endif

View File

@ -1,73 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ApplicationServerHelper.h"
#include "Basics/StaticStrings.h"
#include "Cluster/ServerState.h"
#include "FeaturePhases/ClusterFeaturePhase.h"
#include "FeaturePhases/DatabaseFeaturePhase.h"
#include "FeaturePhases/ServerFeaturePhase.h"
#include "IResearch/IResearchAnalyzerCollectionFeature.h"
#include "IResearch/IResearchCommon.h"
#include "Logger/Logger.h"
#include "Logger/LogMacros.h"
#include "RestServer/BootstrapFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "VocBase/Methods/Collections.h"
using namespace arangodb;
IResearchAnalyzerCollectionFeature::IResearchAnalyzerCollectionFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, "ArangoSearchAnalyzerCollection") {
setOptional(true);
startsAfter<application_features::DatabaseFeaturePhase>();
// should be relatively late in startup sequence
startsAfter<application_features::ClusterFeaturePhase>();
startsAfter<application_features::ServerFeaturePhase>();
startsAfter<BootstrapFeature>();
}
void IResearchAnalyzerCollectionFeature::start() {
if (ServerState::instance()->isDBServer()) {
// no need to execute this in DB server
return;
}
DatabaseFeature* databaseFeature = DatabaseFeature::DATABASE;
TRI_ASSERT(databaseFeature != nullptr);
databaseFeature->enumerateDatabases([](TRI_vocbase_t& vocbase) {
Result res = methods::Collections::lookup(vocbase, StaticStrings::AnalyzersCollection, [](std::shared_ptr<LogicalCollection> const&) {
});
if (res.is(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) {
// collection does not yet exist, so let's create it now
auto res = methods::Collections::createSystem(vocbase, StaticStrings::AnalyzersCollection, false);
if (res.first.ok()) {
LOG_TOPIC("c2e33", DEBUG, arangodb::iresearch::TOPIC) << "successfully created '" << StaticStrings::AnalyzersCollection << "' collection in database '" << vocbase.name() << "'";
} else if (res.first.fail() && !res.first.is(TRI_ERROR_ARANGO_CONFLICT)) {
LOG_TOPIC("ecc23", WARN, arangodb::iresearch::TOPIC) << "unable to create '" << StaticStrings::AnalyzersCollection << "' collection: " << res.first.errorMessage();
// don't abort startup here. the next startup may fix this
}
}
});
}

View File

@ -1,42 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_IRESEARCH__IRESEARCH_ANALYZER_COLLECTION_FEATURE_H
#define ARANGOD_IRESEARCH__IRESEARCH_ANALYZER_COLLECTION_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
/// @brief the sole purpose of this feature is to create potentially
/// missing `_analyzers` collection after startup. It can be removed
/// eventually once the entire upgrading logic has been revised
class IResearchAnalyzerCollectionFeature final : public arangodb::application_features::ApplicationFeature {
public:
explicit IResearchAnalyzerCollectionFeature(arangodb::application_features::ApplicationServer& server);
void start() override;
};
} // namespace arangodb
#endif

View File

@ -65,7 +65,6 @@
#include "RestServer/DatabaseFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/SystemDatabaseFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Transaction/StandaloneContext.h"

View File

@ -213,43 +213,6 @@ bool upgradeSingleServerArangoSearchView0_1(
arangodb::velocypack::Slice const& /*upgradeParams*/) {
using arangodb::application_features::ApplicationServer;
// NOTE: during the upgrade 'ClusterFeature' is disabled which means 'ClusterFeature::validateOptions(...)'
// hasn't been called and server role in 'ServerState' is not set properly.
// In order to upgrade ArangoSearch views from version 0 to version 1 we need to
// differentiate between single server and cluster, therefore we temporary set role in 'ServerState',
// actually supplied by a user, only for the duration of task to avoid other upgrade tasks, that
// potentially rely on the original behavior, to be affected.
struct ServerRoleGuard {
ServerRoleGuard() {
auto& server = ApplicationServer::server();
auto* state = arangodb::ServerState::instance();
if (state && server.hasFeature<arangodb::ClusterFeature>()) {
auto const& clusterFeature = server.getFeature<arangodb::ClusterFeature>();
if (!clusterFeature.isEnabled()) {
auto const role = arangodb::ServerState::stringToRole(clusterFeature.myRole());
// only for cluster
if (arangodb::ServerState::isClusterRole(role)) {
_originalRole = state->getRole();
state->setRole(role);
_state = state;
}
}
}
}
~ServerRoleGuard() {
if (_state) {
// restore the original server role
_state->setRole(_originalRole);
}
}
arangodb::ServerState* _state{};
arangodb::ServerState::RoleEnum _originalRole{arangodb::ServerState::ROLE_UNDEFINED};
} guard;
if (!arangodb::ServerState::instance()->isSingleServer() &&
!arangodb::ServerState::instance()->isDBServer()) {
return true; // not applicable for other ServerState roles

View File

@ -173,7 +173,7 @@ RestStatus RestWalAccessHandler::execute() {
std::vector<std::string> suffixes = _request->decodedSuffixes();
if (suffixes.empty()) {
generateError(ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"expected GET _api/wal/[tail|range|lastTick]>");
"expected GET /_api/wal/[tail|range|lastTick|open-transactions]>");
return RestStatus::DONE;
}
@ -195,7 +195,7 @@ RestStatus RestWalAccessHandler::execute() {
} else {
generateError(
ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"expected GET _api/wal/[tail|range|lastTick|open-transactions]>");
"expected GET /_api/wal/[tail|range|lastTick|open-transactions]>");
}
return RestStatus::DONE;

View File

@ -26,6 +26,7 @@
#include "Aql/QueryList.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterUpgradeFeature.h"
#include "Cluster/ServerState.h"
#include "FeaturePhases/ServerFeaturePhase.h"
#include "GeneralServer/AuthenticationFeature.h"
@ -59,7 +60,9 @@ using namespace arangodb;
using namespace arangodb::options;
BootstrapFeature::BootstrapFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, ::FEATURE_NAME), _isReady(false), _bark(false) {
: ApplicationFeature(server, ::FEATURE_NAME),
_isReady(false),
_bark(false) {
startsAfter<application_features::ServerFeaturePhase>();
startsAfter<SystemDatabaseFeature>();
@ -91,7 +94,7 @@ namespace {
/// Initialize certain agency entries, like Plan, system collections
/// and various similar things. Only runs through on a SINGLE coordinator.
/// must only return if we are bootstrap lead or bootstrap is done
/// must only return if we are bootstrap lead or bootstrap is done.
void raceForClusterBootstrap(BootstrapFeature& feature) {
AgencyComm agency;
auto& ci = feature.server().getFeature<ClusterFeature>().clusterInfo();
@ -185,6 +188,12 @@ void raceForClusterBootstrap(BootstrapFeature& feature) {
b.add(VPackValue(arangodb::ServerState::instance()->getId() + ": done"));
result = agency.setValue(::bootstrapKey, b.slice(), 0);
if (result.successful()) {
// store current version number in agency to avoid unnecessary upgrades
// to the same version
if (feature.server().hasFeature<ClusterUpgradeFeature>()) {
ClusterUpgradeFeature& clusterUpgradeFeature = feature.server().getFeature<ClusterUpgradeFeature>();
clusterUpgradeFeature.setBootstrapVersion();
}
return;
}
@ -277,6 +286,8 @@ void runActiveFailoverStart(std::string const& myId) {
} // namespace
void BootstrapFeature::start() {
auto& databaseFeature = server().getFeature<DatabaseFeature>();
arangodb::SystemDatabaseFeature::ptr vocbase =
server().hasFeature<arangodb::SystemDatabaseFeature>()
? server().getFeature<arangodb::SystemDatabaseFeature>().use()
@ -284,7 +295,6 @@ void BootstrapFeature::start() {
bool v8Enabled = V8DealerFeature::DEALER && V8DealerFeature::DEALER->isEnabled();
TRI_ASSERT(vocbase.get() != nullptr);
auto ss = ServerState::instance();
ServerState::RoleEnum role = ServerState::instance()->getRole();
if (ServerState::isRunningInCluster(role)) {
@ -295,7 +305,7 @@ void BootstrapFeature::start() {
LOG_TOPIC("724e0", DEBUG, Logger::STARTUP) << "Racing for cluster bootstrap...";
raceForClusterBootstrap(*this);
if (v8Enabled) {
if (v8Enabled && !databaseFeature.upgrade()) {
::runCoordinatorJS(vocbase.get());
}
} else if (ServerState::isDBServer(role)) {
@ -317,7 +327,7 @@ void BootstrapFeature::start() {
if (ServerState::isSingleServer(role) && AgencyCommManager::isEnabled()) {
::runActiveFailoverStart(myId);
} else {
ss->setFoxxmaster(myId); // could be empty, but set anyway
ServerState::instance()->setFoxxmaster(myId); // could be empty, but set anyway
}
if (v8Enabled) { // runs the single server bootstrap JS
@ -344,35 +354,17 @@ void BootstrapFeature::start() {
}
if (ServerState::isCoordinator(role)) {
LOG_TOPIC("4000c", DEBUG, arangodb::Logger::CLUSTER) << "waiting for our health entry to appear in Supervision/Health";
bool found = false;
AgencyComm agency;
int tries = 0;
while (++tries < 30) {
AgencyCommResult result = agency.getValues(::healthKey);
if (result.successful()) {
VPackSlice value = result.slice()[0].get(
std::vector<std::string>({AgencyCommManager::path(), "Supervision", "Health", ServerState::instance()->getId(), "Status"}));
if (value.isString() && !value.copyString().empty()) {
found = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
if (found) {
LOG_TOPIC("b0de6", DEBUG, arangodb::Logger::CLUSTER) << "found our health entry in Supervision/Health";
} else {
LOG_TOPIC("2c993", INFO, arangodb::Logger::CLUSTER) << "did not find our health entry after 15 s in Supervision/Health";
}
waitForHealthEntry();
}
if (!databaseFeature.upgrade()) {
LOG_TOPIC("cf3f4", INFO, arangodb::Logger::FIXME)
<< "ArangoDB (version " << ARANGODB_VERSION_FULL
<< ") is ready for business. Have fun!";
}
LOG_TOPIC("cf3f4", INFO, arangodb::Logger::FIXME)
<< "ArangoDB (version " << ARANGODB_VERSION_FULL
<< ") is ready for business. Have fun!";
if (_bark) {
LOG_TOPIC("bb9b7", INFO, arangodb::Logger::FIXME) << "The dog says: wau wau!";
LOG_TOPIC("bb9b7", INFO, arangodb::Logger::FIXME) << "The dog says: Гав гав";
}
_isReady = true;
@ -391,3 +383,27 @@ void BootstrapFeature::unprepare() {
}
}
}
void BootstrapFeature::waitForHealthEntry() {
LOG_TOPIC("4000c", DEBUG, arangodb::Logger::CLUSTER) << "waiting for our health entry to appear in Supervision/Health";
bool found = false;
AgencyComm agency;
int tries = 0;
while (++tries < 30) {
AgencyCommResult result = agency.getValues(::healthKey);
if (result.successful()) {
VPackSlice value = result.slice()[0].get(
std::vector<std::string>({AgencyCommManager::path(), "Supervision", "Health", ServerState::instance()->getId(), "Status"}));
if (value.isString() && !value.copyString().empty()) {
found = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
if (found) {
LOG_TOPIC("b0de6", DEBUG, arangodb::Logger::CLUSTER) << "found our health entry in Supervision/Health";
} else {
LOG_TOPIC("2c993", INFO, arangodb::Logger::CLUSTER) << "did not find our health entry after 15 s in Supervision/Health";
}
}

View File

@ -39,6 +39,9 @@ class BootstrapFeature final : public application_features::ApplicationFeature {
bool isReady() const { return _isReady; }
private:
void waitForHealthEntry();
private:
bool _isReady;
bool _bark;

View File

@ -143,6 +143,7 @@ class DatabaseFeature : public application_features::ApplicationFeature {
void enableCheckVersion() { _checkVersion = true; }
void enableUpgrade() { _upgrade = true; }
void disableUpgrade() { _upgrade = false; }
bool throwCollectionNotLoadedError() const {
return _throwCollectionNotLoadedError.load(std::memory_order_relaxed);
}

View File

@ -22,9 +22,14 @@
#include "UpgradeFeature.h"
#include "ApplicationFeatures/DaemonFeature.h"
#include "ApplicationFeatures/HttpEndpointProvider.h"
#include "ApplicationFeatures/GreetingsFeature.h"
#include "ApplicationFeatures/SupervisorFeature.h"
#include "Basics/application-exit.h"
#include "Basics/ScopeGuard.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ServerState.h"
#include "FeaturePhases/AqlFeaturePhase.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Logger/LogMacros.h"
@ -32,6 +37,7 @@
#include "Logger/LoggerStream.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Pregel/PregelFeature.h"
#include "Replication/ReplicationFeature.h"
#include "RestServer/BootstrapFeature.h"
#include "RestServer/DatabaseFeature.h"
@ -110,32 +116,40 @@ void UpgradeFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
"'--database.upgrade-check false'";
FATAL_ERROR_EXIT();
}
if (!_upgrade) {
LOG_TOPIC("ed226", TRACE, arangodb::Logger::FIXME)
<< "executing upgrade check: not disabling server features";
return;
}
LOG_TOPIC("23525", INFO, arangodb::Logger::FIXME)
<< "executing upgrade procedure: disabling server features";
<< "executing upgrade procedure: disabling server features";
server().forceDisableFeatures(_nonServerFeatures);
std::vector<std::type_index> otherFeaturesToDisable = {
std::type_index(typeid(BootstrapFeature)),
std::type_index(typeid(HttpEndpointProvider)),
};
server().forceDisableFeatures(otherFeaturesToDisable);
// if we run the upgrade, we need to disable a few features that may get
// in the way...
if (ServerState::instance()->isCoordinator()) {
std::vector<std::type_index> otherFeaturesToDisable = {
std::type_index(typeid(DaemonFeature)),
std::type_index(typeid(GreetingsFeature)),
std::type_index(typeid(pregel::PregelFeature)),
std::type_index(typeid(SupervisorFeature))
};
server().forceDisableFeatures(otherFeaturesToDisable);
} else {
server().forceDisableFeatures(_nonServerFeatures);
std::vector<std::type_index> otherFeaturesToDisable = {
std::type_index(typeid(BootstrapFeature)),
std::type_index(typeid(HttpEndpointProvider))
};
server().forceDisableFeatures(otherFeaturesToDisable);
}
ReplicationFeature& replicationFeature = server().getFeature<ReplicationFeature>();
replicationFeature.disableReplicationApplier();
DatabaseFeature& database = server().getFeature<DatabaseFeature>();
database.enableUpgrade();
ClusterFeature& cluster = server().getFeature<ClusterFeature>();
cluster.forceDisable();
ServerState::instance()->setRole(ServerState::ROLE_SINGLE);
}
void UpgradeFeature::prepare() {
@ -149,7 +163,10 @@ void UpgradeFeature::start() {
// upgrade the database
if (_upgradeCheck) {
upgradeDatabase();
if (!ServerState::instance()->isCoordinator()) {
// no need to run local upgrades in the coordinator
upgradeLocalDatabase();
}
if (!init.restoreAdmin() && !init.defaultPassword().empty() && um != nullptr) {
um->updateUser("root", [&](auth::User& user) {
@ -194,15 +211,20 @@ void UpgradeFeature::start() {
*_result = EXIT_SUCCESS;
}
LOG_TOPIC("7da27", INFO, arangodb::Logger::STARTUP)
<< "server will now shut down due to upgrade, database initialization "
"or admin restoration.";
if (!ServerState::instance()->isCoordinator() || !_upgrade) {
LOG_TOPIC("7da27", INFO, arangodb::Logger::STARTUP)
<< "server will now shut down due to upgrade, database initialization "
"or admin restoration.";
server().beginShutdown();
// in the non-coordinator case, we are already done now and will shut down.
// in the coordinator case, the actual upgrade is performed by the
// ClusterUpgradeFeature, which is way later in the startup sequence.
server().beginShutdown();
}
}
}
void UpgradeFeature::upgradeDatabase() {
void UpgradeFeature::upgradeLocalDatabase() {
LOG_TOPIC("05dff", TRACE, arangodb::Logger::FIXME) << "starting database init/upgrade";
DatabaseFeature& databaseFeature = server().getFeature<DatabaseFeature>();

View File

@ -28,16 +28,30 @@
namespace arangodb {
// this feature is responsible for performing a database upgrade.
// it is only doing something if the server was started with the option
// `--database.auto-upgrade true` or `--database.check-version true`.
// On a coordinator this feature will *not* perform the actual upgrade,
// because it is too early in the sequence. Coordinator upgrades are
// performed by the ClusterUpgradeFeature, which is way later in the
// startup sequence, so it can use the full cluster functionality when run.
// after this feature has executed the upgrade, it will shut down the server.
// in the coordinator case, this feature will not shut down the server.
// instead, the shutdown is performed by the ClusterUpgradeFeature.
class UpgradeFeature final : public application_features::ApplicationFeature {
public:
UpgradeFeature(application_features::ApplicationServer& server, int* result,
std::vector<std::type_index> const& nonServerFeatures);
void addTask(methods::Upgrade::Task&& task);
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void addTask(methods::Upgrade::Task&& task);
private:
void upgradeLocalDatabase();
private:
friend struct methods::Upgrade; // to allow access to '_tasks'
@ -45,8 +59,6 @@ class UpgradeFeature final : public application_features::ApplicationFeature {
bool _upgrade;
bool _upgradeCheck;
void upgradeDatabase();
int* _result;
std::vector<std::type_index> _nonServerFeatures;
std::vector<methods::Upgrade::Task> _tasks;

View File

@ -52,6 +52,7 @@
#include "Basics/FileUtils.h"
#include "Cache/CacheManagerFeature.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterUpgradeFeature.h"
#include "Cluster/MaintenanceFeature.h"
#include "Cluster/ReplicationTimeoutFeature.h"
#include "FeaturePhases/AgencyFeaturePhase.h"
@ -118,7 +119,6 @@
#endif
#include "IResearch/IResearchAnalyzerFeature.h"
#include "IResearch/IResearchAnalyzerCollectionFeature.h"
#include "IResearch/IResearchFeature.h"
// storage engines
@ -155,6 +155,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) {
std::type_index(typeid(GreetingsFeature)),
std::type_index(typeid(HttpEndpointProvider)),
std::type_index(typeid(LoggerBufferFeature)),
std::type_index(typeid(pregel::PregelFeature)),
std::type_index(typeid(ServerFeature)),
std::type_index(typeid(SslServerFeature)),
std::type_index(typeid(StatisticsFeature)),
@ -184,6 +185,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) {
server.addFeature<CacheManagerFeature>();
server.addFeature<CheckVersionFeature>(&ret, nonServerFeatures);
server.addFeature<ClusterFeature>();
server.addFeature<ClusterUpgradeFeature>();
server.addFeature<ConfigFeature>(name);
server.addFeature<ConsoleFeature>();
server.addFeature<DatabaseFeature>();
@ -259,7 +261,6 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) {
server.addFeature<arangodb::iresearch::IResearchAnalyzerFeature>();
server.addFeature<arangodb::iresearch::IResearchFeature>();
server.addFeature<arangodb::IResearchAnalyzerCollectionFeature>();
// storage engines
server.addFeature<ClusterEngine>();

View File

@ -154,9 +154,6 @@ void RocksDBThrottle::OnFlushBegin(rocksdb::DB* db, const rocksdb::FlushJobInfo&
std::chrono::steady_clock::time_point osx_hack = std::chrono::steady_clock::now();
memcpy(gFlushStart, &osx_hack, sizeof(std::chrono::steady_clock::time_point));
AdjustThreadPriority(1);
return;
} // RocksDBThrottle::OnFlushBegin
void RocksDBThrottle::OnFlushCompleted(rocksdb::DB* db,
@ -241,8 +238,6 @@ void RocksDBThrottle::SetThrottleWriteRate(std::chrono::microseconds Micros,
LOG_TOPIC("7afe9", DEBUG, arangodb::Logger::ENGINES)
<< "SetThrottleWriteRate: Micros " << Micros.count() << ", Keys " << Keys
<< ", Bytes " << Bytes << ", IsLevel0 " << IsLevel0;
return;
} // RocksDBThrottle::SetThrottleWriteRate
void RocksDBThrottle::ThreadLoop() {
@ -508,13 +503,10 @@ void RocksDBThrottle::AdjustThreadPriority(int Adjustment) {
#ifndef _WIN32
// initialize thread infor if this the first time the thread has ever called
if (!gThreadPriority._baseSet) {
pid_t tid;
int ret_val;
tid = syscall(SYS_gettid);
pid_t tid = syscall(SYS_gettid);
if (-1 != (int)tid) {
errno = 0;
ret_val = getpriority(PRIO_PROCESS, tid);
int ret_val = getpriority(PRIO_PROCESS, tid);
// ret_val could be -1 legally, so double test
if (-1 != ret_val || 0 == errno) {
gThreadPriority._baseSet = true;

View File

@ -31,6 +31,7 @@
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/SystemDatabaseFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "Statistics/ConnectionStatistics.h"
#include "Statistics/Descriptions.h"
#include "Statistics/RequestStatistics.h"
@ -97,6 +98,12 @@ class StatisticsThread final : public Thread {
public:
void run() override {
auto& databaseFeature = server().getFeature<arangodb::DatabaseFeature>();
if (databaseFeature.upgrade()) {
// don't start the thread when we are running an upgrade
return;
}
uint64_t const MAX_SLEEP_TIME = 250;
uint64_t sleepTime = 100;

View File

@ -82,11 +82,12 @@ void EngineSelectorFeature::prepare() {
auto& databasePathFeature = server().getFeature<DatabasePathFeature>();
auto path = databasePathFeature.directory();
_engineFilePath = basics::FileUtils::buildFilename(path, "ENGINE");
LOG_TOPIC("98b5c", DEBUG, Logger::STARTUP)
<< "looking for previously selected engine in file '" << _engineFilePath << "'";
// file if engine in file does not match command-line option
if (basics::FileUtils::isRegularFile(_engineFilePath)) {
// fail if engine value in file does not match command-line option
if (!ServerState::instance()->isCoordinator() &&
basics::FileUtils::isRegularFile(_engineFilePath)) {
LOG_TOPIC("98b5c", DEBUG, Logger::STARTUP)
<< "looking for previously selected engine in file '" << _engineFilePath << "'";
try {
std::string content =
basics::StringUtils::trim(basics::FileUtils::slurp(_engineFilePath));
@ -170,7 +171,8 @@ void EngineSelectorFeature::start() {
TRI_ASSERT(ENGINE != nullptr);
// write engine File
if (!basics::FileUtils::isRegularFile(_engineFilePath)) {
if (!ServerState::instance()->isCoordinator() &&
!basics::FileUtils::isRegularFile(_engineFilePath)) {
try {
basics::FileUtils::spit(_engineFilePath, _engine, true);
} catch (std::exception const& ex) {

View File

@ -193,7 +193,13 @@ void Task::shutdownTasks() {
if (++iterations % 10 == 0) {
LOG_TOPIC("3966b", INFO, Logger::FIXME) << "waiting for " << size << " task(s) to complete";
} else if (iterations >= 25) {
LOG_TOPIC("54653", INFO, Logger::FIXME) << "giving up waiting for unfinished tasks";
MUTEX_LOCKER(guard, _tasksLock);
_tasks.clear();
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

View File

@ -59,6 +59,7 @@ class Task : public std::enable_shared_from_this<Task> {
private:
static Mutex _tasksLock;
// id => [ user, task ]
static std::unordered_map<std::string, std::pair<std::string, std::shared_ptr<Task>>> _tasks;
public:

View File

@ -99,14 +99,23 @@ UpgradeResult Upgrade::createDB(TRI_vocbase_t& vocbase,
}
UpgradeResult Upgrade::startup(TRI_vocbase_t& vocbase, bool isUpgrade, bool ignoreFileErrors) {
uint32_t clusterFlag = Flags::CLUSTER_LOCAL;
if (ServerState::instance()->isCoordinator()) {
// coordinators do not have any persistent data, so there is no VERSION file
// available. We don't know the previous version we are upgrading from, so we
// need to pretend no upgrade is necessary
return UpgradeResult(TRI_ERROR_NO_ERROR, methods::VersionResult::VERSION_MATCH);
}
uint32_t clusterFlag = 0;
if (ServerState::instance()->isSingleServer()) {
clusterFlag = Flags::CLUSTER_NONE;
} else {
clusterFlag = Flags::CLUSTER_LOCAL;
}
uint32_t dbflag = Flags::DATABASE_EXISTING;
auto vinfo = Version::check(&vocbase);
VersionResult vinfo = Version::check(&vocbase);
if (vinfo.status == methods::VersionResult::CANNOT_PARSE_VERSION_FILE ||
vinfo.status == methods::VersionResult::CANNOT_READ_VERSION_FILE) {
@ -202,11 +211,23 @@ UpgradeResult Upgrade::startup(TRI_vocbase_t& vocbase, bool isUpgrade, bool igno
return runTasks(vocbase, vinfo, params, clusterFlag, dbflag);
}
UpgradeResult methods::Upgrade::startupCoordinator(TRI_vocbase_t& vocbase) {
TRI_ASSERT(ServerState::instance()->isCoordinator());
// this will return a hard-coded version result
VersionResult vinfo = Version::check(&vocbase);
VPackSlice const params = VPackSlice::emptyObjectSlice();
return runTasks(vocbase, vinfo, params, Flags::CLUSTER_COORDINATOR_GLOBAL, Flags::DATABASE_UPGRADE);
}
/// @brief register tasks, only run once on startup
void methods::Upgrade::registerTasks(arangodb::UpgradeFeature& upgradeFeature) {
auto& _tasks = upgradeFeature._tasks;
TRI_ASSERT(_tasks.empty());
// note: all tasks here should be idempotent, so that they produce the same
// result when run again
addTask(upgradeFeature, "createSystemCollectionsAndIndices",
"creates all system collections including their indices",
/*system*/ Flags::DATABASE_ALL,
@ -252,10 +273,10 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
arangodb::velocypack::Slice const& params,
uint32_t clusterFlag, uint32_t dbFlag) {
auto& upgradeFeature = vocbase.server().getFeature<arangodb::UpgradeFeature>();
auto& _tasks = upgradeFeature._tasks;
auto& tasks = upgradeFeature._tasks;
TRI_ASSERT(clusterFlag != 0 && dbFlag != 0);
TRI_ASSERT(!_tasks.empty()); // forgot to call registerTask!!
TRI_ASSERT(!tasks.empty()); // forgot to call registerTask!!
// needs to run in superuser scope, otherwise we get errors
ExecContextSuperuserScope scope;
// only local should actually write a VERSION file
@ -264,7 +285,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
bool ranOnce = false;
// execute all tasks
for (Task const& t : _tasks) {
for (Task const& t : tasks) {
// check for system database
if (t.systemFlag == DATABASE_SYSTEM && !vocbase.isSystem()) {
LOG_TOPIC("bb1ef", DEBUG, Logger::STARTUP)
@ -273,7 +294,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
}
if (t.systemFlag == DATABASE_EXCEPT_SYSTEM && vocbase.isSystem()) {
LOG_TOPIC("fd4e0", DEBUG, Logger::STARTUP)
<< "Upgrade: DB system, Skipping " << t.name;
<< "Upgrade: DB system, skipping " << t.name;
continue;
}
@ -288,7 +309,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
if (it != vinfo.tasks.end()) {
if (it->second) {
LOG_TOPIC("ffe7f", DEBUG, Logger::STARTUP)
<< "Upgrade: Already executed, skipping " << t.name;
<< "Upgrade: already executed, skipping " << t.name;
continue;
}
vinfo.tasks.erase(it); // in case we encounter false
@ -306,24 +327,19 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
continue;
}
LOG_TOPIC("15144", DEBUG, Logger::STARTUP) << "Upgrade: Executing " << t.name;
LOG_TOPIC("15144", DEBUG, Logger::STARTUP) << "Upgrade: executing " << t.name;
try {
bool ranTask = t.action(vocbase, params);
if (!ranTask) {
std::string msg =
"Executing " + t.name + " (" + t.description + ") failed.";
LOG_TOPIC("0a886", ERR, Logger::STARTUP) << msg << " Aborting procedure.";
"executing " + t.name + " (" + t.description + ") failed.";
LOG_TOPIC("0a886", ERR, Logger::STARTUP) << msg << " aborting upgrade procedure.";
return UpgradeResult(TRI_ERROR_INTERNAL, msg, vinfo.status);
}
} catch (arangodb::basics::Exception const& e) {
LOG_TOPIC("65ac5", ERR, Logger::STARTUP)
<< "Executing " << t.name << " (" << t.description
<< ") failed with error: " << e.what() << ". Aborting procedure.";
return UpgradeResult(e.code(), e.what(), vinfo.status);
} catch (std::exception const& e) {
LOG_TOPIC("022fe", ERR, Logger::STARTUP)
<< "Executing " << t.name << " (" << t.description
<< ") failed with error: " << e.what() << ". Aborting procedure.";
<< "executing " << t.name << " (" << t.description
<< ") failed with error: " << e.what() << ". aborting upgrade procedure.";
return UpgradeResult(TRI_ERROR_FAILED, e.what(), vinfo.status);
}
@ -344,7 +360,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult&
if (isLocal) { // no need to write this for cluster bootstrap
// save even if no tasks were executed
LOG_TOPIC("e5a77", DEBUG, Logger::STARTUP)
<< "Upgrade: Writing VERSION file";
<< "Upgrade: writing VERSION file";
auto res = methods::Version::write(&vocbase, vinfo.tasks, /*sync*/ ranOnce);
if (res.fail()) {

View File

@ -96,10 +96,15 @@ struct Upgrade {
static UpgradeResult createDB(TRI_vocbase_t& vocbase,
arangodb::velocypack::Slice const& users);
/// @brief executed on startup
/// @brief executed on startup for non-coordinators
/// @param upgrade Perform an actual upgrade
/// Corresponds to upgrade-database.js
static UpgradeResult startup(TRI_vocbase_t& vocbase, bool upgrade, bool ignoreFileErrors);
/// @brief executed on startup for coordinators
/// @param upgrade Perform an actual upgrade
/// Corresponds to upgrade-database.js
static UpgradeResult startupCoordinator(TRI_vocbase_t& vocbase);
private:
/// @brief register tasks, only run once on startup

View File

@ -322,17 +322,17 @@ Result createSystemStatisticsCollections(TRI_vocbase_t& vocbase,
return {TRI_ERROR_NO_ERROR};
}
static Result createIndex(std::string const name, Index::IndexType type,
static Result createIndex(std::string const& name, Index::IndexType type,
std::vector<std::string> const& fields, bool unique, bool sparse,
std::vector<std::shared_ptr<LogicalCollection>>& collections) {
// Static helper function that wraps creating an index. If we fail to
// create an index with some indices created, we clean up by removing all
// collections later on. Find the collection by name
auto colIt = find_if(collections.begin(), collections.end(),
[name](std::shared_ptr<LogicalCollection> col) {
TRI_ASSERT(col != nullptr);
return col->name() == name;
});
auto colIt = std::find_if(collections.begin(), collections.end(),
[&name](std::shared_ptr<LogicalCollection> const& col) {
TRI_ASSERT(col != nullptr);
return col->name() == name;
});
if (colIt == collections.end()) {
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND,
"Collection " + name + " not found");
@ -363,7 +363,7 @@ Result createSystemStatisticsIndices(TRI_vocbase_t& vocbase,
return res;
}
}
return {TRI_ERROR_NO_ERROR};
return res;
}
Result createSystemCollectionsIndices(TRI_vocbase_t& vocbase,
@ -403,19 +403,17 @@ Result createSystemCollectionsIndices(TRI_vocbase_t& vocbase,
return res;
}
return {TRI_ERROR_NO_ERROR};
return res;
}
} // namespace
bool UpgradeTasks::createSystemCollectionsAndIndices(TRI_vocbase_t& vocbase,
arangodb::velocypack::Slice const& slice) {
Result res;
// This vector should after the call to ::createSystemCollections contain
// after the call to ::createSystemCollections this vector should contain
// a LogicalCollection for *every* (required) system collection.
std::vector<std::shared_ptr<LogicalCollection>> presentSystemCollections;
res = ::createSystemCollections(vocbase, presentSystemCollections);
Result res = ::createSystemCollections(vocbase, presentSystemCollections);
// TODO: Maybe check or assert that all collections are present (i.e. were
// present or created), raise an error if not?

View File

@ -25,6 +25,7 @@
#include "Basics/FileUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/files.h"
#include "Cluster/ServerState.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
@ -90,6 +91,16 @@ VersionResult::StatusCode Version::compare(uint64_t current, uint64_t other) {
}
VersionResult Version::check(TRI_vocbase_t* vocbase) {
uint64_t lastVersion = UINT64_MAX;
uint64_t serverVersion = Version::current();
std::map<std::string, bool> tasks;
if (ServerState::instance()->isCoordinator()) {
// in a coordinator, we don't have any persistent data, so there is no VERSION
// file available. In this case we don't know the previous version we are
// upgrading from, so we can't do anything sensible here.
return VersionResult{VersionResult::VERSION_MATCH, serverVersion, serverVersion, tasks};
}
StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_ASSERT(engine != nullptr);
@ -106,10 +117,6 @@ VersionResult Version::check(TRI_vocbase_t* vocbase) {
return VersionResult{VersionResult::CANNOT_READ_VERSION_FILE, 0, 0, {}};
}
uint64_t lastVersion = UINT64_MAX;
uint64_t serverVersion = Version::current();
std::map<std::string, bool> tasks;
try {
std::shared_ptr<VPackBuilder> parsed = velocypack::Parser::fromJson(versionInfo);
VPackSlice versionVals = parsed->slice();

View File

@ -78,7 +78,7 @@ ApplicationServer& ApplicationServer::server() {
}
ApplicationServer::ApplicationServer(std::shared_ptr<ProgramOptions> options,
const char* binaryPath)
char const* binaryPath)
: _state(State::UNINITIALIZED),
_options(options),
_binaryPath(binaryPath) {

View File

@ -488,13 +488,12 @@ std::vector<std::string> split(std::string const& source, std::string const& del
std::string trim(std::string const& sourceStr, std::string const& trimStr) {
size_t s = sourceStr.find_first_not_of(trimStr);
size_t e = sourceStr.find_last_not_of(trimStr);
if (s == std::string::npos) {
return std::string();
} else {
return std::string(sourceStr, s, e - s + 1);
}
size_t e = sourceStr.find_last_not_of(trimStr);
return std::string(sourceStr, s, e - s + 1);
}
void trimInPlace(std::string& str, std::string const& trimStr) {
@ -517,9 +516,8 @@ std::string lTrim(std::string const& str, std::string const& trimStr) {
if (s == std::string::npos) {
return std::string();
} else {
return std::string(str, s);
}
}
return std::string(str, s);
}
std::string rTrim(std::string const& sourceStr, std::string const& trimStr) {

View File

@ -44,12 +44,14 @@ LogThread::~LogThread() {
shutdown();
}
void LogThread::log(std::unique_ptr<LogMessage>& message) {
bool LogThread::log(std::unique_ptr<LogMessage>& message) {
if (MESSAGES->push(message.get())) {
// only release message if adding to the queue succeeded
// otherwise we would leak here
message.release();
return true;
}
return false;
}
void LogThread::flush() {

View File

@ -41,7 +41,7 @@ struct LogMessage;
class LogThread final : public Thread {
public:
static void log(std::unique_ptr<LogMessage>&);
static bool log(std::unique_ptr<LogMessage>&);
// flush all pending log messages
static void flush();

View File

@ -317,12 +317,6 @@ void Logger::log(char const* function, char const* file, int line,
}
#endif
if (!_active.load(std::memory_order_relaxed)) {
LogAppenderStdStream::writeLogMessage(STDERR_FILENO, (isatty(STDERR_FILENO) == 1),
level, message.data(), message.size(), true);
return;
}
std::stringstream out;
LogTimeFormats::writeTime(out, _timeFormat);
out << ' ';
@ -385,25 +379,36 @@ void Logger::log(char const* function, char const* file, int line,
// generate the complete message
out << message;
std::string ostreamContent = out.str();
if (!_active.load(std::memory_order_relaxed)) {
LogAppenderStdStream::writeLogMessage(STDERR_FILENO, (isatty(STDERR_FILENO) == 1),
level, ostreamContent.data(), ostreamContent.size(), true);
return;
}
size_t offset = ostreamContent.size() - message.size();
auto msg = std::make_unique<LogMessage>(level, topicId, std::move(ostreamContent), offset);
// now either queue or output the message
bool handled = false;
if (_threaded) {
try {
_loggingThread->log(msg);
bool const isDirectLogLevel =
(level == LogLevel::FATAL || level == LogLevel::ERR || level == LogLevel::WARN);
if (isDirectLogLevel) {
_loggingThread->flush();
handled = _loggingThread->log(msg);
if (handled) {
bool const isDirectLogLevel =
(level == LogLevel::FATAL || level == LogLevel::ERR || level == LogLevel::WARN);
if (isDirectLogLevel) {
_loggingThread->flush();
}
}
return;
} catch (...) {
// fall-through to non-threaded logging
}
}
LogAppender::log(msg.get());
if (!handled) {
LogAppender::log(msg.get());
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -48,8 +48,6 @@ LoggerStream::~LoggerStream() {
// print a hex representation of the binary data
LoggerStream& LoggerStream::operator<<(Logger::BINARY const& binary) {
try {
std::ostringstream tmp;
uint8_t const* ptr = static_cast<uint8_t const*>(binary.baseAddress);
uint8_t const* end = ptr + binary.size;
@ -59,11 +57,10 @@ LoggerStream& LoggerStream::operator<<(Logger::BINARY const& binary) {
uint8_t n1 = n >> 4;
uint8_t n2 = n & 0x0F;
tmp << "\\x" << static_cast<char>((n1 < 10) ? ('0' + n1) : ('A' + n1 - 10))
<< static_cast<char>((n2 < 10) ? ('0' + n2) : ('A' + n2 - 10));
_out << "\\x" << static_cast<char>((n1 < 10) ? ('0' + n1) : ('A' + n1 - 10))
<< static_cast<char>((n2 < 10) ? ('0' + n2) : ('A' + n2 - 10));
++ptr;
}
_out << tmp.str();
} catch (...) {
// ignore any errors here. logging should not have side effects
}
@ -84,12 +81,10 @@ LoggerStream& LoggerStream::operator<<(Logger::CHARS const& data) {
LoggerStream& LoggerStream::operator<<(Logger::RANGE const& range) {
try {
std::ostringstream tmp;
tmp << range.baseAddress << " - "
<< static_cast<void const*>(static_cast<char const*>(range.baseAddress) +
range.size)
<< " (" << range.size << " bytes)";
_out << tmp.str();
_out << range.baseAddress << " - "
<< static_cast<void const*>(static_cast<char const*>(range.baseAddress) +
range.size)
<< " (" << range.size << " bytes)";
} catch (...) {
// ignore any errors here. logging should not have side effects
}

View File

@ -107,12 +107,17 @@ void Version::initialize() {
#else
Values["debug"] = "false";
#endif
#ifdef ARANGODB_USE_IPO
Values["ipo"] = "true";
#else
Values["ipo"] = "false";
#endif
#ifdef NDEBUG
Values["ndebug"] = "true";
#else
Values["ndebug"] = "false";
#endif
#if defined(ARCHITECTURE_OPTIMIZATIONS)
#ifdef ARCHITECTURE_OPTIMIZATIONS
Values["optimization-flags"] = std::string(ARCHITECTURE_OPTIMIZATIONS);
#endif
Values["endianness"] = getEndianness();