mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
4fe234803e
|
@ -618,12 +618,14 @@ describe ArangoDB do
|
|||
|
||||
cmd = "/_api/export/#{id}"
|
||||
doc = ArangoDB.log_put("#{prefix}-create-wrong-api", cmd)
|
||||
|
||||
doc.code.should eq(404)
|
||||
doc.headers['content-type'].should eq("application/json; charset=utf-8")
|
||||
doc.parsed_response['error'].should eq(true)
|
||||
doc.parsed_response['code'].should eq(404)
|
||||
doc.parsed_response['errorNum'].should eq(1600)
|
||||
|
||||
if doc.code == 404
|
||||
doc.code.should eq(404)
|
||||
doc.headers['content-type'].should eq("application/json; charset=utf-8")
|
||||
doc.parsed_response['error'].should eq(true)
|
||||
doc.parsed_response['code'].should eq(404)
|
||||
doc.parsed_response['errorNum'].should eq(1600)
|
||||
end
|
||||
end
|
||||
|
||||
it "creates a query that survives memory limit constraints" do
|
||||
|
|
|
@ -100,9 +100,9 @@ bool AddFollower::create() {
|
|||
TRI_ASSERT(current[0].isString());
|
||||
#endif
|
||||
|
||||
size_t sub = 0;
|
||||
auto const& myClones = clones(_snapshot, _database, _collection, _shard);
|
||||
if (!myClones.empty()) {
|
||||
size_t sub = 0;
|
||||
for (auto const& clone : myClones) {
|
||||
AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
|
||||
_jobId, _agencyPrefix, _database, clone.collection,
|
||||
|
|
|
@ -373,7 +373,7 @@ void AgencyCommManager::initialize(std::string const& prefix) {
|
|||
MANAGER.reset(new AgencyCommManager(prefix));
|
||||
}
|
||||
|
||||
void AgencyCommManager::shutdown() { MANAGER.release(); }
|
||||
void AgencyCommManager::shutdown() { MANAGER.reset(); }
|
||||
|
||||
std::string AgencyCommManager::path() {
|
||||
if (MANAGER == nullptr) {
|
||||
|
|
|
@ -65,13 +65,9 @@ bool FailedFollower::create() {
|
|||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
<< "Todo: failed Follower for " + _shard + " from " + _from + " to " + _to;
|
||||
|
||||
std::string path = _agencyPrefix + toDoPrefix + _jobId;
|
||||
std::string planPath =
|
||||
planColPrefix + _database + "/" + _collection + "/shards";
|
||||
|
||||
size_t sub = 0;
|
||||
auto const& myClones = clones(_snapshot, _database, _collection, _shard);
|
||||
if (!myClones.empty()) {
|
||||
size_t sub = 0;
|
||||
for (auto const& clone : myClones) {
|
||||
FailedFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
|
||||
_jobId, _agencyPrefix, _database, clone.collection,
|
||||
|
@ -84,6 +80,8 @@ bool FailedFollower::create() {
|
|||
_jb->openObject();
|
||||
|
||||
// Todo entry
|
||||
std::string path = _agencyPrefix + toDoPrefix + _jobId;
|
||||
|
||||
_jb->add(path, VPackValue(VPackValueType::Object));
|
||||
_jb->add("creator", VPackValue(_creator));
|
||||
_jb->add("type", VPackValue("failedFollower"));
|
||||
|
|
|
@ -446,7 +446,13 @@ bool Inception::estimateRAFTInterval() {
|
|||
|
||||
double precision = 1.0e-2;
|
||||
mn = precision *
|
||||
std::ceil((1./precision)*(.25 + precision*(maxmean+3*maxstdev)));
|
||||
std::ceil((1. / precision)*(.3 + precision * (maxmean + 3.*maxstdev)));
|
||||
if (config.waitForSync()) {
|
||||
mn *= 4.;
|
||||
}
|
||||
if (mn > 2.0) {
|
||||
mn = 2.0;
|
||||
}
|
||||
mx = 5. * mn;
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
|
|
|
@ -212,3 +212,14 @@ std::vector<Job::shard_t> Job::clones(
|
|||
return ret;
|
||||
|
||||
}
|
||||
|
||||
std::string Job::uuidLookup (Node const& snapshot, std::string const& shortID) {
|
||||
for (auto const& uuid : snapshot(mapUniqueToShortID).children()) {
|
||||
if ((*uuid.second)("ShortName").getString() == shortID) {
|
||||
return uuid.first;
|
||||
}
|
||||
}
|
||||
return std::string();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ namespace consensus {
|
|||
enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
|
||||
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
|
||||
"/Target/Finished/", "/Target/Failed/"});
|
||||
|
||||
static std::string const mapUniqueToShortID = "/Target/MapUniqueToShortID/";
|
||||
static std::string const pendingPrefix = "/Target/Pending/";
|
||||
static std::string const failedPrefix = "/Target/Failed/";
|
||||
static std::string const finishedPrefix = "/Target/Finished/";
|
||||
|
@ -123,6 +123,8 @@ struct Job {
|
|||
Node const& snap, std::string const& db, std::string const& col,
|
||||
std::string const& shrd);
|
||||
|
||||
static std::string uuidLookup(Node const& snap, std::string const& shortID);
|
||||
|
||||
Node const _snapshot;
|
||||
Agent* _agent;
|
||||
std::string _jobId;
|
||||
|
|
|
@ -75,6 +75,24 @@ bool MoveShard::create() {
|
|||
_jb->openArray();
|
||||
_jb->openObject();
|
||||
|
||||
// Lookup from server
|
||||
if (_from.find("DBServer") == 0) {
|
||||
try {
|
||||
_from = uuidLookup(_snapshot, _from);
|
||||
} catch (...) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) <<
|
||||
"MoveShard: From server " << _from << " does not exist";
|
||||
}
|
||||
}
|
||||
if (_to.find("DBServer") == 0) {
|
||||
try {
|
||||
_to = uuidLookup(_snapshot, _to);
|
||||
} catch (...) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) <<
|
||||
"MoveShard: To server " << _to << " does not exist";
|
||||
}
|
||||
}
|
||||
|
||||
if (_from == _to) {
|
||||
path = _agencyPrefix + failedPrefix + _jobId;
|
||||
_jb->add("timeFinished", VPackValue(now));
|
||||
|
|
|
@ -186,7 +186,6 @@ SET(ARANGOD_SOURCES
|
|||
Cluster/DBServerAgencySync.cpp
|
||||
Cluster/HeartbeatThread.cpp
|
||||
Cluster/RestAgencyCallbacksHandler.cpp
|
||||
Cluster/RestShardHandler.cpp
|
||||
Cluster/ServerState.cpp
|
||||
Cluster/TraverserEngine.cpp
|
||||
Cluster/TraverserEngineRegistry.cpp
|
||||
|
|
|
@ -36,17 +36,9 @@
|
|||
#include "SimpleHttpClient/SimpleHttpCommunicatorResult.h"
|
||||
#include "Utils/Transaction.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global callback for asynchronous REST handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void arangodb::ClusterCommRestCallback(std::string& coordinator,
|
||||
GeneralResponse* response) {
|
||||
ClusterComm::instance()->asyncAnswer(coordinator, response);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief routine to set the destination
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -687,197 +679,6 @@ void ClusterComm::drop(ClientTransactionID const& clientTransactionID,
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send an answer HTTP request to a coordinator
|
||||
///
|
||||
/// This is only called in a DBServer node and never in a coordinator
|
||||
/// node.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
|
||||
GeneralResponse* response) {
|
||||
// FIXME - generalize for VPP
|
||||
HttpResponse* responseToSend = dynamic_cast<HttpResponse*>(response);
|
||||
if (responseToSend == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
// First take apart the header to get the coordinatorID:
|
||||
ServerID coordinatorID;
|
||||
size_t start = 0;
|
||||
size_t pos;
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "In asyncAnswer, seeing " << coordinatorHeader;
|
||||
pos = coordinatorHeader.find(":", start);
|
||||
|
||||
if (pos == std::string::npos) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Could not find coordinator ID in X-Arango-Coordinator";
|
||||
return;
|
||||
}
|
||||
|
||||
coordinatorID = coordinatorHeader.substr(start, pos - start);
|
||||
|
||||
// Now find the connection to which the request goes from the coordinatorID:
|
||||
httpclient::ConnectionManager* cm = httpclient::ConnectionManager::instance();
|
||||
std::string endpoint =
|
||||
ClusterInfo::instance()->getServerEndpoint(coordinatorID);
|
||||
|
||||
if (endpoint == "") {
|
||||
if (logConnectionErrors()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "asyncAnswer: cannot find endpoint for server '"
|
||||
<< coordinatorID << "'";
|
||||
} else {
|
||||
LOG_TOPIC(INFO, Logger::CLUSTER)
|
||||
<< "asyncAnswer: cannot find endpoint for server '"
|
||||
<< coordinatorID << "'";
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
httpclient::ConnectionManager::SingleServerConnection* connection =
|
||||
cm->leaseConnection(endpoint);
|
||||
|
||||
if (nullptr == connection) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "asyncAnswer: cannot create connection to server '"
|
||||
<< coordinatorID << "'";
|
||||
return;
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::string> headers =
|
||||
responseToSend->headers();
|
||||
headers["X-Arango-Coordinator"] = coordinatorHeader;
|
||||
headers["X-Arango-Response-Code"] =
|
||||
responseToSend->responseString(responseToSend->responseCode());
|
||||
|
||||
addAuthorization(&headers);
|
||||
TRI_voc_tick_t timeStamp = TRI_HybridLogicalClock();
|
||||
headers[StaticStrings::HLCHeader] =
|
||||
arangodb::basics::HybridLogicalClock::encodeTimeStamp(timeStamp);
|
||||
|
||||
char const* body = responseToSend->body().c_str();
|
||||
size_t len = responseToSend->body().length();
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "asyncAnswer: sending PUT request to DB server '"
|
||||
<< coordinatorID << "'";
|
||||
|
||||
auto client = std::make_unique<arangodb::httpclient::SimpleHttpClient>(
|
||||
connection->_connection, 3600.0, false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
// We add this result to the operation struct without acquiring
|
||||
// a lock, since we know that only we do such a thing:
|
||||
std::unique_ptr<httpclient::SimpleHttpResult> result(client->request(
|
||||
rest::RequestType::PUT, "/_api/shard-comm", body, len, headers));
|
||||
if (result.get() == nullptr || !result->isComplete()) {
|
||||
cm->brokenConnection(connection);
|
||||
client->invalidateConnection();
|
||||
} else {
|
||||
cm->returnConnection(connection);
|
||||
}
|
||||
// We cannot deal with a bad result here, so forget about it in any case.
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process an answer coming in on the HTTP socket
|
||||
///
|
||||
/// this is called for a request, which is actually an answer to one of
|
||||
/// our earlier requests, return value of "" means OK and nonempty is
|
||||
/// an error. This is only called in a coordinator node and not in a
|
||||
/// DBServer node.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string ClusterComm::processAnswer(
|
||||
std::string const& coordinatorHeader,
|
||||
std::unique_ptr<GeneralRequest>&& answer) {
|
||||
TRI_ASSERT(false);
|
||||
if (answer == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
TRI_ASSERT(answer != nullptr);
|
||||
// First take apart the header to get the operationID:
|
||||
OperationID operationID;
|
||||
size_t start = 0;
|
||||
size_t pos;
|
||||
|
||||
pos = coordinatorHeader.find(":", start);
|
||||
if (pos == std::string::npos) {
|
||||
return std::string(
|
||||
"could not find coordinator ID in 'X-Arango-Coordinator'");
|
||||
}
|
||||
// coordinatorID = coordinatorHeader.substr(start,pos-start);
|
||||
start = pos + 1;
|
||||
pos = coordinatorHeader.find(":", start);
|
||||
if (pos == std::string::npos) {
|
||||
return std::string("could not find operationID in 'X-Arango-Coordinator'");
|
||||
}
|
||||
operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start));
|
||||
|
||||
// Finally find the ClusterCommOperation record for this operation:
|
||||
{
|
||||
CONDITION_LOCKER(locker, somethingReceived);
|
||||
|
||||
ClusterComm::IndexIterator i;
|
||||
i = receivedByOpID.find(operationID);
|
||||
if (i != receivedByOpID.end()) {
|
||||
TRI_ASSERT(answer != nullptr);
|
||||
ClusterCommOperation* op = *(i->second);
|
||||
op->result.answer = std::move(answer);
|
||||
op->result.answer_code = GeneralResponse::responseCode(
|
||||
op->result.answer->header("x-arango-response-code"));
|
||||
op->result.status = CL_COMM_RECEIVED;
|
||||
// Do we have to do a callback?
|
||||
if (nullptr != op->callback.get()) {
|
||||
if ((*op->callback.get())(&op->result)) {
|
||||
// This is fully processed, so let's remove it from the queue:
|
||||
QueueIterator q = i->second;
|
||||
std::unique_ptr<ClusterCommOperation> o(op);
|
||||
receivedByOpID.erase(i);
|
||||
received.erase(q);
|
||||
return std::string("");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We have to look in the send queue as well, as it might not yet
|
||||
// have been moved to the received queue. Note however that it must
|
||||
// have been fully sent, so this is highly unlikely, but not impossible.
|
||||
CONDITION_LOCKER(sendLocker, somethingToSend);
|
||||
|
||||
i = toSendByOpID.find(operationID);
|
||||
if (i != toSendByOpID.end()) {
|
||||
TRI_ASSERT(answer != nullptr);
|
||||
ClusterCommOperation* op = *(i->second);
|
||||
op->result.answer = std::move(answer);
|
||||
op->result.answer_code = GeneralResponse::responseCode(
|
||||
op->result.answer->header("x-arango-response-code"));
|
||||
op->result.status = CL_COMM_RECEIVED;
|
||||
if (nullptr != op->callback) {
|
||||
if ((*op->callback)(&op->result)) {
|
||||
// This is fully processed, so let's remove it from the queue:
|
||||
QueueIterator q = i->second;
|
||||
std::unique_ptr<ClusterCommOperation> o(op);
|
||||
toSendByOpID.erase(i);
|
||||
toSend.erase(q);
|
||||
return std::string("");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Nothing known about the request, get rid of it:
|
||||
return std::string("operation was already dropped by sender");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Finally tell the others:
|
||||
somethingReceived.broadcast();
|
||||
return std::string("");
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief move an operation from the send to the receive queue
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -337,13 +337,6 @@ struct ClusterCommOperation {
|
|||
ClusterCommTimeout initEndTime;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global callback for asynchronous REST handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterCommRestCallback(std::string& coordinator,
|
||||
GeneralResponse* response);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief used to let ClusterComm send a set of requests and look after them
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -491,13 +484,6 @@ class ClusterComm {
|
|||
CoordTransactionID const coordTransactionID,
|
||||
OperationID const operationID, ShardID const& shardID);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process an answer coming in on the HTTP socket
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string processAnswer(std::string const& coordinatorHeader,
|
||||
std::unique_ptr<GeneralRequest>&& answer);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send an answer HTTP request to a coordinator
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -1990,6 +1990,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static std::string const prefixServers = "Current/ServersRegistered";
|
||||
static std::string const mapUniqueToShortId = "Target/MapUniqueToShortID";
|
||||
|
||||
void ClusterInfo::loadServers() {
|
||||
++_serversProt.wantedVersion; // Indicate that after *NOW* somebody has to
|
||||
|
@ -2002,23 +2003,43 @@ void ClusterInfo::loadServers() {
|
|||
return;
|
||||
}
|
||||
|
||||
// Now contact the agency:
|
||||
AgencyCommResult result = _agency.getValues(prefixServers);
|
||||
|
||||
AgencyCommResult result = _agency.sendTransactionWithFailover(
|
||||
AgencyReadTransaction({AgencyCommManager::path(prefixServers),
|
||||
AgencyCommManager::path(mapUniqueToShortId)}));
|
||||
|
||||
|
||||
if (result.successful()) {
|
||||
velocypack::Slice serversRegistered =
|
||||
result.slice()[0].get(std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Current", "ServersRegistered"}));
|
||||
result.slice()[0].get(
|
||||
std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Current", "ServersRegistered"}));
|
||||
|
||||
if (serversRegistered.isObject()) {
|
||||
velocypack::Slice serversAliases =
|
||||
result.slice()[0].get(
|
||||
std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));
|
||||
|
||||
if (serversRegistered.isObject()) {
|
||||
decltype(_servers) newServers;
|
||||
decltype(_serverAliases) newAliases;
|
||||
|
||||
size_t i = 0;
|
||||
for (auto const& res : VPackObjectIterator(serversRegistered)) {
|
||||
velocypack::Slice slice = res.value;
|
||||
|
||||
if (slice.isObject() && slice.hasKey("endpoint")) {
|
||||
std::string server =
|
||||
arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
slice, "endpoint", "");
|
||||
|
||||
velocypack::Slice aslice;
|
||||
try {
|
||||
aslice = serversAliases.valueAt(i++);
|
||||
std::string alias =
|
||||
arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
slice, "endpoint", "");
|
||||
aslice, "ShortName", "");
|
||||
newAliases.emplace(std::make_pair(alias, res.key.copyString()));
|
||||
} catch (...) {}
|
||||
newServers.emplace(std::make_pair(res.key.copyString(), server));
|
||||
}
|
||||
}
|
||||
|
@ -2027,6 +2048,7 @@ void ClusterInfo::loadServers() {
|
|||
{
|
||||
WRITE_LOCKER(writeLocker, _serversProt.lock);
|
||||
_servers.swap(newServers);
|
||||
_serverAliases.swap(newAliases);
|
||||
_serversProt.doneVersion = storedVersion;
|
||||
_serversProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
|
@ -2056,17 +2078,29 @@ std::string ClusterInfo::getServerEndpoint(ServerID const& serverID) {
|
|||
tries++;
|
||||
}
|
||||
|
||||
std::string serverID_ = serverID;
|
||||
|
||||
while (true) {
|
||||
{
|
||||
READ_LOCKER(readLocker, _serversProt.lock);
|
||||
|
||||
// _serversAliases is a map-type <Alias, ServerID>
|
||||
auto ita = _serverAliases.find(serverID_);
|
||||
|
||||
if (ita != _serverAliases.end()) {
|
||||
serverID_ = (*ita).second;
|
||||
}
|
||||
|
||||
// _servers is a map-type <ServerId, std::string>
|
||||
auto it = _servers.find(serverID);
|
||||
auto it = _servers.find(serverID_);
|
||||
|
||||
if (it != _servers.end()) {
|
||||
return (*it).second;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (++tries >= 2) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -560,6 +560,8 @@ class ClusterInfo {
|
|||
// The servers, first all, we only need Current here:
|
||||
std::unordered_map<ServerID, std::string>
|
||||
_servers; // from Current/ServersRegistered
|
||||
std::unordered_map<ServerID, std::string>
|
||||
_serverAliases; // from Current/ServersRegistered
|
||||
ProtectionData _serversProt;
|
||||
|
||||
// The DBServers, also from Current:
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RestShardHandler.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Rest/HttpResponse.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rest;
|
||||
|
||||
RestShardHandler::RestShardHandler(GeneralRequest* request,
|
||||
GeneralResponse* response)
|
||||
: RestBaseHandler(request, response) {}
|
||||
|
||||
bool RestShardHandler::isDirect() const { return true; }
|
||||
|
||||
RestStatus RestShardHandler::execute() {
|
||||
bool found;
|
||||
std::string const& _coordinator =
|
||||
_request->header(StaticStrings::Coordinator, found);
|
||||
|
||||
if (!found) {
|
||||
generateError(arangodb::rest::ResponseCode::BAD,
|
||||
(int)arangodb::rest::ResponseCode::BAD,
|
||||
"header 'X-Arango-Coordinator' is missing");
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
std::string coordinatorHeader = _coordinator;
|
||||
std::string result =
|
||||
ClusterComm::instance()->processAnswer(coordinatorHeader, stealRequest());
|
||||
|
||||
if (result == "") {
|
||||
resetResponse(arangodb::rest::ResponseCode::ACCEPTED);
|
||||
} else {
|
||||
generateError(arangodb::rest::ResponseCode::BAD,
|
||||
(int)arangodb::rest::ResponseCode::BAD,
|
||||
result.c_str());
|
||||
}
|
||||
|
||||
return RestStatus::DONE;
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_CLUSTER_REST_SHARD_HANDLER_H
|
||||
#define ARANGOD_CLUSTER_REST_SHARD_HANDLER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "RestHandler/RestBaseHandler.h"
|
||||
|
||||
namespace arangodb {
|
||||
namespace rest {
|
||||
class Dispatcher;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shard control request handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class RestShardHandler : public RestBaseHandler {
|
||||
public:
|
||||
explicit RestShardHandler(GeneralRequest*, GeneralResponse*);
|
||||
|
||||
public:
|
||||
char const* name() const override final { return "RestShardHandler"; }
|
||||
bool isDirect() const override;
|
||||
RestStatus execute() override;
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
|
@ -72,8 +72,8 @@ AsyncJobResult::AsyncJobResult(IdType jobId, Status status,
|
|||
|
||||
AsyncJobResult::~AsyncJobResult() {}
|
||||
|
||||
AsyncJobManager::AsyncJobManager(callback_fptr callback)
|
||||
: _lock(), _jobs(), _callback(callback) {}
|
||||
AsyncJobManager::AsyncJobManager()
|
||||
: _lock(), _jobs() {}
|
||||
|
||||
AsyncJobManager::~AsyncJobManager() {
|
||||
// remove all results that haven't been fetched
|
||||
|
@ -291,11 +291,5 @@ void AsyncJobManager::finishAsyncJob(RestHandler* handler) {
|
|||
}
|
||||
}
|
||||
|
||||
if (nullptr != ctx) {
|
||||
if (nullptr != _callback) {
|
||||
_callback(ctx->getCoordinatorHeader(), response.get());
|
||||
}
|
||||
|
||||
delete ctx;
|
||||
}
|
||||
delete ctx;
|
||||
}
|
||||
|
|
|
@ -70,11 +70,10 @@ class AsyncJobManager {
|
|||
AsyncJobManager& operator=(AsyncJobManager const&) = delete;
|
||||
|
||||
public:
|
||||
typedef void (*callback_fptr)(std::string&, GeneralResponse*);
|
||||
typedef std::unordered_map<AsyncJobResult::IdType, AsyncJobResult> JobList;
|
||||
|
||||
public:
|
||||
explicit AsyncJobManager(callback_fptr);
|
||||
AsyncJobManager();
|
||||
~AsyncJobManager();
|
||||
|
||||
public:
|
||||
|
@ -95,7 +94,6 @@ class AsyncJobManager {
|
|||
private:
|
||||
basics::ReadWriteLock _lock;
|
||||
JobList _jobs;
|
||||
callback_fptr _callback;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/ClusterFeature.h"
|
||||
#include "Cluster/RestAgencyCallbacksHandler.h"
|
||||
#include "Cluster/RestShardHandler.h"
|
||||
#include "Cluster/TraverserEngineRegistry.h"
|
||||
#include "GeneralServer/AuthenticationFeature.h"
|
||||
#include "GeneralServer/GeneralServer.h"
|
||||
|
@ -238,7 +237,7 @@ void GeneralServerFeature::prepare() {
|
|||
}
|
||||
|
||||
void GeneralServerFeature::start() {
|
||||
_jobManager.reset(new AsyncJobManager(ClusterCommRestCallback));
|
||||
_jobManager.reset(new AsyncJobManager);
|
||||
|
||||
JOB_MANAGER = _jobManager.get();
|
||||
|
||||
|
@ -401,9 +400,6 @@ void GeneralServerFeature::defineHandlers() {
|
|||
RestVocbaseBaseHandler::UPLOAD_PATH,
|
||||
RestHandlerCreator<RestUploadHandler>::createNoData);
|
||||
|
||||
_handlerFactory->addPrefixHandler(
|
||||
"/_api/shard-comm", RestHandlerCreator<RestShardHandler>::createNoData);
|
||||
|
||||
_handlerFactory->addPrefixHandler(
|
||||
"/_api/aql",
|
||||
RestHandlerCreator<aql::RestAqlHandler>::createData<aql::QueryRegistry*>,
|
||||
|
|
|
@ -98,8 +98,7 @@ RestHandler* RestHandlerFactory::createHandler(
|
|||
if (_maintenanceMode.load()) {
|
||||
if ((!ServerState::instance()->isCoordinator() &&
|
||||
path.find("/_api/agency/agency-callbacks") == std::string::npos) ||
|
||||
(path != "/_api/shard-comm" &&
|
||||
path.find("/_api/agency/agency-callbacks") == std::string::npos &&
|
||||
(path.find("/_api/agency/agency-callbacks") == std::string::npos &&
|
||||
path.find("/_api/aql") == std::string::npos)) {
|
||||
LOG(DEBUG) << "Maintenance mode: refused path: " << path;
|
||||
return new MaintenanceHandler(request.release(), response.release());
|
||||
|
|
|
@ -95,10 +95,11 @@ function MovingShardsSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function getCleanedOutServers() {
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
|
||||
var request = require("@arangodb/request");
|
||||
var endpointToURL = require("@arangodb/cluster").endpointToURL;
|
||||
var url = endpointToURL(coordEndpoint);
|
||||
|
||||
var res = request({ method: "GET",
|
||||
url: url + "/_admin/cluster/numberOfServers"});
|
||||
var body = res.body;
|
||||
|
@ -178,7 +179,7 @@ function MovingShardsSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function cleanOutServer(id) {
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
|
||||
var request = require("@arangodb/request");
|
||||
var endpointToURL = require("@arangodb/cluster").endpointToURL;
|
||||
var url = endpointToURL(coordEndpoint);
|
||||
|
@ -193,7 +194,7 @@ function MovingShardsSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function shrinkCluster(toNum) {
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
|
||||
var request = require("@arangodb/request");
|
||||
var endpointToURL = require("@arangodb/cluster").endpointToURL;
|
||||
var url = endpointToURL(coordEndpoint);
|
||||
|
@ -208,7 +209,7 @@ function MovingShardsSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function resetCleanedOutServers() {
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
|
||||
var request = require("@arangodb/request");
|
||||
var endpointToURL = require("@arangodb/cluster").endpointToURL;
|
||||
var url = endpointToURL(coordEndpoint);
|
||||
|
@ -232,7 +233,7 @@ function MovingShardsSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function moveShard(database, collection, shard, fromServer, toServer) {
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
|
||||
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
|
||||
var request = require("@arangodb/request");
|
||||
var endpointToURL = require("@arangodb/cluster").endpointToURL;
|
||||
var url = endpointToURL(coordEndpoint);
|
||||
|
@ -278,7 +279,7 @@ function MovingShardsSuite () {
|
|||
function findServerNotOnList(list) {
|
||||
var count = 1;
|
||||
var str = "" + count;
|
||||
var pad = "000";
|
||||
var pad = "0000";
|
||||
var ans = pad.substring(0, pad.length - str.length) + str;
|
||||
|
||||
var name = "DBServer" + ans;
|
||||
|
@ -353,13 +354,13 @@ function MovingShardsSuite () {
|
|||
testShrinkNoReplication : function() {
|
||||
assertTrue(waitForSynchronousReplication("_system"));
|
||||
shrinkCluster(4);
|
||||
assertTrue(testServerEmpty("DBServer005", true));
|
||||
assertTrue(testServerEmpty("DBServer0005", true));
|
||||
assertTrue(waitForSupervision());
|
||||
shrinkCluster(3);
|
||||
assertTrue(testServerEmpty("DBServer004", true));
|
||||
assertTrue(testServerEmpty("DBServer0004", true));
|
||||
assertTrue(waitForSupervision());
|
||||
shrinkCluster(2);
|
||||
assertTrue(testServerEmpty("DBServer003", true));
|
||||
assertTrue(testServerEmpty("DBServer0003", true));
|
||||
assertTrue(waitForSupervision());
|
||||
},
|
||||
|
||||
|
|
Loading…
Reference in New Issue