1
0
Fork 0

more dictributeShardsLike code mergedfrom 3.1

This commit is contained in:
Kaveh Vahedipour 2017-04-24 15:12:38 +02:00
parent f9259b4677
commit c099c6daa9
13 changed files with 196 additions and 44 deletions

View File

@ -237,6 +237,7 @@ std::vector<Job::shard_t> Job::clones(
std::vector<shard_t> ret;
ret.emplace_back(collection, shard); // add (collection, shard) as first item
typedef std::unordered_map<std::string, std::shared_ptr<Node>> UChildren;
try {
std::string databasePath = planColPrefix + database,

View File

@ -57,7 +57,11 @@ Supervision::Supervision()
_selfShutdown(false),
_upgraded(false) {}
Supervision::~Supervision() { shutdown(); };
Supervision::~Supervision() {
if (!isStopping()) {
shutdown();
}
};
static std::string const syncPrefix = "/Sync/ServerStates/";
static std::string const healthPrefix = "/Supervision/Health/";

View File

@ -1056,6 +1056,14 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
std::string const name =
arangodb::basics::VelocyPackHelper::getStringValue(json, "name", "");
std::shared_ptr<ShardMap> otherCidShardMap = nullptr;
if (json.hasKey("distributeShardsLike")) {
auto const otherCidString = json.get("distributeShardsLike").copyString();
if (!otherCidString.empty()) {
otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds();
}
}
{
// check if a collection with the same name is already planned
loadPlan();
@ -1144,29 +1152,65 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
VPackBuilder builder;
builder.add(json);
AgencyOperation createCollection(
std::vector<AgencyOperation> opers (
{ AgencyOperation("Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, builder.slice()),
AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)});
std::vector<AgencyPrecondition> precs;
precs.emplace_back(
AgencyPrecondition(
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, builder.slice());
AgencyOperation increaseVersion("Plan/Version",
AgencySimpleOperationType::INCREMENT_OP);
AgencyPrecondition::Type::EMPTY, true));
AgencyPrecondition precondition = AgencyPrecondition(
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencyPrecondition::Type::EMPTY, true);
// Any of the shards locked?
if (otherCidShardMap != nullptr) {
for (auto const& shard : *otherCidShardMap) {
precs.emplace_back(
AgencyPrecondition("Supervision/Shards/" + shard.first,
AgencyPrecondition::Type::EMPTY, true));
}
}
AgencyWriteTransaction transaction;
AgencyGeneralTransaction transaction;
transaction.transactions.push_back(
AgencyGeneralTransaction::TransactionType(opers,precs));
transaction.operations.push_back(createCollection);
transaction.operations.push_back(increaseVersion);
transaction.preconditions.push_back(precondition);
AgencyCommResult res = ac.sendTransactionWithFailover(transaction);
auto res = ac.sendTransactionWithFailover(transaction);
auto result = res.slice();
// Only if not precondition failed
if (!res.successful()) {
if (res.httpCode() ==
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
AgencyCommResult ag = ac.getValues("/");
if (result.isArray() && result.length() > 0) {
if (result[0].isObject()) {
auto tres = result[0];
if (tres.hasKey(
std::vector<std::string>(
{AgencyCommManager::path(), "Plan", "Collections", databaseName,collectionID}))) {
errorMsg += std::string( "Preexisting collection with ID ") + collectionID;
} else if (
tres.hasKey(
std::vector<std::string>(
{AgencyCommManager::path(), "Supervision"}))) {
for (const auto& s :
VPackObjectIterator(
tres.get(
std::vector<std::string>(
{AgencyCommManager::path(), "Supervision","Shards"})))) {
errorMsg += std::string("Shard ") + s.key.copyString();
errorMsg += " of prototype collection is blocked by supervision job ";
errorMsg += s.value.copyString();
}
}
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN;
}
}
if (ag.successful()) {
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
<< ag.slice().toJson();
@ -1219,6 +1263,21 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
} else {
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
}
// Now we ought to remove the collection again in the Plan:
AgencyOperation removeCollection(
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencySimpleOperationType::DELETE_OP);
AgencyOperation increaseVersion("Plan/Version",
AgencySimpleOperationType::INCREMENT_OP);
AgencyWriteTransaction transaction;
transaction.operations.push_back(removeCollection);
transaction.operations.push_back(increaseVersion);
// This is a best effort, in the worst case the collection stays:
ac.sendTransactionWithFailover(transaction);
events::CreateCollection(name, TRI_ERROR_CLUSTER_TIMEOUT);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}

View File

@ -2166,14 +2166,14 @@ std::unordered_map<std::string, std::vector<std::string>> distributeShards(
#ifndef USE_ENTERPRISE
std::unique_ptr<LogicalCollection>
ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
TRI_vocbase_t* vocbase,
VPackSlice parameters) {
ClusterMethods::createCollectionOnCoordinator(
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters,
bool ignoreDistributeShardsLikeErrors) {
auto col = std::make_unique<LogicalCollection>(vocbase, parameters);
// Collection is a temporary collection object that undergoes sanity checks etc.
// It is not used anywhere and will be cleaned up after this call.
// Persist collection will return the real object.
return persistCollectionInAgency(col.get());
return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors);
}
#endif
@ -2182,11 +2182,14 @@ ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
////////////////////////////////////////////////////////////////////////////////
std::unique_ptr<LogicalCollection>
ClusterMethods::persistCollectionInAgency(LogicalCollection* col) {
ClusterMethods::persistCollectionInAgency(
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors) {
std::string distributeShardsLike = col->distributeShardsLike();
std::vector<std::string> dbServers;
std::vector<std::string> avoid = col->avoidServers();
bool chainOfDistributeShardsLike = false;
ClusterInfo* ci = ClusterInfo::instance();
if (!distributeShardsLike.empty()) {
@ -2200,6 +2203,9 @@ ClusterMethods::persistCollectionInAgency(LogicalCollection* col) {
try {
std::shared_ptr<LogicalCollection> collInfo =
ci->getCollection(col->dbName(), otherCidString);
if (!collInfo->distributeShardsLike().empty()) {
chainOfDistributeShardsLike = true;
}
auto shards = collInfo->shardIds();
auto shardList = ci->getShardList(otherCidString);
for (auto const& s : *shardList) {
@ -2210,11 +2216,20 @@ ClusterMethods::persistCollectionInAgency(LogicalCollection* col) {
}
}
}
} catch (...) {
}
col->distributeShardsLike(otherCidString);
} catch (...) {}
if (chainOfDistributeShardsLike) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE);
}
col->distributeShardsLike(otherCidString);
} else {
if (ignoreDistributeShardsLikeErrors) {
col->distributeShardsLike(std::string());
} else {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE);
}
}
} else if(!avoid.empty()) {
size_t replicationFactor = col->replicationFactor();

View File

@ -215,7 +215,8 @@ class ClusterMethods {
// to the caller, which is expressed by the returned unique_ptr.
static std::unique_ptr<LogicalCollection> createCollectionOnCoordinator(
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase,
arangodb::velocypack::Slice parameters);
arangodb::velocypack::Slice parameters,
bool ignoreDistributeShardsLikeErrors = false);
private:
@ -224,7 +225,7 @@ class ClusterMethods {
////////////////////////////////////////////////////////////////////////////////
static std::unique_ptr<LogicalCollection> persistCollectionInAgency(
LogicalCollection* col);
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors);
};
} // namespace arangodb

