1
0
Fork 0
arangodb/arangod/Sharding/ShardingInfo.cpp

519 lines
18 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ShardingInfo.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/Exceptions.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "Sharding/ShardingFeature.h"
#include "Sharding/ShardingStrategy.h"
#include "Sharding/ShardingStrategyDefault.h"
#include "Utils/CollectionNameResolver.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
/// @brief builds the sharding configuration of a collection from its
/// VelocyPack definition `info`.
///
/// Reads and validates "numberOfShards", "distributeShardsLike",
/// "avoidServers", "replicationFactor", "minReplicationFactor" and
/// "shardKeys", copies an existing "shards" map, and finally selects the
/// sharding strategy.
///
/// Defaults when attributes are absent: numberOfShards 1, replicationFactor 1,
/// minReplicationFactor 1, distributeShardsLike "", shardKeys ["_key"].
///
/// @param info        collection definition; accessed via get()/hasKey(), so
///                    it is expected to be an object
/// @param collection  owning collection; dereferenced (type()) when the
///                    definition has "isSmart": true
/// @throws arangodb exception with TRI_ERROR_BAD_PARAMETER for invalid
///         numberOfShards (0 on a coordinator for non-smart collections, or a
///         non-numeric value), non-string distributeShardsLike, invalid
///         replicationFactor / minReplicationFactor, or an invalid number of
///         shard keys (0 or more than 8)
ShardingInfo::ShardingInfo(arangodb::velocypack::Slice info, LogicalCollection* collection)
    : _collection(collection),
      _numberOfShards(basics::VelocyPackHelper::readNumericValue<size_t>(info, StaticStrings::NumberOfShards,
                                                                         1)),
      _replicationFactor(1),
      _minReplicationFactor(1),
      _distributeShardsLike(basics::VelocyPackHelper::getStringValue(info, StaticStrings::DistributeShardsLike,
                                                                     "")),
      _avoidServers(),
      _shardKeys(),
      _shardIds(new ShardMap()),
      _shardingStrategy() {
  bool const isSmart =
      basics::VelocyPackHelper::readBooleanValue(info, StaticStrings::IsSmart, false);
  if (isSmart && _collection->type() == TRI_COL_TYPE_EDGE) {
    // smart edge collection: the configured number of shards is overridden
    // with 0
    _numberOfShards = 0;
  }
  VPackSlice shardKeysSlice = info.get(StaticStrings::ShardKeys);
  if (ServerState::instance()->isCoordinator()) {
    // on a coordinator, only smart collections may end up with 0 shards
    if (_numberOfShards == 0 && !isSmart) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                     "invalid number of shards");
    }
    // intentionally no call to validateNumberOfShards here,
    // because this constructor is called from the constructor of
    // LogicalCollection, and we want LogicalCollection to be created
    // with any configured number of shards in case the maximum allowed
    // number of shards is set or decreased in a cluster with already
    // existing collections that would violate the setting.
    // so we validate the number of shards against the maximum only
    // when a collection is created by a user, and on a restore
  }
  // type checks for attributes that were already read (with defaults) in the
  // member initializer list above
  VPackSlice distributeShardsLike = info.get(StaticStrings::DistributeShardsLike);
  if (!distributeShardsLike.isNone() && !distributeShardsLike.isString()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "invalid non-string value for 'distributeShardsLike'");
  }
  VPackSlice v = info.get(StaticStrings::NumberOfShards);
  if (!v.isNone() && !v.isNumber() && !v.isNull()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                   "invalid number of shards");
  }
  // avoidServers is best-effort: a single non-string entry makes the whole
  // list be logged and discarded instead of failing the construction
  if (info.hasKey("avoidServers")) {
    auto avoidServersSlice = info.get("avoidServers");
    if (avoidServersSlice.isArray()) {
      for (const auto& i : VPackArrayIterator(avoidServersSlice)) {
        if (i.isString()) {
          _avoidServers.push_back(i.copyString());
        } else {
          LOG_TOPIC("e5bc6", ERR, arangodb::Logger::FIXME)
              << "avoidServers must be a vector of strings, we got "
              << avoidServersSlice.toJson() << ". discarding!";
          _avoidServers.clear();
          break;
        }
      }
    }
  }
  // replicationFactor: a positive number is always accepted; 0 and the
  // string "satellite" are only accepted in enterprise builds
  bool isSatellite = false;
  auto replicationFactorSlice = info.get(StaticStrings::ReplicationFactor);
  if (!replicationFactorSlice.isNone()) {
    bool isError = true;
    if (replicationFactorSlice.isNumber()) {
      // NOTE(review): a negative value presumably makes getNumber<size_t>()
      // throw a VelocyPack exception rather than "invalid replicationFactor"
      // below — confirm against the velocypack version in use
      _replicationFactor = replicationFactorSlice.getNumber<size_t>();
      // mop: only allow satellite collections to be created explicitly
      if (_replicationFactor > 0) {
        isError = false;
#ifdef USE_ENTERPRISE
      } else if (_replicationFactor == 0) {
        isError = false;
#endif
      }
    }
#ifdef USE_ENTERPRISE
    else if (replicationFactorSlice.isString() &&
             replicationFactorSlice.copyString() == "satellite") {
      // satellite collections: one shard, no distributeShardsLike, no
      // avoidServers, and both replication factors set to 0
      _replicationFactor = 0;
      _minReplicationFactor = 0;
      _numberOfShards = 1;
      _distributeShardsLike = "";
      _avoidServers.clear();
      isError = false;
      isSatellite = true;
    }
#endif
    if (isError) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                     "invalid replicationFactor");
    }
  }
  // minReplicationFactor is ignored for collections declared "satellite";
  // otherwise it must be a number within [1, replicationFactor] (0 is only
  // let through when replicationFactor is 0 as well)
  if (!isSatellite) {
    auto minReplicationFactorSlice = info.get(StaticStrings::MinReplicationFactor);
    if (!minReplicationFactorSlice.isNone()) {
      if (minReplicationFactorSlice.isNumber()) {
        _minReplicationFactor = minReplicationFactorSlice.getNumber<size_t>();
        if (_minReplicationFactor > _replicationFactor) {
          THROW_ARANGO_EXCEPTION_MESSAGE(
              TRI_ERROR_BAD_PARAMETER,
              "minReplicationFactor cannot be larger than replicationFactor (" +
                  basics::StringUtils::itoa(_minReplicationFactor) + " > " +
                  basics::StringUtils::itoa(_replicationFactor) + ")");
        }
        if (_minReplicationFactor == 0 && _replicationFactor != 0) {
          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                         "minReplicationFactor cannot be 0");
        }
      } else {
        THROW_ARANGO_EXCEPTION_MESSAGE(
            TRI_ERROR_BAD_PARAMETER,
            "minReplicationFactor needs to be an integer number");
      }
    }
  }
  // replicationFactor == 0 -> satellite collection
  // satellites, and collections without "shardKeys", shard by _key only
  if (shardKeysSlice.isNone() || _replicationFactor == 0) {
    // Use default.
    _shardKeys.emplace_back(StaticStrings::KeyString);
  } else {
    if (shardKeysSlice.isArray()) {
      // non-string entries are skipped silently
      for (auto const& sk : VPackArrayIterator(shardKeysSlice)) {
        if (sk.isString()) {
          std::string key = sk.copyString();
          // remove : char at the beginning or end (for enterprise)
          // (only used for the checks below; the original key is stored)
          std::string stripped;
          if (!key.empty()) {
            if (key.front() == ':') {
              stripped = key.substr(1);
            } else if (key.back() == ':') {
              stripped = key.substr(0, key.size() - 1);
            } else {
              stripped = key;
            }
          }
          // system attributes are not allowed (except _key)
          // (_id and _rev are dropped; empty names are dropped as well)
          if (!stripped.empty() && stripped != StaticStrings::IdString &&
              stripped != StaticStrings::RevString) {
            _shardKeys.emplace_back(key);
          }
        }
      }
      if (_shardKeys.empty()) {  // && !isCluster) {
        // Compatibility. Old configs might store empty shard-keys locally.
        // This is translated to ["_key"]. In cluster-case this always was
        // forbidden.
        // TODO: now we need to allow this, as we use cluster features for
        // single servers in case of async failover
        _shardKeys.emplace_back(StaticStrings::KeyString);
      }
    }
    // a non-array, non-none "shardKeys" leaves _shardKeys empty and is
    // rejected by the size check below
  }
  if (_shardKeys.empty() || _shardKeys.size() > 8) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        std::string("invalid number of shard keys for collection"));
  }
  // existing shard distribution: "shards" maps shard id -> server list.
  // Entries with a non-string key or non-array value are ignored.
  // NOTE(review): server entries are read with copyString(), which
  // presumably throws for non-string values — confirm
  auto shardsSlice = info.get("shards");
  if (shardsSlice.isObject()) {
    for (auto const& shardSlice : VPackObjectIterator(shardsSlice)) {
      if (shardSlice.key.isString() && shardSlice.value.isArray()) {
        ShardID shard = shardSlice.key.copyString();
        std::vector<ServerID> servers;
        for (auto const& serverSlice : VPackArrayIterator(shardSlice.value)) {
          servers.push_back(serverSlice.copyString());
        }
        _shardIds->emplace(shard, servers);
      }
    }
  }
  // set the sharding strategy
  if (!ServerState::instance()->isRunningInCluster()) {
    // shortcut, so we do not need to set up the whole application server for
    // testing
    _shardingStrategy = std::make_unique<ShardingStrategyNone>();
  } else {
    // the ShardingFeature picks the strategy based on `info`
    _shardingStrategy = application_features::ApplicationServer::getFeature<ShardingFeature>(
                            "Sharding")
                            ->fromVelocyPack(info, this);
  }
  TRI_ASSERT(_shardingStrategy != nullptr);
}
/// @brief creates the sharding configuration for `collection` by copying the
/// sharding settings of `other`. The copied settings are: number of shards,
/// replicationFactor, minReplicationFactor, distributeShardsLike,
/// avoidServers, shard keys and the sharding strategy kind.
/// The shard map itself is NOT copied; the new instance starts with an empty
/// map.
/// @param other       configuration to copy settings from
/// @param collection  owning collection, must not be nullptr
ShardingInfo::ShardingInfo(ShardingInfo const& other, LogicalCollection* collection)
    : _collection(collection),
      _numberOfShards(other.numberOfShards()),
      _replicationFactor(other.replicationFactor()),
      // was missing before: the copy silently dropped the source's
      // minReplicationFactor (leaving whatever default the header provides,
      // if any), which could violate min <= replicationFactor
      _minReplicationFactor(other.minReplicationFactor()),
      _distributeShardsLike(other.distributeShardsLike()),
      _avoidServers(other.avoidServers()),
      _shardKeys(other.shardKeys()),
      _shardIds(new ShardMap()),
      _shardingStrategy() {
  TRI_ASSERT(_collection != nullptr);
  // create a fresh strategy of the same kind, bound to this instance
  _shardingStrategy = application_features::ApplicationServer::getFeature<ShardingFeature>(
                          "Sharding")
                          ->create(other._shardingStrategy->name(), this);
  TRI_ASSERT(_shardingStrategy != nullptr);
}
/// @brief destructor; all members clean up after themselves
ShardingInfo::~ShardingInfo() = default;
/// @brief whether our sharding strategy is compatible with the one of `other`
/// (the decision is delegated to our strategy's isCompatible())
bool ShardingInfo::usesSameShardingStrategy(ShardingInfo const* other) const {
  auto* foreignStrategy = other->_shardingStrategy.get();
  return _shardingStrategy->isCompatible(foreignStrategy);
}
/// @brief name of the sharding strategy currently in use
std::string ShardingInfo::shardingStrategyName() const {
  auto& strategy = *_shardingStrategy;
  return strategy.name();
}
/// @brief the collection this sharding configuration belongs to (non-null)
LogicalCollection* ShardingInfo::collection() const {
  LogicalCollection* owner = _collection;
  TRI_ASSERT(owner != nullptr);
  return owner;
}
/// @brief appends the sharding attributes to an already open object in
/// `result`: numberOfShards, shards, replicationFactor ("satellite" for
/// satellite collections), minReplicationFactor, distributeShardsLike (only on
/// coordinators and when set), shardKeys, avoidServers (only when non-empty),
/// followed by whatever the sharding strategy adds.
/// @param translateCids  if true, distributeShardsLike is emitted as a
///                       collection name instead of a collection id
void ShardingInfo::toVelocyPack(VPackBuilder& result, bool translateCids) {
  // writes all elements of a string container as a VelocyPack array
  auto addStringArray = [&result](auto const& values) {
    result.openArray();
    for (auto const& value : values) {
      result.add(VPackValue(value));
    }
    result.close();
  };

  result.add(StaticStrings::NumberOfShards, VPackValue(_numberOfShards));

  // hold our own reference so the map stays alive while it is serialized
  std::shared_ptr<ShardMap> const shardMap = _shardIds;
  result.add(VPackValue("shards"));
  result.openObject();
  for (auto const& entry : *shardMap) {
    result.add(VPackValue(entry.first));
    addStringArray(entry.second);  // responsible servers
  }
  result.close();  // shards

  if (!isSatellite()) {
    result.add(StaticStrings::ReplicationFactor, VPackValue(_replicationFactor));
  } else {
    result.add(StaticStrings::ReplicationFactor, VPackValue("satellite"));
  }
  result.add(StaticStrings::MinReplicationFactor, VPackValue(_minReplicationFactor));

  bool const emitPrototype =
      !_distributeShardsLike.empty() && ServerState::instance()->isCoordinator();
  if (emitPrototype) {
    if (!translateCids) {
      result.add(StaticStrings::DistributeShardsLike, VPackValue(distributeShardsLike()));
    } else {
      CollectionNameResolver resolver(_collection->vocbase());
      auto const prototypeCid = static_cast<TRI_voc_cid_t>(
          basics::StringUtils::uint64(distributeShardsLike()));
      result.add(StaticStrings::DistributeShardsLike,
                 VPackValue(resolver.getCollectionNameCluster(prototypeCid)));
    }
  }

  result.add(VPackValue(StaticStrings::ShardKeys));
  addStringArray(_shardKeys);

  if (!_avoidServers.empty()) {
    result.add(VPackValue("avoidServers"));
    addStringArray(_avoidServers);
  }

  // strategy-specific attributes come last
  _shardingStrategy->toVelocyPack(result);
}
/// @brief id (as string) of the prototype collection whose shard distribution
/// is followed; empty if none
std::string ShardingInfo::distributeShardsLike() const {
  std::string prototype(_distributeShardsLike);
  return prototype;
}
void ShardingInfo::distributeShardsLike(std::string const& cid, ShardingInfo const* other) {
if (_shardKeys.size() != other->shardKeys().size()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"cannot distribute shards like "
"a collection with a different number of shard key attributes");
}
if (!usesSameShardingStrategy(other)) {
// other collection has a different sharding strategy
// adjust our sharding so it uses the same strategy as the other collection
auto shr = application_features::ApplicationServer::getFeature<ShardingFeature>(
"Sharding");
_shardingStrategy = shr->create(other->shardingStrategyName(), this);
}
_distributeShardsLike = cid;
if (_collection->isSmart() && _collection->type() == TRI_COL_TYPE_EDGE) {
return;
}
_replicationFactor = other->replicationFactor();
_numberOfShards = other->numberOfShards();
}
/// @brief servers that should not receive shards of this collection
std::vector<std::string> const& ShardingInfo::avoidServers() const {
  auto const& excluded = _avoidServers;
  return excluded;
}
/// @brief replaces the list of servers to avoid with a copy of `avoidServers`
void ShardingInfo::avoidServers(std::vector<std::string> const& avoidServers) {
  _avoidServers.assign(avoidServers.begin(), avoidServers.end());
}
/// @brief number of replicas per shard (0 means satellite collection)
size_t ShardingInfo::replicationFactor() const {
  // invariant: the minimum never exceeds the replication factor
  TRI_ASSERT(_replicationFactor >= _minReplicationFactor);
  return _replicationFactor;
}
/// @brief sets the replication factor.
/// @throws TRI_ERROR_BAD_PARAMETER if `replicationFactor` is smaller than the
///         current minReplicationFactor; nothing is changed in that case
void ShardingInfo::replicationFactor(size_t replicationFactor) {
  if (replicationFactor < _minReplicationFactor) {
    // report the rejected value, not the currently stored one
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "replicationFactor cannot be smaller than minReplicationFactor (" +
            basics::StringUtils::itoa(replicationFactor) + " < " +
            basics::StringUtils::itoa(_minReplicationFactor) + ")");
  }
  _replicationFactor = replicationFactor;
}
/// @brief minimum number of in-sync replicas required per shard
size_t ShardingInfo::minReplicationFactor() const {
  // invariant: the minimum never exceeds the replication factor
  TRI_ASSERT(_replicationFactor >= _minReplicationFactor);
  return _minReplicationFactor;
}
/// @brief sets the minimum replication factor.
/// @throws TRI_ERROR_BAD_PARAMETER if `minReplicationFactor` is larger than the
///         current replicationFactor; nothing is changed in that case
void ShardingInfo::minReplicationFactor(size_t minReplicationFactor) {
  if (minReplicationFactor > _replicationFactor) {
    // report the rejected value, not the currently stored one
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "minReplicationFactor cannot be larger than replicationFactor (" +
            basics::StringUtils::itoa(minReplicationFactor) + " > " +
            basics::StringUtils::itoa(_replicationFactor) + ")");
  }
  _minReplicationFactor = minReplicationFactor;
}
/// @brief sets minReplicationFactor and replicationFactor together, so the
/// pair is validated against each other rather than against the old values.
/// @throws TRI_ERROR_BAD_PARAMETER if `minimal` > `maximal` (nothing changed)
void ShardingInfo::setMinAndMaxReplicationFactor(size_t minimal, size_t maximal) {
  if (minimal <= maximal) {
    _minReplicationFactor = minimal;
    _replicationFactor = maximal;
    return;
  }
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_BAD_PARAMETER,
      "minReplicationFactor cannot be larger than replicationFactor (" +
          basics::StringUtils::itoa(minimal) + " > " +
          basics::StringUtils::itoa(maximal) + ")");
}
/// @brief satellite collections are encoded as replicationFactor 0
bool ShardingInfo::isSatellite() const {
  return 0 == _replicationFactor;
}
/// @brief configured number of shards (0 for smart edge collections)
size_t ShardingInfo::numberOfShards() const {
  size_t const shardCount = _numberOfShards;
  return shardCount;
}
/// @brief overrides the number of shards.
/// Intended for VirtualSmartEdgeCollection only, which always passes 0;
/// any other value trips the assertion in maintainer builds.
void ShardingInfo::numberOfShards(size_t numberOfShards) {
  TRI_ASSERT(0 == numberOfShards);
  _numberOfShards = numberOfShards;
}
/// @brief whether the sharding strategy considers the shard keys to be the
/// default ones (answered by the strategy)
bool ShardingInfo::usesDefaultShardKeys() const {
  auto& strategy = *_shardingStrategy;
  return strategy.usesDefaultShardKeys();
}
/// @brief attribute names used for sharding; never empty
std::vector<std::string> const& ShardingInfo::shardKeys() const {
  auto const& keys = _shardKeys;
  TRI_ASSERT(!keys.empty());
  return keys;
}
/// @brief the complete shard map (shard id -> responsible servers)
std::shared_ptr<ShardMap> ShardingInfo::shardIds() const {
  std::shared_ptr<ShardMap> current(_shardIds);
  return current;
}
std::shared_ptr<std::vector<ShardID>> ShardingInfo::shardListAsShardID() const {
auto vector = std::make_shared<std::vector<ShardID>>();
for (auto const& mapElement : *_shardIds) {
vector->emplace_back(mapElement.first);
}
std::sort(vector->begin(), vector->end());
return vector;
}
/// @brief returns the part of the shard map whose shard ids are contained in
/// `includedShards`. An empty filter means "no filter": the complete map is
/// returned as-is (shared, not copied); otherwise a new map is built.
std::shared_ptr<ShardMap> ShardingInfo::shardIds(std::unordered_set<std::string> const& includedShards) const {
  if (includedShards.empty()) {
    return _shardIds;
  }
  // keep the current map alive while we walk over it
  std::shared_ptr<ShardMap> snapshot = _shardIds;
  auto filtered = std::make_shared<ShardMap>();
  for (auto const& entry : *snapshot) {
    if (includedShards.count(entry.first) > 0) {
      filtered->emplace(entry.first, entry.second);
    }
  }
  return filtered;
}
void ShardingInfo::setShardMap(std::shared_ptr<ShardMap> const& map) {
_shardIds = map;
}
/// @brief determines the shard responsible for the document `slice`; the
/// work is delegated entirely to the sharding strategy, which fills
/// `shardID` and `usesDefaultShardKeys` and returns an error code.
int ShardingInfo::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete,
                                      ShardID& shardID, bool& usesDefaultShardKeys,
                                      std::string const& key) {
  auto& strategy = *_shardingStrategy;
  return strategy.getResponsibleShard(slice, docComplete, shardID,
                                      usesDefaultShardKeys, key);
}
/// @brief validates user-supplied "numberOfShards" and "replicationFactor"
/// values of a collection definition against the limits configured in the
/// ClusterFeature (a limit of 0 means "unlimited").
///
/// Attributes that are absent or not numbers are not checked here. Invalid
/// values are reported through the returned Result instead of throwing; the
/// first problem found is reported.
/// @return TRI_ERROR_CLUSTER_TOO_MANY_SHARDS if numberOfShards exceeds the
///         maximum, TRI_ERROR_BAD_PARAMETER for a negative numberOfShards or
///         a replicationFactor that is <= 0 or outside the configured range,
///         otherwise an OK result.
Result ShardingInfo::validateShardsAndReplicationFactor(arangodb::velocypack::Slice slice) {
  Result res;
  if (!slice.isObject()) {
    return res;
  }

  auto const* cl = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");

  auto numberOfShardsSlice = slice.get(StaticStrings::NumberOfShards);
  if (numberOfShardsSlice.isNumber()) {
    // probe as a signed value: converting a negative number directly to an
    // unsigned type would throw instead of producing an error result
    int64_t const numberOfShards = numberOfShardsSlice.getNumber<int64_t>();
    if (numberOfShards < 0) {
      res.reset(TRI_ERROR_BAD_PARAMETER, "invalid number of shards");
      return res;
    }
    uint32_t const maxNumberOfShards = cl->maxNumberOfShards();
    if (maxNumberOfShards > 0 &&
        static_cast<uint64_t>(numberOfShards) > maxNumberOfShards) {
      res.reset(TRI_ERROR_CLUSTER_TOO_MANY_SHARDS,
                std::string("too many shards. maximum number of shards is ") + std::to_string(maxNumberOfShards));
      return res;
    }
  }

  auto replicationFactorSlice = slice.get(StaticStrings::ReplicationFactor);
  if (replicationFactorSlice.isNumber()) {
    int64_t const replicationFactor = replicationFactorSlice.getNumber<int64_t>();
    if (replicationFactor <= 0) {
      res.reset(TRI_ERROR_BAD_PARAMETER, "invalid value for replicationFactor");
      return res;
    }
    uint32_t const minReplicationFactor = cl->minReplicationFactor();
    uint32_t const maxReplicationFactor = cl->maxReplicationFactor();
    // compare in 64 bits, so factors beyond the uint32_t range are reported
    // as too high rather than throwing while narrowing
    auto const factor = static_cast<uint64_t>(replicationFactor);
    if (maxReplicationFactor > 0 && factor > maxReplicationFactor) {
      res.reset(TRI_ERROR_BAD_PARAMETER,
                std::string("replicationFactor must not be higher than maximum allowed replicationFactor (") + std::to_string(maxReplicationFactor) + ")");
      return res;
    }
    if (minReplicationFactor > 0 && factor < minReplicationFactor) {
      res.reset(TRI_ERROR_BAD_PARAMETER,
                std::string("replicationFactor must not be lower than minimum allowed replicationFactor (") + std::to_string(minReplicationFactor) + ")");
      return res;
    }
  }
  return res;
}