mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into generic-col-types
Commit 9611f30796
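Every hunk below follows the same small set of patterns: plain LOG(level) statements become LOG_TOPIC(level, Logger::CLUSTER) (the heartbeat code keeps its existing Logger::HEARTBEAT topic), overlong lines are wrapped, and the final hunk registers the new "cluster" topic with a default level of INFO. The sketch below only illustrates that conversion; the function name, the somePath variable, and the Logger/Logger.h include path are assumptions for the example, not lines taken from the diff.

#include <string>

#include "Logger/Logger.h"  // assumed include path for LOG, LOG_TOPIC and Logger::CLUSTER

void reportAgencyReadFailure(std::string const& somePath) {
  // Before: the message went to the general log and could not be filtered on its own.
  // LOG(ERR) << "could not read " << somePath << " in agency.";

  // After: the message is attributed to the "cluster" topic, so its verbosity
  // can be raised or lowered independently of the rest of the server log.
  LOG_TOPIC(ERR, Logger::CLUSTER)
      << "could not read " << somePath << " in agency.";
}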
@@ -229,6 +229,10 @@ std::string Constituent::endpoint(std::string id) const {
 /// @brief Vote
 bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex,
                        term_t prevLogTerm, bool appendEntries) {
+  if(!_vocbase) {
+    return false;
+  }
+
   term_t t = 0;
   std::string lid;
@@ -526,16 +526,16 @@ void ClusterInfo::loadPlan() {
       _planProt.doneVersion = storedVersion;
       _planProt.isValid = true; // will never be reset to false
     } else {
-      LOG(ERR) << "\"Plan\" is not an object in agency";
+      LOG_TOPIC(ERR, Logger::CLUSTER) << "\"Plan\" is not an object in agency";
     }
     return;
   }

-  LOG(DEBUG) << "Error while loading " << prefixPlan
-             << " httpCode: " << result.httpCode()
-             << " errorCode: " << result.errorCode()
-             << " errorMessage: " << result.errorMessage()
-             << " body: " << result.body();
+  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Error while loading " << prefixPlan
+                                    << " httpCode: " << result.httpCode()
+                                    << " errorCode: " << result.errorCode()
+                                    << " errorMessage: " << result.errorMessage()
+                                    << " body: " << result.body();
 }

 ////////////////////////////////////////////////////////////////////////////////
@@ -622,9 +622,11 @@ void ClusterInfo::loadCurrent() {
           newShardIds.insert(make_pair(shardID, servers));

         }
-        databaseCollections.insert(std::make_pair(collectionName, collectionDataCurrent));
+        databaseCollections.insert(
+            std::make_pair(collectionName, collectionDataCurrent));
       }
-      newCollections.emplace(std::make_pair(databaseName, databaseCollections));
+      newCollections.emplace(
+          std::make_pair(databaseName, databaseCollections));
     }
     swapCollections = true;
   }
@@ -637,24 +639,25 @@ void ClusterInfo::loadCurrent() {
        _currentDatabases.swap(newDatabases);
      }
      if (swapCollections) {
-       LOG(TRACE) << "Have loaded new collections current cache!";
+       LOG_TOPIC(TRACE, Logger::CLUSTER)
+           << "Have loaded new collections current cache!";
        _currentCollections.swap(newCollections);
        _shardIds.swap(newShardIds);
      }
      _currentProt.doneVersion = storedVersion;
      _currentProt.isValid = true; // will never be reset to false
    } else {
-     LOG(ERR) << "Current is not an object!";
+     LOG_TOPIC(ERR, Logger::CLUSTER) << "Current is not an object!";
    }

    return;
  }

-  LOG(ERR) << "Error while loading " << prefixCurrent
-           << " httpCode: " << result.httpCode()
-           << " errorCode: " << result.errorCode()
-           << " errorMessage: " << result.errorMessage()
-           << " body: " << result.body();
+  LOG_TOPIC(ERR, Logger::CLUSTER) << "Error while loading " << prefixCurrent
+                                  << " httpCode: " << result.httpCode()
+                                  << " errorCode: " << result.errorCode()
+                                  << " errorMessage: " << result.errorMessage()
+                                  << " body: " << result.body();
 }

 /// @brief ask about a collection
@@ -1917,11 +1920,11 @@ void ClusterInfo::loadServers() {
     }
   }

-  LOG(DEBUG) << "Error while loading " << prefixServers
-             << " httpCode: " << result.httpCode()
-             << " errorCode: " << result.errorCode()
-             << " errorMessage: " << result.errorMessage()
-             << " body: " << result.body();
+  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Error while loading " << prefixServers
+                                    << " httpCode: " << result.httpCode()
+                                    << " errorCode: " << result.errorCode()
+                                    << " errorMessage: " << result.errorMessage()
+                                    << " body: " << result.body();
 }

 ////////////////////////////////////////////////////////////////////////////////
@@ -2041,11 +2044,12 @@ void ClusterInfo::loadCurrentCoordinators() {
     }
   }

-  LOG(DEBUG) << "Error while loading " << prefixCurrentCoordinators
-             << " httpCode: " << result.httpCode()
-             << " errorCode: " << result.errorCode()
-             << " errorMessage: " << result.errorMessage()
-             << " body: " << result.body();
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "Error while loading " << prefixCurrentCoordinators
+      << " httpCode: " << result.httpCode()
+      << " errorCode: " << result.errorCode()
+      << " errorMessage: " << result.errorMessage()
+      << " body: " << result.body();
 }

 ////////////////////////////////////////////////////////////////////////////////
@@ -2129,11 +2133,12 @@ void ClusterInfo::loadCurrentDBServers() {
     }
   }

-  LOG(DEBUG) << "Error while loading " << prefixCurrentDBServers
-             << " httpCode: " << result.httpCode()
-             << " errorCode: " << result.errorCode()
-             << " errorMessage: " << result.errorMessage()
-             << " body: " << result.body();
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "Error while loading " << prefixCurrentDBServers
+      << " httpCode: " << result.httpCode()
+      << " errorCode: " << result.errorCode()
+      << " errorMessage: " << result.errorMessage()
+      << " body: " << result.body();
 }

 ////////////////////////////////////////////////////////////////////////////////
@@ -2206,7 +2211,9 @@ std::shared_ptr<std::vector<ServerID>> ClusterInfo::getResponsibleServer(
       // This is a temporary situation in which the leader has already
       // resigned, let's wait half a second and try again.
       --tries;
-      LOG(INFO) << "getResponsibleServer: found resigned leader, waiting for half a second...";
+      LOG_TOPIC(INFO, Logger::CLUSTER)
+          << "getResponsibleServer: found resigned leader,"
+          << "waiting for half a second...";
       usleep(500000);
     } else {
       return (*it).second;
@@ -2534,9 +2541,10 @@ void FollowerInfo::add(ServerID const& sid) {
            _docColl->name()}));

       if (!currentEntry.isObject()) {
-        LOG(ERR) << "FollowerInfo::add, did not find object in " << path;
+        LOG_TOPIC(ERR, Logger::CLUSTER)
+            << "FollowerInfo::add, did not find object in " << path;
         if (!currentEntry.isNone()) {
-          LOG(ERR) << "Found: " << currentEntry.toJson();
+          LOG_TOPIC(ERR, Logger::CLUSTER) << "Found: " << currentEntry.toJson();
         }
       } else {
         auto newValue = newShardEntry(currentEntry, sid, true);
@@ -2558,17 +2566,19 @@ void FollowerInfo::add(ServerID const& sid) {
           success = true;
           break; //
         } else {
-          LOG(WARN) << "FollowerInfo::add, could not cas key " << path;
+          LOG_TOPIC(WARN, Logger::CLUSTER)
+              << "FollowerInfo::add, could not cas key " << path;
         }
       }
     } else {
-      LOG(ERR) << "FollowerInfo::add, could not read " << path << " in agency.";
+      LOG_TOPIC(ERR, Logger::CLUSTER)
+          << "FollowerInfo::add, could not read " << path << " in agency.";
     }
     usleep(500000);
   } while (TRI_microtime() < startTime + 30);
   if (!success) {
-    LOG(ERR) << "FollowerInfo::add, timeout in agency operation for key "
-             << path;
+    LOG_TOPIC(ERR, Logger::CLUSTER)
+        << "FollowerInfo::add, timeout in agency operation for key " << path;
   }
 }
@@ -2615,9 +2625,10 @@ void FollowerInfo::remove(ServerID const& sid) {
            _docColl->name()}));

       if (!currentEntry.isObject()) {
-        LOG(ERR) << "FollowerInfo::remove, did not find object in " << path;
+        LOG_TOPIC(ERR, Logger::CLUSTER)
+            << "FollowerInfo::remove, did not find object in " << path;
         if (!currentEntry.isNone()) {
-          LOG(ERR) << "Found: " << currentEntry.toJson();
+          LOG_TOPIC(ERR, Logger::CLUSTER) << "Found: " << currentEntry.toJson();
         }
       } else {
         auto newValue = newShardEntry(currentEntry, sid, false);
@@ -2639,18 +2650,19 @@ void FollowerInfo::remove(ServerID const& sid) {
           success = true;
           break; //
         } else {
-          LOG(WARN) << "FollowerInfo::remove, could not cas key " << path;
+          LOG_TOPIC(WARN, Logger::CLUSTER)
+              << "FollowerInfo::remove, could not cas key " << path;
         }
       }
     } else {
-      LOG(ERR) << "FollowerInfo::remove, could not read " << path
-               << " in agency.";
+      LOG_TOPIC(ERR, Logger::CLUSTER)
+          << "FollowerInfo::remove, could not read " << path << " in agency.";
     }
     usleep(500000);
   } while (TRI_microtime() < startTime + 30);
   if (!success) {
-    LOG(ERR) << "FollowerInfo::remove, timeout in agency operation for key "
-             << path;
+    LOG_TOPIC(ERR, Logger::CLUSTER)
+        << "FollowerInfo::remove, timeout in agency operation for key " << path;
   }
 }
@@ -2237,8 +2237,9 @@ std::map<std::string, std::vector<std::string>> distributeShards(
       count = 0;
     }
     if (++count2 == dbServers.size() + 1) {
-      LOG(WARN) << "createCollectionCoordinator: replicationFactor is "
-                   "too large for the number of DBservers";
+      LOG_TOPIC(WARN, Logger::CLUSTER)
+          << "createCollectionCoordinator: replicationFactor is "
+          << "too large for the number of DBservers";
       found = false;
       break;
     }
@@ -59,7 +59,7 @@ DBServerAgencySync::DBServerAgencySync(HeartbeatThread* heartbeat)
 DBServerAgencySync::~DBServerAgencySync() {}

 void DBServerAgencySync::work() {
-  LOG(TRACE) << "starting plan update handler";
+  LOG_TOPIC(TRACE, Logger::CLUSTER) << "starting plan update handler";

   if (_shutdown != 0) {
     return;
@@ -136,11 +136,12 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
   }

   if (!handlePlanChange->IsFunction()) {
-    LOG(ERR) << "handlePlanChange is not a function";
+    LOG_TOPIC(ERR, Logger::CLUSTER) << "handlePlanChange is not a function";
     return result;
   }

-  v8::Handle<v8::Function> func = v8::Handle<v8::Function>::Cast(handlePlanChange);
+  v8::Handle<v8::Function> func =
+      v8::Handle<v8::Function>::Cast(handlePlanChange);
   v8::Handle<v8::Value> args[2];
   // Keep the shared_ptr to the builder while we run TRI_VPackToV8 on the
   // slice(), just to be on the safe side:
@@ -149,7 +150,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
   builder = clusterInfo->getCurrent();
   args[1] = TRI_VPackToV8(isolate, builder->slice());

-  v8::Handle<v8::Value> res = func->Call(isolate->GetCurrentContext()->Global(), 2, args);
+  v8::Handle<v8::Value> res =
+      func->Call(isolate->GetCurrentContext()->Global(), 2, args);

   if (tryCatch.HasCaught()) {
     TRI_LogV8Exception(isolate, &tryCatch);
@@ -172,16 +174,19 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {

       if (value->IsNumber()) {
         if (strcmp(*str, "plan") == 0) {
-          result.planVersion = static_cast<uint64_t>(value->ToUint32()->Value());
+          result.planVersion =
+              static_cast<uint64_t>(value->ToUint32()->Value());
         } else if (strcmp(*str, "current") == 0) {
-          result.currentVersion = static_cast<uint64_t>(value->ToUint32()->Value());
+          result.currentVersion =
+              static_cast<uint64_t>(value->ToUint32()->Value());
         }
       } else if (value->IsBoolean() && strcmp(*str, "success")) {
         result.success = TRI_ObjectToBoolean(value);
       }
     }
   } else {
-    LOG(ERR) << "handlePlanChange returned a non-object";
+    LOG_TOPIC(ERR, Logger::CLUSTER)
+        << "handlePlanChange returned a non-object";
     return result;
   }
   // invalidate our local cache, even if an error occurred
@@ -277,9 +277,11 @@ void HeartbeatThread::runDBServer() {
       remain = interval - (TRI_microtime() - start);
     } while (remain > 0);
   } catch (std::exception const& e) {
-    LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an exception in DBServer heartbeat: " << e.what();
+    LOG_TOPIC(ERR, Logger::HEARTBEAT)
+        << "Got an exception in DBServer heartbeat: " << e.what();
   } catch (...) {
-    LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an unknown exception in DBServer heartbeat";
+    LOG_TOPIC(ERR, Logger::HEARTBEAT)
+        << "Got an unknown exception in DBServer heartbeat";
   }
 }
@@ -361,38 +363,42 @@ void HeartbeatThread::runCoordinator() {

     handleStateChange(result);

-    // mop: order is actually important here...FoxxmasterQueueupdate will be set only when somebody
-    // registers some new queue stuff (for example on a different coordinator than this one)...
-    // However when we are just about to become the new foxxmaster we must immediately refresh our queues
-    // this is done in ServerState...if queueupdate is set after foxxmaster the change will be reset again
+    // mop: order is actually important here...FoxxmasterQueueupdate will
+    // be set only when somebody registers some new queue stuff (for example
+    // on a different coordinator than this one)... However when we are just
+    // about to become the new foxxmaster we must immediately refresh our
+    // queues this is done in ServerState...if queueupdate is set after
+    // foxxmaster the change will be reset again
     VPackSlice foxxmasterQueueupdateSlice = result.slice()[0].get(
-      std::vector<std::string>({_agency.prefix(), "Current", "FoxxmasterQueueupdate"})
-    );
+        std::vector<std::string>({_agency.prefix(), "Current",
+                                  "FoxxmasterQueueupdate"})
+        );

     if (foxxmasterQueueupdateSlice.isBool()) {
-      ServerState::instance()->setFoxxmasterQueueupdate(foxxmasterQueueupdateSlice.getBool());
+      ServerState::instance()->setFoxxmasterQueueupdate(
+          foxxmasterQueueupdateSlice.getBool());
     }

     VPackSlice foxxmasterSlice = result.slice()[0].get(
-      std::vector<std::string>({_agency.prefix(), "Current", "Foxxmaster"})
-    );
+        std::vector<std::string>({_agency.prefix(), "Current", "Foxxmaster"})
+        );

     if (foxxmasterSlice.isString()) {
       ServerState::instance()->setFoxxmaster(foxxmasterSlice.copyString());
     }

     VPackSlice versionSlice = result.slice()[0].get(
-      std::vector<std::string>({_agency.prefix(), "Plan", "Version"}));
+        std::vector<std::string>({_agency.prefix(), "Plan", "Version"}));

     if (versionSlice.isInteger()) {
       // there is a plan version

       uint64_t planVersion = 0;
       try {
         planVersion = versionSlice.getUInt();
       } catch (...) {
       }

       if (planVersion > lastPlanVersionNoticed) {
         LOG_TOPIC(TRACE, Logger::HEARTBEAT)
             << "Found planVersion " << planVersion << " which is newer than "
@@ -401,14 +407,14 @@ void HeartbeatThread::runCoordinator() {
           lastPlanVersionNoticed = planVersion;
         } else {
           LOG_TOPIC(WARN, Logger::HEARTBEAT)
-            << "handlePlanChangeCoordinator was unsuccessful";
+              << "handlePlanChangeCoordinator was unsuccessful";
         }
       }
     }

     VPackSlice slice = result.slice()[0].get(
-      std::vector<std::string>({_agency.prefix(), "Sync", "UserVersion"}));
+        std::vector<std::string>({_agency.prefix(), "Sync", "UserVersion"}));

     if (slice.isInteger()) {
       // there is a UserVersion
       uint64_t userVersion = 0;
@@ -416,13 +422,13 @@ void HeartbeatThread::runCoordinator() {
         userVersion = slice.getUInt();
       } catch (...) {
       }

       if (userVersion > 0 && userVersion != oldUserVersion) {
         oldUserVersion = userVersion;
         GeneralServerFeature::AUTH_INFO.outdate();
       }
     }

     versionSlice = result.slice()[0].get(
         std::vector<std::string>({_agency.prefix(), "Current", "Version"}));
     if (versionSlice.isInteger()) {
@@ -72,7 +72,8 @@ RestHandler::status RestAgencyCallbacksHandler::execute() {
     ss >> index;

     auto callback = _agencyCallbackRegistry->getCallback(index);
-    LOG(DEBUG) << "Agency callback has been triggered. refetching!";
+    LOG_TOPIC(DEBUG, Logger::CLUSTER)
+        << "Agency callback has been triggered. refetching!";
     callback->refetchAndUpdate(true);
     resetResponse(arangodb::rest::ResponseCode::ACCEPTED);
   } catch (arangodb::basics::Exception const&) {
@@ -183,7 +183,7 @@ void ServerState::findAndSetRoleBlocking() {
   while (true) {
     auto role = determineRole(_localInfo, _id);
     std::string roleString = roleToString(role);
-    LOG(DEBUG) << "Found my role: " << roleString;
+    LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Found my role: " << roleString;

     if (storeRole(role)) {
       break;
@@ -222,7 +222,9 @@ bool ServerState::unregister() {
   std::string const& id = getId();

   std::string localInfoEncoded = StringUtils::urlEncode(_localInfo);
-  AgencyOperation deleteLocalIdMap("Target/MapLocalToID/" + localInfoEncoded, AgencySimpleOperationType::DELETE_OP);
+  AgencyOperation deleteLocalIdMap(
+      "Target/MapLocalToID/" + localInfoEncoded,
+      AgencySimpleOperationType::DELETE_OP);

   std::vector<AgencyOperation> operations = {deleteLocalIdMap};
@@ -230,8 +232,13 @@ bool ServerState::unregister() {
   const std::string agencyKey = roleToAgencyKey(role);
   TRI_ASSERT(isClusterRole(role));
   if (role == ROLE_COORDINATOR || role == ROLE_PRIMARY) {
-    operations.push_back(AgencyOperation("Plan/" + agencyKey + "/" + id, AgencySimpleOperationType::DELETE_OP));
-    operations.push_back(AgencyOperation("Current/" + agencyKey + "/" + id, AgencySimpleOperationType::DELETE_OP));
+    operations.push_back(
+      AgencyOperation(
+        "Plan/" + agencyKey + "/" + id, AgencySimpleOperationType::DELETE_OP));
+    operations.push_back(
+      AgencyOperation(
+        "Current/" + agencyKey + "/"
+        + id, AgencySimpleOperationType::DELETE_OP));
   }

   AgencyWriteTransaction unregisterTransaction(operations);
@@ -248,7 +255,8 @@ bool ServerState::unregister() {
 ////////////////////////////////////////////////////////////////////////////////
 bool ServerState::registerWithRole(ServerState::RoleEnum role) {
   if (!getId().empty()) {
-    LOG(INFO) << "Registering with role and localinfo. Supplied id is being ignored";
+    LOG_TOPIC(INFO, Logger::CLUSTER)
+        << "Registering with role and localinfo. Supplied id is being ignored";
     return false;
   }
@@ -271,7 +279,9 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {
     }
   }
   if (!found) {
-    LOG(DEBUG) << "Determining id from localinfo failed. Continuing with registering ourselves for the first time";
+    LOG_TOPIC(DEBUG, Logger::CLUSTER)
+        << "Determining id from localinfo failed."
+        << "Continuing with registering ourselves for the first time";
     id = createIdForRole(comm, role);
   }
@@ -302,7 +312,8 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {

     comm.setValue(planKey, plan, 0.0);
     if (!result.successful()) {
-      LOG(ERR) << "Couldn't create plan " << result.errorMessage();
+      LOG_TOPIC(ERR, Logger::CLUSTER)
+          << "Couldn't create plan " << result.errorMessage();
       return false;
     }
   }
@@ -310,14 +321,17 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {
   result = comm.setValue(currentKey, builder->slice(), 0.0);

   if (!result.successful()) {
-    LOG(ERR) << "Could not talk to agency! " << result.errorMessage();
+    LOG_TOPIC(ERR, Logger::CLUSTER)
+        << "Could not talk to agency! " << result.errorMessage();
     return false;
   }

   _id = id;

   findAndSetRoleBlocking();
-  LOG(DEBUG) << "We successfully announced ourselves as " << roleToString(role) << " and our id is " << id;
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "We successfully announced ourselves as " << roleToString(role)
+      << " and our id is " << id;

   return true;
 }
@@ -344,7 +358,9 @@ std::string ServerState::roleToAgencyKey(ServerState::RoleEnum role) {
 //////////////////////////////////////////////////////////////////////////////
 /// @brief create an id for a specified role
 //////////////////////////////////////////////////////////////////////////////
-std::string ServerState::createIdForRole(AgencyComm comm, ServerState::RoleEnum role) {
+std::string ServerState::createIdForRole(
+    AgencyComm comm, ServerState::RoleEnum role) {
+
   std::string const agencyKey = roleToAgencyKey(role);

   std::string const serverIdPrefix = agencyKey.substr(0, agencyKey.length() - 1);
@@ -387,7 +403,8 @@ std::string ServerState::createIdForRole(AgencyComm comm, ServerState::RoleEnum
             << (!entry.isNone());
     } while (!entry.isNone());

-    createResult = comm.casValue("Plan/" + agencyKey + "/" + id, idValue, false, 0.0, 0.0);
+    createResult =
+        comm.casValue("Plan/" + agencyKey + "/" + id, idValue, false, 0.0, 0.0);
   } while(!createResult.successful());

   VPackBuilder localIdBuilder;
@@ -395,7 +412,10 @@ std::string ServerState::createIdForRole(AgencyComm comm, ServerState::RoleEnum

   VPackSlice localIdValue = localIdBuilder.slice();

-  AgencyCommResult mapResult = comm.setValue("Target/MapLocalToID/" + StringUtils::urlEncode(_localInfo), localIdValue, 0.0);
+  AgencyCommResult mapResult = comm.setValue(
+      "Target/MapLocalToID/"
+      + StringUtils::urlEncode(_localInfo), localIdValue, 0.0);
+
   if (!mapResult.successful()) {
     LOG(FATAL) << "Couldn't register Id as localId";
     FATAL_ERROR_EXIT();
@@ -539,11 +559,17 @@ void ServerState::setState(StateEnum state) {
   }

   if (result) {
-    LOG(INFO) << "changing state of " << ServerState::roleToString(role) << " server from " << ServerState::stateToString(_state) << " to " << ServerState::stateToString(state);
+    LOG_TOPIC(INFO, Logger::CLUSTER)
+        << "changing state of " << ServerState::roleToString(role)
+        << " server from " << ServerState::stateToString(_state)
+        << " to " << ServerState::stateToString(state);

     _state = state;
   } else {
-    LOG(ERR) << "invalid state transition for " << ServerState::roleToString(role) << " server from " << ServerState::stateToString(_state) << " to " << ServerState::stateToString(state);
+    LOG_TOPIC(ERR, Logger::CLUSTER)
+        << "invalid state transition for " << ServerState::roleToString(role)
+        << " server from " << ServerState::stateToString(_state)
+        << " to " << ServerState::stateToString(state);
   }
 }
@@ -668,20 +694,21 @@ bool ServerState::redetermineRole() {
   std::string saveIdOfPrimary = _idOfPrimary;
   RoleEnum role = determineRole(_localInfo, _id);
   std::string roleString = roleToString(role);
-  LOG(INFO) << "Redetermined role from agency: " << roleString;
+  LOG_TOPIC(INFO, Logger::CLUSTER)
+      << "Redetermined role from agency: " << roleString;
   if (role == ServerState::ROLE_UNDEFINED) {
     return false;
   }
   RoleEnum oldRole = loadRole();
   if (role != oldRole) {
-    LOG(INFO) << "Changed role to: " << roleString;
+    LOG_TOPIC(INFO, Logger::CLUSTER) << "Changed role to: " << roleString;
     if (!storeRole(role)) {
       return false;
     }
     return true;
   }
   if (_idOfPrimary != saveIdOfPrimary) {
-    LOG(INFO) << "The ID of our primary has changed!";
+    LOG_TOPIC(INFO, Logger::CLUSTER) << "The ID of our primary has changed!";
     return true;
   }
   return false;
@@ -697,11 +724,11 @@ ServerState::RoleEnum ServerState::determineRole(std::string const& info,
   if (id.empty()) {
     int res = lookupLocalInfoToId(info, id);
     if (res != TRI_ERROR_NO_ERROR) {
-      LOG(ERR) << "Could not lookupLocalInfoToId";
+      LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not lookupLocalInfoToId";
      return ServerState::ROLE_UNDEFINED;
     }
     // When we get here, we have have successfully looked up our id
-    LOG(DEBUG) << "Learned my own Id: " << id;
+    LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Learned my own Id: " << id;
     setId(id);
   }
@@ -799,7 +826,10 @@ ServerState::RoleEnum ServerState::checkCoordinatorsList(
   if (!result.successful()) {
     std::string const endpoints = AgencyComm::getEndpointsString();

-    LOG(TRACE) << "Could not fetch configuration from agency endpoints (" << endpoints << "): got status code " << result._statusCode << ", message: " << result.errorMessage() << ", key: " << key;
+    LOG_TOPIC(TRACE, Logger::CLUSTER)
+        << "Could not fetch configuration from agency endpoints ("
+        << endpoints << "): got status code " << result._statusCode
+        << ", message: " << result.errorMessage() << ", key: " << key;

     return ServerState::ROLE_UNDEFINED;
   }
@@ -807,7 +837,8 @@ ServerState::RoleEnum ServerState::checkCoordinatorsList(
   VPackSlice coordinators = result.slice()[0].get(std::vector<std::string>(
       {comm.prefix(), "Plan", "Coordinators"}));
   if (!coordinators.isObject()) {
-    LOG(TRACE) << "Got an invalid JSON response for Plan/Coordinators";
+    LOG_TOPIC(TRACE, Logger::CLUSTER)
+        << "Got an invalid JSON response for Plan/Coordinators";
     return ServerState::ROLE_UNDEFINED;
   }
@@ -891,7 +922,10 @@ ServerState::RoleEnum ServerState::checkServersList(std::string const& id) {
   if (!result.successful()) {
     std::string const endpoints = AgencyComm::getEndpointsString();

-    LOG(TRACE) << "Could not fetch configuration from agency endpoints (" << endpoints << "): got status code " << result._statusCode << ", message: " << result.errorMessage() << ", key: " << key;
+    LOG_TOPIC(TRACE, Logger::CLUSTER)
+        << "Could not fetch configuration from agency endpoints (" << endpoints
+        << "): got status code " << result._statusCode << ", message: "
+        << result.errorMessage() << ", key: " << key;

     return ServerState::ROLE_UNDEFINED;
   }
@@ -901,7 +935,8 @@ ServerState::RoleEnum ServerState::checkServersList(std::string const& id) {
   VPackSlice dbservers = result.slice()[0].get(std::vector<std::string>(
       {comm.prefix(), "Plan", "DBServers"}));
   if (!dbservers.isObject()) {
-    LOG(TRACE) << "Got an invalid JSON response for Plan/DBServers";
+    LOG_TOPIC(TRACE, Logger::CLUSTER)
+        << "Got an invalid JSON response for Plan/DBServers";
     return ServerState::ROLE_UNDEFINED;
   }
@@ -950,7 +985,8 @@ bool ServerState::storeRole(RoleEnum role) {
         comm.setValue("Current/Coordinators/" + _id, builder.slice(), 0.0);

     if (!result.successful()) {
-      LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT();
+      LOG(FATAL)
+          << "unable to register coordinator in agency"; FATAL_ERROR_EXIT();
     }
   } else if (role == ServerState::ROLE_PRIMARY) {
     VPackBuilder builder;
@@ -965,7 +1001,8 @@ bool ServerState::storeRole(RoleEnum role) {
         comm.setValue("Current/DBServers/" + _id, builder.slice(), 0.0);

     if (!result.successful()) {
-      LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT();
+      LOG(FATAL)
+          << "unable to register db server in agency"; FATAL_ERROR_EXIT();
     }
   } else if (role == ServerState::ROLE_SECONDARY) {
     std::string keyName = _id;
@@ -1122,7 +1122,8 @@ static void JS_isFoxxmaster(v8::FunctionCallbackInfo<v8::Value> const& args) {
   TRI_V8_TRY_CATCH_END
 }

-static void JS_getFoxxmasterQueueupdate(v8::FunctionCallbackInfo<v8::Value> const& args) {
+static void JS_getFoxxmasterQueueupdate(
+    v8::FunctionCallbackInfo<v8::Value> const& args) {
   TRI_V8_TRY_CATCH_BEGIN(isolate);
   v8::HandleScope scope(isolate);
@@ -1754,7 +1755,8 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
                                    "couldn't queue async request");
   }

-  LOG(DEBUG) << "JS_AsyncRequest: request has been submitted";
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "JS_AsyncRequest: request has been submitted";

   Return_PrepareClusterCommResultForJS(args, res);
   TRI_V8_TRY_CATCH_END
@@ -1820,7 +1822,7 @@ static void JS_SyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
                                    "couldn't do sync request");
   }

-  LOG(DEBUG) << "JS_SyncRequest: request has been done";
+  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "JS_SyncRequest: request has been done";

   Return_PrepareClusterCommResultForJS(args, *res);
   TRI_V8_TRY_CATCH_END
@@ -1848,7 +1850,8 @@ static void JS_Enquire(v8::FunctionCallbackInfo<v8::Value> const& args) {

   OperationID operationID = TRI_ObjectToUInt64(args[0], true);

-  LOG(DEBUG) << "JS_Enquire: calling ClusterComm::enquire()";
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "JS_Enquire: calling ClusterComm::enquire()";

   ClusterCommResult const res = cc->enquire(operationID);
@@ -1917,7 +1920,7 @@ static void JS_Wait(v8::FunctionCallbackInfo<v8::Value> const& args) {
     }
   }

-  LOG(DEBUG) << "JS_Wait: calling ClusterComm::wait()";
+  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "JS_Wait: calling ClusterComm::wait()";

   ClusterCommResult const res =
       cc->wait(myclientTransactionID, mycoordTransactionID, myoperationID,
@@ -1979,7 +1982,7 @@ static void JS_Drop(v8::FunctionCallbackInfo<v8::Value> const& args) {
     }
   }

-  LOG(DEBUG) << "JS_Drop: calling ClusterComm::drop()";
+  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "JS_Drop: calling ClusterComm::drop()";

   cc->drop(myclientTransactionID, mycoordTransactionID, myoperationID,
            myshardID);
@@ -38,6 +38,7 @@ std::map<std::string, LogTopic*> LogTopic::_names;

 LogTopic Logger::AGENCY("agency", LogLevel::INFO);
 LogTopic Logger::AGENCYCOMM("agencycomm", LogLevel::INFO);
+LogTopic Logger::CLUSTER("cluster", LogLevel::INFO);
 LogTopic Logger::COLLECTOR("collector");
 LogTopic Logger::COMMUNICATION("communication", LogLevel::INFO);
 LogTopic Logger::COMPACTOR("compactor");
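The last hunk is what gives the converted calls their effect: it registers the "cluster" topic with a default level of INFO, so the DEBUG and TRACE messages touched by this commit stay silent until that topic's level is raised, independently of all other topics. A small illustrative sketch (not part of the diff), again assuming the Logger/Logger.h include path:

#include "Logger/Logger.h"  // assumed include path

void clusterLogLevelSketch() {
  // Emitted with the topic at its default level (INFO).
  LOG_TOPIC(INFO, Logger::CLUSTER) << "cluster cache refreshed";

  // Suppressed until the "cluster" topic is switched to DEBUG or TRACE,
  // without changing the verbosity of any other log topic.
  LOG_TOPIC(DEBUG, Logger::CLUSTER) << "detailed cluster state follows";
}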