1
0
Fork 0
arangodb/arangosh/Import/AutoTuneThread.cpp

142 lines
4.7 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Matthew Von-Maszewski
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <thread>
#include "AutoTuneThread.h"
#include "ImportFeature.h"
#include "ImportHelper.h"
#include "Basics/ConditionLocker.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
using namespace arangodb;
using namespace arangodb::import;
////////////////////////////////////////////////////////////////////////////////
// Goals:
// 1. compute current one second throughput of import
// 2. spread byte count of one second throughput across sender threads
// 3. create "space" between sender execution to give server time for other
// activities
//
// The code collects the total count of bytes absorbed for ten seconds, then
// averages
// that amount with the total from the previous 10 seconds. The per second per
// thread pace is therefore average divided by the thread count divided by 10.
//
// The pace starts "slow", 1 megabyte per second. Each recalculation of pace
// adds
// a 20% growth factor above the actual calculation from average bytes
// consumed.
//
// The pacing code also notices when threads are completing quickly. It will
// release
// a new thread early in such cases to again encourage rate growth.
//
////////////////////////////////////////////////////////////////////////////////
AutoTuneThread::AutoTuneThread(application_features::ApplicationServer& server,
ImportHelper& importHelper)
: Thread(server, "AutoTuneThread"),
_importHelper(importHelper),
_nextSend(std::chrono::steady_clock::now()),
_pace(std::chrono::milliseconds(1000 / importHelper.getThreadCount())) {}
AutoTuneThread::~AutoTuneThread() { shutdown(); }
void AutoTuneThread::beginShutdown() {
Thread::beginShutdown();
// wake up the thread that may be waiting in run()
CONDITION_LOCKER(guard, _condition);
guard.broadcast();
}
void AutoTuneThread::run() {
while (!isStopping()) {
{
CONDITION_LOCKER(guard, _condition);
guard.wait(std::chrono::seconds(10));
}
if (!isStopping()) {
// getMaxUploadSize() is per thread
uint64_t current_max = _importHelper.getMaxUploadSize();
current_max *= _importHelper.getThreadCount();
uint64_t ten_second_actual = _importHelper.rotatePeriodByteCount();
uint64_t new_max;
// is current_max way too big
if (ten_second_actual < current_max && 10 < ten_second_actual) {
new_max = ten_second_actual / 10;
} else if (ten_second_actual <= 10) {
new_max = current_max / 10;
} else {
new_max = (current_max + ten_second_actual / 10) / 2;
}
// grow number slowly if possible (20%)
new_max += new_max / 5;
// make "per thread"
new_max /= _importHelper.getThreadCount();
// notes in Import mention an internal limit of 768MBytes
if ((arangodb::import::ImportHelper::MaxBatchSize) < new_max) {
new_max = arangodb::import::ImportHelper::MaxBatchSize;
}
LOG_TOPIC("e815e", DEBUG, arangodb::Logger::FIXME)
<< "Current: " << current_max << ", ten_sec: " << ten_second_actual
<< ", new_max: " << new_max;
_importHelper.setMaxUploadSize(new_max);
}
}
}
void AutoTuneThread::paceSends() {
auto now = std::chrono::steady_clock::now();
bool next_reset(false);
// has _nextSend time_point already passed?
// if so, move to next increment of _pace to force wait
while (_nextSend <= now) {
_nextSend += _pace;
next_reset = true;
}
std::this_thread::sleep_until(_nextSend);
// if the previous send thread thread was found really quickly,
// assume arangodb is absorbing data faster than current rate.
// try doubling rate by halfing pace time for subsequent send.
if (!next_reset && _pace / 2 < _nextSend - now)
_nextSend = _nextSend + _pace / 2;
else
_nextSend = _nextSend + _pace;
} // AutoTuneThread::paceSends