1
0
Fork 0

Started with DMID

This commit is contained in:
Simon Grätzer 2017-02-16 14:38:04 +01:00
parent f58e7a4185
commit d263c51f59
18 changed files with 368 additions and 190 deletions

View File

@ -43,14 +43,18 @@ class IAggregator {
IAggregator() {}
virtual ~IAggregator() {}
/// @brief Value from superstep S-1 supplied by the conductor
/// @brief Used when updating aggregator value locally
virtual void aggregate(void const* valuePtr) = 0;
/// @brief Used when updating aggregator value from remote
virtual void parseAggregate(VPackSlice const& slice) = 0;
virtual void const* getAggregatedValue() const = 0;
/// @brief Value from superstep S-1 supplied by the conductor
virtual void setAggregatedValue(VPackSlice const& slice) = 0;
virtual void serialize(std::string const& key, VPackBuilder &builder) const = 0;
virtual void const* getValue() const = 0;
virtual VPackValue vpackValue() const = 0;
virtual void parse(VPackSlice slice) = 0;
virtual void reset(bool force) = 0;
virtual void reset() = 0;
virtual bool isConverging() const = 0;
};
@ -58,26 +62,34 @@ template <typename T>
struct NumberAggregator : public IAggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
NumberAggregator(T init, bool perm = false, bool conv = false)
: _value(init), _initial(init), _permanent(perm), _converging(conv) {}
void const* getValue() const override { return &_value; };
VPackValue vpackValue() const override { return VPackValue(_value); };
void parse(VPackSlice slice) override {
NumberAggregator(T neutral, bool perm = false, bool conv = false)
: _value(neutral), _neutral(neutral), _permanent(perm), _converging(conv) {}
void parseAggregate(VPackSlice const& slice) override {
T f = slice.getNumber<T>();
aggregate((void const*)(&f));
};
void const* getAggregatedValue() const override { return &_value; };
void setAggregatedValue(VPackSlice const& slice) override {
_value = slice.getNumber<T>();
}
void serialize(std::string const& key, VPackBuilder &builder) const override {
builder.add(key, VPackValue(_value));
};
void reset(bool force) override {
if (!_permanent || force) {
_value = _initial;
void reset() override {
if (!_permanent) {
_value = _neutral;
}
}
bool isConverging() const override { return _converging; }
protected:
T _value, _initial;
T _value, _neutral;
bool _permanent, _converging;
};
@ -109,36 +121,46 @@ struct SumAggregator : public NumberAggregator<T> {
void aggregate(void const* valuePtr) override {
this->_value += *((T*)valuePtr);
};
void parse(VPackSlice slice) override {
void parseAggregate(VPackSlice const& slice) override {
this->_value += slice.getNumber<T>();
}
};
/// Aggregator that stores a value that is overwritten once another value is aggregated.
/// This aggregator is useful for one-to-many communication from master.compute() or from a special vertex.
/// In case multiple vertices write to this aggregator, its behavior is non-deterministic.
template <typename T>
struct ValueAggregator : public NumberAggregator<T> {
ValueAggregator(T val, bool perm = false)
struct OverwriteAggregator : public NumberAggregator<T> {
OverwriteAggregator(T val, bool perm = false)
: NumberAggregator<T>(val, perm, true) {}
void aggregate(void const* valuePtr) override {
this->_value = *((T*)valuePtr);
};
void parse(VPackSlice slice) override { this->_value = slice.getNumber<T>(); }
void parseAggregate(VPackSlice const& slice) override { this->_value = slice.getNumber<T>(); }
};
/// always initializes to true.
struct BoolOrAggregator : public IAggregator {
BoolOrAggregator(bool perm = false) : _permanent(perm) {}
void const* getValue() const override { return &_value; };
VPackValue vpackValue() const override { return VPackValue(_value); };
void aggregate(void const* valuePtr) override {
_value = _value || *((bool*)valuePtr);
};
void parse(VPackSlice slice) override { _value = _value || slice.getBool(); }
void parseAggregate(VPackSlice const& slice) override { _value = _value || slice.getBool(); }
void const* getAggregatedValue() const override { return &_value; };
void setAggregatedValue(VPackSlice const& slice) override {
_value = slice.getBool();
}
void serialize(std::string const& key, VPackBuilder &builder) const override {
builder.add(key, VPackValue(_value));
};
void reset(bool force) override {
if (!_permanent || force) {
void reset() override {
if (!_permanent) {
_value = false;
}
}

View File

@ -37,7 +37,7 @@ AggregatorHandler::~AggregatorHandler() {
_values.clear();
}
IAggregator* AggregatorHandler::_get(AggregatorID const& name) {
IAggregator* AggregatorHandler::getAggregator(AggregatorID const& name) {
{
READ_LOCKER(guard, _lock);
auto it = _values.find(name);
@ -59,46 +59,57 @@ IAggregator* AggregatorHandler::_get(AggregatorID const& name) {
void AggregatorHandler::aggregate(AggregatorID const& name,
const void* valuePtr) {
IAggregator* agg = _get(name);
IAggregator* agg = getAggregator(name);
if (agg) {
agg->aggregate(valuePtr);
}
}
const void* AggregatorHandler::getAggregatedValue(AggregatorID const& name) {
IAggregator* agg = _get(name);
return agg != nullptr ? agg->getValue() : nullptr;
}
void AggregatorHandler::resetValues(bool force) {
for (auto& it : _values) {
it.second->reset(force);
}
}
void AggregatorHandler::aggregateValues(AggregatorHandler const& workerValues) {
for (auto const& pair : workerValues._values) {
AggregatorID const& name = pair.first;
IAggregator* agg = _get(name);
IAggregator* agg = getAggregator(name);
if (agg) {
agg->aggregate(pair.second->getValue());
agg->aggregate(pair.second->getAggregatedValue());
}
}
}
bool AggregatorHandler::parseValues(VPackSlice data) {
VPackSlice values = data.get(Utils::aggregatorValuesKey);
if (values.isObject() == false) {
return false;
}
for (auto const& keyValue : VPackObjectIterator(values)) {
AggregatorID name = keyValue.key.copyString();
IAggregator* agg = _get(name);
if (agg) {
agg->parse(keyValue.value);
void AggregatorHandler::aggregateValues(VPackSlice const& workerValues) {
VPackSlice values = workerValues.get(Utils::aggregatorValuesKey);
if (values.isObject()) {
for (auto const& keyValue : VPackObjectIterator(values)) {
AggregatorID name = keyValue.key.copyString();
IAggregator* agg = getAggregator(name);
if (agg) {
agg->parseAggregate(keyValue.value);
}
}
}
return true;
}
void AggregatorHandler::setAggregatedValues(VPackSlice const& workerValues) {
VPackSlice values = workerValues.get(Utils::aggregatorValuesKey);
if (values.isObject()) {
for (auto const& keyValue : VPackObjectIterator(values)) {
AggregatorID name = keyValue.key.copyString();
IAggregator* agg = getAggregator(name);
if (agg) {
agg->setAggregatedValue(keyValue.value);
}
}
}
}
const void* AggregatorHandler::getAggregatedValue(AggregatorID const& name) {
IAggregator* agg = getAggregator(name);
return agg != nullptr ? agg->getAggregatedValue() : nullptr;
}
void AggregatorHandler::resetValues() {
for (auto& it : _values) {
it.second->reset();
}
}
bool AggregatorHandler::serializeValues(VPackBuilder& b,
@ -107,9 +118,10 @@ bool AggregatorHandler::serializeValues(VPackBuilder& b,
bool hasValues = false;
b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object));
for (auto const& pair : _values) {
AggregatorID const& name = pair.first;
IAggregator* agg = pair.second;
if (!onlyConverging || agg->isConverging()) {
b.add(pair.first, agg->vpackValue());
agg->serialize(name, b);
hasValues = true;
}
}

View File

@ -42,28 +42,28 @@ class AggregatorHandler {
std::map<std::string, IAggregator*> _values;
mutable basics::ReadWriteLock _lock;
IAggregator* _get(std::string const& name);
public:
AggregatorHandler(IAlgorithm const* c) : _algorithm(c) {}
~AggregatorHandler();
void registerAggregator(std::string const& name, IAggregator* aggregator);
IAggregator* getAggregator(std::string const& name);
/// aggregate this value
void aggregate(std::string const& name, const void* valuePtr);
/// aggregates all values from this aggregator
void aggregateValues(AggregatorHandler const& workerValues);
/// aggregates all values from this aggregator
void aggregateValues(VPackSlice const& workerValues);
/// return true if there are values in this Slice
void setAggregatedValues(VPackSlice const& workerValues);
/// get the pointer to an aggregator value
const void* getAggregatedValue(std::string const& name);
/// calls reset on every aggregator
void resetValues(bool force = false);
/// aggregates all values from this aggregator
void aggregateValues(AggregatorHandler const& workerValues);
/// return true if there are values in this Slice
bool parseValues(VPackSlice workerValues);
void resetValues();
/// return true if there values in this aggregator which were serialized
bool serializeValues(VPackBuilder& b, bool onlyConverging = false) const;

View File

@ -86,7 +86,7 @@ template <typename V, typename E, typename M>
struct Algorithm : IAlgorithm {
public:
virtual WorkerContext* workerContext(VPackSlice userParams) const {
return new WorkerContext(userParams);
return new WorkerContext();
}
virtual GraphFormat<V, E>* inputFormat() const = 0;
virtual MessageFormat<M>* messageFormat() const = 0;

View File

@ -254,7 +254,7 @@ MasterContext* AsyncSCC::masterContext(VPackSlice userParams) const {
IAggregator* AsyncSCC::aggregator(std::string const& name) const {
if (name == kPhase) { // permanent value
return new ValueAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
return new OverwriteAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
} else if (name == kFoundNewMax) {
return new BoolOrAggregator(false); // non perm
} else if (name == kConverged) {

View File

@ -29,6 +29,7 @@
#include "Pregel/IncomingCache.h"
#include "Pregel/MasterContext.h"
#include "Pregel/VertexComputation.h"
#include "Pregel/Algos/DMID/VertexSumAggregator.h"
using namespace arangodb;
using namespace arangodb::pregel;
@ -115,7 +116,7 @@ struct DMIDComputation
superstep4(vertex, messages);
}
if (getSuperstep() == rwFinished +1) {
if (globalSuperstep() == rwFinished +1) {
/**
* Superstep 0 and RW_ITERATIONBOUND + 5 are identical. Therefore
* call superstep0
@ -123,38 +124,36 @@ struct DMIDComputation
superstep0(vertex, messages);
}
if (getSuperstep() == rwFinished+2) {
if (globalSuperstep() == rwFinished+2) {
superstep6(vertex, messages);
}
if (getSuperstep() == rwFinished + 3) {
if (globalSuperstep() == rwFinished + 3) {
superstep7(vertex, messages);
}
LongWritable iterationCounter = getAggregatedValue(ITERATION_AGG);
double it = iterationCounter.get();
int64_t iterationCounter = getAggregatedValue<int64_t>(ITERATION_AGG);
if (getSuperstep() >= rwFinished +4
&& (it % 3 == 1 )) {
superstep8(vertex, messages);
&& (iterationCounter % 3 == 1 )) {
superstep8(messages);
}
if (getSuperstep() >= rwFinished +5
&& (it % 3 == 2 )) {
superstep9(vertex, messages);
&& (iterationCounter % 3 == 2 )) {
superstep9(messages);
}
if (getSuperstep() >= rwFinished +6
&& (it % 3 == 0 )) {
superstep10(vertex, messages);
&& (iterationCounter % 3 == 0 )) {
superstep10(messages);
}
}
/**
* SUPERSTEP 0: send a message along all outgoing edges. Message contains
* own VertexID and the edge weight.
*/
private void superstep0(
Vertex<LongWritable, DMIDVertexValue, DoubleWritable> vertex,
void superstep0(Vertex<LongWritable, DMIDVertexValue, DoubleWritable> vertex,
Iterable<LongDoubleMessage> messages) {
long vertexID = vertex.getId().get();
@ -171,7 +170,7 @@ struct DMIDComputation
* the form (ID,weightedInDegree) along all incoming edges (send every node
* a reply)
*/
private void superstep1(
void superstep1(
Vertex<LongWritable, DMIDVertexValue, DoubleWritable> vertex,
Iterable<LongDoubleMessage> messages) {
@ -208,7 +207,7 @@ struct DMIDComputation
* Save the column as a part of the vertexValue. Aggregate DA with value 1/N
* to initialize the Random Walk.
*/
private void superstep2(
void superstep2(
Vertex<LongWritable, DMIDVertexValue, DoubleWritable> vertex,
Iterable<LongDoubleMessage> messages) {
@ -264,7 +263,7 @@ struct DMIDComputation
* SUPERSTEP 3 - RW_ITERATIONBOUND+3: Calculate entry DA^(t+1)_ownID using
* DA^t and disCol. Save entry in the DA aggregator.
*/
private void superstepRW(
void superstepRW(
Vertex<LongWritable, DMIDVertexValue, DoubleWritable> vertex,
Iterable<LongDoubleMessage> messages) {
@ -614,10 +613,6 @@ struct DMIDValueMasterContext : public MasterContext {
DMIDValueMasterContext() {} // TODO use _threashold
void preGlobalSuperstep() override {
};
@Override
public void compute() {
/**
* setAggregatorValue sets the value for the aggregator after master
* compute, before starting vertex compute of the same superstep. Does
@ -625,31 +620,26 @@ struct DMIDValueMasterContext : public MasterContext {
*/
LongWritable iterCount = getAggregatedValue(DMIDComputation.ITERATION_AGG);
boolean hasCascadingStarted = false;
LongWritable newIterCount = new LongWritable((iterCount.get() + 1));
if (iterCount.get() != 0) {
int64_t* iterCount = getAggregatedValue<int64_t>(ITERATION_AGG);
int64_t newIterCount = *iterCount + 1;
bool hasCascadingStarted = false;
if (*iterCount != 0) {
/** Cascading behavior started increment the iteration count */
setAggregatedValue(DMIDComputation.ITERATION_AGG, newIterCount);
aggregateValue<int64_t>(ITERATION_AGG, newIterCount);
hasCascadingStarted = true;
}
if (getSuperstep() == DMIDComputation.RW_ITERATIONBOUND+ 8) {
setAggregatedValue(DMIDComputation.NEW_MEMBER_AGG,
new BooleanWritable(false));
setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG,
new BooleanWritable(true));
setAggregatedValue(DMIDComputation.ITERATION_AGG, new LongWritable(1));
if (getSuperstep() == RW_ITERATIONBOUND+ 8) {
aggregateValue<bool>(NEW_MEMBER_AGG, false);
aggregateValue<bool>(NOT_ALL_ASSIGNED_AGG, true);
aggregateValue<int64_t>(ITERATION_AGG, 1);
hasCascadingStarted = true;
initializeGL();
}
if (hasCascadingStarted && (newIterCount.get() % 3 == 1)) {
if (hasCascadingStarted && (newIterCount % 3 == 1)) {
/** first step of one iteration */
LongWritable restartCountWritable = getAggregatedValue(RESTART_COUNTER_AGG);
Long restartCount=restartCountWritable.get();
int64_t* restartCountWritable = getAggregatedValue<int64_t>(RESTART_COUNTER_AGG);
Long restartCount = restartCountWritable.get();
BooleanWritable newMember = getAggregatedValue(DMIDComputation.NEW_MEMBER_AGG);
BooleanWritable notAllAssigned = getAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG);
@ -672,17 +662,15 @@ struct DMIDValueMasterContext : public MasterContext {
}
if (hasCascadingStarted && (iterCount.get() % 3 == 2)) {
if (hasCascadingStarted && (*iterCount % 3 == 2)) {
/** Second step of one iteration */
/**
* Set newMember aggregator and notAllAssigned aggregator back to
* initial value
*/
setAggregatedValue(DMIDComputation.NEW_MEMBER_AGG,
new BooleanWritable(false));
setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG,
new BooleanWritable(false));
setAggregatedValue<bool>(NEW_MEMBER_AGG, false);
setAggregatedValue<bool>(NOT_ALL_ASSIGNED_AGG, false);
}
if (LOG_AGGS) {
@ -719,10 +707,10 @@ struct DMIDValueMasterContext : public MasterContext {
* Initilizes the global leader aggregator with 1 for every vertex with a
* higher number of followers than the average.
*/
private void initializeGL() {
void initializeGL() {
DoubleSparseVector initGL = new DoubleSparseVector(
(int) getTotalNumVertices());
DoubleSparseVector vecFD = getAggregatedValue(DMIDComputation.FD_AGG);
VertexSumAggregator::VertexMap const* vecFD = getAggregatedValue<VertexSumAggregator::VertexMap>(FD_AGG);
double averageFD = 0.0;
int numLocalLeader = 0;
@ -740,7 +728,7 @@ struct DMIDValueMasterContext : public MasterContext {
if (LOG_AGGS) {
System.out.print("Global Leader:");
}
for (int i = 0; i < getTotalNumVertices(); ++i) {
for (int i = 0; i < vertexCount(); ++i) {
if (vecFD.get(i) > averageFD) {
initGL.set(i, 1.0);
if (LOG_AGGS) {
@ -768,43 +756,24 @@ MasterContext* SCC::masterContext(VPackSlice userParams) const {
IAggregator* SCC::aggregator(std::string const& name) const {
if (name == DA_AGG) { // permanent value
return new ValueAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
} else if (name == kFoundNewMax) {
return new BoolOrAggregator(false); // non perm
} else if (name == kConverged) {
return new BoolOrAggregator(false); // non perm
return new VertexSumAggregator(false);// non perm
} else if (name == LS_AGG) {
return new VertexSumAggregator(true);// perm
} else if (name == FD_AGG) {
return new VertexSumAggregator(true);// perm
} else if (name == GL_AGG) {
return new VertexSumAggregator(true);// perm
} else if (name == NEW_MEMBER_AGG) {
return new BooleanOrAggregator(false); // non perm
} else if (name == NOT_ALL_ASSIGNED_AGG) {
return new BooleanOrAggregator(false); // non perm
} else if (name == ITERATION_AGG) {
return new MaxAggregator<int64_t>(0, true); // perm
} else if (name == PROFITABILITY_AGG) {
return new MaxAggregator<double>(0.5, true); // perm
} else if (name == RESTART_COUNTER_AGG) {
return new MaxAggregator<int64_t>(1, true); // perm
}
registerAggregator(DMIDComputation.DA_AGG,
DoubleDenseVectorSumAggregator.class);
registerPersistentAggregator(DMIDComputation.LS_AGG,
DoubleDenseVectorSumAggregator.class);
registerPersistentAggregator(DMIDComputation.FD_AGG,
DoubleSparseVectorSumAggregator.class);
registerPersistentAggregator(DMIDComputation.GL_AGG,
DoubleSparseVectorSumAggregator.class);
registerAggregator(DMIDComputation.NEW_MEMBER_AGG,
BooleanOrAggregator.class);
registerAggregator(DMIDComputation.NOT_ALL_ASSIGNED_AGG,
BooleanOrAggregator.class);
registerPersistentAggregator(DMIDComputation.ITERATION_AGG,
LongMaxAggregator.class);
registerPersistentAggregator(DMIDComputation.PROFITABILITY_AGG,
DoubleMaxAggregator.class);
registerPersistentAggregator(RESTART_COUNTER_AGG,
LongMaxAggregator.class);
//registerAggregator(DMIDComputation.RW_INFINITYNORM_AGG,
// DoubleMaxAggregator.class);
//registerAggregator(DMIDComputation.RW_FINISHED_AGG,
//LongMaxAggregator.class);
setAggregatedValue(DMIDComputation.PROFITABILITY_AGG,
new DoubleWritable(0.5));
setAggregatedValue(RESTART_COUNTER_AGG, new LongWritable(1));
setAggregatedValue(DMIDComputation.ITERATION_AGG, new LongWritable(0));
return nullptr;
}

View File

@ -0,0 +1,132 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include <vector>
#include <velocypack/Builder.h>
#include <velocypack/Iterators.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include "Pregel/Graph.h"
#ifndef ARANGODB_PREGEL_AGG_DENSE_VECTOR_H
#define ARANGODB_PREGEL_AGG_DENSE_VECTOR_H 1
namespace arangodb {
namespace pregel {
struct VertexSumAggregator : public IAggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
typedef std::map<prgl_shard_t, std::unordered_map<std::string, double>> VertexMap;
VertexSumAggregator(bool perm = false)
: _empty(empty), _permanent(perm) {}
// if you use this from a vertex I will end you
void aggregate(void const* valuePtr) {
VertexMap const* map = (VertexMap)valuePtr;
for (auto pair1 const& : map) {
for (auto pair2 const& : it.second) {
_entries[pair1.first][pair2.first] += pair2.second;
}
}
};
void parseAggregate(VPackSlice slice) override {
for (auto const& pair: VPackObjectIterator(slice)) {
prgl_shard_t shard = it.key.getUInt();
std::string key;
VPackLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
if (i % 2 == 0) {
key = val.copyString();
} else {
entries[shard][key] += val.getNumber<double>();
}
i++;
}
}
}
void const* getAggregatedValue() const override { return &_entries; };
void setAggregatedValue(VPackSlice slice) override {
for (auto const& pair: VPackObjectIterator(slice)) {
prgl_shard_t shard = it.key.getUInt();
std::string key;
VPackLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
if (i % 2 == 0) {
key = val.copyString();
} else {
entries[shard][key] = val.getNumber<double>();
}
i++;
}
}
}
void serialize(std::string const& key, VPackBuilder &builder) const override {
builder.add(key, VPackValueType::Array);
for (T const& e : _entries) {
builder.add(VPackValue(e));
}
builder.close();
};
void reset() override {
if (!_permanent) {
_entries.clear();
}
}
double getAggregatedValue(prgl_shard_t shard, std::string const& key) {
auto const& it1 = _entries.find(shard);
if (it1 != _entries.end()) {
auto const& it2 = it1.second.find(key);
if (it2 != it1.second.end()) {
return it2.second;
}
}
return _empty;
}
void setValue(prgl_shard_t shard, std::string const& key, double val) {
_entries[shard][key] = val;
}
void aggregate(prgl_shard_t shard, std::string const& key, double val) {
_entries[shard][key] += val;
}
void setEmptyValue(double empty) {
_empty = empty;
}
bool isConverging() const override { return false; }
protected:
VertexMap _entries;
double _empty = 0;
bool _permanent;
};
}
#endif

View File

@ -27,6 +27,7 @@
#include "Pregel/MasterContext.h"
#include "Pregel/Utils.h"
#include "Pregel/VertexComputation.h"
#include "Pregel/WorkerContext.h"
#include "Cluster/ClusterInfo.h"
#include "Utils/OperationCursor.h"
@ -49,16 +50,29 @@ LineRank::LineRank(arangodb::velocypack::Slice params)
//_threshold = t.isNumber() ? t.getNumber<float>() : 0.000002f;
}
struct LRWorkerContext : WorkerContext {
float startAtNodeProb = 0;
void preApplication() override {
startAtNodeProb = 1.0f / edgeCount();
};
};
WorkerContext* LineRank::workerContext(VPackSlice params) const {
return new LRWorkerContext();
}
// github.com/JananiC/NetworkCentralities/blob/master/src/main/java/linerank/LineRank.java
struct LRComputation : public VertexComputation<float, float, float> {
LRComputation() {}
void compute(MessageIterator<float> const& messages) override {
float startAtNodeProb = 1.0f / context()->edgeCount();
LRWorkerContext *ctx = (LRWorkerContext *)context();
float* vertexValue = mutableVertexData();
RangeIterator<Edge<float>> edges = getEdges();
if (*vertexValue < 0.0f) {
*vertexValue = startAtNodeProb;
*vertexValue = ctx->startAtNodeProb;
aggregate(kMoreIterations, true);
} else {
float newScore = 0.0f;
@ -76,7 +90,7 @@ struct LRComputation : public VertexComputation<float, float, float> {
} else {
newScore /= edges.size();
newScore =
startAtNodeProb * RESTART_PROB + newScore * (1.0 - RESTART_PROB);
ctx->startAtNodeProb * RESTART_PROB + newScore * (1.0 - RESTART_PROB);
}
float diff = fabsf(newScore - *vertexValue);
@ -91,6 +105,8 @@ struct LRComputation : public VertexComputation<float, float, float> {
}
};
VertexComputation<float, float, float>* LineRank::createComputation(
WorkerConfig const* config) const {
return new LRComputation();
@ -98,7 +114,7 @@ VertexComputation<float, float, float>* LineRank::createComputation(
IAggregator* LineRank::aggregator(std::string const& name) const {
if (name == kMoreIterations) {
return new ValueAggregator<bool>(false, false); // non perm
return new OverwriteAggregator<bool>(false, false); // non perm
}
return nullptr;
}

View File

@ -51,9 +51,12 @@ struct LineRank : public SimpleAlgorithm<float, float, float> {
MessageCombiner<float>* messageCombiner() const override {
return new SumCombiner<float>();
}
WorkerContext* workerContext(VPackSlice params) const override;
VertexComputation<float, float, float>* createComputation(
WorkerConfig const*) const override;
IAggregator* aggregator(std::string const& name) const override;
};
}

View File

@ -50,6 +50,7 @@ struct PageRank : public SimpleAlgorithm<float, float, float> {
VertexComputation<float, float, float>* createComputation(
WorkerConfig const*) const override;
IAggregator* aggregator(std::string const& name) const override;
MasterContext* masterContext(VPackSlice userParams) const override;

View File

@ -87,9 +87,9 @@ IAggregator* RecoveringPageRank::aggregator(std::string const& name) const {
} else if (name == kRank) {
return new SumAggregator<float>(0);
} else if (name == kStep) {
return new ValueAggregator<uint32_t>(0);
return new OverwriteAggregator<uint32_t>(0);
} else if (name == kScale) {
return new ValueAggregator<float>(-1);
return new OverwriteAggregator<float>(-1);
}
return nullptr;
}

View File

@ -237,7 +237,7 @@ MasterContext* SCC::masterContext(VPackSlice userParams) const {
IAggregator* SCC::aggregator(std::string const& name) const {
if (name == kPhase) { // permanent value
return new ValueAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
return new OverwriteAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
} else if (name == kFoundNewMax) {
return new BoolOrAggregator(false); // non perm
} else if (name == kConverged) {

View File

@ -142,7 +142,7 @@ bool Conductor::_startGlobalStep() {
_totalEdgesCount = 0;
for (auto const& req : requests) {
VPackSlice payload = req.result.answer->payload();
_aggregators->parseValues(payload);
_aggregators->aggregateValues(payload);
_statistics.accumulateActiveCounts(payload);
_totalVerticesCount += payload.get(Utils::vertexCountKey).getUInt();
_totalEdgesCount += payload.get(Utils::edgeCountKey).getUInt();
@ -267,17 +267,16 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
} else if (_statistics.clientCount() < _dbServers.size() || // no messages
!_statistics.allMessagesProcessed()) { // haven't received msgs
VPackBuilder response;
if (_aggregators->parseValues(data)) {
if (_masterContext) {
_masterContext->postLocalSuperstep();
}
response.openObject();
_aggregators->serializeValues(response);
if (_masterContext && _masterContext->_enterNextGSS) {
response.add(Utils::enterNextGSSKey, VPackValue(true));
}
response.close();
_aggregators->aggregateValues(data);
if (_masterContext) {
_masterContext->postLocalSuperstep();
}
response.openObject();
_aggregators->serializeValues(response);
if (_masterContext && _masterContext->_enterNextGSS) {
response.add(Utils::enterNextGSSKey, VPackValue(true));
}
response.close();
return response;
}
@ -312,7 +311,7 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) {
}
// the recovery mechanism might be gathering state information
_aggregators->parseValues(data);
_aggregators->aggregateValues(data);
if (_respondedServers.size() != _dbServers.size()) {
return;
}

View File

@ -157,4 +157,23 @@ class VertexEntry {
};*/
}
}
/*
namespace std {
template <>
struct hash<arangodb::pregel::PregelID> {
std::size_t operator()(const PregelID& k) const {
using std::size_t;
using std::hash;
using std::string;
// Compute individual hash values for first,
// second and third and combine them using XOR
// and bit shifting:
std::size_t h1 = std::hash<string>()(k.key);
std::size_t h2 = std::hash<prgl_shard_t>()(k.shard);
return h1 ^ (h2 << 1);
}
};
}*/
#endif

View File

@ -77,6 +77,7 @@ class MasterContext {
/// Called when a worker send updated aggregator values.
/// Only called in async mode, never called after a global superstep
/// Can be used to decide to enter the next phase
virtual void postLocalSuperstep(){};
/// should indicate if compensation is supposed to start by returning true

View File

@ -37,7 +37,7 @@ namespace pregel {
template <typename V, typename E, typename M>
class Worker;
class Aggregator;
class IAggregator;
template <typename V, typename E, typename M>
class VertexContext {
@ -47,8 +47,8 @@ class VertexContext {
uint64_t _lss = 0;
WorkerContext* _context;
GraphStore<V, E>* _graphStore;
AggregatorHandler* _conductorAggregators;
AggregatorHandler* _workerAggregators;
AggregatorHandler* _readAggregators;
AggregatorHandler* _writeAggregators;
VertexEntry* _vertexEntry;
public:
@ -57,12 +57,16 @@ class VertexContext {
template <typename T>
inline void aggregate(std::string const& name, T const& value) {
T const* ptr = &value;
_workerAggregators->aggregate(name, ptr);
_writeAggregators->aggregate(name, ptr);
}
template <typename T>
inline const T* getAggregatedValue(std::string const& name) {
return (const T*)_conductorAggregators->getAggregatedValue(name);
return (const T*)_readAggregators->getAggregatedValue(name);
}
IAggregator* getAggregator(std::string const& name) {
return _writeAggregators->getAggregator(name);
}
inline WorkerContext const* context() { return _context; }

View File

@ -201,8 +201,8 @@ VPackBuilder Worker<V, E, M>::prepareGlobalStep(VPackSlice const& data) {
// initialize worker context
if (_workerContext && gss == 0 && _config.localSuperstep() == 0) {
_workerContext->_conductorAggregators = _conductorAggregators.get();
_workerContext->_workerAggregators = _workerAggregators.get();
_workerContext->_readAggregators = _conductorAggregators.get();
_workerContext->_writeAggregators = _workerAggregators.get();
_workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt();
_workerContext->_edgeCount = data.get(Utils::edgeCountKey).getUInt();
_workerContext->preApplication();
@ -295,9 +295,8 @@ void Worker<V, E, M>::startGlobalStep(VPackSlice const& data) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Wrong GSS");
}
_workerAggregators->resetValues(true);
_conductorAggregators->resetValues(true);
_conductorAggregators->parseValues(data);
_workerAggregators->resetValues();
_conductorAggregators->setAggregatedValues(data);
// execute context
if (_workerContext) {
_workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt();
@ -359,7 +358,7 @@ void Worker<V, E, M>::_initializeVertexContext(VertexContext<V, E, M>* ctx) {
ctx->_lss = _config.localSuperstep();
ctx->_context = _workerContext.get();
ctx->_graphStore = _graphStore.get();
ctx->_conductorAggregators = _conductorAggregators.get();
ctx->_readAggregators = _conductorAggregators.get();
}
// internally called in a WORKER THREAD!!
@ -381,7 +380,7 @@ bool Worker<V, E, M>::_processVertices(size_t threadId,
std::unique_ptr<VertexComputation<V, E, M>> vertexComputation(
_algorithm->createComputation(&_config));
_initializeVertexContext(vertexComputation.get());
vertexComputation->_workerAggregators = &workerAggregator;
vertexComputation->_writeAggregators = &workerAggregator;
vertexComputation->_cache = outCache;
if (!_config.asynchronousMode()) {
// Should cause enterNextGlobalSuperstep to do nothing
@ -531,13 +530,15 @@ void Worker<V, E, M>::_finishedProcessing() {
}
if (_config.asynchronousMode()) {
bool proceed = true;
bool proceed = false;
// if the conductor is unreachable or has send data (try to) proceed
std::unique_ptr<ClusterCommResult> result = _callConductorWithResponse(
Utils::finishedWorkerStepPath, package.slice());
if (result->status == CL_COMM_RECEIVED) {
VPackSlice data = result->answer->payload();
if ((proceed = _conductorAggregators->parseValues(data))) {
if (data.isObject()) {
proceed = true;
_conductorAggregators->aggregateValues(data);// only aggregate values
VPackSlice nextGSS = data.get(Utils::enterNextGSSKey);
if (nextGSS.isBool()) {
_requestedNextGSS = nextGSS.getBool();
@ -643,8 +644,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
MUTEX_LOCKER(guard, _commandMutex);
_workerAggregators->resetValues();
_conductorAggregators->resetValues();
_conductorAggregators->parseValues(data);
_conductorAggregators->setAggregatedValues(data);
ThreadPool* pool = PregelFeature::instance()->threadPool();
pool->enqueue([this] {
@ -657,7 +657,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
std::unique_ptr<VertexCompensation<V, E, M>> vCompensate(
_algorithm->createCompensation(&_config));
_initializeVertexContext(vCompensate.get());
vCompensate->_workerAggregators = _workerAggregators.get();
vCompensate->_writeAggregators = _workerAggregators.get();
size_t i = 0;
for (VertexEntry* vertexEntry : vertexIterator) {

View File

@ -37,19 +37,19 @@ class WorkerContext {
friend class Worker;
uint64_t _vertexCount, _edgeCount;
AggregatorHandler* _conductorAggregators;
AggregatorHandler* _workerAggregators;
AggregatorHandler* _readAggregators;
AggregatorHandler* _writeAggregators;
protected:
template <typename T>
inline void aggregate(std::string const& name, T const& value) {
T const* ptr = &value;
_workerAggregators->aggregate(name, ptr);
_writeAggregators->aggregate(name, ptr);
}
template <typename T>
inline const T* getAggregatedValue(std::string const& name) {
return (T*)_conductorAggregators->getAggregatedValue(name);
return (T*)_readAggregators->getAggregatedValue(name);
}
virtual void preApplication(){};
@ -58,7 +58,7 @@ class WorkerContext {
virtual void postApplication(){};
public:
WorkerContext(VPackSlice params) {}
WorkerContext() {}
virtual ~WorkerContext() {}
inline uint64_t vertexCount() const { return _vertexCount; }