mirror of https://gitee.com/bigwinds/arangodb
Bug fix/fixes 110918 2 (#6848)
This commit is contained in:
parent
a0980ed2f2
commit
2dc05429fe
|
@ -424,15 +424,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
}
|
}
|
||||||
result->fromError(errorCode, std::move(response));
|
result->fromError(errorCode, std::move(response));
|
||||||
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||||
if (doLogConnectionErrors) {
|
logConnectionError(doLogConnectionErrors, result.get(), connectTimeout, __LINE__);
|
||||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (1) '" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'" << connectTimeout;
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC(INFO, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (2)'" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'" << connectTimeout;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/*bool ret =*/ ((*callback.get())(result.get()));
|
/*bool ret =*/ ((*callback.get())(result.get()));
|
||||||
// TRI_ASSERT(ret == true);
|
// TRI_ASSERT(ret == true);
|
||||||
|
@ -452,15 +444,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
CONDITION_LOCKER(locker, somethingReceived);
|
CONDITION_LOCKER(locker, somethingReceived);
|
||||||
result->fromError(errorCode, std::move(response));
|
result->fromError(errorCode, std::move(response));
|
||||||
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||||
if (doLogConnectionErrors) {
|
logConnectionError(doLogConnectionErrors, result.get(), connectTimeout, __LINE__);
|
||||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (3)'" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'" << connectTimeout;
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC(INFO, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (4)'" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'" << connectTimeout;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
somethingReceived.broadcast();
|
somethingReceived.broadcast();
|
||||||
};
|
};
|
||||||
|
@ -532,15 +516,7 @@ std::unique_ptr<ClusterCommResult> ClusterComm::syncRequest(
|
||||||
CONDITION_LOCKER(isen, cv);
|
CONDITION_LOCKER(isen, cv);
|
||||||
result->fromError(errorCode, std::move(response));
|
result->fromError(errorCode, std::move(response));
|
||||||
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||||
if (doLogConnectionErrors) {
|
logConnectionError(doLogConnectionErrors, result.get(), 0.0, __LINE__);
|
||||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (5)'" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'";
|
|
||||||
} else {
|
|
||||||
LOG_TOPIC(INFO, Logger::CLUSTER)
|
|
||||||
<< "cannot create connection to server (6)'" << result->serverID
|
|
||||||
<< "' at endpoint '" << result->endpoint << "'";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
wasSignaled = true;
|
wasSignaled = true;
|
||||||
cv.signal();
|
cv.signal();
|
||||||
|
@ -1365,3 +1341,18 @@ void ClusterCommThread::run() {
|
||||||
|
|
||||||
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread";
|
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief logs a connection error (backend unavailable)
|
||||||
|
void ClusterComm::logConnectionError(bool useErrorLogLevel, ClusterCommResult const* result, double timeout, int /*line*/) {
|
||||||
|
std::string msg = "cannot create connection to server";
|
||||||
|
if (!result->serverID.empty()) {
|
||||||
|
msg += ": '" + result->serverID + '\'';
|
||||||
|
}
|
||||||
|
msg += " at endpoint " + result->endpoint + "', timeout: " + std::to_string(timeout);
|
||||||
|
|
||||||
|
if (useErrorLogLevel) {
|
||||||
|
LOG_TOPIC(ERR, Logger::CLUSTER) << msg;
|
||||||
|
} else {
|
||||||
|
LOG_TOPIC(INFO, Logger::CLUSTER) << msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -681,13 +681,18 @@ class ClusterComm {
|
||||||
|
|
||||||
void cleanupAllQueues();
|
void cleanupAllQueues();
|
||||||
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief activeServerTickets for a list of servers
|
/// @brief activeServerTickets for a list of servers
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
std::vector<communicator::Ticket> activeServerTickets(std::vector<std::string> const& servers);
|
std::vector<communicator::Ticket> activeServerTickets(std::vector<std::string> const& servers);
|
||||||
|
|
||||||
|
private:
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief logs a connection error (backend unavailable)
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
static void logConnectionError(bool useErrorLogLevel, ClusterCommResult const* result, double timeout, int line);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief our background communications thread
|
/// @brief our background communications thread
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -52,6 +52,7 @@ ReplicationApplierConfiguration::ReplicationApplierConfiguration()
|
||||||
_idleMaxWaitTime(5 * 500 * 1000),
|
_idleMaxWaitTime(5 * 500 * 1000),
|
||||||
_initialSyncMaxWaitTime(300 * 1000 * 1000),
|
_initialSyncMaxWaitTime(300 * 1000 * 1000),
|
||||||
_autoResyncRetries(2),
|
_autoResyncRetries(2),
|
||||||
|
_maxPacketSize(512 * 1024 * 1024),
|
||||||
_sslProtocol(0),
|
_sslProtocol(0),
|
||||||
_skipCreateDrop(false),
|
_skipCreateDrop(false),
|
||||||
_autoStart(false),
|
_autoStart(false),
|
||||||
|
@ -82,6 +83,7 @@ void ReplicationApplierConfiguration::reset() {
|
||||||
_idleMaxWaitTime = 5 * 500 * 1000;
|
_idleMaxWaitTime = 5 * 500 * 1000;
|
||||||
_initialSyncMaxWaitTime = 300 * 1000 * 1000;
|
_initialSyncMaxWaitTime = 300 * 1000 * 1000;
|
||||||
_autoResyncRetries = 2;
|
_autoResyncRetries = 2;
|
||||||
|
_maxPacketSize = 512 * 1024 * 1024;
|
||||||
_sslProtocol = 0;
|
_sslProtocol = 0;
|
||||||
_skipCreateDrop = false;
|
_skipCreateDrop = false;
|
||||||
_autoStart = false;
|
_autoStart = false;
|
||||||
|
@ -133,6 +135,7 @@ void ReplicationApplierConfiguration::toVelocyPack(VPackBuilder& builder, bool i
|
||||||
builder.add("adaptivePolling", VPackValue(_adaptivePolling));
|
builder.add("adaptivePolling", VPackValue(_adaptivePolling));
|
||||||
builder.add("autoResync", VPackValue(_autoResync));
|
builder.add("autoResync", VPackValue(_autoResync));
|
||||||
builder.add("autoResyncRetries", VPackValue(_autoResyncRetries));
|
builder.add("autoResyncRetries", VPackValue(_autoResyncRetries));
|
||||||
|
builder.add("maxPacketSize", VPackValue(_maxPacketSize));
|
||||||
builder.add("includeSystem", VPackValue(_includeSystem));
|
builder.add("includeSystem", VPackValue(_includeSystem));
|
||||||
builder.add("requireFromPresent", VPackValue(_requireFromPresent));
|
builder.add("requireFromPresent", VPackValue(_requireFromPresent));
|
||||||
builder.add("verbose", VPackValue(_verbose));
|
builder.add("verbose", VPackValue(_verbose));
|
||||||
|
@ -346,6 +349,11 @@ ReplicationApplierConfiguration ReplicationApplierConfiguration::fromVelocyPack(
|
||||||
if (value.isNumber()) {
|
if (value.isNumber()) {
|
||||||
configuration._autoResyncRetries = value.getNumber<uint64_t>();
|
configuration._autoResyncRetries = value.getNumber<uint64_t>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
value = slice.get("maxPacketSize");
|
||||||
|
if (value.isNumber()) {
|
||||||
|
configuration._maxPacketSize = value.getNumber<uint64_t>();
|
||||||
|
}
|
||||||
|
|
||||||
// read the endpoint
|
// read the endpoint
|
||||||
value = slice.get("endpoint");
|
value = slice.get("endpoint");
|
||||||
|
|
|
@ -57,6 +57,7 @@ class ReplicationApplierConfiguration {
|
||||||
uint64_t _idleMaxWaitTime;
|
uint64_t _idleMaxWaitTime;
|
||||||
uint64_t _initialSyncMaxWaitTime;
|
uint64_t _initialSyncMaxWaitTime;
|
||||||
uint64_t _autoResyncRetries;
|
uint64_t _autoResyncRetries;
|
||||||
|
uint64_t _maxPacketSize;
|
||||||
uint32_t _sslProtocol;
|
uint32_t _sslProtocol;
|
||||||
bool _skipCreateDrop; /// shards/indexes/views are created by schmutz++
|
bool _skipCreateDrop; /// shards/indexes/views are created by schmutz++
|
||||||
bool _autoStart; /// start applier after server start
|
bool _autoStart; /// start applier after server start
|
||||||
|
|
|
@ -190,8 +190,8 @@ Connection::Connection(Syncer* syncer,
|
||||||
if (endpoint != nullptr) {
|
if (endpoint != nullptr) {
|
||||||
connection.reset(httpclient::GeneralClientConnection::factory(
|
connection.reset(httpclient::GeneralClientConnection::factory(
|
||||||
endpoint, applierConfig._requestTimeout, applierConfig._connectTimeout,
|
endpoint, applierConfig._requestTimeout, applierConfig._connectTimeout,
|
||||||
(size_t)applierConfig._maxConnectRetries,
|
static_cast<size_t>(applierConfig._maxConnectRetries),
|
||||||
(uint32_t)applierConfig._sslProtocol));
|
static_cast<uint32_t>(applierConfig._sslProtocol)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connection != nullptr) {
|
if (connection != nullptr) {
|
||||||
|
@ -217,9 +217,9 @@ Connection::Connection(Syncer* syncer,
|
||||||
} else {
|
} else {
|
||||||
params.setJwt(applierConfig._jwt);
|
params.setJwt(applierConfig._jwt);
|
||||||
}
|
}
|
||||||
|
params.setMaxPacketSize(applierConfig._maxPacketSize);
|
||||||
params.setLocationRewriter(syncer, &(syncer->rewriteLocation));
|
params.setLocationRewriter(syncer, &(syncer->rewriteLocation));
|
||||||
client.reset(new httpclient::SimpleHttpClient(connection, params));
|
client.reset(new httpclient::SimpleHttpClient(connection, params));
|
||||||
// client->checkForGlobalAbort(true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue