1
0
Fork 0
This commit is contained in:
Frank Celler 2012-01-03 19:00:49 +01:00
parent 35990d0cec
commit 053dba088d
13 changed files with 799 additions and 10 deletions

View File

@ -0,0 +1,44 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief application server with dispatcher
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ApplicationServerDispatcher.h"
#include "Dispatcher/ApplicationServerDispatcherImpl.h"
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
ApplicationServerDispatcher* ApplicationServerDispatcher::create (string const& description, string const& version) {
return new ApplicationServerDispatcherImpl(description, version);
}
}
}

View File

@ -0,0 +1,87 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief application server with dispatcher
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_APPLICATION_SERVER_DISPATCHER_H
#define TRIAGENS_FYN_REST_APPLICATION_SERVER_DISPATCHER_H 1
#include "ApplicationServer/ApplicationServer.h"
namespace triagens {
namespace rest {
class Dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief application server with dispatcher
////////////////////////////////////////////////////////////////////////////////
class ApplicationServerDispatcher : virtual public ApplicationServer {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new skeleton
////////////////////////////////////////////////////////////////////////////////
static ApplicationServerDispatcher* create (string const& description, string const& version);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the dispatcher
////////////////////////////////////////////////////////////////////////////////
virtual Dispatcher* dispatcher () const = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief builds the dispatcher
////////////////////////////////////////////////////////////////////////////////
virtual void buildDispatcher () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief builds the dispatcher reporter
////////////////////////////////////////////////////////////////////////////////
virtual void buildDispatcherReporter () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief builds the dispatcher queue
////////////////////////////////////////////////////////////////////////////////
virtual void buildStandardQueue (size_t nrThreads) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief builds the named dispatcher queue
////////////////////////////////////////////////////////////////////////////////
virtual void buildNamedQueue (string const& name, size_t nrThreads) = 0;
};
}
}
#endif

View File

@ -0,0 +1,208 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief application server dispatcher implementation
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ApplicationServerDispatcherImpl.h"
#include <Logger/Logger.h>
#include "Dispatcher/DispatcherImpl.h"
#include "Dispatcher/DispatcherQueue.h"
#include "Scheduler/PeriodicTask.h"
#include "Scheduler/Scheduler.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
// -----------------------------------------------------------------------------
// helper classes and methods
// -----------------------------------------------------------------------------
namespace {
////////////////////////////////////////////////////////////////////////////////
/// @brief produces a dispatcher status report
////////////////////////////////////////////////////////////////////////////////
class DispatcherReporterTask : public PeriodicTask {
public:
DispatcherReporterTask (Dispatcher* dispatcher, double reportIntervall)
: Task("Dispatcher-Reporter"), PeriodicTask(reportIntervall * 0., reportIntervall), dispatcher(dispatcher) {
}
public:
bool handlePeriod () {
dispatcher->reportStatus();
return true;
}
public:
Dispatcher* dispatcher;
};
}
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
ApplicationServerDispatcherImpl::ApplicationServerDispatcherImpl (string const& description, string const& version)
: ApplicationServerSchedulerImpl(description, version),
_dispatcher(0),
_dispatcherReporterTask(0) {
}
ApplicationServerDispatcherImpl::~ApplicationServerDispatcherImpl () {
if (_dispatcher != 0) {
delete _dispatcher;
}
}
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
void ApplicationServerDispatcherImpl::buildDispatcher () {
if (_dispatcher != 0) {
LOGGER_FATAL << "a dispatcher has already been created";
exit(EXIT_FAILURE);
}
_dispatcher = new DispatcherImpl();
}
void ApplicationServerDispatcherImpl::buildDispatcherReporter () {
if (_dispatcher == 0) {
LOGGER_FATAL << "no dispatcher is known, cannot create dispatcher reporter";
exit(EXIT_FAILURE);
}
if (0.0 < _reportIntervall) {
_dispatcherReporterTask = new DispatcherReporterTask(_dispatcher, _reportIntervall);
registerTask(_dispatcherReporterTask);
}
}
void ApplicationServerDispatcherImpl::buildStandardQueue (size_t nrThreads) {
if (_dispatcher == 0) {
LOGGER_FATAL << "no dispatcher is known, cannot create dispatcher queue";
exit(EXIT_FAILURE);
}
LOGGER_TRACE << "setting up a standard queue with " << nrThreads << " threads ";
_dispatcher->addQueue("STANDARD", nrThreads);
}
void ApplicationServerDispatcherImpl::buildNamedQueue (string const& name, size_t nrThreads) {
if (_dispatcher == 0) {
LOGGER_FATAL << "no dispatcher is known, cannot create dispatcher queue";
exit(EXIT_FAILURE);
}
LOGGER_TRACE << "setting up a named queue '" << name << "' with " << nrThreads << " threads ";
_dispatcher->addQueue(name, nrThreads);
}
void ApplicationServerDispatcherImpl::start () {
ApplicationServerSchedulerImpl::start();
if (_dispatcher != 0) {
bool ok = _dispatcher->start();
if (! ok) {
LOGGER_FATAL << "cannot start dispatcher";
exit(EXIT_FAILURE);
}
}
}
void ApplicationServerDispatcherImpl::wait () {
ApplicationServerSchedulerImpl::wait();
if (_dispatcher != 0) {
while (_dispatcher->isRunning()) {
LOGGER_TRACE << "waiting for dispatcher to stop";
usleep(10000);
}
LOGGER_TRACE << "dispatcher has stopped";
}
}
void ApplicationServerDispatcherImpl::beginShutdown () {
ApplicationServerSchedulerImpl::beginShutdown();
if (_dispatcher != 0) {
_dispatcher->beginShutdown();
}
}
void ApplicationServerDispatcherImpl::shutdown () {
ApplicationServerSchedulerImpl::shutdown();
if (_dispatcher != 0) {
int count = 0;
while (++count < 6 && _dispatcher->isRunning()) {
LOGGER_TRACE << "waiting for dispatcher to stop";
sleep(1);
}
}
}
}
}

