diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 4a004594f5..e0b3dcc6de 100755 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -137,8 +137,8 @@ void Conductor::finishedGlobalStep(VPackSlice &data) { _globalSuperstep++; std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); - if (_doneCount == _dbServerCount || _globalSuperstep >= 25) { - LOG(INFO) << "Done. We did " << _globalSuperstep << " rounds"; + if (_doneCount == _dbServerCount || _globalSuperstep == 101) { + LOG(INFO) << "Done. We did " << _globalSuperstep-1 << " rounds"; VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); diff --git a/arangod/Pregel/InMessageCache.cpp b/arangod/Pregel/InMessageCache.cpp index 9cb94ba658..dd68b0998f 100755 --- a/arangod/Pregel/InMessageCache.cpp +++ b/arangod/Pregel/InMessageCache.cpp @@ -48,7 +48,7 @@ void InMessageCache::clear() { void InMessageCache::addMessages(VPackArrayIterator incomingMessages) { MUTEX_LOCKER(locker, writeMutex); - LOG(INFO) << "Adding messages to in queue\n"; + LOG(INFO) << "Adding messages to in queue"; //unordered_map> messageBucket; //VPackSlice messages = data.get(Utils::messagesKey); @@ -89,7 +89,7 @@ void InMessageCache::addMessages(VPackArrayIterator incomingMessages) { } VPackSlice InMessageCache::getMessages(std::string const& vertexId) { - LOG(INFO) << "Querying messages from in queue\n"; + LOG(INFO) << "Querying messages from for " << vertexId; auto vmsg = _messages.find(vertexId); if (vmsg != _messages.end()) { return vmsg->second->slice(); diff --git a/arangod/Pregel/OutMessageCache.cpp b/arangod/Pregel/OutMessageCache.cpp index d3bc71d65e..60319071b4 100755 --- a/arangod/Pregel/OutMessageCache.cpp +++ b/arangod/Pregel/OutMessageCache.cpp @@ -66,8 +66,8 @@ void OutMessageCache::clear() { _map.clear(); } -void OutMessageCache::addMessage(std::string key, VPackSlice slice) { - LOG(INFO) << "Adding messages to out queue\n"; +void OutMessageCache::addMessage(std::string key, VPackSlice mData) { + LOG(INFO) << "Adding outgoing messages " << mData.toJson(); ShardID responsibleShard; bool usesDefaultShardingAttributes; @@ -89,15 +89,15 @@ void OutMessageCache::addMessage(std::string key, VPackSlice slice) { // hardcoded combiner int64_t oldValue = b->slice().get("value").getInt(); - int64_t newValue = slice.get("value").getInt(); + int64_t newValue = mData.get("value").getInt(); if (newValue < oldValue) { b->clear(); - b->add(slice); + b->add(mData); } } else {// first message for this vertex std::unique_ptr b(new VPackBuilder()); - b->add(slice); + b->add(mData); _map[responsibleShard][key] = b.get(); b.release(); } diff --git a/arangod/Pregel/Vertex.cpp b/arangod/Pregel/Vertex.cpp index 51bd79519d..7678bd3c4a 100755 --- a/arangod/Pregel/Vertex.cpp +++ b/arangod/Pregel/Vertex.cpp @@ -34,7 +34,12 @@ using namespace arangodb::pregel; Message::Message(VPackSlice slice) { VPackSlice s = slice.get("value"); - _value = s.getSmallInt() || s.getInt() ? s.getInt() : -1; + _value = s.getInt(); +} + +Edge::Edge(VPackSlice data) : _data(data) { + VPackSlice v = data.get("value"); + _value = v.isInteger() ? v.getInt() : 1; } Vertex::Vertex(VPackSlice document) : _data(document) { @@ -51,20 +56,26 @@ Vertex::~Vertex() { } void Vertex::compute(int gss, MessageIterator const &messages, OutMessageCache* const cache) { - int current = _vertexState; + int64_t current = _vertexState; for (auto const &msg : messages) { - if (msg._value < current) current = msg._value; + if (current < 0 || msg._value < current) { + current = msg._value; + }; } if (current >= 0 && (gss == 0 || current != _vertexState)) { + LOG(INFO) << "Recomputing value for vertex " << _data.toJson(); + _vertexState = current; for (auto const &edge : _edges) { + int64_t val = edge._value + current; VPackSlice toID = edge._data.get(StaticStrings::ToString); VPackBuilder b; b.openObject(); b.add(StaticStrings::ToString, toID); - b.add("value", VPackValue(edge._value + current)); + b.add("value", VPackValue(val)); b.close(); cache->addMessage(toID.copyString(), b.slice()); } - } else voteHalt(); + } + voteHalt(); } diff --git a/arangod/Pregel/Vertex.h b/arangod/Pregel/Vertex.h index f736bdc5df..009db49395 100755 --- a/arangod/Pregel/Vertex.h +++ b/arangod/Pregel/Vertex.h @@ -40,13 +40,13 @@ namespace pregel { struct Message { Message(VPackSlice slice); - int _value; // demo + int64_t _value; // demo }; //template class MessageIterator { public: - MessageIterator() = delete; + MessageIterator() : _size(0) {} typedef MessageIterator iterator; typedef const MessageIterator const_iterator; @@ -109,12 +109,12 @@ namespace pregel { }; struct Edge { - Edge(VPackSlice data) : _data(data) {} + Edge(VPackSlice data); VPackSlice _data; //protected: virtual void loadData() = 0; - int _value;// demo + int64_t _value;// demo }; class Vertex { @@ -132,11 +132,11 @@ namespace pregel { protected: void voteHalt() {_activationState = VertexActivationState::STOPPED;} - int _vertexState;// demo + int64_t _vertexState;// demo VPackSlice _data; private: - VertexActivationState _activationState; + VertexActivationState _activationState = VertexActivationState::ACTIVE; }; } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 5a100248b8..3bf14ce054 100755 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -54,7 +54,7 @@ using namespace arangodb::pregel; Worker::Worker(unsigned int executionNumber, TRI_vocbase_t *vocbase, - VPackSlice s) : _vocbase(vocbase), _ctx(new WorkerContext) { + VPackSlice s) : _vocbase(vocbase), _ctx(new WorkerContext(executionNumber)) { //VPackSlice algo = s.get("algo"); @@ -65,7 +65,6 @@ Worker::Worker(unsigned int executionNumber, if (!(coordID.isString() && vertexShardIDs.length() == 1 && edgeShardIDs.length() == 1)) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Only one shard per collection supported"); } - _ctx->_executionNumber = executionNumber; _ctx->_coordinatorId = coordID.copyString(); _ctx->_database = vocbase->name(); _ctx->_vertexCollectionName = s.get(Utils::vertexCollectionKey).copyString();// readable name of collection @@ -147,9 +146,7 @@ Worker::Worker(unsigned int executionNumber, } LOG(INFO) << s.toJson(); - VPackSlice i = s.get("value"); v->_edges.emplace_back(s); - v->_edges.end()->_value = i.isInteger() ? i.getInt() : 1; edgeCount++; } @@ -190,17 +187,21 @@ void Worker::nextGlobalStep(VPackSlice data) { "Invalid gss in %s:%d", __FILE__, __LINE__); } unsigned int gss = (unsigned int) gssSlice.getUInt(); - unsigned int expected = _ctx->_globalSuperstep + 1; - if (gss != 0 && expected != gss) { + if (_ctx->_expectedGSS != gss) { THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER, "Seems like this worker missed a gss, expected %u. Data = %s ", - expected, data.toJson().c_str()); + _ctx->_expectedGSS, data.toJson().c_str()); } + _ctx->_globalSuperstep = gss; + _ctx->_expectedGSS = gss + 1; _ctx->readableIncomingCache()->clear(); _ctx->swapIncomingCaches();// write cache becomes the readable cache std::unique_ptr job(new WorkerJob(this, _ctx)); - DispatcherFeature::DISPATCHER->addJob(job, false); + int res = DispatcherFeature::DISPATCHER->addJob(job, true); +if (res != TRI_ERROR_NO_ERROR) { + LOG(ERR) << "Could not start worker job"; +} LOG(INFO) << "Worker started new gss: " << gss; } @@ -222,7 +223,7 @@ void Worker::receivedMessages(VPackSlice data) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Superstep out of sync"); } - LOG(INFO) << "Worker received messages\n"; + LOG(INFO) << "Worker received messages"; } void Worker::writeResults() { @@ -247,7 +248,7 @@ void Worker::writeResults() { b.add(StaticStrings::KeyString, pair.second->_data.get(StaticStrings::KeyString)); b.add("value", VPackValue(pair.second->_vertexState)); b.close(); - LOG(INFO) << b.toString(); + LOG(INFO) << b.toJson(); /*result = trx.update(_vertexCollection, b->slice(), options); if (!result.successful()) { THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'", @@ -266,14 +267,16 @@ void Worker::writeResults() { // ========== WorkerJob ========== -WorkerJob::WorkerJob(Worker *worker, std::shared_ptr ctx) : Job("Pregel Job"), _worker(worker), _ctx(ctx) { +WorkerJob::WorkerJob(Worker *worker, + std::shared_ptr ctx) : Job("Pregel Job"), _canceled(false), _worker(worker), _ctx(ctx) { } void WorkerJob::work() { + LOG(INFO) << "Worker job started"; if (_canceled) { + LOG(INFO) << "Job was canceled before work started"; return; } - LOG(INFO) << "Worker job started\n"; // TODO cache this OutMessageCache outCache(_ctx); @@ -285,24 +288,29 @@ void WorkerJob::work() { for (auto const &it : _worker->_vertices) { Vertex *v = it.second; - std::string key = v->_data.get(StaticStrings::KeyString).copyString(); - - VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); - v->compute(gss, MessageIterator(messages), &outCache); + //std::string key = v->_data.get(StaticStrings::KeyString).copyString(); + //VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); + v->compute(gss, MessageIterator(), &outCache); bool active = v->state() == VertexActivationState::ACTIVE; if (!active) LOG(INFO) << "vertex has halted"; _worker->_activationMap[it.first] = active; } } else { for (auto &it : _worker->_activationMap) { - VPackSlice messages = _ctx->readableIncomingCache()->getMessages(it.first); + + std::string key = _ctx->vertexCollectionName() + "/" + it.first; + VPackSlice messages = _ctx->readableIncomingCache()->getMessages(key); + MessageIterator iterator(messages); if (iterator.size() > 0 || it.second) { isDone = false; + LOG(INFO) << "Processing messages: " << messages.toString(); Vertex *v = _worker->_vertices[it.first]; v->compute(gss, iterator, &outCache); - it.second = v->state() == VertexActivationState::ACTIVE; + bool active = v->state() == VertexActivationState::ACTIVE; + it.second = active; + if (!active) LOG(INFO) << "vertex has halted"; } } } @@ -317,7 +325,7 @@ void WorkerJob::work() { if (!isDone) { outCache.sendMessages(); } else { - LOG(INFO) << "Worker job has nothing more to process\n"; + LOG(INFO) << "Worker job has nothing more to process"; } // notify the conductor that we are done. @@ -326,7 +334,7 @@ void WorkerJob::work() { package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(_ctx->executionNumber())); package.add(Utils::globalSuperstepKey, VPackValue(gss)); - if (!isDone) package.add(Utils::doneKey, VPackValue(isDone)); + package.add(Utils::doneKey, VPackValue(isDone)); package.close(); LOG(INFO) << "Sending finishedGSS to coordinator: " << _ctx->coordinatorId(); @@ -348,6 +356,7 @@ void WorkerJob::work() { } bool WorkerJob::cancel() { + LOG(INFO) << "Canceling worker job"; _canceled = true; return true; } diff --git a/arangod/Pregel/WorkerContext.cpp b/arangod/Pregel/WorkerContext.cpp index 75e0178d78..bce40d37ce 100755 --- a/arangod/Pregel/WorkerContext.cpp +++ b/arangod/Pregel/WorkerContext.cpp @@ -26,7 +26,7 @@ using namespace arangodb; using namespace arangodb::pregel; -WorkerContext::WorkerContext() { +WorkerContext::WorkerContext(unsigned int en) : _executionNumber(en) { _readCache = new InMessageCache(); _writeCache = new InMessageCache(); } diff --git a/arangod/Pregel/WorkerContext.h b/arangod/Pregel/WorkerContext.h index 67237d64d6..76199cfae9 100755 --- a/arangod/Pregel/WorkerContext.h +++ b/arangod/Pregel/WorkerContext.h @@ -38,7 +38,7 @@ namespace pregel { class WorkerContext { friend class Worker; public: - WorkerContext(); + WorkerContext(unsigned int exeNum); ~WorkerContext(); inline unsigned int executionNumber() { @@ -83,8 +83,9 @@ namespace pregel { private: /// @brief guard to make sure the database is not dropped while used by us - unsigned int _executionNumber; - unsigned int _globalSuperstep; + const unsigned int _executionNumber; + unsigned int _globalSuperstep = 0; + unsigned int _expectedGSS = 0; std::string _coordinatorId; std::string _database; std::string _vertexCollectionName, _vertexCollectionPlanId;