From 66ba421dff2ee2bd3b81b25eb09ae59b42d3d34c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Wed, 18 Jan 2017 14:06:19 +0100 Subject: [PATCH] Pregel Refactoring --- arangod/CMakeLists.txt | 2 + arangod/Pregel/Aggregator.h | 78 ++++--- arangod/Pregel/AlgoRegistry.cpp | 10 + arangod/Pregel/Algorithm.h | 2 +- arangod/Pregel/Algos/ConnectedComponents.cpp | 68 +++++++ arangod/Pregel/Algos/ConnectedComponents.h | 60 ++++++ arangod/Pregel/Algos/LineRank.cpp | 106 ++++++++++ arangod/Pregel/Algos/LineRank.h | 61 ++++++ arangod/Pregel/Algos/PageRank.cpp | 30 +-- arangod/Pregel/Algos/PageRank.h | 10 +- arangod/Pregel/Algos/RecoveringPageRank.cpp | 28 +-- arangod/Pregel/Algos/RecoveringPageRank.h | 10 +- arangod/Pregel/Algos/SCC.cpp | 107 ---------- arangod/Pregel/Algos/SCC.h | 46 ----- arangod/Pregel/Algos/SSSP.cpp | 4 +- arangod/Pregel/Algos/ShortestPath.cpp | 8 +- arangod/Pregel/Algos/ShortestPath.h | 2 +- arangod/Pregel/Combiners/FloatSumCombiner.h | 39 ---- arangod/Pregel/Conductor.cpp | 1 + arangod/Pregel/GraphFormat.h | 201 +++++++++---------- arangod/Pregel/GraphStore.cpp | 5 +- arangod/Pregel/IncomingCache.cpp | 3 + arangod/Pregel/MasterContext.h | 1 - arangod/Pregel/MessageCombiner.h | 17 +- arangod/Pregel/MessageFormat.h | 30 ++- arangod/Pregel/VertexComputation.h | 15 +- 26 files changed, 515 insertions(+), 429 deletions(-) create mode 100644 arangod/Pregel/Algos/ConnectedComponents.cpp create mode 100644 arangod/Pregel/Algos/ConnectedComponents.h create mode 100644 arangod/Pregel/Algos/LineRank.cpp create mode 100644 arangod/Pregel/Algos/LineRank.h delete mode 100644 arangod/Pregel/Algos/SCC.cpp delete mode 100644 arangod/Pregel/Algos/SCC.h delete mode 100644 arangod/Pregel/Combiners/FloatSumCombiner.h diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 4e24efa157..4d5bf91e45 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -367,6 +367,8 @@ SET(ARANGOD_SOURCES Pregel/Algos/ShortestPath.cpp Pregel/Algos/PageRank.cpp Pregel/Algos/RecoveringPageRank.cpp + Pregel/Algos/LineRank.cpp + Pregel/Algos/ConnectedComponents.cpp Pregel/Conductor.cpp Pregel/GraphStore.cpp Pregel/IncomingCache.cpp diff --git a/arangod/Pregel/Aggregator.h b/arangod/Pregel/Aggregator.h index 11c862e11d..7f986356cd 100644 --- a/arangod/Pregel/Aggregator.h +++ b/arangod/Pregel/Aggregator.h @@ -43,7 +43,7 @@ class Aggregator { virtual ~Aggregator() {} /// @brief Value from superstep S-1 supplied by the conductor - virtual void aggregate(const void* valuePtr) = 0; + virtual void aggregate(void const* valuePtr) = 0; virtual void aggregate(VPackSlice slice) = 0; virtual void const* getValue() const = 0; @@ -54,27 +54,27 @@ class Aggregator { bool isPermanent() { return _permanent; } }; -class FloatMaxAggregator : public Aggregator { - float _value, _initial; - - public: - FloatMaxAggregator(float init) : _value(init), _initial(init) {} - +template +class MaxAggregator : public Aggregator { + static_assert(std::is_arithmetic::value, "Type must be numeric"); + T _value, _initial; + +public: + MaxAggregator(T init, bool perm = false) + : Aggregator(perm), _value(init), _initial(init) {} + void aggregate(void const* valuePtr) override { - float other = *((float*)valuePtr); + T other = *((T*)valuePtr); if (other > _value) _value = other; }; void aggregate(VPackSlice slice) override { - float f = slice.getNumber(); + T f = slice.getNumber(); aggregate(&f); } - + void const* getValue() const override { return &_value; }; - /*void setValue(VPackSlice slice) override { - _value = (float)slice.getDouble(); - }*/ - VPackValue vpackValue() override { return VPackValue((double)_value); }; - + VPackValue vpackValue() override { return VPackValue(_value); }; + void reset() override { _value = _initial; } }; @@ -97,49 +97,41 @@ class MinAggregator : public Aggregator { } void const* getValue() const override { return &_value; }; - /*void setValue(VPackSlice slice) override { - _value = (float)slice.getDouble(); - }*/ VPackValue vpackValue() override { return VPackValue(_value); }; void reset() override { _value = _initial; } }; + +template +class SumAggregator : public Aggregator { + static_assert(std::is_arithmetic::value, "Type must be numeric"); + T _value, _initial; + +public: + SumAggregator(T init, bool perm = false) + : Aggregator(perm), _value(init), _initial(init) {} + + void aggregate(void const* valuePtr) override { _value += *((T*)valuePtr); }; + void aggregate(VPackSlice slice) override { _value += slice.getNumber(); } + + void const* getValue() const override { return &_value; }; + VPackValue vpackValue() override { return VPackValue(_value); }; + + void reset() override { _value = _initial; } +}; template class ValueAggregator : public Aggregator { - static_assert(std::is_arithmetic::value, "Type must be numeric"); - + static_assert(std::is_fundamental::value, "Type must be fundamental"); T _value; public: - ValueAggregator(T val) : Aggregator(true), _value(val) {} + ValueAggregator(T val, bool perm = false) : Aggregator(perm), _value(val) {} void aggregate(void const* valuePtr) override { _value = *((T*)valuePtr); }; void aggregate(VPackSlice slice) override { _value = slice.getNumber(); } void const* getValue() const override { return &_value; }; - /*void setValue(VPackSlice slice) override { - _value = (float)slice.getDouble(); - }*/ - VPackValue vpackValue() override { return VPackValue(_value); }; -}; - -template -class SumAggregator : public Aggregator { - static_assert(std::is_arithmetic::value, "Type must be numeric"); - - T _value; - - public: - SumAggregator(T val) : Aggregator(true), _value(val) {} - - void aggregate(void const* valuePtr) override { _value += *((T*)valuePtr); }; - void aggregate(VPackSlice slice) override { _value += slice.getNumber(); } - - void const* getValue() const override { return &_value; }; - /*void setValue(VPackSlice slice) override { - _value = (float)slice.getDouble(); - }*/ VPackValue vpackValue() override { return VPackValue(_value); }; }; } diff --git a/arangod/Pregel/AlgoRegistry.cpp b/arangod/Pregel/AlgoRegistry.cpp index e6339250b9..d84bf2b914 100644 --- a/arangod/Pregel/AlgoRegistry.cpp +++ b/arangod/Pregel/AlgoRegistry.cpp @@ -25,6 +25,8 @@ #include "Pregel/Algos/RecoveringPageRank.h" #include "Pregel/Algos/SSSP.h" #include "Pregel/Algos/ShortestPath.h" +#include "Pregel/Algos/LineRank.h" +#include "Pregel/Algos/ConnectedComponents.h" #include "Pregel/Utils.h" using namespace arangodb; @@ -40,6 +42,10 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm, return new algos::RecoveringPageRank(userParams); } else if (algorithm == "shortestpath") { return new algos::ShortestPathAlgorithm(userParams); + } else if (algorithm == "linerank") { + return new algos::LineRank(userParams); + } else if (algorithm == "connectedcomponents") { + return new algos::ConnectedComponents(userParams); } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Unsupported Algorithm"); @@ -75,6 +81,10 @@ IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, } else if (algorithm == "shortestpath") { return createWorker(vocbase, new algos::ShortestPathAlgorithm(userParams), body); + } else if (algorithm == "linerank") { + return createWorker(vocbase, new algos::LineRank(userParams), body); + } else if (algorithm == "connectedcomponents") { + return createWorker(vocbase, new algos::ConnectedComponents(userParams), body); } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Unsupported Algorithm"); diff --git a/arangod/Pregel/Algorithm.h b/arangod/Pregel/Algorithm.h index f12c7d645e..e01dc326ba 100644 --- a/arangod/Pregel/Algorithm.h +++ b/arangod/Pregel/Algorithm.h @@ -30,7 +30,6 @@ #include "Basics/Common.h" #include "GraphFormat.h" -#include "MasterContext.h" #include "MessageCombiner.h" #include "MessageFormat.h" #include "WorkerContext.h" @@ -46,6 +45,7 @@ class VertexCompensation; class Aggregator; class WorkerConfig; +class MasterContext; struct IAlgorithm { virtual ~IAlgorithm() {} diff --git a/arangod/Pregel/Algos/ConnectedComponents.cpp b/arangod/Pregel/Algos/ConnectedComponents.cpp new file mode 100644 index 0000000000..54b0463d2a --- /dev/null +++ b/arangod/Pregel/Algos/ConnectedComponents.cpp @@ -0,0 +1,68 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "ConnectedComponents.h" +#include "Pregel/Algorithm.h" +#include "Pregel/GraphStore.h" +#include "Pregel/IncomingCache.h" +#include "Pregel/VertexComputation.h" + +using namespace arangodb::pregel; +using namespace arangodb::pregel::algos; + +struct MyComputation : public VertexComputation { + MyComputation() {} + void compute(MessageIterator const& messages) override { + + int64_t currentComponent = vertexData(); + for (const int64_t* msg : messages) { + if (*msg < currentComponent) { + currentComponent = *msg; + }; + } + + if (currentComponent != vertexData()) { + sendMessageToAllEdges(currentComponent); + } + voteHalt(); + } +}; + +VertexComputation* ConnectedComponents::createComputation( + WorkerConfig const* config) const { + return new MyComputation(); +} + +struct MyCompensation : public VertexCompensation { + MyCompensation() {} + void compensate(bool inLostPartition) override { + if (inLostPartition) { + int64_t* data = mutableVertexData(); + *data = INT64_MAX; + } + } +}; + +VertexCompensation* +ConnectedComponents::createCompensation(WorkerConfig const* config) const { + return new MyCompensation(); +} diff --git a/arangod/Pregel/Algos/ConnectedComponents.h b/arangod/Pregel/Algos/ConnectedComponents.h new file mode 100644 index 0000000000..dbb4c2af0a --- /dev/null +++ b/arangod/Pregel/Algos/ConnectedComponents.h @@ -0,0 +1,60 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_ALGOS_WCC_H +#define ARANGODB_PREGEL_ALGOS_WCC_H 1 + +#include "Pregel/Algorithm.h" + +namespace arangodb { +namespace pregel { +namespace algos { + +/// The idea behind the algorithm is very simple: propagate the smallest +/// vertex id along the edges to all vertices of a connected component. The +/// number of supersteps necessary is equal to the length of the maximum +/// diameter of all components + 1 +struct ConnectedComponents : public SimpleAlgorithm { + public: + ConnectedComponents(VPackSlice userParams) : SimpleAlgorithm("ConnectedComponents", userParams) {} + + bool supportsAsyncMode() const override { return false; } + bool supportsCompensation() const override { return true; } + + GraphFormat* inputFormat() override { + return new NumberGraphFormat(_sourceField, _resultField, INT64_MAX, 0); + } + MessageFormat* messageFormat() const override { + return new IntegerMessageFormat(); + } + MessageCombiner* messageCombiner() const override { + return new MinCombiner(); + } + VertexComputation* createComputation( + WorkerConfig const*) const override; + VertexCompensation* createCompensation( + WorkerConfig const*) const override; +}; +} +} +} +#endif diff --git a/arangod/Pregel/Algos/LineRank.cpp b/arangod/Pregel/Algos/LineRank.cpp new file mode 100644 index 0000000000..7acf9d85d5 --- /dev/null +++ b/arangod/Pregel/Algos/LineRank.cpp @@ -0,0 +1,106 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "LineRank.h" +#include "Pregel/Aggregator.h" +#include "Pregel/GraphFormat.h" +#include "Pregel/Iterators.h" +#include "Pregel/MasterContext.h" +#include "Pregel/Utils.h" +#include "Pregel/VertexComputation.h" + +#include "Cluster/ClusterInfo.h" +#include "Utils/OperationCursor.h" +#include "Utils/SingleCollectionTransaction.h" +#include "Utils/StandaloneTransactionContext.h" +#include "Utils/Transaction.h" +#include "VocBase/vocbase.h" + +using namespace arangodb; +using namespace arangodb::pregel; +using namespace arangodb::pregel::algos; + +static std::string const kMoreIterations = "more"; +static const double RESTART_PROB = 0.15; +static const double EPS = 0.000000001; + +LineRank::LineRank(arangodb::velocypack::Slice params) + : SimpleAlgorithm("LineRank", params) { + //VPackSlice t = params.get("convergenceThreshold"); + //_threshold = t.isNumber() ? t.getNumber() : 0.000002f; +} + +// github.com/JananiC/NetworkCentralities/blob/master/src/main/java/linerank/LineRank.java +struct MyComputation : public VertexComputation { + MyComputation() {} + void compute(MessageIterator const& messages) override { + + float startAtNodeProb = 1.0f / context()->edgeCount(); + float* vertexValue = mutableVertexData(); + RangeIterator> edges = getEdges(); + + if (*vertexValue < 0.0f) { + *vertexValue = startAtNodeProb; + aggregate(kMoreIterations, true); + } else { + + float newScore = 0.0f; + for (const float* msg : messages) { + newScore += *msg; + } + + bool const* moreIterations = getAggregatedValue(kMoreIterations); + if (*moreIterations == false) { + *vertexValue = *vertexValue * edges.size() + newScore; + voteHalt(); + } else { + + if (edges.size() == 0) { + newScore = 0; + } else { + newScore /= edges.size(); + newScore = startAtNodeProb * RESTART_PROB + newScore * (1.0 - RESTART_PROB); + } + + float diff = fabsf(newScore - *vertexValue); + *vertexValue = newScore; + + if (diff > EPS) { + aggregate(kMoreIterations, true); + } + } + } + sendMessageToAllEdges(*vertexValue); + } +}; + +VertexComputation* LineRank::createComputation( + WorkerConfig const* config) const { + return new MyComputation(); +} + +Aggregator* LineRank::aggregator(std::string const& name) const { + if (name == kMoreIterations) { + return new ValueAggregator(false, false);// non perm + } + return nullptr; +} diff --git a/arangod/Pregel/Algos/LineRank.h b/arangod/Pregel/Algos/LineRank.h new file mode 100644 index 0000000000..f0976f46c4 --- /dev/null +++ b/arangod/Pregel/Algos/LineRank.h @@ -0,0 +1,61 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_ALGOS_LINERANK_H +#define ARANGODB_PREGEL_ALGOS_LINERANK_H 1 + +#include +#include "Pregel/Algorithm.h" + +namespace arangodb { +namespace pregel { +namespace algos { + +/// LineRank from "Centralities in Large Networks: Algorithms and Observations" 2011: +/// Given a directed graph G, the LINERANK score of a node v ∈ G is computed by +/// aggregating the stationary probabilities of its incident edges on the line graph L(G). +/// Implementation based on +/// github.com/JananiC/NetworkCentralities/blob/master/src/main/java/linerank/LineRank.java +struct LineRank : public SimpleAlgorithm { + + public: + LineRank(arangodb::velocypack::Slice params); + + GraphFormat* inputFormat() override { + return new VertexGraphFormat(_resultField, -1.0); + } + MessageFormat* messageFormat() const override { + return new FloatMessageFormat(); + } + + MessageCombiner* messageCombiner() const override { + return new SumCombiner(); + } + + VertexComputation* createComputation( + WorkerConfig const*) const override; + Aggregator* aggregator(std::string const& name) const override; +}; +} +} +} +#endif diff --git a/arangod/Pregel/Algos/PageRank.cpp b/arangod/Pregel/Algos/PageRank.cpp index 9855de7940..3e1e73241f 100644 --- a/arangod/Pregel/Algos/PageRank.cpp +++ b/arangod/Pregel/Algos/PageRank.cpp @@ -22,7 +22,6 @@ #include "PageRank.h" #include "Pregel/Aggregator.h" -#include "Pregel/Combiners/FloatSumCombiner.h" #include "Pregel/GraphFormat.h" #include "Pregel/Iterators.h" #include "Pregel/MasterContext.h" @@ -48,31 +47,6 @@ PageRank::PageRank(arangodb::velocypack::Slice params) _threshold = t.isNumber() ? t.getNumber() : 0.000002f; } -struct PageRankGraphFormat : public FloatGraphFormat { - PageRankGraphFormat(std::string const& s, std::string const& r) - : FloatGraphFormat(s, r, 0, 0) {} - size_t copyVertexData(VertexEntry const& vertex, - std::string const& documentId, - arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - *((float*)targetPtr) = _vDefault; - return sizeof(float); - } - - size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - return 0; - } -}; - -GraphFormat* PageRank::inputFormat() { - return new PageRankGraphFormat(_sourceField, _resultField); -} - -MessageCombiner* PageRank::messageCombiner() const { - return new FloatSumCombiner(); -} - struct PRComputation : public VertexComputation { float _limit; PRComputation(float t) : _limit(t) {} @@ -110,8 +84,8 @@ VertexComputation* PageRank::createComputation( } Aggregator* PageRank::aggregator(std::string const& name) const { - if (name == "convergence") { - return new FloatMaxAggregator(-1); + if (name == kConvergence) { + return new MaxAggregator(-1.0f); } return nullptr; } diff --git a/arangod/Pregel/Algos/PageRank.h b/arangod/Pregel/Algos/PageRank.h index 1fb0103d7d..a86b24738b 100644 --- a/arangod/Pregel/Algos/PageRank.h +++ b/arangod/Pregel/Algos/PageRank.h @@ -37,12 +37,18 @@ struct PageRank : public SimpleAlgorithm { public: PageRank(arangodb::velocypack::Slice params); - GraphFormat* inputFormat() override; + GraphFormat* inputFormat() override { + return new VertexGraphFormat(_resultField, 0); + } + MessageFormat* messageFormat() const override { return new FloatMessageFormat(); } - MessageCombiner* messageCombiner() const override; + MessageCombiner* messageCombiner() const override { + return new SumCombiner(); + } + VertexComputation* createComputation( WorkerConfig const*) const override; Aggregator* aggregator(std::string const& name) const override; diff --git a/arangod/Pregel/Algos/RecoveringPageRank.cpp b/arangod/Pregel/Algos/RecoveringPageRank.cpp index f7acafc45c..8fc6136696 100644 --- a/arangod/Pregel/Algos/RecoveringPageRank.cpp +++ b/arangod/Pregel/Algos/RecoveringPageRank.cpp @@ -22,7 +22,6 @@ #include "RecoveringPageRank.h" #include "Pregel/Aggregator.h" -#include "Pregel/Combiners/FloatSumCombiner.h" #include "Pregel/GraphFormat.h" #include "Pregel/Iterators.h" #include "Pregel/MasterContext.h" @@ -54,31 +53,6 @@ RecoveringPageRank::RecoveringPageRank(arangodb::velocypack::Slice params) _threshold = t.isNumber() ? t.getNumber() : 0.000002f; } -struct PageRankGraphFormat : public FloatGraphFormat { - PageRankGraphFormat(std::string const& s, std::string const& r) - : FloatGraphFormat(s, r, 0, 0) {} - size_t copyVertexData(VertexEntry const& vertex, - std::string const& documentId, - arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - *((float*)targetPtr) = _vDefault; - return sizeof(float); - } - - size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - return 0; - } -}; - -GraphFormat* RecoveringPageRank::inputFormat() { - return new PageRankGraphFormat(_sourceField, _resultField); -} - -MessageCombiner* RecoveringPageRank::messageCombiner() const { - return new FloatSumCombiner(); -} - struct RPRComputation : public VertexComputation { float _limit; RPRComputation(float t) : _limit(t) {} @@ -122,7 +96,7 @@ VertexComputation* RecoveringPageRank::createComputation( Aggregator* RecoveringPageRank::aggregator(std::string const& name) const { if (name == kConvergence) { - return new FloatMaxAggregator(-1); + return new MaxAggregator(-1); } else if (name == kNonFailedCount) { return new SumAggregator(0); } else if (name == kRank) { diff --git a/arangod/Pregel/Algos/RecoveringPageRank.h b/arangod/Pregel/Algos/RecoveringPageRank.h index ff3a49253c..9639d6333f 100644 --- a/arangod/Pregel/Algos/RecoveringPageRank.h +++ b/arangod/Pregel/Algos/RecoveringPageRank.h @@ -40,12 +40,18 @@ struct RecoveringPageRank : public SimpleAlgorithm { bool supportsCompensation() const override { return true; } MasterContext* masterContext(VPackSlice userParams) const override; - GraphFormat* inputFormat() override; + GraphFormat* inputFormat() override { + return new VertexGraphFormat(_resultField, 0); + } + MessageFormat* messageFormat() const override { return new FloatMessageFormat(); } - MessageCombiner* messageCombiner() const override; + MessageCombiner* messageCombiner() const override { + return new SumCombiner(); + } + VertexComputation* createComputation( WorkerConfig const*) const override; VertexCompensation* createCompensation( diff --git a/arangod/Pregel/Algos/SCC.cpp b/arangod/Pregel/Algos/SCC.cpp deleted file mode 100644 index fe741bedf9..0000000000 --- a/arangod/Pregel/Algos/SCC.cpp +++ /dev/null @@ -1,107 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Simon Grätzer -//////////////////////////////////////////////////////////////////////////////// - -#include "SCC.h" -#include "Pregel/GraphFormat.h" -#include "Pregel/Utils.h" -#include "Pregel/VertexComputation.h" - -#include "Cluster/ClusterInfo.h" -#include "Utils/OperationCursor.h" -#include "Utils/SingleCollectionTransaction.h" -#include "Utils/StandaloneTransactionContext.h" -#include "Utils/Transaction.h" -#include "Vocbase/vocbase.h" - -using namespace arangodb; -using namespace arangodb::pregel; -using namespace arangodb::pregel::algos; - -struct SCCGraphFormat : public GraphFormat { - uint64_t _currentId = 0; - void willUseCollection(TRI_vocbase_t* vocbase, std::string const& shard, - bool isEdgeCollection) override { - int64_t count = Utils::countDocuments(vocbase, shard); - _currentId = ClusterInfo::instance()->uniqid(count); - } - SCCGraphFormat() {} - - size_t copyVertexData(VPackSlice document, void* targetPtr, - size_t maxSize) override { - *((int64_t*)targetPtr) = _currentId; - _currentId++; - return sizeof(int64_t); - } - - size_t copyEdgeData(VPackSlice document, void* targetPtr, - size_t maxSize) override { - return 0; - } - - int64_t readVertexData(void* ptr) override { return *((int64_t*)ptr); } - int64_t readEdgeData(void* ptr) override { return *((int64_t*)ptr); } -}; - -std::shared_ptr> SCCAlgorithm::inputFormat() - const { - return std::make_shared("value", -1, 1); -} - -std::shared_ptr> SCCAlgorithm::messageFormat() const { - return std::shared_ptr(new IntegerMessageFormat()); -} - -std::shared_ptr> SCCAlgorithm::messageCombiner() - const { - return std::shared_ptr(new IntegerMinCombiner()); -} - -struct SCCComputation : public VertexComputation { - SCCComputation() {} - void compute(std::string const& vertexID, - MessageIterator const& messages) override { - /*int64_t tmp = vertexData(); - for (const int64_t* msg : messages) { - if (tmp < 0 || *msg < tmp) { - tmp = *msg; - }; - } - int64_t* state = mutableVertexData(); - if (tmp >= 0 && (getGlobalSuperstep() == 0 || tmp != *state)) { - LOG(INFO) << "Recomputing value for vertex " << vertexID; - *state = tmp; // update state - - EdgeIterator edges = getEdges(); - for (EdgeEntry* edge : edges) { - int64_t val = *(edge->getData()) + tmp; - uint64_t toID = edge->toPregelID(); - sendMessage(toID, val); - } - }*/ - voteHalt(); - } -}; - -std::shared_ptr> -SCCAlgorithm::createComputation() const { - return std::shared_ptr(new SCCComputation()); -} diff --git a/arangod/Pregel/Algos/SCC.h b/arangod/Pregel/Algos/SCC.h deleted file mode 100644 index 7ea89ba0ff..0000000000 --- a/arangod/Pregel/Algos/SCC.h +++ /dev/null @@ -1,46 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Simon Grätzer -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGODB_PREGEL_ALGOS_SSSP_H -#define ARANGODB_PREGEL_ALGOS_SSSP_H 1 - -#include "Pregel/Algorithm.h" - -namespace arangodb { -namespace pregel { -namespace algos { - -/// Strongly connected components. -struct SCCAlgorithm : public Algorithm { - public: - SCCAlgorithm() : Algorithm("SCC") {} - - std::shared_ptr> inputFormat() override; - std::shared_ptr> messageFormat() const override; - std::shared_ptr> messageCombiner() const override; - std::shared_ptr> - createComputation(uint64_t gss) const override; -}; -} -} -} -#endif diff --git a/arangod/Pregel/Algos/SSSP.cpp b/arangod/Pregel/Algos/SSSP.cpp index 8235598b27..c84029a1f8 100644 --- a/arangod/Pregel/Algos/SSSP.cpp +++ b/arangod/Pregel/Algos/SSSP.cpp @@ -53,7 +53,7 @@ struct SSSPComputation : public VertexComputation { }; GraphFormat* SSSPAlgorithm::inputFormat() { - return new IntegerGraphFormat(_sourceField, _resultField, INT64_MAX, 1); + return new NumberGraphFormat (_sourceField, _resultField, INT64_MAX, 1); } MessageFormat* SSSPAlgorithm::messageFormat() const { @@ -61,7 +61,7 @@ MessageFormat* SSSPAlgorithm::messageFormat() const { } MessageCombiner* SSSPAlgorithm::messageCombiner() const { - return new IntegerMinCombiner(); + return new MinCombiner(); } VertexComputation* SSSPAlgorithm::createComputation( diff --git a/arangod/Pregel/Algos/ShortestPath.cpp b/arangod/Pregel/Algos/ShortestPath.cpp index dd94d84189..9876b53e66 100644 --- a/arangod/Pregel/Algos/ShortestPath.cpp +++ b/arangod/Pregel/Algos/ShortestPath.cpp @@ -38,17 +38,19 @@ struct ShortestPathComp : public VertexComputation { ShortestPathComp(PregelID const& target) : _target(target) {} void compute(MessageIterator const& messages) override { - int64_t const* max = getAggregatedValue(spUpperPathBound); int64_t current = vertexData(); for (const int64_t* msg : messages) { if (*msg < current) { current = *msg; }; } - + + // use global state to limit the computation of paths bool isSource = current == 0 && localSuperstep() == 0; + int64_t const* max = getAggregatedValue(spUpperPathBound); + int64_t* state = mutableVertexData(); - if ((current < *state && current < *max) || isSource) { + if (isSource || (current < *state && current < *max)) { *state = current; // update state if (this->pregelId() == _target) { diff --git a/arangod/Pregel/Algos/ShortestPath.h b/arangod/Pregel/Algos/ShortestPath.h index aa16e994d6..94c16e29ef 100644 --- a/arangod/Pregel/Algos/ShortestPath.h +++ b/arangod/Pregel/Algos/ShortestPath.h @@ -49,7 +49,7 @@ struct ShortestPathAlgorithm : public Algorithm { } MessageCombiner* messageCombiner() const override { - return new IntegerMinCombiner(); + return new MinCombiner(); } VertexComputation* createComputation( diff --git a/arangod/Pregel/Combiners/FloatSumCombiner.h b/arangod/Pregel/Combiners/FloatSumCombiner.h deleted file mode 100644 index 8a1e7dd0b7..0000000000 --- a/arangod/Pregel/Combiners/FloatSumCombiner.h +++ /dev/null @@ -1,39 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Simon Grätzer -//////////////////////////////////////////////////////////////////////////////// - -#include "Pregel/MessageCombiner.h" - -#ifndef ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H -#define ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H 1 -namespace arangodb { -namespace pregel { - -struct FloatSumCombiner : public MessageCombiner { - FloatSumCombiner() {} - void combine(float & firstValue, - float const& secondValue) const override { - firstValue += secondValue; - }; -}; -} -} -#endif diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 95249830aa..5f336f22fd 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -24,6 +24,7 @@ #include "Pregel/Aggregator.h" #include "Pregel/AlgoRegistry.h" #include "Pregel/Algorithm.h" +#include "Pregel/MasterContext.h" #include "Pregel/PregelFeature.h" #include "Pregel/Recovery.h" #include "Pregel/Utils.h" diff --git a/arangod/Pregel/GraphFormat.h b/arangod/Pregel/GraphFormat.h index 8c630e8e3f..38e63f5612 100644 --- a/arangod/Pregel/GraphFormat.h +++ b/arangod/Pregel/GraphFormat.h @@ -39,6 +39,7 @@ template struct GraphFormat { virtual size_t estimatedVertexSize() const { return sizeof(V); }; virtual size_t estimatedEdgeSize() const { return sizeof(E); }; + virtual void willLoadVertices(size_t count) {} virtual size_t copyVertexData(VertexEntry const& vertex, std::string const& documentId, @@ -53,92 +54,6 @@ struct GraphFormat { const void* targetPtr, size_t size) = 0; }; -class IntegerGraphFormat : public GraphFormat { - const std::string _sourceField, _resultField; - const int64_t _vDefault, _eDefault; - - public: - IntegerGraphFormat(std::string const& source, std::string const& result, - int64_t vertexNull, int64_t edgeNull) - : _sourceField(source), - _resultField(result), - _vDefault(vertexNull), - _eDefault(edgeNull) {} - - size_t copyVertexData(VertexEntry const& vertex, - std::string const& documentId, - arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - arangodb::velocypack::Slice val = document.get(_sourceField); - *((int64_t*)targetPtr) = val.isInteger() ? val.getInt() : _vDefault; - return sizeof(int64_t); - } - - size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - arangodb::velocypack::Slice val = document.get(_sourceField); - *((int64_t*)targetPtr) = val.isInteger() ? val.getInt() : _eDefault; - return sizeof(int64_t); - } - - bool buildVertexDocument(arangodb::velocypack::Builder& b, - const void* targetPtr, size_t size) override { - b.add(_resultField, VPackValue(*((int64_t*)targetPtr))); - return true; - } - - bool buildEdgeDocument(arangodb::velocypack::Builder& b, - const void* targetPtr, size_t size) override { - b.add(_resultField, VPackValue(*((int64_t*)targetPtr))); - return true; - } -}; - -class FloatGraphFormat : public GraphFormat { - protected: - const std::string _sourceField, _resultField; - const float _vDefault, _eDefault; - - public: - FloatGraphFormat(std::string const& source, std::string const& result, - float vertexNull, float edgeNull) - : _sourceField(source), - _resultField(result), - _vDefault(vertexNull), - _eDefault(edgeNull) {} - - float readVertexData(const void* ptr) { return *((float*)ptr); } - float readEdgeData(const void* ptr) { return *((float*)ptr); } - - size_t copyVertexData(VertexEntry const& vertex, - std::string const& documentId, - arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - arangodb::velocypack::Slice val = document.get(_sourceField); - *((float*)targetPtr) = val.isDouble() ? (float)val.getDouble() : _vDefault; - return sizeof(float); - } - - size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { - arangodb::velocypack::Slice val = document.get(_sourceField); - *((float*)targetPtr) = val.isDouble() ? (float)val.getDouble() : _eDefault; - return sizeof(float); - } - - bool buildVertexDocument(arangodb::velocypack::Builder& b, - const void* targetPtr, size_t size) override { - b.add(_resultField, VPackValue(readVertexData(targetPtr))); - return true; - } - - bool buildEdgeDocument(arangodb::velocypack::Builder& b, - const void* targetPtr, size_t size) override { - b.add(_resultField, VPackValue(readEdgeData(targetPtr))); - return true; - } -}; -/* template class NumberGraphFormat : public GraphFormat { static_assert(std::is_arithmetic::value, "Vertex type must be numeric"); @@ -157,20 +72,19 @@ public: _vDefault(vertexNull), _eDefault(edgeNull) {} - V readVertexData(void* ptr) override { return *((V*)ptr); } - E readEdgeData(void* ptr) override { return *((E*)ptr); } - - size_t copyVertexData(arangodb::velocypack::Slice document, void* targetPtr, - size_t maxSize) override { + size_t copyVertexData(VertexEntry const& vertex, + std::string const& documentId, + arangodb::velocypack::Slice document, + void* targetPtr, size_t maxSize) override { arangodb::velocypack::Slice val = document.get(_sourceField); - if (std::is_integral()) { - if (std::is_signed()) { + if (std::is_integral::value) { + if (std::is_signed::value) { *((V*)targetPtr) = val.isInteger() ? val.getInt() : _vDefault; } else { *((V*)targetPtr) = val.isInteger() ? val.getUInt() : _vDefault; } } else { - *((V*)targetPtr) = val.isDouble() ? val.getDouble() : _vDefault; + *((V*)targetPtr) = val.isNumber() ? val.getNumber() : _vDefault; } return sizeof(V); } @@ -178,30 +92,109 @@ public: size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, size_t maxSize) override { arangodb::velocypack::Slice val = document.get(_sourceField); - if (std::is_integral()) { - if (std::is_signed()) { + if (std::is_integral::value) { + if (std::is_signed::value) {// getNumber does range checks *((E*)targetPtr) = val.isInteger() ? val.getInt() : _eDefault; } else { *((E*)targetPtr) = val.isInteger() ? val.getUInt() : _eDefault; } } else { - *((E*)targetPtr) = val.isDouble() ? val.getDouble() : _eDefault; + *((E*)targetPtr) = val.isNumber() ? val.getNumber() : _eDefault; } return sizeof(E); } - void buildVertexDocument(arangodb::velocypack::Builder& b, const void* -targetPtr, + bool buildVertexDocument(arangodb::velocypack::Builder& b, const void* ptr, size_t size) override { - b.add(_resultField, VPackValue(readVertexData(targetPtr))); + b.add(_resultField, VPackValue(*((V*)ptr))); + return true; } - void buildEdgeDocument(arangodb::velocypack::Builder& b, const void* -targetPtr, + bool buildEdgeDocument(arangodb::velocypack::Builder& b, const void* ptr, size_t size) override { - b.add(_resultField, VPackValue(readEdgeData(targetPtr))); + b.add(_resultField, VPackValue(*((E*)ptr))); + return true; } -};//*/ +}; + +template +class InitGraphFormat : public GraphFormat { + protected: + const std::string _resultField; + const V _vDefault; + const E _eDefault; + + public: + + InitGraphFormat(std::string const& result, V vertexNull, E edgeNull) + : _resultField(result), _vDefault(vertexNull), _eDefault(edgeNull) {} + + size_t copyVertexData(VertexEntry const& vertex, + std::string const& documentId, + arangodb::velocypack::Slice document, + void* targetPtr, size_t maxSize) override { + *((V*)targetPtr) = _vDefault; + return sizeof(V); + } + + size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, + size_t maxSize) override { + *((E*)targetPtr) = _eDefault; + return sizeof(E); + } + + bool buildVertexDocument(arangodb::velocypack::Builder& b, + const void* ptr, + size_t size) override { + b.add(_resultField, VPackValue(*((V*)ptr))); + return true; + } + + bool buildEdgeDocument(arangodb::velocypack::Builder& b, const void* ptr, + size_t size) override { + b.add(_resultField, VPackValue(*((E*)ptr))); + return true; + } +}; + + +template +class VertexGraphFormat : public GraphFormat { +protected: + const std::string _resultField; + const V _vDefault; + +public: + + VertexGraphFormat(std::string const& result, V vertexNull) + : _resultField(result), _vDefault(vertexNull) {} + + size_t copyVertexData(VertexEntry const& vertex, + std::string const& documentId, + arangodb::velocypack::Slice document, + void* targetPtr, size_t maxSize) override { + *((V*)targetPtr) = _vDefault; + return sizeof(V); + } + + size_t copyEdgeData(arangodb::velocypack::Slice document, void* targetPtr, + size_t maxSize) override { + return 0; + } + + bool buildVertexDocument(arangodb::velocypack::Builder& b, + const void* ptr, + size_t size) override { + b.add(_resultField, VPackValue(*((V*)ptr))); + return true; + } + + bool buildEdgeDocument(arangodb::velocypack::Builder& b, const void* ptr, + size_t size) override { + return false; + } +}; + } } #endif diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 0a4065fcfe..d3d9fe8810 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -126,8 +126,9 @@ void GraphStore::loadDocument(WorkerConfig const& config, std::string documentId = _readTrx->extractIdString(opResult.slice()); VertexEntry entry(sourceShard, _key); V vertexData; - size_t size = _graphFormat->copyVertexData( - entry, documentId, opResult.slice(), &vertexData, sizeof(V)); + size_t size = _graphFormat->copyVertexData(entry, documentId, + opResult.slice(), + &vertexData, sizeof(V)); if (size > 0) { entry._vertexDataOffset = _vertexData.size(); _vertexData.push_back(vertexData); diff --git a/arangod/Pregel/IncomingCache.cpp b/arangod/Pregel/IncomingCache.cpp index 65b512ca5e..f35c2fb494 100644 --- a/arangod/Pregel/IncomingCache.cpp +++ b/arangod/Pregel/IncomingCache.cpp @@ -239,9 +239,12 @@ void CombiningInCache::forEach( } // template types to create +template class arangodb::pregel::InCache; template class arangodb::pregel::InCache; template class arangodb::pregel::InCache; +template class arangodb::pregel::ArrayInCache; template class arangodb::pregel::ArrayInCache; template class arangodb::pregel::ArrayInCache; +template class arangodb::pregel::CombiningInCache; template class arangodb::pregel::CombiningInCache; template class arangodb::pregel::CombiningInCache; diff --git a/arangod/Pregel/MasterContext.h b/arangod/Pregel/MasterContext.h index a0ce048659..d8ec7d11ac 100644 --- a/arangod/Pregel/MasterContext.h +++ b/arangod/Pregel/MasterContext.h @@ -27,7 +27,6 @@ #include #include "Basics/Common.h" #include "Pregel/AggregatorHandler.h" -#include "Pregel/Utils.h" namespace arangodb { namespace pregel { diff --git a/arangod/Pregel/MessageCombiner.h b/arangod/Pregel/MessageCombiner.h index 15c455595e..31d7a4659d 100644 --- a/arangod/Pregel/MessageCombiner.h +++ b/arangod/Pregel/MessageCombiner.h @@ -34,14 +34,25 @@ struct MessageCombiner { virtual void combine(M& firstValue, M const& secondValue) const = 0; }; -struct IntegerMinCombiner : public MessageCombiner { - IntegerMinCombiner() {} - void combine(int64_t& firstValue, int64_t const& secondValue) const override { +template +struct MinCombiner : public MessageCombiner { + static_assert(std::is_arithmetic::value, "Message type must be numeric"); + MinCombiner() {} + void combine(M& firstValue, M const& secondValue) const override { if (firstValue > secondValue) { firstValue = secondValue; } }; }; + +template +struct SumCombiner : public MessageCombiner { + static_assert(std::is_arithmetic::value, "Message type must be numeric"); + SumCombiner() {} + void combine(M& firstValue, M const& secondValue) const { + firstValue += secondValue; + } +}; } } #endif diff --git a/arangod/Pregel/MessageFormat.h b/arangod/Pregel/MessageFormat.h index a4ec665b13..48ee9ef0fd 100644 --- a/arangod/Pregel/MessageFormat.h +++ b/arangod/Pregel/MessageFormat.h @@ -58,23 +58,19 @@ struct FloatMessageFormat : public MessageFormat { arrayBuilder.add(VPackValue(val)); } }; - -/* - template - struct NumberMessageFormat : public MessageFormat { - static_assert(std::is_arithmetic::value, "Type must be numeric"); - NumberMessageFormat() {} - bool unwrapValue(VPackSlice s, int64_t& value) const override { - if (s.isNumber()) { - value = s.getNumber(); - return true; - } - return false; - } - void addValue(VPackBuilder& arrayBuilder, int64_t const& val) const override { - arrayBuilder.add(VPackValue(val)); - } - };*/ + +template +struct NumberMessageFormat : public MessageFormat { + static_assert(std::is_arithmetic::value, "Message type must be numeric"); + NumberMessageFormat() {} + void unwrapValue(VPackSlice s, M& value) const override { + value = s.getNumber(); + } + void addValue(VPackBuilder& arrayBuilder, M const& val) const override { + arrayBuilder.add(VPackValue(val)); + } +}; + } } #endif diff --git a/arangod/Pregel/VertexComputation.h b/arangod/Pregel/VertexComputation.h index e91ea37e77..15378b58a0 100644 --- a/arangod/Pregel/VertexComputation.h +++ b/arangod/Pregel/VertexComputation.h @@ -59,9 +59,14 @@ class VertexContext { } template - inline void aggregate(std::string const& name, const T* valuePtr) { + inline void aggregate(std::string const& name, T const* valuePtr) { _workerAggregators->aggregate(name, valuePtr); } + + template + inline void aggregate(std::string const& name, T const& valuePtr) { + _workerAggregators->aggregate(name, &valuePtr); + } inline WorkerContext const* context() { return _context; } @@ -101,6 +106,14 @@ class VertexComputation : public VertexContext { void sendMessage(Edge const* edge, M const& data) { _cache->appendMessage(edge->targetShard(), edge->toKey(), data); } + + // TODO optimize outgoing cache somehow + void sendMessageToAllEdges(M const& data) { + RangeIterator> edges = this->getEdges(); + for (Edge const* edge : edges) { + _cache->appendMessage(edge->targetShard(), edge->toKey(), data); + } + } void enterNextPhase() { if (!_nextPhase) {