mirror of https://gitee.com/bigwinds/arangodb
Feature 3.4/medium priority (#6910)
This commit is contained in:
parent
fa235feeb2
commit
18de63c7c8
|
@ -433,10 +433,10 @@ void GeneralCommTask::addErrorResponse(rest::ResponseCode code,
|
|||
// thread. Depending on the number of running threads requests may be queued
|
||||
// and scheduled later when the number of used threads decreases
|
||||
bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
|
||||
auto const lane = handler->lane();
|
||||
auto const prio = handler->priority();
|
||||
auto self = shared_from_this();
|
||||
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(PriorityRequestLane(lane), [self, this, handler]() {
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(prio, [self, this, handler]() {
|
||||
handleRequestDirectly(basics::ConditionalLocking::DoLock,
|
||||
std::move(handler));
|
||||
});
|
||||
|
@ -485,7 +485,7 @@ bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
|
|||
|
||||
// callback will persist the response with the AsyncJobManager
|
||||
return SchedulerFeature::SCHEDULER->queue(
|
||||
PriorityRequestLane(handler->lane()), [self, handler] {
|
||||
handler->priority(), [self, handler] {
|
||||
handler->runHandler([](RestHandler* h) {
|
||||
GeneralServerFeature::JOB_MANAGER->finishAsyncJob(h);
|
||||
});
|
||||
|
@ -493,7 +493,7 @@ bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
|
|||
} else {
|
||||
// here the response will just be ignored
|
||||
return SchedulerFeature::SCHEDULER->queue(
|
||||
PriorityRequestLane(handler->lane()),
|
||||
handler->priority(),
|
||||
[self, handler] { handler->runHandler([](RestHandler*) {}); });
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@
|
|||
#include "Basics/Common.h"
|
||||
|
||||
namespace arangodb {
|
||||
class GeneralRequest;
|
||||
|
||||
enum class RequestLane {
|
||||
// For requests that do not block or wait for something.
|
||||
// This ignores blocks that can occur when delivering
|
||||
|
@ -84,35 +86,8 @@ enum class RequestLane {
|
|||
// AGENCY_CALLBACK`
|
||||
};
|
||||
|
||||
enum class RequestPriority { HIGH, LOW };
|
||||
enum class RequestPriority { HIGH, MED, LOW };
|
||||
|
||||
inline RequestPriority PriorityRequestLane(RequestLane lane) {
|
||||
switch (lane) {
|
||||
case RequestLane::CLIENT_FAST:
|
||||
return RequestPriority::HIGH;
|
||||
case RequestLane::CLIENT_AQL:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::CLIENT_V8:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::CLIENT_SLOW:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::AGENCY_INTERNAL:
|
||||
return RequestPriority::HIGH;
|
||||
case RequestLane::AGENCY_CLUSTER:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::CLUSTER_INTERNAL:
|
||||
return RequestPriority::HIGH;
|
||||
case RequestLane::CLUSTER_V8:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::CLUSTER_ADMIN:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::SERVER_REPLICATION:
|
||||
return RequestPriority::LOW;
|
||||
case RequestLane::TASK_V8:
|
||||
return RequestPriority::LOW;
|
||||
}
|
||||
return RequestPriority::LOW;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -311,6 +311,42 @@ void RestHandler::runHandlerStateMachine() {
|
|||
}
|
||||
}
|
||||
|
||||
RequestPriority RestHandler::priority(RequestLane l) const {
|
||||
RequestPriority p = RequestPriority::LOW;
|
||||
|
||||
switch (l) {
|
||||
case RequestLane::AGENCY_INTERNAL:
|
||||
case RequestLane::CLIENT_FAST:
|
||||
case RequestLane::CLUSTER_INTERNAL:
|
||||
p = RequestPriority::HIGH;
|
||||
break;
|
||||
|
||||
case RequestLane::CLIENT_AQL:
|
||||
case RequestLane::CLIENT_SLOW:
|
||||
case RequestLane::AGENCY_CLUSTER:
|
||||
case RequestLane::CLUSTER_ADMIN:
|
||||
case RequestLane::SERVER_REPLICATION:
|
||||
case RequestLane::CLIENT_V8:
|
||||
case RequestLane::CLUSTER_V8:
|
||||
case RequestLane::TASK_V8:
|
||||
p = RequestPriority::LOW;
|
||||
break;
|
||||
}
|
||||
|
||||
if (p == RequestPriority::HIGH) {
|
||||
return p;
|
||||
}
|
||||
|
||||
bool found;
|
||||
_request->header(StaticStrings::XArangoFrontend, found);
|
||||
|
||||
if (!found) {
|
||||
return p;
|
||||
}
|
||||
|
||||
return RequestPriority::MED;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -83,6 +83,14 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
|
|||
/// @brief forwards the request to the appropriate server
|
||||
bool forwardRequest();
|
||||
|
||||
// The priority is derived from the lane.
|
||||
// Header fields might influence the priority.
|
||||
// In order to change the priority of a handler
|
||||
// adjust the lane, do not overwrite the priority
|
||||
// function!
|
||||
RequestPriority priority(RequestLane) const;
|
||||
RequestPriority priority() const {return priority(lane());}
|
||||
|
||||
public:
|
||||
// rest handler name for debugging and logging
|
||||
virtual char const* name() const = 0;
|
||||
|
|
|
@ -224,7 +224,7 @@ bool RestBatchHandler::executeNextHandler() {
|
|||
|
||||
// now scheduler the real handler
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(
|
||||
PriorityRequestLane(handler->lane()), [this, self, handler]() {
|
||||
handler->priority(), [this, self, handler]() {
|
||||
|
||||
// start to work for this handler
|
||||
// ignore any errors here, will be handled later by inspecting the response
|
||||
|
|
|
@ -118,7 +118,6 @@ RestStatus RestTestHandler::execute() {
|
|||
}
|
||||
|
||||
if (body.hasKey("workload")) {
|
||||
|
||||
auto workload = body.get("workload");
|
||||
|
||||
if (workload.isNumber()) {
|
||||
|
@ -134,7 +133,7 @@ RestStatus RestTestHandler::execute() {
|
|||
auto self(shared_from_this());
|
||||
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(
|
||||
PriorityRequestLane(res.get()),
|
||||
priority(res.get()),
|
||||
[this, self, duration]() {
|
||||
auto stop = clock::now() + duration;
|
||||
|
||||
|
|
|
@ -168,16 +168,18 @@ class arangodb::SchedulerThread : public Thread {
|
|||
Scheduler::Scheduler(uint64_t nrMinimum, uint64_t nrMaximum,
|
||||
uint64_t fifo1Size, uint64_t fifo2Size)
|
||||
: _counters(0),
|
||||
_maxFifoSize{fifo1Size, fifo2Size},
|
||||
_maxFifoSize{fifo1Size, fifo2Size, fifo2Size},
|
||||
_fifo1(_maxFifoSize[FIFO1]),
|
||||
_fifo2(_maxFifoSize[FIFO2]),
|
||||
_fifos{&_fifo1, &_fifo2},
|
||||
_fifo3(_maxFifoSize[FIFO3]),
|
||||
_fifos{&_fifo1, &_fifo2, &_fifo3},
|
||||
_minThreads(nrMinimum),
|
||||
_maxThreads(nrMaximum),
|
||||
_lastAllBusyStamp(0.0) {
|
||||
LOG_TOPIC(DEBUG, Logger::THREADS) << "Scheduler configuration min: " << nrMinimum << " max: " << nrMaximum;
|
||||
_fifoSize[FIFO1] = 0;
|
||||
_fifoSize[FIFO2] = 0;
|
||||
_fifoSize[FIFO3] = 0;
|
||||
|
||||
// setup signal handlers
|
||||
initializeSignalHandlers();
|
||||
|
@ -269,7 +271,7 @@ bool Scheduler::queue(RequestPriority prio,
|
|||
// or if the scheduler queue is already full, then
|
||||
// append it to the fifo2. Otherewise directly queue
|
||||
// it.
|
||||
case RequestPriority::LOW:
|
||||
case RequestPriority::MED:
|
||||
if (0 < _fifoSize[FIFO1] || 0 < _fifoSize[FIFO2] || !canPostDirectly(prio)) {
|
||||
ok = pushToFifo(FIFO2, callback);
|
||||
} else {
|
||||
|
@ -277,6 +279,18 @@ bool Scheduler::queue(RequestPriority prio,
|
|||
}
|
||||
break;
|
||||
|
||||
// If there is anything in the fifo1, fifo2, fifo3
|
||||
// or if the scheduler queue is already full, then
|
||||
// append it to the fifo2. Otherwise directly queue
|
||||
// it.
|
||||
case RequestPriority::LOW:
|
||||
if (0 < _fifoSize[FIFO1] || 0 < _fifoSize[FIFO2] || 0 < _fifoSize[FIFO3] || !canPostDirectly(prio)) {
|
||||
ok = pushToFifo(FIFO3, callback);
|
||||
} else {
|
||||
post(callback);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
TRI_ASSERT(false);
|
||||
break;
|
||||
|
@ -300,6 +314,10 @@ void Scheduler::drain() {
|
|||
if (!found) {
|
||||
found = popFifo(FIFO2);
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
found = popFifo(FIFO3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -315,6 +333,8 @@ void Scheduler::addQueueStatistics(velocypack::Builder& b) const {
|
|||
b.add("fifo1-size", VPackValue(_maxFifoSize[FIFO1]));
|
||||
b.add("current-fifo2", VPackValue(_fifoSize[FIFO2]));
|
||||
b.add("fifo2-size", VPackValue(_maxFifoSize[FIFO2]));
|
||||
b.add("current-fifo3", VPackValue(_fifoSize[FIFO3]));
|
||||
b.add("fifo3-size", VPackValue(_maxFifoSize[FIFO3]));
|
||||
}
|
||||
|
||||
Scheduler::QueueStatistics Scheduler::queueStatistics() const {
|
||||
|
@ -324,7 +344,8 @@ Scheduler::QueueStatistics Scheduler::queueStatistics() const {
|
|||
numWorking(counters),
|
||||
numQueued(counters),
|
||||
static_cast<uint64_t>(_fifoSize[FIFO1]),
|
||||
static_cast<uint64_t>(_fifoSize[FIFO2])};
|
||||
static_cast<uint64_t>(_fifoSize[FIFO2]),
|
||||
static_cast<uint64_t>(_fifoSize[FIFO3])};
|
||||
}
|
||||
|
||||
std::string Scheduler::infoStatus() {
|
||||
|
@ -337,7 +358,9 @@ std::string Scheduler::infoStatus() {
|
|||
" F1 " + std::to_string(_fifoSize[FIFO1]) +
|
||||
" (<=" + std::to_string(_maxFifoSize[FIFO1]) + ") F2 " +
|
||||
std::to_string(_fifoSize[FIFO2]) +
|
||||
" (<=" + std::to_string(_maxFifoSize[FIFO2]) + ")";
|
||||
" (<=" + std::to_string(_maxFifoSize[FIFO2]) + ") F3 " +
|
||||
std::to_string(_fifoSize[FIFO3]) +
|
||||
" (<=" + std::to_string(_maxFifoSize[FIFO3]) + ")";
|
||||
}
|
||||
|
||||
bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
|
||||
|
@ -349,6 +372,7 @@ bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
|
|||
case RequestPriority::HIGH:
|
||||
return nrWorking + nrQueued < _maxThreads;
|
||||
|
||||
case RequestPriority::MED:
|
||||
case RequestPriority::LOW:
|
||||
return nrWorking + nrQueued < _maxThreads - _minThreads;
|
||||
}
|
||||
|
|
|
@ -93,6 +93,7 @@ class Scheduler {
|
|||
uint64_t _queued;
|
||||
uint64_t _fifo1;
|
||||
uint64_t _fifo2;
|
||||
uint64_t _fifo3;
|
||||
};
|
||||
|
||||
bool queue(RequestPriority prio, std::function<void()> const&);
|
||||
|
@ -184,15 +185,17 @@ class Scheduler {
|
|||
bool pushToFifo(int64_t fifo, std::function<void()> const& callback);
|
||||
bool popFifo(int64_t fifo);
|
||||
|
||||
static constexpr int64_t NUMBER_FIFOS = 2;
|
||||
static constexpr int64_t NUMBER_FIFOS = 3;
|
||||
static constexpr int64_t FIFO1 = 0;
|
||||
static constexpr int64_t FIFO2 = 1;
|
||||
static constexpr int64_t FIFO3 = 2;
|
||||
|
||||
uint64_t const _maxFifoSize[NUMBER_FIFOS];
|
||||
std::atomic<int64_t> _fifoSize[NUMBER_FIFOS];
|
||||
|
||||
boost::lockfree::queue<FifoJob*> _fifo1;
|
||||
boost::lockfree::queue<FifoJob*> _fifo2;
|
||||
boost::lockfree::queue<FifoJob*> _fifo3;
|
||||
boost::lockfree::queue<FifoJob*>* _fifos[NUMBER_FIFOS];
|
||||
|
||||
// the following methds create tasks in the `io_context`.
|
||||
|
|
|
@ -97,9 +97,9 @@ void SchedulerFeature::validateOptions(
|
|||
std::shared_ptr<options::ProgramOptions>) {
|
||||
if (_nrMaximalThreads == 0) {
|
||||
_nrMaximalThreads = defaultNumberOfThreads();
|
||||
if (_nrMinimalThreads == 0) {
|
||||
_nrMinimalThreads = _nrMaximalThreads / 2;
|
||||
}
|
||||
}
|
||||
if (_nrMinimalThreads == 0) {
|
||||
_nrMinimalThreads = _nrMaximalThreads / 2;
|
||||
}
|
||||
|
||||
if (_queueSize == 0) {
|
||||
|
|
|
@ -657,11 +657,10 @@ arangodb::Result LogicalCollection::appendVelocyPack(
|
|||
if (_keyGenerator != nullptr) {
|
||||
result.openObject();
|
||||
_keyGenerator->toVelocyPack(result);
|
||||
result.close();
|
||||
} else {
|
||||
result.openArray();
|
||||
result.close();
|
||||
}
|
||||
result.close();
|
||||
|
||||
// Physical Information
|
||||
getPhysical()->getPropertiesVPack(result);
|
||||
|
|
|
@ -283,7 +283,7 @@ std::function<void(const asio::error_code&)> Task::callbackFunction() {
|
|||
|
||||
// now do the work:
|
||||
SchedulerFeature::SCHEDULER->queue(
|
||||
PriorityRequestLane(RequestLane::TASK_V8), [self, this, execContext] {
|
||||
RequestPriority::LOW, [self, this, execContext] {
|
||||
ExecContextScope scope(_user.empty() ? ExecContext::superuser()
|
||||
: execContext.get());
|
||||
|
||||
|
|
|
@ -872,7 +872,10 @@ void TRI_vocbase_t::shutdown() {
|
|||
|
||||
// starts unloading of collections
|
||||
for (auto& collection : collections) {
|
||||
collection->close(); // required to release indexes
|
||||
{
|
||||
WRITE_LOCKER_EVENTUAL(locker, collection->lock());
|
||||
collection->close(); // required to release indexes
|
||||
}
|
||||
unloadCollection(collection.get(), true);
|
||||
}
|
||||
|
||||
|
@ -1845,6 +1848,7 @@ TRI_vocbase_t::~TRI_vocbase_t() {
|
|||
|
||||
// do a final cleanup of collections
|
||||
for (auto& it : _collections) {
|
||||
WRITE_LOCKER_EVENTUAL(locker, it->lock());
|
||||
it->close(); // required to release indexes
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -3547,4 +3547,4 @@ var cutByResolution = function (str) {
|
|||
</div>
|
||||
</div></script><script id="warningList.ejs" type="text/template"> <% if (warnings.length > 0) { %> <div>
|
||||
<ul> <% console.log(warnings); _.each(warnings, function(w) { console.log(w);%> <li><b><%=w.code%></b>: <%=w.message%></li> <% }); %> </ul>
|
||||
</div> <% } %> </script></head><body><nav class="navbar" style="display: none"><div class="primary"><div class="navlogo"><a class="logo big" href="#"><img id="ArangoDBLogo" class="arangodbLogo" src="img/arangodb-edition-optimized.svg"></a><a class="logo small" href="#"><img class="arangodbLogo" src="img/arangodb_logo_small.png"></a><a class="version"><span id="currentVersion"></span></a></div><div class="statmenu" id="statisticBar"></div><div class="navmenu" id="navigationBar"></div></div></nav><div id="modalPlaceholder"></div><div class="bodyWrapper" style="display: none"><div class="centralRow"><div id="navbar2" class="navbarWrapper secondary"><div class="subnavmenu" id="subNavigationBar"></div></div><div class="resizecontainer contentWrapper"><div id="loadingScreen" class="loadingScreen" style="display: none"><i class="fa fa-circle-o-notch fa-spin fa-3x fa-fw margin-bottom"></i> <span class="sr-only">Loading...</span></div><div id="content" class="centralContent"></div><footer class="footer"><div id="footerBar"></div></footer></div></div></div><div id="progressPlaceholder" style="display:none"></div><div id="spotlightPlaceholder" style="display:none"></div><div id="graphSettingsContent" style="display: none"></div><div id="filterSelectDiv" style="display:none"></div><div id="offlinePlaceholder" style="display:none"><div class="offline-div"><div class="pure-u"><div class="pure-u-1-4"></div><div class="pure-u-1-2 offline-window"><div class="offline-header"><h3>You have been disconnected from the server</h3></div><div class="offline-body"><p>The connection to the server has been lost. The server may be under heavy load.</p><p>Trying to reconnect in <span id="offlineSeconds">10</span> seconds.</p><p class="animation_state"><span><button class="button-success">Reconnect now</button></span></p></div></div><div class="pure-u-1-4"></div></div></div></div><div class="arangoFrame" style=""><div class="outerDiv"><div class="innerDiv"></div></div></div><script src="libs.js?version=1539440496398"></script><script src="app.js?version=1539440496398"></script></body></html>
|
||||
</div> <% } %> </script></head><body><nav class="navbar" style="display: none"><div class="primary"><div class="navlogo"><a class="logo big" href="#"><img id="ArangoDBLogo" class="arangodbLogo" src="img/arangodb-edition-optimized.svg"></a><a class="logo small" href="#"><img class="arangodbLogo" src="img/arangodb_logo_small.png"></a><a class="version"><span id="currentVersion"></span></a></div><div class="statmenu" id="statisticBar"></div><div class="navmenu" id="navigationBar"></div></div></nav><div id="modalPlaceholder"></div><div class="bodyWrapper" style="display: none"><div class="centralRow"><div id="navbar2" class="navbarWrapper secondary"><div class="subnavmenu" id="subNavigationBar"></div></div><div class="resizecontainer contentWrapper"><div id="loadingScreen" class="loadingScreen" style="display: none"><i class="fa fa-circle-o-notch fa-spin fa-3x fa-fw margin-bottom"></i> <span class="sr-only">Loading...</span></div><div id="content" class="centralContent"></div><footer class="footer"><div id="footerBar"></div></footer></div></div></div><div id="progressPlaceholder" style="display:none"></div><div id="spotlightPlaceholder" style="display:none"></div><div id="graphSettingsContent" style="display: none"></div><div id="filterSelectDiv" style="display:none"></div><div id="offlinePlaceholder" style="display:none"><div class="offline-div"><div class="pure-u"><div class="pure-u-1-4"></div><div class="pure-u-1-2 offline-window"><div class="offline-header"><h3>You have been disconnected from the server</h3></div><div class="offline-body"><p>The connection to the server has been lost. The server may be under heavy load.</p><p>Trying to reconnect in <span id="offlineSeconds">10</span> seconds.</p><p class="animation_state"><span><button class="button-success">Reconnect now</button></span></p></div></div><div class="pure-u-1-4"></div></div></div></div><div class="arangoFrame" style=""><div class="outerDiv"><div class="innerDiv"></div></div></div><script src="libs.js?version=1539717844351"></script><script src="app.js?version=1539717844351"></script></body></html>
|
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -6,6 +6,7 @@
|
|||
// We have to start the app only in production mode, not in test mode
|
||||
if (!window.hasOwnProperty('TEST_BUILD')) {
|
||||
$(document).ajaxSend(function (event, jqxhr, settings) {
|
||||
jqxhr.setRequestHeader('X-Arango-Frontend', 'true');
|
||||
var currentJwt = window.arangoHelper.getCurrentJwt();
|
||||
if (currentJwt) {
|
||||
jqxhr.setRequestHeader('Authorization', 'bearer ' + currentJwt);
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -155,6 +155,7 @@ std::string const StaticStrings::Unlimited = "unlimited";
|
|||
std::string const StaticStrings::WwwAuthenticate("www-authenticate");
|
||||
std::string const StaticStrings::XContentTypeOptions("x-content-type-options");
|
||||
std::string const StaticStrings::XArangoNoLock("x-arango-nolock");
|
||||
std::string const StaticStrings::XArangoFrontend("x-arango-frontend");
|
||||
|
||||
// mime types
|
||||
std::string const StaticStrings::MimeTypeJson(
|
||||
|
|
|
@ -143,6 +143,7 @@ class StaticStrings {
|
|||
static std::string const WwwAuthenticate;
|
||||
static std::string const XContentTypeOptions;
|
||||
static std::string const XArangoNoLock;
|
||||
static std::string const XArangoFrontend;
|
||||
|
||||
// mime types
|
||||
static std::string const MimeTypeJson;
|
||||
|
|
Loading…
Reference in New Issue