View File

@ -1331,6 +1331,11 @@ void RestReplicationHandler::handleCommandRestoreCollection() {
force = StringUtils::boolean(value3);
}
std::string const& value9 =
_request->value("ignoreDistributeShardsLikeErrors", found);
bool ignoreDistributeShardsLikeErrors =
found ? StringUtils::boolean(value9) : false;
uint64_t numberOfShards = 0;
std::string const& value4 = _request->value("numberOfShards", found);
@ -1349,9 +1354,11 @@ void RestReplicationHandler::handleCommandRestoreCollection() {
int res;
if (ServerState::instance()->isCoordinator()) {
res = processRestoreCollectionCoordinator(slice, overwrite, recycleIds,
force, numberOfShards, errorMsg,
replicationFactor);
res = (ServerState::instance()->isCoordinator()) ?
processRestoreCollectionCoordinator(
slice, overwrite, recycleIds, force, numberOfShards, errorMsg,
replicationFactor, ignoreDistributeShardsLikeErrors) :
processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg);
} else {
res =
processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg);
@ -1539,7 +1546,7 @@ int RestReplicationHandler::processRestoreCollection(
int RestReplicationHandler::processRestoreCollectionCoordinator(
VPackSlice const& collection, bool dropExisting, bool reuseId, bool force,
uint64_t numberOfShards, std::string& errorMsg,
uint64_t replicationFactor) {
uint64_t replicationFactor, bool ignoreDistributeShardsLikeErrors) {
if (!collection.isObject()) {
errorMsg = "collection declaration is invalid";
@ -1668,8 +1675,8 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
VPackSlice const merged = mergedBuilder.slice();
try {
auto col = ClusterMethods::createCollectionOnCoordinator(collectionType,
_vocbase, merged);
auto col = ClusterMethods::createCollectionOnCoordinator(
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors);
TRI_ASSERT(col != nullptr);
} catch (basics::Exception const& e) {
// Error, report it.

View File

@ -189,7 +189,7 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
int processRestoreCollectionCoordinator(VPackSlice const&, bool, bool, bool,
uint64_t, std::string&, uint64_t);
uint64_t, std::string&, uint64_t, bool);
//////////////////////////////////////////////////////////////////////////////
/// @brief restores the indexes of a collection TODO MOVE

View File

@ -65,6 +65,7 @@ RestoreFeature::RestoreFeature(application_features::ApplicationServer* server,
_overwrite(true),
_recycleIds(false),
_force(false),
_ignoreDistributeShardsLikeErrors(false),
_clusterMode(false),
_defaultNumberOfShards(1),
_defaultReplicationFactor(1),
@ -124,6 +125,11 @@ void RestoreFeature::collectOptions(
"default value for replicationFactor if not specified",
new UInt64Parameter(&_defaultReplicationFactor));
options->addOption(
"--ignore-distribute-shards-like-errors",
"continue restore even if sharding prototype collection is missing",
new BooleanParameter(&_ignoreDistributeShardsLikeErrors));
options->addOption(
"--force", "continue restore even in the face of some server-side errors",
new BooleanParameter(&_force));
@ -226,7 +232,9 @@ int RestoreFeature::sendRestoreCollection(VPackSlice const& slice,
"?overwrite=" +
std::string(_overwrite ? "true" : "false") + "&recycleIds=" +
std::string(_recycleIds ? "true" : "false") + "&force=" +
std::string(_force ? "true" : "false");
std::string(_force ? "true" : "false") +
"&ignoreDistributeShardsLikeErrors=" +
std::string(_ignoreDistributeShardsLikeErrors ? "true":"false");
if (_clusterMode) {
if (!slice.hasKey(std::vector<std::string>({"parameters", "shards"})) &&

View File

@ -60,6 +60,7 @@ class RestoreFeature final
bool _overwrite;
bool _recycleIds;
bool _force;
bool _ignoreDistributeShardsLikeErrors;
bool _clusterMode;
uint64_t _defaultNumberOfShards;
uint64_t _defaultReplicationFactor;

View File

@ -162,6 +162,9 @@
"ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC" : { "code" : 1481, "message" : "collection is out of sync" },
"ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN" : { "code" : 1482, "message" : "could not create index in plan" },
"ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN" : { "code" : 1483, "message" : "could not drop index in plan" },
"ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE" : { "code" : 1484, "message" : "chain of distributeShardsLike references" },
"ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE" : { "code" : 1485, "message" : "must not drop collection while another has a distributeShardsLike attribute pointing to it" },
"ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE" : { "code" : 1486, "message" : "must not have a distributeShardsLike attribute pointing to an unknown collection" },
"ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" },
"ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" },
"ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" },

View File

@ -198,6 +198,10 @@ ERROR_CLUSTER_AGENCY_STRUCTURE_INVALID,1480,"Invalid agency structure","The stru
ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC,1481,"collection is out of sync","Will be raised if a collection needed during query execution is out of sync. This currently can only happen when using satellite collections"
ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN,1482,"could not create index in plan","Will be raised when a coordinator in a cluster cannot create an entry for a new index in the Plan hierarchy in the agency."
ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN,1483,"could not drop index in plan","Will be raised when a coordinator in a cluster cannot remove an index from the Plan hierarchy in the agency."
ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE,1484,"chain of distributeShardsLike references","Will be raised if one tries to create a collection with a distributeShardsLike attribute which points to another collection that also has one."
ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE,1485,"must not drop collection while another has a distributeShardsLike attribute pointing to it","Will be raised if one tries to drop a collection to which another collection points with its distributeShardsLike attribute."
ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,1486,"must not have a distributeShardsLike attribute pointing to an unknown collection","Will be raised if one tries to create a collection which points to an unknown collection in its distributeShardsLike attribute."
################################################################################
## ArangoDB query errors

View File

@ -158,6 +158,9 @@ void TRI_InitializeErrorMessages () {
REG_ERROR(ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC, "collection is out of sync");
REG_ERROR(ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN, "could not create index in plan");
REG_ERROR(ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN, "could not drop index in plan");
REG_ERROR(ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, "chain of distributeShardsLike references");
REG_ERROR(ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE, "must not drop collection while another has a distributeShardsLike attribute pointing to it");
REG_ERROR(ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE, "must not have a distributeShardsLike attribute pointing to an unknown collection");
REG_ERROR(ERROR_QUERY_KILLED, "query killed");
REG_ERROR(ERROR_QUERY_PARSE, "%s");
REG_ERROR(ERROR_QUERY_EMPTY, "query is empty");

View File

@ -388,6 +388,16 @@
/// - 1483: @LIT{could not drop index in plan}
/// Will be raised when a coordinator in a cluster cannot remove an index
/// from the Plan hierarchy in the agency.
/// - 1484: @LIT{chain of distributeShardsLike references}
/// Will be raised if one tries to create a collection with a
/// distributeShardsLike attribute which points to another collection that
/// also has one.
/// - 1485: @LIT{must not drop collection while another has a distributeShardsLike attribute pointing to it}
/// Will be raised if one tries to drop a collection to which another
/// collection points with its distributeShardsLike attribute.
/// - 1486: @LIT{must not have a distributeShardsLike attribute pointing to an unknown collection}
/// Will be raised if one tries to create a collection which points to an
/// unknown collection in its distributeShardsLike attribute.
/// - 1500: @LIT{query killed}
/// Will be raised when a running query is killed by an explicit admin
/// command.
@ -2282,6 +2292,42 @@ void TRI_InitializeErrorMessages ();
#define TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN (1483)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1484: ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE
///
/// chain of distributeShardsLike references
///
/// Will be raised if one tries to create a collection with a
/// distributeShardsLike attribute which points to another collection that also
/// has one.
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE (1484)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1485: ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE
///
/// must not drop collection while another has a distributeShardsLike attribute
/// pointing to it
///
/// Will be raised if one tries to drop a collection to which another
/// collection points with its distributeShardsLike attribute.
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE (1485)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1486: ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE
///
/// must not have a distributeShardsLike attribute pointing to an unknown
/// collection
///
/// Will be raised if one tries to create a collection which points to an
/// unknown collection in its distributeShardsLike attribute.
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE (1486)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1500: ERROR_QUERY_KILLED
///