diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 309befe370..9b4eea601b 100755 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -24,31 +24,30 @@ #include "Utils.h" #include "Basics/MutexLocker.h" +#include "Basics/StringUtils.h" #include "Utils/SingleCollectionTransaction.h" #include "Utils/StandaloneTransactionContext.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterComm.h" +#include "VocBase/LogicalCollection.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include #include -using namespace std; using namespace arangodb; using namespace arangodb::pregel; -Conductor::Conductor(int executionNumber, - //TRI_vocbase_t *vocbase, - std::string const&vertexCollection, - CollectionID vertexCollectionID, - std::string const& edgeCollection, +Conductor::Conductor(unsigned int executionNumber, + TRI_vocbase_t *vocbase, + std::shared_ptr vertexCollection, + std::shared_ptr edgeCollection, std::string const& algorithm) : +_vocbaseGuard(vocbase), _executionNumber(executionNumber), -//_vocbase(vocbase), _vertexCollection(vertexCollection), _edgeCollection(edgeCollection), -_vertexCollectionID(vertexCollectionID), _algorithm(algorithm) { bool isCoordinator = ServerState::instance()->isCoordinator(); @@ -56,56 +55,106 @@ _algorithm(algorithm) { LOG(INFO) << "constructed conductor"; } +static void printResults(std::vector &requests) { + for (auto const& req : requests) { + auto& res = req.result; + if (res.status == CL_COMM_RECEIVED) { + LOG(INFO) << res.answer->payload().toJson(); + } + } +} + + void Conductor::start() { + ClusterComm* cc = ClusterComm::instance(); + std::unordered_map> vertexServerMap, edgeServerMap; + resolveWorkerServers(vertexServerMap, edgeServerMap); + + std::string const baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()); + _globalSuperstep = 0; _state = ExecutionState::RUNNING; + _dbServerCount = vertexServerMap.size(); + _responseCount = 0; + _doneCount = 0; + if (vertexServerMap.size() != edgeServerMap.size()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, + "Vertex and edge collections are not shared correctly"); + } - string coordinatorId = ServerState::instance()->getId(); + std::string coordinatorId = ServerState::instance()->getId(); LOG(INFO) << "My id: " << coordinatorId; - - VPackBuilder b; - b.openObject(); - b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); - b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); - b.add(Utils::vertexCollectionKey, VPackValue(_vertexCollection)); - b.add(Utils::edgeCollectionKey, VPackValue(_edgeCollection)); - b.add(Utils::globalSuperstepKey, VPackValue(0)); - b.add(Utils::algorithmKey, VPackValue(_algorithm)); - b.close(); - - sendToAllShards(Utils::nextGSSPath, b.slice()); + std::vector requests; + for (auto const &it : vertexServerMap) { + VPackBuilder b; + b.openObject(); + b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); + b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); + b.add(Utils::vertexCollectionKey, VPackValue(_vertexCollection->name())); + b.add(Utils::vertexShardsListKey, VPackValue(VPackValueType::Array)); + for (ShardID const &vit : it.second) { + b.add(VPackValue(vit)); + } + b.close(); + b.add(Utils::edgeShardsListKey, VPackValue(VPackValueType::Array)); + for (ShardID const &eit : edgeServerMap[it.first]) { + b.add(VPackValue(eit)); + } + b.close(); + //b.add(Utils::vertexCollectionKey, VPackValue(_vertexCollection)); + //b.add(Utils::edgeCollectionKey, VPackValue(_edgeCollection)); + b.add(Utils::globalSuperstepKey, VPackValue(0)); + b.add(Utils::algorithmKey, VPackValue(_algorithm)); + b.close(); + + auto body = std::make_shared(b.toJson()); + requests.emplace_back("server:" + it.first, rest::RequestType::POST, baseUrl+Utils::nextGSSPath, body); + } + size_t nrDone = 0; + cc->performRequests(requests, 120.0, nrDone, LogTopic("Pregel Conductor")); + LOG(INFO) << "Send messages to " << nrDone << " shards of " << _vertexCollection->name(); + // look at results + printResults(requests); } void Conductor::finishedGlobalStep(VPackSlice &data) { - MUTEX_LOCKER(locker, writeMutex); + MUTEX_LOCKER(locker, _finishedGSSMutex); LOG(INFO) << "Conductor received finished callback"; if (_state != ExecutionState::RUNNING) { - LOG(WARN) << "Conductor did not expect another finishedGlobalStep()"; + LOG(WARN) << "Conductor did not expect another finishedGlobalStep call"; return; } _responseCount++; - if (_responseCount >= _dbServerCount) { + VPackSlice isDone = data.get(Utils::doneKey); + if (isDone.isBool() && isDone.getBool()) { + _doneCount++; + } + + if (_responseCount == _dbServerCount) { + LOG(INFO) << "Finished gss " << _globalSuperstep; _globalSuperstep++; - if (_globalSuperstep >= 25) { - LOG(INFO) << "We did 25 rounds"; + std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()); + if (_doneCount == _dbServerCount || _globalSuperstep >= 25) { + LOG(INFO) << "Done. We did " << _globalSuperstep << " rounds"; VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.close(); - sendToAllShards(Utils::writeResultsPath, b.slice()); + sendToAllShards(baseUrl+Utils::writeResultsPath, b.slice()); _state = ExecutionState::FINISHED; - } else { + + } else {// trigger next superstep VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.close(); - sendToAllShards(Utils::nextGSSPath, b.slice()); - LOG(INFO) << "Conductor started new gss\n"; + sendToAllShards(baseUrl+Utils::nextGSSPath, b.slice()); + LOG(INFO) << "Conductor started new gss " << _globalSuperstep; } } } @@ -115,33 +164,50 @@ void Conductor::cancel() { } int Conductor::sendToAllShards(std::string path, VPackSlice const& config) { - - ClusterInfo* ci = ClusterInfo::instance(); ClusterComm* cc = ClusterComm::instance(); - //CoordTransactionID coordTransactionID = TRI_NewTickServer(); + std::unordered_map> vertexServerMap, edgeServerMap; + resolveWorkerServers(vertexServerMap, edgeServerMap); - - - shared_ptr> shardIDs = ci->getShardList(_vertexCollectionID);// ->getCurrentDBServers(); - _dbServerCount = shardIDs->size(); + _dbServerCount = vertexServerMap.size(); _responseCount = 0; + _doneCount = 0; - if (shardIDs->size() == 0) { - LOG(WARN) << "No shards registered for " << _vertexCollection; + if (_dbServerCount == 0) { + LOG(WARN) << "No shards registered for " << _vertexCollection->name(); return TRI_ERROR_FAILED; } auto body = std::make_shared(config.toJson()); - vector requests; - for (auto it = shardIDs->begin(); it != shardIDs->end(); ++it) { - requests.emplace_back("shard:" + *it, rest::RequestType::POST, path, body); + std::vector requests; + for (auto const &it : vertexServerMap) { + requests.emplace_back("server:" + it.first, rest::RequestType::POST, path, body); } - - LOG(INFO) << "Constructed requests"; size_t nrDone = 0; cc->performRequests(requests, 120.0, nrDone, LogTopic("Pregel Conductor")); - LOG(INFO) << "Send messages to " << nrDone << " shards of " << _vertexCollection; - + LOG(INFO) << "Send messages to " << nrDone << " shards of " << _vertexCollection->name(); + printResults(requests); + return TRI_ERROR_NO_ERROR; } + +void Conductor::resolveWorkerServers(std::unordered_map> &vertexServerMap, + std::unordered_map> &edgeServerMap) { + ClusterInfo* ci = ClusterInfo::instance(); + std::shared_ptr> vertexShardIDs = ci->getShardList(_vertexCollection->cid_as_string()); + std::shared_ptr> edgeShardIDs = ci->getShardList(_edgeCollection->cid_as_string()); + + for (auto const &shard : *vertexShardIDs) { + std::shared_ptr> servers = ci->getResponsibleServer(shard); + if (servers->size() > 0) { + vertexServerMap[(*servers)[0]].push_back(shard); + } + } + for (auto const &shard : *edgeShardIDs) { + std::shared_ptr> servers = ci->getResponsibleServer(shard); + if (servers->size() > 0) { + edgeServerMap[(*servers)[0]].push_back(shard); + } + } +} + diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 4c627c1230..552fe13136 100755 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -40,11 +40,10 @@ namespace pregel { class Conductor { public: - Conductor(int32_t executionNumber, - //TRI_vocbase_t *vocbase, - std::string const&vertexCollection, - CollectionID vertexCollectionID, - std::string const& edgeCollection, + Conductor(unsigned int executionNumber, + TRI_vocbase_t *vocbase, + std::shared_ptr vertexCollection, + std::shared_ptr edgeCollection, std::string const& algorithm); void start(); @@ -54,20 +53,24 @@ namespace pregel { ExecutionState getState() {return _state;} private: - Mutex writeMutex;// prevents concurrent calls to finishedGlobalStep - - int _executionNumber; - int _globalSuperstep; + Mutex _finishedGSSMutex;// prevents concurrent calls to finishedGlobalStep + VocbaseGuard _vocbaseGuard; + const unsigned int _executionNumber; + + unsigned int _globalSuperstep; int32_t _dbServerCount = 0; int32_t _responseCount = 0; - - //TRI_vocbase_t *_vocbase; - std::string _vertexCollection, _edgeCollection; + int32_t _doneCount = 0; + + std::shared_ptr _vertexCollection, _edgeCollection; CollectionID _vertexCollectionID; std::string _algorithm; ExecutionState _state = ExecutionState::RUNNING; + // convenience + void resolveWorkerServers(std::unordered_map> &vertexServerMap, + std::unordered_map> &edgeServerMap); int sendToAllShards(std::string url, VPackSlice const& body); }; } diff --git a/arangod/Pregel/InMessageCache.cpp b/arangod/Pregel/InMessageCache.cpp index 18c5ba291f..55b25d869f 100755 --- a/arangod/Pregel/InMessageCache.cpp +++ b/arangod/Pregel/InMessageCache.cpp @@ -25,6 +25,7 @@ #include "Basics/MutexLocker.h" #include "Basics/StaticStrings.h" +#include "Basics/VelocyPackHelper.h" #include #include @@ -87,9 +88,11 @@ void InMessageCache::addMessages(VPackArrayIterator incomingMessages) { } } -VPackArrayIterator InMessageCache::getMessages(std::string const& vertexId) { +VPackSlice InMessageCache::getMessages(std::string const& vertexId) { LOG(INFO) << "Querying messages from in queue\n"; auto vmsg = _messages.find(vertexId); - if (vmsg != _messages.end()) return VPackArrayIterator(vmsg->second->slice()); - else return VPackArrayIterator(VPackSlice()); + if (vmsg != _messages.end()) { + return vmsg->second->slice(); + } + else return VPackSlice(); } diff --git a/arangod/Pregel/InMessageCache.h b/arangod/Pregel/InMessageCache.h index 650705627a..ee421e26a5 100755 --- a/arangod/Pregel/InMessageCache.h +++ b/arangod/Pregel/InMessageCache.h @@ -45,7 +45,7 @@ public: ~InMessageCache(); void addMessages(VPackArrayIterator messages); - arangodb::velocypack::ArrayIterator getMessages(ShardID const& shardId); + VPackSlice getMessages(ShardID const& shardId); void clean(); private: diff --git a/arangod/Pregel/OutMessageCache.cpp b/arangod/Pregel/OutMessageCache.cpp index 692494c4f3..56f8565c85 100755 --- a/arangod/Pregel/OutMessageCache.cpp +++ b/arangod/Pregel/OutMessageCache.cpp @@ -32,7 +32,7 @@ using namespace arangodb; using namespace arangodb::pregel; -OutMessageCache::OutMessageCache(CollectionID &vertexCollection) : _collection(vertexCollection) { +OutMessageCache::OutMessageCache(CollectionID &vertexCollection, std::string baseUrl) : _collection(vertexCollection) { _ci = ClusterInfo::instance(); auto shardMap = _ci->getShardList(vertexCollection); @@ -72,10 +72,10 @@ void OutMessageCache::addMessage(std::string key, VPackSlice slice) { int res = _ci->getResponsibleShard(_collection, keyDoc.slice(), true, responsibleShard, usesDefaultShardingAttributes); if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); + THROW_ARANGO_EXCEPTION_MESSAGE(res, "OutMessageCache could not resolve the responsible shard"); } TRI_ASSERT(usesDefaultShardingAttributes);// should be true anyway - + //std::unordered_map vertexMap =; auto it = _map[responsibleShard].find(key); if (it != _map[responsibleShard].end()) {// more than one message @@ -96,54 +96,17 @@ void OutMessageCache::addMessage(std::string key, VPackSlice slice) { b.release(); } } -/* -void OutMessageCache::addMessages(VPackArrayIterator incomingMessages) { - - //unordered_map> messageBucket; - //VPackSlice messages = data.get(Utils::messagesKey); - for (auto const &it : incomingMessages) { - std::string vertexId = it.get(StaticStrings::ToString).copyString(); - - auto vmsg = _messages.find(vertexId); - if (vmsg != _messages.end()) { - - // if no combiner - // vmsg->add(it.slice()) - - // TODO do not hardcode combiner - int64_t old = vmsg->second->slice().get("value").getInt(); - int64_t nw = it.get("value").getInt(); - if (nw < old) { - vmsg->second->clear(); - vmsg->second->add(it); - } - } else { - // assuming we have a combiner - std::unique_ptr b(new VPackBuilder()); - b->add(it); - _messages[vertexId] = b.get(); - b.release(); - - // if no combiner - // VPackBuilder *arr = new VPackBuilder(it); - // arr->openArray(); - // arr->add(it) - // _messages[vertexId] = arr; - } - } -}*/ void OutMessageCache::getMessages(ShardID const& shardId, VPackBuilder &outBuilder) { auto shardIt = _map.find(shardId); + outBuilder.openArray(); if (shardIt != _map.end()) { //auto vertices = *shardIt; - outBuilder.openArray(); for (auto messagesPair : shardIt->second) { outBuilder.add(VPackArrayIterator(messagesPair.second->slice())); } - outBuilder.close(); //return ArrayIterator(vmsg->second->slice()) - } + outBuilder.close(); //else return VPackSlice(); } diff --git a/arangod/Pregel/OutMessageCache.h b/arangod/Pregel/OutMessageCache.h index 87c0f59657..f4a59b801a 100755 --- a/arangod/Pregel/OutMessageCache.h +++ b/arangod/Pregel/OutMessageCache.h @@ -40,19 +40,23 @@ namespace pregel { processing */ class OutMessageCache { public: - OutMessageCache(CollectionID &vertexCollection); + OutMessageCache(CollectionID &vertexCollection, std::string baseUrl); ~OutMessageCache(); void addMessage(std::string key, VPackSlice slice); //void addMessages(VPackArrayIterator messages); void getMessages(ShardID const& shardId, VPackBuilder &outBuilder); + + void sendMessages(); void clean(); + private: // two stage map: shard -> vertice -> message std::unordered_map> _map; ClusterInfo *_ci; CollectionID _collection; + size_t _numVertices; }; }} diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 1bb13f5298..132152b823 100755 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -21,20 +21,31 @@ //////////////////////////////////////////////////////////////////////////////// #include "Utils.h" +#include "Basics/StringUtils.h" +#include "VocBase/vocbase.h" using namespace arangodb::pregel; -std::string const Utils::nextGSSPath = "/_api/pregel/nextGSS"; -std::string const Utils::finishedGSSPath = "/_api/pregel/finishedGSS"; -std::string const Utils::messagesPath = "/_api/pregel/messages"; -std::string const Utils::writeResultsPath = "/_api/pregel/writeResults"; +std::string const Utils::apiPrefix = "/_api/pregel/"; + + +std::string const Utils::nextGSSPath = "nextGSS"; +std::string const Utils::finishedGSSPath = "finishedGSS"; +std::string const Utils::messagesPath = "messages"; +std::string const Utils::writeResultsPath = "writeResults"; std::string const Utils::executionNumberKey = "extn"; -std::string const Utils::vertexCollectionKey = "vxcln"; -std::string const Utils::edgeCollectionKey = "ecnln"; +std::string const Utils::vertexCollectionKey = "vertexCollection"; +std::string const Utils::vertexShardsListKey = "vertexShards"; +std::string const Utils::edgeShardsListKey = "edgeShards"; +std::string const Utils::resultShardKey = "resultShard"; std::string const Utils::coordinatorIdKey = "coordinatorId"; std::string const Utils::algorithmKey = "algorithm"; std::string const Utils::globalSuperstepKey = "gss"; std::string const Utils::messagesKey = "msgs"; std::string const Utils::senderKey = "sender"; std::string const Utils::doneKey = "done"; + +std::string Utils::baseUrl(TRI_vocbase_t *vocbase) { + return "/_db/" + basics::StringUtils::urlEncode(vocbase->name()) + Utils::apiPrefix; +} diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index c737dc70c0..0f2a30598f 100755 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -25,13 +25,19 @@ #include "Basics/Common.h" + +struct TRI_vocbase_t; namespace arangodb { + namespace pregel { + class Utils { Utils() = delete; public: // constants + static std::string const apiPrefix; + static std::string const nextGSSPath; static std::string const finishedGSSPath; static std::string const messagesPath; @@ -39,7 +45,9 @@ namespace pregel { static std::string const executionNumberKey; static std::string const vertexCollectionKey; - static std::string const edgeCollectionKey; + static std::string const vertexShardsListKey; + static std::string const edgeShardsListKey; + static std::string const resultShardKey; static std::string const algorithmKey; static std::string const coordinatorIdKey; @@ -47,6 +55,8 @@ namespace pregel { static std::string const messagesKey; static std::string const senderKey; static std::string const doneKey; + + static std::string baseUrl(TRI_vocbase_t *vocbase); }; } diff --git a/arangod/Pregel/Vertex.cpp b/arangod/Pregel/Vertex.cpp index 589457ca6e..51bd79519d 100755 --- a/arangod/Pregel/Vertex.cpp +++ b/arangod/Pregel/Vertex.cpp @@ -32,35 +32,39 @@ using namespace arangodb; using namespace arangodb::velocypack; using namespace arangodb::pregel; -Vertex::Vertex(VPackSlice document) { - documentId = document.get(StaticStrings::IdString).copyString(); +Message::Message(VPackSlice slice) { + VPackSlice s = slice.get("value"); + _value = s.getSmallInt() || s.getInt() ? s.getInt() : -1; +} + +Vertex::Vertex(VPackSlice document) : _data(document) { + //documentId = document.get(StaticStrings::IdString).copyString(); VPackSlice s = document.get("value"); - _vertexState = s.getSmallInt() || s.getInt() ? s.getInt() : -1; + _vertexState = s.isInteger() ? s.getInt() : -1; } Vertex::~Vertex() { - for (auto const &it : _edges) { + /*for (auto const &it : _edges) { delete(it); - } + }*/ _edges.clear(); } -void Vertex::compute(int gss, VPackArrayIterator const &messages, OutMessageCache* const cache) { - +void Vertex::compute(int gss, MessageIterator const &messages, OutMessageCache* const cache) { int current = _vertexState; for (auto const &msg : messages) { - int val = msg.getInt(); - if (val < current) current = val; + if (msg._value < current) current = msg._value; } if (current >= 0 && (gss == 0 || current != _vertexState)) { _vertexState = current; for (auto const &edge : _edges) { + VPackSlice toID = edge._data.get(StaticStrings::ToString); VPackBuilder b; b.openObject(); - b.add(StaticStrings::ToString, VPackValue(edge->toId)); - b.add("val", VPackValue(edge->value + current)); + b.add(StaticStrings::ToString, toID); + b.add("value", VPackValue(edge._value + current)); b.close(); - cache->addMessage(edge->toId, b.slice()); + cache->addMessage(toID.copyString(), b.slice()); } } else voteHalt(); } diff --git a/arangod/Pregel/Vertex.h b/arangod/Pregel/Vertex.h index 964b0ce91f..f736bdc5df 100755 --- a/arangod/Pregel/Vertex.h +++ b/arangod/Pregel/Vertex.h @@ -36,40 +36,108 @@ namespace pregel { }; class OutMessageCache; + + struct Message { + Message(VPackSlice slice); + + int _value; // demo + }; + + //template + class MessageIterator { + public: + MessageIterator() = delete; + + typedef MessageIterator iterator; + typedef const MessageIterator const_iterator; + + explicit MessageIterator(VPackSlice slice) : _slice(slice) { + if (_slice.isNull() || _slice.isNone()) _size = 0; + else if (_slice.isArray()) _size = _slice.length(); + else _size = 1; + } + + iterator begin() { + return MessageIterator(_slice); + } + const_iterator begin() const { + return MessageIterator(_slice); + } + iterator end() { + auto it = MessageIterator(_slice); + it._position = it._size; + return it; + } + const_iterator end() const { + auto it = MessageIterator(_slice); + it._position = it._size; + return it; + } + Message operator*() const { + if (_slice.isArray()) { + return Message(_slice.at(_position)); + } else { + return Message(_slice); + } + } + + // prefix ++ + MessageIterator& operator++() { + ++_position; + return *this; + } + + // postfix ++ + MessageIterator operator++(int) { + MessageIterator result(*this); + ++(*this); + return result; + } + + bool operator!=(MessageIterator const& other) const { + return _position != other._position; + } + + size_t size() const { + return _size; + } + + private: + VPackSlice _slice; + size_t _position = 0; + size_t _size = 1; + }; struct Edge { - public: - std::string edgeId; - std::string toId; - - int value;// demo + Edge(VPackSlice data) : _data(data) {} + VPackSlice _data; + + //protected: virtual void loadData() = 0; + + int _value;// demo }; class Vertex { friend class Worker; + friend class WorkerJob; + public: //typedef std::iterator MessageIterator; Vertex(VPackSlice document); ~Vertex(); - void compute(int gss, VPackArrayIterator const &messages, OutMessageCache* const cache); + void compute(int gss, MessageIterator const &messages, OutMessageCache* const cache); VertexActivationState state() {return _activationState;} - //std::vector messages() {return _messages;} - - std::vector _edges; - std::string documentId; + std::vector _edges; protected: void voteHalt() {_activationState = VertexActivationState::STOPPED;} - //void sendMessage(VPackBuilder &message) {_messages.push_back(message.slice());} + int _vertexState;// demo + VPackSlice _data; private: VertexActivationState _activationState; - //std::vector _messages; - - int _vertexState;// demo }; - } } #endif diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index bc57cda117..d943ed1bcd 100755 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -26,15 +26,21 @@ #include "InMessageCache.h" #include "OutMessageCache.h" +#include "Basics/MutexLocker.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterComm.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" +#include "VocBase/LogicalCollection.h" +#include "VocBase/EdgeCollectionInfo.h" + #include "Indexes/Index.h" #include "Dispatcher/DispatcherQueue.h" #include "Dispatcher/DispatcherFeature.h" +#include "Utils/Transaction.h" #include "Utils/SingleCollectionTransaction.h" #include "Utils/StandaloneTransactionContext.h" +#include "Utils/OperationCursor.h" #include "Indexes/EdgeIndex.h" #include @@ -45,113 +51,121 @@ using namespace arangodb; using namespace arangodb::pregel; -Worker::Worker(int executionNumber, +Worker::Worker(unsigned int executionNumber, TRI_vocbase_t *vocbase, - VPackSlice s) : _vocbaseGuard(vocbase), _executionNumber(executionNumber) { + VPackSlice s) : _vocbase(vocbase), _executionNumber(executionNumber) { //VPackSlice algo = s.get("algo"); - _vertexCollection = s.get(Utils::vertexCollectionKey).copyString(); - _edgeCollection = s.get(Utils::edgeCollectionKey).copyString(); - LOG(INFO) << "starting worker with (" << _vertexCollection << ", " << _edgeCollection << ")"; + + VPackSlice coordID = s.get(Utils::coordinatorIdKey); + VPackSlice vertexShardIDs = s.get(Utils::vertexShardsListKey); + VPackSlice edgeShardIDs = s.get(Utils::edgeShardsListKey); + // TODO support more shards + if (!(coordID.isString() && vertexShardIDs.length() == 1 && edgeShardIDs.length() == 1)) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Only one shard per collection supported"); + } + _coordinatorId = coordID.copyString(); + _vertexCollectionName = s.get(Utils::vertexCollectionKey).copyString();// readable name of collection + _vertexShardID = vertexShardIDs.at(0).copyString(); + _edgeShardID = edgeShardIDs.at(0).copyString(); + LOG(INFO) << "Received collection " << _vertexCollectionName; + LOG(INFO) << "starting worker with (" << _vertexShardID << ", " << _edgeShardID << ")"; _cache1 = new InMessageCache(); _cache2 = new InMessageCache(); _currentCache = _cache1; - SingleCollectionTransaction trx(StandaloneTransactionContext::Create(vocbase), - _vertexCollection, TRI_TRANSACTION_READ); - int res = trx.begin(); - + SingleCollectionTransaction *trx = new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), + _vertexShardID, TRI_TRANSACTION_READ); + int res = trx->begin(); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", - _vertexCollection.c_str()); + _vertexShardID.c_str()); return; } + // resolve planId + _vertexCollectionPlanId = trx->documentCollection()->planId_as_string(); - OperationResult result = trx.all(_vertexCollection, 0, UINT64_MAX, OperationOptions()); + OperationResult result = trx->all(_vertexShardID, 0, UINT64_MAX, OperationOptions()); // Commit or abort. - res = trx.finish(result.code); - + res = trx->finish(result.code); if (!result.successful()) { - THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'", - _vertexCollection.c_str()); + THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'", _vertexCollectionName.c_str()); } if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up graph '%s'", - _vertexCollection.c_str()); + THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up graph '%s'", _vertexCollectionName.c_str()); } VPackSlice vertices = result.slice(); if (vertices.isExternal()) { vertices = vertices.resolveExternal(); } + _transactions.push_back(trx);// store transactions, otherwise VPackSlices become invalid - SingleCollectionTransaction trx2(StandaloneTransactionContext::Create(vocbase), - _edgeCollection, TRI_TRANSACTION_READ); - res = trx2.begin(); + // ======= Look up edges + trx = new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), + _edgeShardID, TRI_TRANSACTION_READ); + res = trx->begin(); if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges '%s'", - _vertexCollection.c_str()); - return; + THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges '%s'", _edgeShardID.c_str()); } + _transactions.push_back(trx); - /*OperationResult result2 = trx2.all(edgeCollection, 0, UINT64_MAX, OperationOptions()); - // Commit or abort. - res = trx.finish(result.code); + auto info = std::make_unique(trx, _edgeShardID, TRI_EDGE_OUT, + StaticStrings::FromString, 0); - if (!result2.successful() || res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(result2.code, "while looking up graph '%s'", - edgeCollection.c_str()); - } - VPackSlice edges = result.slice(); - if (edges.isExternal()) { - edges = vertices.resolveExternal(); - }*/ - - SingleCollectionTransaction::IndexHandle handle = trx2.edgeIndexHandle(_edgeCollection); - if (handle.isEdgeIndex()) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); - } - - std::shared_ptr edgeIndexPtr = handle.getIndex(); - EdgeIndex* edgeIndex = static_cast(edgeIndexPtr.get()); - - - /*map> sortBucket;// TODO hash_map ? - for (auto const& it : velocypack::ArrayIterator(edges)) { - Edge *e = new Edge(); - e->edgeId = it.get(StaticStrings::IdString).copyString(); - e->toId = it.get(StaticStrings::ToString).copyString(); - e->value = it.get("value").getInt(); - sortBucket[e->toId].push_back(e); - }*/ + size_t edgeCount = 0; VPackArrayIterator arr = VPackArrayIterator(vertices); LOG(INFO) << "Found vertices: " << arr.size(); - for (auto const &it : arr) { - Vertex *v = new Vertex(it); - _vertices[v->documentId] = v; - - //v._edges = sortBucket[v.documentId]; - - TransactionBuilderLeaser b(&trx2); - b->openArray(); - b->add(it.get(StaticStrings::IdString)); - b->add(VPackValue(VPackValueType::Null)); - b->close(); - IndexIterator* vit = edgeIndex->iteratorForSlice(&trx2, nullptr, b->slice(), false); - TRI_doc_mptr_t *edgePack; - while((edgePack = vit->next()) != nullptr) { - VPackSlice s(edgePack->vpack()); - - std::unique_ptr e(new Edge()); - e->edgeId = it.get(StaticStrings::IdString).copyString(); - e->toId = it.get(StaticStrings::ToString).copyString(); - VPackSlice i = it.get("value"); - e->value = i.isSmallInt() || i.isInt() ? i.getInt() : 1; - v->_edges.push_back(e.get()); - e.release(); + for (auto it : arr) { + LOG(INFO) << it.toJson(); + if (it.isExternal()) { + it = it.resolveExternal(); } + + std::string vertexId = it.get(StaticStrings::KeyString).copyString(); + std::unique_ptr v (new Vertex(it)); + _vertices[vertexId] = v.get(); + + std::string key = _vertexCollectionName+"/"+vertexId;// TODO geht das schneller + LOG(INFO) << "Retrieving edge " << key; + + auto cursor = info->getEdges(key); + if (cursor->failed()) { + THROW_ARANGO_EXCEPTION_FORMAT(cursor->code, "while looking up edges '%s' from %s", + key.c_str(), _edgeShardID.c_str()); + } + + std::vector result; + result.reserve(1000); + while (cursor->hasMore()) { + cursor->getMoreMptr(result, 1000); + for (auto const& mptr : result) { + + VPackSlice s(mptr->vpack()); + if (s.isExternal()) { + s = s.resolveExternal(); + } + LOG(INFO) << s.toJson(); + + VPackSlice i = s.get("value"); + v->_edges.emplace_back(s); + v->_edges.end()->_value = i.isInteger() ? i.getInt() : 1; + edgeCount++; + + } + } + LOG(INFO) << "done retrieving edge"; + + v.release(); } + trx->finish(res); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_FORMAT(res, "after looking up edges '%s'", _edgeShardID.c_str()); + } + + LOG(INFO) << "Resolved " << _vertices.size() << " vertices"; + LOG(INFO) << "Resolved " << edgeCount << " edges"; } Worker::~Worker() { @@ -159,33 +173,38 @@ Worker::~Worker() { delete(it.second); } _vertices.clear(); + for (auto const &it : _transactions) {// clean transactions + delete(it); + } + _transactions.clear(); } void Worker::nextGlobalStep(VPackSlice data) { + LOG(INFO) << "Called next global step: " << data.toString(); + // TODO do some work? VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); - if (!gssSlice.isInt()) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Invalid gss in body"); - } - - int64_t gss = gssSlice.getInt(); - if (gss == 0) { - VPackSlice cid = data.get(Utils::coordinatorIdKey); - //VPackSlice algo = data.get(Utils::algorithmKey); - if (cid.isString()) - _coordinatorId = cid.copyString(); + if (!gssSlice.isInteger()) { + THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER, + "Invalid gss in %s:%d", __FILE__, __LINE__); } + uint64_t gss = gssSlice.getUInt(); _globalSuperstep = gss; InMessageCache *inCache = _currentCache; - if (_currentCache == _cache1) _currentCache = _cache2; - else _currentCache = _cache1; + if (_currentCache == _cache1) { + _currentCache = _cache2; + } else { + _currentCache = _cache1; + } std::unique_ptr job(new WorkerJob(this, inCache)); DispatcherFeature::DISPATCHER->addJob(job, false); - LOG(INFO) << "Worker started new gss computation\n"; + LOG(INFO) << "Worker started new gss computation: " << gss; } void Worker::receivedMessages(VPackSlice data) { + MUTEX_LOCKER(locker, _messagesMutex); + VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); VPackSlice messageSlice = data.get(Utils::messagesKey); if (!gssSlice.isInt() || !messageSlice.isArray()) { @@ -202,47 +221,53 @@ void Worker::receivedMessages(VPackSlice data) { void Worker::writeResults() { - SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), + /*SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), _vertexCollection, TRI_TRANSACTION_WRITE); int res = trx.begin(); if (res != TRI_ERROR_NO_ERROR) { LOG(ERR) << "cannot start transaction to load authentication"; return; - } + }*/ OperationResult result; OperationOptions options; options.waitForSync = false; options.mergeObjects = true; for (auto const &pair : _vertices) { - TransactionBuilderLeaser b(&trx); - b->openObject(); - b->add(StaticStrings::KeyString, VPackValue(pair.second->documentId)); - b->add("value", VPackValue(pair.second->_vertexState)); - b->close(); - result = trx.update(_vertexCollection, b->slice(), options); + //TransactionBuilderLeaser b(&trx); + VPackBuilder b; + b.openObject(); + b.add(StaticStrings::KeyString, pair.second->_data.get(StaticStrings::KeyString)); + b.add("value", VPackValue(pair.second->_vertexState)); + b.close(); + LOG(INFO) << b.toString(); + /*result = trx.update(_vertexCollection, b->slice(), options); if (!result.successful()) { THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'", _vertexCollection.c_str()); - } + }*/ } // Commit or abort. - res = trx.finish(result.code); + /*res = trx.finish(result.code); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up graph '%s'", _vertexCollection.c_str()); + }*/ +} + + +// ========== WorkerJob ========== + +static void waitForResults(std::vector &requests) { + for (auto const& req : requests) { + auto& res = req.result; + if (res.status == CL_COMM_RECEIVED) { + LOG(INFO) << res.answer->payload().toJson(); + } } } -// ========== WorkerJob - -/* -V8Job::~V8Job() { - if (_task != nullptr) { - V8PeriodicTask::jobDone(_task); - } -}*/ WorkerJob::WorkerJob(Worker *worker, InMessageCache *inCache) : Job("Pregel Job"), _worker(worker), _inCache(inCache) { } @@ -252,10 +277,10 @@ void WorkerJob::work() { return; } LOG(INFO) << "Worker job started\n"; - - OutMessageCache outCache(_worker->_vertexCollection); - - int64_t gss = _worker->_globalSuperstep; + // TODO cache this + OutMessageCache outCache(_worker->_vertexCollectionPlanId); + + unsigned int gss = _worker->_globalSuperstep; bool isDone = true; if (gss == 0) { @@ -263,62 +288,83 @@ void WorkerJob::work() { for (auto const &it : _worker->_vertices) { Vertex *v = it.second; - VPackArrayIterator messages = _inCache->getMessages(v->documentId); - v->compute(gss, messages, &outCache); - _worker->_activationMap[it.first] = v->state() == VertexActivationState::ACTIVE; + std::string key = v->_data.get(StaticStrings::KeyString).copyString(); + + VPackSlice messages = _inCache->getMessages(key); + v->compute(gss, MessageIterator(messages), &outCache); + bool active = v->state() == VertexActivationState::ACTIVE; + if (!active) LOG(INFO) << "vertex has halted"; + _worker->_activationMap[it.first] = active; } } else { for (auto &it : _worker->_activationMap) { - VPackArrayIterator messages = _inCache->getMessages(it.first); - if (messages.size() > 0 || it.second) { + VPackSlice messages = _inCache->getMessages(it.first); + MessageIterator iterator(messages); + if (iterator.size() > 0 || it.second) { isDone = false; Vertex *v = _worker->_vertices[it.first]; - v->compute(gss, messages, &outCache); + v->compute(gss, iterator, &outCache); it.second = v->state() == VertexActivationState::ACTIVE; } } } - LOG(INFO) << "Worker job computations done, now distributing results\n"; + LOG(INFO) << "Worker job computations done"; if (_canceled) { return; } - // send messages to other shards + // ==================== send messages to other shards ==================== ClusterComm *cc = ClusterComm::instance(); ClusterInfo *ci = ClusterInfo::instance(); + std::string baseUrl = Utils::baseUrl(_worker->_vocbase); if (!isDone) { - std::shared_ptr> shards = ci->getShardList(_worker->_vertexCollection); + LOG(INFO) << "Sending messages to shards"; + std::shared_ptr> shards = ci->getShardList(_worker->_vertexCollectionPlanId); + LOG(INFO) << "Seeing shards: " << shards->size(); + std::vector requests; - for (auto it = shards->begin(); it != shards->end(); ++it) { - VPackBuilder package; - package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); - package.add(Utils::executionNumberKey, VPackValue(_worker->_executionNumber)); - package.add(Utils::globalSuperstepKey, VPackValue(gss+1)); - VPackBuilder messages; - outCache.getMessages(*it, messages); - package.add(Utils::messagesKey, messages.slice()); - package.close(); + for (auto const &it : *shards) { - auto body = std::make_shared(package.toJson()); - requests.emplace_back("shard:" + *it, rest::RequestType::POST, Utils::messagesPath, body); + VPackBuilder messages; + outCache.getMessages(it, messages); + if (_worker->_vertexShardID == it) { + LOG(INFO) << "Worker: Getting messages for myself"; + _worker->_currentCache->addMessages(VPackArrayIterator(messages.slice())); + } else { + LOG(INFO) << "Worker: Sending messages for shard " << it; + + VPackBuilder package; + package.openObject(); + package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); + package.add(Utils::executionNumberKey, VPackValue(_worker->_executionNumber)); + package.add(Utils::globalSuperstepKey, VPackValue(gss+1)); + package.add(Utils::messagesKey, messages.slice()); + package.close(); + // add a request + auto body = std::make_shared(package.toJson()); + requests.emplace_back("shard:" + it, rest::RequestType::POST, baseUrl + Utils::messagesPath, body); + } } size_t nrDone = 0; cc->performRequests(requests, 120, nrDone, LogTopic("Pregel message transfer")); + waitForResults(requests); } else { LOG(INFO) << "Worker job has nothing more to process\n"; } // notify the conductor that we are done. VPackBuilder package; + package.openObject(); package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(_worker->_executionNumber)); - package.add(Utils::doneKey, VPackValue(isDone)); + if (!isDone) package.add(Utils::doneKey, VPackValue(isDone)); package.close(); - // TODO handle communication failure + LOG(INFO) << "Sending finishedGSS to coordinator: " << _worker->_coordinatorId; + // TODO handle communication failures? CoordTransactionID coordinatorTransactionID = TRI_NewTickServer(); auto headers = std::make_unique>(); @@ -326,10 +372,10 @@ void WorkerJob::work() { cc->asyncRequest("", coordinatorTransactionID, "server:"+_worker->_coordinatorId, rest::RequestType::POST, - Utils::finishedGSSPath, + baseUrl + Utils::finishedGSSPath, body, headers, nullptr, 90.0); - LOG(INFO) << "Worker job finished sending results\n"; + LOG(INFO) << "Worker job finished sending stuff"; } bool WorkerJob::cancel() { diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 3c275ac5e3..e8512d259c 100755 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -24,12 +24,14 @@ #define ARANGODB_PREGEL_WORKER_H 1 #include "Basics/Common.h" +#include "Basics/Mutex.h" #include "VocBase/vocbase.h" #include "Scheduler/Task.h" #include "Cluster/ClusterInfo.h" #include "Dispatcher/Job.h" namespace arangodb { + class SingleCollectionTransaction; namespace pregel { class Vertex; class InMessageCache; @@ -38,7 +40,7 @@ namespace pregel { class Worker { friend class WorkerJob; public: - Worker(int executionNumber, TRI_vocbase_t *vocbase, VPackSlice s); + Worker(unsigned int executionNumber, TRI_vocbase_t *vocbase, VPackSlice s); ~Worker(); void nextGlobalStep(VPackSlice data);// called by coordinator @@ -47,18 +49,21 @@ namespace pregel { private: /// @brief guard to make sure the database is not dropped while used by us - VocbaseGuard _vocbaseGuard; - - int _executionNumber; - int _globalSuperstep; + TRI_vocbase_t* _vocbase; + Mutex _messagesMutex; + const unsigned int _executionNumber; + + unsigned int _globalSuperstep; std::string _coordinatorId; - std::string _vertexCollection, _edgeCollection; + std::string _vertexCollectionName, _vertexCollectionPlanId; + ShardID _vertexShardID, _edgeShardID; std::unordered_map _vertices; std::map _activationMap; InMessageCache *_cache1, *_cache2; InMessageCache *_currentCache; + std::vector _transactions; }; class WorkerJob : public rest::Job { diff --git a/arangod/RestHandler/RestPregelHandler.cpp b/arangod/RestHandler/RestPregelHandler.cpp index b4d60544ec..6de026bd12 100644 --- a/arangod/RestHandler/RestPregelHandler.cpp +++ b/arangod/RestHandler/RestPregelHandler.cpp @@ -42,72 +42,76 @@ RestPregelHandler::RestPregelHandler(GeneralRequest* request, GeneralResponse* r : RestVocbaseBaseHandler(request, response) {} RestHandler::status RestPregelHandler::execute() { - LOG(INFO) << "Received request\n"; - - bool parseSuccess = true; - std::shared_ptr parsedBody = - parseVelocyPackBody(&VPackOptions::Defaults, parseSuccess); - VPackSlice body(parsedBody->start());// never nullptr - - if (!parseSuccess || !body.isObject()) { - LOG(ERR) << "Bad request body\n"; - generateError(rest::ResponseCode::BAD, - TRI_ERROR_NOT_IMPLEMENTED, "illegal request for /_api/pregel"); - } else if (_request->requestType() == rest::RequestType::POST) { + try { + bool parseSuccess = true; + std::shared_ptr parsedBody = + parseVelocyPackBody(&VPackOptions::Defaults, parseSuccess); + VPackSlice body(parsedBody->start());// never nullptr - std::vector const& suffix = _request->suffix(); - VPackSlice sExecutionNum = body.get(Utils::executionNumberKey); - if (!sExecutionNum.isSmallInt() && !sExecutionNum.isInt()) { - LOG(ERR) << "Invalid execution number"; - generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); - return status::DONE; - } - - int executionNumber = sExecutionNum.getInt(); + if (!parseSuccess || !body.isObject()) { + LOG(ERR) << "Bad request body\n"; + generateError(rest::ResponseCode::BAD, + TRI_ERROR_NOT_IMPLEMENTED, "illegal request for /_api/pregel"); + } else if (_request->requestType() == rest::RequestType::POST) { + + std::vector const& suffix = _request->suffix(); + VPackSlice sExecutionNum = body.get(Utils::executionNumberKey); + if (!sExecutionNum.isInteger()) { + LOG(ERR) << "Invalid execution number"; + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); + return status::DONE; + } + + unsigned int executionNumber = sExecutionNum.getUInt(); - if (suffix.size() != 1) { - LOG(ERR) << "Invalid suffix"; - generateError(rest::ResponseCode::NOT_FOUND, - TRI_ERROR_HTTP_NOT_FOUND); - return status::DONE; - } else if (suffix[0] == "finishedGSS") { - LOG(INFO) << "finishedGSS"; - Conductor *exe = JobMapping::instance()->conductor(executionNumber); - if (exe) { - exe->finishedGlobalStep(body); - } else { - LOG(ERR) << "Conductor not found\n"; - } - } else if (suffix[0] == "nextGSS") { - LOG(INFO) << "nextGSS"; - Worker *w = JobMapping::instance()->worker(executionNumber); - if (!w) {// can happen if gss == 0 - LOG(INFO) << "creating worker"; - w = new Worker(executionNumber, _vocbase, body); - JobMapping::instance()->addWorker(w, executionNumber); - } - w->nextGlobalStep(body); - } else if (suffix[0] == "messages") { - LOG(INFO) << "messages"; - Worker *exe = JobMapping::instance()->worker(executionNumber); - if (exe) { - exe->receivedMessages(body); - } - } else if (suffix[0] == "writeResults") { - Worker *exe = JobMapping::instance()->worker(executionNumber); - if (exe) { - exe->writeResults(); + if (suffix.size() != 1) { + LOG(ERR) << "Invalid suffix"; + generateError(rest::ResponseCode::NOT_FOUND, + TRI_ERROR_HTTP_NOT_FOUND); + return status::DONE; + } else if (suffix[0] == "finishedGSS") { + LOG(INFO) << "finishedGSS"; + Conductor *exe = JobMapping::instance()->conductor(executionNumber); + if (exe) { + exe->finishedGlobalStep(body); + } else { + LOG(ERR) << "Conductor not found\n"; + } + } else if (suffix[0] == "nextGSS") { + LOG(INFO) << "nextGSS"; + Worker *w = JobMapping::instance()->worker(executionNumber); + if (!w) {// can happen if gss == 0 + LOG(INFO) << "creating worker"; + w = new Worker(executionNumber, _vocbase, body); + JobMapping::instance()->addWorker(w, executionNumber); + } + w->nextGlobalStep(body); + } else if (suffix[0] == "messages") { + LOG(INFO) << "messages"; + Worker *exe = JobMapping::instance()->worker(executionNumber); + if (exe) { + exe->receivedMessages(body); + } + } else if (suffix[0] == "writeResults") { + Worker *exe = JobMapping::instance()->worker(executionNumber); + if (exe) { + exe->writeResults(); + } } + + VPackBuilder result; + result.add(VPackValue("thanks")); + generateResult(rest::ResponseCode::OK, result.slice()); + + } else { + generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, + TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/pregel"); } - - VPackBuilder result; - result.add(VPackValue("thanks")); - generateResult(rest::ResponseCode::OK, result.slice()); - - } else { - generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, - TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/pregel"); + } catch (std::exception const &e) { + LOG(ERR) << e.what(); + } catch(...) { + LOG(ERR) << "Exception"; } - + return status::DONE; } diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index cac167065f..085becf29b 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1850,23 +1850,27 @@ static void JS_Pregel(v8::FunctionCallbackInfo const& args) { LOG(INFO) << "Called as a controller"; TRI_vocbase_t* vocbase = GetContextVocBase(isolate); - CollectionID vID; + std::shared_ptr vertexColl, edgeColl; try { - std::shared_ptr const ci1 = - ClusterInfo::instance()->getCollection(vocbase->name(), vName); - std::shared_ptr const ci2 = - ClusterInfo::instance()->getCollection(vocbase->name(), eName); - if (ci1->isSystem() || ci2->isSystem()) { + vertexColl = ClusterInfo::instance()->getCollection(vocbase->name(), vName); + edgeColl = ClusterInfo::instance()->getCollection(vocbase->name(), eName); + if (edgeColl->isSystem() || edgeColl->isSystem()) { TRI_V8_THROW_EXCEPTION_USAGE("Cannot use pregel on system collection"); } - vID = ci1->cid_as_string(); + if (!vertexColl->usesDefaultShardKeys()) { + TRI_V8_THROW_EXCEPTION_USAGE("Vertex collection needs to be shared after '_key'"); + } + std::vector eKeys = edgeColl->shardKeys(); + if (eKeys.size() != 1 || eKeys[0] == StaticStrings::FromString) { + TRI_V8_THROW_EXCEPTION_USAGE("Edge collection needs to be sharded after '_from'"); + } //eName = ci2->cid_as_string(); } catch (...) { TRI_V8_THROW_EXCEPTION_USAGE("Collections do not exist"); } result = pregel::JobMapping::instance()->createExecutionNumber(); - pregel::Conductor* e = new pregel::Conductor(result, vName, vID, eName, "todo"); + pregel::Conductor* e = new pregel::Conductor(result, vocbase, vertexColl, edgeColl, "todo"); pregel::JobMapping::instance()->addExecution(e, result); LOG(INFO) << "Starting..."; diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index f5076d3ca3..10a647b60d 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -684,6 +684,10 @@ TRI_voc_cid_t LogicalCollection::planId() const { return _planId; } +std::string LogicalCollection::planId_as_string() const { + return basics::StringUtils::itoa(_planId); +} + TRI_col_type_e LogicalCollection::type() const { return _type; } diff --git a/arangod/VocBase/LogicalCollection.h b/arangod/VocBase/LogicalCollection.h index fd5cf7c69e..5a31610262 100644 --- a/arangod/VocBase/LogicalCollection.h +++ b/arangod/VocBase/LogicalCollection.h @@ -130,6 +130,7 @@ class LogicalCollection { std::string cid_as_string() const; TRI_voc_cid_t planId() const; + std::string planId_as_string() const; TRI_col_type_e type() const;