1
0
Fork 0

Merge branch 'engine-vs-velocystream' of https://github.com/arangodb/arangodb into generic-col-types

This commit is contained in:
jsteemann 2016-08-26 14:29:45 +02:00
commit c44649da00
10 changed files with 246 additions and 64 deletions

View File

@ -37,7 +37,7 @@ endif ()
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# where to find CMAKE modules
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
# be verbose about flags used
option(VERBOSE OFF)

View File

@ -193,7 +193,7 @@ add_executable(${BIN_ARANGOD}
GeneralServer/GeneralServer.cpp
GeneralServer/GeneralServerFeature.cpp
GeneralServer/HttpCommTask.cpp
GeneralServer/HttpServerJob.cpp
GeneralServer/GeneralServerJob.cpp
GeneralServer/HttpsCommTask.cpp
GeneralServer/PathHandler.cpp
GeneralServer/RestHandler.cpp

View File

@ -945,9 +945,8 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
// We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing:
std::unique_ptr<httpclient::SimpleHttpResult> result(
client->request(rest::RequestType::PUT, "/_api/shard-comm",
body, len, headers));
std::unique_ptr<httpclient::SimpleHttpResult> result(client->request(
rest::RequestType::PUT, "/_api/shard-comm", body, len, headers));
if (result.get() == nullptr || !result->isComplete()) {
cm->brokenConnection(connection);
client->invalidateConnection();
@ -1273,9 +1272,8 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
requests[index].result = res;
// In this case we will retry:
dueTime[index] = (std::min)(10.0,
(std::max)(0.2, 2 * (now - startTime))) +
now;
dueTime[index] =
(std::min)(10.0, (std::max)(0.2, 2 * (now - startTime))) + now;
if (dueTime[index] >= endTime) {
requests[index].done = true;
nrDone++;
@ -1362,6 +1360,13 @@ size_t ClusterComm::performSingleRequest(
rest::ContentType type = rest::ContentType::JSON;
basics::StringBuffer& buffer = req.result.result->getBody();
// PERFORMANCE TODO (fc) (max) (obi)
// body() could return a basic_string_ref
// The FakeRequest Replacement does a copy of the body and is not as fast
// as the original
// auto answer = new FakeRequest(type, buffer.c_str(),
// static_cast<int64_t>(buffer.length()));
// answer->setHeaders(req.result.result->getHeaderFields());
@ -1371,8 +1376,8 @@ size_t ClusterComm::performSingleRequest(
req.result.result->getHeaderFields());
req.result.answer.reset(static_cast<GeneralRequest*>(answer));
req.result.answer_code = static_cast<rest::ResponseCode>(
req.result.result->getHttpReturnCode());
req.result.answer_code =
static_cast<rest::ResponseCode>(req.result.result->getHttpReturnCode());
return (req.result.answer_code == rest::ResponseCode::OK ||
req.result.answer_code == rest::ResponseCode::CREATED ||
req.result.answer_code == rest::ResponseCode::ACCEPTED)
@ -1443,17 +1448,18 @@ void ClusterCommThread::run() {
}
} else {
if (nullptr != op->body.get()) {
LOG(DEBUG) << "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype)
.c_str() << " request to DB server '"
<< op->result.serverID << "' at endpoint '"
<< op->result.endpoint << "': " << op->body->c_str();
LOG(DEBUG)
<< "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype).c_str()
<< " request to DB server '" << op->result.serverID
<< "' at endpoint '" << op->result.endpoint
<< "': " << op->body->c_str();
} else {
LOG(DEBUG) << "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype)
.c_str() << " request to DB server '"
<< op->result.serverID << "' at endpoint '"
<< op->result.endpoint << "'";
LOG(DEBUG)
<< "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype).c_str()
<< " request to DB server '" << op->result.serverID
<< "' at endpoint '" << op->result.endpoint << "'";
}
auto client =

View File

@ -25,7 +25,7 @@
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "GeneralServer/HttpServerJob.h"
#include "GeneralServer/GeneralServerJob.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
@ -260,7 +260,7 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(
/// @brief initializes an async job
////////////////////////////////////////////////////////////////////////////////
void AsyncJobManager::initAsyncJob(HttpServerJob* job, char const* hdr) {
void AsyncJobManager::initAsyncJob(GeneralServerJob* job, char const* hdr) {
AsyncCallbackContext* ctx = nullptr;
if (hdr != nullptr) {

View File

@ -32,7 +32,7 @@ class GeneralResponse;
namespace rest {
class AsyncCallbackContext;
class HttpServerJob;
class GeneralServerJob;
////////////////////////////////////////////////////////////////////////////////
/// @brief AsyncJobResult
@ -133,7 +133,7 @@ class AsyncJobManager {
/// @brief initializes an async job
//////////////////////////////////////////////////////////////////////////////
void initAsyncJob(HttpServerJob*, char const*);
void initAsyncJob(GeneralServerJob*, char const*);
//////////////////////////////////////////////////////////////////////////////
/// @brief finishes the execution of an async job

View File

@ -33,7 +33,7 @@
#include "GeneralServer/GeneralCommTask.h"
#include "GeneralServer/GeneralListenTask.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/HttpServerJob.h"
#include "GeneralServer/GeneralServerJob.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Scheduler/ListenTask.h"
@ -127,13 +127,13 @@ bool GeneralServer::handleRequestAsync(GeneralCommTask* task,
// execute the handler using the dispatcher
std::unique_ptr<Job> job =
std::make_unique<HttpServerJob>(this, handler, true);
std::make_unique<GeneralServerJob>(this, std::move(handler), true);
task->RequestStatisticsAgent::transferTo(job.get());
// register the job with the job manager
if (jobId != nullptr) {
GeneralServerFeature::JOB_MANAGER->initAsyncJob(
static_cast<HttpServerJob*>(job.get()), hdr);
static_cast<GeneralServerJob*>(job.get()), hdr);
*jobId = job->jobId();
}
@ -171,11 +171,12 @@ bool GeneralServer::handleRequest(GeneralCommTask* task,
bool startThread = handler->needsOwnThread();
// use a dispatcher queue, handler belongs to the job
std::unique_ptr<Job> job = std::make_unique<HttpServerJob>(this, handler);
std::unique_ptr<Job> job =
std::make_unique<GeneralServerJob>(this, std::move(handler));
task->RequestStatisticsAgent::transferTo(job.get());
LOG(TRACE) << "GeneralCommTask " << (void*)task << " created HttpServerJob "
<< (void*)job.get();
LOG(TRACE) << "GeneralCommTask " << (void*)task
<< " created GeneralServerJob " << (void*)job.get();
// add the job to the dispatcher
int res = DispatcherFeature::DISPATCHER->addJob(job, startThread);

View File

@ -22,7 +22,7 @@
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#include "HttpServerJob.h"
#include "GeneralServerJob.h"
#include "Basics/WorkMonitor.h"
#include "Dispatcher/DispatcherQueue.h"
@ -42,28 +42,29 @@ using namespace arangodb::rest;
/// @brief constructs a new server job
////////////////////////////////////////////////////////////////////////////////
HttpServerJob::HttpServerJob(GeneralServer* server,
WorkItem::uptr<RestHandler>& handler, bool isAsync)
: Job("HttpServerJob"),
GeneralServerJob::GeneralServerJob(GeneralServer* server,
WorkItem::uptr<RestHandler> handler,
bool isAsync)
: Job("GeneralServerJob"),
_server(server),
_workDesc(nullptr),
_isAsync(isAsync) {
_handler.swap(handler);
_handler = std::move(handler);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a server job
////////////////////////////////////////////////////////////////////////////////
HttpServerJob::~HttpServerJob() {
GeneralServerJob::~GeneralServerJob() {
if (_workDesc != nullptr) {
WorkMonitor::freeWorkDescription(_workDesc);
}
}
size_t HttpServerJob::queue() const { return _handler->queue(); }
size_t GeneralServerJob::queue() const { return _handler->queue(); }
void HttpServerJob::work() {
void GeneralServerJob::work() {
TRI_ASSERT(_handler.get() != nullptr);
RequestStatisticsAgent::transferTo(_handler.get());
@ -106,13 +107,13 @@ void HttpServerJob::work() {
_workDesc = WorkMonitor::popHandler(_handler.release(), false);
}
bool HttpServerJob::cancel() { return _handler->cancel(); }
bool GeneralServerJob::cancel() { return _handler->cancel(); }
void HttpServerJob::cleanup(DispatcherQueue* queue) {
void GeneralServerJob::cleanup(DispatcherQueue* queue) {
queue->removeJob(this);
delete this;
}
void HttpServerJob::handleError(arangodb::basics::Exception const& ex) {
void GeneralServerJob::handleError(arangodb::basics::Exception const& ex) {
_handler->handleError(ex);
}

View File

@ -35,15 +35,15 @@ namespace rest {
class RestHandler;
class GeneralServer;
class HttpServerJob : public Job {
HttpServerJob(HttpServerJob const&) = delete;
HttpServerJob& operator=(HttpServerJob const&) = delete;
class GeneralServerJob : public Job {
GeneralServerJob(GeneralServerJob const&) = delete;
GeneralServerJob& operator=(GeneralServerJob const&) = delete;
public:
HttpServerJob(GeneralServer*, arangodb::WorkItem::uptr<RestHandler>&,
bool isAsync = false);
GeneralServerJob(GeneralServer*, arangodb::WorkItem::uptr<RestHandler>,
bool isAsync = false);
~HttpServerJob();
~GeneralServerJob();
public:
RestHandler* handler() const { return _handler.get(); }

View File

@ -26,12 +26,12 @@
#include "Basics/HybridLogicalClock.h"
#include "Basics/StringBuffer.h"
#include "Basics/VelocyPackHelper.h"
#include "Meta/conversion.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "Logger/LoggerFeature.h"
#include "Meta/conversion.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "VocBase/ticks.h"
@ -278,6 +278,7 @@ bool VppCommTask::processRead() {
// CASE 1: message is in one chunk
if (chunkHeader._isFirst && chunkHeader._chunk == 1) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk contains single message";
std::size_t payloads = 0;
try {
@ -306,17 +307,19 @@ bool VppCommTask::processRead() {
// }
doExecute = true;
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 1";
}
// CASE 2: message is in multiple chunks
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
// CASE 2a: chunk starts new message
if (chunkHeader._isFirst) { // first chunk of multi chunk message
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk starts a new message";
if (incompleteMessageItr != _incompleteMessages.end()) {
throw std::logic_error(
"Message should be first but is already in the Map of incomplete "
"messages");
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "Message should be first but is already in the Map of incomplete "
"messages";
closeTask(rest::ResponseCode::BAD);
return false;
}
// TODO: is a 32bit value sufficient for the messageLength here?
@ -327,17 +330,19 @@ bool VppCommTask::processRead() {
auto insertPair = _incompleteMessages.emplace(
std::make_pair(chunkHeader._messageID, std::move(message)));
if (!insertPair.second) {
throw std::logic_error("insert failed");
closeTask();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "insert failed";
closeTask(rest::ResponseCode::BAD);
return false;
}
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2a";
// CASE 2b: chunk continues a message
} else { // followup chunk of some mesage
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk continues a message";
if (incompleteMessageItr == _incompleteMessages.end()) {
throw std::logic_error("found message without previous part");
closeTask();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "found message without previous part";
closeTask(rest::ResponseCode::BAD);
return false;
}
auto& im = incompleteMessageItr->second; // incomplete Message
im._currentChunk++;
@ -347,6 +352,7 @@ bool VppCommTask::processRead() {
// MESSAGE COMPLETE
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk completes a message";
std::size_t payloads = 0;
try {
@ -374,9 +380,9 @@ bool VppCommTask::processRead() {
// check length
doExecute = true;
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - complete";
}
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - still incomplete";
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "chunk does not complete a message";
}
read_maybe_only_part_of_buffer = true;
@ -394,14 +400,17 @@ bool VppCommTask::processRead() {
if (doExecute) {
VPackSlice header = message.header();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "got request:" << header.toJson();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "got request:"
<< header.toJson();
int type = meta::underlyingValue(rest::RequestType::ILLEGAL);
try {
type = header.at(1).getInt();
} catch (std::exception const& e) {
throw std::runtime_error(
std::string("Error during Parsing of VppHeader: ") + e.what());
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< std::string("VPack Validation failed!") + e.what();
closeTask(rest::ResponseCode::BAD);
return false;
}
if (type == 1000) {
// do auth

View File

@ -0,0 +1,165 @@
#.rst:
# FindPythonInterp
# ----------------
#
# Find python interpreter
#
# This module finds if Python interpreter is installed and determines
# where the executables are. This code sets the following variables:
#
# ::
#
# PYTHONINTERP_FOUND - Was the Python executable found
# PYTHON_EXECUTABLE - path to the Python interpreter
#
#
#
# ::
#
# PYTHON_VERSION_STRING - Python version found e.g. 2.5.2
# PYTHON_VERSION_MAJOR - Python major version found e.g. 2
# PYTHON_VERSION_MINOR - Python minor version found e.g. 5
# PYTHON_VERSION_PATCH - Python patch version found e.g. 2
#
#
#
# The Python_ADDITIONAL_VERSIONS variable can be used to specify a list
# of version numbers that should be taken into account when searching
# for Python. You need to set this variable before calling
# find_package(PythonInterp).
#
# If calling both ``find_package(PythonInterp)`` and
# ``find_package(PythonLibs)``, call ``find_package(PythonInterp)`` first to
# get the currently active Python version by default with a consistent version
# of PYTHON_LIBRARIES.
#=============================================================================
# Copyright 2005-2010 Kitware, Inc.
# Copyright 2011 Bjoern Ricks <bjoern.ricks@gmail.com>
# Copyright 2012 Rolf Eike Beer <eike@sf-mail.de>
#
# Distributed under the OSI-approved BSD License (the "License");
# see accompanying file Copyright.txt for details.
#
# This software is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the License for more information.
#=============================================================================
# (To distribute this file outside of CMake, substitute the full
# License text for the above reference.)
unset(_Python_NAMES)
set(_PYTHON1_VERSIONS 1.6 1.5)
set(_PYTHON2_VERSIONS 2.7 2.6 2.5 2.4 2.3 2.2 2.1 2.0)
set(_PYTHON3_VERSIONS 3.6 3.5 3.4 3.3 3.2 3.1 3.0)
if(PythonInterp_FIND_VERSION)
if(PythonInterp_FIND_VERSION_COUNT GREATER 1)
set(_PYTHON_FIND_MAJ_MIN "${PythonInterp_FIND_VERSION_MAJOR}.${PythonInterp_FIND_VERSION_MINOR}")
list(APPEND _Python_NAMES
python${_PYTHON_FIND_MAJ_MIN}
python${PythonInterp_FIND_VERSION_MAJOR})
unset(_PYTHON_FIND_OTHER_VERSIONS)
if(NOT PythonInterp_FIND_VERSION_EXACT)
foreach(_PYTHON_V ${_PYTHON${PythonInterp_FIND_VERSION_MAJOR}_VERSIONS})
if(NOT _PYTHON_V VERSION_LESS _PYTHON_FIND_MAJ_MIN)
list(APPEND _PYTHON_FIND_OTHER_VERSIONS ${_PYTHON_V})
endif()
endforeach()
endif()
unset(_PYTHON_FIND_MAJ_MIN)
else()
list(APPEND _Python_NAMES python${PythonInterp_FIND_VERSION_MAJOR})
set(_PYTHON_FIND_OTHER_VERSIONS ${_PYTHON${PythonInterp_FIND_VERSION_MAJOR}_VERSIONS})
endif()
else()
set(_PYTHON_FIND_OTHER_VERSIONS ${_PYTHON3_VERSIONS} ${_PYTHON2_VERSIONS} ${_PYTHON1_VERSIONS})
endif()
find_program(PYTHON_EXECUTABLE NAMES ${_Python_NAMES})
# Set up the versions we know about, in the order we will search. Always add
# the user supplied additional versions to the front.
set(_Python_VERSIONS ${Python_ADDITIONAL_VERSIONS})
# If FindPythonInterp has already found the major and minor version,
# insert that version next to get consistent versions of the interpreter and
# library.
if(DEFINED PYTHONLIBS_VERSION_STRING)
string(REPLACE "." ";" _PYTHONLIBS_VERSION "${PYTHONLIBS_VERSION_STRING}")
list(GET _PYTHONLIBS_VERSION 0 _PYTHONLIBS_VERSION_MAJOR)
list(GET _PYTHONLIBS_VERSION 1 _PYTHONLIBS_VERSION_MINOR)
list(APPEND _Python_VERSIONS ${_PYTHONLIBS_VERSION_MAJOR}.${_PYTHONLIBS_VERSION_MINOR})
endif()
# Search for the current active python version first
list(APPEND _Python_VERSIONS ";")
list(APPEND _Python_VERSIONS ${_PYTHON_FIND_OTHER_VERSIONS})
unset(_PYTHON_FIND_OTHER_VERSIONS)
unset(_PYTHON1_VERSIONS)
unset(_PYTHON2_VERSIONS)
unset(_PYTHON3_VERSIONS)
# Search for newest python version if python executable isn't found
if(NOT PYTHON_EXECUTABLE)
foreach(_CURRENT_VERSION IN LISTS _Python_VERSIONS)
set(_Python_NAMES python${_CURRENT_VERSION})
if(WIN32)
list(APPEND _Python_NAMES python)
endif()
find_program(PYTHON_EXECUTABLE
NAMES ${_Python_NAMES}
PATHS [HKEY_LOCAL_MACHINE\\SOFTWARE\\Python\\PythonCore\\${_CURRENT_VERSION}\\InstallPath]
)
endforeach()
endif()
# determine python version string
if(PYTHON_EXECUTABLE)
execute_process(COMMAND "${PYTHON_EXECUTABLE}" -c
"import sys; sys.stdout.write(';'.join([str(x) for x in sys.version_info[:3]]))"
OUTPUT_VARIABLE _VERSION
RESULT_VARIABLE _PYTHON_VERSION_RESULT
ERROR_QUIET)
if(NOT _PYTHON_VERSION_RESULT)
string(REPLACE ";" "." PYTHON_VERSION_STRING "${_VERSION}")
list(GET _VERSION 0 PYTHON_VERSION_MAJOR)
list(GET _VERSION 1 PYTHON_VERSION_MINOR)
list(GET _VERSION 2 PYTHON_VERSION_PATCH)
if(PYTHON_VERSION_PATCH EQUAL 0)
# it's called "Python 2.7", not "2.7.0"
string(REGEX REPLACE "\\.0$" "" PYTHON_VERSION_STRING "${PYTHON_VERSION_STRING}")
endif()
else()
# sys.version predates sys.version_info, so use that
execute_process(COMMAND "${PYTHON_EXECUTABLE}" -c "import sys; sys.stdout.write(sys.version)"
OUTPUT_VARIABLE _VERSION
RESULT_VARIABLE _PYTHON_VERSION_RESULT
ERROR_QUIET)
if(NOT _PYTHON_VERSION_RESULT)
string(REGEX REPLACE " .*" "" PYTHON_VERSION_STRING "${_VERSION}")
string(REGEX REPLACE "^([0-9]+)\\.[0-9]+.*" "\\1" PYTHON_VERSION_MAJOR "${PYTHON_VERSION_STRING}")
string(REGEX REPLACE "^[0-9]+\\.([0-9])+.*" "\\1" PYTHON_VERSION_MINOR "${PYTHON_VERSION_STRING}")
if(PYTHON_VERSION_STRING MATCHES "^[0-9]+\\.[0-9]+\\.([0-9]+)")
set(PYTHON_VERSION_PATCH "${CMAKE_MATCH_1}")
else()
set(PYTHON_VERSION_PATCH "0")
endif()
else()
# sys.version was first documented for Python 1.5, so assume
# this is older.
set(PYTHON_VERSION_STRING "1.4")
set(PYTHON_VERSION_MAJOR "1")
set(PYTHON_VERSION_MINOR "4")
set(PYTHON_VERSION_PATCH "0")
endif()
endif()
unset(_PYTHON_VERSION_RESULT)
unset(_VERSION)
endif()
# handle the QUIETLY and REQUIRED arguments and set PYTHONINTERP_FOUND to TRUE if
# all listed variables are TRUE
include(${CMAKE_CURRENT_LIST_DIR}/FindPackageHandleStandardArgs.cmake)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(PythonInterp REQUIRED_VARS PYTHON_EXECUTABLE VERSION_VAR PYTHON_VERSION_STRING)
mark_as_advanced(PYTHON_EXECUTABLE)