diff --git a/arangod/Pregel/Aggregator.h b/arangod/Pregel/Aggregator.h index 2a8045811f..f425809db3 100644 --- a/arangod/Pregel/Aggregator.h +++ b/arangod/Pregel/Aggregator.h @@ -43,14 +43,18 @@ class IAggregator { IAggregator() {} virtual ~IAggregator() {} - /// @brief Value from superstep S-1 supplied by the conductor + /// @brief Used when updating aggregator value locally virtual void aggregate(void const* valuePtr) = 0; + /// @brief Used when updating aggregator value from remote + virtual void parseAggregate(VPackSlice const& slice) = 0; + + virtual void const* getAggregatedValue() const = 0; + /// @brief Value from superstep S-1 supplied by the conductor + virtual void setAggregatedValue(VPackSlice const& slice) = 0; + + virtual void serialize(std::string const& key, VPackBuilder &builder) const = 0; - virtual void const* getValue() const = 0; - virtual VPackValue vpackValue() const = 0; - virtual void parse(VPackSlice slice) = 0; - - virtual void reset(bool force) = 0; + virtual void reset() = 0; virtual bool isConverging() const = 0; }; @@ -58,26 +62,34 @@ template struct NumberAggregator : public IAggregator { static_assert(std::is_arithmetic::value, "Type must be numeric"); - NumberAggregator(T init, bool perm = false, bool conv = false) - : _value(init), _initial(init), _permanent(perm), _converging(conv) {} - - void const* getValue() const override { return &_value; }; - VPackValue vpackValue() const override { return VPackValue(_value); }; - void parse(VPackSlice slice) override { + NumberAggregator(T neutral, bool perm = false, bool conv = false) + : _value(neutral), _neutral(neutral), _permanent(perm), _converging(conv) {} + + void parseAggregate(VPackSlice const& slice) override { T f = slice.getNumber(); aggregate((void const*)(&f)); + }; + + void const* getAggregatedValue() const override { return &_value; }; + + void setAggregatedValue(VPackSlice const& slice) override { + _value = slice.getNumber(); } + + void serialize(std::string const& key, VPackBuilder &builder) const override { + builder.add(key, VPackValue(_value)); + }; - void reset(bool force) override { - if (!_permanent || force) { - _value = _initial; + void reset() override { + if (!_permanent) { + _value = _neutral; } } bool isConverging() const override { return _converging; } protected: - T _value, _initial; + T _value, _neutral; bool _permanent, _converging; }; @@ -109,36 +121,46 @@ struct SumAggregator : public NumberAggregator { void aggregate(void const* valuePtr) override { this->_value += *((T*)valuePtr); }; - void parse(VPackSlice slice) override { + void parseAggregate(VPackSlice const& slice) override { this->_value += slice.getNumber(); } }; +/// Aggregator that stores a value that is overwritten once another value is aggregated. +/// This aggregator is useful for one-to-many communication from master.compute() or from a special vertex. +/// In case multiple vertices write to this aggregator, its behavior is non-deterministic. template -struct ValueAggregator : public NumberAggregator { - ValueAggregator(T val, bool perm = false) +struct OverwriteAggregator : public NumberAggregator { + OverwriteAggregator(T val, bool perm = false) : NumberAggregator(val, perm, true) {} void aggregate(void const* valuePtr) override { this->_value = *((T*)valuePtr); }; - void parse(VPackSlice slice) override { this->_value = slice.getNumber(); } + void parseAggregate(VPackSlice const& slice) override { this->_value = slice.getNumber(); } }; /// always initializes to true. struct BoolOrAggregator : public IAggregator { BoolOrAggregator(bool perm = false) : _permanent(perm) {} - - void const* getValue() const override { return &_value; }; - VPackValue vpackValue() const override { return VPackValue(_value); }; - + void aggregate(void const* valuePtr) override { _value = _value || *((bool*)valuePtr); }; - void parse(VPackSlice slice) override { _value = _value || slice.getBool(); } + + void parseAggregate(VPackSlice const& slice) override { _value = _value || slice.getBool(); } + + void const* getAggregatedValue() const override { return &_value; }; + void setAggregatedValue(VPackSlice const& slice) override { + _value = slice.getBool(); + } + + void serialize(std::string const& key, VPackBuilder &builder) const override { + builder.add(key, VPackValue(_value)); + }; - void reset(bool force) override { - if (!_permanent || force) { + void reset() override { + if (!_permanent) { _value = false; } } diff --git a/arangod/Pregel/AggregatorHandler.cpp b/arangod/Pregel/AggregatorHandler.cpp index ca8349be30..ff413842a9 100644 --- a/arangod/Pregel/AggregatorHandler.cpp +++ b/arangod/Pregel/AggregatorHandler.cpp @@ -37,7 +37,7 @@ AggregatorHandler::~AggregatorHandler() { _values.clear(); } -IAggregator* AggregatorHandler::_get(AggregatorID const& name) { +IAggregator* AggregatorHandler::getAggregator(AggregatorID const& name) { { READ_LOCKER(guard, _lock); auto it = _values.find(name); @@ -59,46 +59,57 @@ IAggregator* AggregatorHandler::_get(AggregatorID const& name) { void AggregatorHandler::aggregate(AggregatorID const& name, const void* valuePtr) { - IAggregator* agg = _get(name); + IAggregator* agg = getAggregator(name); if (agg) { agg->aggregate(valuePtr); } } -const void* AggregatorHandler::getAggregatedValue(AggregatorID const& name) { - IAggregator* agg = _get(name); - return agg != nullptr ? agg->getValue() : nullptr; -} - -void AggregatorHandler::resetValues(bool force) { - for (auto& it : _values) { - it.second->reset(force); - } -} - void AggregatorHandler::aggregateValues(AggregatorHandler const& workerValues) { for (auto const& pair : workerValues._values) { AggregatorID const& name = pair.first; - IAggregator* agg = _get(name); + IAggregator* agg = getAggregator(name); if (agg) { - agg->aggregate(pair.second->getValue()); + agg->aggregate(pair.second->getAggregatedValue()); } } } -bool AggregatorHandler::parseValues(VPackSlice data) { - VPackSlice values = data.get(Utils::aggregatorValuesKey); - if (values.isObject() == false) { - return false; - } - for (auto const& keyValue : VPackObjectIterator(values)) { - AggregatorID name = keyValue.key.copyString(); - IAggregator* agg = _get(name); - if (agg) { - agg->parse(keyValue.value); +void AggregatorHandler::aggregateValues(VPackSlice const& workerValues) { + VPackSlice values = workerValues.get(Utils::aggregatorValuesKey); + if (values.isObject()) { + for (auto const& keyValue : VPackObjectIterator(values)) { + AggregatorID name = keyValue.key.copyString(); + IAggregator* agg = getAggregator(name); + if (agg) { + agg->parseAggregate(keyValue.value); + } } } - return true; +} + +void AggregatorHandler::setAggregatedValues(VPackSlice const& workerValues) { + VPackSlice values = workerValues.get(Utils::aggregatorValuesKey); + if (values.isObject()) { + for (auto const& keyValue : VPackObjectIterator(values)) { + AggregatorID name = keyValue.key.copyString(); + IAggregator* agg = getAggregator(name); + if (agg) { + agg->setAggregatedValue(keyValue.value); + } + } + } +} + +const void* AggregatorHandler::getAggregatedValue(AggregatorID const& name) { + IAggregator* agg = getAggregator(name); + return agg != nullptr ? agg->getAggregatedValue() : nullptr; +} + +void AggregatorHandler::resetValues() { + for (auto& it : _values) { + it.second->reset(); + } } bool AggregatorHandler::serializeValues(VPackBuilder& b, @@ -107,9 +118,10 @@ bool AggregatorHandler::serializeValues(VPackBuilder& b, bool hasValues = false; b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object)); for (auto const& pair : _values) { + AggregatorID const& name = pair.first; IAggregator* agg = pair.second; if (!onlyConverging || agg->isConverging()) { - b.add(pair.first, agg->vpackValue()); + agg->serialize(name, b); hasValues = true; } } diff --git a/arangod/Pregel/AggregatorHandler.h b/arangod/Pregel/AggregatorHandler.h index 77e213a653..b0fe895d9f 100644 --- a/arangod/Pregel/AggregatorHandler.h +++ b/arangod/Pregel/AggregatorHandler.h @@ -42,28 +42,28 @@ class AggregatorHandler { std::map _values; mutable basics::ReadWriteLock _lock; - IAggregator* _get(std::string const& name); public: AggregatorHandler(IAlgorithm const* c) : _algorithm(c) {} ~AggregatorHandler(); - - void registerAggregator(std::string const& name, IAggregator* aggregator); + + IAggregator* getAggregator(std::string const& name); /// aggregate this value void aggregate(std::string const& name, const void* valuePtr); + /// aggregates all values from this aggregator + void aggregateValues(AggregatorHandler const& workerValues); + /// aggregates all values from this aggregator + void aggregateValues(VPackSlice const& workerValues); + + /// return true if there are values in this Slice + void setAggregatedValues(VPackSlice const& workerValues); /// get the pointer to an aggregator value const void* getAggregatedValue(std::string const& name); - + /// calls reset on every aggregator - void resetValues(bool force = false); - - /// aggregates all values from this aggregator - void aggregateValues(AggregatorHandler const& workerValues); - - /// return true if there are values in this Slice - bool parseValues(VPackSlice workerValues); + void resetValues(); /// return true if there values in this aggregator which were serialized bool serializeValues(VPackBuilder& b, bool onlyConverging = false) const; diff --git a/arangod/Pregel/Algorithm.h b/arangod/Pregel/Algorithm.h index 3f2c980394..d262d4b5fb 100644 --- a/arangod/Pregel/Algorithm.h +++ b/arangod/Pregel/Algorithm.h @@ -86,7 +86,7 @@ template struct Algorithm : IAlgorithm { public: virtual WorkerContext* workerContext(VPackSlice userParams) const { - return new WorkerContext(userParams); + return new WorkerContext(); } virtual GraphFormat* inputFormat() const = 0; virtual MessageFormat* messageFormat() const = 0; diff --git a/arangod/Pregel/Algos/AsyncSCC.cpp b/arangod/Pregel/Algos/AsyncSCC.cpp index c05daf352d..103594919e 100644 --- a/arangod/Pregel/Algos/AsyncSCC.cpp +++ b/arangod/Pregel/Algos/AsyncSCC.cpp @@ -254,7 +254,7 @@ MasterContext* AsyncSCC::masterContext(VPackSlice userParams) const { IAggregator* AsyncSCC::aggregator(std::string const& name) const { if (name == kPhase) { // permanent value - return new ValueAggregator(SCCPhase::TRANSPOSE, true); + return new OverwriteAggregator(SCCPhase::TRANSPOSE, true); } else if (name == kFoundNewMax) { return new BoolOrAggregator(false); // non perm } else if (name == kConverged) { diff --git a/arangod/Pregel/Algos/DMID.cpp b/arangod/Pregel/Algos/DMID.cpp index f143af33a3..f998adab58 100644 --- a/arangod/Pregel/Algos/DMID.cpp +++ b/arangod/Pregel/Algos/DMID.cpp @@ -29,6 +29,7 @@ #include "Pregel/IncomingCache.h" #include "Pregel/MasterContext.h" #include "Pregel/VertexComputation.h" +#include "Pregel/Algos/DMID/VertexSumAggregator.h" using namespace arangodb; using namespace arangodb::pregel; @@ -115,7 +116,7 @@ struct DMIDComputation superstep4(vertex, messages); } - if (getSuperstep() == rwFinished +1) { + if (globalSuperstep() == rwFinished +1) { /** * Superstep 0 and RW_ITERATIONBOUND + 5 are identical. Therefore * call superstep0 @@ -123,38 +124,36 @@ struct DMIDComputation superstep0(vertex, messages); } - if (getSuperstep() == rwFinished+2) { + if (globalSuperstep() == rwFinished+2) { superstep6(vertex, messages); } - if (getSuperstep() == rwFinished + 3) { + if (globalSuperstep() == rwFinished + 3) { superstep7(vertex, messages); } - LongWritable iterationCounter = getAggregatedValue(ITERATION_AGG); - double it = iterationCounter.get(); + int64_t iterationCounter = getAggregatedValue(ITERATION_AGG); if (getSuperstep() >= rwFinished +4 - && (it % 3 == 1 )) { - superstep8(vertex, messages); + && (iterationCounter % 3 == 1 )) { + superstep8(messages); } if (getSuperstep() >= rwFinished +5 - && (it % 3 == 2 )) { - superstep9(vertex, messages); + && (iterationCounter % 3 == 2 )) { + superstep9(messages); } if (getSuperstep() >= rwFinished +6 - && (it % 3 == 0 )) { - superstep10(vertex, messages); + && (iterationCounter % 3 == 0 )) { + superstep10(messages); } } - + /** * SUPERSTEP 0: send a message along all outgoing edges. Message contains * own VertexID and the edge weight. */ - private void superstep0( - Vertex vertex, + void superstep0(Vertex vertex, Iterable messages) { long vertexID = vertex.getId().get(); @@ -171,7 +170,7 @@ struct DMIDComputation * the form (ID,weightedInDegree) along all incoming edges (send every node * a reply) */ - private void superstep1( + void superstep1( Vertex vertex, Iterable messages) { @@ -208,7 +207,7 @@ struct DMIDComputation * Save the column as a part of the vertexValue. Aggregate DA with value 1/N * to initialize the Random Walk. */ - private void superstep2( + void superstep2( Vertex vertex, Iterable messages) { @@ -264,7 +263,7 @@ struct DMIDComputation * SUPERSTEP 3 - RW_ITERATIONBOUND+3: Calculate entry DA^(t+1)_ownID using * DA^t and disCol. Save entry in the DA aggregator. */ - private void superstepRW( + void superstepRW( Vertex vertex, Iterable messages) { @@ -614,10 +613,6 @@ struct DMIDValueMasterContext : public MasterContext { DMIDValueMasterContext() {} // TODO use _threashold void preGlobalSuperstep() override { - }; - - @Override - public void compute() { /** * setAggregatorValue sets the value for the aggregator after master * compute, before starting vertex compute of the same superstep. Does @@ -625,31 +620,26 @@ struct DMIDValueMasterContext : public MasterContext { */ - LongWritable iterCount = getAggregatedValue(DMIDComputation.ITERATION_AGG); - - boolean hasCascadingStarted = false; - LongWritable newIterCount = new LongWritable((iterCount.get() + 1)); - - - if (iterCount.get() != 0) { + int64_t* iterCount = getAggregatedValue(ITERATION_AGG); + int64_t newIterCount = *iterCount + 1; + bool hasCascadingStarted = false; + if (*iterCount != 0) { /** Cascading behavior started increment the iteration count */ - setAggregatedValue(DMIDComputation.ITERATION_AGG, newIterCount); + aggregateValue(ITERATION_AGG, newIterCount); hasCascadingStarted = true; } - if (getSuperstep() == DMIDComputation.RW_ITERATIONBOUND+ 8) { - setAggregatedValue(DMIDComputation.NEW_MEMBER_AGG, - new BooleanWritable(false)); - setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG, - new BooleanWritable(true)); - setAggregatedValue(DMIDComputation.ITERATION_AGG, new LongWritable(1)); + if (getSuperstep() == RW_ITERATIONBOUND+ 8) { + aggregateValue(NEW_MEMBER_AGG, false); + aggregateValue(NOT_ALL_ASSIGNED_AGG, true); + aggregateValue(ITERATION_AGG, 1); hasCascadingStarted = true; initializeGL(); } - if (hasCascadingStarted && (newIterCount.get() % 3 == 1)) { + if (hasCascadingStarted && (newIterCount % 3 == 1)) { /** first step of one iteration */ - LongWritable restartCountWritable = getAggregatedValue(RESTART_COUNTER_AGG); - Long restartCount=restartCountWritable.get(); + int64_t* restartCountWritable = getAggregatedValue(RESTART_COUNTER_AGG); + Long restartCount = restartCountWritable.get(); BooleanWritable newMember = getAggregatedValue(DMIDComputation.NEW_MEMBER_AGG); BooleanWritable notAllAssigned = getAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG); @@ -672,17 +662,15 @@ struct DMIDValueMasterContext : public MasterContext { } - if (hasCascadingStarted && (iterCount.get() % 3 == 2)) { + if (hasCascadingStarted && (*iterCount % 3 == 2)) { /** Second step of one iteration */ /** * Set newMember aggregator and notAllAssigned aggregator back to * initial value */ - setAggregatedValue(DMIDComputation.NEW_MEMBER_AGG, - new BooleanWritable(false)); - setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG, - new BooleanWritable(false)); + setAggregatedValue(NEW_MEMBER_AGG, false); + setAggregatedValue(NOT_ALL_ASSIGNED_AGG, false); } if (LOG_AGGS) { @@ -719,10 +707,10 @@ struct DMIDValueMasterContext : public MasterContext { * Initilizes the global leader aggregator with 1 for every vertex with a * higher number of followers than the average. */ - private void initializeGL() { + void initializeGL() { DoubleSparseVector initGL = new DoubleSparseVector( (int) getTotalNumVertices()); - DoubleSparseVector vecFD = getAggregatedValue(DMIDComputation.FD_AGG); + VertexSumAggregator::VertexMap const* vecFD = getAggregatedValue(FD_AGG); double averageFD = 0.0; int numLocalLeader = 0; @@ -740,7 +728,7 @@ struct DMIDValueMasterContext : public MasterContext { if (LOG_AGGS) { System.out.print("Global Leader:"); } - for (int i = 0; i < getTotalNumVertices(); ++i) { + for (int i = 0; i < vertexCount(); ++i) { if (vecFD.get(i) > averageFD) { initGL.set(i, 1.0); if (LOG_AGGS) { @@ -768,43 +756,24 @@ MasterContext* SCC::masterContext(VPackSlice userParams) const { IAggregator* SCC::aggregator(std::string const& name) const { if (name == DA_AGG) { // permanent value - return new ValueAggregator(SCCPhase::TRANSPOSE, true); - } else if (name == kFoundNewMax) { - return new BoolOrAggregator(false); // non perm - } else if (name == kConverged) { - return new BoolOrAggregator(false); // non perm + return new VertexSumAggregator(false);// non perm + } else if (name == LS_AGG) { + return new VertexSumAggregator(true);// perm + } else if (name == FD_AGG) { + return new VertexSumAggregator(true);// perm + } else if (name == GL_AGG) { + return new VertexSumAggregator(true);// perm + } else if (name == NEW_MEMBER_AGG) { + return new BooleanOrAggregator(false); // non perm + } else if (name == NOT_ALL_ASSIGNED_AGG) { + return new BooleanOrAggregator(false); // non perm + } else if (name == ITERATION_AGG) { + return new MaxAggregator(0, true); // perm + } else if (name == PROFITABILITY_AGG) { + return new MaxAggregator(0.5, true); // perm + } else if (name == RESTART_COUNTER_AGG) { + return new MaxAggregator(1, true); // perm } - registerAggregator(DMIDComputation.DA_AGG, - DoubleDenseVectorSumAggregator.class); - registerPersistentAggregator(DMIDComputation.LS_AGG, - DoubleDenseVectorSumAggregator.class); - registerPersistentAggregator(DMIDComputation.FD_AGG, - DoubleSparseVectorSumAggregator.class); - registerPersistentAggregator(DMIDComputation.GL_AGG, - DoubleSparseVectorSumAggregator.class); - - registerAggregator(DMIDComputation.NEW_MEMBER_AGG, - BooleanOrAggregator.class); - registerAggregator(DMIDComputation.NOT_ALL_ASSIGNED_AGG, - BooleanOrAggregator.class); - - registerPersistentAggregator(DMIDComputation.ITERATION_AGG, - LongMaxAggregator.class); - - registerPersistentAggregator(DMIDComputation.PROFITABILITY_AGG, - DoubleMaxAggregator.class); - registerPersistentAggregator(RESTART_COUNTER_AGG, - LongMaxAggregator.class); - //registerAggregator(DMIDComputation.RW_INFINITYNORM_AGG, - // DoubleMaxAggregator.class); - //registerAggregator(DMIDComputation.RW_FINISHED_AGG, - //LongMaxAggregator.class); - - setAggregatedValue(DMIDComputation.PROFITABILITY_AGG, - new DoubleWritable(0.5)); - setAggregatedValue(RESTART_COUNTER_AGG, new LongWritable(1)); - setAggregatedValue(DMIDComputation.ITERATION_AGG, new LongWritable(0)); - return nullptr; } diff --git a/arangod/Pregel/Algos/DMID/VertexSumAggregator.h b/arangod/Pregel/Algos/DMID/VertexSumAggregator.h new file mode 100644 index 0000000000..9214656b6d --- /dev/null +++ b/arangod/Pregel/Algos/DMID/VertexSumAggregator.h @@ -0,0 +1,132 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include +#include +#include "Pregel/Graph.h" + +#ifndef ARANGODB_PREGEL_AGG_DENSE_VECTOR_H +#define ARANGODB_PREGEL_AGG_DENSE_VECTOR_H 1 + +namespace arangodb { +namespace pregel { + +struct VertexSumAggregator : public IAggregator { + static_assert(std::is_arithmetic::value, "Type must be numeric"); + typedef std::map> VertexMap; + + VertexSumAggregator(bool perm = false) + : _empty(empty), _permanent(perm) {} + + // if you use this from a vertex I will end you + void aggregate(void const* valuePtr) { + VertexMap const* map = (VertexMap)valuePtr; + for (auto pair1 const& : map) { + for (auto pair2 const& : it.second) { + _entries[pair1.first][pair2.first] += pair2.second; + } + } + }; + + void parseAggregate(VPackSlice slice) override { + for (auto const& pair: VPackObjectIterator(slice)) { + prgl_shard_t shard = it.key.getUInt(); + std::string key; + VPackLength i = 0; + for (VPackSlice const& val : VPackArrayIterator(pair.value)) { + if (i % 2 == 0) { + key = val.copyString(); + } else { + entries[shard][key] += val.getNumber(); + } + i++; + } + } + } + + void const* getAggregatedValue() const override { return &_entries; }; + + void setAggregatedValue(VPackSlice slice) override { + for (auto const& pair: VPackObjectIterator(slice)) { + prgl_shard_t shard = it.key.getUInt(); + std::string key; + VPackLength i = 0; + for (VPackSlice const& val : VPackArrayIterator(pair.value)) { + if (i % 2 == 0) { + key = val.copyString(); + } else { + entries[shard][key] = val.getNumber(); + } + i++; + } + } + } + + void serialize(std::string const& key, VPackBuilder &builder) const override { + builder.add(key, VPackValueType::Array); + for (T const& e : _entries) { + builder.add(VPackValue(e)); + } + builder.close(); + }; + + void reset() override { + if (!_permanent) { + _entries.clear(); + } + } + + double getAggregatedValue(prgl_shard_t shard, std::string const& key) { + auto const& it1 = _entries.find(shard); + if (it1 != _entries.end()) { + auto const& it2 = it1.second.find(key); + if (it2 != it1.second.end()) { + return it2.second; + } + } + return _empty; + } + + void setValue(prgl_shard_t shard, std::string const& key, double val) { + _entries[shard][key] = val; + } + + void aggregate(prgl_shard_t shard, std::string const& key, double val) { + _entries[shard][key] += val; + } + + void setEmptyValue(double empty) { + _empty = empty; + } + + bool isConverging() const override { return false; } + + protected: + VertexMap _entries; + double _empty = 0; + bool _permanent; +}; +} +#endif diff --git a/arangod/Pregel/Algos/LineRank.cpp b/arangod/Pregel/Algos/LineRank.cpp index 571354f96e..410190d78e 100644 --- a/arangod/Pregel/Algos/LineRank.cpp +++ b/arangod/Pregel/Algos/LineRank.cpp @@ -27,6 +27,7 @@ #include "Pregel/MasterContext.h" #include "Pregel/Utils.h" #include "Pregel/VertexComputation.h" +#include "Pregel/WorkerContext.h" #include "Cluster/ClusterInfo.h" #include "Utils/OperationCursor.h" @@ -49,16 +50,29 @@ LineRank::LineRank(arangodb::velocypack::Slice params) //_threshold = t.isNumber() ? t.getNumber() : 0.000002f; } +struct LRWorkerContext : WorkerContext { + float startAtNodeProb = 0; + + void preApplication() override { + startAtNodeProb = 1.0f / edgeCount(); + }; +}; + +WorkerContext* LineRank::workerContext(VPackSlice params) const { + return new LRWorkerContext(); +} + // github.com/JananiC/NetworkCentralities/blob/master/src/main/java/linerank/LineRank.java struct LRComputation : public VertexComputation { LRComputation() {} void compute(MessageIterator const& messages) override { - float startAtNodeProb = 1.0f / context()->edgeCount(); + LRWorkerContext *ctx = (LRWorkerContext *)context(); + float* vertexValue = mutableVertexData(); RangeIterator> edges = getEdges(); if (*vertexValue < 0.0f) { - *vertexValue = startAtNodeProb; + *vertexValue = ctx->startAtNodeProb; aggregate(kMoreIterations, true); } else { float newScore = 0.0f; @@ -76,7 +90,7 @@ struct LRComputation : public VertexComputation { } else { newScore /= edges.size(); newScore = - startAtNodeProb * RESTART_PROB + newScore * (1.0 - RESTART_PROB); + ctx->startAtNodeProb * RESTART_PROB + newScore * (1.0 - RESTART_PROB); } float diff = fabsf(newScore - *vertexValue); @@ -91,6 +105,8 @@ struct LRComputation : public VertexComputation { } }; + + VertexComputation* LineRank::createComputation( WorkerConfig const* config) const { return new LRComputation(); @@ -98,7 +114,7 @@ VertexComputation* LineRank::createComputation( IAggregator* LineRank::aggregator(std::string const& name) const { if (name == kMoreIterations) { - return new ValueAggregator(false, false); // non perm + return new OverwriteAggregator(false, false); // non perm } return nullptr; } diff --git a/arangod/Pregel/Algos/LineRank.h b/arangod/Pregel/Algos/LineRank.h index e8eee4a1d6..18a09c1e43 100644 --- a/arangod/Pregel/Algos/LineRank.h +++ b/arangod/Pregel/Algos/LineRank.h @@ -51,9 +51,12 @@ struct LineRank : public SimpleAlgorithm { MessageCombiner* messageCombiner() const override { return new SumCombiner(); } + + WorkerContext* workerContext(VPackSlice params) const override; VertexComputation* createComputation( WorkerConfig const*) const override; + IAggregator* aggregator(std::string const& name) const override; }; } diff --git a/arangod/Pregel/Algos/PageRank.h b/arangod/Pregel/Algos/PageRank.h index b6787ca680..f07a2d939b 100644 --- a/arangod/Pregel/Algos/PageRank.h +++ b/arangod/Pregel/Algos/PageRank.h @@ -50,6 +50,7 @@ struct PageRank : public SimpleAlgorithm { VertexComputation* createComputation( WorkerConfig const*) const override; + IAggregator* aggregator(std::string const& name) const override; MasterContext* masterContext(VPackSlice userParams) const override; diff --git a/arangod/Pregel/Algos/RecoveringPageRank.cpp b/arangod/Pregel/Algos/RecoveringPageRank.cpp index 96c49f8a10..2eaac255b7 100644 --- a/arangod/Pregel/Algos/RecoveringPageRank.cpp +++ b/arangod/Pregel/Algos/RecoveringPageRank.cpp @@ -87,9 +87,9 @@ IAggregator* RecoveringPageRank::aggregator(std::string const& name) const { } else if (name == kRank) { return new SumAggregator(0); } else if (name == kStep) { - return new ValueAggregator(0); + return new OverwriteAggregator(0); } else if (name == kScale) { - return new ValueAggregator(-1); + return new OverwriteAggregator(-1); } return nullptr; } diff --git a/arangod/Pregel/Algos/SCC.cpp b/arangod/Pregel/Algos/SCC.cpp index ce59920e2a..000574cf43 100644 --- a/arangod/Pregel/Algos/SCC.cpp +++ b/arangod/Pregel/Algos/SCC.cpp @@ -237,7 +237,7 @@ MasterContext* SCC::masterContext(VPackSlice userParams) const { IAggregator* SCC::aggregator(std::string const& name) const { if (name == kPhase) { // permanent value - return new ValueAggregator(SCCPhase::TRANSPOSE, true); + return new OverwriteAggregator(SCCPhase::TRANSPOSE, true); } else if (name == kFoundNewMax) { return new BoolOrAggregator(false); // non perm } else if (name == kConverged) { diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 57b17af0f1..858f6c0ab2 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -142,7 +142,7 @@ bool Conductor::_startGlobalStep() { _totalEdgesCount = 0; for (auto const& req : requests) { VPackSlice payload = req.result.answer->payload(); - _aggregators->parseValues(payload); + _aggregators->aggregateValues(payload); _statistics.accumulateActiveCounts(payload); _totalVerticesCount += payload.get(Utils::vertexCountKey).getUInt(); _totalEdgesCount += payload.get(Utils::edgeCountKey).getUInt(); @@ -267,17 +267,16 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { } else if (_statistics.clientCount() < _dbServers.size() || // no messages !_statistics.allMessagesProcessed()) { // haven't received msgs VPackBuilder response; - if (_aggregators->parseValues(data)) { - if (_masterContext) { - _masterContext->postLocalSuperstep(); - } - response.openObject(); - _aggregators->serializeValues(response); - if (_masterContext && _masterContext->_enterNextGSS) { - response.add(Utils::enterNextGSSKey, VPackValue(true)); - } - response.close(); + _aggregators->aggregateValues(data); + if (_masterContext) { + _masterContext->postLocalSuperstep(); } + response.openObject(); + _aggregators->serializeValues(response); + if (_masterContext && _masterContext->_enterNextGSS) { + response.add(Utils::enterNextGSSKey, VPackValue(true)); + } + response.close(); return response; } @@ -312,7 +311,7 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) { } // the recovery mechanism might be gathering state information - _aggregators->parseValues(data); + _aggregators->aggregateValues(data); if (_respondedServers.size() != _dbServers.size()) { return; } diff --git a/arangod/Pregel/Graph.h b/arangod/Pregel/Graph.h index e280a0950e..04a1a392e2 100644 --- a/arangod/Pregel/Graph.h +++ b/arangod/Pregel/Graph.h @@ -157,4 +157,23 @@ class VertexEntry { };*/ } } +/* +namespace std { + template <> + struct hash { + std::size_t operator()(const PregelID& k) const { + using std::size_t; + using std::hash; + using std::string; + + // Compute individual hash values for first, + // second and third and combine them using XOR + // and bit shifting: + std::size_t h1 = std::hash()(k.key); + std::size_t h2 = std::hash()(k.shard); + return h1 ^ (h2 << 1); + } + }; +}*/ + #endif diff --git a/arangod/Pregel/MasterContext.h b/arangod/Pregel/MasterContext.h index 9987edeffb..48469159e6 100644 --- a/arangod/Pregel/MasterContext.h +++ b/arangod/Pregel/MasterContext.h @@ -77,6 +77,7 @@ class MasterContext { /// Called when a worker send updated aggregator values. /// Only called in async mode, never called after a global superstep + /// Can be used to decide to enter the next phase virtual void postLocalSuperstep(){}; /// should indicate if compensation is supposed to start by returning true diff --git a/arangod/Pregel/VertexComputation.h b/arangod/Pregel/VertexComputation.h index 414e52c444..5c5e592d6e 100644 --- a/arangod/Pregel/VertexComputation.h +++ b/arangod/Pregel/VertexComputation.h @@ -37,7 +37,7 @@ namespace pregel { template class Worker; -class Aggregator; +class IAggregator; template class VertexContext { @@ -47,8 +47,8 @@ class VertexContext { uint64_t _lss = 0; WorkerContext* _context; GraphStore* _graphStore; - AggregatorHandler* _conductorAggregators; - AggregatorHandler* _workerAggregators; + AggregatorHandler* _readAggregators; + AggregatorHandler* _writeAggregators; VertexEntry* _vertexEntry; public: @@ -57,12 +57,16 @@ class VertexContext { template inline void aggregate(std::string const& name, T const& value) { T const* ptr = &value; - _workerAggregators->aggregate(name, ptr); + _writeAggregators->aggregate(name, ptr); } template inline const T* getAggregatedValue(std::string const& name) { - return (const T*)_conductorAggregators->getAggregatedValue(name); + return (const T*)_readAggregators->getAggregatedValue(name); + } + + IAggregator* getAggregator(std::string const& name) { + return _writeAggregators->getAggregator(name); } inline WorkerContext const* context() { return _context; } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index f451a6cc2d..4ddc49b46a 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -201,8 +201,8 @@ VPackBuilder Worker::prepareGlobalStep(VPackSlice const& data) { // initialize worker context if (_workerContext && gss == 0 && _config.localSuperstep() == 0) { - _workerContext->_conductorAggregators = _conductorAggregators.get(); - _workerContext->_workerAggregators = _workerAggregators.get(); + _workerContext->_readAggregators = _conductorAggregators.get(); + _workerContext->_writeAggregators = _workerAggregators.get(); _workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt(); _workerContext->_edgeCount = data.get(Utils::edgeCountKey).getUInt(); _workerContext->preApplication(); @@ -295,9 +295,8 @@ void Worker::startGlobalStep(VPackSlice const& data) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Wrong GSS"); } - _workerAggregators->resetValues(true); - _conductorAggregators->resetValues(true); - _conductorAggregators->parseValues(data); + _workerAggregators->resetValues(); + _conductorAggregators->setAggregatedValues(data); // execute context if (_workerContext) { _workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt(); @@ -359,7 +358,7 @@ void Worker::_initializeVertexContext(VertexContext* ctx) { ctx->_lss = _config.localSuperstep(); ctx->_context = _workerContext.get(); ctx->_graphStore = _graphStore.get(); - ctx->_conductorAggregators = _conductorAggregators.get(); + ctx->_readAggregators = _conductorAggregators.get(); } // internally called in a WORKER THREAD!! @@ -381,7 +380,7 @@ bool Worker::_processVertices(size_t threadId, std::unique_ptr> vertexComputation( _algorithm->createComputation(&_config)); _initializeVertexContext(vertexComputation.get()); - vertexComputation->_workerAggregators = &workerAggregator; + vertexComputation->_writeAggregators = &workerAggregator; vertexComputation->_cache = outCache; if (!_config.asynchronousMode()) { // Should cause enterNextGlobalSuperstep to do nothing @@ -531,13 +530,15 @@ void Worker::_finishedProcessing() { } if (_config.asynchronousMode()) { - bool proceed = true; + bool proceed = false; // if the conductor is unreachable or has send data (try to) proceed std::unique_ptr result = _callConductorWithResponse( Utils::finishedWorkerStepPath, package.slice()); if (result->status == CL_COMM_RECEIVED) { VPackSlice data = result->answer->payload(); - if ((proceed = _conductorAggregators->parseValues(data))) { + if (data.isObject()) { + proceed = true; + _conductorAggregators->aggregateValues(data);// only aggregate values VPackSlice nextGSS = data.get(Utils::enterNextGSSKey); if (nextGSS.isBool()) { _requestedNextGSS = nextGSS.getBool(); @@ -643,8 +644,7 @@ void Worker::compensateStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _commandMutex); _workerAggregators->resetValues(); - _conductorAggregators->resetValues(); - _conductorAggregators->parseValues(data); + _conductorAggregators->setAggregatedValues(data); ThreadPool* pool = PregelFeature::instance()->threadPool(); pool->enqueue([this] { @@ -657,7 +657,7 @@ void Worker::compensateStep(VPackSlice const& data) { std::unique_ptr> vCompensate( _algorithm->createCompensation(&_config)); _initializeVertexContext(vCompensate.get()); - vCompensate->_workerAggregators = _workerAggregators.get(); + vCompensate->_writeAggregators = _workerAggregators.get(); size_t i = 0; for (VertexEntry* vertexEntry : vertexIterator) { diff --git a/arangod/Pregel/WorkerContext.h b/arangod/Pregel/WorkerContext.h index 09c9bbc675..b73fadf781 100644 --- a/arangod/Pregel/WorkerContext.h +++ b/arangod/Pregel/WorkerContext.h @@ -37,19 +37,19 @@ class WorkerContext { friend class Worker; uint64_t _vertexCount, _edgeCount; - AggregatorHandler* _conductorAggregators; - AggregatorHandler* _workerAggregators; + AggregatorHandler* _readAggregators; + AggregatorHandler* _writeAggregators; protected: template inline void aggregate(std::string const& name, T const& value) { T const* ptr = &value; - _workerAggregators->aggregate(name, ptr); + _writeAggregators->aggregate(name, ptr); } template inline const T* getAggregatedValue(std::string const& name) { - return (T*)_conductorAggregators->getAggregatedValue(name); + return (T*)_readAggregators->getAggregatedValue(name); } virtual void preApplication(){}; @@ -58,7 +58,7 @@ class WorkerContext { virtual void postApplication(){}; public: - WorkerContext(VPackSlice params) {} + WorkerContext() {} virtual ~WorkerContext() {} inline uint64_t vertexCount() const { return _vertexCount; }