1
0
Fork 0

Fixed DMID

This commit is contained in:
Simon Grätzer 2017-02-21 13:15:35 +01:00
parent 2e912a1743
commit 41bc0da58e
9 changed files with 231 additions and 158 deletions

View File

@ -373,7 +373,7 @@ SET(ARANGOD_SOURCES
Pregel/Algos/SCC.cpp
Pregel/Algos/AsyncSCC.cpp
Pregel/Algos/HITS.cpp
# Pregel/Algos/DMID/DMID.cpp
Pregel/Algos/DMID/DMID.cpp
Pregel/Algos/EffectiveCloseness/EffectiveCloseness.cpp
Pregel/Algos/EffectiveCloseness/HLLCounter.cpp
Pregel/Conductor.cpp

View File

@ -31,6 +31,7 @@
#include "Pregel/Algos/SSSP.h"
#include "Pregel/Algos/ShortestPath.h"
#include "Pregel/Algos/HITS.h"
#include "Pregel/Algos/DMID/DMID.h"
#include "Pregel/Utils.h"
using namespace arangodb;
@ -58,6 +59,8 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm,
return new algos::AsyncSCC(userParams);
} else if (algorithm == "hits") {
return new algos::HITS(userParams);
} else if (algorithm == "dmid") {
return new algos::DMID(userParams);
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Unsupported Algorithm");
@ -107,7 +110,9 @@ IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) {
return createWorker(vocbase, new algos::AsyncSCC(userParams), body);
} else if (algorithm == "hits") {
return createWorker(vocbase, new algos::HITS(userParams), body);
} else {
} else if (algorithm == "dmid") {
return createWorker(vocbase, new algos::DMID(userParams), body);
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Unsupported Algorithm");
}

View File

