mirror of https://gitee.com/bigwinds/arangodb
speedup collection creation
This commit is contained in:
parent
600c0ab93c
commit
33d0a4ff9b
|
@ -1,6 +1,9 @@
|
||||||
v3.4.6.1 (2019-06-03)
|
v3.4.6.1 (2019-06-03)
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
* Speed up collection creation process in cluster, if not all agency callbacks are
|
||||||
|
delivered successfully.
|
||||||
|
|
||||||
* Fix agency's handling of object assignments with TTL
|
* Fix agency's handling of object assignments with TTL
|
||||||
|
|
||||||
v3.4.6 (2019-05-28)
|
v3.4.6 (2019-05-28)
|
||||||
|
|
|
@ -127,7 +127,7 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
bool AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
||||||
// One needs to acquire the mutex of the condition variable
|
// One needs to acquire the mutex of the condition variable
|
||||||
// before entering this function!
|
// before entering this function!
|
||||||
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0)) &&
|
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0)) &&
|
||||||
|
@ -136,5 +136,7 @@ void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
||||||
<< "Waiting done and nothing happended. Refetching to be sure";
|
<< "Waiting done and nothing happended. Refetching to be sure";
|
||||||
// mop: watches have not triggered during our sleep...recheck to be sure
|
// mop: watches have not triggered during our sleep...recheck to be sure
|
||||||
refetchAndUpdate(false, true); // Force a check
|
refetchAndUpdate(false, true); // Force a check
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,9 +112,12 @@ class AgencyCallback {
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief wait until a callback is received or a timeout has happened
|
/// @brief wait until a callback is received or a timeout has happened
|
||||||
|
///
|
||||||
|
/// @return true => if we got woken up after maxTimeout
|
||||||
|
/// false => if someone else ringed the condition variable
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void executeByCallbackOrTimeout(double);
|
bool executeByCallbackOrTimeout(double);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief private members
|
/// @brief private members
|
||||||
|
|
|
@ -1919,15 +1919,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nrDone->load(std::memory_order_acquire) == infos.size()) {
|
if (nrDone->load(std::memory_order_acquire) == infos.size()) {
|
||||||
{
|
// unregister the callbacks
|
||||||
// We need to lock all condition variables
|
// We do not need any locks on condition variables as we are locked by the cacheMutex
|
||||||
std::vector<::arangodb::basics::ConditionLocker> lockers;
|
|
||||||
for (auto& cb : agencyCallbacks) {
|
|
||||||
CONDITION_LOCKER(locker, cb->_cv);
|
|
||||||
}
|
|
||||||
cbGuard.fire();
|
cbGuard.fire();
|
||||||
// After the guard is done we can release the lockers
|
|
||||||
}
|
|
||||||
// Now we need to remove TTL + the IsBuilding flag in Agency
|
// Now we need to remove TTL + the IsBuilding flag in Agency
|
||||||
opers.clear();
|
opers.clear();
|
||||||
opers.push_back(IncreaseVersion());
|
opers.push_back(IncreaseVersion());
|
||||||
|
@ -1958,15 +1952,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
if (tmpRes > TRI_ERROR_NO_ERROR) {
|
if (tmpRes > TRI_ERROR_NO_ERROR) {
|
||||||
{
|
// unregister the callbacks
|
||||||
// We need to lock all condition variables
|
// We do not need any locks on condition variables as we are locked by the cacheMutex
|
||||||
std::vector<::arangodb::basics::ConditionLocker> lockers;
|
|
||||||
for (auto& cb : agencyCallbacks) {
|
|
||||||
CONDITION_LOCKER(locker, cb->_cv);
|
|
||||||
}
|
|
||||||
cbGuard.fire();
|
cbGuard.fire();
|
||||||
// After the guard is done we can release the lockers
|
|
||||||
}
|
|
||||||
|
|
||||||
// report error
|
// report error
|
||||||
for (auto const& info : infos) {
|
for (auto const& info : infos) {
|
||||||
|
@ -1997,9 +1985,20 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
|
||||||
TRI_ASSERT(agencyCallbacks.size() == infos.size());
|
TRI_ASSERT(agencyCallbacks.size() == infos.size());
|
||||||
for (size_t i = 0; i < infos.size(); ++i) {
|
for (size_t i = 0; i < infos.size(); ++i) {
|
||||||
if (infos[i].state == ClusterCollectionCreationInfo::INIT) {
|
if (infos[i].state == ClusterCollectionCreationInfo::INIT) {
|
||||||
|
{
|
||||||
// This one has not responded, wait for it.
|
// This one has not responded, wait for it.
|
||||||
CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv);
|
CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv);
|
||||||
agencyCallbacks[i]->executeByCallbackOrTimeout(interval);
|
bool wokenUp = agencyCallbacks[i]->executeByCallbackOrTimeout(interval);
|
||||||
|
if (wokenUp) {
|
||||||
|
// We got woken up by waittime, not by callback.
|
||||||
|
// Let us check if we skipped other callbacks as well
|
||||||
|
for (; i < infos.size(); ++i) {
|
||||||
|
if (infos[i].state == ClusterCollectionCreationInfo::INIT) {
|
||||||
|
agencyCallbacks[i]->refetchAndUpdate(true, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#ifndef ARANGOD_VOCBASE__COLLECTION_CREATION_INFO_H
|
#ifndef ARANGOD_VOCBASE__COLLECTION_CREATION_INFO_H
|
||||||
#define ARANGOD_VOCBASE__COLLECTION_CREATION__INFO_H 1
|
#define ARANGOD_VOCBASE__COLLECTION_CREATION_INFO_H 1
|
||||||
|
|
||||||
#include <velocypack/Slice.h>
|
#include <velocypack/Slice.h>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue