1
0
Fork 0

Properly callbackify create and dropcollection

This commit is contained in:
Andreas Streichardt 2016-04-22 17:32:00 +02:00
parent 3f1b75c99f
commit ea32926e45
4 changed files with 203 additions and 104 deletions

View File

@ -21,26 +21,31 @@
/// @author Andreas Streichardt
////////////////////////////////////////////////////////////////////////////////
#include "AgencyCallback.h"
#include "Basics/MutexLocker.h"
#include "Basics/ConditionLocker.h"
#include <chrono>
#include <thread>
#include <velocypack/Exception.h>
#include <velocypack/Parser.h>
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <thread>
#include "Basics/MutexLocker.h"
#include "AgencyCallback.h"
using namespace arangodb;
AgencyCallback::AgencyCallback(AgencyComm& agency,
std::string const& key,
std::function<bool(VPackSlice const&)> const& cb,
bool needsValue)
bool needsValue,
bool needsInitialValue)
: key(key),
_useCv(false),
_agency(agency),
_cb(cb),
_needsValue(needsValue) {
if (_needsValue) {
if (_needsValue && needsInitialValue) {
refetchAndUpdate();
}
}
@ -63,22 +68,42 @@ void AgencyCallback::refetchAndUpdate() {
return;
}
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
result._values.begin();
// mop: we need to find out if it is a directory :S
// because we lost this information while parsing
std::shared_ptr<VPackBuilder> bodyBuilder =
VPackParser::fromJson(result.body().c_str());
if (it == result._values.end()) {
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
newData->add(VPackSlice::noneSlice());
checkValue(newData);
} else {
checkValue(it->second._vpack);
VPackSlice slice = bodyBuilder->slice();
if (!slice.isObject() || !slice.hasKey("node")) {
LOG(ERR) << "Invalid structure " << result.body();
return;
}
VPackSlice node = slice.get("node");
if (!slice.isObject()) {
LOG(ERR) << "Node is not an object";
return;
}
bool isDir = node.hasKey("dir");
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
if (isDir) {
VPackObjectBuilder builder(newData.get());
for (auto& it: result._values) {
newData->add(it.first, it.second._vpack->slice());
}
} else if (result._values.size() == 0) {
newData->add(VPackSlice::noneSlice());
} else {
newData->add(result._values.begin()->second._vpack->slice());
}
checkValue(newData);
}
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
LOG(DEBUG) << "Got new value " << newData->slice().typeName();
LOG(DEBUG) << "Got new value " << newData->toJson();
LOG(DEBUG) << "Got new value " << newData->slice().typeName() << " " << newData->toJson();
if (execute(newData)) {
_lastData = newData;
} else {
@ -89,22 +114,65 @@ void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
bool AgencyCallback::executeEmpty() {
LOG(DEBUG) << "Executing (empty)";
bool result;
{
MUTEX_LOCKER(locker, _lock);
return _cb(VPackSlice::noneSlice());
result = _cb(VPackSlice::noneSlice());
}
if (_useCv) {
CONDITION_LOCKER(locker, _cv);
_cv.signal();
}
return result;
}
bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
LOG(DEBUG) << "Executing";
bool result;
{
MUTEX_LOCKER(locker, _lock);
return _cb(newData->slice());
result = _cb(newData->slice());
}
if (_useCv) {
CONDITION_LOCKER(locker, _cv);
_cv.signal();
}
return result;
}
void AgencyCallback::waitWithFailover(double timeout) {
// mop: todo thread safe? check with max
std::shared_ptr<VPackBuilder> beginData = _lastData;
VPackSlice compareSlice;
if (_lastData) {
compareSlice = _lastData->slice();
} else {
compareSlice = VPackSlice::noneSlice();
}
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(timeout * 1000)));
if (!_lastData || _lastData->slice().equals(beginData->slice())) {
if (!_lastData || _lastData->slice().equals(compareSlice)) {
LOG(DEBUG) << "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate();
}
}
void AgencyCallback::waitForExecution(double maxTimeout) {
VPackSlice compareSlice;
if (_lastData) {
compareSlice = _lastData->slice();
} else {
compareSlice = VPackSlice::noneSlice();
}
_useCv = true;
CONDITION_LOCKER(locker, _cv);
locker.wait(maxTimeout * 1000000);
_useCv = false;
if (!_lastData || _lastData->slice().equals(compareSlice)) {
LOG(DEBUG) << "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate();

View File

@ -21,16 +21,18 @@
/// @author Andreas Streichardt
////////////////////////////////////////////////////////////////////////////////
//XXX #warning MOP nope, include guards
#pragma once
#ifndef ARANGODB_CLUSTER_AGENCYCALLBACK_H
#define ARANGODB_CLUSTER_AGENCYCALLBACK_H
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
//XXX #warning MOP order, Common.h
#include <functional>
#include <memory>
#include "Cluster/AgencyComm.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/Mutex.h"
#include "Cluster/AgencyComm.h"
namespace arangodb {
@ -40,7 +42,8 @@ public:
/// @brief ctor
//////////////////////////////////////////////////////////////////////////////
AgencyCallback(AgencyComm&, std::string const&,
std::function<bool(VPackSlice const&)> const&, bool needsValue);
std::function<bool(VPackSlice const&)> const&, bool needsValue,
bool needsInitialValue = false);
//////////////////////////////////////////////////////////////////////////////
/// @brief wait a specified timeout. execute cb if watch didn't fire
@ -50,10 +53,14 @@ public:
std::string const key;
void refetchAndUpdate();
void waitForExecution(double);
private:
arangodb::Mutex _lock;
arangodb::basics::ConditionVariable _cv;
bool _useCv;
AgencyComm& _agency;
std::function<bool(VPackSlice const&)> const _cb;
std::shared_ptr<VPackBuilder> _lastData;
@ -68,3 +75,5 @@ private:
};
}
#endif

View File

@ -23,6 +23,14 @@
#include "AgencyCallbackRegistry.h"
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ServerState.h"
#include "Endpoint/Endpoint.h"
#include "Random/RandomGenerator.h"
#include <ctime>
#warning MOP why? use ConditionVariable
#include <condition_variable>
@ -32,13 +40,6 @@
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ServerState.h"
#include "Endpoint/Endpoint.h"
#include "Random/RandomGenerator.h"
using namespace arangodb;
AgencyCallbackRegistry::AgencyCallbackRegistry(std::string const& callbackBasePath)

View File

@ -1252,6 +1252,47 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
return setErrormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg);
}
int dbServerResult = -1;
std::function<bool(VPackSlice const& result)> dbServerChanged = [&](VPackSlice const& result) {
if (result.isObject() && result.length() == (size_t) numberOfShards) {
std::string tmpMsg = "";
bool tmpHaveError = false;
for (auto const& p: VPackObjectIterator(result)) {
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
p.value, "error", false)) {
tmpHaveError = true;
tmpMsg += " shardID:" + p.key.copyString() + ":";
tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(
p.value, "errorMessage", "");
if (p.value.hasKey("errorNum")) {
VPackSlice const errorNum = p.value.get("errorNum");
if (errorNum.isNumber()) {
tmpMsg += " (errNum=";
tmpMsg += basics::StringUtils::itoa(
errorNum.getNumericValue<uint32_t>());
tmpMsg += ")";
}
}
}
}
loadCurrentCollections();
if (tmpHaveError) {
errorMsg = "Error in creation of collection:" + tmpMsg;
dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
return true;
}
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
return true;
}
return true;
};
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Collections/" + databaseName + "/" + collectionID, dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback);
VPackBuilder builder;
builder.add(VPackValue(json.toJson()));
@ -1284,43 +1325,15 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
std::string const where =
"Current/Collections/" + databaseName + "/" + collectionID;
while (TRI_microtime() <= endTime) {
res.clear();
res = ac.getValues(where, true);
agencyCallback->waitForExecution(interval);
if (res.successful() && res.parse(where + "/", false)) {
if (res._values.size() == (size_t)numberOfShards) {
std::string tmpMsg = "";
bool tmpHaveError = false;
for (auto const& p : res._values) {
VPackSlice const slice = p.second._vpack->slice();
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "error", false)) {
tmpHaveError = true;
tmpMsg += " shardID:" + p.first + ":";
tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(
slice, "errorMessage", "");
if (slice.hasKey("errorNum")) {
VPackSlice const errorNum = slice.get("errorNum");
if (errorNum.isNumber()) {
tmpMsg += " (errNum=";
tmpMsg += basics::StringUtils::itoa(
errorNum.getNumericValue<uint32_t>());
tmpMsg += ")";
if (dbServerResult >= 0) {
break;
}
}
}
}
loadCurrentCollections();
if (tmpHaveError) {
errorMsg = "Error in creation of collection:" + tmpMsg;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
}
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res.clear();
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
_agencyCallbackRegistry->unregisterCallback(agencyCallback);
if (dbServerResult >= 0) {
return dbServerResult;
}
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
@ -1344,6 +1357,40 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
double const endTime = TRI_microtime() + realTimeout;
double const interval = getPollInterval();
int dbServerResult = -1;
std::function<bool(VPackSlice const& result)> dbServerChanged = [&](VPackSlice const& result) {
if (result.isObject() && result.length() == 0) {
// ...remove the entire directory for the collection
AgencyCommLocker locker("Current", "WRITE");
if (locker.successful()) {
AgencyCommResult res;
res = ac.removeValues(
"Current/Collections/" + databaseName + "/" + collectionID, true);
if (res.successful()) {
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
return true;
}
dbServerResult = setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT,
errorMsg);
return true;
}
loadCurrentCollections();
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
return true;
}
return true;
};
// monitor the entry for the collection
std::string const where =
"Current/Collections/" + databaseName + "/" + collectionID;
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, where, dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback);
{
AgencyCommLocker locker("Plan", "WRITE");
@ -1369,40 +1416,14 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
// Update our own cache:
loadPlannedCollections();
// monitor the entry for the collection
std::string const where =
"Current/Collections/" + databaseName + "/" + collectionID;
while (TRI_microtime() <= endTime) {
res.clear();
res = ac.getValues(where, true);
if (!res.successful()) {
// It seems the collection is already gone, do not wait further
errorMsg = "Collection already gone.";
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
if (res.successful() && res.parse(where + "/", false)) {
// if there are no more active shards for the collection...
if (res._values.size() == 0) {
// ...remove the entire directory for the collection
AgencyCommLocker locker("Current", "WRITE");
if (locker.successful()) {
res.clear();
res = ac.removeValues(
"Current/Collections/" + databaseName + "/" + collectionID, true);
if (res.successful()) {
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
return setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT,
errorMsg);
}
loadCurrentCollections();
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
agencyCallback->waitForExecution(interval);
if (dbServerResult >= 0) {
break;
}
}
res.clear();
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
if (dbServerResult >= 0) {
return dbServerResult;
}
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}