1
0
Fork 0

Aggregator refactoring

This commit is contained in:
Simon Grätzer 2017-01-20 14:42:01 +01:00
parent 1ae122e470
commit d8ba7ffc34
17 changed files with 184 additions and 191 deletions

View File

@ -32,107 +32,86 @@
namespace arangodb {
namespace pregel {
typedef std::string AggregatorID;
class Aggregator {
bool _permanent;
Aggregator(const Aggregator&) = delete;
Aggregator& operator=(const Aggregator&) = delete;
class IAggregator {
IAggregator(const IAggregator&) = delete;
IAggregator& operator=(const IAggregator&) = delete;
public:
Aggregator(bool perm = false) : _permanent(perm) {}
virtual ~Aggregator() {}
IAggregator() {}
virtual ~IAggregator() {}
/// @brief Value from superstep S-1 supplied by the conductor
virtual void aggregate(void const* valuePtr) = 0;
virtual void aggregate(VPackSlice slice) = 0;
virtual void const* getValue() const = 0;
// virtual void setValue(VPackSlice slice) = 0;
virtual VPackValue vpackValue() = 0;
virtual VPackValue vpackValue() const = 0;
virtual void parse(VPackSlice slice) = 0;
virtual void reset(){};
bool isPermanent() { return _permanent; }
virtual void reset() = 0;
virtual bool isConverging() const = 0;
};
template <typename T>
class MaxAggregator : public Aggregator {
template<typename T>
struct NumberAggregator : public IAggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
T _value, _initial;
public:
MaxAggregator(T init, bool perm = false)
: Aggregator(perm), _value(init), _initial(init) {}
NumberAggregator(T init, bool perm = false, bool conv = false)
: _value(init), _initial(init), _converging(conv) {}
void aggregate(void const* valuePtr) override {
T other = *((T*)valuePtr);
if (other > _value) _value = other;
};
void aggregate(VPackSlice slice) override {
void const* getValue() const override { return &_value; };
VPackValue vpackValue() const override { return VPackValue(_value); };
void parse(VPackSlice slice) override {
T f = slice.getNumber<T>();
aggregate(&f);
aggregate((void const*)(&f));
}
void const* getValue() const override { return &_value; };
VPackValue vpackValue() override { return VPackValue(_value); };
void reset() override {
if (!_permanent) {_value = _initial; }
}
void reset() override { _value = _initial; }
};
template <typename T>
class MinAggregator : public Aggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
bool isConverging() const override { return _converging; }
protected:
T _value, _initial;
public:
MinAggregator(T init, bool perm = false)
: Aggregator(perm), _value(init), _initial(init) {}
bool _permanent, _converging;
};
template <typename T>
struct MaxAggregator : public NumberAggregator<T> {
MaxAggregator(T init, bool perm = false) : NumberAggregator<T>(init, perm, true) {}
void aggregate(void const* valuePtr) override {
T other = *((T*)valuePtr);
if (other < _value) _value = other;
if (other > this->_value) this->_value = other;
};
void aggregate(VPackSlice slice) override {
T f = slice.getNumber<T>();
aggregate(&f);
}
void const* getValue() const override { return &_value; };
VPackValue vpackValue() override { return VPackValue(_value); };
void reset() override { _value = _initial; }
};
template <typename T>
class SumAggregator : public Aggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
T _value, _initial;
public:
SumAggregator(T init, bool perm = false)
: Aggregator(perm), _value(init), _initial(init) {}
void aggregate(void const* valuePtr) override { _value += *((T*)valuePtr); };
void aggregate(VPackSlice slice) override { _value += slice.getNumber<T>(); }
void const* getValue() const override { return &_value; };
VPackValue vpackValue() override { return VPackValue(_value); };
void reset() override { _value = _initial; }
};
template <typename T>
class ValueAggregator : public Aggregator {
static_assert(std::is_fundamental<T>::value, "Type must be fundamental");
T _value;
struct MinAggregator : public NumberAggregator<T> {
MinAggregator(T init, bool perm = false) : NumberAggregator<T>(init, perm, true) {}
void aggregate(void const* valuePtr) override {
T other = *((T*)valuePtr);
if (other < this->_value) this->_value = other;
};
};
template <typename T>
struct SumAggregator : public NumberAggregator<T> {
SumAggregator(T init, bool perm = false) : NumberAggregator<T>(init, perm, true) {}
void aggregate(void const* valuePtr) override { this->_value += *((T*)valuePtr); };
void parse(VPackSlice slice) override { this->_value += slice.getNumber<T>(); }
};
public:
ValueAggregator(T val, bool perm = false) : Aggregator(perm), _value(val) {}
template <typename T>
struct ValueAggregator : public NumberAggregator<T> {
ValueAggregator(T val, bool perm = false) : NumberAggregator<T>(val, perm, true) {}
void aggregate(void const* valuePtr) override { _value = *((T*)valuePtr); };
void aggregate(VPackSlice slice) override { _value = slice.getNumber<T>(); }
void const* getValue() const override { return &_value; };
VPackValue vpackValue() override { return VPackValue(_value); };
void aggregate(void const* valuePtr) override { this->_value = *((T*)valuePtr); };
void parse(VPackSlice slice) override {this-> _value = slice.getNumber<T>(); }
};
}
}

View File

@ -23,23 +23,32 @@
#include "Pregel/AggregatorHandler.h"
#include "Pregel/Aggregator.h"
#include "Pregel/Algorithm.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
using namespace arangodb;
using namespace arangodb::pregel;
AggregatorHandler::~AggregatorHandler() {
WRITE_LOCKER(guard, _lock);
for (auto const& it : _values) {
delete it.second;
}
_values.clear();
}
Aggregator* AggregatorHandler::_create(std::string const& name) {
auto it = _values.find(name);
if (it != _values.end()) {
return it->second;
} else {
std::unique_ptr<Aggregator> agg(_algorithm->aggregator(name));
IAggregator* AggregatorHandler::_get(AggregatorID const& name) {
{
READ_LOCKER(guard, _lock);
auto it = _values.find(name);
if (it != _values.end()) {
return it->second;
}
}
// aggregator doesn't exists, create it
{
WRITE_LOCKER(guard, _lock);
std::unique_ptr<IAggregator> agg(_algorithm->aggregator(name));
if (agg) {
_values[name] = agg.get();
return agg.release();
@ -48,51 +57,60 @@ Aggregator* AggregatorHandler::_create(std::string const& name) {
return nullptr;
}
void AggregatorHandler::aggregate(std::string const& name,
void AggregatorHandler::aggregate(AggregatorID const& name,
const void* valuePtr) {
Aggregator* agg = _create(name);
IAggregator* agg = _get(name);
if (agg) {
agg->aggregate(valuePtr);
}
}
const void* AggregatorHandler::getAggregatedValue(std::string const& name) {
Aggregator* agg = _create(name);
const void* AggregatorHandler::getAggregatedValue(AggregatorID const& name) {
IAggregator* agg = _get(name);
return agg != nullptr ? agg->getValue() : nullptr;
}
void AggregatorHandler::resetValues() {
for (auto& it : _values) {
if (!it.second->isPermanent()) {
it.second->reset();
}
}
}
void AggregatorHandler::aggregateValues(AggregatorHandler const& workerValues) {
for (auto const& pair : workerValues._values) {
std::string const& name = pair.first;
Aggregator* agg = _create(name);
AggregatorID const& name = pair.first;
IAggregator* agg = _get(name);
if (agg) {
agg->aggregate(pair.second->getValue());
}
}
}
void AggregatorHandler::aggregateValues(VPackSlice workerValues) {
for (auto const& keyValue : VPackObjectIterator(workerValues)) {
std::string name = keyValue.key.copyString();
Aggregator* agg = _create(name);
if (agg) {
agg->aggregate(keyValue.value);
void AggregatorHandler::parseValues(VPackSlice data) {
VPackSlice values = data.get(Utils::aggregatorValuesKey);
if (values.isObject()) {
for (auto const& keyValue : VPackObjectIterator(values)) {
AggregatorID name = keyValue.key.copyString();
IAggregator* agg = _get(name);
if (agg) {
agg->parse(keyValue.value);
}
}
}
}
void AggregatorHandler::serializeValues(VPackBuilder& b) const {
for (auto const& pair : _values) {
b.add(pair.first, pair.second->vpackValue());
READ_LOCKER(guard, _lock);
if (_values.size() > 0) {
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
for (auto const& pair : _values) {
b.add(pair.first, pair.second->vpackValue());
}
b.close();
}
}
size_t AggregatorHandler::size() { return _values.size(); }
size_t AggregatorHandler::size() const {
READ_LOCKER(guard, _lock);
return _values.size();
}

View File

@ -28,18 +28,21 @@
#include <velocypack/velocypack-aliases.h>
#include <functional>
#include <map>
#include "Basics/ReadWriteLock.h"
namespace arangodb {
namespace pregel {
struct IAlgorithm;
class Aggregator;
class IAggregator;
/// Thread safe wrapper around handles
class AggregatorHandler {
const IAlgorithm* _algorithm;
std::map<std::string, Aggregator*> _values;
Aggregator* _create(std::string const& name);
std::map<std::string, IAggregator*> _values;
mutable basics::ReadWriteLock _lock;
IAggregator* _get(std::string const& name);
public:
AggregatorHandler(const IAlgorithm* c) : _algorithm(c) {}
~AggregatorHandler();
@ -47,9 +50,9 @@ class AggregatorHandler {
const void* getAggregatedValue(std::string const& name);
void resetValues();
void aggregateValues(AggregatorHandler const& workerValues);
void aggregateValues(VPackSlice workerValues);
void parseValues(VPackSlice workerValues);
void serializeValues(VPackBuilder& b) const;
size_t size();
size_t size() const;
};
}
}

View File

@ -43,7 +43,7 @@ class VertexComputation;
template <typename V, typename E, typename M>
class VertexCompensation;
class Aggregator;
class IAggregator;
class WorkerConfig;
class MasterContext;
@ -58,7 +58,7 @@ struct IAlgorithm {
virtual bool supportsLazyLoading() const { return false; }
virtual Aggregator* aggregator(std::string const& name) const {
virtual IAggregator* aggregator(std::string const& name) const {
return nullptr;
}
@ -91,8 +91,8 @@ struct Algorithm : IAlgorithm {
WorkerConfig const*) const {
return nullptr;
}
virtual std::vector<std::string> initialActiveSet() {
return std::vector<std::string>();
virtual std::set<std::string> initialActiveSet() {
return std::set<std::string>();
}
protected:

View File

@ -98,7 +98,7 @@ VertexComputation<float, float, float>* LineRank::createComputation(
return new MyComputation();
}
Aggregator* LineRank::aggregator(std::string const& name) const {
IAggregator* LineRank::aggregator(std::string const& name) const {
if (name == kMoreIterations) {
return new ValueAggregator<bool>(false, false);// non perm
}

View File

@ -53,7 +53,7 @@ struct LineRank : public SimpleAlgorithm<float, float, float> {
VertexComputation<float, float, float>* createComputation(
WorkerConfig const*) const override;
Aggregator* aggregator(std::string const& name) const override;
IAggregator* aggregator(std::string const& name) const override;
};
}
}

View File

@ -83,7 +83,7 @@ VertexComputation<float, float, float>* PageRank::createComputation(
return new PRComputation(_threshold);
}
Aggregator* PageRank::aggregator(std::string const& name) const {
IAggregator* PageRank::aggregator(std::string const& name) const {
if (name == kConvergence) {
return new MaxAggregator<float>(-1.0f);
}

View File

@ -51,7 +51,7 @@ struct PageRank : public SimpleAlgorithm<float, float, float> {
VertexComputation<float, float, float>* createComputation(
WorkerConfig const*) const override;
Aggregator* aggregator(std::string const& name) const override;
IAggregator* aggregator(std::string const& name) const override;
};
}
}

View File

@ -94,7 +94,7 @@ VertexComputation<float, float, float>* RecoveringPageRank::createComputation(
return new RPRComputation(_threshold);
}
Aggregator* RecoveringPageRank::aggregator(std::string const& name) const {
IAggregator* RecoveringPageRank::aggregator(std::string const& name) const {
if (name == kConvergence) {
return new MaxAggregator<float>(-1);
} else if (name == kNonFailedCount) {

View File

@ -56,7 +56,7 @@ struct RecoveringPageRank : public SimpleAlgorithm<float, float, float> {
WorkerConfig const*) const override;
VertexCompensation<float, float, float>* createCompensation(
WorkerConfig const*) const override;
Aggregator* aggregator(std::string const& name) const override;
IAggregator* aggregator(std::string const& name) const override;
};
}
}

View File

@ -123,8 +123,8 @@ ShortestPathAlgorithm::ShortestPathAlgorithm(VPackSlice userParams)
_format = new SPGraphFormat(val1.copyString(), val2.copyString());
}
std::vector<std::string> ShortestPathAlgorithm::initialActiveSet() {
return std::vector<std::string>{_format->_sourceDocId};
std::set<std::string> ShortestPathAlgorithm::initialActiveSet() {
return std::set<std::string>{_format->_sourceDocId};
}
GraphFormat<int64_t, int64_t>* ShortestPathAlgorithm::inputFormat() {
@ -137,7 +137,7 @@ ShortestPathAlgorithm::createComputation(WorkerConfig const* _config) const {
return new ShortestPathComp(target);
}
Aggregator* ShortestPathAlgorithm::aggregator(std::string const& name) const {
IAggregator* ShortestPathAlgorithm::aggregator(std::string const& name) const {
if (name == spUpperPathBound) { // persistent min operator
return new MinAggregator<int64_t>(INT64_MAX, true);
}

View File

@ -54,8 +54,8 @@ struct ShortestPathAlgorithm : public Algorithm<int64_t, int64_t, int64_t> {
VertexComputation<int64_t, int64_t, int64_t>* createComputation(
WorkerConfig const* config) const override;
Aggregator* aggregator(std::string const& name) const override;
std::vector<std::string> initialActiveSet() override;
IAggregator* aggregator(std::string const& name) const override;
std::set<std::string> initialActiveSet() override;
};
}
}

View File

@ -116,7 +116,6 @@ bool Conductor::_startGlobalStep() {
// values as well as the count of active vertices
std::vector<ClusterCommRequest> requests;
int res = _sendToAllDBServers(Utils::prepareGSSPath, b.slice(), requests);
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
LOG(INFO) << "Seems there is at least one worker out of order";
@ -131,10 +130,7 @@ bool Conductor::_startGlobalStep() {
_totalEdgesCount = 0;
for (auto const& req : requests) {
VPackSlice payload = req.result.answer->payload();
VPackSlice aggVals = payload.get(Utils::aggregatorValuesKey);
if (aggVals.isObject()) {
_aggregators->aggregateValues(aggVals);
}
_aggregators->parseValues(payload);
_statistics.accumulateActiveCounts(payload);
_totalVerticesCount += payload.get(Utils::vertexCount).getUInt();
_totalEdgesCount += payload.get(Utils::edgeCount).getUInt();
@ -165,11 +161,7 @@ bool Conductor::_startGlobalStep() {
b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep));
b.add(Utils::vertexCount, VPackValue(_totalVerticesCount));
b.add(Utils::edgeCount, VPackValue(_totalEdgesCount));
if (_aggregators->size() > 0) {
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
_aggregators->serializeValues(b);
b.close();
}
_aggregators->serializeValues(b);
b.close();
LOG(INFO) << b.toString();
@ -186,7 +178,7 @@ bool Conductor::_startGlobalStep() {
}
// ============ Conductor callbacks ===============
void Conductor::finishedWorkerStartup(VPackSlice& data) {
void Conductor::finishedWorkerStartup(VPackSlice data) {
MUTEX_LOCKER(guard, _callbackMutex);
_ensureUniqueResponse(data);
if (_state != ExecutionState::RUNNING) {
@ -219,7 +211,9 @@ void Conductor::finishedWorkerStartup(VPackSlice& data) {
}
}
void Conductor::finishedWorkerStep(VPackSlice& data) {
/// Will optionally send a response, to notify the worker of converging aggregator
/// values which can be coninually updated (in async mode)
void Conductor::finishedWorkerStep(VPackSlice data, VPackBuilder &response) {
MUTEX_LOCKER(guard, _callbackMutex);
// this method can be called multiple times in a superstep depending on
// whether we are in the async mode
@ -231,8 +225,10 @@ void Conductor::finishedWorkerStep(VPackSlice& data) {
return;
}
// track message counts to decide when to halt or add global barriers
// in async mode.
_statistics.accumulateMessageStats(data);
if (!_asyncMode) {
if (_asyncMode == false) {// in async mode we wait for all responded
_ensureUniqueResponse(data);
// wait for the last worker to respond
if (_respondedServers.size() != _dbServers.size()) {
@ -240,6 +236,11 @@ void Conductor::finishedWorkerStep(VPackSlice& data) {
}
} else if (_statistics.clientCount() < _dbServers.size() || // no messages
!_statistics.allMessagesProcessed()) { // haven't received msgs
_aggregators->parseValues(data);
response.openObject();
_aggregators->serializeValues(response);
response.close();
return;
}
@ -251,14 +252,13 @@ void Conductor::finishedWorkerStep(VPackSlice& data) {
_startGlobalStep(); // trigger next superstep
} else if (_state == ExecutionState::CANCELED) {
LOG(WARN) << "Execution was canceled, results will be discarded.";
// tells workers to store / discard results
_finalizeWorkers();
} else { // this prop shouldn't occur,
_finalizeWorkers();// tells workers to store / discard results
} else { // this prop shouldn't occur unless we are recovering or in error
LOG(WARN) << "No further action taken after receiving all responses";
}
}
void Conductor::finishedRecoveryStep(VPackSlice& data) {
void Conductor::finishedRecoveryStep(VPackSlice data) {
MUTEX_LOCKER(guard, _callbackMutex);
_ensureUniqueResponse(data);
if (_state != ExecutionState::RECOVERING) {
@ -267,10 +267,7 @@ void Conductor::finishedRecoveryStep(VPackSlice& data) {
}
// the recovery mechanism might be gathering state information
VPackSlice aggVals = data.get(Utils::aggregatorValuesKey);
if (aggVals.isObject()) {
_aggregators->aggregateValues(aggVals);
}
_aggregators->parseValues(data);
if (_respondedServers.size() != _dbServers.size()) {
return;
}
@ -292,11 +289,7 @@ void Conductor::finishedRecoveryStep(VPackSlice& data) {
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
if (_aggregators->size() > 0) {
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
_aggregators->serializeValues(b);
b.close();
}
_aggregators->serializeValues(b);
b.close();
// first allow all workers to run worker level operations
res = _sendToAllDBServers(Utils::continueRecoveryPath, b.slice());
@ -401,11 +394,7 @@ void Conductor::startRecovery() {
b.clear();// start a new message
b.openObject();
b.add(Utils::recoveryMethodKey, VPackValue(Utils::compensate));
if (_aggregators->size() > 0) {
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
_aggregators->serializeValues(b);
b.close();
}
_aggregators->serializeValues(b);
b.close();
_aggregators->resetValues();
@ -565,11 +554,7 @@ int Conductor::_finalizeWorkers() {
b.add("stats", VPackValue(VPackValueType::Object));
_statistics.serializeValues(b);
b.close();
if (_aggregators->size() > 0) {
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
_aggregators->serializeValues(b);
b.close();
}
_aggregators->serializeValues(b);
b.close();
LOG(INFO) << "Done. We did " << _globalSuperstep << " rounds";

View File

@ -94,9 +94,9 @@ class Conductor {
void _ensureUniqueResponse(VPackSlice body);
// === REST callbacks ===
void finishedWorkerStartup(VPackSlice& data);
void finishedWorkerStep(VPackSlice& data);
void finishedRecoveryStep(VPackSlice& data);
void finishedWorkerStartup(VPackSlice data);
void finishedWorkerStep(VPackSlice data, VPackBuilder &response);
void finishedRecoveryStep(VPackSlice data);
public:
Conductor(

View File

@ -50,6 +50,7 @@ template <typename V, typename E, typename M>
Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice initConfig)
: _config(vocbase->name(), initConfig), _algorithm(algo) {
VPackSlice userParams = initConfig.get(Utils::userParametersKey);
_workerContext.reset(algo->workerContext(userParams));
_messageFormat.reset(algo->messageFormat());
@ -84,7 +85,7 @@ Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
{ // never modify the graph store without holding the mutex
MUTEX_LOCKER(guard, _commandMutex);
if (_config.lazyLoading()) {
std::vector<std::string> activeSet = _algorithm->initialActiveSet();
std::set<std::string> activeSet = _algorithm->initialActiveSet();
if (activeSet.size() == 0) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL, "There needs to be one active vertice");
@ -188,10 +189,8 @@ void Worker<V, E, M>::prepareGlobalStep(VPackSlice data,
response.add(Utils::activeCountKey, VPackValue(_activeCount));
response.add(Utils::vertexCount, VPackValue(_graphStore->localVertexCount()));
response.add(Utils::edgeCount, VPackValue(_graphStore->localEdgeCount()));
response.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
_workerAggregators->serializeValues(response);
response.close();
response.close();
LOG(INFO) << "Worker sent prepare GSS response: " << response.toJson();
}
@ -245,11 +244,7 @@ void Worker<V, E, M>::startGlobalStep(VPackSlice data) {
_workerAggregators->resetValues();
_conductorAggregators->resetValues();
// parse aggregated values from conductor
VPackSlice aggValues = data.get(Utils::aggregatorValuesKey);
if (aggValues.isObject()) {
_conductorAggregators->aggregateValues(aggValues);
}
_conductorAggregators->parseValues(data);
// execute context
if (_workerContext) {
_workerContext->_vertexCount = data.get(Utils::vertexCount).getUInt();
@ -462,15 +457,24 @@ void Worker<V, E, M>::_finishedProcessing() {
// don't reset the active count, because the conductor will ask about
// it later
_messageStats.resetTracking();
if (_config.asynchronousMode()) {
_continueAsync();
}
}
// TODO ask how to implement message sending without waiting for a response
// call this without locking, to avoid a race condition. The conductor
// might immdediatly call us back
_callConductor(Utils::finishedWorkerStepPath, package.slice());
if (_config.asynchronousMode()) {// no answer expected
std::unique_ptr<ClusterCommResult> result = _callConductorWithResponse(Utils::finishedWorkerStepPath, package.slice());
if (result->status == CL_COMM_RECEIVED) {
VPackSlice data = result->answer->payload();
_conductorAggregators->parseValues(data);// just includes converging aggregators
}
// hold the lock only for the coni
MUTEX_LOCKER(guard, _commandMutex);
_continueAsync();
} else {
_callConductor(Utils::finishedWorkerStepPath, package.slice());
}
}
/// WARNING only call this while holding the _commandMutex
@ -557,14 +561,11 @@ void Worker<V, E, M>::startRecovery(VPackSlice data) {
template <typename V, typename E, typename M>
void Worker<V, E, M>::compensateStep(VPackSlice data) {
MUTEX_LOCKER(guard, _commandMutex);
_conductorAggregators->resetValues();
VPackSlice aggValues = data.get(Utils::aggregatorValuesKey);
if (aggValues.isObject()) {
_conductorAggregators->aggregateValues(aggValues);
}
_workerAggregators->resetValues();
_conductorAggregators->resetValues();
_conductorAggregators->parseValues(data);
ThreadPool* pool = PregelFeature::instance()->threadPool();
pool->enqueue([this] {
if (_state != WorkerState::RECOVERING) {
@ -595,12 +596,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice data) {
VPackValue(_config.executionNumber()));
package.add(Utils::globalSuperstepKey,
VPackValue(_config.globalSuperstep()));
if (_workerAggregators->size() > 0) { // add aggregators
package.add(Utils::aggregatorValuesKey,
VPackValue(VPackValueType::Object));
_workerAggregators->serializeValues(package);
package.close();
}
_workerAggregators->serializeValues(package);
package.close();
_callConductor(Utils::finishedRecoveryPath, package.slice());
});
@ -625,22 +621,32 @@ void Worker<V, E, M>::finalizeRecovery(VPackSlice data) {
LOG(INFO) << "Recovery finished";
}
template <typename V, typename E, typename M>
void Worker<V, E, M>::_callConductor(std::string path, VPackSlice message) {
void Worker<V, E, M>::_callConductor(std::string const& path, VPackSlice message) {
LOG(INFO) << "Calling the conductor";
ClusterComm* cc = ClusterComm::instance();
std::string baseUrl = Utils::baseUrl(_config.database());
CoordTransactionID coordinatorTransactionID = TRI_NewTickServer();
auto headers =
std::make_unique<std::unordered_map<std::string, std::string>>();
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
auto body = std::make_shared<std::string const>(message.toJson());
cc->asyncRequest("", coordinatorTransactionID,
"server:" + _config.coordinatorId(), rest::RequestType::POST,
baseUrl + path, body, headers, nullptr,
90.0, // timeout + single request
true);
120.0, // timeout
true);// single request, no answer expected
}
template <typename V, typename E, typename M>
std::unique_ptr<ClusterCommResult> Worker<V, E, M>::_callConductorWithResponse(std::string const& path, VPackSlice message) {
LOG(INFO) << "Calling the conductor";
ClusterComm* cc = ClusterComm::instance();
std::string baseUrl = Utils::baseUrl(_config.database());
CoordTransactionID coordinatorTransactionID = TRI_NewTickServer();
std::unordered_map<std::string, std::string> headers;
return cc->syncRequest("", coordinatorTransactionID,
"server:" + _config.coordinatorId(), rest::RequestType::POST,
baseUrl + path, message.toJson(), headers, 120.0);
}
// template types to create

View File

@ -118,8 +118,10 @@ class Worker : public IWorker {
bool _processVertices(RangeIterator<VertexEntry>& vertexIterator);
void _finishedProcessing();
void _continueAsync();
void _callConductor(std::string path, VPackSlice message);
void _callConductor(std::string const& path, VPackSlice message);
std::unique_ptr<ClusterCommResult> _callConductorWithResponse(std::string const& path,
VPackSlice message);
public:
Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algorithm,
VPackSlice params);

View File

@ -122,7 +122,7 @@ RestStatus RestPregelHandler::execute() {
} else if (suffix[0] == Utils::finishedWorkerStepPath) {
Conductor *exe = PregelFeature::instance()->conductor(executionNumber);
if (exe) {
exe->finishedWorkerStep(body);
exe->finishedWorkerStep(body, result);
}
} else if (suffix[0] == Utils::cancelGSSPath) {
IWorker *exe = PregelFeature::instance()->worker(executionNumber);