1
0
Fork 0

Sort out FollowerInfo.

This commit is contained in:
Max Neunhoeffer 2016-01-29 13:56:37 +01:00
parent 5375ada8a7
commit dcf4ed8bca
7 changed files with 95 additions and 135 deletions

View File

@ -2523,72 +2523,34 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators() {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard, the first
/// overloaded method is supposed to be very fast, whereas the second
/// needs a hash lookup, on the other hand one only needs the shardID.
/// Returns an empty shared_ptr if the follower information of the
/// shard has been dropped (see `dropFollowerInfo` below).
/// @brief get information about current followers of a shard.
////////////////////////////////////////////////////////////////////////////////
ClusterInfo::FollowerInfo ClusterInfo::getFollowerInfo(TRI_collection_t& coll) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
if (coll._followerInfoIndex >= 0) {
return _followerInfos[static_cast<size_t>(coll._followerInfoIndex)];
}
ServerID c = coll._info.name();
auto it = _followerInfoTable.find(c);
if (it != _followerInfoTable.end()) {
coll._followerInfoIndex = it->second;
return _followerInfos[static_cast<size_t>(it->second)];
}
return newFollowerInfo(c, coll._followerInfoIndex);
}
ClusterInfo::FollowerInfo ClusterInfo::getFollowerInfo(ShardID& c) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
if (it != _followerInfoTable.end()) {
return _followerInfos[static_cast<size_t>(it->second)];
}
int64_t tmp = -1;
return newFollowerInfo(c, tmp);
}
ClusterInfo::FollowerInfo ClusterInfo::newFollowerInfo(ShardID& c,
int64_t& index) {
// Mutex must already be locked, which is done in the getFollowerInfo methods
auto v = std::make_shared<std::vector<ServerID> const>();
_followerInfos.push_back(v);
try {
index = static_cast<int64_t>(_followerInfos.size() - 1);
_followerInfoTable.emplace(make_pair(c, index));
return v;
} catch (...) {
_followerInfos.pop_back(); // make data structure consistent again
index = -1;
throw;
}
return _followerInfos[static_cast<size_t>(index)];
std::shared_ptr<std::vector<ServerID> const> FollowerInfo::get() {
std::lock_guard<std::mutex> lock(_mutex);
return _followers;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a follower to a shard, this is only done by the server side
/// of the "get-in-sync" capabilities. This reports to the agency under
/// `/Current` but in asynchronous "fire-and-forget" way. The method
/// fails silently, if the follower information has since been dropped
/// (see `dropFollowerInfo` below).
/// `/Current` but in asynchronous "fire-and-forget" way.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::addFollower(ShardID& c, ServerID const& s) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
TRI_ASSERT(it != _followerInfoTable.end());
size_t pos = static_cast<size_t>(it->second);
auto v = std::make_shared<std::vector<ServerID>>(*_followerInfos[pos]);
v->push_back(s);
_followerInfos[pos] = v; // will cast to std::vector<ServerID> const
void FollowerInfo::add(ServerID const& s) {
std::lock_guard<std::mutex> lock(_mutex);
// Fully copy the vector:
auto v = std::make_shared<std::vector<ServerID>>(*_followers);
v->push_back(s); // add a single entry
_followers = v; // will cast to std::vector<ServerID> const
// Now tell the agency:
// ...
// Path is
// Current/Collections/<dbName>/<collectionID>/<shardID>
// do {
// Get value,
// add follower
// Casvalue
// } until geklappt
}
////////////////////////////////////////////////////////////////////////////////
@ -2599,36 +2561,23 @@ void ClusterInfo::addFollower(ShardID& c, ServerID const& s) {
/// since been dropped (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::removeFollower(ShardID& c, ServerID const& s) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
TRI_ASSERT(it != _followerInfoTable.end());
size_t pos = static_cast<size_t>(it->second);
void FollowerInfo::remove(ServerID const& s) {
std::lock_guard<std::mutex> lock(_mutex);
auto v = std::make_shared<std::vector<ServerID>>();
v->reserve(_followerInfos[pos]->size() - 1);
for (auto const& i : *_followerInfos[pos]) {
v->reserve(_followers->size() - 1);
for (auto const& i : *_followers) {
if (i != s) {
v->push_back(i);
}
}
_followerInfos[pos] = v; // will cast to std::vector<ServerID> const
_followers = v; // will cast to std::vector<ServerID> const
// Now tell the agency:
// ...
// Path is
// Current/Collections/<dbName>/<collectionID>/<shardID>
// do {
// Get value,
// remove follower
// Casvalue
// } until geklappt
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drop information about current followers of a shard,
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::dropFollowerInfo(ShardID& c) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
if (it == _followerInfoTable.end()) {
LOG_ERROR("Did not find expected followerInfo for shard %s.", c.c_str());
return;
}
_followerInfos[static_cast<size_t>(it->second)].reset();
_followerInfoTable.erase(it);
}

View File

@ -527,7 +527,6 @@ class ClusterInfo {
DatabaseCollectionsCurrent;
typedef std::unordered_map<DatabaseID, DatabaseCollectionsCurrent>
AllCollectionsCurrent;
typedef std::shared_ptr<std::vector<ServerID> const> FollowerInfo;
private:
//////////////////////////////////////////////////////////////////////////////
@ -851,49 +850,9 @@ class ClusterInfo {
double getReloadServerListTimeout() const { return 60.0; }
//////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard, the first
/// overloaded method is supposed to be very fast, whereas the second
/// needs a hash lookup, on the other hand one only needs the shardID.
/// Returns an empty shared_ptr if the follower information of the
/// shard has been dropped (see `dropFollowerInfo` below).
/// @brief object for agency communication
//////////////////////////////////////////////////////////////////////////////
FollowerInfo getFollowerInfo(TRI_collection_t& coll);
FollowerInfo getFollowerInfo(ShardID& c);
//////////////////////////////////////////////////////////////////////////////
/// @brief add a follower to a shard, this is only done by the server side
/// of the "get-in-sync" capabilities. This reports to the agency under
/// `/Current` but in asynchronous "fire-and-forget" way. The method
/// fails silently, if the follower information has since been dropped
/// (see `dropFollowerInfo` below).
//////////////////////////////////////////////////////////////////////////////
void addFollower(ShardID& c, ServerID const& s);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a follower from a shard, this is only done by the
/// server if a synchronous replication request fails. This reports to
/// the agency under `/Current` but in asynchronous "fire-and-forget"
/// way. The method fails silently, if the follower information has
/// since been dropped (see `dropFollowerInfo` below).
//////////////////////////////////////////////////////////////////////////////
void removeFollower(ShardID& c, ServerID const& s);
//////////////////////////////////////////////////////////////////////////////
/// @brief drop information about current followers of a shard
//////////////////////////////////////////////////////////////////////////////
void dropFollowerInfo(ShardID& c);
//////////////////////////////////////////////////////////////////////////////
/// @brief internal method to add a follower info entry
//////////////////////////////////////////////////////////////////////////////
FollowerInfo newFollowerInfo(ShardID& c, int64_t& index);
private:
AgencyComm _agency;
// Cached data from the agency, we reload whenever necessary:
@ -971,13 +930,6 @@ class ClusterInfo {
std::unordered_map<ShardID, std::shared_ptr<std::vector<ServerID>>>
_shardIds; // from Current/Collections/
// The following is a special case, it is the current information
// about synchronous followers for each shard, for which we are
// responsible as a leader.
std::vector<FollowerInfo> _followerInfos;
std::unordered_map<ShardID, int64_t> _followerInfoTable;
std::mutex _followerInfoMutex;
//////////////////////////////////////////////////////////////////////////////
/// @brief uniqid sequence
//////////////////////////////////////////////////////////////////////////////
@ -1018,6 +970,48 @@ class ClusterInfo {
static double const reloadServerListTimeout;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief a class to track followers that are in sync for a shard
////////////////////////////////////////////////////////////////////////////////
class FollowerInfo {
std::shared_ptr<std::vector<ServerID> const> _followers;
std::mutex _mutex;
TRI_document_collection_t* _docColl;
public:
FollowerInfo(TRI_document_collection_t* d) : _docColl(d) {
}
//////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard.
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<std::vector<ServerID> const> get();
//////////////////////////////////////////////////////////////////////////////
/// @brief add a follower to a shard, this is only done by the server side
/// of the "get-in-sync" capabilities. This reports to the agency under
/// `/Current` but in asynchronous "fire-and-forget" way. The method
/// fails silently, if the follower information has since been dropped
/// (see `dropFollowerInfo` below).
//////////////////////////////////////////////////////////////////////////////
void add(ServerID const& s);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a follower from a shard, this is only done by the
/// server if a synchronous replication request fails. This reports to
/// the agency under `/Current` but in asynchronous "fire-and-forget"
/// way. The method fails silently, if the follower information has
/// since been dropped (see `dropFollowerInfo` below).
//////////////////////////////////////////////////////////////////////////////
void remove(ServerID const& s);
};
} // end namespace arangodb
#endif

View File

@ -300,6 +300,14 @@ static void JS_SynchronizeReplication(
}
}
bool forSynchronousReplication = false;
if (object->Has(TRI_V8_ASCII_STRING("forSynchronousReplication"))) {
if (object->Get(TRI_V8_ASCII_STRING("forSynchronousReplication"))->IsBoolean()) {
forSynchronousReplication =
TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("forSynchronousReplication")));
}
}
std::string errorMsg = "";
InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType,
verbose);
@ -341,6 +349,9 @@ static void JS_SynchronizeReplication(
}
}
// Now check forSynchronousReplication flag and tell ClusterInfo
// about a new follower.
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}

View File

@ -1891,4 +1891,3 @@ bool TRI_IsAllowedNameCollection(bool allowSystem, char const* name) {
return true;
}

View File

@ -333,10 +333,7 @@ struct TRI_collection_t {
TRI_vector_pointer_t _compactors; // all compactor files
TRI_vector_string_t _indexFiles; // all index filenames
int64_t _followerInfoIndex; // short-cut to find
// FollowerInfo in
// ClusterInfo quickly
TRI_collection_t() : _followerInfoIndex(-1) {}
TRI_collection_t() {}
explicit TRI_collection_t(arangodb::VocbaseCollectionInfo const& info)
: _info(info) {}

View File

@ -31,6 +31,7 @@
#include "Basics/logging.h"
#include "Basics/tri-strings.h"
#include "Basics/ThreadPool.h"
#include "Cluster/ServerState.h"
#include "FulltextIndex/fulltext-index.h"
#include "Indexes/CapConstraint.h"
#include "Indexes/EdgeIndex.h"
@ -84,6 +85,9 @@ TRI_document_collection_t::TRI_document_collection_t()
_tickMax = 0;
setCompactionStatus("compaction not yet started");
if (ServerState::instance()->isDBServer()) {
_followers.reset(new FollowerInfo(this));
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -35,6 +35,7 @@
#include "VocBase/update-policy.h"
#include "VocBase/voc-types.h"
#include "Wal/Marker.h"
#include "Cluster/ClusterInfo.h"
#include <regex.h>
@ -256,6 +257,11 @@ struct TRI_document_collection_t : public TRI_collection_t {
// whether or not secondary indexes are filled
bool _useSecondaryIndexes;
// the following contains in the cluster/DBserver case the information
// which other servers are in sync with this shard. It is unset in all
// other cases.
std::unique_ptr<arangodb::FollowerInfo> _followers;
public:
arangodb::DatafileStatistics _datafileStatistics;