mirror of https://gitee.com/bigwinds/arangodb
virtualize functions of request/response
This commit is contained in:
parent
2695b2bea9
commit
682cf7a2f1
|
@ -220,7 +220,7 @@ OperationID ClusterComm::getOperationID() { return TRI_NewTickServer(); }
|
|||
/// here in the form of "server:" followed by a serverID. Furthermore,
|
||||
/// it is possible to specify the target endpoint directly using
|
||||
/// "tcp://..." or "ssl://..." endpoints, if `singleRequest` is true.
|
||||
///
|
||||
///
|
||||
/// There are two timeout arguments. `timeout` is the globale timeout
|
||||
/// specifying after how many seconds the complete operation must be
|
||||
/// completed. `initTimeout` is a second timeout, which is used to
|
||||
|
@ -228,7 +228,7 @@ OperationID ClusterComm::getOperationID() { return TRI_NewTickServer(); }
|
|||
/// is negative (as for example in the default value), then `initTimeout`
|
||||
/// is taken to be the same as `timeout`. The idea behind the two timeouts
|
||||
/// is to be able to specify correct behaviour for automatic failover.
|
||||
/// The idea is that if the initial request cannot be sent within
|
||||
/// The idea is that if the initial request cannot be sent within
|
||||
/// `initTimeout`, one can retry after a potential failover.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
@ -793,14 +793,12 @@ void ClusterComm::drop(ClientTransactionID const& clientTransactionID,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
|
||||
GeneralResponse* responseToSendGeneral) {
|
||||
// TODO needs to generalized
|
||||
auto responseToSend = dynamic_cast<HttpResponse*>(responseToSendGeneral);
|
||||
GeneralResponse* responseToSend) {
|
||||
|
||||
if (responseToSend == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
||||
// First take apart the header to get the coordinatorID:
|
||||
ServerID coordinatorID;
|
||||
size_t start = 0;
|
||||
|
@ -881,14 +879,11 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
|
||||
GeneralRequest* answerGeneral) {
|
||||
// TODO needs to generalized
|
||||
auto answer = dynamic_cast<HttpRequest*>(answerGeneral);
|
||||
|
||||
GeneralRequest* answer) {
|
||||
if (answer == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
||||
TRI_ASSERT(answer != nullptr);
|
||||
// First take apart the header to get the operaitonID:
|
||||
OperationID operationID;
|
||||
|
@ -981,7 +976,7 @@ bool ClusterComm::moveFromSendToReceived(OperationID operationID) {
|
|||
CONDITION_LOCKER(locker, somethingReceived);
|
||||
CONDITION_LOCKER(sendLocker, somethingToSend);
|
||||
|
||||
IndexIterator i = toSendByOpID.find(operationID); // cannot fail
|
||||
IndexIterator i = toSendByOpID.find(operationID); // cannot fail
|
||||
// TRI_ASSERT(i != toSendByOpID.end());
|
||||
//KV: Except the operation has been dropped in the meantime
|
||||
|
||||
|
@ -1089,7 +1084,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
for (size_t i = 0; i < requests.size(); ++i) {
|
||||
dueTime.push_back(startTime);
|
||||
}
|
||||
|
||||
|
||||
nrDone = 0;
|
||||
size_t nrGood = 0;
|
||||
|
||||
|
@ -1106,7 +1101,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
for (size_t i = 0; i < requests.size(); i++) {
|
||||
if (!requests[i].done && now >= dueTime[i]) {
|
||||
if (requests[i].headerFields.get() == nullptr) {
|
||||
requests[i].headerFields
|
||||
requests[i].headerFields
|
||||
= std::make_unique<std::unordered_map<std::string, std::string>>();
|
||||
}
|
||||
LOG_TOPIC(TRACE, logTopic)
|
||||
|
@ -1181,7 +1176,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
}
|
||||
LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: "
|
||||
<< "got answer from " << requests[index].destination << ":"
|
||||
<< requests[index].path << " with return code "
|
||||
<< requests[index].path << " with return code "
|
||||
<< (int) res.answer_code;
|
||||
} else if (res.status == CL_COMM_BACKEND_UNAVAILABLE ||
|
||||
(res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) {
|
||||
|
@ -1226,7 +1221,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
|
|||
LOG_TOPIC(DEBUG, logTopic) << "ClusterComm::performRequests: "
|
||||
<< "got timeout, this will be reported...";
|
||||
|
||||
// Forget about
|
||||
// Forget about
|
||||
drop("", coordinatorTransactionID, 0, "");
|
||||
return nrGood;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@
|
|||
#include "Basics/Thread.h"
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include "Cluster/ClusterInfo.h"
|
||||
#include "Rest/GeneralRequest.h"
|
||||
#include "Rest/GeneralResponse.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Rest/HttpResponse.h"
|
||||
#include "SimpleHttpClient/SimpleHttpResult.h"
|
||||
|
@ -90,8 +92,8 @@ enum ClusterCommOpStatus {
|
|||
///
|
||||
/// First, the actual destination is determined. If the responsible server
|
||||
/// for a shard is not found or the endpoint for a named server is not found,
|
||||
/// or if the given endpoint is no known protocol (currently "tcp://" or
|
||||
/// "ssl://", then `status` is set to CL_COMM_BACKEND_UNAVAILABLE,
|
||||
/// or if the given endpoint is no known protocol (currently "tcp://" or
|
||||
/// "ssl://", then `status` is set to CL_COMM_BACKEND_UNAVAILABLE,
|
||||
/// `errorMessage` is set but `result` and `answer` are both set
|
||||
/// to nullptr. The flag `sendWasComplete` remains false and the
|
||||
/// `answer_code` remains GeneralResponse::ResponseCode::PROCESSING.
|
||||
|
@ -111,7 +113,7 @@ enum ClusterCommOpStatus {
|
|||
/// error cases `result`, `answer` and `answer_code` are still unset.
|
||||
///
|
||||
/// If the connection was successfully created the request is sent.
|
||||
/// If the request ended with a timeout, `status` is set to
|
||||
/// If the request ended with a timeout, `status` is set to
|
||||
/// CL_COMM_TIMEOUT as above. If another communication error (broken
|
||||
/// connection) happens, `status` is set to CL_COMM_BACKEND_UNAVAILABLE.
|
||||
/// In both cases, `result` can be set or can still be a nullptr.
|
||||
|
@ -125,7 +127,7 @@ enum ClusterCommOpStatus {
|
|||
/// stage. The callback is called, and the result either left in the
|
||||
/// receiving queue or dropped. A call to ClusterComm::enquire or
|
||||
/// ClusterComm::wait can return a result in this state. Note that
|
||||
/// `answer` and `answer_code` are still not set. The flag
|
||||
/// `answer` and `answer_code` are still not set. The flag
|
||||
/// `sendWasComplete` is correctly set, though.
|
||||
///
|
||||
/// In the `singleRequest==false` mode, an asynchronous operation happens
|
||||
|
@ -189,9 +191,9 @@ struct ClusterCommResult {
|
|||
std::shared_ptr<httpclient::SimpleHttpResult> result;
|
||||
// the field answer is != nullptr if status is == CL_COMM_RECEIVED
|
||||
// answer_code is valid iff answer is != 0
|
||||
std::shared_ptr<HttpRequest> answer;
|
||||
std::shared_ptr<GeneralRequest> answer;
|
||||
GeneralResponse::ResponseCode answer_code;
|
||||
|
||||
|
||||
// The following flag indicates whether or not the complete request was
|
||||
// sent to the other side. This is often important to judge whether or
|
||||
// not the operation could have been completed on the server, for example
|
||||
|
|
|
@ -48,7 +48,7 @@ static double const CL_DEFAULT_TIMEOUT = 60.0;
|
|||
namespace arangodb {
|
||||
|
||||
static int handleGeneralCommErrors(ClusterCommResult const* res) {
|
||||
// This function creates an error code from a ClusterCommResult,
|
||||
// This function creates an error code from a ClusterCommResult,
|
||||
// but only if it is a communication error. If the communication
|
||||
// was successful and there was an HTTP error code, this function
|
||||
// returns TRI_ERROR_NO_ERROR.
|
||||
|
@ -405,10 +405,8 @@ std::unordered_map<std::string, std::string> getForwardableRequestHeaders(
|
|||
++it;
|
||||
}
|
||||
|
||||
auto httpRequest = dynamic_cast<HttpRequest*>(request);
|
||||
|
||||
if (httpRequest != nullptr) {
|
||||
result["content-length"] = StringUtils::itoa(httpRequest->contentLength());
|
||||
if (request != nullptr) {
|
||||
result["content-length"] = StringUtils::itoa(request->contentLength());
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -671,7 +669,7 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname,
|
|||
for (auto const& p : *shards) {
|
||||
requests.emplace_back("shard:" + p.first,
|
||||
arangodb::GeneralRequest::RequestType::GET,
|
||||
"/_db/" + StringUtils::urlEncode(dbname) +
|
||||
"/_db/" + StringUtils::urlEncode(dbname) +
|
||||
"/_api/collection/" +
|
||||
StringUtils::urlEncode(p.first) + "/count", body);
|
||||
}
|
||||
|
@ -699,7 +697,7 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname,
|
|||
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
@ -972,7 +970,7 @@ int deleteDocumentOnCoordinator(
|
|||
TRI_ASSERT(requests.size() == 1);
|
||||
auto const& req = requests[0];
|
||||
auto& res = req.result;
|
||||
|
||||
|
||||
int commError = handleGeneralCommErrors(&res);
|
||||
if (commError != TRI_ERROR_NO_ERROR) {
|
||||
return commError;
|
||||
|
@ -1218,7 +1216,7 @@ int getDocumentOnCoordinator(
|
|||
headers->emplace("if-match",
|
||||
slice.get(StaticStrings::RevString).copyString());
|
||||
}
|
||||
|
||||
|
||||
VPackSlice keySlice = slice;
|
||||
if (slice.isObject()) {
|
||||
keySlice = slice.get(StaticStrings::KeyString);
|
||||
|
@ -1964,7 +1962,7 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) {
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief compute a shard distribution for a new collection, the list
|
||||
/// dbServers must be a list of DBserver ids to distribute across.
|
||||
/// dbServers must be a list of DBserver ids to distribute across.
|
||||
/// If this list is empty, the complete current list of DBservers is
|
||||
/// fetched from ClusterInfo and with random_shuffle to mix it up.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2008,7 +2006,7 @@ std::map<std::string, std::vector<std::string>> distributeShards(
|
|||
found = false;
|
||||
break;
|
||||
}
|
||||
} while (std::find(serverIds.begin(), serverIds.end(), candidate) !=
|
||||
} while (std::find(serverIds.begin(), serverIds.end(), candidate) !=
|
||||
serverIds.end());
|
||||
if (found) {
|
||||
serverIds.push_back(candidate);
|
||||
|
|
|
@ -66,7 +66,7 @@ int HttpServer::sendChunk(uint64_t taskId, std::string const& data) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
HttpServer::HttpServer(
|
||||
double keepAliveTimeout,
|
||||
double keepAliveTimeout,
|
||||
bool allowMethodOverride,
|
||||
std::vector<std::string> const& accessControlAllowOrigins)
|
||||
: _listenTasks(),
|
||||
|
|
|
@ -46,17 +46,12 @@ RestBatchHandler::~RestBatchHandler() {}
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
RestHandler::status RestBatchHandler::execute() {
|
||||
// TODO needs to generalized
|
||||
auto response = dynamic_cast<HttpResponse*>(_response);
|
||||
|
||||
if (response == nullptr) {
|
||||
// TODO OBI - generalize function
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
// TODO needs to generalized
|
||||
auto httpRequest = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (httpRequest == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
@ -86,15 +81,15 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
|
||||
// get authorization header. we will inject this into the subparts
|
||||
std::string const& authorization =
|
||||
httpRequest->header(StaticStrings::Authorization);
|
||||
_request->header(StaticStrings::Authorization);
|
||||
|
||||
// create the response
|
||||
setResponseCode(GeneralResponse::ResponseCode::OK);
|
||||
response->setContentType(
|
||||
httpRequest->header(StaticStrings::ContentTypeHeader));
|
||||
_response->setContentType(
|
||||
_request->header(StaticStrings::ContentTypeHeader));
|
||||
|
||||
// setup some auxiliary structures to parse the multipart message
|
||||
std::string const& bodyStr = httpRequest->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
MultipartMessage message(boundary.c_str(), boundary.size(), bodyStr.c_str(),
|
||||
bodyStr.c_str() + bodyStr.size());
|
||||
|
||||
|
@ -148,7 +143,7 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
|
||||
// set up request object for the part
|
||||
LOG(TRACE) << "part header is: " << std::string(headerStart, headerLength);
|
||||
HttpRequest* request = new HttpRequest(httpRequest->connectionInfo(),
|
||||
HttpRequest* request = new HttpRequest(_request->connectionInfo(),
|
||||
headerStart, headerLength, false);
|
||||
|
||||
// we do not have a client task id here
|
||||
|
@ -204,8 +199,7 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
return status::FAILED;
|
||||
}
|
||||
|
||||
HttpResponse* partResponse =
|
||||
dynamic_cast<HttpResponse*>(handler->response());
|
||||
GeneralResponse* partResponse = handler->response();
|
||||
|
||||
if (partResponse == nullptr) {
|
||||
generateError(GeneralResponse::ResponseCode::BAD, TRI_ERROR_INTERNAL,
|
||||
|
@ -222,28 +216,28 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
}
|
||||
|
||||
// append the boundary for this subpart
|
||||
response->body().appendText(boundary + "\r\nContent-Type: ");
|
||||
response->body().appendText(StaticStrings::BatchContentType);
|
||||
_response->body().appendText(boundary + "\r\nContent-Type: ");
|
||||
_response->body().appendText(StaticStrings::BatchContentType);
|
||||
|
||||
// append content-id if it is present
|
||||
if (helper.contentId != 0) {
|
||||
response->body().appendText(
|
||||
_response->body().appendText(
|
||||
"\r\nContent-Id: " +
|
||||
std::string(helper.contentId, helper.contentIdLength));
|
||||
}
|
||||
|
||||
response->body().appendText(TRI_CHAR_LENGTH_PAIR("\r\n\r\n"));
|
||||
_response->body().appendText(TRI_CHAR_LENGTH_PAIR("\r\n\r\n"));
|
||||
|
||||
// remove some headers we don't need
|
||||
partResponse->setConnectionType(HttpResponse::CONNECTION_NONE);
|
||||
partResponse->setHeaderNC(StaticStrings::Server, "");
|
||||
|
||||
// append the part response header
|
||||
partResponse->writeHeader(&response->body());
|
||||
partResponse->writeHeader(&_response->body());
|
||||
|
||||
// append the part response body
|
||||
response->body().appendText(partResponse->body());
|
||||
response->body().appendText(TRI_CHAR_LENGTH_PAIR("\r\n"));
|
||||
_response->body().appendText(partResponse->body());
|
||||
_response->body().appendText(TRI_CHAR_LENGTH_PAIR("\r\n"));
|
||||
}
|
||||
|
||||
// we've read the last part
|
||||
|
@ -253,10 +247,10 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
}
|
||||
|
||||
// append final boundary + "--"
|
||||
response->body().appendText(boundary + "--");
|
||||
_response->body().appendText(boundary + "--");
|
||||
|
||||
if (errors > 0) {
|
||||
response->setHeaderNC(StaticStrings::Errors, StringUtils::itoa(errors));
|
||||
_response->setHeaderNC(StaticStrings::Errors, StringUtils::itoa(errors));
|
||||
}
|
||||
|
||||
// success
|
||||
|
@ -268,14 +262,12 @@ RestHandler::status RestBatchHandler::execute() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RestBatchHandler::getBoundaryBody(std::string* result) {
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* p = bodyStr.c_str();
|
||||
char const* e = p + bodyStr.size();
|
||||
|
||||
|
|
|
@ -161,7 +161,7 @@ std::string RestImportHandler::buildParseError(size_t i,
|
|||
|
||||
int RestImportHandler::handleSingleDocument(
|
||||
SingleCollectionTransaction& trx, RestImportResult& result,
|
||||
VPackBuilder& babies, char const* lineStart, VPackSlice slice,
|
||||
VPackBuilder& babies, char const* lineStart, VPackSlice slice,
|
||||
bool isEdgeCollection, size_t i) {
|
||||
|
||||
if (!slice.isObject()) {
|
||||
|
@ -188,7 +188,7 @@ int RestImportHandler::handleSingleDocument(
|
|||
// add prefixes to _from and _to
|
||||
if (!_fromPrefix.empty() || !_toPrefix.empty()) {
|
||||
TransactionBuilderLeaser tempBuilder(&trx);
|
||||
|
||||
|
||||
tempBuilder->openObject();
|
||||
if (!_fromPrefix.empty()) {
|
||||
VPackSlice from = slice.get(StaticStrings::FromString);
|
||||
|
@ -253,10 +253,8 @@ int RestImportHandler::handleSingleDocument(
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RestImportHandler::createFromJson(std::string const& type) {
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
@ -300,15 +298,12 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
} else if (type == "auto") {
|
||||
linewise = true;
|
||||
|
||||
// TODO generalize
|
||||
auto* httpResponse = dynamic_cast<HttpResponse*>(_response);
|
||||
|
||||
if (httpResponse == nullptr) {
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
// auto detect import type by peeking at first non-whitespace character
|
||||
std::string const& body = request->body();
|
||||
std::string const& body = _request->body();
|
||||
char const* ptr = body.c_str();
|
||||
char const* end = ptr + body.size();
|
||||
|
||||
|
@ -361,7 +356,7 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
|
||||
if (linewise) {
|
||||
// each line is a separate JSON document
|
||||
std::string const& body = request->body();
|
||||
std::string const& body = _request->body();
|
||||
char const* ptr = body.c_str();
|
||||
char const* end = ptr + body.size();
|
||||
size_t i = 0;
|
||||
|
@ -395,8 +390,8 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
ptr = pos + 1;
|
||||
++result._numEmpty;
|
||||
continue;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (pos != nullptr) {
|
||||
// non-empty line
|
||||
*(const_cast<char*>(pos)) = '\0';
|
||||
|
@ -442,7 +437,7 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
// the entire request body is one JSON document
|
||||
std::shared_ptr<VPackBuilder> parsedDocuments;
|
||||
try {
|
||||
parsedDocuments = VPackParser::fromJson(request->body());
|
||||
parsedDocuments = VPackParser::fromJson(_request->body());
|
||||
} catch (VPackException const&) {
|
||||
generateError(GeneralResponse::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
|
@ -477,14 +472,14 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
babies.close();
|
||||
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
// no error so far. go on and perform the actual insert
|
||||
res = performImport(trx, result, collectionName, babies, complete, opOptions);
|
||||
}
|
||||
|
||||
|
||||
res = trx.finish(res);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -500,10 +495,7 @@ bool RestImportHandler::createFromJson(std::string const& type) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RestImportHandler::createFromKeyValueList() {
|
||||
// TODO needs to generalized
|
||||
auto* request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
@ -544,7 +536,7 @@ bool RestImportHandler::createFromKeyValueList() {
|
|||
lineNumber = StringUtils::int64(lineNumValue);
|
||||
}
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* current = bodyStr.c_str();
|
||||
char const* bodyEnd = current + bodyStr.size();
|
||||
|
||||
|
@ -702,12 +694,12 @@ bool RestImportHandler::createFromKeyValueList() {
|
|||
}
|
||||
|
||||
babies.close();
|
||||
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
// no error so far. go on and perform the actual insert
|
||||
res = performImport(trx, result, collectionName, babies, complete, opOptions);
|
||||
}
|
||||
|
||||
|
||||
res = trx.finish(res);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -723,7 +715,7 @@ bool RestImportHandler::createFromKeyValueList() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int RestImportHandler::performImport(SingleCollectionTransaction& trx,
|
||||
RestImportResult& result,
|
||||
RestImportResult& result,
|
||||
std::string const& collectionName,
|
||||
VPackBuilder const& babies,
|
||||
bool complete,
|
||||
|
@ -767,10 +759,10 @@ int RestImportHandler::performImport(SingleCollectionTransaction& trx,
|
|||
// special behavior in case of unique constraint violation . . .
|
||||
if (errorCode == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && _onDuplicateAction != DUPLICATE_ERROR) {
|
||||
VPackSlice const keySlice = which.get(StaticStrings::KeyString);
|
||||
|
||||
|
||||
if (keySlice.isString()) {
|
||||
// insert failed. now try an update/replace
|
||||
if (_onDuplicateAction == DUPLICATE_UPDATE ||
|
||||
if (_onDuplicateAction == DUPLICATE_UPDATE ||
|
||||
_onDuplicateAction == DUPLICATE_REPLACE) {
|
||||
// update/replace
|
||||
updateReplace.add(which);
|
||||
|
@ -795,7 +787,7 @@ int RestImportHandler::performImport(SingleCollectionTransaction& trx,
|
|||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
++pos;
|
||||
}
|
||||
|
@ -803,14 +795,14 @@ int RestImportHandler::performImport(SingleCollectionTransaction& trx,
|
|||
updateReplace.close();
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR && updateReplace.slice().length() > 0) {
|
||||
if (_onDuplicateAction == DUPLICATE_UPDATE) {
|
||||
if (_onDuplicateAction == DUPLICATE_UPDATE) {
|
||||
opResult = trx.update(collectionName, updateReplace.slice(), opOptions);
|
||||
} else {
|
||||
opResult = trx.replace(collectionName, updateReplace.slice(), opOptions);
|
||||
}
|
||||
|
||||
|
||||
VPackSlice resultSlice = opResult.slice();
|
||||
size_t pos = 0;
|
||||
size_t pos = 0;
|
||||
for (auto const& it : VPackArrayIterator(resultSlice)) {
|
||||
if (!it.hasKey("error") || !it.get("error").getBool()) {
|
||||
++result._numUpdated;
|
||||
|
@ -827,7 +819,7 @@ int RestImportHandler::performImport(SingleCollectionTransaction& trx,
|
|||
++pos;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -778,10 +778,8 @@ void RestReplicationHandler::handleCommandBarrier() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RestReplicationHandler::handleTrampolineCoordinator() {
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
@ -821,7 +819,7 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
|
|||
_request->requestType(),
|
||||
"/_db/" + StringUtils::urlEncode(dbname) +
|
||||
_request->requestPath() + params,
|
||||
request->body(), *headers, 300.0);
|
||||
_request->body(), *headers, 300.0);
|
||||
|
||||
if (res->status == CL_COMM_TIMEOUT) {
|
||||
// No reply, we give up:
|
||||
|
@ -848,16 +846,13 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
|
|||
setResponseCode(static_cast<GeneralResponse::ResponseCode>(
|
||||
res->result->getHttpReturnCode()));
|
||||
|
||||
// TODO needs to generalized
|
||||
auto response = dynamic_cast<HttpResponse*>(_response);
|
||||
|
||||
if (response == nullptr) {
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
response->setContentType(
|
||||
_response->setContentType(
|
||||
res->result->getHeaderField(StaticStrings::ContentTypeHeader, dummy));
|
||||
response->body().swap(&(res->result->getBody()));
|
||||
_response->body().swap(&(res->result->getBody()));
|
||||
|
||||
auto const& resultHeaders = res->result->getHeaderFields();
|
||||
for (auto const& it : resultHeaders) {
|
||||
|
@ -1003,14 +998,11 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
|
|||
setResponseCode(GeneralResponse::ResponseCode::OK);
|
||||
}
|
||||
|
||||
// TODO needs to generalized
|
||||
auto response = dynamic_cast<HttpResponse*>(_response);
|
||||
|
||||
if (response == nullptr) {
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
response->setContentType(GeneralResponse::ContentType::DUMP);
|
||||
_response->setContentType(GeneralResponse::ContentType::DUMP);
|
||||
|
||||
// set headers
|
||||
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
|
||||
|
@ -1029,7 +1021,7 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
|
|||
|
||||
if (length > 0) {
|
||||
// transfer ownership of the buffer contents
|
||||
response->body().set(dump._buffer);
|
||||
_response->body().set(dump._buffer);
|
||||
|
||||
// to avoid double freeing
|
||||
TRI_StealStringBuffer(dump._buffer);
|
||||
|
@ -1106,14 +1098,11 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions() {
|
|||
setResponseCode(GeneralResponse::ResponseCode::OK);
|
||||
}
|
||||
|
||||
// TODO needs to generalized
|
||||
auto response = dynamic_cast<HttpResponse*>(_response);
|
||||
|
||||
if (response == nullptr) {
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
response->setContentType(HttpResponse::ContentType::DUMP);
|
||||
_response->setContentType(HttpResponse::ContentType::DUMP);
|
||||
|
||||
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
|
||||
dump._fromTickIncluded ? "true" : "false");
|
||||
|
@ -1123,7 +1112,7 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions() {
|
|||
|
||||
if (length > 0) {
|
||||
// transfer ownership of the buffer contents
|
||||
response->body().set(dump._buffer);
|
||||
_response->body().set(dump._buffer);
|
||||
|
||||
// to avoid double freeing
|
||||
TRI_StealStringBuffer(dump._buffer);
|
||||
|
@ -1215,10 +1204,10 @@ void RestReplicationHandler::handleCommandClusterInventory() {
|
|||
|
||||
AgencyComm _agency;
|
||||
AgencyCommResult result;
|
||||
|
||||
|
||||
std::string prefix("Plan/Collections/");
|
||||
prefix.append(dbName);
|
||||
|
||||
|
||||
result = _agency.getValues(prefix);
|
||||
if (!result.successful()) {
|
||||
generateError(GeneralResponse::ResponseCode::SERVER_ERROR,
|
||||
|
@ -1258,7 +1247,7 @@ void RestReplicationHandler::handleCommandClusterInventory() {
|
|||
resultBuilder.slice());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2186,14 +2175,12 @@ int RestReplicationHandler::processRestoreDataBatch(
|
|||
|
||||
VPackBuilder builder;
|
||||
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* ptr = bodyStr.c_str();
|
||||
char const* end = ptr + bodyStr.size();
|
||||
|
||||
|
@ -2528,14 +2515,11 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() {
|
|||
std::string("received invalid JSON data for collection ") + name;
|
||||
VPackBuilder builder;
|
||||
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* ptr = bodyStr.c_str();
|
||||
char const* end = ptr + bodyStr.size();
|
||||
|
||||
|
@ -3153,7 +3137,7 @@ void RestReplicationHandler::handleCommandDump() {
|
|||
|
||||
bool compat28 = false;
|
||||
std::string const& value8 = _request->value("compat28", found);
|
||||
|
||||
|
||||
if (found) {
|
||||
compat28 = StringUtils::boolean(value8);
|
||||
}
|
||||
|
@ -3195,7 +3179,7 @@ void RestReplicationHandler::handleCommandDump() {
|
|||
TRI_replication_dump_t dump(transactionContext,
|
||||
static_cast<size_t>(determineChunkSize()),
|
||||
includeSystem, 0);
|
||||
|
||||
|
||||
if (compat28) {
|
||||
dump._compat28 = true;
|
||||
}
|
||||
|
@ -3507,7 +3491,7 @@ void RestReplicationHandler::handleCommandSync() {
|
|||
config._includeSystem = includeSystem;
|
||||
config._verbose = verbose;
|
||||
config._useCollectionId = useCollectionId;
|
||||
|
||||
|
||||
// wait until all data in current logfile got synced
|
||||
arangodb::wal::LogfileManager::instance()->waitForSync(5.0);
|
||||
|
||||
|
@ -4018,7 +4002,7 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
|
|||
double now = TRI_microtime();
|
||||
double startTime = now;
|
||||
double endTime = startTime + ttl;
|
||||
|
||||
|
||||
{
|
||||
CONDITION_LOCKER(locker, _condVar);
|
||||
while (now < endTime) {
|
||||
|
|
|
@ -42,10 +42,8 @@ RestUploadHandler::RestUploadHandler(GeneralRequest* request,
|
|||
RestUploadHandler::~RestUploadHandler() {}
|
||||
|
||||
RestHandler::status RestUploadHandler::execute() {
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
@ -73,7 +71,7 @@ RestHandler::status RestUploadHandler::execute() {
|
|||
|
||||
char* relative = TRI_GetFilename(filename);
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* body = bodyStr.c_str();
|
||||
size_t bodySize = bodyStr.size();
|
||||
|
||||
|
@ -135,14 +133,12 @@ RestHandler::status RestUploadHandler::execute() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RestUploadHandler::parseMultiPart(char const*& body, size_t& length) {
|
||||
// TODO needs to generalized
|
||||
auto request = dynamic_cast<HttpRequest*>(_request);
|
||||
|
||||
if (request == nullptr) {
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
std::string const& bodyStr = request->body();
|
||||
std::string const& bodyStr = _request->body();
|
||||
char const* beg = bodyStr.c_str();
|
||||
char const* end = beg + bodyStr.size();
|
||||
|
||||
|
|
|
@ -655,12 +655,8 @@ std::shared_ptr<VPackBuilder> RestVocbaseBaseHandler::parseVelocyPackBody(
|
|||
contentType == StaticStrings::MimeTypeVPack) {
|
||||
|
||||
VPackValidator validator;
|
||||
//FIXME broken casts!!
|
||||
validator.validate(static_cast<HttpRequest*>(_request)->body().c_str()
|
||||
,static_cast<HttpRequest*>(_request)->body().length()
|
||||
);
|
||||
|
||||
VPackSlice slice{ static_cast<HttpRequest*>(_request)->body().c_str()};
|
||||
validator.validate(_request->body().c_str() ,_request->body().length());
|
||||
VPackSlice slice{_request->body().c_str()};
|
||||
auto builder = std::make_shared<VPackBuilder>(options);
|
||||
builder->add(slice);
|
||||
return builder;
|
||||
|
|
|
@ -290,11 +290,11 @@ static void AddCookie(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
|
||||
static v8::Handle<v8::Object> RequestCppToV8(v8::Isolate* isolate,
|
||||
TRI_v8_global_t const* v8g,
|
||||
GeneralRequest* generalRequest) {
|
||||
GeneralRequest* request) {
|
||||
// setup the request
|
||||
v8::Handle<v8::Object> req = v8::Object::New(isolate);
|
||||
|
||||
auto request = dynamic_cast<HttpRequest*>(generalRequest);
|
||||
//auto request = dynamic_cast<HttpRequest*>(generalRequest);
|
||||
|
||||
// TODO generalize
|
||||
if (request == nullptr) {
|
||||
|
|
|
@ -173,6 +173,11 @@ class GeneralRequest {
|
|||
virtual std::shared_ptr<arangodb::velocypack::Builder> toVelocyPack(
|
||||
arangodb::velocypack::Options const*) = 0;
|
||||
|
||||
virtual std::string const& body() const = 0;
|
||||
virtual int64_t contentLength() const = 0;
|
||||
|
||||
virtual std::unordered_map<std::string, std::string> cookieValues() const = 0;
|
||||
|
||||
protected:
|
||||
void setValue(char const* key, char const* value);
|
||||
void setArrayValue(char* key, size_t length, char const* value);
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "Basics/StringBuffer.h"
|
||||
|
||||
namespace arangodb {
|
||||
namespace velocypack {
|
||||
|
@ -106,6 +107,13 @@ class GeneralResponse {
|
|||
DUMP // application/x-arango-dump
|
||||
};
|
||||
|
||||
enum ConnectionType {
|
||||
CONNECTION_NONE,
|
||||
CONNECTION_KEEP_ALIVE,
|
||||
CONNECTION_CLOSE
|
||||
};
|
||||
|
||||
|
||||
public:
|
||||
// converts the response code to a string for delivering to a http client.
|
||||
static std::string responseString(ResponseCode);
|
||||
|
@ -116,6 +124,15 @@ class GeneralResponse {
|
|||
// response code from integer error code
|
||||
static ResponseCode responseCode(int);
|
||||
|
||||
// TODO OBI - check what can be implemented in this base class
|
||||
virtual basics::StringBuffer& body() = 0;
|
||||
virtual void setContentType(ContentType type) = 0;
|
||||
virtual void setContentType(std::string const& contentType) = 0;
|
||||
virtual void setContentType(std::string&& contentType) = 0;
|
||||
virtual void setConnectionType(ConnectionType type) = 0;
|
||||
virtual void writeHeader(basics::StringBuffer*) = 0;
|
||||
|
||||
|
||||
protected:
|
||||
explicit GeneralResponse(ResponseCode);
|
||||
|
||||
|
|
|
@ -60,15 +60,15 @@ class HttpRequest : public GeneralRequest {
|
|||
|
||||
public:
|
||||
// the content length
|
||||
int64_t contentLength() const { return _contentLength; }
|
||||
int64_t contentLength() const override { return _contentLength; }
|
||||
|
||||
std::string const& cookieValue(std::string const& key) const;
|
||||
std::string const& cookieValue(std::string const& key, bool& found) const;
|
||||
std::unordered_map<std::string, std::string> cookieValues() const {
|
||||
std::unordered_map<std::string, std::string> cookieValues() const override {
|
||||
return _cookies;
|
||||
}
|
||||
|
||||
std::string const& body() const;
|
||||
std::string const& body() const override;
|
||||
void setBody(char const* body, size_t length);
|
||||
|
||||
// the request body as VelocyPackBuilder
|
||||
|
|
|
@ -49,12 +49,6 @@ class HttpResponse : public GeneralResponse {
|
|||
public:
|
||||
bool isHeadResponse() const { return _isHeadResponse; }
|
||||
|
||||
enum ConnectionType {
|
||||
CONNECTION_NONE,
|
||||
CONNECTION_KEEP_ALIVE,
|
||||
CONNECTION_CLOSE
|
||||
};
|
||||
|
||||
public:
|
||||
void setCookie(std::string const& name, std::string const& value,
|
||||
int lifeTimeSeconds, std::string const& path,
|
||||
|
@ -69,30 +63,30 @@ class HttpResponse : public GeneralResponse {
|
|||
// information to the string buffer. Note that adding data to the body
|
||||
// invalidates any previously returned header. You must call header
|
||||
// again.
|
||||
basics::StringBuffer& body() { return _body; }
|
||||
basics::StringBuffer& body() override { return _body; }
|
||||
size_t bodySize() const;
|
||||
|
||||
/// @brief set type of connection
|
||||
void setConnectionType(ConnectionType type) { _connectionType = type; }
|
||||
void setConnectionType(ConnectionType type) override { _connectionType = type; }
|
||||
|
||||
/// @brief set content-type
|
||||
void setContentType(ContentType type) { _contentType = type; }
|
||||
void setContentType(ContentType type) override { _contentType = type; }
|
||||
|
||||
/// @brief set content-type from a string. this should only be used in
|
||||
/// cases when the content-type is user-defined
|
||||
void setContentType(std::string const& contentType) {
|
||||
void setContentType(std::string const& contentType) override {
|
||||
_headers[arangodb::StaticStrings::ContentTypeHeader] = contentType;
|
||||
_contentType = ContentType::CUSTOM;
|
||||
}
|
||||
|
||||
void setContentType(std::string&& contentType) {
|
||||
void setContentType(std::string&& contentType) override {
|
||||
_headers[arangodb::StaticStrings::ContentTypeHeader] =
|
||||
std::move(contentType);
|
||||
_contentType = ContentType::CUSTOM;
|
||||
}
|
||||
|
||||
// you should call writeHeader only after the body has been created
|
||||
void writeHeader(basics::StringBuffer*);
|
||||
void writeHeader(basics::StringBuffer*) override;
|
||||
|
||||
public:
|
||||
void reset(ResponseCode code) override final;
|
||||
|
|
Loading…
Reference in New Issue