1
0
Fork 0

Merge branch 'sharding' of ssh://github.com/triAGENS/ArangoDB into sharding

Conflicts:
	arangod/Makefile.files
This commit is contained in:
Max Neunhoeffer 2013-12-13 16:06:01 +01:00
commit 145fa2562e
17 changed files with 465 additions and 158 deletions

View File

@ -31,6 +31,7 @@
#include "Basics/WriteLocker.h"
#include "BasicsC/json.h"
#include "BasicsC/logging.h"
#include "Cluster/ServerState.h"
#include "Rest/Endpoint.h"
#include "SimpleHttpClient/GeneralClientConnection.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
@ -441,18 +442,20 @@ bool AgencyComm::addEndpoint (std::string const& endpointSpecification,
++it;
}
// not found a previous endpoint, now create one
AgencyEndpoint* agencyEndpoint = createAgencyEndpoint(endpointSpecification);
// didn't find the endpoint in our list of endpoints, so now create a new one
for (size_t i = 0; i < NumConnections; ++i) {
AgencyEndpoint* agencyEndpoint = createAgencyEndpoint(endpointSpecification);
if (agencyEndpoint == 0) {
return false;
}
if (agencyEndpoint == 0) {
return false;
}
if (toFront) {
AgencyComm::_globalEndpoints.push_front(agencyEndpoint);
}
else {
AgencyComm::_globalEndpoints.push_back(agencyEndpoint);
if (toFront) {
AgencyComm::_globalEndpoints.push_front(agencyEndpoint);
}
else {
AgencyComm::_globalEndpoints.push_back(agencyEndpoint);
}
}
}
@ -663,6 +666,19 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief sends the current server state to the agency
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::sendServerState () {
const std::string value = ServerState::stateToString(ServerState::instance()->getState()) +
":" +
AgencyComm::generateStamp();
AgencyCommResult result(setValue("State/ServerStates/" + ServerState::instance()->getId(), value));
return result.successful();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief gets the backend version
////////////////////////////////////////////////////////////////////////////////
@ -845,7 +861,7 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key,
/// @brief pop an endpoint from the queue
////////////////////////////////////////////////////////////////////////////////
AgencyEndpoint* AgencyComm::popEndpoint () {
AgencyEndpoint* AgencyComm::popEndpoint (std::string const& endpoint) {
while (1) {
{
WRITE_LOCKER(AgencyComm::_globalLock);
@ -858,6 +874,14 @@ AgencyEndpoint* AgencyComm::popEndpoint () {
assert(agencyEndpoint != 0);
if (! endpoint.empty() &&
agencyEndpoint->_endpoint->getSpecification() != endpoint) {
// we're looking for a different endpoint
++it;
continue;
}
if (! agencyEndpoint->_busy) {
agencyEndpoint->_busy = true;
@ -940,13 +964,16 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
std::string realUrl = url;
std::string forceEndpoint = "";
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint);
assert(agencyEndpoint != 0);
send(agencyEndpoint->_connection,
method,
timeout,
@ -987,16 +1014,14 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType
realUrl = endpoint.substr(delim);
endpoint = endpoint.substr(0, delim);
LOG_WARNING("handling failover from '%s' to '%s'",
agencyEndpoint->_endpoint->getSpecification().c_str(),
endpoint.c_str());
if (! AgencyComm::hasEndpoint(endpoint)) {
// redirection to an unknown endpoint
if (_addNewEndpoints) {
AgencyComm::addEndpoint(endpoint, true);
LOG_INFO("adding agency-endpoint '%s'", endpoint.c_str());
// re-check the new endpoint
if (AgencyComm::hasEndpoint(endpoint)) {
@ -1012,10 +1037,14 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType
return false;
}
forceEndpoint = endpoint;
// if we get here, we'll just use the next endpoint from the list
continue;
}
forceEndpoint = "";
// we can stop iterating over endpoints if the operation succeeded,
// if a watch timed out or
// if the reason for failure was a client-side error
@ -1060,13 +1089,13 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection
result._statusCode = 0;
/*
LOG_INFO("sending %s request to agency at endpoint '%s', url '%s': %s",
LOG_TRACE("sending %s request to agency at endpoint '%s', url '%s': %s",
triagens::rest::HttpRequest::translateMethod(method).c_str(),
connection->getEndpoint()->getSpecification().c_str(),
url.c_str(),
body.c_str());
*/
triagens::httpclient::SimpleHttpClient client(connection,
timeout,
false);
@ -1120,12 +1149,12 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection
if (found) {
result._index = triagens::basics::StringUtils::uint64(lastIndex);
}
/*
LOG_INFO("request to agency returned status code %d, message: '%s', body: '%s'",
LOG_TRACE("request to agency returned status code %d, message: '%s', body: '%s'",
result._statusCode,
result._message.c_str(),
result._body.c_str());
*/
delete response;
return result.successful();

View File

@ -302,6 +302,12 @@ namespace triagens {
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief sends the current server state to the agency
////////////////////////////////////////////////////////////////////////////////
bool sendServerState ();
////////////////////////////////////////////////////////////////////////////////
/// @brief gets the backend version
////////////////////////////////////////////////////////////////////////////////
@ -373,7 +379,7 @@ namespace triagens {
/// @brief pop an endpoint from the queue
////////////////////////////////////////////////////////////////////////////////
AgencyEndpoint* popEndpoint ();
AgencyEndpoint* popEndpoint (std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief reinsert an endpoint into the queue
@ -458,6 +464,11 @@ namespace triagens {
static AgencyConnectionOptions _globalConnectionOptions;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of connections per endpoint
////////////////////////////////////////////////////////////////////////////////
static const size_t NumConnections = 3;
};
}
}

View File

@ -155,6 +155,8 @@ bool ApplicationCluster::start () {
return true;
}
ServerState::instance()->setId(_myId);
// perfom an initial connect to the agency
const std::string endpoints = AgencyComm::getEndpointsString();
@ -207,8 +209,11 @@ bool ApplicationCluster::start () {
ServerState::instance()->setRole(role);
ServerState::instance()->setState(ServerState::STATE_STARTUP);
// the agency about our state
AgencyComm comm;
comm.sendServerState();
const std::string version = comm.getVersion();
LOG_INFO("Cluster feature is turned on. "
@ -221,7 +226,7 @@ bool ApplicationCluster::start () {
ServerState::roleToString(role).c_str());
// start heartbeat thread
_heartbeat = new HeartbeatThread(_myId, _heartbeatInterval * 1000, 5);
_heartbeat = new HeartbeatThread(_heartbeatInterval * 1000, 5);
if (_heartbeat == 0) {
LOG_FATAL_AND_EXIT("unable to start cluster heartbeat thread");
@ -231,6 +236,44 @@ bool ApplicationCluster::start () {
LOG_FATAL_AND_EXIT("heartbeat could not connect to agency endpoints (%s)",
endpoints.c_str());
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool ApplicationCluster::open () {
if (! enabled()) {
return true;
}
ServerState::RoleEnum role = ServerState::instance()->getRole();
// tell the agency that we are ready
AgencyComm comm;
AgencyCommResult result = comm.setValue("State/ServersRegistered/" + _myId, _myAddress);
if (! result.successful()) {
LOG_FATAL_AND_EXIT("unable to register server in agency");
}
if (role == ServerState::ROLE_COORDINATOR) {
ServerState::instance()->setState(ServerState::STATE_SERVING);
// register coordinator
AgencyCommResult result = comm.setValue("State/Coordinators/" + _myId, "none");
if (! result.successful()) {
LOG_FATAL_AND_EXIT("unable to register coordinator in agency");
}
}
else if (role == ServerState::ROLE_PRIMARY) {
ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC);
}
else if (role == ServerState::ROLE_SECONDARY) {
LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented");
}
return true;
}
@ -245,6 +288,12 @@ void ApplicationCluster::close () {
}
_heartbeat->stop();
// change into shutdown state
ServerState::instance()->setState(ServerState::STATE_SHUTDOWN);
AgencyComm comm;
comm.sendServerState();
}
////////////////////////////////////////////////////////////////////////////////
@ -255,9 +304,18 @@ void ApplicationCluster::stop () {
if (! enabled()) {
return;
}
// change into shutdown state
ServerState::instance()->setState(ServerState::STATE_SHUTDOWN);
AgencyComm comm;
comm.sendServerState();
_heartbeat->stop();
// unregister ourselves
comm.removeValues("State/ServersRegistered/" + _myId, false);
AgencyComm::cleanup();
}
@ -304,17 +362,20 @@ std::string ApplicationCluster::getEndpointForId () const {
ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const {
// fetch value at TmpConfig/DBServers
// we need this to determine the server's role
const std::string key = "TmpConfig/Coordinators";
AgencyComm comm;
AgencyCommResult result = comm.getValues("TmpConfig/Coordinators", true);
AgencyCommResult result = comm.getValues(key, true);
if (! result.successful()) {
const std::string endpoints = AgencyComm::getEndpointsString();
LOG_FATAL_AND_EXIT("Could not fetch configuration from agency endpoints (%s): "
"got status code %d, message: %s",
"got status code %d, message: %s, key: %s",
endpoints.c_str(),
result._statusCode,
result.errorMessage().c_str());
result.errorMessage().c_str(),
key.c_str());
}
std::map<std::string, std::string> out;
@ -340,17 +401,20 @@ ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const {
ServerState::RoleEnum ApplicationCluster::checkServersList () const {
// fetch value at TmpConfig/DBServers
// we need this to determine the server's role
const std::string key = "TmpConfig/DBServers";
AgencyComm comm;
AgencyCommResult result = comm.getValues("TmpConfig/DBServers", true);
AgencyCommResult result = comm.getValues(key, true);
if (! result.successful()) {
const std::string endpoints = AgencyComm::getEndpointsString();
LOG_FATAL_AND_EXIT("Could not fetch configuration from agency endpoints (%s): "
"got status code %d, message: %s",
"got status code %d, message: %s, key: %s",
endpoints.c_str(),
result._statusCode,
result.errorMessage().c_str());
result.errorMessage().c_str(),
key.c_str());
}
std::map<std::string, std::string> out;

View File

@ -96,6 +96,12 @@ namespace triagens {
bool prepare ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool open ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////

View File

@ -44,13 +44,12 @@ using namespace triagens::arango;
/// @brief constructs a heartbeat thread
////////////////////////////////////////////////////////////////////////////////
HeartbeatThread::HeartbeatThread (std::string const& myId,
uint64_t interval,
HeartbeatThread::HeartbeatThread (uint64_t interval,
uint64_t maxFailsBeforeWarning)
: Thread("heartbeat"),
_agency(),
_condition(),
_myId(myId),
_myId(ServerState::instance()->getId()),
_interval(interval),
_maxFailsBeforeWarning(maxFailsBeforeWarning),
_numFails(0),
@ -238,15 +237,9 @@ bool HeartbeatThread::handleStateChange (AgencyCommResult const& result,
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::sendState () {
const std::string value = ServerState::stateToString(ServerState::instance()->getState()) +
":" +
AgencyComm::generateStamp();
const bool result = _agency.sendServerState();
// return value is intentionally not handled
// if sending the current state fails, we'll just try again in the next iteration
AgencyCommResult result(_agency.setValue("State/ServerStates/" + _myId, value));
if (result.successful()) {
if (result) {
_numFails = 0;
}
else {
@ -257,7 +250,7 @@ bool HeartbeatThread::sendState () {
}
}
return result.successful();
return result;
}
// Local Variables:

View File

@ -62,8 +62,7 @@ namespace triagens {
/// @brief constructs a heartbeat thread
////////////////////////////////////////////////////////////////////////////////
HeartbeatThread (std::string const&,
uint64_t,
HeartbeatThread (uint64_t,
uint64_t);
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,144 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief shard control request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "RestShardHandler.h"
#include "Cluster/ServerState.h"
#include "Dispatcher/Dispatcher.h"
#include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h"
#include "HttpServer/HttpServer.h"
#include "HttpServer/HttpHandlerFactory.h"
#include "GeneralServer/GeneralServerJob.h"
#include "GeneralServer/GeneralServer.h"
using namespace triagens::arango;
// -----------------------------------------------------------------------------
// --SECTION-- public constants
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief name of the queue
////////////////////////////////////////////////////////////////////////////////
const string RestShardHandler::QUEUE_NAME = "STANDARD";
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestShardHandler::RestShardHandler (triagens::rest::HttpRequest* request,
void* data)
: RestBaseHandler(request),
_dispatcher(0) {
_dispatcher = static_cast<triagens::rest::Dispatcher*>(data);
assert(_dispatcher != 0);
}
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool RestShardHandler::isDirect () {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& RestShardHandler::queue () const {
return QUEUE_NAME;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
triagens::rest::HttpHandler::status_e RestShardHandler::execute () {
ServerState::RoleEnum role = ServerState::instance()->getRole();
if (role == ServerState::ROLE_COORDINATOR) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"this API is meant to be called on a coordinator node");
return HANDLER_DONE;
}
/*
bool found;
char const* coordinator = _request->header("x-arango-coordinator", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-coordinator' is missing");
return HANDLER_DONE;
}
char const* operation = _request->header("x-arango-operation", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-operation' is missing");
}
char const* url = _request->header("x-arango-url", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-url' is missing");
}
*/
/*
triagens::rest::HttpHandler* handler = this->_server->createHandler(_request);
triagens::rest::Job* job = new triagens::rest::GeneralServerJob<triagens::rest::HttpServer, triagens::rest::HttpHandlerFactory::GeneralHandler>(0, handler, true);
_dispatcher->addJob(job);
*/
// respond with a 202
_response = createResponse(triagens::rest::HttpResponse::ACCEPTED);
return HANDLER_DONE;
}
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,114 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief shard request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_ARANGOD_CLUSTER_REST_SHARD_HANDLER_H
#define TRIAGENS_ARANGOD_CLUSTER_REST_SHARD_HANDLER_H 1
#include "Admin/RestBaseHandler.h"
namespace triagens {
namespace rest {
class Dispatcher;
}
namespace arango {
// -----------------------------------------------------------------------------
// --SECTION-- class RestShardHandler
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief shard control request handler
////////////////////////////////////////////////////////////////////////////////
class RestShardHandler : public triagens::admin::RestBaseHandler {
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestShardHandler (triagens::rest::HttpRequest* request,
void*);
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool isDirect ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& queue () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the handler
////////////////////////////////////////////////////////////////////////////////
status_e execute ();
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief dispatcher
////////////////////////////////////////////////////////////////////////////////
triagens::rest::Dispatcher* _dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief name of the queue
////////////////////////////////////////////////////////////////////////////////
static const std::string QUEUE_NAME;
};
}
}
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -163,11 +163,15 @@ ServerState::StateEnum ServerState::getState () {
/// @brief set the current state
////////////////////////////////////////////////////////////////////////////////
void ServerState::setState (StateEnum state) {
void ServerState::setState (StateEnum state) {
bool result = false;
WRITE_LOCKER(_lock);
if (state == _state) {
return;
}
if (_role == ROLE_PRIMARY) {
result = checkPrimaryState(state);
}
@ -225,7 +229,9 @@ bool ServerState::checkPrimaryState (StateEnum state) {
}
else if (state == STATE_SHUTDOWN) {
return (_state == STATE_STARTUP ||
_state == STATE_STOPPED);
_state == STATE_STOPPED ||
_state == STATE_SERVINGSYNC ||
_state == STATE_SERVINGASYNC);
}
// anything else is invalid

View File

@ -153,6 +153,26 @@ namespace triagens {
_role = role;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the server id
////////////////////////////////////////////////////////////////////////////////
inline std::string getId () const {
return _id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the server id
////////////////////////////////////////////////////////////////////////////////
void setId (std::string const& id) {
// id can be set just once
assert(_id.empty());
assert(! id.empty());
_id = id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the current state
////////////////////////////////////////////////////////////////////////////////
@ -163,7 +183,7 @@ namespace triagens {
/// @brief set the current state
////////////////////////////////////////////////////////////////////////////////
void setState (StateEnum);
void setState (StateEnum);
// -----------------------------------------------------------------------------
// --SECTION-- private methods
@ -189,6 +209,12 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the server's id. can be set just once
////////////////////////////////////////////////////////////////////////////////
std::string _id;
////////////////////////////////////////////////////////////////////////////////
/// @brief r/w lock for state
////////////////////////////////////////////////////////////////////////////////

View File

@ -112,9 +112,11 @@ if ENABLE_CLUSTER
bin_arangod_SOURCES += \
arangod/Cluster/AgencyComm.cpp \
arangod/Cluster/ApplicationCluster.cpp \
arangod/Cluster/ClusterComm.cpp \
arangod/Cluster/HeartbeatThread.cpp \
arangod/Cluster/RestShardHandler.cpp \
arangod/Cluster/ServerState.cpp \
arangod/Cluster/ServerState.cpp \
arangod/Cluster/ClusterState.cpp \
arangod/Cluster/ClusterComm.cpp
endif

View File

@ -74,6 +74,7 @@
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ApplicationCluster.h"
#include "Cluster/RestShardHandler.h"
#endif
#include "V8/V8LineEditor.h"
@ -106,6 +107,7 @@ using namespace triagens::arango;
static void DefineApiHandlers (HttpHandlerFactory* factory,
ApplicationAdminServer* admin,
ApplicationDispatcher* dispatcher,
AsyncJobManager* jobManager) {
// add "/version" handler
@ -134,6 +136,13 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
// add "/upload" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::UPLOAD_PATH,
RestHandlerCreator<RestUploadHandler>::createNoData);
#ifdef TRI_ENABLE_CLUSTER
// add "/shard-comm" handler
factory->addPrefixHandler("/_api/shard-comm",
RestHandlerCreator<RestShardHandler>::createData<void*>,
(void*) dispatcher->dispatcher());
#endif
}
////////////////////////////////////////////////////////////////////////////////
@ -142,6 +151,7 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
static void DefineAdminHandlers (HttpHandlerFactory* factory,
ApplicationAdminServer* admin,
ApplicationDispatcher* dispatcher,
AsyncJobManager* jobManager) {
// add "/version" handler
@ -785,8 +795,8 @@ int ArangoServer::startupServer () {
HttpHandlerFactory* handlerFactory = _applicationEndpointServer->getHandlerFactory();
DefineApiHandlers(handlerFactory, _applicationAdminServer, _jobManager);
DefineAdminHandlers(handlerFactory, _applicationAdminServer, _jobManager);
DefineApiHandlers(handlerFactory, _applicationAdminServer, _applicationDispatcher, _jobManager);
DefineAdminHandlers(handlerFactory, _applicationAdminServer, _applicationDispatcher, _jobManager);
// add action handler
handlerFactory->addPrefixHandler(

View File

@ -47,8 +47,7 @@ using namespace std;
////////////////////////////////////////////////////////////////////////////////
Job::Job (string const& name)
: _name(name),
_observers() {
: _name(name) {
}
////////////////////////////////////////////////////////////////////////////////
@ -79,15 +78,6 @@ const string& Job::getName () const {
return _name;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief attach an observer
////////////////////////////////////////////////////////////////////////////////
void Job::attachObserver (JobObserver* observer) {
_observers.push_back(observer);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -121,50 +111,6 @@ void Job::setDispatcherThread (DispatcherThread*) {
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief notify attached observers
////////////////////////////////////////////////////////////////////////////////
void Job::notifyObservers (const Job::notification_e type) {
vector<JobObserver*>::iterator i;
switch (type) {
case JOB_WORK:
for (i = _observers.begin(); i != _observers.end(); ++i) {
(*i)->workCallback(this);
}
break;
case JOB_CLEANUP:
for (i = _observers.begin(); i != _observers.end(); ++i) {
(*i)->cleanupCallback(this);
}
break;
case JOB_SHUTDOWN:
for (i = _observers.begin(); i != _observers.end(); ++i) {
(*i)->shutdownCallback(this);
}
break;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------

View File

@ -42,7 +42,6 @@ namespace triagens {
namespace rest {
class DispatcherThread;
class JobObserver;
// -----------------------------------------------------------------------------
// --SECTION-- class Job
@ -98,16 +97,6 @@ namespace triagens {
JOB_FAILED
};
////////////////////////////////////////////////////////////////////////////////
/// @brief notification for job observers
////////////////////////////////////////////////////////////////////////////////
enum notification_e {
JOB_WORK,
JOB_CLEANUP,
JOB_SHUTDOWN
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -157,12 +146,6 @@ namespace triagens {
const string& getName () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief attach an observer
////////////////////////////////////////////////////////////////////////////////
void attachObserver (JobObserver* observer);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -226,27 +209,6 @@ namespace triagens {
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief notify attached observers
////////////////////////////////////////////////////////////////////////////////
void notifyObservers (const Job::notification_e type);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -264,11 +226,6 @@ namespace triagens {
const string& _name;
////////////////////////////////////////////////////////////////////////////////
/// @brief attached observers
////////////////////////////////////////////////////////////////////////////////
vector<JobObserver*> _observers;
};
}
}

View File

@ -174,7 +174,7 @@ namespace triagens {
/// @brief whether or not the job is detached
////////////////////////////////////////////////////////////////////////////////
bool isDetached () const {
inline bool isDetached () const {
return _isDetached;
}
@ -215,7 +215,6 @@ namespace triagens {
switch (status) {
case Handler::HANDLER_DONE: return Job::JOB_DONE;
case Handler::HANDLER_DETACH: return Job::JOB_DETACH;
case Handler::HANDLER_REQUEUE: return Job::JOB_REQUEUE;
case Handler::HANDLER_FAILED: return Job::JOB_FAILED;
}
@ -235,7 +234,7 @@ namespace triagens {
abandon = _abandon;
}
if (! abandon) {
if (! abandon && _server != 0) {
_server->jobDone(this);
}

View File

@ -388,6 +388,8 @@ namespace triagens {
return;
}
assert(job != 0);
*jobId = (AsyncJobResult::IdType) generate();
job->assignId((uint64_t) *jobId);
@ -415,7 +417,7 @@ namespace triagens {
return;
}
double now = TRI_microtime();
const double now = TRI_microtime();
WRITE_LOCKER(_lock);
JobList::iterator it = _jobs.find(jobId);

View File

@ -81,7 +81,6 @@ namespace triagens {
enum status_e {
HANDLER_DONE,
HANDLER_DETACH,
HANDLER_REQUEUE,
HANDLER_FAILED
};