1
0
Fork 0

getting rid of locks in cluster comm

This commit is contained in:
Kaveh Vahedipour 2016-06-22 16:42:43 +02:00
parent 39e3923094
commit 11a36db83c
1 changed files with 64 additions and 87 deletions

View File

@ -1220,60 +1220,59 @@ int ClusterInfo::setCollectionPropertiesCoordinator(
AgencyComm ac;
AgencyCommResult res;
{
AgencyCommLocker locker("Plan", "WRITE");
if (!locker.successful()) {
return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN;
}
if (!ac.exists("Plan/Databases/" + databaseName)) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
res = ac.getValues("Plan/Collections/" + databaseName+"/" + collectionID);
if (!res.successful()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
velocypack::Slice collection =
res.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefix(), "Plan", "Collections",
databaseName, collectionID}));
if (!collection.isObject()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
VPackBuilder copy;
try {
VPackObjectBuilder b(&copy);
for (auto const& entry : VPackObjectIterator(collection)) {
std::string key = entry.key.copyString();
// Copy all values except the following
// They are overwritten later
if (key != "doCompact" && key != "journalSize" &&
key != "waitForSync" && key != "indexBuckets") {
copy.add(key, entry.value);
}
}
copy.add("doCompact", VPackValue(info->doCompact()));
copy.add("journalSize", VPackValue(info->maximalSize()));
copy.add("waitForSync", VPackValue(info->waitForSync()));
copy.add("indexBuckets", VPackValue(info->indexBuckets()));
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
res.clear();
res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID,
copy.slice(), 0.0);
AgencyPrecondition databaseExists(
"Plan/Databases/" + databaseName, AgencyPrecondition::EMPTY, false);
res = ac.getValues("Plan/Collections/" + databaseName+"/" + collectionID);
if (!res.successful()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
velocypack::Slice collection =
res.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefix(), "Plan", "Collections",
databaseName, collectionID}));
if (!collection.isObject()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
VPackBuilder copy;
try {
VPackObjectBuilder b(&copy);
for (auto const& entry : VPackObjectIterator(collection)) {
std::string key = entry.key.copyString();
// Copy all values except the following
// They are overwritten later
if (key != "doCompact" && key != "journalSize" &&
key != "waitForSync" && key != "indexBuckets") {
copy.add(key, entry.value);
}
}
copy.add("doCompact", VPackValue(info->doCompact()));
copy.add("journalSize", VPackValue(info->maximalSize()));
copy.add("waitForSync", VPackValue(info->waitForSync()));
copy.add("indexBuckets", VPackValue(info->indexBuckets()));
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
res.clear();
AgencyOperation setColl (
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, copy.slice());
AgencyWriteTransaction trans (setColl, databaseExists);
res = ac.sendTransactionWithFailover(trans);
if (res.successful()) {
loadPlan();
return TRI_ERROR_NO_ERROR;
} else {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
return TRI_ERROR_INTERNAL;
@ -1290,17 +1289,9 @@ int ClusterInfo::setCollectionStatusCoordinator(
AgencyComm ac;
AgencyCommResult res;
{
AgencyCommLocker locker("Plan", "WRITE");
if (!locker.successful()) {
return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN;
}
if (!ac.exists("Plan/Databases/" + databaseName)) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
AgencyPrecondition databaseExists(
"Plan/Databases/" + databaseName, AgencyPrecondition::EMPTY, false);
res = ac.getValues("Plan/Collections/" + databaseName +"/" + collectionID);
if (!res.successful()) {
@ -1338,13 +1329,20 @@ int ClusterInfo::setCollectionStatusCoordinator(
return TRI_ERROR_OUT_OF_MEMORY;
}
res.clear();
res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID,
builder.slice(), 0.0);
}
AgencyOperation setColl (
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, builder.slice());
AgencyWriteTransaction trans (setColl, databaseExists);
res = ac.sendTransactionWithFailover(trans);
if (res.successful()) {
loadPlan();
return TRI_ERROR_NO_ERROR;
} else {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
return TRI_ERROR_INTERNAL;
@ -1880,14 +1878,7 @@ void ClusterInfo::loadServers() {
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues(prefixServers);
}
}
AgencyCommResult result = _agency.getValues(prefixServers);
if (result.successful()) {
@ -2012,14 +2003,7 @@ void ClusterInfo::loadCurrentCoordinators() {
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues(prefixCurrentCoordinators);
}
}
AgencyCommResult result = _agency.getValues(prefixCurrentCoordinators);
if (result.successful()) {
@ -2069,14 +2053,7 @@ void ClusterInfo::loadCurrentDBServers() {
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues(prefixCurrentDBServers);
}
}
AgencyCommResult result = _agency.getValues(prefixCurrentDBServers);
if (result.successful()) {