1
0
Fork 0

removed all replication conditionals

This commit is contained in:
Jan Steemann 2013-07-30 13:16:37 +02:00
parent 6b5900e8e0
commit 002ebd48bc
42 changed files with 3132 additions and 2946 deletions

View File

@ -35,7 +35,6 @@ unittests-brief: \
unittests-http-server \
unittests-ssl-server \
unittests-shell-client \
unittests-replication \
unittests-arangob \
unittests-import \
unittests-upgrade \
@ -306,13 +305,6 @@ SHELL_COMMON = \
@top_srcdir@/js/common/tests/shell-fulltext.js \
@top_srcdir@/js/common/tests/shell-graph.js
if ENABLE_REPLICATION
SHELL_COMMON += \
@top_srcdir@/js/common/tests/shell-replication.js
endif
################################################################################
### @brief SHELL SERVER TESTS (BASICS)
################################################################################
@ -570,9 +562,22 @@ unittests-import:
.PHONY: unittests-replication unittests-replication-data unittests-replication-logger
if ENABLE_REPLICATION
unittests-replication: unittests-replication-common unittests-replication-data unittests-replication-logger
unittests-replication: unittests-replication-data unittests-replication-logger
unittests-replication-common:
@echo
@echo "================================================================================"
@echo "<< REPLICATION COMMON TESTS >>"
@echo "================================================================================"
@echo
@rm -rf "$(VOCDIR)"
@mkdir "$(VOCDIR)"
$(VALGRIND) @builddir@/bin/arangod "$(VOCDIR)" $(SERVER_OPT) --javascript.unit-tests @top_srcdir@/js/common/tests/shell-replication.js || test "x$(FORCE)" == "x1"
@rm -rf "$(VOCDIR)"
@echo
unittests-replication-data:
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication true" PROTO=http
@ -612,24 +617,6 @@ unittests-replication-logger:
@rm -rf "$(VOCDIR)"
@echo
else
unittests-replication:
@echo
@echo "================================================================================"
@echo "<< REPLICATION TESTS >>"
@echo "================================================================================"
@echo
@echo "to enable replication tests, configure with --enable-replication"
@echo
unittests-replication-data:
unittests-replication-logger:
endif
################################################################################
### @brief UPGRADE TESTS
###

View File

@ -62,6 +62,10 @@ bin_arangod_SOURCES = \
arangod/IndexOperators/index-operator.c \
arangod/PriorityQueue/pqueueindex.c \
arangod/PriorityQueue/priorityqueue.c \
arangod/Replication/replication-static.cpp \
arangod/Replication/ContinuousSyncer.cpp \
arangod/Replication/InitialSyncer.cpp \
arangod/Replication/Syncer.cpp \
arangod/RestHandler/RestBatchHandler.cpp \
arangod/RestHandler/RestDocumentHandler.cpp \
arangod/RestHandler/RestEdgeHandler.cpp \
@ -122,13 +126,6 @@ bin_arangod_SOURCES += \
arangod/MRServer/mr-actions.cpp
endif
if ENABLE_REPLICATION
bin_arangod_SOURCES += \
arangod/Replication/ReplicationFetcher.cpp \
arangod/Replication/replication-static.cpp
endif
################################################################################
## --SECTION-- SCANNER & PARSER
################################################################################

View File

@ -0,0 +1,946 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication continuous data synchroniser
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ContinuousSyncer.h"
#include "BasicsC/json.h"
#include "Basics/JsonHelper.h"
#include "Rest/HttpRequest.h"
#include "Rest/SslInterface.h"
#include "SimpleHttpClient/GeneralClientConnection.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "VocBase/document-collection.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
using namespace triagens::arango;
using namespace triagens::httpclient;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer::ContinuousSyncer (TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration) :
Syncer(vocbase, configuration),
_applier(vocbase->_replicationApplier),
_transactionState() {
_transactionState._trx = 0;
_transactionState._externalTid = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer::~ContinuousSyncer () {
abortOngoingTransaction();
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief run method, performs continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::run () {
if (_client == 0 || _connection == 0 || _endpoint == 0) {
return TRI_ERROR_INTERNAL;
}
string errorMsg;
int res = getMasterState(errorMsg);
if (res == TRI_ERROR_NO_ERROR) {
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
res = getLocalState(errorMsg);
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
}
if (res != TRI_ERROR_NO_ERROR) {
return TRI_SetErrorReplicationApplier(_applier, res, errorMsg.c_str());
}
if (res == TRI_ERROR_NO_ERROR) {
res = runContinuousSync(errorMsg);
}
if (res != TRI_ERROR_NO_ERROR) {
TRI_SetErrorReplicationApplier(_applier, res, errorMsg.c_str());
// stop ourselves
TRI_StopReplicationApplier(_applier, false);
return res;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief set the applier progress
////////////////////////////////////////////////////////////////////////////////
void ContinuousSyncer::setProgress (char const* msg) {
TRI_SetProgressReplicationApplier(_applier, msg, true);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief save the current applier state
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::saveApplierState () {
LOGGER_TRACE("saving replication applier state. "
"last applied continuous tick: " << _applier->_state._lastAppliedContinuousTick);
int res = TRI_SaveStateReplicationApplier(_vocbase, &_applier->_state, false);
if (res != TRI_ERROR_NO_ERROR) {
LOGGER_WARNING("unable to save replication applier state: " << TRI_errno_string(res));
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication apply state
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::getLocalState (string& errorMsg) {
int res;
res = TRI_LoadStateReplicationApplier(_vocbase, &_applier->_state);
_applier->_state._active = true;
if (res == TRI_ERROR_FILE_NOT_FOUND) {
// no state file found, so this is the initialisation
_applier->_state._serverId = _masterInfo._serverId;
res = TRI_SaveStateReplicationApplier(_vocbase, &_applier->_state, true);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save replication state information";
}
}
else if (res == TRI_ERROR_NO_ERROR) {
if (_masterInfo._serverId != _applier->_state._serverId &&
_applier->_state._serverId != 0) {
res = TRI_ERROR_REPLICATION_MASTER_CHANGE;
errorMsg = "encountered wrong master id in replication state file. "
"found: " + StringUtils::itoa(_masterInfo._serverId) + ", "
"expected: " + StringUtils::itoa(_applier->_state._serverId);
}
}
else {
// some error occurred
assert(res != TRI_ERROR_NO_ERROR);
errorMsg = TRI_errno_string(res);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transaction
////////////////////////////////////////////////////////////////////////////////
void ContinuousSyncer::abortOngoingTransaction () {
if (_transactionState._trx != 0) {
LOGGER_DEBUG("aborting replication transaction " << _transactionState._externalTid);
TRI_FreeTransaction(_transactionState._trx);
_transactionState._trx = 0;
_transactionState._externalTid = 0;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a transaction for a single operation
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* ContinuousSyncer::createSingleOperationTransaction (TRI_voc_cid_t cid,
int* result) {
TRI_transaction_t* trx = TRI_CreateTransaction(_vocbase->_transactionContext,
_masterInfo._serverId,
false,
0.0,
false);
if (trx == 0) {
*result = TRI_ERROR_OUT_OF_MEMORY;
return 0;
}
int res = TRI_AddCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE, TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
*result = res;
return 0;
}
res = TRI_BeginTransaction(trx, getHint(1), TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
*result = res;
return 0;
}
*result = TRI_ERROR_NO_ERROR;
return trx;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief inserts a document, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::processDocument (TRI_replication_operation_e type,
TRI_json_t const* json,
bool& updateTick,
string& errorMsg) {
updateTick = false;
// extract "cid"
TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// extract "key"
TRI_json_t const* keyJson = JsonHelper::getArrayElement(json, "key");
if (! JsonHelper::isString(keyJson)) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// extract "rev"
TRI_voc_rid_t rid;
const string ridString = JsonHelper::getStringValue(json, "rev", "");
if (ridString.empty()) {
rid = 0;
}
else {
rid = StringUtils::uint64(ridString.c_str(), ridString.size());
}
// extract "data"
TRI_json_t const* doc = JsonHelper::getArrayElement(json, "data");
// extract "tid"
const string id = JsonHelper::getStringValue(json, "tid", "");
TRI_voc_tid_t tid;
if (id.empty()) {
// standalone operation
tid = 0;
}
else {
// operation is part of a transaction
tid = (TRI_voc_tid_t) StringUtils::uint64(id.c_str(), id.size());
}
if (tid != _transactionState._externalTid) {
// unexpected transaction id
abortOngoingTransaction();
if (tid > 0) {
// transactional operation but no transaction for it
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
// continue and apply standalone operations
}
if (_transactionState._trx != 0) {
// transactional operation
assert(tid > 0);
TRI_transaction_collection_t* trxCollection = TRI_GetCollectionTransaction(_transactionState._trx, cid, TRI_TRANSACTION_WRITE);
if (trxCollection == 0) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
int res = applyCollectionDumpMarker(trxCollection,
type,
(const TRI_voc_key_t) keyJson->_value._string.data,
rid,
doc,
errorMsg);
return res;
}
else {
// standalone operation
assert(tid == 0);
// update the apply tick for all standalone operations
updateTick = true;
int res;
TRI_transaction_t* trx = createSingleOperationTransaction(cid, &res);
if (trx == 0) {
errorMsg = "unable to create replication transaction: " + string(TRI_errno_string(res));
return res;
}
TRI_transaction_collection_t* trxCollection = TRI_GetCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE);
if (trxCollection == 0) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
res = applyCollectionDumpMarker(trxCollection,
type,
(const TRI_voc_key_t) keyJson->_value._string.data,
rid,
doc,
errorMsg);
if (res == TRI_ERROR_NO_ERROR) {
TRI_CommitTransaction(trx, TRI_TRANSACTION_TOP_LEVEL);
}
else {
TRI_AbortTransaction(trx, TRI_TRANSACTION_TOP_LEVEL);
}
TRI_FreeTransaction(trx);
return res;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief starts a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::startTransaction (TRI_json_t const* json) {
// {"type":2200,"tid":"230920705812199","collections":[{"cid":"230920700700391","operations":10}]}
abortOngoingTransaction();
const string id = JsonHelper::getStringValue(json, "tid", "");
if (id.empty()) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// transaction id
TRI_voc_tid_t tid = (TRI_voc_tid_t) StringUtils::uint64(id.c_str(), id.size());
TRI_json_t const* collections = JsonHelper::getArrayElement(json, "collections");
if (! JsonHelper::isList(collections)) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
LOGGER_TRACE("starting replication transaction " << tid);
TRI_transaction_t* trx = TRI_CreateTransaction(_vocbase->_transactionContext,
_masterInfo._serverId,
false,
0.0,
false);
if (trx == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
int res;
uint64_t totalOperations = 0;
const size_t n = collections->_value._objects._length;
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* collection = (TRI_json_t const*) TRI_AtVector(&collections->_value._objects, i);
if (! JsonHelper::isArray(collection)) {
TRI_FreeTransaction(trx);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_voc_cid_t cid = getCid(collection);
if (cid == 0) {
TRI_FreeTransaction(trx);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
uint64_t numOperations = (uint64_t) JsonHelper::getNumberValue(collection, "operations", 0.0);
if (numOperations > 0) {
res = TRI_AddCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE, TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
return res;
}
totalOperations += numOperations;
}
}
res = TRI_BeginTransaction(trx, getHint(totalOperations), TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
return res;
}
_transactionState._trx = trx;
_transactionState._externalTid = tid;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief commits a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::commitTransaction (TRI_json_t const* json) {
// {"type":2201,"tid":"230920705812199","collections":[{"cid":"230920700700391","operations":10}]}
const string id = JsonHelper::getStringValue(json, "tid", "");
if (id.empty()) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// transaction id
const TRI_voc_tid_t tid = (TRI_voc_tid_t) StringUtils::uint64(id.c_str(), id.size());
if (_transactionState._trx == 0) {
// invalid state, no transaction was started. TODO: fix error number
return TRI_ERROR_INTERNAL;
}
if (_transactionState._externalTid != tid) {
// unexpected transaction id. TODO: fix error number
abortOngoingTransaction();
return TRI_ERROR_INTERNAL;
}
LOGGER_TRACE("committing replication transaction " << tid);
int res = TRI_CommitTransaction(_transactionState._trx, TRI_TRANSACTION_TOP_LEVEL);
TRI_FreeTransaction(_transactionState._trx);
_transactionState._trx = 0;
_transactionState._externalTid = 0;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief renames a collection, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::renameCollection (TRI_json_t const* json) {
const TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_json_t const* collectionJson = TRI_LookupArrayJson(json, "collection");
const string name = JsonHelper::getStringValue(collectionJson, "name", "");
if (name.empty()) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (col == 0) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
return TRI_RenameCollectionVocBase(_vocbase, col, name.c_str(), _masterInfo._serverId);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief apply a single marker from the continuous log
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::applyLogMarker (TRI_json_t const* json,
bool& updateTick,
string& errorMsg) {
static const string invalidMsg = "received invalid JSON data";
updateTick = false;
// check data
if (! JsonHelper::isArray(json)) {
errorMsg = invalidMsg;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// fetch marker "type"
int typeValue = (int) JsonHelper::getIntValue(json, "type", 0);
// fetch "tick"
const string tick = JsonHelper::getStringValue(json, "tick", "");
if (! tick.empty()) {
TRI_voc_tick_t newTick = (TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size());
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
if (newTick > _applier->_state._lastProcessedContinuousTick) {
_applier->_state._lastProcessedContinuousTick = newTick;
}
else {
LOGGER_WARNING("replication marker tick value " << newTick <<
" is lower than last processed tick value " <<
_applier->_state._lastProcessedContinuousTick);
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
}
// handle marker type
TRI_replication_operation_e type = (TRI_replication_operation_e) typeValue;
if (type == MARKER_DOCUMENT || type == MARKER_EDGE || type == MARKER_REMOVE) {
return processDocument(type, json, updateTick, errorMsg);
}
else if (type == TRI_TRANSACTION_START) {
updateTick = false;
return startTransaction(json);
}
else if (type == TRI_TRANSACTION_COMMIT) {
updateTick = true;
return commitTransaction(json);
}
else if (type == COLLECTION_CREATE) {
TRI_json_t const* collectionJson = TRI_LookupArrayJson(json, "collection");
updateTick = true;
return createCollection(collectionJson, 0);
}
else if (type == COLLECTION_DROP) {
updateTick = true;
return dropCollection(json);
}
else if (type == COLLECTION_RENAME) {
updateTick = true;
return renameCollection(json);
}
else if (type == INDEX_CREATE) {
updateTick = true;
return createIndex(json);
}
else if (type == INDEX_DROP) {
updateTick = true;
return dropIndex(json);
}
else if (type == REPLICATION_STOP) {
abortOngoingTransaction();
updateTick = true;
return TRI_ERROR_NO_ERROR;
}
else if (type == REPLICATION_START) {
abortOngoingTransaction();
updateTick = true;
return TRI_ERROR_NO_ERROR;
}
else {
errorMsg = "unexpected marker type " + StringUtils::itoa(type);
updateTick = true;
return TRI_ERROR_REPLICATION_UNEXPECTED_MARKER;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from the continuous log
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::applyLog (SimpleHttpResult* response,
string& errorMsg,
uint64_t& ignoreCount) {
std::stringstream& data = response->getBody();
while (true) {
string line;
std::getline(data, line, '\n');
if (line.size() < 2) {
// we are done
return TRI_ERROR_NO_ERROR;
}
TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, line.c_str());
bool updateTick;
int res = applyLogMarker(json, updateTick, errorMsg);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
if (res == TRI_ERROR_NO_ERROR) {
// apply ok
}
else {
// apply error
if (errorMsg.empty()) {
// don't overwrite previous error message
errorMsg = TRI_errno_string(res);
}
if (ignoreCount == 0) {
if (line.size() > 128) {
errorMsg += ", offending marker: " + line.substr(128) + "...";
}
else {
errorMsg += ", offending marker: " + line;;
}
return res;
}
else {
ignoreCount--;
LOGGER_WARNING("ignoring replication error: " << errorMsg);
errorMsg = "";
}
}
if (updateTick) {
// update tick value
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
if (_applier->_state._lastProcessedContinuousTick > _applier->_state._lastAppliedContinuousTick) {
_applier->_state._lastAppliedContinuousTick = _applier->_state._lastProcessedContinuousTick;
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief perform a continuous sync with the master
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::runContinuousSync (string& errorMsg) {
int connectRetries = 0;
uint64_t inactiveCycles = 0;
int res = TRI_ERROR_INTERNAL;
while (1) {
bool worked = false;
bool masterActive = false;
res = followMasterLog(errorMsg, _configuration._ignoreErrors, worked, masterActive);
uint64_t sleepTime;
if (res == TRI_ERROR_REPLICATION_NO_RESPONSE ||
res == TRI_ERROR_REPLICATION_MASTER_ERROR) {
// master error. try again after a sleep period
sleepTime = 30 * 1000 * 1000;
connectRetries++;
if (connectRetries > _configuration._maxConnectRetries) {
// stop ourselves
return res;
}
}
else {
connectRetries = 0;
if (res != TRI_ERROR_NO_ERROR) {
// some other error we will not ignore
return res;
}
else {
// no error
if (worked) {
// we have done something, so we won't sleep (but check for cancellation)
inactiveCycles = 0;
sleepTime = 0;
}
else {
if (masterActive) {
sleepTime = 500 * 1000;
}
else {
sleepTime = 5 * 1000 * 1000;
}
if (_configuration._adaptivePolling) {
inactiveCycles++;
if (inactiveCycles > 60) {
sleepTime *= 5;
}
else if (inactiveCycles > 30) {
sleepTime *= 3;
}
if (inactiveCycles > 15) {
sleepTime *= 2;
}
}
}
}
}
// this will make the applier thread sleep if there is nothing to do,
// but will also check for cancellation
if (! TRI_WaitReplicationApplier(_applier, sleepTime)) {
return TRI_ERROR_REPLICATION_STOPPED;
}
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief run the continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::followMasterLog (string& errorMsg,
uint64_t& ignoreCount,
bool& worked,
bool& masterActive) {
const string baseUrl = BaseUrl +
"/logger-follow?chunkSize=" + StringUtils::itoa(getChunkSize());
map<string, string> headers;
// get start tick
// ---------------------------------------
// use tick from initial dump
TRI_ReadLockReadWriteLock(&_applier->_statusLock);
TRI_voc_tick_t fromTick = _applier->_state._lastAppliedInitialTick;
// if we already transferred some data, we'll use the last applied tick
if (_applier->_state._lastAppliedContinuousTick > fromTick) {
fromTick = _applier->_state._lastAppliedContinuousTick;
}
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
LOGGER_TRACE("starting continuous replication with tick " << fromTick);
const string tickString = StringUtils::itoa(fromTick);
const string url = baseUrl +
"&from=" + tickString +
"&serverId=" + _localServerIdString;
// send request
const string progress = "fetching master log from offset " + tickString;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
delete response;
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
int res;
bool checkMore = false;
bool active = false;
TRI_voc_tick_t tick;
bool found;
string header = response->getHeaderField(TRI_REPLICATION_HEADER_CHECKMORE, found);
if (found) {
checkMore = StringUtils::boolean(header);
res = TRI_ERROR_NO_ERROR;
header = response->getHeaderField(TRI_REPLICATION_HEADER_ACTIVE, found);
if (found) {
active = StringUtils::boolean(header);
}
header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTINCLUDED, found);
if (found) {
tick = StringUtils::uint64(header);
if (tick > fromTick) {
fromTick = tick;
}
else {
// we got the same tick again, this indicates we're at the end
checkMore = false;
}
header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTTICK, found);
if (found) {
tick = StringUtils::uint64(header);
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
_applier->_state._lastAvailableContinuousTick = tick;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
}
}
}
if (! found) {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": required header is missing";
}
if (res == TRI_ERROR_NO_ERROR) {
TRI_ReadLockReadWriteLock(&_applier->_statusLock);
TRI_voc_tick_t lastAppliedTick = _applier->_state._lastAppliedContinuousTick;
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
res = applyLog(response, errorMsg, ignoreCount);
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
if (_applier->_state._lastAppliedContinuousTick != lastAppliedTick) {
saveApplierState();
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
}
delete response;
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
masterActive = active;
if (! checkMore || fromTick == 0) {
// nothing to do.
worked = false;
}
else {
worked = true;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,252 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication continuous data synchroniser
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_REPLICATION_CONTINUOUS_SYNCER_H
#define TRIAGENS_REPLICATION_CONTINUOUS_SYNCER_H 1
#include "Basics/Common.h"
#include "Replication/Syncer.h"
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
struct TRI_json_s;
struct TRI_replication_applier_s;
struct TRI_replication_applier_configuration_s;
struct TRI_vocbase_s;
namespace triagens {
namespace httpclient {
class SimpleHttpResult;
}
namespace arango {
// -----------------------------------------------------------------------------
// --SECTION-- ContinuousSyncer
// -----------------------------------------------------------------------------
class ContinuousSyncer : public Syncer {
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~ContinuousSyncer ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief run method, performs continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int run ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief set the applier progress
////////////////////////////////////////////////////////////////////////////////
void setProgress (char const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief save the current applier state
////////////////////////////////////////////////////////////////////////////////
int saveApplierState ();
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication applier state
////////////////////////////////////////////////////////////////////////////////
int getLocalState (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transaction
////////////////////////////////////////////////////////////////////////////////
void abortOngoingTransaction ();
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a transaction for a single operation
////////////////////////////////////////////////////////////////////////////////
struct TRI_transaction_s* createSingleOperationTransaction (TRI_voc_cid_t,
int*);
////////////////////////////////////////////////////////////////////////////////
/// @brief starts a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int startTransaction (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief commits a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int commitTransaction (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief process a document operation, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int processDocument (TRI_replication_operation_e,
struct TRI_json_s const*,
bool&,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief renames a collection, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int renameCollection (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply a single marker from the continuous log
////////////////////////////////////////////////////////////////////////////////
int applyLogMarker (struct TRI_json_s const*,
bool&,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from the continuous log
////////////////////////////////////////////////////////////////////////////////
int applyLog (httpclient::SimpleHttpResult*,
std::string&,
uint64_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief perform a continuous sync with the master
////////////////////////////////////////////////////////////////////////////////
int runContinuousSync (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief run the continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int followMasterLog (std::string&,
uint64_t&,
bool&,
bool&);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to the applier state
////////////////////////////////////////////////////////////////////////////////
struct TRI_replication_applier_s* _applier;
////////////////////////////////////////////////////////////////////////////////
/// @brief information about the local applier state
////////////////////////////////////////////////////////////////////////////////
struct {
struct TRI_transaction_s* _trx;
TRI_voc_tid_t _externalTid;
}
_transactionState;
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
}
}
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,748 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication initial data synchroniser
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "InitialSyncer.h"
#include "BasicsC/json.h"
#include "BasicsC/tri-strings.h"
#include "Basics/JsonHelper.h"
#include "Basics/StringUtils.h"
#include "Logger/Logger.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "VocBase/index.h"
#include "VocBase/document-collection.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::arango;
using namespace triagens::httpclient;
using namespace triagens::rest;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
InitialSyncer::InitialSyncer (TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration,
bool verbose) :
Syncer(vocbase, configuration),
_progress("not started"),
_numCollections(0),
_verbose(verbose) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
InitialSyncer::~InitialSyncer () {
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief run method, performs a full synchronisation
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::run (string& errorMsg) {
if (_client == 0 || _connection == 0 || _endpoint == 0) {
errorMsg = "invalid endpoint";
return TRI_ERROR_INTERNAL;
}
setProgress("fetching master state");
int res = getMasterState(errorMsg);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
map<string, string> headers;
static const string url = BaseUrl +
"/inventory" +
"?serverId=" + _localServerIdString;
// send request
const string progress = "fetching master inventory from " + url;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
res = TRI_ERROR_NO_ERROR;
if (response->wasHttpError()) {
res = TRI_ERROR_REPLICATION_MASTER_ERROR;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
}
else {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, response->getBody().str().c_str());
if (JsonHelper::isArray(json)) {
res = handleInventoryResponse(json, errorMsg);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
else {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": invalid JSON";
}
}
delete response;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last log tick of the master at start
////////////////////////////////////////////////////////////////////////////////
TRI_voc_tick_t InitialSyncer::getLastLogTick () const {
return _masterInfo._state._lastLogTick;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the number of collections synced
////////////////////////////////////////////////////////////////////////////////
uint32_t InitialSyncer::getNumCollections () const {
return _numCollections;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief comparator to sort collections
/// sort order is by collection type first (vertices before edges, this is
/// because edges depend on vertices being there), then name
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::sortCollections (const void* l, const void* r) {
TRI_json_t const* left = JsonHelper::getArrayElement((TRI_json_t const*) l, "parameters");
TRI_json_t const* right = JsonHelper::getArrayElement((TRI_json_t const*) r, "parameters");
int leftType = (int) JsonHelper::getIntValue(left, "type", (int) TRI_COL_TYPE_DOCUMENT);
int rightType = (int) JsonHelper::getIntValue(right, "type", (int) TRI_COL_TYPE_DOCUMENT);
if (leftType != rightType) {
return leftType - rightType;
}
string leftName = JsonHelper::getStringValue(left, "name", "");
string rightName = JsonHelper::getStringValue(right, "name", "");
return strcmp(leftName.c_str(), rightName.c_str());
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::applyCollectionDump (TRI_transaction_collection_t* trxCollection,
SimpleHttpResult* response,
string& errorMsg) {
const string invalidMsg = "received invalid JSON data for collection " +
StringUtils::itoa(trxCollection->_cid);
std::stringstream& data = response->getBody();
while (true) {
string line;
std::getline(data, line, '\n');
if (line.size() < 2) {
// we are done
return TRI_ERROR_NO_ERROR;
}
TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, line.c_str());
if (! JsonHelper::isArray(json)) {
if (json != 0) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
errorMsg = invalidMsg;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_replication_operation_e type = REPLICATION_INVALID;
const char* key = 0;
TRI_voc_rid_t rid = 0;
TRI_json_t const* doc = 0;
const size_t n = json->_value._objects._length;
for (size_t i = 0; i < n; i += 2) {
TRI_json_t const* element = (TRI_json_t const*) TRI_AtVector(&json->_value._objects, i);
if (! JsonHelper::isString(element)) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
errorMsg = invalidMsg;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const char* attributeName = element->_value._string.data;
TRI_json_t const* value = (TRI_json_t const*) TRI_AtVector(&json->_value._objects, i + 1);
if (TRI_EqualString(attributeName, "type")) {
if (JsonHelper::isNumber(value)) {
type = (TRI_replication_operation_e) value->_value._number;
}
}
else if (TRI_EqualString(attributeName, "key")) {
if (JsonHelper::isString(value)) {
key = value->_value._string.data;
}
}
else if (TRI_EqualString(attributeName, "rev")) {
if (JsonHelper::isString(value)) {
rid = StringUtils::uint64(value->_value._string.data, value->_value._string.length - 1);
}
}
else if (TRI_EqualString(attributeName, "data")) {
if (JsonHelper::isArray(value)) {
doc = value;
}
}
}
// key must not be 0, but doc can be 0!
if (key == 0) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
errorMsg = invalidMsg;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
int res = applyCollectionDumpMarker(trxCollection, type, (const TRI_voc_key_t) key, rid, doc, errorMsg);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::handleCollectionDump (TRI_transaction_collection_t* trxCollection,
const string& collectionName,
TRI_voc_tick_t maxTick,
string& errorMsg) {
const string cid = StringUtils::itoa(trxCollection->_cid);
const string baseUrl = BaseUrl +
"/dump?collection=" + cid +
"&chunkSize=" + StringUtils::itoa(getChunkSize());
map<string, string> headers;
TRI_voc_tick_t fromTick = 0;
int batch = 1;
while (1) {
const string url = baseUrl +
"&from=" + StringUtils::itoa(fromTick) +
"&to=" + StringUtils::itoa(maxTick) +
"&serverId=" + _localServerIdString;
// send request
const string progress = "fetching master collection dump for collection '" + collectionName +
"', id " + cid + ", batch " + StringUtils::itoa(batch);
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
delete response;
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
int res;
bool checkMore = false;
bool found;
TRI_voc_tick_t tick;
string header = response->getHeaderField(TRI_REPLICATION_HEADER_CHECKMORE, found);
if (found) {
checkMore = StringUtils::boolean(header);
res = TRI_ERROR_NO_ERROR;
if (checkMore) {
header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTINCLUDED, found);
if (found) {
tick = StringUtils::uint64(header);
if (tick > fromTick) {
fromTick = tick;
}
else {
// we got the same tick again, this indicates we're at the end
checkMore = false;
}
}
}
}
if (! found) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": required header is missing";
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
if (res == TRI_ERROR_NO_ERROR) {
res = applyCollectionDump(trxCollection, response, errorMsg);
}
delete response;
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
if (! checkMore || fromTick == 0) {
// done
return res;
}
batch++;
}
assert(false);
return TRI_ERROR_INTERNAL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the information about a collection
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::handleCollectionInitial (TRI_json_t const* parameters,
TRI_json_t const* indexes,
string& errorMsg,
sync_phase_e phase) {
const string masterName = JsonHelper::getStringValue(parameters, "name", "");
if (masterName.empty()) {
errorMsg = "collection name is missing in response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
if (TRI_ExcludeCollectionReplication(masterName.c_str())) {
// we're not interested in this collection
return TRI_ERROR_NO_ERROR;
}
if (JsonHelper::getBooleanValue(parameters, "deleted", false)) {
// we don't care about deleted collections
return TRI_ERROR_NO_ERROR;
}
TRI_json_t const* masterId = JsonHelper::getArrayElement(parameters, "cid");
if (! JsonHelper::isString(masterId)) {
errorMsg = "collection id is missing in response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_voc_cid_t cid = StringUtils::uint64(masterId->_value._string.data, masterId->_value._string.length - 1);
const string collectionMsg = "collection '" + masterName + "', id " + StringUtils::itoa(cid);
// phase handling
if (phase == PHASE_VALIDATE) {
// validation phase just returns ok if we got here (aborts above if data is invalid)
_numCollections++;
return TRI_ERROR_NO_ERROR;
}
// drop collections locally
// -------------------------------------------------------------------------------------
if (phase == PHASE_DROP) {
// first look up the collection by the cid
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (col == 0) {
// not found, try name next
col = TRI_LookupCollectionByNameVocBase(_vocbase, masterName.c_str());
}
if (col != 0) {
const string progress = "dropping " + collectionMsg;
setProgress(progress.c_str());
int res = TRI_DropCollectionVocBase(_vocbase, col, _masterInfo._serverId);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "unable to drop " + collectionMsg + ": " + TRI_errno_string(res);
return res;
}
}
return TRI_ERROR_NO_ERROR;
}
// re-create collections locally
// -------------------------------------------------------------------------------------
else if (phase == PHASE_CREATE) {
TRI_vocbase_col_t* col = 0;
const string progress = "creating " + collectionMsg;
setProgress(progress.c_str());
int res = createCollection(parameters, &col);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "unable to create " + collectionMsg + ": " + TRI_errno_string(res);
return res;
}
return TRI_ERROR_NO_ERROR;
}
// sync collection data
// -------------------------------------------------------------------------------------
else if (phase == PHASE_DUMP) {
int res;
const string progress = "syncing data for " + collectionMsg;
setProgress(progress.c_str());
TRI_transaction_t* trx = TRI_CreateTransaction(_vocbase->_transactionContext,
_masterInfo._serverId,
false,
0.0,
false);
if (trx == 0) {
errorMsg = "unable to start transaction";
return TRI_ERROR_OUT_OF_MEMORY;
}
res = TRI_AddCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE, TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
return res;
}
res = TRI_BeginTransaction(trx, getHint(1), TRI_TRANSACTION_TOP_LEVEL);
if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeTransaction(trx);
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
return TRI_ERROR_INTERNAL;
}
TRI_transaction_collection_t* trxCollection = TRI_GetCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE);
if (trxCollection == NULL) {
res = TRI_ERROR_INTERNAL;
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
}
else {
res = handleCollectionDump(trxCollection, masterName, _masterInfo._state._lastLogTick, errorMsg);
}
if (res == TRI_ERROR_NO_ERROR) {
// now create indexes
const size_t n = indexes->_value._objects._length;
if (n > 0) {
const string progress = "creating indexes for " + collectionMsg;
setProgress(progress.c_str());
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* idxDef = (TRI_json_t const*) TRI_AtVector(&indexes->_value._objects, i);
TRI_index_t* idx = 0;
// {"id":"229907440927234","type":"hash","unique":false,"fields":["x","Y"]}
res = TRI_FromJsonIndexDocumentCollection((TRI_document_collection_t*) trxCollection->_collection->_collection, idxDef, &idx);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not create index: " + string(TRI_errno_string(res));
break;
}
else {
assert(idx != 0);
res = TRI_SaveIndex((TRI_primary_collection_t*) trxCollection->_collection->_collection,
idx,
_masterInfo._serverId);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save index: " + string(TRI_errno_string(res));
break;
}
}
}
}
}
if (res == TRI_ERROR_NO_ERROR) {
TRI_CommitTransaction(trx, TRI_TRANSACTION_TOP_LEVEL);
}
TRI_FreeTransaction(trx);
return res;
}
// we won't get here
assert(false);
return TRI_ERROR_INTERNAL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the inventory response of the master
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
string& errorMsg) {
TRI_json_t* collections = JsonHelper::getArrayElement(json, "collections");
if (! JsonHelper::isList(collections)) {
errorMsg = "collections section is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const size_t n = collections->_value._objects._length;
if (n > 1) {
// sort by collection type (vertices before edges), then name
qsort(collections->_value._objects._buffer, n, sizeof(TRI_json_t), &sortCollections);
}
int res;
// STEP 1: validate collection declarations from master
// ----------------------------------------------------------------------------------
// iterate over all collections from the master...
res = iterateCollections(collections, errorMsg, PHASE_VALIDATE);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// STEP 2: drop collections locally if they are also present on the master (clean up)
// ----------------------------------------------------------------------------------
res = iterateCollections(collections, errorMsg, PHASE_DROP);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// STEP 3: re-create empty collections locally
// ----------------------------------------------------------------------------------
if (n > 0) {
// we'll sleep for a while to allow the collections to be dropped (asynchronously)
// TODO: find a safer mechanism for waiting until we can beginning creating collections
sleep(5);
}
res = iterateCollections(collections, errorMsg, PHASE_CREATE);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// STEP 4: sync collection data from master and create initial indexes
// ----------------------------------------------------------------------------------
res = iterateCollections(collections, errorMsg, PHASE_DUMP);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save replication state information";
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief iterate over all collections from a list and apply an action
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::iterateCollections (TRI_json_t const* collections,
string& errorMsg,
sync_phase_e phase) {
const size_t n = collections->_value._objects._length;
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* collection = (TRI_json_t const*) TRI_AtVector(&collections->_value._objects, i);
if (! JsonHelper::isArray(collection)) {
errorMsg = "collection declaration is invalid in response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_json_t const* parameters = JsonHelper::getArrayElement(collection, "parameters");
if (! JsonHelper::isArray(parameters)) {
errorMsg = "collection parameters declaration is invalid in response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_json_t const* indexes = JsonHelper::getArrayElement(collection, "indexes");
if (! JsonHelper::isList(indexes)) {
errorMsg = "collection indexes declaration is invalid in response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
int res = handleCollectionInitial(parameters, indexes, errorMsg, phase);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
// all ok
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,278 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication initial data synchroniser
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_REPLICATION_INITIAL_SYNCER_H
#define TRIAGENS_REPLICATION_INITIAL_SYNCER_H 1
#include "Basics/Common.h"
#include "Replication/Syncer.h"
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
struct TRI_json_s;
struct TRI_replication_applier_configuration_s;
struct TRI_transaction_collection_s;
struct TRI_vocbase_s;
namespace triagens {
namespace httpclient {
class SimpleHttpResult;
}
namespace arango {
// -----------------------------------------------------------------------------
// --SECTION-- InitialSyncer
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
class InitialSyncer : public Syncer {
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief apply phases
////////////////////////////////////////////////////////////////////////////////
typedef enum {
PHASE_NONE,
PHASE_INIT,
PHASE_VALIDATE,
PHASE_DROP,
PHASE_CREATE,
PHASE_DUMP
}
sync_phase_e;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
InitialSyncer (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~InitialSyncer ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief run method, performs a full synchronisation
////////////////////////////////////////////////////////////////////////////////
int run (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last log tick of the master at start
////////////////////////////////////////////////////////////////////////////////
TRI_voc_tick_t getLastLogTick () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief return the number of collections synced
////////////////////////////////////////////////////////////////////////////////
uint32_t getNumCollections () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief comparator to sort collections
/// sort order is by collection type first (vertices before edges), then name
////////////////////////////////////////////////////////////////////////////////
static int sortCollections (const void*,
const void*);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief set a progress message
////////////////////////////////////////////////////////////////////////////////
void setProgress (const string& message) {
_progress = message;
if (_verbose) {
LOGGER_INFO("synchronisation progress: " << message);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
int applyCollectionDump (struct TRI_transaction_collection_s*,
httpclient::SimpleHttpResult*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int handleCollectionDump (struct TRI_transaction_collection_s*,
const std::string&,
TRI_voc_tick_t,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the information about a collection
////////////////////////////////////////////////////////////////////////////////
int handleCollectionInitial (struct TRI_json_s const*,
struct TRI_json_s const*,
std::string&,
sync_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the inventory response of the master
////////////////////////////////////////////////////////////////////////////////
int handleInventoryResponse (struct TRI_json_s const*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief iterate over all collections from a list and apply an action
////////////////////////////////////////////////////////////////////////////////
int iterateCollections (struct TRI_json_s const*,
std::string&,
sync_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief progress message
////////////////////////////////////////////////////////////////////////////////
std::string _progress;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of collections synced
////////////////////////////////////////////////////////////////////////////////
uint32_t _numCollections;
////////////////////////////////////////////////////////////////////////////////
/// @brief verbosity
////////////////////////////////////////////////////////////////////////////////
bool _verbose;
};
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,658 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication syncer base class
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Syncer.h"
#include "BasicsC/json.h"
#include "BasicsC/tri-strings.h"
#include "Basics/JsonHelper.h"
#include "Rest/HttpRequest.h"
#include "Rest/SslInterface.h"
#include "SimpleHttpClient/GeneralClientConnection.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "Utils/DocumentHelper.h"
#include "VocBase/collection.h"
#include "VocBase/edge-collection.h"
#include "VocBase/primary-collection.h"
#include "VocBase/server-id.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
using namespace std;
using namespace triagens::arango;
using namespace triagens::basics;
using namespace triagens::rest;
using namespace triagens::httpclient;
// -----------------------------------------------------------------------------
// --SECTION-- static variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief base url of the replication API
////////////////////////////////////////////////////////////////////////////////
const string Syncer::BaseUrl = "/_api/replication";
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
Syncer::Syncer (TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration) :
_vocbase(vocbase),
_configuration(),
_masterInfo(),
_policy(),
_endpoint(0),
_connection(0),
_client(0) {
// get our own server-id
_localServerId = TRI_GetServerId();
_localServerIdString = StringUtils::itoa(_localServerId);
// init the update policy
TRI_InitUpdatePolicy(&_policy, TRI_DOC_UPDATE_LAST_WRITE, 0, 0);
TRI_InitConfigurationReplicationApplier(&_configuration);
TRI_CopyConfigurationReplicationApplier(configuration, &_configuration);
TRI_InitMasterInfoReplication(&_masterInfo, configuration->_endpoint);
_endpoint = Endpoint::clientFactory(_configuration._endpoint);
if (_endpoint != 0) {
_connection = GeneralClientConnection::factory(_endpoint,
_configuration._requestTimeout,
_configuration._connectTimeout,
(size_t) _configuration._maxConnectRetries);
if (_connection != 0) {
_client = new SimpleHttpClient(_connection, _configuration._requestTimeout, false);
if (_client != 0) {
string username;
string password;
if (_configuration._username != 0) {
username = string(_configuration._username);
}
if (_configuration._password != 0) {
password = string(_configuration._password);
}
_client->setUserNamePassword("/", username, password);
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
Syncer::~Syncer () {
// shutdown everything properly
if (_client != 0) {
delete _client;
}
if (_connection != 0) {
delete _connection;
}
if (_endpoint != 0) {
delete _endpoint;
}
TRI_DestroyMasterInfoReplication(&_masterInfo);
TRI_DestroyConfigurationReplicationApplier(&_configuration);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief get chunk size for a transfer
////////////////////////////////////////////////////////////////////////////////
uint64_t Syncer::getChunkSize () const {
static const uint64_t chunkSize = 4 * 1024 * 1024;
return chunkSize;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the collection id from JSON
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t Syncer::getCid (TRI_json_t const* json) const {
if (! JsonHelper::isArray(json)) {
return 0;
}
TRI_json_t const* id = JsonHelper::getArrayElement(json, "cid");
if (JsonHelper::isString(id)) {
return StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1);
}
else if (JsonHelper::isNumber(id)) {
return (TRI_voc_cid_t) id->_value._number;
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a transaction hint
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_hint_t Syncer::getHint (const size_t numOperations) const {
if (numOperations <= 1) {
return (TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION;
}
return (TRI_transaction_hint_t) 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump or the continuous log
////////////////////////////////////////////////////////////////////////////////
int Syncer::applyCollectionDumpMarker (TRI_transaction_collection_t* trxCollection,
TRI_replication_operation_e type,
const TRI_voc_key_t key,
const TRI_voc_rid_t rid,
TRI_json_t const* json,
string& errorMsg) {
if (type == MARKER_DOCUMENT || type == MARKER_EDGE) {
// {"type":2400,"key":"230274209405676","data":{"_key":"230274209405676","_rev":"230274209405676","foo":"bar"}}
assert(json != 0);
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(primary->_shaper, json);
if (shaped != 0) {
TRI_doc_mptr_t mptr;
int res = primary->read(trxCollection, key, &mptr, false);
if (res == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
// insert
if (type == MARKER_EDGE) {
// edge
if (primary->base._info._type != TRI_COL_TYPE_EDGE) {
res = TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID;
}
else {
res = TRI_ERROR_NO_ERROR;
}
const string from = JsonHelper::getStringValue(json, TRI_VOC_ATTRIBUTE_FROM, "");
const string to = JsonHelper::getStringValue(json, TRI_VOC_ATTRIBUTE_TO, "");
// parse _from
TRI_document_edge_t edge;
if (! DocumentHelper::parseDocumentId(from.c_str(), edge._fromCid, &edge._fromKey)) {
res = TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
}
// parse _to
if (! DocumentHelper::parseDocumentId(to.c_str(), edge._toCid, &edge._toKey)) {
res = TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
}
if (res == TRI_ERROR_NO_ERROR) {
res = primary->insert(trxCollection, key, rid, &mptr, TRI_DOC_MARKER_KEY_EDGE, shaped, &edge, false, false);
}
}
else {
// document
if (primary->base._info._type != TRI_COL_TYPE_DOCUMENT) {
res = TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID;
}
else {
res = primary->insert(trxCollection, key, rid, &mptr, TRI_DOC_MARKER_KEY_DOCUMENT, shaped, 0, false, false);
}
}
}
else {
// update
res = primary->update(trxCollection, key, rid, &mptr, shaped, &_policy, false, false);
}
TRI_FreeShapedJson(primary->_shaper, shaped);
return res;
}
else {
errorMsg = TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY);
return TRI_ERROR_OUT_OF_MEMORY;
}
}
else if (type == MARKER_REMOVE) {
// {"type":2402,"key":"592063"}
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
int res = primary->remove(trxCollection, key, rid, &_policy, false, false);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
// ignore this error
res = TRI_ERROR_NO_ERROR;
}
else {
errorMsg = "document removal operation failed: " + string(TRI_errno_string(res));
}
}
return res;
}
else {
errorMsg = "unexpected marker type " + StringUtils::itoa(type);
return TRI_ERROR_REPLICATION_UNEXPECTED_MARKER;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int Syncer::createCollection (TRI_json_t const* json,
TRI_vocbase_col_t** dst) {
if (dst != 0) {
*dst = 0;
}
if (! JsonHelper::isArray(json)) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const string name = JsonHelper::getStringValue(json, "name", "");
if (name.empty()) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const TRI_col_type_e type = (TRI_col_type_e) JsonHelper::getIntValue(json, "type", (int) TRI_COL_TYPE_DOCUMENT);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (col != 0 &&
(TRI_col_type_t) col->_type == (TRI_col_type_t) type) {
// collection already exists. TODO: compare attributes
return TRI_ERROR_NO_ERROR;
}
TRI_json_t* keyOptions = 0;
if (JsonHelper::isArray(JsonHelper::getArrayElement(json, "keyOptions"))) {
keyOptions = TRI_CopyJson(TRI_CORE_MEM_ZONE, JsonHelper::getArrayElement(json, "keyOptions"));
}
TRI_col_info_t params;
TRI_InitCollectionInfo(_vocbase,
&params,
name.c_str(),
type,
(TRI_voc_size_t) JsonHelper::getNumberValue(json, "maximalSize", (double) TRI_JOURNAL_DEFAULT_MAXIMAL_SIZE),
keyOptions);
params._doCompact = JsonHelper::getBooleanValue(json, "doCompact", true);
params._waitForSync = JsonHelper::getBooleanValue(json, "waitForSync", _vocbase->_defaultWaitForSync);
params._isVolatile = JsonHelper::getBooleanValue(json, "isVolatile", false);
col = TRI_CreateCollectionVocBase(_vocbase, &params, cid, _masterInfo._serverId);
TRI_FreeCollectionInfoOptions(&params);
if (col == NULL) {
return TRI_errno();
}
if (dst != 0) {
*dst = col;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drops a collection, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int Syncer::dropCollection (TRI_json_t const* json) {
const TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (col == 0) {
// TODO: should we care?
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
return TRI_DropCollectionVocBase(_vocbase, col, _masterInfo._serverId);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates an index, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int Syncer::createIndex (TRI_json_t const* json) {
const TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_json_t const* indexJson = JsonHelper::getArrayElement(json, "index");
if (! JsonHelper::isArray(indexJson)) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid);
if (col == 0 || col->_collection == 0) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_index_t* idx;
TRI_primary_collection_t* primary = col->_collection;
TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
int res = TRI_FromJsonIndexDocumentCollection((TRI_document_collection_t*) primary, indexJson, &idx);
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_SaveIndex(primary, idx, _masterInfo._serverId);
}
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
TRI_ReleaseCollectionVocBase(_vocbase, col);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drops an index, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int Syncer::dropIndex (TRI_json_t const* json) {
const TRI_voc_cid_t cid = getCid(json);
if (cid == 0) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const string id = JsonHelper::getStringValue(json, "id", "");
if (id.empty()) {
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const TRI_idx_iid_t iid = StringUtils::uint64(id);
TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid);
if (col == 0 || col->_collection == 0) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_document_collection_t* document = (TRI_document_collection_t*) col->_collection;
bool result = TRI_DropIndexDocumentCollection(document, iid, _masterInfo._serverId);
TRI_ReleaseCollectionVocBase(_vocbase, col);
if (! result) {
// TODO: index not found, should we care??
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get master state
////////////////////////////////////////////////////////////////////////////////
int Syncer::getMasterState (string& errorMsg) {
map<string, string> headers;
static const string url = BaseUrl +
"/logger-state" +
"?serverId=" + _localServerIdString;
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
int res = TRI_ERROR_NO_ERROR;
if (response->wasHttpError()) {
res = TRI_ERROR_REPLICATION_MASTER_ERROR;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
}
else {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, response->getBody().str().c_str());
if (JsonHelper::isArray(json)) {
res = handleStateResponse(json, errorMsg);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
else {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": invalid JSON";
}
}
delete response;
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the state response of the master
////////////////////////////////////////////////////////////////////////////////
int Syncer::handleStateResponse (TRI_json_t const* json,
string& errorMsg) {
// process "state" section
TRI_json_t const* state = JsonHelper::getArrayElement(json, "state");
if (! JsonHelper::isArray(state)) {
errorMsg = "state section is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// state."lastLogTick"
TRI_json_t const* tick = JsonHelper::getArrayElement(state, "lastLogTick");
if (! JsonHelper::isString(tick)) {
errorMsg = "lastLogTick is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const TRI_voc_tick_t lastLogTick = StringUtils::uint64(tick->_value._string.data, tick->_value._string.length - 1);
// state."running"
bool running = JsonHelper::getBooleanValue(state, "running", false);
// process "server" section
TRI_json_t const* server = JsonHelper::getArrayElement(json, "server");
if (! JsonHelper::isArray(server)) {
errorMsg = "server section is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// server."version"
TRI_json_t const* version = JsonHelper::getArrayElement(server, "version");
if (! JsonHelper::isString(version)) {
errorMsg = "server version is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// server."serverId"
TRI_json_t const* serverId = JsonHelper::getArrayElement(server, "serverId");
if (! JsonHelper::isString(serverId)) {
errorMsg = "server id is missing from response";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// validate all values we got
const string masterIdString = string(serverId->_value._string.data, serverId->_value._string.length - 1);
const TRI_server_id_t masterId = StringUtils::uint64(masterIdString);
if (masterId == 0) {
// invalid master id
errorMsg = "server id in response is invalid";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
if (masterIdString == _localServerIdString) {
// master and replica are the same instance. this is not supported.
errorMsg = "master's id is the same as the local server's id";
return TRI_ERROR_REPLICATION_LOOP;
}
int major = 0;
int minor = 0;
const string versionString = string(version->_value._string.data, version->_value._string.length - 1);
if (sscanf(versionString.c_str(), "%d.%d", &major, &minor) != 2) {
errorMsg = "invalid master version info: " + versionString;
return TRI_ERROR_REPLICATION_MASTER_INCOMPATIBLE;
}
if (major != 1 ||
(major == 1 && minor != 4)) {
errorMsg = "incompatible master version: " + versionString;
return TRI_ERROR_REPLICATION_MASTER_INCOMPATIBLE;
}
_masterInfo._majorVersion = major;
_masterInfo._minorVersion = minor;
_masterInfo._serverId = masterId;
_masterInfo._state._lastLogTick = lastLogTick;
_masterInfo._state._active = running;
TRI_LogMasterInfoReplication(&_masterInfo, "connected to");
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication data fetcher
/// @brief replication syncer base class
///
/// @file
///
@ -25,14 +25,13 @@
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_REPLICATION_REPLICATION_FETCHER_H
#define TRIAGENS_REPLICATION_REPLICATION_FETCHER_H 1
#ifndef TRIAGENS_REPLICATION_SYNCER_H
#define TRIAGENS_REPLICATION_SYNCER_H 1
#include "Basics/Common.h"
#include "Logger/Logger.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-dump.h"
#include "VocBase/replication-master.h"
#include "VocBase/server-id.h"
#include "VocBase/transaction.h"
@ -43,7 +42,6 @@
// -----------------------------------------------------------------------------
struct TRI_json_s;
struct TRI_replication_applier_s;
struct TRI_replication_applier_configuration_s;
struct TRI_transaction_collection_s;
struct TRI_vocbase_s;
@ -64,10 +62,19 @@ namespace triagens {
namespace arango {
// -----------------------------------------------------------------------------
// --SECTION-- ReplicationFetcher
// --SECTION-- Syncer
// -----------------------------------------------------------------------------
class ReplicationFetcher {
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
class Syncer {
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
@ -84,22 +91,21 @@ namespace triagens {
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ReplicationFetcher (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*,
bool);
Syncer (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~ReplicationFetcher ();
virtual ~Syncer ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
@ -107,42 +113,7 @@ namespace triagens {
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief non-static run method
////////////////////////////////////////////////////////////////////////////////
int run ();
////////////////////////////////////////////////////////////////////////////////
/// @brief comparator to sort collections
/// sort order is by collection type first (vertices before edges), then name
////////////////////////////////////////////////////////////////////////////////
static int sortCollections (const void*,
const void*);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Replication
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief save the current applier state
////////////////////////////////////////////////////////////////////////////////
int saveApplierState ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief get chunk size for a transfer
@ -150,30 +121,12 @@ namespace triagens {
uint64_t getChunkSize () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief set the applier progress
////////////////////////////////////////////////////////////////////////////////
void setProgress (char const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief set the applier phase
////////////////////////////////////////////////////////////////////////////////
void setPhase (TRI_replication_applier_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the collection id from JSON
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t getCid (struct TRI_json_s const*) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transaction
////////////////////////////////////////////////////////////////////////////////
void abortOngoingTransaction ();
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a transaction hint
////////////////////////////////////////////////////////////////////////////////
@ -181,32 +134,15 @@ namespace triagens {
TRI_transaction_hint_t getHint (const size_t) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a transaction for a single operation
/// @brief apply a single marker from the collection dump
////////////////////////////////////////////////////////////////////////////////
struct TRI_transaction_s* createSingleOperationTransaction (TRI_voc_cid_t,
int*);
////////////////////////////////////////////////////////////////////////////////
/// @brief starts a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int startTransaction (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief commits a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int commitTransaction (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief process a document operation, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int processDocument (TRI_replication_operation_e,
struct TRI_json_s const*,
bool&,
std::string&);
int applyCollectionDumpMarker (struct TRI_transaction_collection_s*,
TRI_replication_operation_e,
const TRI_voc_key_t,
const TRI_voc_rid_t,
struct TRI_json_s const*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection, based on the JSON provided
@ -221,12 +157,6 @@ namespace triagens {
int dropCollection (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief renames a collection, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
int renameCollection (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief creates an index, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
@ -239,83 +169,12 @@ namespace triagens {
int dropIndex (struct TRI_json_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply a single marker from the collection dump
////////////////////////////////////////////////////////////////////////////////
int applyCollectionDumpMarker (struct TRI_transaction_collection_s*,
TRI_replication_operation_e,
const TRI_voc_key_t,
const TRI_voc_rid_t,
struct TRI_json_s const*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
int applyCollectionDump (struct TRI_transaction_collection_s*,
httpclient::SimpleHttpResult*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply a single marker from the continuous log
////////////////////////////////////////////////////////////////////////////////
int applyLogMarker (struct TRI_json_s const*,
bool&,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from the continuous log
////////////////////////////////////////////////////////////////////////////////
int applyLog (httpclient::SimpleHttpResult*,
std::string&,
uint64_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication applier state
////////////////////////////////////////////////////////////////////////////////
int getLocalState (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief get master state
////////////////////////////////////////////////////////////////////////////////
int getMasterState (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief perform an initial sync with the master
////////////////////////////////////////////////////////////////////////////////
int performInitialSync (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief perform a continuous sync with the master
////////////////////////////////////////////////////////////////////////////////
int performContinuousSync (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int handleCollectionDump (struct TRI_transaction_collection_s*,
const std::string&,
TRI_voc_tick_t,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the information about a collection
////////////////////////////////////////////////////////////////////////////////
int handleCollectionInitial (struct TRI_json_s const*,
struct TRI_json_s const*,
std::string&,
TRI_replication_applier_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the state response of the master
////////////////////////////////////////////////////////////////////////////////
@ -323,35 +182,12 @@ namespace triagens {
int handleStateResponse (struct TRI_json_s const*,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the inventory response of the master
////////////////////////////////////////////////////////////////////////////////
int handleInventoryResponse (struct TRI_json_s const*, std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief iterate over all collections from a list and apply an action
////////////////////////////////////////////////////////////////////////////////
int iterateCollections (struct TRI_json_s const*,
std::string&,
TRI_replication_applier_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief run the continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int followMasterLog (std::string&,
uint64_t&,
bool&,
bool&);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// --SECTION-- protected variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
@ -359,7 +195,7 @@ namespace triagens {
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief vocbase base pointer
@ -368,13 +204,7 @@ namespace triagens {
struct TRI_vocbase_s* _vocbase;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to the applier state
////////////////////////////////////////////////////////////////////////////////
struct TRI_replication_applier_s* _applier;
////////////////////////////////////////////////////////////////////////////////
/// @brief applier configuration;
/// @brief configuration
////////////////////////////////////////////////////////////////////////////////
TRI_replication_applier_configuration_t _configuration;
@ -385,22 +215,6 @@ namespace triagens {
TRI_replication_master_info_t _masterInfo;
////////////////////////////////////////////////////////////////////////////////
/// @brief information about the local applier state
////////////////////////////////////////////////////////////////////////////////
struct {
struct TRI_transaction_s* _trx;
TRI_voc_tid_t _externalTid;
}
_applyState;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a full sync was requested
////////////////////////////////////////////////////////////////////////////////
bool _forceFullSynchronisation;
////////////////////////////////////////////////////////////////////////////////
/// @brief the update policy object (will be the same for all actions)
////////////////////////////////////////////////////////////////////////////////
@ -444,13 +258,13 @@ namespace triagens {
static const std::string BaseUrl;
};
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
}
}
#endif
// Local Variables:

View File

@ -26,7 +26,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "replication-static.h"
#include "Replication/ReplicationFetcher.h"
#include "Replication/ContinuousSyncer.h"
#include "VocBase/vocbase.h"
@ -51,9 +51,9 @@ extern "C" {
////////////////////////////////////////////////////////////////////////////////
void* TRI_CreateFetcherReplication (TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration,
bool forceFullSynchronisation) {
ReplicationFetcher* f = new ReplicationFetcher(vocbase, configuration, forceFullSynchronisation);
TRI_replication_applier_configuration_t const* configuration) {
ContinuousSyncer* f = new ContinuousSyncer(vocbase, configuration);
return (void*) f;
}
@ -63,7 +63,7 @@ void* TRI_CreateFetcherReplication (TRI_vocbase_t* vocbase,
////////////////////////////////////////////////////////////////////////////////
void TRI_DeleteFetcherReplication (void* ptr) {
ReplicationFetcher* f = static_cast<ReplicationFetcher*>(ptr);
ContinuousSyncer* f = static_cast<ContinuousSyncer*>(ptr);
delete f;
}
@ -73,7 +73,7 @@ void TRI_DeleteFetcherReplication (void* ptr) {
////////////////////////////////////////////////////////////////////////////////
int TRI_RunFetcherReplication (void* ptr) {
ReplicationFetcher* f = static_cast<ReplicationFetcher*>(ptr);
ContinuousSyncer* f = static_cast<ContinuousSyncer*>(ptr);
return f->run();
}

View File

@ -55,8 +55,7 @@ struct TRI_vocbase_s;
////////////////////////////////////////////////////////////////////////////////
void* TRI_CreateFetcherReplication (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*,
bool);
struct TRI_replication_applier_configuration_s const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief static free method

View File

@ -39,8 +39,6 @@
#include "VocBase/replication-logger.h"
#include "VocBase/server-id.h"
#ifdef TRI_ENABLE_REPLICATION
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
@ -1623,18 +1621,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig () {
void RestReplicationHandler::handleCommandApplierStart () {
assert(_vocbase->_replicationApplier != 0);
bool found;
const char* value = _request->value("fullSync", found);
bool fullSync;
if (found) {
fullSync = StringUtils::boolean(value);
}
else {
fullSync = false;
}
int res = TRI_StartReplicationApplier(_vocbase->_replicationApplier, fullSync);
int res = TRI_StartReplicationApplier(_vocbase->_replicationApplier);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_REPLICATION_INVALID_CONFIGURATION ||
@ -1843,8 +1830,6 @@ void RestReplicationHandler::handleCommandApplierDeleteState () {
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -31,8 +31,6 @@
#include "RestHandler/RestVocbaseBaseHandler.h"
#include "HttpServer/HttpServer.h"
//#ifdef TRI_ENABLE_REPLICATION
using namespace triagens::basics;
using namespace triagens::rest;
using namespace std;
@ -285,8 +283,6 @@ namespace triagens {
/// @}
////////////////////////////////////////////////////////////////////////////////
//#endif
#endif
// Local Variables:

View File

@ -135,12 +135,10 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
RestHandlerCreator<RestBatchHandler>::createData<TRI_vocbase_t*>,
vocbase);
#ifdef TRI_ENABLE_REPLICATION
// add replication handler
factory->addPrefixHandler(RestVocbaseBaseHandler::REPLICATION_PATH,
RestHandlerCreator<RestReplicationHandler>::createData<TRI_vocbase_t*>,
vocbase);
#endif
// add upload handler
factory->addPrefixHandler(RestVocbaseBaseHandler::UPLOAD_PATH,
@ -204,10 +202,8 @@ ArangoServer::ArangoServer (int argc, char** argv)
_multipleDatabases(false),
_removeOnCompacted(true),
_removeOnDrop(true),
#ifdef TRI_ENABLE_REPLICATION
_replicationEnableLogger(false),
_replicationLogRemoteChanges(false),
#endif
_vocbase(0) {
// locate path to binary
@ -365,12 +361,10 @@ void ArangoServer::buildApplicationServer () {
// replication options
// .............................................................................
#ifdef TRI_ENABLE_REPLICATION
additional[ApplicationServer::OPTIONS_REPLICATION + ":help-replication"]
("replication.enable-logger", &_replicationEnableLogger, "enable replication logger")
("replication.log-remote-changes", &_replicationLogRemoteChanges, "log remote changes")
;
#endif
// .............................................................................
// database options
@ -1176,12 +1170,10 @@ static bool handleUserDatabase (TRI_doc_mptr_t const* document,
systemDefaults->requireAuthentication);
defaults.authenticateSystemOnly = doc.getBooleanValue("authenticateSystemOnly",
systemDefaults->authenticateSystemOnly);
#ifdef TRI_ENABLE_REPLICATION
defaults.replicationEnableLogger = doc.getBooleanValue("replicationEnableLogger",
systemDefaults->replicationEnableLogger);
defaults.replicationLogRemoteChanges = doc.getBooleanValue("replicationLogRemoteChanges",
systemDefaults->replicationLogRemoteChanges);
#endif
// open/load database
TRI_vocbase_t* userVocbase = TRI_OpenVocBase(dbPath.c_str(), dbName.c_str(), &defaults);
@ -1401,10 +1393,8 @@ void ArangoServer::openDatabases () {
defaults.forceSyncProperties = _forceSyncProperties;
defaults.requireAuthentication = ! _applicationEndpointServer->isAuthenticationDisabled();
defaults.authenticateSystemOnly = _authenticateSystemOnly;
#ifdef TRI_ENABLE_REPLICATION
defaults.replicationEnableLogger = _replicationEnableLogger;
defaults.replicationLogRemoteChanges = _replicationLogRemoteChanges;
#endif
// store these settings as initial system defaults
TRI_SetSystemDefaultsVocBase(&defaults);

View File

@ -515,17 +515,13 @@ namespace triagens {
/// @brief enable replication logging
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
bool _replicationEnableLogger;
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief enable logging of remotely created changes
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
bool _replicationLogRemoteChanges;
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief vocbase

View File

@ -45,6 +45,7 @@
#include "BasicsC/tri-strings.h"
#include "CapConstraint/cap-constraint.h"
#include "FulltextIndex/fulltext-index.h"
#include "Replication/InitialSyncer.h"
#include "ShapedJson/shape-accessor.h"
#include "ShapedJson/shaped-json.h"
#include "Utils/AhuacatlGuard.h"
@ -3074,8 +3075,6 @@ static v8::Handle<v8::Value> JS_DeleteCursor (v8::Arguments const& argv) {
/// @brief start the replication logger manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StartLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3098,14 +3097,10 @@ static v8::Handle<v8::Value> JS_StartLoggerReplication (v8::Arguments const& arg
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication logger manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StopLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3128,14 +3123,10 @@ static v8::Handle<v8::Value> JS_StopLoggerReplication (v8::Arguments const& argv
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief get the state of the replication logger
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StateLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3161,14 +3152,10 @@ static v8::Handle<v8::Value> JS_StateLoggerReplication (v8::Arguments const& arg
return scope.Close(result);
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication logger manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_ConfigureLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3241,14 +3228,79 @@ static v8::Handle<v8::Value> JS_ConfigureLoggerReplication (v8::Arguments const&
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief sync data from a remote master
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_SynchroniseReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 1) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_SYNCHRONISE(<config>)");
}
TRI_vocbase_t* vocbase = GetContextVocBase();
// treat the argument as an object from now on
v8::Handle<v8::Object> object = v8::Handle<v8::Object>::Cast(argv[0]);
string endpoint;
if (object->Has(TRI_V8_SYMBOL("endpoint"))) {
endpoint = TRI_ObjectToString(object->Get(TRI_V8_SYMBOL("endpoint")));
}
string username;
if (object->Has(TRI_V8_SYMBOL("username"))) {
username = TRI_ObjectToString(object->Get(TRI_V8_SYMBOL("username")));
}
string password;
if (object->Has(TRI_V8_SYMBOL("password"))) {
password = TRI_ObjectToString(object->Get(TRI_V8_SYMBOL("password")));
}
bool verbose = true;
if (object->Has(TRI_V8_SYMBOL("verbose"))) {
verbose = TRI_ObjectToBoolean(object->Get(TRI_V8_SYMBOL("verbose")));
}
if (endpoint.empty()) {
TRI_V8_EXCEPTION_PARAMETER(scope, "<endpoint> must be a valid endpoint")
}
TRI_replication_applier_configuration_t config;
TRI_InitConfigurationReplicationApplier(&config);
config._endpoint = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE, endpoint.c_str(), endpoint.size());
config._username = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE, username.c_str(), username.size());
config._password = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE, password.c_str(), password.size());
string errorMsg = "";
InitialSyncer syncer(vocbase, &config, verbose);
TRI_DestroyConfigurationReplicationApplier(&config);
int res = TRI_ERROR_NO_ERROR;
v8::Handle<v8::Object> result = v8::Object::New();
try {
res = syncer.run(errorMsg);
result->Set(v8::String::New("lastLogTick"), V8TickId(syncer.getLastLogTick()));
result->Set(v8::String::New("collectionsSynced"), v8::Number::New(syncer.getNumCollections()));
}
catch (...) {
}
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot sync from remote endpoint: " + errorMsg);
}
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_ConfigureApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3370,14 +3422,10 @@ static v8::Handle<v8::Value> JS_ConfigureApplierReplication (v8::Arguments const
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StartApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3387,16 +3435,11 @@ static v8::Handle<v8::Value> JS_StartApplierReplication (v8::Arguments const& ar
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract vocbase");
}
if (argv.Length() > 1) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_APPLIER_START(<fullSync>)");
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_APPLIER_START()");
}
bool fullSync = false;
if (argv.Length() > 0) {
fullSync = TRI_ObjectToBoolean(argv[0]);
}
int res = TRI_StartReplicationApplier(vocbase->_replicationApplier, fullSync);
int res = TRI_StartReplicationApplier(vocbase->_replicationApplier);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot start replication applier");
@ -3405,14 +3448,10 @@ static v8::Handle<v8::Value> JS_StartApplierReplication (v8::Arguments const& ar
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier manually
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StopApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3435,14 +3474,10 @@ static v8::Handle<v8::Value> JS_StopApplierReplication (v8::Arguments const& arg
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief get the state of the replication applier
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_StateApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3468,14 +3503,10 @@ static v8::Handle<v8::Value> JS_StateApplierReplication (v8::Arguments const& ar
return scope.Close(result);
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier and "forget" all state
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static v8::Handle<v8::Value> JS_ForgetApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
@ -3498,8 +3529,6 @@ static v8::Handle<v8::Value> JS_ForgetApplierReplication (v8::Arguments const& a
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -5795,7 +5824,6 @@ static v8::Handle<v8::Value> JS_PropertiesVocbaseCol (v8::Arguments const& argv)
TRI_V8_EXCEPTION(scope, res);
}
#ifdef TRI_ENABLE_REPLICATION
TRI_json_t* json = TRI_CreateJsonCollectionInfo(&base->_info);
TRI_LogChangePropertiesCollectionReplication(base->_vocbase,
base->_info._cid,
@ -5803,7 +5831,6 @@ static v8::Handle<v8::Value> JS_PropertiesVocbaseCol (v8::Arguments const& argv)
json,
TRI_GetServerId());
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
#endif
}
}
@ -7495,9 +7522,7 @@ static v8::Handle<v8::Value> JS_CreateUserVocbase (v8::Arguments const& argv) {
v8::Local<v8::String> keyForceSyncProperties = v8::String::New("forceSyncProperties");
v8::Local<v8::String> keyRequireAuthentication = v8::String::New("requireAuthentication");
v8::Local<v8::String> keyAuthenticateSystemOnly = v8::String::New("authenticateSystemOnly");
#ifdef TRI_ENABLE_REPLICATION
v8::Local<v8::String> keyReplicationEnableLogger = v8::String::New("replicationEnableLogger");
#endif
// get database defaults from system vocbase
TRI_vocbase_defaults_t defaults;
@ -7539,11 +7564,9 @@ static v8::Handle<v8::Value> JS_CreateUserVocbase (v8::Arguments const& argv) {
defaults.authenticateSystemOnly = options->Get(keyAuthenticateSystemOnly)->BooleanValue();
}
#ifdef TRI_ENABLE_REPLICATION
if (options->Has(keyReplicationEnableLogger)) {
defaults.replicationEnableLogger = options->Get(keyReplicationEnableLogger)->BooleanValue();
}
#endif
}
// load vocbase with defaults
@ -8462,17 +8485,16 @@ void TRI_InitV8VocBridge (v8::Handle<v8::Context> context,
TRI_AddGlobalFunctionVocbase(context, "CREATE_CURSOR", JS_CreateCursor);
TRI_AddGlobalFunctionVocbase(context, "DELETE_CURSOR", JS_DeleteCursor);
#ifdef TRI_ENABLE_REPLICATION
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_START", JS_StartLoggerReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STOP", JS_StopLoggerReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STATE", JS_StateLoggerReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_CONFIGURE", JS_ConfigureLoggerReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_SYNCHRONISE", JS_SynchroniseReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_CONFIGURE", JS_ConfigureApplierReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_START", JS_StartApplierReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_STOP", JS_StopApplierReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_STATE", JS_StateApplierReplication);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_FORGET", JS_ForgetApplierReplication);
#endif
TRI_AddGlobalFunctionVocbase(context, "COMPARE_STRING", JS_compare_string);
TRI_AddGlobalFunctionVocbase(context, "NORMALIZE_STRING", JS_normalize_string);

View File

@ -4064,9 +4064,7 @@ bool TRI_DropIndexDocumentCollection (TRI_document_collection_t* document,
TRI_idx_iid_t iid,
TRI_server_id_t generatingServer) {
TRI_index_t* found;
#ifdef TRI_ENABLE_REPLICATION
TRI_vocbase_t* vocbase;
#endif
TRI_primary_collection_t* primary;
size_t i, n;
@ -4078,11 +4076,8 @@ bool TRI_DropIndexDocumentCollection (TRI_document_collection_t* document,
found = NULL;
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
vocbase = primary->base._vocbase;
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock
@ -4118,9 +4113,7 @@ bool TRI_DropIndexDocumentCollection (TRI_document_collection_t* document,
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
// .............................................................................
// outside write-lock
@ -4132,14 +4125,12 @@ bool TRI_DropIndexDocumentCollection (TRI_document_collection_t* document,
removeResult = TRI_RemoveIndexFile(primary, found);
TRI_FreeIndex(found);
#ifdef TRI_ENABLE_REPLICATION
// it is safe to use _name as we hold a read-lock on the collection status
TRI_LogDropIndexReplication(vocbase,
primary->base._info._cid,
primary->base._info._name,
iid,
generatingServer);
#endif
return removeResult;
}
@ -4406,9 +4397,7 @@ TRI_index_t* TRI_EnsureCapConstraintDocumentCollection (TRI_document_collection_
// inside write-lock
// .............................................................................
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
@ -4432,9 +4421,7 @@ TRI_index_t* TRI_EnsureCapConstraintDocumentCollection (TRI_document_collection_
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -4824,9 +4811,7 @@ TRI_index_t* TRI_EnsureGeoIndex1DocumentCollection (TRI_document_collection_t* d
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock
@ -4854,9 +4839,7 @@ TRI_index_t* TRI_EnsureGeoIndex1DocumentCollection (TRI_document_collection_t* d
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -4877,9 +4860,7 @@ TRI_index_t* TRI_EnsureGeoIndex2DocumentCollection (TRI_document_collection_t* d
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock
@ -4907,9 +4888,7 @@ TRI_index_t* TRI_EnsureGeoIndex2DocumentCollection (TRI_document_collection_t* d
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -5103,9 +5082,7 @@ TRI_index_t* TRI_EnsureHashIndexDocumentCollection (TRI_document_collection_t* d
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock
@ -5134,9 +5111,7 @@ TRI_index_t* TRI_EnsureHashIndexDocumentCollection (TRI_document_collection_t* d
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -5322,9 +5297,7 @@ TRI_index_t* TRI_EnsureSkiplistIndexDocumentCollection (TRI_document_collection_
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock the collection
@ -5352,9 +5325,7 @@ TRI_index_t* TRI_EnsureSkiplistIndexDocumentCollection (TRI_document_collection_
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -5605,9 +5576,7 @@ TRI_index_t* TRI_EnsureFulltextIndexDocumentCollection (TRI_document_collection_
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock the collection
@ -5635,9 +5604,7 @@ TRI_index_t* TRI_EnsureFulltextIndexDocumentCollection (TRI_document_collection_
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -5874,9 +5841,7 @@ TRI_index_t* TRI_EnsurePriorityQueueIndexDocumentCollection(TRI_document_collect
primary = &document->base;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock
@ -5905,9 +5870,7 @@ TRI_index_t* TRI_EnsurePriorityQueueIndexDocumentCollection(TRI_document_collect
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
return idx;
}
@ -6173,9 +6136,7 @@ TRI_index_t* TRI_EnsureBitarrayIndexDocumentCollection (TRI_document_collection_
*errorCode = TRI_ERROR_NO_ERROR;
*errorStr = NULL;
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// inside write-lock the collection
@ -6215,9 +6176,7 @@ TRI_index_t* TRI_EnsureBitarrayIndexDocumentCollection (TRI_document_collection_
}
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&primary->base._vocbase->_inventoryLock);
#endif
// .............................................................................
// Index already exists so simply return it

View File

@ -251,7 +251,6 @@ int TRI_SaveIndex (TRI_primary_collection_t* collection,
return TRI_errno();
}
#ifdef TRI_ENABLE_REPLICATION
// it is safe to use _name as we hold a read-lock on the collection status
TRI_LogCreateIndexReplication(vocbase,
collection->base._info._cid,
@ -259,7 +258,6 @@ int TRI_SaveIndex (TRI_primary_collection_t* collection,
idx->_iid,
json,
generatingServer);
#endif
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);

View File

@ -40,9 +40,6 @@
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#ifdef TRI_ENABLE_REPLICATION
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION APPLIER
// -----------------------------------------------------------------------------
@ -82,32 +79,6 @@ static bool CheckTerminateFlag (TRI_replication_applier_t* applier) {
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify an applier phase name
////////////////////////////////////////////////////////////////////////////////
static const char* StringifyPhase (TRI_replication_applier_phase_e phase) {
switch (phase) {
case PHASE_NONE:
return "not running";
case PHASE_INIT:
return "initialising";
case PHASE_VALIDATE:
return "initial dump - validating";
case PHASE_DROP:
return "initial dump - dropping collections";
case PHASE_CREATE:
return "initial dump - creating collections";
case PHASE_DUMP:
return "initial dump - dumping data";
case PHASE_FOLLOW:
return "continuous dump";
}
assert(false);
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief read a tick value from a JSON struct
////////////////////////////////////////////////////////////////////////////////
@ -429,8 +400,7 @@ void ApplyThread (void* data) {
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
static int StartApplier (TRI_replication_applier_t* applier,
bool fullSync) {
static int StartApplier (TRI_replication_applier_t* applier) {
TRI_replication_applier_state_t* state;
void* fetcher;
@ -444,27 +414,7 @@ static int StartApplier (TRI_replication_applier_t* applier,
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_CONFIGURATION, "no endpoint configured");
}
if (fullSync) {
state->_lastProcessedContinuousTick = 0;
state->_lastAppliedContinuousTick = 0;
state->_lastAvailableContinuousTick = 0;
state->_lastAppliedInitialTick = 0;
state->_lastError._code = 0;
state->_lastError._time[0] = '\0';
if (state->_lastError._msg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
}
state->_lastError._msg = NULL;
if (state->_progressMsg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_progressMsg);
}
state->_progressMsg = NULL;
state->_progressTime[0] = '\0';
}
fetcher = (void*) TRI_CreateFetcherReplication(applier->_vocbase, &applier->_configuration, fullSync);
fetcher = (void*) TRI_CreateFetcherReplication(applier->_vocbase, &applier->_configuration);
if (fetcher == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
@ -481,8 +431,6 @@ static int StartApplier (TRI_replication_applier_t* applier,
return TRI_ERROR_INTERNAL;
}
applier->_state._phase = PHASE_INIT;
LOG_INFO("started replication applier for database '%s'",
applier->_databaseName);
@ -508,8 +456,6 @@ static int StopApplier (TRI_replication_applier_t* applier,
SetTerminateFlag(applier, true);
state->_phase = PHASE_NONE;
TRI_SetProgressReplicationApplier(applier, "applier stopped", false);
if (resetError) {
@ -538,7 +484,6 @@ static int StopApplier (TRI_replication_applier_t* applier,
static TRI_json_t* JsonState (TRI_replication_applier_state_t const* state) {
TRI_json_t* json;
TRI_json_t* last;
TRI_json_t* phase;
TRI_json_t* progress;
TRI_json_t* error;
char* lastString;
@ -589,13 +534,6 @@ static TRI_json_t* JsonState (TRI_replication_applier_state_t const* state) {
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastAppliedInitialTick", last);
// currentPhase
phase = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 2);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, phase, "id", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) state->_phase));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, phase, "label", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, StringifyPhase(state->_phase)));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "currentPhase", phase);
// progress
progress = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 2);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, progress, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progressTime));
@ -769,13 +707,12 @@ TRI_json_t* TRI_JsonConfigurationReplicationApplier (TRI_replication_applier_con
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_StartReplicationApplier (TRI_replication_applier_t* applier,
bool fullSync) {
int TRI_StartReplicationApplier (TRI_replication_applier_t* applier) {
int res;
res = TRI_ERROR_NO_ERROR;
LOG_TRACE("requesting replication applier start. fullSync: %d", (int) fullSync);
LOG_TRACE("requesting replication applier start");
// wait until previous applier thread is shut down
while (! TRI_WaitReplicationApplier(applier, 10 * 1000));
@ -783,7 +720,7 @@ int TRI_StartReplicationApplier (TRI_replication_applier_t* applier,
TRI_WriteLockReadWriteLock(&applier->_statusLock);
if (! applier->_state._active) {
res = StartApplier(applier, fullSync);
res = StartApplier(applier);
}
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
@ -877,7 +814,6 @@ int TRI_StateReplicationApplier (TRI_replication_applier_t* applier,
state->_lastAvailableContinuousTick = applier->_state._lastAvailableContinuousTick;
state->_lastAppliedInitialTick = applier->_state._lastAppliedInitialTick;
state->_serverId = applier->_state._serverId;
state->_phase = applier->_state._phase;
state->_lastError._code = applier->_state._lastError._code;
memcpy(&state->_lastError._time, &applier->_state._lastError._time, sizeof(state->_lastError._time));
@ -975,19 +911,6 @@ int TRI_SetErrorReplicationApplier (TRI_replication_applier_t* applier,
return errorCode;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the current phase
////////////////////////////////////////////////////////////////////////////////
void TRI_SetPhaseReplicationApplier (TRI_replication_applier_t* applier,
TRI_replication_applier_phase_e phase) {
TRI_WriteLockReadWriteLock(&applier->_statusLock);
applier->_state._phase = phase;
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the progress with or without a lock
////////////////////////////////////////////////////////////////////////////////
@ -1029,7 +952,6 @@ void TRI_InitStateReplicationApplier (TRI_replication_applier_state_t* state) {
memset(state, 0, sizeof(TRI_replication_applier_state_t));
state->_active = false;
state->_phase = PHASE_NONE;
state->_lastError._code = TRI_ERROR_NO_ERROR;
state->_lastError._msg = NULL;
@ -1338,8 +1260,6 @@ int TRI_ForgetReplicationApplier (TRI_replication_applier_t* applier) {
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -61,21 +61,6 @@ struct TRI_vocbase_s;
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief apply phases
////////////////////////////////////////////////////////////////////////////////
typedef enum {
PHASE_NONE,
PHASE_INIT,
PHASE_VALIDATE,
PHASE_DROP,
PHASE_CREATE,
PHASE_DUMP,
PHASE_FOLLOW
}
TRI_replication_applier_phase_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief struct containing a replication apply configuration
////////////////////////////////////////////////////////////////////////////////
@ -113,7 +98,6 @@ typedef struct TRI_replication_applier_state_s {
TRI_voc_tick_t _lastAppliedContinuousTick;
TRI_voc_tick_t _lastAvailableContinuousTick;
bool _active;
TRI_replication_applier_phase_e _phase;
char* _progressMsg;
char _progressTime[24];
TRI_voc_tick_t _lastAppliedInitialTick;
@ -200,8 +184,7 @@ struct TRI_json_s* TRI_JsonConfigurationReplicationApplier (TRI_replication_appl
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_StartReplicationApplier (TRI_replication_applier_t*,
bool);
int TRI_StartReplicationApplier (TRI_replication_applier_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier
@ -238,13 +221,6 @@ int TRI_SetErrorReplicationApplier (TRI_replication_applier_t*,
int,
char const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief set the current phase
////////////////////////////////////////////////////////////////////////////////
void TRI_SetPhaseReplicationApplier (TRI_replication_applier_t*,
TRI_replication_applier_phase_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief set the progress with or without a lock
////////////////////////////////////////////////////////////////////////////////

View File

@ -32,9 +32,6 @@
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
#ifdef TRI_ENABLE_REPLICATION
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION
// -----------------------------------------------------------------------------
@ -80,8 +77,6 @@ bool TRI_ExcludeCollectionReplication (const char* name) {
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -40,9 +40,6 @@
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#ifdef TRI_ENABLE_REPLICATION
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION
// -----------------------------------------------------------------------------
@ -1036,8 +1033,6 @@ void TRI_InitDumpReplication (TRI_replication_dump_t* dump) {
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -92,52 +92,28 @@ TRI_replication_dump_t;
/// @brief dump data from a single collection
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_DumpCollectionReplication (TRI_replication_dump_t*,
struct TRI_vocbase_col_s*,
TRI_voc_tick_t,
TRI_voc_tick_t,
uint64_t);
#else
#define TRI_DumpCollectionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from the replication log
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_DumpLogReplication (struct TRI_vocbase_s*,
TRI_replication_dump_t*,
TRI_voc_tick_t,
TRI_voc_tick_t,
uint64_t);
#else
#define TRI_DumpLogReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a replication dump container
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
void TRI_InitDumpReplication (TRI_replication_dump_t*);
#else
#define TRI_InitDumpReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -41,9 +41,6 @@
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#ifdef TRI_ENABLE_REPLICATION
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION LOGGER
// -----------------------------------------------------------------------------
@ -1915,8 +1912,6 @@ int TRI_LogDocumentReplication (TRI_vocbase_t* vocbase,
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -240,94 +240,52 @@ struct TRI_json_s* TRI_JsonReplicationLogger (TRI_replication_logger_t*);
/// @brief replicate a transaction
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogTransactionReplication (struct TRI_vocbase_s*,
struct TRI_transaction_s const*,
TRI_server_id_t);
#else
#define TRI_LogTransactionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "create collection" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogCreateCollectionReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
struct TRI_json_s const*,
TRI_server_id_t);
#else
#define TRI_LogCreateCollectionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "drop collection" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogDropCollectionReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
TRI_server_id_t);
#else
#define TRI_LogDropCollectionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "rename collection" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogRenameCollectionReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
TRI_server_id_t);
#else
#define TRI_LogRenameCollectionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "change collection properties" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogChangePropertiesCollectionReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
struct TRI_json_s const*,
TRI_server_id_t);
#else
#define TRI_LogChangePropertiesCollectionReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "create index" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogCreateIndexReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
@ -335,36 +293,20 @@ int TRI_LogCreateIndexReplication (struct TRI_vocbase_s*,
struct TRI_json_s const*,
TRI_server_id_t);
#else
#define TRI_LogCreateIndexReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a "drop index" operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogDropIndexReplication (struct TRI_vocbase_s*,
TRI_voc_cid_t,
char const*,
TRI_idx_iid_t iid,
TRI_server_id_t);
#else
#define TRI_LogDropIndexReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief replicate a document operation
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
int TRI_LogDocumentReplication (struct TRI_vocbase_s*,
struct TRI_document_collection_s*,
TRI_voc_document_operation_e,
@ -372,12 +314,6 @@ int TRI_LogDocumentReplication (struct TRI_vocbase_s*,
struct TRI_doc_mptr_s const*,
TRI_server_id_t);
#else
#define TRI_LogDocumentReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -30,9 +30,6 @@
#include "BasicsC/logging.h"
#include "BasicsC/tri-strings.h"
#ifdef TRI_ENABLE_REPLICATION
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION MASTER INFO
// -----------------------------------------------------------------------------
@ -89,8 +86,6 @@ void TRI_LogMasterInfoReplication (TRI_replication_master_info_t const* info,
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -81,46 +81,22 @@ TRI_replication_master_info_t;
/// @brief initialise a master info struct
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
void TRI_InitMasterInfoReplication (TRI_replication_master_info_t*,
const char*);
#else
#define TRI_InitMasterInfoReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a master info struct
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
void TRI_DestroyMasterInfoReplication (TRI_replication_master_info_t*);
#else
#define TRI_DestroyMasterInfoReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief log information about the master state
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
void TRI_LogMasterInfoReplication (TRI_replication_master_info_t const*,
const char*);
#else
#define TRI_LogMasterInfoReplication(...)
#endif
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -845,11 +845,9 @@ static int WriteOperationsSingle (TRI_transaction_t* const trx) {
// something went wrong. now write the "abort" marker
WriteAbortMarkers(trx, i + 1);
}
#ifdef TRI_ENABLE_REPLICATION
else if (trx->_replicate) {
TRI_LogTransactionReplication(trx->_context->_vocbase, trx, trx->_generatingServer);
}
#endif
return res;
}
@ -964,11 +962,9 @@ static int WriteOperationsMulti (TRI_transaction_t* const trx,
false);
#ifdef TRI_ENABLE_REPLICATION
if (res == TRI_ERROR_NO_ERROR && trx->_replicate) {
TRI_LogTransactionReplication(trx->_context->_vocbase, trx, trx->_generatingServer);
}
#endif
}
}
else {
@ -1923,8 +1919,6 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
doSync);
*directOperation = true;
#ifdef TRI_ENABLE_REPLICATION
if (res == TRI_ERROR_NO_ERROR && trx->_replicate) {
TRI_LogDocumentReplication(trx->_context->_vocbase,
(TRI_document_collection_t*) primary,
@ -1933,7 +1927,6 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
oldData,
trx->_generatingServer);
}
#endif
}
else {
trx->_hasOperations = true;

View File

@ -260,10 +260,8 @@ static void CopyDefaults (TRI_vocbase_defaults_t const* src,
dst->forceSyncProperties = src->forceSyncProperties;
dst->requireAuthentication = src->requireAuthentication;
dst->authenticateSystemOnly = src->authenticateSystemOnly;
#ifdef TRI_ENABLE_REPLICATION
dst->replicationEnableLogger = src->replicationEnableLogger;
dst->replicationLogRemoteChanges = src->replicationLogRemoteChanges;
#endif
}
////////////////////////////////////////////////////////////////////////////////
@ -281,10 +279,8 @@ static void ApplyDefaults (TRI_vocbase_t* vocbase,
vocbase->_forceSyncProperties = defaults->forceSyncProperties;
vocbase->_requireAuthentication = defaults->requireAuthentication;
vocbase->_authenticateSystemOnly = defaults->authenticateSystemOnly;
#ifdef TRI_ENABLE_REPLICATION
vocbase->_replicationEnableLogger = defaults->replicationEnableLogger;
vocbase->_replicationLogRemoteChanges = defaults->replicationLogRemoteChanges;
#endif
}
////////////////////////////////////////////////////////////////////////////////
@ -302,10 +298,8 @@ static void GetDefaults (TRI_vocbase_t const* vocbase,
defaults->forceSyncProperties = vocbase->_forceSyncProperties;
defaults->requireAuthentication = vocbase->_requireAuthentication;
defaults->authenticateSystemOnly = vocbase->_authenticateSystemOnly;
#ifdef TRI_ENABLE_REPLICATION
defaults->replicationEnableLogger = vocbase->_replicationEnableLogger;
defaults->replicationLogRemoteChanges = vocbase->_replicationLogRemoteChanges;
#endif
}
////////////////////////////////////////////////////////////////////////////////
@ -474,12 +468,10 @@ static bool UnregisterCollection (TRI_vocbase_t* vocbase,
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_LogDropCollectionReplication(vocbase,
collection->_cid,
collection->_name,
generatingServer);
#endif
return true;
}
@ -1563,9 +1555,7 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
vocbase->_transactionContext = TRI_CreateTransactionContext(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_InitReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_InitReadWriteLock(&vocbase->_authInfoLock);
TRI_InitReadWriteLock(&vocbase->_lock);
@ -1678,7 +1668,6 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
TRI_StartThread(&(vocbase->_indexGC), "[indeX_garbage_collector]", TRI_IndexGCVocBase, vocbase);
#ifdef TRI_ENABLE_REPLICATION
vocbase->_replicationLogger = TRI_CreateReplicationLogger(vocbase, defaults);
if (vocbase->_replicationLogger == NULL) {
@ -1702,7 +1691,7 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
if (vocbase->_replicationApplier->_configuration._autoStart) {
int res2;
res2 = TRI_StartReplicationApplier(vocbase->_replicationApplier, false);
res2 = TRI_StartReplicationApplier(vocbase->_replicationApplier);
if (res2 != TRI_ERROR_NO_ERROR) {
LOG_WARNING("unable to start replication applier for database '%s': %s",
@ -1710,7 +1699,6 @@ TRI_vocbase_t* TRI_OpenVocBase (char const* path,
TRI_errno_string(res2));
}
}
#endif
// we are done
return vocbase;
@ -1740,13 +1728,11 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
TRI_DestroyVectorPointer(&collections);
#ifdef TRI_ENABLE_REPLICATION
TRI_FreeReplicationLogger(vocbase->_replicationLogger);
vocbase->_replicationLogger = NULL;
TRI_FreeReplicationApplier(vocbase->_replicationApplier);
vocbase->_replicationApplier = NULL;
#endif
// this will signal the synchroniser and the compactor threads to do one last iteration
vocbase->_state = 2;
@ -1810,9 +1796,7 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
TRI_FreeTransactionContext(vocbase->_transactionContext);
// destroy locks
#ifdef TRI_ENABLE_REPLICATION
TRI_DestroyReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_DestroyReadWriteLock(&vocbase->_authInfoLock);
TRI_DestroyReadWriteLock(&vocbase->_lock);
TRI_DestroyCondition(&vocbase->_syncWaitersCondition);
@ -1904,9 +1888,7 @@ TRI_json_t* TRI_InventoryCollectionsVocBase (TRI_vocbase_t* vocbase,
TRI_InitVectorPointer(&collections, TRI_CORE_MEM_ZONE);
#ifdef TRI_ENABLE_REPLICATION
TRI_WriteLockReadWriteLock(&vocbase->_inventoryLock);
#endif
// copy collection pointers into vector so we can work with the copy without
// the global lock
@ -1971,9 +1953,7 @@ TRI_json_t* TRI_InventoryCollectionsVocBase (TRI_vocbase_t* vocbase,
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
}
#ifdef TRI_ENABLE_REPLICATION
TRI_WriteUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_DestroyVectorPointer(&collections);
@ -2098,9 +2078,7 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
TRI_collection_t* col;
TRI_primary_collection_t* primary = NULL;
TRI_document_collection_t* document;
#ifdef TRI_ENABLE_REPLICATION
TRI_json_t* json;
#endif
TRI_col_type_e type;
char const* name;
void const* found;
@ -2124,9 +2102,7 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
return NULL;
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_WRITE_LOCK_COLLECTIONS_VOCBASE(vocbase);
@ -2139,9 +2115,7 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
if (found != NULL) {
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
LOG_DEBUG("collection named '%s' already exists", name);
@ -2158,9 +2132,7 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
if (document == NULL) {
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return NULL;
}
@ -2184,9 +2156,7 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return NULL;
}
@ -2204,11 +2174,8 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
// release the lock on the list of collections
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
#ifdef TRI_ENABLE_REPLICATION
// replicate and finally unlock the collection
json = TRI_CreateJsonCollectionInfo(&col->_info);
TRI_LogCreateCollectionReplication(vocbase,
@ -2217,7 +2184,6 @@ TRI_vocbase_col_t* TRI_CreateCollectionVocBase (TRI_vocbase_t* vocbase,
json,
generatingServer);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
#endif
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
@ -2299,9 +2265,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
return TRI_set_errno(TRI_ERROR_FORBIDDEN);
}
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
// mark collection as deleted
TRI_WRITE_LOCK_STATUS_VOCBASE_COL(collection);
@ -2315,9 +2279,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return TRI_ERROR_NO_ERROR;
}
@ -2335,9 +2297,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
if (res != TRI_ERROR_NO_ERROR) {
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return TRI_set_errno(res);
}
@ -2360,9 +2320,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
if (res != TRI_ERROR_NO_ERROR) {
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return TRI_set_errno(res);
}
}
@ -2375,9 +2333,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
DropCollectionCallback(0, collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return TRI_ERROR_NO_ERROR;
}
@ -2394,9 +2350,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
if (res != TRI_ERROR_NO_ERROR) {
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
return res;
}
@ -2406,9 +2360,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
// added callback for dropping
TRI_CreateBarrierDropCollection(&collection->_collection->_barrierList,
@ -2431,9 +2383,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
else {
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
LOG_WARNING("internal error in TRI_DropCollectionVocBase");
@ -2554,15 +2504,11 @@ int TRI_RenameCollectionVocBase (TRI_vocbase_t* vocbase,
TRI_CopyString(info._name, newName, sizeof(info._name));
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
res = TRI_SaveCollectionInfo(collection->_path, &info, vocbase->_forceSyncProperties);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_FreeCollectionInfoOptions(&info);
@ -2582,15 +2528,11 @@ int TRI_RenameCollectionVocBase (TRI_vocbase_t* vocbase,
else if (collection->_status == TRI_VOC_COL_STATUS_LOADED ||
collection->_status == TRI_VOC_COL_STATUS_UNLOADING) {
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
res = TRI_RenameCollection(&collection->_collection->base, newName);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
if (res != TRI_ERROR_NO_ERROR) {
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
@ -2619,9 +2561,7 @@ int TRI_RenameCollectionVocBase (TRI_vocbase_t* vocbase,
// rename and release locks
// .............................................................................
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadLockReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_RemoveKeyAssociativePointer(&vocbase->_collectionsByName, oldName);
TRI_CopyString(collection->_name, newName, sizeof(collection->_name));
@ -2629,16 +2569,12 @@ int TRI_RenameCollectionVocBase (TRI_vocbase_t* vocbase,
// this shouldn't fail, as we removed an element above so adding one should be ok
TRI_InsertKeyAssociativePointer(&vocbase->_collectionsByName, newName, CONST_CAST(collection), false);
#ifdef TRI_ENABLE_REPLICATION
TRI_ReadUnlockReadWriteLock(&vocbase->_inventoryLock);
#endif
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
#ifdef TRI_ENABLE_REPLICATION
// stay inside the outer lock to protect against unloading
TRI_LogRenameCollectionReplication(vocbase, collection->_cid, newName, generatingServer);
#endif
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);

View File

@ -48,13 +48,10 @@ extern "C" {
struct TRI_primary_collection_s;
struct TRI_col_info_s;
struct TRI_json_s;
struct TRI_shadow_store_s;
struct TRI_transaction_context_s;
#ifdef TRI_ENABLE_REPLICATION
struct TRI_replication_applier_s;
struct TRI_replication_logger_s;
#endif
struct TRI_shadow_store_s;
struct TRI_transaction_context_s;
// -----------------------------------------------------------------------------
// --SECTION-- public macros
@ -328,18 +325,14 @@ typedef struct TRI_vocbase_s {
char* _path; // path to the data directory
TRI_voc_size_t _defaultMaximalSize;
#ifdef TRI_ENABLE_REPLICATION
int64_t _replicationLogSize;
#endif
bool _removeOnDrop; // wipe collection from disk after dropping
bool _removeOnCompacted; // wipe datafile from disk after compaction
bool _defaultWaitForSync;
bool _forceSyncShapes; // force syncing of shape data to disk
bool _forceSyncProperties; // force syncing of shape data to disk
#ifdef TRI_ENABLE_REPLICATION
bool _replicationEnableLogger;
bool _replicationLogRemoteChanges;
#endif
bool _isSystem;
bool _requireAuthentication;
bool _authenticateSystemOnly;
@ -353,9 +346,7 @@ typedef struct TRI_vocbase_s {
TRI_associative_pointer_t _collectionsByName; // collections by name
TRI_associative_pointer_t _collectionsById; // collections by id
#ifdef TRI_ENABLE_REPLICATION
TRI_read_write_lock_t _inventoryLock; // object lock needed when replication is assessing the state of the vocbase
#endif
TRI_associative_pointer_t _authInfo;
TRI_read_write_lock_t _authInfoLock;
@ -364,10 +355,8 @@ typedef struct TRI_vocbase_s {
struct TRI_transaction_context_s* _transactionContext;
#ifdef TRI_ENABLE_REPLICATION
struct TRI_replication_logger_s* _replicationLogger;
struct TRI_replication_applier_s* _replicationApplier;
#endif
// state of the database
// 0 = inactive
@ -451,10 +440,8 @@ typedef struct TRI_vocbase_defaults_s {
bool forceSyncProperties;
bool requireAuthentication;
bool authenticateSystemOnly;
#ifdef TRI_ENABLE_REPLICATION
bool replicationEnableLogger;
bool replicationLogRemoteChanges;
#endif
}
TRI_vocbase_defaults_t;

View File

@ -141,7 +141,6 @@ m4_include([m4/configure.threads])
m4_include([m4/configure.documentation])
m4_include([m4/configure.coverage])
m4_include([m4/configure.maintainer])
m4_include([m4/configure.replication])
dnl ============================================================================
dnl --SECTION-- EXTERNAL LIBRARIES

View File

@ -3,7 +3,8 @@
/*global require, module, Module, ArangoError,
REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE,
REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_SYNCHRONISE,
SYS_DEBUG_CAN_USE_FAILAT, SYS_DEBUG_SET_FAILAT, SYS_DEBUG_REMOVE_FAILAT, SYS_DEBUG_CLEAR_FAILAT,
SYS_DOWNLOAD, SYS_EXECUTE, SYS_LOAD, SYS_LOG_LEVEL, SYS_MD5, SYS_OUTPUT, SYS_PROCESS_STATISTICS,
SYS_RAND, SYS_SERVER_STATISTICS, SYS_SPRINTF, SYS_TIME, SYS_START_PAGER, SYS_STOP_PAGER,
@ -321,6 +322,15 @@
delete REPLICATION_APPLIER_FORGET;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sychroniseReplication
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SYNCHRONISE !== "undefined") {
exports.synchroniseReplication = REPLICATION_SYNCHRONISE;
delete REPLICATION_SYNCHRONISE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief debugSetFailAt
////////////////////////////////////////////////////////////////////////////////

View File

@ -204,6 +204,36 @@ applier.properties = function (config) {
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- other functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief performs a one-time synchronisation with a remote endpoint
////////////////////////////////////////////////////////////////////////////////
var sync = function (config) {
'use strict';
var db = internal.db;
var requestResult = db._connection.PUT("_api/replication/sync",
JSON.stringify(config));
arangosh.checkRequestResult(requestResult);
return requestResult;
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- module exports
// -----------------------------------------------------------------------------
@ -215,6 +245,7 @@ applier.properties = function (config) {
exports.logger = logger;
exports.applier = applier;
exports.sync = sync;
////////////////////////////////////////////////////////////////////////////////
/// @}

View File

@ -93,10 +93,6 @@
<td class="alignLeft">Last applied tick</th>
<td id="applyLastAppliedTickVal"></td>
</tr>
<tr class="checkApplyRunningStatus">
<td class="alignLeft">Current phase</td>
<td id="applyCurrentPhaseLabelVal"></td>
</tr>
<tr class="checkApplyRunningStatus">
<td class="alignLeft">Progress</td>
<td id="applyProgressVal"></td>

View File

@ -139,7 +139,6 @@ var dashboardView = Backbone.View.extend({
//apply table
var lastAppliedTick;
var phase = "-";
var progress = "-";
var lastError = "-";
var endpoint = "-";
@ -159,10 +158,6 @@ var dashboardView = Backbone.View.extend({
endpoint = this.replApplyState.state.endpoint;
}
if (this.replApplyState.state.currentPhase) {
phase = this.replApplyState.state.currentPhase.label;
}
time = this.replApplyState.state.time;
if (this.replApplyState.state.progress) {
@ -184,7 +179,6 @@ var dashboardView = Backbone.View.extend({
$('#applyRunningVal').html(runningApply);
$('#applyEndpointVal').text(endpoint);
$('#applyLastAppliedTickVal').text(lastAppliedTick);
$('#applyCurrentPhaseLabelVal').text(phase);
$('#applyTimeVal').text(time);
$('#applyProgressVal').text(progress);
$('#applyLastErrorVal').text(lastError);

View File

@ -203,6 +203,36 @@ applier.properties = function (config) {
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- other functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief performs a one-time synchronisation with a remote endpoint
////////////////////////////////////////////////////////////////////////////////
var sync = function (config) {
'use strict';
var db = internal.db;
var requestResult = db._connection.PUT("_api/replication/sync",
JSON.stringify(config));
arangosh.checkRequestResult(requestResult);
return requestResult;
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- module exports
// -----------------------------------------------------------------------------
@ -214,6 +244,7 @@ applier.properties = function (config) {
exports.logger = logger;
exports.applier = applier;
exports.sync = sync;
////////////////////////////////////////////////////////////////////////////////
/// @}

View File

@ -3,7 +3,8 @@
/*global require, module, Module, ArangoError,
REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE,
REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_SYNCHRONISE,
SYS_DEBUG_CAN_USE_FAILAT, SYS_DEBUG_SET_FAILAT, SYS_DEBUG_REMOVE_FAILAT, SYS_DEBUG_CLEAR_FAILAT,
SYS_DOWNLOAD, SYS_EXECUTE, SYS_LOAD, SYS_LOG_LEVEL, SYS_MD5, SYS_OUTPUT, SYS_PROCESS_STATISTICS,
SYS_RAND, SYS_SERVER_STATISTICS, SYS_SPRINTF, SYS_TIME, SYS_START_PAGER, SYS_STOP_PAGER,
@ -321,6 +322,15 @@
delete REPLICATION_APPLIER_FORGET;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sychroniseReplication
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SYNCHRONISE !== "undefined") {
exports.synchroniseReplication = REPLICATION_SYNCHRONISE;
delete REPLICATION_SYNCHRONISE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief debugSetFailAt
////////////////////////////////////////////////////////////////////////////////

View File

@ -148,6 +148,29 @@ applier.properties = function (config) {
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- other functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief performs a one-time synchronisation with a remote endpoint
////////////////////////////////////////////////////////////////////////////////
var sync = function (config) {
'use strict';
return internal.synchroniseReplication(config);
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- module exports
// -----------------------------------------------------------------------------
@ -159,6 +182,7 @@ applier.properties = function (config) {
exports.logger = logger;
exports.applier = applier;
exports.sync = sync;
////////////////////////////////////////////////////////////////////////////////
/// @}

View File

@ -87,9 +87,7 @@ string const ApplicationServer::OPTIONS_LOGGER = "Logging Options";
/// @brief Replication Options
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
string const ApplicationServer::OPTIONS_REPLICATION = "Replication Options";
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief Server Options

View File

@ -105,9 +105,7 @@ namespace triagens {
/// @brief Replication Options
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_REPLICATION
static std::string const OPTIONS_REPLICATION;
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief Server Options

View File

@ -83,12 +83,6 @@
#undef TRI_ENABLE_MRUBY
////////////////////////////////////////////////////////////////////////////////
/// @brief enable replication
////////////////////////////////////////////////////////////////////////////////
#undef TRI_ENABLE_REPLICATION
////////////////////////////////////////////////////////////////////////////////
/// @brief configure command
////////////////////////////////////////////////////////////////////////////////

View File

@ -1,26 +0,0 @@
dnl -*- mode: Autoconf; -*-
dnl ----------------------------------------------------------------------------
dnl --SECTION-- REPLICATION
dnl ----------------------------------------------------------------------------
AC_ARG_ENABLE(replication,
AS_HELP_STRING([--enable-replication], [enable replication (default: no)]),
[tr_REPLICATION="${enableval:-yes}"],
[tr_REPLICATION=no]
)
if test "x$tr_REPLICATION" = "xyes"; then
AC_DEFINE_UNQUOTED(TRI_ENABLE_REPLICATION, 1, [true if replication support should be built])
fi
AM_CONDITIONAL(ENABLE_REPLICATION, test "x$tr_REPLICATION" = "xyes")
dnl ----------------------------------------------------------------------------
dnl --SECTION-- END-OF-FILE
dnl ----------------------------------------------------------------------------
dnl Local Variables:
dnl mode: outline-minor
dnl outline-regexp: "^\\(### @brief\\|## --SECTION--\\|# -\\*- \\)"
dnl End: