mirror of https://gitee.com/bigwinds/arangodb
merge from 3.1
This commit is contained in:
parent
094be54826
commit
0d96ba4f4b
|
@ -95,7 +95,7 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
"agency endpoint to connect to",
|
||||
new VectorParameter<StringParameter>(&_agencyEndpoints));
|
||||
|
||||
options->addOption("--cluster.agency-prefix", "agency prefix",
|
||||
options->addHiddenOption("--cluster.agency-prefix", "agency prefix",
|
||||
new StringParameter(&_agencyPrefix));
|
||||
|
||||
options->addOption("--cluster.my-local-info", "this server's local info",
|
||||
|
@ -134,7 +134,11 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
"replication factor for system collections",
|
||||
new UInt32Parameter(&_systemReplicationFactor));
|
||||
|
||||
options->addOption("--cluster.create-waits-for-sync-replication",
|
||||
options->addOption("--cluster.create-waits-for-sync-replication",
|
||||
"active coordinator will wait for all replicas to create collection",
|
||||
new BooleanParameter(&_createWaitsForSyncReplication));
|
||||
|
||||
options->addOption("--cluster.create-waits-for-sync-replication",
|
||||
"active coordinator will wait for all replicas to create collection",
|
||||
new BooleanParameter(&_createWaitsForSyncReplication));
|
||||
}
|
||||
|
@ -144,7 +148,7 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
if (options->processingResult().touched("cluster.disable-dispatcher-kickstarter") ||
|
||||
options->processingResult().touched("cluster.disable-dispatcher-frontend")) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
|
||||
<< "The dispatcher feature isn't available anymore. Use ArangoDBStarter for this now! See https://github.com/arangodb-helper/ArangoDBStarter/ for more details.";
|
||||
<< "The dispatcher feature isn't available anymore. Use ArangoDBStarter for this now! See https://github.com/arangodb-helper/ArangoDBStarter/ for more details.";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
|
@ -158,7 +162,7 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
|
||||
// validate --cluster.agency-endpoint (currently a noop)
|
||||
if (_agencyEndpoints.empty()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
|
||||
LOG_TOPIC(FATAL, Logger::CLUSTER)
|
||||
<< "must at least specify one endpoint in --cluster.agency-endpoint";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -173,13 +177,13 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/");
|
||||
|
||||
if (found != std::string::npos || _agencyPrefix.empty()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value specified for --cluster.agency-prefix";
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid value specified for --cluster.agency-prefix";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
// validate system-replication-factor
|
||||
if (_systemReplicationFactor == 0) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "system replication factor must be greater 0";
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "system replication factor must be greater 0";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
|
@ -190,7 +194,7 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
_requestedRole == ServerState::ROLE_AGENT ||
|
||||
_requestedRole == ServerState::ROLE_UNDEFINED
|
||||
) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Invalid role provided. Possible values: PRIMARY, "
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Invalid role provided. Possible values: PRIMARY, "
|
||||
"SECONDARY, COORDINATOR";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -202,7 +206,7 @@ void ClusterFeature::reportRole(arangodb::ServerState::RoleEnum role) {
|
|||
if (role == ServerState::ROLE_UNDEFINED) {
|
||||
roleString += ". Determining real role from agency";
|
||||
}
|
||||
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Starting up with role " << roleString;
|
||||
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Starting up with role " << roleString;
|
||||
}
|
||||
|
||||
void ClusterFeature::prepare() {
|
||||
|
@ -247,7 +251,7 @@ void ClusterFeature::prepare() {
|
|||
"Authentication");
|
||||
|
||||
if (authenticationFeature->isEnabled() && !authenticationFeature->hasUserdefinedJwt()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Cluster authentication enabled but jwt not set via command line. Please"
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Cluster authentication enabled but jwt not set via command line. Please"
|
||||
<< " provide --server.jwt-secret which is used throughout the cluster.";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -275,7 +279,7 @@ void ClusterFeature::prepare() {
|
|||
std::string const unified = Endpoint::unifiedForm(_agencyEndpoints[i]);
|
||||
|
||||
if (unified.empty()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid endpoint '" << _agencyEndpoints[i]
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _agencyEndpoints[i]
|
||||
<< "' specified for --cluster.agency-endpoint";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -293,7 +297,7 @@ void ClusterFeature::prepare() {
|
|||
ClusterComm::instance()->enableConnectionErrorLogging(false);
|
||||
// perform an initial connect to the agency
|
||||
if (!AgencyCommManager::MANAGER->start()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Could not connect to any agency endpoints ("
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Could not connect to any agency endpoints ("
|
||||
<< AgencyCommManager::MANAGER->endpointsString() << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -310,13 +314,13 @@ void ClusterFeature::prepare() {
|
|||
|
||||
if (role == ServerState::ROLE_UNDEFINED) {
|
||||
// no role found
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to determine unambiguous role for server '" << _myId
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to determine unambiguous role for server '" << _myId
|
||||
<< "'. No role configured in agency (" << endpoints << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
if (role == ServerState::ROLE_SINGLE) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "determined single-server role for server '" << _myId
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "determined single-server role for server '" << _myId
|
||||
<< "'. Please check the configurarion in the agency ("
|
||||
<< endpoints << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
|
@ -343,12 +347,12 @@ void ClusterFeature::prepare() {
|
|||
double start = TRI_microtime();
|
||||
|
||||
while (true) {
|
||||
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Waiting for DBservers to show up...";
|
||||
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Waiting for DBservers to show up...";
|
||||
ci->loadCurrentDBServers();
|
||||
std::vector<ServerID> DBServers = ci->getCurrentDBServers();
|
||||
if (DBServers.size() >= 1 &&
|
||||
(DBServers.size() > 1 || TRI_microtime() - start > 15.0)) {
|
||||
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Found " << DBServers.size() << " DBservers.";
|
||||
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Found " << DBServers.size() << " DBservers.";
|
||||
break;
|
||||
}
|
||||
sleep(1);
|
||||
|
@ -357,7 +361,7 @@ void ClusterFeature::prepare() {
|
|||
}
|
||||
|
||||
if (_myAddress.empty()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to determine internal address for server '" << _myId
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to determine internal address for server '" << _myId
|
||||
<< "'. Please specify --cluster.my-address or configure the "
|
||||
"address for this server in the agency.";
|
||||
FATAL_ERROR_EXIT();
|
||||
|
@ -365,7 +369,7 @@ void ClusterFeature::prepare() {
|
|||
|
||||
// now we can validate --cluster.my-address
|
||||
if (Endpoint::unifiedForm(_myAddress).empty()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid endpoint '" << _myAddress
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _myAddress
|
||||
<< "' specified for --cluster.my-address";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -396,7 +400,7 @@ void ClusterFeature::start() {
|
|||
|
||||
ServerState::RoleEnum role = ServerState::instance()->getRole();
|
||||
|
||||
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Cluster feature is turned on. Agency version: " << version
|
||||
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Cluster feature is turned on. Agency version: " << version
|
||||
<< ", Agency endpoints: " << endpoints << ", server id: '" << _myId
|
||||
<< "', internal address: " << _myAddress
|
||||
<< ", role: " << ServerState::roleToString(role);
|
||||
|
@ -412,7 +416,7 @@ void ClusterFeature::start() {
|
|||
if (HeartbeatIntervalMs.isInteger()) {
|
||||
try {
|
||||
_heartbeatInterval = HeartbeatIntervalMs.getUInt();
|
||||
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "using heartbeat interval value '" << _heartbeatInterval
|
||||
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "using heartbeat interval value '" << _heartbeatInterval
|
||||
<< " ms' from agency";
|
||||
} catch (...) {
|
||||
// Ignore if it is not a small int or uint
|
||||
|
@ -424,7 +428,7 @@ void ClusterFeature::start() {
|
|||
if (_heartbeatInterval == 0) {
|
||||
_heartbeatInterval = 5000; // 1/s
|
||||
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "unable to read heartbeat interval from agency. Using "
|
||||
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "unable to read heartbeat interval from agency. Using "
|
||||
<< "default value '" << _heartbeatInterval << " ms'";
|
||||
}
|
||||
|
||||
|
@ -434,7 +438,7 @@ void ClusterFeature::start() {
|
|||
SchedulerFeature::SCHEDULER->ioService());
|
||||
|
||||
if (!_heartbeatThread->init() || !_heartbeatThread->start()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "heartbeat could not connect to agency endpoints ("
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints ("
|
||||
<< endpoints << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
@ -453,7 +457,7 @@ void ClusterFeature::start() {
|
|||
VPackObjectBuilder b(&builder);
|
||||
builder.add("endpoint", VPackValue(_myAddress));
|
||||
} catch (...) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "out of memory";
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "out of memory";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
|
@ -461,7 +465,7 @@ void ClusterFeature::start() {
|
|||
builder.slice(), 0.0);
|
||||
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to register server in agency: http code: "
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to register server in agency: http code: "
|
||||
<< result.httpCode() << ", body: " << result.body();
|
||||
FATAL_ERROR_EXIT();
|
||||
} else {
|
||||
|
@ -494,7 +498,7 @@ void ClusterFeature::stop() {
|
|||
usleep(100000);
|
||||
// emit warning after 5 seconds
|
||||
if (++counter == 10 * 5) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "waiting for heartbeat thread to finish";
|
||||
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -521,7 +525,7 @@ void ClusterFeature::unprepare() {
|
|||
usleep(100000);
|
||||
// emit warning after 5 seconds
|
||||
if (++counter == 10 * 5) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "waiting for heartbeat thread to finish";
|
||||
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1101,10 +1101,19 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
[=](VPackSlice const& result) {
|
||||
if (result.isObject() && result.length() == (size_t)numberOfShards) {
|
||||
std::string tmpMsg = "";
|
||||
bool tmpHaveError = false;
|
||||
|
||||
for (auto const& p : VPackObjectIterator(result)) {
|
||||
if (replicationFactor == 0) {
|
||||
VPackSlice servers = p.value.get("servers");
|
||||
if (!servers.isArray() || servers.length() < dbServers.size()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
|
||||
p.value, "error", false)) {
|
||||
tmpHaveError = true;
|
||||
tmpMsg += " shardID:" + p.key.copyString() + ":";
|
||||
tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
p.value, "errorMessage", "");
|
||||
|
@ -1117,25 +1126,14 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
tmpMsg += ")";
|
||||
}
|
||||
}
|
||||
*errMsg = "Error in creation of collection:" + tmpMsg + " "
|
||||
+ __FILE__ + std::to_string(__LINE__);
|
||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
return true;
|
||||
}
|
||||
|
||||
// wait that all followers have created our new collection
|
||||
if (waitForReplication) {
|
||||
uint64_t mutableReplicationFactor = replicationFactor;
|
||||
if (mutableReplicationFactor == 0) {
|
||||
mutableReplicationFactor = dbServers.size();
|
||||
}
|
||||
|
||||
VPackSlice servers = p.value.get("servers");
|
||||
if (!servers.isArray() || servers.length() < mutableReplicationFactor) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tmpHaveError) {
|
||||
*errMsg = "Error in creation of collection:" + tmpMsg + " "
|
||||
+ __FILE__ + std::to_string(__LINE__);
|
||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
return true;
|
||||
}
|
||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||
}
|
||||
return true;
|
||||
|
@ -1315,17 +1313,17 @@ int ClusterInfo::dropCollectionCoordinator(
|
|||
clones.push_back(p->name());
|
||||
}
|
||||
}
|
||||
if (!clones.empty()){
|
||||
errorMsg += "Collection must not be dropped while it is sharding "
|
||||
"prototype for collection[s]";
|
||||
for (auto const& i : clones) {
|
||||
|
||||
if (!clones.empty()){
|
||||
errorMsg += "Collection must not be dropped while it is sharding "
|
||||
"prototype for collection[s]";
|
||||
for (auto const& i : clones) {
|
||||
errorMsg += std::string(" ") + i;
|
||||
}
|
||||
errorMsg += ".";
|
||||
return TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE;
|
||||
|
||||
}
|
||||
|
||||
errorMsg += ".";
|
||||
return TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE;
|
||||
}
|
||||
|
||||
double const realTimeout = getTimeout(timeout);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
double const interval = getPollInterval();
|
||||
|
|
Loading…
Reference in New Issue