1
0
Fork 0

Add followerInfos in ClusterInfo.

This commit is contained in:
Max Neunhoeffer 2015-12-28 14:31:16 +01:00
parent f3b5b70b7b
commit 79b5175212
3 changed files with 168 additions and 2 deletions

View File

@ -2555,6 +2555,117 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators () {
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard, the first
/// overloaded method is supposed to be very fast, whereas the second
/// needs a hash lookup, on the other hand one only needs the shardID.
/// Returns an empty shared_ptr if the follower information of the
/// shard has been dropped (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
ClusterInfo::FollowerInfo ClusterInfo::getFollowerInfo(TRI_collection_t& coll) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
if (coll._followerInfoIndex >= 0) {
return _followerInfos[static_cast<size_t>(coll._followerInfoIndex)];
}
ServerID c = coll._info.name();
auto it = _followerInfoTable.find(c);
if (it != _followerInfoTable.end()) {
coll._followerInfoIndex = it->second;
return _followerInfos[static_cast<size_t>(it->second)];
}
return newFollowerInfo(c, coll._followerInfoIndex);
}
ClusterInfo::FollowerInfo ClusterInfo::getFollowerInfo(ShardID& c) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
if (it != _followerInfoTable.end()) {
return _followerInfos[static_cast<size_t>(it->second)];
}
int64_t tmp = -1;
return newFollowerInfo(c, tmp);
}
ClusterInfo::FollowerInfo ClusterInfo::newFollowerInfo(
ShardID& c, int64_t& index) {
// Mutex must already be locked, which is done in the getFollowerInfo methods
auto v = make_shared<std::vector<ServerID> const>();
_followerInfos.push_back(v);
try {
index = static_cast<int64_t>(_followerInfos.size() - 1);
_followerInfoTable.emplace(make_pair(c, index));
return v;
}
catch (...) {
_followerInfos.pop_back(); // make data structure consistent again
index = -1;
throw;
}
return _followerInfos[static_cast<size_t>(index)];
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a follower to a shard, this is only done by the server side
/// of the "get-in-sync" capabilities. This reports to the agency under
/// `/Current` but in asynchronous "fire-and-forget" way. The method
/// fails silently, if the follower information has since been dropped
/// (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::addFollower(ShardID& c, ServerID const& s) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
TRI_ASSERT(it != _followerInfoTable.end());
size_t pos = static_cast<size_t>(it->second);
auto v = make_shared<std::vector<ServerID>>(*_followerInfos[pos]);
v->push_back(s);
_followerInfos[pos] = v; // will cast to std::vector<ServerID> const
// Now tell the agency:
// ...
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a follower from a shard, this is only done by the
/// server if a synchronous replication request fails. This reports to
/// the agency under `/Current` but in asynchronous "fire-and-forget"
/// way. The method fails silently, if the follower information has
/// since been dropped (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::removeFollower (ShardID& c, ServerID const& s) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
TRI_ASSERT(it != _followerInfoTable.end());
size_t pos = static_cast<size_t>(it->second);
auto v = make_shared<std::vector<ServerID>>();
v->reserve(_followerInfos[pos]->size() - 1);
for (auto const& i : *_followerInfos[pos]) {
if (i != s) {
v->push_back(i);
}
}
_followerInfos[pos] = v; // will cast to std::vector<ServerID> const
// Now tell the agency:
// ...
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drop information about current followers of a shard,
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::dropFollowerInfo (ShardID& c) {
std::lock_guard<std::mutex> lock(_followerInfoMutex);
auto it = _followerInfoTable.find(c);
if (it == _followerInfoTable.end()) {
LOG_ERROR("Did not find expected followerInfo for shard %s.", c.c_str());
return;
}
_followerInfos[static_cast<size_t>(it->second)].reset();
_followerInfoTable.erase(it);
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------

View File

@ -561,6 +561,8 @@ namespace triagens {
DatabaseCollectionsCurrent;
typedef std::unordered_map<DatabaseID, DatabaseCollectionsCurrent>
AllCollectionsCurrent;
typedef std::shared_ptr<std::vector<ServerID> const>
FollowerInfo;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
@ -922,6 +924,49 @@ namespace triagens {
return 60.0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard, the first
/// overloaded method is supposed to be very fast, whereas the second
/// needs a hash lookup, on the other hand one only needs the shardID.
/// Returns an empty shared_ptr if the follower information of the
/// shard has been dropped (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
FollowerInfo getFollowerInfo (TRI_collection_t& coll);
FollowerInfo getFollowerInfo (ShardID& c);
////////////////////////////////////////////////////////////////////////////////
/// @brief add a follower to a shard, this is only done by the server side
/// of the "get-in-sync" capabilities. This reports to the agency under
/// `/Current` but in asynchronous "fire-and-forget" way. The method
/// fails silently, if the follower information has since been dropped
/// (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
void addFollower (ShardID& c, ServerID const& s);
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a follower from a shard, this is only done by the
/// server if a synchronous replication request fails. This reports to
/// the agency under `/Current` but in asynchronous "fire-and-forget"
/// way. The method fails silently, if the follower information has
/// since been dropped (see `dropFollowerInfo` below).
////////////////////////////////////////////////////////////////////////////////
void removeFollower (ShardID& c, ServerID const& s);
////////////////////////////////////////////////////////////////////////////////
/// @brief drop information about current followers of a shard
////////////////////////////////////////////////////////////////////////////////
void dropFollowerInfo (ShardID& c);
////////////////////////////////////////////////////////////////////////////////
/// @brief internal method to add a follower info entry
////////////////////////////////////////////////////////////////////////////////
FollowerInfo newFollowerInfo (ShardID& c, int64_t& index);
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -982,7 +1027,7 @@ namespace triagens {
_currentDatabases; // from Current/Databases
ProtectionData _currentDatabasesProt;
// Finally, we need information about collections, again we have
// We need information about collections, again we have
// data from Plan and from Current.
// The information for _shards and _shardKeys are filled from the
// Plan (since they are fixed for the lifetime of the collection).
@ -1009,6 +1054,13 @@ namespace triagens {
std::unordered_map<ShardID, std::shared_ptr<std::vector<ServerID>>>
_shardIds; // from Current/Collections/
// The following is a special case, it is the current information
// about synchronous followers for each shard, for which we are
// responsible as a leader.
std::vector<FollowerInfo> _followerInfos;
std::unordered_map<ShardID, int64_t> _followerInfoTable;
std::mutex _followerInfoMutex;
////////////////////////////////////////////////////////////////////////////////
/// @brief uniqid sequence
////////////////////////////////////////////////////////////////////////////////

View File

@ -370,8 +370,11 @@ struct TRI_collection_t {
TRI_vector_pointer_t _compactors; // all compactor files
TRI_vector_string_t _indexFiles; // all index filenames
int64_t _followerInfoIndex; // short-cut to find
// FollowerInfo in
// ClusterInfo quickly
TRI_collection_t (
) {
) : _followerInfoIndex(-1) {
}
explicit TRI_collection_t (