1
0
Fork 0

Improve cluster in various ways.

Finish implementation of AgencyReadTransaction.
Use AgencyReadTransaction in HeartbeatThread of coordinator (fewer requests).
Repair ClusterInfo::uniqid.
Repair AgencyComm::uniqid.
Remove x-etcd-index header.
Remove _index in AgencyCommResult.
Streamline HeartbeatThread of coordinator.
Remove lastCommandIndex from AgencyComm.
Fix HeartbeatThread::handleStateChange.
This commit is contained in:
Max Neunhoeffer 2016-05-06 22:30:27 +02:00
parent 5690c0410c
commit 0fbc48b83e
7 changed files with 200 additions and 229 deletions

View File

@ -137,8 +137,8 @@ void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
compareBuilder = _lastData;
}
_useCv = true;
CONDITION_LOCKER(locker, _cv);
_useCv = true;
locker.wait(static_cast<uint64_t>(maxTimeout * 1000000.0));
_useCv = false;

View File

@ -144,10 +144,7 @@ void AgencyPrecondition::toVelocyPack(VPackBuilder& builder) const {
std::string AgencyWriteTransaction::toJson() const {
VPackBuilder builder;
{
VPackArrayBuilder guard(&builder);
toVelocyPack(builder);
}
toVelocyPack(builder);
return builder.toJson();
}
@ -177,10 +174,7 @@ void AgencyWriteTransaction::toVelocyPack(VPackBuilder& builder) const {
std::string AgencyReadTransaction::toJson() const {
VPackBuilder builder;
{
VPackArrayBuilder guard(&builder);
toVelocyPack(builder);
}
toVelocyPack(builder);
return builder.toJson();
}
@ -189,12 +183,9 @@ std::string AgencyReadTransaction::toJson() const {
//////////////////////////////////////////////////////////////////////////////
void AgencyReadTransaction::toVelocyPack(VPackBuilder& builder) const {
VPackArrayBuilder guard(&builder);
{
VPackArrayBuilder guard2(&builder);
for (std::string const& key: keys) {
builder.add(VPackValue(key));
}
VPackArrayBuilder guard2(&builder);
for (std::string const& key: keys) {
builder.add(VPackValue(key));
}
}
@ -225,7 +216,6 @@ AgencyCommResult::AgencyCommResult()
_message(),
_body(),
_values(),
_index(0),
_statusCode(0),
_connected(false) {}
@ -328,7 +318,6 @@ void AgencyCommResult::clear() {
_location = "";
_message = "";
_body = "";
_index = 0;
_statusCode = 0;
}
@ -658,7 +647,7 @@ bool AgencyComm::tryInitializeStructure() {
builder.add(VPackValue("Sync"));
{
VPackObjectBuilder c(&builder);
builder.add("LatestID", VPackValue("1"));
builder.add("LatestID", VPackValue(1));
addEmptyVPackObject("Problems", builder);
builder.add("UserVersion", VPackValue(1));
addEmptyVPackObject("ServerStates", builder);
@ -1566,76 +1555,69 @@ bool AgencyComm::unlockWrite(std::string const& key, double timeout) {
/// @brief get unique id
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count,
double timeout) {
static int const maxTries = 10;
uint64_t AgencyComm::uniqid(uint64_t count, double timeout) {
static int const maxTries = 1000000;
// this is pretty much forever, but we simply cannot continue at all
// if we do not get a unique id from the agency.
int tries = 0;
AgencyCommResult result;
while (tries++ < maxTries) {
result.clear();
result = getValues(key, false);
uint64_t oldValue = 0;
if (result.httpCode() ==
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
while (tries++ < maxTries) {
result = getValues2("Sync/LatestID");
if (!result.successful()) {
usleep(500000);
continue;
}
VPackSlice oldSlice = result.slice()[0].get(std::vector<std::string>(
{prefixStripped(), "Sync", "LatestID"}));
if (!(oldSlice.isSmallInt() || oldSlice.isUInt())) {
LOG(WARN) << "Sync/LatestID in agency is not an unsigned integer, fixing...";
try {
VPackBuilder builder;
builder.add(VPackValue(0));
// create the key on the fly
setValue(key, builder.slice(), 0.0);
tries--;
setValue("Sync/LatestID", builder.slice(), 0.0);
continue;
} catch (...) {
// Could not build local key. Try again
}
continue;
}
if (!result.successful()) {
return result;
}
result.parse("", false);
std::shared_ptr<VPackBuilder> oldBuilder;
std::map<std::string, AgencyCommResultEntry>::iterator it =
result._values.begin();
// If we get here, oldSlice is pointing to an unsigned integer, which
// is the value in the agency.
oldValue = 0;
try {
if (it != result._values.end()) {
// steal the velocypack
oldBuilder.swap((*it).second._vpack);
} else {
oldBuilder->add(VPackValue(0));
}
} catch (...) {
return AgencyCommResult();
oldValue = oldSlice.getUInt();
}
catch (...) {
}
VPackSlice oldSlice = oldBuilder->slice();
uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
uint64_t const newValue = oldValue + count;
VPackBuilder newBuilder;
try {
newBuilder.add(VPackValue(newValue));
} catch (...) {
return AgencyCommResult();
usleep(500000);
continue;
}
result.clear();
result = casValue(key, oldSlice, newBuilder.slice(), 0.0, timeout);
result = casValue("Sync/LatestID", oldSlice, newBuilder.slice(),
0.0, timeout);
if (result.successful()) {
result._index = oldValue + 1;
break;
}
// The cas did not work, simply try again!
}
return result;
return oldValue;
}
////////////////////////////////////////////////////////////////////////////////
@ -1841,11 +1823,56 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(
std::string url(buildUrl());
url += "/write";
url += transaction.isWriteTransaction() ? "/write" : "/read";
return sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
VPackBuilder builder;
{
VPackArrayBuilder guard(&builder);
transaction.toVelocyPack(builder);
}
AgencyCommResult result = sendWithFailover(
arangodb::GeneralRequest::RequestType::POST,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, url,
transaction.toJson(), false);
builder.slice().toJson(), false);
try {
result.setVPack(VPackParser::fromJson(result.body().c_str()));
if (transaction.isWriteTransaction()) {
if (!result.slice().isObject() ||
!result.slice().get("results").isArray()) {
result._statusCode = 500;
return result;
}
if (result.slice().get("results").length() != 1) {
result._statusCode = 500;
return result;
}
} else {
if (!result.slice().isArray()) {
result._statusCode = 500;
return result;
}
if (result.slice().length() != 1) {
result._statusCode = 500;
return result;
}
}
result._body.clear();
result._statusCode = 200;
} catch(std::exception &e) {
LOG(ERR) << "Error transforming result. " << e.what();
result.clear();
} catch(...) {
LOG(ERR) << "Error transforming result. Out of memory";
result.clear();
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
@ -2061,15 +2088,8 @@ AgencyCommResult AgencyComm::send(
result._message = response->getHttpReturnMessage();
basics::StringBuffer& sb = response->getBody();
result._body = std::string(sb.c_str(), sb.length());
result._index = 0;
result._statusCode = response->getHttpReturnCode();
bool found = false;
std::string lastIndex = response->getHeaderField("x-etcd-index", found);
if (found) {
result._index = arangodb::basics::StringUtils::uint64(lastIndex);
}
LOG(TRACE) << "request to agency returned status code " << result._statusCode
<< ", message: '" << result._message << "', body: '"
<< result._body << "'";

View File

@ -215,8 +215,10 @@ private:
/// @brief abstract interface implemented by the agency transaction types
/// (AgencyWriteTransaction and AgencyReadTransaction)
struct AgencyTransaction {
  /// @brief serializes the transaction into a JSON string
  virtual std::string toJson() const = 0;

  /// @brief appends the transaction to the given velocypack builder
  virtual void toVelocyPack(arangodb::velocypack::Builder& builder) const = 0;

  /// @brief virtual so that implementations can be destroyed via the base type
  virtual ~AgencyTransaction() = default;

  /// @brief tells whether the transaction is a write transaction (true) or a
  /// read transaction (false)
  virtual bool isWriteTransaction() const = 0;
};
struct AgencyWriteTransaction : public AgencyTransaction {
@ -237,7 +239,7 @@ struct AgencyWriteTransaction : public AgencyTransaction {
/// @brief converts the transaction to velocypack
//////////////////////////////////////////////////////////////////////////////
void toVelocyPack(arangodb::velocypack::Builder& builder) const;
void toVelocyPack(arangodb::velocypack::Builder& builder) const override final;
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the transaction to json
@ -270,6 +272,13 @@ struct AgencyWriteTransaction : public AgencyTransaction {
AgencyWriteTransaction() = default;
//////////////////////////////////////////////////////////////////////////////
/// @brief return type of transaction
//////////////////////////////////////////////////////////////////////////////
// Always true for a write transaction. AgencyComm::sendTransactionWithFailover
// uses this flag to select the "/write" endpoint of the agency.
bool isWriteTransaction() const override final {
return true;
}
};
struct AgencyReadTransaction : public AgencyTransaction {
@ -284,7 +293,7 @@ struct AgencyReadTransaction : public AgencyTransaction {
/// @brief converts the transaction to velocypack
//////////////////////////////////////////////////////////////////////////////
void toVelocyPack(arangodb::velocypack::Builder& builder) const;
void toVelocyPack(arangodb::velocypack::Builder& builder) const override final;
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the transaction to json
@ -300,12 +309,27 @@ struct AgencyReadTransaction : public AgencyTransaction {
keys.push_back(key);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief shortcut to create a transaction reading more than one key
//////////////////////////////////////////////////////////////////////////////
// Builds a read transaction from a ready-made list of keys and takes
// ownership of that list. `k` is a named rvalue reference and therefore an
// lvalue inside the constructor: it has to be moved explicitly, otherwise the
// whole vector (and every string in it) is copied.
explicit AgencyReadTransaction(std::vector<std::string>&& k)
    : keys(std::move(k)) {
}
//////////////////////////////////////////////////////////////////////////////
/// @brief default constructor
//////////////////////////////////////////////////////////////////////////////
AgencyReadTransaction() = default;
//////////////////////////////////////////////////////////////////////////////
/// @brief return type of transaction
//////////////////////////////////////////////////////////////////////////////
// Always false for a read transaction. AgencyComm::sendTransactionWithFailover
// uses this flag to select the "/read" endpoint of the agency.
bool isWriteTransaction() const override final {
return false;
}
};
struct AgencyCommResult {
@ -342,12 +366,6 @@ struct AgencyCommResult {
int httpCode() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief extract the "index" attribute from the result
//////////////////////////////////////////////////////////////////////////////
uint64_t index() const { return _index; }
//////////////////////////////////////////////////////////////////////////////
/// @brief extract the error code from the result
//////////////////////////////////////////////////////////////////////////////
@ -418,7 +436,6 @@ struct AgencyCommResult {
std::string _realBody;
std::map<std::string, AgencyCommResultEntry> _values;
uint64_t _index;
int _statusCode;
bool _connected;
};
@ -635,7 +652,7 @@ class AgencyComm {
/// @brief get unique id
//////////////////////////////////////////////////////////////////////////////
AgencyCommResult uniqid(std::string const&, uint64_t, double);
uint64_t uniqid(uint64_t, double);
//////////////////////////////////////////////////////////////////////////////
/// @brief registers a callback on a key

View File

@ -302,31 +302,44 @@ ClusterInfo::~ClusterInfo() {
////////////////////////////////////////////////////////////////////////////////
uint64_t ClusterInfo::uniqid(uint64_t count) {
MUTEX_LOCKER(mutexLocker, _idLock);
while (true) {
uint64_t oldValue;
{
// The quick path, we have enough in our private reserve:
MUTEX_LOCKER(mutexLocker, _idLock);
if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
oldValue = _uniqid._currentValue;
}
// We need to fetch from the agency
if (_uniqid._currentValue + count - 1 >= _uniqid._upperValue) {
uint64_t fetch = count;
if (fetch < MinIdsPerBatch) {
fetch = MinIdsPerBatch;
}
AgencyCommResult result = _agency.uniqid("Sync/LatestID", fetch, 0.0);
uint64_t result = _agency.uniqid(fetch, 0.0);
if (!result.successful() || result._index == 0) {
return 0;
{
MUTEX_LOCKER(mutexLocker, _idLock);
if (oldValue == _uniqid._currentValue) {
_uniqid._currentValue = result + count;
_uniqid._upperValue = result + fetch - 1;
return result;
}
// If we get here, somebody else succeeded in doing the same,
// so we just try again.
}
_uniqid._currentValue = result._index + count;
_uniqid._upperValue = _uniqid._currentValue + fetch - 1;
return result._index;
}
uint64_t result = _uniqid._currentValue;
_uniqid._currentValue += count;
return result;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -113,9 +113,6 @@ void HeartbeatThread::runDBServer() {
// convert timeout to seconds
double const interval = (double)_interval / 1000.0 / 1000.0;
// value of Sync/Commands/my-id at startup
uint64_t lastCommandIndex = getLastCommandIndex();
std::function<bool(VPackSlice const& result)> updatePlan = [&](
VPackSlice const& result) {
if (!result.isNumber()) {
@ -170,23 +167,23 @@ void HeartbeatThread::runDBServer() {
break;
}
{
// send an initial GET request to Sync/Commands/my-id
AgencyCommResult result =
_agency.getValues("Sync/Commands/" + _myId, false);
if (result.successful()) {
handleStateChange(result, lastCommandIndex);
}
}
if (isStopping()) {
break;
}
if (--currentCount == 0) {
currentCount = currentCountStart;
// send an initial GET request to Sync/Commands/my-id
LOG(TRACE) << "Looking at Sync/Commands/" + _myId;
AgencyCommResult result =
_agency.getValues2("Sync/Commands/" + _myId);
if (result.successful()) {
handleStateChange(result);
}
if (isStopping()) {
break;
}
LOG(TRACE) << "Refetching Current/Version...";
AgencyCommResult res = _agency.getValues2("Current/Version");
if (!res.successful()) {
@ -286,9 +283,6 @@ void HeartbeatThread::runCoordinator() {
// last value of current which we have noticed:
uint64_t lastCurrentVersionNoticed = 0;
// value of Sync/Commands/my-id at startup
uint64_t lastCommandIndex = getLastCommandIndex();
setReady();
while (!isStopping()) {
@ -303,28 +297,20 @@ void HeartbeatThread::runCoordinator() {
break;
}
{
AgencyReadTransaction trx(std::vector<std::string>({
_agency.prefix() + "Plan/Version",
_agency.prefix() + "Current/Version",
_agency.prefix() + "Sync/Commands/" + _myId,
_agency.prefix() + "Sync/UserVersion"}));
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
// send an initial GET request to Sync/Commands/my-id
AgencyCommResult result =
_agency.getValues("Sync/Commands/" + _myId, false);
if (!result.successful()) {
LOG(WARN) << "Heartbeat: Could not read from agency!";
} else {
LOG(TRACE) << "Looking at Sync/Commands/" + _myId;
if (result.successful()) {
handleStateChange(result, lastCommandIndex);
}
}
handleStateChange(result);
if (isStopping()) {
break;
}
bool shouldSleep = true;
// get the current version of the Plan
AgencyCommResult result = _agency.getValues2("Plan/Version");
if (result.successful()) {
VPackSlice versionSlice
= result.slice()[0].get(std::vector<std::string>(
{_agency.prefixStripped(), "Plan", "Version"}));
@ -345,15 +331,8 @@ void HeartbeatThread::runCoordinator() {
}
}
}
}
result.clear();
result = _agency.getValues2("Sync/UserVersion");
if (result.successful()) {
velocypack::Slice slice =
VPackSlice slice =
result.slice()[0].get(std::vector<std::string>(
{_agency.prefixStripped(), "Sync", "UserVersion"}));
@ -395,14 +374,9 @@ void HeartbeatThread::runCoordinator() {
}
}
}
}
result = _agency.getValues2("Current/Version");
if (result.successful()) {
VPackSlice versionSlice
= result.slice()[0].get(std::vector<std::string>(
{_agency.prefixStripped(), "Plan", "Version"}));
versionSlice = result.slice()[0].get(std::vector<std::string>(
{_agency.prefixStripped(), "Plan", "Version"}));
if (versionSlice.isInteger()) {
uint64_t currentVersion = 0;
@ -419,19 +393,19 @@ void HeartbeatThread::runCoordinator() {
}
}
if (shouldSleep) {
double remain = interval - (TRI_microtime() - start);
double remain = interval - (TRI_microtime() - start);
// sleep for a while if appropriate, on some systems usleep does not
// like arguments greater than 1000000
while (remain > 0.0) {
if (remain >= 0.5) {
usleep(500000);
remain -= 0.5;
} else {
usleep((unsigned long)(remain * 1000.0 * 1000.0));
remain = 0.0;
}
LOG(INFO) << "HeartbeatThread: remain is " << remain;
// sleep for a while if appropriate, on some systems usleep does not
// like arguments greater than 1000000
while (remain > 0.0) {
if (remain >= 0.5) {
usleep(500000);
remain -= 0.5;
} else {
usleep((unsigned long)(remain * 1000.0 * 1000.0));
remain = 0.0;
}
}
@ -477,39 +451,6 @@ void HeartbeatThread::removeDispatchedJob(ServerJobResult result) {
_condition.signal();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the index id of the value of Sync/Commands/my-id from the
/// agency this index value is determined initially and it is passed to the
/// watch command (we're waiting for an entry with a higher id)
////////////////////////////////////////////////////////////////////////////////
uint64_t HeartbeatThread::getLastCommandIndex() {
// get the initial command state
AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false);
if (result.successful()) {
result.parse("Sync/Commands/", false);
std::map<std::string, AgencyCommResultEntry>::iterator it =
result._values.find(_myId);
if (it != result._values.end()) {
// found something
LOG(TRACE) << "last command index was: '" << (*it).second._index << "'";
return (*it).second._index;
}
}
if (result._index > 0) {
// use the value returned in header X-Etcd-Index
return result._index;
}
// nothing found. this is not an error
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a plan version change, coordinator case
/// this is triggered if the heartbeat thread finds a new plan version number
@ -648,7 +589,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a plan version change, DBServer case
/// this is triggered if the heartbeat thread finds a new plan version number
/// this is triggered if the heartbeat thread finds a new plan version number,
/// and every few heartbeats if the Current/Version has changed.
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::syncDBServerStatusQuo() {
@ -716,21 +658,12 @@ bool HeartbeatThread::syncDBServerStatusQuo() {
/// notified about this particular change again).
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::handleStateChange(AgencyCommResult& result,
uint64_t& lastCommandIndex) {
result.parse("Sync/Commands/", false);
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
result._values.find(_myId);
if (it != result._values.end()) {
lastCommandIndex = (*it).second._index;
std::string command = "";
VPackSlice const slice = it->second._vpack->slice();
if (slice.isString()) {
command = slice.copyString();
}
bool HeartbeatThread::handleStateChange(AgencyCommResult& result) {
VPackSlice const slice = result.slice()[0].get(
std::vector<std::string>({ AgencyComm::prefixStripped(), "Sync",
"Commands", _myId }));
if (slice.isString()) {
std::string command = slice.copyString();
ServerState::StateEnum newState = ServerState::stringToState(command);
if (newState != ServerState::STATE_UNDEFINED) {

View File

@ -128,13 +128,7 @@ class HeartbeatThread : public Thread {
/// @brief handles a state change
//////////////////////////////////////////////////////////////////////////////
bool handleStateChange(AgencyCommResult&, uint64_t&);
//////////////////////////////////////////////////////////////////////////////
/// @brief fetch the last value of Sync/Commands/my-id from the agency
//////////////////////////////////////////////////////////////////////////////
uint64_t getLastCommandIndex();
bool handleStateChange(AgencyCommResult&);
//////////////////////////////////////////////////////////////////////////////
/// @brief sends the current server's state to the agency

View File

@ -608,15 +608,13 @@ static void JS_UniqidAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() < 1 || args.Length() > 3) {
TRI_V8_THROW_EXCEPTION_USAGE("uniqid(<key>, <count>, <timeout>)");
if (args.Length() > 2) {
TRI_V8_THROW_EXCEPTION_USAGE("uniqid(<count>, <timeout>)");
}
std::string const key = TRI_ObjectToString(args[0]);
uint64_t count = 1;
if (args.Length() > 1) {
count = TRI_ObjectToUInt64(args[1], true);
if (args.Length() > 0) {
count = TRI_ObjectToUInt64(args[0], true);
}
if (count < 1 || count > 10000000) {
@ -624,18 +622,14 @@ static void JS_UniqidAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
double timeout = 0.0;
if (args.Length() > 2) {
timeout = TRI_ObjectToDouble(args[2]);
if (args.Length() > 1) {
timeout = TRI_ObjectToDouble(args[1]);
}
AgencyComm comm;
AgencyCommResult result = comm.uniqid(key, count, timeout);
uint64_t result = comm.uniqid(count, timeout);
if (!result.successful() || result._index == 0) {
THROW_AGENCY_EXCEPTION(result);
}
std::string const value = StringUtils::itoa(result._index);
std::string const value = StringUtils::itoa(result);
TRI_V8_RETURN_STD_STRING(value);
TRI_V8_TRY_CATCH_END
@ -1070,7 +1064,7 @@ static void JS_UniqidClusterInfo(
TRI_V8_THROW_EXCEPTION_PARAMETER("<count> is invalid");
}
uint64_t value = ClusterInfo::instance()->uniqid();
uint64_t value = ClusterInfo::instance()->uniqid(count);
if (value == 0) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,