mirror of https://gitee.com/bigwinds/arangodb
Maintenance Fixes (#6284)
* Clean up for `FIXMEMAINTENANCE` comments: removed race condition, added errors and `notify()`s. * Removed dublicated code. * Added requested changes. Added error reporting for `UpdateCollection`. * Make it compile. Add missing `notify()`. * `CreateCollection` generates errors in all code paths. * Fixed catch test.
This commit is contained in:
parent
679c6904f4
commit
63d9cfa081
|
@ -239,14 +239,21 @@ bool Inception::restartingActiveAgent() {
|
|||
auto comres = cc->syncRequest(
|
||||
clientId, 1, p, rest::RequestType::POST, path, greetstr,
|
||||
std::unordered_map<std::string, std::string>(), 2.0);
|
||||
// FIXMEMAINTENANCE: handle case of result not 200
|
||||
if (comres->status == CL_COMM_SENT) { // WARN: What if result not 200?
|
||||
|
||||
if (comres->status == CL_COMM_SENT &&
|
||||
comres->result->getHttpReturnCode() == 200) {
|
||||
auto const theirConfigVP = comres->result->getBodyVelocyPack();
|
||||
auto const& theirConfig = theirConfigVP->slice();
|
||||
// FIXMEMAINTENANCE: handle the case that tcc is not an object such
|
||||
// that the next command would throw.
|
||||
auto const& tcc = theirConfig.get("configuration");
|
||||
auto const& theirId = tcc.get("id").copyString();
|
||||
|
||||
if (!theirConfig.isObject()) {
|
||||
continue ;
|
||||
}
|
||||
auto const& tcc = theirConfig.get("configuration");
|
||||
|
||||
if (!tcc.isObject() || !tcc.hasKey("id")) {
|
||||
continue ;
|
||||
}
|
||||
auto const& theirId = tcc.get("id").copyString();
|
||||
|
||||
_agent->updatePeerEndpoint(theirId, p);
|
||||
informed.push_back(p);
|
||||
|
|
|
@ -91,9 +91,9 @@ void ActionBase::notify() {
|
|||
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "Job " << _description << " calling syncDBServerStatusQuo";
|
||||
auto cf = ApplicationServer::getFeature<ClusterFeature>("Cluster");
|
||||
if (cf != nullptr) {
|
||||
cf->syncDBServerStatusQuo();
|
||||
auto cf = ApplicationServer::getFeature<ClusterFeature>("Cluster");
|
||||
if (cf != nullptr) {
|
||||
cf->syncDBServerStatusQuo();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,24 +149,18 @@ std::shared_ptr<Action> ActionBase::getPostAction() {
|
|||
|
||||
|
||||
// FIXMEMAINTENANCE: Code path could corrupt registry object because
|
||||
// this does not hold lock. Also, current implementation is a race condition
|
||||
// where another thread could pick this up.
|
||||
// this does not hold lock.
|
||||
|
||||
/// @brief Create a new action that will start after this action successfully completes
|
||||
void ActionBase::createPostAction(std::shared_ptr<ActionDescription> const& description) {
|
||||
|
||||
// preAction() sets up what we need
|
||||
// postAction() sets up what we need
|
||||
_postAction = description;
|
||||
std::shared_ptr<Action> new_action = _feature.postAction(description);
|
||||
|
||||
// shift from EXECUTING to WAITINGPOST ... EXECUTING is set to block other
|
||||
// workers from picking it up
|
||||
if (_postAction && new_action->ok()) {
|
||||
new_action->setState(WAITINGPOST);
|
||||
if (_postAction) {
|
||||
_feature.postAction(description);
|
||||
} else {
|
||||
_result.reset(TRI_ERROR_BAD_PARAMETER, "preAction rejected parameters for _postAction.");
|
||||
} // else
|
||||
|
||||
_result.reset(TRI_ERROR_BAD_PARAMETER, "postAction rejected parameters for _postAction.");
|
||||
}
|
||||
} // ActionBase::createPostAction
|
||||
|
||||
|
||||
|
|
|
@ -3589,10 +3589,6 @@ void ClusterInfo::invalidatePlan() {
|
|||
WRITE_LOCKER(writeLocker, _planProt.lock);
|
||||
_planProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _planProt.lock);
|
||||
_planProt.isValid = false;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3689,12 +3685,12 @@ arangodb::Result ClusterInfo::getShardServers(
|
|||
if (it != _shardServers.end()) {
|
||||
servers = (*it).second;
|
||||
return arangodb::Result();
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Strange, did not find shard in _shardServers: " << shardId;
|
||||
return arangodb::Result(TRI_ERROR_FAILED);
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ CreateCollection::CreateCollection(
|
|||
: ActionBase(feature, desc) {
|
||||
|
||||
std::stringstream error;
|
||||
|
||||
|
||||
if (!desc.has(DATABASE)) {
|
||||
error << "database must be specified. ";
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ CreateCollection::CreateCollection(
|
|||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
setState(FAILED);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -118,23 +118,23 @@ bool CreateCollection::first() {
|
|||
|
||||
DatabaseGuard guard(database);
|
||||
auto vocbase = &guard.database();
|
||||
|
||||
|
||||
auto cluster =
|
||||
ApplicationServer::getFeature<ClusterFeature>("Cluster");
|
||||
|
||||
|
||||
bool waitForRepl =
|
||||
(props.hasKey(WAIT_FOR_SYNC_REPL) &&
|
||||
props.get(WAIT_FOR_SYNC_REPL).isBool()) ?
|
||||
props.get(WAIT_FOR_SYNC_REPL).getBool() :
|
||||
cluster->createWaitsForSyncReplication();
|
||||
|
||||
|
||||
bool enforceReplFact =
|
||||
(props.hasKey(ENF_REPL_FACT) &&
|
||||
props.get(ENF_REPL_FACT).isBool()) ?
|
||||
props.get(ENF_REPL_FACT).getBool() : true;
|
||||
|
||||
|
||||
TRI_col_type_e type = static_cast<TRI_col_type_e>(props.get(TYPE).getNumber<uint32_t>());
|
||||
|
||||
|
||||
VPackBuilder docket;
|
||||
{ VPackObjectBuilder d(&docket);
|
||||
for (auto const& i : VPackObjectIterator(props)) {
|
||||
|
@ -150,7 +150,7 @@ bool CreateCollection::first() {
|
|||
}
|
||||
docket.add("planId", VPackValue(collection));
|
||||
}
|
||||
|
||||
|
||||
_result = Collections::create(
|
||||
vocbase, shard, type, docket.slice(), waitForRepl, enforceReplFact,
|
||||
[=](LogicalCollection& col) {
|
||||
|
@ -161,7 +161,7 @@ bool CreateCollection::first() {
|
|||
col.followers()->clear();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
if (_result.fail()) {
|
||||
std::stringstream error;
|
||||
error << "creating local shard '" << database << "/" << shard
|
||||
|
@ -169,33 +169,20 @@ bool CreateCollection::first() {
|
|||
<< _result;
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << error.str();
|
||||
|
||||
// Error report for phaseTwo
|
||||
VPackBuilder eb;
|
||||
{ VPackObjectBuilder o(&eb);
|
||||
eb.add("error", VPackValue(true));
|
||||
eb.add("errorMessage", VPackValue(_result.errorMessage()));
|
||||
eb.add("errorNum", VPackValue(_result.errorNumber()));
|
||||
eb.add(VPackValue("indexes"));
|
||||
{ VPackArrayBuilder a(&eb); } // []
|
||||
eb.add(VPackValue("servers"));
|
||||
{VPackArrayBuilder a(&eb); // [serverId]
|
||||
eb.add(VPackValue(_description.get(SERVER_ID))); }}
|
||||
|
||||
// Steal buffer for maintenance feature
|
||||
_feature.storeShardError(database, collection, shard, eb.steal());
|
||||
|
||||
_result.reset(TRI_ERROR_FAILED, error.str());
|
||||
// FIXMEMAINTENANCE: notify here?
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (std::exception const& e) { // Guard failed?
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
std::stringstream error;
|
||||
error << "action " << _description << " failed with exception " << e.what();
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE) << error.str();
|
||||
_result.reset(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, error.str());
|
||||
// FIXMEMAINTENANCE: notify here?
|
||||
return false;
|
||||
_result.reset(TRI_ERROR_FAILED, error.str());
|
||||
|
||||
}
|
||||
|
||||
if (_result.fail()) {
|
||||
_feature.storeShardError(database, collection, shard,
|
||||
_description.get(SERVER_ID), _result);
|
||||
}
|
||||
|
||||
notify();
|
||||
|
|
|
@ -42,7 +42,7 @@ CreateDatabase::CreateDatabase(
|
|||
: ActionBase(feature, desc) {
|
||||
|
||||
std::stringstream error;
|
||||
|
||||
|
||||
if (!desc.has(DATABASE)) {
|
||||
error << "database must be specified";
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ CreateDatabase::CreateDatabase(
|
|||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
setState(FAILED);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
CreateDatabase::~CreateDatabase() {};
|
||||
|
@ -69,37 +69,27 @@ bool CreateDatabase::first() {
|
|||
try {
|
||||
|
||||
DatabaseGuard guard("_system");
|
||||
|
||||
|
||||
// Assertion in constructor makes sure that we have DATABASE.
|
||||
_result = Databases::create(_description.get(DATABASE), users, properties());
|
||||
if (!_result.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "CreateDatabase: failed to create database " << database << ": " << _result;
|
||||
|
||||
VPackBuilder eb;
|
||||
{ VPackObjectBuilder b(&eb);
|
||||
eb.add(NAME, VPackValue(database));
|
||||
eb.add("error", VPackValue(true));
|
||||
eb.add("errorNum", VPackValue(_result.errorNumber()));
|
||||
eb.add("errorMessage", VPackValue(_result.errorMessage())); }
|
||||
|
||||
_feature.storeDBError(database, eb.steal());
|
||||
// FIXMEMAINTENANCE: notify here?
|
||||
return false;
|
||||
_feature.storeDBError(database, _result);
|
||||
} else {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "CreateDatabase: database " << database << " created";
|
||||
}
|
||||
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "CreateDatabase: database " << database << " created";
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
std::stringstream error;
|
||||
error << "action " << _description << " failed with exception " << e.what();
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "CreateDatabase: " << error.str();
|
||||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
// FIXMEMAINTENANCE: notify here?
|
||||
return false;
|
||||
_feature.storeDBError(database, _result);
|
||||
}
|
||||
|
||||
// notify always, either error or success
|
||||
notify();
|
||||
return false;
|
||||
|
||||
|
|
|
@ -41,9 +41,9 @@ DropCollection::DropCollection(
|
|||
ActionBase(feature, d) {
|
||||
|
||||
std::stringstream error;
|
||||
|
||||
|
||||
if (!d.has(COLLECTION)) {
|
||||
error << "collection must be specified. ";
|
||||
error << "collection must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(d.has(COLLECTION));
|
||||
|
||||
|
@ -100,5 +100,5 @@ bool DropCollection::first() {
|
|||
|
||||
notify();
|
||||
return false;
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -92,22 +92,9 @@ bool DropIndex::first() {
|
|||
return false;
|
||||
}
|
||||
|
||||
// FIXMEMAINTENANCE: Why doing the actual work in a callback?
|
||||
Result found = methods::Collections::lookup(
|
||||
vocbase, collection, [&](LogicalCollection& coll) {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "Dropping local index " + collection + "/" + id;
|
||||
_result = Indexes::drop(&coll, index.slice());
|
||||
});
|
||||
|
||||
if (found.fail()) {
|
||||
std::stringstream error;
|
||||
error << "failed to lookup local collection " << collection
|
||||
<< "in database " + database;
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "DropIndex: " << error.str();
|
||||
_result.reset(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, error.str());
|
||||
return false;
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "Dropping local index " + collection + "/" + id;
|
||||
_result = Indexes::drop(col.get(), index.slice());
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
std::stringstream error;
|
||||
|
|
|
@ -182,6 +182,11 @@ static VPackBuilder compareIndexes(
|
|||
}
|
||||
if (!haveError) {
|
||||
builder.add(pindex);
|
||||
} else {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "Previous failure exists for index " << planIdS << " on shard "
|
||||
<< dbname << "/" << shname << " for central " << dbname << "/"
|
||||
<< collname <<"- skipping";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -209,17 +214,19 @@ void handlePlanShard(
|
|||
bool leading = lcol.get(LEADER).copyString().empty();
|
||||
auto const properties = compareRelevantProps(cprops, lcol);
|
||||
|
||||
auto fullShardLabel = dbname + "/" + colname + "/" + shname;
|
||||
|
||||
// If comparison has brought any updates
|
||||
if (properties->slice() != VPackSlice::emptyObjectSlice()
|
||||
|| leading != shouldBeLeading) {
|
||||
|
||||
if (errors.shards.find(dbname + "/" + colname + "/" + shname) ==
|
||||
if (errors.shards.find(fullShardLabel) ==
|
||||
errors.shards.end()) {
|
||||
actions.emplace_back(
|
||||
ActionDescription(
|
||||
{{NAME, "UpdateCollection"}, {DATABASE, dbname}, {COLLECTION, shname},
|
||||
{LEADER, shouldBeLeading ? std::string() : leaderId},
|
||||
{LOCAL_LEADER, lcol.get(LEADER).copyString()}},
|
||||
{{NAME, "UpdateCollection"}, {DATABASE, dbname}, {COLLECTION, colname},
|
||||
{SHARD, shname}, {LEADER, shouldBeLeading ? std::string() : leaderId},
|
||||
{SERVER_ID, serverId}, {LOCAL_LEADER, lcol.get(LEADER).copyString()}},
|
||||
properties));
|
||||
} else {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
|
@ -236,12 +243,15 @@ void handlePlanShard(
|
|||
auto difference = compareIndexes(dbname, colname, shname,
|
||||
pindexes, lindexes, errors, indis);
|
||||
|
||||
// Index errors are checked in `compareIndexes`. THe loop below only
|
||||
// cares about those indexes that have no error.
|
||||
if (difference.slice().isArray()) {
|
||||
for (auto const& index : VPackArrayIterator(difference.slice())) {
|
||||
actions.emplace_back(
|
||||
ActionDescription({{NAME, "EnsureIndex"}, {DATABASE, dbname},
|
||||
{COLLECTION, colname}, {TYPE, index.get(TYPE).copyString()},
|
||||
{FIELDS, index.get(FIELDS).toJson()}, {SHARD, shname}, {ID, index.get(ID).copyString()}},
|
||||
{FIELDS, index.get(FIELDS).toJson()}, {SHARD, shname},
|
||||
{ID, index.get(ID).copyString()}},
|
||||
std::make_shared<VPackBuilder>(index)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ using namespace arangodb::maintenance;
|
|||
MaintenanceFeature::MaintenanceFeature(application_features::ApplicationServer& server)
|
||||
: ApplicationFeature(server, "Maintenance"),
|
||||
_forceActivation(false),
|
||||
_maintenanceThreadsMax(2) {
|
||||
_maintenanceThreadsMax(2) {
|
||||
// the number of threads will be adjusted later. it's just that we want to initialize all members properly
|
||||
|
||||
// this feature has to know the role of this server in its `start`. The role
|
||||
|
@ -106,7 +106,7 @@ void MaintenanceFeature::prepare() {
|
|||
|
||||
void MaintenanceFeature::start() {
|
||||
auto serverState = ServerState::instance();
|
||||
|
||||
|
||||
// _forceActivation is set by the catch tests
|
||||
if (!_forceActivation &&
|
||||
(serverState->isAgent() || serverState->isSingleServer())) {
|
||||
|
@ -114,7 +114,7 @@ void MaintenanceFeature::start() {
|
|||
<< " for single-server or agents.";
|
||||
return ;
|
||||
}
|
||||
|
||||
|
||||
// start threads
|
||||
for (uint32_t loop = 0; loop < _maintenanceThreadsMax; ++loop) {
|
||||
auto newWorker = std::make_unique<maintenance::MaintenanceWorker>(*this);
|
||||
|
@ -169,12 +169,6 @@ Result MaintenanceFeature::deleteAction(uint64_t action_id) {
|
|||
|
||||
} // MaintenanceFeature::deleteAction
|
||||
|
||||
|
||||
// FIXMEMAINTENANCE: None of the addAction() and createAction() routines
|
||||
// explicitly check to see if construction of action set FAILED.
|
||||
// Therefore it is possible for an "executeNow" action to start running
|
||||
// with known invalid parameters.
|
||||
|
||||
/// @brief This is the API for creating an Action and executing it.
|
||||
/// Execution can be immediate by calling thread, or asynchronous via thread pool.
|
||||
/// not yet: ActionDescription parameter will be MOVED to new object.
|
||||
|
@ -195,9 +189,10 @@ Result MaintenanceFeature::addAction(
|
|||
// similar action not in the queue (or at least no longer viable)
|
||||
if (curAction == nullptr || curAction->done()) {
|
||||
|
||||
createAction(newAction, executeNow);
|
||||
|
||||
if (!newAction || !newAction->ok()) {
|
||||
if (newAction && newAction->ok()) {
|
||||
// Register action only if construction was ok
|
||||
registerAction(newAction, executeNow);
|
||||
} else {
|
||||
/// something failed in action creation ... go check logs
|
||||
result.reset(TRI_ERROR_BAD_PARAMETER, "createAction rejected parameters.");
|
||||
} // if
|
||||
|
@ -246,7 +241,7 @@ Result MaintenanceFeature::addAction(
|
|||
|
||||
// similar action not in the queue (or at least no longer viable)
|
||||
if (!curAction || curAction->done()) {
|
||||
newAction = createAction(description, executeNow);
|
||||
newAction = createAndRegisterAction(description, executeNow);
|
||||
|
||||
if (!newAction || !newAction->ok()) {
|
||||
/// something failed in action creation ... go check logs
|
||||
|
@ -275,7 +270,7 @@ Result MaintenanceFeature::addAction(
|
|||
std::shared_ptr<Action> MaintenanceFeature::preAction(
|
||||
std::shared_ptr<ActionDescription> const & description) {
|
||||
|
||||
return createAction(description, true);
|
||||
return createAndRegisterAction(description, true);
|
||||
|
||||
} // MaintenanceFeature::preAction
|
||||
|
||||
|
@ -283,14 +278,22 @@ std::shared_ptr<Action> MaintenanceFeature::preAction(
|
|||
std::shared_ptr<Action> MaintenanceFeature::postAction(
|
||||
std::shared_ptr<ActionDescription> const & description) {
|
||||
|
||||
return createAction(description, false);
|
||||
auto action = createAction(description);
|
||||
|
||||
if (action->ok()) {
|
||||
action->setState(WAITINGPOST);
|
||||
registerAction(action, false);
|
||||
}
|
||||
|
||||
return action;
|
||||
} // MaintenanceFeature::postAction
|
||||
|
||||
|
||||
void MaintenanceFeature::createAction(
|
||||
void MaintenanceFeature::registerAction(
|
||||
std::shared_ptr<Action> action, bool executeNow) {
|
||||
|
||||
// Assumes write lock on _actionRegistryLock
|
||||
|
||||
// mark as executing so no other workers accidentally grab it
|
||||
if (executeNow) {
|
||||
action->setState(maintenance::EXECUTING);
|
||||
|
@ -311,8 +314,7 @@ void MaintenanceFeature::createAction(
|
|||
|
||||
|
||||
std::shared_ptr<Action> MaintenanceFeature::createAction(
|
||||
std::shared_ptr<ActionDescription> const & description,
|
||||
bool executeNow) {
|
||||
std::shared_ptr<ActionDescription> const & description) {
|
||||
|
||||
// write lock via _actionRegistryLock is assumed held
|
||||
std::shared_ptr<Action> newAction;
|
||||
|
@ -324,19 +326,27 @@ std::shared_ptr<Action> MaintenanceFeature::createAction(
|
|||
newAction = std::make_shared<Action>(*this, *description);
|
||||
|
||||
// if a new action constructed successfully
|
||||
if (newAction->ok()) {
|
||||
|
||||
createAction(newAction, executeNow);
|
||||
|
||||
} else {
|
||||
if (!newAction->ok()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "createAction: unknown action name given, \"" << name.c_str() << "\", or other construction failure.";
|
||||
} // else
|
||||
}
|
||||
|
||||
return newAction;
|
||||
|
||||
} // if
|
||||
|
||||
std::shared_ptr<Action> MaintenanceFeature::createAndRegisterAction(
|
||||
std::shared_ptr<ActionDescription> const & description, bool executeNow) {
|
||||
|
||||
std::shared_ptr<Action> newAction = createAction(description);
|
||||
|
||||
if (newAction->ok()) {
|
||||
registerAction(newAction, executeNow);
|
||||
}
|
||||
|
||||
return newAction;
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<Action> MaintenanceFeature::findAction(
|
||||
std::shared_ptr<ActionDescription> const description) {
|
||||
|
@ -449,6 +459,19 @@ std::string MaintenanceFeature::toJson(VPackBuilder & builder) {
|
|||
|
||||
std::string const SLASH("/");
|
||||
|
||||
arangodb::Result MaintenanceFeature::storeDBError (
|
||||
std::string const& database, Result const& failure)
|
||||
{
|
||||
VPackBuilder eb;
|
||||
{ VPackObjectBuilder b(&eb);
|
||||
eb.add(NAME, VPackValue(database));
|
||||
eb.add("error", VPackValue(true));
|
||||
eb.add("errorNum", VPackValue(failure.errorNumber()));
|
||||
eb.add("errorMessage", VPackValue(failure.errorMessage())); }
|
||||
|
||||
return storeDBError(database, eb.steal());
|
||||
}
|
||||
|
||||
arangodb::Result MaintenanceFeature::storeDBError (
|
||||
std::string const& database, std::shared_ptr<VPackBuffer<uint8_t>> error) {
|
||||
|
||||
|
@ -498,6 +521,25 @@ arangodb::Result MaintenanceFeature::removeDBError (
|
|||
|
||||
}
|
||||
|
||||
arangodb::Result MaintenanceFeature::storeShardError (
|
||||
std::string const& database, std::string const& collection,
|
||||
std::string const& shard, std::string const& serverId,
|
||||
arangodb::Result const& failure)
|
||||
{
|
||||
VPackBuilder eb;
|
||||
{ VPackObjectBuilder o(&eb);
|
||||
eb.add("error", VPackValue(true));
|
||||
eb.add("errorMessage", VPackValue(failure.errorMessage()));
|
||||
eb.add("errorNum", VPackValue(failure.errorNumber()));
|
||||
eb.add(VPackValue("indexes"));
|
||||
{ VPackArrayBuilder a(&eb); } // []
|
||||
eb.add(VPackValue("servers"));
|
||||
{VPackArrayBuilder a(&eb); // [serverId]
|
||||
eb.add(VPackValue(serverId)); }}
|
||||
|
||||
return storeShardError(database, collection, shard, eb.steal());
|
||||
}
|
||||
|
||||
arangodb::Result MaintenanceFeature::storeShardError (
|
||||
std::string const& database, std::string const& collection,
|
||||
std::string const& shard, std::shared_ptr<VPackBuffer<uint8_t>> error) {
|
||||
|
|
|
@ -114,12 +114,15 @@ class MaintenanceFeature : public application_features::ApplicationFeature {
|
|||
|
||||
protected:
|
||||
std::shared_ptr<maintenance::Action> createAction(
|
||||
std::shared_ptr<maintenance::ActionDescription> const & description);
|
||||
|
||||
void registerAction(
|
||||
std::shared_ptr<maintenance::Action> action, bool executeNow);
|
||||
|
||||
std::shared_ptr<maintenance::Action> createAndRegisterAction(
|
||||
std::shared_ptr<maintenance::ActionDescription> const & description,
|
||||
bool executeNow);
|
||||
|
||||
void createAction(
|
||||
std::shared_ptr<maintenance::Action> action, bool executeNow);
|
||||
|
||||
public:
|
||||
/// @brief This API will attempt to fail an existing Action that is waiting
|
||||
/// or executing. Will not fail Actions that have already succeeded or failed.
|
||||
|
@ -212,6 +215,11 @@ public:
|
|||
std::string const& database, std::string const& collection,
|
||||
std::string const& shard, std::shared_ptr<VPackBuffer<uint8_t>> error);
|
||||
|
||||
arangodb::Result storeShardError (
|
||||
std::string const& database, std::string const& collection,
|
||||
std::string const& shard, std::string const& serverId,
|
||||
arangodb::Result const& failure);
|
||||
|
||||
/**
|
||||
* @brief get all pending shard errors
|
||||
*
|
||||
|
@ -251,6 +259,9 @@ public:
|
|||
arangodb::Result storeDBError (
|
||||
std::string const& database, std::shared_ptr<VPackBuffer<uint8_t>> error);
|
||||
|
||||
arangodb::Result storeDBError (
|
||||
std::string const& database, Result const& failure);
|
||||
|
||||
/**
|
||||
* @brief get all pending shard errors
|
||||
*
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/Methods/Collections.h"
|
||||
#include "VocBase/Methods/Databases.h"
|
||||
#include "Cluster/MaintenanceFeature.h"
|
||||
|
||||
|
||||
using namespace arangodb;
|
||||
|
@ -44,24 +45,29 @@ UpdateCollection::UpdateCollection(
|
|||
ActionBase(feature, desc) {
|
||||
|
||||
std::stringstream error;
|
||||
|
||||
|
||||
if (!desc.has(COLLECTION)) {
|
||||
error << "collection must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(desc.has(COLLECTION));
|
||||
|
||||
if (!desc.has(SHARD)) {
|
||||
error << "shard must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(desc.has(SHARD));
|
||||
|
||||
if (!desc.has(DATABASE)) {
|
||||
error << "database must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(desc.has(DATABASE));
|
||||
|
||||
if (!desc.has(LEADER)) {
|
||||
error << "leader must be stecified. ";
|
||||
error << "leader must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(desc.has(LEADER));
|
||||
|
||||
if (!desc.has(LOCAL_LEADER)) {
|
||||
error << "local leader must be stecified. ";
|
||||
error << "local leader must be specified. ";
|
||||
}
|
||||
TRI_ASSERT(desc.has(LOCAL_LEADER));
|
||||
|
||||
|
@ -70,7 +76,7 @@ UpdateCollection::UpdateCollection(
|
|||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
setState(FAILED);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
void handleLeadership(
|
||||
|
@ -117,33 +123,41 @@ UpdateCollection::~UpdateCollection() {};
|
|||
|
||||
bool UpdateCollection::first() {
|
||||
|
||||
auto const& database = _description.get(DATABASE);
|
||||
auto const& collection = _description.get(COLLECTION);
|
||||
auto const& database = _description.get(DATABASE);
|
||||
auto const& collection = _description.get(COLLECTION);
|
||||
auto const& shard = _description.get(SHARD);
|
||||
auto const& plannedLeader = _description.get(LEADER);
|
||||
auto const& localLeader = _description.get(LOCAL_LEADER);
|
||||
auto const& localLeader = _description.get(LOCAL_LEADER);
|
||||
auto const& props = properties();
|
||||
|
||||
try {
|
||||
|
||||
DatabaseGuard guard(database);
|
||||
auto vocbase = &guard.database();
|
||||
|
||||
|
||||
Result found = methods::Collections::lookup(
|
||||
vocbase, collection, [&](LogicalCollection& coll) {
|
||||
vocbase, shard, [&](LogicalCollection& coll) {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "Updating local collection " + collection;
|
||||
|
||||
<< "Updating local collection " + shard;
|
||||
|
||||
// We adjust local leadership, note that the planned
|
||||
// resignation case is not handled here, since then
|
||||
// ourselves does not appear in shards[shard] but only
|
||||
// "_" + ourselves.
|
||||
handleLeadership(coll, localLeader, plannedLeader);
|
||||
_result = Collections::updateProperties(&coll, props);
|
||||
|
||||
if (!_result.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "failed to update properties"
|
||||
" of collection " << shard << ": " << _result.errorMessage();
|
||||
_feature.storeShardError(database, collection, shard,
|
||||
_description.get(SERVER_ID), _result);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
if (found.fail()) {
|
||||
std::stringstream error;
|
||||
error << "failed to lookup local collection " << collection
|
||||
error << "failed to lookup local collection " << shard
|
||||
<< "in database " + database;
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << error.str();
|
||||
_result = actionError(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, error.str());
|
||||
|
@ -156,7 +170,7 @@ bool UpdateCollection::first() {
|
|||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
notify();
|
||||
return false;
|
||||
|
||||
|
|
|
@ -653,7 +653,7 @@ TEST_CASE("ActionPhaseOne", "[cluster][maintenance]") {
|
|||
auto cb =
|
||||
node.second(dbname).children().begin()->second->toBuilder();
|
||||
auto collection = cb.slice();
|
||||
auto colname = collection.get(NAME).copyString();
|
||||
auto shname = collection.get(NAME).copyString();
|
||||
|
||||
(*node.second(dbname).children().begin()->second)(prop) =
|
||||
v.slice();
|
||||
|
@ -669,7 +669,7 @@ TEST_CASE("ActionPhaseOne", "[cluster][maintenance]") {
|
|||
for (auto const& action : actions) {
|
||||
|
||||
REQUIRE(action.name() == "UpdateCollection");
|
||||
REQUIRE(action.get("collection") == colname);
|
||||
REQUIRE(action.get("shard") == shname);
|
||||
REQUIRE(action.get("database") == dbname);
|
||||
auto const props = action.properties();
|
||||
|
||||
|
@ -707,8 +707,8 @@ TEST_CASE("ActionPhaseOne", "[cluster][maintenance]") {
|
|||
REQUIRE(actions.size() == 1);
|
||||
for (auto const& action : actions) {
|
||||
REQUIRE(action.name() == "UpdateCollection");
|
||||
REQUIRE(action.has("collection"));
|
||||
REQUIRE(action.get("collection") == collection("name").getString());
|
||||
REQUIRE(action.has("shard"));
|
||||
REQUIRE(action.get("shard") == collection("name").getString());
|
||||
REQUIRE(action.get("localLeader").empty());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue