1
0
Fork 0

Bugfix 3.4: Import would not report communication errors (#9499)

* arangoimport was neither reporting nor stopping on communications errors.  Add reporting of impacted lines for CSV imports.

* add fix notes to CHANGELOG

* correct warning from Windows build about size_t to int conversion
This commit is contained in:
Matthew Von-Maszewski 2019-07-18 13:45:09 +02:00 committed by GitHub
parent 94a84b6233
commit d8955bbb29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 46 deletions

View File

@ -1,6 +1,9 @@
v3.4.8 (XXXX-XX-XX) v3.4.8 (XXXX-XX-XX)
------------------- -------------------
* arangoimport would not stop, much less report, communications errors. Add CSV reporting
of line numbers that are impacted during such errors
* Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query in a cluster. * Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query in a cluster.
Only occurs with views in the query. Only occurs with views in the query.

View File

@ -502,18 +502,17 @@ void ImportFeature::start() {
std::cout << std::endl; std::cout << std::endl;
// give information about import // give information about import (even if errors occur)
if (ok) { std::cout << "created: " << ih.getNumberCreated() << std::endl;
std::cout << "created: " << ih.getNumberCreated() << std::endl; std::cout << "warnings/errors: " << ih.getNumberErrors() << std::endl;
std::cout << "warnings/errors: " << ih.getNumberErrors() << std::endl; std::cout << "updated/replaced: " << ih.getNumberUpdated() << std::endl;
std::cout << "updated/replaced: " << ih.getNumberUpdated() << std::endl; std::cout << "ignored: " << ih.getNumberIgnored() << std::endl;
std::cout << "ignored: " << ih.getNumberIgnored() << std::endl;
if (_typeImport == "csv" || _typeImport == "tsv") { if (_typeImport == "csv" || _typeImport == "tsv") {
std::cout << "lines read: " << ih.getReadLines() << std::endl; std::cout << "lines read: " << ih.getReadLines() << std::endl;
} }
} else { if (!ok) {
auto const& msgs = ih.getErrorMessages(); auto const& msgs = ih.getErrorMessages();
if (!msgs.empty()) { if (!msgs.empty()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error message(s):"; LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error message(s):";

View File

@ -858,7 +858,7 @@ void ImportHelper::sendCsvBuffer() {
SenderThread* t = findIdleSender(); SenderThread* t = findIdleSender();
if (t != nullptr) { if (t != nullptr) {
uint64_t tmp_length = _outputBuffer.length(); uint64_t tmp_length = _outputBuffer.length();
t->sendData(url, &_outputBuffer); t->sendData(url, &_outputBuffer, _rowOffset +1, _rowsRead);
addPeriodByteCount(tmp_length + url.length()); addPeriodByteCount(tmp_length + url.length());
} }

View File

@ -50,6 +50,8 @@ SenderThread::SenderThread(std::unique_ptr<httpclient::SimpleHttpClient> client,
_hasError(false), _hasError(false),
_idle(true), _idle(true),
_ready(false), _ready(false),
_lowLineNumber(0),
_highLineNumber(0),
_stats(stats) {} _stats(stats) {}
SenderThread::~SenderThread() { shutdown(); } SenderThread::~SenderThread() { shutdown(); }
@ -62,7 +64,8 @@ void SenderThread::beginShutdown() {
guard.broadcast(); guard.broadcast();
} }
void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuffer* data) { void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuffer* data,
size_t lowLine, size_t highLine) {
TRI_ASSERT(_idle && !_hasError); TRI_ASSERT(_idle && !_hasError);
_url = url; _url = url;
_data.swap(data); _data.swap(data);
@ -70,12 +73,25 @@ void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuff
// wake up the thread that may be waiting in run() // wake up the thread that may be waiting in run()
CONDITION_LOCKER(guard, _condition); CONDITION_LOCKER(guard, _condition);
_idle = false; _idle = false;
_lowLineNumber = lowLine;
_highLineNumber = highLine;
guard.broadcast(); guard.broadcast();
} }
bool SenderThread::hasError() { bool SenderThread::hasError() {
CONDITION_LOCKER(guard, _condition); bool retFlag = false;
return _hasError; {
// flag reset after read to prevent multiple reporting
// of errors in ImportHelper
CONDITION_LOCKER(guard, _condition);
retFlag = _hasError;
_hasError = false;
}
if (retFlag) {
beginShutdown();
}
return retFlag;
} }
bool SenderThread::isReady() { bool SenderThread::isReady() {
@ -138,6 +154,8 @@ void SenderThread::run() {
} }
void SenderThread::handleResult(httpclient::SimpleHttpResult* result) { void SenderThread::handleResult(httpclient::SimpleHttpResult* result) {
bool haveBody = false;
if (result == nullptr) { if (result == nullptr) {
return; return;
} }
@ -145,56 +163,70 @@ void SenderThread::handleResult(httpclient::SimpleHttpResult* result) {
std::shared_ptr<VPackBuilder> parsedBody; std::shared_ptr<VPackBuilder> parsedBody;
try { try {
parsedBody = result->getBodyVelocyPack(); parsedBody = result->getBodyVelocyPack();
haveBody = true;
} catch (...) { } catch (...) {
// No action required // no body, likely error situation
return;
} }
VPackSlice const body = parsedBody->slice();
// error details if (haveBody) {
VPackSlice const details = body.get("details"); VPackSlice const body = parsedBody->slice();
if (details.isArray()) { // error details
for (VPackSlice const& detail : VPackArrayIterator(details)) { VPackSlice const details = body.get("details");
if (detail.isString()) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "" << detail.copyString(); if (details.isArray()) {
for (VPackSlice const& detail : VPackArrayIterator(details)) {
if (detail.isString()) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "" << detail.copyString();
}
} }
} }
}
{ {
// first update all the statistics // first update all the statistics
MUTEX_LOCKER(guard, _stats->_mutex); MUTEX_LOCKER(guard, _stats->_mutex);
// look up the "created" flag // look up the "created" flag
_stats->_numberCreated += _stats->_numberCreated +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body, arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"created", 0); "created", 0);
// look up the "errors" flag // look up the "errors" flag
_stats->_numberErrors += _stats->_numberErrors +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body, arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"errors", 0); "errors", 0);
// look up the "updated" flag // look up the "updated" flag
_stats->_numberUpdated += _stats->_numberUpdated +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body, arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"updated", 0); "updated", 0);
// look up the "ignored" flag // look up the "ignored" flag
_stats->_numberIgnored += _stats->_numberIgnored +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body, arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"ignored", 0); "ignored", 0);
}
// get the "error" flag. This returns a pointer, not a copy
if (arangodb::basics::VelocyPackHelper::getBooleanValue(body, "error", false)) {
// get the error message
VPackSlice const errorMessage = body.get("errorMessage");
if (errorMessage.isString()) {
_errorMessage = errorMessage.copyString();
} }
// will trigger the waiting ImportHelper thread to cancel the import // get the "error" flag. This returns a pointer, not a copy
if (arangodb::basics::VelocyPackHelper::getBooleanValue(body, "error", false)) {
// get the error message
VPackSlice const errorMessage = body.get("errorMessage");
if (errorMessage.isString()) {
_errorMessage = errorMessage.copyString();
}
// will trigger the waiting ImportHelper thread to cancel the import
_hasError = true;
}
} // if
if (!_hasError && !result->getHttpReturnMessage().empty() && !result->isComplete()) {
_errorMessage = result->getHttpReturnMessage();
if (0 != _lowLineNumber || 0 != _highLineNumber) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Error left import lines "
<< _lowLineNumber << " through "
<< _highLineNumber
<< " in unknown state";
} // if
_hasError = true; _hasError = true;
} } // if
} }

View File

@ -55,7 +55,8 @@ class SenderThread final : public arangodb::Thread {
/// @brief imports a delimited file /// @brief imports a delimited file
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void sendData(std::string const& url, basics::StringBuffer* sender); void sendData(std::string const& url, basics::StringBuffer* sender,
size_t lowLine = 0, size_t highLine = 0);
bool hasError(); bool hasError();
/// Ready to start sending /// Ready to start sending
@ -80,6 +81,8 @@ class SenderThread final : public arangodb::Thread {
bool _hasError; bool _hasError;
bool _idle; bool _idle;
bool _ready; bool _ready;
size_t _lowLineNumber;
size_t _highLineNumber;
ImportStatistics* _stats; ImportStatistics* _stats;
std::string _errorMessage; std::string _errorMessage;