@ -30,6 +30,8 @@
#include "Pregel/MasterContext.h"
#include "Pregel/VertexComputation.h"
#include "Pregel/Algos/DMID/VertexSumAggregator.h"
#include "Pregel/Algos/DMID/DMIDMessageFormat.h"
#include <cmath>
using namespace arangodb;
using namespace arangodb::pregel;
@ -79,9 +81,14 @@ static std::string const ITERATION_AGG = "aggIT";
*/
static std::string const PROFITABILITY_AGG = "aggProfit";
/** Maximum steps for the random walk, corresponds to t*. Default = 1000 */
static uint64_t const long RW_ITERATIONBOUND = 10;
static std::string const RESTART_COUNTER_AGG = "aggRestart";
/** Maximum steps for the random walk, corresponds to t*. Default = 1000 */
static uint64_t const RW_ITERATIONBOUND = 10;
static const double PROFTIABILITY_DELTA = 0.3;
static const bool LOG_AGGS = false;
struct DMIDComputation
: public VertexComputation<DMIDValue, float, DMIDMessage> {
@ -111,9 +118,9 @@ struct DMIDComputation
superstepRW(messages);
}
long rwFinished = RW_ITERATIONBOUND + 4;
uint64_t rwFinished = RW_ITERATIONBOUND + 4;
if (globalSuperstep() == rwFinished) {
superstep4(vertex, messages);
superstep4(messages);
}
if (globalSuperstep() == rwFinished +1) {
@ -133,18 +140,18 @@ struct DMIDComputation
}
int64_t iterationCounter = getAggregatedValue<int64_t>(ITERATION_AGG);
int64_t const* iterationCounter = getAggregatedValue<int64_t>(ITERATION_AGG);
if (globalSuperstep() >= rwFinished +4
&& (iterationCounter % 3 == 1 )) {
&& (*iterationCounter % 3 == 1 )) {
superstep8(messages);
}
if (globalSuperstep() >= rwFinished +5
&& (iterationCounter % 3 == 2 )) {
&& (*iterationCounter % 3 == 2 )) {
superstep9(messages);
}
if (globalSuperstep() >= rwFinished +6
&& (iterationCounter % 3 == 0 )) {
&& (*iterationCounter % 3 == 0 )) {
superstep10(messages);
}
}
@ -153,11 +160,11 @@ struct DMIDComputation
* SUPERSTEP 0: send a message along all outgoing edges. Message contains
* own VertexID and the edge weight.
*/
void superstep0(MessageIterator<DMIDMessage> const& messages messages) {
void superstep0(MessageIterator<DMIDMessage> const& messages) {
DMIDMessage message(pregelId(), 0);
RangeIterator<Edge<int64_t>> edges = getEdges();
for (Edge<int64_t>* edge : edges) {
message.value = *edge->data(); // edge weight
RangeIterator<Edge<float>> edges = getEdges();
for (Edge<float> *edge : edges) {
message.weight = *edge->data(); // edge weight
sendMessage(edge, message);
}
}
@ -171,7 +178,7 @@ struct DMIDComputation
float weightedInDegree = 0.0;
/** vertices that need a reply containing this vertexs weighted indegree */
std::set<PregelId> predecessors);
std::set<PregelID> predecessors;
for (DMIDMessage const* message : messages) {
/**
@ -180,15 +187,15 @@ struct DMIDComputation
* was send by msg.getSourceVertexId()
*
*/
predecessors.push(message->senderId);
weightedInDegree += message->value;
predecessors.insert(message->senderId);
weightedInDegree += message->weight;
}
/** update new weightedInDegree */
mutableVertexValue()->weightedInDegree = weightedInDegree;
mutableVertexData()->weightedInDegree = weightedInDegree;
// send to all predecessors
DMIDMessage message(pregelId(), weightedInDegree);
for (PregelId const& pid : predecessors) {
for (PregelID const& pid : predecessors) {
sendMessage(pid, message);
}
}
@ -211,11 +218,11 @@ struct DMIDComputation
for (DMIDMessage const* message : messages) {
/** Weight= weightedInDegree */
float senderWeight = msg.getValue();
float senderWeight = message->weight;
float disValue = fabs(ownWeight - senderWeight);
disSum += disValue;
/** disValue = disassortativity value of senderID and ownID */
vertexState->disCol[msg->pregelId] = disValue;
vertexState->disCol[message->senderId] = disValue;
}
/**
* Normalize the new disCol (Note: a new Vector is automatically
@ -233,7 +240,7 @@ struct DMIDComputation
* */
VertexSumAggregator *agg = (VertexSumAggregator*)getAggregator(DA_AGG);
agg->aggregate(this->shard(), this->key(), 1.0 / context->vertexCount());
agg->aggregate(this->shard(), this->key(), 1.0 / context()->vertexCount());
//DoubleDenseVector init = new DoubleDenseVector(
// (int) getTotalNumVertices());
//init.set((int) vertex.getId().get(), (double) 1.0
@ -263,14 +270,14 @@ struct DMIDComputation
curDA->forEach([&](PregelID const& _id, double entry) {
newEntryDA += entry * vertexState->disCol[_id];
});
curDA->aggregateValue(this->shard(), this->key(), newEntryDA);
curDA->aggregate(this->shard(), this->key(), newEntryDA);
}
/**
* SUPERSTEP RW_ITERATIONBOUND+4: Calculate entry LS_ownID using DA^t* and
* weightedInDegree. Save entry in the LS aggregator.
*/
private void superstep4(MessageIterator<DMIDMessage> const& messages) {
void superstep4(MessageIterator<DMIDMessage> const& messages) {
DMIDValue* vertexState = mutableVertexData();
VertexSumAggregator *finalDA = (VertexSumAggregator*)getAggregator(DA_AGG);
@ -296,16 +303,16 @@ struct DMIDComputation
* value on the sender. The influence v-i has on v-j is (LS-i * w-ji) where
* w-ji is the weight of the edge from v-j to v-i.
* */
private void superstep6(MessageIterator<DMIDMessage> const& messages) {
void superstep6(MessageIterator<DMIDMessage> const& messages) {
//DoubleDenseVector vecLS = getAggregatedValue(LS_AGG);
VertexSumAggregator *vecLS = (VertexSumAggregator*)getAggregator(LS_AGG);
for (DMIDMessage const* message : messages) {
PregelID senderID = message->senderId();
PregelID senderID = message->senderId;
/** Weight= weightedInDegree */
float senderWeight = message->value();
float senderWeight = message->weight;
float myInfluence = senderWeight * vecLS->getAggregatedValue(this->shard(), this->key());
@ -314,7 +321,7 @@ struct DMIDComputation
*/
bool hasEdgeToSender = false;
for (Edge<float> *edge : getEdges()) {
if (edge->targetShard() == senderID.shard && edge->key() == senderID.key) {
if (edge->targetShard() == senderID.shard && edge->toKey() == senderID.key) {
hasEdgeToSender = true;
/**
@ -345,7 +352,7 @@ struct DMIDComputation
* There may be more then one local leader. Add 1/k to the FollowerDegree
* (aggregator) of the k local leaders found.
**/
private void superstep7(MessageIterator<DMIDMessage> const& messages) {
void superstep7(MessageIterator<DMIDMessage> const& messages) {
/** maximum influence on this vertex */
float maxInfValue = 0;
@ -356,8 +363,8 @@ struct DMIDComputation
/** Find possible local leader */
for (DMIDMessage const* message : messages) {
if (message->value >= maxInfValue) {
if (message->value > maxInfValue) {
if (message->weight >= maxInfValue) {
if (message->weight > maxInfValue) {
/** new distinct leader found. Clear set */
leaderSet.clear();
}
@ -412,9 +419,9 @@ struct DMIDComputation
* specific communities.
**/
it = vertexState->membershipDegree.find(this->pregelId());
auto const& it2 = vertexState->membershipDegree.find(this->pregelId());
/** In case of first init test again if vertex is leader */
if (it == vertexState->membershipDegree.end()) {
if (it2 == vertexState->membershipDegree.end()) {
for (auto const& pair : vertexState->membershipDegree) {
/**
@ -427,16 +434,16 @@ struct DMIDComputation
}
} else {
voteToHalt();
voteHalt();
}
} else {
/** All vertices are assigned to at least one community */
/** TERMINATION */
voteToHalt();
voteHalt();
}
} else {
voteToHalt();
voteHalt();
}
}
@ -444,7 +451,7 @@ struct DMIDComputation
* SUPERSTEP RW_IT+9: Second iteration point of the cascading behavior
* phase.
**/
private void superstep9(MessageIterator<DMIDMessage> const& messages) {
void superstep9(MessageIterator<DMIDMessage> const& messages) {
DMIDValue* vertexState = mutableVertexData();
/**
@ -459,8 +466,8 @@ struct DMIDComputation
*/
if (vertexState->membershipDegree[leaderID] != 0.0) {
DMIDMessage message(pregelId(), leaderID);
sendMessage(message->senderId, leaderID);
DMIDMessage data(pregelId(), leaderID);
sendMessage(message->senderId, data);
//LongDoubleMessage answerMsg = new LongDoubleMessage(vertex
// .getId().get(), leaderID);
@ -474,12 +481,72 @@ struct DMIDComputation
* SUPERSTEP RW_IT+10: Third iteration point of the cascading behavior
* phase.
**/
abstract void superstep10(MessageIterator<DMIDMessage> const& messages);
void superstep10(MessageIterator<DMIDMessage> const& messages) {
//long vertexID = vertex.getId().get();
DMIDValue* vertexState = mutableVertexData();
auto const& it = vertexState->membershipDegree.find(this->pregelId());
/** Is this vertex a global leader? */
if (it == vertexState->membershipDegree.end()) {//!vertex.getValue().getMembershipDegree().containsKey(vertexID)
/** counts per communities the number of successors which are member */
std::map<PregelID, float> membershipCounter;
//double previousCount = 0.0;
for (DMIDMessage const* message : messages) {
/**
* the msg value is the index of the community the sender is a
* member of
*/
//Long leaderID = ((long) msg.getValue());
PregelID const& leaderID = message->leaderId;
// .containsKey(leaderID)
if (membershipCounter.find(leaderID) != membershipCounter.end()) {
/** increase count by 1 */
membershipCounter[leaderID] += 1;//.get(leaderID);
//membershipCounter.put(leaderID, previousCount + 1);
} else {
membershipCounter[leaderID] = 1.0;
}
}
/** profitability threshold */
float const* threshold = getAggregatedValue<float>(PROFITABILITY_AGG);
int64_t const* iterationCounter = getAggregatedValue<int64_t>(ITERATION_AGG);
// Map.Entry<Long, Double> entry : membershipCounter.entrySet()
for (std::pair<PregelID, float> const& pair : membershipCounter) {
if ((pair.second / getEdges().size()) > *threshold) {
/** its profitable to become a member, set value */
vertexState->membershipDegree[pair.first] = 1.0 / std::pow(*iterationCounter / 3, 2);
aggregate<bool>(NEW_MEMBER_AGG, true);
}
}
/* vertex.getValue().setBestValidMemDeg(vertex.getValue()
.getMembershipDegree());
*/
bool isPartOfAnyCommunity = false;
// Map.Entry<Long, Double> entry : vertex.getValue().getMembershipDegree().entrySet()
for (auto const& pair : vertexState->membershipDegree) {
if (pair.second != 0.0) {
isPartOfAnyCommunity = true;
}
}
if (!isPartOfAnyCommunity) {
aggregate<bool>(NOT_ALL_ASSIGNED_AGG, true);
}
} else{
voteHalt();
}
}
/**
* Initialize the MembershipDegree vector.
**/
private void initilaizeMemDeg() {
void initilaizeMemDeg() {
DMIDValue* vertexState = mutableVertexData();
VertexSumAggregator *vecGL = (VertexSumAggregator*)getAggregator(GL_AGG);
@ -506,19 +573,18 @@ struct DMIDComputation
// vertexState->membershipDegree[this->pregelId] = 1.0;
//}
}
}
};
VertexComputation<DMIDValue, int32_t, int64_t>*
VertexComputation<DMIDValue, float, DMIDMessage>*
DMID::createComputation(WorkerConfig const* config) const {
return new DMIDComputation();
}
struct SCCGraphFormat : public GraphFormat<DMIDValue, int32_t> {
struct DMIDGraphFormat : public GraphFormat<DMIDValue, float> {
const std::string _resultField;
uint64_t vertexIdRange = 0;
SCCGraphFormat(std::string const& result) : _resultField(result) {}
DMIDGraphFormat(std::string const& result) : _resultField(result) {}
void willLoadVertices(uint64_t count) override {
// if we aren't running in a cluster it doesn't matter
@ -530,40 +596,44 @@ struct SCCGraphFormat : public GraphFormat<DMIDValue, int32_t> {
}
}
size_t estimatedEdgeSize() const override { return 0; };
size_t copyVertexData(std::string const& documentId,
arangodb::velocypack::Slice document,
SCCValue* targetPtr, size_t maxSize) override {
SCCValue* senders = (SCCValue*)targetPtr;
senders->vertexID = vertexIdRange++;
DMIDValue* value, size_t maxSize) override {
//SCCValue* senders = (SCCValue*)targetPtr;
//senders->vertexID = vertexIdRange++;
return sizeof(SCCValue);
}
size_t copyEdgeData(arangodb::velocypack::Slice document, int32_t* targetPtr,
size_t copyEdgeData(arangodb::velocypack::Slice document, float* targetPtr,
size_t maxSize) override {
return 0;
*targetPtr = 1.0f;
return sizeof(float);
}
bool buildVertexDocument(arangodb::velocypack::Builder& b,
const SCCValue* ptr, size_t size) const override {
SCCValue* senders = (SCCValue*)ptr;
b.add(_resultField, VPackValue(senders->color));
const DMIDValue* ptr, size_t size) const override {
if (ptr->membershipDegree.size() > 0) {
b.add(_resultField, VPackValue(VPackValueType::Array));
for (auto const& pair : ptr->membershipDegree) {
b.add(pair.first.key, VPackValue(pair.second));
}
b.close();
}
return true;
}
bool buildEdgeDocument(arangodb::velocypack::Builder& b, const int32_t* ptr,
bool buildEdgeDocument(arangodb::velocypack::Builder& b, const float* ptr,
size_t size) const override {
return false;
}
};
GraphFormat<SCCValue, int32_t>* SCC::inputFormat() const {
return new DMIDValueGraphFormat(_resultField);
GraphFormat<DMIDValue, float>* DMID::inputFormat() const {
return new DMIDGraphFormat(_resultField);
}
struct DMIDValueMasterContext : public MasterContext {
DMIDValueMasterContext() {} // TODO use _threashold
struct DMIDMasterContext : public MasterContext {
DMIDMasterContext() {} // TODO use _threashold
void preGlobalSuperstep() override {
/**
@ -573,38 +643,38 @@ struct DMIDValueMasterContext : public MasterContext {
*/
int64_t* iterCount = getAggregatedValue<int64_t>(ITERATION_AGG);
int64_t const* iterCount = getAggregatedValue<int64_t>(ITERATION_AGG);
int64_t newIterCount = *iterCount + 1;
bool hasCascadingStarted = false;
if (*iterCount != 0) {
/** Cascading behavior started increment the iteration count */
aggregateValue<int64_t>(ITERATION_AGG, newIterCount);
aggregate<int64_t>(ITERATION_AGG, newIterCount);
hasCascadingStarted = true;
}
if (getSuperstep() == RW_ITERATIONBOUND+ 8) {
aggregateValue<bool>(NEW_MEMBER_AGG, false);
aggregateValue<bool>(NOT_ALL_ASSIGNED_AGG, true);
aggregateValue<int64_t>(ITERATION_AGG, 1);
if (globalSuperstep() == RW_ITERATIONBOUND+ 8) {
aggregate<bool>(NEW_MEMBER_AGG, false);
aggregate<bool>(NOT_ALL_ASSIGNED_AGG, true);
aggregate<int64_t>(ITERATION_AGG, 1);
hasCascadingStarted = true;
initializeGL();
}
if (hasCascadingStarted && (newIterCount % 3 == 1)) {
/** first step of one iteration */
int64_t* restartCountWritable = getAggregatedValue<int64_t>(RESTART_COUNTER_AGG);
Long restartCount = restartCountWritable.get();
BooleanWritable newMember = getAggregatedValue(DMIDComputation.NEW_MEMBER_AGG);
BooleanWritable notAllAssigned = getAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG);
int64_t const* restartCountWritable = getAggregatedValue<int64_t>(RESTART_COUNTER_AGG);
int64_t restartCount = *restartCountWritable;
bool const* newMember = getAggregatedValue<bool>(NEW_MEMBER_AGG);
bool const* notAllAssigned = getAggregatedValue<bool>(NOT_ALL_ASSIGNED_AGG);
if ((notAllAssigned.get() == true) && (newMember.get() == false)) {
if ((*notAllAssigned == true) && (*newMember == false)) {
/**
* RESTART Cascading Behavior with lower profitability threshold
*/
float newThreshold = 1 - (PROFTIABILITY_DELTA * (restartCount + 1));
setAggregatedValue<int64_t>(RESTART_COUNTER_AGG, restartCount + 1);
setAggregatedValue<float>(PROFITABILITY_AGG, newThreshold);
setAggregatedValue<int64_t>(ITERATION_AGG, 1);
aggregate<int64_t>(RESTART_COUNTER_AGG, restartCount + 1);
aggregate<float>(PROFITABILITY_AGG, newThreshold);
aggregate<int64_t>(ITERATION_AGG, 1);
}
}
@ -616,36 +686,24 @@ struct DMIDValueMasterContext : public MasterContext {
* initial value
*/
setAggregatedValue<bool>(NEW_MEMBER_AGG, false);
setAggregatedValue<bool>(NOT_ALL_ASSIGNED_AGG, false);
aggregate<bool>(NEW_MEMBER_AGG, false);
aggregate<bool>(NOT_ALL_ASSIGNED_AGG, false);
}
if (LOG_AGGS) {
if (getSuperstep() <= DMIDComputation.RW_ITERATIONBOUND + 4) {
DoubleDenseVector convergedDA = getAggregatedValue(DMIDComputation.DA_AGG);
System.out.print("Aggregator DA at step: "+getSuperstep()+" \nsize="
+ getTotalNumVertices() + "\n{ ");
for (int i = 0; i < getTotalNumVertices(); ++i) {
System.out.print(convergedDA.get(i));
if (i != getTotalNumVertices() - 1) {
System.out.print(" , ");
} else {
System.out.println(" }\n");
}
}
if (globalSuperstep() <= RW_ITERATIONBOUND + 4) {
VertexSumAggregator *convergedDA = (VertexSumAggregator*)getAggregator(DA_AGG);
LOG_TOPIC(INFO, Logger::PREGEL) << "Aggregator DA at step: " << globalSuperstep();
convergedDA->forEach([&](PregelID const& _id, double entry) {
LOG_TOPIC(INFO, Logger::PREGEL) << _id.key;
});
}
if (getSuperstep() == DMIDComputation.RW_ITERATIONBOUND +6) {
DoubleDenseVector leadershipVector = getAggregatedValue(DMIDComputation.LS_AGG);
System.out.print("Aggregator LS: \nsize="
+ getTotalNumVertices() + "\n{ ");
for (int i = 0; i < getTotalNumVertices(); ++i) {
System.out.print(leadershipVector.get(i));
if (i != getTotalNumVertices() - 1) {
System.out.print(" , ");
} else {
System.out.println(" }\n");
}
}
if (globalSuperstep() == RW_ITERATIONBOUND +6) {
VertexSumAggregator *leadershipVector = (VertexSumAggregator*)getAggregator(LS_AGG);
leadershipVector->forEach([&](PregelID const& _id, double entry) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Aggregator LS:" << _id.key;
});
}
}
}
@ -655,54 +713,46 @@ struct DMIDValueMasterContext : public MasterContext {
* higher number of followers than the average.
*/
void initializeGL() {
DoubleSparseVector initGL = new DoubleSparseVector(
(int) getTotalNumVertices());
/** set Global Leader aggregator */
VertexSumAggregator *initGL = (VertexSumAggregator*)getAggregator(GL_AGG);
VertexSumAggregator *vecFD = (VertexSumAggregator*)getAggregator(FD_AGG);
double averageFD = 0.0;
int numLocalLeader = 0;
/** get averageFollower degree */
f
for (int i = 0; i < getTotalNumVertices(); ++i) {
averageFD += vecFD.get(i);
if (vecFD.get(i) != 0) {
vecFD->forEach([&](PregelID const& _id, double entry) {
averageFD += entry;
if (entry != 0) {
numLocalLeader++;
}
}
});
if (numLocalLeader != 0) {
averageFD = (double) averageFD / numLocalLeader;
}
/** set flag for globalLeader */
if (LOG_AGGS) {
System.out.print("Global Leader:");
}
for (int i = 0; i < vertexCount(); ++i) {
if (vecFD.get(i) > averageFD) {
initGL.set(i, 1.0);
if (LOG_AGGS) {
System.out.print(" " + i + " ");
}
//if (LOG_AGGS) {
// System.out.print("Global Leader:");
//}
vecFD->forEach([&](PregelID const& _id, double entry) {
if (entry > averageFD) {
initGL->aggregate(_id.shard, _id.key, 1.0);
LOG_TOPIC(INFO, Logger::PREGEL) << "Leader " << _id.key;
}
}
if (LOG_AGGS) {
System.out.println("\n");
}
/** set Global Leader aggregator */
setAggregatedValue(DMIDComputation.GL_AGG, initGL);
});
//setAggregatedValue(DMIDComputation.GL_AGG, initGL);
/** set not all vertices assigned aggregator to true */
setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG,
new BooleanWritable(true));
aggregate<bool>(NOT_ALL_ASSIGNED_AGG, true);
}
};
MasterContext* SCC::masterContext(VPackSlice userParams) const {
return new SCCMasterContext();
MasterContext* DMID::masterContext(VPackSlice userParams) const {
return new DMIDMasterContext();
}
IAggregator* SCC::aggregator(std::string const& name) const {
IAggregator* DMID::aggregator(std::string const& name) const {
if (name == DA_AGG) { // permanent value
return new VertexSumAggregator(false);// non perm
} else if (name == LS_AGG) {
@ -712,9 +762,9 @@ IAggregator* SCC::aggregator(std::string const& name) const {
} else if (name == GL_AGG) {
return new VertexSumAggregator(true);// perm
} else if (name == NEW_MEMBER_AGG) {
return new BooleanOrAggregator(false); // non perm
return new BoolOrAggregator(false); // non perm
} else if (name == NOT_ALL_ASSIGNED_AGG) {
return new BooleanOrAggregator(false); // non perm
return new BoolOrAggregator(false); // non perm
} else if (name == ITERATION_AGG) {
return new MaxAggregator<int64_t>(0, true); // perm
} else if (name == PROFITABILITY_AGG) {
@ -725,3 +775,7 @@ IAggregator* SCC::aggregator(std::string const& name) const {
return nullptr;
}
MessageFormat<DMIDMessage>* DMID::messageFormat() const {
return new DMIDMessageFormat();
}

View File

@ -20,8 +20,8 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGOS_SCC_H
#define ARANGODB_PREGEL_ALGOS_SCC_H 1
#ifndef ARANGODB_PREGEL_ALGOS_DMID_H
#define ARANGODB_PREGEL_ALGOS_DMID_H 1
#include "Pregel/Algorithm.h"
#include "Pregel/CommonFormats.h"

View File

@ -35,7 +35,6 @@ namespace arangodb {
namespace pregel {
struct DMIDMessageFormat : public MessageFormat<DMIDMessage> {
static_assert(std::is_arithmetic<T>::value, "Message type must be numeric");
DMIDMessageFormat() {}
void unwrapValue(VPackSlice s, DMIDMessage& message) const override {
VPackArrayIterator array(s);
@ -43,7 +42,7 @@ struct DMIDMessageFormat : public MessageFormat<DMIDMessage> {
message.senderId.key = (*(++array)).copyString();
message.leaderId.shard = (*array).getUInt();
message.leaderId.key = (*(++array)).copyString();
message.value = (*(++array)).getNumber<float>();
message.weight = (*(++array)).getNumber<float>();
}
void addValue(VPackBuilder& arrayBuilder,
DMIDMessage const& message) const override {
@ -52,7 +51,7 @@ struct DMIDMessageFormat : public MessageFormat<DMIDMessage> {
arrayBuilder.add(VPackValue(message.senderId.key));
arrayBuilder.add(VPackValue(message.leaderId.shard));
arrayBuilder.add(VPackValue(message.leaderId.key));
arrayBuilder.add(VPackValue(message.value));
arrayBuilder.add(VPackValue(message.weight));
arrayBuilder.close();
}
};

View File

@ -22,9 +22,10 @@
#include <vector>
#include <velocypack/Builder.h>
#include <velocypack/Iterators.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include <string>
#include "Pregel/Graph.h"
#ifndef ARANGODB_PREGEL_AGG_DENSE_VECTOR_H
@ -34,17 +35,17 @@ namespace arangodb {
namespace pregel {
struct VertexSumAggregator : public IAggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
typedef std::map<prgl_shard_t, std::unordered_map<std::string, double>> VertexMap;
typedef std::pair<prgl_shard_t, std::unordered_map<std::string, double>> MyPair;
VertexSumAggregator(bool perm = false)
: _empty(empty), _permanent(perm) {}
: _permanent(perm) {}
// if you use this from a vertex I will end you
void aggregate(void const* valuePtr) {
void aggregate(void const* valuePtr) override {
VertexMap const* map = (VertexMap const*)valuePtr;
for (auto pair1 const& : map) {
for (auto pair2 const& : it.second) {
for (MyPair const& pair1 : *map) {
for (auto const& pair2 : pair1.second) {
_entries[pair1.first][pair2.first] += pair2.second;
}
}
@ -52,14 +53,14 @@ struct VertexSumAggregator : public IAggregator {
void parseAggregate(VPackSlice const& slice) override {
for (auto const& pair: VPackObjectIterator(slice)) {
prgl_shard_t shard = it.key.getUInt();
prgl_shard_t shard = std::stoi(pair.key.copyString());
std::string key;
VPackLength i = 0;
VPackValueLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
if (i % 2 == 0) {
key = val.copyString();
} else {
entries[shard][key] += val.getNumber<double>();
_entries[shard][key] += val.getNumber<double>();
}
i++;
}
@ -68,16 +69,16 @@ struct VertexSumAggregator : public IAggregator {
void const* getAggregatedValue() const override { return &_entries; };
void setAggregatedValue(VPackSlice slice) override {
for (auto const& pair: VPackObjectIterator(slice)) {
prgl_shard_t shard = it.key.getUInt();
void setAggregatedValue(VPackSlice const& slice) override {
for (auto const& pair : VPackObjectIterator(slice)) {
prgl_shard_t shard = std::stoi(pair.key.copyString());
std::string key;
VPackLength i = 0;
VPackValueLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
if (i % 2 == 0) {
key = val.copyString();
} else {
entries[shard][key] = val.getNumber<double>();
_entries[shard][key] = val.getNumber<double>();
}
i++;
}
@ -85,9 +86,14 @@ struct VertexSumAggregator : public IAggregator {
}
void serialize(std::string const& key, VPackBuilder &builder) const override {
builder.add(key, VPackValueType::Array);
for (T const& e : _entries) {
builder.add(VPackValue(e));
builder.add(key, VPackValue(VPackValueType::Object));
for (auto const& pair1 : _entries) {
builder.add(std::to_string(pair1.first), VPackValue(VPackValueType::Array));
for (auto const& pair2 : pair1.second) {
builder.add(VPackValue(pair2.first));
builder.add(VPackValue(pair2.second));
}
builder.close();
}
builder.close();
};
@ -101,12 +107,12 @@ struct VertexSumAggregator : public IAggregator {
double getAggregatedValue(prgl_shard_t shard, std::string const& key) {
auto const& it1 = _entries.find(shard);
if (it1 != _entries.end()) {
auto const& it2 = it1.second.find(key);
if (it2 != it1.second.end()) {
return it2.second;
auto const& it2 = it1->second.find(key);
if (it2 != it1->second.end()) {
return it2->second;
}
}
return _empty;
return _default;
}
//void setValue(prgl_shard_t shard, std::string const& key, double val) {
@ -139,4 +145,5 @@ struct VertexSumAggregator : public IAggregator {
bool _permanent;
};
}
}
#endif

View File

@ -39,10 +39,14 @@ struct PregelID {
PregelID() : shard(0), key("") {}
PregelID(prgl_shard_t s, std::string const& k) : shard(s), key(k) {}
inline bool operator==(const PregelID& rhs) {
inline bool operator==(const PregelID& rhs) const {
return shard == rhs.shard && key == rhs.key;
}
inline bool operator<(const PregelID& rhs) const {
return shard < rhs.shard || (shard == rhs.shard && key < rhs.key);
}
bool inline isValid() const {
return shard != invalid_prgl_shard && !key.empty();
}

View File

@ -68,7 +68,7 @@ class MessageIterator {
// postfix ++
MessageIterator operator++(int) {
MessageIterator result(_data, _size);
MessageIterator result(*this);
++(*this);
return result;
}

View File

@ -62,6 +62,10 @@ class MasterContext {
inline const T* getAggregatedValue(std::string const& name) {
return (const T*)_aggregators->getAggregatedValue(name);
}
inline IAggregator* getAggregator(std::string const& name) {
return _aggregators->getAggregator(name);
}
inline void enterNextGlobalSuperstep() { _enterNextGSS = true; }