From f5f5c773e4869bd5e9b710763c89138deba696b0 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 5 Sep 2016 13:55:04 +0200 Subject: [PATCH 1/3] make error message unambiguous --- arangod/Replication/InitialSyncer.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index d22d570615..a5fce5bda3 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -699,8 +699,7 @@ int InitialSyncer::handleCollectionDump( } if (response->getHttpReturnCode() == 404) { // unknown job, we can abort - errorMsg = "no response received from master at " + - _masterInfo._endpoint; + errorMsg = "job not found on master at " + _masterInfo._endpoint; return TRI_ERROR_REPLICATION_NO_RESPONSE; } } @@ -889,8 +888,7 @@ int InitialSyncer::handleCollectionSync( } if (response->getHttpReturnCode() == 404) { // unknown job, we can abort - errorMsg = "no response received from master at " + - _masterInfo._endpoint; + errorMsg = "job not found on master at " + _masterInfo._endpoint; return TRI_ERROR_REPLICATION_NO_RESPONSE; } } From 2543091b7c959b064710134304b9b6c1aba78fe8 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 5 Sep 2016 14:00:25 +0200 Subject: [PATCH 2/3] Add further logging where a bug is likely. --- lib/Rest/GeneralResponse.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/Rest/GeneralResponse.h b/lib/Rest/GeneralResponse.h index 3489764c28..d5072f9d94 100644 --- a/lib/Rest/GeneralResponse.h +++ b/lib/Rest/GeneralResponse.h @@ -127,7 +127,13 @@ class GeneralResponse { arangodb::velocypack::Options const& = arangodb::velocypack::Options::Defaults) = 0; - void addPayloadPreconditions() { TRI_ASSERT(_vpackPayloads.size() == 0); } + void addPayloadPreconditions() { + if (_vpackPayloads.size() != 0) { + LOG(ERR) << "Payload set twice"; + TRI_ASSERT(_vpackPayloads.size() == 0); + } + } + virtual void addPayloadPreHook(bool inputIsBuffer, bool& resolveExternals) {} virtual void addPayloadPostHook( arangodb::velocypack::Options const* options) {} From 49d7cf345f2da6f4f037e4354bf30feb2ba93b96 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 5 Sep 2016 16:03:28 +0200 Subject: [PATCH 3/3] Improve version handling in ClusterInfo. --- arangod/Cluster/ClusterInfo.cpp | 66 ++++++++++++++++++++------------- arangod/Cluster/ClusterInfo.h | 48 ++++++++++++------------ 2 files changed, 65 insertions(+), 49 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 480daeefcd..b997af2c43 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -379,9 +379,13 @@ void ClusterInfo::loadPlan() { DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature( "Database"); - uint64_t storedVersion = _planProt.version; - MUTEX_LOCKER(mutexLocker, _planProt.mutex); - if (_planProt.version > storedVersion) { + ++_planProt.wantedVersion; // Indicate that after *NOW* somebody has to + // reread from the agency! + MUTEX_LOCKER(mutexLocker, _planProt.mutex); // only one may work at a time + uint64_t storedVersion = _planProt.wantedVersion; // this is the version + // we will set in the end + + if (_planProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } @@ -502,7 +506,7 @@ void ClusterInfo::loadPlan() { _shards.swap(newShards); _shardKeys.swap(newShardKeys); } - _planProt.version++; // such that others notice our change + _planProt.doneVersion = storedVersion; _planProt.isValid = true; // will never be reset to false } else { LOG(ERR) << "\"Plan\" is not an object in agency"; @@ -525,9 +529,12 @@ void ClusterInfo::loadPlan() { static std::string const prefixCurrent = "Current"; void ClusterInfo::loadCurrent() { - uint64_t storedVersion = _currentProt.version; - MUTEX_LOCKER(mutexLocker, _currentProt.mutex); - if (_currentProt.version > storedVersion) { + ++_currentProt.wantedVersion; // Indicate that after *NOW* somebody has to + // reread from the agency! + MUTEX_LOCKER(mutexLocker, _currentProt.mutex); // only one may work at a time + uint64_t storedVersion = _currentProt.wantedVersion; // this is the version + // we will set at the end + if (_currentProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } @@ -581,7 +588,7 @@ void ClusterInfo::loadCurrent() { std::string const collectionName = collectionSlice.key.copyString(); auto collectionDataCurrent = std::make_shared(); - for (auto const& shardSlice : VPackObjectIterator(collectionSlice.value)) { + for (auto const& shardSlice : VPackObjectIterator(collectionSlice.value)) { std::string const shardID = shardSlice.key.copyString(); collectionDataCurrent->add(shardID, shardSlice.value); @@ -617,7 +624,7 @@ void ClusterInfo::loadCurrent() { _currentCollections.swap(newCollections); _shardIds.swap(newShardIds); } - _currentProt.version++; // such that others notice our change + _currentProt.doneVersion = storedVersion; _currentProt.isValid = true; // will never be reset to false } else { LOG(ERR) << "Current is not an object!"; @@ -818,7 +825,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, }; // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -907,7 +914,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name, std::string where("Current/Databases/" + name); // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -1042,7 +1049,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, }; // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -1145,7 +1152,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, "Current/Collections/" + databaseName + "/" + collectionID; // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -1584,7 +1591,7 @@ int ClusterInfo::ensureIndexCoordinator( // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -1720,11 +1727,11 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); } } - return true; + return true; }; // ATTENTION: The following callback calls the above closure in a - // different thread. Nevertheless, the closure accesses some of our + // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. @@ -1851,9 +1858,12 @@ static std::string const prefixServers = "Current/ServersRegistered"; void ClusterInfo::loadServers() { - uint64_t storedVersion = _serversProt.version; + ++_serversProt.wantedVersion; // Indicate that after *NOW* somebody has to + // reread from the agency! MUTEX_LOCKER(mutexLocker, _serversProt.mutex); - if (_serversProt.version > storedVersion) { + uint64_t storedVersion = _serversProt.wantedVersion; // this is the version + // we will set in the end + if (_serversProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } @@ -1883,7 +1893,7 @@ void ClusterInfo::loadServers() { { WRITE_LOCKER(writeLocker, _serversProt.lock); _servers.swap(newServers); - _serversProt.version++; // such that others notice our change + _serversProt.doneVersion = storedVersion; _serversProt.isValid = true; // will never be reset to false } return; @@ -1976,9 +1986,12 @@ std::string ClusterInfo::getServerName(std::string const& endpoint) { static std::string const prefixCurrentCoordinators = "Current/Coordinators"; void ClusterInfo::loadCurrentCoordinators() { - uint64_t storedVersion = _coordinatorsProt.version; + ++_coordinatorsProt.wantedVersion; // Indicate that after *NOW* somebody + // has to reread from the agency! MUTEX_LOCKER(mutexLocker, _coordinatorsProt.mutex); - if (_coordinatorsProt.version > storedVersion) { + uint64_t storedVersion = _coordinatorsProt.wantedVersion; // this is the + // version we will set in the end + if (_coordinatorsProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } @@ -2004,7 +2017,7 @@ void ClusterInfo::loadCurrentCoordinators() { { WRITE_LOCKER(writeLocker, _coordinatorsProt.lock); _coordinators.swap(newCoordinators); - _coordinatorsProt.version++; // such that others notice our change + _coordinatorsProt.doneVersion = storedVersion; _coordinatorsProt.isValid = true; // will never be reset to false } return; @@ -2028,9 +2041,12 @@ static std::string const prefixTargetCleaned = "Target/CleanedOutServers"; static std::string const prefixTargetFailed = "Target/FailedServers"; void ClusterInfo::loadCurrentDBServers() { - uint64_t storedVersion = _DBServersProt.version; + ++_DBServersProt.wantedVersion; // Indicate that after *NOW* somebody has to + // reread from the agency! MUTEX_LOCKER(mutexLocker, _DBServersProt.mutex); - if (_DBServersProt.version > storedVersion) { + uint64_t storedVersion = _DBServersProt.wantedVersion; // this is the version + // we will set in the end + if (_DBServersProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } @@ -2089,7 +2105,7 @@ void ClusterInfo::loadCurrentDBServers() { { WRITE_LOCKER(writeLocker, _DBServersProt.lock); _DBServers.swap(newDBServers); - _DBServersProt.version++; // such that others notice our change + _DBServersProt.doneVersion = storedVersion; _DBServersProt.isValid = true; // will never be reset to false } return; diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 7ae5a58244..7935959128 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -149,7 +149,7 @@ class CollectionInfoCurrent { auto it = _vpacks.find(shardID); if (it != _vpacks.end()) { VPackSlice slice = it->second->slice(); - + VPackSlice servers = slice.get("servers"); if (servers.isArray()) { for (auto const& server: VPackArrayIterator(servers)) { @@ -473,29 +473,29 @@ class ClusterInfo { ////////////////////////////////////////////////////////////////////////////// /// @brief invalidate planned ////////////////////////////////////////////////////////////////////////////// - + void invalidatePlan(); ////////////////////////////////////////////////////////////////////////////// /// @brief invalidate current ////////////////////////////////////////////////////////////////////////////// - + void invalidateCurrent(); ////////////////////////////////////////////////////////////////////////////// /// @brief get current "Plan" structure ////////////////////////////////////////////////////////////////////////////// - + std::shared_ptr getPlan(); - + ////////////////////////////////////////////////////////////////////////////// /// @brief get current "Current" structure ////////////////////////////////////////////////////////////////////////////// - + std::shared_ptr getCurrent(); - + private: - + ////////////////////////////////////////////////////////////////////////////// /// @brief get an operation timeout ////////////////////////////////////////////////////////////////////////////// @@ -529,28 +529,28 @@ class ClusterInfo { // Cached data from the agency, we reload whenever necessary: - // We group the data, each group has an atomic "valid-flag" - // which is used for lazy loading in the beginning. It starts - // as false, is set to true at each reload and is never reset - // to false in the lifetime of the server. The variable is - // atomic to be able to check it without acquiring - // the read lock (see below). Flush is just an explicit reload - // for all data and is only used in tests. + // We group the data, each group has an atomic "valid-flag" which is + // used for lazy loading in the beginning. It starts as false, is set + // to true at each reload and is only reset to false if the cache + // needs to be invalidated. The variable is atomic to be able to check + // it without acquiring the read lock (see below). Flush is just an + // explicit reload for all data and is only used in tests. // Furthermore, each group has a mutex that protects against // simultaneously contacting the agency for an update. - // In addition, each group has an atomic version number, this is used - // to prevent a stampede if multiple threads notice concurrently - // that an update from the agency is necessary. Finally, there is - // a read/write lock which protects the actual data structure. + // In addition, each group has two atomic version numbers, these are + // used to prevent a stampede if multiple threads notice concurrently + // that an update from the agency is necessary. Finally, there is a + // read/write lock which protects the actual data structure. // We encapsulate this protection in the struct ProtectionData: struct ProtectionData { std::atomic isValid; Mutex mutex; - std::atomic version; + std::atomic wantedVersion; + std::atomic doneVersion; arangodb::basics::ReadWriteLock lock; - ProtectionData() : isValid(false), version(0) {} + ProtectionData() : isValid(false), wantedVersion(0), doneVersion(0) {} }; // The servers, first all, we only need Current here: @@ -566,10 +566,10 @@ class ClusterInfo { std::unordered_map _coordinators; // from Current/Coordinators ProtectionData _coordinatorsProt; - + std::shared_ptr _plan; std::shared_ptr _current; - + std::unordered_map _plannedDatabases; // from Plan/Databases ProtectionData _planProt; @@ -653,7 +653,7 @@ class FollowerInfo { public: - explicit FollowerInfo(arangodb::LogicalCollection* d) + explicit FollowerInfo(arangodb::LogicalCollection* d) : _followers(new std::vector()), _docColl(d) { } //////////////////////////////////////////////////////////////////////////////