From dc6e993038627d424ccedb83c29836fd944e1df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Sun, 6 Nov 2016 23:20:59 +0100 Subject: [PATCH] Aggregator support started --- arangod/CMakeLists.txt | 3 +- arangod/Pregel/Aggregator.h | 29 ++++-- arangod/Pregel/Algorithm.h | 3 +- arangod/Pregel/AlgorithmContext.h | 68 ------------- arangod/Pregel/Algos/PageRank.cpp | 106 ++++++++++++++++++++ arangod/Pregel/Algos/PageRank.h | 47 +++++++++ arangod/Pregel/Algos/SCC.cpp | 2 +- arangod/Pregel/Algos/SSSP.cpp | 2 +- arangod/Pregel/Combiners/FloatSumCombiner.h | 39 +++++++ arangod/Pregel/Conductor.cpp | 43 ++++++-- arangod/Pregel/Conductor.h | 4 +- arangod/Pregel/GraphFormat.h | 27 +++++ arangod/Pregel/MessageCombiner.h | 4 +- arangod/Pregel/OutgoingCache.cpp | 38 ++++--- arangod/Pregel/OutgoingCache.h | 2 - arangod/Pregel/Utils.cpp | 5 +- arangod/Pregel/Utils.h | 8 +- arangod/Pregel/Worker.cpp | 1 + arangod/Pregel/Worker.h | 4 +- arangod/Pregel/WorkerState.cpp | 10 +- arangod/Pregel/WorkerState.h | 5 +- arangod/RestHandler/RestPregelHandler.cpp | 2 +- 22 files changed, 328 insertions(+), 124 deletions(-) delete mode 100644 arangod/Pregel/AlgorithmContext.h create mode 100644 arangod/Pregel/Algos/PageRank.cpp create mode 100644 arangod/Pregel/Algos/PageRank.h create mode 100644 arangod/Pregel/Combiners/FloatSumCombiner.h diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 6d746daeee..6d11d15028 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -363,6 +363,7 @@ SET(ARANGOD_SOURCES Wal/Slots.cpp Wal/SynchronizerThread.cpp Pregel/Algorithm.cpp + Pregel/Aggregator.cpp Pregel/Conductor.cpp Pregel/PregelFeature.cpp Pregel/IncomingCache.cpp @@ -376,7 +377,7 @@ SET(ARANGOD_SOURCES Pregel/MessageFormat.cpp Pregel/ResultWriter.cpp Pregel/Algos/SSSP.cpp - Pregel/Algos/SCC.cpp + Pregel/Algos/PageRank.cpp ${ADDITIONAL_BIN_ARANGOD_SOURCES} ) diff --git a/arangod/Pregel/Aggregator.h b/arangod/Pregel/Aggregator.h index 7fd95baf96..1dc5a510cd 100644 --- a/arangod/Pregel/Aggregator.h +++ b/arangod/Pregel/Aggregator.h @@ -23,7 +23,7 @@ #include #include -#include +#include #include #ifndef ARANGODB_PREGEL_AGGREGATOR_H @@ -32,14 +32,16 @@ namespace arangodb { namespace pregel { -template +template class Aggregator { public: Aggregator(const Aggregator&) = delete; Aggregator& operator=(const Aggregator&) = delete; - virtual T const& getValue() const = 0; - virtual void reduce(T const& otherValue) = 0; + virtual void const* getValue() const = 0; + virtual void reduce( void const* otherValue) = 0; + virtual void parse(VPackSlice otherValue) = 0; + virtual void serialize(VPackBuilder *b) = 0; std::string name() { return _name; } protected: @@ -49,17 +51,24 @@ class Aggregator { std::string _name; }; -class MinIntegerAggregator : public Aggregator { - MinIntegerAggregator(std::string const& name, int64_t init) +class FloatMinAggregator : public Aggregator { + MinIntegerAggregator(std::string const& name, float init) : Aggregator(name), _value(init) {} - int64_t const& getValue() const override { return _value; }; - void reduce(int64_t const& otherValue) override { - if (otherValue < _value) _value = otherValue; + void const* getValue() const override { return &_value; }; + void reduce(void const* otherValue) override { + float other = *((float*)otherValue); + if (other < _value) _value = other; + }; + void parse(VPackSlice otherValue) override { + this->reduce(otherValue.getInteger()); + }; + void serialize(VPackBuilder *b) override { + b.add(name(), _value); }; private: - int64_t _value; + float _value; }; } } diff --git a/arangod/Pregel/Algorithm.h b/arangod/Pregel/Algorithm.h index e8b15a1635..8c6352a512 100644 --- a/arangod/Pregel/Algorithm.h +++ b/arangod/Pregel/Algorithm.h @@ -44,10 +44,11 @@ struct Algorithm { public: virtual ~Algorithm() {} // virtual bool isFixpointAlgorithm() const {return false;} - virtual bool preserveTransactions() const { return false; } + //virtual bool preserveTransactions() const { return false; } virtual size_t estimatedVertexSize() const { return sizeof(V); }; virtual size_t estimatedEdgeSize() const { return sizeof(E); }; + virtual void aggregators(std::vector> &aggregators) {} virtual std::shared_ptr> inputFormat() const = 0; virtual std::shared_ptr> messageFormat() const = 0; diff --git a/arangod/Pregel/AlgorithmContext.h b/arangod/Pregel/AlgorithmContext.h deleted file mode 100644 index e8b15a1635..0000000000 --- a/arangod/Pregel/AlgorithmContext.h +++ /dev/null @@ -1,68 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Simon Grätzer -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGODB_PREGEL_ALGORITHM_H -#define ARANGODB_PREGEL_ALGORITHM_H 1 - -#include -#include -#include - -#include "Basics/Common.h" -#include "GraphFormat.h" -#include "MessageCombiner.h" -#include "MessageFormat.h" - -namespace arangodb { -namespace pregel { - -template -class VertexComputation; - -// specify serialization, whatever -template -struct Algorithm { - public: - virtual ~Algorithm() {} - // virtual bool isFixpointAlgorithm() const {return false;} - virtual bool preserveTransactions() const { return false; } - - virtual size_t estimatedVertexSize() const { return sizeof(V); }; - virtual size_t estimatedEdgeSize() const { return sizeof(E); }; - - virtual std::shared_ptr> inputFormat() const = 0; - virtual std::shared_ptr> messageFormat() const = 0; - virtual std::shared_ptr> messageCombiner() const = 0; - virtual std::shared_ptr> createComputation() - const = 0; - - std::string const& getName() const { return _name; } - - protected: - Algorithm(std::string const& name) : _name(name){}; - - private: - std::string _name; -}; -} -} -#endif diff --git a/arangod/Pregel/Algos/PageRank.cpp b/arangod/Pregel/Algos/PageRank.cpp new file mode 100644 index 0000000000..94b3397864 --- /dev/null +++ b/arangod/Pregel/Algos/PageRank.cpp @@ -0,0 +1,106 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "SCC.h" +#include "Pregel/VertexComputation.h" +#include "Pregel/GraphFormat.h" +#include "Pregel/Utils.h" + +#include "Vocbase/vocbase.h" +#include "Cluster/ClusterInfo.h" +#include "Utils/OperationCursor.h" +#include "Utils/SingleCollectionTransaction.h" +#include "Utils/StandaloneTransactionContext.h" +#include "Utils/Transaction.h" + +using namespace arangodb; +using namespace arangodb::pregel; +using namespace arangodb::pregel::algos; + +struct SCCGraphFormat : public GraphFormat { + uint64_t _currentId = 0; + void willUseCollection(TRI_vocbase_t *vocbase, std::string const& shard, bool isEdgeCollection) override { + int64_t count = Utils::countDocuments(vocbase, shard); + _currentId = ClusterInfo::instance()->uniqid(count); + } + SCCGraphFormat() {} + + size_t copyVertexData(VPackSlice document, void* targetPtr, + size_t maxSize) override { + *((int64_t*)targetPtr) = _currentId; + _currentId++; + return sizeof(int64_t); + } + + size_t copyEdgeData(VPackSlice document, void* targetPtr, + size_t maxSize) override { + return 0; + } + + int64_t readVertexData(void* ptr) override { return *((int64_t*)ptr); } + int64_t readEdgeData(void* ptr) override { return *((int64_t*)ptr); } +}; + +std::shared_ptr> SCCAlgorithm::inputFormat() + const { + return std::make_shared("value", -1, 1); +} + +std::shared_ptr> SCCAlgorithm::messageFormat() const { + return std::shared_ptr(new IntegerMessageFormat()); +} + +std::shared_ptr> SCCAlgorithm::messageCombiner() + const { + return std::shared_ptr(new MinIntegerCombiner()); +} + +struct SCCComputation : public VertexComputation { + SCCComputation() {} + void compute(std::string const& vertexID, + MessageIterator const& messages) override { + /*int64_t tmp = vertexData(); + for (const int64_t* msg : messages) { + if (tmp < 0 || *msg < tmp) { + tmp = *msg; + }; + } + int64_t* state = mutableVertexData(); + if (tmp >= 0 && (getGlobalSuperstep() == 0 || tmp != *state)) { + LOG(INFO) << "Recomputing value for vertex " << vertexID; + *state = tmp; // update state + + EdgeIterator edges = getEdges(); + for (EdgeEntry* edge : edges) { + int64_t val = *(edge->getData()) + tmp; + uint64_t toID = edge->toPregelID(); + sendMessage(toID, val); + } + }*/ + voteHalt(); + } +}; + +std::shared_ptr> +SCCAlgorithm::createComputation() const { + return std::shared_ptr(new SCCComputation()); +} diff --git a/arangod/Pregel/Algos/PageRank.h b/arangod/Pregel/Algos/PageRank.h new file mode 100644 index 0000000000..c8a0f320b3 --- /dev/null +++ b/arangod/Pregel/Algos/PageRank.h @@ -0,0 +1,47 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_ALGOS_SSSP_H +#define ARANGODB_PREGEL_ALGOS_SSSP_H 1 + +#include "Pregel/Algorithm.h" + +namespace arangodb { +namespace pregel { +namespace algos { + +/// PageRank + struct PageRankAlgorithm : public Algorithm { + public: + SCCAlgorithm() : Algorithm("PageRank") {} + + void aggregators(std::vector> &aggregators) override; + std::shared_ptr> inputFormat() const override; + std::shared_ptr> messageFormat() const override; + std::shared_ptr> messageCombiner() const override; + std::shared_ptr> + createComputation() const override; +}; +} +} +} +#endif diff --git a/arangod/Pregel/Algos/SCC.cpp b/arangod/Pregel/Algos/SCC.cpp index 94b3397864..8a98ca9e11 100644 --- a/arangod/Pregel/Algos/SCC.cpp +++ b/arangod/Pregel/Algos/SCC.cpp @@ -71,7 +71,7 @@ std::shared_ptr> SCCAlgorithm::messageFormat() const { std::shared_ptr> SCCAlgorithm::messageCombiner() const { - return std::shared_ptr(new MinIntegerCombiner()); + return std::shared_ptr(new IntegerMinCombiner()); } struct SCCComputation : public VertexComputation { diff --git a/arangod/Pregel/Algos/SSSP.cpp b/arangod/Pregel/Algos/SSSP.cpp index 45c40c77f1..ffcae7dd5d 100644 --- a/arangod/Pregel/Algos/SSSP.cpp +++ b/arangod/Pregel/Algos/SSSP.cpp @@ -66,7 +66,7 @@ std::shared_ptr> SSSPAlgorithm::messageFormat() const { std::shared_ptr> SSSPAlgorithm::messageCombiner() const { - return std::shared_ptr(new MinIntegerCombiner()); + return std::shared_ptr(new IntegerMinCombiner()); } std::shared_ptr> diff --git a/arangod/Pregel/Combiners/FloatSumCombiner.h b/arangod/Pregel/Combiners/FloatSumCombiner.h new file mode 100644 index 0000000000..8a1e0b2c0c --- /dev/null +++ b/arangod/Pregel/Combiners/FloatSumCombiner.h @@ -0,0 +1,39 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "Pregel/MessageCombiner.h" + +#ifndef ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H +#define ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H 1 +namespace arangodb { +namespace pregel { + +struct FloatSumCombiner : public MessageCombiner { + MinIntegerCombiner() {} + float combine(float const& firstValue, + float const& secondValue) const override { + return firstValue + secondValue; + }; +}; +} +} +#endif diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index c8a483a852..786640fdd1 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -36,6 +36,9 @@ #include #include +#include "Algorithm.h" +#include "Pregel/Algos/PageRank.h" + using namespace arangodb; using namespace arangodb::pregel; @@ -52,6 +55,14 @@ Conductor::Conductor( bool isCoordinator = ServerState::instance()->isCoordinator(); assert(isCoordinator); LOG(INFO) << "constructed conductor"; + + if (algorithm == "sssp") { + } else if (algorithm == "pagerank") { + algos::PageRankAlgorithm algo(); + algo.aggregators(_aggregators); + } else { + LOG(ERR) << "Unsupported algorithm"; + } } Conductor::~Conductor() { @@ -71,8 +82,7 @@ static void printResults(std::vector const& requests) { } static void resolveShards(LogicalCollection const* collection, - std::map>& serverMap, - std::map> &serverShardPlanIdMap) { + std::map>& serverMap) { ClusterInfo* ci = ClusterInfo::instance(); std::shared_ptr> shardIDs = @@ -83,7 +93,6 @@ static void resolveShards(LogicalCollection const* collection, ci->getResponsibleServer(shard); if (servers->size() > 0) { serverMap[(*servers)[0]].push_back(shard); - serverShardPlanIdMap[(*servers)[0]][shard] = collection->planId_as_string(); } } } @@ -91,23 +100,25 @@ static void resolveShards(LogicalCollection const* collection, void Conductor::start() { ClusterComm* cc = ClusterComm::instance(); int64_t vertexCount = 0, edgeCount = 0; - std::map> serverShardPlanIdMap; + std::map collectionPlanIdMap; std::map> edgeServerMap; for (auto collection : _vertexCollections) { + collectionPlanIdMap[collection->name()] = collection->planId_as_string(); size_t cc = Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name()); if (cc > 0) { vertexCount += cc; - resolveShards(collection.get(), _vertexServerMap, serverShardPlanIdMap); + resolveShards(collection.get(), _vertexServerMap); } else { LOG(WARN) << "Collection does not contain vertices"; } } for (auto collection : _edgeCollections) { + collectionPlanIdMap[collection->name()] = collection->planId_as_string(); size_t cc = Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name()); if (cc > 0) { edgeCount += cc; - resolveShards(collection.get(), edgeServerMap, serverShardPlanIdMap); + resolveShards(collection.get(), edgeServerMap); } else { LOG(WARN) << "Collection does not contain edges"; } @@ -147,9 +158,9 @@ void Conductor::start() { b.add(VPackValue(eit)); } b.close(); - b.add(Utils::shardPlanMapKey, VPackValue(VPackValueType::Object)); - for (auto const& shardPair : serverShardPlanIdMap[it.first]) { - b.add(shardPair.first, VPackValue(shardPair.second)); + b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object)); + for (auto const& pair : collectionPlanIdMap) { + b.add(pair.first, VPackValue(pair.second)); } b.close(); b.close(); @@ -174,8 +185,20 @@ void Conductor::finishedGlobalStep(VPackSlice& data) { LOG(WARN) << "Conductor did not expect another finishedGlobalStep call"; return; } - _responseCount++; + + VPackSlice aggValues = data.get(Utils::aggregatorsKey); + if (aggValues.isObject()) { + for (std::unique_ptr &aggregator : _aggregators) { + VPackSlice val = aggValues.get(aggregator->name()) + if (!val.isNone()) { + aggregator->parse(val); + } + } + } + + + VPackSlice isDone = data.get(Utils::doneKey); if (isDone.isBool() && isDone.getBool()) { _doneCount++; diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 7657d711d0..bff3f8a8dc 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -55,11 +55,13 @@ class Conductor { Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep VocbaseGuard _vocbaseGuard; const unsigned int _executionNumber; + std::string _algorithm; ExecutionState _state = ExecutionState::RUNNING; + + std::vector > _aggregators; std::vector> _vertexCollections, _edgeCollections; std::map> _vertexServerMap; - std::string _algorithm; unsigned int _globalSuperstep; int32_t _dbServerCount = 0; diff --git a/arangod/Pregel/GraphFormat.h b/arangod/Pregel/GraphFormat.h index 058eec8e9b..de9c799213 100644 --- a/arangod/Pregel/GraphFormat.h +++ b/arangod/Pregel/GraphFormat.h @@ -75,6 +75,33 @@ class IntegerGraphFormat : public GraphFormat { int64_t readVertexData(void* ptr) override { return *((int64_t*)ptr); } int64_t readEdgeData(void* ptr) override { return *((int64_t*)ptr); } }; + +class FloatGraphFormat : public GraphFormat { + const std::string _field; + const float _vDefault, _eDefault; + + public: + IntegerGraphFormat(std::string const& field, float vertexNull, + float edgeNull) + : _field(field), _vDefault(vertexNull), _eDefault(edgeNull) {} + + size_t copyVertexData(VPackSlice document, void* targetPtr, + size_t maxSize) override { + VPackSlice val = document.get(_field); + *((float*)targetPtr) = val.isInteger() ? val.getInt() : _vDefault; + return sizeof(float); + } + + size_t copyEdgeData(VPackSlice document, void* targetPtr, + float maxSize) override { + VPackSlice val = document.get(_field); + *((float*)targetPtr) = val.isInteger() ? val.getInt() : _eDefault; + return sizeof(float); + } + + float readVertexData(void* ptr) override { return *((float*)ptr); } + float readEdgeData(void* ptr) override { return *((float*)ptr); } +}; } } #endif diff --git a/arangod/Pregel/MessageCombiner.h b/arangod/Pregel/MessageCombiner.h index 7ac698bcde..9c6c178e10 100644 --- a/arangod/Pregel/MessageCombiner.h +++ b/arangod/Pregel/MessageCombiner.h @@ -34,8 +34,8 @@ struct MessageCombiner { virtual M combine(M const& firstValue, M const& secondValue) const = 0; }; -struct MinIntegerCombiner : public MessageCombiner { - MinIntegerCombiner() {} +struct IntegerMinCombiner : public MessageCombiner { + IntegerMinCombiner() {} int64_t combine(int64_t const& firstValue, int64_t const& secondValue) const override { return firstValue < secondValue ? firstValue : secondValue; diff --git a/arangod/Pregel/OutgoingCache.cpp b/arangod/Pregel/OutgoingCache.cpp index 1daf7550ad..463696134a 100644 --- a/arangod/Pregel/OutgoingCache.cpp +++ b/arangod/Pregel/OutgoingCache.cpp @@ -54,19 +54,26 @@ OutgoingCache::~OutgoingCache() { template void OutgoingCache::clear() { - // TODO better way? - /*for (auto const& it : _map) { - for (auto const& it2 : it.second) { - it2.second->clear(); // clears VPackBuilder - } - }*/ _map.clear(); _containedMessages = 0; } -static void resolveResponsibleShard(ClusterInfo* ci, LogicalCollection* info, - std::string const& vertexKey, - std::string& responsibleShard) { +static inline LogicalCollection* resolveCollection(ClusterInfo* ci, + std::string const& database, + std::string const& collectionName, + std::map const& collectionPlanIdMap) { + auto const& it = collectionPlanIdMap.find(collectionName); + if (it == collectionPlanIdMap.end()) { + LOG(ERR) << "Collection this messages is going to is unkown"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_FORBIDDEN); + } + std::shared_ptr collectionInfo(ci->getCollection(database, it->second)); + return collectionInfo.get(); +} + +static inline void resolveShard(ClusterInfo* ci, LogicalCollection* info, + std::string const& vertexKey, + std::string& responsibleShard) { bool usesDefaultShardingAttributes; VPackBuilder partial; partial.openObject(); @@ -86,15 +93,16 @@ static void resolveResponsibleShard(ClusterInfo* ci, LogicalCollection* info, template void OutgoingCache::sendMessageTo(std::string const& toValue, M const& data) { - assert(_combiner); - std::string _key = Utils::vertexKeyFromToValue(toValue); - std::string collectionName = Utils::collectionFromToValue(toValue); + std::size_t pos = toValue.find('/'); + std::string _key = toValue.substr(pos + 1, toValue.length() - pos - 1); + std::string collectionName = toValue.substr(0, pos); LOG(INFO) << "Adding outgoing messages for " << collectionName << "/" << _key; - std::shared_ptr collectionInfo(_ci->getCollection(_state->database(), - collectionName)); + + LogicalCollection *coll = resolveCollection(_ci, _state->database(), + collectionName, _state->collectionPlanIdMap()); ShardID responsibleShard; - resolveResponsibleShard(_ci, _collInfo.get(), _key, responsibleShard); + resolveShard(_ci, coll, _key, responsibleShard); LOG(INFO) << "Responsible shard: " << responsibleShard; std::vector const& localShards = _state->localVertexShardIDs(); diff --git a/arangod/Pregel/OutgoingCache.h b/arangod/Pregel/OutgoingCache.h index c32dde9713..012ce72b6e 100644 --- a/arangod/Pregel/OutgoingCache.h +++ b/arangod/Pregel/OutgoingCache.h @@ -55,12 +55,10 @@ class OutgoingCache { /// @brief two stage map: shard -> vertice -> message std::unordered_map> _map; std::shared_ptr> _state; - std::shared_ptr _collInfo; ClusterInfo* _ci; std::string _baseUrl; /// @brief current number of vertices stored - std::map _collectionPlanIdMap; size_t _containedMessages = 0; size_t _sendMessages = 0; }; diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 30b43a3366..2460a77619 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -43,7 +43,7 @@ std::string const Utils::executionNumberKey = "exn"; std::string const Utils::totalVertexCount = "vertexCount"; std::string const Utils::totalEdgeCount = "edgeCount"; -std::string const Utils::shardPlanMapKey = "shardPlanMap"; +std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap"; std::string const Utils::vertexShardsListKey = "vertexShards"; std::string const Utils::edgeShardsListKey = "edgeShards"; @@ -56,6 +56,9 @@ std::string const Utils::messagesKey = "msgs"; std::string const Utils::senderKey = "sender"; std::string const Utils::doneKey = "done"; +std::string const Utils::parameterMapKey = "params"; +std::string const Utils::aggregatorsKey = "aggregators"; + std::string const Utils::edgeShardingKey = "_vertex"; std::string Utils::baseUrl(std::string dbName) { diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index 2a8e7441e7..126faea262 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -46,10 +46,11 @@ class Utils { static std::string const executionNumberKey; static std::string const algorithmKey; static std::string const coordinatorIdKey; - + static std::string const totalVertexCount; static std::string const totalEdgeCount; - static std::string const shardPlanMapKey; + + static std::string const collectionPlanIdMapKey; static std::string const vertexShardsListKey; static std::string const edgeShardsListKey; @@ -58,6 +59,9 @@ class Utils { static std::string const senderKey; static std::string const doneKey; + static std::string const parameterMapKey; + static std::string const aggregatorsKey; + static std::string const edgeShardingKey; static std::string baseUrl(std::string dbName); static std::string collectionFromToValue(std::string const& graphKey); diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 1aad4567b7..ba77adf758 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -87,6 +87,7 @@ Worker::~Worker() { LOG(INFO) << "Called ~Worker()"; const size_t threadNum = 1; _workerPool.reset(new ThreadPool(static_cast(threadNum), "Pregel Worker")); + _ctx->algorithm()->aggregators(_aggregators); } /// @brief Setup next superstep diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index a4e0144060..91dec328b4 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -57,11 +57,11 @@ class Worker : public IWorker { void finalizeExecution(VPackSlice data) override; private: - // Mutex _messagesMutex; TODO figure this out + bool _running = true; std::shared_ptr> _ctx; std::shared_ptr> _graphStore; std::unique_ptr _workerPool; - bool _running = true; + std::vector > _aggregators; void workerJobIsDone(bool allVerticesHalted); }; diff --git a/arangod/Pregel/WorkerState.cpp b/arangod/Pregel/WorkerState.cpp index a9aefe8904..040a837fb8 100644 --- a/arangod/Pregel/WorkerState.cpp +++ b/arangod/Pregel/WorkerState.cpp @@ -38,9 +38,9 @@ WorkerState::WorkerState(Algorithm* algo, DatabaseID dbname, VPackSlice vertexShardIDs = params.get(Utils::vertexShardsListKey); VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey); VPackSlice execNum = params.get(Utils::executionNumberKey); - VPackSlice planIDs = params.get(Utils::shardPlanMapKey); + VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey); if (!coordID.isString() || !vertexShardIDs.isArray() || - !edgeShardIDs.isArray() || !execNum.isInteger()) { + !edgeShardIDs.isArray() || !execNum.isInteger() || !collectionPlanIdMap.isObject()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Supplied bad parameters to worker"); } @@ -63,9 +63,9 @@ WorkerState::WorkerState(Algorithm* algo, DatabaseID dbname, LOG(INFO) << name; } - for (auto const& it : VPackObjectIterator(planIDs)) { - - } + for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) { + _collectionPlanIdMap[it.key.toString()] = it.value.toString(); + } auto format = algo->messageFormat(); auto combiner = algo->messageCombiner(); diff --git a/arangod/Pregel/WorkerState.h b/arangod/Pregel/WorkerState.h index 7c912dcc8e..ece7c2a4e0 100644 --- a/arangod/Pregel/WorkerState.h +++ b/arangod/Pregel/WorkerState.h @@ -25,7 +25,7 @@ #include #include "Basics/Common.h" -#include "Cluster/CLusterInfo.h" +#include "Cluster/ClusterInfo.h" #include "Algorithm.h" @@ -78,6 +78,8 @@ class WorkerState { } std::shared_ptr> algorithm() { return _algorithm; } + + std::map const& collectionPlanIdMap() {return _collectionPlanIdMap;}; private: /// @brief guard to make sure the database is not dropped while used by us @@ -89,6 +91,7 @@ class WorkerState { std::string _coordinatorId; const std::string _database; std::vector _localVertexShardIDs, _localEdgeShardIDs; + std::map _collectionPlanIdMap; std::shared_ptr> _readCache, _writeCache; void swapIncomingCaches(); // only call when message receiving is locked diff --git a/arangod/RestHandler/RestPregelHandler.cpp b/arangod/RestHandler/RestPregelHandler.cpp index 3074d177c7..c20d5994e4 100644 --- a/arangod/RestHandler/RestPregelHandler.cpp +++ b/arangod/RestHandler/RestPregelHandler.cpp @@ -111,5 +111,5 @@ RestStatus RestPregelHandler::execute() { LOG(ERR) << "Exception"; } - return status::DONE; + return RestStatus::DONE; }