
Add pacing logic to arangoimp (#5238)

* initial check-in of working auto block size tuning for import.  Needs clean-up.
* partial fix in this branch to enable testing, awaiting better fix in different branch.
* move pacing code from findIdleSender to AutoTuneThread object.
* move pacing code from findIdleSender to AutoTuneThread object.  clean up dead code.  add comments about algorithm.
* edits to algorithm discussion
* correct logging level.  add CHANGELOG entry.
* update comment concerning proper usage of shutdown()
* initialize new static member MaxBatchSize.  Correct sendCsvBuffer() call to addPeriodByteCount() since _outputBuffer drained by sendData().
* remove redundant semicolons.  add new static member MaxBatchSize.
* move MaxBatchSize to ImportHelper so both ImportFeature object and AutoTuneThread object can access it.
* minor updates to comments and statics
* add pacing discussion to import manual page
* minor edit
Matthew Von-Maszewski 2018-05-02 16:34:04 -04:00 committed by Max Neunhöffer
parent 01f3223c06
commit 9ff6a41236
10 changed files with 318 additions and 25 deletions

@ -1,5 +1,7 @@
devel
-----
* PR #5238: Create a default pacing algorithm for arangoimport to avoid TimeoutErrors
  on VMs with limited disk throughput
* Starting with coordinators and DB servers with different storage engines is unsupported.
  Doing it anyway will now produce a warning on startup

@ -90,7 +90,7 @@ be inhomogeneous.
Please note that by default, _arangoimport_ will import data into the specified
collection in the default database (*_system*). To specify a different database,
use the *--server.database* option when invoking _arangoimport_. If you want to
import into a nonexistent database you need to pass *--create-database true*.
Note *--create-database* defaults to *false*
@ -165,7 +165,7 @@ data records and will be imported.
The CSV import requires the data to have a homogeneous structure. All records
must have exactly the same amount of columns as there are headers. By default,
lines with a different number of values will not be imported and there will be
warnings for them. To still import lines with less values than in the header,
there is the *--ignore-missing* option. If set to true, lines that have a
different amount of fields will be imported. In this case only those attributes
@ -450,3 +450,43 @@ Importing the following document will then create an edge between *users/1234* a
```js
{ "_from" : "1234", "_to" : "4321", "desc" : "users/1234 is connected to products/4321" }
```

### Arangoimport with busy or low throughput disk subsystems

Arangoimport has an automatic pacing algorithm that limits how fast
data is sent to the ArangoDB servers. This pacing algorithm exists to
prevent the import operation from failing due to slow responses.

Google Compute and other VM providers limit the throughput of disk
devices. Google's limit is stricter for smaller disk rentals than for
larger ones. Specifically, a user could choose the smallest disk space
and be limited to 3 Mbytes per second.

The pacing algorithm adjusts the transmit block size dynamically based
upon the actual throughput of the server over the last 20
seconds. Further, each thread delivers its portion of the data in
mostly non-overlapping chunks. The thread timing creates intentional
windows of non-import activity to allow the server extra time for meta
operations.

The pacing algorithm works successfully with mmfiles on disks limited
to read and write throughput as small as 1 Mbyte per second, and with
rocksdb on disks limited to throughput as small as 3 Mbytes per second.

This algorithm will slow the import on an unlimited (really fast) disk
array by almost 25%. Raising the number of threads via the *--threads X*
command-line option to any value of X greater than 2 resolves the
performance loss.

The pacing algorithm is disabled by manually specifying any
*--batch-size* value. The previous default for *--batch-size* was
16777216 (16 Mbytes).

Arangoimport previously defaulted to a transmit block size of 16
Mbytes per thread, sent as often as possible (with a default of 2
threads). The transmitted data would back up on the server and result
in a TimeoutError. The TimeoutError is very frustrating to new users,
who were required to learn how to lower the default block size and
thread count ... and even then the new parameters were not guaranteed
to work.
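
A minimal sketch of how these options combine on the command line (the
database, collection, and file names below are hypothetical):

```bash
# default invocation: automatic pacing is active (2 threads, dynamic batch size)
arangoimport --server.database mydb --collection users --file data.csv --type csv

# fast disk array: keep pacing, but raise the thread count to recover the ~25% overhead
arangoimport --server.database mydb --collection users --file data.csv --type csv --threads 4

# disable pacing entirely by fixing the batch size (16777216 bytes was the previous default)
arangoimport --server.database mydb --collection users --file data.csv --type csv --batch-size 16777216
```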

@ -174,6 +174,7 @@ endif ()
add_executable(${BIN_ARANGOIMPORT}
${ProductVersionFiles_arangoimport}
${PROJECT_SOURCE_DIR}/lib/Basics/WorkMonitorDummy.cpp
Import/AutoTuneThread.cpp
Import/ImportFeature.cpp
Import/ImportHelper.cpp
Import/SenderThread.cpp
@ -283,6 +284,7 @@ endif ()
add_executable(${BIN_ARANGOSH}
${ProductVersionFiles_arangosh}
${PROJECT_SOURCE_DIR}/lib/Basics/WorkMonitorDummy.cpp
Import/AutoTuneThread.cpp
Import/ImportHelper.cpp
Import/SenderThread.cpp
Shell/ClientFeature.cpp

@ -0,0 +1,138 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Matthew Von-Maszewski
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <thread>
#include "AutoTuneThread.h"
#include "Basics/ConditionLocker.h"
#include "ImportFeature.h"
#include "ImportHelper.h"
using namespace arangodb;
using namespace arangodb::import;
////////////////////////////////////////////////////////////////////////////////
// Goals:
// 1. compute current one second throughput of import
// 2. spread byte count of one second throughput across sender threads
// 3. create "space" between sender execution to give server time for other activities
//
// The code collects the total count of bytes absorbed for ten seconds, then averages
// that amount with the total from the previous 10 seconds. The per second per thread pace
// is therefore average divided by the thread count divided by 10.
//
// The pace starts "slow", 1 megabyte per second. Each recalculation of pace adds
// a 20% growth factor above the actual calculation from average bytes consumed.
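//
// Illustrative example (hypothetical numbers, not measurements): with 2 sender
// threads and a current per-thread limit of 2 MB, the aggregate target is
// 4 MB per second.  If the server absorbed 60 MB during the last 10 seconds
// (6 MB per second), the recalculation averages the two, (4 MB + 6 MB) / 2 = 5 MB
// per second, grows it by 20% to 6 MB, and splits it across threads for a new
// per-thread limit of 3 MB.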
//
// The pacing code also notices when threads are completing quickly. It will release
// a new thread early in such cases to again encourage rate growth.
//
////////////////////////////////////////////////////////////////////////////////
AutoTuneThread::AutoTuneThread(ImportHelper & importHelper)
: Thread("AutoTuneThread"),
_importHelper(importHelper),
_nextSend(std::chrono::steady_clock::now()),
_pace(std::chrono::microseconds(1000000 / importHelper.getThreadCount()))
{}
AutoTuneThread::~AutoTuneThread() {
shutdown();
}
void AutoTuneThread::beginShutdown() {
Thread::beginShutdown();
// wake up the thread that may be waiting in run()
CONDITION_LOCKER(guard, _condition);
guard.broadcast();
}
void AutoTuneThread::run() {
while (!isStopping()) {
{
CONDITION_LOCKER(guard, _condition);
guard.wait(std::chrono::seconds(10));
}
if (!isStopping()) {
// getMaxUploadSize() is per thread
uint64_t current_max = _importHelper.getMaxUploadSize();
current_max *= _importHelper.getThreadCount();
uint64_t ten_second_actual = _importHelper.rotatePeriodByteCount();
uint64_t new_max = current_max;
// is current_max way too big
if (ten_second_actual < current_max && 10 < ten_second_actual) {
new_max = ten_second_actual / 10;
} else if ( ten_second_actual <= 10 ) {
new_max = current_max / 10;
} else {
new_max = (current_max + ten_second_actual / 10) / 2;
}
// grow number slowly if possible (20%)
new_max += new_max/5;
// make "per thread"
new_max /= _importHelper.getThreadCount();
// notes in Import mention an internal limit of 768MBytes
if ((arangodb::import::ImportHelper::MaxBatchSize) < new_max) {
new_max = arangodb::import::ImportHelper::MaxBatchSize;
}
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME)
<< "Current: " << current_max
<< ", ten_sec: " << ten_second_actual
<< ", new_max: " << new_max;
_importHelper.setMaxUploadSize(new_max);
}
}
}
void AutoTuneThread::paceSends() {
auto now = std::chrono::steady_clock::now();
bool next_reset(false);
// has _nextSend time_point already passed?
// if so, move to next increment of _pace to force wait
while(_nextSend <= now) {
_nextSend += _pace;
next_reset = true;
}
std::this_thread::sleep_until(_nextSend);
// if the previous send thread was found really quickly,
// assume arangodb is absorbing data faster than current rate.
// try doubling rate by halving pace time for subsequent send.
if (!next_reset && _pace/2 < _nextSend - now )
_nextSend = _nextSend + _pace/2;
else
_nextSend = _nextSend + _pace;
} // AutoTuneThread::paceSends

@ -0,0 +1,61 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Matthew Von-Maszewski
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_IMPORT_AUTOTUNE_THREAD_H
#define ARANGODB_IMPORT_AUTOTUNE_THREAD_H 1
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include "Logger/Logger.h"
namespace arangodb {
namespace import {
class ImportHelper;
class AutoTuneThread : public arangodb::Thread {
private:
AutoTuneThread(AutoTuneThread const&) = delete;
AutoTuneThread& operator=(AutoTuneThread const&) = delete;
public:
explicit AutoTuneThread(ImportHelper & importHelper);
~AutoTuneThread();
void beginShutdown() override;
void paceSends();
protected:
void run() override;
ImportHelper & _importHelper;
basics::ConditionVariable _condition;
std::chrono::steady_clock::time_point _nextSend;
std::chrono::microseconds _pace;
};
}
}
#endif

@ -47,7 +47,8 @@ ImportFeature::ImportFeature(application_features::ApplicationServer* server,
_filename(""),
_useBackslash(false),
_convert(true),
_chunkSize(1024 * 1024 * 16),
_autoChunkSize(true),
_chunkSize(1024 * 1024 * 1),
_threadCount(2),
_collectionName(""),
_fromCollectionPrefix(""),
@ -197,15 +198,16 @@ void ImportFeature::validateOptions(
FATAL_ERROR_EXIT();
}
static unsigned const MaxBatchSize = 768 * 1024 * 1024;
// _chunkSize is dynamic ... unless user explicitly sets it
_autoChunkSize = !options->processingResult().touched("--batch-size");
if (_chunkSize > MaxBatchSize) {
if (_chunkSize > arangodb::import::ImportHelper::MaxBatchSize) {
// it's not sensible to raise the batch size beyond this value
// because the server has a built-in limit for the batch size too
// and will reject bigger HTTP request bodies
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "capping --batch-size value to "
<< MaxBatchSize;
_chunkSize = MaxBatchSize;
<< arangodb::import::ImportHelper::MaxBatchSize;
_chunkSize = arangodb::import::ImportHelper::MaxBatchSize;
}
if (_threadCount < 1) {
@ -214,11 +216,12 @@ void ImportFeature::validateOptions(
<< 1;
_threadCount = 1;
}
if (_threadCount > TRI_numberProcessors()) {
// it's not sensible to use just one thread
if (_threadCount > TRI_numberProcessors()*2) {
// it's not sensible to use just one thread ...
// and import's CPU usage is negligible, real limit is cluster cores
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "capping --threads value to "
<< TRI_numberProcessors();
_threadCount = (uint32_t)TRI_numberProcessors();
<< TRI_numberProcessors()*2;
_threadCount = (uint32_t)TRI_numberProcessors()*2;
}
for (auto const& it : _translations) {
@ -295,7 +298,7 @@ void ImportFeature::start() {
auto versionString = _httpClient->getServerVersion(&err);
auto dbName = client->databaseName();
bool createdDatabase = false;
auto successfulConnection = [&](){
std::cout << "Connected to ArangoDB '"
<< _httpClient->getEndpointSpecification() << "', version "
@ -379,7 +382,7 @@ void ImportFeature::start() {
SimpleHttpClientParams params = _httpClient->params();
arangodb::import::ImportHelper ih(client, client->endpoint(), params,
_chunkSize, _threadCount);
_chunkSize, _threadCount, _autoChunkSize);
// create collection
if (_createCollection) {

@ -51,6 +51,7 @@ class ImportFeature final : public application_features::ApplicationFeature,
std::string _filename;
bool _useBackslash;
bool _convert;
bool _autoChunkSize;
uint64_t _chunkSize;
uint32_t _threadCount;
std::string _collectionName;

@ -46,6 +46,7 @@ using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::httpclient;
////////////////////////////////////////////////////////////////////////////////
/// @brief helper function to determine if a field value is an integer
/// this function is here to avoid usage of regexes, which are too slow
@ -135,6 +136,13 @@ namespace import {
double const ImportHelper::ProgressStep = 3.0;
////////////////////////////////////////////////////////////////////////////////
/// the server has a built-in limit for the batch size
/// and will reject bigger HTTP request bodies
////////////////////////////////////////////////////////////////////////////////
unsigned const ImportHelper::MaxBatchSize = 768 * 1024 * 1024;
////////////////////////////////////////////////////////////////////////////////
/// constructor and destructor
////////////////////////////////////////////////////////////////////////////////
@ -142,9 +150,12 @@ double const ImportHelper::ProgressStep = 3.0;
ImportHelper::ImportHelper(ClientFeature const* client,
std::string const& endpoint,
httpclient::SimpleHttpClientParams const& params,
uint64_t maxUploadSize, uint32_t threadCount)
uint64_t maxUploadSize, uint32_t threadCount, bool autoUploadSize)
: _httpClient(client->createHttpClient(endpoint, params)),
_maxUploadSize(maxUploadSize),
_periodByteCount(0),
_autoUploadSize(autoUploadSize),
_threadCount(threadCount),
_separator(","),
_quote("\""),
_createCollectionType("document"),
@ -175,8 +186,14 @@ ImportHelper::ImportHelper(ClientFeature const* client,
}));
_senderThreads.back()->start();
}
// wait until all sender threads are ready
// should self tuning code activate?
if (_autoUploadSize) {
_autoTuneThread.reset(new AutoTuneThread(*this));
_autoTuneThread->start();
} // if
// wait until all sender threads are ready
while (true) {
uint32_t numReady = 0;
for (auto const& t : _senderThreads) {
@ -306,7 +323,7 @@ bool ImportHelper::importDelimited(std::string const& collectionName,
waitForSenders();
reportProgress(totalLength, totalRead, nextProgress);
_outputBuffer.clear();
return !_hasError;
}
@ -432,7 +449,7 @@ bool ImportHelper::importJson(std::string const& collectionName,
waitForSenders();
reportProgress(totalLength, totalRead, nextProgress);
MUTEX_LOCKER(guard, _stats._mutex);
// this is an approximation only. _numberLines is more meaningful for CSV
// imports
@ -540,11 +557,11 @@ void ImportHelper::addField(char const* field, size_t fieldLength, size_t row,
_removeAttributes.find(_columnNames[column]) != _removeAttributes.end()) {
return;
}
if (column > 0) {
_lineBuffer.appendChar(',');
}
if (_keyColumn == -1 && row == _rowsToSkip && fieldLength == 4 &&
memcmp(field, "_key", 4) == 0) {
_keyColumn = column;
@ -805,7 +822,9 @@ void ImportHelper::sendCsvBuffer() {
SenderThread* t = findIdleSender();
if (t != nullptr) {
uint64_t tmp_length = _outputBuffer.length();
t->sendData(url, &_outputBuffer);
addPeriodByteCount(tmp_length + url.length());
}
_outputBuffer.reset();
@ -847,13 +866,21 @@ void ImportHelper::sendJsonBuffer(char const* str, size_t len, bool isObject) {
if (t != nullptr) {
StringBuffer buff(len, false);
buff.appendText(str, len);
uint64_t tmp_length = buff.length();
t->sendData(url, &buff);
addPeriodByteCount(tmp_length + url.length());
}
}
/// Should return an idle sender, collect all errors
/// and return nullptr, if there was an error
SenderThread* ImportHelper::findIdleSender() {
if (_autoUploadSize) {
_autoTuneThread->paceSends();
} // if
while (!_senderThreads.empty()) {
for (auto const& t : _senderThreads) {
if (t->hasError()) {

@ -25,6 +25,10 @@
#ifndef ARANGODB_IMPORT_IMPORT_HELPER_H
#define ARANGODB_IMPORT_IMPORT_HELPER_H 1
#include <atomic>
#include "AutoTuneThread.h"
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
@ -77,7 +81,7 @@ class ImportHelper {
public:
ImportHelper(ClientFeature const* client, std::string const& endpoint,
httpclient::SimpleHttpClientParams const& params,
uint64_t maxUploadSize, uint32_t threadCount);
uint64_t maxUploadSize, uint32_t threadCount, bool autoUploadSize=false);
~ImportHelper();
@ -157,7 +161,7 @@ class ImportHelper {
std::unordered_map<std::string, std::string> const& translations) {
_translations = translations;
}
void setRemoveAttributes(std::vector<std::string> const& attr) {
for (std::string const& str : attr) {
_removeAttributes.insert(str);
@ -245,6 +249,16 @@ class ImportHelper {
std::vector<std::string> getErrorMessages() { return _errorMessages; }
uint64_t getMaxUploadSize() {return(_maxUploadSize.load());}
void setMaxUploadSize(uint64_t newSize) {_maxUploadSize.store(newSize);}
uint64_t rotatePeriodByteCount() {return(_periodByteCount.exchange(0));}
void addPeriodByteCount(uint64_t add) {_periodByteCount.fetch_add(add);}
uint32_t getThreadCount() const {return _threadCount;}
static unsigned const MaxBatchSize;
private:
static void ProcessCsvBegin(TRI_csv_parser_t*, size_t);
static void ProcessCsvAdd(TRI_csv_parser_t*, char const*, size_t, size_t,
@ -270,9 +284,14 @@ class ImportHelper {
private:
std::unique_ptr<httpclient::SimpleHttpClient> _httpClient;
uint64_t const _maxUploadSize;
std::atomic<uint64_t> _maxUploadSize;
std::atomic<uint64_t> _periodByteCount;
bool const _autoUploadSize;
std::unique_ptr<AutoTuneThread> _autoTuneThread;
std::vector<std::unique_ptr<SenderThread>> _senderThreads;
basics::ConditionVariable _threadsCondition;
uint32_t const _threadCount;
basics::ConditionVariable _threadsCondition;
std::string _separator;
std::string _quote;

@ -252,7 +252,7 @@ void Thread::beginShutdown() {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief called from the destructor
/// @brief derived class MUST call from its destructor
////////////////////////////////////////////////////////////////////////////////
void Thread::shutdown() {