mirror of https://gitee.com/bigwinds/arangodb
Fix missed callbacks race condition (#9183)
* If we miss a callback in CollectionCreation also test if we missed later callbacks. * Updated changelog * Fixed callback lockers * Remove lockers vector alltogether, we are protected by mutex anyways * Fixed include guard * Do not recheck the Collection that has woken us up * Fixed recursive lock gathering
This commit is contained in:
parent
9c123a638c
commit
b14259b55e
|
@ -2,6 +2,10 @@ v3.4.7 (2019-XX-XX)
|
|||
-------------------
|
||||
* allow pregel to select the shard key via `shardKeyAttribute` in pregel start parameters
|
||||
|
||||
|
||||
* Speed up collection creation process in cluster, if not all agency callbacks are
|
||||
delivered successfully.
|
||||
|
||||
* Fixed parsing of ArangoDB config files with inlined comments. Previous versions didn't handle
|
||||
line comments properly if they were appended to an otherwise valid option value.
|
||||
|
||||
|
|
|
@ -127,7 +127,7 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
|
|||
return result;
|
||||
}
|
||||
|
||||
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
||||
bool AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
||||
// One needs to acquire the mutex of the condition variable
|
||||
// before entering this function!
|
||||
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0)) &&
|
||||
|
@ -136,5 +136,7 @@ void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
|||
<< "Waiting done and nothing happended. Refetching to be sure";
|
||||
// mop: watches have not triggered during our sleep...recheck to be sure
|
||||
refetchAndUpdate(false, true); // Force a check
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -112,9 +112,12 @@ class AgencyCallback {
|
|||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait until a callback is received or a timeout has happened
|
||||
///
|
||||
/// @return true => if we got woken up after maxTimeout
|
||||
/// false => if someone else ringed the condition variable
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void executeByCallbackOrTimeout(double);
|
||||
bool executeByCallbackOrTimeout(double);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief private members
|
||||
|
|
|
@ -1919,15 +1919,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
}
|
||||
|
||||
if (nrDone->load(std::memory_order_acquire) == infos.size()) {
|
||||
{
|
||||
// We need to lock all condition variables
|
||||
std::vector<::arangodb::basics::ConditionLocker> lockers;
|
||||
for (auto& cb : agencyCallbacks) {
|
||||
CONDITION_LOCKER(locker, cb->_cv);
|
||||
}
|
||||
cbGuard.fire();
|
||||
// After the guard is done we can release the lockers
|
||||
}
|
||||
// unregister the callbacks
|
||||
// We do not need any locks on condition variables as we are locked by the cacheMutex
|
||||
cbGuard.fire();
|
||||
// Now we need to remove TTL + the IsBuilding flag in Agency
|
||||
opers.clear();
|
||||
opers.push_back(IncreaseVersion());
|
||||
|
@ -1958,15 +1952,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
if (tmpRes > TRI_ERROR_NO_ERROR) {
|
||||
{
|
||||
// We need to lock all condition variables
|
||||
std::vector<::arangodb::basics::ConditionLocker> lockers;
|
||||
for (auto& cb : agencyCallbacks) {
|
||||
CONDITION_LOCKER(locker, cb->_cv);
|
||||
}
|
||||
cbGuard.fire();
|
||||
// After the guard is done we can release the lockers
|
||||
}
|
||||
// unregister the callbacks
|
||||
// We do not need any locks on condition variables as we are locked by the cacheMutex
|
||||
cbGuard.fire();
|
||||
|
||||
// report error
|
||||
for (auto const& info : infos) {
|
||||
|
@ -1998,8 +1986,22 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
|||
for (size_t i = 0; i < infos.size(); ++i) {
|
||||
if (infos[i].state == ClusterCollectionCreationInfo::INIT) {
|
||||
// This one has not responded, wait for it.
|
||||
CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv);
|
||||
agencyCallbacks[i]->executeByCallbackOrTimeout(interval);
|
||||
bool wokenUp = false;
|
||||
{
|
||||
// Release the lock
|
||||
CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv);
|
||||
wokenUp = agencyCallbacks[i]->executeByCallbackOrTimeout(interval);
|
||||
}
|
||||
if (wokenUp) {
|
||||
++i;
|
||||
// We got woken up by waittime, not by callback.
|
||||
// Let us check if we skipped other callbacks as well
|
||||
for (; i < infos.size(); ++i) {
|
||||
if (infos[i].state == ClusterCollectionCreationInfo::INIT) {
|
||||
agencyCallbacks[i]->refetchAndUpdate(true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_VOCBASE__COLLECTION_CREATION_INFO_H
|
||||
#define ARANGOD_VOCBASE__COLLECTION_CREATION__INFO_H 1
|
||||
#define ARANGOD_VOCBASE__COLLECTION_CREATION_INFO_H 1
|
||||
|
||||
#include <velocypack/Slice.h>
|
||||
|
||||
|
@ -37,4 +37,4 @@ struct CollectionCreationInfo {
|
|||
};
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue