1
0
Fork 0

block compaction & datafile removal while dumping

This commit is contained in:
Jan Steemann 2013-08-30 17:25:09 +02:00
parent 54dd63dd32
commit 1af5aa6b45
12 changed files with 867 additions and 105 deletions

View File

@ -569,7 +569,7 @@ static void OptimisePaths (const TRI_aql_node_t* const fcallNode,
/// @brief initialise the array with the function declarations
////////////////////////////////////////////////////////////////////////////////
TRI_associative_pointer_t* TRI_InitialiseFunctionsAql (void) {
TRI_associative_pointer_t* TRI_CreateFunctionsAql (void) {
TRI_associative_pointer_t* functions;
bool result;
int res;
@ -713,6 +713,10 @@ TRI_associative_pointer_t* TRI_InitialiseFunctionsAql (void) {
void TRI_FreeFunctionsAql (TRI_associative_pointer_t* functions) {
size_t i;
if (functions == NULL) {
return;
}
for (i = 0; i < functions->_nrAlloc; ++i) {
TRI_aql_function_t* function = (TRI_aql_function_t*) functions->_table[i];
if (function == NULL) {

View File

@ -123,7 +123,7 @@ TRI_aql_function_t;
/// @brief initialise the array with the function declarations
////////////////////////////////////////////////////////////////////////////////
struct TRI_associative_pointer_s* TRI_InitialiseFunctionsAql (void);
struct TRI_associative_pointer_s* TRI_CreateFunctionsAql (void);
////////////////////////////////////////////////////////////////////////////////
/// @brief free the array with the function declarations

View File

@ -69,6 +69,9 @@ InitialSyncer::InitialSyncer (TRI_vocbase_t* vocbase,
_restrictCollections(restrictCollections),
_restrictType(restrictType),
_processedCollections(),
_batchId(0),
_batchUpdateTime(0),
_batchTtl(180),
_chunkSize(),
_verbose(verbose) {
@ -87,6 +90,9 @@ InitialSyncer::InitialSyncer (TRI_vocbase_t* vocbase,
////////////////////////////////////////////////////////////////////////////////
InitialSyncer::~InitialSyncer () {
if (_batchId > 0) {
sendFinishBatch();
}
}
////////////////////////////////////////////////////////////////////////////////
@ -121,10 +127,16 @@ int InitialSyncer::run (string& errorMsg) {
return res;
}
res = sendStartBatch(errorMsg);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
map<string, string> headers;
static const string url = BaseUrl +
"/inventory" +
"?serverId=" + _localServerIdString;
const string url = BaseUrl + "/inventory?serverId=" + _localServerIdString;
// send request
const string progress = "fetching master inventory from " + url;
@ -144,6 +156,8 @@ int InitialSyncer::run (string& errorMsg) {
delete response;
}
sendFinishBatch();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
@ -173,6 +187,8 @@ int InitialSyncer::run (string& errorMsg) {
}
delete response;
sendFinishBatch();
return res;
}
@ -190,6 +206,173 @@ int InitialSyncer::run (string& errorMsg) {
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief send a "start batch" command
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::sendStartBatch (string& errorMsg) {
_batchId = 0;
const map<string, string> headers;
const string url = BaseUrl + "/batch";
const string body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}";
// send request
const string progress = "send batch start command to url " + url;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_POST,
url,
body.c_str(),
body.size(),
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
int res = TRI_ERROR_NO_ERROR;
if (response->wasHttpError()) {
res = TRI_ERROR_REPLICATION_MASTER_ERROR;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
}
if (res == TRI_ERROR_NO_ERROR) {
TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, response->getBody().str().c_str());
if (json == 0) {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
else {
const string id = JsonHelper::getStringValue(json, "id", "");
if (id == "") {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
else {
_batchId = StringUtils::uint64(id);
_batchUpdateTime = TRI_microtime();
}
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
}
delete response;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief send an "extend batch" command
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::sendExtendBatch () {
if (_batchId == 0) {
return TRI_ERROR_NO_ERROR;
}
double now = TRI_microtime();
if (now <= _batchUpdateTime + _batchTtl - 60) {
// no need to extend the batch yet
return TRI_ERROR_NO_ERROR;
}
const map<string, string> headers;
const string url = BaseUrl + "/batch/" + StringUtils::itoa(_batchId);
const string body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}";
// send request
const string progress = "send batch start command to url " + url;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_PUT,
url,
body.c_str(),
body.size(),
headers);
if (response == 0 || ! response->isComplete()) {
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
int res = TRI_ERROR_NO_ERROR;
if (response->wasHttpError()) {
res = TRI_ERROR_REPLICATION_MASTER_ERROR;
}
else {
_batchUpdateTime = TRI_microtime();
}
delete response;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief send a "finish batch" command
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::sendFinishBatch () {
if (_batchId == 0) {
return TRI_ERROR_NO_ERROR;
}
const map<string, string> headers;
const string url = BaseUrl + "/batch/" + StringUtils::itoa(_batchId);
// send request
const string progress = "send batch finish command to url " + url;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_DELETE,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
int res = TRI_ERROR_NO_ERROR;
if (response->wasHttpError()) {
res = TRI_ERROR_REPLICATION_MASTER_ERROR;
}
else {
_batchId = 0;
_batchUpdateTime = 0;
}
delete response;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
@ -296,6 +479,7 @@ int InitialSyncer::handleCollectionDump (TRI_transaction_collection_t* trxCollec
const string& collectionName,
TRI_voc_tick_t maxTick,
string& errorMsg) {
const string cid = StringUtils::itoa(trxCollection->_cid);
const string baseUrl = BaseUrl +
@ -308,8 +492,9 @@ int InitialSyncer::handleCollectionDump (TRI_transaction_collection_t* trxCollec
int batch = 1;
while (1) {
string url = baseUrl +
"&from=" + StringUtils::itoa(fromTick);
sendExtendBatch();
string url = baseUrl + "&from=" + StringUtils::itoa(fromTick);
if (maxTick > 0) {
url += "&to=" + StringUtils::itoa(maxTick);
@ -413,6 +598,8 @@ int InitialSyncer::handleCollectionInitial (TRI_json_t const* parameters,
string& errorMsg,
sync_phase_e phase) {
sendExtendBatch();
const string masterName = JsonHelper::getStringValue(parameters, "name", "");
if (masterName.empty()) {

View File

@ -176,7 +176,7 @@ namespace triagens {
/// @brief set a progress message
////////////////////////////////////////////////////////////////////////////////
void setProgress (const string& message) {
void setProgress (const std::string& message) {
_progress = message;
if (_verbose) {
@ -184,6 +184,24 @@ namespace triagens {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief send a "start batch" command
////////////////////////////////////////////////////////////////////////////////
int sendStartBatch (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief send an "extend batch" command
////////////////////////////////////////////////////////////////////////////////
int sendExtendBatch ();
////////////////////////////////////////////////////////////////////////////////
/// @brief send a "finish batch" command
////////////////////////////////////////////////////////////////////////////////
int sendFinishBatch ();
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
@ -265,6 +283,24 @@ namespace triagens {
std::map<TRI_voc_cid_t, std::string> _processedCollections;
////////////////////////////////////////////////////////////////////////////////
/// @brief dump batch id
////////////////////////////////////////////////////////////////////////////////
uint64_t _batchId;
////////////////////////////////////////////////////////////////////////////////
/// @brief dump batch last update time
////////////////////////////////////////////////////////////////////////////////
double _batchUpdateTime;
////////////////////////////////////////////////////////////////////////////////
/// @brief ttl for batches
////////////////////////////////////////////////////////////////////////////////
int _batchTtl;
////////////////////////////////////////////////////////////////////////////////
/// @brief chunk size to use
////////////////////////////////////////////////////////////////////////////////

View File

@ -468,7 +468,7 @@ bool RestDocumentHandler::readDocument () {
/// @RESTURLPARAMETERS
///
/// @RESTURLPARAM{document-handle,string,required}
/// The Handle of the Document.
/// The handle of the document.
///
/// @RESTHEADERPARAMETERS
///

View File

@ -36,6 +36,7 @@
#include "HttpServer/HttpServer.h"
#include "Replication/InitialSyncer.h"
#include "Rest/HttpRequest.h"
#include "VocBase/compactor.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-dump.h"
#include "VocBase/replication-logger.h"
@ -122,7 +123,7 @@ Handler::status_e RestReplicationHandler::execute() {
const size_t len = suffix.size();
if (len == 1) {
if (len >= 1) {
const string& command = suffix[0];
if (command == "logger-start") {
@ -160,6 +161,9 @@ Handler::status_e RestReplicationHandler::execute() {
}
handleCommandLoggerFollow();
}
else if (command == "batch") {
handleCommandBatch();
}
else if (command == "inventory") {
if (type != HttpRequest::HTTP_REQUEST_GET) {
goto BAD_CALL;
@ -797,6 +801,177 @@ void RestReplicationHandler::handleCommandLoggerSetConfig () {
handleCommandLoggerGetConfig();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handle a dump batch command
///
/// @RESTHEADER{POST /_api/replication/batch,creates a new dump batch}
///
/// @RESTDESCRIPTION
/// Creates a new dump batch and returns the batch's id.
///
/// The body of the request must be a JSON hash with the following attributes:
///
/// - `ttl`: the time-to-live for the new batch (in seconds)
///
/// The response is a JSON hash with the following attributes:
///
/// - `id`: the id of the batch
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{204}
/// is returned if the batch was created successfully.
///
/// @RESTRETURNCODE{400}
/// is returned if the ttl value is invalid.
///
/// @RESTRETURNCODE{405}
/// is returned when an invalid HTTP method is used.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief handle a dump batch command
///
/// @RESTHEADER{PUT /_api/replication/`id`,prolongs an existing dump batch}
///
/// @RESTURLPARAM{id,string,required}
/// The id of the batch.
///
/// @RESTDESCRIPTION
/// Extends the ttl of an existing dump batch, using the batch's id and
/// the provided ttl value.
///
/// The body of the request must be a JSON hash with the following attributes:
///
/// - `ttl`: the time-to-live for the batch (in seconds)
///
/// If the batch's ttl can be extended successully, the response is empty.
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{204}
/// is returned if the batch's ttl was extended successfully.
///
/// @RESTRETURNCODE{400}
/// is returned if the ttl value is invalid or the batch was not found.
///
/// @RESTRETURNCODE{405}
/// is returned when an invalid HTTP method is used.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief handle a dump batch command
///
/// @RESTHEADER{DELETE /_api/replication/`id`,deletes an existing dump batch}
///
/// @RESTURLPARAM{id,string,required}
/// The id of the batch.
///
/// @RESTDESCRIPTION
/// Deletes the existing dump batch, allowing compaction and cleanup to resume.
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{204}
/// is returned if the batch was deleted successfully.
///
/// @RESTRETURNCODE{400}
/// is returned if the batch was not found.
///
/// @RESTRETURNCODE{405}
/// is returned when an invalid HTTP method is used.
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandBatch () {
// extract the request type
const HttpRequest::HttpRequestType type = _request->requestType();
vector<string> const& suffix = _request->suffix();
const size_t len = suffix.size();
assert(len >= 1);
if (type == HttpRequest::HTTP_REQUEST_POST) {
// create a new blocker
TRI_json_t* input = _request->toJson(0);
if (input == 0) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid JSON");
return;
}
// extract ttl
double expires = JsonHelper::getNumericValue<double>(input, "ttl", 0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, input);
TRI_voc_tick_t id;
int res = TRI_InsertBlockerCompactorVocBase(_vocbase, expires, &id);
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::BAD, res);
}
TRI_json_t json;
TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &json);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "id", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, TRI_StringUInt64((uint64_t) id)));
generateResult(&json);
TRI_DestroyJson(TRI_CORE_MEM_ZONE, &json);
return;
}
if (type == HttpRequest::HTTP_REQUEST_PUT && len >= 2) {
// extend an existing blocker
TRI_voc_tick_t id = (TRI_voc_tick_t) StringUtils::uint64(suffix[1]);
TRI_json_t* input = _request->toJson(0);
if (input == 0) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid JSON");
return;
}
// extract ttl
double expires = JsonHelper::getNumericValue<double>(input, "ttl", 0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, input);
// now extend the blocker
int res = TRI_TouchBlockerCompactorVocBase(_vocbase, id, expires);
if (res == TRI_ERROR_NO_ERROR) {
_response = createResponse(HttpResponse::NO_CONTENT);
}
else {
generateError(HttpResponse::BAD, res);
}
return;
}
if (type == HttpRequest::HTTP_REQUEST_DELETE && len >= 2) {
// delete an existing blocker
TRI_voc_tick_t id = (TRI_voc_tick_t) StringUtils::uint64(suffix[1]);
int res = TRI_RemoveBlockerCompactorVocBase(_vocbase, id);
if (res == TRI_ERROR_NO_ERROR) {
_response = createResponse(HttpResponse::NO_CONTENT);
}
else {
generateError(HttpResponse::BAD, res);
}
return;
}
// we get here if anything above is invalid
generateError(HttpResponse::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns ranged data from the replication log
///
@ -1281,7 +1456,7 @@ void RestReplicationHandler::handleCommandRestoreCollection () {
if (json == 0) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection parameter");
"invalid JSON");
return;
}

View File

@ -209,6 +209,12 @@ namespace triagens {
void handleCommandLoggerFollow ();
////////////////////////////////////////////////////////////////////////////////
/// @brief handle a batch command
////////////////////////////////////////////////////////////////////////////////
void handleCommandBatch ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return the inventory (current replication and collection state)
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,7 @@
#include "BasicsC/logging.h"
#include "BasicsC/tri-strings.h"
#include "VocBase/barrier.h"
#include "VocBase/compactor.h"
#include "VocBase/document-collection.h"
#include "VocBase/shadow-data.h"
@ -230,12 +231,10 @@ void TRI_CleanupVocBase (void* data) {
TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE);
while (true) {
size_t n;
size_t i;
TRI_col_type_e type;
// keep initial _state value as vocbase->_state might change during compaction loop
int state = vocbase->_state;
int state;
// keep initial _state value as vocbase->_state might change during cleanup loop
state = vocbase->_state;
++iterations;
@ -246,46 +245,54 @@ void TRI_CleanupVocBase (void* data) {
CleanupShadows(vocbase, true);
}
// copy all collections
TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase);
TRI_CopyDataVectorPointer(&collections, &vocbase->_collections);
TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
// check if we can get the compactor lock exclusively
if (TRI_CheckAndLockCompactorVocBase(vocbase)) {
size_t i, n;
TRI_col_type_e type;
n = collections._length;
// copy all collections
TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase);
TRI_CopyDataVectorPointer(&collections, &vocbase->_collections);
TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
for (i = 0; i < n; ++i) {
TRI_vocbase_col_t* collection;
TRI_primary_collection_t* primary;
n = collections._length;
collection = collections._buffer[i];
for (i = 0; i < n; ++i) {
TRI_vocbase_col_t* collection;
TRI_primary_collection_t* primary;
TRI_READ_LOCK_STATUS_VOCBASE_COL(collection);
collection = collections._buffer[i];
primary = collection->_collection;
TRI_READ_LOCK_STATUS_VOCBASE_COL(collection);
if (primary == NULL) {
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
primary = collection->_collection;
type = primary->base._info._type;
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
// we're the only ones that can unload the collection, so using
// the collection pointer outside the lock is ok
// maybe cleanup indexes, unload the collection or some datafiles
if (TRI_IS_DOCUMENT_COLLECTION(type)) {
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
// clean indexes?
if (iterations % (uint64_t) CLEANUP_INDEX_ITERATIONS == 0) {
document->cleanupIndexes(document);
if (primary == NULL) {
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
CleanupDocumentCollection(document);
type = primary->base._info._type;
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
// we're the only ones that can unload the collection, so using
// the collection pointer outside the lock is ok
// maybe cleanup indexes, unload the collection or some datafiles
if (TRI_IS_DOCUMENT_COLLECTION(type)) {
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
// clean indexes?
if (iterations % (uint64_t) CLEANUP_INDEX_ITERATIONS == 0) {
document->cleanupIndexes(document);
}
CleanupDocumentCollection(document);
}
}
TRI_UnlockCompactorVocBase(vocbase);
}
if (vocbase->_state >= 1) {
@ -294,6 +301,9 @@ void TRI_CleanupVocBase (void* data) {
CleanupShadows(vocbase, false);
}
// clean up expired compactor locks
TRI_CleanupCompactorVocBase(vocbase);
TRI_LockCondition(&vocbase->_cleanupCondition);
TRI_TimedWaitCondition(&vocbase->_cleanupCondition, (uint64_t) CLEANUP_INTERVAL);
TRI_UnlockCondition(&vocbase->_cleanupCondition);

View File

@ -100,6 +100,16 @@ static int const COMPACTOR_INTERVAL = (1 * 1000 * 1000);
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief compaction blocker entry
////////////////////////////////////////////////////////////////////////////////
typedef struct compaction_blocker_s {
TRI_voc_tick_t _id;
double _expires;
}
compaction_blocker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief compaction state
////////////////////////////////////////////////////////////////////////////////
@ -985,6 +995,67 @@ static bool CompactifyDocumentCollection (TRI_document_collection_t* document) {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief try to write-lock the compaction
/// returns true if lock acquisition was successful. the caller is responsible
/// to free the write lock eventually
////////////////////////////////////////////////////////////////////////////////
static bool TryLockCompaction (TRI_vocbase_t* vocbase) {
return TRI_TryWriteLockReadWriteLock(&vocbase->_compactionBlockers._lock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write-lock the compaction
////////////////////////////////////////////////////////////////////////////////
static void LockCompaction (TRI_vocbase_t* vocbase) {
TRI_WriteLockReadWriteLock(&vocbase->_compactionBlockers._lock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write-unlock the compaction
////////////////////////////////////////////////////////////////////////////////
static void UnlockCompaction (TRI_vocbase_t* vocbase) {
TRI_WriteUnlockReadWriteLock(&vocbase->_compactionBlockers._lock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief atomic check and lock for running the compaction
/// if this function returns true, it has acquired a write-lock on the
/// compactionBlockers structure, which the caller must free eventually
////////////////////////////////////////////////////////////////////////////////
static bool CheckAndLockCompaction (TRI_vocbase_t* vocbase) {
double now;
size_t i, n;
now = TRI_microtime();
// check if we can acquire the write lock instantly
if (! TryLockCompaction(vocbase)) {
// couldn't acquire the write lock
return false;
}
// we are now holding the write lock
// check if we have a still-valid compaction blocker
n = vocbase->_compactionBlockers._data._length;
for (i = 0; i < n; ++i) {
compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i);
if (blocker->_expires > now) {
// found a compaction blocker. unlock and return
UnlockCompaction(vocbase);
return false;
}
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -998,97 +1069,287 @@ static bool CompactifyDocumentCollection (TRI_document_collection_t* document) {
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise the compaction blockers structure
////////////////////////////////////////////////////////////////////////////////
int TRI_InitCompactorVocBase (TRI_vocbase_t* vocbase) {
TRI_InitReadWriteLock(&vocbase->_compactionBlockers._lock);
TRI_InitVector(&vocbase->_compactionBlockers._data, TRI_UNKNOWN_MEM_ZONE, sizeof(compaction_blocker_t));
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the compaction blockers structure
////////////////////////////////////////////////////////////////////////////////
void TRI_DestroyCompactorVocBase (TRI_vocbase_t* vocbase) {
TRI_DestroyVector(&vocbase->_compactionBlockers._data);
TRI_DestroyReadWriteLock(&vocbase->_compactionBlockers._lock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove data of expired compaction blockers
////////////////////////////////////////////////////////////////////////////////
bool TRI_CleanupCompactorVocBase (TRI_vocbase_t* vocbase) {
double now;
size_t i, n;
now = TRI_microtime();
// check if we can instantly acquire the lock
if (! TryLockCompaction(vocbase)) {
// couldn't acquire lock
return false;
}
// we are now holding the write lock
n = vocbase->_compactionBlockers._data._length;
i = 0;
while (i < n) {
compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i);
if (blocker->_expires < now) {
TRI_RemoveVector(&vocbase->_compactionBlockers._data, i);
n--;
}
else {
i++;
}
}
UnlockCompaction(vocbase);
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_InsertBlockerCompactorVocBase (TRI_vocbase_t* vocbase,
double lifetime,
TRI_voc_tick_t* id) {
compaction_blocker_t blocker;
int res;
if (lifetime <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
blocker._id = TRI_NewTickVocBase();
blocker._expires = TRI_microtime() + lifetime;
LockCompaction(vocbase);
res = TRI_PushBackVector(&vocbase->_compactionBlockers._data, &blocker);
UnlockCompaction(vocbase);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
*id = blocker._id;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief touch an existing compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_TouchBlockerCompactorVocBase (TRI_vocbase_t* vocbase,
TRI_voc_tick_t id,
double lifetime) {
size_t i, n;
bool found;
found = false;
if (lifetime <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
LockCompaction(vocbase);
n = vocbase->_compactionBlockers._data._length;
for (i = 0; i < n; ++i) {
compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i);
if (blocker->_id == id) {
blocker->_expires = TRI_microtime() + lifetime;
found = true;
break;
}
}
UnlockCompaction(vocbase);
if (! found) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief atomically check-and-lock the compactor
/// if the function returns true, then a write-lock on the compactor was
/// acquired, which must eventually be freed by the caller
////////////////////////////////////////////////////////////////////////////////
bool TRI_CheckAndLockCompactorVocBase (TRI_vocbase_t* vocbase) {
return TryLockCompaction(vocbase);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unlock the compactor
////////////////////////////////////////////////////////////////////////////////
void TRI_UnlockCompactorVocBase (TRI_vocbase_t* vocbase) {
UnlockCompaction(vocbase);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove an existing compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_RemoveBlockerCompactorVocBase (TRI_vocbase_t* vocbase,
TRI_voc_tick_t id) {
size_t i, n;
bool found;
found = false;
LockCompaction(vocbase);
n = vocbase->_compactionBlockers._data._length;
for (i = 0; i < n; ++i) {
compaction_blocker_t* blocker = TRI_AtVector(&vocbase->_compactionBlockers._data, i);
if (blocker->_id == id) {
TRI_RemoveVector(&vocbase->_compactionBlockers._data, i);
found = true;
break;
}
}
UnlockCompaction(vocbase);
if (! found) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compactor event loop
////////////////////////////////////////////////////////////////////////////////
void TRI_CompactorVocBase (void* data) {
TRI_vocbase_t* vocbase = data;
TRI_vocbase_t* vocbase;
TRI_vector_pointer_t collections;
vocbase = data;
assert(vocbase->_state == 1);
TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE);
while (true) {
TRI_col_type_e type;
size_t i, n;
int state;
bool worked;
// keep initial _state value as vocbase->_state might change during compaction loop
state = vocbase->_state;
// check if compaction is currently disallowed
if (CheckAndLockCompaction(vocbase)) {
// compaction is currently allowed
size_t i, n;
// copy all collections
TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase);
TRI_CopyDataVectorPointer(&collections, &vocbase->_collections);
TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
// copy all collections
TRI_READ_LOCK_COLLECTIONS_VOCBASE(vocbase);
TRI_CopyDataVectorPointer(&collections, &vocbase->_collections);
TRI_READ_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
n = collections._length;
n = collections._length;
for (i = 0; i < n; ++i) {
TRI_vocbase_col_t* collection;
TRI_primary_collection_t* primary;
bool doCompact;
for (i = 0; i < n; ++i) {
TRI_vocbase_col_t* collection;
TRI_primary_collection_t* primary;
TRI_col_type_e type;
bool doCompact;
bool worked;
collection = collections._buffer[i];
collection = collections._buffer[i];
if (! TRI_TRY_READ_LOCK_STATUS_VOCBASE_COL(collection)) {
// if we can't acquire the read lock instantly, we continue directly
// we don't want to stall here for too long
continue;
}
if (! TRI_TRY_READ_LOCK_STATUS_VOCBASE_COL(collection)) {
// if we can't acquire the read lock instantly, we continue directly
// we don't want to stall here for too long
continue;
}
primary = collection->_collection;
primary = collection->_collection;
if (primary == NULL) {
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
if (primary == NULL) {
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
worked = false;
doCompact = primary->base._info._doCompact;
type = primary->base._info._type;
worked = false;
doCompact = primary->base._info._doCompact;
type = primary->base._info._type;
// for document collection, compactify datafiles
if (TRI_IS_DOCUMENT_COLLECTION(type)) {
if (collection->_status == TRI_VOC_COL_STATUS_LOADED && doCompact) {
TRI_barrier_t* ce;
// check whether someone else holds a read-lock on the compaction lock
if (! TRI_TryWriteLockReadWriteLock(&primary->_compactionLock)) {
// someone else is holding the compactor lock, we'll not compact
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
// for document collection, compactify datafiles
if (TRI_IS_DOCUMENT_COLLECTION(type)) {
if (collection->_status == TRI_VOC_COL_STATUS_LOADED && doCompact) {
TRI_barrier_t* ce;
ce = TRI_CreateBarrierCompaction(&primary->_barrierList);
if (ce == NULL) {
// out of memory
LOG_WARNING("out of memory when trying to create a barrier element");
}
else {
worked = CompactifyDocumentCollection((TRI_document_collection_t*) primary);
TRI_FreeBarrier(ce);
}
// check whether someone else holds a read-lock on the compaction lock
if (! TRI_TryWriteLockReadWriteLock(&primary->_compactionLock)) {
// someone else is holding the compactor lock, we'll not compact
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
// read-unlock the compaction lock
TRI_WriteUnlockReadWriteLock(&primary->_compactionLock);
}
}
ce = TRI_CreateBarrierCompaction(&primary->_barrierList);
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
if (ce == NULL) {
// out of memory
LOG_WARNING("out of memory when trying to create a barrier element");
}
else {
worked = CompactifyDocumentCollection((TRI_document_collection_t*) primary);
TRI_FreeBarrier(ce);
}
// read-unlock the compaction lock
TRI_WriteUnlockReadWriteLock(&primary->_compactionLock);
if (worked) {
// signal the cleanup thread that we worked and that it can now wake up
TRI_LockCondition(&vocbase->_cleanupCondition);
TRI_SignalCondition(&vocbase->_cleanupCondition);
TRI_UnlockCondition(&vocbase->_cleanupCondition);
}
}
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
if (worked) {
// signal the cleanup thread that we worked and that it can now wake up
TRI_LockCondition(&vocbase->_cleanupCondition);
TRI_SignalCondition(&vocbase->_cleanupCondition);
TRI_UnlockCondition(&vocbase->_cleanupCondition);
}
UnlockCompaction(vocbase);
}
if (vocbase->_state == 1) {
// only sleep while server is still running
usleep(COMPACTOR_INTERVAL);

View File

@ -30,10 +30,14 @@
#include "BasicsC/common.h"
#include "VocBase/voc-types.h"
#ifdef __cplusplus
extern "C" {
#endif
struct TRI_vocbase_s;
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
@ -43,6 +47,69 @@ extern "C" {
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise the compaction blockers structure
////////////////////////////////////////////////////////////////////////////////
int TRI_InitCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the compaction blockers structure
////////////////////////////////////////////////////////////////////////////////
void TRI_DestroyCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief atomic check and lock for running the compaction
/// if this function returns true, it has acquired a write-lock on the
/// compactionBlockers structure
////////////////////////////////////////////////////////////////////////////////
bool TRI_CheckAndLockCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief remove data of expired compaction blockers
////////////////////////////////////////////////////////////////////////////////
bool TRI_CleanupCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_InsertBlockerCompactorVocBase (struct TRI_vocbase_s*,
double,
TRI_voc_tick_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief touch an existing compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_TouchBlockerCompactorVocBase (struct TRI_vocbase_s*,
TRI_voc_tick_t,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief remove an existing compaction blocker
////////////////////////////////////////////////////////////////////////////////
int TRI_RemoveBlockerCompactorVocBase (struct TRI_vocbase_s*,
TRI_voc_tick_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief atomically check-and-lock the compactor
/// if the function returns true, then a write-lock on the compactor was
/// acquired, which must eventually be freed by the caller
////////////////////////////////////////////////////////////////////////////////
bool TRI_CheckAndLockCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief unlock the compactor
////////////////////////////////////////////////////////////////////////////////
void TRI_UnlockCompactorVocBase (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief compactor event loop
////////////////////////////////////////////////////////////////////////////////

View File

@ -1542,7 +1542,13 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
ApplyDefaults(vocbase, defaults);
// init AQL functions
vocbase->_functions = TRI_InitialiseFunctionsAql();
vocbase->_functions = TRI_CreateFunctionsAql();
if (vocbase->_functions == NULL) {
LOG_FATAL_AND_EXIT("cannot create AQL functions");
}
TRI_InitCompactorVocBase(vocbase);
// init collections
TRI_InitVectorPointer(&vocbase->_collections, TRI_UNKNOWN_MEM_ZONE);
@ -1632,6 +1638,8 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
res = ScanPath(vocbase, vocbase->_path, iterateMarkers);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeFunctionsAql(vocbase->_functions);
TRI_DestroyCompactorVocBase(vocbase);
TRI_DestroyAssociativePointer(&vocbase->_collectionsByName);
TRI_DestroyAssociativePointer(&vocbase->_collectionsById);
TRI_DestroyVectorPointer(&vocbase->_collections);
@ -1819,6 +1827,8 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
TRI_DestroyVectorPointer(&vocbase->_collections);
TRI_DestroyVectorPointer(&vocbase->_deadCollections);
TRI_DestroyCompactorVocBase(vocbase);
// free AQL functions
TRI_FreeFunctionsAql(vocbase->_functions);

View File

@ -374,6 +374,12 @@ typedef struct TRI_vocbase_s {
struct TRI_shadow_store_s* _cursors;
TRI_associative_pointer_t* _functions;
struct {
TRI_read_write_lock_t _lock;
TRI_vector_t _data;
}
_compactionBlockers;
TRI_condition_t _cleanupCondition;
TRI_condition_t _syncWaitersCondition;
int64_t _syncWaiters;