// arangodb/arangod/Sharding/ShardDistributionReporter.cpp

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include <queue>
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/StringUtils.h"
#include "Basics/system-functions.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Futures/Utilities.h"
#include "Logger/LogMacros.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "ShardDistributionReporter.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"
using namespace arangodb;
using namespace arangodb::cluster;
struct SyncCountInfo {
bool insync;
uint64_t total;
uint64_t current;
std::vector<ServerID> followers;
SyncCountInfo() : insync(false), total(1), current(0) {}
~SyncCountInfo() = default;
};
//////////////////////////////////////////////////////////////////////////////
/// @brief the pointer to the singleton instance
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ShardDistributionReporter> arangodb::cluster::ShardDistributionReporter::_theInstance;
//////////////////////////////////////////////////////////////////////////////
/// @brief Static helper functions
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
/// @brief Test if one shard is in sync by comparing plan and current
//////////////////////////////////////////////////////////////////////////////
static inline bool TestIsShardInSync(std::vector<ServerID> plannedServers,
std::vector<ServerID> realServers) {
// The leader at [0] must be the same, while the order of the followers is
// ignored.
TRI_ASSERT(!plannedServers.empty());
TRI_ASSERT(!realServers.empty());
std::sort(plannedServers.begin() + 1, plannedServers.end());
std::sort(realServers.begin() + 1, realServers.end());
return plannedServers == realServers;
}
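// Illustrative example (not from the original source; shard/server IDs are
// made up): with plannedServers {"S1", "S3", "S2"} and realServers
// {"S1", "S2", "S3"} the shard counts as in sync, because the leader "S1"
// matches and the follower sets are equal after sorting; with realServers
// {"S2", "S1", "S3"} it does not, because the leader differs.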
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a single shard without progress
//////////////////////////////////////////////////////////////////////////////
static void ReportShardNoProgress(std::string const& shardId,
std::vector<ServerID> const& respServers,
std::unordered_map<ServerID, std::string> const& aliases,
VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(shardId));
result.openObject();
// We always have at least the leader
TRI_ASSERT(!respServers.empty());
auto respServerIt = respServers.begin();
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add("leader", VPackValue(al->second));
} else {
result.add("leader", VPackValue(*respServerIt));
}
++respServerIt;
result.add(VPackValue("followers"));
result.openArray();
while (respServerIt != respServers.end()) {
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add(VPackValue(al->second));
} else {
result.add(VPackValue(*respServerIt));
}
++respServerIt;
}
result.close(); // followers
result.close(); // shard
}
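// Sketch of the output this helper appends to the enclosing object
// (hypothetical shard and server names; aliases are substituted when known):
//   "s100001": {
//     "leader": "DBServer0001",
//     "followers": ["DBServer0002", "DBServer0003"]
//   }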
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a single shard with progress
//////////////////////////////////////////////////////////////////////////////
static void ReportShardProgress(std::string const& shardId,
std::vector<ServerID> const& respServers,
std::unordered_map<ServerID, std::string> const& aliases,
uint64_t total, uint64_t current, VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(shardId));
result.openObject();
// We always have at least the leader
TRI_ASSERT(!respServers.empty());
auto respServerIt = respServers.begin();
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add("leader", VPackValue(al->second));
} else {
result.add("leader", VPackValue(*respServerIt));
}
++respServerIt;
result.add(VPackValue("followers"));
result.openArray();
while (respServerIt != respServers.end()) {
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add(VPackValue(al->second));
} else {
result.add(VPackValue(*respServerIt));
}
++respServerIt;
}
result.close(); // followers
result.add(VPackValue("progress"));
// If current still equals total, most likely no follower has responded in
// time, so the values are not meaningful; reset current
if (total == current) {
current = 0;
}
result.openObject();
result.add("total", VPackValue(total));
result.add("current", VPackValue(current));
result.close(); // progress
result.close(); // shard
}
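// Sketch of the output with progress (hypothetical names and counts); note
// that current is reset to 0 above when it still equals total, i.e. when no
// follower responded with a usable count:
//   "s100001": {
//     "leader": "DBServer0001",
//     "followers": ["DBServer0002"],
//     "progress": { "total": 1000, "current": 750 }
//   }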
//////////////////////////////////////////////////////////////////////////////
/// @brief Report the leader and followers of each shard based on the shard map
//////////////////////////////////////////////////////////////////////////////
static void ReportPartialNoProgress(ShardMap const* shardIds,
std::unordered_map<ServerID, std::string> const& aliases,
VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
for (auto const& s : *shardIds) {
ReportShardNoProgress(s.first, s.second, aliases, result);
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a complete collection in the in-sync format
//////////////////////////////////////////////////////////////////////////////
static void ReportInSync(LogicalCollection const* col, ShardMap const* shardIds,
std::unordered_map<ServerID, std::string> const& aliases,
VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(col->name()));
// In this report Plan and Current are identical
result.openObject();
{
// Add Plan
result.add(VPackValue("Plan"));
result.openObject();
ReportPartialNoProgress(shardIds, aliases, result);
result.close();
}
{
// Add Current
result.add(VPackValue("Current"));
result.openObject();
ReportPartialNoProgress(shardIds, aliases, result);
result.close();
}
result.close();
}
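// Sketch of the per-collection result in the in-sync case (hypothetical
// names); Plan and Current contain identical entries:
//   "users": {
//     "Plan":    { "s100001": { "leader": "DBServer0001", "followers": ["DBServer0002"] } },
//     "Current": { "s100001": { "leader": "DBServer0001", "followers": ["DBServer0002"] } }
//   }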
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a complete collection in the off-sync format, with counts
/// where they are known
//////////////////////////////////////////////////////////////////////////////
static void ReportOffSync(LogicalCollection const* col, ShardMap const* shardIds,
std::unordered_map<ShardID, SyncCountInfo>& counters,
std::unordered_map<ServerID, std::string> const& aliases,
VPackBuilder& result, bool progress) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(col->name()));
// In this report Plan and Current can differ
result.openObject();
{
// Add Plan
result.add(VPackValue("Plan"));
result.openObject();
for (auto const& s : *shardIds) {
TRI_ASSERT(counters.find(s.first) != counters.end());
auto const& c = counters[s.first];
if (c.insync) {
ReportShardNoProgress(s.first, s.second, aliases, result);
} else {
if (progress) {
ReportShardProgress(s.first, s.second, aliases, c.total, c.current, result);
} else {
ReportShardNoProgress(s.first, s.second, aliases, result);
}
}
}
result.close();
}
{
// Add Current
result.add(VPackValue("Current"));
result.openObject();
for (auto const& s : *shardIds) {
TRI_ASSERT(counters.find(s.first) != counters.end());
auto const& c = counters[s.first];
if (c.insync) {
ReportShardNoProgress(s.first, s.second, aliases, result);
} else if (!c.followers.empty()) {
ReportShardNoProgress(s.first, c.followers, aliases, result);
}
}
result.close();
}
result.close();
}
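// Sketch of the per-collection result in the off-sync case (hypothetical
// names and counts): Plan lists the planned servers, with progress if
// requested, while Current lists only the servers that currently have the
// shard:
//   "users": {
//     "Plan": {
//       "s100001": {
//         "leader": "DBServer0001",
//         "followers": ["DBServer0002"],
//         "progress": { "total": 1000, "current": 400 }
//       }
//     },
//     "Current": {
//       "s100001": { "leader": "DBServer0001", "followers": [] }
//     }
//   }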
ShardDistributionReporter::ShardDistributionReporter(ClusterInfo* ci, network::Sender sender)
: _ci(ci), _send(sender) {
TRI_ASSERT(_ci != nullptr);
}
ShardDistributionReporter::~ShardDistributionReporter() = default;
std::shared_ptr<ShardDistributionReporter> ShardDistributionReporter::instance(
application_features::ApplicationServer& server) {
if (_theInstance == nullptr) {
auto& ci = server.getFeature<ClusterFeature>().clusterInfo();
auto& nf = server.getFeature<NetworkFeature>();
auto* pool = nf.pool();
_theInstance = std::make_shared<ShardDistributionReporter>(
&ci,
[pool](network::DestinationId const& d, arangodb::fuerte::RestVerb v,
std::string const& u, velocypack::Buffer<uint8_t> b,
network::Timeout t, network::Headers h) -> network::FutureRes {
network::RequestOptions options;
options.timeout = t;
return network::sendRequest(pool, d, v, u, std::move(b), options, std::move(h));
});
}
return _theInstance;
}
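// Usage sketch (assuming an ApplicationServer with the Cluster and Network
// features prepared): obtain the singleton and let it fill a builder.
//   auto reporter = ShardDistributionReporter::instance(server);
//   VPackBuilder report;
//   reporter->getDistributionForDatabase("_system", report);
// Tests can instead construct a reporter directly with a stubbed
// network::Sender to avoid real HTTP requests.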
void ShardDistributionReporter::helperDistributionForDatabase(
std::string const& dbName, VPackBuilder& result,
std::queue<std::shared_ptr<LogicalCollection>>& todoSyncStateCheck, double endtime,
std::unordered_map<std::string, std::string>& aliases, bool progress) {
if (!todoSyncStateCheck.empty()) {
std::unordered_map<ShardID, SyncCountInfo> counters;
std::vector<ServerID> serversToAsk;
while (!todoSyncStateCheck.empty()) {
counters.clear();
auto const col = todoSyncStateCheck.front();
auto allShards = col->shardIds();
auto cic = _ci->getCollectionCurrent(dbName, std::to_string(col->id()));
// Send requests
for (auto const& s : *(allShards.get())) {
double timeleft = endtime - TRI_microtime();
network::Timeout timeout(timeleft);
serversToAsk.clear();
auto curServers = cic->servers(s.first);
auto& entry = counters[s.first]; // Emplaces a new SyncCountInfo
if (curServers.empty() || s.second.empty()) {
// at least one of the two server lists is empty, so the shard cannot be in sync
entry.insync = false;
} else if (TestIsShardInSync(s.second, curServers)) {
entry.insync = true;
} else {
entry.followers = curServers;
if (timeleft > 0.0) {
std::string path = "/_db/" + basics::StringUtils::urlEncode(dbName) +
"/_api/collection/" +
basics::StringUtils::urlEncode(s.first) +
"/count";
VPackBuffer<uint8_t> body;
// First, ask the leader
network::Headers headers;
auto leaderF = _send("server:" + s.second.at(0), fuerte::RestVerb::Get,
path, body, timeout, headers);
// Now figure out which servers need to be asked
for (auto const& planned : s.second) {
bool found = false;
for (auto const& current : entry.followers) {
if (current == planned) {
found = true;
break;
}
}
if (!found) {
serversToAsk.emplace_back(planned);
}
}
// Ask them
std::vector<network::FutureRes> futures;
futures.reserve(serversToAsk.size());
for (auto const& server : serversToAsk) {
network::Headers headers;
auto f = _send("server:" + server, fuerte::RestVerb::Get, path,
body, timeout, headers);
futures.emplace_back(std::move(f));
}
// Wait for responses
// First wait for Leader
{
auto& res = leaderF.get();
if (fuerteToArangoErrorCode(res) != TRI_ERROR_NO_ERROR || !res.response) {
// We did not even get a count for the leader, use defaults
continue;
}
std::vector<VPackSlice> const& slices = res.response->slices();
if (slices.empty() || !slices[0].isObject()) {
LOG_TOPIC("c02b2", WARN, arangodb::Logger::CLUSTER)
<< "Received invalid response for count. Shard "
<< "distribution inaccurate";
continue;
}
VPackSlice response = slices[0].get("count");
if (!response.isNumber()) {
LOG_TOPIC("fe868", WARN, arangodb::Logger::CLUSTER)
<< "Received invalid response for count. Shard "
<< "distribution inaccurate";
continue;
}
entry.total = response.getNumber<uint64_t>();
entry.current = entry.total; // used as a sentinel for the min/max selection below
}
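// Worked example for the selection below (hypothetical counts): the leader
// reported total = 1000, so current starts at 1000. If the followers report
// 400 and 700, both lie below total and the minimum (400) is kept as current.
// If they instead report 1200 and 1500, both lie at or above total and the
// maximum (1500) is kept, because current never dropped below total.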
{
auto responses = futures::collectAll(futures).get();
for (futures::Try<network::Response> const& response : responses) {
if (!response.hasValue() ||
fuerteToArangoErrorCode(response.get()) != TRI_ERROR_NO_ERROR ||
!response.get().response) {
// We do not care about errors of any kind here.
// We can continue because the remaining requests are
// covered by the accumulated timeout
continue;
}
auto& res = response.get();
std::vector<VPackSlice> const& slices = res.response->slices();
if (slices.empty() || !slices[0].isObject()) {
LOG_TOPIC("fcbb3", WARN, arangodb::Logger::CLUSTER)
<< "Received invalid response for count. Shard "
<< "distribution inaccurate";
continue;
}
VPackSlice answer = slices[0].get("count");
if (!answer.isNumber()) {
LOG_TOPIC("8d7b0", WARN, arangodb::Logger::CLUSTER)
<< "Received invalid response for count. Shard "
<< "distribution inaccurate";
continue;
}
uint64_t other = answer.getNumber<uint64_t>();
if (other < entry.total) {
// This follower has fewer documents than the leader:
// track the minimum of such counts
if (other < entry.current) {
entry.current = other;
}
} else {
// This follower has at least as many documents as the leader:
// take the maximum, but only while no follower has
// reported fewer documents than the leader
if (entry.total <= entry.current && other > entry.current) {
entry.current = other;
}
}
}
}
}
}
}
ReportOffSync(col.get(), allShards.get(), counters, aliases, result, progress);
todoSyncStateCheck.pop();
}
}
}
void ShardDistributionReporter::getCollectionDistributionForDatabase(
std::string const& dbName, std::string const& colName, VPackBuilder& result) {
double endtime = TRI_microtime() + 2.0; // We add two seconds
result.openObject();
auto aliases = _ci->getServerAliases();
auto col = _ci->getCollection(dbName, colName);
std::queue<std::shared_ptr<LogicalCollection>> todoSyncStateCheck;
auto allShards = col->shardIds();
if (testAllShardsInSync(dbName, col.get(), allShards.get())) {
ReportInSync(col.get(), allShards.get(), aliases, result);
} else {
todoSyncStateCheck.push(col);
}
const bool progress = true;
helperDistributionForDatabase(dbName, result, todoSyncStateCheck, endtime, aliases, progress);
result.close();
}
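// Note: this per-collection variant passes progress = true, so off-sync shards
// in its report also carry the "progress" object sketched above; the
// whole-database variant below passes progress = false and omits it.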
void ShardDistributionReporter::getDistributionForDatabase(std::string const& dbName,
VPackBuilder& result) {
double endtime = TRI_microtime() + 2.0; // We add two seconds
result.openObject();
auto aliases = _ci->getServerAliases();
auto cols = _ci->getCollections(dbName);
std::queue<std::shared_ptr<LogicalCollection>> todoSyncStateCheck;
for (auto const& col : cols) {
auto allShards = col->shardIds();
if (testAllShardsInSync(dbName, col.get(), allShards.get())) {
ReportInSync(col.get(), allShards.get(), aliases, result);
} else {
todoSyncStateCheck.push(col);
}
}
const bool progress = false;
helperDistributionForDatabase(dbName, result, todoSyncStateCheck, endtime, aliases, progress);
result.close();
}
bool ShardDistributionReporter::testAllShardsInSync(std::string const& dbName,
LogicalCollection const* col,
ShardMap const* shardIds) {
TRI_ASSERT(col != nullptr);
TRI_ASSERT(shardIds != nullptr);
auto cic = _ci->getCollectionCurrent(dbName, std::to_string(col->id()));
for (auto const& s : *shardIds) {
auto curServers = cic->servers(s.first);
if (s.second.empty() || curServers.empty() || !TestIsShardInSync(s.second, curServers)) {
return false;
}
}
return true;
}