mirror of https://gitee.com/bigwinds/arangodb
Bug fix/fix races in collection creation (#9506)
This commit is contained in:
parent
36b1d290a9
commit
cdbe63fa6e
|
@ -224,6 +224,20 @@ void ClusterInfo::cleanup() {
|
||||||
theInstance->_currentCollections.clear();
|
theInstance->_currentCollections.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief produces an agency dump and logs it
|
||||||
|
void ClusterInfo::logAgencyDump() const {
|
||||||
|
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||||
|
AgencyComm ac;
|
||||||
|
AgencyCommResult ag = ac.getValues("/");
|
||||||
|
|
||||||
|
if (ag.successful()) {
|
||||||
|
LOG_TOPIC("fe8ce", INFO, Logger::CLUSTER) << "Agency dump:\n" << ag.slice().toJson();
|
||||||
|
} else {
|
||||||
|
LOG_TOPIC("e7e30", WARN, Logger::CLUSTER) << "Could not get agency dump!";
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
|
/// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
|
||||||
/// new upper bound value from the agency
|
/// new upper bound value from the agency
|
||||||
|
@ -1568,16 +1582,7 @@ Result ClusterInfo::dropDatabaseCoordinator( // drop database
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TRI_microtime() > endTime) {
|
if (TRI_microtime() > endTime) {
|
||||||
AgencyCommResult ag = ac.getValues("/");
|
logAgencyDump();
|
||||||
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("fe8ce", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("e7e30", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
return Result(TRI_ERROR_CLUSTER_TIMEOUT);
|
return Result(TRI_ERROR_CLUSTER_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1604,30 +1609,28 @@ Result ClusterInfo::createCollectionCoordinator( // create collection
|
||||||
std::vector<ClusterCollectionCreationInfo> infos{
|
std::vector<ClusterCollectionCreationInfo> infos{
|
||||||
ClusterCollectionCreationInfo{collectionID, numberOfShards, replicationFactor,
|
ClusterCollectionCreationInfo{collectionID, numberOfShards, replicationFactor,
|
||||||
minReplicationFactor, waitForReplication, json}};
|
minReplicationFactor, waitForReplication, json}};
|
||||||
return createCollectionsCoordinator(databaseName, infos, timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
|
|
||||||
std::vector<ClusterCollectionCreationInfo>& infos,
|
|
||||||
double timeout) {
|
|
||||||
using arangodb::velocypack::Slice;
|
|
||||||
|
|
||||||
AgencyComm ac;
|
|
||||||
double const realTimeout = getTimeout(timeout);
|
double const realTimeout = getTimeout(timeout);
|
||||||
double const endTime = TRI_microtime() + realTimeout;
|
double const endTime = TRI_microtime() + realTimeout;
|
||||||
double const interval = getPollInterval();
|
return createCollectionsCoordinator(databaseName, infos, endTime);
|
||||||
// We need to make sure our plan is up to date.
|
}
|
||||||
LOG_TOPIC("4315c", DEBUG, Logger::CLUSTER)
|
|
||||||
<< "createCollectionCoordinator, loading Plan from agency...";
|
/// @brief this method does an atomic check of the preconditions for the collections
|
||||||
loadPlan();
|
/// to be created, using the currently loaded plan. it populates the plan version
|
||||||
// No matter how long this will take, we will not ourselfes trigger a plan relaoding.
|
/// used for the checks
|
||||||
for (auto& info : infos) {
|
Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName,
|
||||||
|
std::vector<ClusterCollectionCreationInfo> const& infos,
|
||||||
|
uint64_t& planVersion) {
|
||||||
|
READ_LOCKER(readLocker, _planProt.lock);
|
||||||
|
|
||||||
|
planVersion = _planVersion;
|
||||||
|
|
||||||
|
for (auto const& info : infos) {
|
||||||
// Check if name exists.
|
// Check if name exists.
|
||||||
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
|
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
|
||||||
return TRI_ERROR_BAD_PARAMETER; // must not be empty
|
return TRI_ERROR_BAD_PARAMETER; // must not be empty
|
||||||
}
|
}
|
||||||
READ_LOCKER(readLocker, _planProt.lock);
|
|
||||||
// Validate that his collection does not exist
|
// Validate that the collection does not exist in the current plan
|
||||||
{
|
{
|
||||||
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
|
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
|
||||||
if (it != _plannedCollections.end()) {
|
if (it != _plannedCollections.end()) {
|
||||||
|
@ -1636,10 +1639,20 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
if (it2 != (*it).second.end()) {
|
if (it2 != (*it).second.end()) {
|
||||||
// collection already exists!
|
// collection already exists!
|
||||||
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
||||||
return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no collection in plan for this particular database... this may be true for
|
||||||
|
// the first collection created in a db
|
||||||
|
// now check if there is a planned database at least
|
||||||
|
if (_plannedDatabases.find(databaseName) == _plannedDatabases.end()) {
|
||||||
|
// no need to create a collection in a database that is not there (anymore)
|
||||||
|
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
|
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate that there is no view with this name either
|
// Validate that there is no view with this name either
|
||||||
{
|
{
|
||||||
// check against planned views as well
|
// check against planned views as well
|
||||||
|
@ -1650,26 +1663,23 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
if (it2 != (*it).second.end()) {
|
if (it2 != (*it).second.end()) {
|
||||||
// view already exists!
|
// view already exists!
|
||||||
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
||||||
return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TOPIC("66541", DEBUG, Logger::CLUSTER)
|
|
||||||
<< "createCollectionCoordinator, checking things...";
|
|
||||||
|
|
||||||
// mop: why do these ask the agency instead of checking cluster info?
|
|
||||||
if (!ac.exists("Plan/Databases/" + databaseName)) {
|
|
||||||
events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
|
||||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ac.exists("Plan/Collections/" + databaseName + "/" + info.collectionID)) {
|
return {};
|
||||||
events::CreateCollection(databaseName, info.name, TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS);
|
|
||||||
return TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
|
||||||
|
std::vector<ClusterCollectionCreationInfo>& infos,
|
||||||
|
double endTime) {
|
||||||
|
using arangodb::velocypack::Slice;
|
||||||
|
|
||||||
|
double const interval = getPollInterval();
|
||||||
|
|
||||||
// The following three are used for synchronization between the callback
|
// The following three are used for synchronization between the callback
|
||||||
// closure and the main thread executing this function. Note that it can
|
// closure and the main thread executing this function. Note that it can
|
||||||
// happen that the callback is called only after we return from this
|
// happen that the callback is called only after we return from this
|
||||||
|
@ -1681,6 +1691,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
auto cacheMutexOwner = std::make_shared<std::atomic<std::thread::id>>();
|
auto cacheMutexOwner = std::make_shared<std::atomic<std::thread::id>>();
|
||||||
auto isCleaned = std::make_shared<bool>(false);
|
auto isCleaned = std::make_shared<bool>(false);
|
||||||
|
|
||||||
|
AgencyComm ac;
|
||||||
std::vector<std::shared_ptr<AgencyCallback>> agencyCallbacks;
|
std::vector<std::shared_ptr<AgencyCallback>> agencyCallbacks;
|
||||||
|
|
||||||
auto cbGuard = scopeGuard([&] {
|
auto cbGuard = scopeGuard([&] {
|
||||||
|
@ -1701,8 +1712,8 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
_agencyCallbackRegistry->unregisterCallback(cb);
|
_agencyCallbackRegistry->unregisterCallback(cb);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
std::vector<AgencyOperation> opers({IncreaseVersion()});
|
|
||||||
|
|
||||||
|
std::vector<AgencyOperation> opers({IncreaseVersion()});
|
||||||
std::vector<AgencyPrecondition> precs;
|
std::vector<AgencyPrecondition> precs;
|
||||||
std::unordered_set<std::string> conditions;
|
std::unordered_set<std::string> conditions;
|
||||||
|
|
||||||
|
@ -1862,25 +1873,35 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
AgencyPrecondition::Type::EMPTY, true));
|
AgencyPrecondition::Type::EMPTY, true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// additionally ensure that no such collectionID exists yet in Plan/Collections
|
||||||
|
precs.emplace_back(AgencyPrecondition("Plan/Collections/" + databaseName + "/" + info.collectionID,
|
||||||
|
AgencyPrecondition::Type::EMPTY, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We run a loop here to send the agency transaction, since there might
|
// We need to make sure our plan is up to date.
|
||||||
// be a precondition failed, in which case we want to retry for some time:
|
LOG_TOPIC("f4b14", DEBUG, Logger::CLUSTER)
|
||||||
while (true) {
|
<< "createCollectionCoordinator, loading Plan from agency...";
|
||||||
if (TRI_microtime() > endTime) {
|
|
||||||
for (auto const& info : infos) {
|
// load the plan, so we are up-to-date
|
||||||
if (info.state != ClusterCollectionCreationInfo::DONE) {
|
loadPlan();
|
||||||
LOG_TOPIC("a2184", ERR, Logger::CLUSTER)
|
uint64_t planVersion = 0; // will be populated by following function call
|
||||||
<< "Timeout in _create collection"
|
Result res = checkCollectionPreconditions(databaseName, infos, planVersion);
|
||||||
<< ": database: " << databaseName << ", collId:" << info.collectionID
|
if (res.fail()) {
|
||||||
<< "\njson: " << info.json.toString()
|
return res;
|
||||||
<< "\ncould not send transaction to agency.";
|
|
||||||
events::CreateCollection(databaseName, info.name,
|
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// now try to update the plan in the agency, using the current plan version as our
|
||||||
|
// precondition
|
||||||
|
{
|
||||||
|
// create a builder with just the version number for comparison
|
||||||
|
VPackBuilder versionBuilder;
|
||||||
|
versionBuilder.add(VPackValue(planVersion));
|
||||||
|
|
||||||
|
// add a precondition that checks the plan version has not yet changed
|
||||||
|
precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice()));
|
||||||
|
|
||||||
AgencyWriteTransaction transaction(opers, precs);
|
AgencyWriteTransaction transaction(opers, precs);
|
||||||
|
|
||||||
{ // we hold this mutex from now on until we have updated our cache
|
{ // we hold this mutex from now on until we have updated our cache
|
||||||
|
@ -1892,68 +1913,17 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
// Only if not precondition failed
|
// Only if not precondition failed
|
||||||
if (!res.successful()) {
|
if (!res.successful()) {
|
||||||
if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
||||||
auto result = res.slice();
|
// use this special error code to signal that we got a precondition failure
|
||||||
AgencyCommResult ag = ac.getValues("/");
|
// in this case the caller can try again with an updated version of the plan change
|
||||||
|
return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"};
|
||||||
if (result.isArray() && result.length() > 0) {
|
|
||||||
if (result[0].isObject()) {
|
|
||||||
auto tres = result[0];
|
|
||||||
if (!tres.hasKey(std::vector<std::string>(
|
|
||||||
{AgencyCommManager::path(), "Supervision"}))) {
|
|
||||||
for (auto const& info : infos) {
|
|
||||||
events::CreateCollection(databaseName, info.name,
|
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
|
||||||
}
|
|
||||||
return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string errorMsg;
|
std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode());
|
||||||
|
|
||||||
for (auto const& s :
|
|
||||||
velocypack::ObjectIterator(tres.get(std::vector<std::string>(
|
|
||||||
{AgencyCommManager::path(), "Supervision", "Shards"})))) {
|
|
||||||
errorMsg += std::string("Shard ") + s.key.copyString();
|
|
||||||
errorMsg +=
|
|
||||||
" of prototype collection is blocked by supervision job ";
|
|
||||||
errorMsg += s.value.copyString();
|
|
||||||
}
|
|
||||||
for (auto const& info : infos) {
|
|
||||||
events::CreateCollection(databaseName, info.name,
|
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
|
||||||
}
|
|
||||||
return Result( // result
|
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, // code
|
|
||||||
errorMsg // message
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_TOPIC("f6ecf", ERR, Logger::CLUSTER)
|
|
||||||
<< "Precondition failed for this agency transaction: "
|
|
||||||
<< transaction.toJson() << ", return code: " << res.httpCode();
|
|
||||||
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("de7f8", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("4b178", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
// Agency is currently unhappy, try again in a few seconds:
|
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
std::string errorMsg = "";
|
|
||||||
errorMsg += std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__);
|
|
||||||
errorMsg += " HTTP code: " + std::to_string(res.httpCode());
|
|
||||||
errorMsg += " error message: " + res.errorMessage();
|
errorMsg += " error message: " + res.errorMessage();
|
||||||
errorMsg += " error details: " + res.errorDetails();
|
errorMsg += " error details: " + res.errorDetails();
|
||||||
errorMsg += " body: " + res.body();
|
errorMsg += " body: " + res.body();
|
||||||
for (auto const& info : infos) {
|
for (auto const& info : infos) {
|
||||||
events::CreateCollection(databaseName, info.name,
|
events::CreateCollection(databaseName, info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
|
|
||||||
}
|
}
|
||||||
return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
|
return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
|
||||||
}
|
}
|
||||||
|
@ -1961,7 +1931,6 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
// Update our cache:
|
// Update our cache:
|
||||||
loadPlan();
|
loadPlan();
|
||||||
}
|
}
|
||||||
break; // Leave loop, since we are done
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TOPIC("98bca", DEBUG, Logger::CLUSTER)
|
LOG_TOPIC("98bca", DEBUG, Logger::CLUSTER)
|
||||||
|
@ -1978,16 +1947,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a full agency dump for debugging
|
// Get a full agency dump for debugging
|
||||||
{
|
logAgencyDump();
|
||||||
AgencyCommResult ag = ac.getValues("");
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("ab229", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("2c83c", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tmpRes <= TRI_ERROR_NO_ERROR) {
|
if (tmpRes <= TRI_ERROR_NO_ERROR) {
|
||||||
tmpRes = TRI_ERROR_CLUSTER_TIMEOUT;
|
tmpRes = TRI_ERROR_CLUSTER_TIMEOUT;
|
||||||
|
@ -2029,11 +1989,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
return res.errorCode();
|
return res.errorCode();
|
||||||
}
|
}
|
||||||
if (tmpRes > TRI_ERROR_NO_ERROR) {
|
if (tmpRes > TRI_ERROR_NO_ERROR) {
|
||||||
{
|
|
||||||
// We do not need to lock all condition variables
|
// We do not need to lock all condition variables
|
||||||
// we are save by cacheMutex
|
// we are safe by using cacheMutex
|
||||||
cbGuard.fire();
|
cbGuard.fire();
|
||||||
}
|
|
||||||
|
|
||||||
// report error
|
// report error
|
||||||
for (auto const& info : infos) {
|
for (auto const& info : infos) {
|
||||||
|
@ -2176,17 +2134,31 @@ Result ClusterInfo::dropCollectionCoordinator( // drop collection
|
||||||
"/shards");
|
"/shards");
|
||||||
|
|
||||||
if (res.successful()) {
|
if (res.successful()) {
|
||||||
velocypack::Slice shards = res.slice()[0].get(std::vector<std::string>(
|
velocypack::Slice databaseSlice = res.slice()[0].get(std::vector<std::string>(
|
||||||
{AgencyCommManager::path(), "Plan", "Collections", dbName, collectionID,
|
{AgencyCommManager::path(), "Plan", "Collections", dbName }));
|
||||||
"shards"}));
|
|
||||||
if (shards.isObject()) {
|
if (!databaseSlice.isObject()) {
|
||||||
numberOfShards = shards.length();
|
// database dropped in the meantime
|
||||||
|
events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
|
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
velocypack::Slice collectionSlice = databaseSlice.get(collectionID);
|
||||||
|
if (!collectionSlice.isObject()) {
|
||||||
|
// collection dropped in the meantime
|
||||||
|
events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||||
|
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
velocypack::Slice shardsSlice = collectionSlice.get("shards");
|
||||||
|
if (shardsSlice.isObject()) {
|
||||||
|
numberOfShards = shardsSlice.length();
|
||||||
} else {
|
} else {
|
||||||
LOG_TOPIC("d340d", ERR, Logger::CLUSTER)
|
LOG_TOPIC("d340d", ERR, Logger::CLUSTER)
|
||||||
<< "Missing shards information on dropping " << dbName << "/" << collectionID;
|
<< "Missing shards information on dropping " << dbName << "/" << collectionID;
|
||||||
|
|
||||||
events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2206,15 +2178,10 @@ Result ClusterInfo::dropCollectionCoordinator( // drop collection
|
||||||
<< ", return code: " << res.httpCode();
|
<< ", return code: " << res.httpCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
AgencyCommResult ag = ac.getValues("");
|
logAgencyDump();
|
||||||
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("53e01", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("f1bfb", ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the
|
||||||
|
// precondition is that the database still exists
|
||||||
events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
|
events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
|
||||||
return Result(TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
|
return Result(TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
|
||||||
}
|
}
|
||||||
|
@ -2248,15 +2215,8 @@ Result ClusterInfo::dropCollectionCoordinator( // drop collection
|
||||||
<< "Timeout in _drop collection (" << realTimeout << ")"
|
<< "Timeout in _drop collection (" << realTimeout << ")"
|
||||||
<< ": database: " << dbName << ", collId:" << collectionID
|
<< ": database: " << dbName << ", collId:" << collectionID
|
||||||
<< "\ntransaction sent to agency: " << trans.toJson();
|
<< "\ntransaction sent to agency: " << trans.toJson();
|
||||||
AgencyCommResult ag = ac.getValues("");
|
|
||||||
|
|
||||||
if (ag.successful()) {
|
logAgencyDump();
|
||||||
LOG_TOPIC("803c8", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("37297", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
|
events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
|
||||||
return Result(TRI_ERROR_CLUSTER_TIMEOUT);
|
return Result(TRI_ERROR_CLUSTER_TIMEOUT);
|
||||||
|
@ -2415,15 +2375,8 @@ Result ClusterInfo::createViewCoordinator( // create view
|
||||||
if (!res.successful()) {
|
if (!res.successful()) {
|
||||||
if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
||||||
// Dump agency plan:
|
// Dump agency plan:
|
||||||
auto const ag = ac.getValues("/");
|
|
||||||
|
|
||||||
if (ag.successful()) {
|
logAgencyDump();
|
||||||
LOG_TOPIC("d3aac", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("69f86", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN);
|
events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN);
|
||||||
return Result( // result
|
return Result( // result
|
||||||
|
@ -2481,15 +2434,7 @@ Result ClusterInfo::dropViewCoordinator( // drop view
|
||||||
" already exist failed. Cannot create view.");
|
" already exist failed. Cannot create view.");
|
||||||
|
|
||||||
// Dump agency plan:
|
// Dump agency plan:
|
||||||
auto const ag = ac.getValues("/");
|
logAgencyDump();
|
||||||
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("8a7e8", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("a7261", ERR, Logger::CLUSTER)
|
|
||||||
<< "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
result = Result( // result
|
result = Result( // result
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN, // FIXME COULD_NOT_REMOVE_VIEW_IN_PLAN
|
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN, // FIXME COULD_NOT_REMOVE_VIEW_IN_PLAN
|
||||||
|
@ -2524,14 +2469,7 @@ Result ClusterInfo::setViewPropertiesCoordinator(std::string const& databaseName
|
||||||
{AgencyCommManager::path(), "Plan", "Views", databaseName, viewID});
|
{AgencyCommManager::path(), "Plan", "Views", databaseName, viewID});
|
||||||
|
|
||||||
if (!view.isObject()) {
|
if (!view.isObject()) {
|
||||||
auto const ag = ac.getValues("");
|
logAgencyDump();
|
||||||
|
|
||||||
if (ag.successful()) {
|
|
||||||
LOG_TOPIC("eabbe", ERR, Logger::CLUSTER) << "Agency dump:\n"
|
|
||||||
<< ag.slice().toJson();
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC("5f212", ERR, Logger::CLUSTER) << "Could not get agency dump!";
|
|
||||||
}
|
|
||||||
|
|
||||||
return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
|
return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,6 +298,9 @@ class ClusterInfo final {
|
||||||
static void cleanup();
|
static void cleanup();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
/// @brief produces an agency dump and logs it
|
||||||
|
void logAgencyDump() const;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief get a number of cluster-wide unique IDs, returns the first
|
/// @brief get a number of cluster-wide unique IDs, returns the first
|
||||||
/// one and guarantees that <number> are reserved for the caller.
|
/// one and guarantees that <number> are reserved for the caller.
|
||||||
|
@ -421,17 +424,21 @@ class ClusterInfo final {
|
||||||
double timeout // request timeout
|
double timeout // request timeout
|
||||||
);
|
);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
/// @brief this method does an atomic check of the preconditions for the collections
|
||||||
|
/// to be created, using the currently loaded plan. it populates the plan version
|
||||||
|
/// used for the checks
|
||||||
|
Result checkCollectionPreconditions(std::string const& databaseName,
|
||||||
|
std::vector<ClusterCollectionCreationInfo> const& infos,
|
||||||
|
uint64_t& planVersion);
|
||||||
|
|
||||||
/// @brief create multiple collections in coordinator
|
/// @brief create multiple collections in coordinator
|
||||||
/// If any one of these collections fails, all creations will be
|
/// If any one of these collections fails, all creations will be
|
||||||
/// rolled back.
|
/// rolled back.
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
/// Note that in contrast to most other methods here, this method does not
|
||||||
|
/// get a timeout parameter, but an endTime parameter!!!
|
||||||
Result createCollectionsCoordinator(std::string const& databaseName,
|
Result createCollectionsCoordinator(std::string const& databaseName,
|
||||||
std::vector<ClusterCollectionCreationInfo>&,
|
std::vector<ClusterCollectionCreationInfo>&, double endTime);
|
||||||
double timeout);
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief drop collection in coordinator
|
/// @brief drop collection in coordinator
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
Result dropCollectionCoordinator( // drop collection
|
Result dropCollectionCoordinator( // drop collection
|
||||||
|
@ -657,20 +664,20 @@ class ClusterInfo final {
|
||||||
*/
|
*/
|
||||||
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
|
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
|
||||||
|
|
||||||
private:
|
|
||||||
void loadClusterId();
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief get an operation timeout
|
/// @brief get an operation timeout
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
double getTimeout(double timeout) const {
|
static double getTimeout(double timeout) {
|
||||||
if (timeout == 0.0) {
|
if (timeout == 0.0) {
|
||||||
return 24.0 * 3600.0;
|
return 24.0 * 3600.0;
|
||||||
}
|
}
|
||||||
return timeout;
|
return timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void loadClusterId();
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief get the poll interval
|
/// @brief get the poll interval
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -2881,20 +2881,35 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
|
||||||
std::vector<std::shared_ptr<LogicalCollection>>& collections,
|
std::vector<std::shared_ptr<LogicalCollection>>& collections,
|
||||||
bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
|
bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
|
||||||
bool enforceReplicationFactor) {
|
bool enforceReplicationFactor) {
|
||||||
|
|
||||||
TRI_ASSERT(!collections.empty());
|
TRI_ASSERT(!collections.empty());
|
||||||
if (collections.empty()) {
|
if (collections.empty()) {
|
||||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||||
TRI_ERROR_INTERNAL,
|
TRI_ERROR_INTERNAL,
|
||||||
"Trying to create an empty list of collections on coordinator.");
|
"Trying to create an empty list of collections on coordinator.");
|
||||||
}
|
}
|
||||||
// We have at least one, take this collections DB name
|
|
||||||
auto& dbName = collections[0]->vocbase().name();
|
double const realTimeout = ClusterInfo::getTimeout(240.0);
|
||||||
|
double const endTime = TRI_microtime() + realTimeout;
|
||||||
|
|
||||||
|
// We have at least one, take this collection's DB name
|
||||||
|
// (if there are multiple collections to create, the assumption is that
|
||||||
|
// all collections have the same database name - ArangoDB does not
|
||||||
|
// support cross-database operations and they cannot be triggered by
|
||||||
|
// users)
|
||||||
|
auto const dbName = collections[0]->vocbase().name();
|
||||||
ClusterInfo* ci = ClusterInfo::instance();
|
ClusterInfo* ci = ClusterInfo::instance();
|
||||||
|
|
||||||
|
std::vector<ClusterCollectionCreationInfo> infos;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
infos.clear();
|
||||||
|
|
||||||
ci->loadCurrentDBServers();
|
ci->loadCurrentDBServers();
|
||||||
std::vector<std::string> dbServers = ci->getCurrentDBServers();
|
std::vector<std::string> dbServers = ci->getCurrentDBServers();
|
||||||
std::vector<ClusterCollectionCreationInfo> infos;
|
|
||||||
std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
|
|
||||||
infos.reserve(collections.size());
|
infos.reserve(collections.size());
|
||||||
|
|
||||||
|
std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
|
||||||
vpackData.reserve(collections.size());
|
vpackData.reserve(collections.size());
|
||||||
for (auto& col : collections) {
|
for (auto& col : collections) {
|
||||||
// We can only serve on Database at a time with this call.
|
// We can only serve on Database at a time with this call.
|
||||||
|
@ -2943,8 +2958,6 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
|
||||||
if (!avoid.empty()) {
|
if (!avoid.empty()) {
|
||||||
// We need to remove all servers that are in the avoid list
|
// We need to remove all servers that are in the avoid list
|
||||||
if (dbServers.size() - avoid.size() < replicationFactor) {
|
if (dbServers.size() - avoid.size() < replicationFactor) {
|
||||||
TRI_ASSERT(minReplicationFactor <= replicationFactor);
|
|
||||||
// => (dbServers.size() - avoid.size() < minReplicationFactor) is granted
|
|
||||||
LOG_TOPIC("03682", DEBUG, Logger::CLUSTER)
|
LOG_TOPIC("03682", DEBUG, Logger::CLUSTER)
|
||||||
<< "Do not have enough DBServers for requested replicationFactor,"
|
<< "Do not have enough DBServers for requested replicationFactor,"
|
||||||
<< " (after considering avoid list),"
|
<< " (after considering avoid list),"
|
||||||
|
@ -2979,16 +2992,46 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
|
||||||
VPackBuilder velocy =
|
VPackBuilder velocy =
|
||||||
col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::makeFlags());
|
col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::makeFlags());
|
||||||
|
|
||||||
infos.emplace_back(ClusterCollectionCreationInfo{
|
infos.emplace_back(
|
||||||
std::to_string(col->id()), col->numberOfShards(), col->replicationFactor(),
|
ClusterCollectionCreationInfo{std::to_string(col->id()),
|
||||||
col->minReplicationFactor(), waitForSyncReplication, velocy.slice()});
|
col->numberOfShards(), col->replicationFactor(),
|
||||||
|
col->minReplicationFactor(),
|
||||||
|
waitForSyncReplication, velocy.slice()});
|
||||||
vpackData.emplace_back(velocy.steal());
|
vpackData.emplace_back(velocy.steal());
|
||||||
}
|
}
|
||||||
|
|
||||||
Result res = ci->createCollectionsCoordinator(dbName, infos, 240.0);
|
// pass in the *endTime* here, not a timeout!
|
||||||
if (res.fail()) {
|
Result res = ci->createCollectionsCoordinator(dbName, infos, endTime);
|
||||||
|
|
||||||
|
if (res.ok()) {
|
||||||
|
// success! exit the loop and go on
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res.is(TRI_ERROR_REQUEST_CANCELED)) {
|
||||||
|
// special error code indicating that storing the updated plan in the agency
|
||||||
|
// didn't succeed, and that we should try again
|
||||||
|
|
||||||
|
// sleep for a while
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
|
|
||||||
|
if (TRI_microtime() > endTime) {
|
||||||
|
// timeout expired
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (arangodb::application_features::ApplicationServer::isStopping()) {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
// try in next iteration with an adjusted plan change attempt
|
||||||
|
continue;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// any other error
|
||||||
THROW_ARANGO_EXCEPTION(res);
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ci->loadPlan();
|
ci->loadPlan();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue