1
0
Fork 0

replication tests

This commit is contained in:
Jan Steemann 2013-07-22 18:12:56 +02:00
parent 004dde7cfe
commit c4f82088dd
19 changed files with 1014 additions and 32 deletions

View File

@ -60,19 +60,39 @@ unittests-verbose:
@sleep 1
################################################################################
### @brief start the arango server
### @brief arango server configuration
################################################################################
PID := $(shell echo $$PPID)
PIDFILE := /tmp/arango.$(PID).pid
VOCDIR := /tmp/vocdir.$(PID)
VOCPORT := $(shell printf "3%04d" `expr $(PID) % 10000`)
VOCHOST := 127.0.0.1
STARTFILE := $(VOCDIR)/startup
################################################################################
### @brief slave stuff
################################################################################
SLAVEPIDFILE := /tmp/arango-slave.$(PID).pid
SLAVEPORT := $(shell printf "4%04d" `expr $(PID) % 10000`)
SLAVEHOST := 127.0.0.1
SLAVEDIR := /tmp/vocdir-slave.$(PID)
SLAVESTARTFILE := $(SLAVEDIR)/startup
################################################################################
### @brief client configuration
################################################################################
CURL := curl
CURL_OPT := --basic --user "$(USERNAME):$(PASSWORD)"
CERT_FILE := @top_srcdir@/UnitTests/server.pem
################################################################################
### @brief start the arango server
################################################################################
SERVER_START :=
SERVER_OPT := \
@ -129,6 +149,32 @@ start-server:
@if [ "$(VALGRIND)" != "" ]; then echo "adding valgrind memorial time..."; sleep 75; else sleep 2; fi
@echo
.PHONY: start-slave
start-slave:
@echo
@echo "================================================================================"
@echo "<< STARTING SLAVE >>"
@echo "================================================================================"
@echo
@echo "Slave Options: $(SERVER_OPT)"
@echo
rm -f "$(SLAVEPIDFILE)"
rm -rf "$(SLAVEDIR)"
mkdir -p "$(SLAVEDIR)"
@test -d "$(SLAVEDIR)"
($(VALGRIND) @builddir@/bin/arangod "$(SLAVEDIR)" $(SERVER_OPT) --pid-file $(SLAVEPIDFILE) --watch-process $(PID) && rm -rf "$(SLAVEDIR)") &
@rm -f "$(SLAVESTARTFILE)"; while [ ! -s "$(SLAVESTARTFILE)" ]; do $(CURL) $(CURL_OPT) -X GET -s "$(PROTO)://$(SLAVEHOST):$(SLAVEPORT)/_api/version" > "$(SLAVESTARTFILE)" || sleep 2; done
@rm -f "$(SLAVESTARTFILE)"
@echo "slave has been started."
@if [ "$(VALGRIND)" != "" ]; then echo "adding valgrind memorial time..."; sleep 75; else sleep 2; fi
@echo
################################################################################
### @brief issue a curl request and return the status
################################################################################
@ -429,7 +475,7 @@ unittests-shell-client:
.PHONY: unittests-http-server
unittests-http-server:
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-auth true" PROTO=http
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication true" PROTO=http
@echo
@echo "================================================================================"
@ -455,7 +501,7 @@ unittests-http-server:
.PHONY: unittests-ssl-server
unittests-ssl-server:
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint ssl://$(VOCHOST):$(VOCPORT) --server.keyfile $(CERT_FILE) --server.disable-auth true" PROTO=https
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint ssl://$(VOCHOST):$(VOCPORT) --server.keyfile $(CERT_FILE) --server.disable-authentication true" PROTO=https
@echo
@echo "================================================================================"
@ -506,6 +552,38 @@ unittests-import:
@echo
################################################################################
### @brief REPLICATION TESTS
###
### starts two servers (master & slave) and checks for data drift
################################################################################
.PHONY: unittests-replication
unittests-replication:
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication true" PROTO=http
$(MAKE) start-slave PID=$(PID) SERVER_START="--server.endpoint tcp://$(SLAVEHOST):$(SLAVEPORT) --server.disable-authentication true" PROTO=http
@echo
@echo "================================================================================"
@echo "<< REPLICATION TESTS >>"
@echo "================================================================================"
@echo
$(VALGRIND) @builddir@/bin/arangosh $(CLIENT_OPT) --server.username "$(USERNAME)" --server.password "$(PASSWORD)" --server.endpoint tcp://$(VOCHOST):$(VOCPORT) --javascript.unit-tests @top_srcdir@/js/server/tests/replication.js || test "x$(FORCE)" == "x1"
kill `cat $(PIDFILE)`
kill `cat $(SLAVEPIDFILE)`
while test -f $(PIDFILE); do sleep 1; done
while test -f $(SLAVEPIDFILE); do sleep 1; done
@if [ "$(VALGRIND)" != "" ]; then sleep 60; fi
@rm -rf "$(VOCDIR)"
@rm -rf "$(SLAVEDIR)"
@echo
################################################################################
### @brief UPGRADE TESTS
###
@ -626,7 +704,7 @@ unittests-authentication:
### @brief FULL AUTHENTICATION
################################################################################
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-auth false --server.authenticate-system-only false" PROTO=http
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication false --server.authenticate-system-only false" PROTO=http
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/" EXPECTED="401" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api" EXPECTED="401" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/version" EXPECTED="401" || test "x$(FORCE)" == "x1"
@ -644,7 +722,7 @@ unittests-authentication:
### @brief AUTHENTICATION FOR /_ ONLY
################################################################################
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-auth false --server.authenticate-system-only true" PROTO=http
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication false --server.authenticate-system-only true" PROTO=http
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/" EXPECTED="401" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api" EXPECTED="401" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/version" EXPECTED="401" || test "x$(FORCE)" == "x1"
@ -662,7 +740,7 @@ unittests-authentication:
### @brief NO AUTHENTICATION
################################################################################
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-auth true --server.authenticate-system-only true" PROTO=http
$(MAKE) start-server PID=$(PID) SERVER_START="--server.endpoint tcp://$(VOCHOST):$(VOCPORT) --server.disable-authentication true --server.authenticate-system-only true" PROTO=http
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/" EXPECTED="404" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api" EXPECTED="404" || test "x$(FORCE)" == "x1"
$(MAKE) curl-request-get CURL_PROTO="http" CURL_HOST="$(VOCHOST)" CURL_PORT="$(VOCPORT)" CURL_URL="/_api/version" EXPECTED="200" || test "x$(FORCE)" == "x1"

View File

@ -642,7 +642,7 @@ int ReplicationFetcher::createCollection (TRI_json_t const* json,
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
const TRI_col_type_e type = (TRI_col_type_e) JsonHelper::getNumberValue(json, "type", (double) TRI_COL_TYPE_DOCUMENT);
const TRI_col_type_e type = (TRI_col_type_e) JsonHelper::getIntValue(json, "type", (int) TRI_COL_TYPE_DOCUMENT);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
@ -888,8 +888,9 @@ int ReplicationFetcher::applyCollectionDumpMarker (TRI_transaction_collection_t*
// update
TRI_doc_update_policy_t policy;
TRI_InitUpdatePolicy(&policy, TRI_DOC_UPDATE_LAST_WRITE, 0, 0);
const TRI_voc_rid_t rid = StringUtils::uint64(JsonHelper::getStringValue(json, TRI_VOC_ATTRIBUTE_REV, ""));
res = primary->update(trxCollection, key, &mptr, shaped, &policy, false, false);
res = primary->update(trxCollection, key, rid, &mptr, shaped, &policy, false, false);
}
TRI_FreeShapedJson(primary->_shaper, shaped);
@ -1410,21 +1411,21 @@ int ReplicationFetcher::performContinuousSync (string& errorMsg) {
}
else {
if (masterActive) {
sleepTime = 1 * 1000 * 1000;
sleepTime = 500 * 1000;
}
else {
sleepTime = 10 * 1000 * 1000;
sleepTime = 5 * 1000 * 1000;
}
if (_configuration._adaptivePolling) {
inactiveCycles++;
if (inactiveCycles > 30) {
if (inactiveCycles > 60) {
sleepTime *= 5;
}
else if (inactiveCycles > 20) {
else if (inactiveCycles > 30) {
sleepTime *= 3;
}
if (inactiveCycles > 10) {
if (inactiveCycles > 15) {
sleepTime *= 2;
}
}

View File

@ -28,6 +28,7 @@
#include "RestReplicationHandler.h"
#include "build.h"
#include "Basics/JsonHelper.h"
#include "BasicsC/conversions.h"
#include "BasicsC/files.h"
#include "Logger/Logger.h"
@ -155,6 +156,12 @@ Handler::status_e RestReplicationHandler::execute() {
}
handleCommandDump();
}
else if (command == "apply-config") {
if (type != HttpRequest::HTTP_REQUEST_PUT) {
goto BAD_CALL;
}
handleCommandApplyConfig();
}
else if (command == "apply-start") {
if (type != HttpRequest::HTTP_REQUEST_PUT) {
goto BAD_CALL;
@ -581,6 +588,45 @@ void RestReplicationHandler::handleCommandDump () {
TRI_FreeStringBuffer(TRI_CORE_MEM_ZONE, dump._buffer);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplyConfig () {
assert(_vocbase->_replicationApplier != 0);
TRI_replication_apply_configuration_t configuration;
TRI_InitApplyConfigurationReplicationApplier(&configuration);
TRI_json_t* json = parseJsonBody();
if (json == 0) {
return;
}
const string endpoint = JsonHelper::getStringValue(json, "endpoint", "");
configuration._endpoint = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE, endpoint.c_str(), endpoint.size());
configuration._requestTimeout = JsonHelper::getDoubleValue(json, "requestTimeout", configuration._requestTimeout);
configuration._connectTimeout = JsonHelper::getDoubleValue(json, "connectTimeout", configuration._connectTimeout);
configuration._ignoreErrors = JsonHelper::getUInt64Value(json, "ignoreErrors", configuration._ignoreErrors);
configuration._maxConnectRetries = JsonHelper::getIntValue(json, "maxConnectRetries", configuration._maxConnectRetries);
configuration._autoStart = JsonHelper::getBooleanValue(json, "autoStart", configuration._autoStart);
configuration._adaptivePolling = JsonHelper::getBooleanValue(json, "adaptivePolling", configuration._adaptivePolling);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, json);
int res = TRI_ConfigureReplicationApplier(_vocbase->_replicationApplier, &configuration);
TRI_DestroyApplyConfigurationReplicationApplier(&configuration);
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);
}
else {
handleCommandApplyState();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
@ -634,7 +680,7 @@ void RestReplicationHandler::handleCommandApplyState () {
assert(_vocbase->_replicationApplier != 0);
TRI_replication_apply_state_t state;
int res = TRI_StateReplicationApplier(_vocbase->_replicationApplier, &state);
if (res != TRI_ERROR_NO_ERROR) {
@ -656,6 +702,18 @@ void RestReplicationHandler::handleCommandApplyState () {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, server, "serverId", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, TRI_StringUInt64(serverId)));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "server", server);
TRI_replication_apply_configuration_t config;
TRI_InitApplyConfigurationReplicationApplier(&config);
TRI_ReadLockReadWriteLock(&_vocbase->_replicationApplier->_statusLock);
TRI_CopyApplyConfigurationReplicationApplier(&_vocbase->_replicationApplier->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&_vocbase->_replicationApplier->_statusLock);
if (config._endpoint != NULL) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "endpoint", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config._endpoint));
}
TRI_DestroyApplyConfigurationReplicationApplier(&config);
generateResult(&result);

View File

@ -189,6 +189,12 @@ namespace triagens {
void handleCommandDump ();
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////
void handleCommandApplyConfig ();
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////

View File

@ -210,7 +210,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
return this->update(this->trxCollection(), key, mptr, json, policy, expectedRevision, actualRevision, forceSync);
return this->update(this->trxCollection(), key, 0, mptr, json, policy, expectedRevision, actualRevision, forceSync);
}
////////////////////////////////////////////////////////////////////////////////
@ -229,7 +229,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
return this->update(this->trxCollection(), key, mptr, shaped, policy, expectedRevision, actualRevision, forceSync);
return this->update(this->trxCollection(), key, 0, mptr, shaped, policy, expectedRevision, actualRevision, forceSync);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -852,6 +852,7 @@ namespace triagens {
int update (TRI_transaction_collection_t* trxCollection,
const string& key,
TRI_voc_rid_t rid,
TRI_doc_mptr_t* mptr,
TRI_json_t* const json,
const TRI_doc_update_policy_e policy,
@ -867,6 +868,7 @@ namespace triagens {
int res = update(trxCollection,
key,
rid,
mptr,
shaped,
policy,
@ -885,6 +887,7 @@ namespace triagens {
inline int update (TRI_transaction_collection_t* const trxCollection,
const string& key,
TRI_voc_rid_t rid,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t* const shaped,
const TRI_doc_update_policy_e policy,
@ -898,7 +901,8 @@ namespace triagens {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->update(trxCollection,
(const TRI_voc_key_t) key.c_str(),
(const TRI_voc_key_t) key.c_str(),
rid,
mptr,
shaped,
&updatePolicy,

View File

@ -1599,8 +1599,13 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv, TRI_col_t
keyOptions = TRI_ObjectToJson(p->Get(v8g->KeyOptionsKey));
}
// TRI_InitCollectionInfo will copy keyOptions
TRI_InitCollectionInfo(vocbase, &parameter, name.c_str(), collectionType, effectiveSize, keyOptions);
if (keyOptions != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, keyOptions);
}
if (p->Has(v8g->WaitForSyncKey)) {
parameter._waitForSync = TRI_ObjectToBoolean(p->Get(v8g->WaitForSyncKey));
}
@ -5591,12 +5596,13 @@ static v8::Handle<v8::Value> JS_PropertiesVocbaseCol (v8::Arguments const& argv)
v8::Handle<v8::Object> result = v8::Object::New();
if (TRI_IS_DOCUMENT_COLLECTION(base->_info._type)) {
TRI_json_t* keyOptions = primary->_keyGenerator->toJson(primary->_keyGenerator);
result->Set(v8g->DoCompactKey, base->_info._doCompact ? v8::True() : v8::False());
result->Set(v8g->IsSystemKey, base->_info._isSystem ? v8::True() : v8::False());
result->Set(v8g->IsVolatileKey, base->_info._isVolatile ? v8::True() : v8::False());
result->Set(v8g->JournalSizeKey, v8::Number::New(base->_info._maximalSize));
TRI_json_t* keyOptions = primary->_keyGenerator->toJson(primary->_keyGenerator);
if (keyOptions != 0) {
result->Set(v8g->KeyOptionsKey, TRI_ObjectJson(keyOptions)->ToObject());

View File

@ -908,7 +908,12 @@ void TRI_InitCollectionInfo (TRI_vocbase_t* vocbase,
parameter->_maximalSize = PageSize;
}
parameter->_waitForSync = vocbase->_defaultWaitForSync;
parameter->_keyOptions = keyOptions;
parameter->_keyOptions = NULL;
if (keyOptions != NULL) {
parameter->_keyOptions = TRI_CopyJson(TRI_CORE_MEM_ZONE, keyOptions);
}
TRI_CopyString(parameter->_name, name, sizeof(parameter->_name));
}
@ -934,7 +939,7 @@ void TRI_CopyCollectionInfo (TRI_col_info_t* dst, const TRI_col_info_t* const sr
dst->_waitForSync = src->_waitForSync;
if (src->_keyOptions) {
dst->_keyOptions = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, src->_keyOptions);
dst->_keyOptions = TRI_CopyJson(TRI_CORE_MEM_ZONE, src->_keyOptions);
}
else {
dst->_keyOptions = NULL;
@ -949,7 +954,7 @@ void TRI_CopyCollectionInfo (TRI_col_info_t* dst, const TRI_col_info_t* const sr
void TRI_FreeCollectionInfoOptions (TRI_col_info_t* parameter) {
if (parameter->_keyOptions != NULL) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, parameter->_keyOptions);
TRI_FreeJson(TRI_CORE_MEM_ZONE, parameter->_keyOptions);
parameter->_keyOptions = NULL;
}
}

View File

@ -312,6 +312,7 @@ static int CreateDeletionMarker (TRI_voc_tid_t tid,
////////////////////////////////////////////////////////////////////////////////
static int CloneDocumentMarker (TRI_voc_tid_t tid,
TRI_voc_tick_t tick,
TRI_df_marker_t const* original,
TRI_doc_document_key_marker_t** result,
TRI_voc_size_t* totalSize,
@ -319,7 +320,6 @@ static int CloneDocumentMarker (TRI_voc_tid_t tid,
TRI_shaped_json_t const* shaped) {
TRI_doc_document_key_marker_t* marker;
TRI_voc_tick_t tick;
size_t baseLength;
*result = NULL;
@ -359,12 +359,14 @@ static int CloneDocumentMarker (TRI_voc_tid_t tid,
return TRI_ERROR_OUT_OF_MEMORY;
}
tick = TRI_NewTickVocBase();
if (tick == 0) {
tick = TRI_NewTickVocBase();
}
// copy non-changed data (e.g. key(s)) from old marker into new marker
TRI_CloneMarker(&marker->base, original, baseLength, *totalSize);
assert(marker->_rid != 0);
// the new revision must be greater than the old one
assert((TRI_voc_rid_t) tick > marker->_rid);
// give the marker a new revision id
marker->_rid = (TRI_voc_rid_t) tick;
@ -1621,6 +1623,7 @@ static int ReadShapedJson (TRI_transaction_collection_t* trxCollection,
static int UpdateShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_voc_rid_t rid,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t const* shaped,
TRI_doc_update_policy_t const* policy,
@ -1670,7 +1673,7 @@ static int UpdateShapedJson (TRI_transaction_collection_t* trxCollection,
original = header->_data;
tid = TRI_GetMarkerIdTransaction(trxCollection->_transaction);
res = CloneDocumentMarker(tid, original, &marker, &totalSize, original->_type, shaped);
res = CloneDocumentMarker(tid, (TRI_voc_tick_t) rid, original, &marker, &totalSize, original->_type, shaped);
if (res == TRI_ERROR_NO_ERROR) {
res = UpdateDocument(trxCollection, header, marker, totalSize, forceSync, mptr, &freeMarker);

View File

@ -321,7 +321,7 @@ typedef struct TRI_primary_collection_s {
int (*read) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, const bool);
int (*update) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*update) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_voc_rid_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*remove) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, struct TRI_doc_update_policy_s const*, const bool, const bool);
TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection);

View File

@ -159,7 +159,7 @@ static TRI_json_t* JsonApplyConfiguration (TRI_replication_apply_configuration_t
assert(config->_endpoint != NULL);
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 4);
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 8);
if (json == NULL) {
return NULL;
@ -621,6 +621,10 @@ int TRI_ConfigureReplicationApplier (TRI_replication_applier_t* applier,
}
res = TRI_SaveConfigurationFileReplicationApplier(applier->_vocbase, config, true);
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_LoadConfigurationFileReplicationApplier(applier->_vocbase, &applier->_configuration);
}
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
@ -1018,9 +1022,12 @@ void TRI_DestroyApplyConfigurationReplicationApplier (TRI_replication_apply_conf
void TRI_CopyApplyConfigurationReplicationApplier (TRI_replication_apply_configuration_t const* src,
TRI_replication_apply_configuration_t* dst) {
assert(src->_endpoint != NULL);
dst->_endpoint = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_endpoint);
if (src->_endpoint != NULL) {
dst->_endpoint = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_endpoint);
}
else {
dst->_endpoint = NULL;
}
if (src->_username != NULL) {
dst->_username = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_username);
@ -1104,6 +1111,7 @@ int TRI_SaveConfigurationFileReplicationApplier (TRI_vocbase_t* vocbase,
////////////////////////////////////////////////////////////////////////////////
/// @brief load the replication application configuration from a file
/// this function must be called under the statusLock
////////////////////////////////////////////////////////////////////////////////
int TRI_LoadConfigurationFileReplicationApplier (TRI_vocbase_t* vocbase,

View File

@ -354,7 +354,7 @@ static bool StringifyTickReplication (TRI_string_buffer_t* buffer,
return false;
}
APPEND_STRING(buffer, "{\"lastTick\":\"");
APPEND_STRING(buffer, "{\"tick\":\"");
APPEND_UINT64(buffer, (uint64_t) tick);
APPEND_STRING(buffer, "\"}");

View File

@ -1053,6 +1053,29 @@ static v8::Handle<v8::Value> ClientConnection_httpSendFile (v8::Arguments const&
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ClientConnection method "getEndpoint"
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> ClientConnection_getEndpoint (v8::Arguments const& argv) {
v8::HandleScope scope;
// get the connection
V8ClientConnection* connection = TRI_UnwrapClass<V8ClientConnection>(argv.Holder(), WRAP_TYPE_CONNECTION);
if (connection == 0) {
TRI_V8_EXCEPTION_INTERNAL(scope, "connection class corrupted");
}
// check params
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "getEndpoint()");
}
const string endpoint = BaseClient.endpointString();
return scope.Close(v8::String::New(endpoint.c_str(), endpoint.size()));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ClientConnection method "lastError"
////////////////////////////////////////////////////////////////////////////////
@ -1602,6 +1625,7 @@ int main (int argc, char* argv[]) {
connection_proto->Set("PUT", v8::FunctionTemplate::New(ClientConnection_httpPut));
connection_proto->Set("PUT_RAW", v8::FunctionTemplate::New(ClientConnection_httpPutRaw));
connection_proto->Set("SEND_FILE", v8::FunctionTemplate::New(ClientConnection_httpSendFile));
connection_proto->Set("getEndpoint", v8::FunctionTemplate::New(ClientConnection_getEndpoint));
connection_proto->Set("lastHttpReturnCode", v8::FunctionTemplate::New(ClientConnection_lastHttpReturnCode));
connection_proto->Set("lastErrorMessage", v8::FunctionTemplate::New(ClientConnection_lastErrorMessage));
connection_proto->Set("isConnected", v8::FunctionTemplate::New(ClientConnection_isConnected));

View File

@ -434,6 +434,13 @@ ArangoDatabase.prototype._flushCache = function () {
}
}
}
try {
// repopulate cache
this._collections();
}
catch (err) {
}
this._properties = null;
};

View File

@ -433,6 +433,13 @@ ArangoDatabase.prototype._flushCache = function () {
}
}
}
try {
// repopulate cache
this._collections();
}
catch (err) {
}
this._properties = null;
};

View File

@ -83,6 +83,20 @@ exports.getLoggerState = function () {
return requestResult;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief configures the replication applier
////////////////////////////////////////////////////////////////////////////////
exports.configureApplier = function (config) {
var db = internal.db;
var requestResult = db._connection.PUT("_api/replication/apply-config",
JSON.stringify(config));
arangosh.checkRequestResult(requestResult);
return requestResult;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief starts the replication applier
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,692 @@
/*jslint sloppy: true, white: true, indent: 2, nomen: true, maxlen: 80 */
/*global require, assertEqual, assertTrue, arango */
////////////////////////////////////////////////////////////////////////////////
/// @brief test the replication
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
var jsunity = require("jsunity");
var arangodb = require("org/arangodb");
var db = arangodb.db;
var replication = require("org/arangodb/replication");
var console = require("console");
var sleep = require("internal").wait;
var masterEndpoint = arango.getEndpoint();
var slaveEndpoint = masterEndpoint.replace(/:3(\d+)$/, ':4$1');
// -----------------------------------------------------------------------------
// --SECTION-- replication tests
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function ReplicationSuite () {
var cn = "UnitTestsReplication";
var cn2 = "UnitTestsReplication2";
var connectToMaster = function () {
arango.reconnect(masterEndpoint, "root", "");
};
var connectToSlave = function () {
arango.reconnect(slaveEndpoint, "root", "");
};
var collectionChecksum = function (name) {
return db._collection(name).checksum(true).checksum;
};
var collectionCount = function (name) {
return db._collection(name).count();
};
var compare = function (master, slave) {
var state = { };
master(state);
var masterState = replication.getLoggerState();
assertTrue(masterState.state.running);
var lastTick = masterState.state.lastLogTick;
replication.stopLogger();
masterState = replication.getLoggerState();
assertFalse(masterState.running);
connectToSlave();
replication.stopApplier();
replication.configureApplier({ endpoint: masterEndpoint, username: "root", password: "" });
replication.startApplier(true);
console.log("waiting for slave to catch up");
while (1) {
var slaveState = replication.getApplierState();
if (! slaveState.state.running || slaveState.state.lastError.errorNum > 0) {
break;
}
if (slaveState.state.lastAppliedContinuousTick === lastTick ||
slaveState.state.lastAppliedInitialTick >= lastTick) {
break;
}
sleep(1);
}
slave(state);
};
return {
////////////////////////////////////////////////////////////////////////////////
/// @brief set up
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
connectToMaster();
replication.stopLogger();
db._drop(cn);
db._drop(cn2);
replication.startLogger();
},
////////////////////////////////////////////////////////////////////////////////
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
connectToMaster();
replication.stopLogger();
db._drop(cn);
db._drop(cn2);
connectToSlave();
replication.stopApplier();
db._drop(cn);
db._drop(cn2);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test documents
////////////////////////////////////////////////////////////////////////////////
testDocuments1 : function () {
compare(
function (state) {
var c = db._create(cn), i;
for (i = 0; i < 1000; ++i) {
c.save({ "value" : i, "foo" : true, "bar" : [ i , false ], "value2" : null, "mydata" : { "test" : [ "abc", "def" ] } });
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(1000, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test documents
////////////////////////////////////////////////////////////////////////////////
testDocuments2 : function () {
compare(
function (state) {
var c = db._create(cn), i;
for (i = 0; i < 1000; ++i) {
c.save({ "abc" : true, "_key" : "test" + i });
if (i % 3 == 0) {
c.remove(c.last());
}
else if (i % 5 == 0) {
c.update("test" + i, { "def" : "hifh" });
}
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(666, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test documents
////////////////////////////////////////////////////////////////////////////////
testDocuments3 : function () {
compare(
function (state) {
var c = db._create(cn), i;
for (i = 0; i < 50000; ++i) {
c.save({ "_key" : "test" + i, "foo" : "bar", "baz" : "bat" });
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(50000, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test edges
////////////////////////////////////////////////////////////////////////////////
testEdges : function () {
compare(
function (state) {
var v = db._create(cn), i;
var e = db._createEdgeCollection(cn2);
for (i = 0; i < 1000; ++i) {
v.save({ "_key" : "test" + i });
}
for (i = 0; i < 5000; i += 10) {
e.save(cn + "/test" + i, cn + "/test" + i, { "foo" : "bar", "value" : i });
}
state.checksum1 = collectionChecksum(cn);
state.count1 = collectionCount(cn);
assertEqual(1000, state.count1);
state.checksum2 = collectionChecksum(cn2);
state.count2 = collectionCount(cn2);
assertEqual(500, state.count2);
},
function (state) {
assertEqual(state.checksum1, collectionChecksum(cn));
assertEqual(state.count1, collectionCount(cn));
assertEqual(state.checksum2, collectionChecksum(cn2));
assertEqual(state.count2, collectionCount(cn2));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test transactions
////////////////////////////////////////////////////////////////////////////////
testTransaction1 : function () {
compare(
function (state) {
var c = db._create(cn), i;
try {
db._executeTransaction({
collections: {
write: cn
},
action: function () {
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i });
}
throw "rollback!";
}
});
fail();
}
catch (err) {
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(0, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test transactions
////////////////////////////////////////////////////////////////////////////////
testTransaction2 : function () {
compare(
function (state) {
var c = db._create(cn), i;
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i });
}
try {
db._executeTransaction({
collections: {
write: cn
},
action: function () {
for (i = 0; i < 1000; ++i) {
c.remove("test" + i);
}
}
});
fail();
}
catch (err) {
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(1000, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test transactions
////////////////////////////////////////////////////////////////////////////////
testTransaction3 : function () {
compare(
function (state) {
var c = db._create(cn), i;
db._executeTransaction({
collections: {
write: cn
},
action: function (params) {
var c = require("internal").db._collection(params.cn), i;
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i });
}
},
params: { "cn": cn },
});
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(1000, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test transactions
////////////////////////////////////////////////////////////////////////////////
testTransaction4 : function () {
compare(
function (state) {
var c = db._create(cn), i;
db._executeTransaction({
collections: {
write: cn
},
action: function (params) {
var c = require("internal").db._collection(params.cn), i;
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i });
}
for (i = 0; i < 1000; ++i) {
c.update("test" + i, { "foo" : "bar" + i });
}
for (i = 0; i < 1000; ++i) {
c.update("test" + i, { "foo" : "baz" + i });
}
for (i = 0; i < 1000; i += 10) {
c.remove("test" + i);
}
},
params: { "cn": cn },
});
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(900, state.count);
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test create collection
////////////////////////////////////////////////////////////////////////////////
testCreateCollection1 : function () {
compare(
function (state) {
var c = db._create(cn, {
isVolatile : true,
waitForSync : false,
doCompact : false,
journalSize : 1048576
});
state.cid = c._id;
state.properties = c.properties();
},
function (state) {
var properties = db._collection(cn).properties();
assertEqual(state.cid, db._collection(cn)._id);
assertEqual(cn, db._collection(cn).name());
assertTrue(properties.isVolatile);
assertFalse(properties.waitForSync);
assertFalse(properties.deleted);
assertFalse(properties.doCompact);
assertEqual(1048576, properties.journalSize);
assertTrue(properties.keyOptions.allowUserKeys);
assertEqual("traditional", properties.keyOptions.type);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test create collection
////////////////////////////////////////////////////////////////////////////////
testCreateCollection2 : function () {
compare(
function (state) {
var c = db._create(cn, {
keyOptions : {
type : "autoincrement",
allowUserKeys : false
},
isVolatile : false,
waitForSync : true,
doCompact : true,
journalSize : 2097152
});
state.cid = c._id;
state.properties = c.properties();
},
function (state) {
var properties = db._collection(cn).properties();
assertEqual(state.cid, db._collection(cn)._id);
assertEqual(cn, db._collection(cn).name());
assertFalse(properties.isVolatile);
assertTrue(properties.waitForSync);
assertFalse(properties.deleted);
assertTrue(properties.doCompact);
assertEqual(2097152, properties.journalSize);
assertFalse(properties.keyOptions.allowUserKeys);
assertEqual("autoincrement", properties.keyOptions.type);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test drop collection
////////////////////////////////////////////////////////////////////////////////
testDropCollection : function () {
compare(
function (state) {
var c = db._create(cn);
c.drop();
},
function (state) {
assertNull(db._collection(cn));
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test cap constraint
////////////////////////////////////////////////////////////////////////////////
testCapConstraint : function () {
compare(
function (state) {
var c = db._create(cn), i;
c.ensureCapConstraint(128);
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i });
}
state.last = c.last(3);
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(128, state.count);
assertEqual("test999", state.last[0]._key);
assertEqual("test998", state.last[1]._key);
assertEqual("test997", state.last[2]._key);
state.idx = c.getIndexes()[1];
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
assertEqual(state.last, db._collection(cn).last(3));
assertEqual(state.idx.id, db._collection(cn).getIndexes()[1].id);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test hash index
////////////////////////////////////////////////////////////////////////////////
testUniqueConstraint : function () {
compare(
function (state) {
var c = db._create(cn), i;
c.ensureHashIndex("a", "b");
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i, "a" : parseInt(i / 2), "b" : i });
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(1000, state.count);
state.idx = c.getIndexes()[1];
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
var idx = db._collection(cn).getIndexes()[1];
assertEqual(state.idx.id, idx.id);
assertEqual("hash", state.idx.type);
assertFalse(state.idx.unique);
assertEqual([ "a" ], state.idx.fields);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test unique constraint
////////////////////////////////////////////////////////////////////////////////
testUniqueConstraint : function () {
compare(
function (state) {
var c = db._create(cn), i;
c.ensureUniqueConstraint("a");
for (i = 0; i < 1000; ++i) {
try {
c.save({ "_key" : "test" + i, "a" : parseInt(i / 2) });
}
catch (err) {
}
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(500, state.count);
state.idx = c.getIndexes()[1];
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
var idx = db._collection(cn).getIndexes()[1];
assertEqual(state.idx.id, idx.id);
assertEqual("hash", state.idx.type);
assertTrue(state.idx.unique);
assertEqual([ "a" ], state.idx.fields);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test skiplist
////////////////////////////////////////////////////////////////////////////////
testSkiplist : function () {
compare(
function (state) {
var c = db._create(cn), i;
c.ensureSkiplist("a", "b");
for (i = 0; i < 1000; ++i) {
c.save({ "_key" : "test" + i, "a" : parseInt(i / 2), "b" : i });
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(1000, state.count);
state.idx = c.getIndexes()[1];
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
var idx = db._collection(cn).getIndexes()[1];
assertEqual(state.idx.id, idx.id);
assertEqual("skiplist", state.idx.type);
assertFalse(state.idx.unique);
assertEqual([ "a", "b" ], state.idx.fields);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test unique skiplist
////////////////////////////////////////////////////////////////////////////////
testUniqueSkiplist : function () {
compare(
function (state) {
var c = db._create(cn), i;
c.ensureUniqueSkiplist("a");
for (i = 0; i < 1000; ++i) {
try {
c.save({ "_key" : "test" + i, "a" : parseInt(i / 2) });
}
catch (err) {
}
}
state.checksum = collectionChecksum(cn);
state.count = collectionCount(cn);
assertEqual(500, state.count);
state.idx = c.getIndexes()[1];
},
function (state) {
assertEqual(state.checksum, collectionChecksum(cn));
assertEqual(state.count, collectionCount(cn));
var idx = db._collection(cn).getIndexes()[1];
assertEqual(state.idx.id, idx.id);
assertEqual("skiplist", state.idx.type);
assertTrue(state.idx.unique);
assertEqual([ "a" ], state.idx.fields);
}
);
}
};
}
// -----------------------------------------------------------------------------
// --SECTION-- main
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
jsunity.run(ReplicationSuite);
return jsunity.done();
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// @addtogroup\\|// --SECTION--\\|/// @page\\|/// @}\\)"
// End:

View File

@ -107,6 +107,51 @@ double JsonHelper::getNumberValue (TRI_json_t const* json,
return defaultValue;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a double sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
double JsonHelper::getDoubleValue (TRI_json_t const* json,
const char* name,
double defaultValue) {
TRI_json_t const* sub = getArrayElement(json, name);
if (isNumber(sub)) {
return sub->_value._number;
}
return defaultValue;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns an int sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
int JsonHelper::getIntValue (TRI_json_t const* json,
const char* name,
int defaultValue) {
TRI_json_t const* sub = getArrayElement(json, name);
if (isNumber(sub)) {
return (int) sub->_value._number;
}
return defaultValue;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a uint64 sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
uint64_t JsonHelper::getUInt64Value (TRI_json_t const* json,
const char* name,
uint64_t defaultValue) {
TRI_json_t const* sub = getArrayElement(json, name);
if (isNumber(sub)) {
return (uint64_t) sub->_value._number;
}
return defaultValue;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a boolean sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////

View File

@ -138,6 +138,30 @@ namespace triagens {
const char*,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a double sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
static double getDoubleValue (TRI_json_t const*,
const char*,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns an int sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
static int getIntValue (TRI_json_t const*,
const char*,
int);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a uint64 sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////
static uint64_t getUInt64Value (TRI_json_t const*,
const char*,
uint64_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a boolean sub-element, or a default it is does not exist
////////////////////////////////////////////////////////////////////////////////