1
0
Fork 0

handle state changes

This commit is contained in:
Jan Steemann 2014-01-16 14:52:40 +01:00
parent 8a9fe34ad6
commit a987647bd2
7 changed files with 385 additions and 23 deletions

View File

@ -28,19 +28,22 @@
#include "ApplicationCluster.h"
#include "Rest/Endpoint.h"
#include "Basics/JsonHelper.h"
#include "SimpleHttpClient/ConnectionManager.h"
#include "BasicsC/logging.h"
#include "Cluster/HeartbeatThread.h"
#include "Cluster/ServerState.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterComm.h"
#include "BasicsC/logging.h"
#include "Dispatcher/ApplicationDispatcher.h"
#include "SimpleHttpClient/ConnectionManager.h"
#include "V8Server/ApplicationV8.h"
#include "VocBase/server.h"
using namespace triagens;
using namespace triagens::basics;
using namespace triagens::arango;
// -----------------------------------------------------------------------------
// --SECTION-- class ApplicationCluster
// --SECTION-- class ApplicationCluster
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
@ -51,8 +54,13 @@ using namespace triagens::arango;
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ApplicationCluster::ApplicationCluster ()
ApplicationCluster::ApplicationCluster (TRI_server_t* server,
triagens::rest::ApplicationDispatcher* dispatcher,
ApplicationV8* applicationV8)
: ApplicationFeature("Sharding"),
_server(server),
_dispatcher(dispatcher),
_applicationV8(applicationV8),
_heartbeat(0),
_disableHeartbeat(false),
_heartbeatInterval(0),
@ -62,6 +70,7 @@ ApplicationCluster::ApplicationCluster ()
_myAddress(),
_enableCluster(false) {
assert(_dispatcher != 0);
}
////////////////////////////////////////////////////////////////////////////////
@ -249,7 +258,7 @@ bool ApplicationCluster::start () {
// start heartbeat thread
_heartbeat = new HeartbeatThread(_heartbeatInterval * 1000, 5);
_heartbeat = new HeartbeatThread(_server, _dispatcher, _applicationV8, _heartbeatInterval * 1000, 5);
if (_heartbeat == 0) {
LOG_FATAL_AND_EXIT("unable to start cluster heartbeat thread");

View File

@ -31,9 +31,17 @@
#include "ApplicationServer/ApplicationFeature.h"
#include "Cluster/ServerState.h"
namespace triagens {
namespace arango {
extern "C" {
struct TRI_server_s;
}
namespace triagens {
namespace rest {
class ApplicationDispatcher;
}
namespace arango {
class ApplicationV8;
class HeartbeatThread;
////////////////////////////////////////////////////////////////////////////////
@ -56,7 +64,9 @@ namespace triagens {
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ApplicationCluster ();
ApplicationCluster (struct TRI_server_s*,
triagens::rest::ApplicationDispatcher*,
triagens::arango::ApplicationV8*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
@ -134,6 +144,24 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief server
////////////////////////////////////////////////////////////////////////////////
struct TRI_server_s* _server;
////////////////////////////////////////////////////////////////////////////////
/// @brief dispatcher
////////////////////////////////////////////////////////////////////////////////
triagens::rest::ApplicationDispatcher* _dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief v8 dispatcher
////////////////////////////////////////////////////////////////////////////////
triagens::arango::ApplicationV8* _applicationV8;
////////////////////////////////////////////////////////////////////////////////
/// @brief thread for heartbeat
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,268 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief DB server job
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2009-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_CLUSTER_DBSERVER_JOB_H
#define TRIAGENS_CLUSTER_DBSERVER_JOB_H 1
#include "Dispatcher/Job.h"
#include "Basics/Exceptions.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "BasicsC/logging.h"
#include "Rest/Handler.h"
#include "V8Server/ApplicationV8.h"
#include "V8/v8-utils.h"
#include "VocBase/server.h"
#include "VocBase/vocbase.h"
// -----------------------------------------------------------------------------
// --SECTION-- class DBServerJob
// -----------------------------------------------------------------------------
namespace triagens {
namespace arango {
////////////////////////////////////////////////////////////////////////////////
/// @brief general server job
////////////////////////////////////////////////////////////////////////////////
class DBServerJob : public triagens::rest::Job {
private:
DBServerJob (DBServerJob const&);
DBServerJob& operator= (DBServerJob const&);
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new db server job
////////////////////////////////////////////////////////////////////////////////
DBServerJob (TRI_server_t* server,
ApplicationV8* applicationV8)
: Job("HttpServerJob"),
_server(server),
_applicationV8(applicationV8),
_shutdown(0),
_abandon(false) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a db server job
////////////////////////////////////////////////////////////////////////////////
~DBServerJob () {
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief abandon job
////////////////////////////////////////////////////////////////////////////////
void abandon () {
MUTEX_LOCKER(_abandonLock);
_abandon = true;
}
// -----------------------------------------------------------------------------
// --SECTION-- Job methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
JobType type () {
return Job::READ_JOB;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the job is detached
////////////////////////////////////////////////////////////////////////////////
inline bool isDetached () const {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& queue () {
static string const standard = "STANDARD";
return standard;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
status_e work () {
LOG_TRACE("starting plan update handler");
if (_shutdown != 0) {
return Job::JOB_DONE;
}
bool result = execute();
if (result) {
return triagens::rest::Job::JOB_DONE;
}
return triagens::rest::Job::JOB_FAILED;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup () {
delete this;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool beginShutdown () {
LOG_TRACE("shutdown job %p", (void*) this);
_shutdown = 1;
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void handleError (basics::TriagensError const& ex) {
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
private:
bool execute () {
// default to system database
TRI_vocbase_t* vocbase = TRI_UseDatabaseServer(_server, "_system");
if (vocbase == 0) {
return false;
}
ApplicationV8::V8Context* context = _applicationV8->enterContext(vocbase, 0, false, true);
if (context == 0) {
TRI_ReleaseDatabaseServer(_server, vocbase);
return false;
}
{
v8::HandleScope scope;
// execute script inside the context
char const* file = "handle-plan-change";
char const* content = "require('org/arangodb/cluster').handlePlanChange();";
TRI_ExecuteJavaScriptString(v8::Context::GetCurrent(), v8::String::New(content), v8::String::New(file), false);
}
_applicationV8->exitContext(context);
TRI_ReleaseDatabaseServer(_server, vocbase);
return true;
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief server
////////////////////////////////////////////////////////////////////////////////
TRI_server_t* _server;
////////////////////////////////////////////////////////////////////////////////
/// @brief v8 dispatcher
////////////////////////////////////////////////////////////////////////////////
ApplicationV8* _applicationV8;
// -----------------------------------------------------------------------------
// --SECTION-- protected variables
// -----------------------------------------------------------------------------
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown in progress
////////////////////////////////////////////////////////////////////////////////
volatile sig_atomic_t _shutdown;
////////////////////////////////////////////////////////////////////////////////
/// @brief server is dead lock
////////////////////////////////////////////////////////////////////////////////
basics::Mutex _abandonLock;
////////////////////////////////////////////////////////////////////////////////
/// @brief server is dead
////////////////////////////////////////////////////////////////////////////////
bool _abandon;
};
}
}
#endif
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -29,7 +29,13 @@
#include "Basics/ConditionLocker.h"
#include "Basics/JsonHelper.h"
#include "BasicsC/logging.h"
#include "Cluster/DBServerJob.h"
#include "Cluster/ServerState.h"
#include "Dispatcher/ApplicationDispatcher.h"
#include "Dispatcher/Dispatcher.h"
#include "Dispatcher/Job.h"
#include "V8Server/ApplicationV8.h"
#include "VocBase/server.h"
using namespace triagens::arango;
@ -45,9 +51,15 @@ using namespace triagens::arango;
/// @brief constructs a heartbeat thread
////////////////////////////////////////////////////////////////////////////////
HeartbeatThread::HeartbeatThread (uint64_t interval,
uint64_t maxFailsBeforeWarning)
HeartbeatThread::HeartbeatThread (TRI_server_t* server,
triagens::rest::ApplicationDispatcher* dispatcher,
ApplicationV8* applicationV8,
uint64_t interval,
uint64_t maxFailsBeforeWarning)
: Thread("heartbeat"),
_server(server),
_dispatcher(dispatcher),
_applicationV8(applicationV8),
_agency(),
_condition(),
_myId(ServerState::instance()->getId()),
@ -56,6 +68,7 @@ HeartbeatThread::HeartbeatThread (uint64_t interval,
_numFails(0),
_stop(0) {
assert(_dispatcher != 0);
allowAsynchronousCancelation();
}
@ -258,9 +271,19 @@ bool HeartbeatThread::handlePlanChange (uint64_t currentPlanVersion,
uint64_t& remotePlanVersion) {
LOG_TRACE("found a plan update");
// TODO: actually handle the change
// schedule a job for the change
triagens::rest::Job* job = new DBServerJob(_server, _applicationV8);
remotePlanVersion = currentPlanVersion;
assert(job != 0);
if (_dispatcher->dispatcher()->addJob(job)) {
remotePlanVersion = currentPlanVersion;
LOG_TRACE("scheduled plan update handler");
}
else {
LOG_ERROR("could not schedule plan update handler");
}
return true;
}

View File

@ -34,13 +34,17 @@
#include "BasicsC/logging.h"
#include "Cluster/AgencyComm.h"
#ifdef __cplusplus
extern "C" {
#endif
struct TRI_server_s;
}
namespace triagens {
namespace rest {
class ApplicationDispatcher;
}
namespace arango {
class ApplicationV8;
// -----------------------------------------------------------------------------
// --SECTION-- HeartbeatThread
@ -62,7 +66,10 @@ namespace triagens {
/// @brief constructs a heartbeat thread
////////////////////////////////////////////////////////////////////////////////
HeartbeatThread (uint64_t,
HeartbeatThread (struct TRI_server_s*,
triagens::rest::ApplicationDispatcher*,
ApplicationV8*,
uint64_t,
uint64_t);
////////////////////////////////////////////////////////////////////////////////
@ -152,6 +159,24 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief server
////////////////////////////////////////////////////////////////////////////////
struct TRI_server_s* _server;
////////////////////////////////////////////////////////////////////////////////
/// @brief Job dispatcher
////////////////////////////////////////////////////////////////////////////////
triagens::rest::ApplicationDispatcher* _dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief v8 dispatcher
////////////////////////////////////////////////////////////////////////////////
ApplicationV8* _applicationV8;
////////////////////////////////////////////////////////////////////////////////
/// @brief AgencyComm instance
////////////////////////////////////////////////////////////////////////////////
@ -199,11 +224,6 @@ namespace triagens {
}
}
#ifdef __cplusplus
}
#endif
#endif
// Local Variables:

View File

@ -538,7 +538,7 @@ void ArangoServer::buildApplicationServer () {
// .............................................................................
#ifdef TRI_ENABLE_CLUSTER
_applicationCluster = new ApplicationCluster();
_applicationCluster = new ApplicationCluster(_server, _applicationDispatcher, _applicationV8);
if (_applicationCluster == 0) {
LOG_FATAL_AND_EXIT("out of memory");

View File

@ -1,5 +1,5 @@
/*jslint indent: 2, nomen: true, maxlen: 100, sloppy: true, vars: true, white: true, plusplus: true */
/*global ArangoAgency, ArangoServerState, exports */
/*global ArangoAgency, ArangoServerState, require, exports */
////////////////////////////////////////////////////////////////////////////////
/// @brief JavaScript cluster functionality
@ -65,11 +65,25 @@ var isCoordinatorRequest = function (req) {
return req.headers.hasOwnProperty("x-arango-coordinator");
};
var handlePlanChange = function () {
if (! isCluster() || isCoordinator()) {
return;
}
try {
require("internal").print("handling a plan change");
}
catch (err) {
require("internal").print("plan change handling failed");
}
};
exports.isCluster = isCluster;
exports.isCoordinator = isCoordinator;
exports.role = role;
exports.status = status;
exports.isCoordinatorRequest = isCoordinatorRequest;
exports.handlePlanChange = handlePlanChange;
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE