From ea32926e45fc272219cd48e316f60e1c4fc91aa3 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Fri, 22 Apr 2016 17:32:00 +0200 Subject: [PATCH] Properly callbackify create and dropcollection --- arangod/Cluster/AgencyCallback.cpp | 116 +++++++++++---- arangod/Cluster/AgencyCallback.h | 21 ++- arangod/Cluster/AgencyCallbackRegistry.cpp | 15 +- arangod/Cluster/ClusterInfo.cpp | 155 ++++++++++++--------- 4 files changed, 203 insertions(+), 104 deletions(-) diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index 59d917ee48..4563e766c4 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -21,26 +21,31 @@ /// @author Andreas Streichardt //////////////////////////////////////////////////////////////////////////////// -#include "AgencyCallback.h" +#include "Basics/MutexLocker.h" +#include "Basics/ConditionLocker.h" + +#include +#include + #include #include #include -#include -#include -#include "Basics/MutexLocker.h" + +#include "AgencyCallback.h" using namespace arangodb; AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key, std::function const& cb, - bool needsValue) + bool needsValue, + bool needsInitialValue) : key(key), + _useCv(false), _agency(agency), _cb(cb), _needsValue(needsValue) { - - if (_needsValue) { + if (_needsValue && needsInitialValue) { refetchAndUpdate(); } } @@ -63,22 +68,42 @@ void AgencyCallback::refetchAndUpdate() { return; } - std::map::const_iterator it = - result._values.begin(); + // mop: we need to find out if it is a directory :S + // because we lost this information while parsing + std::shared_ptr bodyBuilder = + VPackParser::fromJson(result.body().c_str()); - if (it == result._values.end()) { - std::shared_ptr newData = std::make_shared(); - newData->add(VPackSlice::noneSlice()); - checkValue(newData); - } else { - checkValue(it->second._vpack); + VPackSlice slice = bodyBuilder->slice(); + if (!slice.isObject() || !slice.hasKey("node")) { + LOG(ERR) << "Invalid structure " << result.body(); + return; } + + VPackSlice node = slice.get("node"); + if (!slice.isObject()) { + LOG(ERR) << "Node is not an object"; + return; + } + + bool isDir = node.hasKey("dir"); + + std::shared_ptr newData = std::make_shared(); + if (isDir) { + VPackObjectBuilder builder(newData.get()); + for (auto& it: result._values) { + newData->add(it.first, it.second._vpack->slice()); + } + } else if (result._values.size() == 0) { + newData->add(VPackSlice::noneSlice()); + } else { + newData->add(result._values.begin()->second._vpack->slice()); + } + checkValue(newData); } void AgencyCallback::checkValue(std::shared_ptr newData) { if (!_lastData || !_lastData->slice().equals(newData->slice())) { - LOG(DEBUG) << "Got new value " << newData->slice().typeName(); - LOG(DEBUG) << "Got new value " << newData->toJson(); + LOG(DEBUG) << "Got new value " << newData->slice().typeName() << " " << newData->toJson(); if (execute(newData)) { _lastData = newData; } else { @@ -89,22 +114,65 @@ void AgencyCallback::checkValue(std::shared_ptr newData) { bool AgencyCallback::executeEmpty() { LOG(DEBUG) << "Executing (empty)"; - MUTEX_LOCKER(locker, _lock); - return _cb(VPackSlice::noneSlice()); + bool result; + { + MUTEX_LOCKER(locker, _lock); + result = _cb(VPackSlice::noneSlice()); + } + + if (_useCv) { + CONDITION_LOCKER(locker, _cv); + _cv.signal(); + } + return result; } bool AgencyCallback::execute(std::shared_ptr newData) { LOG(DEBUG) << "Executing"; - MUTEX_LOCKER(locker, _lock); - return _cb(newData->slice()); + bool result; + { + MUTEX_LOCKER(locker, _lock); + result = _cb(newData->slice()); + } + + if (_useCv) { + CONDITION_LOCKER(locker, _cv); + _cv.signal(); + } + return result; } void AgencyCallback::waitWithFailover(double timeout) { - // mop: todo thread safe? check with max - std::shared_ptr beginData = _lastData; + VPackSlice compareSlice; + if (_lastData) { + compareSlice = _lastData->slice(); + } else { + compareSlice = VPackSlice::noneSlice(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(timeout * 1000))); - if (!_lastData || _lastData->slice().equals(beginData->slice())) { + if (!_lastData || _lastData->slice().equals(compareSlice)) { + LOG(DEBUG) << "Waiting done and nothing happended. Refetching to be sure"; + // mop: watches have not triggered during our sleep...recheck to be sure + refetchAndUpdate(); + } +} + +void AgencyCallback::waitForExecution(double maxTimeout) { + VPackSlice compareSlice; + if (_lastData) { + compareSlice = _lastData->slice(); + } else { + compareSlice = VPackSlice::noneSlice(); + } + + _useCv = true; + CONDITION_LOCKER(locker, _cv); + locker.wait(maxTimeout * 1000000); + _useCv = false; + + if (!_lastData || _lastData->slice().equals(compareSlice)) { LOG(DEBUG) << "Waiting done and nothing happended. Refetching to be sure"; // mop: watches have not triggered during our sleep...recheck to be sure refetchAndUpdate(); diff --git a/arangod/Cluster/AgencyCallback.h b/arangod/Cluster/AgencyCallback.h index 6682d35dcf..3c8852e885 100644 --- a/arangod/Cluster/AgencyCallback.h +++ b/arangod/Cluster/AgencyCallback.h @@ -21,16 +21,18 @@ /// @author Andreas Streichardt //////////////////////////////////////////////////////////////////////////////// -//XXX #warning MOP nope, include guards -#pragma once +#ifndef ARANGODB_CLUSTER_AGENCYCALLBACK_H +#define ARANGODB_CLUSTER_AGENCYCALLBACK_H + +#include "Basics/ConditionVariable.h" +#include "Basics/Mutex.h" -//XXX #warning MOP order, Common.h #include #include -#include "Cluster/AgencyComm.h" #include #include -#include "Basics/Mutex.h" + +#include "Cluster/AgencyComm.h" namespace arangodb { @@ -40,7 +42,8 @@ public: /// @brief ctor ////////////////////////////////////////////////////////////////////////////// AgencyCallback(AgencyComm&, std::string const&, - std::function const&, bool needsValue); + std::function const&, bool needsValue, + bool needsInitialValue = false); ////////////////////////////////////////////////////////////////////////////// /// @brief wait a specified timeout. execute cb if watch didn't fire @@ -50,10 +53,14 @@ public: std::string const key; void refetchAndUpdate(); + void waitForExecution(double); private: arangodb::Mutex _lock; + arangodb::basics::ConditionVariable _cv; + bool _useCv; + AgencyComm& _agency; std::function const _cb; std::shared_ptr _lastData; @@ -68,3 +75,5 @@ private: }; } + +#endif diff --git a/arangod/Cluster/AgencyCallbackRegistry.cpp b/arangod/Cluster/AgencyCallbackRegistry.cpp index b37fbf52be..262855eec9 100644 --- a/arangod/Cluster/AgencyCallbackRegistry.cpp +++ b/arangod/Cluster/AgencyCallbackRegistry.cpp @@ -23,6 +23,14 @@ #include "AgencyCallbackRegistry.h" +#include "Basics/Exceptions.h" +#include "Basics/ReadLocker.h" +#include "Basics/WriteLocker.h" + +#include "Cluster/ServerState.h" +#include "Endpoint/Endpoint.h" +#include "Random/RandomGenerator.h" + #include #warning MOP why? use ConditionVariable #include @@ -32,13 +40,6 @@ #include #include -#include "Basics/Exceptions.h" -#include "Basics/ReadLocker.h" -#include "Basics/WriteLocker.h" -#include "Cluster/ServerState.h" -#include "Endpoint/Endpoint.h" -#include "Random/RandomGenerator.h" - using namespace arangodb; AgencyCallbackRegistry::AgencyCallbackRegistry(std::string const& callbackBasePath) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 2fbe66c1e3..34904b5aaa 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1251,6 +1251,47 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, if (ac.exists("Plan/Collections/" + databaseName + "/" + collectionID)) { return setErrormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg); } + + int dbServerResult = -1; + + std::function dbServerChanged = [&](VPackSlice const& result) { + if (result.isObject() && result.length() == (size_t) numberOfShards) { + std::string tmpMsg = ""; + bool tmpHaveError = false; + + for (auto const& p: VPackObjectIterator(result)) { + if (arangodb::basics::VelocyPackHelper::getBooleanValue( + p.value, "error", false)) { + tmpHaveError = true; + tmpMsg += " shardID:" + p.key.copyString() + ":"; + tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( + p.value, "errorMessage", ""); + if (p.value.hasKey("errorNum")) { + VPackSlice const errorNum = p.value.get("errorNum"); + if (errorNum.isNumber()) { + tmpMsg += " (errNum="; + tmpMsg += basics::StringUtils::itoa( + errorNum.getNumericValue()); + tmpMsg += ")"; + } + } + } + } + loadCurrentCollections(); + if (tmpHaveError) { + errorMsg = "Error in creation of collection:" + tmpMsg; + dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; + return true; + } + dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + return true; + } + + return true; + }; + auto agencyCallback = std::make_shared( + ac, "Current/Collections/" + databaseName + "/" + collectionID, dbServerChanged, true, false); + _agencyCallbackRegistry->registerCallback(agencyCallback); VPackBuilder builder; builder.add(VPackValue(json.toJson())); @@ -1284,43 +1325,15 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, std::string const where = "Current/Collections/" + databaseName + "/" + collectionID; while (TRI_microtime() <= endTime) { - res.clear(); - res = ac.getValues(where, true); + agencyCallback->waitForExecution(interval); - if (res.successful() && res.parse(where + "/", false)) { - if (res._values.size() == (size_t)numberOfShards) { - std::string tmpMsg = ""; - bool tmpHaveError = false; - for (auto const& p : res._values) { - VPackSlice const slice = p.second._vpack->slice(); - if (arangodb::basics::VelocyPackHelper::getBooleanValue( - slice, "error", false)) { - tmpHaveError = true; - tmpMsg += " shardID:" + p.first + ":"; - tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( - slice, "errorMessage", ""); - if (slice.hasKey("errorNum")) { - VPackSlice const errorNum = slice.get("errorNum"); - if (errorNum.isNumber()) { - tmpMsg += " (errNum="; - tmpMsg += basics::StringUtils::itoa( - errorNum.getNumericValue()); - tmpMsg += ")"; - } - } - } - } - loadCurrentCollections(); - if (tmpHaveError) { - errorMsg = "Error in creation of collection:" + tmpMsg; - return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; - } - return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); - } + if (dbServerResult >= 0) { + break; } - - res.clear(); - _agencyCallbackRegistry->awaitNextChange("Current/Version", interval); + } + _agencyCallbackRegistry->unregisterCallback(agencyCallback); + if (dbServerResult >= 0) { + return dbServerResult; } // LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards; @@ -1343,7 +1356,41 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); + + int dbServerResult = -1; + std::function dbServerChanged = [&](VPackSlice const& result) { + if (result.isObject() && result.length() == 0) { + // ...remove the entire directory for the collection + AgencyCommLocker locker("Current", "WRITE"); + if (locker.successful()) { + AgencyCommResult res; + res = ac.removeValues( + "Current/Collections/" + databaseName + "/" + collectionID, true); + if (res.successful()) { + dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + return true; + } + dbServerResult = setErrormsg( + TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, + errorMsg); + return true; + } + loadCurrentCollections(); + dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + return true; + } + return true; + }; + + // monitor the entry for the collection + std::string const where = + "Current/Collections/" + databaseName + "/" + collectionID; + + auto agencyCallback = std::make_shared( + ac, where, dbServerChanged, true, false); + _agencyCallbackRegistry->registerCallback(agencyCallback); + { AgencyCommLocker locker("Plan", "WRITE"); @@ -1369,40 +1416,14 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, // Update our own cache: loadPlannedCollections(); - // monitor the entry for the collection - std::string const where = - "Current/Collections/" + databaseName + "/" + collectionID; while (TRI_microtime() <= endTime) { - res.clear(); - res = ac.getValues(where, true); - if (!res.successful()) { - // It seems the collection is already gone, do not wait further - errorMsg = "Collection already gone."; - return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + agencyCallback->waitForExecution(interval); + if (dbServerResult >= 0) { + break; } - if (res.successful() && res.parse(where + "/", false)) { - // if there are no more active shards for the collection... - if (res._values.size() == 0) { - // ...remove the entire directory for the collection - AgencyCommLocker locker("Current", "WRITE"); - if (locker.successful()) { - res.clear(); - res = ac.removeValues( - "Current/Collections/" + databaseName + "/" + collectionID, true); - if (res.successful()) { - return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); - } - return setErrormsg( - TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, - errorMsg); - } - loadCurrentCollections(); - return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); - } - } - - res.clear(); - _agencyCallbackRegistry->awaitNextChange("Current/Version", interval); + } + if (dbServerResult >= 0) { + return dbServerResult; } return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); }