mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
693a1eddf4
|
@ -77,6 +77,8 @@ GeneralCommTask::~GeneralCommTask() {
|
|||
// -----------------------------------------------------------------------------
|
||||
|
||||
void GeneralCommTask::setStatistics(uint64_t id, RequestStatistics* stat) {
|
||||
MUTEX_LOCKER(locker, _statisticsMutex);
|
||||
|
||||
auto iter = _statisticsMap.find(id);
|
||||
|
||||
if (iter == _statisticsMap.end()) {
|
||||
|
@ -180,7 +182,7 @@ void GeneralCommTask::processResponse(GeneralResponse* response) {
|
|||
<< "processResponse received a nullptr, closing connection";
|
||||
closeStream();
|
||||
} else {
|
||||
addResponse(response);
|
||||
addResponse(response, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,6 +193,8 @@ RequestStatistics* GeneralCommTask::acquireStatistics(uint64_t id) {
|
|||
}
|
||||
|
||||
RequestStatistics* GeneralCommTask::statistics(uint64_t id) {
|
||||
MUTEX_LOCKER(locker, _statisticsMutex);
|
||||
|
||||
auto iter = _statisticsMap.find(id);
|
||||
|
||||
if (iter == _statisticsMap.end()) {
|
||||
|
@ -201,6 +205,8 @@ RequestStatistics* GeneralCommTask::statistics(uint64_t id) {
|
|||
}
|
||||
|
||||
RequestStatistics* GeneralCommTask::stealStatistics(uint64_t id) {
|
||||
MUTEX_LOCKER(locker, _statisticsMutex);
|
||||
|
||||
auto iter = _statisticsMap.find(id);
|
||||
|
||||
if (iter == _statisticsMap.end()) {
|
||||
|
@ -270,8 +276,8 @@ void GeneralCommTask::handleRequestDirectly(
|
|||
|
||||
auto self = shared_from_this();
|
||||
handler->initEngine(_loop, [self, this](RestHandler* h) {
|
||||
h->transferStatisticsTo(this);
|
||||
addResponse(h->response());
|
||||
RequestStatistics* stat = h->stealStatistics();
|
||||
addResponse(h->response(), stat);
|
||||
});
|
||||
|
||||
HandlerWorkStack monitor(handler);
|
||||
|
@ -298,7 +304,6 @@ bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
|
|||
if (store) {
|
||||
auto self = shared_from_this();
|
||||
handler->initEngine(_loop, [this, self](RestHandler* handler) {
|
||||
handler->transferStatisticsTo(this);
|
||||
GeneralServerFeature::JOB_MANAGER->finishAsyncJob(handler);
|
||||
});
|
||||
} else {
|
||||
|
|
|
@ -97,7 +97,7 @@ class GeneralCommTask : public SocketTask {
|
|||
virtual std::unique_ptr<GeneralResponse> createResponse(
|
||||
rest::ResponseCode, uint64_t messageId) = 0;
|
||||
|
||||
virtual void addResponse(GeneralResponse*) = 0;
|
||||
virtual void addResponse(GeneralResponse*, RequestStatistics*) = 0;
|
||||
|
||||
protected:
|
||||
void executeRequest(std::unique_ptr<GeneralRequest>&&,
|
||||
|
|
|
@ -71,7 +71,7 @@ HttpCommTask::HttpCommTask(EventLoop loop, GeneralServer* server,
|
|||
void HttpCommTask::handleSimpleError(rest::ResponseCode code,
|
||||
uint64_t /* messageId */) {
|
||||
std::unique_ptr<GeneralResponse> response(new HttpResponse(code));
|
||||
addResponse(response.get());
|
||||
addResponse(response.get(), stealStatistics(1UL));
|
||||
}
|
||||
|
||||
void HttpCommTask::handleSimpleError(rest::ResponseCode code, int errorNum,
|
||||
|
@ -89,7 +89,7 @@ void HttpCommTask::handleSimpleError(rest::ResponseCode code, int errorNum,
|
|||
|
||||
try {
|
||||
response->setPayload(builder.slice(), true, VPackOptions::Defaults);
|
||||
addResponse(response.get());
|
||||
addResponse(response.get(), stealStatistics(1UL));
|
||||
} catch (std::exception const& ex) {
|
||||
LOG_TOPIC(WARN, Logger::COMMUNICATION)
|
||||
<< "handleSimpleError received an exception, closing connection:"
|
||||
|
@ -102,7 +102,7 @@ void HttpCommTask::handleSimpleError(rest::ResponseCode code, int errorNum,
|
|||
}
|
||||
}
|
||||
|
||||
void HttpCommTask::addResponse(HttpResponse* response) {
|
||||
void HttpCommTask::addResponse(HttpResponse* response, RequestStatistics* stat) {
|
||||
resetKeepAlive();
|
||||
|
||||
// response has been queued, allow further requests
|
||||
|
@ -142,8 +142,6 @@ void HttpCommTask::addResponse(HttpResponse* response) {
|
|||
}
|
||||
|
||||
// reserve a buffer with some spare capacity
|
||||
RequestStatistics* stat = stealStatistics(1UL);
|
||||
|
||||
WriteBuffer buffer(
|
||||
new StringBuffer(TRI_UNKNOWN_MEM_ZONE, responseBodyLength + 128, false),
|
||||
stat);
|
||||
|
@ -264,7 +262,6 @@ bool HttpCommTask::processRead(double startTime) {
|
|||
_readBuffer.length() - 11);
|
||||
commTask->processRead(startTime);
|
||||
commTask->start();
|
||||
// statistics?!
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -364,10 +361,7 @@ bool HttpCommTask::processRead(double startTime) {
|
|||
// (original request object gets deleted before responding)
|
||||
_requestType = _incompleteRequest->requestType();
|
||||
|
||||
if (stat == nullptr) {
|
||||
stat = statistics(1UL);
|
||||
}
|
||||
|
||||
stat = statistics(1UL);
|
||||
RequestStatistics::SET_REQUEST_TYPE(stat, _requestType);
|
||||
|
||||
// handle different HTTP methods
|
||||
|
@ -494,12 +488,9 @@ bool HttpCommTask::processRead(double startTime) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (stat == nullptr) {
|
||||
stat = statistics(1UL);
|
||||
}
|
||||
|
||||
auto bytes = _bodyPosition - _startPosition + _bodyLength;
|
||||
|
||||
stat = statistics(1UL);
|
||||
RequestStatistics::SET_READ_END(stat);
|
||||
RequestStatistics::ADD_RECEIVED_BYTES(stat, bytes);
|
||||
|
||||
|
|
|
@ -25,14 +25,14 @@ class HttpCommTask final : public GeneralCommTask {
|
|||
};
|
||||
|
||||
// convert from GeneralResponse to httpResponse
|
||||
void addResponse(GeneralResponse* response) override {
|
||||
void addResponse(GeneralResponse* response, RequestStatistics* stat) override {
|
||||
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(response);
|
||||
|
||||
if (httpResponse == nullptr) {
|
||||
throw std::logic_error("invalid response or response Type");
|
||||
}
|
||||
|
||||
addResponse(httpResponse);
|
||||
addResponse(httpResponse, stat);
|
||||
};
|
||||
|
||||
protected:
|
||||
|
@ -54,7 +54,7 @@ class HttpCommTask final : public GeneralCommTask {
|
|||
|
||||
void resetState();
|
||||
|
||||
void addResponse(HttpResponse*);
|
||||
void addResponse(HttpResponse*, RequestStatistics* stat);
|
||||
|
||||
// check the content-length header of a request and fail it is broken
|
||||
bool checkContentLength(HttpRequest*, bool expectContentLength);
|
||||
|
|
|
@ -65,9 +65,10 @@ RestHandler::RestHandler(GeneralRequest* request, GeneralResponse* response)
|
|||
}
|
||||
|
||||
RestHandler::~RestHandler() {
|
||||
if (_statistics != nullptr) {
|
||||
_statistics->release();
|
||||
_statistics = nullptr;
|
||||
RequestStatistics* stat = _statistics.exchange(nullptr);
|
||||
|
||||
if (stat != nullptr) {
|
||||
stat->release();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -298,13 +299,6 @@ int RestHandler::runEngine(bool synchron) {
|
|||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
void RestHandler::transferStatisticsTo(GeneralCommTask* task) {
|
||||
auto statistics = _statistics;
|
||||
_statistics = nullptr;
|
||||
|
||||
task->setStatistics(messageId(), statistics);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- protected methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -52,7 +52,7 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
|
|||
|
||||
public:
|
||||
RestHandler(GeneralRequest*, GeneralResponse*);
|
||||
~RestHandler();
|
||||
virtual ~RestHandler();
|
||||
|
||||
public:
|
||||
uint64_t handlerId() const { return _handlerId; }
|
||||
|
@ -69,13 +69,18 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
|
|||
|
||||
std::shared_ptr<WorkContext> context() { return _context; }
|
||||
|
||||
RequestStatistics* statistics() const { return _statistics; }
|
||||
void setStatistics(RequestStatistics* stat) {
|
||||
if (_statistics != nullptr) {
|
||||
_statistics->release();
|
||||
}
|
||||
RequestStatistics* statistics() const { return _statistics.load(); }
|
||||
|
||||
_statistics = stat;
|
||||
RequestStatistics* stealStatistics() {
|
||||
return _statistics.exchange(nullptr);
|
||||
}
|
||||
|
||||
void setStatistics(RequestStatistics* stat) {
|
||||
RequestStatistics* old = _statistics.exchange(stat);
|
||||
|
||||
if (old != nullptr) {
|
||||
old->release();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
|
@ -107,7 +112,7 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
|
|||
|
||||
std::shared_ptr<WorkContext> _context;
|
||||
|
||||
RequestStatistics* _statistics;
|
||||
std::atomic<RequestStatistics*> _statistics;
|
||||
|
||||
private:
|
||||
bool _needsOwnThread = false;
|
||||
|
@ -130,8 +135,6 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
|
|||
int runEngine(bool synchron);
|
||||
int finalizeEngine();
|
||||
|
||||
void transferStatisticsTo(rest::GeneralCommTask*);
|
||||
|
||||
private:
|
||||
RestEngine _engine;
|
||||
std::function<void(rest::RestHandler*)> _storeResult;
|
||||
|
|
|
@ -70,7 +70,7 @@ VppCommTask::VppCommTask(EventLoop loop, GeneralServer* server,
|
|||
_readBuffer.reserve(_bufferLength);
|
||||
}
|
||||
|
||||
void VppCommTask::addResponse(VppResponse* response) {
|
||||
void VppCommTask::addResponse(VppResponse* response, RequestStatistics* stat) {
|
||||
VPackMessageNoOwnBuffer response_message = response->prepareForNetwork();
|
||||
uint64_t const id = response_message._id;
|
||||
|
||||
|
@ -106,22 +106,27 @@ void VppCommTask::addResponse(VppResponse* response) {
|
|||
|
||||
// set some sensible maxchunk size and compression
|
||||
auto buffers = createChunkForNetwork(slices, id, chunkSize, false);
|
||||
size_t n = buffers.size() - 1;
|
||||
size_t c = 0;
|
||||
|
||||
RequestStatistics* stat = stealStatistics(id);
|
||||
double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat);
|
||||
|
||||
for (auto&& buffer : buffers) {
|
||||
if (c == n) {
|
||||
WriteBuffer b(buffer.release(), stat);
|
||||
addWriteBuffer(b);
|
||||
} else {
|
||||
WriteBuffer b(buffer.release(), nullptr);
|
||||
addWriteBuffer(b);
|
||||
if (buffers.empty()) {
|
||||
if (stat != nullptr) {
|
||||
stat->release();
|
||||
}
|
||||
} else {
|
||||
size_t n = buffers.size() - 1;
|
||||
size_t c = 0;
|
||||
|
||||
++c;
|
||||
for (auto&& buffer : buffers) {
|
||||
if (c == n) {
|
||||
WriteBuffer b(buffer.release(), stat);
|
||||
addWriteBuffer(b);
|
||||
} else {
|
||||
WriteBuffer b(buffer.release(), nullptr);
|
||||
addWriteBuffer(b);
|
||||
}
|
||||
|
||||
++c;
|
||||
}
|
||||
}
|
||||
|
||||
// and give some request information
|
||||
|
|
|
@ -46,12 +46,14 @@ class VppCommTask : public GeneralCommTask {
|
|||
|
||||
// convert from GeneralResponse to vppResponse ad dispatch request to class
|
||||
// internal addResponse
|
||||
void addResponse(GeneralResponse* response) override {
|
||||
void addResponse(GeneralResponse* response, RequestStatistics* stat) override {
|
||||
VppResponse* vppResponse = dynamic_cast<VppResponse*>(response);
|
||||
|
||||
if (vppResponse == nullptr) {
|
||||
throw std::logic_error("invalid response or response Type");
|
||||
}
|
||||
addResponse(vppResponse);
|
||||
|
||||
addResponse(vppResponse, stat);
|
||||
};
|
||||
|
||||
arangodb::Endpoint::TransportType transportType() override {
|
||||
|
@ -67,10 +69,12 @@ class VppCommTask : public GeneralCommTask {
|
|||
rest::ResponseCode, uint64_t messageId) override final;
|
||||
|
||||
void handleAuthentication(VPackSlice const& header, uint64_t messageId);
|
||||
|
||||
void handleSimpleError(rest::ResponseCode code, uint64_t id) override {
|
||||
VppResponse response(code, id);
|
||||
addResponse(&response);
|
||||
addResponse(&response, nullptr);
|
||||
}
|
||||
|
||||
void handleSimpleError(rest::ResponseCode, int code,
|
||||
std::string const& errorMessage,
|
||||
uint64_t messageId) override;
|
||||
|
@ -80,7 +84,7 @@ class VppCommTask : public GeneralCommTask {
|
|||
// request handling aborts prematurely
|
||||
void closeTask(rest::ResponseCode code = rest::ResponseCode::SERVER_ERROR);
|
||||
|
||||
void addResponse(VppResponse*);
|
||||
void addResponse(VppResponse*, RequestStatistics* stat);
|
||||
rest::ResponseCode authenticateRequest(GeneralRequest* request);
|
||||
|
||||
private:
|
||||
|
|
|
@ -66,7 +66,7 @@ class ConnectionStatistics {
|
|||
_error = false;
|
||||
}
|
||||
|
||||
static size_t const QUEUE_SIZE = 1000;
|
||||
static size_t const QUEUE_SIZE = 5000;
|
||||
|
||||
static Mutex _dataLock;
|
||||
|
||||
|
|
|
@ -179,15 +179,14 @@ void RequestStatistics::process(RequestStatistics* statistics) {
|
|||
|
||||
void RequestStatistics::release() {
|
||||
TRI_ASSERT(!_released);
|
||||
TRI_ASSERT(!_inQueue);
|
||||
|
||||
if (!_ignore) {
|
||||
TRI_ASSERT(!_inQueue);
|
||||
_inQueue = true;
|
||||
bool ok = _finishedList.push(this);
|
||||
TRI_ASSERT(ok);
|
||||
} else {
|
||||
reset();
|
||||
_released = true;
|
||||
bool ok = _freeList.push(this);
|
||||
TRI_ASSERT(ok);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ class RequestStatistics {
|
|||
|
||||
static void SET_READ_START(RequestStatistics* stat, double start) {
|
||||
if (stat != nullptr) {
|
||||
if (stat->_readStart == 0.0 && stat->_readStart == 0.0) {
|
||||
if (stat->_readStart == 0.0) {
|
||||
stat->_readStart = start;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -602,6 +602,7 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase,
|
|||
StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
if (_path.empty()) {
|
||||
_path = engine->createCollection(_vocbase, _cid, this);
|
||||
ensureRevisionsCache();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -59,31 +59,38 @@
|
|||
},
|
||||
|
||||
render: function () {
|
||||
var self = this;
|
||||
|
||||
var callback = function (error, db) {
|
||||
if (error) {
|
||||
arangoHelper.arangoError('DB', 'Could not get current db properties');
|
||||
} else {
|
||||
this.currentDB = db;
|
||||
// sorting
|
||||
this.collection.sort();
|
||||
self.currentDB = db;
|
||||
|
||||
$(this.el).html(this.template.render({
|
||||
collection: this.collection,
|
||||
searchString: '',
|
||||
currentDB: this.currentDB
|
||||
}));
|
||||
self.collection.fetch({
|
||||
success: function () {
|
||||
// sorting
|
||||
self.collection.sort();
|
||||
|
||||
if (this.dropdownVisible === true) {
|
||||
$('#dbSortDesc').attr('checked', this.collection.sortOptions.desc);
|
||||
$('#databaseToggle').toggleClass('activated');
|
||||
$('#databaseDropdown2').show();
|
||||
}
|
||||
$(self.el).html(self.template.render({
|
||||
collection: self.collection,
|
||||
searchString: '',
|
||||
currentDB: self.currentDB
|
||||
}));
|
||||
|
||||
arangoHelper.setCheckboxStatus('#databaseDropdown');
|
||||
if (self.dropdownVisible === true) {
|
||||
$('#dbSortDesc').attr('checked', self.collection.sortOptions.desc);
|
||||
$('#databaseToggle').toggleClass('activated');
|
||||
$('#databaseDropdown2').show();
|
||||
}
|
||||
|
||||
this.replaceSVGs();
|
||||
arangoHelper.setCheckboxStatus('#databaseDropdown');
|
||||
|
||||
self.replaceSVGs();
|
||||
}
|
||||
});
|
||||
}
|
||||
}.bind(this);
|
||||
};
|
||||
|
||||
this.collection.getCurrentDatabase(callback);
|
||||
|
||||
|
|
|
@ -338,7 +338,11 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
|
|||
break;
|
||||
}
|
||||
|
||||
else if (_state == IN_READ_BODY && !_result->hasContentLength()) {
|
||||
if (_state == IN_READ_HEADER) {
|
||||
processHeader();
|
||||
}
|
||||
|
||||
if (_state == IN_READ_BODY && !_result->hasContentLength()) {
|
||||
// If we are reading the body and no content length was
|
||||
// found in the header, then we must read until no more
|
||||
// progress is made (but without an error), this then means
|
||||
|
@ -346,24 +350,18 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
|
|||
// process the body one more time:
|
||||
_result->setContentLength(_readBuffer.length() - _readBufferOffset);
|
||||
processBody();
|
||||
} else if (_state == IN_READ_BODY) {
|
||||
processBody();
|
||||
}
|
||||
|
||||
if (_state != FINISHED) {
|
||||
// If the body was not fully found we give up:
|
||||
this->close();
|
||||
_state = DEAD;
|
||||
setErrorMessage("Got unexpected response from remote");
|
||||
}
|
||||
|
||||
break;
|
||||
if (_state != FINISHED) {
|
||||
// If the body was not fully found we give up:
|
||||
this->close();
|
||||
_state = DEAD;
|
||||
setErrorMessage("Got unexpected response from remote");
|
||||
}
|
||||
|
||||
else {
|
||||
// In all other cases of closed connection, we are doomed:
|
||||
this->close();
|
||||
_state = DEAD;
|
||||
setErrorMessage("Got unexpected response from remote");
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// the connection is still alive:
|
||||
|
|
Loading…
Reference in New Issue