1
0
Fork 0

fix threading problems in arangoimp

This commit is contained in:
jsteemann 2017-06-02 13:58:18 +02:00
parent 33c577dd80
commit a01716a2af
3 changed files with 53 additions and 13 deletions

View File

@ -168,6 +168,19 @@ ImportHelper::ImportHelper(ClientFeature const* client,
_senderThreads.emplace_back(new SenderThread(std::move(http), &_stats));
_senderThreads.back()->start();
}
// wait until all sender threads are ready
while (true) {
uint32_t numReady = 0;
for (auto const& t : _senderThreads) {
if (t->isReady()) {
numReady++;
}
}
if (numReady == _senderThreads.size()) {
break;
}
}
}
ImportHelper::~ImportHelper() {
@ -821,7 +834,7 @@ SenderThread* ImportHelper::findSender() {
_hasError = true;
_errorMessage = t->errorMessage();
return nullptr;
} else if (t->idle()) {
} else if (t->isIdle()) {
return t.get();
}
}
@ -834,7 +847,7 @@ void ImportHelper::waitForSenders() {
while (!_senderThreads.empty()) {
uint32_t numIdle = 0;
for (auto const& t : _senderThreads) {
if (t->idle() || t->hasError()) {
if (t->isDone()) {
numIdle++;
}
}

View File

@ -46,6 +46,7 @@ SenderThread::SenderThread(
_data(TRI_UNKNOWN_MEM_ZONE, false),
_hasError(false),
_idle(true),
_ready(false),
_stats(stats) {}
SenderThread::~SenderThread() {
@ -73,31 +74,52 @@ void SenderThread::sendData(std::string const& url,
guard.broadcast();
}
bool SenderThread::hasError() {
CONDITION_LOCKER(guard, _condition);
return _hasError;
}
bool SenderThread::isReady() {
CONDITION_LOCKER(guard, _condition);
return _ready;
}
bool SenderThread::isIdle() {
CONDITION_LOCKER(guard, _condition);
return _idle;
}
bool SenderThread::isDone() {
CONDITION_LOCKER(guard, _condition);
return _idle || _hasError;
}
void SenderThread::run() {
while (!isStopping() && !_hasError) {
{
CONDITION_LOCKER(guard, _condition);
guard.wait();
_ready = true;
if (_idle) {
guard.wait();
}
}
if (isStopping()) {
CONDITION_LOCKER(guard, _condition);
_idle = true;
return;
break;
}
try {
if (_data.length() > 0) {
TRI_ASSERT(!_idle && !_url.empty());
std::unordered_map<std::string, std::string> headerFields;
std::unique_ptr<httpclient::SimpleHttpResult> result(
_client->request(rest::RequestType::POST, _url, _data.c_str(),
_data.length(), headerFields));
_data.length()));
handleResult(result.get());
_url.clear();
_data.reset();
}
CONDITION_LOCKER(guard, _condition);
_idle = true;
} catch (...) {
@ -106,6 +128,9 @@ void SenderThread::run() {
_idle = true;
}
}
CONDITION_LOCKER(guard, _condition);
TRI_ASSERT(_idle);
}
void SenderThread::handleResult(httpclient::SimpleHttpResult* result) {

View File

@ -57,9 +57,10 @@ class SenderThread : public arangodb::Thread {
void sendData(std::string const& url, basics::StringBuffer* sender);
bool idle() const { return _idle; }
bool hasError() const { return _hasError; }
bool hasError();
bool isReady();
bool isIdle();
bool isDone();
std::string const& errorMessage() const { return _errorMessage; }
@ -73,8 +74,9 @@ class SenderThread : public arangodb::Thread {
httpclient::SimpleHttpClient* _client;
std::string _url;
basics::StringBuffer _data;
bool _hasError = false;
bool _idle = true;
bool _hasError;
bool _idle;
bool _ready;
ImportStatistics* _stats;
std::string _errorMessage;