mirror of https://gitee.com/bigwinds/arangodb
multi-host agency tests startup ok
This commit is contained in:
commit
49c1435d9c
|
@ -159,26 +159,27 @@ void AgencyFeature::prepare() {
|
|||
}
|
||||
|
||||
void AgencyFeature::start() {
|
||||
|
||||
if (!isEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// TODO: Port this to new options handling
|
||||
std::string endpoint;
|
||||
std::string port = "8529";
|
||||
|
||||
|
||||
EndpointFeature* endpointFeature =
|
||||
ApplicationServer::getFeature<EndpointFeature>("Endpoint");
|
||||
ApplicationServer::getFeature<EndpointFeature>("Endpoint");
|
||||
auto endpoints = endpointFeature->httpEndpoints();
|
||||
|
||||
|
||||
if (!endpoints.empty()) {
|
||||
size_t pos = endpoint.find(':', 10);
|
||||
|
||||
size_t pos = endpoints[0].find(':',10);
|
||||
|
||||
if (pos != std::string::npos) {
|
||||
port = endpoint.substr(pos + 1, endpoint.size() - pos);
|
||||
port = endpoints[0].substr(pos + 1, endpoints[0].size() - pos);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
endpoint = std::string("tcp://localhost:" + port);
|
||||
|
||||
_agent.reset(new consensus::Agent(consensus::config_t(
|
||||
|
|
|
@ -212,7 +212,7 @@ bool Agent::recvAppendEntriesRPC(term_t term,
|
|||
<< queries->slice().length()
|
||||
<< " entries to state machine.";
|
||||
/* bool success = */
|
||||
//_state.log(queries, term, leaderId, prevIndex, prevTerm);
|
||||
_state.log(queries, term, leaderId, prevIndex, prevTerm);
|
||||
} else {
|
||||
// heart-beat
|
||||
}
|
||||
|
|
|
@ -220,6 +220,16 @@ OperationID ClusterComm::getOperationID() { return TRI_NewTickServer(); }
|
|||
/// here in the form of "server:" followed by a serverID. Furthermore,
|
||||
/// it is possible to specify the target endpoint directly using
|
||||
/// "tcp://..." or "ssl://..." endpoints, if `singleRequest` is true.
|
||||
///
|
||||
/// There are two timeout arguments. `timeout` is the globale timeout
|
||||
/// specifying after how many seconds the complete operation must be
|
||||
/// completed. `initTimeout` is a second timeout, which is used to
|
||||
/// limit the time to send the initial request away. If `initTimeout`
|
||||
/// is negative (as for example in the default value), then `initTimeout`
|
||||
/// is taken to be the same as `timeout`. The idea behind the two timeouts
|
||||
/// is to be able to specify correct behaviour for automatic failover.
|
||||
/// The idea is that if the initial request cannot be sent within
|
||||
/// `initTimeout`, one can retry after a potential failover.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
OperationID ClusterComm::asyncRequest(
|
||||
|
@ -229,7 +239,7 @@ OperationID ClusterComm::asyncRequest(
|
|||
std::string const& path, std::shared_ptr<std::string const> body,
|
||||
std::unique_ptr<std::unordered_map<std::string, std::string>>& headerFields,
|
||||
std::shared_ptr<ClusterCommCallback> callback, ClusterCommTimeout timeout,
|
||||
bool singleRequest) {
|
||||
bool singleRequest, ClusterCommTimeout initTimeout) {
|
||||
|
||||
TRI_ASSERT(headerFields.get() != nullptr);
|
||||
|
||||
|
@ -248,8 +258,13 @@ OperationID ClusterComm::asyncRequest(
|
|||
op->body = body;
|
||||
op->headerFields = std::move(headerFields);
|
||||
op->callback = callback;
|
||||
op->endTime = timeout == 0.0 ? TRI_microtime() + 24 * 60 * 60.0
|
||||
: TRI_microtime() + timeout;
|
||||
double now = TRI_microtime();
|
||||
op->endTime = timeout == 0.0 ? now + 24 * 60 * 60.0 : now + timeout;
|
||||
if (initTimeout <= 0.0) {
|
||||
op->initEndTime = op->endTime;
|
||||
} else {
|
||||
op->initEndTime = now + initTimeout;
|
||||
}
|
||||
|
||||
op->result.setDestination(destination, logConnectionErrors());
|
||||
if (op->result.status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||
|
@ -1079,13 +1094,12 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
<< "ClusterComm::performRequests: sending request to "
|
||||
<< requests[i].destination << ":" << requests[i].path
|
||||
<< "body:" << requests[i].body;
|
||||
double localTimeOut
|
||||
double localInitTimeout
|
||||
= (std::min)((std::max)(1.0, now - startTime), 10.0);
|
||||
if (localTimeOut <= endTime - now) {
|
||||
dueTime[i] = now + localTimeOut;
|
||||
} else {
|
||||
localTimeOut = endTime - now;
|
||||
dueTime[i] = endTime + 10;
|
||||
double localTimeout = endTime - now;
|
||||
dueTime[i] = endTime + 10; // no retry unless ordered elsewhere
|
||||
if (localInitTimeout > localTimeout) {
|
||||
localInitTimeout = localTimeout;
|
||||
}
|
||||
OperationID opId = asyncRequest("", coordinatorTransactionID,
|
||||
requests[i].destination,
|
||||
|
@ -1093,8 +1107,8 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
requests[i].path,
|
||||
requests[i].body,
|
||||
requests[i].headerFields,
|
||||
nullptr, localTimeOut,
|
||||
false);
|
||||
nullptr, localTimeout,
|
||||
false, localInitTimeout);
|
||||
opIDtoIndex.insert(std::make_pair(opId, i));
|
||||
// It is possible that an error occurs right away, we will notice
|
||||
// below after wait(), though, and retry in due course.
|
||||
|
@ -1153,11 +1167,17 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
} else if (res.status == CL_COMM_BACKEND_UNAVAILABLE ||
|
||||
(res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) {
|
||||
requests[index].result = res;
|
||||
// In this case we will retry at the dueTime, if it is before endTime:
|
||||
// In this case we will retry:
|
||||
dueTime[index] = (std::min)(10.0,
|
||||
(std::max)(0.2, 2 * (now - startTime))) +
|
||||
startTime;
|
||||
if (dueTime[index] >= endTime) {
|
||||
requests[index].done = true;
|
||||
nrDone++;
|
||||
}
|
||||
if (dueTime[index] < actionNeeded) {
|
||||
actionNeeded = dueTime[index];
|
||||
}
|
||||
LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: "
|
||||
<< "got BACKEND_UNAVAILABLE or TIMEOUT from "
|
||||
<< requests[index].destination << ":"
|
||||
|
@ -1231,7 +1251,7 @@ void ClusterCommThread::run() {
|
|||
|
||||
// Have we already reached the timeout?
|
||||
double currentTime = TRI_microtime();
|
||||
if (op->endTime <= currentTime) {
|
||||
if (op->initEndTime <= currentTime) {
|
||||
op->result.status = CL_COMM_TIMEOUT;
|
||||
} else {
|
||||
// We know that op->result.endpoint is nonempty here, otherwise
|
||||
|
@ -1271,7 +1291,8 @@ void ClusterCommThread::run() {
|
|||
|
||||
auto client =
|
||||
std::make_unique<arangodb::httpclient::SimpleHttpClient>(
|
||||
connection->_connection, op->endTime - currentTime, false);
|
||||
connection->_connection, op->initEndTime - currentTime,
|
||||
false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
// We add this result to the operation struct without acquiring
|
||||
|
|
|
@ -257,6 +257,7 @@ struct ClusterCommOperation {
|
|||
std::unique_ptr<std::unordered_map<std::string, std::string>> headerFields;
|
||||
std::shared_ptr<ClusterCommCallback> callback;
|
||||
ClusterCommTimeout endTime;
|
||||
ClusterCommTimeout initEndTime;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -374,7 +375,7 @@ class ClusterComm {
|
|||
std::shared_ptr<std::string const> body,
|
||||
std::unique_ptr<std::unordered_map<std::string, std::string>>& headerFields,
|
||||
std::shared_ptr<ClusterCommCallback> callback, ClusterCommTimeout timeout,
|
||||
bool singleRequest = false);
|
||||
bool singleRequest = false, ClusterCommTimeout initTimeout = -1.0);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit a single HTTP request to a shard synchronously.
|
||||
|
|
|
@ -1406,7 +1406,7 @@ static void PrepareClusterCommRequest(
|
|||
std::unordered_map<std::string, std::string>& headerFields,
|
||||
ClientTransactionID& clientTransactionID,
|
||||
CoordTransactionID& coordTransactionID, double& timeout,
|
||||
bool& singleRequest) {
|
||||
bool& singleRequest, double& initTimeout) {
|
||||
v8::Isolate* isolate = args.GetIsolate();
|
||||
TRI_V8_CURRENT_GLOBALS_AND_SCOPE;
|
||||
|
||||
|
@ -1487,6 +1487,10 @@ static void PrepareClusterCommRequest(
|
|||
if (opt->Has(SingleRequestKey)) {
|
||||
singleRequest = TRI_ObjectToBoolean(opt->Get(SingleRequestKey));
|
||||
}
|
||||
TRI_GET_GLOBAL_STRING(InitTimeoutKey);
|
||||
if (opt->Has(InitTimeoutKey)) {
|
||||
initTimeout = TRI_ObjectToDouble(opt->Get(InitTimeoutKey));
|
||||
}
|
||||
}
|
||||
if (clientTransactionID == "") {
|
||||
clientTransactionID = StringUtils::itoa(TRI_NewTickServer());
|
||||
|
@ -1645,6 +1649,7 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
|||
// - coordTransactionID (number)
|
||||
// - timeout (number)
|
||||
// - singleRequest (boolean) default is false
|
||||
// - initTimeout (number)
|
||||
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
|
||||
|
@ -1661,15 +1666,17 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
|||
ClientTransactionID clientTransactionID;
|
||||
CoordTransactionID coordTransactionID;
|
||||
double timeout;
|
||||
double initTimeout = -1.0;
|
||||
bool singleRequest = false;
|
||||
|
||||
PrepareClusterCommRequest(args, reqType, destination, path, *body,
|
||||
*headerFields, clientTransactionID,
|
||||
coordTransactionID, timeout, singleRequest);
|
||||
coordTransactionID, timeout, singleRequest,
|
||||
initTimeout);
|
||||
|
||||
OperationID opId = cc->asyncRequest(
|
||||
clientTransactionID, coordTransactionID, destination, reqType, path, body,
|
||||
headerFields, 0, timeout, singleRequest);
|
||||
headerFields, 0, timeout, singleRequest, initTimeout);
|
||||
ClusterCommResult res = cc->enquire(opId);
|
||||
if (res.status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -1723,11 +1730,13 @@ static void JS_SyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
|||
ClientTransactionID clientTransactionID;
|
||||
CoordTransactionID coordTransactionID;
|
||||
double timeout;
|
||||
double initTimeout = -1.0;
|
||||
bool singleRequest = false; // of no relevance here
|
||||
|
||||
PrepareClusterCommRequest(args, reqType, destination, path, body,
|
||||
*headerFields, clientTransactionID,
|
||||
coordTransactionID, timeout, singleRequest);
|
||||
coordTransactionID, timeout, singleRequest,
|
||||
initTimeout);
|
||||
|
||||
std::unique_ptr<ClusterCommResult> res =
|
||||
cc->syncRequest(clientTransactionID, coordTransactionID, destination,
|
||||
|
|
|
@ -1404,18 +1404,19 @@ function startInstanceAgency(instanceInfo, protocol, options,
|
|||
instanceArgs["agency.id"] = String(i);
|
||||
instanceArgs["agency.size"] = String(N);
|
||||
instanceArgs["agency.wait-for-sync"] = String(wfs);
|
||||
instanceArgs["agency.supervision"] = "false";
|
||||
instanceArgs["database.directory"] = rootDir + "agency-" + i;
|
||||
|
||||
instanceArgs["agency.supervision"] = "true";
|
||||
instanceArgs["database.directory"] = dataDir + String(i);
|
||||
|
||||
if (i === N - 1) {
|
||||
const port = findFreePort();
|
||||
instanceArgs["server.endpoint"] = "tcp://127.0.0.1:" + port;
|
||||
let l = [];
|
||||
for (let j = 0; j < N; j++) {
|
||||
instanceInfo.arangods.forEach(arangod => {
|
||||
l.push("--agency.endpoint");
|
||||
l.push(arangod.endpoint);
|
||||
});
|
||||
}
|
||||
instanceInfo.arangods.forEach(arangod => {
|
||||
l.push("--agency.endpoint");
|
||||
l.push(arangod.endpoint);
|
||||
});
|
||||
l.push("--agency.endpoint");
|
||||
l.push("tcp://127.0.0.1:" + port);
|
||||
l.push("--agency.notify");
|
||||
l.push("true");
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ TRI_v8_global_s::TRI_v8_global_s(v8::Isolate* isolate)
|
|||
HeadersKey(),
|
||||
HttpOnlyKey(),
|
||||
IdKey(),
|
||||
InitTimeoutKey(),
|
||||
IsSystemKey(),
|
||||
IsVolatileKey(),
|
||||
JournalSizeKey(),
|
||||
|
@ -156,6 +157,7 @@ TRI_v8_global_s::TRI_v8_global_s(v8::Isolate* isolate)
|
|||
HeadersKey.Reset(isolate, TRI_V8_ASCII_STRING("headers"));
|
||||
HttpOnlyKey.Reset(isolate, TRI_V8_ASCII_STRING("httpOnly"));
|
||||
IdKey.Reset(isolate, TRI_V8_ASCII_STRING("id"));
|
||||
InitTimeoutKey.Reset(isolate, TRI_V8_ASCII_STRING("initTimeout"));
|
||||
IsSystemKey.Reset(isolate, TRI_V8_ASCII_STRING("isSystem"));
|
||||
IsVolatileKey.Reset(isolate, TRI_V8_ASCII_STRING("isVolatile"));
|
||||
JournalSizeKey.Reset(isolate, TRI_V8_ASCII_STRING("journalSize"));
|
||||
|
|
|
@ -706,6 +706,12 @@ typedef struct TRI_v8_global_s {
|
|||
|
||||
v8::Persistent<v8::String> IdKey;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "initTimeout" key name
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
v8::Persistent<v8::String> InitTimeoutKey;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "isSystem" key name
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue