//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Max Neunhoeffer /// @author Jan Steemann /// @author Kaveh Vahedipour //////////////////////////////////////////////////////////////////////////////// #include "ClusterInfo.h" #include "Basics/ConditionLocker.h" #include "Basics/Exceptions.h" #include "Basics/MutexLocker.h" #include "Basics/RecursiveLocker.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" #include "Basics/hashes.h" #include "Cluster/ClusterCollectionCreationInfo.h" #include "Cluster/ClusterHelpers.h" #include "Cluster/RebootTracker.h" #include "Cluster/ServerState.h" #include "Logger/Logger.h" #include "Random/RandomGenerator.h" #include "Rest/HttpResponse.h" #include "RestServer/DatabaseFeature.h" #include "StorageEngine/PhysicalCollection.h" #include "Utils/Events.h" #include "VocBase/LogicalCollection.h" #include "VocBase/LogicalView.h" #include "VocBase/Methods/Databases.h" #ifdef USE_ENTERPRISE #include "Enterprise/VocBase/SmartVertexCollection.h" #include "Enterprise/VocBase/VirtualCollection.h" #endif #include #include #include #include #include #include #include #include namespace { static inline arangodb::AgencyOperation IncreaseVersion() { return arangodb::AgencyOperation{"Plan/Version", arangodb::AgencySimpleOperationType::INCREMENT_OP}; } static inline std::string collectionPath(std::string const& dbName, std::string const& collection) { return "Plan/Collections/" + dbName + "/" + collection; } static inline arangodb::AgencyOperation CreateCollectionOrder(std::string const& dbName, std::string const& collection, VPackSlice const& info, double timeout) { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE if (!info.get("shards").isEmptyObject() && !arangodb::basics::VelocyPackHelper::getBooleanValue(info, arangodb::StaticStrings::IsSmart, false)) { TRI_ASSERT(info.hasKey(arangodb::StaticStrings::IsBuilding)); TRI_ASSERT(info.get(arangodb::StaticStrings::IsBuilding).isBool()); TRI_ASSERT(info.get(arangodb::StaticStrings::IsBuilding).getBool() == true); } #endif arangodb::AgencyOperation op{collectionPath(dbName, collection), arangodb::AgencyValueOperationType::SET, info}; op._ttl = static_cast(timeout); return op; } static inline arangodb::AgencyPrecondition CreateCollectionOrderPrecondition( std::string const& dbName, std::string const& collection, VPackSlice const& info) { arangodb::AgencyPrecondition prec{collectionPath(dbName, collection), arangodb::AgencyPrecondition::Type::VALUE, info}; return prec; } static inline arangodb::AgencyOperation CreateCollectionSuccess( std::string const& dbName, std::string const& collection, VPackSlice const& info) { TRI_ASSERT(!info.hasKey(arangodb::StaticStrings::IsBuilding)); return 
arangodb::AgencyOperation{collectionPath(dbName, collection), arangodb::AgencyValueOperationType::SET, info}; } } // namespace #ifdef _WIN32 // turn off warnings about too long type name for debug symbols blabla in MSVC // only... #pragma warning(disable : 4503) #endif using namespace arangodb; using namespace arangodb::cluster; using namespace arangodb::methods; static std::unique_ptr _instance; //////////////////////////////////////////////////////////////////////////////// /// @brief a local helper to report errors and messages //////////////////////////////////////////////////////////////////////////////// static inline int setErrormsg(int ourerrno, std::string& errorMsg) { errorMsg = TRI_errno_string(ourerrno); return ourerrno; } //////////////////////////////////////////////////////////////////////////////// /// @brief check whether the JSON returns an error //////////////////////////////////////////////////////////////////////////////// static inline bool hasError(VPackSlice const& slice) { return arangodb::basics::VelocyPackHelper::getBooleanValue(slice, "error", false); } //////////////////////////////////////////////////////////////////////////////// /// @brief extract the error message from a JSON //////////////////////////////////////////////////////////////////////////////// static std::string extractErrorMessage(std::string const& shardId, VPackSlice const& slice) { std::string msg = " shardID:" + shardId + ": "; // add error message text msg += arangodb::basics::VelocyPackHelper::getStringValue(slice, "errorMessage", ""); // add error number if (slice.hasKey(StaticStrings::ErrorNum)) { VPackSlice const errorNum = slice.get(StaticStrings::ErrorNum); if (errorNum.isNumber()) { msg += " (errNum=" + arangodb::basics::StringUtils::itoa(errorNum.getNumericValue()) + ")"; } } return msg; } //////////////////////////////////////////////////////////////////////////////// /// @brief creates an empty collection info object //////////////////////////////////////////////////////////////////////////////// CollectionInfoCurrent::CollectionInfoCurrent(uint64_t currentVersion) : _currentVersion(currentVersion) {} //////////////////////////////////////////////////////////////////////////////// /// @brief destroys a collection info object //////////////////////////////////////////////////////////////////////////////// CollectionInfoCurrent::~CollectionInfoCurrent() {} //////////////////////////////////////////////////////////////////////////////// /// @brief create the clusterinfo instance //////////////////////////////////////////////////////////////////////////////// void ClusterInfo::createInstance(AgencyCallbackRegistry* agencyCallbackRegistry) { _instance.reset(new ClusterInfo(agencyCallbackRegistry)); } //////////////////////////////////////////////////////////////////////////////// /// @brief returns an instance of the cluster info class //////////////////////////////////////////////////////////////////////////////// ClusterInfo* ClusterInfo::instance() { return _instance.get(); } //////////////////////////////////////////////////////////////////////////////// /// @brief creates a cluster info object //////////////////////////////////////////////////////////////////////////////// ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry) : _agency(), _agencyCallbackRegistry(agencyCallbackRegistry), _rebootTracker(SchedulerFeature::SCHEDULER), _planVersion(0), _currentVersion(0), _planLoader(std::thread::id()), _uniqid() { _uniqid._currentValue = 1ULL; _uniqid._upperValue = 
      0ULL;
  _uniqid._nextBatchStart = 1ULL;
  _uniqid._nextUpperValue = 0ULL;
  _uniqid._backgroundJobIsRunning = false;
  // Actual loading into caches is postponed until necessary
}

////////////////////////////////////////////////////////////////////////////////
/// @brief destroys a cluster info object
////////////////////////////////////////////////////////////////////////////////

ClusterInfo::~ClusterInfo() {}

////////////////////////////////////////////////////////////////////////////////
/// @brief cleanup method which frees cluster-internal shared ptrs on shutdown
////////////////////////////////////////////////////////////////////////////////

void ClusterInfo::cleanup() {
  ClusterInfo* theInstance = instance();
  if (theInstance == nullptr) {
    return;
  }

  while (true) {
    {
      MUTEX_LOCKER(mutexLocker, theInstance->_idLock);
      if (!theInstance->_uniqid._backgroundJobIsRunning) {
        break;
      }
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  MUTEX_LOCKER(mutexLocker, theInstance->_planProt.mutex);
  TRI_ASSERT(theInstance->_newPlannedViews.empty());  // only non-empty during loadPlan()
  theInstance->_plannedViews.clear();
  theInstance->_plannedCollections.clear();
  theInstance->_shards.clear();
  theInstance->_shardKeys.clear();
  theInstance->_shardIds.clear();
  theInstance->_currentCollections.clear();
}

/// @brief produces an agency dump and logs it
void ClusterInfo::logAgencyDump() const {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  AgencyComm ac;
  AgencyCommResult ag = ac.getValues("/");

  if (ag.successful()) {
    LOG_TOPIC("fe8ce", INFO, Logger::CLUSTER) << "Agency dump:\n" << ag.slice().toJson();
  } else {
    LOG_TOPIC("e7e30", WARN, Logger::CLUSTER) << "Could not get agency dump!";
  }
#endif
}

void ClusterInfo::triggerBackgroundGetIds() {
  // Trigger a new load of batches
  _uniqid._nextBatchStart = 1ULL;
  _uniqid._nextUpperValue = 0ULL;

  try {
    if (_uniqid._backgroundJobIsRunning) {
      return;
    }
    _uniqid._backgroundJobIsRunning = true;

    std::thread([this] {
      auto guardRunning = scopeGuard([this] {
        MUTEX_LOCKER(mutexLocker, _idLock);
        _uniqid._backgroundJobIsRunning = false;
      });

      uint64_t result;
      try {
        result = _agency.uniqid(MinIdsPerBatch, 0.0);
      } catch (std::exception const&) {
        return;
      }

      {
        MUTEX_LOCKER(mutexLocker, _idLock);
        if (1ULL == _uniqid._nextBatchStart) {
          // The next batch is still flagged for reload: install the newly
          // fetched range.
          _uniqid._nextBatchStart = result;
          _uniqid._nextUpperValue = result + MinIdsPerBatch - 1;
        }
        // If we get here, somebody else tried and succeeded in doing the same,
        // so we just try again.
      }
    }).detach();
  } catch (std::exception const& e) {
    LOG_TOPIC("adef4", WARN, Logger::CLUSTER)
        << "Failed to trigger background get ids. " << e.what();
  }
}

////////////////////////////////////////////////////////////////////////////////
/// @brief increase the uniqid value.
if it exceeds the upper bound, fetch a /// new upper bound value from the agency //////////////////////////////////////////////////////////////////////////////// uint64_t ClusterInfo::uniqid(uint64_t count) { MUTEX_LOCKER(mutexLocker, _idLock); if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) { uint64_t result = _uniqid._currentValue; _uniqid._currentValue += count; return result; } // Try if we can use the next batch if (_uniqid._nextBatchStart + count - 1 <= _uniqid._nextUpperValue) { uint64_t result = _uniqid._nextBatchStart; _uniqid._currentValue = _uniqid._nextBatchStart + count; _uniqid._upperValue = _uniqid._nextUpperValue; triggerBackgroundGetIds(); return result; } // We need to fetch from the agency uint64_t fetch = count; if (fetch < MinIdsPerBatch) { fetch = MinIdsPerBatch; } uint64_t result = _agency.uniqid(2 * fetch, 0.0); _uniqid._currentValue = result + count; _uniqid._upperValue = result + fetch - 1; // Invalidate next batch _uniqid._nextBatchStart = _uniqid._upperValue + 1; _uniqid._nextUpperValue = _uniqid._upperValue + fetch - 1; return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief flush the caches (used for testing) //////////////////////////////////////////////////////////////////////////////// void ClusterInfo::flush() { loadServers(); loadCurrentDBServers(); loadCurrentCoordinators(); loadCurrentMappings(); loadPlan(); loadCurrent(); } //////////////////////////////////////////////////////////////////////////////// /// @brief ask whether a cluster database exists //////////////////////////////////////////////////////////////////////////////// bool ClusterInfo::doesDatabaseExist(DatabaseID const& databaseID, bool reload) { int tries = 0; if (reload || !_planProt.isValid || !_currentProt.isValid || !_DBServersProt.isValid) { loadPlan(); loadCurrent(); loadCurrentDBServers(); ++tries; // no need to reload if the database is not found } // From now on we know that all data has been valid once, so no need // to check the isValid flags again under the lock. while (true) { { size_t expectedSize; { READ_LOCKER(readLocker, _DBServersProt.lock); expectedSize = _DBServers.size(); } // look up database by name: READ_LOCKER(readLocker, _planProt.lock); // _plannedDatabases is a map-type auto it = _plannedDatabases.find(databaseID); if (it != _plannedDatabases.end()) { // found the database in Plan READ_LOCKER(readLocker, _currentProt.lock); // _currentDatabases is // a map-type> auto it2 = _currentDatabases.find(databaseID); if (it2 != _currentDatabases.end()) { // found the database in Current return ((*it2).second.size() >= expectedSize); } } } if (++tries >= 2) { break; } loadPlan(); loadCurrent(); loadCurrentDBServers(); } return false; } //////////////////////////////////////////////////////////////////////////////// /// @brief get list of databases in the cluster //////////////////////////////////////////////////////////////////////////////// std::vector ClusterInfo::databases(bool reload) { std::vector result; if (_clusterId.empty()) { loadClusterId(); } if (reload || !_planProt.isValid || !_currentProt.isValid || !_DBServersProt.isValid) { loadPlan(); loadCurrent(); loadCurrentDBServers(); } // From now on we know that all data has been valid once, so no need // to check the isValid flags again under the lock. 
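  // A database is only reported here if at least as many DBServers have an
  // entry for it in Current as we currently know of; the DBServer list may
  // have grown in the meantime, so this is deliberately a >= comparison.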
  size_t expectedSize;
  {
    READ_LOCKER(readLocker, _DBServersProt.lock);
    expectedSize = _DBServers.size();
  }

  {
    READ_LOCKER(readLockerPlanned, _planProt.lock);
    READ_LOCKER(readLockerCurrent, _currentProt.lock);
    // _plannedDatabases maps each database name to its Plan entry (VPackSlice)
    auto it = _plannedDatabases.begin();

    while (it != _plannedDatabases.end()) {
      // _currentDatabases maps each database name to a map of
      // ServerID -> VPackSlice with the per-DBServer entries from Current
      auto it2 = _currentDatabases.find((*it).first);

      if (it2 != _currentDatabases.end()) {
        if ((*it2).second.size() >= expectedSize) {
          result.push_back((*it).first);
        }
      }

      ++it;
    }
  }

  return result;
}

/// @brief Load cluster ID
void ClusterInfo::loadClusterId() {
  // Contact the agency for the Cluster entry
  AgencyCommResult result = _agency.getValues("Cluster");

  // Parse
  if (result.successful()) {
    VPackSlice slice = result.slice()[0].get(
        std::vector<std::string>({AgencyCommManager::path(), "Cluster"}));
    if (slice.isString()) {
      _clusterId = slice.copyString();
    }
  }
}

////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about our plan
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////

static std::string const prefixPlan = "Plan";

void ClusterInfo::loadPlan() {
  DatabaseFeature* databaseFeature =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database");

  ++_planProt.wantedVersion;  // Indicate that after *NOW* somebody has to
                              // reread from the agency!
  MUTEX_LOCKER(mutexLocker, _planProt.mutex);  // only one may work at a time

  // For ArangoSearch views we need to get access to immediately created views
  // in order to allow links to be created correctly.
  // For the scenario above, we track such views in the '_newPlannedViews' member,
  // which is supposed to be empty before and after 'ClusterInfo::loadPlan()'
  // execution.
In addition, we do the following "trick" to provide access to // '_newPlannedViews' from outside 'ClusterInfo': in case if // 'ClusterInfo::getView' has been called from within 'ClusterInfo::loadPlan', // we redirect caller to search view in // '_newPlannedViews' member instead of '_plannedViews' // set plan loader TRI_ASSERT(_newPlannedViews.empty()); _planLoader = std::this_thread::get_id(); // ensure we'll eventually reset plan loader auto resetLoader = scopeGuard([this]() { _planLoader = std::thread::id(); _newPlannedViews.clear(); }); bool planValid = true; // has the loadPlan compleated without skipping valid objects uint64_t storedVersion = _planProt.wantedVersion; // this is the version // we will set in the end LOG_TOPIC("eb0e4", DEBUG, Logger::CLUSTER) << "loadPlan: wantedVersion=" << storedVersion << ", doneVersion=" << _planProt.doneVersion; if (_planProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } // Now contact the agency: AgencyCommResult result = _agency.getValues(prefixPlan); if (!result.successful()) { LOG_TOPIC("989d5", DEBUG, Logger::CLUSTER) << "Error while loading " << prefixPlan << " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode() << " errorMessage: " << result.errorMessage() << " body: " << result.body(); return; } auto resultSlice = result.slice(); if (!resultSlice.isArray() || resultSlice.length() != 1) { LOG_TOPIC("e089b", DEBUG, Logger::CLUSTER) << "Error while loading " << prefixPlan << " response structure is not an array of size 1" << " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode() << " errorMessage: " << result.errorMessage() << " body: " << result.body(); return; } auto slice = resultSlice[0].get( // get slice std::vector({AgencyCommManager::path(), "Plan"}) // args ); auto planBuilder = std::make_shared(slice); auto planSlice = planBuilder->slice(); if (!planSlice.isObject()) { LOG_TOPIC("bc8e1", ERR, Logger::CLUSTER) << "\"Plan\" is not an object in agency"; return; } uint64_t newPlanVersion = 0; auto planVersionSlice = planSlice.get("Version"); if (planVersionSlice.isNumber()) { try { newPlanVersion = planVersionSlice.getNumber(); } catch (...) { } } LOG_TOPIC("c6303", TRACE, Logger::CLUSTER) << "loadPlan: newPlanVersion=" << newPlanVersion; if (newPlanVersion == 0) { LOG_TOPIC("d68ae", WARN, Logger::CLUSTER) << "Attention: /arango/Plan/Version in the agency is not set or not a " "positive number."; } { READ_LOCKER(guard, _planProt.lock); if (_planProt.isValid && newPlanVersion <= _planVersion) { LOG_TOPIC("20450", DEBUG, Logger::CLUSTER) << "We already know this or a later version, do not update. 
" << "newPlanVersion=" << newPlanVersion << " _planVersion=" << _planVersion; return; } } decltype(_plannedDatabases) newDatabases; std::set buildingDatabases; decltype(_plannedCollections) newCollections; // map // > // > decltype(_shards) newShards; decltype(_shardServers) newShardServers; decltype(_shardKeys) newShardKeys; bool swapDatabases = false; bool swapCollections = false; bool swapViews = false; auto planDatabasesSlice = planSlice.get("Databases"); if (planDatabasesSlice.isObject()) { swapDatabases = true; // mark for swap even if no databases present to ensure dangling datasources are removed std::string name; for (auto const& database : velocypack::ObjectIterator(planDatabasesSlice)) { try { name = database.key.copyString(); } catch (arangodb::velocypack::Exception const& e) { LOG_TOPIC("adc82", ERR, Logger::AGENCY) << "Failed to get database name from json, error '" << e.what() << "'. VelocyPack: " << database.key.toJson(); throw; } // We create the database object on the coordinator here, because // it is used to create LogicalCollection instances further // down in this function. if (ServerState::instance()->isCoordinator() && !database.value.hasKey(StaticStrings::DatabaseIsBuilding)) { TRI_vocbase_t* vocbase = databaseFeature->lookupDatabase(name); if (vocbase == nullptr) { // database does not yet exist, create it now TRI_voc_tick_t id = arangodb::basics::VelocyPackHelper::stringUInt64(database.value, StaticStrings::DatabaseId); // create a local database object... Result res = databaseFeature->createDatabase(id, name, vocbase); if (res.fail()) { LOG_TOPIC("91870", ERR, arangodb::Logger::AGENCY) << "creating local database '" << name << "' failed: " << res.errorMessage(); } } } // On a coordinator we can only see databases that have been fully created if (!(ServerState::instance()->isCoordinator() && database.value.hasKey(StaticStrings::DatabaseIsBuilding))) { newDatabases.emplace(std::move(name), database.value); } else { buildingDatabases.emplace(std::move(name)); } } } // Ensure views are being created BEFORE collections to allow // links find them // Immediate children of "Views" are database names, then ids // of views, then one JSON object with the description: // "Plan":{"Views": { // "_system": { // "654321": { // "id": "654321", // "name": "v", // "collections": [ // // ] // },... // },... // }} // Now the same for views: auto planViewsSlice = planSlice.get("Views"); // format above if (planViewsSlice.isObject()) { swapViews = true; // mark for swap even if no databases present to ensure dangling datasources are removed for (auto const& databasePairSlice : velocypack::ObjectIterator(planViewsSlice)) { auto const& viewsSlice = databasePairSlice.value; if (!viewsSlice.isObject()) { LOG_TOPIC("0ee7f", INFO, Logger::AGENCY) << "Views in the plan is not a valid json object." << " Views will be ignored for now and the invalid information" << " will be repaired. VelocyPack: " << viewsSlice.toJson(); continue; } auto const databaseName = databasePairSlice.key.copyString(); auto* vocbase = databaseFeature->lookupDatabase(databaseName); if (!vocbase) { // No database with this name found. // We have an invalid state here. LOG_TOPIC("f105f", WARN, Logger::AGENCY) << "No database '" << databaseName << "' found," << " corresponding view will be ignored for now and the " << "invalid information will be repaired. 
VelocyPack: " << viewsSlice.toJson(); planValid &= !viewsSlice.length(); // cannot find vocbase for defined views (allow empty views for missing vocbase) continue; } for (auto const& viewPairSlice : velocypack::ObjectIterator(viewsSlice)) { auto const& viewSlice = viewPairSlice.value; if (!viewSlice.isObject()) { LOG_TOPIC("2487b", INFO, Logger::AGENCY) << "View entry is not a valid json object." << " The view will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << viewSlice.toJson(); continue; } auto const viewId = viewPairSlice.key.copyString(); try { LogicalView::ptr view; auto res = LogicalView::instantiate( // instantiate view, *vocbase, viewPairSlice.value, newPlanVersion // args ); if (!res.ok() || !view) { LOG_TOPIC("b0d48", ERR, Logger::AGENCY) << "Failed to create view '" << viewId << "'. The view will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << viewSlice.toJson(); planValid = false; // view creation failure continue; } auto& views = _newPlannedViews[databaseName]; // register with guid/id/name views.reserve(views.size() + 3); views[viewId] = view; views[view->name()] = view; views[view->guid()] = view; } catch (std::exception const& ex) { // The Plan contains invalid view information. // This should not happen in healthy situations. // If it happens in unhealthy situations the // cluster should not fail. LOG_TOPIC("ec9e6", ERR, Logger::AGENCY) << "Failed to load information for view '" << viewId << "': " << ex.what() << ". invalid information in Plan. The " << "view will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << viewSlice.toJson(); TRI_ASSERT(false); continue; } catch (...) { // The Plan contains invalid view information. // This should not happen in healthy situations. // If it happens in unhealthy situations the // cluster should not fail. LOG_TOPIC("660bf", ERR, Logger::AGENCY) << "Failed to load information for view '" << viewId << ". invalid information in Plan. The view will " << "be ignored for now and the invalid information will " << "be repaired. VelocyPack: " << viewSlice.toJson(); TRI_ASSERT(false); continue; } } } } // Immediate children of "Collections" are database names, then ids // of collections, then one JSON object with the description: // "Plan":{"Collections": { // "_system": { // "3010001": { // "deleted": false, // DO_COMPACT: true, // "id": "3010001", // INDEX_BUCKETS: 8, // "indexes": [ // { // "fields": [ // "_key" // ], // "id": "0", // "sparse": false, // "type": "primary", // "unique": true // } // ], // "isSmart": false, // "isSystem": true, // "isVolatile": false, // JOURNAL_SIZE: 1048576, // "keyOptions": { // "allowUserKeys": true, // "lastValue": 0, // "type": "traditional" // }, // "name": "_graphs", // "numberOfSh ards": 1, // "path": "", // "replicationFactor": 2, // "shardKeys": [ // "_key" // ], // "shards": { // "s3010002": [ // "PRMR-bf44d6fe-e31c-4b09-a9bf-e2df6c627999", // "PRMR-11a29830-5aca-454b-a2c3-dac3a08baca1" // ] // }, // "status": 3, // "statusString": "loaded", // "type": 2, // StaticStrings::WaitForSyncString: false // },... // },... 
// }} auto planCollectionsSlice = planSlice.get("Collections"); // format above if (planCollectionsSlice.isObject()) { swapCollections = true; // mark for swap even if no databases present to ensure dangling datasources are removed bool const isCoordinator = ServerState::instance()->isCoordinator(); for (auto const& databasePairSlice : velocypack::ObjectIterator(planCollectionsSlice)) { auto const& collectionsSlice = databasePairSlice.value; if (!collectionsSlice.isObject()) { LOG_TOPIC("e2e7a", INFO, Logger::AGENCY) << "Collections in the plan is not a valid json object." << " Collections will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << collectionsSlice.toJson(); continue; } DatabaseCollections databaseCollections; auto const databaseName = databasePairSlice.key.copyString(); // Skip databases that are still building. if (buildingDatabases.find(databaseName) != buildingDatabases.end()) { continue; } auto* vocbase = databaseFeature->lookupDatabase(databaseName); if (!vocbase) { // No database with this name found. // We have an invalid state here. LOG_TOPIC("83d4c", WARN, Logger::AGENCY) << "No database '" << databaseName << "' found," << " corresponding collection will be ignored for now and the " << "invalid information will be repaired. VelocyPack: " << collectionsSlice.toJson(); planValid &= !collectionsSlice.length(); // cannot find vocbase for defined collections (allow empty collections for missing vocbase) continue; } for (auto const& collectionPairSlice : velocypack::ObjectIterator(collectionsSlice)) { auto const& collectionSlice = collectionPairSlice.value; if (!collectionSlice.isObject()) { LOG_TOPIC("0f689", WARN, Logger::AGENCY) << "Collection entry is not a valid json object." << " The collection will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << collectionSlice.toJson(); continue; } auto const collectionId = collectionPairSlice.key.copyString(); try { std::shared_ptr newCollection; #if defined(USE_ENTERPRISE) auto isSmart = collectionSlice.get(StaticStrings::IsSmart); if (isSmart.isTrue()) { auto type = collectionSlice.get(StaticStrings::DataSourceType); if (type.isInteger() && type.getUInt() == TRI_COL_TYPE_EDGE) { newCollection = std::make_shared( // create collection *vocbase, collectionSlice, newPlanVersion // args ); } else { newCollection = std::make_shared( // create collection *vocbase, collectionSlice, newPlanVersion // args ); } } else #endif { newCollection = std::make_shared( // create collection *vocbase, collectionSlice, true, newPlanVersion // args ); } auto& collectionName = newCollection->name(); bool isBuilding = isCoordinator && arangodb::basics::VelocyPackHelper::getBooleanValue( collectionSlice, StaticStrings::IsBuilding, false); if (isCoordinator) { // copying over index estimates from the old version of the // collection into the new one LOG_TOPIC("7a884", TRACE, Logger::CLUSTER) << "copying index estimates"; // it is effectively safe to access _plannedCollections in // read-only mode here, as the only places that modify // _plannedCollections are the shutdown and this function // itself, which is protected by a mutex auto it = _plannedCollections.find(databaseName); if (it != _plannedCollections.end()) { auto it2 = (*it).second.find(collectionId); if (it2 != (*it).second.end()) { try { auto estimates = (*it2).second->clusterIndexEstimates(false); if (!estimates.empty()) { // already have an estimate... 
now copy it over newCollection->setClusterIndexEstimates(std::move(estimates)); } } catch (...) { // this may fail during unit tests, when mocks are used } } } } // NOTE: This is building has the following feature. A collection needs to be working on // all DBServers to allow replication to go on, also we require to have the shards planned. // BUT: The users should not be able to detect these collections. // Hence we simply do NOT add the collection to the coordinator local vocbase, which happens // inside the IF if (!isBuilding) { // register with name as well as with id: databaseCollections.emplace(collectionName, newCollection); databaseCollections.emplace(collectionId, newCollection); } newShardKeys.emplace(collectionId, std::make_shared>( newCollection->shardKeys())); auto shardIDs = newCollection->shardIds(); auto shards = std::make_shared>(); shards->reserve(shardIDs->size()); for (auto const& p : *shardIDs) { shards->push_back(p.first); newShardServers.emplace(p.first, p.second); } // Sort by the number in the shard ID ("s0000001" for example): std::sort( // sort shards->begin(), // begin shards->end(), // end [](std::string const& a, std::string const& b) -> bool { return std::strtol(a.c_str() + 1, nullptr, 10) < std::strtol(b.c_str() + 1, nullptr, 10); } // comparator ); newShards.emplace(collectionId, std::move(shards)); } catch (std::exception const& ex) { // The plan contains invalid collection information. // This should not happen in healthy situations. // If it happens in unhealthy situations the // cluster should not fail. LOG_TOPIC("359f3", ERR, Logger::AGENCY) << "Failed to load information for collection '" << collectionId << "': " << ex.what() << ". invalid information in plan. The " << "collection will be ignored for now and the invalid " << "information will be repaired. VelocyPack: " << collectionSlice.toJson(); TRI_ASSERT(false); continue; } catch (...) { // The plan contains invalid collection information. // This should not happen in healthy situations. // If it happens in unhealthy situations the // cluster should not fail. LOG_TOPIC("5f3d5", ERR, Logger::AGENCY) << "Failed to load information for collection '" << collectionId << ". invalid information in plan. The collection will " << "be ignored for now and the invalid information will " << "be repaired. VelocyPack: " << collectionSlice.toJson(); TRI_ASSERT(false); continue; } } newCollections.emplace(std::move(databaseName), std::move(databaseCollections)); } LOG_TOPIC("12dfa", DEBUG, Logger::CLUSTER) << "loadPlan done: wantedVersion=" << storedVersion << ", doneVersion=" << _planProt.doneVersion; } WRITE_LOCKER(writeLocker, _planProt.lock); _plan = std::move(planBuilder); _planVersion = newPlanVersion; if (swapDatabases) { _plannedDatabases.swap(newDatabases); } if (swapCollections) { _plannedCollections.swap(newCollections); _shards.swap(newShards); _shardKeys.swap(newShardKeys); _shardServers.swap(newShardServers); } if (swapViews) { _plannedViews.swap(_newPlannedViews); } // mark plan as fully loaded only if all incoming objects were fully loaded // must still swap structures to allow creation of new vocbases and removal of stale datasources if (planValid) { _planProt.doneVersion = storedVersion; _planProt.isValid = true; } } //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about current databases /// Usually one does not have to call this directly. 
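/// The result is cached in _currentDatabases, _currentCollections and _shardIds.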
//////////////////////////////////////////////////////////////////////////////// static std::string const prefixCurrent = "Current"; void ClusterInfo::loadCurrent() { // We need to update ServersKnown to notice rebootId changes for all servers. // To keep things simple and separate, we call loadServers here instead of // trying to integrate the servers upgrade code into loadCurrent, even if that // means small bits of the plan are read twice. loadServers(); ++_currentProt.wantedVersion; // Indicate that after *NOW* somebody has to // reread from the agency! MUTEX_LOCKER(mutexLocker, _currentProt.mutex); // only one may work at a time uint64_t storedVersion = _currentProt.wantedVersion; // this is the version // we will set at the end if (_currentProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } LOG_TOPIC("54789", DEBUG, Logger::CLUSTER) << "loadCurrent: wantedVersion: " << _currentProt.wantedVersion; // Now contact the agency: AgencyCommResult result = _agency.getValues(prefixCurrent); if (!result.successful()) { LOG_TOPIC("5d4e4", DEBUG, Logger::CLUSTER) << "Error while loading " << prefixCurrent << " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode() << " errorMessage: " << result.errorMessage() << " body: " << result.body(); return; } auto resultSlice = result.slice(); if (!resultSlice.isArray() || resultSlice.length() != 1) { LOG_TOPIC("b020c", DEBUG, Logger::CLUSTER) << "Error while loading " << prefixCurrent << " response structure is not an array of size 1" << " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode() << " errorMessage: " << result.errorMessage() << " body: " << result.body(); return; } auto slice = resultSlice[0].get( // get slice std::vector({AgencyCommManager::path(), "Current"}) // args ); auto currentBuilder = std::make_shared(); currentBuilder->add(slice); auto currentSlice = currentBuilder->slice(); if (!currentSlice.isObject()) { LOG_TOPIC("b8410", ERR, Logger::CLUSTER) << "Current is not an object!"; LOG_TOPIC("eed43", DEBUG, Logger::CLUSTER) << "loadCurrent done."; return; } uint64_t newCurrentVersion = 0; auto currentVersionSlice = currentSlice.get("Version"); if (currentVersionSlice.isNumber()) { try { newCurrentVersion = currentVersionSlice.getNumber(); } catch (...) { } } if (newCurrentVersion == 0) { LOG_TOPIC("e088e", WARN, Logger::CLUSTER) << "Attention: /arango/Current/Version in the agency is not set or not " "a positive number."; } { READ_LOCKER(guard, _currentProt.lock); if (_currentProt.isValid && newCurrentVersion <= _currentVersion) { LOG_TOPIC("00d58", DEBUG, Logger::CLUSTER) << "We already know this or a later version, do not update. 
" << "newCurrentVersion=" << newCurrentVersion << " _currentVersion=" << _currentVersion; return; } } decltype(_currentDatabases) newDatabases; decltype(_currentCollections) newCollections; decltype(_shardIds) newShardIds; bool swapDatabases = false; bool swapCollections = false; auto currentDatabasesSlice = currentSlice.get("Databases"); if (currentDatabasesSlice.isObject()) { swapDatabases = true; for (auto const& databaseSlicePair : velocypack::ObjectIterator(currentDatabasesSlice)) { auto const database = databaseSlicePair.key.copyString(); if (!databaseSlicePair.value.isObject()) { continue; } std::unordered_map serverList; for (auto const& serverSlicePair : velocypack::ObjectIterator(databaseSlicePair.value)) { serverList.emplace(serverSlicePair.key.copyString(), serverSlicePair.value); } newDatabases.emplace(database, serverList); } } auto currentCollectionsSlice = currentSlice.get("Collections"); if (currentCollectionsSlice.isObject()) { swapCollections = true; for (auto const& databaseSlice : velocypack::ObjectIterator(currentCollectionsSlice)) { auto const databaseName = databaseSlice.key.copyString(); DatabaseCollectionsCurrent databaseCollections; for (auto const& collectionSlice : velocypack::ObjectIterator(databaseSlice.value)) { auto const collectionName = collectionSlice.key.copyString(); auto collectionDataCurrent = std::make_shared(newCurrentVersion); for (auto const& shardSlice : velocypack::ObjectIterator(collectionSlice.value)) { auto const shardID = shardSlice.key.copyString(); collectionDataCurrent->add(shardID, shardSlice.value); // Note that we have only inserted the CollectionInfoCurrent under // the collection ID and not under the name! It is not possible // to query the current collection info by name. This is because // the correct place to hold the current name is in the plan. // Thus: Look there and get the collection ID from there. Then // ask about the current collection info. 
// Now take note of this shard and its responsible server: auto servers = std::make_shared>( collectionDataCurrent->servers(shardID) // args ); newShardIds.emplace(shardID, servers); } databaseCollections.emplace(collectionName, collectionDataCurrent); } newCollections.emplace(databaseName, databaseCollections); } } // Now set the new value: WRITE_LOCKER(writeLocker, _currentProt.lock); _current = currentBuilder; _currentVersion = newCurrentVersion; if (swapDatabases) { _currentDatabases.swap(newDatabases); } if (swapCollections) { LOG_TOPIC("b4059", TRACE, Logger::CLUSTER) << "Have loaded new collections current cache!"; _currentCollections.swap(newCollections); _shardIds.swap(newShardIds); } _currentProt.doneVersion = storedVersion; _currentProt.isValid = true; } /// @brief ask about a collection /// If it is not found in the cache, the cache is reloaded once /// if the collection is not found afterwards, this method will throw an /// exception std::shared_ptr ClusterInfo::getCollection(DatabaseID const& databaseID, CollectionID const& collectionID) { auto c = getCollectionNT(databaseID, collectionID); if (c == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, getCollectionNotFoundMsg(databaseID, collectionID) // message ); } else { return c; } } std::shared_ptr ClusterInfo::getCollectionNT(DatabaseID const& databaseID, CollectionID const& collectionID) { int tries = 0; if (!_planProt.isValid) { loadPlan(); ++tries; } while (true) { // left by break { READ_LOCKER(readLocker, _planProt.lock); // look up database by id AllCollections::const_iterator it = _plannedCollections.find(databaseID); if (it != _plannedCollections.end()) { // look up collection by id (or by name) DatabaseCollections::const_iterator it2 = (*it).second.find(collectionID); if (it2 != (*it).second.end()) { return (*it2).second; } } } if (++tries >= 2) { break; } // must load collections outside the lock loadPlan(); } return nullptr; } std::string ClusterInfo::getCollectionNotFoundMsg(DatabaseID const& databaseID, CollectionID const& collectionID) { return "Collection not found: " + collectionID + " in database " + databaseID; } //////////////////////////////////////////////////////////////////////////////// /// @brief ask about all collections //////////////////////////////////////////////////////////////////////////////// std::vector> const ClusterInfo::getCollections(DatabaseID const& databaseID) { std::vector> result; // always reload loadPlan(); READ_LOCKER(readLocker, _planProt.lock); // look up database by id AllCollections::const_iterator it = _plannedCollections.find(databaseID); if (it == _plannedCollections.end()) { return result; } // iterate over all collections DatabaseCollections::const_iterator it2 = (*it).second.begin(); while (it2 != (*it).second.end()) { char c = (*it2).first[0]; if (c < '0' || c > '9') { // skip collections indexed by id result.push_back((*it2).second); } ++it2; } return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief ask about a collection in current. This returns information about /// all shards in the collection. /// If it is not found in the cache, the cache is reloaded once. 
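/// Otherwise an empty CollectionInfoCurrent (with currentVersion 0) is returned.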
//////////////////////////////////////////////////////////////////////////////// std::shared_ptr ClusterInfo::getCollectionCurrent( DatabaseID const& databaseID, CollectionID const& collectionID) { int tries = 0; if (!_currentProt.isValid) { loadCurrent(); ++tries; } while (true) { { READ_LOCKER(readLocker, _currentProt.lock); // look up database by id AllCollectionsCurrent::const_iterator it = _currentCollections.find(databaseID); if (it != _currentCollections.end()) { // look up collection by id DatabaseCollectionsCurrent::const_iterator it2 = (*it).second.find(collectionID); if (it2 != (*it).second.end()) { return (*it2).second; } } } if (++tries >= 2) { break; } // must load collections outside the lock loadCurrent(); } return std::make_shared(0); } RebootTracker& ClusterInfo::rebootTracker() noexcept { return _rebootTracker; } RebootTracker const& ClusterInfo::rebootTracker() const noexcept { return _rebootTracker; } ////////////////////////////////////////////////////////////////////////////// /// @brief ask about a view /// If it is not found in the cache, the cache is reloaded once. The second /// argument can be a view ID or a view name (both cluster-wide). ////////////////////////////////////////////////////////////////////////////// std::shared_ptr ClusterInfo::getView(DatabaseID const& databaseID, ViewID const& viewID) { if (viewID.empty()) { return nullptr; } auto lookupView = [](AllViews const& dbs, DatabaseID const& databaseID, ViewID const& viewID) noexcept -> std::shared_ptr { // look up database by id auto const db = dbs.find(databaseID); if (db != dbs.end()) { // look up view by id (or by name) auto& views = db->second; auto const view = views.find(viewID); if (view != views.end()) { return view->second; } } return nullptr; }; if (std::this_thread::get_id() == _planLoader) { // we're loading plan, lookup inside immediately created planned views // already protected by _planProt.mutex, don't need to lock there return lookupView(_newPlannedViews, databaseID, viewID); } int tries = 0; if (!_planProt.isValid) { loadPlan(); ++tries; } while (true) { // left by break { READ_LOCKER(readLocker, _planProt.lock); auto const view = lookupView(_plannedViews, databaseID, viewID); if (view) { return view; } } if (++tries >= 2) { break; } // must load plan outside the lock loadPlan(); } LOG_TOPIC("a227e", DEBUG, Logger::CLUSTER) << "View not found: '" << viewID << "' in database '" << databaseID << "'"; return nullptr; } ////////////////////////////////////////////////////////////////////////////// /// @brief ask about all views of a database ////////////////////////////////////////////////////////////////////////////// std::vector> const ClusterInfo::getViews(DatabaseID const& databaseID) { std::vector> result; // always reload loadPlan(); READ_LOCKER(readLocker, _planProt.lock); // look up database by id AllViews::const_iterator it = _plannedViews.find(databaseID); if (it == _plannedViews.end()) { return result; } // iterate over all collections DatabaseViews::const_iterator it2 = (*it).second.begin(); while (it2 != it->second.end()) { char c = it2->first[0]; if (c >= '0' && c <= '9') { // skip views indexed by name result.emplace_back(it2->second); } ++it2; } return result; } // Build the VPackSlice that contains the `isBuilding` entry Result ClusterInfo::buildIsBuildingSlice(methods::CreateDatabaseInfo const& database, VPackBuilder& builder) { database.buildSlice(builder); builder.add(StaticStrings::DatabaseCoordinator, VPackValue(ServerState::instance()->getId())); 
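  // Together with the coordinator id above, the reboot id recorded below lets
  // the cluster notice if the creating coordinator dies or restarts before the
  // database is finalized, so the half-built database can be cleaned up.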
builder.add(StaticStrings::DatabaseCoordinatorRebootId, VPackValue(ServerState::instance()->getRebootId())); builder.add(StaticStrings::DatabaseIsBuilding, VPackValue(true)); builder.close(); return Result(); } // Build the VPackSlice that does not contain the `isBuilding` entry Result ClusterInfo::buildFinalSlice(methods::CreateDatabaseInfo const& database, VPackBuilder& builder) { database.buildSlice(builder); builder.close(); return Result(); } // This waits for the database described in `database` to turn up in `Current` // and no DBServer is allowed to report an error. Result ClusterInfo::waitForDatabaseInCurrent(methods::CreateDatabaseInfo const& database) { AgencyComm ac; AgencyCommResult res; auto DBServers = std::make_shared>(getCurrentDBServers()); auto dbServerResult = std::make_shared>(-1); std::shared_ptr errMsg = std::make_shared(); std::function dbServerChanged = [=](VPackSlice const& result) { size_t numDbServers = DBServers->size(); if (result.isObject() && result.length() >= numDbServers) { // We use >= here since the number of DBservers could have increased // during the creation of the database and we might not yet have // the latest list. Thus there could be more reports than we know // servers. VPackObjectIterator dbs(result); std::string tmpMsg = ""; bool tmpHaveError = false; for (VPackObjectIterator::ObjectPair dbserver : dbs) { VPackSlice slice = dbserver.value; if (arangodb::basics::VelocyPackHelper::getBooleanValue(slice, "error", false)) { tmpHaveError = true; tmpMsg += " DBServer:" + dbserver.key.copyString() + ":"; tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(slice, StaticStrings::ErrorMessage, ""); if (slice.hasKey(StaticStrings::ErrorNum)) { VPackSlice errorNum = slice.get(StaticStrings::ErrorNum); if (errorNum.isNumber()) { tmpMsg += " (errorNum="; tmpMsg += basics::StringUtils::itoa(errorNum.getNumericValue()); tmpMsg += ")"; } } } } if (tmpHaveError) { *errMsg = "Error in creation of database:" + tmpMsg; dbServerResult->store(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE, std::memory_order_release); return true; } dbServerResult->store(setErrormsg(TRI_ERROR_NO_ERROR, *errMsg), std::memory_order_release); } return true; }; // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. auto agencyCallback = std::make_shared(ac, "Current/Databases/" + database.getName(), dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); auto cbGuard = scopeGuard( [&] { _agencyCallbackRegistry->unregisterCallback(agencyCallback); }); // Waits for the database to turn up in Current/Databases { double const interval = getPollInterval(); CONDITION_LOCKER(locker, agencyCallback->_cv); int count = 0; // this counts, when we have to reload the DBServers while (true) { if (++count >= static_cast(getReloadServerListTimeout() / interval)) { // We update the list of DBServers every minute in case one of them // was taken away since we last looked. This also helps (slightly) // if a new DBServer was added. However, in this case we report // success a bit too early, which is not too bad. 
loadCurrentDBServers(); *DBServers = getCurrentDBServers(); count = 0; } int tmpRes = dbServerResult->load(std::memory_order_acquire); // An error was detected on one of the DBServers if (tmpRes >= 0) { cbGuard.fire(); // unregister cb before accessing errMsg loadCurrent(); // update our cache return Result(tmpRes, *errMsg); } agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval); if (application_features::ApplicationServer::isStopping()) { return Result(TRI_ERROR_CLUSTER_TIMEOUT); } } } } // Start creating a database in a coordinator by entering it into Plan/Databases with, // status flag `isBuilding`; this makes the database invisible to the outside world. Result ClusterInfo::createIsBuildingDatabaseCoordinator(methods::CreateDatabaseInfo const& database) { AgencyComm ac; AgencyCommResult res; // Instruct the Agency to enter the creation of the new database // by entering it into Plan/Databases/ but with the fields // isBuilding: true, and coordinator and rebootId set to our respective // id and rebootId VPackBuilder builder; buildIsBuildingSlice(database, builder); AgencyWriteTransaction trx( {AgencyOperation("Plan/Databases/" + database.getName(), AgencyValueOperationType::SET, builder.slice()), AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}, AgencyPrecondition("Plan/Databases/" + database.getName(), AgencyPrecondition::Type::EMPTY, true)); // TODO: Should this never timeout? res = ac.sendTransactionWithFailover(trx, 0.0); if (!res.successful()) { if (res._statusCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME); } return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN); } // Now update our own cache of planned databases: loadPlan(); // And wait for our database to show up in `Current/Databases` auto waitresult = waitForDatabaseInCurrent(database); if (waitresult.fail()) { // cleanup: remove database from plan auto ret = cancelCreateDatabaseCoordinator(database); if (ret.ok()) { // Creation failed return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE, "database creation failed"); } else { // Cleanup failed too return ret; } } return Result(); } // Finalize creation of database in cluster by removing isBuilding, coordinator, and coordinatorRebootId; // as precondition that the entry we put in createIsBuildingDatabaseCoordinator is still in Plan/ unchanged. Result ClusterInfo::createFinalizeDatabaseCoordinator(methods::CreateDatabaseInfo const& database) { AgencyComm ac; VPackBuilder pcBuilder; buildIsBuildingSlice(database, pcBuilder); VPackBuilder entryBuilder; buildFinalSlice(database, entryBuilder); AgencyWriteTransaction trx( {AgencyOperation("Plan/Databases/" + database.getName(), AgencyValueOperationType::SET, entryBuilder.slice()), AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}, AgencyPrecondition("Plan/Databases/" + database.getName(), AgencyPrecondition::Type::VALUE, pcBuilder.slice())); auto res = ac.sendTransactionWithFailover(trx, 0.0); if (!res.successful()) { if (res._statusCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE, "Could not finish creation of database: Plan/Databases/ " "entry was modified in Agency"); } // Something else went wrong. 
return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE); } // Make sure we're all aware of collections that have been created loadPlan(); // The transaction was successful and the database should // now be visible and usable. return Result(); } // This function can only return on success or when the cluster is shutting down. Result ClusterInfo::cancelCreateDatabaseCoordinator(methods::CreateDatabaseInfo const& database) { AgencyComm ac; VPackBuilder builder; buildIsBuildingSlice(database, builder); // delete all collections and the database itself from the agency plan AgencyOperation delPlanCollections("Plan/Collections/" + database.getName(), AgencySimpleOperationType::DELETE_OP); AgencyOperation delPlanDatabase("Plan/Databases/" + database.getName(), AgencySimpleOperationType::DELETE_OP); AgencyOperation incrPlan("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition preCondition("Plan/Databases/" + database.getName(), AgencyPrecondition::Type::VALUE, builder.slice()); AgencyWriteTransaction trx({delPlanCollections, delPlanDatabase, incrPlan}, preCondition); size_t tries = 0; // TODO: Hard coded timeout, also the retry numbers below are all // pulled out of thin air. double nextTimeout = 0.5; AgencyCommResult res; do { tries++; res = ac.sendTransactionWithFailover(trx, nextTimeout); if (!res.successful()) { if (tries == 1) { events::CreateDatabase(database.getName(), res.errorCode()); } if (tries >= 5) { nextTimeout = 5.0; } LOG_TOPIC("b47aa", WARN, arangodb::Logger::CLUSTER) << "failed to cancel creation of database " << database.getName() << " with error " << res.errorMessage() << ". Retrying."; } if (application_features::ApplicationServer::isStopping()) { return Result(TRI_ERROR_CLUSTER_TIMEOUT); } } while(!res.successful()); return Result(); } //////////////////////////////////////////////////////////////////////////////// /// @brief drop database in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. One possible error /// is a timeout, a timeout of 0.0 means no timeout. //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::dropDatabaseCoordinator( // drop database std::string const& name, // database name double timeout // request timeout ) { TRI_ASSERT(ServerState::instance()->isCoordinator()); if (name == TRI_VOC_SYSTEM_DATABASE) { return Result(TRI_ERROR_FORBIDDEN); } AgencyComm ac; double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); auto dbServerResult = std::make_shared>(-1); std::function dbServerChanged = [=](VPackSlice const& result) { if (result.isNone() || result.isEmptyObject()) { dbServerResult->store(TRI_ERROR_NO_ERROR, std::memory_order_release); } return true; }; std::string where("Current/Databases/" + name); // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. 
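  // The callback below fires on every change of Current/Databases/<name>;
  // the drop is considered complete once that entry has vanished or is empty.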
auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); auto cbGuard = scopeGuard([this, &agencyCallback]() -> void { _agencyCallbackRegistry->unregisterCallback(agencyCallback); }); // Transact to agency AgencyOperation delPlanDatabases("Plan/Databases/" + name, AgencySimpleOperationType::DELETE_OP); AgencyOperation delPlanCollections("Plan/Collections/" + name, AgencySimpleOperationType::DELETE_OP); AgencyOperation delPlanViews("Plan/Views/" + name, AgencySimpleOperationType::DELETE_OP); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition databaseExists("Plan/Databases/" + name, AgencyPrecondition::Type::EMPTY, false); AgencyWriteTransaction trans({delPlanDatabases, delPlanCollections, delPlanViews, incrementVersion}, databaseExists); AgencyCommResult res = ac.sendTransactionWithFailover(trans); if (!res.successful()) { if (res._statusCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } return Result(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN); } // Load our own caches: loadPlan(); // Now wait stuff in Current to disappear and thus be complete: { CONDITION_LOCKER(locker, agencyCallback->_cv); while (true) { if (dbServerResult->load(std::memory_order_acquire) >= 0) { cbGuard.fire(); // unregister cb before calling ac.removeValues(...) res = ac.removeValues(where, true); if (res.successful()) { return Result(TRI_ERROR_NO_ERROR); } return Result(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_CURRENT); } if (TRI_microtime() > endTime) { logAgencyDump(); return Result(TRI_ERROR_CLUSTER_TIMEOUT); } agencyCallback->executeByCallbackOrTimeout(interval); if (application_features::ApplicationServer::isStopping()) { return Result(TRI_ERROR_CLUSTER_TIMEOUT); } } } } //////////////////////////////////////////////////////////////////////////////// /// @brief create collection in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. One possible error /// is a timeout, a timeout of 0.0 means no timeout. //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::createCollectionCoordinator( // create collection std::string const& databaseName, std::string const& collectionID, uint64_t numberOfShards, uint64_t replicationFactor, uint64_t minReplicationFactor, bool waitForReplication, velocypack::Slice const& json, // collection definition double timeout, // request timeout, bool isNewDatabase, std::shared_ptr const& colToDistributeShardsLike) { TRI_ASSERT(ServerState::instance()->isCoordinator()); std::vector infos{ ClusterCollectionCreationInfo{collectionID, numberOfShards, replicationFactor, minReplicationFactor, waitForReplication, json}}; double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; return createCollectionsCoordinator(databaseName, infos, endTime, isNewDatabase, colToDistributeShardsLike); } /// @brief this method does an atomic check of the preconditions for the /// collections to be created, using the currently loaded plan. it populates the /// plan version used for the checks Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName, std::vector const& infos, uint64_t& planVersion) { for (auto const& info : infos) { // Check if name exists. 
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) { return TRI_ERROR_BAD_PARAMETER; // must not be empty } // Validate that the collection does not exist in the current plan { AllCollections::const_iterator it = _plannedCollections.find(databaseName); if (it != _plannedCollections.end()) { DatabaseCollections::const_iterator it2 = (*it).second.find(info.name); if (it2 != (*it).second.end()) { // collection already exists! events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME); return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME, std::string("duplicate collection name '") + info.name + "'"); } } else { // no collection in plan for this particular database... this may be true for // the first collection created in a db // now check if there is a planned database at least if (_plannedDatabases.find(databaseName) == _plannedDatabases.end()) { // no need to create a collection in a database that is not there (anymore) events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; } } } // Validate that there is no view with this name either { // check against planned views as well AllViews::const_iterator it = _plannedViews.find(databaseName); if (it != _plannedViews.end()) { DatabaseViews::const_iterator it2 = (*it).second.find(info.name); if (it2 != (*it).second.end()) { // view already exists! events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME); return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME, std::string("duplicate collection name '") + info.name + "'"); } } } } return {}; } Result ClusterInfo::createCollectionsCoordinator( std::string const& databaseName, std::vector& infos, double endTime, bool isNewDatabase, std::shared_ptr const& colToDistributeShardsLike) { TRI_ASSERT(ServerState::instance()->isCoordinator()); using arangodb::velocypack::Slice; double const interval = getPollInterval(); // The following three are used for synchronization between the callback // closure and the main thread executing this function. Note that it can // happen that the callback is called only after we return from this // function! auto dbServerResult = std::make_shared>(-1); auto nrDone = std::make_shared>(0); auto errMsg = std::make_shared(); auto cacheMutex = std::make_shared(); auto cacheMutexOwner = std::make_shared>(); auto isCleaned = std::make_shared(false); AgencyComm ac; std::vector> agencyCallbacks; auto cbGuard = scopeGuard([&] { // We have a subtle race here, that we try to cover against: // We register a callback in the agency. // For some reason this scopeguard is executed (e.g. error case) // While we are in this cleanup, and before a callback is removed from the // agency. The callback is triggered by another threat. We have the // following guarantees: a) cacheMutex|Owner are valid and locked by cleanup // b) isCleand is valid and now set to true // c) the closure is owned by the callback // d) info might be deleted, so we cannot use it. // e) If the callback is ongoing during cleanup, the callback will // hold the Mutex and delay the cleanup. 
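    // Take the same mutex the callbacks take; once 'isCleaned' is set under it,
    // any callback that fires afterwards returns immediately and never touches
    // the (possibly already destroyed) creation infos.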
RECURSIVE_MUTEX_LOCKER(*cacheMutex, *cacheMutexOwner); *isCleaned = true; for (auto& cb : agencyCallbacks) { _agencyCallbackRegistry->unregisterCallback(cb); } }); std::vector opers({IncreaseVersion()}); std::vector precs; std::unordered_set conditions; // current thread owning 'cacheMutex' write lock (workaround for non-recursive Mutex) for (auto& info : infos) { TRI_ASSERT(!info.name.empty()); if (info.state == ClusterCollectionCreationInfo::State::DONE) { // This is possible in Enterprise / Smart Collection situation (*nrDone)++; } std::map> shardServers; for (auto const& pair : VPackObjectIterator(info.json.get("shards"))) { ShardID shardID = pair.key.copyString(); std::vector serverIds; for (auto const& serv : VPackArrayIterator(pair.value)) { serverIds.emplace_back(serv.copyString()); } shardServers.emplace(shardID, serverIds); } // The AgencyCallback will copy the closure will take responsibilty of it. auto closure = [cacheMutex, cacheMutexOwner, &info, dbServerResult, errMsg, nrDone, isCleaned, shardServers, this](VPackSlice const& result) { // NOTE: This ordering here is important to cover against a race in cleanup. // a) The Guard get's the Mutex, sets isCleaned == true, then removes the callback // b) If the callback is aquired it is saved in a shared_ptr, the Mutex will be aquired first, then it will check if it isCleaned RECURSIVE_MUTEX_LOCKER(*cacheMutex, *cacheMutexOwner); if (*isCleaned) { return true; } TRI_ASSERT(!info.name.empty()); if (info.state != ClusterCollectionCreationInfo::State::INIT) { // All leaders have reported either good or bad // We might be called by followers if they get in sync fast enough // In this IF we are in the followers case, we can savely ignore return true; } // result is the object at the path if (result.isObject() && result.length() == (size_t)info.numberOfShards) { std::string tmpError = ""; for (auto const& p : VPackObjectIterator(result)) { if (arangodb::basics::VelocyPackHelper::getBooleanValue(p.value, "error", false)) { tmpError += " shardID:" + p.key.copyString() + ":"; tmpError += arangodb::basics::VelocyPackHelper::getStringValue( p.value, "errorMessage", ""); if (p.value.hasKey(StaticStrings::ErrorNum)) { VPackSlice const errorNum = p.value.get(StaticStrings::ErrorNum); if (errorNum.isNumber()) { tmpError += " (errNum="; tmpError += basics::StringUtils::itoa(errorNum.getNumericValue()); tmpError += ")"; } } } // wait that all followers have created our new collection if (tmpError.empty() && info.waitForReplication) { std::vector plannedServers; { READ_LOCKER(readLocker, _planProt.lock); auto it = shardServers.find(p.key.copyString()); if (it != shardServers.end()) { plannedServers = (*it).second; } else { LOG_TOPIC("9ed54", ERR, Logger::CLUSTER) << "Did not find shard in _shardServers: " << p.key.copyString() << ". Maybe the collection is already dropped."; *errMsg = "Error in creation of collection: " + p.key.copyString() + ". Collection already dropped. " + __FILE__ + ":" + std::to_string(__LINE__); dbServerResult->store(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION, std::memory_order_release); TRI_ASSERT(info.state != ClusterCollectionCreationInfo::State::DONE); info.state = ClusterCollectionCreationInfo::FAILED; return true; } } if (plannedServers.empty()) { READ_LOCKER(readLocker, _planProt.lock); LOG_TOPIC("a0a76", DEBUG, Logger::CLUSTER) << "This should never have happened, Plan empty. 
Dumping " "_shards in Plan:"; for (auto const& p : _shards) { LOG_TOPIC("60c7d", DEBUG, Logger::CLUSTER) << "Shard: " << p.first; for (auto const& q : *(p.second)) { LOG_TOPIC("c7363", DEBUG, Logger::CLUSTER) << " Server: " << q; } } TRI_ASSERT(false); } std::vector currentServers; VPackSlice servers = p.value.get("servers"); if (!servers.isArray()) { return true; } for (auto const& server : VPackArrayIterator(servers)) { if (!server.isString()) { return true; } currentServers.push_back(server.copyString()); } if (!ClusterHelpers::compareServerLists(plannedServers, currentServers)) { TRI_ASSERT(!info.name.empty()); LOG_TOPIC("16623", DEBUG, Logger::CLUSTER) << "Still waiting for all servers to ACK creation of " << info.name << ". Planned: " << plannedServers << ", Current: " << currentServers; return true; } } } if (!tmpError.empty()) { *errMsg = "Error in creation of collection:" + tmpError + " " + __FILE__ + std::to_string(__LINE__); dbServerResult->store(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION, std::memory_order_release); // We cannot get into bad state after a collection was created TRI_ASSERT(info.state != ClusterCollectionCreationInfo::State::DONE); info.state = ClusterCollectionCreationInfo::FAILED; } else { // We can have multiple calls to this callback, one per leader and one per follower // As soon as all leaders are done we are either FAILED or DONE, this cannot be altered later. TRI_ASSERT(info.state != ClusterCollectionCreationInfo::State::FAILED); info.state = ClusterCollectionCreationInfo::DONE; (*nrDone)++; } } return true; }; // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. auto agencyCallback = std::make_shared(ac, "Current/Collections/" + databaseName + "/" + info.collectionID, closure, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); agencyCallbacks.emplace_back(std::move(agencyCallback)); opers.emplace_back(CreateCollectionOrder(databaseName, info.collectionID, info.isBuildingSlice(), 240.0)); // Ensure preconditions on the agency std::shared_ptr otherCidShardMap = nullptr; auto const otherCidString = basics::VelocyPackHelper::getStringValue(info.json, StaticStrings::DistributeShardsLike, StaticStrings::Empty); if (!otherCidString.empty() && conditions.find(otherCidString) == conditions.end()) { // Distribute shards like case. // Precondition: Master collection is not moving while we create this // collection We only need to add these once for every Master, we cannot // add multiple because we will end up with duplicate entries. // NOTE: We do not need to add all collections created here, as they will not succeed // In callbacks if they are moved during creation. // If they are moved after creation was reported success they are under protection by Supervision. conditions.emplace(otherCidString); if (colToDistributeShardsLike != nullptr) { otherCidShardMap = colToDistributeShardsLike->shardIds(); } else { otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds(); } // Any of the shards locked? 
for (auto const& shard : *otherCidShardMap) { precs.emplace_back(AgencyPrecondition("Supervision/Shards/" + shard.first, AgencyPrecondition::Type::EMPTY, true)); } } // additionally ensure that no such collectionID exists yet in Plan/Collections precs.emplace_back(AgencyPrecondition("Plan/Collections/" + databaseName + "/" + info.collectionID, AgencyPrecondition::Type::EMPTY, true)); } // We need to make sure our plan is up to date. LOG_TOPIC("f4b14", DEBUG, Logger::CLUSTER) << "createCollectionCoordinator, loading Plan from agency..."; // load the plan, so we are up-to-date loadPlan(); uint64_t planVersion = 0; // will be populated by following function call { READ_LOCKER(readLocker, _planProt.lock); planVersion = _planVersion; if (!isNewDatabase) { Result res = checkCollectionPreconditions(databaseName, infos, planVersion); if (res.fail()) { return res; } } } // now try to update the plan in the agency, using the current plan version as // our precondition { // create a builder with just the version number for comparison VPackBuilder versionBuilder; versionBuilder.add(VPackValue(planVersion)); // add a precondition that checks the plan version has not yet changed precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice())); AgencyWriteTransaction transaction(opers, precs); { // we hold this mutex from now on until we have updated our cache // using loadPlan, this is necessary for the callback closure to // see the new planned state for this collection. Otherwise it cannot // recognize completion of the create collection operation properly: RECURSIVE_MUTEX_LOCKER(*cacheMutex, *cacheMutexOwner); auto res = ac.sendTransactionWithFailover(transaction); // Only if not precondition failed if (!res.successful()) { if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { // use this special error code to signal that we got a precondition failure // in this case the caller can try again with an updated version of the plan change return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"}; } std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode()); errorMsg += " error message: " + res.errorMessage(); errorMsg += " error details: " + res.errorDetails(); errorMsg += " body: " + res.body(); for (auto const& info : infos) { events::CreateCollection(databaseName, info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); } return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)}; } // Update our cache: loadPlan(); } } LOG_TOPIC("98bca", DEBUG, Logger::CLUSTER) << "createCollectionCoordinator, Plan changed, waiting for success..."; do { int tmpRes = dbServerResult->load(std::memory_order_acquire); if (TRI_microtime() > endTime) { for (auto const& info : infos) { LOG_TOPIC("f6b57", ERR, Logger::CLUSTER) << "Timeout in _create collection" << ": database: " << databaseName << ", collId:" << info.collectionID << "\njson: " << info.json.toString(); } // Get a full agency dump for debugging logAgencyDump(); if (tmpRes <= TRI_ERROR_NO_ERROR) { tmpRes = TRI_ERROR_CLUSTER_TIMEOUT; } } if (nrDone->load(std::memory_order_acquire) == infos.size()) { // We do not need to lock all condition variables // we are save by cacheMutex cbGuard.fire(); // Now we need to remove TTL + the IsBuilding flag in Agency opers.clear(); precs.clear(); opers.push_back(IncreaseVersion()); for (auto const& info : infos) { opers.emplace_back( CreateCollectionSuccess(databaseName, 
info.collectionID, info.json)); // NOTE: // We cannot do anything better than: "noone" has modified our collections while // we tried to create them... // Preconditions cover against supervision jobs injecting other leaders / followers during failovers. // If they are it is not valid to confirm them here. (bad luck we were almost there) precs.emplace_back(CreateCollectionOrderPrecondition(databaseName, info.collectionID, info.isBuildingSlice())); } AgencyWriteTransaction transaction(opers, precs); // This is a best effort, in the worst case the collection stays, but will // be cleaned out by ttl auto res = ac.sendTransactionWithFailover(transaction); // Report if this operation worked, if it failed collections will be cleaned up eventually for (auto const& info : infos) { TRI_ASSERT(info.state == ClusterCollectionCreationInfo::State::DONE); events::CreateCollection(databaseName, info.name, res.errorCode()); } loadCurrent(); return res.errorCode(); } if (tmpRes > TRI_ERROR_NO_ERROR) { // We do not need to lock all condition variables // we are safe by using cacheMutex cbGuard.fire(); // report error for (auto const& info : infos) { // Report first error. // On timeout report it on all not finished ones. if (info.state == ClusterCollectionCreationInfo::State::FAILED || (tmpRes == TRI_ERROR_CLUSTER_TIMEOUT && info.state == ClusterCollectionCreationInfo::State::INIT)) { events::CreateCollection(databaseName, info.name, tmpRes); } } loadCurrent(); return {tmpRes, *errMsg}; } // If we get here we have not tried anything. // Wait on callbacks. if (application_features::ApplicationServer::isStopping()) { // Report shutdown on all collections for (auto const& info : infos) { events::CreateCollection(databaseName, info.name, TRI_ERROR_SHUTTING_DOWN); } return TRI_ERROR_SHUTTING_DOWN; } // Wait for Callbacks to be triggered, it is sufficent to wait for the first non, done TRI_ASSERT(agencyCallbacks.size() == infos.size()); for (size_t i = 0; i < infos.size(); ++i) { if (infos[i].state == ClusterCollectionCreationInfo::INIT) { bool gotTimeout; { // This one has not responded, wait for it. CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv); gotTimeout = agencyCallbacks[i]->executeByCallbackOrTimeout(interval); } if (gotTimeout) { ++i; // We got woken up by waittime, not by callback. // Let us check if we skipped other callbacks as well for (; i < infos.size(); ++i) { if (infos[i].state == ClusterCollectionCreationInfo::INIT) { agencyCallbacks[i]->refetchAndUpdate(true, false); } } } break; } } } while (!application_features::ApplicationServer::isStopping()); // If we get here we are not allowed to retry. // The loop above does not contain a break TRI_ASSERT(application_features::ApplicationServer::isStopping()); for (auto const& info : infos) { events::CreateCollection(databaseName, info.name, TRI_ERROR_SHUTTING_DOWN); } return Result{TRI_ERROR_SHUTTING_DOWN}; } //////////////////////////////////////////////////////////////////////////////// /// @brief drop collection in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. One possible error /// is a timeout, a timeout of 0.0 means no timeout. 
////////////////////////////////////////////////////////////////////////////////

Result ClusterInfo::dropCollectionCoordinator(  // drop collection
    std::string const& dbName,                  // database name
    std::string const& collectionID,
    double timeout  // request timeout
) {
  TRI_ASSERT(ServerState::instance()->isCoordinator());

  // reject database names that start with a digit
  if (dbName.empty() || (dbName[0] >= '0' && dbName[0] <= '9')) {
    events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NAME_INVALID);
    return Result(TRI_ERROR_ARANGO_DATABASE_NAME_INVALID);
  }

  AgencyComm ac;
  AgencyCommResult res;

  // First check that no other collection has a distributeShardsLike
  // entry pointing to us:
  auto coll = getCollection(dbName, collectionID);
  auto colls = getCollections(dbName);  // reloads plan
  std::vector<std::string> clones;
  for (std::shared_ptr<LogicalCollection> const& p : colls) {
    if (p->distributeShardsLike() == coll->name() ||
        p->distributeShardsLike() == collectionID) {
      clones.push_back(p->name());
    }
  }

  if (!clones.empty()) {
    std::string errorMsg("Collection ");
    errorMsg += coll->name();
    errorMsg += " must not be dropped while ";
    errorMsg += arangodb::basics::StringUtils::join(clones, ", ");
    if (clones.size() == 1) {
      errorMsg += " has ";
    } else {
      errorMsg += " have ";
    }
    errorMsg += "distributeShardsLike set to ";
    errorMsg += coll->name();
    errorMsg += ".";

    events::DropCollection(dbName, collectionID,
                           TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE);
    return Result(  // result
        TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE,  // code
        errorMsg  // message
    );
  }

  double const realTimeout = getTimeout(timeout);
  double const endTime = TRI_microtime() + realTimeout;
  double const interval = getPollInterval();

  auto dbServerResult = std::make_shared<std::atomic<int>>(-1);
  std::function<bool(VPackSlice const& result)> dbServerChanged = [=](VPackSlice const& result) {
    if (result.isNone() || result.isEmptyObject()) {
      dbServerResult->store(TRI_ERROR_NO_ERROR, std::memory_order_release);
    }
    return true;
  };

  // monitor the entry for the collection
  std::string const where = "Current/Collections/" + dbName + "/" + collectionID;

  // ATTENTION: The following callback calls the above closure in a
  // different thread. Nevertheless, the closure accesses some of our
  // local variables. Therefore we have to protect all accesses to them
  // by a mutex. We use the mutex of the condition variable in the
  // AgencyCallback for this.
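  // The callback fires whenever Current/Collections/<dbName>/<collectionID>
  // changes; once that entry has vanished (None or an empty object), all
  // DBservers have dropped their shards and dbServerResult is set to
  // TRI_ERROR_NO_ERROR, which ends the wait loop further down.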
auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); auto cbGuard = scopeGuard([this, &agencyCallback]() -> void { _agencyCallbackRegistry->unregisterCallback(agencyCallback); }); size_t numberOfShards = 0; res = ac.getValues("Plan/Collections/" + dbName + "/" + collectionID + "/shards"); if (res.successful()) { velocypack::Slice databaseSlice = res.slice()[0].get(std::vector( {AgencyCommManager::path(), "Plan", "Collections", dbName})); if (!databaseSlice.isObject()) { // database dropped in the meantime events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; } velocypack::Slice collectionSlice = databaseSlice.get(collectionID); if (!collectionSlice.isObject()) { // collection dropped in the meantime events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } velocypack::Slice shardsSlice = collectionSlice.get("shards"); if (shardsSlice.isObject()) { numberOfShards = shardsSlice.length(); } else { LOG_TOPIC("d340d", ERR, Logger::CLUSTER) << "Missing shards information on dropping " << dbName << "/" << collectionID; events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } } // Transact to agency AgencyOperation delPlanCollection("Plan/Collections/" + dbName + "/" + collectionID, AgencySimpleOperationType::DELETE_OP); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition precondition = AgencyPrecondition("Plan/Databases/" + dbName, AgencyPrecondition::Type::EMPTY, false); AgencyWriteTransaction trans({delPlanCollection, incrementVersion}, precondition); res = ac.sendTransactionWithFailover(trans); if (!res.successful()) { if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { LOG_TOPIC("279c5", ERR, Logger::CLUSTER) << "Precondition failed for this agency transaction: " << trans.toJson() << ", return code: " << res.httpCode(); } logAgencyDump(); // TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the // precondition is that the database still exists events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION); return Result(TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION); } // Update our own cache: loadPlan(); if (numberOfShards == 0) { loadCurrent(); events::DropCollection(dbName, collectionID, TRI_ERROR_NO_ERROR); return Result(TRI_ERROR_NO_ERROR); } { CONDITION_LOCKER(locker, agencyCallback->_cv); while (true) { if (*dbServerResult >= 0) { cbGuard.fire(); // unregister cb before calling ac.removeValues(...) 
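          // All shards have disappeared from Current; only the leftover
          // collection directory remains and is removed below.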
// ...remove the entire directory for the collection ac.removeValues("Current/Collections/" + dbName + "/" + collectionID, true); loadCurrent(); events::DropCollection(dbName, collectionID, *dbServerResult); return Result(*dbServerResult); } if (TRI_microtime() > endTime) { LOG_TOPIC("76ea6", ERR, Logger::CLUSTER) << "Timeout in _drop collection (" << realTimeout << ")" << ": database: " << dbName << ", collId:" << collectionID << "\ntransaction sent to agency: " << trans.toJson(); logAgencyDump(); events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_TIMEOUT); return Result(TRI_ERROR_CLUSTER_TIMEOUT); } agencyCallback->executeByCallbackOrTimeout(interval); if (application_features::ApplicationServer::isStopping()) { events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_TIMEOUT); return Result(TRI_ERROR_CLUSTER_TIMEOUT); } } } } //////////////////////////////////////////////////////////////////////////////// /// @brief set collection properties in coordinator //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::setCollectionPropertiesCoordinator(std::string const& databaseName, std::string const& collectionID, LogicalCollection const* info) { TRI_ASSERT(ServerState::instance()->isCoordinator()); AgencyComm ac; AgencyCommResult res; AgencyPrecondition databaseExists("Plan/Databases/" + databaseName, AgencyPrecondition::Type::EMPTY, false); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID); if (!res.successful()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } velocypack::Slice collection = res.slice()[0].get(std::vector( {AgencyCommManager::path(), "Plan", "Collections", databaseName, collectionID})); if (!collection.isObject()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } VPackBuilder temp; temp.openObject(); temp.add(StaticStrings::WaitForSyncString, VPackValue(info->waitForSync())); temp.add("replicationFactor", VPackValue(info->replicationFactor())); temp.add("minReplicationFactor", VPackValue(info->minReplicationFactor())); info->getPhysical()->getPropertiesVPack(temp); temp.close(); VPackBuilder builder = VPackCollection::merge(collection, temp.slice(), true); res.clear(); AgencyOperation setColl("Plan/Collections/" + databaseName + "/" + collectionID, AgencyValueOperationType::SET, builder.slice()); AgencyWriteTransaction trans({setColl, incrementVersion}, databaseExists); res = ac.sendTransactionWithFailover(trans); if (res.successful()) { loadPlan(); return Result(); } return Result(TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, res.errorMessage()); } //////////////////////////////////////////////////////////////////////////////// /// @brief create view in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. One possible error /// is a timeout, a timeout of 0.0 means no timeout. //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::createViewCoordinator( // create view std::string const& databaseName, // database name std::string const& viewID, velocypack::Slice json // view definition ) { // TRI_ASSERT(ServerState::instance()->isCoordinator()); // FIXME TODO is this check required? 
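  // Basic validation of the view definition: it must carry a string "type"
  // attribute and a non-empty "name" before the plan is consulted at all.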
auto const typeSlice = json.get(arangodb::StaticStrings::DataSourceType); if (!typeSlice.isString()) { std::string name; if (json.isObject()) { name = basics::VelocyPackHelper::getStringValue(json, StaticStrings::DataSourceName, ""); } events::CreateView(databaseName, name, TRI_ERROR_BAD_PARAMETER); return Result(TRI_ERROR_BAD_PARAMETER); } auto const name = basics::VelocyPackHelper::getStringValue(json, arangodb::StaticStrings::DataSourceName, StaticStrings::Empty); if (name.empty()) { events::CreateView(databaseName, name, TRI_ERROR_BAD_PARAMETER); return Result(TRI_ERROR_BAD_PARAMETER); // must not be empty } { // check if a view with the same name is already planned loadPlan(); READ_LOCKER(readLocker, _planProt.lock); { AllViews::const_iterator it = _plannedViews.find(databaseName); if (it != _plannedViews.end()) { DatabaseViews::const_iterator it2 = (*it).second.find(name); if (it2 != (*it).second.end()) { // view already exists! events::CreateView(databaseName, name, TRI_ERROR_ARANGO_DUPLICATE_NAME); return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME, std::string("duplicate view name '") + name + "'"); } } } { // check against planned collections as well AllCollections::const_iterator it = _plannedCollections.find(databaseName); if (it != _plannedCollections.end()) { DatabaseCollections::const_iterator it2 = (*it).second.find(name); if (it2 != (*it).second.end()) { // collection already exists! events::CreateCollection(databaseName, name, TRI_ERROR_ARANGO_DUPLICATE_NAME); return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME, std::string("duplicate view name '") + name + "'"); } } } } AgencyComm ac; // mop: why do these ask the agency instead of checking cluster info? if (!ac.exists("Plan/Databases/" + databaseName)) { events::CreateView(databaseName, name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } if (ac.exists("Plan/Views/" + databaseName + "/" + viewID)) { events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_VIEW_ID_EXISTS); return Result(TRI_ERROR_CLUSTER_VIEW_ID_EXISTS); } AgencyWriteTransaction const transaction{ // operations {{"Plan/Views/" + databaseName + "/" + viewID, AgencyValueOperationType::SET, json}, {"Plan/Version", AgencySimpleOperationType::INCREMENT_OP}}, // preconditions {{"Plan/Views/" + databaseName + "/" + viewID, AgencyPrecondition::Type::EMPTY, true}}}; auto const res = ac.sendTransactionWithFailover(transaction); // Only if not precondition failed if (!res.successful()) { if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { // Dump agency plan: logAgencyDump(); events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN); return Result( // result TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN, // code std::string("Precondition that view ") + name + " with ID " + viewID + " does not yet exist failed. 
Cannot create view."); } events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN); return Result( // result TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN, // code std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__) + " HTTP code: " + std::to_string(res.httpCode()) + " error message: " + res.errorMessage() + " error details: " + res.errorDetails() + " body: " + res.body()); } // Update our cache: loadPlan(); events::CreateView(databaseName, name, TRI_ERROR_NO_ERROR); return Result(TRI_ERROR_NO_ERROR); } //////////////////////////////////////////////////////////////////////////////// /// @brief drop view in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::dropViewCoordinator( // drop view std::string const& databaseName, // database name std::string const& viewID // view identifier ) { TRI_ASSERT(ServerState::instance()->isCoordinator()); // Transact to agency AgencyWriteTransaction const trans{ // operations {{"Plan/Views/" + databaseName + "/" + viewID, AgencySimpleOperationType::DELETE_OP}, {"Plan/Version", AgencySimpleOperationType::INCREMENT_OP}}, // preconditions {{"Plan/Databases/" + databaseName, AgencyPrecondition::Type::EMPTY, false}, {"Plan/Views/" + databaseName + "/" + viewID, AgencyPrecondition::Type::EMPTY, false}}}; AgencyComm ac; auto const res = ac.sendTransactionWithFailover(trans); // Update our own cache loadPlan(); Result result; if (!res.successful()) { if (res.errorCode() == int(arangodb::ResponseCode::PRECONDITION_FAILED)) { result = Result( // result TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN, // FIXME COULD_NOT_REMOVE_VIEW_IN_PLAN std::string("Precondition that view with ID ") + viewID + " already exist failed. 
Cannot create view."); // Dump agency plan: logAgencyDump(); } else { result = Result( // result TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN, // FIXME COULD_NOT_REMOVE_VIEW_IN_PLAN std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__) + " HTTP code: " + std::to_string(res.httpCode()) + " error message: " + res.errorMessage() + " error details: " + res.errorDetails() + " body: " + res.body()); } } events::DropView(databaseName, viewID, result.errorNumber()); return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief set view properties in coordinator //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::setViewPropertiesCoordinator(std::string const& databaseName, std::string const& viewID, VPackSlice const& json) { // TRI_ASSERT(ServerState::instance()->isCoordinator()); AgencyComm ac; auto res = ac.getValues("Plan/Views/" + databaseName + "/" + viewID); if (!res.successful()) { return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND}; } auto const view = res.slice()[0].get( {AgencyCommManager::path(), "Plan", "Views", databaseName, viewID}); if (!view.isObject()) { logAgencyDump(); return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND}; } res.clear(); AgencyWriteTransaction const trans{ // operations {{"Plan/Views/" + databaseName + "/" + viewID, AgencyValueOperationType::SET, json}, {"Plan/Version", AgencySimpleOperationType::INCREMENT_OP}}, // preconditions {"Plan/Databases/" + databaseName, AgencyPrecondition::Type::EMPTY, false}}; res = ac.sendTransactionWithFailover(trans); if (!res.successful()) { return {TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, res.errorMessage()}; } loadPlan(); return {}; } //////////////////////////////////////////////////////////////////////////////// /// @brief set collection status in coordinator //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::setCollectionStatusCoordinator(std::string const& databaseName, std::string const& collectionID, TRI_vocbase_col_status_e status) { TRI_ASSERT(ServerState::instance()->isCoordinator()); AgencyComm ac; AgencyCommResult res; AgencyPrecondition databaseExists("Plan/Databases/" + databaseName, AgencyPrecondition::Type::EMPTY, false); res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID); if (!res.successful()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } VPackSlice col = res.slice()[0].get(std::vector( {AgencyCommManager::path(), "Plan", "Collections", databaseName, collectionID})); if (!col.isObject()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } TRI_vocbase_col_status_e old = static_cast( arangodb::basics::VelocyPackHelper::getNumericValue( col, "status", static_cast(TRI_VOC_COL_STATUS_CORRUPTED))); if (old == status) { // no status change return Result(); } VPackBuilder builder; try { VPackObjectBuilder b(&builder); for (auto const& entry : VPackObjectIterator(col)) { std::string key = entry.key.copyString(); if (key != "status") { builder.add(key, entry.value); } } builder.add("status", VPackValue(status)); } catch (...) 
{ return Result(TRI_ERROR_OUT_OF_MEMORY); } res.clear(); AgencyOperation setColl("Plan/Collections/" + databaseName + "/" + collectionID, AgencyValueOperationType::SET, builder.slice()); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyWriteTransaction trans({setColl, incrementVersion}, databaseExists); res = ac.sendTransactionWithFailover(trans); if (res.successful()) { loadPlan(); return Result(); } return Result(TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, res.errorMessage()); } //////////////////////////////////////////////////////////////////////////////// /// @brief ensure an index in coordinator. //////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::ensureIndexCoordinator(LogicalCollection const& collection, VPackSlice const& slice, bool create, VPackBuilder& resultBuilder, double timeout) { TRI_ASSERT(ServerState::instance()->isCoordinator()); // check index id uint64_t iid = 0; VPackSlice const idSlice = slice.get(StaticStrings::IndexId); if (idSlice.isString()) { // use predefined index id iid = arangodb::basics::StringUtils::uint64(idSlice.copyString()); } if (iid == 0) { // no id set, create a new one! iid = uniqid(); } std::string const idString = arangodb::basics::StringUtils::itoa(iid); Result res; try { auto start = std::chrono::steady_clock::now(); // Keep trying for 2 minutes, if it's preconditions, which are stopping us do { resultBuilder.clear(); res = ensureIndexCoordinatorInner( // creat index collection, idString, slice, create, resultBuilder, timeout); // Note that this function sets the errorMsg unless it is precondition // failed, in which case we retry, if this times out, we need to set // it ourselves, otherwise all is done! if (TRI_ERROR_HTTP_PRECONDITION_FAILED == res.errorNumber()) { auto diff = std::chrono::steady_clock::now() - start; if (diff < std::chrono::seconds(120)) { uint32_t wt = RandomGenerator::interval(static_cast(1000)); std::this_thread::sleep_for(std::chrono::steady_clock::duration(wt)); continue; } res = Result( // result TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN, // code res.errorMessage() // message ); } break; } while (true); } catch (basics::Exception const& ex) { res = Result( // result ex.code(), // code TRI_errno_string(ex.code()) + std::string(", exception: ") + ex.what()); } catch (...) { res = Result(TRI_ERROR_INTERNAL); } // We get here in any case eventually, regardless of whether we have // - succeeded with lookup or index creation // - failed because of a timeout and rollback // - some other error // There is nothing more to do here. if (!application_features::ApplicationServer::isStopping()) { loadPlan(); } return res; } // The following function does the actual work of index creation: Create // in Plan, watch Current until all dbservers for all shards have done their // bit. If this goes wrong with a timeout, the creation operation is rolled // back. If the `create` flag is false, this is actually a lookup operation. // In any case, no rollback has to happen in the calling function // ClusterInfo::ensureIndexCoordinator. Note that this method here // sets the `isBuilding` attribute to `true`, which leads to the fact // that the index is not yet used by queries. There is code in the // Agency Supervision which deletes this flag once everything has been // built successfully. 
This is a more robust and self-repairing solution // than if we would take out the `isBuilding` here, since it survives a // coordinator crash and failover operations. // Finally note that the retry loop for the case of a failed precondition // is outside this function here in `ensureIndexCoordinator`. Result ClusterInfo::ensureIndexCoordinatorInner(LogicalCollection const& collection, std::string const& idString, VPackSlice const& slice, bool create, VPackBuilder& resultBuilder, double timeout) { AgencyComm ac; using namespace std::chrono; double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); TRI_ASSERT(resultBuilder.isEmpty()); auto type = slice.get(arangodb::StaticStrings::IndexType); if (!type.isString()) { return Result(TRI_ERROR_INTERNAL, "expecting string value for \"type\" attribute"); } const size_t numberOfShards = collection.numberOfShards(); // Get the current entry in Plan for this collection PlanCollectionReader collectionFromPlan(collection); if (!collectionFromPlan.state().ok()) { return collectionFromPlan.state(); } VPackSlice indexes = collectionFromPlan.indexes(); for (auto const& other : VPackArrayIterator(indexes)) { TRI_ASSERT(other.isObject()); if (true == arangodb::Index::Compare(slice, other)) { { // found an existing index... Copy over all elements in slice. VPackObjectBuilder b(&resultBuilder); resultBuilder.add(VPackObjectIterator(other)); resultBuilder.add("isNewlyCreated", VPackValue(false)); } return Result(TRI_ERROR_NO_ERROR); } if (true == arangodb::Index::CompareIdentifiers(slice, other)) { // found an existing index with a same identifier (i.e. name) // but different definition, throw an error return Result(TRI_ERROR_ARANGO_DUPLICATE_IDENTIFIER, "duplicate value for `" + arangodb::StaticStrings::IndexId + "` or `" + arangodb::StaticStrings::IndexName + "`"); } } // no existing index found. if (!create) { TRI_ASSERT(resultBuilder.isEmpty()); return Result(TRI_ERROR_NO_ERROR); } // will contain the error number and message std::shared_ptr> dbServerResult = std::make_shared>(-1); std::shared_ptr errMsg = std::make_shared(); std::function dbServerChanged = [=](VPackSlice const& result) { if (!result.isObject() || result.length() != numberOfShards) { return true; } size_t found = 0; for (auto const& shard : VPackObjectIterator(result)) { VPackSlice const slice = shard.value; if (slice.hasKey("indexes")) { VPackSlice const indexes = slice.get("indexes"); if (!indexes.isArray()) { break; // no list, so our index is not present. 
we can abort // searching } for (auto const& v : VPackArrayIterator(indexes)) { VPackSlice const k = v.get(StaticStrings::IndexId); if (!k.isString() || idString != k.copyString()) { continue; // this is not our index } // check for errors if (hasError(v)) { // Note that this closure runs with the mutex in the condition // variable of the agency callback, which protects writing // to *errMsg: *errMsg = extractErrorMessage(shard.key.copyString(), v); *errMsg = "Error during index creation: " + *errMsg; // Returns the specific error number if set, or the general // error otherwise int errNum = arangodb::basics::VelocyPackHelper::readNumericValue( v, StaticStrings::ErrorNum, TRI_ERROR_ARANGO_INDEX_CREATION_FAILED); dbServerResult->store(errNum, std::memory_order_release); return true; } found++; // found our index break; } } } if (found == (size_t)numberOfShards) { dbServerResult->store(setErrormsg(TRI_ERROR_NO_ERROR, *errMsg), std::memory_order_release); } return true; }; VPackBuilder newIndexBuilder; { VPackObjectBuilder ob(&newIndexBuilder); // Add the new index ignoring "id" for (auto const& e : VPackObjectIterator(slice)) { TRI_ASSERT(e.key.isString()); std::string const& key = e.key.copyString(); if (key != StaticStrings::IndexId && key != StaticStrings::IndexIsBuilding) { ob->add(e.key); ob->add(e.value); } } if (numberOfShards > 0 && !slice.get(StaticStrings::IndexType).isEqualString("arangosearch")) { ob->add(StaticStrings::IndexIsBuilding, VPackValue(true)); } ob->add(StaticStrings::IndexId, VPackValue(idString)); } // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. std::string databaseName = collection.vocbase().name(); std::string collectionID = std::to_string(collection.id()); std::string where = "Current/Collections/" + databaseName + "/" + collectionID; auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); auto cbGuard = scopeGuard( [&] { _agencyCallbackRegistry->unregisterCallback(agencyCallback); }); std::string const planCollKey = "Plan/Collections/" + databaseName + "/" + collectionID; std::string const planIndexesKey = planCollKey + "/indexes"; AgencyOperation newValue(planIndexesKey, AgencyValueOperationType::PUSH, newIndexBuilder.slice()); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition oldValue(planCollKey, AgencyPrecondition::Type::VALUE, collectionFromPlan.slice()); AgencyWriteTransaction trx({newValue, incrementVersion}, oldValue); AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); // This object watches whether the collection is still present in Plan // It assumes that the collection *is* present and only changes state // if the collection disappears CollectionWatcher collectionWatcher(_agencyCallbackRegistry, collection); if (!result.successful()) { if (result.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { // Retry loop is outside! 
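      // Another coordinator (or the supervision) changed the collection entry
      // between our read of the Plan and this write. Report this to the caller,
      // ensureIndexCoordinator, which re-reads the Plan and retries for up to
      // two minutes before giving up.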
return Result(TRI_ERROR_HTTP_PRECONDITION_FAILED); } return Result( // result TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN, // code std::string(" Failed to execute ") + trx.toJson() + " ResultCode: " + std::to_string(result.errorCode()) + " HttpCode: " + std::to_string(result.httpCode()) + " " + std::string(__FILE__) + ":" + std::to_string(__LINE__)); } // From here on we want to roll back the index creation if we run into // the timeout. If this coordinator crashes, the worst that can happen // is that the index stays in some state. In most cases, it will converge // against the planned state. loadPlan(); if (numberOfShards == 0) { // smart "dummy" collection has no shards TRI_ASSERT(collection.isSmart()); { // Copy over all elements in slice. VPackObjectBuilder b(&resultBuilder); resultBuilder.add(StaticStrings::IsSmart, VPackValue(true)); } loadCurrent(); return Result(TRI_ERROR_NO_ERROR); } { while (!application_features::ApplicationServer::isStopping()) { int tmpRes = dbServerResult->load(std::memory_order_acquire); if (tmpRes < 0) { // index has not shown up in Current yet, follow up check to // ensure it is still in plan (not dropped between iterations) AgencyCommResult result = _agency.sendTransactionWithFailover( AgencyReadTransaction(AgencyCommManager::path(planIndexesKey))); if (result.successful()) { auto indexes = result.slice()[0].get( std::vector{AgencyCommManager::path(), "Plan", "Collections", databaseName, collectionID, "indexes"}); bool found = false; if (indexes.isArray()) { for (auto const& v : VPackArrayIterator(indexes)) { VPackSlice const k = v.get(StaticStrings::IndexId); if (k.isString() && k.isEqualString(idString)) { // index is still here found = true; break; } } } if (!found) { return Result(TRI_ERROR_ARANGO_INDEX_CREATION_FAILED, "index was dropped during creation"); } } } if (tmpRes == 0) { // Finally, in case all is good, remove the `isBuilding` flag // check that the index has appeared. Note that we have to have // a precondition since the collection could have been deleted // in the meantime: VPackBuilder finishedPlanIndex; { VPackObjectBuilder o(&finishedPlanIndex); for (auto const& entry : VPackObjectIterator(newIndexBuilder.slice())) { auto const key = entry.key.copyString(); if (key != StaticStrings::IndexIsBuilding && key != "isNewlyCreated") { finishedPlanIndex.add(entry.key.copyString(), entry.value); } } } AgencyWriteTransaction trx( {AgencyOperation(planIndexesKey, AgencyValueOperationType::REPLACE, finishedPlanIndex.slice(), newIndexBuilder.slice()), AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}, AgencyPrecondition(planIndexesKey, AgencyPrecondition::Type::EMPTY, false)); TRI_idx_iid_t indexId = arangodb::basics::StringUtils::uint64( newIndexBuilder.slice().get("id").copyString()); if (!_agency.sendTransactionWithFailover(trx, 0.0).successful()) { // We just log the problem and move on, the Supervision will repair // things in due course: LOG_TOPIC("d9420", INFO, Logger::CLUSTER) << "Could not remove isBuilding flag in new index " << indexId << ", this will be repaired automatically."; } loadPlan(); if (!collectionWatcher.isPresent()) { return Result(TRI_ERROR_ARANGO_INDEX_CREATION_FAILED, "Collection " + collectionID + " has gone from database " + databaseName + ". Aborting index creation"); } { // Copy over all elements in slice. 
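          // Hand the finished index definition back to the caller: all
          // attributes except the isBuilding flag, plus "isNewlyCreated": true.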
VPackObjectBuilder b(&resultBuilder); resultBuilder.add(VPackObjectIterator(finishedPlanIndex.slice())); resultBuilder.add("isNewlyCreated", VPackValue(true)); } CONDITION_LOCKER(locker, agencyCallback->_cv); return Result(tmpRes, *errMsg); } if (tmpRes > 0 || TRI_microtime() > endTime) { // At this time the index creation has failed and we want to // roll back the plan entry, provided the collection still exists: AgencyWriteTransaction trx( std::vector( {AgencyOperation(planIndexesKey, AgencyValueOperationType::ERASE, newIndexBuilder.slice()), AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}), AgencyPrecondition(planCollKey, AgencyPrecondition::Type::EMPTY, false)); int sleepFor = 50; auto rollbackEndTime = steady_clock::now() + std::chrono::seconds(10); while (true) { AgencyCommResult update = _agency.sendTransactionWithFailover(trx, 0.0); if (update.successful()) { loadPlan(); if (tmpRes < 0) { // timeout return Result( // result TRI_ERROR_CLUSTER_TIMEOUT, // code "Index could not be created within timeout, giving up and " "rolling back index creation."); } // The mutex in the condition variable protects the access to // *errMsg: CONDITION_LOCKER(locker, agencyCallback->_cv); return Result(tmpRes, *errMsg); } if (update._statusCode == TRI_ERROR_HTTP_PRECONDITION_FAILED) { // Collection was removed, let's break here and report outside break; } if (steady_clock::now() > rollbackEndTime) { LOG_TOPIC("db00b", ERR, Logger::CLUSTER) << "Couldn't roll back index creation of " << idString << ". Database: " << databaseName << ", Collection " << collectionID; if (tmpRes < 0) { // timeout return Result( // result TRI_ERROR_CLUSTER_TIMEOUT, // code "Timed out while trying to roll back index creation failure"); } // The mutex in the condition variable protects the access to // *errMsg: CONDITION_LOCKER(locker, agencyCallback->_cv); return Result(tmpRes, *errMsg); } if (sleepFor <= 2500) { sleepFor *= 2; } std::this_thread::sleep_for(std::chrono::milliseconds(sleepFor)); } // We only get here if the collection was dropped just in the moment // when we wanted to roll back the index creation. } if (!collectionWatcher.isPresent()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, "collection " + collectionID + "appears to have been dropped from database " + databaseName + " during ensureIndex"); } { CONDITION_LOCKER(locker, agencyCallback->_cv); agencyCallback->executeByCallbackOrTimeout(interval); } } } return Result(TRI_ERROR_SHUTTING_DOWN); } //////////////////////////////////////////////////////////////////////////////// /// @brief drop an index in coordinator. 
//////////////////////////////////////////////////////////////////////////////// Result ClusterInfo::dropIndexCoordinator( // drop index std::string const& databaseName, // database name std::string const& collectionID, TRI_idx_iid_t iid, double timeout // request timeout ) { TRI_ASSERT(ServerState::instance()->isCoordinator()); AgencyComm ac; double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); std::string const idString = arangodb::basics::StringUtils::itoa(iid); std::string const planCollKey = "Plan/Collections/" + databaseName + "/" + collectionID; std::string const planIndexesKey = planCollKey + "/indexes"; AgencyCommResult previous = ac.getValues(planCollKey); if (!previous.successful()) { events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_CLUSTER_READING_PLAN_AGENCY); return Result(TRI_ERROR_CLUSTER_READING_PLAN_AGENCY); } velocypack::Slice collection = previous.slice()[0].get(std::vector( {AgencyCommManager::path(), "Plan", "Collections", databaseName, collectionID})); if (!collection.isObject()) { events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } TRI_ASSERT(VPackObjectIterator(collection).size() > 0); size_t const numberOfShards = basics::VelocyPackHelper::readNumericValue(collection, StaticStrings::NumberOfShards, 1); VPackSlice indexes = collection.get("indexes"); if (!indexes.isArray()) { LOG_TOPIC("63178", DEBUG, Logger::CLUSTER) << "Failed to find index " << databaseName << "/" << collectionID << "/" << iid; events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_ARANGO_INDEX_NOT_FOUND); return Result(TRI_ERROR_ARANGO_INDEX_NOT_FOUND); } VPackSlice indexToRemove; for (VPackSlice indexSlice : VPackArrayIterator(indexes)) { auto idSlice = indexSlice.get(arangodb::StaticStrings::IndexId); auto typeSlice = indexSlice.get(arangodb::StaticStrings::IndexType); if (!idSlice.isString() || !typeSlice.isString()) { continue; } if (idSlice.isEqualString(idString)) { Index::IndexType type = Index::type(typeSlice.copyString()); if (type == Index::TRI_IDX_TYPE_PRIMARY_INDEX || type == Index::TRI_IDX_TYPE_EDGE_INDEX) { events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_FORBIDDEN); return Result(TRI_ERROR_FORBIDDEN); } indexToRemove = indexSlice; break; } } if (!indexToRemove.isObject()) { LOG_TOPIC("95fe6", DEBUG, Logger::CLUSTER) << "Failed to find index " << databaseName << "/" << collectionID << "/" << iid; events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_ARANGO_INDEX_NOT_FOUND); return Result(TRI_ERROR_ARANGO_INDEX_NOT_FOUND); } std::string where = "Current/Collections/" + databaseName + "/" + collectionID; auto dbServerResult = std::make_shared>(-1); std::function dbServerChanged = [=](VPackSlice const& current) { if (numberOfShards == 0) { return false; } if (!current.isObject()) { return true; } VPackObjectIterator shards(current); if (shards.size() == numberOfShards) { bool found = false; for (auto const& shard : shards) { VPackSlice const indexes = shard.value.get("indexes"); if (indexes.isArray()) { for (auto const& v : VPackArrayIterator(indexes)) { if (v.isObject()) { VPackSlice const k = v.get(StaticStrings::IndexId); if (k.isString() && k.isEqualString(idString)) { // still found the index in some shard found = true; break; } } if (found) { break; } } } } if (!found) { dbServerResult->store(TRI_ERROR_NO_ERROR, 
std::memory_order_release); } } return true; }; // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); auto cbGuard = scopeGuard( [&] { _agencyCallbackRegistry->unregisterCallback(agencyCallback); }); AgencyOperation planErase(planIndexesKey, AgencyValueOperationType::ERASE, indexToRemove); AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition prec(planCollKey, AgencyPrecondition::Type::VALUE, collection); AgencyWriteTransaction trx({planErase, incrementVersion}, prec); AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); if (!result.successful()) { events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN); return Result( // result TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN, // code std::string(" Failed to execute ") + trx.toJson() + " ResultCode: " + std::to_string(result.errorCode())); } // load our own cache: loadPlan(); if (numberOfShards == 0) { // smart "dummy" collection has no shards TRI_ASSERT(collection.get(StaticStrings::IsSmart).getBool()); loadCurrent(); return Result(TRI_ERROR_NO_ERROR); } { CONDITION_LOCKER(locker, agencyCallback->_cv); while (true) { if (*dbServerResult >= 0) { cbGuard.fire(); // unregister cb loadCurrent(); events::DropIndex(databaseName, collectionID, idString, *dbServerResult); return Result(*dbServerResult); } if (TRI_microtime() > endTime) { events::DropIndex(databaseName, collectionID, idString, TRI_ERROR_CLUSTER_TIMEOUT); return Result(TRI_ERROR_CLUSTER_TIMEOUT); } agencyCallback->executeByCallbackOrTimeout(interval); if (application_features::ApplicationServer::isStopping()) { return Result(TRI_ERROR_CLUSTER_TIMEOUT); } } } } //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about servers from the agency /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// static std::string const prefixServersRegistered = "Current/ServersRegistered"; static std::string const prefixServersKnown = "Current/ServersKnown"; static std::string const mapUniqueToShortId = "Target/MapUniqueToShortID"; void ClusterInfo::loadServers() { ++_serversProt.wantedVersion; // Indicate that after *NOW* somebody has to // reread from the agency! 
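  // Standard two-counter scheme: wantedVersion was bumped above, now the mutex
  // is taken and the reload is skipped if some other thread has already loaded
  // a state at least as recent (doneVersion caught up with our snapshot).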
MUTEX_LOCKER(mutexLocker, _serversProt.mutex); uint64_t storedVersion = _serversProt.wantedVersion; // this is the version // we will set in the end if (_serversProt.doneVersion == storedVersion) { // Somebody else did, what we intended to do, so just return return; } AgencyCommResult result = _agency.sendTransactionWithFailover(AgencyReadTransaction( std::vector({AgencyCommManager::path(prefixServersRegistered), AgencyCommManager::path(mapUniqueToShortId), AgencyCommManager::path(prefixServersKnown)}))); if (result.successful()) { velocypack::Slice serversRegistered = result.slice()[0].get(std::vector( {AgencyCommManager::path(), "Current", "ServersRegistered"})); velocypack::Slice serversAliases = result.slice()[0].get(std::vector( {AgencyCommManager::path(), "Target", "MapUniqueToShortID"})); velocypack::Slice serversKnownSlice = result.slice()[0].get(std::vector( {AgencyCommManager::path(), "Current", "ServersKnown"})); if (serversRegistered.isObject()) { decltype(_servers) newServers; decltype(_serverAliases) newAliases; decltype(_serverAdvertisedEndpoints) newAdvertisedEndpoints; decltype(_serverTimestamps) newTimestamps; std::unordered_set serverIds; for (auto const& res : VPackObjectIterator(serversRegistered)) { velocypack::Slice slice = res.value; if (slice.isObject() && slice.hasKey("endpoint")) { std::string server = arangodb::basics::VelocyPackHelper::getStringValue(slice, "endpoint", ""); std::string advertised = arangodb::basics::VelocyPackHelper::getStringValue( slice, "advertisedEndpoint", ""); std::string serverId = res.key.copyString(); try { velocypack::Slice serverSlice; serverSlice = serversAliases.get(serverId); if (serverSlice.isObject()) { std::string alias = arangodb::basics::VelocyPackHelper::getStringValue( serverSlice, "ShortName", ""); newAliases.emplace(std::make_pair(alias, serverId)); } } catch (...) { } std::string serverTimestamp = arangodb::basics::VelocyPackHelper::getStringValue(slice, "timestamp", ""); newServers.emplace(std::make_pair(serverId, server)); newAdvertisedEndpoints.emplace(std::make_pair(serverId, advertised)); newTimestamps.emplace(std::make_pair(serverId, serverTimestamp)); serverIds.emplace(serverId); } } decltype(_serversKnown) newServersKnown(serversKnownSlice, serverIds); // Now set the new value: { WRITE_LOCKER(writeLocker, _serversProt.lock); _servers.swap(newServers); _serverAliases.swap(newAliases); _serverAdvertisedEndpoints.swap(newAdvertisedEndpoints); _serverTimestamps.swap(newTimestamps); _serversKnown = std::move(newServersKnown); _serversProt.doneVersion = storedVersion; _serversProt.isValid = true; } // RebootTracker has its own mutex, and doesn't strictly need to be in // sync with the other members. rebootTracker().updateServerState(_serversKnown.rebootIds()); return; } } LOG_TOPIC("449e0", DEBUG, Logger::CLUSTER) << "Error while loading " << prefixServersRegistered << " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode() << " errorMessage: " << result.errorMessage() << " body: " << result.body(); } //////////////////////////////////////////////////////////////////////// /// @brief Hand out copy of reboot ids //////////////////////////////////////////////////////////////////////////////// std::unordered_map ClusterInfo::rebootIds() const { MUTEX_LOCKER(mutexLocker, _serversProt.mutex); return _serversKnown.rebootIds(); } //////////////////////////////////////////////////////////////////////// /// @brief find the endpoint of a server from its ID. 
/// If it is not found in the cache, the cache is reloaded once, if /// it is still not there an empty string is returned as an error. //////////////////////////////////////////////////////////////////////////////// std::string ClusterInfo::getServerEndpoint(ServerID const& serverID) { #ifdef DEBUG_SYNC_REPLICATION if (serverID == "debug-follower") { return "tcp://127.0.0.1:3000"; } #endif int tries = 0; if (!_serversProt.isValid) { loadServers(); tries++; } std::string serverID_ = serverID; while (true) { { READ_LOCKER(readLocker, _serversProt.lock); // _serversAliases is a map-type auto ita = _serverAliases.find(serverID_); if (ita != _serverAliases.end()) { serverID_ = (*ita).second; } // _servers is a map-type auto it = _servers.find(serverID_); if (it != _servers.end()) { return (*it).second; } } if (++tries >= 2) { break; } // must call loadServers outside the lock loadServers(); } return std::string(); } //////////////////////////////////////////////////////////////////////////////// /// @brief find the advertised endpoint of a server from its ID. /// If it is not found in the cache, the cache is reloaded once, if /// it is still not there an empty string is returned as an error. //////////////////////////////////////////////////////////////////////////////// std::string ClusterInfo::getServerAdvertisedEndpoint(ServerID const& serverID) { #ifdef DEBUG_SYNC_REPLICATION if (serverID == "debug-follower") { return "tcp://127.0.0.1:3000"; } #endif int tries = 0; if (!_serversProt.isValid) { loadServers(); tries++; } std::string serverID_ = serverID; while (true) { { READ_LOCKER(readLocker, _serversProt.lock); // _serversAliases is a map-type auto ita = _serverAliases.find(serverID_); if (ita != _serverAliases.end()) { serverID_ = (*ita).second; } // _serversAliases is a map-type auto it = _serverAdvertisedEndpoints.find(serverID_); if (it != _serverAdvertisedEndpoints.end()) { return (*it).second; } } if (++tries >= 2) { break; } // must call loadServers outside the lock loadServers(); } return std::string(); } //////////////////////////////////////////////////////////////////////////////// /// @brief find the ID of a server from its endpoint. /// If it is not found in the cache, the cache is reloaded once, if /// it is still not there an empty string is returned as an error. //////////////////////////////////////////////////////////////////////////////// std::string ClusterInfo::getServerName(std::string const& endpoint) { int tries = 0; if (!_serversProt.isValid) { loadServers(); tries++; } while (true) { { READ_LOCKER(readLocker, _serversProt.lock); for (auto const& it : _servers) { if (it.second == endpoint) { return it.first; } } } if (++tries >= 2) { break; } // must call loadServers outside the lock loadServers(); } return std::string(); } //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about all coordinators from the agency /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// static std::string const prefixCurrentCoordinators = "Current/Coordinators"; void ClusterInfo::loadCurrentCoordinators() { ++_coordinatorsProt.wantedVersion; // Indicate that after *NOW* somebody // has to reread from the agency! 
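  // Same version bookkeeping as in loadServers(): only one thread actually
  // refreshes the coordinator cache, latecomers return early under the mutex.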
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about all coordinators from the agency
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////

static std::string const prefixCurrentCoordinators = "Current/Coordinators";

void ClusterInfo::loadCurrentCoordinators() {
  ++_coordinatorsProt.wantedVersion;  // Indicate that after *NOW* somebody
                                      // has to reread from the agency!
  MUTEX_LOCKER(mutexLocker, _coordinatorsProt.mutex);
  uint64_t storedVersion = _coordinatorsProt.wantedVersion;  // this is the version
                                                             // we will set in the end
  if (_coordinatorsProt.doneVersion == storedVersion) {
    // Somebody else did what we intended to do, so just return
    return;
  }

  // Now contact the agency:
  AgencyCommResult result = _agency.getValues(prefixCurrentCoordinators);

  if (result.successful()) {
    velocypack::Slice currentCoordinators = result.slice()[0].get(std::vector<std::string>(
        {AgencyCommManager::path(), "Current", "Coordinators"}));

    if (currentCoordinators.isObject()) {
      decltype(_coordinators) newCoordinators;

      for (auto const& coordinator : VPackObjectIterator(currentCoordinators)) {
        newCoordinators.emplace(std::make_pair(coordinator.key.copyString(),
                                               coordinator.value.copyString()));
      }

      // Now set the new value:
      {
        WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
        _coordinators.swap(newCoordinators);
        _coordinatorsProt.doneVersion = storedVersion;
        _coordinatorsProt.isValid = true;
      }
      return;
    }
  }

  LOG_TOPIC("5ee6d", DEBUG, Logger::CLUSTER)
      << "Error while loading " << prefixCurrentCoordinators
      << " httpCode: " << result.httpCode()
      << " errorCode: " << result.errorCode()
      << " errorMessage: " << result.errorMessage()
      << " body: " << result.body();
}

static std::string const prefixMappings = "Target/MapUniqueToShortID";

void ClusterInfo::loadCurrentMappings() {
  ++_mappingsProt.wantedVersion;  // Indicate that after *NOW* somebody
                                  // has to reread from the agency!
  MUTEX_LOCKER(mutexLocker, _mappingsProt.mutex);
  uint64_t storedVersion = _mappingsProt.wantedVersion;  // this is the version
                                                         // we will set in the end
  if (_mappingsProt.doneVersion == storedVersion) {
    // Somebody else did what we intended to do, so just return
    return;
  }

  // Now contact the agency:
  AgencyCommResult result = _agency.getValues(prefixMappings);

  if (result.successful()) {
    velocypack::Slice mappings = result.slice()[0].get(std::vector<std::string>(
        {AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));

    if (mappings.isObject()) {
      decltype(_coordinatorIdMap) newCoordinatorIdMap;

      for (auto const& mapping : VPackObjectIterator(mappings)) {
        ServerID fullId = mapping.key.copyString();
        auto mapObject = mapping.value;
        if (mapObject.isObject()) {
          ServerShortName shortName = mapObject.get("ShortName").copyString();
          ServerShortID shortId =
              mapObject.get("TransactionID").getNumericValue<ServerShortID>();

          static std::string const expectedPrefix{"Coordinator"};
          if (shortName.size() > expectedPrefix.size() &&
              shortName.substr(0, expectedPrefix.size()) == expectedPrefix) {
            newCoordinatorIdMap.emplace(shortId, fullId);
          }
        }
      }

      // Now set the new value:
      {
        WRITE_LOCKER(writeLocker, _mappingsProt.lock);
        _coordinatorIdMap.swap(newCoordinatorIdMap);
        _mappingsProt.doneVersion = storedVersion;
        _mappingsProt.isValid = true;
      }
      return;
    }
  }

  LOG_TOPIC("36f2e", DEBUG, Logger::CLUSTER)
      << "Error while loading " << prefixMappings
      << " httpCode: " << result.httpCode()
      << " errorCode: " << result.errorCode()
      << " errorMessage: " << result.errorMessage()
      << " body: " << result.body();
}
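// NOTE (added commentary, not part of the original source): only mappings
// whose ShortName starts with "Coordinator" are kept above, so
// _coordinatorIdMap translates coordinator short IDs back to full server IDs.
// A hypothetical lookup via the accessor defined further down in this file:
//
//   ServerID full = ClusterInfo::instance()->getCoordinatorByShortID(shortId);
//   // `full` stays empty if the short ID is unknown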
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about all DBservers from the agency
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////

static std::string const prefixCurrentDBServers = "Current/DBServers";
static std::string const prefixTarget = "Target";

void ClusterInfo::loadCurrentDBServers() {
  ++_DBServersProt.wantedVersion;  // Indicate that after *NOW* somebody has to
                                   // reread from the agency!
  MUTEX_LOCKER(mutexLocker, _DBServersProt.mutex);
  uint64_t storedVersion = _DBServersProt.wantedVersion;  // this is the version
                                                          // we will set in the end
  if (_DBServersProt.doneVersion == storedVersion) {
    // Somebody else did what we intended to do, so just return
    return;
  }

  // Now contact the agency:
  AgencyCommResult result = _agency.getValues(prefixCurrentDBServers);
  AgencyCommResult target = _agency.getValues(prefixTarget);

  if (result.successful() && target.successful()) {
    velocypack::Slice currentDBServers;
    velocypack::Slice failedDBServers;
    velocypack::Slice cleanedDBServers;
    velocypack::Slice toBeCleanedDBServers;

    if (result.slice().length() > 0) {
      currentDBServers = result.slice()[0].get(std::vector<std::string>(
          {AgencyCommManager::path(), "Current", "DBServers"}));
    }
    if (target.slice().length() > 0) {
      failedDBServers = target.slice()[0].get(std::vector<std::string>(
          {AgencyCommManager::path(), "Target", "FailedServers"}));
      cleanedDBServers = target.slice()[0].get(std::vector<std::string>(
          {AgencyCommManager::path(), "Target", "CleanedServers"}));
      toBeCleanedDBServers = target.slice()[0].get(std::vector<std::string>(
          {AgencyCommManager::path(), "Target", "ToBeCleanedServers"}));
    }
    if (currentDBServers.isObject() && failedDBServers.isObject()) {
      decltype(_DBServers) newDBServers;

      for (auto const& dbserver : VPackObjectIterator(currentDBServers)) {
        bool found = false;
        if (failedDBServers.isObject()) {
          for (auto const& failedServer : VPackObjectIterator(failedDBServers)) {
            if (basics::VelocyPackHelper::equal(dbserver.key, failedServer.key, false)) {
              found = true;
              break;
            }
          }
        }
        if (found) {
          continue;
        }

        if (cleanedDBServers.isArray()) {
          bool found = false;
          for (auto const& cleanedServer : VPackArrayIterator(cleanedDBServers)) {
            if (basics::VelocyPackHelper::equal(dbserver.key, cleanedServer, false)) {
              found = true;
              break;
            }
          }
          if (found) {
            continue;
          }
        }

        if (toBeCleanedDBServers.isArray()) {
          bool found = false;
          for (auto const& toBeCleanedServer : VPackArrayIterator(toBeCleanedDBServers)) {
            if (basics::VelocyPackHelper::equal(dbserver.key, toBeCleanedServer, false)) {
              found = true;
              break;
            }
          }
          if (found) {
            continue;
          }
        }

        newDBServers.emplace(std::make_pair(dbserver.key.copyString(),
                                            dbserver.value.copyString()));
      }

      // Now set the new value:
      {
        WRITE_LOCKER(writeLocker, _DBServersProt.lock);
        _DBServers.swap(newDBServers);
        _DBServersProt.doneVersion = storedVersion;
        _DBServersProt.isValid = true;
      }
      return;
    }
  }

  LOG_TOPIC("5a7e1", DEBUG, Logger::CLUSTER)
      << "Error while loading " << prefixCurrentDBServers
      << " httpCode: " << result.httpCode()
      << " errorCode: " << result.errorCode()
      << " errorMessage: " << result.errorMessage()
      << " body: " << result.body();
}
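// NOTE (added commentary, not part of the original source): a DB server ends
// up in the _DBServers cache only if it is registered in Current/DBServers and
// listed in neither Target/FailedServers, Target/CleanedServers nor
// Target/ToBeCleanedServers. Callers thus see the set of servers that are
// currently usable, e.g.:
//
//   std::vector<ServerID> dbServers =
//       ClusterInfo::instance()->getCurrentDBServers();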
////////////////////////////////////////////////////////////////////////////////
/// @brief return a list of all DBServers in the cluster that are
/// currently registered
////////////////////////////////////////////////////////////////////////////////

std::vector<ServerID> ClusterInfo::getCurrentDBServers() {
  std::vector<ServerID> result;

  if (!_DBServersProt.isValid) {
    loadCurrentDBServers();
  }
  // return a consistent state of servers
  READ_LOCKER(readLocker, _DBServersProt.lock);

  result.reserve(_DBServers.size());

  for (auto& it : _DBServers) {
    result.emplace_back(it.first);
  }

  return result;
}

////////////////////////////////////////////////////////////////////////////////
/// @brief find the servers who are responsible for a shard (one leader
/// and multiple followers)
/// If it is not found in the cache, the cache is reloaded once, if
/// it is still not there an empty list is returned as an error.
////////////////////////////////////////////////////////////////////////////////

std::shared_ptr<std::vector<ServerID>> ClusterInfo::getResponsibleServer(ShardID const& shardID) {
  int tries = 0;

  if (!_currentProt.isValid) {
    loadCurrent();
    tries++;
  }

  while (true) {
    {
      {
        READ_LOCKER(readLocker, _currentProt.lock);
        // _shardIds is a map-type <ShardID, std::shared_ptr<std::vector<ServerID>>>
        auto it = _shardIds.find(shardID);

        if (it != _shardIds.end()) {
          auto serverList = (*it).second;
          if (serverList != nullptr && serverList->size() > 0 &&
              (*serverList)[0].size() > 0 && (*serverList)[0][0] == '_') {
            // This is a temporary situation in which the leader has already
            // resigned, let's wait half a second and try again.
            --tries;
            LOG_TOPIC("b1dc5", INFO, Logger::CLUSTER)
                << "getResponsibleServer: found resigned leader, "
                << "waiting for half a second...";
          } else {
            return (*it).second;
          }
        }
      }
      std::this_thread::sleep_for(std::chrono::microseconds(500000));
    }

    if (++tries >= 2) {
      break;
    }

    // must load collections outside the lock
    loadCurrent();
  }

  return std::make_shared<std::vector<ServerID>>();
}

////////////////////////////////////////////////////////////////////////////////
/// @brief find the shard list of a collection, sorted numerically
////////////////////////////////////////////////////////////////////////////////

std::shared_ptr<std::vector<ShardID>> ClusterInfo::getShardList(CollectionID const& collectionID) {
  if (!_planProt.isValid) {
    loadPlan();
  }

  int tries = 0;
  while (true) {
    {
      // Get the sharding keys and the number of shards:
      READ_LOCKER(readLocker, _planProt.lock);
      // _shards is a map-type <CollectionID, std::shared_ptr<std::vector<ShardID>>>
      auto it = _shards.find(collectionID);

      if (it != _shards.end()) {
        return it->second;
      }
    }
    if (++tries >= 2) {
      return std::make_shared<std::vector<ShardID>>();
    }
    loadPlan();
  }
}

////////////////////////////////////////////////////////////////////////////////
/// @brief return the list of coordinator server names
////////////////////////////////////////////////////////////////////////////////

std::vector<ServerID> ClusterInfo::getCurrentCoordinators() {
  std::vector<ServerID> result;

  if (!_coordinatorsProt.isValid) {
    loadCurrentCoordinators();
  }

  // return a consistent state of servers
  READ_LOCKER(readLocker, _coordinatorsProt.lock);

  result.reserve(_coordinators.size());

  for (auto& it : _coordinators) {
    result.emplace_back(it.first);
  }

  return result;
}

////////////////////////////////////////////////////////////////////////////////
/// @brief lookup full coordinator ID from short ID
////////////////////////////////////////////////////////////////////////////////

ServerID ClusterInfo::getCoordinatorByShortID(ServerShortID shortId) {
  ServerID result;

  if (!_mappingsProt.isValid) {
    loadCurrentMappings();
  }

  // return a consistent state of servers
  READ_LOCKER(readLocker, _mappingsProt.lock);

  auto it = _coordinatorIdMap.find(shortId);
  if (it != _coordinatorIdMap.end()) {
    result = it->second;
  }

  return result;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate plan
//////////////////////////////////////////////////////////////////////////////

void ClusterInfo::invalidatePlan() {
  {
    WRITE_LOCKER(writeLocker, _planProt.lock);
    _planProt.isValid = false;
  }
}
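// Illustrative shard lookup sketch (added commentary, not part of the original
// source); `collection` and `shard` are hypothetical IDs. Both getters above
// retry once after reloading Plan/Current, and getResponsibleServer()
// additionally waits out a resigned leader (first entry prefixed with '_')
// for half a second:
//
//   ClusterInfo* ci = ClusterInfo::instance();
//   std::shared_ptr<std::vector<ShardID>> shards = ci->getShardList(collection);
//   for (ShardID const& shard : *shards) {
//     auto servers = ci->getResponsibleServer(shard);
//     // if non-empty, (*servers)[0] is the leader, the rest are followers
//   }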
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current coordinators
//////////////////////////////////////////////////////////////////////////////

void ClusterInfo::invalidateCurrentCoordinators() {
  {
    WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
    _coordinatorsProt.isValid = false;
  }
}

//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current mappings
//////////////////////////////////////////////////////////////////////////////

void ClusterInfo::invalidateCurrentMappings() {
  {
    WRITE_LOCKER(writeLocker, _mappingsProt.lock);
    _mappingsProt.isValid = false;
  }
}

//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current
//////////////////////////////////////////////////////////////////////////////

void ClusterInfo::invalidateCurrent() {
  {
    WRITE_LOCKER(writeLocker, _serversProt.lock);
    _serversProt.isValid = false;
  }
  {
    WRITE_LOCKER(writeLocker, _DBServersProt.lock);
    _DBServersProt.isValid = false;
  }
  {
    WRITE_LOCKER(writeLocker, _currentProt.lock);
    _currentProt.isValid = false;
  }
  invalidateCurrentCoordinators();
  invalidateCurrentMappings();
}

//////////////////////////////////////////////////////////////////////////////
/// @brief get current "Plan" structure
//////////////////////////////////////////////////////////////////////////////

std::shared_ptr<VPackBuilder> ClusterInfo::getPlan() {
  if (!_planProt.isValid) {
    loadPlan();
  }
  READ_LOCKER(readLocker, _planProt.lock);
  return _plan;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief get current "Current" structure
//////////////////////////////////////////////////////////////////////////////

std::shared_ptr<VPackBuilder> ClusterInfo::getCurrent() {
  if (!_currentProt.isValid) {
    loadCurrent();
  }
  READ_LOCKER(readLocker, _currentProt.lock);
  return _current;
}

std::unordered_map<ServerID, std::string> ClusterInfo::getServers() {
  if (!_serversProt.isValid) {
    loadServers();
  }
  READ_LOCKER(readLocker, _serversProt.lock);
  std::unordered_map<ServerID, std::string> serv = _servers;
  return serv;
}

std::unordered_map<ServerID, std::string> ClusterInfo::getServerAliases() {
  READ_LOCKER(readLocker, _serversProt.lock);
  std::unordered_map<ServerID, std::string> ret;
  for (const auto& i : _serverAliases) {
    ret.emplace(i.second, i.first);
  }
  return ret;
}

std::unordered_map<ServerID, std::string> ClusterInfo::getServerAdvertisedEndpoints() {
  READ_LOCKER(readLocker, _serversProt.lock);
  std::unordered_map<ServerID, std::string> ret;
  for (const auto& i : _serverAdvertisedEndpoints) {
    ret.emplace(i.second, i.first);
  }
  return ret;
}

std::unordered_map<ServerID, std::string> ClusterInfo::getServerTimestamps() {
  READ_LOCKER(readLocker, _serversProt.lock);
  return _serverTimestamps;
}

arangodb::Result ClusterInfo::getShardServers(ShardID const& shardId,
                                              std::vector<ServerID>& servers) {
  READ_LOCKER(readLocker, _planProt.lock);

  auto it = _shardServers.find(shardId);
  if (it != _shardServers.end()) {
    servers = (*it).second;
    return arangodb::Result();
  }

  LOG_TOPIC("16d14", DEBUG, Logger::CLUSTER)
      << "Strange, did not find shard in _shardServers: " << shardId;
  return arangodb::Result(TRI_ERROR_FAILED);
}

arangodb::Result ClusterInfo::agencyDump(std::shared_ptr<VPackBuilder> body) {
  AgencyCommResult dump = _agency.dump();

  if (!dump.successful()) {
    LOG_TOPIC("93c0e", ERR, Logger::CLUSTER)
        << "failed to acquire agency dump: " << dump.errorMessage();
    return Result(dump.errorCode(), dump.errorMessage());
  }

  body->add(dump.slice());
  return Result();
}
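// Illustrative caller-side sketch (added commentary, not part of the original
// source): agencyDump() above and agencyPlan() below both expect the caller to
// supply the builder that receives the agency snapshot.
//
//   auto body = std::make_shared<VPackBuilder>();
//   Result res = ClusterInfo::instance()->agencyDump(body);
//   if (res.ok()) {
//     // body->slice() now holds the agency dump
//   }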
arangodb::Result ClusterInfo::agencyPlan(std::shared_ptr<VPackBuilder> body) {
  AgencyCommResult dump = _agency.getValues("Plan");

  if (!dump.successful()) {
    LOG_TOPIC("ce937", ERR, Logger::CLUSTER)
        << "failed to acquire agency dump: " << dump.errorMessage();
    return Result(dump.errorCode(), dump.errorMessage());
  }

  body->add(dump.slice());
  return Result();
}

arangodb::Result ClusterInfo::agencyReplan(VPackSlice const plan) {
  // Apply only the Plan sections we care about: Collections, Databases and Views
  AgencyWriteTransaction planTransaction(std::vector<AgencyOperation>{
      AgencyOperation("Plan/Collections", AgencyValueOperationType::SET,
                      plan.get(std::vector<std::string>{"arango", "Plan", "Collections"})),
      AgencyOperation("Plan/Databases", AgencyValueOperationType::SET,
                      plan.get(std::vector<std::string>{"arango", "Plan", "Databases"})),
      AgencyOperation("Plan/Views", AgencyValueOperationType::SET,
                      plan.get(std::vector<std::string>{"arango", "Plan", "Views"})),
      AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP),
      AgencyOperation("Sync/UserVersion", AgencySimpleOperationType::INCREMENT_OP)});

  AgencyCommResult r = _agency.sendTransactionWithFailover(planTransaction);
  if (!r.successful()) {
    arangodb::Result result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            std::string("Error reporting to agency: _statusCode: ") +
                                std::to_string(r.errorCode()));
    return result;
  }

  return arangodb::Result();
}

std::string const backupKey = "/arango/Target/HotBackup/Create/";
std::string const maintenanceKey = "/arango/Supervision/Maintenance";
std::string const toDoKey = "/arango/Target/ToDo";
std::string const pendingKey = "/arango/Target/Pending";
std::string const writeURL = "_api/agency/write";
std::vector<std::string> modepv = {"arango", "Supervision", "State", "Mode"};

arangodb::Result ClusterInfo::agencyHotBackupLock(std::string const& backupId,
                                                  double const& timeout,
                                                  bool& supervisionOff) {
  using namespace std::chrono;
  auto const endTime =
      steady_clock::now() + milliseconds(static_cast<uint64_t>(1.0e3 * timeout));
  supervisionOff = false;

  LOG_TOPIC("e74e5", DEBUG, Logger::BACKUP)
      << "initiating agency lock for hot backup " << backupId;

  VPackBuilder builder;
  {
    VPackArrayBuilder trxs(&builder);
    {
      VPackArrayBuilder trx(&builder);

      // Operations
      {
        VPackObjectBuilder o(&builder);
        builder.add(backupKey + backupId, VPackValue(0));  // Hot backup key
        builder.add(maintenanceKey, VPackValue("on"));     // Turn off supervision
      }

      // Preconditions
      {
        VPackObjectBuilder precs(&builder);
        builder.add(VPackValue(backupKey));  // Backup key empty
        {
          VPackObjectBuilder oe(&builder);
          builder.add("oldEmpty", VPackValue(true));
        }
        builder.add(VPackValue(pendingKey));  // No jobs pending
        {
          VPackObjectBuilder oe(&builder);
          builder.add("old", VPackSlice::emptyObjectSlice());
        }
        builder.add(VPackValue(toDoKey));  // No jobs to do
        {
          VPackObjectBuilder oe(&builder);
          builder.add("old", VPackSlice::emptyObjectSlice());
        }
        builder.add(VPackValue(maintenanceKey));  // Supervision on
        {
          VPackObjectBuilder old(&builder);
          builder.add("oldEmpty", VPackValue(true));
        }
      }
    }

    {
      VPackArrayBuilder trx(&builder);

      // Operations
      {
        VPackObjectBuilder o(&builder);
        builder.add(backupKey + backupId, VPackValue(0));  // Hot backup key
        builder.add(maintenanceKey, VPackValue("on"));     // Turn off supervision
      }

      // Preconditions
      {
        VPackObjectBuilder precs(&builder);
        builder.add(VPackValue(backupKey));  // Backup key empty
        {
          VPackObjectBuilder oe(&builder);
          builder.add("oldEmpty", VPackValue(true));
        }
        builder.add(VPackValue(pendingKey));  // No jobs pending
        {
          VPackObjectBuilder oe(&builder);
          builder.add("old", VPackSlice::emptyObjectSlice());
        }
        builder.add(VPackValue(toDoKey));  // No jobs to do
        {
          VPackObjectBuilder oe(&builder);
          builder.add("old", VPackSlice::emptyObjectSlice());
        }
        builder.add(VPackValue(maintenanceKey));  // Supervision off
        {
          VPackObjectBuilder old(&builder);
          builder.add("old", VPackValue("on"));
        }
      }
    }
  }

  // Try to establish the hot backup lock in the agency.
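  // NOTE (added commentary, not part of the original source): the envelope
  // built above contains two alternative transactions. The first one fires if
  // the supervision maintenance key is still empty (supervision running), the
  // second one if maintenance mode is already "on". At most one of them can
  // match, so the agency is expected to answer 412 for the other, which is why
  // a PRECONDITION_FAILED response is tolerated below.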
  auto result = _agency.sendWithFailover(arangodb::rest::RequestType::POST,
                                         timeout, writeURL, builder.slice());

  LOG_TOPIC("53a93", DEBUG, Logger::BACKUP)
      << "agency lock for hot backup " << backupId << " scheduled with "
      << builder.toJson();

  // *** ATTENTION ***: Result will always be 412.
  // So we're going to fail, if we have an error OTHER THAN 412:
  if (!result.successful() &&
      result.httpCode() != (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "failed to acquire backup lock in agency");
  }

  auto rv = VPackParser::fromJson(result.bodyRef());

  LOG_TOPIC("a94d5", DEBUG, Logger::BACKUP)
      << "agency lock response for backup id " << backupId << ": " << rv->toJson();

  if (!rv->slice().isObject() || !rv->slice().hasKey("results") ||
      !rv->slice().get("results").isArray() ||
      rv->slice().get("results").length() != 2) {
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "invalid agency result while acquiring backup lock");
  }

  auto ar = rv->slice().get("results");
  uint64_t first = ar[0].getNumber<uint64_t>();
  uint64_t second = ar[1].getNumber<uint64_t>();

  if (first == 0 && second == 0) {  // tough luck
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "preconditions failed while trying to acquire "
                            "backup lock in the agency");
  }

  if (first > 0) {  // Supervision was on
    LOG_TOPIC("b6c98", DEBUG, Logger::BACKUP)
        << "agency lock found supervision on before";
    supervisionOff = false;
  } else {
    LOG_TOPIC("bbb55", DEBUG, Logger::BACKUP)
        << "agency lock found supervision off before";
    supervisionOff = true;
  }

  double wait = 0.1;
  while (!application_features::ApplicationServer::isStopping() &&
         std::chrono::steady_clock::now() < endTime) {
    result = _agency.getValues("Supervision/State/Mode");

    if (result.successful()) {
      if (!result.slice().isArray() || result.slice().length() != 1) {
        return arangodb::Result(
            TRI_ERROR_HOT_BACKUP_INTERNAL,
            std::string("invalid JSON from agency, when acquiring supervision mode: ") +
                result.slice().toJson());
      }

      if (result.slice()[0].hasKey(modepv) && result.slice()[0].get(modepv).isString()) {
        if (result.slice()[0].get(modepv).isEqualString("Maintenance")) {
          LOG_TOPIC("76a2c", DEBUG, Logger::BACKUP) << "agency hot backup lock acquired";
          return arangodb::Result();
        }
      }
    }

    LOG_TOPIC("ede54", DEBUG, Logger::BACKUP)
        << "agency hot backup lock waiting: " << result.slice().toJson();

    if (wait < 2.0) {
      wait *= 1.1;
    }

    std::this_thread::sleep_for(std::chrono::duration<double>(wait));
  }

  agencyHotBackupUnlock(backupId, timeout, supervisionOff);

  return arangodb::Result(
      TRI_ERROR_HOT_BACKUP_INTERNAL,
      "timeout waiting for maintenance mode to be activated in agency");
}

arangodb::Result ClusterInfo::agencyHotBackupUnlock(std::string const& backupId,
                                                    double const& timeout,
                                                    const bool& supervisionOff) {
  using namespace std::chrono;
  auto const endTime =
      steady_clock::now() + milliseconds(static_cast<uint64_t>(1.0e3 * timeout));

  LOG_TOPIC("6ae41", DEBUG, Logger::BACKUP)
      << "unlocking backup lock for backup " + backupId + " in agency";

  VPackBuilder builder;
  {
    VPackArrayBuilder trxs(&builder);
    {
      VPackArrayBuilder trx(&builder);
      {
        VPackObjectBuilder o(&builder);
        builder.add(VPackValue(backupKey));  // Remove backup key
        {
          VPackObjectBuilder oo(&builder);
          builder.add("op", VPackValue("delete"));
        }
        if (!supervisionOff) {
          // Turn supervision on, if it was on before
          builder.add(VPackValue(maintenanceKey));
          VPackObjectBuilder d(&builder);
          builder.add("op", VPackValue("delete"));
        }
      }
    }
  }

  // Send the unlock transaction to the agency. As in agencyHotBackupLock(),
  // a 412 (precondition failed) response is tolerated here.
  auto result = _agency.sendWithFailover(arangodb::rest::RequestType::POST,
                                         timeout, writeURL, builder.slice());

  if (!result.successful() &&
      result.httpCode() != (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "failed to release backup lock in agency");
  }

  auto rv = VPackParser::fromJson(result.bodyRef());

  if (!rv->slice().isObject() || !rv->slice().hasKey("results") ||
      !rv->slice().get("results").isArray()) {
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "invalid agency result while releasing backup lock");
  }

  auto ar = rv->slice().get("results");
  if (!ar[0].isNumber()) {
    return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
                            "invalid agency result while releasing backup lock");
  }

  double wait = 0.1;
  while (!application_features::ApplicationServer::isStopping() &&
         std::chrono::steady_clock::now() < endTime) {
    result = _agency.getValues("/arango/Supervision/State/Mode");

    if (result.successful()) {
      if (!result.slice().isArray() || result.slice().length() != 1 ||
          !result.slice()[0].hasKey(modepv) || !result.slice()[0].get(modepv).isString()) {
        return arangodb::Result(
            TRI_ERROR_HOT_BACKUP_INTERNAL,
            std::string("invalid JSON from agency, when deactivating supervision mode: ") +
                result.slice().toJson());
      }

      if (result.slice()[0].get(modepv).isEqualString("Normal")) {
        return arangodb::Result();
      }
    }

    if (wait < 2.0) {
      wait *= 1.1;
    }

    std::this_thread::sleep_for(std::chrono::duration<double>(wait));
  }

  return arangodb::Result(
      TRI_ERROR_HOT_BACKUP_INTERNAL,
      "timeout waiting for maintenance mode to be deactivated in agency");
}
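// Illustrative pairing of the two calls above (added commentary, not part of
// the original source); `backupId` and `timeout` are assumed to come from the
// hot backup job driving this code:
//
//   bool supervisionOff = false;
//   ClusterInfo* ci = ClusterInfo::instance();
//   Result res = ci->agencyHotBackupLock(backupId, timeout, supervisionOff);
//   if (res.ok()) {
//     // ... perform the hot backup ...
//     ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff);
//   }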
ClusterInfo::ServersKnown::ServersKnown(VPackSlice const serversKnownSlice,
                                        std::unordered_set<ServerID> const& serverIds)
    : _serversKnown() {
  TRI_ASSERT(serversKnownSlice.isNone() || serversKnownSlice.isObject());
  if (serversKnownSlice.isObject()) {
    for (auto const it : VPackObjectIterator(serversKnownSlice)) {
      VPackSlice const knownServerSlice = it.value;
      TRI_ASSERT(knownServerSlice.isObject());
      if (knownServerSlice.isObject()) {
        VPackSlice const rebootIdSlice = knownServerSlice.get("rebootId");
        TRI_ASSERT(rebootIdSlice.isInteger());
        if (rebootIdSlice.isInteger()) {
          std::string serverId = it.key.copyString();
          auto const rebootId = RebootId{rebootIdSlice.getNumericValue<uint64_t>()};
          _serversKnown.emplace(std::move(serverId), rebootId);
        }
      }
    }
  }

  // For backwards compatibility / rolling upgrades, add servers that aren't in
  // ServersKnown but in ServersRegistered with a reboot ID of 0 as a fallback.
  // We should be able to remove this in 3.6.
  for (auto const& serverId : serverIds) {
    auto const rv = _serversKnown.emplace(serverId, RebootId{0});
    LOG_TOPIC_IF("0acbd", INFO, Logger::CLUSTER, rv.second)
        << "Server " << serverId
        << " is in Current/ServersRegistered, but not in "
           "Current/ServersKnown. This is expected to happen "
           "during a rolling upgrade.";
  }
}

std::unordered_map<ServerID, ClusterInfo::ServersKnown::KnownServer> const&
ClusterInfo::ServersKnown::serversKnown() const noexcept {
  return _serversKnown;
}

std::unordered_map<ServerID, RebootId> ClusterInfo::ServersKnown::rebootIds() const {
  std::unordered_map<ServerID, RebootId> rebootIds;
  for (auto const& it : _serversKnown) {
    rebootIds.emplace(it.first, it.second.rebootId());
  }
  return rebootIds;
}

VPackSlice PlanCollectionReader::indexes() {
  VPackSlice res = _collection.get("indexes");
  if (res.isNone()) {
    return VPackSlice::emptyArraySlice();
  } else {
    TRI_ASSERT(res.isArray());
    return res;
  }
}

CollectionWatcher::~CollectionWatcher() {
  _agencyCallbackRegistry->unregisterCallback(_agencyCallback);
}

// -----------------------------------------------------------------------------
// --SECTION--                                                       END-OF-FILE
// -----------------------------------------------------------------------------