diff --git a/arangosh/Import/ImportHelper.cpp b/arangosh/Import/ImportHelper.cpp index 4f14b51b75..0dc1ada99f 100644 --- a/arangosh/Import/ImportHelper.cpp +++ b/arangosh/Import/ImportHelper.cpp @@ -168,6 +168,19 @@ ImportHelper::ImportHelper(ClientFeature const* client, _senderThreads.emplace_back(new SenderThread(std::move(http), &_stats)); _senderThreads.back()->start(); } + + // wait until all sender threads are ready + while (true) { + uint32_t numReady = 0; + for (auto const& t : _senderThreads) { + if (t->isReady()) { + numReady++; + } + } + if (numReady == _senderThreads.size()) { + break; + } + } } ImportHelper::~ImportHelper() { @@ -821,7 +834,7 @@ SenderThread* ImportHelper::findSender() { _hasError = true; _errorMessage = t->errorMessage(); return nullptr; - } else if (t->idle()) { + } else if (t->isIdle()) { return t.get(); } } @@ -834,7 +847,7 @@ void ImportHelper::waitForSenders() { while (!_senderThreads.empty()) { uint32_t numIdle = 0; for (auto const& t : _senderThreads) { - if (t->idle() || t->hasError()) { + if (t->isDone()) { numIdle++; } } diff --git a/arangosh/Import/SenderThread.cpp b/arangosh/Import/SenderThread.cpp index bf93ae7602..ea3493eebd 100644 --- a/arangosh/Import/SenderThread.cpp +++ b/arangosh/Import/SenderThread.cpp @@ -46,6 +46,7 @@ SenderThread::SenderThread( _data(TRI_UNKNOWN_MEM_ZONE, false), _hasError(false), _idle(true), + _ready(false), _stats(stats) {} SenderThread::~SenderThread() { @@ -73,31 +74,52 @@ void SenderThread::sendData(std::string const& url, guard.broadcast(); } +bool SenderThread::hasError() { + CONDITION_LOCKER(guard, _condition); + return _hasError; +} + +bool SenderThread::isReady() { + CONDITION_LOCKER(guard, _condition); + return _ready; +} + +bool SenderThread::isIdle() { + CONDITION_LOCKER(guard, _condition); + return _idle; +} + +bool SenderThread::isDone() { + CONDITION_LOCKER(guard, _condition); + return _idle || _hasError; +} + void SenderThread::run() { while (!isStopping() && !_hasError) { { CONDITION_LOCKER(guard, _condition); - guard.wait(); + _ready = true; + if (_idle) { + guard.wait(); + } } if (isStopping()) { - CONDITION_LOCKER(guard, _condition); - _idle = true; - return; + break; } try { if (_data.length() > 0) { TRI_ASSERT(!_idle && !_url.empty()); - std::unordered_map headerFields; std::unique_ptr result( _client->request(rest::RequestType::POST, _url, _data.c_str(), - _data.length(), headerFields)); + _data.length())); handleResult(result.get()); _url.clear(); _data.reset(); } + CONDITION_LOCKER(guard, _condition); _idle = true; } catch (...) { @@ -106,6 +128,9 @@ void SenderThread::run() { _idle = true; } } + + CONDITION_LOCKER(guard, _condition); + TRI_ASSERT(_idle); } void SenderThread::handleResult(httpclient::SimpleHttpResult* result) { diff --git a/arangosh/Import/SenderThread.h b/arangosh/Import/SenderThread.h index 973d700b7e..2f094f54d8 100644 --- a/arangosh/Import/SenderThread.h +++ b/arangosh/Import/SenderThread.h @@ -57,9 +57,10 @@ class SenderThread : public arangodb::Thread { void sendData(std::string const& url, basics::StringBuffer* sender); - bool idle() const { return _idle; } - - bool hasError() const { return _hasError; } + bool hasError(); + bool isReady(); + bool isIdle(); + bool isDone(); std::string const& errorMessage() const { return _errorMessage; } @@ -73,8 +74,9 @@ class SenderThread : public arangodb::Thread { httpclient::SimpleHttpClient* _client; std::string _url; basics::StringBuffer _data; - bool _hasError = false; - bool _idle = true; + bool _hasError; + bool _idle; + bool _ready; ImportStatistics* _stats; std::string _errorMessage;