//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "Pregel/Conductor.h" #include "Pregel/Aggregator.h" #include "Pregel/AlgoRegistry.h" #include "Pregel/Algorithm.h" #include "Pregel/MasterContext.h" #include "Pregel/PregelFeature.h" #include "Pregel/Recovery.h" #include "Pregel/ThreadPool.h" #include "Pregel/Utils.h" #include "Basics/MutexLocker.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include #include using namespace arangodb; using namespace arangodb::pregel; using namespace arangodb::basics; const char* arangodb::pregel::ExecutionStateNames[6] = { "none", "running", "done", "canceled", "in error", "recovering"}; Conductor::Conductor( uint64_t executionNumber, TRI_vocbase_t* vocbase, std::vector> const& vertexCollections, std::vector> const& edgeCollections) : _vocbaseGuard(vocbase), _executionNumber(executionNumber), _vertexCollections(vertexCollections), _edgeCollections(edgeCollections) { TRI_ASSERT(ServerState::instance()->isCoordinator()); LOG_TOPIC(INFO, Logger::PREGEL) << "Constructed conductor"; } Conductor::~Conductor() { this->cancel(); } void Conductor::start(std::string const& algoName, VPackSlice const& config) { if (!config.isObject()) { _userParams.openObject(); _userParams.close(); } else { _userParams.add(config); } _startTimeSecs = TRI_microtime(); _globalSuperstep = 0; _state = ExecutionState::RUNNING; _algorithm.reset(AlgoRegistry::createAlgorithm(algoName, config)); if (!_algorithm) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Algorithm not found"); } _masterContext.reset(_algorithm->masterContext(config)); _aggregators.reset(new AggregatorHandler(_algorithm.get())); _maxSuperstep = VelocyPackHelper::getNumericValue(config, "maxGSS", _maxSuperstep); // configure the async mode as optional VPackSlice async = _userParams.slice().get("async"); _asyncMode = _algorithm->supportsAsyncMode(); _asyncMode = _asyncMode && (async.isNone() || async.getBoolean()); if (_asyncMode) { LOG_TOPIC(INFO, Logger::PREGEL) << "Running in async mode"; } VPackSlice lazy = _userParams.slice().get("lazyLoading"); _lazyLoading = _algorithm->supportsLazyLoading(); _lazyLoading = _lazyLoading && (lazy.isNone() || lazy.getBoolean()); if (_lazyLoading) { LOG_TOPIC(INFO, Logger::PREGEL) << "Enabled lazy loading"; } _storeResults = VelocyPackHelper::getBooleanValue(config, "store", true); if (!_storeResults) { LOG_TOPIC(INFO, Logger::PREGEL) << "Will keep results in-memory"; } LOG_TOPIC(INFO, Logger::PREGEL) << "Telling workers to load the data"; int res = _initializeWorkers(Utils::startExecutionPath, VPackSlice()); if (res != TRI_ERROR_NO_ERROR) { _state = ExecutionState::CANCELED; LOG_TOPIC(ERR, Logger::PREGEL) << "Not all DBServers started the execution"; } } // only called by the conductor, is protected by the // mutex locked in finishedGlobalStep bool Conductor::_startGlobalStep() { // send prepare GSS notice VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::vertexCountKey, VPackValue(_totalVerticesCount)); b.add(Utils::edgeCountKey, VPackValue(_totalEdgesCount)); b.close(); // we are explicitly expecting an response containing the aggregated // values as well as the count of active vertices std::vector requests; int res = _sendToAllDBServers(Utils::prepareGSSPath, b.slice(), requests); if (res != TRI_ERROR_NO_ERROR) { _state = ExecutionState::IN_ERROR; LOG_TOPIC(INFO, Logger::PREGEL) << "Seems there is at least one worker out of order"; Utils::printResponses(requests); // the recovery mechanisms should take care of this return false; } /// collect the aggregators _aggregators->resetValues(); _statistics.resetActiveCount(); _totalVerticesCount = 0; // might change during execution _totalEdgesCount = 0; for (auto const& req : requests) { VPackSlice payload = req.result.answer->payload(); _aggregators->aggregateValues(payload); _statistics.accumulateActiveCounts(payload); _totalVerticesCount += payload.get(Utils::vertexCountKey).getUInt(); _totalEdgesCount += payload.get(Utils::edgeCountKey).getUInt(); } // workers are done if all messages were processed and no active vertices // are left to process bool proceed = true; if (_masterContext && _globalSuperstep > 0) { // ask algorithm to evaluate aggregated values _masterContext->_globalSuperstep = _globalSuperstep - 1; _masterContext->_enterNextGSS = false; proceed = _masterContext->postGlobalSuperstep(); if (!proceed) { LOG_TOPIC(INFO, Logger::PREGEL) << "Master context ended execution"; } } // TODO make maximum configurable bool done = _globalSuperstep != 0 && _statistics.executionFinished(); if (!proceed || done || _globalSuperstep >= _maxSuperstep) { _state = ExecutionState::DONE; // tells workers to store / discard results if (_storeResults) { _finalizeWorkers(); } else { // just stop the timer _endTimeSecs = TRI_microtime(); LOG_TOPIC(INFO, Logger::PREGEL) << "Done execution took" << totalRuntimeSecs() << " s"; } return false; } if (_masterContext) { _masterContext->_globalSuperstep = _globalSuperstep; _masterContext->_vertexCount = _totalVerticesCount; _masterContext->_edgeCount = _totalEdgesCount; _masterContext->preGlobalSuperstep(); } b.clear(); b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::vertexCountKey, VPackValue(_totalVerticesCount)); b.add(Utils::edgeCountKey, VPackValue(_totalEdgesCount)); _aggregators->serializeValues(b); b.close(); LOG_TOPIC(INFO, Logger::PREGEL) << b.toString(); // start vertex level operations, does not get a response res = _sendToAllDBServers(Utils::startGSSPath, b.slice()); // call me maybe if (res != TRI_ERROR_NO_ERROR) { _state = ExecutionState::IN_ERROR; LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor could not start GSS " << _globalSuperstep; // the recovery mechanisms should take care od this } else { LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor started new gss " << _globalSuperstep; } return res == TRI_ERROR_NO_ERROR; } // ============ Conductor callbacks =============== void Conductor::finishedWorkerStartup(VPackSlice const& data) { MUTEX_LOCKER(guard, _callbackMutex); _ensureUniqueResponse(data); if (_state != ExecutionState::RUNNING) { LOG_TOPIC(WARN, Logger::PREGEL) << "We are not in a state where we expect a response"; return; } _totalVerticesCount += data.get(Utils::vertexCountKey).getUInt(); _totalEdgesCount += data.get(Utils::edgeCountKey).getUInt(); if (_respondedServers.size() != _dbServers.size()) { return; } LOG_TOPIC(INFO, Logger::PREGEL) << _totalVerticesCount << " vertices, " << _totalEdgesCount << " edges"; if (_masterContext) { _masterContext->_globalSuperstep = 0; _masterContext->_vertexCount = _totalVerticesCount; _masterContext->_edgeCount = _totalEdgesCount; _masterContext->_aggregators = _aggregators.get(); _masterContext->preApplication(); } _computationStartTimeSecs = TRI_microtime(); if (_startGlobalStep()) { // listens for changing primary DBServers on each collection shard RecoveryManager* mngr = PregelFeature::instance()->recoveryManager(); if (mngr) { mngr->monitorCollections(_vertexCollections, this); } } } /// Will optionally send a response, to notify the worker of converging /// aggregator /// values which can be coninually updated (in async mode) VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _callbackMutex); // this method can be called multiple times in a superstep depending on // whether we are in the async mode uint64_t gss = data.get(Utils::globalSuperstepKey).getUInt(); if (gss != _globalSuperstep || !(_state == ExecutionState::RUNNING || _state == ExecutionState::CANCELED)) { LOG_TOPIC(WARN, Logger::PREGEL) << "Conductor did received a callback from the wrong superstep"; return VPackBuilder(); } // track message counts to decide when to halt or add global barriers. // In normal mode this will wait for a response from each worker, // in async mode this will wait until all messages were processed _statistics.accumulateMessageStats(data); if (_asyncMode == false) { // in async mode we wait for all responded _ensureUniqueResponse(data); // wait for the last worker to respond if (_respondedServers.size() != _dbServers.size()) { return VPackBuilder(); } } else if (_statistics.clientCount() < _dbServers.size() || // no messages !_statistics.allMessagesProcessed()) { // haven't received msgs VPackBuilder response; _aggregators->aggregateValues(data); if (_masterContext) { _masterContext->postLocalSuperstep(); } response.openObject(); _aggregators->serializeValues(response); if (_masterContext && _masterContext->_enterNextGSS) { response.add(Utils::enterNextGSSKey, VPackValue(true)); } response.close(); return response; } LOG_TOPIC(INFO, Logger::PREGEL) << "Finished gss " << _globalSuperstep << " in " << (TRI_microtime() - _computationStartTimeSecs) << "s"; _statistics.debugOutput(); _globalSuperstep++; // don't block the response for workers waiting on this callback // this should allow workers to go into the IDLE state ThreadPool* pool = PregelFeature::instance()->threadPool(); pool->enqueue([this] { MUTEX_LOCKER(cguard, _callbackMutex); if (_state == ExecutionState::RUNNING) { _startGlobalStep(); // trigger next superstep } else if (_state == ExecutionState::CANCELED) { LOG_TOPIC(WARN, Logger::PREGEL) << "Execution was canceled, results will be discarded."; _finalizeWorkers(); // tells workers to store / discard results } else { // this prop shouldn't occur unless we are recovering or in error LOG_TOPIC(WARN, Logger::PREGEL) << "No further action taken after receiving all responses"; } }); return VPackBuilder(); } void Conductor::finishedRecoveryStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _callbackMutex); _ensureUniqueResponse(data); if (_state != ExecutionState::RECOVERING) { LOG_TOPIC(WARN, Logger::PREGEL) << "We are not in a state where we expect a recovery response"; return; } // the recovery mechanism might be gathering state information _aggregators->aggregateValues(data); if (_respondedServers.size() != _dbServers.size()) { return; } // only compensations supported bool proceed = false; if (_masterContext) { proceed = proceed || _masterContext->postCompensation(); } int res = TRI_ERROR_NO_ERROR; if (proceed) { // reset values which are calculated during the superstep _aggregators->resetValues(); if (_masterContext) { _masterContext->preCompensation(); } VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); _aggregators->serializeValues(b); b.close(); // first allow all workers to run worker level operations res = _sendToAllDBServers(Utils::continueRecoveryPath, b.slice()); } else { LOG_TOPIC(INFO, Logger::PREGEL) << "Recovery finished. Proceeding normally"; // build the message, works for all cases VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.close(); res = _sendToAllDBServers(Utils::finalizeRecoveryPath, b.slice()); if (res == TRI_ERROR_NO_ERROR) { _state = ExecutionState::RUNNING; _startGlobalStep(); } } if (res != TRI_ERROR_NO_ERROR) { cancel(); LOG_TOPIC(INFO, Logger::PREGEL) << "Recovery failed"; } } void Conductor::cancel() { if (_state == ExecutionState::RUNNING || _state == ExecutionState::RECOVERING || _state == ExecutionState::IN_ERROR) { _state = ExecutionState::CANCELED; _finalizeWorkers(); } } void Conductor::startRecovery() { MUTEX_LOCKER(guard, _callbackMutex); if (_state != ExecutionState::RUNNING && _state != ExecutionState::IN_ERROR) { return; // maybe we are already in recovery mode } else if (_algorithm->supportsCompensation() == false) { LOG_TOPIC(ERR, Logger::PREGEL) << "Execution is not recoverable"; cancel(); return; } // we lost a DBServer, we need to reconfigure all remainging servers // so they load the data for the lost machine _state = ExecutionState::RECOVERING; _statistics.reset(); ThreadPool* pool = PregelFeature::instance()->threadPool(); pool->enqueue([this] { // let's wait for a final state in the cluster // on some systems usleep does not // like arguments greater than 1000000 usleep(1000000); usleep(1000000); if (_state != ExecutionState::RECOVERING) { return; // seems like we are canceled } std::vector goodServers; int res = PregelFeature::instance()->recoveryManager()->filterGoodServers( _dbServers, goodServers); if (res != TRI_ERROR_NO_ERROR) { LOG_TOPIC(ERR, Logger::PREGEL) << "Recovery proceedings failed"; cancel(); return; } _dbServers = goodServers; VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.close(); _sendToAllDBServers(Utils::cancelGSSPath, b.slice()); if (_state != ExecutionState::RECOVERING) { return; // seems like we are canceled } // Let's try recovery if (_masterContext) { bool proceed = _masterContext->preCompensation(); if (!proceed) { cancel(); } } b.clear(); // start a new message b.openObject(); b.add(Utils::recoveryMethodKey, VPackValue(Utils::compensate)); _aggregators->serializeValues(b); b.close(); _aggregators->resetValues(); // initialize workers will reconfigure the workers and set the // _dbServers list to the new primary DBServers res = _initializeWorkers(Utils::startRecoveryPath, b.slice()); if (res != TRI_ERROR_NO_ERROR) { cancel(); LOG_TOPIC(ERR, Logger::PREGEL) << "Compensation failed"; } }); } static void resolveShards( LogicalCollection const* collection, std::map>>& serverMap, std::vector& allShards) { ClusterInfo* ci = ClusterInfo::instance(); std::shared_ptr> shardIDs = ci->getShardList(collection->cid_as_string()); allShards.insert(allShards.end(), shardIDs->begin(), shardIDs->end()); for (auto const& shard : *shardIDs) { std::shared_ptr> servers = ci->getResponsibleServer(shard); if (servers->size() > 0) { serverMap[(*servers)[0]][collection->name()].push_back(shard); } } } /// should cause workers to start a new execution or begin with recovery /// proceedings int Conductor::_initializeWorkers(std::string const& suffix, VPackSlice additional) { // int64_t vertexCount = 0, edgeCount = 0; std::map collectionPlanIdMap; std::map>> vertexMap, edgeMap; std::vector shardList; // resolve plan id's and shards on the servers for (auto& collection : _vertexCollections) { collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string()); resolveShards(collection.get(), vertexMap, shardList); } for (auto& collection : _edgeCollections) { collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string()); resolveShards(collection.get(), edgeMap, shardList); } _dbServers.clear(); for (auto const& pair : vertexMap) { _dbServers.push_back(pair.first); } // do not reload all shard id's, this list is must stay in the same order if (_allShards.size() == 0) { _allShards = shardList; } std::string const path = Utils::baseUrl(_vocbaseGuard.vocbase()->name()) + suffix; std::string coordinatorId = ServerState::instance()->getId(); LOG_TOPIC(INFO, Logger::PREGEL) << "My id: " << coordinatorId; std::vector requests; for (auto const& it : vertexMap) { ServerID const& server = it.first; std::map> const& vertexShardMap = it.second; std::map> const& edgeShardMap = edgeMap[it.first]; VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::algorithmKey, VPackValue(_algorithm->name())); b.add(Utils::userParametersKey, _userParams.slice()); b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); b.add(Utils::asyncModeKey, VPackValue(_asyncMode)); b.add(Utils::lazyLoadingKey, VPackValue(_lazyLoading)); if (additional.isObject()) { for (auto const& pair : VPackObjectIterator(additional)) { b.add(pair.key.copyString(), pair.value); } } b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object)); for (auto const& pair : vertexShardMap) { b.add(pair.first, VPackValue(VPackValueType::Array)); for (ShardID const& shard : pair.second) { b.add(VPackValue(shard)); } b.close(); } b.close(); b.add(Utils::edgeShardsKey, VPackValue(VPackValueType::Object)); for (auto const& pair : edgeShardMap) { b.add(pair.first, VPackValue(VPackValueType::Array)); for (ShardID const& shard : pair.second) { b.add(VPackValue(shard)); } b.close(); } b.close(); b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object)); for (auto const& pair : collectionPlanIdMap) { b.add(pair.first, VPackValue(pair.second)); } b.close(); b.add(Utils::globalShardListKey, VPackValue(VPackValueType::Array)); for (std::string const& shard : _allShards) { b.add(VPackValue(shard)); } b.close(); b.close(); auto body = std::make_shared(b.toJson()); requests.emplace_back("server:" + server, rest::RequestType::POST, path, body); LOG_TOPIC(INFO, Logger::PREGEL) << "Initializing Server " << server; LOG_TOPIC(INFO, Logger::PREGEL) << body; } std::shared_ptr cc = ClusterComm::instance(); size_t nrDone = 0; size_t nrGood = cc->performRequests(requests, 5.0 * 60.0, nrDone, LogTopic("Pregel Conductor")); Utils::printResponses(requests); return nrGood == requests.size() ? TRI_ERROR_NO_ERROR : TRI_ERROR_FAILED; } int Conductor::_finalizeWorkers() { double compEnd = TRI_microtime(); bool store = _state == ExecutionState::DONE; store = store && _storeResults; if (_masterContext) { _masterContext->postApplication(); } // stop monitoring shards RecoveryManager* mngr = PregelFeature::instance()->recoveryManager(); if (mngr) { mngr->stopMonitoring(this); } LOG_TOPIC(INFO, Logger::PREGEL) << "Finalizing workers"; VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::storeResultsKey, VPackValue(store)); b.close(); int res = _sendToAllDBServers(Utils::finalizeExecutionPath, b.slice()); b.clear(); _endTimeSecs = TRI_microtime(); b.openObject(); b.add("stats", VPackValue(VPackValueType::Object)); _statistics.serializeValues(b); b.close(); _aggregators->serializeValues(b); b.close(); LOG_TOPIC(INFO, Logger::PREGEL) << "Done. We did " << _globalSuperstep << " rounds"; LOG_TOPIC(INFO, Logger::PREGEL) << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s"; LOG_TOPIC(INFO, Logger::PREGEL) << "Computation Time: " << compEnd - _computationStartTimeSecs << "s"; LOG_TOPIC(INFO, Logger::PREGEL) << "Storage Time: " << TRI_microtime() - compEnd << "s"; LOG_TOPIC(INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s"; LOG_TOPIC(INFO, Logger::PREGEL) << "Stats: " << b.toString(); return res; } VPackBuilder Conductor::collectAQLResults() { if (_state != ExecutionState::DONE) { return VPackBuilder(); } VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.close(); std::vector requests; int res = _sendToAllDBServers(Utils::aqlResultsPath, b.slice(), requests); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } VPackBuilder messages; for (auto const& req : requests) { VPackSlice payload = req.result.answer->payload(); if (payload.isArray()) { messages.add(payload); } } return messages; } int Conductor::_sendToAllDBServers(std::string const& suffix, VPackSlice const& message) { std::vector requests; return _sendToAllDBServers(suffix, message, requests); } int Conductor::_sendToAllDBServers(std::string const& suffix, VPackSlice const& message, std::vector& requests) { _respondedServers.clear(); std::shared_ptr cc = ClusterComm::instance(); if (_dbServers.size() == 0) { LOG_TOPIC(WARN, Logger::PREGEL) << "No servers registered"; return TRI_ERROR_FAILED; } std::string base = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); auto body = std::make_shared(message.toJson()); for (auto const& server : _dbServers) { requests.emplace_back("server:" + server, rest::RequestType::POST, base + suffix, body); } size_t nrDone = 0; size_t nrGood = cc->performRequests(requests, 5.0 * 60.0, nrDone, LogTopic("Pregel Conductor")); LOG_TOPIC(INFO, Logger::PREGEL) << "Send " << suffix << " to " << nrDone << " servers"; Utils::printResponses(requests); return nrGood == requests.size() ? TRI_ERROR_NO_ERROR : TRI_ERROR_FAILED; } void Conductor::_ensureUniqueResponse(VPackSlice body) { // check if this the only time we received this ServerID sender = body.get(Utils::senderKey).copyString(); if (_respondedServers.find(sender) != _respondedServers.end()) { LOG_TOPIC(ERR, Logger::PREGEL) << "Received response already from " << sender; THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_CONFLICT); } _respondedServers.insert(sender); }