1
0
Fork 0

Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Oreste Panaia 2012-06-29 14:28:10 +08:00
commit 4ea30529ba
23 changed files with 313 additions and 292 deletions

View File

@ -1,5 +1,4 @@
> ./arangod /tmp/vocbase > ./arangod /tmp/vocbase
2012-05-13T12:37:08Z [8145] INFO no user init file '/home/fceller/.arango/arango.conf' found
2012-05-13T12:37:08Z [8145] INFO using built-in JavaScript startup files 2012-05-13T12:37:08Z [8145] INFO using built-in JavaScript startup files
2012-05-13T12:37:08Z [8145] INFO ArangoDB (version 0.5.0) is ready for business 2012-05-13T12:37:08Z [8145] INFO ArangoDB (version 0.5.0) is ready for business
2012-05-13T12:37:08Z [8145] INFO HTTP client/admin port: 127.0.0.1:8529 2012-05-13T12:37:08Z [8145] INFO HTTP client/admin port: 127.0.0.1:8529

View File

@ -1,4 +1,4 @@
> ./arango --server.http-port 12345 /tmp/vocbase > ./arangod --server.http-port 12345 /tmp/vocbase
2012-02-05T13:23:52Z [455] INFO ArangoDB (version 0.0.8 [exported]) is ready for business 2012-02-05T13:23:52Z [455] INFO ArangoDB (version 0.0.8 [exported]) is ready for business
2012-02-05T13:23:52Z [455] INFO HTTP client port: 12345 2012-02-05T13:23:52Z [455] INFO HTTP client port: 12345
2012-02-05T13:23:52Z [455] INFO HTTP admin port: localhost:8530 2012-02-05T13:23:52Z [455] INFO HTTP admin port: localhost:8530

View File

@ -0,0 +1,2 @@
> ./arangod --supervisor --pid-file /var/run/arangodb.pid /tmp/vocbase/
2012-06-27T15:58:28Z [10133] INFO starting up in supervisor mode

View File

@ -0,0 +1,3 @@
> ps fax | grep arangod
10137 ? Ssl 0:00 ./arangod --supervisor --pid-file /var/run/arangodb.pid /tmp/vocbase/
10142 ? Sl 0:00 \_ ./arangod --supervisor --pid-file /var/run/arangodb.pid /tmp/vocbase/

View File

@ -0,0 +1,5 @@
> kill -SIGSEGV 10142
> ps fax | grep arangod
10137 ? Ssl 0:00 ./arangod --supervisor --pid-file /var/run/arangodb.pid /tmp/vocbase/
10168 ? Sl 0:00 \_ ./arangod --supervisor --pid-file /var/run/arangodb.pid /tmp/vocbase/

View File

@ -1,4 +1,4 @@
> ./arango --shell --startup.modules-path "/tmp/path1;/tmp/path2" /tmp/vocbase > ./arangod --shell --startup.modules-path "/tmp/path1;/tmp/path2" /tmp/vocbase
ArangoDB shell [V8 version 3.6.5.1, DB version 1 (9727)] ArangoDB shell [V8 version 3.6.5.1, DB version 1 (9727)]
arango> var test1 = require("test1"); arango> var test1 = require("test1");

View File

@ -132,6 +132,18 @@
/// that the server will run as a daemon. Note that, as with the daemon flag, /// that the server will run as a daemon. Note that, as with the daemon flag,
/// this flag requires that the pid-file parameter will set. /// this flag requires that the pid-file parameter will set.
/// ///
/// @verbinclude supervisor
///
/// As can be seen (e.g. by executing the ps command), this will start a supervisor
/// process and the actual database process:
///
/// @verbinclude supervisor-ps1
///
/// When the database process terminates unexpectedly, the supervisor process will
/// start up a new database process:
///
/// @verbinclude supervisor-ps2
///
/// @anchor CommandLineUid /// @anchor CommandLineUid
/// @copydetails triagens::rest::ApplicationServer::_uid /// @copydetails triagens::rest::ApplicationServer::_uid
/// ///

View File

