1
0
Fork 0

[3.4] Background Get Ids (#9785)

* Obtain new unique IDs via a background thread.

* Updated changelog.
This commit is contained in:
Lars Maier 2019-09-19 21:33:02 +02:00 committed by KVS85
parent dc2e27db6c
commit e51bc5ca52
3 changed files with 115 additions and 52 deletions

View File

@ -1,6 +1,9 @@
v3.4.9 (XXXX-XX-XX)
-------------------
* Obtain new unique IDs via a background thread.
* Fixed a shutdown hanger because of a read/write lock race.
* Fixed "ArangoDB is not running in cluster mode" errors in active failover setups.

View File

@ -249,7 +249,9 @@ ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry)
_uniqid() {
_uniqid._currentValue = 1ULL;
_uniqid._upperValue = 0ULL;
_uniqid._nextBatchStart = 1ULL;
_uniqid._nextUpperValue = 0ULL;
_uniqid._backgroundJobIsRunning = false;
// Actual loading into caches is postponed until necessary
}
@ -269,6 +271,16 @@ void ClusterInfo::cleanup() {
return;
}
while (true) {
{
MUTEX_LOCKER(mutexLocker, theInstance->_idLock);
if (!theInstance->_uniqid._backgroundJobIsRunning) {
break ;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
MUTEX_LOCKER(mutexLocker, theInstance->_planProt.mutex);
TRI_ASSERT(theInstance->_newPlannedViews.empty()); // only non-empty during loadPlan()
@ -294,52 +306,92 @@ void ClusterInfo::logAgencyDump() const {
#endif
}
void ClusterInfo::triggerBackgroundGetIds() {
// Trigger a new load of batches
_uniqid._nextBatchStart = 1ULL;
_uniqid._nextUpperValue = 0ULL;
try {
if (_uniqid._backgroundJobIsRunning) {
return ;
}
_uniqid._backgroundJobIsRunning = true;
std::thread([this]{
auto guardRunning = scopeGuard([this]{
MUTEX_LOCKER(mutexLocker, _idLock);
_uniqid._backgroundJobIsRunning = false;
});
uint64_t result;
try {
result = _agency.uniqid(MinIdsPerBatch, 0.0);
} catch (std::exception const&) {
return ;
}
{
MUTEX_LOCKER(mutexLocker, _idLock);
if (1ULL == _uniqid._nextBatchStart) {
// Invalidate next batch
_uniqid._nextBatchStart = result;
_uniqid._nextUpperValue = result + MinIdsPerBatch - 1;
}
// If we get here, somebody else tried succeeded in doing the same,
// so we just try again.
}
}).detach();
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::CLUSTER) << "Failed to trigger background get ids. " << e.what();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
/// new upper bound value from the agency
////////////////////////////////////////////////////////////////////////////////
uint64_t ClusterInfo::uniqid(uint64_t count) {
while (true) {
uint64_t oldValue;
{
// The quick path, we have enough in our private reserve:
MUTEX_LOCKER(mutexLocker, _idLock);
MUTEX_LOCKER(mutexLocker, _idLock);
if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
oldValue = _uniqid._currentValue;
}
// We need to fetch from the agency
uint64_t fetch = count;
if (fetch < MinIdsPerBatch) {
fetch = MinIdsPerBatch;
}
uint64_t result = _agency.uniqid(fetch, 0.0);
{
MUTEX_LOCKER(mutexLocker, _idLock);
if (oldValue == _uniqid._currentValue) {
_uniqid._currentValue = result + count;
_uniqid._upperValue = result + fetch - 1;
return result;
}
// If we get here, somebody else tried succeeded in doing the same,
// so we just try again.
}
if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
// Try if we can use the next batch
if (_uniqid._nextBatchStart + count - 1 <= _uniqid._nextUpperValue) {
uint64_t result = _uniqid._nextBatchStart;
_uniqid._currentValue = _uniqid._nextBatchStart + count;
_uniqid._upperValue = _uniqid._nextUpperValue;
triggerBackgroundGetIds();
return result;
}
// We need to fetch from the agency
uint64_t fetch = count;
if (fetch < MinIdsPerBatch) {
fetch = MinIdsPerBatch;
}
uint64_t result = _agency.uniqid(2 * fetch, 0.0);
_uniqid._currentValue = result + count;
_uniqid._upperValue = result + fetch - 1;
// Invalidate next batch
_uniqid._nextBatchStart = _uniqid._upperValue + 1;
_uniqid._nextUpperValue = _uniqid._upperValue + fetch - 1;
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flush the caches (used for testing)
////////////////////////////////////////////////////////////////////////////////
@ -1603,7 +1655,7 @@ Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName
std::vector<ClusterCollectionCreationInfo> const& infos,
uint64_t& planVersion) {
READ_LOCKER(readLocker, _planProt.lock);
planVersion = _planVersion;
for (auto const& info : infos) {
@ -1611,7 +1663,7 @@ Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
return TRI_ERROR_BAD_PARAMETER; // must not be empty
}
// Validate that the collection does not exist in the current plan
{
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
@ -1634,7 +1686,7 @@ Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName
}
}
}
// Validate that there is no view with this name either
{
// check against planned views as well
@ -1654,7 +1706,7 @@ Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName
return {};
}
/// @brief create multiple collections in coordinator
/// If any one of these collections fails, all creations will be
/// rolled back.
@ -1680,7 +1732,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
AgencyComm ac;
std::vector<std::shared_ptr<AgencyCallback>> agencyCallbacks;
auto cbGuard = scopeGuard([&] {
// We have a subtle race here, that we try to cover against:
// We register a callback in the agency.
@ -1703,7 +1755,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
std::vector<AgencyOperation> opers({IncreaseVersion()});
std::vector<AgencyPrecondition> precs;
std::unordered_set<std::string> conditions;
// current thread owning 'cacheMutex' write lock (workaround for non-recursive Mutex)
for (auto& info : infos) {
TRI_ASSERT(!info.name.empty());
@ -1860,7 +1912,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
AgencyPrecondition::Type::EMPTY, true));
}
}
// additionally ensure that no such collectionID exists yet in Plan/Collections
precs.emplace_back(AgencyPrecondition("Plan/Collections/" + databaseName + "/" + info.collectionID,
AgencyPrecondition::Type::EMPTY, true));
@ -1879,13 +1931,13 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
}
// now try to update the plan in the agency, using the current plan version as our
// now try to update the plan in the agency, using the current plan version as our
// precondition
{
// create a builder with just the version number for comparison
VPackBuilder versionBuilder;
versionBuilder.add(VPackValue(planVersion));
// add a precondition that checks the plan version has not yet changed
precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice()));
@ -1903,8 +1955,8 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
// use this special error code to signal that we got a precondition failure
// in this case the caller can try again with an updated version of the plan change
return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"};
}
}
std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode());
errorMsg += " error message: " + res.errorMessage();
errorMsg += " error details: " + res.errorDetails();
@ -1921,7 +1973,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
}
// if we got here, the plan was updated successfully
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "createCollectionCoordinator, Plan changed, waiting for success...";
@ -2158,7 +2210,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
<< "Precondition failed for this agency transaction: " << trans.toJson()
<< ", return code: " << res.httpCode();
}
logAgencyDump();
// TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the
// precondition is that the database still exists
@ -2195,7 +2247,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
<< "Timeout in _drop collection (" << realTimeout << ")"
<< ": database: " << dbName << ", collId:" << collectionID
<< "\ntransaction sent to agency: " << trans.toJson();
logAgencyDump();
events::DropCollection(collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);

View File

@ -556,6 +556,11 @@ class ClusterInfo {
std::shared_ptr<std::vector<ServerID>> getResponsibleServer(ShardID const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief triggers a new background thread to obtain the next batch of ids
//////////////////////////////////////////////////////////////////////////////
void triggerBackgroundGetIds();
//////////////////////////////////////////////////////////////////////////////
/// @brief find the shard list of a collection, sorted numerically
//////////////////////////////////////////////////////////////////////////////
@ -642,7 +647,7 @@ class ClusterInfo {
* @return List of DB servers serving the shard
*/
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
//////////////////////////////////////////////////////////////////////////////
/// @brief get an operation timeout
//////////////////////////////////////////////////////////////////////////////
@ -789,6 +794,9 @@ class ClusterInfo {
struct {
uint64_t _currentValue;
uint64_t _upperValue;
uint64_t _nextBatchStart;
uint64_t _nextUpperValue;
bool _backgroundJobIsRunning;
} _uniqid;
//////////////////////////////////////////////////////////////////////////////