1
0
Fork 0

Handle replicationFactor and numberOfShards in arangorestore.

This adds an option --default-replication-factor to arangorestore and
makes it more convenient to specify the number of shards for restored
collections.
This commit is contained in:
Max Neunhoeffer 2016-07-05 00:10:20 +02:00
parent 2a5bc9297d
commit b7e957664f
7 changed files with 169 additions and 93 deletions

View File

@ -1939,4 +1939,66 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection, the list
/// dbServers must be a list of DBserver ids to distribute across.
/// If this list is empty, the complete current list of DBservers is
/// fetched from ClusterInfo and shuffled with random_shuffle to mix it up.
/// Note that this function may modify dbServers in place. Returns an
/// empty map if no DBservers are available; callers treat that as an error.
////////////////////////////////////////////////////////////////////////////////
std::map<std::string, std::vector<std::string>> distributeShards(
    uint64_t numberOfShards,
    uint64_t replicationFactor,
    std::vector<std::string>& dbServers) {
  std::map<std::string, std::vector<std::string>> shards;
  ClusterInfo* ci = ClusterInfo::instance();

  if (dbServers.empty()) {
    dbServers = ci->getCurrentDBServers();
    if (dbServers.empty()) {
      // no DBservers known, cannot distribute anything
      return shards;
    }
    random_shuffle(dbServers.begin(), dbServers.end());
  }

  // Fetch unique ids for the shards to create. We reserve 1 + numberOfShards
  // ids because the shard names below are s<id+1> .. s<id+numberOfShards>;
  // reserving only numberOfShards would leave the last shard name using an
  // id outside the reserved range (off-by-one).
  uint64_t const id = ci->uniqid(1 + numberOfShards);

  // now create the shards
  size_t count = 0;  // round-robin cursor into dbServers
  for (uint64_t i = 0; i < numberOfShards; ++i) {
    // determine responsible server(s) for this shard
    std::vector<std::string> serverIds;
    for (uint64_t j = 0; j < replicationFactor; ++j) {
      std::string candidate;
      size_t count2 = 0;  // guards against cycling forever
      bool found = true;
      do {
        candidate = dbServers[count++];
        if (count >= dbServers.size()) {
          count = 0;
        }
        if (++count2 == dbServers.size() + 1) {
          // every server already holds a replica of this shard; we cannot
          // satisfy the requested replicationFactor, place fewer replicas
          LOG(WARN) << "distributeShards: replicationFactor is "
                       "too large for the number of DBservers";
          found = false;
          break;
        }
      } while (std::find(serverIds.begin(), serverIds.end(), candidate) !=
               serverIds.end());
      if (found) {
        serverIds.push_back(candidate);
      }
    }
    // determine shard id
    std::string shardId = "s" + StringUtils::itoa(id + 1 + i);
    shards.insert(std::make_pair(shardId, serverIds));
  }

  return shards;
}
} // namespace arangodb

View File

@ -169,6 +169,19 @@ int truncateCollectionOnCoordinator(std::string const& dbname,
int flushWalOnAllDBServers(bool, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection, the list
/// dbServers must be a list of DBserver ids to distribute across.
/// If this list is empty, the complete current list of DBservers is
/// fetched from ClusterInfo and shuffled with random_shuffle to mix it up.
/// Note that this function may modify the list dbServers in place.
////////////////////////////////////////////////////////////////////////////////
std::map<std::string, std::vector<std::string>> distributeShards(
uint64_t numberOfShards,
uint64_t replicationFactor,
std::vector<std::string>& dbServers);
} // namespace arangodb
#endif

View File

@ -1372,12 +1372,20 @@ void RestReplicationHandler::handleCommandRestoreCollection() {
numberOfShards = StringUtils::uint64(value4);
}
uint64_t replicationFactor = 1;
std::string const& value5 = _request->value("replicationFactor", found);
if (found) {
replicationFactor = StringUtils::uint64(value5);
}
std::string errorMsg;
int res;
if (ServerState::instance()->isCoordinator()) {
res = processRestoreCollectionCoordinator(slice, overwrite, recycleIds,
force, numberOfShards, errorMsg);
force, numberOfShards, errorMsg,
replicationFactor);
} else {
res =
processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg);
@ -1577,7 +1585,7 @@ int RestReplicationHandler::processRestoreCollection(
int RestReplicationHandler::processRestoreCollectionCoordinator(
VPackSlice const& collection, bool dropExisting, bool reuseId, bool force,
uint64_t numberOfShards, std::string& errorMsg) {
uint64_t numberOfShards, std::string& errorMsg, uint64_t replicationFactor) {
if (!collection.isObject()) {
errorMsg = "collection declaration is invalid";
@ -1645,21 +1653,35 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
}
// now re-create the collection
// dig out number of shards:
VPackSlice const shards = parameters.get("shards");
if (shards.isObject()) {
numberOfShards = static_cast<uint64_t>(shards.length());
// dig out number of shards, explicit attribute takes precedence:
VPackSlice const numberOfShardsSlice = parameters.get("numberOfShards");
if (numberOfShardsSlice.isInteger()) {
numberOfShards = numberOfShardsSlice.getNumericValue<uint64_t>();
} else {
// "shards" not specified
// now check if numberOfShards property was given
if (numberOfShards == 0) {
// We take one shard if no value was given
numberOfShards = 1;
VPackSlice const shards = parameters.get("shards");
if (shards.isObject()) {
numberOfShards = static_cast<uint64_t>(shards.length());
} else {
// "shards" not specified
// now check if numberOfShards property was given
if (numberOfShards == 0) {
// We take one shard if no value was given
numberOfShards = 1;
}
}
}
TRI_ASSERT(numberOfShards > 0);
VPackSlice const replFactorSlice = parameters.get("replicationFactor");
if (replFactorSlice.isInteger()) {
replicationFactor = replFactorSlice.getNumericValue
<decltype(replicationFactor)>();
}
if (replicationFactor == 0) {
replicationFactor = 1;
}
try {
TRI_voc_tick_t newIdTick = ci->uniqid(1);
VPackBuilder toMerge;
@ -1677,32 +1699,30 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
}
// shards
if (!shards.isObject()) {
// if no shards were given, create a random list of shards
auto dbServers = ci->getCurrentDBServers();
if (dbServers.empty()) {
errorMsg = "no database servers found in cluster";
return TRI_ERROR_INTERNAL;
}
std::random_shuffle(dbServers.begin(), dbServers.end());
uint64_t const id = ci->uniqid(1 + numberOfShards);
toMerge.add("shards", VPackValue(VPackValueType::Object));
for (uint64_t i = 0; i < numberOfShards; ++i) {
// shard id
toMerge.add(
VPackValue(std::string("s" + StringUtils::itoa(id + 1 + i))));
// server ids
toMerge.add(VPackValue(VPackValueType::Array));
toMerge.add(VPackValue(dbServers[i % dbServers.size()]));
toMerge.close(); // server ids
}
toMerge.close(); // end of shards
std::vector<std::string> dbServers; // will be filled
std::map<std::string, std::vector<std::string>> shardDistribution
= arangodb::distributeShards(numberOfShards, replicationFactor,
dbServers);
if (shardDistribution.empty()) {
errorMsg = "no database servers found in cluster";
return TRI_ERROR_INTERNAL;
}
toMerge.add(VPackValue("shards"));
{
VPackObjectBuilder guard(&toMerge);
for (auto const& p : shardDistribution) {
toMerge.add(VPackValue(p.first));
{
VPackArrayBuilder guard2(&toMerge);
for (std::string const& s : p.second) {
toMerge.add(VPackValue(s));
}
}
}
}
toMerge.add("replicationFactor", VPackValue(replicationFactor));
// Now put in the primary and an edge index if needed:
toMerge.add("indexes", VPackValue(VPackValueType::Array));

View File

@ -186,7 +186,7 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
int processRestoreCollectionCoordinator(VPackSlice const&, bool, bool, bool,
uint64_t, std::string&);
uint64_t, std::string&, uint64_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief restores the indexes of a collection TODO MOVE

View File

@ -26,6 +26,7 @@
#include "Basics/StringUtils.h"
#include "Basics/tri-strings.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterMethods.h"
#include "FulltextIndex/fulltext-index.h"
#include "Indexes/EdgeIndex.h"
#include "Indexes/FulltextIndex.h"
@ -965,8 +966,9 @@ static void CreateCollectionCoordinator(
ClusterInfo* ci = ClusterInfo::instance();
// fetch a unique id for the new collection plus one for each shard to create
uint64_t const id = ci->uniqid(1 + numberOfShards);
// fetch a unique id for the new collection, this is also used for the
// edge index, if that is needed, therefore we create the id anyway:
uint64_t const id = ci->uniqid(1);
if (cid.empty()) {
// collection id is the first unique id we got
cid = StringUtils::itoa(id);
@ -975,7 +977,6 @@ static void CreateCollectionCoordinator(
std::vector<std::string> dbServers;
bool done = false;
if (!distributeShardsLike.empty()) {
CollectionNameResolver resolver(vocbase);
TRI_voc_cid_t otherCid =
@ -996,55 +997,20 @@ static void CreateCollectionCoordinator(
}
}
}
done = true;
}
}
}
if (!done) {
// fetch list of available servers in cluster, and shuffle them randomly
dbServers = ci->getCurrentDBServers();
// If the list dbServers is still empty, it will be filled in
// distributeShards below.
if (dbServers.empty()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"no database servers found in cluster");
}
random_shuffle(dbServers.begin(), dbServers.end());
}
// now create the shards
std::map<std::string, std::vector<std::string>> shards;
size_t count = 0;
for (uint64_t i = 0; i < numberOfShards; ++i) {
// determine responsible server(s)
std::vector<std::string> serverIds;
for (uint64_t j = 0; j < replicationFactor; ++j) {
std::string candidate;
size_t count2 = 0;
bool found = true;
do {
candidate = dbServers[count++];
if (count >= dbServers.size()) {
count = 0;
}
if (++count2 == dbServers.size() + 1) {
LOG(WARN) << "createCollectionCoordinator: replicationFactor is "
"too large for the number of DBservers";
found = false;
break;
}
} while (std::find(serverIds.begin(), serverIds.end(), candidate) !=
serverIds.end());
if (found) {
serverIds.push_back(candidate);
}
}
// determine shard id
std::string shardId = "s" + StringUtils::itoa(id + 1 + i);
shards.insert(std::make_pair(shardId, serverIds));
// Now create the shards:
std::map<std::string, std::vector<std::string>> shards
= arangodb::distributeShards(numberOfShards, replicationFactor,
dbServers);
if (shards.empty()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"no database servers found in cluster");
}
// now create the VelocyPack for the collection

View File

@ -66,6 +66,7 @@ RestoreFeature::RestoreFeature(application_features::ApplicationServer* server,
_force(false),
_clusterMode(false),
_defaultNumberOfShards(1),
_defaultReplicationFactor(1),
_result(result),
_stats{ 0, 0, 0 } {
requiresElevatedPrivileges(false);
@ -119,6 +120,10 @@ void RestoreFeature::collectOptions(
"default value for numberOfShards if not specified",
new UInt64Parameter(&_defaultNumberOfShards));
options->addOption("--default-replication-factor",
"default value for replicationFactor if not specified",
new UInt64Parameter(&_defaultReplicationFactor));
options->addOption(
"--force", "continue restore even in the face of some server-side errors",
new BooleanParameter(&_force));
@ -222,16 +227,25 @@ int RestoreFeature::sendRestoreCollection(VPackSlice const& slice,
std::string(_recycleIds ? "true" : "false") + "&force=" +
std::string(_force ? "true" : "false");
if (_clusterMode &&
!slice.hasKey(std::vector<std::string>({"parameters", "shards"})) &&
!slice.hasKey(
std::vector<std::string>({"parameters", "numberOfShards"}))) {
// no "shards" and no "numberOfShards" attribute present. now assume
// default value from --default-number-of-shards
std::cerr << "# no sharding information specified for collection '" << name
<< "', using default number of shards " << _defaultNumberOfShards
<< std::endl;
url += "&numberOfShards=" + std::to_string(_defaultNumberOfShards);
if (_clusterMode) {
if (!slice.hasKey(std::vector<std::string>({"parameters", "shards"})) &&
!slice.hasKey(
std::vector<std::string>({"parameters", "numberOfShards"}))) {
// no "shards" and no "numberOfShards" attribute present. now assume
// default value from --default-number-of-shards
std::cerr << "# no sharding information specified for collection '"
<< name << "', using default number of shards "
<< _defaultNumberOfShards << std::endl;
url += "&numberOfShards=" + std::to_string(_defaultNumberOfShards);
}
if (!slice.hasKey(std::vector<std::string>({"parameters", "replicationFactor"}))) {
// No replication factor given, so take the default:
std::cerr << "# no replication information specified for collection '"
<< name << "', using default replication factor "
<< _defaultReplicationFactor << std::endl;
url += "&replicationFactor=" + std::to_string(_defaultReplicationFactor);
}
}
std::string const body = slice.toJson();

View File

@ -62,6 +62,7 @@ class RestoreFeature final
bool _force;
bool _clusterMode;
uint64_t _defaultNumberOfShards;
uint64_t _defaultReplicationFactor;
private:
int tryCreateDatabase(ClientFeature*, std::string const& name);