1
0
Fork 0

Bug fix/speedup shard distribution (#3645)

* Added a more sophisticated test for shardDistribution format

* Updated shard distribution test to use request instead of download

* Added a cxx reporter for the shard distribution. WIP

* Added some virtual functions/classes for Mocking

* Added a unittest for the new CXX ShardDistribution Reporter.

* The ShardDistributionReporter now reports Plan and Current correctly. However it does not yet compute a good total/current value and just returns a default. Hence these tests are still red

* Shard distribution now uses the cxx variant

* The ShardDistribution reporter now tries to execute count on the shards

* Updated changelog

* Added error case tests. If the servers time out the mechanism will stop bothering after two seconds and just report default values.
This commit is contained in:
Michael Hackstein 2017-11-10 15:17:08 +01:00 committed by Frank Celler
parent 2b0aa318ec
commit 5c633f9fae
14 changed files with 1907 additions and 172 deletions

View File

@ -132,13 +132,17 @@ devel
v3.2.7 (XXXX-XX-XX)
-------------------
* improved the speed of shardDistribution in cluster by moving it to C++. It is now guaranteed
  to return after ~2 seconds even if the entire cluster is unresponsive.
* enable JEMalloc background thread for purging and returning unused memory
back to the operating system (Linux only)
* fix agency precondition check for complex objects
this fixes issues with several CAS operations in the agency
* fixed issue #3403: How to kill long running AQL queries with the browser console's
AQL (display issue)

View File

@ -207,6 +207,7 @@ SET(ARANGOD_SOURCES
Cluster/RestAgencyCallbacksHandler.cpp
Cluster/RestClusterHandler.cpp
Cluster/ServerState.cpp
Cluster/ShardDistributionReporter.cpp
Cluster/TraverserEngine.cpp
Cluster/TraverserEngineRegistry.cpp
Cluster/v8-cluster.cpp

View File

@ -393,7 +393,8 @@ class ClusterComm {
//////////////////////////////////////////////////////////////////////////////
public:
~ClusterComm();
virtual ~ClusterComm();
//////////////////////////////////////////////////////////////////////////////
/// @brief get the unique instance
@ -442,7 +443,7 @@ class ClusterComm {
/// @brief submit an HTTP request to a shard asynchronously.
//////////////////////////////////////////////////////////////////////////////
OperationID asyncRequest(
virtual OperationID asyncRequest(
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
std::string const& destination, rest::RequestType reqtype,
@ -474,19 +475,19 @@ class ClusterComm {
/// @brief wait for one answer matching the criteria
//////////////////////////////////////////////////////////////////////////////
ClusterCommResult const wait(ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
ClusterCommTimeout timeout = 0.0);
virtual ClusterCommResult const wait(ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
ClusterCommTimeout timeout = 0.0);
//////////////////////////////////////////////////////////////////////////////
/// @brief ignore and drop current and future answers matching
//////////////////////////////////////////////////////////////////////////////
void drop(ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID, ShardID const& shardID);
virtual void drop(ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID, ShardID const& shardID);
//////////////////////////////////////////////////////////////////////////////
/// @brief send an answer HTTP request to a coordinator

View File

@ -66,7 +66,7 @@ class CollectionInfoCurrent {
CollectionInfoCurrent& operator=(CollectionInfoCurrent&&) = delete;
~CollectionInfoCurrent();
virtual ~CollectionInfoCurrent();
public:
bool add(ShardID const& shardID, VPackSlice slice) {
@ -137,7 +137,7 @@ class CollectionInfoCurrent {
/// @brief returns the current leader and followers for a shard
//////////////////////////////////////////////////////////////////////////////
std::vector<ServerID> servers(ShardID const& shardID) const {
virtual std::vector<ServerID> servers(ShardID const& shardID) const {
std::vector<ServerID> v;
auto it = _vpacks.find(shardID);
@ -246,7 +246,7 @@ class ClusterInfo {
/// @brief shuts down library
//////////////////////////////////////////////////////////////////////////////
~ClusterInfo();
virtual ~ClusterInfo();
public:
static void createInstance(AgencyCallbackRegistry*);
@ -315,7 +315,7 @@ class ClusterInfo {
/// @brief ask about all collections
//////////////////////////////////////////////////////////////////////////////
std::vector<std::shared_ptr<LogicalCollection>> const getCollections(
virtual std::vector<std::shared_ptr<LogicalCollection>> const getCollections(
DatabaseID const&);
//////////////////////////////////////////////////////////////////////////////
@ -333,7 +333,7 @@ class ClusterInfo {
/// If it is not found in the cache, the cache is reloaded once.
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<CollectionInfoCurrent> getCollectionCurrent(
virtual std::shared_ptr<CollectionInfoCurrent> getCollectionCurrent(
DatabaseID const&, CollectionID const&);
//////////////////////////////////////////////////////////////////////////////
@ -517,7 +517,7 @@ class ClusterInfo {
std::unordered_map<ServerID, std::string> getServers();
std::unordered_map<ServerID, std::string> getServerAliases();
virtual std::unordered_map<ServerID, std::string> getServerAliases();
private:

View File

@ -0,0 +1,448 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "ShardDistributionReporter.h"

#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"

#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>

#include <algorithm>
#include <queue>
using namespace arangodb;
using namespace arangodb::cluster;
// Collects the sync state and document counts of a single shard while the
// reporter queries the responsible servers.
struct SyncCountInfo {
  bool insync;       // true when Plan and Current already agree for this shard
  uint64_t total;    // document count reported by the shard's leader
  uint64_t current;  // count derived from the followers' responses
  std::vector<ServerID> followers;  // the servers currently listed for the
                                    // shard in Current
  // Defaults (total = 1, current = 0) are what gets reported when no server
  // answers in time; they intentionally differ, so the shard is still shown
  // as out of sync (see ReportShardProgress).
  SyncCountInfo() : insync(false), total(1), current(0) {}
  ~SyncCountInfo() {}
};
//////////////////////////////////////////////////////////////////////////////
/// @brief the pointer to the singleton instance
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ShardDistributionReporter>
arangodb::cluster::ShardDistributionReporter::_theInstance;
//////////////////////////////////////////////////////////////////////////////
/// @brief Static helper functions
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
/// @brief Test if one shard is in sync by comparing plan and current
//////////////////////////////////////////////////////////////////////////////
static inline bool TestIsShardInSync(
std::vector<ServerID> const& plannedServers,
std::vector<ServerID> const& realServers) {
// TODO We need to verify that lists are identical despite ordering?
return plannedServers.at(0) == realServers.at(0) &&
plannedServers.size() == realServers.size();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a single shard without progress
//////////////////////////////////////////////////////////////////////////////
static void ReportShardNoProgress(
std::string const& shardId, std::vector<ServerID> const& respServers,
std::unordered_map<ServerID, std::string> const& aliases,
VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(shardId));
result.openObject();
// We always have at least the leader
TRI_ASSERT(!respServers.empty());
auto respServerIt = respServers.begin();
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add("leader", VPackValue(al->second));
} else {
result.add("leader", VPackValue(*respServerIt));
}
++respServerIt;
result.add(VPackValue("followers"));
result.openArray();
while (respServerIt != respServers.end()) {
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add(VPackValue(al->second));
} else {
result.add(VPackValue(*respServerIt));
}
++respServerIt;
}
result.close(); // followers
result.close(); // shard
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a single shard with progress
//////////////////////////////////////////////////////////////////////////////
static void ReportShardProgress(
std::string const& shardId, std::vector<ServerID> const& respServers,
std::unordered_map<ServerID, std::string> const& aliases, uint64_t total,
uint64_t current, VPackBuilder& result) {
TRI_ASSERT(result.isOpenObject());
result.add(VPackValue(shardId));
result.openObject();
// We always have at least the leader
TRI_ASSERT(!respServers.empty());
auto respServerIt = respServers.begin();
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add("leader", VPackValue(al->second));
} else {
result.add("leader", VPackValue(*respServerIt));
}
++respServerIt;
result.add(VPackValue("followers"));
result.openArray();
while (respServerIt != respServers.end()) {
auto al = aliases.find(*respServerIt);
if (al != aliases.end()) {
result.add(VPackValue(al->second));
} else {
result.add(VPackValue(*respServerIt));
}
++respServerIt;
}
result.close(); // followers
result.add(VPackValue("progress"));
// We have somehow invalid, mostlikely no shard has responded in time
if (total == current) {
// Reset current
current = 0;
}
result.openObject();
result.add("total", VPackValue(total));
result.add("current", VPackValue(current));
result.close(); // progress
result.close(); // shard
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report leader and followers for every shard in the given shard map
//////////////////////////////////////////////////////////////////////////////
static void ReportPartialNoProgress(
    ShardMap const* shardIds,
    std::unordered_map<ServerID, std::string> const& aliases,
    VPackBuilder& result) {
  TRI_ASSERT(result.isOpenObject());
  // Each entry maps a shard id to its ordered list of responsible servers.
  for (auto const& shard : *shardIds) {
    ReportShardNoProgress(shard.first, shard.second, aliases, result);
  }
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a complete collection in the in-sync format
///
/// For an in-sync collection the Plan and Current sections are identical,
/// so the same shard listing is emitted under both keys.
//////////////////////////////////////////////////////////////////////////////
static void ReportInSync(
    LogicalCollection const* col, ShardMap const* shardIds,
    std::unordered_map<ServerID, std::string> const& aliases,
    VPackBuilder& result) {
  TRI_ASSERT(result.isOpenObject());
  result.add(VPackValue(col->name()));
  result.openObject();
  // Emit one section (Plan or Current) with the full shard listing.
  auto addSection = [&](char const* sectionName) {
    result.add(VPackValue(sectionName));
    result.openObject();
    ReportPartialNoProgress(shardIds, aliases, result);
    result.close();
  };
  addSection("Plan");
  addSection("Current");
  result.close();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Report a complete collection in the offsync format, with the
/// counts that could be obtained in time (see SyncCountInfo defaults)
//////////////////////////////////////////////////////////////////////////////
static void ReportOffSync(
    LogicalCollection const* col, ShardMap const* shardIds,
    std::unordered_map<ShardID, SyncCountInfo>& counters,
    std::unordered_map<ServerID, std::string> const& aliases,
    VPackBuilder& result) {
  TRI_ASSERT(result.isOpenObject());
  result.add(VPackValue(col->name()));
  // Plan is reported from the planned servers (with progress for shards
  // that are not in sync); Current is reported from the servers actually
  // in sync (SyncCountInfo::followers).
  result.openObject();
  {
    // Add Plan
    result.add(VPackValue("Plan"));
    result.openObject();
    for (auto const& s : *shardIds) {
      // counters is taken by non-const reference only because operator[]
      // is used; the assert guarantees no entry is ever default-created.
      TRI_ASSERT(counters.find(s.first) != counters.end());
      auto const& c = counters[s.first];
      if (c.insync) {
        ReportShardNoProgress(s.first, s.second, aliases, result);
      } else {
        ReportShardProgress(s.first, s.second, aliases, c.total,
                            c.current, result);
      }
    }
    result.close();
  }
  {
    // Add Current
    result.add(VPackValue("Current"));
    result.openObject();
    for (auto const& s : *shardIds) {
      TRI_ASSERT(counters.find(s.first) != counters.end());
      auto const& c = counters[s.first];
      if (c.insync) {
        ReportShardNoProgress(s.first, s.second, aliases, result);
      } else {
        // Out of sync: report the servers from Current, not the plan.
        ReportShardNoProgress(s.first, c.followers, aliases, result);
      }
    }
    result.close();
  }
  result.close();
}
// Construct a reporter. Public only so tests can inject mocked ClusterComm /
// ClusterInfo instances; production code uses instance(). Both dependencies
// are required (asserted non-null).
ShardDistributionReporter::ShardDistributionReporter(
    std::shared_ptr<ClusterComm> cc, ClusterInfo* ci)
    : _cc(cc), _ci(ci) {
  TRI_ASSERT(_cc != nullptr);
  TRI_ASSERT(_ci != nullptr);
}
ShardDistributionReporter::~ShardDistributionReporter() {}

// Lazily create and return the process-wide reporter singleton, wired to the
// global ClusterComm / ClusterInfo instances.
// NOTE(review): the lazy initialization is not synchronized; concurrent first
// calls could race on _theInstance -- confirm this is only reached from a
// single (V8) thread.
std::shared_ptr<ShardDistributionReporter>
ShardDistributionReporter::instance() {
  if (_theInstance == nullptr) {
    _theInstance = std::make_shared<ShardDistributionReporter>(
        ClusterComm::instance(), ClusterInfo::instance());
  }
  return _theInstance;
}
////////////////////////////////////////////////////////////////////////////
/// @brief Create the shard distribution report for all collections of
/// dbName.
///
/// Opens (and closes) one object in `result` with an entry per collection.
/// Collections that are fully in sync are reported directly. For all others
/// a /_api/collection/<shard>/count request is sent to the leader and the
/// not-yet-in-sync servers of every shard to fill in progress information.
/// Roughly two seconds after the call started no further requests are sent
/// and the default counts (see SyncCountInfo) are reported instead, so the
/// function returns even if the cluster is unresponsive.
////////////////////////////////////////////////////////////////////////////
void ShardDistributionReporter::getDistributionForDatabase(
    std::string const& dbName, VPackBuilder& result) {
  double endtime = TRI_microtime() + 2.0;  // We add two seconds
  result.openObject();
  auto aliases = _ci->getServerAliases();
  auto cols = _ci->getCollections(dbName);
  // First pass: report in-sync collections immediately, queue the rest for
  // the (potentially slow) count-gathering pass below.
  std::queue<std::shared_ptr<LogicalCollection>> todoSyncStateCheck;
  for (auto col : cols) {
    auto allShards = col->shardIds();
    if (testAllShardsInSync(dbName, col.get(), allShards.get())) {
      ReportInSync(col.get(), allShards.get(), aliases, result);
    } else {
      todoSyncStateCheck.push(col);
    }
  }
  if (!todoSyncStateCheck.empty()) {
    CoordTransactionID coordId = TRI_NewTickServer();
    std::unordered_map<ShardID, SyncCountInfo> counters;
    std::vector<ServerID> serversToAsk;
    // Second pass: one queued collection at a time.
    while (!todoSyncStateCheck.empty()) {
      // timeleft is computed once per collection; all shards of this
      // collection reuse it as their request timeout.
      double timeleft = endtime - TRI_microtime();
      serversToAsk.clear();
      counters.clear();
      auto const col = todoSyncStateCheck.front();
      auto allShards = col->shardIds();
      auto cic = _ci->getCollectionCurrent(dbName, col->cid_as_string());
      // Send requests
      for (auto const& s : *(allShards.get())) {
        uint64_t requestsInFlight = 0;
        OperationID leaderOpId = 0;
        auto curServers = cic->servers(s.first);
        auto& entry = counters[s.first];  // Emplaces a new SyncCountInfo
        if (TestIsShardInSync(s.second, curServers)) {
          entry.insync = true;
        } else {
          entry.followers = curServers;
          // Once the deadline has passed we stop sending requests and the
          // default counts of SyncCountInfo are reported.
          if (timeleft > 0.0) {
            std::string path = "/_api/collection/" + s.first + "/count";
            auto body = std::make_shared<std::string const>();
            {
              // First Ask the leader
              auto headers = std::make_unique<
                  std::unordered_map<std::string, std::string>>();
              leaderOpId = _cc->asyncRequest(
                  "", coordId, "server:" + s.second.at(0), rest::RequestType::GET,
                  path, body, headers, nullptr, timeleft);
            }
            // Now figure out which servers need to be asked: every planned
            // server that is not yet listed in Current.
            for (auto const& planned : s.second) {
              bool found = false;
              for (auto const& current : entry.followers) {
                if (current == planned) {
                  found = true;
                  break;
                }
              }
              if (!found) {
                serversToAsk.emplace_back(planned);
              }
            }
            // Ask them
            // NOTE(review): serversToAsk is only cleared once per collection
            // (above), so servers collected for earlier shards of this
            // collection are asked again for every later shard -- confirm
            // this accumulation is intended.
            for (auto const& server : serversToAsk) {
              auto headers = std::make_unique<
                  std::unordered_map<std::string, std::string>>();
              _cc->asyncRequest("", coordId, "server:" + server,
                                rest::RequestType::GET, path, body, headers,
                                nullptr, timeleft);
              requestsInFlight++;
            }
            // Wait for responses
            // First wait for Leader
            {
              // NOTE: this local `result` shadows the builder parameter.
              auto result = _cc->wait("", coordId, leaderOpId, "");
              if (result.status != CL_COMM_RECEIVED) {
                // We did not even get count for leader, use defaults.
                // Drop everything pending under this transaction id and
                // re-key, then move on to the next shard.
                _cc->drop("", coordId, 0, "");
                // Just in case, to get a new state
                coordId = TRI_NewTickServer();
                continue;
              }
              auto body = result.result->getBodyVelocyPack();
              VPackSlice response = body->slice();
              if (!response.isObject()) {
                // NOTE(review): this continue (and the one below) skips the
                // follower wait loop; the follower requests stay pending
                // until their timeout -- confirm that is acceptable.
                LOG_TOPIC(WARN, arangodb::Logger::CLUSTER)
                    << "Recieved invalid response for count. Shard distribution "
                       "inaccurate";
                continue;
              }
              response = response.get("count");
              if (!response.isNumber()) {
                LOG_TOPIC(WARN, arangodb::Logger::CLUSTER)
                    << "Recieved invalid response for count. Shard distribution "
                       "inaccurate";
                continue;
              }
              entry.total = response.getNumber<uint64_t>();
              entry.current =
                  entry.total;  // << We use this to flip around min/max test
            }
            // Now wait for others
            while (requestsInFlight > 0) {
              auto result = _cc->wait("", coordId, 0, "");
              requestsInFlight--;
              if (result.status != CL_COMM_RECEIVED) {
                // We do not care for errors of any kind.
                // We can continue here because all other requests will be handled
                // by the accumulated timeout
                continue;
              } else {
                auto body = result.result->getBodyVelocyPack();
                VPackSlice response = body->slice();
                if (!response.isObject()) {
                  LOG_TOPIC(WARN, arangodb::Logger::CLUSTER)
                      << "Recieved invalid response for count. Shard "
                         "distribution inaccurate";
                  continue;
                }
                response = response.get("count");
                if (!response.isNumber()) {
                  LOG_TOPIC(WARN, arangodb::Logger::CLUSTER)
                      << "Recieved invalid response for count. Shard "
                         "distribution inaccurate";
                  continue;
                }
                uint64_t other = response.getNumber<uint64_t>();
                if (other < entry.total) {
                  // If we have more in total we need the minimum of other counts
                  if (other < entry.current) {
                    entry.current = other;
                  }
                } else {
                  // If we only have more in total we take the maximum of other
                  // counts
                  if (entry.total <= entry.current && other > entry.current) {
                    entry.current = other;
                  }
                }
              }
            }
          }
        }
      }
      ReportOffSync(col.get(), allShards.get(), counters, aliases, result);
      todoSyncStateCheck.pop();
    }
  }
  result.close();
}
// Check whether every shard of the collection is in sync, i.e. the planned
// server list of each shard matches the servers reported in Current.
bool ShardDistributionReporter::testAllShardsInSync(
    std::string const& dbName, LogicalCollection const* col,
    ShardMap const* shardIds) {
  TRI_ASSERT(col != nullptr);
  TRI_ASSERT(shardIds != nullptr);
  auto current = _ci->getCollectionCurrent(dbName, col->cid_as_string());
  bool allInSync = true;
  // One mismatch makes the whole collection off-sync.
  for (auto const& shard : *shardIds) {
    if (!TestIsShardInSync(shard.second, current->servers(shard.first))) {
      allInSync = false;
      break;
    }
  }
  return allInSync;
}

View File

@ -0,0 +1,85 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_SHARD_DISTRIBUTED_REPORTER_H
#define ARANGOD_CLUSTER_SHARD_DISTRIBUTED_REPORTER_H 1
#include "Basics/Common.h"
namespace arangodb {
class ClusterComm;
class ClusterInfo;
namespace velocypack {
class Builder;
}
class LogicalCollection;
namespace cluster {
// Gathers the Plan/Current shard distribution (and, for out-of-sync shards,
// count-based progress information) of a database and serializes it into a
// velocypack Builder.
class ShardDistributionReporter {
 public:
  // During production use this singleton instance.
  // Do not actively create your own instance.
  // The Constructor is only public for testing purposes.
  static std::shared_ptr<ShardDistributionReporter> instance();

 private:
  //////////////////////////////////////////////////////////////////////////////
  /// @brief the pointer to the singleton instance
  //////////////////////////////////////////////////////////////////////////////
  static std::shared_ptr<ShardDistributionReporter> _theInstance;

 public:
  // Public for testing: allows injecting mocked ClusterComm / ClusterInfo.
  ShardDistributionReporter(std::shared_ptr<ClusterComm> cc, ClusterInfo* ci);

  ~ShardDistributionReporter();

  // Writes one object into `result` with an entry per collection of dbName,
  // each containing Plan and Current (plus progress for off-sync shards).
  void getDistributionForDatabase(std::string const& dbName,
                                  arangodb::velocypack::Builder& result);

  // Same report restricted to a single collection.
  // NOTE(review): declared here but no definition exists in the accompanying
  // .cpp of this commit -- confirm it is implemented before calling.
  void getDistributionForCollection(std::string const& dbName,
                                    std::string const& colName,
                                    arangodb::velocypack::Builder& result);

 private:
  // True if Plan and Current agree for every shard of the collection.
  bool testAllShardsInSync(
      std::string const& dbName, LogicalCollection const* col,
      std::unordered_map<std::string, std::vector<std::string>> const*
          allShards);

  // NOTE(review): declared but not defined in the accompanying .cpp (the
  // implementation uses a file-static ReportOffSync helper instead) --
  // presumably dead; verify before use.
  void reportOffSync(
      std::string const& dbName, LogicalCollection const* col,
      std::unordered_map<std::string, std::vector<std::string>> const* shardIds,
      std::unordered_map<std::string, std::string> const& aliases,
      arangodb::velocypack::Builder& result) const;

 private:
  std::shared_ptr<ClusterComm> _cc;  // cluster-internal request channel
  ClusterInfo* _ci;                  // agency-backed cluster state (not owned)
};
} // namespace cluster
} // namespace arangodb
#endif

View File

@ -27,9 +27,10 @@
#include <velocypack/velocypack-aliases.h>
#include "Agency/AgencyComm.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ShardDistributionReporter.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "RestServer/FeatureCacheFeature.h"
#include "V8/v8-buffer.h"
@ -1879,6 +1880,26 @@ static void JS_ClusterDownload(v8::FunctionCallbackInfo<v8::Value> const& args)
return JS_Download(args);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Collect the distribution of shards
///
/// V8 binding for SYS_CLUSTER_SHARD_DISTRIBUTION: builds the report for the
/// current database and returns it as a JavaScript object.
////////////////////////////////////////////////////////////////////////////////
static void JS_GetShardDistribution(v8::FunctionCallbackInfo<v8::Value> const& args) {
  TRI_V8_TRY_CATCH_BEGIN(isolate);
  // presumably restricts the call to cluster deployments -- see the
  // ONLY_IN_CLUSTER macro definition to confirm its failure behavior
  ONLY_IN_CLUSTER
  v8::HandleScope scope(isolate);
  auto vocbase = GetContextVocBase(isolate);
  auto reporter = cluster::ShardDistributionReporter::instance();
  VPackBuilder result;
  // May block for up to ~2 seconds if parts of the cluster are unresponsive.
  reporter->getDistributionForDatabase(vocbase->name(), result);
  TRI_V8_RETURN(TRI_VPackToV8(isolate, result.slice()));
  TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a global cluster context
////////////////////////////////////////////////////////////////////////////////
@ -2076,4 +2097,8 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle<v8::Context> context) {
TRI_AddGlobalFunctionVocbase(
isolate, TRI_V8_ASCII_STRING(isolate, "SYS_CLUSTER_DOWNLOAD"),
JS_ClusterDownload);
TRI_AddGlobalFunctionVocbase(
isolate, TRI_V8_ASCII_STRING(isolate, "SYS_CLUSTER_SHARD_DISTRIBUTION"),
JS_GetShardDistribution);
}

View File

@ -118,14 +118,14 @@ class LogicalCollection {
inline TRI_voc_cid_t cid() const { return _cid; }
std::string cid_as_string() const;
virtual std::string cid_as_string() const;
TRI_voc_cid_t planId() const;
std::string planId_as_string() const;
TRI_col_type_e type() const;
std::string name() const;
virtual std::string name() const;
std::string dbName() const;
std::string globallyUniqueId() const;
@ -225,7 +225,9 @@ class LogicalCollection {
bool allowUserKeys() const;
virtual bool usesDefaultShardKeys() const;
std::vector<std::string> const& shardKeys() const;
std::shared_ptr<ShardMap> shardIds() const;
virtual std::shared_ptr<ShardMap> shardIds() const;
// return a filtered list of the collection's shards
std::shared_ptr<ShardMap> shardIds(
std::unordered_set<std::string> const& includedShards) const;

View File

@ -1064,108 +1064,6 @@ actions.defineHttp({
}
var result = require('@arangodb/cluster').shardDistribution();
var dbsToCheck = []; var diff;
var getDifference = function (a, b) {
return a.filter(function (i) {
return b.indexOf(i) < 0;
});
};
_.each(result.results, function (info, collection) {
_.each(info.Plan, function (shard, shardkey) {
// check if shard is out of sync
if (!_.isEqual(shard.followers, info.Current[shardkey].followers)) {
// if not in sync, get document counts of leader and compare with follower
diff = getDifference(shard.followers, info.Current[shardkey].followers);
dbsToCheck.push({
shard: shardkey,
toCheck: diff,
leader: info.Plan[shardkey].leader,
collection: collection
});
}
});
});
var leaderOP, leaderR, followerR, leaderBody, followerBody;
var options = { timeout: 10 };
_.each(dbsToCheck, function (shard) {
if (shard.leader.charAt(0) === '_') {
shard.leader = shard.leader.substr(1, shard.leader.length - 1);
}
if (typeof shard.toCheck === 'object') {
if (shard.toCheck.length === 0) {
return;
}
}
// get counts of leader and follower shard
leaderOP = null;
try {
leaderOP = ArangoClusterComm.asyncRequest('GET', 'server:' + shard.leader, req.database,
'/_api/collection/' + shard.shard + '/count', '', {}, options);
} catch (e) {
}
// IMHO these try...catch things should at least log something but I don't want to
// introduce last minute log spam before the release (this was not logging either before restructuring it)
let followerOps = shard.toCheck.map(follower => {
try {
return ArangoClusterComm.asyncRequest('GET', 'server:' + follower, req.database, '/_api/collection/' + shard.shard + '/count', '', {}, options);
} catch (e) {
return null;
}
});
let [minFollowerCount, maxFollowerCount] = followerOps.reduce((result, followerOp) => {
if (!followerOp) {
return result;
}
let followerCount = 0;
try {
followerR = ArangoClusterComm.wait(followerOp);
if (followerR.status !== 'BACKEND_UNAVAILABLE') {
try {
followerBody = JSON.parse(followerR.body);
followerCount = followerBody.count;
} catch (e) {
}
}
} catch (e) {
}
if (result === null) {
return [followerCount, followerCount];
} else {
return [Math.min(followerCount, result[0]), Math.max(followerCount, result[1])];
}
}, null);
let leaderCount = null;
if (leaderOP) {
leaderR = ArangoClusterComm.wait(leaderOP);
try {
leaderBody = JSON.parse(leaderR.body);
leaderCount = leaderBody.count;
} catch (e) {
}
}
let followerCount;
if (minFollowerCount < leaderCount) {
followerCount = minFollowerCount;
} else {
followerCount = maxFollowerCount;
}
result.results[shard.collection].Plan[shard.shard].progress = {
total: leaderCount,
current: followerCount
};
});
actions.resultOk(req, res, actions.HTTP_OK, result);
}
});

View File

@ -376,4 +376,13 @@
delete global.SYS_GET_TASK;
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief getShardDistribution
// //////////////////////////////////////////////////////////////////////////////
if (global.SYS_CLUSTER_SHARD_DISTRIBUTION) {
exports.getShardDistribution = global.SYS_CLUSTER_SHARD_DISTRIBUTION;
delete global.SYS_CLUSTER_SHARD_DISTRIBUTION;
}
}());

View File

@ -1865,27 +1865,8 @@ function format (x) {
}
function shardDistribution () {
var db = require('internal').db;
var dbName = db._name();
var colls = db._collections();
var result = {};
for (var i = 0; i < colls.length; ++i) {
var collName = colls[i].name();
var collInfo = global.ArangoClusterInfo.getCollectionInfo(dbName, collName);
var shards = collInfo.shards;
var collInfoCurrent = {};
var shardNames = Object.keys(shards);
for (var j = 0; j < shardNames.length; ++j) {
collInfoCurrent[shardNames[j]] =
global.ArangoClusterInfo.getCollectionInfoCurrent(
dbName, collName, shardNames[j]).shorts;
}
result[collName] = {Plan: format(collInfo.shardShorts),
Current: format(collInfoCurrent)};
}
return {
results: result
results: require('internal').getShardDistribution()
};
}

View File

@ -1,4 +1,4 @@
/* global describe, beforeEach, afterEach, it, instanceInfo */
/* global describe, beforeEach, afterEach, it, instanceInfo, before */
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
@ -24,8 +24,12 @@
'use strict';
const expect = require('chai').expect;
const assert = require('chai').assert;
const internal = require('internal');
const download = require('internal').download;
const colName = "UnitTestDistributionTest";
const _ = require("lodash");
const request = require('@arangodb/request');
let coordinator = instanceInfo.arangods.filter(arangod => {
return arangod.role === 'coordinator';
@ -36,35 +40,22 @@ let dbServerCount = instanceInfo.arangods.filter(arangod => {
}).length;
describe('Shard distribution', function () {
afterEach(function() {
internal.db._drop('distributionTest');
});
it('should properly distribute a collection', function() {
internal.db._create('distributionTest', {replicationFactor: 2, numberOfShards: 16});
internal.print(coordinator.url + '/_admin/cluster/shardDistribution');
var d = download(coordinator.url + '/_admin/cluster/shardDistribution');
require('internal').print(d);
let distribution = JSON.parse(d.body).results;
let leaders = Object.keys(distribution.distributionTest.Current).reduce((current, shardKey) => {
let shard = distribution.distributionTest.Current[shardKey];
if (current.indexOf(shard.leader) === -1) {
current.push(shard.leader);
}
return current;
}, []);
expect(leaders).to.have.length.of.at.least(2);
});
it('should properly distribute a collection with full replication factor', function() {
internal.db._create('distributionTest', {replicationFactor: dbServerCount, numberOfShards: 16});
internal.print(coordinator.url + '/_admin/cluster/shardDistribution');
var d = download(coordinator.url + '/_admin/cluster/shardDistribution');
internal.print(d);
let distribution = JSON.parse(d.body).results;
let leaders = Object.keys(distribution.distributionTest.Current).reduce((current, shardKey) => {
let shard = distribution.distributionTest.Current[shardKey];
before(function() {
internal.db._drop(colName);
});
afterEach(function() {
internal.db._drop(colName);
});
it('should properly distribute a collection', function() {
internal.db._create(colName, {replicationFactor: 2, numberOfShards: 16});
var d = request.get(coordinator.url + '/_admin/cluster/shardDistribution');
let distribution = JSON.parse(d.body).results;
let leaders = Object.keys(distribution[colName].Current).reduce((current, shardKey) => {
let shard = distribution[colName].Current[shardKey];
if (current.indexOf(shard.leader) === -1) {
current.push(shard.leader);
}
@ -72,4 +63,73 @@ describe('Shard distribution', function () {
}, []);
expect(leaders).to.have.length.of.at.least(2);
});
describe('the format with full replication factor', function () {
let distribution;
const nrShards = 16;
before(function () {
internal.db._create(colName, {replicationFactor: dbServerCount, numberOfShards: nrShards});
var d = request.get(coordinator.url + '/_admin/cluster/shardDistribution');
distribution = JSON.parse(d.body).results[colName];
assert.isObject(distribution, 'The distribution for each collection has to be an object');
});
it('should have current and plan on top level', function () {
expect(distribution).to.have.all.keys(["Current", "Plan"]);
assert.isObject(distribution.Current, 'The Current has to be an object');
assert.isObject(distribution.Plan, 'The Current has to be an object');
});
it('should list identical shards in Current and Plan', function () {
let keys = Object.keys(distribution.Plan);
expect(keys.length).to.equal(nrShards);
// Check that keys (shardnames) are identical
expect(distribution.Current).to.have.all.keys(distribution.Plan);
});
it('should have the correct format of each shard in Plan', function () {
_.forEach(distribution.Plan, function (info, shard) {
if (info.hasOwnProperty('progress')) {
expect(info).to.have.all.keys(['leader', 'progress', 'followers']);
expect(info.progress).to.have.all.keys(['total', 'current']);
} else {
expect(info).to.have.all.keys(['leader', 'followers']);
}
expect(info.leader).to.match(/^DBServer/);
assert.isArray(info.followers, 'The followers need to be an array');
// We have one replica for each server, except the leader
expect(info.followers.length).to.equal(dbServerCount - 1);
_.forEach(info.followers, function (follower) {
expect(follower).to.match(/^DBServer/);
});
});
});
it('should have the correct format of each shard in Current', function () {
_.forEach(distribution.Current, function (info, shard) {
expect(info).to.have.all.keys(['leader', 'followers']);
expect(info.leader).to.match(/^DBServer/);
assert.isArray(info.followers, 'The followers need to be an array');
// We have at most one replica per db server. They may not be in sync yet.
expect(info.followers.length).to.below(dbServerCount);
_.forEach(info.followers, function (follower) {
expect(follower).to.match(/^DBServer/);
});
});
});
it('should distribute shards on all servers', function () {
let leaders = new Set();
_.forEach(distribution.Plan, function (info, shard) {
leaders.add(info.leader);
});
expect(leaders.size).to.equal(Math.min(dbServerCount, nrShards));
});
});
});

View File

@ -52,6 +52,7 @@ add_executable(
Cache/TransactionManager.cpp
Cache/TransactionsWithBackingStore.cpp
Cluster/ClusterHelpersTest.cpp
Cluster/ShardDistributionReporterTest.cpp
Geo/georeg.cpp
Graph/ClusterTraverserCacheTest.cpp
Pregel/typedbuffer.cpp

File diff suppressed because it is too large Load Diff