mirror of https://gitee.com/bigwinds/arangodb
Optimize version increase stuff
This commit is contained in:
parent
7a8f101cfc
commit
612c387c56
|
@ -68,6 +68,7 @@ void AgencyCallback::refetchAndUpdate() {
|
|||
|
||||
if (it == result._values.end()) {
|
||||
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
|
||||
newData->add(VPackSlice::noneSlice());
|
||||
checkValue(newData);
|
||||
} else {
|
||||
checkValue(it->second._vpack);
|
||||
|
@ -76,7 +77,8 @@ void AgencyCallback::refetchAndUpdate() {
|
|||
|
||||
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
|
||||
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
|
||||
LOG(DEBUG) << "Got new value" << newData->toJson();
|
||||
LOG(DEBUG) << "Got new value " << newData->slice().typeName();
|
||||
LOG(DEBUG) << "Got new value " << newData->toJson();
|
||||
if (execute(newData)) {
|
||||
_lastData = newData;
|
||||
} else {
|
||||
|
|
|
@ -346,8 +346,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
|||
// get "value" attribute
|
||||
VPackSlice const value = node.get("value");
|
||||
|
||||
if (value.isString()) {
|
||||
if (!prefix.empty()) {
|
||||
if (!prefix.empty()) {
|
||||
if (value.isString()) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
// get "modifiedIndex"
|
||||
|
@ -357,6 +357,18 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
|||
entry._vpack = VPackParser::fromJson(tmp);
|
||||
entry._isDir = false;
|
||||
|
||||
_values.emplace(prefix, entry);
|
||||
} else if (value.isNumber()) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
// get "modifiedIndex"
|
||||
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
|
||||
node.get("modifiedIndex"));
|
||||
entry._vpack = std::make_shared<VPackBuilder>();
|
||||
entry._isDir = false;
|
||||
|
||||
entry._vpack->add(value);
|
||||
|
||||
_values.emplace(prefix, entry);
|
||||
}
|
||||
}
|
||||
|
@ -433,7 +445,7 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = {
|
|||
|
||||
AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
||||
std::string const& type, double ttl, double timeout)
|
||||
: _key(key), _type(type), _version(0), _isLocked(false) {
|
||||
: _key(key), _type(type), _isLocked(false) {
|
||||
AgencyComm comm;
|
||||
|
||||
_vpack = std::make_shared<VPackBuilder>();
|
||||
|
@ -444,7 +456,6 @@ AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
|||
}
|
||||
|
||||
if (comm.lock(key, ttl, timeout, _vpack->slice())) {
|
||||
fetchVersion(comm);
|
||||
_isLocked = true;
|
||||
}
|
||||
}
|
||||
|
@ -472,38 +483,6 @@ void AgencyCommLocker::unlock() {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief fetch a lock version from the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyCommLocker::fetchVersion(AgencyComm& comm) {
|
||||
if (_type != "WRITE") {
|
||||
return true;
|
||||
}
|
||||
|
||||
AgencyCommResult result = comm.getValues(_key + "/Version", false);
|
||||
if (!result.successful()) {
|
||||
if (result.httpCode() !=
|
||||
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
result.parse("", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
|
||||
result._values.begin();
|
||||
|
||||
if (it == result._values.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice const versionSlice = it->second._vpack->slice();
|
||||
_version = versionSlice.getUInt();
|
||||
return true;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a lock version in the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -512,39 +491,9 @@ bool AgencyCommLocker::updateVersion(AgencyComm& comm) {
|
|||
if (_type != "WRITE") {
|
||||
return true;
|
||||
}
|
||||
AgencyCommResult result = comm.increment(_key + "/Version");
|
||||
|
||||
if (_version == 0) {
|
||||
VPackBuilder builder;
|
||||
try {
|
||||
builder.add(VPackValue(1));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// no Version key found, now set it
|
||||
AgencyCommResult result =
|
||||
comm.casValue(_key + "/Version", builder.slice(), false, 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
} else {
|
||||
// Version key found, now update it
|
||||
VPackBuilder oldBuilder;
|
||||
try {
|
||||
oldBuilder.add(VPackValue(_version));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
VPackBuilder newBuilder;
|
||||
try {
|
||||
newBuilder.add(VPackValue(_version + 1));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
AgencyCommResult result = comm.casValue(
|
||||
_key + "/Version", oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
}
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1545,7 +1494,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count,
|
|||
}
|
||||
|
||||
VPackSlice oldSlice = oldBuilder->slice();
|
||||
uint64_t const oldValue = oldSlice.getUInt();
|
||||
uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
|
||||
uint64_t const newValue = oldValue + count;
|
||||
|
||||
VPackBuilder newBuilder;
|
||||
|
|
|
@ -377,12 +377,6 @@ class AgencyCommLocker {
|
|||
void unlock();
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief fetch a lock version from the agency
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool fetchVersion(AgencyComm&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a lock version in the agency
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -393,7 +387,6 @@ class AgencyCommLocker {
|
|||
std::string const _key;
|
||||
std::string const _type;
|
||||
std::shared_ptr<arangodb::velocypack::Builder> _vpack;
|
||||
uint64_t _version;
|
||||
bool _isLocked;
|
||||
};
|
||||
|
||||
|
|
|
@ -1262,21 +1262,20 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
|
||||
// Update our cache:
|
||||
loadPlannedCollections();
|
||||
|
||||
// Now wait for it to appear and be complete:
|
||||
AgencyCommResult res = ac.getValues("Current/Version", false);
|
||||
if (!res.successful()) {
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
|
||||
|
||||
AgencyCommResult res;
|
||||
std::string const where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
while (TRI_microtime() <= endTime) {
|
||||
res.clear();
|
||||
res = ac.getValues(where, true);
|
||||
|
||||
LOG(TRACE) << "CREATE OYOYOYOY " << where;
|
||||
|
||||
if (res.successful() && res.parse(where + "/", false)) {
|
||||
LOG(TRACE) << "CREATE IS SUCCESS " << where;
|
||||
if (res._values.size() == (size_t)numberOfShards) {
|
||||
LOG(TRACE) << "CREATE has number " << where;
|
||||
std::string tmpMsg = "";
|
||||
bool tmpHaveError = false;
|
||||
for (auto const& p : res._values) {
|
||||
|
@ -1298,17 +1297,23 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
}
|
||||
}
|
||||
LOG(TRACE) << "CREATE PRE LOAD has number " << where;
|
||||
loadCurrentCollections();
|
||||
LOG(TRACE) << "CREATE POST LOAD has number " << where;
|
||||
if (tmpHaveError) {
|
||||
errorMsg = "Error in creation of collection:" + tmpMsg;
|
||||
LOG(TRACE) << "CREATE KAP0TT " << where;
|
||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
}
|
||||
LOG(TRACE) << "CREATE OK " << where;
|
||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
res.clear();
|
||||
LOG(TRACE) << "JASSSSS " << interval;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
LOG(TRACE) << "NNNNJASSSSS " << interval;
|
||||
}
|
||||
|
||||
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
|
||||
|
|
|
@ -116,6 +116,12 @@ void HeartbeatThread::runDBServer() {
|
|||
uint64_t lastCommandIndex = getLastCommandIndex();
|
||||
|
||||
std::function<bool(VPackSlice const& result)> updatePlan = [&](VPackSlice const& result) {
|
||||
if (!result.isNumber()) {
|
||||
LOG(ERR) << "Version is not a number! " << result.toJson();
|
||||
return false;
|
||||
}
|
||||
uint64_t version = result.getNumber<uint64_t>();
|
||||
LOG(TRACE) << "Hass " << result.toJson() << " " << version << " " << _dispatchedPlanVersion;
|
||||
bool mustHandlePlanChange = false;
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
|
@ -126,7 +132,7 @@ void HeartbeatThread::runDBServer() {
|
|||
if (_lastDispatchedJobResult) {
|
||||
LOG(DEBUG) << "...and was successful";
|
||||
// mop: the dispatched version is still the same => we are finally uptodate
|
||||
if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) {
|
||||
if (_dispatchedPlanVersion == version) {
|
||||
LOG(DEBUG) << "Version is correct :)";
|
||||
return true;
|
||||
}
|
||||
|
@ -134,15 +140,14 @@ void HeartbeatThread::runDBServer() {
|
|||
}
|
||||
}
|
||||
if (_numDispatchedJobs == 0) {
|
||||
LOG(DEBUG) << "Will dispatch plan change " << result;
|
||||
LOG(DEBUG) << "Will dispatch plan change " << version;
|
||||
mustHandlePlanChange = true;
|
||||
_dispatchedPlanVersion.clear();
|
||||
_dispatchedPlanVersion.add(result);
|
||||
_dispatchedPlanVersion = version;
|
||||
}
|
||||
}
|
||||
if (mustHandlePlanChange) {
|
||||
// mop: a dispatched task has returned
|
||||
handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result));
|
||||
handlePlanChangeDBServer(version);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
|
|
@ -185,7 +185,7 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
arangodb::basics::ConditionVariable _condition;
|
||||
|
||||
VPackBuilder _dispatchedPlanVersion;
|
||||
uint64_t _dispatchedPlanVersion;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief users for these databases will be re-fetched the next time the
|
||||
|
|
|
@ -73,7 +73,7 @@ start() {
|
|||
--cluster.my-role $ROLE \
|
||||
--log.file cluster/$PORT.log \
|
||||
--log.buffered false \
|
||||
--log.level info \
|
||||
--log.level trace \
|
||||
--log.requests-file cluster/$PORT.req \
|
||||
--server.disable-statistics true \
|
||||
--server.foxx-queues false \
|
||||
|
|
Loading…
Reference in New Issue