mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/arangodb into spdvpk
This commit is contained in:
commit
8c7023850e
|
@ -1,6 +1,6 @@
|
||||||
[Unit]
|
[Unit]
|
||||||
Description=arango database server
|
Description=arango database server
|
||||||
After=sysinit.target sockets.target timers.target paths.target slices.target network.target
|
After=sysinit.target sockets.target timers.target paths.target slices.target network.target syslog.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
ExecStart=/usr/sbin/arangod --uid arangodb --gid arangodb --pid-file /var/run/arangodb/arangod.pid --temp-path /var/tmp/arangod --log.tty "" --supervisor
|
ExecStart=/usr/sbin/arangod --uid arangodb --gid arangodb --pid-file /var/run/arangodb/arangod.pid --temp-path /var/tmp/arangod --log.tty "" --supervisor
|
||||||
|
@ -9,4 +9,4 @@ LimitNOFILE=131072
|
||||||
PIDFile=/var/run/arangodb/arangod.pid
|
PIDFile=/var/run/arangodb/arangod.pid
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=
|
WantedBy=multi-user.target
|
||||||
|
|
|
@ -68,6 +68,7 @@ void AgencyCallback::refetchAndUpdate() {
|
||||||
|
|
||||||
if (it == result._values.end()) {
|
if (it == result._values.end()) {
|
||||||
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
|
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
|
||||||
|
newData->add(VPackSlice::noneSlice());
|
||||||
checkValue(newData);
|
checkValue(newData);
|
||||||
} else {
|
} else {
|
||||||
checkValue(it->second._vpack);
|
checkValue(it->second._vpack);
|
||||||
|
@ -76,7 +77,8 @@ void AgencyCallback::refetchAndUpdate() {
|
||||||
|
|
||||||
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
|
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
|
||||||
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
|
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
|
||||||
LOG(DEBUG) << "Got new value" << newData->toJson();
|
LOG(DEBUG) << "Got new value " << newData->slice().typeName();
|
||||||
|
LOG(DEBUG) << "Got new value " << newData->toJson();
|
||||||
if (execute(newData)) {
|
if (execute(newData)) {
|
||||||
_lastData = newData;
|
_lastData = newData;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -346,8 +346,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
||||||
// get "value" attribute
|
// get "value" attribute
|
||||||
VPackSlice const value = node.get("value");
|
VPackSlice const value = node.get("value");
|
||||||
|
|
||||||
if (value.isString()) {
|
if (!prefix.empty()) {
|
||||||
if (!prefix.empty()) {
|
if (value.isString()) {
|
||||||
AgencyCommResultEntry entry;
|
AgencyCommResultEntry entry;
|
||||||
|
|
||||||
// get "modifiedIndex"
|
// get "modifiedIndex"
|
||||||
|
@ -357,6 +357,18 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
||||||
entry._vpack = VPackParser::fromJson(tmp);
|
entry._vpack = VPackParser::fromJson(tmp);
|
||||||
entry._isDir = false;
|
entry._isDir = false;
|
||||||
|
|
||||||
|
_values.emplace(prefix, entry);
|
||||||
|
} else if (value.isNumber()) {
|
||||||
|
AgencyCommResultEntry entry;
|
||||||
|
|
||||||
|
// get "modifiedIndex"
|
||||||
|
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
|
||||||
|
node.get("modifiedIndex"));
|
||||||
|
entry._vpack = std::make_shared<VPackBuilder>();
|
||||||
|
entry._isDir = false;
|
||||||
|
|
||||||
|
entry._vpack->add(value);
|
||||||
|
|
||||||
_values.emplace(prefix, entry);
|
_values.emplace(prefix, entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -433,7 +445,7 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = {
|
||||||
|
|
||||||
AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
||||||
std::string const& type, double ttl, double timeout)
|
std::string const& type, double ttl, double timeout)
|
||||||
: _key(key), _type(type), _version(0), _isLocked(false) {
|
: _key(key), _type(type), _isLocked(false) {
|
||||||
AgencyComm comm;
|
AgencyComm comm;
|
||||||
|
|
||||||
_vpack = std::make_shared<VPackBuilder>();
|
_vpack = std::make_shared<VPackBuilder>();
|
||||||
|
@ -444,7 +456,6 @@ AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (comm.lock(key, ttl, timeout, _vpack->slice())) {
|
if (comm.lock(key, ttl, timeout, _vpack->slice())) {
|
||||||
fetchVersion(comm);
|
|
||||||
_isLocked = true;
|
_isLocked = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -472,38 +483,6 @@ void AgencyCommLocker::unlock() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief fetch a lock version from the agency
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
bool AgencyCommLocker::fetchVersion(AgencyComm& comm) {
|
|
||||||
if (_type != "WRITE") {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
AgencyCommResult result = comm.getValues(_key + "/Version", false);
|
|
||||||
if (!result.successful()) {
|
|
||||||
if (result.httpCode() !=
|
|
||||||
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
result.parse("", false);
|
|
||||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
|
|
||||||
result._values.begin();
|
|
||||||
|
|
||||||
if (it == result._values.end()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
VPackSlice const versionSlice = it->second._vpack->slice();
|
|
||||||
_version = arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief update a lock version in the agency
|
/// @brief update a lock version in the agency
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -512,39 +491,9 @@ bool AgencyCommLocker::updateVersion(AgencyComm& comm) {
|
||||||
if (_type != "WRITE") {
|
if (_type != "WRITE") {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
AgencyCommResult result = comm.increment(_key + "/Version");
|
||||||
|
|
||||||
if (_version == 0) {
|
return result.successful();
|
||||||
VPackBuilder builder;
|
|
||||||
try {
|
|
||||||
builder.add(VPackValue(1));
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// no Version key found, now set it
|
|
||||||
AgencyCommResult result =
|
|
||||||
comm.casValue(_key + "/Version", builder.slice(), false, 0.0, 0.0);
|
|
||||||
|
|
||||||
return result.successful();
|
|
||||||
} else {
|
|
||||||
// Version key found, now update it
|
|
||||||
VPackBuilder oldBuilder;
|
|
||||||
try {
|
|
||||||
oldBuilder.add(VPackValue(_version));
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
VPackBuilder newBuilder;
|
|
||||||
try {
|
|
||||||
newBuilder.add(VPackValue(_version + 1));
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
AgencyCommResult result = comm.casValue(
|
|
||||||
_key + "/Version", oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0);
|
|
||||||
|
|
||||||
return result.successful();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -645,18 +594,6 @@ bool AgencyComm::initialize() {
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
bool AgencyComm::tryInitializeStructure() {
|
bool AgencyComm::tryInitializeStructure() {
|
||||||
VPackBuilder trueBuilder;
|
|
||||||
trueBuilder.add(VPackValue(true));
|
|
||||||
|
|
||||||
VPackSlice trueSlice = trueBuilder.slice();
|
|
||||||
|
|
||||||
AgencyCommResult result;
|
|
||||||
result = casValue("Init", trueSlice, false, 120.0, 0.0);
|
|
||||||
if (!result.successful()) {
|
|
||||||
// mop: we couldn"t aquire a lock. so somebody else is already initializing
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
VPackBuilder builder;
|
VPackBuilder builder;
|
||||||
try {
|
try {
|
||||||
VPackObjectBuilder b(&builder);
|
VPackObjectBuilder b(&builder);
|
||||||
|
@ -678,7 +615,7 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
VPackObjectBuilder d(&builder);
|
VPackObjectBuilder d(&builder);
|
||||||
addEmptyVPackObject("_system", builder);
|
addEmptyVPackObject("_system", builder);
|
||||||
}
|
}
|
||||||
builder.add("Version", VPackValue("1"));
|
builder.add("Version", VPackValue(1));
|
||||||
addEmptyVPackObject("ShardsCopied", builder);
|
addEmptyVPackObject("ShardsCopied", builder);
|
||||||
addEmptyVPackObject("NewServers", builder);
|
addEmptyVPackObject("NewServers", builder);
|
||||||
addEmptyVPackObject("Coordinators", builder);
|
addEmptyVPackObject("Coordinators", builder);
|
||||||
|
@ -703,7 +640,7 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
}
|
}
|
||||||
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
||||||
addEmptyVPackObject("DBServers", builder);
|
addEmptyVPackObject("DBServers", builder);
|
||||||
builder.add("Version", VPackValue("1"));
|
builder.add("Version", VPackValue(1));
|
||||||
builder.add(VPackValue("Collections"));
|
builder.add(VPackValue("Collections"));
|
||||||
{
|
{
|
||||||
VPackObjectBuilder d(&builder);
|
VPackObjectBuilder d(&builder);
|
||||||
|
@ -721,7 +658,7 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
VPackObjectBuilder d(&builder);
|
VPackObjectBuilder d(&builder);
|
||||||
addEmptyVPackObject("_system", builder);
|
addEmptyVPackObject("_system", builder);
|
||||||
}
|
}
|
||||||
builder.add("Version", VPackValue("\"1\""));
|
builder.add("Version", VPackValue(1));
|
||||||
addEmptyVPackObject("MapLocalToID", builder);
|
addEmptyVPackObject("MapLocalToID", builder);
|
||||||
builder.add(VPackValue("Databases"));
|
builder.add(VPackValue("Databases"));
|
||||||
{
|
{
|
||||||
|
@ -731,6 +668,7 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
}
|
}
|
||||||
addEmptyVPackObject("DBServers", builder);
|
addEmptyVPackObject("DBServers", builder);
|
||||||
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
||||||
|
builder.add("InitDone", VPackValue(true));
|
||||||
}
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
LOG(WARN) << "Couldn't create initializing structure";
|
LOG(WARN) << "Couldn't create initializing structure";
|
||||||
|
@ -738,22 +676,16 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
VPackSlice s = builder.slice();
|
LOG(DEBUG) << "Initializing agency with " << builder.toJson();
|
||||||
|
|
||||||
// now dump the Slice into an std::string
|
AgencyCommResult result;
|
||||||
std::string buffer;
|
AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice());
|
||||||
VPackStringSink sink(&buffer);
|
AgencyTransaction initTransaction;
|
||||||
VPackDumper::dump(s, &sink);
|
initTransaction.operations.push_back(initOperation);
|
||||||
|
|
||||||
LOG(DEBUG) << "Initializing agency with " << buffer;
|
sendTransactionWithFailover(result, initTransaction);
|
||||||
|
|
||||||
if (!initFromVPackSlice(std::string(""), s)) {
|
return result.successful();
|
||||||
LOG(FATAL) << "Couldn't initialize agency";
|
|
||||||
FATAL_ERROR_EXIT();
|
|
||||||
} else {
|
|
||||||
setValue("InitDone", trueSlice, 0.0);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
} catch (std::exception const& e) {
|
} catch (std::exception const& e) {
|
||||||
LOG(FATAL) << "Fatal error initializing agency " << e.what();
|
LOG(FATAL) << "Fatal error initializing agency " << e.what();
|
||||||
FATAL_ERROR_EXIT();
|
FATAL_ERROR_EXIT();
|
||||||
|
@ -763,41 +695,6 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) {
|
|
||||||
bool ret = true;
|
|
||||||
AgencyCommResult result;
|
|
||||||
if (s.isObject()) {
|
|
||||||
if (!key.empty()) {
|
|
||||||
result = createDirectory(key);
|
|
||||||
if (!result.successful()) {
|
|
||||||
// mop: forbidden will be thrown if directory already exists
|
|
||||||
// need ability to recover in a case where the agency was half
|
|
||||||
// initialized
|
|
||||||
if (result.httpCode() !=
|
|
||||||
(int)arangodb::GeneralResponse::ResponseCode::FORBIDDEN) {
|
|
||||||
ret = false;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto const& it : VPackObjectIterator(s)) {
|
|
||||||
std::string subKey("");
|
|
||||||
if (!key.empty()) {
|
|
||||||
subKey += key + "/";
|
|
||||||
}
|
|
||||||
subKey += it.key.copyString();
|
|
||||||
|
|
||||||
ret = ret && initFromVPackSlice(subKey, it.value);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
result = setValue(key, s.copyString(), 0.0);
|
|
||||||
ret = ret && result.successful();
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief checks if the agency is initialized
|
/// @brief checks if the agency is initialized
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -1217,87 +1114,6 @@ bool AgencyComm::exists(std::string const& key) {
|
||||||
return result.successful();
|
return result.successful();
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief update a version number in the agency
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
bool AgencyComm::increaseVersion(std::string const& key) {
|
|
||||||
// fetch existing version number
|
|
||||||
AgencyCommResult result = getValues(key, false);
|
|
||||||
|
|
||||||
if (!result.successful()) {
|
|
||||||
if (result.httpCode() !=
|
|
||||||
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// no version key found, now set it
|
|
||||||
VPackBuilder builder;
|
|
||||||
try {
|
|
||||||
builder.add(VPackValue(1));
|
|
||||||
} catch (...) {
|
|
||||||
LOG(ERR) << "Couldn't add value to builder";
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
result.clear();
|
|
||||||
result = casValue(key, builder.slice(), false, 0.0, 0.0);
|
|
||||||
|
|
||||||
return result.successful();
|
|
||||||
}
|
|
||||||
|
|
||||||
// found a version
|
|
||||||
result.parse("", false);
|
|
||||||
auto it = result._values.begin();
|
|
||||||
|
|
||||||
if (it == result._values.end()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
VPackSlice const versionSlice = it->second._vpack->slice();
|
|
||||||
uint64_t version =
|
|
||||||
arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice);
|
|
||||||
VPackBuilder oldBuilder;
|
|
||||||
try {
|
|
||||||
if (versionSlice.isString()) {
|
|
||||||
oldBuilder.add(VPackValue(std::to_string(version)));
|
|
||||||
} else {
|
|
||||||
oldBuilder.add(VPackValue(version));
|
|
||||||
}
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
VPackBuilder newBuilder;
|
|
||||||
try {
|
|
||||||
newBuilder.add(VPackValue(version + 1));
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
result.clear();
|
|
||||||
|
|
||||||
result = casValue(key, oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0);
|
|
||||||
|
|
||||||
return result.successful();
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief update a version number in the agency, retry until it works
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
void AgencyComm::increaseVersionRepeated(std::string const& key) {
|
|
||||||
bool ok = false;
|
|
||||||
while (!ok) {
|
|
||||||
ok = increaseVersion(key);
|
|
||||||
if (ok) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
uint32_t val = 300 + TRI_UInt32Random() % 400;
|
|
||||||
LOG(INFO) << "Could not increase " << key << " in agency, retrying in "
|
|
||||||
<< val << "!";
|
|
||||||
usleep(val * 1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief increment a key
|
/// @brief increment a key
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -1673,8 +1489,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count,
|
||||||
}
|
}
|
||||||
|
|
||||||
VPackSlice oldSlice = oldBuilder->slice();
|
VPackSlice oldSlice = oldBuilder->slice();
|
||||||
uint64_t const oldValue =
|
uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
|
||||||
arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
|
|
||||||
uint64_t const newValue = oldValue + count;
|
uint64_t const newValue = oldValue + count;
|
||||||
|
|
||||||
VPackBuilder newBuilder;
|
VPackBuilder newBuilder;
|
||||||
|
|
|
@ -235,6 +235,9 @@ struct AgencyTransaction {
|
||||||
explicit AgencyTransaction(AgencyOperation const& operation) {
|
explicit AgencyTransaction(AgencyOperation const& operation) {
|
||||||
operations.push_back(operation);
|
operations.push_back(operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
explicit AgencyTransaction() {
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct AgencyCommResult {
|
struct AgencyCommResult {
|
||||||
|
@ -374,12 +377,6 @@ class AgencyCommLocker {
|
||||||
void unlock();
|
void unlock();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief fetch a lock version from the agency
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
bool fetchVersion(AgencyComm&);
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief update a lock version in the agency
|
/// @brief update a lock version in the agency
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -390,7 +387,6 @@ class AgencyCommLocker {
|
||||||
std::string const _key;
|
std::string const _key;
|
||||||
std::string const _type;
|
std::string const _type;
|
||||||
std::shared_ptr<arangodb::velocypack::Builder> _vpack;
|
std::shared_ptr<arangodb::velocypack::Builder> _vpack;
|
||||||
uint64_t _version;
|
|
||||||
bool _isLocked;
|
bool _isLocked;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -488,13 +484,10 @@ class AgencyComm {
|
||||||
/// @brief update a version number in the agency
|
/// @brief update a version number in the agency
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
bool increaseVersion(std::string const& key);
|
inline bool increaseVersion(std::string const& key) {
|
||||||
|
AgencyCommResult result = increment(key);
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
return result.successful();
|
||||||
/// @brief update a version number in the agency, retry until it works
|
}
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
void increaseVersionRepeated(std::string const& key);
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief creates a directory in the backend
|
/// @brief creates a directory in the backend
|
||||||
|
|
|
@ -1258,25 +1258,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
||||||
errorMsg);
|
errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
ac.increaseVersionRepeated("Plan/Version");
|
ac.increaseVersion("Plan/Version");
|
||||||
|
|
||||||
// Update our cache:
|
// Update our cache:
|
||||||
loadPlannedCollections();
|
loadPlannedCollections();
|
||||||
|
|
||||||
// Now wait for it to appear and be complete:
|
AgencyCommResult res;
|
||||||
AgencyCommResult res = ac.getValues("Current/Version", false);
|
|
||||||
if (!res.successful()) {
|
|
||||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
|
||||||
errorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string const where =
|
std::string const where =
|
||||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||||
while (TRI_microtime() <= endTime) {
|
while (TRI_microtime() <= endTime) {
|
||||||
res.clear();
|
res.clear();
|
||||||
res = ac.getValues(where, true);
|
res = ac.getValues(where, true);
|
||||||
|
|
||||||
|
LOG(TRACE) << "CREATE OYOYOYOY " << where;
|
||||||
|
|
||||||
if (res.successful() && res.parse(where + "/", false)) {
|
if (res.successful() && res.parse(where + "/", false)) {
|
||||||
|
LOG(TRACE) << "CREATE IS SUCCESS " << where;
|
||||||
if (res._values.size() == (size_t)numberOfShards) {
|
if (res._values.size() == (size_t)numberOfShards) {
|
||||||
|
LOG(TRACE) << "CREATE has number " << where;
|
||||||
std::string tmpMsg = "";
|
std::string tmpMsg = "";
|
||||||
bool tmpHaveError = false;
|
bool tmpHaveError = false;
|
||||||
for (auto const& p : res._values) {
|
for (auto const& p : res._values) {
|
||||||
|
@ -1298,17 +1297,23 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG(TRACE) << "CREATE PRE LOAD has number " << where;
|
||||||
loadCurrentCollections();
|
loadCurrentCollections();
|
||||||
|
LOG(TRACE) << "CREATE POST LOAD has number " << where;
|
||||||
if (tmpHaveError) {
|
if (tmpHaveError) {
|
||||||
errorMsg = "Error in creation of collection:" + tmpMsg;
|
errorMsg = "Error in creation of collection:" + tmpMsg;
|
||||||
|
LOG(TRACE) << "CREATE KAP0TT " << where;
|
||||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||||
}
|
}
|
||||||
|
LOG(TRACE) << "CREATE OK " << where;
|
||||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.clear();
|
res.clear();
|
||||||
|
LOG(TRACE) << "JASSSSS " << interval;
|
||||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||||
|
LOG(TRACE) << "NNNNJASSSSS " << interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
|
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
|
||||||
|
|
|
@ -116,6 +116,12 @@ void HeartbeatThread::runDBServer() {
|
||||||
uint64_t lastCommandIndex = getLastCommandIndex();
|
uint64_t lastCommandIndex = getLastCommandIndex();
|
||||||
|
|
||||||
std::function<bool(VPackSlice const& result)> updatePlan = [&](VPackSlice const& result) {
|
std::function<bool(VPackSlice const& result)> updatePlan = [&](VPackSlice const& result) {
|
||||||
|
if (!result.isNumber()) {
|
||||||
|
LOG(ERR) << "Version is not a number! " << result.toJson();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
uint64_t version = result.getNumber<uint64_t>();
|
||||||
|
LOG(TRACE) << "Hass " << result.toJson() << " " << version << " " << _dispatchedPlanVersion;
|
||||||
bool mustHandlePlanChange = false;
|
bool mustHandlePlanChange = false;
|
||||||
{
|
{
|
||||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||||
|
@ -126,7 +132,7 @@ void HeartbeatThread::runDBServer() {
|
||||||
if (_lastDispatchedJobResult) {
|
if (_lastDispatchedJobResult) {
|
||||||
LOG(DEBUG) << "...and was successful";
|
LOG(DEBUG) << "...and was successful";
|
||||||
// mop: the dispatched version is still the same => we are finally uptodate
|
// mop: the dispatched version is still the same => we are finally uptodate
|
||||||
if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) {
|
if (_dispatchedPlanVersion == version) {
|
||||||
LOG(DEBUG) << "Version is correct :)";
|
LOG(DEBUG) << "Version is correct :)";
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -134,15 +140,14 @@ void HeartbeatThread::runDBServer() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (_numDispatchedJobs == 0) {
|
if (_numDispatchedJobs == 0) {
|
||||||
LOG(DEBUG) << "Will dispatch plan change " << result;
|
LOG(DEBUG) << "Will dispatch plan change " << version;
|
||||||
mustHandlePlanChange = true;
|
mustHandlePlanChange = true;
|
||||||
_dispatchedPlanVersion.clear();
|
_dispatchedPlanVersion = version;
|
||||||
_dispatchedPlanVersion.add(result);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (mustHandlePlanChange) {
|
if (mustHandlePlanChange) {
|
||||||
// mop: a dispatched task has returned
|
// mop: a dispatched task has returned
|
||||||
handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result));
|
handlePlanChangeDBServer(version);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
|
@ -185,7 +185,7 @@ class HeartbeatThread : public Thread {
|
||||||
|
|
||||||
arangodb::basics::ConditionVariable _condition;
|
arangodb::basics::ConditionVariable _condition;
|
||||||
|
|
||||||
VPackBuilder _dispatchedPlanVersion;
|
uint64_t _dispatchedPlanVersion;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief users for these databases will be re-fetched the next time the
|
/// @brief users for these databases will be re-fetched the next time the
|
||||||
|
|
Loading…
Reference in New Issue