diff --git a/arangod/Pregel/Algos/PageRank.cpp b/arangod/Pregel/Algos/PageRank.cpp index c509de3f46..fe395f33cb 100644 --- a/arangod/Pregel/Algos/PageRank.cpp +++ b/arangod/Pregel/Algos/PageRank.cpp @@ -67,10 +67,6 @@ GraphFormat* PageRankAlgorithm::inputFormat() { return new PageRankGraphFormat(_sourceField, _resultField); } -MessageFormat* PageRankAlgorithm::messageFormat() const { - return new FloatMessageFormat(); -} - MessageCombiner* PageRankAlgorithm::messageCombiner() const { return new FloatSumCombiner(); } diff --git a/arangod/Pregel/Algos/PageRank.h b/arangod/Pregel/Algos/PageRank.h index 97f227849c..fc328750b8 100644 --- a/arangod/Pregel/Algos/PageRank.h +++ b/arangod/Pregel/Algos/PageRank.h @@ -41,7 +41,10 @@ struct PageRankAlgorithm : public SimpleAlgorithm { MasterContext* masterContext(VPackSlice userParams) const override; GraphFormat* inputFormat() override; - MessageFormat* messageFormat() const override; + MessageFormat* messageFormat() const override { + return new FloatMessageFormat(); + } + MessageCombiner* messageCombiner() const override; VertexComputation* createComputation( WorkerConfig const*) const override; diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 14829aa8b5..938820db4d 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -334,27 +334,26 @@ void Conductor::startRecovery() { basics::ThreadPool* pool = PregelFeature::instance()->threadPool(); pool->enqueue([this] { // let's wait for a final state in the cluster - for (int i = 0; i < 15; i++) { + for (int i = 0; i < 5; i++) { // on some systems usleep does not // like arguments greater than 1000000 usleep(1000000); } - std::vector goodServers; + /*std::vector goodServers; int res = PregelFeature::instance()->recoveryManager()->filterGoodServers( _dbServers, goodServers); if (res != TRI_ERROR_NO_ERROR) { LOG(ERR) << "Recovery proceedings failed"; cancel(); return; - } + }*/ VPackBuilder b; b.openObject(); 
b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.close(); - _dbServers = goodServers; - _sendToAllDBServers(Utils::cancelGSSPath, b.slice()); + _sendToAllDBServers(Utils::cancelGSSPath, b.slice());// will fail for some // Let's try recovery if (_algorithm->supportsCompensation()) { @@ -452,10 +451,11 @@ int Conductor::_initializeWorkers(std::string const& suffix, b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); b.add(Utils::asyncMode, VPackValue(_asyncMode)); b.add(Utils::lazyLoading, VPackValue(_lazyLoading)); - b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object)); if (additional.isObject()) { b.add(additional); } + + b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object)); for (auto const& pair : vertexShardMap) { b.add(pair.first, VPackValue(VPackValueType::Array)); for (ShardID const& shard : pair.second) { diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp index 5a86f0c3fc..e3020eb1c4 100644 --- a/arangod/Pregel/Recovery.cpp +++ b/arangod/Pregel/Recovery.cpp @@ -129,7 +129,7 @@ void RecoveryManager::_monitorShard(DatabaseID const& databaseName, std::shared_ptr> servers = ClusterInfo::instance()->getResponsibleServer(shard); if (servers->size() > 0) { - MUTEX_LOCKER(guard, _lock); + // _lock is already held _primaryServers[shard] = servers->at(0); auto call = @@ -146,6 +146,8 @@ int RecoveryManager::filterGoodServers(std::vector const& servers, VPackSlice serversRegistered = result.slice()[0].get(std::vector( {AgencyCommManager::path(), "Supervision", "Health"})); + + LOG(INFO) << "Server Status: " << serversRegistered.toJson(); if (serversRegistered.isObject()) { for (auto const& res : VPackObjectIterator(serversRegistered)) { @@ -188,7 +190,7 @@ void RecoveryManager::updatedFailedServers() { // don't call while holding _lock void RecoveryManager::_renewPrimaryServer(ShardID const& shard) { - MUTEX_LOCKER(guard, _lock); + 
MUTEX_LOCKER(guard, _lock);// editing ClusterInfo* ci = ClusterInfo::instance(); auto const& conductors = _listeners.find(shard); @@ -206,6 +208,7 @@ void RecoveryManager::_renewPrimaryServer(ShardID const& shard) { if (servers) { ServerID const& nextPrimary = servers->front(); if (currentPrimary->second != nextPrimary) { + _primaryServers[shard] = nextPrimary; for (Conductor* cc : conductors->second) { cc->startRecovery(); @@ -214,7 +217,7 @@ void RecoveryManager::_renewPrimaryServer(ShardID const& shard) { break; } } - usleep(250000); // 250ms + usleep(100000); // 100ms tries++; } while (tries < 2); } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 7f865e0c14..d7ea6bd09f 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -514,6 +514,12 @@ void Worker::startRecovery(VPackSlice data) { MUTEX_LOCKER(guard, _commandMutex); _state = WorkerState::RECOVERING; + _writeCache->clear(); + _readCache->clear(); + if (_writeCacheNextGSS) { + _writeCacheNextGSS->clear(); + } + VPackSlice method = data.get(Utils::recoveryMethodKey); if (method.compareString(Utils::compensate) == 0) { // hack to determine newly added vertices @@ -522,7 +528,8 @@ void Worker::startRecovery(VPackSlice data) { _graphStore->loadShards(nextState); _config = nextState; compensateStep(data); - } else if (method.compareString(Utils::rollback) == 0) { + } else {//if (method.compareString(Utils::rollback) == 0) { + LOG(INFO) << "Unsupported operation"; } } diff --git a/scripts/shutdownLocalCluster.sh b/scripts/shutdownLocalCluster.sh index bf58f07df6..6141860663 100755 --- a/scripts/shutdownLocalCluster.sh +++ b/scripts/shutdownLocalCluster.sh @@ -83,6 +83,11 @@ for p in `seq 8629 $PORTTOPDB` ; do testServerDown $p done +# Currently the agency does not wait for all servers to shut down +# This causes a race condition where all servers wait to tell the agency +# they are shutting down +sleep 5 + echo Shutting down agency ... 
for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do port=$(( 4001 + $aid ))