mirror of https://gitee.com/bigwinds/arangodb
exclude VST requests from direct execution (#9075)
This commit is contained in:
parent
fb0fdb6629
commit
3d79491f36
|
@ -456,12 +456,10 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
|
|||
return false;
|
||||
}
|
||||
|
||||
auto const lane = handler->getRequestLane();
|
||||
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), handler]() {
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = shared_from_this(), handler]() {
|
||||
auto thisPtr = static_cast<GeneralCommTask*>(self.get());
|
||||
thisPtr->handleRequestDirectly(basics::ConditionalLocking::DoLock, handler);
|
||||
});
|
||||
}, allowDirectHandling());
|
||||
|
||||
if (!ok) {
|
||||
addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE,
|
||||
|
|
|
@ -109,6 +109,10 @@ class GeneralCommTask : public SocketTask {
|
|||
/// @brief send the response to the client.
|
||||
virtual void addResponse(GeneralResponse&, RequestStatistics*) = 0;
|
||||
|
||||
/// @brief whether or not requests of this CommTask can be executed directly,
|
||||
/// inside the IO thread
|
||||
virtual bool allowDirectHandling() const = 0;
|
||||
|
||||
protected:
|
||||
enum class RequestFlow : bool { Continue = true, Abort = false };
|
||||
|
||||
|
|
|
@ -41,6 +41,8 @@ class HttpCommTask final : public GeneralCommTask {
|
|||
void addSimpleResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId,
|
||||
velocypack::Buffer<uint8_t>&&) override;
|
||||
|
||||
bool allowDirectHandling() const override final { return true; }
|
||||
|
||||
private:
|
||||
void processRequest(std::unique_ptr<HttpRequest>);
|
||||
void processCorsOptions(std::unique_ptr<HttpRequest>);
|
||||
|
|
|
@ -36,9 +36,6 @@
|
|||
#include "Logger/LoggerFeature.h"
|
||||
#include "Meta/conversion.h"
|
||||
#include "RestServer/ServerFeature.h"
|
||||
#include "Scheduler/Scheduler.h"
|
||||
#include "Scheduler/SchedulerFeature.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
|
@ -51,6 +48,8 @@ using namespace arangodb::basics;
|
|||
using namespace arangodb::rest;
|
||||
|
||||
// throws on error
|
||||
namespace {
|
||||
|
||||
inline void validateMessage(char const* vpStart, char const* vpEnd) {
|
||||
VPackOptions validationOptions = VPackOptions::Defaults;
|
||||
validationOptions.validateUtf8Strings = true;
|
||||
|
@ -79,6 +78,8 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) {
|
|||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
VstCommTask::VstCommTask(GeneralServer& server, GeneralServer::IoContext& context,
|
||||
std::unique_ptr<Socket> socket, ConnectionInfo&& info,
|
||||
double timeout, ProtocolVersion protocolVersion, bool skipInit)
|
||||
|
@ -442,7 +443,7 @@ bool VstCommTask::getMessageFromSingleChunk(ChunkHeader const& chunkHeader,
|
|||
LOG_TOPIC("97c38", DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk contains single message";
|
||||
try {
|
||||
validateMessage(vpackBegin, chunkEnd);
|
||||
::validateMessage(vpackBegin, chunkEnd);
|
||||
} catch (std::exception const& e) {
|
||||
addSimpleResponse(rest::ResponseCode::BAD, rest::ContentType::VPACK,
|
||||
chunkHeader._messageID, VPackBuffer<uint8_t>());
|
||||
|
@ -526,9 +527,9 @@ bool VstCommTask::getMessageFromMultiChunks(ChunkHeader const& chunkHeader,
|
|||
LOG_TOPIC("c6cce", DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk completes a message";
|
||||
try {
|
||||
validateMessage(reinterpret_cast<char const*>(im._buffer.data()),
|
||||
reinterpret_cast<char const*>(im._buffer.data() +
|
||||
im._buffer.byteSize()));
|
||||
::validateMessage(reinterpret_cast<char const*>(im._buffer.data()),
|
||||
reinterpret_cast<char const*>(im._buffer.data() +
|
||||
im._buffer.byteSize()));
|
||||
} catch (std::exception const& e) {
|
||||
addErrorResponse(rest::ResponseCode::BAD, rest::ContentType::VPACK,
|
||||
chunkHeader._messageID, TRI_ERROR_BAD_PARAMETER, e.what());
|
||||
|
|
|
@ -65,6 +65,8 @@ class VstCommTask final : public GeneralCommTask {
|
|||
// internal addResponse
|
||||
void addResponse(GeneralResponse&, RequestStatistics*) override;
|
||||
|
||||
bool allowDirectHandling() const override final { return false; }
|
||||
|
||||
private:
|
||||
// process the VST 1000 request type
|
||||
void handleAuthHeader(VPackSlice const& header, uint64_t messageId);
|
||||
|
|
|
@ -56,7 +56,7 @@ class Scheduler {
|
|||
typedef std::shared_ptr<WorkItem> WorkHandle;
|
||||
|
||||
// Enqueues a task - this is implemented on the specific scheduler
|
||||
virtual bool queue(RequestLane lane, std::function<void()>) = 0;
|
||||
virtual bool queue(RequestLane lane, std::function<void()>, bool allowDirectHandling = false) = 0;
|
||||
|
||||
// Enqueues a task after delay - this uses the queue functions above.
|
||||
// WorkHandle is a shared_ptr to a WorkItem. If all references the WorkItem
|
||||
|
|
|
@ -116,8 +116,9 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
|
|||
|
||||
SupervisedScheduler::~SupervisedScheduler() {}
|
||||
|
||||
bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler) {
|
||||
bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler, bool allowDirectHandling) {
|
||||
if (!isDirectDeadlockLane(lane) &&
|
||||
allowDirectHandling &&
|
||||
!ServerState::instance()->isClusterRole() &&
|
||||
(_jobsSubmitted - _jobsDone) < 2) {
|
||||
_jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
|
||||
|
|
|
@ -44,7 +44,7 @@ class SupervisedScheduler final : public Scheduler {
|
|||
uint64_t fifo1Size, uint64_t fifo2Size);
|
||||
virtual ~SupervisedScheduler();
|
||||
|
||||
bool queue(RequestLane lane, std::function<void()>) override;
|
||||
bool queue(RequestLane lane, std::function<void()>, bool allowDirectHandling = false) override;
|
||||
|
||||
private:
|
||||
std::atomic<size_t> _numWorker;
|
||||
|
|
Loading…
Reference in New Issue