From 897ddca313ec4790b1867c4d3ab4dae3a943c8db Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 12 Dec 2013 15:25:53 +0100 Subject: [PATCH 1/4] startup/shutdown modifications --- arangod/Cluster/AgencyComm.cpp | 79 ++++++++++++++++++-------- arangod/Cluster/AgencyComm.h | 13 ++++- arangod/Cluster/ApplicationCluster.cpp | 78 ++++++++++++++++++++++--- arangod/Cluster/ApplicationCluster.h | 6 ++ arangod/Cluster/HeartbeatThread.cpp | 17 ++---- arangod/Cluster/HeartbeatThread.h | 3 +- arangod/Cluster/ServerState.cpp | 10 +++- arangod/Cluster/ServerState.h | 28 ++++++++- 8 files changed, 182 insertions(+), 52 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 8ff012010a..948b7277a4 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -31,6 +31,7 @@ #include "Basics/WriteLocker.h" #include "BasicsC/json.h" #include "BasicsC/logging.h" +#include "Cluster/ServerState.h" #include "Rest/Endpoint.h" #include "SimpleHttpClient/GeneralClientConnection.h" #include "SimpleHttpClient/SimpleHttpClient.h" @@ -441,18 +442,20 @@ bool AgencyComm::addEndpoint (std::string const& endpointSpecification, ++it; } - // not found a previous endpoint, now create one - AgencyEndpoint* agencyEndpoint = createAgencyEndpoint(endpointSpecification); + // didn't find the endpoint in our list of endpoints, so now create a new one + for (size_t i = 0; i < NumConnections; ++i) { + AgencyEndpoint* agencyEndpoint = createAgencyEndpoint(endpointSpecification); - if (agencyEndpoint == 0) { - return false; - } + if (agencyEndpoint == 0) { + return false; + } - if (toFront) { - AgencyComm::_globalEndpoints.push_front(agencyEndpoint); - } - else { - AgencyComm::_globalEndpoints.push_back(agencyEndpoint); + if (toFront) { + AgencyComm::_globalEndpoints.push_front(agencyEndpoint); + } + else { + AgencyComm::_globalEndpoints.push_back(agencyEndpoint); + } } } @@ -663,6 +666,19 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief sends the current server state to the agency +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::sendServerState () { + const std::string value = ServerState::stateToString(ServerState::instance()->getState()) + + ":" + + AgencyComm::generateStamp(); + + AgencyCommResult result(setValue("State/ServerStates/" + ServerState::instance()->getId(), value)); + return result.successful(); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief gets the backend version //////////////////////////////////////////////////////////////////////////////// @@ -845,7 +861,7 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key, /// @brief pop an endpoint from the queue //////////////////////////////////////////////////////////////////////////////// -AgencyEndpoint* AgencyComm::popEndpoint () { +AgencyEndpoint* AgencyComm::popEndpoint (std::string const& endpoint) { while (1) { { WRITE_LOCKER(AgencyComm::_globalLock); @@ -858,6 +874,14 @@ AgencyEndpoint* AgencyComm::popEndpoint () { assert(agencyEndpoint != 0); + if (! endpoint.empty() && + agencyEndpoint->_endpoint->getSpecification() != endpoint) { + // we're looking for a different endpoint + ++it; + continue; + } + + if (! agencyEndpoint->_busy) { agencyEndpoint->_busy = true; @@ -940,13 +964,16 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType numEndpoints = AgencyComm::_globalEndpoints.size(); assert(numEndpoints > 0); } - + size_t tries = 0; std::string realUrl = url; + std::string forceEndpoint = ""; while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - + AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint); + + assert(agencyEndpoint != 0); + send(agencyEndpoint->_connection, method, timeout, @@ -987,16 +1014,14 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType realUrl = endpoint.substr(delim); endpoint = endpoint.substr(0, delim); - - LOG_WARNING("handling failover from '%s' to '%s'", - agencyEndpoint->_endpoint->getSpecification().c_str(), - endpoint.c_str()); if (! AgencyComm::hasEndpoint(endpoint)) { // redirection to an unknown endpoint - if (_addNewEndpoints) { AgencyComm::addEndpoint(endpoint, true); + + LOG_INFO("adding agency-endpoint '%s'", endpoint.c_str()); + // re-check the new endpoint if (AgencyComm::hasEndpoint(endpoint)) { @@ -1012,10 +1037,14 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType return false; } + forceEndpoint = endpoint; + // if we get here, we'll just use the next endpoint from the list continue; } + forceEndpoint = ""; + // we can stop iterating over endpoints if the operation succeeded, // if a watch timed out or // if the reason for failure was a client-side error @@ -1060,13 +1089,13 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection result._statusCode = 0; -/* - LOG_INFO("sending %s request to agency at endpoint '%s', url '%s': %s", + + LOG_TRACE("sending %s request to agency at endpoint '%s', url '%s': %s", triagens::rest::HttpRequest::translateMethod(method).c_str(), connection->getEndpoint()->getSpecification().c_str(), url.c_str(), body.c_str()); - */ + triagens::httpclient::SimpleHttpClient client(connection, timeout, false); @@ -1120,12 +1149,12 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection if (found) { result._index = triagens::basics::StringUtils::uint64(lastIndex); } -/* - LOG_INFO("request to agency returned status code %d, message: '%s', body: '%s'", + + LOG_TRACE("request to agency returned status code %d, message: '%s', body: '%s'", result._statusCode, result._message.c_str(), result._body.c_str()); -*/ + delete response; return result.successful(); diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 6975687fbc..2fc7674d54 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -302,6 +302,12 @@ namespace triagens { // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief sends the current server state to the agency +//////////////////////////////////////////////////////////////////////////////// + + bool sendServerState (); + //////////////////////////////////////////////////////////////////////////////// /// @brief gets the backend version //////////////////////////////////////////////////////////////////////////////// @@ -373,7 +379,7 @@ namespace triagens { /// @brief pop an endpoint from the queue //////////////////////////////////////////////////////////////////////////////// - AgencyEndpoint* popEndpoint (); + AgencyEndpoint* popEndpoint (std::string const&); //////////////////////////////////////////////////////////////////////////////// /// @brief reinsert an endpoint into the queue @@ -458,6 +464,11 @@ namespace triagens { static AgencyConnectionOptions _globalConnectionOptions; +//////////////////////////////////////////////////////////////////////////////// +/// @brief number of connections per endpoint +//////////////////////////////////////////////////////////////////////////////// + + static const size_t NumConnections = 3; }; } } diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 70d55df658..dabaca7021 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -155,6 +155,8 @@ bool ApplicationCluster::start () { return true; } + ServerState::instance()->setId(_myId); + // perfom an initial connect to the agency const std::string endpoints = AgencyComm::getEndpointsString(); @@ -207,8 +209,11 @@ bool ApplicationCluster::start () { ServerState::instance()->setRole(role); ServerState::instance()->setState(ServerState::STATE_STARTUP); - + + // the agency about our state AgencyComm comm; + comm.sendServerState(); + const std::string version = comm.getVersion(); LOG_INFO("Cluster feature is turned on. " @@ -221,7 +226,7 @@ bool ApplicationCluster::start () { ServerState::roleToString(role).c_str()); // start heartbeat thread - _heartbeat = new HeartbeatThread(_myId, _heartbeatInterval * 1000, 5); + _heartbeat = new HeartbeatThread(_heartbeatInterval * 1000, 5); if (_heartbeat == 0) { LOG_FATAL_AND_EXIT("unable to start cluster heartbeat thread"); @@ -231,6 +236,40 @@ bool ApplicationCluster::start () { LOG_FATAL_AND_EXIT("heartbeat could not connect to agency endpoints (%s)", endpoints.c_str()); } + + return true; +} + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + +bool ApplicationCluster::open () { + ServerState::RoleEnum role = ServerState::instance()->getRole(); + + // tell the agency that we are ready + AgencyComm comm; + AgencyCommResult result = comm.setValue("State/ServersRegistered/" + _myId, _myAddress); + + if (! result.successful()) { + LOG_FATAL_AND_EXIT("unable to register server in agency"); + } + + if (role == ServerState::ROLE_COORDINATOR) { + ServerState::instance()->setState(ServerState::STATE_SERVING); + + // register coordinator + AgencyCommResult result = comm.setValue("State/Coordinators/" + _myId, "none"); + if (! result.successful()) { + LOG_FATAL_AND_EXIT("unable to register coordinator in agency"); + } + } + else if (role == ServerState::ROLE_PRIMARY) { + ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); + } + else if (role == ServerState::ROLE_SECONDARY) { + LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented"); + } return true; } @@ -245,6 +284,12 @@ void ApplicationCluster::close () { } _heartbeat->stop(); + + // change into shutdown state + ServerState::instance()->setState(ServerState::STATE_SHUTDOWN); + + AgencyComm comm; + comm.sendServerState(); } //////////////////////////////////////////////////////////////////////////////// @@ -255,9 +300,18 @@ void ApplicationCluster::stop () { if (! enabled()) { return; } + + // change into shutdown state + ServerState::instance()->setState(ServerState::STATE_SHUTDOWN); + + AgencyComm comm; + comm.sendServerState(); _heartbeat->stop(); - + + // unregister ourselves + comm.removeValues("State/ServersRegistered/" + _myId, false); + AgencyComm::cleanup(); } @@ -304,17 +358,20 @@ std::string ApplicationCluster::getEndpointForId () const { ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const { // fetch value at TmpConfig/DBServers // we need this to determine the server's role + const std::string key = "TmpConfig/Coordinators"; + AgencyComm comm; - AgencyCommResult result = comm.getValues("TmpConfig/Coordinators", true); + AgencyCommResult result = comm.getValues(key, true); if (! result.successful()) { const std::string endpoints = AgencyComm::getEndpointsString(); LOG_FATAL_AND_EXIT("Could not fetch configuration from agency endpoints (%s): " - "got status code %d, message: %s", + "got status code %d, message: %s, key: %s", endpoints.c_str(), result._statusCode, - result.errorMessage().c_str()); + result.errorMessage().c_str(), + key.c_str()); } std::map out; @@ -340,17 +397,20 @@ ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const { ServerState::RoleEnum ApplicationCluster::checkServersList () const { // fetch value at TmpConfig/DBServers // we need this to determine the server's role + const std::string key = "TmpConfig/DBServers"; + AgencyComm comm; - AgencyCommResult result = comm.getValues("TmpConfig/DBServers", true); + AgencyCommResult result = comm.getValues(key, true); if (! result.successful()) { const std::string endpoints = AgencyComm::getEndpointsString(); LOG_FATAL_AND_EXIT("Could not fetch configuration from agency endpoints (%s): " - "got status code %d, message: %s", + "got status code %d, message: %s, key: %s", endpoints.c_str(), result._statusCode, - result.errorMessage().c_str()); + result.errorMessage().c_str(), + key.c_str()); } std::map out; diff --git a/arangod/Cluster/ApplicationCluster.h b/arangod/Cluster/ApplicationCluster.h index dace1d3c49..4a325a4879 100644 --- a/arangod/Cluster/ApplicationCluster.h +++ b/arangod/Cluster/ApplicationCluster.h @@ -96,6 +96,12 @@ namespace triagens { bool prepare (); +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + + bool open (); + //////////////////////////////////////////////////////////////////////////////// /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 465d58b9d6..c699a668bf 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -44,13 +44,12 @@ using namespace triagens::arango; /// @brief constructs a heartbeat thread //////////////////////////////////////////////////////////////////////////////// -HeartbeatThread::HeartbeatThread (std::string const& myId, - uint64_t interval, +HeartbeatThread::HeartbeatThread (uint64_t interval, uint64_t maxFailsBeforeWarning) : Thread("heartbeat"), _agency(), _condition(), - _myId(myId), + _myId(ServerState::instance()->getId()), _interval(interval), _maxFailsBeforeWarning(maxFailsBeforeWarning), _numFails(0), @@ -238,15 +237,9 @@ bool HeartbeatThread::handleStateChange (AgencyCommResult const& result, //////////////////////////////////////////////////////////////////////////////// bool HeartbeatThread::sendState () { - const std::string value = ServerState::stateToString(ServerState::instance()->getState()) + - ":" + - AgencyComm::generateStamp(); + const bool result = _agency.sendServerState(); - // return value is intentionally not handled - // if sending the current state fails, we'll just try again in the next iteration - AgencyCommResult result(_agency.setValue("State/ServerStates/" + _myId, value)); - - if (result.successful()) { + if (result) { _numFails = 0; } else { @@ -257,7 +250,7 @@ bool HeartbeatThread::sendState () { } } - return result.successful(); + return result; } // Local Variables: diff --git a/arangod/Cluster/HeartbeatThread.h b/arangod/Cluster/HeartbeatThread.h index 4fcc3399e5..3f29d7cad2 100644 --- a/arangod/Cluster/HeartbeatThread.h +++ b/arangod/Cluster/HeartbeatThread.h @@ -62,8 +62,7 @@ namespace triagens { /// @brief constructs a heartbeat thread //////////////////////////////////////////////////////////////////////////////// - HeartbeatThread (std::string const&, - uint64_t, + HeartbeatThread (uint64_t, uint64_t); //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index fa14ce0055..2f94353de0 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -163,11 +163,15 @@ ServerState::StateEnum ServerState::getState () { /// @brief set the current state //////////////////////////////////////////////////////////////////////////////// -void ServerState::setState (StateEnum state) { +void ServerState::setState (StateEnum state) { bool result = false; WRITE_LOCKER(_lock); + if (state == _state) { + return; + } + if (_role == ROLE_PRIMARY) { result = checkPrimaryState(state); } @@ -225,7 +229,9 @@ bool ServerState::checkPrimaryState (StateEnum state) { } else if (state == STATE_SHUTDOWN) { return (_state == STATE_STARTUP || - _state == STATE_STOPPED); + _state == STATE_STOPPED || + _state == STATE_SERVINGSYNC || + _state == STATE_SERVINGASYNC); } // anything else is invalid diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index 30b90529cb..be5b6ca256 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -153,6 +153,26 @@ namespace triagens { _role = role; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the server id +//////////////////////////////////////////////////////////////////////////////// + + inline std::string getId () const { + return _id; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief set the server id +//////////////////////////////////////////////////////////////////////////////// + + void setId (std::string const& id) { + // id can be set just once + assert(_id.empty()); + assert(! id.empty()); + + _id = id; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief get the current state //////////////////////////////////////////////////////////////////////////////// @@ -163,7 +183,7 @@ namespace triagens { /// @brief set the current state //////////////////////////////////////////////////////////////////////////////// - void setState (StateEnum); + void setState (StateEnum); // ----------------------------------------------------------------------------- // --SECTION-- private methods @@ -189,6 +209,12 @@ namespace triagens { private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief the server's id. can be set just once +//////////////////////////////////////////////////////////////////////////////// + + std::string _id; + //////////////////////////////////////////////////////////////////////////////// /// @brief r/w lock for state //////////////////////////////////////////////////////////////////////////////// From e1fb1f3d14a28dfa1c9ee208d89edd2c10437dd6 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 12 Dec 2013 15:59:16 +0100 Subject: [PATCH 2/4] added stub REST API for cluster requests --- arangod/Cluster/RestShardHandler.cpp | 101 ++++++++++++++++++++++++++ arangod/Cluster/RestShardHandler.h | 103 +++++++++++++++++++++++++++ arangod/Makefile.files | 5 +- arangod/RestServer/ArangoServer.cpp | 7 ++ 4 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 arangod/Cluster/RestShardHandler.cpp create mode 100644 arangod/Cluster/RestShardHandler.h diff --git a/arangod/Cluster/RestShardHandler.cpp b/arangod/Cluster/RestShardHandler.cpp new file mode 100644 index 0000000000..03296ce8fa --- /dev/null +++ b/arangod/Cluster/RestShardHandler.cpp @@ -0,0 +1,101 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief shard control request handler +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Jan Steemann +/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#include "RestShardHandler.h" +#include "Rest/HttpRequest.h" +#include "Rest/HttpResponse.h" +#include "Cluster/ServerState.h" + +using namespace triagens::arango; + +// ----------------------------------------------------------------------------- +// --SECTION-- public constants +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief name of the queue +//////////////////////////////////////////////////////////////////////////////// + +const string RestShardHandler::QUEUE_NAME = "STANDARD"; + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors and destructors +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + +RestShardHandler::RestShardHandler (triagens::rest::HttpRequest* request) + : RestBaseHandler(request) { + +} + +// ----------------------------------------------------------------------------- +// --SECTION-- Handler methods +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + +bool RestShardHandler::isDirect () { + return true; +} + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + +string const& RestShardHandler::queue () const { + return QUEUE_NAME; +} + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + +triagens::rest::HttpHandler::status_e RestShardHandler::execute () { + ServerState::RoleEnum role = ServerState::instance()->getRole(); + + if (role == ServerState::ROLE_COORDINATOR) { + generateError(triagens::rest::HttpResponse::BAD, + (int) triagens::rest::HttpResponse::BAD, + "this API is meant to be called on a coordinator node"); + return HANDLER_DONE; + } + + // respond with a 202 + _response = createResponse(triagens::rest::HttpResponse::ACCEPTED); + + return HANDLER_DONE; +} + +// Local Variables: +// mode: outline-minor +// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}" +// End: diff --git a/arangod/Cluster/RestShardHandler.h b/arangod/Cluster/RestShardHandler.h new file mode 100644 index 0000000000..07ca286a9d --- /dev/null +++ b/arangod/Cluster/RestShardHandler.h @@ -0,0 +1,103 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief shard request handler +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Jan Steemann +/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#ifndef TRIAGENS_ARANGOD_CLUSTER_REST_SHARD_HANDLER_H +#define TRIAGENS_ARANGOD_CLUSTER_REST_SHARD_HANDLER_H 1 + +#include "Admin/RestBaseHandler.h" + +namespace triagens { + namespace arango { + +// ----------------------------------------------------------------------------- +// --SECTION-- class RestShardHandler +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief shard control request handler +//////////////////////////////////////////////////////////////////////////////// + + class RestShardHandler : public triagens::admin::RestBaseHandler { + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors and destructors +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + + RestShardHandler (triagens::rest::HttpRequest* request); + +// ----------------------------------------------------------------------------- +// --SECTION-- Handler methods +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + + bool isDirect (); + +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + + string const& queue () const; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief executes the handler +//////////////////////////////////////////////////////////////////////////////// + + status_e execute (); + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief name of the queue +//////////////////////////////////////////////////////////////////////////////// + + static const std::string QUEUE_NAME; + + }; + } +} + +#endif + +// Local Variables: +// mode: outline-minor +// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}" +// End: diff --git a/arangod/Makefile.files b/arangod/Makefile.files index f1ef6d7c5e..96178dda18 100644 --- a/arangod/Makefile.files +++ b/arangod/Makefile.files @@ -112,9 +112,10 @@ if ENABLE_CLUSTER bin_arangod_SOURCES += \ arangod/Cluster/AgencyComm.cpp \ arangod/Cluster/ApplicationCluster.cpp \ + arangod/Cluster/ClusterComm.cpp \ arangod/Cluster/HeartbeatThread.cpp \ - arangod/Cluster/ServerState.cpp \ - arangod/Cluster/ClusterComm.cpp + arangod/Cluster/RestShardHandler.cpp \ + arangod/Cluster/ServerState.cpp endif diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index a2658607a9..3fb14b075d 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -74,6 +74,7 @@ #ifdef TRI_ENABLE_CLUSTER #include "Cluster/ApplicationCluster.h" +#include "Cluster/RestShardHandler.h" #endif #include "V8/V8LineEditor.h" @@ -134,6 +135,12 @@ static void DefineApiHandlers (HttpHandlerFactory* factory, // add "/upload" handler factory->addPrefixHandler(RestVocbaseBaseHandler::UPLOAD_PATH, RestHandlerCreator::createNoData); + +#ifdef TRI_ENABLE_CLUSTER + // add "/shard-comm" handler + factory->addPrefixHandler("/_api/shard-comm", + RestHandlerCreator::createNoData); +#endif } //////////////////////////////////////////////////////////////////////////////// From 8eccb0a35fc5690b26203d47011c4b69337bc5a9 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 12 Dec 2013 16:42:25 +0100 Subject: [PATCH 3/4] added dispatcher to shardhandler --- arangod/Cluster/ApplicationCluster.cpp | 4 +++ arangod/Cluster/RestShardHandler.cpp | 45 +++++++++++++++++++++++--- arangod/Cluster/RestShardHandler.h | 13 +++++++- arangod/RestServer/ArangoServer.cpp | 9 ++++-- lib/GeneralServer/GeneralServerJob.h | 1 - lib/Rest/Handler.h | 1 - 6 files changed, 63 insertions(+), 10 deletions(-) diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index dabaca7021..89fde37019 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -245,6 +245,10 @@ bool ApplicationCluster::start () { //////////////////////////////////////////////////////////////////////////////// bool ApplicationCluster::open () { + if (! enabled()) { + return true; + } + ServerState::RoleEnum role = ServerState::instance()->getRole(); // tell the agency that we are ready diff --git a/arangod/Cluster/RestShardHandler.cpp b/arangod/Cluster/RestShardHandler.cpp index 03296ce8fa..c4eec69ac4 100644 --- a/arangod/Cluster/RestShardHandler.cpp +++ b/arangod/Cluster/RestShardHandler.cpp @@ -26,9 +26,11 @@ //////////////////////////////////////////////////////////////////////////////// #include "RestShardHandler.h" + +#include "Cluster/ServerState.h" +#include "Dispatcher/Dispatcher.h" #include "Rest/HttpRequest.h" #include "Rest/HttpResponse.h" -#include "Cluster/ServerState.h" using namespace triagens::arango; @@ -50,9 +52,14 @@ const string RestShardHandler::QUEUE_NAME = "STANDARD"; /// @brief constructor //////////////////////////////////////////////////////////////////////////////// -RestShardHandler::RestShardHandler (triagens::rest::HttpRequest* request) - : RestBaseHandler(request) { +RestShardHandler::RestShardHandler (triagens::rest::HttpRequest* request, + void* data) + : RestBaseHandler(request), + _dispatcher(0) { + + _dispatcher = static_cast(data); + assert(_dispatcher != 0); } // ----------------------------------------------------------------------------- @@ -89,9 +96,39 @@ triagens::rest::HttpHandler::status_e RestShardHandler::execute () { return HANDLER_DONE; } + bool found; + char const* coordinator = _request->header("x-arango-coordinator", found); + + if (! found) { + generateError(triagens::rest::HttpResponse::BAD, + (int) triagens::rest::HttpResponse::BAD, + "header 'x-arango-coordinator' is missing"); + return HANDLER_DONE; + } + + char const* operation = _request->header("x-arango-operation", found); + if (! found) { + generateError(triagens::rest::HttpResponse::BAD, + (int) triagens::rest::HttpResponse::BAD, + "header 'x-arango-operation' is missing"); + } + +/* + Job* ajob = handler->createJob(this, true); + ServerJob* job = dynamic_cast(ajob); + if (jobId != 0) { + _jobManager->initAsyncJob(job, jobId); + } + if (! _dispatcher->addJob(job)) { + // could not add job to job queue + delete handler; + + return false; + } +*/ // respond with a 202 _response = createResponse(triagens::rest::HttpResponse::ACCEPTED); - + return HANDLER_DONE; } diff --git a/arangod/Cluster/RestShardHandler.h b/arangod/Cluster/RestShardHandler.h index 07ca286a9d..e67da78550 100644 --- a/arangod/Cluster/RestShardHandler.h +++ b/arangod/Cluster/RestShardHandler.h @@ -31,6 +31,10 @@ #include "Admin/RestBaseHandler.h" namespace triagens { + namespace rest { + class Dispatcher; + } + namespace arango { // ----------------------------------------------------------------------------- @@ -53,7 +57,8 @@ namespace triagens { /// @brief constructor //////////////////////////////////////////////////////////////////////////////// - RestShardHandler (triagens::rest::HttpRequest* request); + RestShardHandler (triagens::rest::HttpRequest* request, + void*); // ----------------------------------------------------------------------------- // --SECTION-- Handler methods @@ -85,6 +90,12 @@ namespace triagens { private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief dispatcher +//////////////////////////////////////////////////////////////////////////////// + + triagens::rest::Dispatcher* _dispatcher; + //////////////////////////////////////////////////////////////////////////////// /// @brief name of the queue //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 3fb14b075d..261649b3df 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -107,6 +107,7 @@ using namespace triagens::arango; static void DefineApiHandlers (HttpHandlerFactory* factory, ApplicationAdminServer* admin, + ApplicationDispatcher* dispatcher, AsyncJobManager* jobManager) { // add "/version" handler @@ -139,7 +140,8 @@ static void DefineApiHandlers (HttpHandlerFactory* factory, #ifdef TRI_ENABLE_CLUSTER // add "/shard-comm" handler factory->addPrefixHandler("/_api/shard-comm", - RestHandlerCreator::createNoData); + RestHandlerCreator::createData, + (void*) dispatcher->dispatcher()); #endif } @@ -149,6 +151,7 @@ static void DefineApiHandlers (HttpHandlerFactory* factory, static void DefineAdminHandlers (HttpHandlerFactory* factory, ApplicationAdminServer* admin, + ApplicationDispatcher* dispatcher, AsyncJobManager* jobManager) { // add "/version" handler @@ -792,8 +795,8 @@ int ArangoServer::startupServer () { HttpHandlerFactory* handlerFactory = _applicationEndpointServer->getHandlerFactory(); - DefineApiHandlers(handlerFactory, _applicationAdminServer, _jobManager); - DefineAdminHandlers(handlerFactory, _applicationAdminServer, _jobManager); + DefineApiHandlers(handlerFactory, _applicationAdminServer, _applicationDispatcher, _jobManager); + DefineAdminHandlers(handlerFactory, _applicationAdminServer, _applicationDispatcher, _jobManager); // add action handler handlerFactory->addPrefixHandler( diff --git a/lib/GeneralServer/GeneralServerJob.h b/lib/GeneralServer/GeneralServerJob.h index 87485e3b42..4bfbb5315a 100644 --- a/lib/GeneralServer/GeneralServerJob.h +++ b/lib/GeneralServer/GeneralServerJob.h @@ -215,7 +215,6 @@ namespace triagens { switch (status) { case Handler::HANDLER_DONE: return Job::JOB_DONE; - case Handler::HANDLER_DETACH: return Job::JOB_DETACH; case Handler::HANDLER_REQUEUE: return Job::JOB_REQUEUE; case Handler::HANDLER_FAILED: return Job::JOB_FAILED; } diff --git a/lib/Rest/Handler.h b/lib/Rest/Handler.h index a53c747b3f..eed4c55927 100644 --- a/lib/Rest/Handler.h +++ b/lib/Rest/Handler.h @@ -81,7 +81,6 @@ namespace triagens { enum status_e { HANDLER_DONE, - HANDLER_DETACH, HANDLER_REQUEUE, HANDLER_FAILED }; From f0184d2786f200848c82f110e3833b026cb5e3d2 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 12 Dec 2013 17:51:45 +0100 Subject: [PATCH 4/4] not sure about the restshardhandler yet --- arangod/Cluster/RestShardHandler.cpp | 32 +++++++++------- lib/Dispatcher/Job.cpp | 56 +--------------------------- lib/Dispatcher/Job.h | 43 --------------------- lib/GeneralServer/GeneralServerJob.h | 4 +- lib/HttpServer/AsyncJobManager.h | 4 +- 5 files changed, 25 insertions(+), 114 deletions(-) diff --git a/arangod/Cluster/RestShardHandler.cpp b/arangod/Cluster/RestShardHandler.cpp index c4eec69ac4..522d6d90b2 100644 --- a/arangod/Cluster/RestShardHandler.cpp +++ b/arangod/Cluster/RestShardHandler.cpp @@ -32,6 +32,11 @@ #include "Rest/HttpRequest.h" #include "Rest/HttpResponse.h" +#include "HttpServer/HttpServer.h" +#include "HttpServer/HttpHandlerFactory.h" +#include "GeneralServer/GeneralServerJob.h" +#include "GeneralServer/GeneralServer.h" + using namespace triagens::arango; // ----------------------------------------------------------------------------- @@ -95,7 +100,7 @@ triagens::rest::HttpHandler::status_e RestShardHandler::execute () { "this API is meant to be called on a coordinator node"); return HANDLER_DONE; } - +/* bool found; char const* coordinator = _request->header("x-arango-coordinator", found); @@ -112,20 +117,21 @@ triagens::rest::HttpHandler::status_e RestShardHandler::execute () { (int) triagens::rest::HttpResponse::BAD, "header 'x-arango-operation' is missing"); } + + char const* url = _request->header("x-arango-url", found); + if (! found) { + generateError(triagens::rest::HttpResponse::BAD, + (int) triagens::rest::HttpResponse::BAD, + "header 'x-arango-url' is missing"); + } +*/ /* - Job* ajob = handler->createJob(this, true); - ServerJob* job = dynamic_cast(ajob); - if (jobId != 0) { - _jobManager->initAsyncJob(job, jobId); - } - if (! _dispatcher->addJob(job)) { - // could not add job to job queue - delete handler; - - return false; - } -*/ + triagens::rest::HttpHandler* handler = this->_server->createHandler(_request); + triagens::rest::Job* job = new triagens::rest::GeneralServerJob(0, handler, true); + + _dispatcher->addJob(job); +*/ // respond with a 202 _response = createResponse(triagens::rest::HttpResponse::ACCEPTED); diff --git a/lib/Dispatcher/Job.cpp b/lib/Dispatcher/Job.cpp index d508367513..c1e281bffb 100644 --- a/lib/Dispatcher/Job.cpp +++ b/lib/Dispatcher/Job.cpp @@ -47,8 +47,7 @@ using namespace std; //////////////////////////////////////////////////////////////////////////////// Job::Job (string const& name) - : _name(name), - _observers() { + : _name(name) { } //////////////////////////////////////////////////////////////////////////////// @@ -79,15 +78,6 @@ const string& Job::getName () const { return _name; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief attach an observer -//////////////////////////////////////////////////////////////////////////////// - -void Job::attachObserver (JobObserver* observer) { - _observers.push_back(observer); -} - - //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -121,50 +111,6 @@ void Job::setDispatcherThread (DispatcherThread*) { /// @} //////////////////////////////////////////////////////////////////////////////// -// ----------------------------------------------------------------------------- -// --SECTION-- protected methods -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup Dispatcher -/// @{ -//////////////////////////////////////////////////////////////////////////////// - -//////////////////////////////////////////////////////////////////////////////// -/// @brief notify attached observers -//////////////////////////////////////////////////////////////////////////////// - -void Job::notifyObservers (const Job::notification_e type) { - vector::iterator i; - - switch (type) { - case JOB_WORK: - for (i = _observers.begin(); i != _observers.end(); ++i) { - (*i)->workCallback(this); - } - - break; - - case JOB_CLEANUP: - for (i = _observers.begin(); i != _observers.end(); ++i) { - (*i)->cleanupCallback(this); - } - - break; - - case JOB_SHUTDOWN: - for (i = _observers.begin(); i != _observers.end(); ++i) { - (*i)->shutdownCallback(this); - } - - break; - } -} - -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // ----------------------------------------------------------------------------- diff --git a/lib/Dispatcher/Job.h b/lib/Dispatcher/Job.h index 671d31b442..79a4504d06 100644 --- a/lib/Dispatcher/Job.h +++ b/lib/Dispatcher/Job.h @@ -42,7 +42,6 @@ namespace triagens { namespace rest { class DispatcherThread; - class JobObserver; // ----------------------------------------------------------------------------- // --SECTION-- class Job @@ -98,16 +97,6 @@ namespace triagens { JOB_FAILED }; -//////////////////////////////////////////////////////////////////////////////// -/// @brief notification for job observers -//////////////////////////////////////////////////////////////////////////////// - - enum notification_e { - JOB_WORK, - JOB_CLEANUP, - JOB_SHUTDOWN - }; - //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -157,12 +146,6 @@ namespace triagens { const string& getName () const; -//////////////////////////////////////////////////////////////////////////////// -/// @brief attach an observer -//////////////////////////////////////////////////////////////////////////////// - - void attachObserver (JobObserver* observer); - //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -226,27 +209,6 @@ namespace triagens { /// @} //////////////////////////////////////////////////////////////////////////////// -// ----------------------------------------------------------------------------- -// --SECTION-- protected methods -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup Dispatcher -/// @{ -//////////////////////////////////////////////////////////////////////////////// - - protected: - -//////////////////////////////////////////////////////////////////////////////// -/// @brief notify attached observers -//////////////////////////////////////////////////////////////////////////////// - - void notifyObservers (const Job::notification_e type); - -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- private variables // ----------------------------------------------------------------------------- @@ -264,11 +226,6 @@ namespace triagens { const string& _name; -//////////////////////////////////////////////////////////////////////////////// -/// @brief attached observers -//////////////////////////////////////////////////////////////////////////////// - - vector _observers; }; } } diff --git a/lib/GeneralServer/GeneralServerJob.h b/lib/GeneralServer/GeneralServerJob.h index 4bfbb5315a..e239e7c0ac 100644 --- a/lib/GeneralServer/GeneralServerJob.h +++ b/lib/GeneralServer/GeneralServerJob.h @@ -174,7 +174,7 @@ namespace triagens { /// @brief whether or not the job is detached //////////////////////////////////////////////////////////////////////////////// - bool isDetached () const { + inline bool isDetached () const { return _isDetached; } @@ -234,7 +234,7 @@ namespace triagens { abandon = _abandon; } - if (! abandon) { + if (! abandon && _server != 0) { _server->jobDone(this); } diff --git a/lib/HttpServer/AsyncJobManager.h b/lib/HttpServer/AsyncJobManager.h index bfdcad018a..146ff0551b 100644 --- a/lib/HttpServer/AsyncJobManager.h +++ b/lib/HttpServer/AsyncJobManager.h @@ -388,6 +388,8 @@ namespace triagens { return; } + assert(job != 0); + *jobId = (AsyncJobResult::IdType) generate(); job->assignId((uint64_t) *jobId); @@ -415,7 +417,7 @@ namespace triagens { return; } - double now = TRI_microtime(); + const double now = TRI_microtime(); WRITE_LOCKER(_lock); JobList::iterator it = _jobs.find(jobId);