@ -33,7 +33,6 @@
#include "Rest/HttpResponse.h" #include "Rest/HttpResponse.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "GeneralServer/GeneralCommTask.h" #include "GeneralServer/GeneralCommTask.h"
#include "GeneralServer/GeneralServerBatchJob.h"
#include "GeneralServer/GeneralServerJob.h" #include "GeneralServer/GeneralServerJob.h"
#include "HttpServer/HttpHandler.h" #include "HttpServer/HttpHandler.h"
#include "HttpServer/HttpServer.h" #include "HttpServer/HttpServer.h"
@ -60,16 +59,27 @@ using namespace triagens::arango;
RestBatchHandler::RestBatchHandler (HttpRequest* request, TRI_vocbase_t* vocbase) RestBatchHandler::RestBatchHandler (HttpRequest* request, TRI_vocbase_t* vocbase)
: RestVocbaseBaseHandler(request, vocbase), : RestVocbaseBaseHandler(request, vocbase),
_missingResponses(0), _missingResponses(0),
_outputMessages(new PB_ArangoMessage), _reallyDone(0),
_handled(false) { _outputMessages(new PB_ArangoMessage) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief denstructor /// @brief destructor
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
RestBatchHandler::~RestBatchHandler () { RestBatchHandler::~RestBatchHandler () {
// delete protobuf message
delete _outputMessages; delete _outputMessages;
// clear all handlers that still exist
for (size_t i = 0; i < _handlers.size(); ++i) {
HttpHandler* handler = _handlers[i];
if (handler != 0) {
_task->getServer()->destroyHandler(handler);
_handlers[i] = 0;
}
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -112,124 +122,161 @@ HttpHandler::status_e RestBatchHandler::execute () {
HttpRequest::HttpRequestType type = request->requestType(); HttpRequest::HttpRequestType type = request->requestType();
string contentType = StringUtils::tolower(StringUtils::trim(request->header("content-type"))); string contentType = StringUtils::tolower(StringUtils::trim(request->header("content-type")));
if (type != HttpRequest::HTTP_REQUEST_POST || contentType != "application/x-protobuf") { if (type != HttpRequest::HTTP_REQUEST_POST || contentType != getContentType()) {
generateNotImplemented("ILLEGAL " + BATCH_PATH); generateNotImplemented("ILLEGAL " + BATCH_PATH);
return HANDLER_DONE; return HANDLER_DONE;
} }
/*
FILE* fp = fopen("got","w");
fwrite(request->body(), request->bodySize(), 1, fp);
fclose(fp);
*/
PB_ArangoMessage inputMessages; PB_ArangoMessage inputMessages;
bool result = inputMessages.ParseFromArray(request->body(), request->bodySize()); bool result = inputMessages.ParseFromArray(request->body(), request->bodySize());
if (!result) { if (!result) {
generateError(HttpResponse::BAD, generateError(HttpResponse::BAD,
TRI_ERROR_ARANGO_COLLECTION_PARAMETER_MISSING, // TODO FIXME TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid protobuf message"); "invalid protobuf message");
return HANDLER_DONE; return HANDLER_DONE;
} }
assert(_task);
HttpServer* server = dynamic_cast<HttpServer*>(_task->getServer());
assert(server);
size_t asyncResponses = 0; bool failed = false;
bool hasAsync = false;
// loop over the input messages once to set up the output structures without concurrency
for (int i = 0; i < inputMessages.messages_size(); ++i) { for (int i = 0; i < inputMessages.messages_size(); ++i) {
_outputMessages->add_messages();
// create a handler for each input part
const PB_ArangoBatchMessage inputMessage = inputMessages.messages(i); const PB_ArangoBatchMessage inputMessage = inputMessages.messages(i);
{ // locked
MUTEX_LOCKER(_handlerLock);
_outputMessages->add_messages();
} // locked end
HttpRequestProtobuf* request = new HttpRequestProtobuf(inputMessage); HttpRequestProtobuf* request = new HttpRequestProtobuf(inputMessage);
HttpHandler* handler = _task->getServer()->createHandler(request); HttpHandler* handler = server->createHandler(request);
if (handler == 0) { if (!handler) {
delete request; failed = true;
// TODO: handle fail break;
} }
else { else {
{ _handlers.push_back(handler);
MUTEX_LOCKER(_handlerLock); if (!handler->isDirect()) {
_handlers.push_back(handler); // async handler
}
if (handler->isDirect()) {
// execute handler directly
Handler::status_e status = Handler::HANDLER_FAILED;
try {
status = handler->execute();
}
catch (...) {
// TODO
}
if (status != Handler::HANDLER_REQUEUE) {
addResponse(handler);
}
}
else {
// execute handler via dispatcher
++_missingResponses; ++_missingResponses;
++asyncResponses; hasAsync = true;
HttpServer* server = dynamic_cast<HttpServer*>(_task->getServer());
Scheduler* scheduler = server->getScheduler();
Dispatcher* dispatcher = server->getDispatcher();
Job* job = handler->createJob(scheduler, dispatcher, _task);
GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* generalJob = dynamic_cast<GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler> * >(job);
generalJob->attachObserver(this);
dispatcher->addJob(job);
} }
} }
}
if (asyncResponses == 0) {
// signal ourselves
GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>* atask =
dynamic_cast<GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>*>(_task);
atask->signal();
} }
return Handler::HANDLER_DONE; if (failed) {
// TODO: handle error!
std::cout << "SOMETHING FAILED--------------------------------------------------------------------------\n";
return Handler::HANDLER_DONE;
}
try {
// now loop again with all output structures set up
for (int i = 0; i < inputMessages.messages_size(); ++i) {
const PB_ArangoBatchMessage inputMessage = inputMessages.messages(i);
HttpHandler* handler = _handlers[i];
assert(handler);
if (handler->isDirect()) {
// execute handler directly
Handler::status_e status = Handler::HANDLER_FAILED;
try {
status = handler->execute();
}
catch (...) {
// TODO
}
if (status != Handler::HANDLER_REQUEUE) {
addResponse(handler);
}
}
else {
// execute handler via dispatcher
Scheduler* scheduler = server->getScheduler();
Dispatcher* dispatcher = server->getDispatcher();
Job* job = handler->createJob(scheduler, dispatcher, _task);
GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* generalJob =
dynamic_cast<GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler> * >(job);
generalJob->attachObserver(this);
dispatcher->addJob(job);
}
}
}
catch (...) {
std::cout << "SOMETHING WENT WRONG - EXCEPTION\n";
}
if (!hasAsync) {
_reallyDone = 1;
return Handler::HANDLER_DONE;
}
// we have async jobs
return Handler::HANDLER_DETACH;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool RestBatchHandler::handleAsync () {
if (_reallyDone) {
assembleResponse();
toServerJob(_job)->setDone();
return HttpHandler::handleAsync();
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// notification routine called by async sub jobs
////////////////////////////////////////////////////////////////////////////////
void RestBatchHandler::notify (Job* job, const Job::notification_e type) {
if (type != Job::JOB_CLEANUP) {
return;
}
assert(_reallyDone == 0);
HttpHandler* handler = toServerJob(job)->getHandler();
addResponse(handler);
if (--_missingResponses == 0) {
_reallyDone = 1;
// signal to the task that we are done
GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>* atask =
dynamic_cast<GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>*>(_task);
atask->signal();
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief create a special job for the handler /// @brief add a single handler response to the output array
////////////////////////////////////////////////////////////////////////////////
Job* RestBatchHandler::createJob (Scheduler* scheduler, Dispatcher* dispatcher, HttpCommTask* task) {
HttpServer* server = dynamic_cast<HttpServer*>(task->getServer());
GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, GeneralCommTask<HttpServer, HttpHandlerFactory> >* atask =
dynamic_cast<GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, GeneralCommTask<HttpServer, HttpHandlerFactory> >*>(task);
GeneralServerBatchJob<HttpServer, HttpHandlerFactory::GeneralHandler>* job
= new GeneralServerBatchJob<HttpServer, HttpHandlerFactory::GeneralHandler>(server, scheduler, dispatcher, atask, this);
setJob(job);
return job;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a response supplied by a handler
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestBatchHandler::addResponse (HttpHandler* handler) { void RestBatchHandler::addResponse (HttpHandler* handler) {
MUTEX_LOCKER(_handlerLock);
for (size_t i = 0; i < _handlers.size(); ++i) { for (size_t i = 0; i < _handlers.size(); ++i) {
if (_handlers[i] == handler) { if (_handlers[i] == handler) {
// avoid concurrent modifications to the structure
MUTEX_LOCKER(_handlerLock);
PB_ArangoBatchMessage* batch = _outputMessages->mutable_messages(i); PB_ArangoBatchMessage* batch = _outputMessages->mutable_messages(i);
handler->getResponse()->write(batch); handler->getResponse()->write(batch);
// delete the handler
_task->getServer()->destroyHandler(handler);
_handlers[i] = 0;
return; return;
} }
} }
@ -237,89 +284,42 @@ void RestBatchHandler::addResponse (HttpHandler* handler) {
// handler not found // handler not found
LOGGER_WARNING << "handler not found. this should not happen."; LOGGER_WARNING << "handler not found. this should not happen.";
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief notification routine /// @brief create an overall protobuf response from the array of responses
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestBatchHandler::notify (Job* job, const Job::notification_e type) {
if (type != Job::JOB_CLEANUP) {
return;
}
GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* generalJob = dynamic_cast<GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler> * >(job);
HttpHandler* handler = generalJob->getHandler();
addResponse(handler);
if (--_missingResponses == 0) {
handleAsync();
}
}
bool RestBatchHandler::handleAsync () {
if (_missingResponses == 0) {
{ // locked
MUTEX_LOCKER(_handlerLock);
if (_handled) {
return true;
}
_handled = true;
} // lock end
assembleResponse();
GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* generalJob = dynamic_cast<GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler> * >(_job);
generalJob->setDone();
_task->setHandler(0);
GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>* atask =
dynamic_cast<GeneralAsyncCommTask<HttpServer, HttpHandlerFactory, HttpCommTask>*>(_task);
atask->setHandler(0);
// atask->signal();
_task->handleResponse(getResponse());
return _job->beginShutdown();
}
return true;
}
void RestBatchHandler::assembleResponse () { void RestBatchHandler::assembleResponse () {
assert(_missingResponses == 0); assert(_missingResponses == 0);
response = new HttpResponse; response = new HttpResponse(HttpResponse::OK);
response->setContentType("application/x-protobuf"); response->setContentType(getContentType());
string data; string data;
if (!_outputMessages->SerializeToString(&data)) { if (!_outputMessages->SerializeToString(&data)) {
// TODO // TODO
} }
response->body().appendText(data); response->body().appendText(data);
MUTEX_LOCKER(_handlerLock); // locked
for (size_t i = 0; i < _handlers.size(); ++i) {
HttpHandler* handler = _handlers[i];
_task->getServer()->destroyHandler(handler);
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @brief convert a Job* to a GeneralServerJob*
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// ----------------------------------------------------------------------------- GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* RestBatchHandler::toServerJob(Job* job) {
// --SECTION-- protected methods return dynamic_cast<GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler> * >(job);
// ----------------------------------------------------------------------------- }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB /// @brief return the required content type string
/// @{
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
string const& RestBatchHandler::getContentType () {
static string const contentType = "application/x-protobuf";
return contentType;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -34,6 +34,7 @@
#include "HttpServer/HttpHandler.h" #include "HttpServer/HttpHandler.h"
#include "HttpServer/HttpServer.h" #include "HttpServer/HttpServer.h"
#include "HttpServer/HttpCommTask.h" #include "HttpServer/HttpCommTask.h"
#include "GeneralServer/GeneralServerJob.h"
#include "Scheduler/Scheduler.h" #include "Scheduler/Scheduler.h"
#include "Dispatcher/Dispatcher.h" #include "Dispatcher/Dispatcher.h"
#include "ProtocolBuffers/arangodb.pb.h" #include "ProtocolBuffers/arangodb.pb.h"
@ -122,7 +123,7 @@ namespace triagens {
status_e execute (); status_e execute ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc} /// notification routine called by async sub jobs
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual void notify (Job*, const Job::notification_e); virtual void notify (Job*, const Job::notification_e);
@ -131,39 +132,88 @@ namespace triagens {
/// {@inheritDoc} /// {@inheritDoc}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual Job* createJob (Scheduler*, Dispatcher*, HttpCommTask*); virtual bool handleAsync ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleAsync ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- protected methods // --SECTION-- private methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB /// @addtogroup ArangoDB
/// @{ /// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief add a single handler response to the output array
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void addResponse (HttpHandler*); void addResponse (HttpHandler*);
////////////////////////////////////////////////////////////////////////////////
/// @brief create an overall protobuf response from the array of responses
////////////////////////////////////////////////////////////////////////////////
void assembleResponse (); void assembleResponse ();
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a Job* to a GeneralServerJob*
////////////////////////////////////////////////////////////////////////////////
static GeneralServerJob<HttpServer, HttpHandlerFactory::GeneralHandler>* toServerJob(Job*);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the required content type string
////////////////////////////////////////////////////////////////////////////////
static string const& getContentType ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief handlers created, ordered by their sequence in the input
////////////////////////////////////////////////////////////////////////////////
vector<HttpHandler*> _handlers; vector<HttpHandler*> _handlers;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of async handler responses still missing
////////////////////////////////////////////////////////////////////////////////
sig_atomic_t _missingResponses; sig_atomic_t _missingResponses;
////////////////////////////////////////////////////////////////////////////////
/// @brief flag that's set when the handler is fully finished
////////////////////////////////////////////////////////////////////////////////
sig_atomic_t _reallyDone;
////////////////////////////////////////////////////////////////////////////////
/// @brief protobuffer container with all output messages
////////////////////////////////////////////////////////////////////////////////
PB_ArangoMessage* _outputMessages; PB_ArangoMessage* _outputMessages;
bool _handled;
////////////////////////////////////////////////////////////////////////////////
/// @brief mutex used to protected protobuffer and handlers structure
////////////////////////////////////////////////////////////////////////////////
basics::Mutex _handlerLock; basics::Mutex _handlerLock;
}; };

View File

@ -184,11 +184,12 @@ HttpHandler::status_e RestDocumentHandler::execute () {
/// then a @LIT{HTTP 400} is returned and the body of the response contains /// then a @LIT{HTTP 400} is returned and the body of the response contains
/// an error document. /// an error document.
/// ///
/// @REST{POST /_api/document?collection=@FA{collection-name}@LATEXBREAK&createCollection=@FA{create}} /// @REST{POST /_api/document?collection=@FA{collection-name}@LATEXBREAK&createCollection=@FA{create-flag}}
/// ///
/// Instead of a @FA{collection-identifier}, a @FA{collection-name} can be /// Instead of a @FA{collection-identifier}, a @FA{collection-name} can be
/// used. If @FA{createCollection} is true, then the collection is created if it /// used. If @FA{create-flag} has a value of @LIT{true} or @LIT{yes}, then the
/// does not exists. /// collection is created if it does not yet exist. Other values for @FA{create-flag}
/// will be ignored so the collection must be present for the operation to succeed.
/// ///
/// @EXAMPLES /// @EXAMPLES
/// ///

View File

@ -2275,12 +2275,10 @@ static v8::Handle<v8::Value> JS_EnsureCapConstraintVocbaseCol (v8::Arguments con
return scope.Close(index); return scope.Close(index);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief ensures that a bitarray index exists /// @brief ensures that a bitarray index exists
/// ///
/// @FUN{ensureBitarray(@FA{field1}, @FA{value1}, @FA{field2}, @FA(value2},...,@FA{fieldn}, @FA{valuen})} /// @FUN{@FA{collection}.ensureBitarray(@FA{field1}, @FA{value1}, @FA{field2}, @FA{value2},...,@FA{fieldn}, @FA{valuen})}
/// ///
/// Creates a bitarray index on all documents using attributes as paths to /// Creates a bitarray index on all documents using attributes as paths to
/// the fields. At least one attribute and one set of possible values must be given. /// the fields. At least one attribute and one set of possible values must be given.

View File

@ -4337,8 +4337,9 @@ void TRI_FreeBitarrayIndex (TRI_index_t* idx) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE, idx); TRI_Free(TRI_UNKNOWN_MEM_ZONE, idx);
} }
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables: // Local Variables:
// mode: outline-minor // mode: outline-minor

View File

@ -357,6 +357,17 @@ namespace triagens {
return res == 0; return res == 0;
} }
off_t size (string const& path) {
struct stat stbuf;
int res = stat(path.c_str(), &stbuf);
if (res != 0) {
return 0;
}
return stbuf.st_size;
}

View File

@ -130,6 +130,14 @@ namespace triagens {
bool exists (string const& path); bool exists (string const& path);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the size of a file. will return 0 for non-existing files
///
/// the caller should check first if the file exists via the exists() method
////////////////////////////////////////////////////////////////////////////////
off_t size (string const& path);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief strip extension /// @brief strip extension
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -230,35 +230,42 @@ void DispatcherThread::run () {
// cleanup // cleanup
_queue->_monopolizer = 0; _queue->_monopolizer = 0;
// finish jobs // detached jobs (status == JOB::DETACH) might be killed asynchronously by other means
try { // it is not safe to use detached jobs after job->work()
job->setDispatcherThread(0);
if (status == Job::JOB_DONE) { if (status != Job::JOB_DETACH) {
job->cleanup(); // finish jobs
} try {
assert(status != Job::JOB_DETACH);
job->setDispatcherThread(0);
if (status == Job::JOB_DONE) {
job->cleanup();
}
#ifdef TRI_ENABLE_ZEROMQ #ifdef TRI_ENABLE_ZEROMQ
else if (status == Job::JOB_DONE_ZEROMQ) { else if (status == Job::JOB_DONE_ZEROMQ) {
job->finish(zBridge); job->finish(zBridge);
} }
#endif #endif
else if (status == Job::JOB_REQUEUE) { else if (status == Job::JOB_REQUEUE) {
_queue->_dispatcher->addJob(job); _queue->_dispatcher->addJob(job);
}
else if (status == Job::JOB_FAILED) {
job->cleanup();
}
} }
else if (status == Job::JOB_FAILED) { catch (...) {
job->cleanup();
}
}
catch (...) {
#ifdef TRI_HAVE_POSIX_THREADS #ifdef TRI_HAVE_POSIX_THREADS
if (_queue->_stopping != 0) { if (_queue->_stopping != 0) {
LOGGER_WARNING << "caught cancellation exception during cleanup"; LOGGER_WARNING << "caught cancellation exception during cleanup";
_queue->_accessQueue.unlock(); _queue->_accessQueue.unlock();
throw; throw;
} }
#endif #endif
LOGGER_WARNING << "caught error while cleaning up!"; LOGGER_WARNING << "caught error while cleaning up!";
}
} }
if (0 < _queue->_nrWaiting && ! _queue->_readyJobs.empty()) { if (0 < _queue->_nrWaiting && ! _queue->_readyJobs.empty()) {

View File

@ -92,6 +92,7 @@ namespace triagens {
enum status_e { enum status_e {
JOB_DONE, JOB_DONE,
JOB_DETACH,
#ifdef TRI_ENABLE_ZEROMQ #ifdef TRI_ENABLE_ZEROMQ
JOB_DONE_ZEROMQ, JOB_DONE_ZEROMQ,
#endif #endif

View File

@ -67,7 +67,6 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
~GeneralAsyncCommTask () { ~GeneralAsyncCommTask () {
// this has shut down the job before, should we now free the handler if any?
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -109,8 +108,6 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void setHandler (Handler* handler) { void setHandler (Handler* handler) {
assert(handler);
_handler = handler; _handler = handler;
} }
@ -122,7 +119,9 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool handleAsync () { bool handleAsync () {
assert(_handler); if (_handler == 0) {
return true;
}
return _handler->handleAsync(); return _handler->handleAsync();
} }

View File

@ -1,86 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief general server batch job
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Jan Steemann
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_BATCH_JOB_H
#define TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_BATCH_JOB_H 1
#include <Basics/Common.h>
#include <Logger/Logger.h>
#include <Rest/Handler.h>
#include "Dispatcher/Job.h"
#include "GeneralServer/GeneralServerJob.h"
#include "Scheduler/AsyncTask.h"
namespace triagens {
namespace rest {
class Dispatcher;
class Scheduler;
////////////////////////////////////////////////////////////////////////////////
/// @brief general server batch job
////////////////////////////////////////////////////////////////////////////////
template<typename S, typename H>
class GeneralServerBatchJob : public GeneralServerJob<S, H> {
GeneralServerBatchJob (GeneralServerBatchJob const&);
GeneralServerBatchJob& operator= (GeneralServerBatchJob const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new server batch job
////////////////////////////////////////////////////////////////////////////////
GeneralServerBatchJob (S* server, Scheduler* scheduler, Dispatcher* dispatcher, AsyncTask* task, H* handler)
: GeneralServerJob<S, H>(server, scheduler, dispatcher, task, handler) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a server job
////////////////////////////////////////////////////////////////////////////////
~GeneralServerBatchJob () {
}
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup () {
// do nothing in cleanup
}
};
}
}
#endif

View File

@ -119,9 +119,6 @@ namespace triagens {
return false; return false;
} }
else { else {
// GeneralServerJob<S, typename HF::GeneralHandler>* job
// = new GeneralServerJob<S, typename HF::GeneralHandler>(dynamic_cast<S*>(this), this->_scheduler, _dispatcher, atask, handler);
atask->setHandler(handler); atask->setHandler(handler);
Job* job = handler->createJob(this->_scheduler, _dispatcher, atask); Job* job = handler->createJob(this->_scheduler, _dispatcher, atask);
_dispatcher->addJob(job); _dispatcher->addJob(job);

View File

@ -141,6 +141,7 @@ namespace triagens {
switch (status) { switch (status) {
case Handler::HANDLER_DONE: return Job::JOB_DONE; case Handler::HANDLER_DONE: return Job::JOB_DONE;
case Handler::HANDLER_DETACH: return Job::JOB_DETACH;
case Handler::HANDLER_REQUEUE: return Job::JOB_REQUEUE; case Handler::HANDLER_REQUEUE: return Job::JOB_REQUEUE;
case Handler::HANDLER_FAILED: return Job::JOB_FAILED; case Handler::HANDLER_FAILED: return Job::JOB_FAILED;
} }
@ -207,12 +208,12 @@ namespace triagens {
/// @brief set the job to done from the outside /// @brief set the job to done from the outside
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void setDone () { virtual void setDone () {
_done = 1; _done = 1;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief general server /// @brief get the general server
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
S* getServer () const { S* getServer () const {
@ -228,7 +229,7 @@ namespace triagens {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief dispatcher /// @brief get the dispatcher
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
Dispatcher* getDispatcher () const { Dispatcher* getDispatcher () const {
@ -236,7 +237,7 @@ namespace triagens {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief task /// @brief get the task
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AsyncTask* getTask () const { AsyncTask* getTask () const {
@ -244,12 +245,20 @@ namespace triagens {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief handler /// @brief get the handler
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
H* getHandler () const { H* getHandler () const {
return _handler; return _handler;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief set the handler
////////////////////////////////////////////////////////////////////////////////
void setHandler (H* handler) {
_handler = handler;
}
protected: protected:

View File

@ -68,7 +68,7 @@ static void CheckPidFile (string const& pidFile) {
TRI_FlushLogging(); TRI_FlushLogging();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
else if (FileUtils::exists(pidFile)) { else if (FileUtils::exists(pidFile) && FileUtils::size(pidFile) > 0) {
LOGGER_INFO << "pid-file '" << pidFile << "' already exists, verifying pid"; LOGGER_INFO << "pid-file '" << pidFile << "' already exists, verifying pid";
ifstream f(pidFile.c_str()); ifstream f(pidFile.c_str());
@ -137,7 +137,7 @@ static void WritePidFile (string const& pidFile, int pid) {
ofstream out(pidFile.c_str(), ios::trunc); ofstream out(pidFile.c_str(), ios::trunc);
if (! out) { if (! out) {
cerr << "cannot write pid\n"; cerr << "cannot write pid-file \"" << pidFile << "\"\n";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -159,6 +159,9 @@ static int forkProcess (string const& pidFile, string const& workingDirectory, s
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// Upon successful completion, fork() shall return 0 to the child process and
// shall return the process ID of the child process to the parent process.
// if we got a good PID, then we can exit the parent process // if we got a good PID, then we can exit the parent process
if (pid > 0) { if (pid > 0) {
LOGGER_DEBUG << "started child process with pid " << pid; LOGGER_DEBUG << "started child process with pid " << pid;

View File

@ -54,6 +54,7 @@ namespace triagens {
enum status_e { enum status_e {
HANDLER_DONE, HANDLER_DONE,
HANDLER_DETACH,
HANDLER_REQUEUE, HANDLER_REQUEUE,
HANDLER_FAILED HANDLER_FAILED
}; };

View File

@ -44,7 +44,7 @@ namespace triagens {
/// @brief abstract base class for tasks /// @brief abstract base class for tasks
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class Task { class Task {
friend class TaskManager; friend class TaskManager;
Task (Task const&); Task (Task const&);
Task& operator= (Task const&); Task& operator= (Task const&);