1
0
Fork 0

more detailed progress reporting for arangorestore (#6329)

This commit is contained in:
Jan 2018-09-03 15:39:39 +02:00 committed by jsteemann
parent 66cceb3c37
commit 54c3ad572c
3 changed files with 120 additions and 18 deletions

View File

@ -27,6 +27,9 @@
#include <velocypack/velocypack-aliases.h>
#include <boost/algorithm/clamp.hpp>
#include <chrono>
#include <thread>
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/FileUtils.h"
#include "Basics/Result.h"
@ -437,14 +440,18 @@ arangodb::Result restoreData(arangodb::httpclient::SimpleHttpClient& httpClient,
return result;
}
}
int64_t const fileSize = TRI_SizeFile(datafile->path().c_str());
if (jobData.options.progress) {
LOG_TOPIC(INFO, Logger::RESTORE) << "# Loading data into " << collectionType
<< " collection '" << cname << "'...";
<< " collection '" << cname << "', data size: " << fileSize << " byte(s)";
}
buffer.clear();
int64_t numReadForThisCollection = 0;
int64_t numReadSinceLastReport = 0;
buffer.clear();
while (true) {
if (buffer.reserve(16384) != TRI_ERROR_NO_ERROR) {
result = {TRI_ERROR_OUT_OF_MEMORY, "out of memory"};
@ -459,6 +466,8 @@ arangodb::Result restoreData(arangodb::httpclient::SimpleHttpClient& httpClient,
// we read something
buffer.increaseLength(numRead);
jobData.stats.totalRead += (uint64_t)numRead;
numReadForThisCollection += numRead;
numReadSinceLastReport += numRead;
if (buffer.length() < jobData.options.chunkSize && numRead > 0) {
continue; // still continue reading
@ -485,6 +494,8 @@ arangodb::Result restoreData(arangodb::httpclient::SimpleHttpClient& httpClient,
jobData.stats.totalBatches++;
result = ::sendRestoreData(httpClient, jobData.options, cname,
buffer.begin(), length);
jobData.stats.totalSent += length;
if (result.fail()) {
if (jobData.options.force) {
LOG_TOPIC(ERR, Logger::RESTORE) << result.errorMessage();
@ -495,6 +506,17 @@ arangodb::Result restoreData(arangodb::httpclient::SimpleHttpClient& httpClient,
}
buffer.erase_front(length);
if (jobData.options.progress &&
fileSize > 0 &&
numReadSinceLastReport > 1024 * 1024 * 8) {
// report every 8MB of transferred data
LOG_TOPIC(INFO, Logger::RESTORE) << "# Still loading data into " << collectionType
<< " collection '" << cname << "', "
<< numReadForThisCollection << " of " << fileSize
<< " byte(s) restored (" << int(100. * double(numReadForThisCollection) / double(fileSize)) << " %)";
numReadSinceLastReport = 0;
}
}
if (numRead == 0) { // EOF
@ -640,14 +662,16 @@ arangodb::Result processInputDirectory(
}
}
std::sort(collections.begin(), collections.end(), ::sortCollections);
LOG_TOPIC(INFO, Logger::RESTORE) << "# Creating views...";
// Step 2: recreate all views
for (VPackBuilder viewDefinition : views) {
LOG_TOPIC(DEBUG, Logger::RESTORE) << "# Creating view: " << viewDefinition.toJson();
Result res = ::restoreView(httpClient, options, viewDefinition.slice());
if (res.fail()) {
return res;
if (options.importStructure && !views.empty()) {
LOG_TOPIC(INFO, Logger::RESTORE) << "# Creating views...";
// Step 2: recreate all views
for (VPackBuilder const& viewDefinition : views) {
LOG_TOPIC(DEBUG, Logger::RESTORE) << "# Creating view: " << viewDefinition.toJson();
Result res = ::restoreView(httpClient, options, viewDefinition.slice());
if (res.fail()) {
return res;
}
}
}
@ -668,11 +692,45 @@ arangodb::Result processInputDirectory(
stats.totalCollections++;
// now let the index and data restoration happen async+parallel
jobQueue.queueJob(std::move(jobData));
if (!jobQueue.queueJob(std::move(jobData))) {
return Result(TRI_ERROR_OUT_OF_MEMORY, "unable to queue restore job");
}
}
// wait for all jobs to finish, then check for errors
if (options.progress) {
LOG_TOPIC(INFO, Logger::RESTORE) << "# Dispatched " << stats.totalCollections << " job(s) to " << options.threadCount << " worker(s)";
double start = TRI_microtime();
while (true) {
if (jobQueue.isQueueEmpty() && jobQueue.allWorkersIdle()) {
// done
break;
}
double now = TRI_microtime();
if (now - start >= 5.0) {
// returns #queued jobs, #workers total, #workers busy
auto queueStats = jobQueue.statistics();
// periodically report current status, but do not spam user
LOG_TOPIC(INFO, Logger::RESTORE)
<< "# Worker progress summary: restored " << stats.restoredCollections
<< " of " << stats.totalCollections << " collection(s), read " << stats.totalRead << " byte(s) from datafiles, "
<< "sent " << stats.totalBatches << " data batch(es) of " << stats.totalSent << " byte(s) total size"
<< ", queued jobs: " << std::get<0>(queueStats) << ", workers: " << std::get<1>(queueStats);
start = now;
}
// don't sleep for too long, as we want to quickly terminate
// when the queue gets empty
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
// should instantly return
jobQueue.waitForIdle();
Result firstError = feature.getFirstError();
if (firstError.fail()) {
return firstError;
@ -713,6 +771,20 @@ arangodb::Result processJob(arangodb::httpclient::SimpleHttpClient& httpClient,
return result;
}
}
++jobData.stats.restoredCollections;
if (jobData.options.progress) {
VPackSlice const parameters = jobData.collection.get("parameters");
std::string const cname = arangodb::basics::VelocyPackHelper::getStringValue(
parameters, "name", "");
int type = arangodb::basics::VelocyPackHelper::getNumericValue<int>(
parameters, "type", 2);
std::string const collectionType(type == 2 ? "document" : "edge");
LOG_TOPIC(INFO, arangodb::Logger::RESTORE)
<< "# Successfully restored " << collectionType << " collection '" << cname << "'";
}
return result;
}
@ -958,12 +1030,16 @@ void RestoreFeature::start() {
if (_options.progress) {
LOG_TOPIC(INFO, Logger::RESTORE)
<< "# Connected to ArangoDB '" << httpClient->getEndpointSpecification()
<< "Connected to ArangoDB '" << httpClient->getEndpointSpecification()
<< "'";
}
// set up threads and workers
_clientTaskQueue.spawnWorkers(_clientManager, _options.threadCount);
if (_options.progress) {
LOG_TOPIC(INFO, Logger::RESTORE) << "Using " << _options.threadCount << " worker thread(s)";
}
// run the actual restore
try {
@ -988,13 +1064,13 @@ void RestoreFeature::start() {
if (_options.importData) {
LOG_TOPIC(INFO, Logger::RESTORE)
<< "Processed " << _stats.totalCollections
<< " collection(s) in " << Logger::FIXED(totalTime, 6) << " s,"
<< "Processed " << _stats.restoredCollections
<< " collection(s) in " << Logger::FIXED(totalTime, 6) << " s, "
<< "read " << _stats.totalRead << " byte(s) from datafiles, "
<< "sent " << _stats.totalBatches << " batch(es)";
<< "sent " << _stats.totalBatches << " data batch(es) of " << _stats.totalSent << " byte(s) total size";
} else if (_options.importStructure) {
LOG_TOPIC(INFO, Logger::RESTORE)
<< "Processed " << _stats.totalCollections
<< "Processed " << _stats.restoredCollections
<< " collection(s) in " << Logger::FIXED(totalTime, 6) << " s";
}
}

View File

@ -97,7 +97,9 @@ class RestoreFeature final : public application_features::ApplicationFeature {
/// @brief Stores stats about the overall restore progress
struct Stats {
// count of data batches sent to the server (incremented before each sendRestoreData call)
std::atomic<uint64_t> totalBatches{0};
// total payload bytes handed to sendRestoreData across all batches
std::atomic<uint64_t> totalSent{0};
// number of collections dispatched as restore jobs
std::atomic<uint64_t> totalCollections{0};
// number of collections whose restore job has completed successfully so far
std::atomic<uint64_t> restoredCollections{0};
// total bytes read from the dump datafiles
std::atomic<uint64_t> totalRead{0};
};
@ -127,4 +129,4 @@ class RestoreFeature final : public application_features::ApplicationFeature {
} // namespace arangodb
#endif
#endif

View File

@ -107,6 +107,16 @@ class ClientTaskQueue {
* @return `true` if there are no pending jobs
*/
bool isQueueEmpty() const noexcept;
/**
* @brief Determines the number of currently queued jobs, the number
* of total workers and the number of busy workers
*
* Thread-safe.
*
* @return number of queued jobs, number of workers, number of busy workers
*/
std::tuple<size_t, size_t, size_t> statistics() const noexcept;
/**
* @brief Determines if all workers are currently busy processing a job
@ -237,6 +247,20 @@ inline bool ClientTaskQueue<JobData>::isQueueEmpty() const noexcept {
return isEmpty;
}
template <typename JobData>
/// @brief Returns a snapshot of the queue state under the jobs lock.
/// @return tuple of (number of queued jobs, total workers, busy workers)
inline std::tuple<size_t, size_t, size_t> ClientTaskQueue<JobData>::statistics() const noexcept {
  size_t busy = 0;
  size_t workers = 0;
  MUTEX_LOCKER(lock, _jobsLock);
  for (auto const& worker : _workers) {
    ++workers;
    // a worker counts as busy only when it is NOT idle; the original code
    // inverted this check and reported the idle count as "busy"
    if (!worker->isIdle()) {
      ++busy;
    }
  }
  return std::make_tuple(_jobs.size(), workers, busy);
}
template <typename JobData>
inline bool ClientTaskQueue<JobData>::allWorkersBusy() const noexcept {
try {
@ -298,7 +322,7 @@ inline void ClientTaskQueue<JobData>::waitForIdle() noexcept {
}
CONDITION_LOCKER(lock, _workersCondition);
lock.wait(std::chrono::milliseconds(500));
lock.wait(std::chrono::milliseconds(250));
}
} catch (...) {
}