diff --git a/arangod/Ahuacatl/ahuacatl-functions.c b/arangod/Ahuacatl/ahuacatl-functions.c index 592e0d5448..bfd2d4c16c 100644 --- a/arangod/Ahuacatl/ahuacatl-functions.c +++ b/arangod/Ahuacatl/ahuacatl-functions.c @@ -569,7 +569,7 @@ static void OptimisePaths (const TRI_aql_node_t* const fcallNode, /// @brief initialise the array with the function declarations //////////////////////////////////////////////////////////////////////////////// -TRI_associative_pointer_t* TRI_InitialiseFunctionsAql (void) { +TRI_associative_pointer_t* TRI_CreateFunctionsAql (void) { TRI_associative_pointer_t* functions; bool result; int res; @@ -713,6 +713,10 @@ TRI_associative_pointer_t* TRI_InitialiseFunctionsAql (void) { void TRI_FreeFunctionsAql (TRI_associative_pointer_t* functions) { size_t i; + if (functions == NULL) { + return; + } + for (i = 0; i < functions->_nrAlloc; ++i) { TRI_aql_function_t* function = (TRI_aql_function_t*) functions->_table[i]; if (function == NULL) { diff --git a/arangod/Ahuacatl/ahuacatl-functions.h b/arangod/Ahuacatl/ahuacatl-functions.h index 2db41398c1..00979c52ac 100644 --- a/arangod/Ahuacatl/ahuacatl-functions.h +++ b/arangod/Ahuacatl/ahuacatl-functions.h @@ -123,7 +123,7 @@ TRI_aql_function_t; /// @brief initialise the array with the function declarations //////////////////////////////////////////////////////////////////////////////// -struct TRI_associative_pointer_s* TRI_InitialiseFunctionsAql (void); +struct TRI_associative_pointer_s* TRI_CreateFunctionsAql (void); //////////////////////////////////////////////////////////////////////////////// /// @brief free the array with the function declarations diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 65fa56b116..d8b3f0891d 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -69,6 +69,9 @@ InitialSyncer::InitialSyncer (TRI_vocbase_t* vocbase, _restrictCollections(restrictCollections), 
_restrictType(restrictType), _processedCollections(), + _batchId(0), + _batchUpdateTime(0), + _batchTtl(180), _chunkSize(), _verbose(verbose) { @@ -87,6 +90,9 @@ InitialSyncer::InitialSyncer (TRI_vocbase_t* vocbase, //////////////////////////////////////////////////////////////////////////////// InitialSyncer::~InitialSyncer () { + if (_batchId > 0) { + sendFinishBatch(); + } } //////////////////////////////////////////////////////////////////////////////// @@ -121,10 +127,16 @@ int InitialSyncer::run (string& errorMsg) { return res; } + res = sendStartBatch(errorMsg); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + map headers; - static const string url = BaseUrl + - "/inventory" + - "?serverId=" + _localServerIdString; + + const string url = BaseUrl + "/inventory?serverId=" + _localServerIdString; // send request const string progress = "fetching master inventory from " + url; @@ -144,6 +156,8 @@ int InitialSyncer::run (string& errorMsg) { delete response; } + sendFinishBatch(); + return TRI_ERROR_REPLICATION_NO_RESPONSE; } @@ -173,6 +187,8 @@ int InitialSyncer::run (string& errorMsg) { } delete response; + + sendFinishBatch(); return res; } @@ -190,6 +206,173 @@ int InitialSyncer::run (string& errorMsg) { /// @{ //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief send a "start batch" command +//////////////////////////////////////////////////////////////////////////////// + +int InitialSyncer::sendStartBatch (string& errorMsg) { + _batchId = 0; + + const map headers; + + const string url = BaseUrl + "/batch"; + const string body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}"; + + // send request + const string progress = "send batch start command to url " + url; + setProgress(progress.c_str()); + + SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_POST, + url, + body.c_str(), + body.size(), + headers); + 
+ if (response == 0 || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + if (response != 0) { + delete response; + } + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + int res = TRI_ERROR_NO_ERROR; + + if (response->wasHttpError()) { + res = TRI_ERROR_REPLICATION_MASTER_ERROR; + + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + } + + if (res == TRI_ERROR_NO_ERROR) { + TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, response->getBody().str().c_str()); + + if (json == 0) { + res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + else { + const string id = JsonHelper::getStringValue(json, "id", ""); + + if (id == "") { + res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + else { + _batchId = StringUtils::uint64(id); + _batchUpdateTime = TRI_microtime(); + } + + TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + } + } + + delete response; + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief send an "extend batch" command +//////////////////////////////////////////////////////////////////////////////// + +int InitialSyncer::sendExtendBatch () { + if (_batchId == 0) { + return TRI_ERROR_NO_ERROR; + } + + double now = TRI_microtime(); + + if (now <= _batchUpdateTime + _batchTtl - 60) { + // no need to extend the batch yet + return TRI_ERROR_NO_ERROR; + } + + const map headers; + + const string url = BaseUrl + "/batch/" + StringUtils::itoa(_batchId); + const string body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}"; + + // send request + const string progress = "send batch start command to url " + url; + setProgress(progress.c_str()); + + SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_PUT, + url, + body.c_str(), + body.size(), + headers); + + if 
(response == 0 || ! response->isComplete()) { + if (response != 0) { + delete response; + } + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + int res = TRI_ERROR_NO_ERROR; + + if (response->wasHttpError()) { + res = TRI_ERROR_REPLICATION_MASTER_ERROR; + } + else { + _batchUpdateTime = TRI_microtime(); + } + + delete response; + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief send a "finish batch" command +//////////////////////////////////////////////////////////////////////////////// + +int InitialSyncer::sendFinishBatch () { + if (_batchId == 0) { + return TRI_ERROR_NO_ERROR; + } + + const map headers; + const string url = BaseUrl + "/batch/" + StringUtils::itoa(_batchId); + + // send request + const string progress = "send batch finish command to url " + url; + setProgress(progress.c_str()); + + SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_DELETE, + url, + 0, + 0, + headers); + + if (response == 0 || ! 
response->isComplete()) { + if (response != 0) { + delete response; + } + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + int res = TRI_ERROR_NO_ERROR; + + if (response->wasHttpError()) { + res = TRI_ERROR_REPLICATION_MASTER_ERROR; + } + else { + _batchId = 0; + _batchUpdateTime = 0; + } + + delete response; + + return res; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief apply the data from a collection dump //////////////////////////////////////////////////////////////////////////////// @@ -296,6 +479,7 @@ int InitialSyncer::handleCollectionDump (TRI_transaction_collection_t* trxCollec const string& collectionName, TRI_voc_tick_t maxTick, string& errorMsg) { + const string cid = StringUtils::itoa(trxCollection->_cid); const string baseUrl = BaseUrl + @@ -308,8 +492,9 @@ int InitialSyncer::handleCollectionDump (TRI_transaction_collection_t* trxCollec int batch = 1; while (1) { - string url = baseUrl + - "&from=" + StringUtils::itoa(fromTick); + sendExtendBatch(); + + string url = baseUrl + "&from=" + StringUtils::itoa(fromTick); if (maxTick > 0) { url += "&to=" + StringUtils::itoa(maxTick); @@ -413,6 +598,8 @@ int InitialSyncer::handleCollectionInitial (TRI_json_t const* parameters, string& errorMsg, sync_phase_e phase) { + sendExtendBatch(); + const string masterName = JsonHelper::getStringValue(parameters, "name", ""); if (masterName.empty()) { diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index c3be93c1fd..bfd553cd64 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -176,7 +176,7 @@ namespace triagens { /// @brief set a progress message //////////////////////////////////////////////////////////////////////////////// - void setProgress (const string& message) { + void setProgress (const std::string& message) { _progress = message; if (_verbose) { @@ -184,6 +184,24 @@ namespace triagens { } } 
+//////////////////////////////////////////////////////////////////////////////// +/// @brief send a "start batch" command +//////////////////////////////////////////////////////////////////////////////// + + int sendStartBatch (std::string&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief send an "extend batch" command +//////////////////////////////////////////////////////////////////////////////// + + int sendExtendBatch (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief send a "finish batch" command +//////////////////////////////////////////////////////////////////////////////// + + int sendFinishBatch (); + //////////////////////////////////////////////////////////////////////////////// /// @brief apply the data from a collection dump //////////////////////////////////////////////////////////////////////////////// @@ -265,6 +283,24 @@ namespace triagens { std::map _processedCollections; +//////////////////////////////////////////////////////////////////////////////// +/// @brief dump batch id +//////////////////////////////////////////////////////////////////////////////// + + uint64_t _batchId; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief dump batch last update time +//////////////////////////////////////////////////////////////////////////////// + + double _batchUpdateTime; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ttl for batches +//////////////////////////////////////////////////////////////////////////////// + + int _batchTtl; + //////////////////////////////////////////////////////////////////////////////// /// @brief chunk size to use //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index e7d324234f..5f723b86d1 100644 
--- a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -468,7 +468,7 @@ bool RestDocumentHandler::readDocument () { /// @RESTURLPARAMETERS /// /// @RESTURLPARAM{document-handle,string,required} -/// The Handle of the Document. +/// The handle of the document. /// /// @RESTHEADERPARAMETERS /// diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 7ab882db51..08e7d9d9ce 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -36,6 +36,7 @@ #include "HttpServer/HttpServer.h" #include "Replication/InitialSyncer.h" #include "Rest/HttpRequest.h" +#include "VocBase/compactor.h" #include "VocBase/replication-applier.h" #include "VocBase/replication-dump.h" #include "VocBase/replication-logger.h" @@ -122,7 +123,7 @@ Handler::status_e RestReplicationHandler::execute() { const size_t len = suffix.size(); - if (len == 1) { + if (len >= 1) { const string& command = suffix[0]; if (command == "logger-start") { @@ -160,6 +161,9 @@ Handler::status_e RestReplicationHandler::execute() { } handleCommandLoggerFollow(); } + else if (command == "batch") { + handleCommandBatch(); + } else if (command == "inventory") { if (type != HttpRequest::HTTP_REQUEST_GET) { goto BAD_CALL; @@ -797,6 +801,177 @@ void RestReplicationHandler::handleCommandLoggerSetConfig () { handleCommandLoggerGetConfig(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle a dump batch command +/// +/// @RESTHEADER{POST /_api/replication/batch,creates a new dump batch} +/// +/// @RESTDESCRIPTION +/// Creates a new dump batch and returns the batch's id. 
+/// +/// The body of the request must be a JSON hash with the following attributes: +/// +/// - `ttl`: the time-to-live for the new batch (in seconds) +/// +/// The response is a JSON hash with the following attributes: +/// +/// - `id`: the id of the batch +/// +/// @RESTRETURNCODES +/// +/// @RESTRETURNCODE{204} +/// is returned if the batch was created successfully. +/// +/// @RESTRETURNCODE{400} +/// is returned if the ttl value is invalid. +/// +/// @RESTRETURNCODE{405} +/// is returned when an invalid HTTP method is used. +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle a dump batch command +/// +/// @RESTHEADER{PUT /_api/replication/batch/`id`,prolongs an existing dump batch} +/// +/// @RESTURLPARAM{id,string,required} +/// The id of the batch. +/// +/// @RESTDESCRIPTION +/// Extends the ttl of an existing dump batch, using the batch's id and +/// the provided ttl value. +/// +/// The body of the request must be a JSON hash with the following attributes: +/// +/// - `ttl`: the time-to-live for the batch (in seconds) +/// +/// If the batch's ttl can be extended successfully, the response is empty. +/// +/// @RESTRETURNCODES +/// +/// @RESTRETURNCODE{204} +/// is returned if the batch's ttl was extended successfully. +/// +/// @RESTRETURNCODE{400} +/// is returned if the ttl value is invalid or the batch was not found. +/// +/// @RESTRETURNCODE{405} +/// is returned when an invalid HTTP method is used. +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle a dump batch command +/// +/// @RESTHEADER{DELETE /_api/replication/batch/`id`,deletes an existing dump batch} +/// +/// @RESTURLPARAM{id,string,required} +/// The id of the batch. 
+/// +/// @RESTDESCRIPTION +/// Deletes the existing dump batch, allowing compaction and cleanup to resume. +/// +/// @RESTRETURNCODES +/// +/// @RESTRETURNCODE{204} +/// is returned if the batch was deleted successfully. +/// +/// @RESTRETURNCODE{400} +/// is returned if the batch was not found. +/// +/// @RESTRETURNCODE{405} +/// is returned when an invalid HTTP method is used. +//////////////////////////////////////////////////////////////////////////////// + +void RestReplicationHandler::handleCommandBatch () { + // extract the request type + const HttpRequest::HttpRequestType type = _request->requestType(); + vector const& suffix = _request->suffix(); + const size_t len = suffix.size(); + + assert(len >= 1); + + if (type == HttpRequest::HTTP_REQUEST_POST) { + // create a new blocker + + TRI_json_t* input = _request->toJson(0); + + if (input == 0) { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid JSON"); + return; + } + + // extract ttl + double expires = JsonHelper::getNumericValue(input, "ttl", 0); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, input); + + TRI_voc_tick_t id; + int res = TRI_InsertBlockerCompactorVocBase(_vocbase, expires, &id); + + if (res != TRI_ERROR_NO_ERROR) { + generateError(HttpResponse::BAD, res); + } + + TRI_json_t json; + TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &json); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "id", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, TRI_StringUInt64((uint64_t) id))); + + generateResult(&json); + TRI_DestroyJson(TRI_CORE_MEM_ZONE, &json); + return; + } + + if (type == HttpRequest::HTTP_REQUEST_PUT && len >= 2) { + // extend an existing blocker + TRI_voc_tick_t id = (TRI_voc_tick_t) StringUtils::uint64(suffix[1]); + + TRI_json_t* input = _request->toJson(0); + + if (input == 0) { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid JSON"); + return; + } + + // extract ttl + double expires = JsonHelper::getNumericValue(input, "ttl", 0); + + 
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, input); + + // now extend the blocker + int res = TRI_TouchBlockerCompactorVocBase(_vocbase, id, expires); + + if (res == TRI_ERROR_NO_ERROR) { + _response = createResponse(HttpResponse::NO_CONTENT); + } + else { + generateError(HttpResponse::BAD, res); + } + return; + } + + if (type == HttpRequest::HTTP_REQUEST_DELETE && len >= 2) { + // delete an existing blocker + TRI_voc_tick_t id = (TRI_voc_tick_t) StringUtils::uint64(suffix[1]); + + int res = TRI_RemoveBlockerCompactorVocBase(_vocbase, id); + + if (res == TRI_ERROR_NO_ERROR) { + _response = createResponse(HttpResponse::NO_CONTENT); + } + else { + generateError(HttpResponse::BAD, res); + } + return; + } + + // we get here if anything above is invalid + generateError(HttpResponse::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief returns ranged data from the replication log /// @@ -1281,7 +1456,7 @@ void RestReplicationHandler::handleCommandRestoreCollection () { if (json == 0) { generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, - "invalid collection parameter"); + "invalid JSON"); return; } diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h index af438d5990..068a87e422 100644 --- a/arangod/RestHandler/RestReplicationHandler.h +++ b/arangod/RestHandler/RestReplicationHandler.h @@ -209,6 +209,12 @@ namespace triagens { void handleCommandLoggerFollow (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle a batch command +//////////////////////////////////////////////////////////////////////////////// + + void handleCommandBatch (); + //////////////////////////////////////////////////////////////////////////////// /// @brief return the inventory (current replication and collection state) 
//////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/cleanup.c b/arangod/VocBase/cleanup.c index ecbac49c3a..56651fe365 100644 --- a/arangod/VocBase/cleanup.c +++ b/arangod/VocBase/cleanup.c @@ -31,6 +31,7 @@ #include "BasicsC/logging.h" #include "BasicsC/tri-strings.h" #include "VocBase/barrier.h" +#include "VocBase/compactor.h" #include "VocBase/document-collection.h" #include "VocBase/shadow-data.h" @@ -230,12 +231,10 @@ void TRI_CleanupVocBase (void* data) { TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE); while (true) { - size_t n; - size_t i; - TRI_col_type_e type; - // keep initial _state value as vocbase->_state might change during compaction loop - int state = vocbase->_state; - + int state; + + // keep initial _state value as vocbase->_state might change during cleanup loop + state = vocbase->_state; ++iterations; @@ -246,46 +245,54 @@ void TRI_CleanupVocBase (void* data) { CleanupShadows(vocbase, true); } - // copy all collections - TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase); - TRI_CopyDataVectorPointer(&collections, &vocbase->_collections); - TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase); + // check if we can get the compactor lock exclusively + if (TRI_CheckAndLockCompactorVocBase(vocbase)) { + size_t i, n; + TRI_col_type_e type; - n = collections._length; + // copy all collections + TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase); + TRI_CopyDataVectorPointer(&collections, &vocbase->_collections); + TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase); - for (i = 0; i < n; ++i) { - TRI_vocbase_col_t* collection; - TRI_primary_collection_t* primary; + n = collections._length; - collection = collections._buffer[i]; + for (i = 0; i < n; ++i) { + TRI_vocbase_col_t* collection; + TRI_primary_collection_t* primary; - TRI_READ_LOCK_STATUS_VOCBASE_COL(collection); + collection = collections._buffer[i]; - primary = collection->_collection; + TRI_READ_LOCK_STATUS_VOCBASE_COL(collection); - if (primary == NULL) 
{ - TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - continue; - } + primary = collection->_collection; - type = primary->base._info._type; - - TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - - // we're the only ones that can unload the collection, so using - // the collection pointer outside the lock is ok - - // maybe cleanup indexes, unload the collection or some datafiles - if (TRI_IS_DOCUMENT_COLLECTION(type)) { - TRI_document_collection_t* document = (TRI_document_collection_t*) primary; - - // clean indexes? - if (iterations % (uint64_t) CLEANUP_INDEX_ITERATIONS == 0) { - document->cleanupIndexes(document); + if (primary == NULL) { + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); + continue; } - CleanupDocumentCollection(document); + type = primary->base._info._type; + + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); + + // we're the only ones that can unload the collection, so using + // the collection pointer outside the lock is ok + + // maybe cleanup indexes, unload the collection or some datafiles + if (TRI_IS_DOCUMENT_COLLECTION(type)) { + TRI_document_collection_t* document = (TRI_document_collection_t*) primary; + + // clean indexes? 
+ if (iterations % (uint64_t) CLEANUP_INDEX_ITERATIONS == 0) { + document->cleanupIndexes(document); + } + + CleanupDocumentCollection(document); + } } + + TRI_UnlockCompactorVocBase(vocbase); } if (vocbase->_state >= 1) { @@ -294,6 +301,9 @@ void TRI_CleanupVocBase (void* data) { CleanupShadows(vocbase, false); } + // clean up expired compactor locks + TRI_CleanupCompactorVocBase(vocbase); + TRI_LockCondition(&vocbase->_cleanupCondition); TRI_TimedWaitCondition(&vocbase->_cleanupCondition, (uint64_t) CLEANUP_INTERVAL); TRI_UnlockCondition(&vocbase->_cleanupCondition); diff --git a/arangod/VocBase/compactor.c b/arangod/VocBase/compactor.c index e2f0c80aa7..a0b00ffde3 100644 --- a/arangod/VocBase/compactor.c +++ b/arangod/VocBase/compactor.c @@ -100,6 +100,16 @@ static int const COMPACTOR_INTERVAL = (1 * 1000 * 1000); /// @{ //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief compaction blocker entry +//////////////////////////////////////////////////////////////////////////////// + +typedef struct compaction_blocker_s { + TRI_voc_tick_t _id; + double _expires; +} +compaction_blocker_t; + //////////////////////////////////////////////////////////////////////////////// /// @brief compaction state //////////////////////////////////////////////////////////////////////////////// @@ -985,6 +995,67 @@ static bool CompactifyDocumentCollection (TRI_document_collection_t* document) { return true; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief try to write-lock the compaction +/// returns true if lock acquisition was successful. 
the caller is responsible +/// to free the write lock eventually +//////////////////////////////////////////////////////////////////////////////// + +static bool TryLockCompaction (TRI_vocbase_t* vocbase) { + return TRI_TryWriteLockReadWriteLock(&vocbase->_compactionBlockers._lock); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief write-lock the compaction +//////////////////////////////////////////////////////////////////////////////// + +static void LockCompaction (TRI_vocbase_t* vocbase) { + TRI_WriteLockReadWriteLock(&vocbase->_compactionBlockers._lock); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief write-unlock the compaction +//////////////////////////////////////////////////////////////////////////////// + +static void UnlockCompaction (TRI_vocbase_t* vocbase) { + TRI_WriteUnlockReadWriteLock(&vocbase->_compactionBlockers._lock); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief atomic check and lock for running the compaction +/// if this function returns true, it has acquired a write-lock on the +/// compactionBlockers structure, which the caller must free eventually +//////////////////////////////////////////////////////////////////////////////// + +static bool CheckAndLockCompaction (TRI_vocbase_t* vocbase) { + double now; + size_t i, n; + + now = TRI_microtime(); + + // check if we can acquire the write lock instantly + if (! TryLockCompaction(vocbase)) { + // couldn't acquire the write lock + return false; + } + + // we are now holding the write lock + + // check if we have a still-valid compaction blocker + n = vocbase->_compactionBlockers._data._length; + for (i = 0; i < n; ++i) { + compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i); + + if (blocker->_expires > now) { + // found a compaction blocker. 
unlock and return + UnlockCompaction(vocbase); + return false; + } + } + + return true; +} + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -998,97 +1069,287 @@ static bool CompactifyDocumentCollection (TRI_document_collection_t* document) { /// @{ //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialise the compaction blockers structure +//////////////////////////////////////////////////////////////////////////////// + +int TRI_InitCompactorVocBase (TRI_vocbase_t* vocbase) { + TRI_InitReadWriteLock(&vocbase->_compactionBlockers._lock); + TRI_InitVector(&vocbase->_compactionBlockers._data, TRI_UNKNOWN_MEM_ZONE, sizeof(compaction_blocker_t)); + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroy the compaction blockers structure +//////////////////////////////////////////////////////////////////////////////// + +void TRI_DestroyCompactorVocBase (TRI_vocbase_t* vocbase) { + TRI_DestroyVector(&vocbase->_compactionBlockers._data); + TRI_DestroyReadWriteLock(&vocbase->_compactionBlockers._lock); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief remove data of expired compaction blockers +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_CleanupCompactorVocBase (TRI_vocbase_t* vocbase) { + double now; + size_t i, n; + + now = TRI_microtime(); + + // check if we can instantly acquire the lock + if (! 
TryLockCompaction(vocbase)) { + // couldn't acquire lock + return false; + } + + // we are now holding the write lock + + n = vocbase->_compactionBlockers._data._length; + + i = 0; + while (i < n) { + compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i); + + if (blocker->_expires < now) { + TRI_RemoveVector(&vocbase->_compactionBlockers._data, i); + n--; + } + else { + i++; + } + } + + UnlockCompaction(vocbase); + + return true; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief insert a compaction blocker +//////////////////////////////////////////////////////////////////////////////// + +int TRI_InsertBlockerCompactorVocBase (TRI_vocbase_t* vocbase, + double lifetime, + TRI_voc_tick_t* id) { + compaction_blocker_t blocker; + int res; + + if (lifetime <= 0.0) { + return TRI_ERROR_BAD_PARAMETER; + } + + blocker._id = TRI_NewTickVocBase(); + blocker._expires = TRI_microtime() + lifetime; + + LockCompaction(vocbase); + + res = TRI_PushBackVector(&vocbase->_compactionBlockers._data, &blocker); + + UnlockCompaction(vocbase); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + *id = blocker._id; + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief touch an existing compaction blocker +//////////////////////////////////////////////////////////////////////////////// + +int TRI_TouchBlockerCompactorVocBase (TRI_vocbase_t* vocbase, + TRI_voc_tick_t id, + double lifetime) { + size_t i, n; + bool found; + + found = false; + + if (lifetime <= 0.0) { + return TRI_ERROR_BAD_PARAMETER; + } + + LockCompaction(vocbase); + + n = vocbase->_compactionBlockers._data._length; + + for (i = 0; i < n; ++i) { + compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i); + + if (blocker->_id == id) { + blocker->_expires = TRI_microtime() + lifetime; + found = true; + break; + } + } + + 
UnlockCompaction(vocbase); + + if (! found) { + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; + } + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief atomically check-and-lock the compactor +/// if the function returns true, then a write-lock on the compactor was +/// acquired, which the caller must release via TRI_UnlockCompactorVocBase +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_CheckAndLockCompactorVocBase (TRI_vocbase_t* vocbase) { + return TryLockCompaction(vocbase); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlock the compactor +//////////////////////////////////////////////////////////////////////////////// + +void TRI_UnlockCompactorVocBase (TRI_vocbase_t* vocbase) { + UnlockCompaction(vocbase); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief remove the compaction blocker with the given id (error if not found) +//////////////////////////////////////////////////////////////////////////////// + +int TRI_RemoveBlockerCompactorVocBase (TRI_vocbase_t* vocbase, + TRI_voc_tick_t id) { + size_t i, n; + bool found; + + found = false; + + LockCompaction(vocbase); + + n = vocbase->_compactionBlockers._data._length; + + for (i = 0; i < n; ++i) { + compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i); + + if (blocker->_id == id) { + TRI_RemoveVector(&vocbase->_compactionBlockers._data, i); + found = true; + break; + } + } + + UnlockCompaction(vocbase); + + if (! 
found) { + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief compactor event loop //////////////////////////////////////////////////////////////////////////////// void TRI_CompactorVocBase (void* data) { - TRI_vocbase_t* vocbase = data; + TRI_vocbase_t* vocbase; TRI_vector_pointer_t collections; + vocbase = data; assert(vocbase->_state == 1); TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE); while (true) { - TRI_col_type_e type; - size_t i, n; int state; - bool worked; // keep initial _state value as vocbase->_state might change during compaction loop state = vocbase->_state; + + // check-and-lock: proceed only if compaction is currently not disallowed + if (CheckAndLockCompaction(vocbase)) { + // compaction is currently allowed and we hold the compaction lock + size_t i, n; - // copy all collections - TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase); - TRI_CopyDataVectorPointer(&collections, &vocbase->_collections); - TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase); + // copy all collections + TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase); + TRI_CopyDataVectorPointer(&collections, &vocbase->_collections); + TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase); - n = collections._length; + n = collections._length; - for (i = 0; i < n; ++i) { - TRI_vocbase_col_t* collection; - TRI_primary_collection_t* primary; - bool doCompact; + for (i = 0; i < n; ++i) { + TRI_vocbase_col_t* collection; + TRI_primary_collection_t* primary; + TRI_col_type_e type; + bool doCompact; + bool worked; + + collection = collections._buffer[i]; - collection = collections._buffer[i]; + if (! TRI_TRY_READ_LOCK_STATUS_VOCBASE_COL(collection)) { + // if we can't acquire the read lock instantly, we continue directly + // we don't want to stall here for too long + continue; + } - if (! 
TRI_TRY_READ_LOCK_STATUS_VOCBASE_COL(collection)) { - // if we can't acquire the read lock instantly, we continue directly - // we don't want to stall here for too long - continue; - } + primary = collection->_collection; - primary = collection->_collection; + if (primary == NULL) { + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); + continue; + } - if (primary == NULL) { - TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - continue; - } + worked = false; + doCompact = primary->base._info._doCompact; + type = primary->base._info._type; - worked = false; - doCompact = primary->base._info._doCompact; - type = primary->base._info._type; + // for document collection, compactify datafiles + if (TRI_IS_DOCUMENT_COLLECTION(type)) { + if (collection->_status == TRI_VOC_COL_STATUS_LOADED && doCompact) { + TRI_barrier_t* ce; + + // check whether someone else holds a read-lock on the compaction lock + if (! TRI_TryWriteLockReadWriteLock(&primary->_compactionLock)) { + // someone else is holding the compactor lock, we'll not compact + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); + continue; + } - // for document collection, compactify datafiles - if (TRI_IS_DOCUMENT_COLLECTION(type)) { - if (collection->_status == TRI_VOC_COL_STATUS_LOADED && doCompact) { - TRI_barrier_t* ce; + ce = TRI_CreateBarrierCompaction(&primary->_barrierList); + + if (ce == NULL) { + // out of memory + LOG_WARNING("out of memory when trying to create a barrier element"); + } + else { + worked = CompactifyDocumentCollection((TRI_document_collection_t*) primary); + + TRI_FreeBarrier(ce); + } - // check whether someone else holds a read-lock on the compaction lock - if (! 
TRI_TryWriteLockReadWriteLock(&primary->_compactionLock)) { - // someone else is holding the compactor lock, we'll not compact - TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - continue; + // write-unlock the compaction lock + TRI_WriteUnlockReadWriteLock(&primary->_compactionLock); } + } - ce = TRI_CreateBarrierCompaction(&primary->_barrierList); + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - if (ce == NULL) { - // out of memory - LOG_WARNING("out of memory when trying to create a barrier element"); - } - else { - worked = CompactifyDocumentCollection((TRI_document_collection_t*) primary); - - TRI_FreeBarrier(ce); - } - - // read-unlock the compaction lock - TRI_WriteUnlockReadWriteLock(&primary->_compactionLock); + if (worked) { + // signal the cleanup thread that we worked and that it can now wake up + TRI_LockCondition(&vocbase->_cleanupCondition); + TRI_SignalCondition(&vocbase->_cleanupCondition); + TRI_UnlockCondition(&vocbase->_cleanupCondition); } } - TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); - - if (worked) { - // signal the cleanup thread that we worked and that it can now wake up - TRI_LockCondition(&vocbase->_cleanupCondition); - TRI_SignalCondition(&vocbase->_cleanupCondition); - TRI_UnlockCondition(&vocbase->_cleanupCondition); - } + UnlockCompaction(vocbase); } + if (vocbase->_state == 1) { // only sleep while server is still running usleep(COMPACTOR_INTERVAL); diff --git a/arangod/VocBase/compactor.h b/arangod/VocBase/compactor.h index 26ec2f2e26..1ad0213da5 100644 --- a/arangod/VocBase/compactor.h +++ b/arangod/VocBase/compactor.h @@ -30,10 +30,14 @@ #include "BasicsC/common.h" +#include "VocBase/voc-types.h" + #ifdef __cplusplus extern "C" { #endif +struct TRI_vocbase_s; + // ----------------------------------------------------------------------------- // --SECTION-- public functions // ----------------------------------------------------------------------------- @@ -43,6 +47,69 @@ extern "C" { /// @{ 
//////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialise the compaction blockers structure +//////////////////////////////////////////////////////////////////////////////// + +int TRI_InitCompactorVocBase (struct TRI_vocbase_s*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroy the compaction blockers structure +//////////////////////////////////////////////////////////////////////////////// + +void TRI_DestroyCompactorVocBase (struct TRI_vocbase_s*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief atomic check and lock for running the compaction +/// if this function returns true, it has acquired a write-lock on the +/// compactionBlockers structure +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_CheckAndLockCompactorVocBase (struct TRI_vocbase_s*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief remove data of expired compaction blockers +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_CleanupCompactorVocBase (struct TRI_vocbase_s*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief insert a compaction blocker +//////////////////////////////////////////////////////////////////////////////// + +int TRI_InsertBlockerCompactorVocBase (struct TRI_vocbase_s*, + double, + TRI_voc_tick_t*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief touch an existing compaction blocker +//////////////////////////////////////////////////////////////////////////////// + +int TRI_TouchBlockerCompactorVocBase (struct TRI_vocbase_s*, + TRI_voc_tick_t, + double); + 
+//////////////////////////////////////////////////////////////////////////////// +/// @brief remove the compaction blocker with the given id (error if not found) +//////////////////////////////////////////////////////////////////////////////// + +int TRI_RemoveBlockerCompactorVocBase (struct TRI_vocbase_s*, + TRI_voc_tick_t); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief atomically check-and-lock the compactor (NOTE: also declared above) +/// if the function returns true, then a write-lock on the compactor was +/// acquired, which the caller must release via TRI_UnlockCompactorVocBase +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_CheckAndLockCompactorVocBase (struct TRI_vocbase_s*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlock the compactor +//////////////////////////////////////////////////////////////////////////////// + +void TRI_UnlockCompactorVocBase (struct TRI_vocbase_s*); + //////////////////////////////////////////////////////////////////////////////// /// @brief compactor event loop //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/vocbase.c b/arangod/VocBase/vocbase.c index 2767d50ec0..2784c636aa 100644 --- a/arangod/VocBase/vocbase.c +++ b/arangod/VocBase/vocbase.c @@ -1542,7 +1542,13 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path, ApplyDefaults(vocbase, defaults); // init AQL functions - vocbase->_functions = TRI_InitialiseFunctionsAql(); + vocbase->_functions = TRI_CreateFunctionsAql(); + + if (vocbase->_functions == NULL) { + LOG_FATAL_AND_EXIT("cannot create AQL functions"); + } + + TRI_InitCompactorVocBase(vocbase); // init collections TRI_InitVectorPointer(&vocbase->_collections, TRI_UNKNOWN_MEM_ZONE); @@ -1632,6 +1638,8 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path, res = ScanPath(vocbase, vocbase->_path, iterateMarkers); if (res != TRI_ERROR_NO_ERROR) { + 
TRI_FreeFunctionsAql(vocbase->_functions); + TRI_DestroyCompactorVocBase(vocbase); TRI_DestroyAssociativePointer(&vocbase->_collectionsByName); TRI_DestroyAssociativePointer(&vocbase->_collectionsById); TRI_DestroyVectorPointer(&vocbase->_collections); @@ -1819,6 +1827,8 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) { TRI_DestroyVectorPointer(&vocbase->_collections); TRI_DestroyVectorPointer(&vocbase->_deadCollections); + TRI_DestroyCompactorVocBase(vocbase); + // free AQL functions TRI_FreeFunctionsAql(vocbase->_functions); diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index 95076d8a45..4357080b9a 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h @@ -374,6 +374,12 @@ typedef struct TRI_vocbase_s { struct TRI_shadow_store_s* _cursors; TRI_associative_pointer_t* _functions; + struct { + TRI_read_write_lock_t _lock; + TRI_vector_t _data; + } + _compactionBlockers; + TRI_condition_t _cleanupCondition; TRI_condition_t _syncWaitersCondition; int64_t _syncWaiters;