1
0
Fork 0

fix agency callback dumbness (#9624)

This commit is contained in:
Jan 2019-08-02 11:42:24 +02:00 committed by GitHub
parent c5ecfb3833
commit e176528e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 7 deletions

View File

@ -1,6 +1,9 @@
v3.3.24 (XXXX-XX-XX)
--------------------
* Decreased unnecessary wait times for agency callbacks in case they were
called earlier than expected by main thread.
* upgraded arangodb starter version to 0.14.12
* upgrade arangosync version to 0.6.5

View File

@ -41,7 +41,11 @@ using namespace arangodb;
AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key,
std::function<bool(VPackSlice const&)> const& cb,
bool needsValue, bool needsInitialValue)
: key(key), _agency(agency), _cb(cb), _needsValue(needsValue) {
: key(key),
_agency(agency),
_cb(cb),
_needsValue(needsValue),
_wasSignaled(false) {
if (_needsValue && needsInitialValue) {
refetchAndUpdate(true, false);
}
@ -105,6 +109,7 @@ bool AgencyCallback::executeEmpty() {
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Executing (empty)";
bool result = _cb(VPackSlice::noneSlice());
if (result) {
_wasSignaled = true;
_cv.signal();
}
return result;
@ -116,6 +121,7 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Executing";
bool result = _cb(newData->slice());
if (result) {
_wasSignaled = true;
_cv.signal();
}
return result;
@ -124,11 +130,18 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
// One needs to acquire the mutex of the condition variable
// before entering this function!
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0)) &&
application_features::ApplicationServer::isRetryOK()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate(false, true); // Force a check
if (!application_features::ApplicationServer::isStopping()) {
if (_wasSignaled) {
// ok, we have been signaled already, so there is no need to wait at all
// directly refetch the values
_wasSignaled = false;
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "We were signaled already";
} else if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0)) &&
application_features::ApplicationServer::isRetryOK()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate(false, true); // Force a check
}
}
}

View File

@ -125,6 +125,16 @@ class AgencyCallback {
std::function<bool(VPackSlice const&)> const _cb;
std::shared_ptr<VPackBuilder> _lastData;
bool const _needsValue;
/// @brief this flag is set if there was an attempt to signal the callback's
/// condition variable - this is necessary to catch all signals that happen
/// before the caller is going into the wait state, i.e. to prevent this
/// 1) register callback
/// 2a) execute callback
/// 2b) execute callback signaling
/// 3) caller going into condition.wait() (and not woken up)
/// this variable is protected by the condition variable!
bool _wasSignaled;
// execute callback with current value data:
bool execute(std::shared_ptr<VPackBuilder>);