From ba437cab71c5a137eb848bc0c5ece1b4a9f8dbb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Thu, 13 Oct 2016 16:41:17 +0200 Subject: [PATCH] Included support for multiple shards --- arangod/CMakeLists.txt | 1 + arangod/Pregel/Conductor.cpp | 9 +- arangod/Pregel/OutMessageCache.cpp | 9 +- arangod/Pregel/Utils.cpp | 5 +- arangod/Pregel/Utils.h | 4 +- arangod/Pregel/Worker.cpp | 321 ++++++++-------------- arangod/Pregel/Worker.h | 24 +- arangod/Pregel/WorkerContext.h | 13 +- arangod/Pregel/WorkerJob.cpp | 142 ++++++++++ arangod/Pregel/WorkerJob.h | 54 ++++ arangod/RestHandler/RestPregelHandler.cpp | 1 + arangod/V8Server/v8-collection.cpp | 6 +- 12 files changed, 350 insertions(+), 239 deletions(-) create mode 100755 arangod/Pregel/WorkerJob.cpp create mode 100755 arangod/Pregel/WorkerJob.h diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 9ece379999..78088c85a3 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -377,6 +377,7 @@ add_executable(${BIN_ARANGOD} Pregel/Vertex.cpp Pregel/Worker.cpp Pregel/WorkerContext.cpp + Pregel/WorkerJob.cpp Pregel/GraphState.cpp Pregel/Utils.cpp ${ADDITIONAL_BIN_ARANGOD_SOURCES} diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index e0b3dcc6de..be9dd166af 100755 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -89,8 +89,11 @@ void Conductor::start() { VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); + b.add(Utils::globalSuperstepKey, VPackValue(0)); + b.add(Utils::algorithmKey, VPackValue(_algorithm)); b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); - b.add(Utils::vertexCollectionKey, VPackValue(_vertexCollection->name())); + b.add(Utils::vertexCollectionNameKey, VPackValue(_vertexCollection->name())); + b.add(Utils::vertexCollectionPlanIdKey, VPackValue(_vertexCollection->planId_as_string())); b.add(Utils::vertexShardsListKey, VPackValue(VPackValueType::Array)); for (ShardID const &vit : it.second) { b.add(VPackValue(vit)); @@ -101,10 +104,6 @@ void Conductor::start() { b.add(VPackValue(eit)); } b.close(); - //b.add(Utils::vertexCollectionKey, VPackValue(_vertexCollection)); - //b.add(Utils::edgeCollectionKey, VPackValue(_edgeCollection)); - b.add(Utils::globalSuperstepKey, VPackValue(0)); - b.add(Utils::algorithmKey, VPackValue(_algorithm)); b.close(); auto body = std::make_shared(b.toJson()); diff --git a/arangod/Pregel/OutMessageCache.cpp b/arangod/Pregel/OutMessageCache.cpp index 60319071b4..48ce1126c3 100755 --- a/arangod/Pregel/OutMessageCache.cpp +++ b/arangod/Pregel/OutMessageCache.cpp @@ -33,6 +33,8 @@ #include #include +#include + using namespace arangodb; using namespace arangodb::pregel; @@ -121,15 +123,16 @@ void OutMessageCache::getMessages(ShardID const& shardId, VPackBuilder &outBuild } void OutMessageCache::sendMessages() { - LOG(INFO) << "Sending messages to shards"; + LOG(INFO) << "Sending messages to other machines"; + auto localShards = _ctx->localVertexShardIDs(); std::shared_ptr> shards = _ci->getShardList(_ctx->vertexCollectionPlanId()); - LOG(INFO) << "Seeing shards: " << shards->size(); std::vector requests; for (auto const &it : *shards) { - if (_ctx->vertexShardId() == it) { + if (std::find(localShards.begin(), localShards.end(), it) != localShards.end()) { LOG(INFO) << "Worker: Getting messages for myself"; + VPackBuilder messages; messages.openArray(); getMessages(it, messages); diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 
03f68da91a..f2fae2f103 100755 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -35,7 +35,8 @@ std::string const Utils::messagesPath = "messages"; std::string const Utils::writeResultsPath = "writeResults"; std::string const Utils::executionNumberKey = "extn"; -std::string const Utils::vertexCollectionKey = "vertexCollection"; +std::string const Utils::vertexCollectionNameKey = "vertecCollName"; +std::string const Utils::vertexCollectionPlanIdKey = "vertecCollPlanID"; std::string const Utils::vertexShardsListKey = "vertexShards"; std::string const Utils::edgeShardsListKey = "edgeShards"; std::string const Utils::resultShardKey = "resultShard"; @@ -46,6 +47,8 @@ std::string const Utils::messagesKey = "msgs"; std::string const Utils::senderKey = "sender"; std::string const Utils::doneKey = "done"; +std::string const Utils::edgeShardingKey = "_vertex"; + std::string Utils::baseUrl(std::string dbName) { return "/_db/" + basics::StringUtils::urlEncode(dbName) + Utils::apiPrefix; } diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index 1f147da6d8..ca40a3963b 100755 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -44,7 +44,8 @@ namespace pregel { static std::string const writeResultsPath; static std::string const executionNumberKey; - static std::string const vertexCollectionKey; + static std::string const vertexCollectionNameKey; + static std::string const vertexCollectionPlanIdKey; static std::string const vertexShardsListKey; static std::string const edgeShardsListKey; static std::string const resultShardKey; @@ -56,6 +57,7 @@ namespace pregel { static std::string const senderKey; static std::string const doneKey; + static std::string const edgeShardingKey; static std::string baseUrl(std::string dbName); }; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index cbbd8b2f0a..562e928387 100755 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -23,6 +23,7 @@ #include "Worker.h" #include "Vertex.h" #include "Utils.h" +#include "WorkerJob.h" #include "WorkerContext.h" #include "InMessageCache.h" #include "OutMessageCache.h" @@ -30,19 +31,19 @@ #include "Basics/MutexLocker.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterComm.h" -#include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include "VocBase/LogicalCollection.h" #include "VocBase/EdgeCollectionInfo.h" -#include "Indexes/Index.h" #include "Dispatcher/DispatcherQueue.h" #include "Dispatcher/DispatcherFeature.h" #include "Utils/Transaction.h" #include "Utils/SingleCollectionTransaction.h" #include "Utils/StandaloneTransactionContext.h" #include "Utils/OperationCursor.h" + #include "Indexes/EdgeIndex.h" +#include "Indexes/Index.h" #include #include @@ -56,112 +57,38 @@ Worker::Worker(unsigned int executionNumber, TRI_vocbase_t *vocbase, VPackSlice s) : _vocbase(vocbase), _ctx(new WorkerContext(executionNumber)) { - //VPackSlice algo = s.get("algo"); VPackSlice coordID = s.get(Utils::coordinatorIdKey); + VPackSlice vertexCollName = s.get(Utils::vertexCollectionNameKey); + VPackSlice vertexCollPlanId = s.get(Utils::vertexCollectionPlanIdKey); VPackSlice vertexShardIDs = s.get(Utils::vertexShardsListKey); VPackSlice edgeShardIDs = s.get(Utils::edgeShardsListKey); - // TODO support more shards - if (!(coordID.isString() && vertexShardIDs.length() == 1 && edgeShardIDs.length() == 1)) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Only one shard per collection supported"); + //if (!(coordID.isString() && vertexShardIDs.length() == 1 && 
edgeShardIDs.length() == 1)) { + // THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Only one shard per collection supported"); + //} + if (!coordID.isString() + || !vertexCollName.isString() + || !vertexCollPlanId.isString() + || !vertexShardIDs.isArray() + || !edgeShardIDs.isArray()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Supplied bad parameters to worker"); } - _ctx->_coordinatorId = coordID.copyString(); - _ctx->_database = vocbase->name(); - _ctx->_vertexCollectionName = s.get(Utils::vertexCollectionKey).copyString();// readable name of collection - _ctx->_vertexShardID = vertexShardIDs.at(0).copyString(); - _ctx->_edgeShardID = edgeShardIDs.at(0).copyString(); - LOG(INFO) << "Received collection " << _ctx->_vertexCollectionName; - LOG(INFO) << "starting worker with (" << _ctx->_vertexShardID << ", " << _ctx->_edgeShardID << ")"; - - SingleCollectionTransaction *trx = new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), - _ctx->_vertexShardID, TRI_TRANSACTION_READ); - int res = trx->begin(); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", - _ctx->_vertexShardID.c_str()); - return; - } - // resolve planId - _ctx->_vertexCollectionPlanId = trx->documentCollection()->planId_as_string(); - - OperationResult result = trx->all( _ctx->_vertexShardID, 0, UINT64_MAX, OperationOptions()); - // Commit or abort. - res = trx->finish(result.code); - if (!result.successful()) { - THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'", _ctx->_vertexCollectionName.c_str()); - } - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up graph '%s'", _ctx->_vertexCollectionName.c_str()); - } - VPackSlice vertices = result.slice(); - if (vertices.isExternal()) { - vertices = vertices.resolveExternal(); - } - _transactions.push_back(trx);// store transactions, otherwise VPackSlices become invalid - - // ======= Look up edges - - trx = new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), - _ctx->_edgeShardID, TRI_TRANSACTION_READ); - res = trx->begin(); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges '%s'", _ctx->_edgeShardID.c_str()); - } - _transactions.push_back(trx); - - auto info = std::make_unique(trx, _ctx->_edgeShardID, TRI_EDGE_OUT, - StaticStrings::FromString, 0); - - size_t edgeCount = 0; - VPackArrayIterator arr = VPackArrayIterator(vertices); - LOG(INFO) << "Found vertices: " << arr.size(); - for (auto it : arr) { - LOG(INFO) << it.toJson(); - if (it.isExternal()) { - it = it.resolveExternal(); + _ctx->_coordinatorId = coordID.copyString(); + _ctx->_database = vocbase->name(); + _ctx->_vertexCollectionName = vertexCollName.copyString();// readable name of collection + _ctx->_vertexCollectionPlanId = vertexCollPlanId.copyString(); + + VPackArrayIterator vertices(vertexShardIDs); + for (VPackSlice shardSlice : vertices) { + ShardID name = shardSlice.copyString(); + _ctx->_localVertexShardIDs.push_back(name); + lookupVertices(name); } - - std::string vertexId = it.get(StaticStrings::KeyString).copyString(); - std::unique_ptr v (new Vertex(it)); - _vertices[vertexId] = v.get(); - - std::string key = _ctx->_vertexCollectionName+"/"+vertexId;// TODO geht das schneller - LOG(INFO) << "Retrieving edge " << key; - - auto cursor = info->getEdges(key); - if (cursor->failed()) { - THROW_ARANGO_EXCEPTION_FORMAT(cursor->code, "while looking up edges '%s' from %s", - 
key.c_str(), _ctx->_edgeShardID.c_str()); + VPackArrayIterator edges(edgeShardIDs); + for (VPackSlice shardSlice : edges) { + ShardID name = shardSlice.copyString(); + lookupEdges(name); } - - std::vector result; - result.reserve(1000); - while (cursor->hasMore()) { - cursor->getMoreMptr(result, 1000); - for (auto const& mptr : result) { - - VPackSlice s(mptr->vpack()); - if (s.isExternal()) { - s = s.resolveExternal(); - } - LOG(INFO) << s.toJson(); - - v->_edges.emplace_back(s); - edgeCount++; - - } - } - LOG(INFO) << "done retrieving edge"; - - v.release(); - } - trx->finish(res); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "after looking up edges '%s'", _ctx->_edgeShardID.c_str()); - } - - LOG(INFO) << "Resolved " << _vertices.size() << " vertices"; - LOG(INFO) << "Resolved " << edgeCount << " edges"; } Worker::~Worker() { @@ -170,10 +97,7 @@ Worker::~Worker() { delete(it.second); } _vertices.clear(); - for (auto const &it : _transactions) {// clean transactions - delete(it); - } - _transactions.clear(); + cleanupReadTransactions(); } /// @brief Setup next superstep @@ -264,106 +188,101 @@ void Worker::writeResults() { }*/ } - -// ========== WorkerJob ========== - -WorkerJob::WorkerJob(Worker *worker, - std::shared_ptr ctx) : Job("Pregel Job"), _canceled(false), _worker(worker), _ctx(ctx) { -} - -void WorkerJob::work() { - LOG(INFO) << "Worker job started"; - if (_canceled) { - LOG(INFO) << "Job was canceled before work started"; - return; - } - // TODO cache this - OutMessageCache outCache(_ctx); - - unsigned int gss = _ctx->globalSuperstep(); - bool isDone = true; - - if (gss == 0) { - isDone = false; - - for (auto const &it : _worker->_vertices) { - Vertex *v = it.second; - //std::string key = v->_data.get(StaticStrings::KeyString).copyString(); - //VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); - v->compute(gss, MessageIterator(), &outCache); - bool active = v->state() == VertexActivationState::ACTIVE; - if (!active) LOG(INFO) << "vertex has halted"; - _worker->_activationMap[it.first] = active; +void Worker::cleanupReadTransactions() { + for (auto const &it : _readTrxList) {// clean transactions + if (it->getStatus() == TRI_TRANSACTION_RUNNING) { + if (it->commit() != TRI_ERROR_NO_ERROR) { + LOG(WARN) << "Pregel worker: Failed to commit on a read transaction"; + } + } + delete(it); } - } else { - for (auto &it : _worker->_activationMap) { - - std::string key = _ctx->vertexCollectionName() + "/" + it.first; - VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); - - MessageIterator iterator(messages); - if (iterator.size() > 0 || it.second) { - isDone = false; - LOG(INFO) << "Processing messages: " << messages.toString(); - - Vertex *v = _worker->_vertices[it.first]; - v->compute(gss, iterator, &outCache); - bool active = v->state() == VertexActivationState::ACTIVE; - it.second = active; - if (!active) LOG(INFO) << "vertex has halted"; - } - } - } - LOG(INFO) << "Finished executing vertex programs."; + _readTrxList.clear(); +} - if (_canceled) { - return; - } - - // ==================== send messages to other shards ==================== - - if (!isDone) { - outCache.sendMessages(); - } else { - LOG(INFO) << "Worker job has nothing more to process"; - } - - // notify the conductor that we are done. 
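// --- Illustrative sketch (not part of the patch): the multi-shard setup flow introduced above. ---
// The worker no longer assumes exactly one vertex shard and one edge shard; it walks the shard
// lists sent by the conductor, opens one read transaction per shard via lookupVertices() /
// lookupEdges(), and keeps those transactions in _readTrxList until cleanupReadTransactions()
// commits and frees them. Stand-in types below, no VelocyPack/ArangoDB dependencies; all names
// here are hypothetical.
#include <iostream>
#include <string>
#include <vector>

using ShardID = std::string;

struct MultiShardSetupSketch {
  std::vector<ShardID> localVertexShards;   // mirrors WorkerContext::_localVertexShardIDs
  std::vector<int> readTransactions;        // stands in for _readTrxList (kept open until cleanup)

  void lookupVertices(ShardID const& shard) {
    readTransactions.push_back(1);          // real code: begin trx, load all documents, keep trx
    std::cout << "loaded vertices from " << shard << "\n";
  }
  void lookupEdges(ShardID const& shard) {
    readTransactions.push_back(1);
    std::cout << "loaded out-edges from " << shard << "\n";
  }

  void setup(std::vector<ShardID> const& vertexShards, std::vector<ShardID> const& edgeShards) {
    for (ShardID const& s : vertexShards) { // one pass per vertex shard instead of shard[0] only
      localVertexShards.push_back(s);
      lookupVertices(s);
    }
    for (ShardID const& s : edgeShards) {   // edges resolved per edge shard afterwards
      lookupEdges(s);
    }
  }

  void cleanupReadTransactions() { readTransactions.clear(); }  // real code commits + deletes
};

int main() {
  MultiShardSetupSketch w;
  w.setup({"s100", "s101"}, {"s200", "s201"});
  w.cleanupReadTransactions();
}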
- VPackBuilder package; - package.openObject(); - package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); - package.add(Utils::executionNumberKey, VPackValue(_ctx->executionNumber())); - package.add(Utils::globalSuperstepKey, VPackValue(gss)); - package.add(Utils::doneKey, VPackValue(isDone)); - package.close(); - - LOG(INFO) << "Sending finishedGSS to coordinator: " << _ctx->coordinatorId(); - // TODO handle communication failures? +void Worker::lookupVertices(ShardID const &vertexShard) { - ClusterComm *cc = ClusterComm::instance(); - std::string baseUrl = Utils::baseUrl(_worker->_vocbase->name()); - CoordTransactionID coordinatorTransactionID = TRI_NewTickServer(); - auto headers = - std::make_unique>(); - auto body = std::make_shared(package.toJson()); - cc->asyncRequest("", coordinatorTransactionID, - "server:" + _ctx->coordinatorId(), - rest::RequestType::POST, - baseUrl + Utils::finishedGSSPath, - body, headers, nullptr, 90.0); - - LOG(INFO) << "Worker job finished sending stuff"; + SingleCollectionTransaction *trx = new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), + vertexShard, TRI_TRANSACTION_READ); + int res = trx->begin(); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", vertexShard.c_str()); + } + + OperationResult result = trx->all(vertexShard, 0, UINT64_MAX, OperationOptions()); + // Commit or abort. + res = trx->finish(result.code); + if (!result.successful()) { + THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up shard '%s'", vertexShard.c_str()); + } + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up shard '%s'", vertexShard.c_str()); + } + VPackSlice vertices = result.slice(); + if (vertices.isExternal()) { + vertices = vertices.resolveExternal(); + } + _readTrxList.push_back(trx);// store transactions, otherwise VPackSlices become invalid + + VPackArrayIterator arr = VPackArrayIterator(vertices); + LOG(INFO) << "Found vertices: " << arr.size(); + for (auto it : arr) { + LOG(INFO) << it.toJson(); + if (it.isExternal()) { + it = it.resolveExternal(); + } + + std::string vertexId = it.get(StaticStrings::KeyString).copyString(); + std::unique_ptr v (new Vertex(it)); + _vertices[vertexId] = v.get(); + v.release(); + } } -bool WorkerJob::cancel() { - LOG(INFO) << "Canceling worker job"; - _canceled = true; - return true; +void Worker::lookupEdges(ShardID const &edgeShardID) { + std::unique_ptr trx(new SingleCollectionTransaction(StandaloneTransactionContext::Create(_vocbase), + edgeShardID, TRI_TRANSACTION_READ)); + int res = trx->begin(); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges '%s'", edgeShardID.c_str()); + } + + auto info = std::make_unique(trx.get(), edgeShardID, TRI_EDGE_OUT, + StaticStrings::FromString, 0); + + size_t edgeCount = 0; + for (auto const &it : _vertices) { + Vertex *v = it.second; + + std::string _from = _ctx->_vertexCollectionName+"/"+it.first;// TODO geht das schneller + LOG(INFO) << "Retrieving edge _from: " << _from; + + auto cursor = info->getEdges(_from); + if (cursor->failed()) { + THROW_ARANGO_EXCEPTION_FORMAT(cursor->code, "while looking up edges '%s' from %s", + _from.c_str(), edgeShardID.c_str()); + } + + std::vector result; + result.reserve(1000); + while (cursor->hasMore()) { + cursor->getMoreMptr(result, 1000); + for (auto const& mptr : result) { + + VPackSlice s(mptr->vpack()); + if (s.isExternal()) { + s = s.resolveExternal(); + } 
+ LOG(INFO) << s.toJson(); + + v->_edges.emplace_back(s); + edgeCount++; + + } + } + } + + _readTrxList.push_back(trx.get()); + trx.release(); } -void WorkerJob::cleanup(rest::DispatcherQueue* queue) { - queue->removeJob(this); - delete this; -} - -void WorkerJob::handleError(basics::Exception const& ex) {} diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 6a74682be7..23e5eea692 100755 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -46,6 +46,7 @@ namespace pregel { void nextGlobalStep(VPackSlice data);// called by coordinator void receivedMessages(VPackSlice data); void writeResults(); + void cleanupReadTransactions(); private: /// @brief guard to make sure the database is not dropped while used by us @@ -55,25 +56,10 @@ namespace pregel { std::unordered_map _vertices; std::map _activationMap; - std::vector _transactions; - }; - - class WorkerJob : public rest::Job { - WorkerJob(WorkerJob const&) = delete; - WorkerJob& operator=(WorkerJob const&) = delete; - - public: - WorkerJob(Worker *worker, std::shared_ptr ctx); - - void work() override; - bool cancel() override; - void cleanup(rest::DispatcherQueue*) override; - void handleError(basics::Exception const& ex) override; - - private: - std::atomic _canceled; - Worker *_worker; - std::shared_ptr _ctx; + std::vector _readTrxList; + + void lookupVertices(ShardID const& vertexShard); + void lookupEdges(ShardID const& edgeShardID); }; } } diff --git a/arangod/Pregel/WorkerContext.h b/arangod/Pregel/WorkerContext.h index 76199cfae9..0097b746d6 100755 --- a/arangod/Pregel/WorkerContext.h +++ b/arangod/Pregel/WorkerContext.h @@ -65,13 +65,13 @@ namespace pregel { return _vertexCollectionPlanId; } - inline ShardID const& vertexShardId() const { - return _vertexShardID; + inline std::vector const& localVertexShardIDs() const { + return _localVertexShardIDs; } - inline ShardID const& edgeShardId() const { - return _edgeShardID; - } + //inline ShardID const& edgeShardId() const { + // return _edgeShardID; + //} inline InMessageCache* readableIncomingCache() { return _readCache; @@ -89,7 +89,8 @@ namespace pregel { std::string _coordinatorId; std::string _database; std::string _vertexCollectionName, _vertexCollectionPlanId; - ShardID _vertexShardID, _edgeShardID; + std::vector _localVertexShardIDs; + //ShardID _vertexShardID, _edgeShardID; InMessageCache *_readCache, *_writeCache; void swapIncomingCaches();// only call when message receiving is locked diff --git a/arangod/Pregel/WorkerJob.cpp b/arangod/Pregel/WorkerJob.cpp new file mode 100755 index 0000000000..3eb4777ba3 --- /dev/null +++ b/arangod/Pregel/WorkerJob.cpp @@ -0,0 +1,142 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "WorkerJob.h" +#include "WorkerContext.h" +#include "Worker.h" +#include "Vertex.h" +#include "Utils.h" + +#include "InMessageCache.h" +#include "OutMessageCache.h" + +#include "VocBase/ticks.h" +#include "Cluster/ClusterInfo.h" +#include "Cluster/ClusterComm.h" +#include "Dispatcher/DispatcherQueue.h" + +#include +#include + +using namespace arangodb; +using namespace arangodb::pregel; + +WorkerJob::WorkerJob(Worker *worker, + std::shared_ptr ctx) : Job("Pregel Job"), _canceled(false), _worker(worker), _ctx(ctx) { +} + +void WorkerJob::work() { + LOG(INFO) << "Worker job started"; + if (_canceled) { + LOG(INFO) << "Job was canceled before work started"; + return; + } + // TODO cache this + OutMessageCache outCache(_ctx); + + unsigned int gss = _ctx->globalSuperstep(); + bool isDone = true; + + if (gss == 0) { + isDone = false; + + for (auto const &it : _worker->_vertices) { + Vertex *v = it.second; + //std::string key = v->_data.get(StaticStrings::KeyString).copyString(); + //VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); + v->compute(gss, MessageIterator(), &outCache); + bool active = v->state() == VertexActivationState::ACTIVE; + if (!active) LOG(INFO) << "vertex has halted"; + _worker->_activationMap[it.first] = active; + } + } else { + for (auto &it : _worker->_activationMap) { + + std::string key = _ctx->vertexCollectionName() + "/" + it.first; + VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); + + MessageIterator iterator(messages); + if (iterator.size() > 0 || it.second) { + isDone = false; + LOG(INFO) << "Processing messages: " << messages.toString(); + + Vertex *v = _worker->_vertices[it.first]; + v->compute(gss, iterator, &outCache); + bool active = v->state() == VertexActivationState::ACTIVE; + it.second = active; + if (!active) LOG(INFO) << "vertex has halted"; + } + } + } + LOG(INFO) << "Finished executing vertex programs."; + + if (_canceled) { + return; + } + + // ==================== send messages to other shards ==================== + + if (!isDone) { + outCache.sendMessages(); + } else { + LOG(INFO) << "Worker job has nothing more to process"; + } + + // notify the conductor that we are done. + VPackBuilder package; + package.openObject(); + package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); + package.add(Utils::executionNumberKey, VPackValue(_ctx->executionNumber())); + package.add(Utils::globalSuperstepKey, VPackValue(gss)); + package.add(Utils::doneKey, VPackValue(isDone)); + package.close(); + + LOG(INFO) << "Sending finishedGSS to coordinator: " << _ctx->coordinatorId(); + // TODO handle communication failures? 
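// --- Illustrative sketch (not part of the patch): the activation rule used in work() above. ---
// In global superstep 0 every vertex runs its program; in later supersteps a vertex runs only if
// it is still active or has incoming messages, and the job reports "done" to the conductor when
// no vertex ran at all. Toy, simplified types below; compute() merely stands in for
// Vertex::compute(), which may halt the vertex.
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct ActivationSketch {
  std::map<std::string, bool> activationMap;               // vertex key -> still active?
  std::map<std::string, std::vector<std::string>> inbox;   // vertex key -> pending messages

  // Hypothetical stand-in for Vertex::compute(); returns whether the vertex stays active.
  bool compute(std::string const& key, std::vector<std::string> const& messages) {
    return !messages.empty();  // toy rule: stay active only while messages keep arriving
  }

  bool runSuperstep(unsigned gss) {
    bool isDone = true;
    for (auto& entry : activationMap) {
      std::vector<std::string> const& messages = inbox[entry.first];
      if (gss == 0 || entry.second || !messages.empty()) {
        isDone = false;
        entry.second = compute(entry.first, messages);
        if (!entry.second) std::cout << entry.first << " has halted\n";
      }
    }
    return isDone;  // true -> nothing left to do, worker reports "done" for this superstep
  }
};

int main() {
  ActivationSketch s;
  s.activationMap = {{"v1", true}, {"v2", true}};
  s.inbox["v2"] = {"hello"};
  std::cout << "done after gss 1: " << s.runSuperstep(1) << "\n";
}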
+ + ClusterComm *cc = ClusterComm::instance(); + std::string baseUrl = Utils::baseUrl(_worker->_vocbase->name()); + CoordTransactionID coordinatorTransactionID = TRI_NewTickServer(); + auto headers = + std::make_unique>(); + auto body = std::make_shared(package.toJson()); + cc->asyncRequest("", coordinatorTransactionID, + "server:" + _ctx->coordinatorId(), + rest::RequestType::POST, + baseUrl + Utils::finishedGSSPath, + body, headers, nullptr, 90.0); + + LOG(INFO) << "Worker job finished sending stuff"; +} + +bool WorkerJob::cancel() { + LOG(INFO) << "Canceling worker job"; + _canceled = true; + return true; +} + +void WorkerJob::cleanup(rest::DispatcherQueue* queue) { + queue->removeJob(this); + delete this; +} + +void WorkerJob::handleError(basics::Exception const& ex) {} diff --git a/arangod/Pregel/WorkerJob.h b/arangod/Pregel/WorkerJob.h new file mode 100755 index 0000000000..88008a8f5c --- /dev/null +++ b/arangod/Pregel/WorkerJob.h @@ -0,0 +1,54 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_JOB_H +#define ARANGODB_PREGEL_JOB_H 1 + +#include "Basics/Common.h" +#include "Dispatcher/Job.h" + +namespace arangodb { + class SingleCollectionTransaction; +namespace pregel { + class Worker; + class WorkerContext; + + class WorkerJob : public rest::Job { + WorkerJob(WorkerJob const&) = delete; + WorkerJob& operator=(WorkerJob const&) = delete; + + public: + WorkerJob(Worker *worker, std::shared_ptr ctx); + + void work() override; + bool cancel() override; + void cleanup(rest::DispatcherQueue*) override; + void handleError(basics::Exception const& ex) override; + + private: + std::atomic _canceled; + Worker *_worker; + std::shared_ptr _ctx; + }; +} +} +#endif diff --git a/arangod/RestHandler/RestPregelHandler.cpp b/arangod/RestHandler/RestPregelHandler.cpp index f099419370..ab2e9f9b88 100644 --- a/arangod/RestHandler/RestPregelHandler.cpp +++ b/arangod/RestHandler/RestPregelHandler.cpp @@ -94,6 +94,7 @@ RestHandler::status RestPregelHandler::execute() { Worker *exe = PregelFeature::instance()->worker(executionNumber); if (exe) { exe->writeResults(); + PregelFeature::instance()->cleanup(executionNumber); } } diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 20e42d8595..4eebc122f3 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1854,15 +1854,15 @@ static void JS_Pregel(v8::FunctionCallbackInfo const& args) { try { vertexColl = ClusterInfo::instance()->getCollection(vocbase->name(), vName); edgeColl = ClusterInfo::instance()->getCollection(vocbase->name(), eName); - if (edgeColl->isSystem() || edgeColl->isSystem()) { + if 
(vertexColl->isSystem() || edgeColl->isSystem()) {
       TRI_V8_THROW_EXCEPTION_USAGE("Cannot use pregel on system collection");
     }
     if (!vertexColl->usesDefaultShardKeys()) {
       TRI_V8_THROW_EXCEPTION_USAGE("Vertex collection needs to be sharded after '_key'");
     }
     std::vector<std::string> eKeys = edgeColl->shardKeys();
-    if (eKeys.size() != 1 || eKeys[0] == StaticStrings::FromString) {
-      TRI_V8_THROW_EXCEPTION_USAGE("Edge collection needs to be sharded after '_from'");
+    if (eKeys.size() != 1 || eKeys[0] != "_vertex") {
+      TRI_V8_THROW_EXCEPTION_USAGE("Edge collection needs to be sharded after '_vertex', or use smart graphs");
     }
     //eName = ci2->cid_as_string();
   } catch (...) {
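// --- Illustrative sketch (not part of the patch): why JS_Pregel now requires the edge collection
// to be sharded after a "_vertex" attribute. If an edge's shard key carries the source vertex's
// "_key", vertex and out-edges hash to the same shard index, so each worker can resolve its
// vertices' edges locally without cross-server lookups. Toy hashing below, not ArangoDB's actual
// shard distribution.
#include <cassert>
#include <functional>
#include <string>

size_t shardOf(std::string const& shardKeyValue, size_t numberOfShards) {
  return std::hash<std::string>{}(shardKeyValue) % numberOfShards;
}

int main() {
  size_t const numberOfShards = 4;
  std::string vertexKey = "v42";

  size_t vertexShard = shardOf(vertexKey, numberOfShards);               // vertex sharded by _key
  size_t edgeShard   = shardOf(/*_vertex=*/vertexKey, numberOfShards);   // edge sharded by _vertex

  assert(vertexShard == edgeShard);  // co-location is what the check above enforces
  return 0;
}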