diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index 3cfea152b1..f4c1b0cbe7 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -456,12 +456,10 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr handler) { return false; } - auto const lane = handler->getRequestLane(); - - bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), handler]() { + bool ok = SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = shared_from_this(), handler]() { auto thisPtr = static_cast(self.get()); thisPtr->handleRequestDirectly(basics::ConditionalLocking::DoLock, handler); - }); + }, allowDirectHandling()); if (!ok) { addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE, diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index 9b8f62f017..a8075c5de7 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -109,6 +109,10 @@ class GeneralCommTask : public SocketTask { /// @brief send the response to the client. virtual void addResponse(GeneralResponse&, RequestStatistics*) = 0; + /// @brief whether or not requests of this CommTask can be executed directly, + /// inside the IO thread + virtual bool allowDirectHandling() const = 0; + protected: enum class RequestFlow : bool { Continue = true, Abort = false }; diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index f1d74cd1a8..25dd1d919f 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -41,6 +41,8 @@ class HttpCommTask final : public GeneralCommTask { void addSimpleResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId, velocypack::Buffer&&) override; + bool allowDirectHandling() const override final { return true; } + private: void processRequest(std::unique_ptr); void processCorsOptions(std::unique_ptr); diff --git a/arangod/GeneralServer/VstCommTask.cpp b/arangod/GeneralServer/VstCommTask.cpp index 5f45ac9658..4b07863d10 100644 --- a/arangod/GeneralServer/VstCommTask.cpp +++ b/arangod/GeneralServer/VstCommTask.cpp @@ -36,9 +36,6 @@ #include "Logger/LoggerFeature.h" #include "Meta/conversion.h" #include "RestServer/ServerFeature.h" -#include "Scheduler/Scheduler.h" -#include "Scheduler/SchedulerFeature.h" -#include "VocBase/ticks.h" #include @@ -51,6 +48,8 @@ using namespace arangodb::basics; using namespace arangodb::rest; // throws on error +namespace { + inline void validateMessage(char const* vpStart, char const* vpEnd) { VPackOptions validationOptions = VPackOptions::Defaults; validationOptions.validateUtf8Strings = true; @@ -79,6 +78,8 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) { } } +} // namespace + VstCommTask::VstCommTask(GeneralServer& server, GeneralServer::IoContext& context, std::unique_ptr socket, ConnectionInfo&& info, double timeout, ProtocolVersion protocolVersion, bool skipInit) @@ -442,7 +443,7 @@ bool VstCommTask::getMessageFromSingleChunk(ChunkHeader const& chunkHeader, LOG_TOPIC("97c38", DEBUG, Logger::COMMUNICATION) << "VstCommTask: " << "chunk contains single message"; try { - validateMessage(vpackBegin, chunkEnd); + ::validateMessage(vpackBegin, chunkEnd); } catch (std::exception const& e) { addSimpleResponse(rest::ResponseCode::BAD, rest::ContentType::VPACK, chunkHeader._messageID, VPackBuffer()); @@ -526,9 +527,9 @@ bool VstCommTask::getMessageFromMultiChunks(ChunkHeader const& chunkHeader, LOG_TOPIC("c6cce", DEBUG, Logger::COMMUNICATION) << "VstCommTask: " << "chunk completes a message"; try { - validateMessage(reinterpret_cast(im._buffer.data()), - reinterpret_cast(im._buffer.data() + - im._buffer.byteSize())); + ::validateMessage(reinterpret_cast(im._buffer.data()), + reinterpret_cast(im._buffer.data() + + im._buffer.byteSize())); } catch (std::exception const& e) { addErrorResponse(rest::ResponseCode::BAD, rest::ContentType::VPACK, chunkHeader._messageID, TRI_ERROR_BAD_PARAMETER, e.what()); diff --git a/arangod/GeneralServer/VstCommTask.h b/arangod/GeneralServer/VstCommTask.h index 2809da9e92..9da8c3453d 100644 --- a/arangod/GeneralServer/VstCommTask.h +++ b/arangod/GeneralServer/VstCommTask.h @@ -65,6 +65,8 @@ class VstCommTask final : public GeneralCommTask { // internal addResponse void addResponse(GeneralResponse&, RequestStatistics*) override; + bool allowDirectHandling() const override final { return false; } + private: // process the VST 1000 request type void handleAuthHeader(VPackSlice const& header, uint64_t messageId); diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index f9f3677c07..004805a899 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -56,7 +56,7 @@ class Scheduler { typedef std::shared_ptr WorkHandle; // Enqueues a task - this is implemented on the specific scheduler - virtual bool queue(RequestLane lane, std::function) = 0; + virtual bool queue(RequestLane lane, std::function, bool allowDirectHandling = false) = 0; // Enqueues a task after delay - this uses the queue functions above. // WorkHandle is a shared_ptr to a WorkItem. If all references the WorkItem diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index 6915f98ea0..66980d1492 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -116,8 +116,9 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread SupervisedScheduler::~SupervisedScheduler() {} -bool SupervisedScheduler::queue(RequestLane lane, std::function handler) { +bool SupervisedScheduler::queue(RequestLane lane, std::function handler, bool allowDirectHandling) { if (!isDirectDeadlockLane(lane) && + allowDirectHandling && !ServerState::instance()->isClusterRole() && (_jobsSubmitted - _jobsDone) < 2) { _jobsSubmitted.fetch_add(1, std::memory_order_relaxed); diff --git a/arangod/Scheduler/SupervisedScheduler.h b/arangod/Scheduler/SupervisedScheduler.h index 4cb9f12095..e0d6122cf5 100644 --- a/arangod/Scheduler/SupervisedScheduler.h +++ b/arangod/Scheduler/SupervisedScheduler.h @@ -44,7 +44,7 @@ class SupervisedScheduler final : public Scheduler { uint64_t fifo1Size, uint64_t fifo2Size); virtual ~SupervisedScheduler(); - bool queue(RequestLane lane, std::function) override; + bool queue(RequestLane lane, std::function, bool allowDirectHandling = false) override; private: std::atomic _numWorker;