diff --git a/Installation/systemd/arangodb.service b/Installation/systemd/arangodb.service index 87609561b4..bda180f014 100644 --- a/Installation/systemd/arangodb.service +++ b/Installation/systemd/arangodb.service @@ -1,6 +1,6 @@ [Unit] Description=arango database server -After=sysinit.target sockets.target timers.target paths.target slices.target network.target +After=sysinit.target sockets.target timers.target paths.target slices.target network.target syslog.target [Service] ExecStart=/usr/sbin/arangod --uid arangodb --gid arangodb --pid-file /var/run/arangodb/arangod.pid --temp-path /var/tmp/arangod --log.tty "" --supervisor @@ -9,4 +9,4 @@ LimitNOFILE=131072 PIDFile=/var/run/arangodb/arangod.pid [Install] -WantedBy= +WantedBy=multi-user.target diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index d8ce11f460..59d917ee48 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -68,6 +68,7 @@ void AgencyCallback::refetchAndUpdate() { if (it == result._values.end()) { std::shared_ptr newData = std::make_shared(); + newData->add(VPackSlice::noneSlice()); checkValue(newData); } else { checkValue(it->second._vpack); @@ -76,7 +77,8 @@ void AgencyCallback::refetchAndUpdate() { void AgencyCallback::checkValue(std::shared_ptr newData) { if (!_lastData || !_lastData->slice().equals(newData->slice())) { - LOG(DEBUG) << "Got new value" << newData->toJson(); + LOG(DEBUG) << "Got new value " << newData->slice().typeName(); + LOG(DEBUG) << "Got new value " << newData->toJson(); if (execute(newData)) { _lastData = newData; } else { diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index a49c247131..fc6785146d 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -346,8 +346,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node, // get "value" attribute VPackSlice const value = node.get("value"); - if (value.isString()) { - if (!prefix.empty()) { + if (!prefix.empty()) { + if (value.isString()) { AgencyCommResultEntry entry; // get "modifiedIndex" @@ -357,6 +357,18 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node, entry._vpack = VPackParser::fromJson(tmp); entry._isDir = false; + _values.emplace(prefix, entry); + } else if (value.isNumber()) { + AgencyCommResultEntry entry; + + // get "modifiedIndex" + entry._index = arangodb::basics::VelocyPackHelper::stringUInt64( + node.get("modifiedIndex")); + entry._vpack = std::make_shared(); + entry._isDir = false; + + entry._vpack->add(value); + _values.emplace(prefix, entry); } } @@ -433,7 +445,7 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = { AgencyCommLocker::AgencyCommLocker(std::string const& key, std::string const& type, double ttl, double timeout) - : _key(key), _type(type), _version(0), _isLocked(false) { + : _key(key), _type(type), _isLocked(false) { AgencyComm comm; _vpack = std::make_shared(); @@ -444,7 +456,6 @@ AgencyCommLocker::AgencyCommLocker(std::string const& key, } if (comm.lock(key, ttl, timeout, _vpack->slice())) { - fetchVersion(comm); _isLocked = true; } } @@ -472,38 +483,6 @@ void AgencyCommLocker::unlock() { } } -//////////////////////////////////////////////////////////////////////////////// -/// @brief fetch a lock version from the agency -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommLocker::fetchVersion(AgencyComm& comm) { - if (_type != "WRITE") { - return true; - } - - AgencyCommResult result = comm.getValues(_key + "/Version", false); - if (!result.successful()) { - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) { - return false; - } - - return true; - } - - result.parse("", false); - std::map::const_iterator it = - result._values.begin(); - - if (it == result._values.end()) { - return false; - } - - VPackSlice const versionSlice = it->second._vpack->slice(); - _version = arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice); - return true; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief update a lock version in the agency //////////////////////////////////////////////////////////////////////////////// @@ -512,39 +491,9 @@ bool AgencyCommLocker::updateVersion(AgencyComm& comm) { if (_type != "WRITE") { return true; } + AgencyCommResult result = comm.increment(_key + "/Version"); - if (_version == 0) { - VPackBuilder builder; - try { - builder.add(VPackValue(1)); - } catch (...) { - return false; - } - - // no Version key found, now set it - AgencyCommResult result = - comm.casValue(_key + "/Version", builder.slice(), false, 0.0, 0.0); - - return result.successful(); - } else { - // Version key found, now update it - VPackBuilder oldBuilder; - try { - oldBuilder.add(VPackValue(_version)); - } catch (...) { - return false; - } - VPackBuilder newBuilder; - try { - newBuilder.add(VPackValue(_version + 1)); - } catch (...) { - return false; - } - AgencyCommResult result = comm.casValue( - _key + "/Version", oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0); - - return result.successful(); - } + return result.successful(); } //////////////////////////////////////////////////////////////////////////////// @@ -645,18 +594,6 @@ bool AgencyComm::initialize() { ////////////////////////////////////////////////////////////////////////////// bool AgencyComm::tryInitializeStructure() { - VPackBuilder trueBuilder; - trueBuilder.add(VPackValue(true)); - - VPackSlice trueSlice = trueBuilder.slice(); - - AgencyCommResult result; - result = casValue("Init", trueSlice, false, 120.0, 0.0); - if (!result.successful()) { - // mop: we couldn"t aquire a lock. so somebody else is already initializing - return false; - } - VPackBuilder builder; try { VPackObjectBuilder b(&builder); @@ -678,7 +615,7 @@ bool AgencyComm::tryInitializeStructure() { VPackObjectBuilder d(&builder); addEmptyVPackObject("_system", builder); } - builder.add("Version", VPackValue("1")); + builder.add("Version", VPackValue(1)); addEmptyVPackObject("ShardsCopied", builder); addEmptyVPackObject("NewServers", builder); addEmptyVPackObject("Coordinators", builder); @@ -703,7 +640,7 @@ bool AgencyComm::tryInitializeStructure() { } builder.add("Lock", VPackValue("\"UNLOCKED\"")); addEmptyVPackObject("DBServers", builder); - builder.add("Version", VPackValue("1")); + builder.add("Version", VPackValue(1)); builder.add(VPackValue("Collections")); { VPackObjectBuilder d(&builder); @@ -721,7 +658,7 @@ bool AgencyComm::tryInitializeStructure() { VPackObjectBuilder d(&builder); addEmptyVPackObject("_system", builder); } - builder.add("Version", VPackValue("\"1\"")); + builder.add("Version", VPackValue(1)); addEmptyVPackObject("MapLocalToID", builder); builder.add(VPackValue("Databases")); { @@ -731,6 +668,7 @@ bool AgencyComm::tryInitializeStructure() { } addEmptyVPackObject("DBServers", builder); builder.add("Lock", VPackValue("\"UNLOCKED\"")); + builder.add("InitDone", VPackValue(true)); } } catch (...) { LOG(WARN) << "Couldn't create initializing structure"; @@ -738,22 +676,16 @@ bool AgencyComm::tryInitializeStructure() { } try { - VPackSlice s = builder.slice(); + LOG(DEBUG) << "Initializing agency with " << builder.toJson(); - // now dump the Slice into an std::string - std::string buffer; - VPackStringSink sink(&buffer); - VPackDumper::dump(s, &sink); + AgencyCommResult result; + AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice()); + AgencyTransaction initTransaction; + initTransaction.operations.push_back(initOperation); - LOG(DEBUG) << "Initializing agency with " << buffer; - - if (!initFromVPackSlice(std::string(""), s)) { - LOG(FATAL) << "Couldn't initialize agency"; - FATAL_ERROR_EXIT(); - } else { - setValue("InitDone", trueSlice, 0.0); - return true; - } + sendTransactionWithFailover(result, initTransaction); + + return result.successful(); } catch (std::exception const& e) { LOG(FATAL) << "Fatal error initializing agency " << e.what(); FATAL_ERROR_EXIT(); @@ -763,41 +695,6 @@ bool AgencyComm::tryInitializeStructure() { } } -bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) { - bool ret = true; - AgencyCommResult result; - if (s.isObject()) { - if (!key.empty()) { - result = createDirectory(key); - if (!result.successful()) { - // mop: forbidden will be thrown if directory already exists - // need ability to recover in a case where the agency was half - // initialized - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::FORBIDDEN) { - ret = false; - return ret; - } - } - } - - for (auto const& it : VPackObjectIterator(s)) { - std::string subKey(""); - if (!key.empty()) { - subKey += key + "/"; - } - subKey += it.key.copyString(); - - ret = ret && initFromVPackSlice(subKey, it.value); - } - } else { - result = setValue(key, s.copyString(), 0.0); - ret = ret && result.successful(); - } - - return ret; -} - ////////////////////////////////////////////////////////////////////////////// /// @brief checks if the agency is initialized ////////////////////////////////////////////////////////////////////////////// @@ -1217,87 +1114,6 @@ bool AgencyComm::exists(std::string const& key) { return result.successful(); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief update a version number in the agency -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyComm::increaseVersion(std::string const& key) { - // fetch existing version number - AgencyCommResult result = getValues(key, false); - - if (!result.successful()) { - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) { - return false; - } - - // no version key found, now set it - VPackBuilder builder; - try { - builder.add(VPackValue(1)); - } catch (...) { - LOG(ERR) << "Couldn't add value to builder"; - return false; - } - - result.clear(); - result = casValue(key, builder.slice(), false, 0.0, 0.0); - - return result.successful(); - } - - // found a version - result.parse("", false); - auto it = result._values.begin(); - - if (it == result._values.end()) { - return false; - } - - VPackSlice const versionSlice = it->second._vpack->slice(); - uint64_t version = - arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice); - VPackBuilder oldBuilder; - try { - if (versionSlice.isString()) { - oldBuilder.add(VPackValue(std::to_string(version))); - } else { - oldBuilder.add(VPackValue(version)); - } - } catch (...) { - return false; - } - VPackBuilder newBuilder; - try { - newBuilder.add(VPackValue(version + 1)); - } catch (...) { - return false; - } - result.clear(); - - result = casValue(key, oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0); - - return result.successful(); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief update a version number in the agency, retry until it works -//////////////////////////////////////////////////////////////////////////////// - -void AgencyComm::increaseVersionRepeated(std::string const& key) { - bool ok = false; - while (!ok) { - ok = increaseVersion(key); - if (ok) { - return; - } - uint32_t val = 300 + TRI_UInt32Random() % 400; - LOG(INFO) << "Could not increase " << key << " in agency, retrying in " - << val << "!"; - usleep(val * 1000); - } -} - //////////////////////////////////////////////////////////////////////////////// /// @brief increment a key //////////////////////////////////////////////////////////////////////////////// @@ -1673,8 +1489,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count, } VPackSlice oldSlice = oldBuilder->slice(); - uint64_t const oldValue = - arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count; + uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count; uint64_t const newValue = oldValue + count; VPackBuilder newBuilder; diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index e476a4e654..aa0442081b 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -235,6 +235,9 @@ struct AgencyTransaction { explicit AgencyTransaction(AgencyOperation const& operation) { operations.push_back(operation); } + + explicit AgencyTransaction() { + } }; struct AgencyCommResult { @@ -374,12 +377,6 @@ class AgencyCommLocker { void unlock(); private: - ////////////////////////////////////////////////////////////////////////////// - /// @brief fetch a lock version from the agency - ////////////////////////////////////////////////////////////////////////////// - - bool fetchVersion(AgencyComm&); - ////////////////////////////////////////////////////////////////////////////// /// @brief update a lock version in the agency ////////////////////////////////////////////////////////////////////////////// @@ -390,7 +387,6 @@ class AgencyCommLocker { std::string const _key; std::string const _type; std::shared_ptr _vpack; - uint64_t _version; bool _isLocked; }; @@ -488,13 +484,10 @@ class AgencyComm { /// @brief update a version number in the agency ////////////////////////////////////////////////////////////////////////////// - bool increaseVersion(std::string const& key); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief update a version number in the agency, retry until it works - ////////////////////////////////////////////////////////////////////////////// - - void increaseVersionRepeated(std::string const& key); + inline bool increaseVersion(std::string const& key) { + AgencyCommResult result = increment(key); + return result.successful(); + } ////////////////////////////////////////////////////////////////////////////// /// @brief creates a directory in the backend diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 63a27fa04e..1e35c27ed5 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1258,25 +1258,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, errorMsg); } - ac.increaseVersionRepeated("Plan/Version"); + ac.increaseVersion("Plan/Version"); // Update our cache: loadPlannedCollections(); - - // Now wait for it to appear and be complete: - AgencyCommResult res = ac.getValues("Current/Version", false); - if (!res.successful()) { - return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, - errorMsg); - } - + + AgencyCommResult res; std::string const where = "Current/Collections/" + databaseName + "/" + collectionID; while (TRI_microtime() <= endTime) { res.clear(); res = ac.getValues(where, true); + + LOG(TRACE) << "CREATE OYOYOYOY " << where; + if (res.successful() && res.parse(where + "/", false)) { + LOG(TRACE) << "CREATE IS SUCCESS " << where; if (res._values.size() == (size_t)numberOfShards) { + LOG(TRACE) << "CREATE has number " << where; std::string tmpMsg = ""; bool tmpHaveError = false; for (auto const& p : res._values) { @@ -1298,17 +1297,23 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, } } } + LOG(TRACE) << "CREATE PRE LOAD has number " << where; loadCurrentCollections(); + LOG(TRACE) << "CREATE POST LOAD has number " << where; if (tmpHaveError) { errorMsg = "Error in creation of collection:" + tmpMsg; + LOG(TRACE) << "CREATE KAP0TT " << where; return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; } + LOG(TRACE) << "CREATE OK " << where; return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); } } res.clear(); + LOG(TRACE) << "JASSSSS " << interval; _agencyCallbackRegistry->awaitNextChange("Current/Version", interval); + LOG(TRACE) << "NNNNJASSSSS " << interval; } // LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards; diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index fed935e62f..8f5372f3b0 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -116,6 +116,12 @@ void HeartbeatThread::runDBServer() { uint64_t lastCommandIndex = getLastCommandIndex(); std::function updatePlan = [&](VPackSlice const& result) { + if (!result.isNumber()) { + LOG(ERR) << "Version is not a number! " << result.toJson(); + return false; + } + uint64_t version = result.getNumber(); + LOG(TRACE) << "Hass " << result.toJson() << " " << version << " " << _dispatchedPlanVersion; bool mustHandlePlanChange = false; { MUTEX_LOCKER(mutexLocker, _statusLock); @@ -126,7 +132,7 @@ void HeartbeatThread::runDBServer() { if (_lastDispatchedJobResult) { LOG(DEBUG) << "...and was successful"; // mop: the dispatched version is still the same => we are finally uptodate - if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) { + if (_dispatchedPlanVersion == version) { LOG(DEBUG) << "Version is correct :)"; return true; } @@ -134,15 +140,14 @@ void HeartbeatThread::runDBServer() { } } if (_numDispatchedJobs == 0) { - LOG(DEBUG) << "Will dispatch plan change " << result; + LOG(DEBUG) << "Will dispatch plan change " << version; mustHandlePlanChange = true; - _dispatchedPlanVersion.clear(); - _dispatchedPlanVersion.add(result); + _dispatchedPlanVersion = version; } } if (mustHandlePlanChange) { // mop: a dispatched task has returned - handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result)); + handlePlanChangeDBServer(version); } return false; }; diff --git a/arangod/Cluster/HeartbeatThread.h b/arangod/Cluster/HeartbeatThread.h index fa20280442..f47605e2d5 100644 --- a/arangod/Cluster/HeartbeatThread.h +++ b/arangod/Cluster/HeartbeatThread.h @@ -185,7 +185,7 @@ class HeartbeatThread : public Thread { arangodb::basics::ConditionVariable _condition; - VPackBuilder _dispatchedPlanVersion; + uint64_t _dispatchedPlanVersion; ////////////////////////////////////////////////////////////////////////////// /// @brief users for these databases will be re-fetched the next time the