View File

@ -0,0 +1,139 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief application server dispatcher implementation
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_APPLICATION_SERVER_APPLICATION_SERVER_DISPATCHER_IMPL_H
#define TRIAGENS_FYN_APPLICATION_SERVER_APPLICATION_SERVER_DISPATCHER_IMPL_H 1
#include "ApplicationServer/ApplicationServerSchedulerImpl.h"
#include "Dispatcher/ApplicationServerDispatcher.h"
#include "Dispatcher/Dispatcher.h"
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @brief application server dispatcher implementation
////////////////////////////////////////////////////////////////////////////////
class ApplicationServerDispatcherImpl : virtual public ApplicationServerSchedulerImpl,
virtual public ApplicationServerDispatcher {
ApplicationServerDispatcherImpl (ApplicationServerDispatcherImpl const&);
ApplicationServerDispatcherImpl& operator= (ApplicationServerDispatcherImpl const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ApplicationServerDispatcherImpl (string const& description, string const& version);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~ApplicationServerDispatcherImpl ();
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
Dispatcher* dispatcher () const {
return _dispatcher;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void buildDispatcher ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void buildDispatcherReporter ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void buildStandardQueue (size_t nrThreads);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void buildNamedQueue (string const& name, size_t nrThreads);
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void start ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void wait ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void beginShutdown ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void shutdown ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief scheduler
////////////////////////////////////////////////////////////////////////////////
Dispatcher* _dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief reporting task
////////////////////////////////////////////////////////////////////////////////
Task* _dispatcherReporterTask;
};
}
}
#endif

117
Dispatcher/Dispatcher.h Normal file
View File

@ -0,0 +1,117 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief interface of a job dispatcher
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Martin Schoenert
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_DISPATCHER_H
#define TRIAGENS_FYN_REST_DISPATCHER_H 1
#include <Basics/Common.h>
////////////////////////////////////////////////////////////////////////////////
/// @defgroup Dispatcher Job Dispatcher
////////////////////////////////////////////////////////////////////////////////
namespace triagens {
namespace rest {
class Job;
class DispatcherQueue;
class DispatcherThread;
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Dispatcher
/// @brief interface of a job dispatcher
////////////////////////////////////////////////////////////////////////////////
class Dispatcher {
private:
Dispatcher (Dispatcher const&);
Dispatcher& operator= (Dispatcher const&);
public:
/////////////////////////////////////////////////////////////////////////
/// @brief constructor
/////////////////////////////////////////////////////////////////////////
Dispatcher () {
}
/////////////////////////////////////////////////////////////////////////
/// @brief destructor
/////////////////////////////////////////////////////////////////////////
virtual ~Dispatcher () {
}
public:
/////////////////////////////////////////////////////////////////////////
/// @brief is the dispatcher still running
/////////////////////////////////////////////////////////////////////////
virtual bool isRunning () = 0;
/////////////////////////////////////////////////////////////////////////
/// @brief adds a new queue
/////////////////////////////////////////////////////////////////////////
virtual void addQueue (string const& name, size_t nrThreads) = 0;
/////////////////////////////////////////////////////////////////////////
/// @brief adds a new job
///
/// The method is called from the scheduler to add a new job request.
/// It returns immediately (i.e. without waiting for the job to finish).
/// When the job is finished the scheduler will be awoken and
/// the scheduler will write the response over the network to
/// the caller.
/////////////////////////////////////////////////////////////////////////
virtual bool addJob (Job*) = 0;
/////////////////////////////////////////////////////////////////////////
/// @brief start the dispatcher
/////////////////////////////////////////////////////////////////////////
virtual bool start () = 0;
/////////////////////////////////////////////////////////////////////////
/// @brief begins shutdown process
/////////////////////////////////////////////////////////////////////////
virtual void beginShutdown () = 0;
/////////////////////////////////////////////////////////////////////////
/// @brief reports status of dispatcher queues
/////////////////////////////////////////////////////////////////////////
virtual void reportStatus () = 0;
};
}
}
#endif

View File

@ -29,13 +29,13 @@
#include "DispatcherImpl.h"
#include <Basics/ConditionLocker.h>
#include <Basics/Logger.h>
#include <Logger/Logger.h>
#include <Basics/MutexLocker.h>
#include <Basics/StringUtils.h>
#include <Rest/Job.h>
#include "Dispatcher/DispatcherQueue.h"
#include "Dispatcher/DispatcherThread.h"
#include "Dispatcher/Job.h"
using namespace triagens::basics;
@ -160,7 +160,7 @@ namespace triagens {
void DispatcherImpl::reportStatus () {
if (TRI_IsDebugLogging()) {
if (TRI_IsDebugLogging(__FILE__)) {
MUTEX_LOCKER(accessDispatcher);
for (map<string, DispatcherQueue*>::iterator i = queues.begin(); i != queues.end(); ++i) {

View File

@ -31,9 +31,10 @@
#include <Basics/Common.h>
#include <Rest/Dispatcher.h>
#include <Basics/Mutex.h>
#include "Dispatcher/Dispatcher.h"
namespace triagens {
namespace rest {

View File

@ -29,7 +29,7 @@
#include "DispatcherQueue.h"
#include <Basics/ConditionLocker.h>
#include <Basics/Logger.h>
#include <Logger/Logger.h>
#include "Dispatcher/DispatcherThread.h"

View File

@ -32,8 +32,8 @@
#include <Basics/Common.h>
#include <Basics/ConditionVariable.h>
#include <Rest/Dispatcher.h>
#include "Dispatcher/Dispatcher.h"
#include "Dispatcher/DispatcherImpl.h"
namespace triagens {

View File

@ -29,12 +29,12 @@
#include "DispatcherThread.h"
#include <Basics/Exceptions.h>
#include <Basics/Logger.h>
#include <Logger/Logger.h>
#include <Basics/StringUtils.h>
#include <Rest/Dispatcher.h>
#include <Rest/Job.h>
#include "Dispatcher/Dispatcher.h"
#include "Dispatcher/DispatcherQueue.h"
#include "Dispatcher/Job.h"
using namespace triagens::basics;

View File

@ -32,7 +32,8 @@
#include <Basics/Common.h>
#include <Basics/Thread.h>
#include <Rest/Job.h>
#include "Dispatcher/Job.h"
namespace triagens {
namespace rest {

41
Dispatcher/Job.cpp Normal file
View File

@ -0,0 +1,41 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief abstract base class for jobs
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Martin Schoenert
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Job.h"
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
void Job::setDispatcherThread (DispatcherThread*) {
}
}
}

151
Dispatcher/Job.h Normal file
View File

@ -0,0 +1,151 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief abstract base class for jobs
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Martin Schoenert
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_JOB_H
#define TRIAGENS_FYN_REST_JOB_H 1
#include <Basics/Common.h>
namespace triagens {
namespace basics {
class TriagensError;
}
namespace rest {
class DispatcherThread;
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Dispatcher
/// @brief abstract base class for jobs
////////////////////////////////////////////////////////////////////////////////
class Job {
private:
Job (Job const&);
Job& operator= (Job const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief job types
////////////////////////////////////////////////////////////////////////////////
enum JobType {
READ_JOB,
WRITE_JOB,
SPECIAL_JOB
};
////////////////////////////////////////////////////////////////////////////////
/// @brief status of execution
////////////////////////////////////////////////////////////////////////////////
enum status_e {
JOB_DONE,
JOB_REQUEUE,
JOB_FAILED
};
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a job
////////////////////////////////////////////////////////////////////////////////
explicit
Job (string const& name)
: name(name) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes the job
////////////////////////////////////////////////////////////////////////////////
virtual ~Job () {
}
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief gets the type of the job
///
/// Note that initialise can change the job type.
////////////////////////////////////////////////////////////////////////////////
virtual JobType type () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the queue name to use
////////////////////////////////////////////////////////////////////////////////
virtual string const& queue () {
static string standard = "STANDARD";
return standard;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the thread which currently dealing with the job
////////////////////////////////////////////////////////////////////////////////
virtual void setDispatcherThread (DispatcherThread*);
////////////////////////////////////////////////////////////////////////////////
/// @brief starts working
////////////////////////////////////////////////////////////////////////////////
virtual status_e work () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief cleans up after work and delete
////////////////////////////////////////////////////////////////////////////////
virtual void cleanup () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief handle error and delete
////////////////////////////////////////////////////////////////////////////////
virtual void handleError (basics::TriagensError const&) = 0;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief getter for the name
////////////////////////////////////////////////////////////////////////////////
const string& getName () const {
return name;
}
private:
const string& name;
};
}
}
#endif