1
0
Fork 0

Bugfix 3.3: Import would not report communication errors (#9501)

* arangoimport was neither reporting nor stopping on communication errors. Add reporting of impacted lines for CSV imports.

* correct lines missing from initial backport
This commit is contained in:
Matthew Von-Maszewski 2019-07-18 15:34:42 +02:00 committed by GitHub
parent 9a0bd3222e
commit 9ff3aeacfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 54 deletions

View File

@ -1,7 +1,10 @@
v3.3.24 (XXXX-XX-XX)
--------------------
* changed several internal VelocyPack comparisons in the cluster to not check for bytewise identity,
* arangoimport would neither stop on nor report communication errors. Added reporting
of the CSV line numbers that are affected by such errors
* changed several internal VelocyPack comparisons in the cluster to not check for bytewise identity,
but for logical identity of objects
* fix handling 0 byte-WAL files in RocksDB engine when encryption is turned on. From now
@ -11,13 +14,13 @@ v3.3.24 (XXXX-XX-XX)
* Fixed parsing of ArangoDB config files with inlined comments. Previous versions didn't handle
line comments properly if they were appended to an otherwise valid option value.
For example, the comment in the line
max-total-wal-size = 1024000 # 1M
was not ignored and made part of the value. In the end, the value was interpreted as if
max-total-wal-size = 10240001000000
was specified.
@ -102,7 +105,7 @@ v3.3.22 (2019-02-01)
* data masking: better documentation, fixed default phone number,
changed default range to -100 and 100 for integer masking function
* added configurable masking of dumped data via `arangodump` tool to
* added configurable masking of dumped data via `arangodump` tool to
obfuscate exported sensitive data
* updated velocypack library

View File

@ -452,21 +452,24 @@ void ImportFeature::start() {
std::cout << std::endl;
// give information about import
if (ok) {
std::cout << "created: " << ih.getNumberCreated() << std::endl;
std::cout << "warnings/errors: " << ih.getNumberErrors() << std::endl;
std::cout << "updated/replaced: " << ih.getNumberUpdated() << std::endl;
std::cout << "ignored: " << ih.getNumberIgnored() << std::endl;
// give information about import (even if errors occur)
std::cout << "created: " << ih.getNumberCreated() << std::endl;
std::cout << "warnings/errors: " << ih.getNumberErrors() << std::endl;
std::cout << "updated/replaced: " << ih.getNumberUpdated() << std::endl;
std::cout << "ignored: " << ih.getNumberIgnored() << std::endl;
if (_typeImport == "csv" || _typeImport == "tsv") {
std::cout << "lines read: " << ih.getReadLines() << std::endl;
}
if (_typeImport == "csv" || _typeImport == "tsv") {
std::cout << "lines read: " << ih.getReadLines() << std::endl;
}
} else {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error message(s):";
for (std::string const& msg : ih.getErrorMessages()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << msg;
if (!ok) {
auto const& msgs = ih.getErrorMessages();
if (!msgs.empty()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error message(s):";
for (std::string const& msg : msgs) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << msg;
}
}
}
} catch (std::exception const& ex) {

View File

@ -815,7 +815,7 @@ void ImportHelper::sendCsvBuffer() {
SenderThread* t = findIdleSender();
if (t != nullptr) {
uint64_t tmp_length = _outputBuffer.length();
t->sendData(url, &_outputBuffer);
t->sendData(url, &_outputBuffer, _rowOffset +1, _rowsRead);
addPeriodByteCount(tmp_length + url.length());
}

View File

@ -47,6 +47,8 @@ SenderThread::SenderThread(std::unique_ptr<httpclient::SimpleHttpClient>&& clien
_hasError(false),
_idle(true),
_ready(false),
_lowLineNumber(0),
_highLineNumber(0),
_stats(stats) {}
SenderThread::~SenderThread() {
@ -62,7 +64,8 @@ void SenderThread::beginShutdown() {
guard.broadcast();
}
void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuffer* data) {
void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuffer* data,
size_t lowLine, size_t highLine) {
TRI_ASSERT(_idle && !_hasError);
_url = url;
_data.swap(data);
@ -70,12 +73,25 @@ void SenderThread::sendData(std::string const& url, arangodb::basics::StringBuff
// wake up the thread that may be waiting in run()
CONDITION_LOCKER(guard, _condition);
_idle = false;
_lowLineNumber = lowLine;
_highLineNumber = highLine;
guard.broadcast();
}
bool SenderThread::hasError() {
CONDITION_LOCKER(guard, _condition);
return _hasError;
bool retFlag = false;
{
// flag reset after read to prevent multiple reporting
// of errors in ImportHelper
CONDITION_LOCKER(guard, _condition);
retFlag = _hasError;
_hasError = false;
}
if (retFlag) {
beginShutdown();
}
return retFlag;
}
bool SenderThread::isReady() {
@ -134,6 +150,8 @@ void SenderThread::run() {
}
void SenderThread::handleResult(httpclient::SimpleHttpResult* result) {
bool haveBody = false;
if (result == nullptr) {
return;
}
@ -141,56 +159,70 @@ void SenderThread::handleResult(httpclient::SimpleHttpResult* result) {
std::shared_ptr<VPackBuilder> parsedBody;
try {
parsedBody = result->getBodyVelocyPack();
haveBody = true;
} catch (...) {
// No action required
return;
// no body, likely error situation
}
VPackSlice const body = parsedBody->slice();
// error details
VPackSlice const details = body.get("details");
if (haveBody) {
VPackSlice const body = parsedBody->slice();
if (details.isArray()) {
for (VPackSlice const& detail : VPackArrayIterator(details)) {
if (detail.isString()) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "" << detail.copyString();
// error details
VPackSlice const details = body.get("details");
if (details.isArray()) {
for (VPackSlice const& detail : VPackArrayIterator(details)) {
if (detail.isString()) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "" << detail.copyString();
}
}
}
}
{
// first update all the statistics
MUTEX_LOCKER(guard, _stats->_mutex);
// look up the "created" flag
_stats->_numberCreated +=
{
// first update all the statistics
MUTEX_LOCKER(guard, _stats->_mutex);
// look up the "created" flag
_stats->_numberCreated +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"created", 0);
// look up the "errors" flag
_stats->_numberErrors +=
// look up the "errors" flag
_stats->_numberErrors +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"errors", 0);
// look up the "updated" flag
_stats->_numberUpdated +=
// look up the "updated" flag
_stats->_numberUpdated +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"updated", 0);
// look up the "ignored" flag
_stats->_numberIgnored +=
// look up the "ignored" flag
_stats->_numberIgnored +=
arangodb::basics::VelocyPackHelper::getNumericValue<size_t>(body,
"ignored", 0);
}
// get the "error" flag. This returns a pointer, not a copy
if (arangodb::basics::VelocyPackHelper::getBooleanValue(body, "error", false)) {
// get the error message
VPackSlice const errorMessage = body.get("errorMessage");
if (errorMessage.isString()) {
_errorMessage = errorMessage.copyString();
}
// will trigger the waiting ImportHelper thread to cancel the import
// get the "error" flag. This returns a pointer, not a copy
if (arangodb::basics::VelocyPackHelper::getBooleanValue(body, "error", false)) {
// get the error message
VPackSlice const errorMessage = body.get("errorMessage");
if (errorMessage.isString()) {
_errorMessage = errorMessage.copyString();
}
// will trigger the waiting ImportHelper thread to cancel the import
_hasError = true;
}
} // if
if (!_hasError && !result->getHttpReturnMessage().empty() && !result->isComplete()) {
_errorMessage = result->getHttpReturnMessage();
if (0 != _lowLineNumber || 0 != _highLineNumber) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Error left import lines "
<< _lowLineNumber << " through "
<< _highLineNumber
<< " in unknown state";
} // if
_hasError = true;
}
} // if
}

View File

@ -55,7 +55,8 @@ class SenderThread : public arangodb::Thread {
/// @brief imports a delimited file
//////////////////////////////////////////////////////////////////////////////
void sendData(std::string const& url, basics::StringBuffer* sender);
void sendData(std::string const& url, basics::StringBuffer* sender,
size_t lowLine = 0, size_t highLine = 0);
bool hasError();
/// Ready to start sending
@ -80,6 +81,8 @@ class SenderThread : public arangodb::Thread {
bool _hasError;
bool _idle;
bool _ready;
size_t _lowLineNumber;
size_t _highLineNumber;
ImportStatistics* _stats;
std::string _errorMessage;