1
0
Fork 0

no commlockers in ClusterFeature anymore

This commit is contained in:
Kaveh Vahedipour 2016-06-22 18:19:12 +02:00
parent 2cceb1bdea
commit d55c85f8fa
1 changed files with 41 additions and 44 deletions

View File

@ -417,35 +417,27 @@ void ClusterFeature::start() {
AgencyCommResult result; AgencyCommResult result;
while (true) { while (true) {
AgencyCommLocker locker("Current", "WRITE");
bool success = locker.successful();
if (success) { VPackBuilder builder;
VPackBuilder builder; try {
try { VPackObjectBuilder b(&builder);
VPackObjectBuilder b(&builder); builder.add("endpoint", VPackValue(_myAddress));
builder.add("endpoint", VPackValue(_myAddress)); } catch (...) {
} catch (...) { LOG(FATAL) << "out of memory";
locker.unlock();
LOG(FATAL) << "out of memory";
FATAL_ERROR_EXIT();
}
result = comm.setValue("Current/ServersRegistered/" + _myId,
builder.slice(), 0.0);
}
if (!result.successful()) {
locker.unlock();
LOG(FATAL) << "unable to register server in agency: http code: "
<< result.httpCode() << ", body: " << result.body();
FATAL_ERROR_EXIT(); FATAL_ERROR_EXIT();
} }
if (success) { result = comm.setValue("Current/ServersRegistered/" + _myId,
builder.slice(), 0.0);
if (!result.successful()) {
LOG(FATAL) << "unable to register server in agency: http code: "
<< result.httpCode() << ", body: " << result.body();
FATAL_ERROR_EXIT();
} else {
break; break;
} }
sleep(1); sleep(1);
} }
@ -471,10 +463,10 @@ void ClusterFeature::unprepare() {
// change into shutdown state // change into shutdown state
ServerState::instance()->setState(ServerState::STATE_SHUTDOWN); ServerState::instance()->setState(ServerState::STATE_SHUTDOWN);
AgencyComm comm; AgencyComm comm;
comm.sendServerState(0.0); comm.sendServerState(0.0);
if (_heartbeatThread != nullptr) { if (_heartbeatThread != nullptr) {
int counter = 0; int counter = 0;
while (_heartbeatThread->isRunning()) { while (_heartbeatThread->isRunning()) {
@ -501,27 +493,32 @@ void ClusterFeature::unprepare() {
AgencyComm comm; AgencyComm comm;
comm.sendServerState(0.0); comm.sendServerState(0.0);
// Try only once to unregister because maybe the agencycomm
// is shutting down as well...
ServerState::RoleEnum role = ServerState::instance()->getRole();
AgencyWriteTransaction unreg;
{ // Remove from role
// Try only once to unregister because maybe the agencycomm if (role == ServerState::ROLE_PRIMARY) {
// is shutting down as well... unreg.operations.push_back(
AgencyCommLocker locker("Current", "WRITE", 120.0, 1.000); AgencyOperation("Current/DBServers/" + _myId,
AgencySimpleOperationType::DELETE_OP));
if (locker.successful()) { } else if (role == ServerState::ROLE_COORDINATOR) {
// unregister ourselves unreg.operations.push_back(
ServerState::RoleEnum role = ServerState::instance()->getRole(); AgencyOperation("Current/Coordinators/" + _myId,
AgencySimpleOperationType::DELETE_OP));
if (role == ServerState::ROLE_PRIMARY) {
comm.removeValues("Current/DBServers/" + _myId, false);
} else if (role == ServerState::ROLE_COORDINATOR) {
comm.removeValues("Current/Coordinators/" + _myId, false);
}
// unregister ourselves
comm.removeValues("Current/ServersRegistered/" + _myId, false);
}
} }
// Unregister
unreg.operations.push_back(
AgencyOperation("Current/ServersRegistered/" + _myId,
AgencySimpleOperationType::DELETE_OP));
comm.sendTransactionWithFailover(unreg, 120.0);
while (_heartbeatThread->isRunning()) { while (_heartbeatThread->isRunning()) {
usleep(50000); usleep(50000);
} }