1
0
Fork 0

Fix timeouts for write operations from coordinator to leader. (#7081)

* Improve logging on coordinator when doing `arangorestore`.

* Return more error information in `mergeResults`.

* Longer timeout for communication coordinator -> leader for writes.

This is taking into account possible write stops from followers needed
to get in sync.

* Fix compilation.

* Get rid of numbers in exception log messages.

* Fix compilation.

* Fix indentation.
This commit is contained in:
Max Neunhöffer 2018-10-31 14:39:48 +01:00 committed by GitHub
parent 8225003861
commit 42fd0825ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 6 deletions

View File

@ -53,8 +53,14 @@
using namespace arangodb::basics;
using namespace arangodb::rest;
// Timeout for read operations:
static double const CL_DEFAULT_TIMEOUT = 120.0;
// Timeout for write operations, note that these are used for communication
// with a shard leader and we always have to assume that some follower has
// stopped writes for some time to get in sync:
static double const CL_DEFAULT_LONG_TIMEOUT = 900.0;
namespace {
template<typename T>
T addFigures(VPackSlice const& v1, VPackSlice const& v2, std::vector<std::string> const& attr) {
@ -231,7 +237,12 @@ static void mergeResults(
arr.get(StaticStrings::Error).isBoolean() && arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
THROW_ARANGO_EXCEPTION(res);
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody->add(arr.at(pair.second));
}
@ -1186,7 +1197,7 @@ Result createDocumentOnCoordinator(
// Perform the requests
size_t nrDone = 0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
// Now listen to the results:
if (!useMultiple) {
@ -1350,7 +1361,7 @@ int deleteDocumentOnCoordinator(
// Perform the requests
size_t nrDone = 0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
// Now listen to the results:
if (!useMultiple) {
@ -1400,7 +1411,7 @@ int deleteDocumentOnCoordinator(
// Perform the requests
size_t nrDone = 0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
// Now listen to the results:
if (!useMultiple) {
@ -2359,7 +2370,7 @@ int modifyDocumentOnCoordinator(
// Perform the requests
size_t nrDone = 0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
// Now listen to the results:
if (!useMultiple) {
@ -2415,7 +2426,7 @@ int modifyDocumentOnCoordinator(
// Perform the requests
size_t nrDone = 0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, nrDone, Logger::COMMUNICATION, true);
// Now listen to the results:
if (!useMultiple) {

View File

@ -1372,16 +1372,35 @@ Result RestReplicationHandler::processRestoreDataBatch(
options.ignoreRevs = true;
options.isRestore = true;
options.waitForSync = false;
double startTime = TRI_microtime();
OperationResult opRes =
trx.remove(collectionName, oldBuilder.slice(), options);
double duration = TRI_microtime() - startTime;
if (opRes.fail()) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not delete " << oldBuilder.slice().length()
<< " documents for restore: "
<< opRes.result.errorMessage();
return opRes.result;
}
if (duration > 30) {
LOG_TOPIC(INFO, Logger::PERFORMANCE) << "Restored/deleted "
<< oldBuilder.slice().length() << " documents in time: " << duration
<< " seconds.";
}
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not delete documents for restore exception: "
<< ex.what();
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not delete documents for restore exception: "
<< ex.what();
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not delete documents for restore exception.";
return Result(TRI_ERROR_INTERNAL);
}
@ -1436,15 +1455,34 @@ Result RestReplicationHandler::processRestoreDataBatch(
options.isRestore = true;
options.waitForSync = false;
options.overwrite = true;
double startTime = TRI_microtime();
opRes = trx.insert(collectionName, requestSlice, options);
double duration = TRI_microtime() - startTime;
if (opRes.fail()) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not insert " << requestSlice.length()
<< " documents for restore: "
<< opRes.result.errorMessage();
return opRes.result;
}
if (duration > 30) {
LOG_TOPIC(INFO, Logger::PERFORMANCE) << "Restored/inserted "
<< requestSlice.length() << " documents in time: " << duration
<< " seconds.";
}
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not insert documents for restore exception: "
<< ex.what();
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not insert documents for restore exception: "
<< ex.what();
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not insert documents for restore exception.";
return Result(TRI_ERROR_INTERNAL);
}
@ -1490,15 +1528,34 @@ Result RestReplicationHandler::processRestoreDataBatch(
options.ignoreRevs = true;
options.isRestore = true;
options.waitForSync = false;
double startTime = TRI_microtime();
opRes = trx.replace(collectionName, builder.slice(), options);
double duration = TRI_microtime() - startTime;
if (opRes.fail()) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not replace " << builder.slice().length()
<< " documents for restore: "
<< opRes.result.errorMessage();
return opRes.result;
}
if (duration > 30) {
LOG_TOPIC(INFO, Logger::PERFORMANCE) << "Restored/replaced "
<< builder.slice().length() << " documents in time: " << duration
<< " seconds.";
}
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not replace documents for restore exception: "
<< ex.what();
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not replace documents for restore exception: "
<< ex.what();
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not replace documents for restore exception.";
return Result(TRI_ERROR_INTERNAL);
}