//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "Pregel/Worker.h" #include "Pregel/Aggregator.h" #include "Pregel/CommonFormats.h" #include "Pregel/GraphStore.h" #include "Pregel/IncomingCache.h" #include "Pregel/OutgoingCache.h" #include "Pregel/PregelFeature.h" #include "Pregel/Utils.h" #include "Pregel/VertexComputation.h" #include "Pregel/WorkerConfig.h" #include "Basics/MutexLocker.h" #include "Basics/ReadLocker.h" #include "Basics/WriteLocker.h" #include "Basics/system-compiler.h" #include "Cluster/ServerState.h" #include "Futures/Utilities.h" #include "Network/NetworkFeature.h" #include "Network/Methods.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::pregel; #define MY_READ_LOCKER(obj, lock) \ ReadLocker obj(&lock, arangodb::basics::LockerType::BLOCKING, \ true, __FILE__, __LINE__) #define MY_WRITE_LOCKER(obj, lock) \ WriteLocker obj(&lock, arangodb::basics::LockerType::BLOCKING, \ true, __FILE__, __LINE__) template Worker::Worker(TRI_vocbase_t& vocbase, Algorithm* algo, VPackSlice initConfig) : _state(WorkerState::IDLE), _config(&vocbase, initConfig), _algorithm(algo), _nextGSSSendMessageCount(0), _requestedNextGSS(false) { MUTEX_LOCKER(guard, _commandMutex); VPackSlice userParams = initConfig.get(Utils::userParametersKey); _workerContext.reset(algo->workerContext(userParams)); _messageFormat.reset(algo->messageFormat()); _messageCombiner.reset(algo->messageCombiner()); _conductorAggregators.reset(new AggregatorHandler(algo)); _workerAggregators.reset(new AggregatorHandler(algo)); _graphStore.reset(new GraphStore(vocbase, _algorithm->inputFormat())); if (_config.asynchronousMode()) { _messageBatchSize = _algorithm->messageBatchSize(_config, _messageStats); } else { _messageBatchSize = 5000; } _initializeMessageCaches(); } template Worker::~Worker() { _state = WorkerState::DONE; std::this_thread::sleep_for(std::chrono::milliseconds(50)); // wait for threads to die delete _readCache; delete _writeCache; delete _writeCacheNextGSS; for (InCache* cache : _inCaches) { delete cache; } for (OutCache* cache : _outCaches) { delete cache; } _writeCache = nullptr; } template void Worker::_initializeMessageCaches() { const size_t p = _config.parallelism(); if (_messageCombiner) { _readCache = new CombiningInCache(&_config, _messageFormat.get(), _messageCombiner.get()); _writeCache = new CombiningInCache(&_config, _messageFormat.get(), _messageCombiner.get()); if (_config.asynchronousMode()) { _writeCacheNextGSS = new CombiningInCache(&_config, _messageFormat.get(), _messageCombiner.get()); } for (size_t i = 0; i < p; i++) { auto incoming = std::make_unique>(nullptr, _messageFormat.get(), _messageCombiner.get()); _inCaches.push_back(incoming.get()); _outCaches.push_back(new CombiningOutCache(&_config, _messageFormat.get(), _messageCombiner.get())); incoming.release(); } } else { _readCache = new ArrayInCache(&_config, _messageFormat.get()); _writeCache = new ArrayInCache(&_config, _messageFormat.get()); if (_config.asynchronousMode()) { _writeCacheNextGSS = new ArrayInCache(&_config, _messageFormat.get()); } for (size_t i = 0; i < p; i++) { auto incoming = std::make_unique>(nullptr, _messageFormat.get()); _inCaches.push_back(incoming.get()); _outCaches.push_back(new ArrayOutCache(&_config, _messageFormat.get())); incoming.release(); } } } // @brief load the initial worker data, call conductor eventually template void Worker::setupWorker() { std::function cb = [this] { VPackBuilder package; package.openObject(); package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(_config.executionNumber())); package.add(Utils::vertexCountKey, VPackValue(_graphStore->localVertexCount())); package.add(Utils::edgeCountKey, VPackValue(_graphStore->localEdgeCount())); package.close(); _callConductor(Utils::finishedStartupPath, package); }; if (_config.lazyLoading()) { // TODO maybe lazy loading needs to be performed on another thread too std::set activeSet = _algorithm->initialActiveSet(); if (activeSet.size() == 0) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "There needs to be one active vertice"); } for (std::string const& documentID : activeSet) { _graphStore->loadDocument(&_config, documentID); } cb(); } else { // initialization of the graphstore might take an undefined amount // of time. Therefore this is performed asynchronous TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, cb] { _graphStore->loadShards(&_config, cb); }); if (!queued) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, "No available thread to load shards"); } } } template void Worker::prepareGlobalStep(VPackSlice const& data, VPackBuilder& response) { // Only expect serial calls from the conductor. // Lock to prevent malicous activity MUTEX_LOCKER(guard, _commandMutex); if (_state != WorkerState::IDLE) { LOG_TOPIC("b8506", ERR, Logger::PREGEL) << "Cannot prepare a gss when the worker is not idle"; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Cannot prepare a gss when the worker is not idle"); } _state = WorkerState::PREPARING; // stop any running step LOG_TOPIC("f16f2", DEBUG, Logger::PREGEL) << "Received prepare GSS: " << data.toJson(); VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); if (!gssSlice.isInteger()) { THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER, "Invalid gss in %s:%d", __FILE__, __LINE__); } const uint64_t gss = (uint64_t)gssSlice.getUInt(); if (_expectedGSS != gss) { THROW_ARANGO_EXCEPTION_FORMAT( TRI_ERROR_BAD_PARAMETER, "Seems like this worker missed a gss, expected %u. Data = %s ", _expectedGSS, data.toJson().c_str()); } // initialize worker context if (_workerContext && gss == 0 && _config.localSuperstep() == 0) { _workerContext->_readAggregators = _conductorAggregators.get(); _workerContext->_writeAggregators = _workerAggregators.get(); _workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt(); _workerContext->_edgeCount = data.get(Utils::edgeCountKey).getUInt(); _workerContext->preApplication(); } // make us ready to receive messages _config._globalSuperstep = gss; // write cache becomes the readable cache if (_config.asynchronousMode()) { MY_WRITE_LOCKER(wguard, _cacheRWLock); // by design shouldn't be necessary TRI_ASSERT(_readCache->containedMessageCount() == 0); TRI_ASSERT(_writeCache->containedMessageCount() == 0); std::swap(_readCache, _writeCacheNextGSS); _writeCache->clear(); _requestedNextGSS = false; // only relevant for async _messageStats.sendCount = _nextGSSSendMessageCount; _nextGSSSendMessageCount = 0; } else { MY_WRITE_LOCKER(wguard, _cacheRWLock); TRI_ASSERT(_readCache->containedMessageCount() == 0); std::swap(_readCache, _writeCache); _config._localSuperstep = gss; } // only place where is makes sense to call this, since startGlobalSuperstep // might not be called again if (_workerContext && gss > 0) { _workerContext->postGlobalSuperstep(gss - 1); } // responds with info which allows the conductor to decide whether // to start the next GSS or end the execution response.openObject(); response.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); response.add(Utils::activeCountKey, VPackValue(_activeCount)); response.add(Utils::vertexCountKey, VPackValue(_graphStore->localVertexCount())); response.add(Utils::edgeCountKey, VPackValue(_graphStore->localEdgeCount())); _workerAggregators->serializeValues(response); response.close(); } template void Worker::receivedMessages(VPackSlice const& data) { VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); uint64_t gss = gssSlice.getUInt(); if (gss == _config._globalSuperstep) { { // make sure the pointer is not changed while // parsing messages MY_READ_LOCKER(guard, _cacheRWLock); // handles locking for us _writeCache->parseMessages(data); } // Trigger the processing of vertices if (_config.asynchronousMode() && _state == WorkerState::IDLE) { _continueAsync(); } } else if (_config.asynchronousMode() && gss == _config._globalSuperstep + 1) { MY_READ_LOCKER(guard, _cacheRWLock); _writeCacheNextGSS->parseMessages(data); } else { LOG_TOPIC("ecd34", ERR, Logger::PREGEL) << "Expected: " << _config._globalSuperstep << "Got: " << gss; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Superstep out of sync"); } } /// @brief Setup next superstep template void Worker::startGlobalStep(VPackSlice const& data) { // Only expect serial calls from the conductor. // Lock to prevent malicous activity MUTEX_LOCKER(guard, _commandMutex); if (_state != WorkerState::PREPARING) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "Cannot start a gss when the worker is not prepared"); } LOG_TOPIC("d5e44", DEBUG, Logger::PREGEL) << "Starting GSS: " << data.toJson(); VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); const uint64_t gss = (uint64_t)gssSlice.getUInt(); if (gss != _config.globalSuperstep()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Wrong GSS"); } _workerAggregators->resetValues(); _conductorAggregators->setAggregatedValues(data); // execute context if (_workerContext) { _workerContext->_vertexCount = data.get(Utils::vertexCountKey).getUInt(); _workerContext->_edgeCount = data.get(Utils::edgeCountKey).getUInt(); _workerContext->preGlobalSuperstep(gss); } LOG_TOPIC("39e20", DEBUG, Logger::PREGEL) << "Worker starts new gss: " << gss; _startProcessing(); // sets _state = COMPUTING; } template void Worker::cancelGlobalStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _commandMutex); _state = WorkerState::DONE; _workHandle.reset(); } /// WARNING only call this while holding the _commandMutex template void Worker::_startProcessing() { _state = WorkerState::COMPUTING; _activeCount = 0; // active count is only valid after the run TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; size_t total = _graphStore->localVertexCount(); size_t numSegments = _graphStore->numberVertexSegments(); if (total > 100000) { _runningThreads = std::min(_config.parallelism(), numSegments); } else { _runningThreads = 1; } TRI_ASSERT(_runningThreads >= 1); TRI_ASSERT(_runningThreads <= _config.parallelism()); size_t numT = _runningThreads; auto self = shared_from_this(); for (size_t i = 0; i < numT; i++) { bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, i, numT, numSegments] { if (_state != WorkerState::COMPUTING) { LOG_TOPIC("f0e3d", WARN, Logger::PREGEL) << "Execution aborted prematurely."; return; } size_t startI = i * (numSegments / numT); size_t endI = (i+1) * (numSegments / numT); TRI_ASSERT(endI <= numSegments); auto vertices = _graphStore->vertexIterator(startI, endI); // should work like a join operation if (_processVertices(i, vertices) && _state == WorkerState::COMPUTING) { _finishedProcessing(); // last thread turns the lights out } }); if (!queued) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, "No thread available to start processing"); } } // TRI_ASSERT(_runningThreads == i); LOG_TOPIC("425c3", DEBUG, Logger::PREGEL) << "Using " << numT << " Threads"; } template void Worker::_initializeVertexContext(VertexContext* ctx) { ctx->_gss = _config.globalSuperstep(); ctx->_lss = _config.localSuperstep(); ctx->_context = _workerContext.get(); ctx->_graphStore = _graphStore.get(); ctx->_readAggregators = _conductorAggregators.get(); } // internally called in a WORKER THREAD!! template bool Worker::_processVertices(size_t threadId, RangeIterator>& vertexIterator) { double start = TRI_microtime(); // thread local caches InCache* inCache = _inCaches[threadId]; OutCache* outCache = _outCaches[threadId]; outCache->setBatchSize(_messageBatchSize); outCache->setLocalCache(inCache); if (_config.asynchronousMode()) { outCache->sendToNextGSS(_requestedNextGSS); outCache->setLocalCacheNextGSS(_writeCacheNextGSS); TRI_ASSERT(outCache->sendCountNextGSS() == 0); } TRI_ASSERT(outCache->sendCount() == 0); AggregatorHandler workerAggregator(_algorithm.get()); // TODO look if we can avoid instantiating this std::unique_ptr> vertexComputation( _algorithm->createComputation(&_config)); _initializeVertexContext(vertexComputation.get()); vertexComputation->_writeAggregators = &workerAggregator; vertexComputation->_cache = outCache; if (!_config.asynchronousMode()) { // Should cause enterNextGlobalSuperstep to do nothing vertexComputation->_enterNextGSS = true; } size_t activeCount = 0; for (; vertexIterator.hasMore(); ++vertexIterator) { Vertex* vertexEntry = *vertexIterator; MessageIterator messages = _readCache->getMessages(vertexEntry->shard(), vertexEntry->key()); if (messages.size() > 0 || vertexEntry->active()) { vertexComputation->_vertexEntry = vertexEntry; vertexComputation->compute(messages); if (vertexEntry->active()) { activeCount++; } } if (_state != WorkerState::COMPUTING) { break; } } // ==================== send messages to other shards ==================== outCache->flushMessages(); if (ADB_UNLIKELY(!_writeCache)) { // ~Worker was called LOG_TOPIC("ee2ab", WARN, Logger::PREGEL) << "Execution aborted prematurely."; return false; } if (vertexComputation->_enterNextGSS) { _requestedNextGSS = true; _nextGSSSendMessageCount += outCache->sendCountNextGSS(); } // double t = TRI_microtime(); // merge thread local messages, _writeCache does locking _writeCache->mergeCache(_config, inCache); // TODO ask how to implement message sending without waiting for a response // t = TRI_microtime() - t; MessageStats stats; stats.sendCount = outCache->sendCount(); stats.superstepRuntimeSecs = TRI_microtime() - start; inCache->clear(); outCache->clear(); bool lastThread = false; { // only one thread at a time MUTEX_LOCKER(guard, _threadMutex); // merge the thread local stats and aggregators _workerAggregators->aggregateValues(workerAggregator); _messageStats.accumulate(stats); _activeCount += activeCount; _runningThreads--; lastThread = _runningThreads == 0; // should work like a join operation } return lastThread; } // called at the end of a worker thread, needs mutex template void Worker::_finishedProcessing() { { MUTEX_LOCKER(guard, _threadMutex); if (_runningThreads != 0) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "only one thread should ever enter this region"); } } VPackBuilder package; { // only lock after there are no more processing threads MUTEX_LOCKER(guard, _commandMutex); if (_state != WorkerState::COMPUTING) { return; // probably canceled } // count all received messages _messageStats.receivedCount = _readCache->containedMessageCount(); // lazy loading and async mode are a little tricky // the correct halting requires us to accurately track the number // of messages send or received, and to report them to the coordinator if (_config.lazyLoading()) { // TODO how to improve this? // hack to determine newly added vertices size_t currentAVCount = _graphStore->localVertexCount(); auto it = _graphStore->vertexIterator(); for (; it.hasMore(); ++it) { Vertex* vertexEntry = *it; // reduces the containedMessageCount _readCache->erase(vertexEntry->shard(), vertexEntry->key()); } _readCache->forEach([this](PregelShard shard, VPackStringRef const& key, M const&) { _graphStore->loadDocument(&_config, shard, key); }); // only do this expensive merge operation if there are new vertices size_t total = _graphStore->localVertexCount(); if (total > currentAVCount) { if (_config.asynchronousMode()) { // just process these vertices in the next superstep MY_READ_LOCKER(guard, _cacheRWLock); _writeCache->mergeCache(_config, _readCache); // compute in next superstep _messageStats.sendCount += _readCache->containedMessageCount(); } else { // TODO call _startProcessing ??? _runningThreads = 1; auto addedVertices = _graphStore->vertexIterator(currentAVCount, total); _processVertices(0, addedVertices); } } } _readCache->clear(); // no need to keep old messages around _expectedGSS = _config._globalSuperstep + 1; _config._localSuperstep++; // only set the state here, because _processVertices checks for it _state = WorkerState::IDLE; package.openObject(); package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(_config.executionNumber())); package.add(Utils::globalSuperstepKey, VPackValue(_config.globalSuperstep())); _messageStats.serializeValues(package); if (_config.asynchronousMode()) { _workerAggregators->serializeValues(package, true); } package.close(); if (_config.asynchronousMode()) { // async adaptive message buffering _messageBatchSize = _algorithm->messageBatchSize(_config, _messageStats); } else { uint64_t tn = _config.parallelism(); uint64_t s = _messageStats.sendCount / tn / 2UL; _messageBatchSize = s > 1000 ? (uint32_t)s : 1000; } _messageStats.resetTracking(); LOG_TOPIC("13dbf", DEBUG, Logger::PREGEL) << "Batch size: " << _messageBatchSize; } if (_config.asynchronousMode()) { LOG_TOPIC("56a27", DEBUG, Logger::PREGEL) << "Finished LSS: " << package.toJson(); // if the conductor is unreachable or has send data (try to) proceed _callConductorWithResponse(Utils::finishedWorkerStepPath, package, [this](VPackSlice response) { if (response.isObject()) { _conductorAggregators->aggregateValues(response); // only aggregate values VPackSlice nextGSS = response.get(Utils::enterNextGSSKey); if (nextGSS.isBool()) { _requestedNextGSS = _requestedNextGSS || nextGSS.getBool(); } _continueAsync(); } }); } else { // no answer expected _callConductor(Utils::finishedWorkerStepPath, package); LOG_TOPIC("2de5b", DEBUG, Logger::PREGEL) << "Finished GSS: " << package.toJson(); } } /// WARNING only call this while holding the _commandMutex /// in async mode checks if there are messages to process template void Worker::_continueAsync() { { MUTEX_LOCKER(guard, _commandMutex); if (_state != WorkerState::IDLE || _writeCache->containedMessageCount() == 0) { return; } // avoid calling this method accidentially _state = WorkerState::COMPUTING; } TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); // wait for new messages before beginning to process int64_t milli = _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5; // start next iteration in $milli mseconds. bool queued = false; std::tie(queued, _workHandle) = SchedulerFeature::SCHEDULER->queueDelay( RequestLane::INTERNAL_LOW, std::chrono::milliseconds(milli), [this](bool cancelled) { if (!cancelled) { { // swap these pointers atomically MY_WRITE_LOCKER(guard, _cacheRWLock); std::swap(_readCache, _writeCache); if (_writeCacheNextGSS->containedMessageCount() > 0) { _requestedNextGSS = true; } } MUTEX_LOCKER(guard, _commandMutex); // overwrite conductor values with local values _conductorAggregators->resetValues(); _conductorAggregators->aggregateValues(*_workerAggregators.get()); _workerAggregators->resetValues(); _startProcessing(); } }); if (!queued) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_QUEUE_FULL, "No thread available to continue execution."); } } template void Worker::finalizeExecution(VPackSlice const& body, std::function cb) { // Only expect serial calls from the conductor. // Lock to prevent malicous activity MUTEX_LOCKER(guard, _commandMutex); if (_state == WorkerState::DONE) { LOG_TOPIC("4067a", DEBUG, Logger::PREGEL) << "removing worker"; cb(); return; } auto self = shared_from_this(); auto cleanup = [self, this, cb] { VPackBuilder body; body.openObject(); body.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); body.add(Utils::executionNumberKey, VPackValue(_config.executionNumber())); body.close(); _callConductor(Utils::finishedWorkerFinalizationPath, body); cb(); }; _state = WorkerState::DONE; VPackSlice store = body.get(Utils::storeResultsKey); if (store.isBool() && store.getBool() == true) { LOG_TOPIC("91264", DEBUG, Logger::PREGEL) << "Storing results"; // tell graphstore to remove read locks _graphStore->storeResults(&_config, std::move(cleanup)); } else { LOG_TOPIC("b3f35", WARN, Logger::PREGEL) << "Discarding results"; cleanup(); } } template void Worker::aqlResult(VPackBuilder& b, bool withId) const { MUTEX_LOCKER(guard, _commandMutex); TRI_ASSERT(b.isEmpty()); // std::vector const& shards = _config.globalShardIDs(); std::string tmp; b.openArray(/*unindexed*/true); auto it = _graphStore->vertexIterator(); for (; it.hasMore(); ++it) { Vertex const* vertexEntry = *it; TRI_ASSERT(vertexEntry->shard() < _config.globalShardIDs().size()); ShardID const& shardId = _config.globalShardIDs()[vertexEntry->shard()]; b.openObject(/*unindexed*/true); if (withId) { std::string const& cname = _config.shardIDToCollectionName(shardId); if (!cname.empty()) { tmp.clear(); tmp.append(cname); tmp.push_back('/'); tmp.append(vertexEntry->key().data(), vertexEntry->key().size()); b.add(StaticStrings::IdString, VPackValue(tmp)); } } b.add(StaticStrings::KeyString, VPackValuePair(vertexEntry->key().data(), vertexEntry->key().size(), VPackValueType::String)); V const& data = vertexEntry->data(); // bool store = _graphStore->graphFormat()->buildVertexDocument(b, &data, sizeof(V)); b.close(); } b.close(); } template void Worker::startRecovery(VPackSlice const& data) { // other methods might lock _commandMutex MUTEX_LOCKER(guard, _commandMutex); VPackSlice method = data.get(Utils::recoveryMethodKey); if (method.compareString(Utils::compensate) != 0) { LOG_TOPIC("742c5", ERR, Logger::PREGEL) << "Unsupported operation"; return; } // else if (method.compareString(Utils::rollback) == 0) _state = WorkerState::RECOVERING; { MY_WRITE_LOCKER(guard, _cacheRWLock); _writeCache->clear(); _readCache->clear(); if (_writeCacheNextGSS) { _writeCacheNextGSS->clear(); } } VPackBuilder copy(data); // hack to determine newly added vertices _preRecoveryTotal = _graphStore->localVertexCount(); WorkerConfig nextState(_config); nextState.updateConfig(data); _graphStore->loadShards(&nextState, [this, nextState, copy] { _config = nextState; compensateStep(copy.slice()); }); } template void Worker::compensateStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _commandMutex); _workerAggregators->resetValues(); _conductorAggregators->setAggregatedValues(data); TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this] { if (_state != WorkerState::RECOVERING) { LOG_TOPIC("554e2", WARN, Logger::PREGEL) << "Compensation aborted prematurely."; return; } auto vertexIterator = _graphStore->vertexIterator(); std::unique_ptr> vCompensate( _algorithm->createCompensation(&_config)); _initializeVertexContext(vCompensate.get()); if (!vCompensate) { _state = WorkerState::DONE; LOG_TOPIC("938d2", WARN, Logger::PREGEL) << "Compensation aborted prematurely."; return; } vCompensate->_writeAggregators = _workerAggregators.get(); size_t i = 0; for (; vertexIterator.hasMore(); ++vertexIterator) { Vertex* vertexEntry = *vertexIterator; vCompensate->_vertexEntry = vertexEntry; vCompensate->compensate(i > _preRecoveryTotal); i++; if (_state != WorkerState::RECOVERING) { LOG_TOPIC("e9011", WARN, Logger::PREGEL) << "Execution aborted prematurely."; break; } } VPackBuilder package; package.openObject(); package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(_config.executionNumber())); package.add(Utils::globalSuperstepKey, VPackValue(_config.globalSuperstep())); _workerAggregators->serializeValues(package); package.close(); _callConductor(Utils::finishedRecoveryPath, package); }); if (!queued) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_QUEUE_FULL, "No thread available to queue compensation."); } } template void Worker::finalizeRecovery(VPackSlice const& data) { MUTEX_LOCKER(guard, _commandMutex); if (_state != WorkerState::RECOVERING) { LOG_TOPIC("22e42", WARN, Logger::PREGEL) << "Compensation aborted prematurely."; return; } _expectedGSS = data.get(Utils::globalSuperstepKey).getUInt(); _messageStats.resetTracking(); _state = WorkerState::IDLE; LOG_TOPIC("17f3c", INFO, Logger::PREGEL) << "Recovery finished"; } template void Worker::_callConductor(std::string const& path, VPackBuilder const& message) { if (ServerState::instance()->isRunningInCluster() == false) { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this, self, path, message] { VPackBuilder response; PregelFeature::handleConductorRequest(*_config.vocbase(), path, message.slice(), response); }); if (!queued) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, "No thread available to call conductor"); } } else { std::string baseUrl = Utils::baseUrl(_config.database(), Utils::conductorPrefix); VPackBuffer buffer; buffer.append(message.data(), message.size()); application_features::ApplicationServer& server = _config.vocbase()->server(); auto const& nf = server.getFeature(); network::ConnectionPool* pool = nf.pool(); network::sendRequest(pool, "server:" + _config.coordinatorId(), fuerte::RestVerb::Post, baseUrl + path, std::move(buffer)); } } template void Worker::_callConductorWithResponse(std::string const& path, VPackBuilder const& message, std::function handle) { LOG_TOPIC("6d349", TRACE, Logger::PREGEL) << "Calling the conductor"; if (ServerState::instance()->isRunningInCluster() == false) { VPackBuilder response; PregelFeature::handleConductorRequest(*_config.vocbase(), path, message.slice(), response); handle(response.slice()); } else { std::string baseUrl = Utils::baseUrl(_config.database(), Utils::conductorPrefix); application_features::ApplicationServer& server = _config.vocbase()->server(); auto const& nf = server.getFeature(); network::ConnectionPool* pool = nf.pool(); VPackBuffer buffer; buffer.append(message.data(), message.size()); network::RequestOptions reqOpts; reqOpts.skipScheduler = true; network::Response r = network::sendRequest(pool, "server:" + _config.coordinatorId(), fuerte::RestVerb::Post, baseUrl + path, std::move(buffer), reqOpts).get(); if (handle) { handle(r.slice()); } } }