mirror of https://gitee.com/bigwinds/arangodb
Bug fix 3.4/fix races in collection creation (#9504)
This commit is contained in:
parent
d8955bbb29
commit
1a812b4b4f
|
@ -1,15 +1,18 @@
|
|||
v3.4.8 (XXXX-XX-XX)
|
||||
-------------------
|
||||
|
||||
* Fixed some races in cluster collection creation, which allowed collections with the
|
||||
same name to be created in parallel under some rare conditions.
|
||||
|
||||
* arangoimport would not stop, much less report, communications errors. Add CSV reporting
|
||||
of line numbers that are impacted during such errors
|
||||
of line numbers that are impacted during such errors.
|
||||
|
||||
* Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query in a cluster.
|
||||
Only occurs with views in the query.
|
||||
|
||||
* Prevent rare cases of duplicate DDL actions being executed by Maintenance.
|
||||
|
||||
* coordinator code was reporting rocksdb error codes, but not the associated detail message.
|
||||
* Coordinator code was reporting rocksdb error codes, but not the associated detail message.
|
||||
Corrected.
|
||||
|
||||
* Fixed some error reporting and logging in Maintenance.
|
||||
|
|
|
@ -277,6 +277,20 @@ void ClusterInfo::cleanup() {
|
|||
theInstance->_currentCollections.clear();
|
||||
}
|
||||
|
||||
/// @brief produces an agency dump and logs it
|
||||
void ClusterInfo::logAgencyDump() const {
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
AgencyComm ac;
|
||||
AgencyCommResult ag = ac.getValues("/");
|
||||
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(INFO, Logger::CLUSTER) << "Agency dump:\n" << ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(WARN, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
|
||||
/// new upper bound value from the agency
|
||||
|
@ -1529,14 +1543,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
|
|||
}
|
||||
|
||||
if (TRI_microtime() > endTime) {
|
||||
AgencyCommResult ag = ac.getValues("/");
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
|
||||
logAgencyDump();
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
}
|
||||
|
||||
|
@ -1561,7 +1568,9 @@ int ClusterInfo::createCollectionCoordinator(
|
|||
std::vector<ClusterCollectionCreationInfo> infos{
|
||||
ClusterCollectionCreationInfo{collectionID, numberOfShards,
|
||||
replicationFactor, waitForReplication, json}};
|
||||
Result res = createCollectionsCoordinator(databaseName, infos, timeout);
|
||||
double const realTimeout = getTimeout(timeout);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
Result res = createCollectionsCoordinator(databaseName, infos, endTime);
|
||||
if (res.fail()) {
|
||||
errorMsg = res.errorMessage();
|
||||
return res.errorNumber();
|
||||
|
@ -1569,28 +1578,23 @@ int ClusterInfo::createCollectionCoordinator(
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
|
||||
std::vector<ClusterCollectionCreationInfo>& infos,
|
||||
double timeout) {
|
||||
using arangodb::velocypack::Slice;
|
||||
/// @brief this method does an atomic check of the preconditions for the collections
|
||||
/// to be created, using the currently loaded plan. it populates the plan version
|
||||
/// used for the checks
|
||||
Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName,
|
||||
std::vector<ClusterCollectionCreationInfo> const& infos,
|
||||
uint64_t& planVersion) {
|
||||
READ_LOCKER(readLocker, _planProt.lock);
|
||||
|
||||
planVersion = _planVersion;
|
||||
|
||||
AgencyComm ac;
|
||||
|
||||
double const realTimeout = getTimeout(timeout);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
double const interval = getPollInterval();
|
||||
// We need to make sure our plan is up to date.
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "createCollectionCoordinator, loading Plan from agency...";
|
||||
loadPlan();
|
||||
// No matter how long this will take, we will not ourselfes trigger a plan relaoding.
|
||||
for (auto& info : infos) {
|
||||
for (auto const& info : infos) {
|
||||
// Check if name exists.
|
||||
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
|
||||
return TRI_ERROR_BAD_PARAMETER; // must not be empty
|
||||
}
|
||||
READ_LOCKER(readLocker, _planProt.lock);
|
||||
// Validate that his collection does not exist
|
||||
|
||||
// Validate that the collection does not exist in the current plan
|
||||
{
|
||||
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
|
||||
if (it != _plannedCollections.end()) {
|
||||
|
@ -1601,8 +1605,18 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
events::CreateCollection(info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
||||
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
|
||||
}
|
||||
} else {
|
||||
// no collection in plan for this particular database... this may be true for
|
||||
// the first collection created in a db
|
||||
// now check if there is a planned database at least
|
||||
if (_plannedDatabases.find(databaseName) == _plannedDatabases.end()) {
|
||||
// no need to create a collection in a database that is not there (anymore)
|
||||
events::CreateCollection(info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate that there is no view with this name either
|
||||
{
|
||||
// check against planned views as well
|
||||
|
@ -1618,21 +1632,23 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
}
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "createCollectionCoordinator, checking things...";
|
||||
|
||||
// mop: why do these ask the agency instead of checking cluster info?
|
||||
if (!ac.exists("Plan/Databases/" + databaseName)) {
|
||||
events::CreateCollection(info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||
}
|
||||
|
||||
if (ac.exists("Plan/Collections/" + databaseName + "/" + info.collectionID)) {
|
||||
events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS);
|
||||
return TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS;
|
||||
}
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
/// @brief create multiple collections in coordinator
|
||||
/// If any one of these collections fails, all creations will be
|
||||
/// rolled back.
|
||||
/// Note that in contrast to most other methods here, this method does not
|
||||
/// get a timeout parameter, but an endTime parameter!!!
|
||||
Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
|
||||
std::vector<ClusterCollectionCreationInfo>& infos,
|
||||
double endTime) {
|
||||
using arangodb::velocypack::Slice;
|
||||
|
||||
double const interval = getPollInterval();
|
||||
|
||||
// The following three are used for synchronization between the callback
|
||||
// closure and the main thread executing this function. Note that it can
|
||||
// happen that the callback is called only after we return from this
|
||||
|
@ -1644,8 +1660,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
auto cacheMutexOwner = std::make_shared<std::atomic<std::thread::id>>();
|
||||
auto isCleaned = std::make_shared<bool>(false);
|
||||
|
||||
AgencyComm ac;
|
||||
std::vector<std::shared_ptr<AgencyCallback>> agencyCallbacks;
|
||||
|
||||
|
||||
auto cbGuard = scopeGuard([&] {
|
||||
// We have a subtle race here, that we try to cover against:
|
||||
// We register a callback in the agency.
|
||||
|
@ -1664,17 +1681,17 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
_agencyCallbackRegistry->unregisterCallback(cb);
|
||||
}
|
||||
});
|
||||
std::vector<AgencyOperation> opers({IncreaseVersion()});
|
||||
|
||||
std::vector<AgencyOperation> opers({IncreaseVersion()});
|
||||
std::vector<AgencyPrecondition> precs;
|
||||
std::unordered_set<std::string> conditions;
|
||||
|
||||
|
||||
// current thread owning 'cacheMutex' write lock (workaround for non-recursive Mutex)
|
||||
for (auto& info : infos) {
|
||||
TRI_ASSERT(!info.name.empty());
|
||||
|
||||
if (info.state == ClusterCollectionCreationInfo::State::DONE) {
|
||||
// This is possible in Enterprise / Smart Collection situation
|
||||
// This is possible in Enterprise / Smart collection situation
|
||||
(*nrDone)++;
|
||||
}
|
||||
// The AgencyCallback will copy the closure will take responsibilty of it.
|
||||
|
@ -1825,23 +1842,35 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
AgencyPrecondition::Type::EMPTY, true));
|
||||
}
|
||||
}
|
||||
|
||||
// additionally ensure that no such collectionID exists yet in Plan/Collections
|
||||
precs.emplace_back(AgencyPrecondition("Plan/Collections/" + databaseName + "/" + info.collectionID,
|
||||
AgencyPrecondition::Type::EMPTY, true));
|
||||
}
|
||||
|
||||
// We run a loop here to send the agency transaction, since there might
|
||||
// be a precondition failed, in which case we want to retry for some time:
|
||||
while (true) {
|
||||
if (TRI_microtime() > endTime) {
|
||||
for (auto info : infos) {
|
||||
if (info.state != ClusterCollectionCreationInfo::DONE) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Timeout in _create collection"
|
||||
<< ": database: " << databaseName << ", collId:" << info.collectionID
|
||||
<< "\njson: " << info.json.toString()
|
||||
<< "\ncould not send transaction to agency.";
|
||||
}
|
||||
}
|
||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN;
|
||||
}
|
||||
// We need to make sure our plan is up to date.
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "createCollectionCoordinator, loading Plan from agency...";
|
||||
|
||||
// load the plan, so we are up-to-date
|
||||
loadPlan();
|
||||
uint64_t planVersion = 0; // will be populated by following function call
|
||||
Result res = checkCollectionPreconditions(databaseName, infos, planVersion);
|
||||
if (res.fail()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
// now try to update the plan in the agency, using the current plan version as our
|
||||
// precondition
|
||||
{
|
||||
// create a builder with just the version number for comparison
|
||||
VPackBuilder versionBuilder;
|
||||
versionBuilder.add(VPackValue(planVersion));
|
||||
|
||||
// add a precondition that checks the plan version has not yet changed
|
||||
precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice()));
|
||||
|
||||
AgencyWriteTransaction transaction(opers, precs);
|
||||
|
||||
{ // we hold this mutex from now on until we have updated our cache
|
||||
|
@ -1853,61 +1882,28 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
// Only if not precondition failed
|
||||
if (!res.successful()) {
|
||||
if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
||||
auto result = res.slice();
|
||||
AgencyCommResult ag = ac.getValues("/");
|
||||
|
||||
if (result.isArray() && result.length() > 0) {
|
||||
if (result[0].isObject()) {
|
||||
auto tres = result[0];
|
||||
std::string errorMsg = "";
|
||||
if (tres.hasKey(std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Supervision"}))) {
|
||||
for (const auto& s : VPackObjectIterator(tres.get(
|
||||
std::vector<std::string>({AgencyCommManager::path(),
|
||||
"Supervision", "Shards"})))) {
|
||||
errorMsg += std::string("Shard ") + s.key.copyString();
|
||||
errorMsg +=
|
||||
" of prototype collection is blocked by supervision job ";
|
||||
errorMsg += s.value.copyString();
|
||||
}
|
||||
}
|
||||
return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
||||
std::move(errorMsg)};
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Precondition failed for this agency transaction: "
|
||||
<< transaction.toJson() << ", return code: " << res.httpCode();
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
// Agency is currently unhappy, try again in a few seconds:
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
continue;
|
||||
} else {
|
||||
std::string errorMsg = "";
|
||||
errorMsg += std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__);
|
||||
errorMsg += " HTTP code: " + std::to_string(res.httpCode());
|
||||
errorMsg += " error message: " + res.errorMessage();
|
||||
errorMsg += " error details: " + res.errorDetails();
|
||||
errorMsg += " body: " + res.body();
|
||||
for (auto const& info : infos) {
|
||||
events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
||||
}
|
||||
return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
|
||||
// use this special error code to signal that we got a precondition failure
|
||||
// in this case the caller can try again with an updated version of the plan change
|
||||
return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"};
|
||||
}
|
||||
|
||||
std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode());
|
||||
errorMsg += " error message: " + res.errorMessage();
|
||||
errorMsg += " error details: " + res.errorDetails();
|
||||
errorMsg += " body: " + res.body();
|
||||
for (auto const& info : infos) {
|
||||
events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
||||
}
|
||||
return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
|
||||
}
|
||||
|
||||
// Update our cache:
|
||||
loadPlan();
|
||||
}
|
||||
break; // Leave loop, since we are done
|
||||
}
|
||||
|
||||
// if we got here, the plan was updated successfully
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "createCollectionCoordinator, Plan changed, waiting for success...";
|
||||
|
||||
|
@ -1922,15 +1918,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
}
|
||||
|
||||
// Get a full agency dump for debugging
|
||||
{
|
||||
AgencyCommResult ag = ac.getValues("");
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
}
|
||||
logAgencyDump();
|
||||
|
||||
if (tmpRes <= TRI_ERROR_NO_ERROR) {
|
||||
tmpRes = TRI_ERROR_CLUSTER_TIMEOUT;
|
||||
|
@ -1956,7 +1944,6 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
precs.emplace_back(CreateCollectionOrderPrecondition(databaseName, info.collectionID,
|
||||
info.isBuildingSlice()));
|
||||
}
|
||||
// TODO: Should we use preconditions?
|
||||
|
||||
AgencyWriteTransaction transaction(opers, precs);
|
||||
|
||||
|
@ -2114,15 +2101,27 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
|
|||
"/shards");
|
||||
|
||||
if (res.successful()) {
|
||||
velocypack::Slice shards = res.slice()[0].get(std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Plan", "Collections", dbName, collectionID,
|
||||
"shards"}));
|
||||
if (shards.isObject()) {
|
||||
numberOfShards = shards.length();
|
||||
velocypack::Slice databaseSlice = res.slice()[0].get(std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Plan", "Collections", dbName }));
|
||||
|
||||
if (!databaseSlice.isObject()) {
|
||||
// database dropped in the meantime
|
||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||
}
|
||||
|
||||
velocypack::Slice collectionSlice = databaseSlice.get(collectionID);
|
||||
if (!collectionSlice.isObject()) {
|
||||
// collection dropped in the meantime
|
||||
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
||||
}
|
||||
|
||||
velocypack::Slice shardsSlice = collectionSlice.get("shards");
|
||||
if (shardsSlice.isObject()) {
|
||||
numberOfShards = shardsSlice.length();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Missing shards information on dropping " << dbName << "/" << collectionID;
|
||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2141,13 +2140,10 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
|
|||
<< "Precondition failed for this agency transaction: " << trans.toJson()
|
||||
<< ", return code: " << res.httpCode();
|
||||
}
|
||||
AgencyCommResult ag = ac.getValues("");
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
|
||||
logAgencyDump();
|
||||
// TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the
|
||||
// precondition is that the database still exists
|
||||
return TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION;
|
||||
}
|
||||
|
||||
|
@ -2181,13 +2177,8 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
|
|||
<< "Timeout in _drop collection (" << realTimeout << ")"
|
||||
<< ": database: " << dbName << ", collId:" << collectionID
|
||||
<< "\ntransaction sent to agency: " << trans.toJson();
|
||||
AgencyCommResult ag = ac.getValues("");
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
|
||||
logAgencyDump();
|
||||
events::DropCollection(collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
}
|
||||
|
@ -2337,14 +2328,7 @@ int ClusterInfo::createViewCoordinator(std::string const& databaseName,
|
|||
viewID + " does not yet exist failed. Cannot create view.";
|
||||
|
||||
// Dump agency plan:
|
||||
auto const ag = ac.getValues("/");
|
||||
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
logAgencyDump();
|
||||
|
||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN;
|
||||
} else {
|
||||
|
@ -2395,14 +2379,7 @@ int ClusterInfo::dropViewCoordinator(std::string const& databaseName,
|
|||
errorMsg += " already exist failed. Cannot create view.";
|
||||
|
||||
// Dump agency plan:
|
||||
auto const ag = ac.getValues("/");
|
||||
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
logAgencyDump();
|
||||
} else {
|
||||
errorMsg += std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__);
|
||||
errorMsg += " HTTP code: " + std::to_string(res.httpCode());
|
||||
|
@ -2437,14 +2414,7 @@ Result ClusterInfo::setViewPropertiesCoordinator(std::string const& databaseName
|
|||
{AgencyCommManager::path(), "Plan", "Views", databaseName, viewID});
|
||||
|
||||
if (!view.isObject()) {
|
||||
auto const ag = ac.getValues("");
|
||||
|
||||
if (ag.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
|
||||
<< ag.slice().toJson();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
||||
}
|
||||
logAgencyDump();
|
||||
|
||||
return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
|
||||
}
|
||||
|
|
|
@ -270,6 +270,9 @@ class ClusterInfo {
|
|||
static void cleanup();
|
||||
|
||||
public:
|
||||
/// @brief produces an agency dump and logs it
|
||||
void logAgencyDump() const;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get a number of cluster-wide unique IDs, returns the first
|
||||
/// one and guarantees that <number> are reserved for the caller.
|
||||
|
@ -388,19 +391,22 @@ class ClusterInfo {
|
|||
arangodb::velocypack::Slice const& json,
|
||||
std::string& errorMsg, double timeout);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief this method does an atomic check of the preconditions for the collections
|
||||
/// to be created, using the currently loaded plan. it populates the plan version
|
||||
/// used for the checks
|
||||
Result checkCollectionPreconditions(std::string const& databaseName,
|
||||
std::vector<ClusterCollectionCreationInfo> const& infos,
|
||||
uint64_t& planVersion);
|
||||
|
||||
/// @brief create multiple collections in coordinator
|
||||
/// If any one of these collections fails, all creations will be
|
||||
/// rolled back.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Note that in contrast to most other methods here, this method does not
|
||||
/// get a timeout parameter, but an endTime parameter!!!
|
||||
Result createCollectionsCoordinator(std::string const& databaseName,
|
||||
std::vector<ClusterCollectionCreationInfo>&, double timeout);
|
||||
std::vector<ClusterCollectionCreationInfo>&, double endTime);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop collection in coordinator
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int dropCollectionCoordinator(std::string const& databaseName, std::string const& collectionID,
|
||||
std::string& errorMsg, double timeout);
|
||||
|
||||
|
@ -612,21 +618,21 @@ class ClusterInfo {
|
|||
* @return List of DB servers serving the shard
|
||||
*/
|
||||
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
|
||||
|
||||
private:
|
||||
void loadClusterId();
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get an operation timeout
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
double getTimeout(double timeout) const {
|
||||
static double getTimeout(double timeout) {
|
||||
if (timeout == 0.0) {
|
||||
return 24.0 * 3600.0;
|
||||
}
|
||||
return timeout;
|
||||
}
|
||||
|
||||
private:
|
||||
void loadClusterId();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get the poll interval
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "ClusterMethods.h"
|
||||
#include "ApplicationFeatures/ApplicationServer.h"
|
||||
#include "Basics/NumberUtils.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Basics/StringRef.h"
|
||||
|
@ -2567,107 +2568,147 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
|
|||
std::vector<std::shared_ptr<LogicalCollection>>& collections,
|
||||
bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
|
||||
bool enforceReplicationFactor) {
|
||||
|
||||
TRI_ASSERT(!collections.empty());
|
||||
if (collections.empty()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_INTERNAL,
|
||||
"Trying to create an empty list of collections on coordinator.");
|
||||
}
|
||||
|
||||
double const realTimeout = ClusterInfo::getTimeout(240.0);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
|
||||
// We have at least one, take this collections DB name
|
||||
auto& dbName = collections[0]->vocbase().name();
|
||||
auto const dbName = collections[0]->vocbase().name();
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
ci->loadCurrentDBServers();
|
||||
std::vector<std::string> dbServers = ci->getCurrentDBServers();
|
||||
|
||||
std::vector<ClusterCollectionCreationInfo> infos;
|
||||
std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
|
||||
infos.reserve(collections.size());
|
||||
vpackData.reserve(collections.size());
|
||||
for (auto& col : collections) {
|
||||
// We can only serve on Database at a time with this call.
|
||||
// We have the vocbase context around this calls anyways, so this is save.
|
||||
TRI_ASSERT(col->vocbase().name() == dbName);
|
||||
std::string distributeShardsLike = col->distributeShardsLike();
|
||||
std::vector<std::string> avoid = col->avoidServers();
|
||||
std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
|
||||
|
||||
if (!distributeShardsLike.empty()) {
|
||||
CollectionNameResolver resolver(col->vocbase());
|
||||
TRI_voc_cid_t otherCid = resolver.getCollectionIdCluster(distributeShardsLike);
|
||||
while (true) {
|
||||
infos.clear();
|
||||
|
||||
if (otherCid != 0) {
|
||||
shards = CloneShardDistribution(ci, col.get(), otherCid);
|
||||
ci->loadCurrentDBServers();
|
||||
std::vector<std::string> dbServers = ci->getCurrentDBServers();
|
||||
infos.reserve(collections.size());
|
||||
|
||||
std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
|
||||
vpackData.reserve(collections.size());
|
||||
for (auto& col : collections) {
|
||||
// We can only serve on Database at a time with this call.
|
||||
// We have the vocbase context around this calls anyways, so this is save.
|
||||
TRI_ASSERT(col->vocbase().name() == dbName);
|
||||
std::string distributeShardsLike = col->distributeShardsLike();
|
||||
std::vector<std::string> avoid = col->avoidServers();
|
||||
std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
|
||||
|
||||
if (!distributeShardsLike.empty()) {
|
||||
CollectionNameResolver resolver(col->vocbase());
|
||||
TRI_voc_cid_t otherCid = resolver.getCollectionIdCluster(distributeShardsLike);
|
||||
|
||||
if (otherCid != 0) {
|
||||
shards = CloneShardDistribution(ci, col.get(), otherCid);
|
||||
} else {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
|
||||
"Could not find collection " + distributeShardsLike +
|
||||
" to distribute shards like it.");
|
||||
}
|
||||
} else {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
|
||||
"Could not find collection " + distributeShardsLike +
|
||||
" to distribute shards like it.");
|
||||
}
|
||||
} else {
|
||||
// system collections should never enforce replicationfactor
|
||||
// to allow them to come up with 1 dbserver
|
||||
if (col->system()) {
|
||||
enforceReplicationFactor = false;
|
||||
}
|
||||
// system collections should never enforce replicationfactor
|
||||
// to allow them to come up with 1 dbserver
|
||||
if (col->system()) {
|
||||
enforceReplicationFactor = false;
|
||||
}
|
||||
|
||||
size_t replicationFactor = col->replicationFactor();
|
||||
size_t numberOfShards = col->numberOfShards();
|
||||
size_t replicationFactor = col->replicationFactor();
|
||||
size_t numberOfShards = col->numberOfShards();
|
||||
|
||||
// the default behaviour however is to bail out and inform the user
|
||||
// that the requested replicationFactor is not possible right now
|
||||
if (dbServers.size() < replicationFactor) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
// the default behaviour however is to bail out and inform the user
|
||||
// that the requested replicationFactor is not possible right now
|
||||
if (dbServers.size() < replicationFactor) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Do not have enough DBServers for requested replicationFactor,"
|
||||
<< " nrDBServers: " << dbServers.size()
|
||||
<< " replicationFactor: " << replicationFactor;
|
||||
if (enforceReplicationFactor) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
|
||||
if (enforceReplicationFactor) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!avoid.empty()) {
|
||||
// We need to remove all servers that are in the avoid list
|
||||
if (dbServers.size() - avoid.size() < replicationFactor) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
if (!avoid.empty()) {
|
||||
// We need to remove all servers that are in the avoid list
|
||||
if (dbServers.size() - avoid.size() < replicationFactor) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Do not have enough DBServers for requested replicationFactor,"
|
||||
<< " (after considering avoid list),"
|
||||
<< " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor
|
||||
<< " avoid list size: " << avoid.size();
|
||||
// Not enough DBServers left
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
|
||||
// Not enough DBServers left
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
|
||||
}
|
||||
dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(),
|
||||
[&](const std::string& x) {
|
||||
return std::find(avoid.begin(), avoid.end(),
|
||||
x) != avoid.end();
|
||||
}),
|
||||
dbServers.end());
|
||||
}
|
||||
dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(),
|
||||
[&](const std::string& x) {
|
||||
return std::find(avoid.begin(), avoid.end(),
|
||||
x) != avoid.end();
|
||||
}),
|
||||
dbServers.end());
|
||||
std::random_shuffle(dbServers.begin(), dbServers.end());
|
||||
shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor,
|
||||
dbServers, !col->system());
|
||||
}
|
||||
std::random_shuffle(dbServers.begin(), dbServers.end());
|
||||
shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor,
|
||||
dbServers, !col->system());
|
||||
}
|
||||
|
||||
if (shards->empty() && !col->isSmart()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"no database servers found in cluster");
|
||||
}
|
||||
if (shards->empty() && !col->isSmart()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"no database servers found in cluster");
|
||||
}
|
||||
|
||||
col->setShardMap(shards);
|
||||
col->setShardMap(shards);
|
||||
|
||||
std::unordered_set<std::string> const ignoreKeys{
|
||||
std::unordered_set<std::string> const ignoreKeys{
|
||||
"allowUserKeys", "cid", "globallyUniqueId", "count",
|
||||
"planId", "version", "objectId"};
|
||||
col->setStatus(TRI_VOC_COL_STATUS_LOADED);
|
||||
VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, false, false);
|
||||
"planId", "version", "objectId"};
|
||||
col->setStatus(TRI_VOC_COL_STATUS_LOADED);
|
||||
VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, false, false);
|
||||
|
||||
infos.emplace_back(
|
||||
ClusterCollectionCreationInfo{std::to_string(col->id()),
|
||||
col->numberOfShards(), col->replicationFactor(),
|
||||
waitForSyncReplication, velocy.slice()});
|
||||
vpackData.emplace_back(velocy.steal());
|
||||
}
|
||||
Result res = ci->createCollectionsCoordinator(dbName, infos, 240.0);
|
||||
if (res.fail()) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
infos.emplace_back(
|
||||
ClusterCollectionCreationInfo{std::to_string(col->id()),
|
||||
col->numberOfShards(), col->replicationFactor(),
|
||||
waitForSyncReplication, velocy.slice()});
|
||||
vpackData.emplace_back(velocy.steal());
|
||||
}
|
||||
|
||||
// pass in the *endTime* here, not a timeout!
|
||||
Result res = ci->createCollectionsCoordinator(dbName, infos, endTime);
|
||||
|
||||
if (res.ok()) {
|
||||
// success! exit the loop and go on
|
||||
break;
|
||||
}
|
||||
|
||||
if (res.is(TRI_ERROR_REQUEST_CANCELED)) {
|
||||
// special error code indicating that storing the updated plan in the agency
|
||||
// didn't succeed, and that we should try again
|
||||
|
||||
// sleep for a while
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
if (TRI_microtime() > endTime) {
|
||||
// timeout expired
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT);
|
||||
}
|
||||
|
||||
if (arangodb::application_features::ApplicationServer::isStopping()) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
|
||||
}
|
||||
|
||||
// try in next iteration with an adjusted plan change attempt
|
||||
continue;
|
||||
|
||||
} else {
|
||||
// any other error
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
}
|
||||
|
||||
// This is no longer necessary, since we load the Plan in
|
||||
|
|
Loading…
Reference in New Issue