1
0
Fork 0

Refactor stuff, add async batch extension task (#6875) (#6880)

This commit is contained in:
Simon 2018-10-15 13:18:24 +02:00 committed by Jan
parent 239771b5f3
commit 6628a4e55a
19 changed files with 257 additions and 255 deletions

View File

@ -258,27 +258,17 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
"' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}
VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());
@ -486,29 +476,18 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
" for collection '" + coll->name() + "' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(rest::RequestType::PUT,
url, nullptr, 0));
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(
TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}
VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());
@ -679,31 +658,19 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
coll->name() + "' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(
rest::RequestType::PUT, url, keyJsonString.c_str(),
keyJsonString.size()));
if (response == nullptr || !response->isComplete()) {
return Result(
TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, keyJsonString.c_str(),
keyJsonString.size()));
});
if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(
TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}
VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());

View File

@ -421,10 +421,10 @@ void Conductor::startRecovery() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
// let's wait for a final state in the cluster
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
boost::posix_time::seconds(2)));
_boost_timer->async_wait([this](const asio::error_code& error) {
_boost_timer.reset();
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
_steady_timer->expires_after(std::chrono::seconds(2));
_steady_timer->async_wait([this](const asio::error_code& error) {
_steady_timer.reset();
if (error == asio::error::operation_aborted ||
_state != ExecutionState::RECOVERING) {

View File

@ -91,7 +91,7 @@ class Conductor {
double _startTimeSecs = 0;
double _computationStartTimeSecs = 0;
double _endTimeSecs = 0;
std::unique_ptr<asio::deadline_timer> _boost_timer;
std::unique_ptr<asio::steady_timer> _steady_timer;
bool _startGlobalStep();
int _initializeWorkers(std::string const& path, VPackSlice additional);

View File

@ -582,9 +582,9 @@ void Worker<V, E, M>::_continueAsync() {
int64_t milli =
_writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
// start next iteration in $milli mseconds.
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
boost::posix_time::millisec(milli)));
_boost_timer->async_wait([this](const asio::error_code& error) {
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
_steady_timer->expires_after(std::chrono::milliseconds(milli));
_steady_timer->async_wait([this](const asio::error_code& error) {
if (error != asio::error::operation_aborted) {
{ // swap these pointers atomically
MY_WRITE_LOCKER(guard, _cacheRWLock);
@ -599,7 +599,7 @@ void Worker<V, E, M>::_continueAsync() {
_conductorAggregators->aggregateValues(*_workerAggregators.get());
_workerAggregators->resetValues();
_startProcessing();
_boost_timer.reset();
_steady_timer.reset();
}
});
}

View File

@ -136,7 +136,7 @@ class Worker : public IWorker {
std::atomic<uint64_t> _nextGSSSendMessageCount;
/// if the worker has started sendng messages to the next GSS
std::atomic<bool> _requestedNextGSS;
std::unique_ptr<asio::deadline_timer> _boost_timer;
std::unique_ptr<asio::steady_timer> _steady_timer;
void _initializeMessageCaches();
void _initializeVertexContext(VertexContext<V, E, M>* ctx);

View File

@ -198,6 +198,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
if (r.fail()) {
return r;
}
startRecurringBatchExtension();
}
VPackSlice collections, views;
@ -259,8 +261,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
}
}
/// @brief returns the inventory
Result DatabaseInitialSyncer::inventory(VPackBuilder& builder) {
/// @brief fetch the server's inventory, public method for TailingSyncer
Result DatabaseInitialSyncer::getInventory(VPackBuilder& builder) {
if (!_state.connection.valid()) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
}
@ -331,9 +333,11 @@ Result DatabaseInitialSyncer::sendFlush() {
// send request
_config.progress.set("sending WAL flush command to url " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::PUT, url,
body.c_str(), body.size()));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url,
body.c_str(), body.size()));
});
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _config.connection);
@ -530,9 +534,12 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
double t = TRI_microtime();
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0, headers));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url,
nullptr, 0, headers));
});
if (replutils::hasFailed(response.get())) {
stats.waitedForDump += TRI_microtime() - t;
@ -564,8 +571,9 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
}
std::string const jobUrl = "/_api/job/" + jobId;
response.reset(_config.connection.client->request(
rest::RequestType::PUT, jobUrl, nullptr, 0));
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
});
if (response != nullptr && response->isComplete()) {
if (response->hasHeaderField("x-arango-async-id")) {
@ -841,9 +849,12 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
// so we're sending the x-arango-async header here
auto headers = replutils::createHeaders();
headers[StaticStrings::Async] = "store";
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::POST, url,
nullptr, 0, headers));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::POST, url,
nullptr, 0, headers));
});
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _config.connection);
@ -868,8 +879,9 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
}
std::string const jobUrl = "/_api/job/" + jobId;
response.reset(_config.connection.client->request(rest::RequestType::PUT,
jobUrl, nullptr, 0));
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
});
if (response != nullptr && response->isComplete()) {
if (response->hasHeaderField("x-arango-async-id")) {
@ -940,9 +952,11 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
_config.progress.set(msg);
// now delete the keys we ordered
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::DELETE_REQ,
url, nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::DELETE_REQ,
url, nullptr, 0));
});
};
TRI_DEFER(shutdown());
@ -1364,9 +1378,11 @@ arangodb::Result DatabaseInitialSyncer::fetchInventory(VPackBuilder& builder) {
// send request
_config.progress.set("fetching master inventory from " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);

View File

@ -28,6 +28,7 @@
#include "Cluster/ServerState.h"
#include "Replication/InitialSyncer.h"
#include "Replication/utilities.h"
#include "Utils/SingleCollectionTransaction.h"
struct TRI_vocbase_t;
@ -161,7 +162,7 @@ class DatabaseInitialSyncer final : public InitialSyncer {
double batchUpdateTime() const { return _config.batch.updateTime; }
/// @brief fetch the server's inventory, public method
Result inventory(arangodb::velocypack::Builder& builder);
Result getInventory(arangodb::velocypack::Builder& builder);
private:
/// @brief order a new chunk from the /dump API

View File

@ -133,9 +133,10 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
"&collection=" + StringUtils::urlEncode(collectionName);
// send request
std::unique_ptr<SimpleHttpResult> response(
_state.connection.client->request(rest::RequestType::GET, url, nullptr,
0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::GET, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _state.connection);
@ -234,7 +235,7 @@ bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) {
VPackBuilder inventoryResponse;
auto init = std::make_shared<DatabaseInitialSyncer>(*_vocbase, _state.applier);
Result res = init->inventory(inventoryResponse);
Result res = init->getInventory(inventoryResponse);
_queriedTranslations = true;
if (res.fail()) {
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage();

View File

@ -128,8 +128,11 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
if (r.fail()) {
return r;
}
startRecurringBatchExtension();
}
TRI_DEFER(if (!_state.isChildSyncer) {
_batchPingTimer->cancel();
_batch.finish(_state.connection, _progress);
});
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch done";
@ -198,7 +201,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
DatabaseGuard guard(nameSlice.copyString());
// change database name in place
auto configurationCopy = _state.applier;
ReplicationApplierConfiguration configurationCopy = _state.applier;
configurationCopy._database = nameSlice.copyString();
auto syncer = std::make_shared<DatabaseInitialSyncer>(*vocbase, configurationCopy);
@ -364,8 +367,8 @@ Result GlobalInitialSyncer::updateServerInventory(
return TRI_ERROR_NO_ERROR;
}
/// @brief returns the inventory
Result GlobalInitialSyncer::inventory(VPackBuilder& builder) {
/// @brief fetch the server's inventory, public method for TailingSyncer
Result GlobalInitialSyncer::getInventory(VPackBuilder& builder) {
if (!_state.connection.valid()) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
} else if (application_features::ApplicationServer::isStopping()) {
@ -392,9 +395,10 @@ Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) {
}
// send request
std::unique_ptr<SimpleHttpResult> response(
_state.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
if (!_state.isChildSyncer) {

View File

@ -41,7 +41,7 @@ class GlobalInitialSyncer final : public InitialSyncer {
arangodb::Result run(bool incremental) override;
/// @brief fetch the server's inventory, public method
Result inventory(arangodb::velocypack::Builder& builder);
Result getInventory(arangodb::velocypack::Builder& builder);
private:

View File

@ -33,7 +33,6 @@
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::httpclient;
using namespace arangodb::rest;
GlobalTailingSyncer::GlobalTailingSyncer(
ReplicationApplierConfiguration const& configuration,
@ -106,7 +105,7 @@ bool GlobalTailingSyncer::skipMarker(VPackSlice const& slice) {
try {
GlobalInitialSyncer init(_state.applier);
VPackBuilder inventoryResponse;
Result res = init.inventory(inventoryResponse);
Result res = init.getInventory(inventoryResponse);
_queriedTranslations = true;
if (res.fail()) {

View File

@ -22,45 +22,11 @@
////////////////////////////////////////////////////////////////////////////////
#include "InitialSyncer.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/Result.h"
#include "Basics/RocksDBUtils.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "Logger/Logger.h"
#include "Replication/DatabaseReplicationApplier.h"
#include "Replication/utilities.h"
#include "RestServer/DatabaseFeature.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/PhysicalCollection.h"
#include "StorageEngine/StorageEngine.h"
#include "Transaction/Helpers.h"
#include "Transaction/StandaloneContext.h"
#include "Utils/CollectionGuard.h"
#include "Utils/OperationOptions.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include <cstring>
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
namespace arangodb {
using namespace arangodb::basics;
using namespace arangodb::httpclient;
using namespace arangodb::rest;
InitialSyncer::InitialSyncer(
ReplicationApplierConfiguration const& configuration,
replutils::ProgressInfo::Setter setter)
@ -68,6 +34,10 @@ InitialSyncer::InitialSyncer(
_progress{setter} {}
InitialSyncer::~InitialSyncer() {
if (_batchPingTimer) {
_batchPingTimer->cancel();
}
try {
if (!_state.isChildSyncer) {
_batch.finish(_state.connection, _progress);
@ -75,5 +45,29 @@ InitialSyncer::~InitialSyncer() {
} catch (...) {
}
}
/// @brief start a recurring task to extend the batch
void InitialSyncer::startRecurringBatchExtension() {
TRI_ASSERT(!_state.isChildSyncer);
if (isAborted()) {
return;
}
if (!_batchPingTimer) {
_batchPingTimer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
}
int secs = _batch.ttl / 2;
if (secs < 30) {
secs = 30;
}
_batchPingTimer->expires_after(std::chrono::seconds(secs));
_batchPingTimer->async_wait([this](asio_ns::error_code ec) {
if (!ec && _batch.id != 0 && !isAborted()) {
_batch.extend(_state.connection, _progress);
startRecurringBatchExtension();
}
});
}
} // namespace arangodb

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -26,10 +26,10 @@
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Basics/asio_ns.h"
#include "Replication/ReplicationApplierConfiguration.h"
#include "Replication/Syncer.h"
#include "Replication/utilities.h"
#include "Utils/SingleCollectionTransaction.h"
#include <velocypack/Slice.h>
@ -88,10 +88,17 @@ class InitialSyncer : public Syncer {
}
std::string progress() const { return _progress.message; }
protected:
/// @brief start a recurring task to extend the batch
void startRecurringBatchExtension();
protected:
replutils::BatchInfo _batch;
replutils::ProgressInfo _progress;
/// recurring task to keep the batch alive
std::unique_ptr<asio_ns::steady_timer> _batchPingTimer;
};
} // namespace arangodb

View File

@ -1544,8 +1544,10 @@ Result TailingSyncer::fetchOpenTransactions(TRI_voc_tick_t fromTick,
setProgress(progress);
// send request
std::unique_ptr<SimpleHttpResult> response(_state.connection.client->request(
rest::RequestType::GET, url, nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::GET, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _state.connection);
@ -1690,8 +1692,10 @@ void TailingSyncer::fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> shar
std::string body = builder.slice().toJson();
std::unique_ptr<SimpleHttpResult> response(_state.connection.client->request(
rest::RequestType::PUT, url, body.c_str(), body.size()));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, url, body.c_str(), body.size()));
});
if (replutils::hasFailed(response.get())) {
// failure

View File

@ -219,36 +219,33 @@ Connection::Connection(Syncer* syncer,
}
params.setMaxPacketSize(applierConfig._maxPacketSize);
params.setLocationRewriter(syncer, &(syncer->rewriteLocation));
client.reset(new httpclient::SimpleHttpClient(connection, params));
_client.reset(new httpclient::SimpleHttpClient(connection, params));
}
}
bool Connection::valid() const { return (client != nullptr); }
bool Connection::valid() const { return (_client != nullptr); }
std::string const& Connection::endpoint() const { return _endpointString; }
std::string const& Connection::localServerId() const { return _localServerId; }
void Connection::setAborted(bool value) {
MUTEX_LOCKER(locker, _mutex);
if (client) {
client->setAborted(value);
if (_client) {
_client->setAborted(value);
}
}
bool Connection::isAborted() const {
MUTEX_LOCKER(locker, _mutex);
if (client) {
return client->isAborted();
if (_client) {
return _client->isAborted();
}
return true;
}
ProgressInfo::ProgressInfo(Setter s) : _mutex{}, _setter{s} {}
ProgressInfo::ProgressInfo(Setter s) : _setter{s} {}
void ProgressInfo::set(std::string const& msg) {
MUTEX_LOCKER(locker, _mutex);
std::lock_guard<std::mutex> guard(_mutex);
_setter(msg);
}
@ -265,9 +262,12 @@ Result BarrierInfo::create(Connection& connection, TRI_voc_tick_t minTick) {
std::string body = builder.slice().toJson();
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::POST, url, body.data(),
body.size()));
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::POST, url, body.data(),
body.size()));
});
if (hasFailed(response.get())) {
return buildHttpError(response.get(), url, connection);
}
@ -319,9 +319,12 @@ Result BarrierInfo::extend(Connection& connection, TRI_voc_tick_t tick) {
std::string const body = builder.slice().toJson();
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->request(rest::RequestType::PUT, url, body.data(),
body.size()));
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, url, body.data(),
body.size()));
});
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE);
}
@ -346,8 +349,10 @@ Result BarrierInfo::remove(Connection& connection) noexcept {
std::string const url = replutils::ReplicationUrl + "/barrier/" + itoa(id);
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::DELETE_REQ, url, nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::DELETE_REQ, url, nullptr, 0));
});
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, connection);
@ -375,13 +380,14 @@ Result BatchInfo::start(replutils::Connection& connection,
std::string const url =
ReplicationUrl + "/batch" + "?serverId=" + connection.localServerId();
std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}";
// send request
progress.set("sending batch start command to url " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::POST, url,
body.c_str(), body.size()));
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::POST, url,
body.c_str(), body.size()));
});
if (hasFailed(response.get())) {
return buildHttpError(response.get(), url, connection);
@ -434,13 +440,17 @@ Result BatchInfo::extend(replutils::Connection& connection,
basics::StringUtils::itoa(id) +
"?serverId=" + connection.localServerId();
std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}";
// send request
progress.set("sending batch extend command to url " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->request(rest::RequestType::PUT, url, body.c_str(),
body.size()));
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
if (id == 0) {
return;
}
response.reset(client->request(rest::RequestType::PUT, url, body.c_str(),
body.size()));
});
if (hasFailed(response.get())) {
return buildHttpError(response.get(), url, connection);
@ -464,12 +474,14 @@ Result BatchInfo::finish(replutils::Connection& connection,
std::string const url = ReplicationUrl + "/batch/" +
basics::StringUtils::itoa(id) +
"?serverId=" + connection.localServerId();
progress.set("sending batch finish command to url " + url);
// send request
progress.set("sending batch finish command to url " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::DELETE_REQ, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::DELETE_REQ, url,
nullptr, 0));
});
if (hasFailed(response.get())) {
return buildHttpError(response.get(), url, connection);
@ -502,20 +514,22 @@ Result MasterInfo::getState(replutils::Connection& connection,
std::string const url =
ReplicationUrl + "/logger-state?serverId=" + connection.localServerId();
// store old settings
size_t maxRetries = connection.client->params().getMaxRetries();
uint64_t retryWaitTime = connection.client->params().getRetryWaitTime();
// apply settings that prevent endless waiting here
connection.client->params().setMaxRetries(1);
connection.client->params().setRetryWaitTime(500 * 1000); // 0.5s
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
// restore old settings
connection.client->params().setMaxRetries(maxRetries);
connection.client->params().setRetryWaitTime(retryWaitTime);
std::unique_ptr<httpclient::SimpleHttpResult> response;
connection.lease([&](httpclient::SimpleHttpClient* client) {
// store old settings
size_t maxRetries = client->params().getMaxRetries();
uint64_t retryWaitTime = client->params().getRetryWaitTime();
// apply settings that prevent endless waiting here
client->params().setMaxRetries(1);
client->params().setRetryWaitTime(500 * 1000); // 0.5s
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
// restore old settings
client->params().setMaxRetries(maxRetries);
client->params().setRetryWaitTime(retryWaitTime);
});
if (hasFailed(response.get())) {
return buildHttpError(response.get(), url, connection);
@ -565,10 +579,14 @@ Result buildHttpError(httpclient::SimpleHttpResult* response,
TRI_ASSERT(hasFailed(response));
if (response == nullptr || !response->isComplete()) {
std::string errorMsg;
connection.lease([&errorMsg](httpclient::SimpleHttpClient* client) {
errorMsg = client->getErrorMessage();
});
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
connection.endpoint() + " for URL " + url + ": " +
connection.client->getErrorMessage());
errorMsg);
}
TRI_ASSERT(response->wasHttpError());

View File

@ -27,8 +27,8 @@
#include <string>
#include <unordered_map>
#include <mutex>
#include "Basics/Mutex.h"
#include "Basics/Result.h"
#include "VocBase/ticks.h"
@ -53,8 +53,6 @@ namespace replutils {
extern std::string const ReplicationUrl;
struct Connection {
/// @brief the http client we're using
std::unique_ptr<httpclient::SimpleHttpClient> client;
Connection(Syncer* syncer,
ReplicationApplierConfiguration const& applierConfig);
@ -68,14 +66,33 @@ struct Connection {
/// @brief identifier for local server
std::string const& localServerId() const;
/// @brief Thread-safe aborted status
void setAborted(bool value);
/// @brief Thread-safe check aborted
bool isAborted() const;
/// @brief get an exclusive connection
template<typename F>
void lease(F&& func) & {
std::lock_guard<std::mutex> guard(_mutex);
std::forward<F>(func)(_client.get());
}
template<typename F>
void lease(F&& func) const& {
std::lock_guard<std::mutex> guard(_mutex);
std::forward<F>(func)(_client.get());
}
private:
std::string _endpointString;
std::string const _endpointString;
std::string const _localServerId;
mutable Mutex _mutex;
/// lock to protect client connection
mutable std::mutex _mutex;
/// @brief the http client we're using
std::unique_ptr<httpclient::SimpleHttpClient> _client;
};
struct ProgressInfo {
@ -96,7 +113,7 @@ struct ProgressInfo {
void set(std::string const& msg);
private:
Mutex _mutex;
std::mutex _mutex;
Setter _setter;
};

View File

@ -205,30 +205,21 @@ Result syncChunkRocksDB(
// time how long the request takes
double t = TRI_microtime();
response.reset(syncer._state.connection.client->retryRequest(rest::RequestType::PUT, url, nullptr, 0,
replutils::createHeaders()));
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, nullptr, 0,
replutils::createHeaders()));
});
stats.waitedForKeys += TRI_microtime() - t;
++stats.numKeysRequests;
}
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, syncer._state.connection);
}
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}
VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());
response.reset(); // not needed anymore
@ -403,33 +394,22 @@ Result syncChunkRocksDB(
double t = TRI_microtime();
response.reset(syncer._state.connection.client->retryRequest(
rest::RequestType::PUT, url, keyJsonString.data(),
keyJsonString.size(), replutils::createHeaders()));
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, keyJsonString.data(),
keyJsonString.size(), replutils::createHeaders()));
});
stats.waitedForDocs += TRI_microtime() - t;
stats.numDocsRequested += toFetch.size();
++stats.numDocsRequests;
}
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, syncer._state.connection);
}
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(
TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage());
}
transaction::BuilderLeaser docsBuilder(trx);
docsBuilder->clear();
Result r = replutils::parseResponse(*docsBuilder.get(), response.get());
@ -590,28 +570,20 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
auto const headers = replutils::createHeaders();
double t = TRI_microtime();
response.reset(syncer._state.connection.client->retryRequest(rest::RequestType::GET, url, nullptr, 0, headers));
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0, headers));
});
stats.waitedForInitial += TRI_microtime() - t;
}
if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, syncer._state.connection);
}
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
return Result(TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}
VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());

View File

@ -236,7 +236,7 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
std::unordered_map<std::string, std::string> const& headers) {
// ensure connection has not yet been invalidated
TRI_ASSERT(_connection != nullptr);
if (_aborted.load(std::memory_order_acquire)) {
if (isAborted()) {
return nullptr;
}

View File

@ -321,8 +321,10 @@ class SimpleHttpClient {
SimpleHttpClientParams& params() { return _params; };
/// @brief Thread-safe check abortion status
bool isAborted() const noexcept { return _aborted.load(std::memory_order_acquire); }
/// @brief Thread-safe set abortion status
void setAborted(bool value) noexcept;
private: