diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index ffacbbd70f..8561d9e9c9 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -373,7 +373,7 @@ SET(ARANGOD_SOURCES Pregel/Algos/SCC.cpp Pregel/Algos/AsyncSCC.cpp Pregel/Algos/HITS.cpp -# Pregel/Algos/DMID/DMID.cpp + Pregel/Algos/DMID/DMID.cpp Pregel/Algos/EffectiveCloseness/EffectiveCloseness.cpp Pregel/Algos/EffectiveCloseness/HLLCounter.cpp Pregel/Conductor.cpp diff --git a/arangod/Pregel/AlgoRegistry.cpp b/arangod/Pregel/AlgoRegistry.cpp index c3f34c9db4..fced8e6a02 100644 --- a/arangod/Pregel/AlgoRegistry.cpp +++ b/arangod/Pregel/AlgoRegistry.cpp @@ -31,6 +31,7 @@ #include "Pregel/Algos/SSSP.h" #include "Pregel/Algos/ShortestPath.h" #include "Pregel/Algos/HITS.h" +#include "Pregel/Algos/DMID/DMID.h" #include "Pregel/Utils.h" using namespace arangodb; @@ -58,6 +59,8 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm, return new algos::AsyncSCC(userParams); } else if (algorithm == "hits") { return new algos::HITS(userParams); + } else if (algorithm == "dmid") { + return new algos::DMID(userParams); } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Unsupported Algorithm"); @@ -107,7 +110,9 @@ IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) { return createWorker(vocbase, new algos::AsyncSCC(userParams), body); } else if (algorithm == "hits") { return createWorker(vocbase, new algos::HITS(userParams), body); - } else { + } else if (algorithm == "dmid") { + return createWorker(vocbase, new algos::DMID(userParams), body); + } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Unsupported Algorithm"); } diff --git a/arangod/Pregel/Algos/DMID/DMID.cpp b/arangod/Pregel/Algos/DMID/DMID.cpp index f208c6f52d..d939372ef7 100644 --- a/arangod/Pregel/Algos/DMID/DMID.cpp +++ b/arangod/Pregel/Algos/DMID/DMID.cpp @@ -30,6 +30,8 @@ #include "Pregel/MasterContext.h" #include "Pregel/VertexComputation.h" #include "Pregel/Algos/DMID/VertexSumAggregator.h" +#include "Pregel/Algos/DMID/DMIDMessageFormat.h" +#include using namespace arangodb; using namespace arangodb::pregel; @@ -79,9 +81,14 @@ static std::string const ITERATION_AGG = "aggIT"; */ static std::string const PROFITABILITY_AGG = "aggProfit"; -/** Maximum steps for the random walk, corresponds to t*. Default = 1000 */ -static uint64_t const long RW_ITERATIONBOUND = 10; +static std::string const RESTART_COUNTER_AGG = "aggRestart"; +/** Maximum steps for the random walk, corresponds to t*. Default = 1000 */ +static uint64_t const RW_ITERATIONBOUND = 10; + +static const double PROFTIABILITY_DELTA = 0.3; + +static const bool LOG_AGGS = false; struct DMIDComputation : public VertexComputation { @@ -111,9 +118,9 @@ struct DMIDComputation superstepRW(messages); } - long rwFinished = RW_ITERATIONBOUND + 4; + uint64_t rwFinished = RW_ITERATIONBOUND + 4; if (globalSuperstep() == rwFinished) { - superstep4(vertex, messages); + superstep4(messages); } if (globalSuperstep() == rwFinished +1) { @@ -133,18 +140,18 @@ struct DMIDComputation } - int64_t iterationCounter = getAggregatedValue(ITERATION_AGG); + int64_t const* iterationCounter = getAggregatedValue(ITERATION_AGG); if (globalSuperstep() >= rwFinished +4 - && (iterationCounter % 3 == 1 )) { + && (*iterationCounter % 3 == 1 )) { superstep8(messages); } if (globalSuperstep() >= rwFinished +5 - && (iterationCounter % 3 == 2 )) { + && (*iterationCounter % 3 == 2 )) { superstep9(messages); } if (globalSuperstep() >= rwFinished +6 - && (iterationCounter % 3 == 0 )) { + && (*iterationCounter % 3 == 0 )) { superstep10(messages); } } @@ -153,11 +160,11 @@ struct DMIDComputation * SUPERSTEP 0: send a message along all outgoing edges. Message contains * own VertexID and the edge weight. */ - void superstep0(MessageIterator const& messages messages) { + void superstep0(MessageIterator const& messages) { DMIDMessage message(pregelId(), 0); - RangeIterator> edges = getEdges(); - for (Edge* edge : edges) { - message.value = *edge->data(); // edge weight + RangeIterator> edges = getEdges(); + for (Edge *edge : edges) { + message.weight = *edge->data(); // edge weight sendMessage(edge, message); } } @@ -171,7 +178,7 @@ struct DMIDComputation float weightedInDegree = 0.0; /** vertices that need a reply containing this vertexs weighted indegree */ - std::set predecessors); + std::set predecessors; for (DMIDMessage const* message : messages) { /** @@ -180,15 +187,15 @@ struct DMIDComputation * was send by msg.getSourceVertexId() * */ - predecessors.push(message->senderId); - weightedInDegree += message->value; + predecessors.insert(message->senderId); + weightedInDegree += message->weight; } /** update new weightedInDegree */ - mutableVertexValue()->weightedInDegree = weightedInDegree; + mutableVertexData()->weightedInDegree = weightedInDegree; // send to all predecessors DMIDMessage message(pregelId(), weightedInDegree); - for (PregelId const& pid : predecessors) { + for (PregelID const& pid : predecessors) { sendMessage(pid, message); } } @@ -211,11 +218,11 @@ struct DMIDComputation for (DMIDMessage const* message : messages) { /** Weight= weightedInDegree */ - float senderWeight = msg.getValue(); + float senderWeight = message->weight; float disValue = fabs(ownWeight - senderWeight); disSum += disValue; /** disValue = disassortativity value of senderID and ownID */ - vertexState->disCol[msg->pregelId] = disValue; + vertexState->disCol[message->senderId] = disValue; } /** * Normalize the new disCol (Note: a new Vector is automatically @@ -233,7 +240,7 @@ struct DMIDComputation * */ VertexSumAggregator *agg = (VertexSumAggregator*)getAggregator(DA_AGG); - agg->aggregate(this->shard(), this->key(), 1.0 / context->vertexCount()); + agg->aggregate(this->shard(), this->key(), 1.0 / context()->vertexCount()); //DoubleDenseVector init = new DoubleDenseVector( // (int) getTotalNumVertices()); //init.set((int) vertex.getId().get(), (double) 1.0 @@ -263,14 +270,14 @@ struct DMIDComputation curDA->forEach([&](PregelID const& _id, double entry) { newEntryDA += entry * vertexState->disCol[_id]; }); - curDA->aggregateValue(this->shard(), this->key(), newEntryDA); + curDA->aggregate(this->shard(), this->key(), newEntryDA); } /** * SUPERSTEP RW_ITERATIONBOUND+4: Calculate entry LS_ownID using DA^t* and * weightedInDegree. Save entry in the LS aggregator. */ - private void superstep4(MessageIterator const& messages) { + void superstep4(MessageIterator const& messages) { DMIDValue* vertexState = mutableVertexData(); VertexSumAggregator *finalDA = (VertexSumAggregator*)getAggregator(DA_AGG); @@ -296,16 +303,16 @@ struct DMIDComputation * value on the sender. The influence v-i has on v-j is (LS-i * w-ji) where * w-ji is the weight of the edge from v-j to v-i. * */ - private void superstep6(MessageIterator const& messages) { + void superstep6(MessageIterator const& messages) { //DoubleDenseVector vecLS = getAggregatedValue(LS_AGG); VertexSumAggregator *vecLS = (VertexSumAggregator*)getAggregator(LS_AGG); for (DMIDMessage const* message : messages) { - PregelID senderID = message->senderId(); + PregelID senderID = message->senderId; /** Weight= weightedInDegree */ - float senderWeight = message->value(); + float senderWeight = message->weight; float myInfluence = senderWeight * vecLS->getAggregatedValue(this->shard(), this->key()); @@ -314,7 +321,7 @@ struct DMIDComputation */ bool hasEdgeToSender = false; for (Edge *edge : getEdges()) { - if (edge->targetShard() == senderID.shard && edge->key() == senderID.key) { + if (edge->targetShard() == senderID.shard && edge->toKey() == senderID.key) { hasEdgeToSender = true; /** @@ -345,7 +352,7 @@ struct DMIDComputation * There may be more then one local leader. Add 1/k to the FollowerDegree * (aggregator) of the k local leaders found. **/ - private void superstep7(MessageIterator const& messages) { + void superstep7(MessageIterator const& messages) { /** maximum influence on this vertex */ float maxInfValue = 0; @@ -356,8 +363,8 @@ struct DMIDComputation /** Find possible local leader */ for (DMIDMessage const* message : messages) { - if (message->value >= maxInfValue) { - if (message->value > maxInfValue) { + if (message->weight >= maxInfValue) { + if (message->weight > maxInfValue) { /** new distinct leader found. Clear set */ leaderSet.clear(); } @@ -412,9 +419,9 @@ struct DMIDComputation * specific communities. **/ - it = vertexState->membershipDegree.find(this->pregelId()); + auto const& it2 = vertexState->membershipDegree.find(this->pregelId()); /** In case of first init test again if vertex is leader */ - if (it == vertexState->membershipDegree.end()) { + if (it2 == vertexState->membershipDegree.end()) { for (auto const& pair : vertexState->membershipDegree) { /** @@ -427,16 +434,16 @@ struct DMIDComputation } } else { - voteToHalt(); + voteHalt(); } } else { /** All vertices are assigned to at least one community */ /** TERMINATION */ - voteToHalt(); + voteHalt(); } } else { - voteToHalt(); + voteHalt(); } } @@ -444,7 +451,7 @@ struct DMIDComputation * SUPERSTEP RW_IT+9: Second iteration point of the cascading behavior * phase. **/ - private void superstep9(MessageIterator const& messages) { + void superstep9(MessageIterator const& messages) { DMIDValue* vertexState = mutableVertexData(); /** @@ -459,8 +466,8 @@ struct DMIDComputation */ if (vertexState->membershipDegree[leaderID] != 0.0) { - DMIDMessage message(pregelId(), leaderID); - sendMessage(message->senderId, leaderID); + DMIDMessage data(pregelId(), leaderID); + sendMessage(message->senderId, data); //LongDoubleMessage answerMsg = new LongDoubleMessage(vertex // .getId().get(), leaderID); @@ -474,12 +481,72 @@ struct DMIDComputation * SUPERSTEP RW_IT+10: Third iteration point of the cascading behavior * phase. **/ - abstract void superstep10(MessageIterator const& messages); + void superstep10(MessageIterator const& messages) { + + //long vertexID = vertex.getId().get(); + DMIDValue* vertexState = mutableVertexData(); + auto const& it = vertexState->membershipDegree.find(this->pregelId()); + + /** Is this vertex a global leader? */ + if (it == vertexState->membershipDegree.end()) {//!vertex.getValue().getMembershipDegree().containsKey(vertexID) + /** counts per communities the number of successors which are member */ + std::map membershipCounter; + //double previousCount = 0.0; + + for (DMIDMessage const* message : messages) { + /** + * the msg value is the index of the community the sender is a + * member of + */ + //Long leaderID = ((long) msg.getValue()); + PregelID const& leaderID = message->leaderId; + // .containsKey(leaderID) + if (membershipCounter.find(leaderID) != membershipCounter.end()) { + /** increase count by 1 */ + membershipCounter[leaderID] += 1;//.get(leaderID); + //membershipCounter.put(leaderID, previousCount + 1); + } else { + membershipCounter[leaderID] = 1.0; + } + } + /** profitability threshold */ + float const* threshold = getAggregatedValue(PROFITABILITY_AGG); + + int64_t const* iterationCounter = getAggregatedValue(ITERATION_AGG); + + // Map.Entry entry : membershipCounter.entrySet() + for (std::pair const& pair : membershipCounter) { + + if ((pair.second / getEdges().size()) > *threshold) { + /** its profitable to become a member, set value */ + vertexState->membershipDegree[pair.first] = 1.0 / std::pow(*iterationCounter / 3, 2); + aggregate(NEW_MEMBER_AGG, true); + } + } + /* vertex.getValue().setBestValidMemDeg(vertex.getValue() + .getMembershipDegree()); + */ + bool isPartOfAnyCommunity = false; + // Map.Entry entry : vertex.getValue().getMembershipDegree().entrySet() + for (auto const& pair : vertexState->membershipDegree) { + if (pair.second != 0.0) { + isPartOfAnyCommunity = true; + } + } + if (!isPartOfAnyCommunity) { + + aggregate(NOT_ALL_ASSIGNED_AGG, true); + } + } else{ + voteHalt(); + } + + } /** * Initialize the MembershipDegree vector. **/ - private void initilaizeMemDeg() { + void initilaizeMemDeg() { DMIDValue* vertexState = mutableVertexData(); VertexSumAggregator *vecGL = (VertexSumAggregator*)getAggregator(GL_AGG); @@ -506,19 +573,18 @@ struct DMIDComputation // vertexState->membershipDegree[this->pregelId] = 1.0; //} } - } }; -VertexComputation* +VertexComputation* DMID::createComputation(WorkerConfig const* config) const { return new DMIDComputation(); } -struct SCCGraphFormat : public GraphFormat { +struct DMIDGraphFormat : public GraphFormat { const std::string _resultField; uint64_t vertexIdRange = 0; - SCCGraphFormat(std::string const& result) : _resultField(result) {} + DMIDGraphFormat(std::string const& result) : _resultField(result) {} void willLoadVertices(uint64_t count) override { // if we aren't running in a cluster it doesn't matter @@ -530,40 +596,44 @@ struct SCCGraphFormat : public GraphFormat { } } - size_t estimatedEdgeSize() const override { return 0; }; - size_t copyVertexData(std::string const& documentId, arangodb::velocypack::Slice document, - SCCValue* targetPtr, size_t maxSize) override { - SCCValue* senders = (SCCValue*)targetPtr; - senders->vertexID = vertexIdRange++; + DMIDValue* value, size_t maxSize) override { + //SCCValue* senders = (SCCValue*)targetPtr; + //senders->vertexID = vertexIdRange++; return sizeof(SCCValue); } - size_t copyEdgeData(arangodb::velocypack::Slice document, int32_t* targetPtr, + size_t copyEdgeData(arangodb::velocypack::Slice document, float* targetPtr, size_t maxSize) override { - return 0; + *targetPtr = 1.0f; + return sizeof(float); } bool buildVertexDocument(arangodb::velocypack::Builder& b, - const SCCValue* ptr, size_t size) const override { - SCCValue* senders = (SCCValue*)ptr; - b.add(_resultField, VPackValue(senders->color)); + const DMIDValue* ptr, size_t size) const override { + if (ptr->membershipDegree.size() > 0) { + b.add(_resultField, VPackValue(VPackValueType::Array)); + for (auto const& pair : ptr->membershipDegree) { + b.add(pair.first.key, VPackValue(pair.second)); + } + b.close(); + } return true; } - bool buildEdgeDocument(arangodb::velocypack::Builder& b, const int32_t* ptr, + bool buildEdgeDocument(arangodb::velocypack::Builder& b, const float* ptr, size_t size) const override { return false; } }; -GraphFormat* SCC::inputFormat() const { - return new DMIDValueGraphFormat(_resultField); +GraphFormat* DMID::inputFormat() const { + return new DMIDGraphFormat(_resultField); } -struct DMIDValueMasterContext : public MasterContext { - DMIDValueMasterContext() {} // TODO use _threashold +struct DMIDMasterContext : public MasterContext { + DMIDMasterContext() {} // TODO use _threashold void preGlobalSuperstep() override { /** @@ -573,38 +643,38 @@ struct DMIDValueMasterContext : public MasterContext { */ - int64_t* iterCount = getAggregatedValue(ITERATION_AGG); + int64_t const* iterCount = getAggregatedValue(ITERATION_AGG); int64_t newIterCount = *iterCount + 1; bool hasCascadingStarted = false; if (*iterCount != 0) { /** Cascading behavior started increment the iteration count */ - aggregateValue(ITERATION_AGG, newIterCount); + aggregate(ITERATION_AGG, newIterCount); hasCascadingStarted = true; } - if (getSuperstep() == RW_ITERATIONBOUND+ 8) { - aggregateValue(NEW_MEMBER_AGG, false); - aggregateValue(NOT_ALL_ASSIGNED_AGG, true); - aggregateValue(ITERATION_AGG, 1); + if (globalSuperstep() == RW_ITERATIONBOUND+ 8) { + aggregate(NEW_MEMBER_AGG, false); + aggregate(NOT_ALL_ASSIGNED_AGG, true); + aggregate(ITERATION_AGG, 1); hasCascadingStarted = true; initializeGL(); } if (hasCascadingStarted && (newIterCount % 3 == 1)) { /** first step of one iteration */ - int64_t* restartCountWritable = getAggregatedValue(RESTART_COUNTER_AGG); - Long restartCount = restartCountWritable.get(); - BooleanWritable newMember = getAggregatedValue(DMIDComputation.NEW_MEMBER_AGG); - BooleanWritable notAllAssigned = getAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG); + int64_t const* restartCountWritable = getAggregatedValue(RESTART_COUNTER_AGG); + int64_t restartCount = *restartCountWritable; + bool const* newMember = getAggregatedValue(NEW_MEMBER_AGG); + bool const* notAllAssigned = getAggregatedValue(NOT_ALL_ASSIGNED_AGG); - if ((notAllAssigned.get() == true) && (newMember.get() == false)) { + if ((*notAllAssigned == true) && (*newMember == false)) { /** * RESTART Cascading Behavior with lower profitability threshold */ float newThreshold = 1 - (PROFTIABILITY_DELTA * (restartCount + 1)); - setAggregatedValue(RESTART_COUNTER_AGG, restartCount + 1); - setAggregatedValue(PROFITABILITY_AGG, newThreshold); - setAggregatedValue(ITERATION_AGG, 1); + aggregate(RESTART_COUNTER_AGG, restartCount + 1); + aggregate(PROFITABILITY_AGG, newThreshold); + aggregate(ITERATION_AGG, 1); } } @@ -616,36 +686,24 @@ struct DMIDValueMasterContext : public MasterContext { * initial value */ - setAggregatedValue(NEW_MEMBER_AGG, false); - setAggregatedValue(NOT_ALL_ASSIGNED_AGG, false); + aggregate(NEW_MEMBER_AGG, false); + aggregate(NOT_ALL_ASSIGNED_AGG, false); } if (LOG_AGGS) { - if (getSuperstep() <= DMIDComputation.RW_ITERATIONBOUND + 4) { - DoubleDenseVector convergedDA = getAggregatedValue(DMIDComputation.DA_AGG); - System.out.print("Aggregator DA at step: "+getSuperstep()+" \nsize=" - + getTotalNumVertices() + "\n{ "); - for (int i = 0; i < getTotalNumVertices(); ++i) { - System.out.print(convergedDA.get(i)); - if (i != getTotalNumVertices() - 1) { - System.out.print(" , "); - } else { - System.out.println(" }\n"); - } - } + if (globalSuperstep() <= RW_ITERATIONBOUND + 4) { + VertexSumAggregator *convergedDA = (VertexSumAggregator*)getAggregator(DA_AGG); + + LOG_TOPIC(INFO, Logger::PREGEL) << "Aggregator DA at step: " << globalSuperstep(); + convergedDA->forEach([&](PregelID const& _id, double entry) { + LOG_TOPIC(INFO, Logger::PREGEL) << _id.key; + }); } - if (getSuperstep() == DMIDComputation.RW_ITERATIONBOUND +6) { - DoubleDenseVector leadershipVector = getAggregatedValue(DMIDComputation.LS_AGG); - System.out.print("Aggregator LS: \nsize=" - + getTotalNumVertices() + "\n{ "); - for (int i = 0; i < getTotalNumVertices(); ++i) { - System.out.print(leadershipVector.get(i)); - if (i != getTotalNumVertices() - 1) { - System.out.print(" , "); - } else { - System.out.println(" }\n"); - } - } + if (globalSuperstep() == RW_ITERATIONBOUND +6) { + VertexSumAggregator *leadershipVector = (VertexSumAggregator*)getAggregator(LS_AGG); + leadershipVector->forEach([&](PregelID const& _id, double entry) { + LOG_TOPIC(INFO, Logger::PREGEL) << "Aggregator LS:" << _id.key; + }); } } } @@ -655,54 +713,46 @@ struct DMIDValueMasterContext : public MasterContext { * higher number of followers than the average. */ void initializeGL() { - DoubleSparseVector initGL = new DoubleSparseVector( - (int) getTotalNumVertices()); + /** set Global Leader aggregator */ + VertexSumAggregator *initGL = (VertexSumAggregator*)getAggregator(GL_AGG); VertexSumAggregator *vecFD = (VertexSumAggregator*)getAggregator(FD_AGG); double averageFD = 0.0; int numLocalLeader = 0; /** get averageFollower degree */ - f - for (int i = 0; i < getTotalNumVertices(); ++i) { - averageFD += vecFD.get(i); - if (vecFD.get(i) != 0) { + vecFD->forEach([&](PregelID const& _id, double entry) { + averageFD += entry; + if (entry != 0) { numLocalLeader++; } - } + }); + if (numLocalLeader != 0) { averageFD = (double) averageFD / numLocalLeader; } /** set flag for globalLeader */ - if (LOG_AGGS) { - System.out.print("Global Leader:"); - } - for (int i = 0; i < vertexCount(); ++i) { - if (vecFD.get(i) > averageFD) { - initGL.set(i, 1.0); - if (LOG_AGGS) { - System.out.print(" " + i + " "); - } + //if (LOG_AGGS) { + // System.out.print("Global Leader:"); + //} + vecFD->forEach([&](PregelID const& _id, double entry) { + if (entry > averageFD) { + initGL->aggregate(_id.shard, _id.key, 1.0); + LOG_TOPIC(INFO, Logger::PREGEL) << "Leader " << _id.key; } - } - if (LOG_AGGS) { - System.out.println("\n"); - } - /** set Global Leader aggregator */ - setAggregatedValue(DMIDComputation.GL_AGG, initGL); + }); + //setAggregatedValue(DMIDComputation.GL_AGG, initGL); /** set not all vertices assigned aggregator to true */ - setAggregatedValue(DMIDComputation.NOT_ALL_ASSIGNED_AGG, - new BooleanWritable(true)); - + aggregate(NOT_ALL_ASSIGNED_AGG, true); } }; -MasterContext* SCC::masterContext(VPackSlice userParams) const { - return new SCCMasterContext(); +MasterContext* DMID::masterContext(VPackSlice userParams) const { + return new DMIDMasterContext(); } -IAggregator* SCC::aggregator(std::string const& name) const { +IAggregator* DMID::aggregator(std::string const& name) const { if (name == DA_AGG) { // permanent value return new VertexSumAggregator(false);// non perm } else if (name == LS_AGG) { @@ -712,9 +762,9 @@ IAggregator* SCC::aggregator(std::string const& name) const { } else if (name == GL_AGG) { return new VertexSumAggregator(true);// perm } else if (name == NEW_MEMBER_AGG) { - return new BooleanOrAggregator(false); // non perm + return new BoolOrAggregator(false); // non perm } else if (name == NOT_ALL_ASSIGNED_AGG) { - return new BooleanOrAggregator(false); // non perm + return new BoolOrAggregator(false); // non perm } else if (name == ITERATION_AGG) { return new MaxAggregator(0, true); // perm } else if (name == PROFITABILITY_AGG) { @@ -725,3 +775,7 @@ IAggregator* SCC::aggregator(std::string const& name) const { return nullptr; } + +MessageFormat* DMID::messageFormat() const { + return new DMIDMessageFormat(); +} diff --git a/arangod/Pregel/Algos/DMID/DMID.h b/arangod/Pregel/Algos/DMID/DMID.h index f2a0a115d1..53889056d1 100644 --- a/arangod/Pregel/Algos/DMID/DMID.h +++ b/arangod/Pregel/Algos/DMID/DMID.h @@ -20,8 +20,8 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// -#ifndef ARANGODB_PREGEL_ALGOS_SCC_H -#define ARANGODB_PREGEL_ALGOS_SCC_H 1 +#ifndef ARANGODB_PREGEL_ALGOS_DMID_H +#define ARANGODB_PREGEL_ALGOS_DMID_H 1 #include "Pregel/Algorithm.h" #include "Pregel/CommonFormats.h" diff --git a/arangod/Pregel/Algos/DMID/DMIDMessageFormat.h b/arangod/Pregel/Algos/DMID/DMIDMessageFormat.h index 0021946608..21c601fd3f 100644 --- a/arangod/Pregel/Algos/DMID/DMIDMessageFormat.h +++ b/arangod/Pregel/Algos/DMID/DMIDMessageFormat.h @@ -35,7 +35,6 @@ namespace arangodb { namespace pregel { struct DMIDMessageFormat : public MessageFormat { - static_assert(std::is_arithmetic::value, "Message type must be numeric"); DMIDMessageFormat() {} void unwrapValue(VPackSlice s, DMIDMessage& message) const override { VPackArrayIterator array(s); @@ -43,7 +42,7 @@ struct DMIDMessageFormat : public MessageFormat { message.senderId.key = (*(++array)).copyString(); message.leaderId.shard = (*array).getUInt(); message.leaderId.key = (*(++array)).copyString(); - message.value = (*(++array)).getNumber(); + message.weight = (*(++array)).getNumber(); } void addValue(VPackBuilder& arrayBuilder, DMIDMessage const& message) const override { @@ -52,7 +51,7 @@ struct DMIDMessageFormat : public MessageFormat { arrayBuilder.add(VPackValue(message.senderId.key)); arrayBuilder.add(VPackValue(message.leaderId.shard)); arrayBuilder.add(VPackValue(message.leaderId.key)); - arrayBuilder.add(VPackValue(message.value)); + arrayBuilder.add(VPackValue(message.weight)); arrayBuilder.close(); } }; diff --git a/arangod/Pregel/Algos/DMID/VertexSumAggregator.h b/arangod/Pregel/Algos/DMID/VertexSumAggregator.h index fc3604d657..e3f1518e9f 100644 --- a/arangod/Pregel/Algos/DMID/VertexSumAggregator.h +++ b/arangod/Pregel/Algos/DMID/VertexSumAggregator.h @@ -22,9 +22,10 @@ #include #include -#include +#include #include #include +#include #include "Pregel/Graph.h" #ifndef ARANGODB_PREGEL_AGG_DENSE_VECTOR_H @@ -34,17 +35,17 @@ namespace arangodb { namespace pregel { struct VertexSumAggregator : public IAggregator { - static_assert(std::is_arithmetic::value, "Type must be numeric"); typedef std::map> VertexMap; + typedef std::pair> MyPair; VertexSumAggregator(bool perm = false) - : _empty(empty), _permanent(perm) {} + : _permanent(perm) {} // if you use this from a vertex I will end you - void aggregate(void const* valuePtr) { + void aggregate(void const* valuePtr) override { VertexMap const* map = (VertexMap const*)valuePtr; - for (auto pair1 const& : map) { - for (auto pair2 const& : it.second) { + for (MyPair const& pair1 : *map) { + for (auto const& pair2 : pair1.second) { _entries[pair1.first][pair2.first] += pair2.second; } } @@ -52,14 +53,14 @@ struct VertexSumAggregator : public IAggregator { void parseAggregate(VPackSlice const& slice) override { for (auto const& pair: VPackObjectIterator(slice)) { - prgl_shard_t shard = it.key.getUInt(); + prgl_shard_t shard = std::stoi(pair.key.copyString()); std::string key; - VPackLength i = 0; + VPackValueLength i = 0; for (VPackSlice const& val : VPackArrayIterator(pair.value)) { if (i % 2 == 0) { key = val.copyString(); } else { - entries[shard][key] += val.getNumber(); + _entries[shard][key] += val.getNumber(); } i++; } @@ -68,16 +69,16 @@ struct VertexSumAggregator : public IAggregator { void const* getAggregatedValue() const override { return &_entries; }; - void setAggregatedValue(VPackSlice slice) override { - for (auto const& pair: VPackObjectIterator(slice)) { - prgl_shard_t shard = it.key.getUInt(); + void setAggregatedValue(VPackSlice const& slice) override { + for (auto const& pair : VPackObjectIterator(slice)) { + prgl_shard_t shard = std::stoi(pair.key.copyString()); std::string key; - VPackLength i = 0; + VPackValueLength i = 0; for (VPackSlice const& val : VPackArrayIterator(pair.value)) { if (i % 2 == 0) { key = val.copyString(); } else { - entries[shard][key] = val.getNumber(); + _entries[shard][key] = val.getNumber(); } i++; } @@ -85,9 +86,14 @@ struct VertexSumAggregator : public IAggregator { } void serialize(std::string const& key, VPackBuilder &builder) const override { - builder.add(key, VPackValueType::Array); - for (T const& e : _entries) { - builder.add(VPackValue(e)); + builder.add(key, VPackValue(VPackValueType::Object)); + for (auto const& pair1 : _entries) { + builder.add(std::to_string(pair1.first), VPackValue(VPackValueType::Array)); + for (auto const& pair2 : pair1.second) { + builder.add(VPackValue(pair2.first)); + builder.add(VPackValue(pair2.second)); + } + builder.close(); } builder.close(); }; @@ -101,12 +107,12 @@ struct VertexSumAggregator : public IAggregator { double getAggregatedValue(prgl_shard_t shard, std::string const& key) { auto const& it1 = _entries.find(shard); if (it1 != _entries.end()) { - auto const& it2 = it1.second.find(key); - if (it2 != it1.second.end()) { - return it2.second; + auto const& it2 = it1->second.find(key); + if (it2 != it1->second.end()) { + return it2->second; } } - return _empty; + return _default; } //void setValue(prgl_shard_t shard, std::string const& key, double val) { @@ -139,4 +145,5 @@ struct VertexSumAggregator : public IAggregator { bool _permanent; }; } +} #endif diff --git a/arangod/Pregel/Graph.h b/arangod/Pregel/Graph.h index d15174fea3..51405c97fc 100644 --- a/arangod/Pregel/Graph.h +++ b/arangod/Pregel/Graph.h @@ -39,10 +39,14 @@ struct PregelID { PregelID() : shard(0), key("") {} PregelID(prgl_shard_t s, std::string const& k) : shard(s), key(k) {} - inline bool operator==(const PregelID& rhs) { + inline bool operator==(const PregelID& rhs) const { return shard == rhs.shard && key == rhs.key; } + inline bool operator<(const PregelID& rhs) const { + return shard < rhs.shard || (shard == rhs.shard && key < rhs.key); + } + bool inline isValid() const { return shard != invalid_prgl_shard && !key.empty(); } diff --git a/arangod/Pregel/Iterators.h b/arangod/Pregel/Iterators.h index ca3a27337d..076eac9fc7 100644 --- a/arangod/Pregel/Iterators.h +++ b/arangod/Pregel/Iterators.h @@ -68,7 +68,7 @@ class MessageIterator { // postfix ++ MessageIterator operator++(int) { - MessageIterator result(_data, _size); + MessageIterator result(*this); ++(*this); return result; } diff --git a/arangod/Pregel/MasterContext.h b/arangod/Pregel/MasterContext.h index 48469159e6..1253f451d5 100644 --- a/arangod/Pregel/MasterContext.h +++ b/arangod/Pregel/MasterContext.h @@ -62,6 +62,10 @@ class MasterContext { inline const T* getAggregatedValue(std::string const& name) { return (const T*)_aggregators->getAggregatedValue(name); } + + inline IAggregator* getAggregator(std::string const& name) { + return _aggregators->getAggregator(name); + } inline void enterNextGlobalSuperstep() { _enterNextGSS = true; }