1
0
Fork 0

[3.5] Background Get Ids (#9786)

* Obtain new unique IDs via a background thread.

* Updated changelog.
This commit is contained in:
Lars Maier 2019-09-27 18:21:26 +02:00 committed by KVS85
parent 6d3548fd92
commit 0757cf2b92
3 changed files with 97 additions and 38 deletions

View File

@ -1,6 +1,8 @@
v3.5.1 (XXXX-XX-XX)
-------------------
* Obtain new unique IDs via a background thread.
* Fixed issue #10078: FULLTEXT with sort on same field not working.
* Fixed issue #10062: AQL: could not extract custom attribute.

View File

@ -203,7 +203,9 @@ ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry)
_uniqid() {
_uniqid._currentValue = 1ULL;
_uniqid._upperValue = 0ULL;
_uniqid._nextBatchStart = 1ULL;
_uniqid._nextUpperValue = 0ULL;
_uniqid._backgroundJobIsRunning = false;
// Actual loading into caches is postponed until necessary
}
@ -223,6 +225,16 @@ void ClusterInfo::cleanup() {
return;
}
while (true) {
{
MUTEX_LOCKER(mutexLocker, theInstance->_idLock);
if (!theInstance->_uniqid._backgroundJobIsRunning) {
break ;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
MUTEX_LOCKER(mutexLocker, theInstance->_planProt.mutex);
TRI_ASSERT(theInstance->_newPlannedViews.empty()); // only non-empty during loadPlan()
@ -249,50 +261,87 @@ void ClusterInfo::logAgencyDump() const {
#endif
}
void ClusterInfo::triggerBackgroundGetIds() {
// Trigger a new load of batches
_uniqid._nextBatchStart = 1ULL;
_uniqid._nextUpperValue = 0ULL;
try {
if (_uniqid._backgroundJobIsRunning) {
return ;
}
_uniqid._backgroundJobIsRunning = true;
std::thread([this]{
auto guardRunning = scopeGuard([this]{
MUTEX_LOCKER(mutexLocker, _idLock);
_uniqid._backgroundJobIsRunning = false;
});
uint64_t result;
try {
result = _agency.uniqid(MinIdsPerBatch, 0.0);
} catch (std::exception const&) {
return ;
}
{
MUTEX_LOCKER(mutexLocker, _idLock);
if (1ULL == _uniqid._nextBatchStart) {
// Invalidate next batch
_uniqid._nextBatchStart = result;
_uniqid._nextUpperValue = result + MinIdsPerBatch - 1;
}
// If we get here, somebody else tried succeeded in doing the same,
// so we just try again.
}
}).detach();
} catch (std::exception const& e) {
LOG_TOPIC("adef4", WARN, Logger::CLUSTER) << "Failed to trigger background get ids. " << e.what();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
/// new upper bound value from the agency
////////////////////////////////////////////////////////////////////////////////
uint64_t ClusterInfo::uniqid(uint64_t count) {
while (true) {
uint64_t oldValue;
{
// The quick path, we have enough in our private reserve:
MUTEX_LOCKER(mutexLocker, _idLock);
MUTEX_LOCKER(mutexLocker, _idLock);
if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
oldValue = _uniqid._currentValue;
}
// We need to fetch from the agency
uint64_t fetch = count;
if (fetch < MinIdsPerBatch) {
fetch = MinIdsPerBatch;
}
uint64_t result = _agency.uniqid(fetch, 0.0);
{
MUTEX_LOCKER(mutexLocker, _idLock);
if (oldValue == _uniqid._currentValue) {
_uniqid._currentValue = result + count;
_uniqid._upperValue = result + fetch - 1;
return result;
}
// If we get here, somebody else tried succeeded in doing the same,
// so we just try again.
}
if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
// Try if we can use the next batch
if (_uniqid._nextBatchStart + count - 1 <= _uniqid._nextUpperValue) {
uint64_t result = _uniqid._nextBatchStart;
_uniqid._currentValue = _uniqid._nextBatchStart + count;
_uniqid._upperValue = _uniqid._nextUpperValue;
triggerBackgroundGetIds();
return result;
}
// We need to fetch from the agency
uint64_t fetch = count;
if (fetch < MinIdsPerBatch) {
fetch = MinIdsPerBatch;
}
uint64_t result = _agency.uniqid(2 * fetch, 0.0);
_uniqid._currentValue = result + count;
_uniqid._upperValue = result + fetch - 1;
// Invalidate next batch
_uniqid._nextBatchStart = _uniqid._upperValue + 1;
_uniqid._nextUpperValue = _uniqid._upperValue + fetch - 1;
return result;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -722,6 +722,11 @@ class ClusterInfo final {
std::shared_ptr<std::vector<ServerID>> getResponsibleServer(ShardID const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief triggers a new background thread to obtain the next batch of ids
//////////////////////////////////////////////////////////////////////////////
void triggerBackgroundGetIds();
//////////////////////////////////////////////////////////////////////////////
/// @brief find the shard list of a collection, sorted numerically
//////////////////////////////////////////////////////////////////////////////
@ -984,6 +989,9 @@ class ClusterInfo final {
struct {
uint64_t _currentValue;
uint64_t _upperValue;
uint64_t _nextBatchStart;
uint64_t _nextUpperValue;
bool _backgroundJobIsRunning;
} _uniqid;
//////////////////////////////////////////////////////////////////////////////