1
0
Fork 0
arangodb/arangod/Cluster/AgencyCallback.h

155 lines
6.3 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Andreas Streichardt
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_CLUSTER_AGENCY_CALLBACK_H
#define ARANGODB_CLUSTER_AGENCY_CALLBACK_H
#include "Basics/Common.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <functional>
#include <memory>
#include "Agency/AgencyComm.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// class AgencyCallback
///
/// This class encapsulates an agency observer that has been registered
/// with the agency. One specifies a callback function that is called
/// for every incoming HTTP request from the agency. A mutex ensures that
/// this callback function is only executed in one thread at a time (see
/// below for more details).
///
/// Furthermore, if needsValue == true, the latest value of the key
/// which the callback observes is kept and updated with every HTTP
/// request received from the agency, and handed to the callback
/// function. If needsValue == false then a None Slice is handed in
/// instead.
///
/// If an initial value should be kept even before the first agency callback
/// has happened, then needInitialValue must be set to true. In this case
/// the callback function is already called once at object creation.
///
/// Usually, with needsValue == true one would like to wait until a certain
/// condition is met with respect to the value. The callback is only called
/// for new values, such that one can check this condition in the callback
/// function.
///
/// To assist code that wants to wait for something which is discovered
/// in the callback function (for example a certain value of the
/// observed key), this class maintains a condition variable, which is
/// signaled whenever the callback function has been called. To avoid
/// missing signals, the above mentioned mutex is the one of the condition
/// variable and the callback function is always called under this mutex
/// and the signal is sent while the mutex is still held. Thus, the following
/// pseudocode does not miss events:
///
/// create AgencyCallback object with a callback function and register it
/// TRI_defer(unregister callback)
/// {
/// acquire mutex of condition variable
/// while true:
/// check if a callback has produced the termination event: if so: OK
/// if overall patience lost: leave with error
/// wait for condition variable with a timeout
/// }
///
/// In this way, the mutex of the condition variable can at the same time
/// organize mutual exclusion of the callback function and the checking of
/// the termination condition in the main thread.
/// The wait for condition variable can conveniently be done with the
/// method executeByCallbackOrTimeout below.
class AgencyCallback {
//////////////////////////////////////////////////////////////////////////////
/// @brief ctor
//////////////////////////////////////////////////////////////////////////////
public:
AgencyCallback(AgencyComm&, std::string const&,
std::function<bool(VPackSlice const&)> const&, bool needsValue,
bool needsInitialValue = true);
std::string const key;
arangodb::basics::ConditionVariable _cv;
//////////////////////////////////////////////////////////////////////////////
/// @brief refetch the value, and call the callback function with it,
/// this is called whenever an HTTP request is received from the agency
/// (see RestAgencyCallbacksHandler and AgencyCallbackRegistry). If the
/// forceCheck flag is set, a check is initiated even if the value has
/// not changed. This is needed in case other outside conditions could
/// have changed (like a Plan change).
//////////////////////////////////////////////////////////////////////////////
void refetchAndUpdate(bool needToAcquireMutex, bool forceCheck);
//////////////////////////////////////////////////////////////////////////////
/// @brief wait until a callback is received or a timeout has happened
///
/// @return true => if we got woken up after maxTimeout
/// false => if someone else ringed the condition variable
//////////////////////////////////////////////////////////////////////////////
bool executeByCallbackOrTimeout(double);
//////////////////////////////////////////////////////////////////////////////
/// @brief private members
//////////////////////////////////////////////////////////////////////////////
private:
AgencyComm& _agency;
std::function<bool(VPackSlice const&)> const _cb;
std::shared_ptr<VPackBuilder> _lastData;
bool const _needsValue;
/// @brief this flag is set if there was an attempt to signal the callback's
/// condition variable - this is necessary to catch all signals that happen
/// before the caller is going into the wait state, i.e. to prevent this
/// 1) register callback
/// 2a) execute callback
/// 2b) execute callback signaling
/// 3) caller going into condition.wait() (and not woken up)
/// this variable is protected by the condition variable!
bool _wasSignaled;
// execute callback with current value data:
bool execute(std::shared_ptr<VPackBuilder>);
// execute callback without any data:
bool executeEmpty();
// Compare last value and newly read one and call execute if the are
// different:
void checkValue(std::shared_ptr<VPackBuilder>, bool forceCheck);
};
} // namespace arangodb
#endif