1
0
Fork 0

fully removed replication logger

This commit is contained in:
Jan Steemann 2014-06-21 00:32:47 +02:00
parent c1b24339ae
commit 16bf152c8f
32 changed files with 275 additions and 2162 deletions

View File

@ -82,7 +82,6 @@ add_executable(
HashIndex/hash-index.cpp
IndexIterators/index-iterator.cpp
IndexOperators/index-operator.cpp
Replication/replication-static.cpp
Replication/ContinuousSyncer.cpp
Replication/InitialSyncer.cpp
Replication/Syncer.cpp
@ -125,7 +124,6 @@ add_executable(
VocBase/replication-applier.cpp
VocBase/replication-common.cpp
VocBase/replication-dump.cpp
VocBase/replication-logger.cpp
VocBase/replication-master.cpp
VocBase/server.cpp
VocBase/transaction.cpp

View File

@ -63,7 +63,6 @@ arangod_libarangod_a_SOURCES = \
arangod/HashIndex/hash-index.cpp \
arangod/IndexIterators/index-iterator.cpp \
arangod/IndexOperators/index-operator.cpp \
arangod/Replication/replication-static.cpp \
arangod/Replication/ContinuousSyncer.cpp \
arangod/Replication/InitialSyncer.cpp \
arangod/Replication/Syncer.cpp \
@ -106,7 +105,6 @@ arangod_libarangod_a_SOURCES = \
arangod/VocBase/replication-applier.cpp \
arangod/VocBase/replication-common.cpp \
arangod/VocBase/replication-dump.cpp \
arangod/VocBase/replication-logger.cpp \
arangod/VocBase/replication-master.cpp \
arangod/VocBase/server.cpp \
arangod/VocBase/transaction.cpp \

View File

@ -738,7 +738,7 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters,
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
}
else {
res = handleCollectionDump(trxCollection, masterName, _masterInfo._state._lastLogTick, errorMsg);
res = handleCollectionDump(trxCollection, masterName, _masterInfo._lastLogTick, errorMsg);
}
if (res == TRI_ERROR_NO_ERROR) {

View File

@ -116,7 +116,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
TRI_voc_tick_t getLastLogTick () const {
return _masterInfo._state._lastLogTick;
return _masterInfo._lastLogTick;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -709,8 +709,8 @@ int Syncer::handleStateResponse (TRI_json_t const* json,
_masterInfo._majorVersion = major;
_masterInfo._minorVersion = minor;
_masterInfo._serverId = masterId;
_masterInfo._state._lastLogTick = lastLogTick;
_masterInfo._state._active = running;
_masterInfo._lastLogTick = lastLogTick;
_masterInfo._active = running;
TRI_LogMasterInfoReplication(&_masterInfo, "connected to");

View File

@ -1,86 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication data fetcher
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "replication-static.h"
#include "Replication/ContinuousSyncer.h"
#include "VocBase/vocbase.h"
using namespace std;
using namespace triagens::arango;
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief static create method
////////////////////////////////////////////////////////////////////////////////
void* TRI_CreateContinuousSyncerReplication (TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration,
TRI_voc_tick_t initialTick,
bool useTick) {
ContinuousSyncer* s = new ContinuousSyncer(vocbase,
configuration,
initialTick,
useTick);
return (void*) s;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief static free method
////////////////////////////////////////////////////////////////////////////////
void TRI_DeleteContinuousSyncerReplication (void* ptr) {
ContinuousSyncer* s = static_cast<ContinuousSyncer*>(ptr);
delete s;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief static run method
////////////////////////////////////////////////////////////////////////////////
int TRI_RunContinuousSyncerReplication (void* ptr) {
ContinuousSyncer* s = static_cast<ContinuousSyncer*>(ptr);
return s->run();
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -1,78 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication data fetcher
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_REPLICATION_REPLICATION__STATIC_H
#define ARANGODB_REPLICATION_REPLICATION__STATIC_H 1
#include "Basics/Common.h"
#include "VocBase/voc-types.h"
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
struct TRI_replication_applier_configuration_s;
struct TRI_vocbase_s;
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief static create method
////////////////////////////////////////////////////////////////////////////////
void* TRI_CreateContinuousSyncerReplication (struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*,
TRI_voc_tick_t initialTick,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief static free method
////////////////////////////////////////////////////////////////////////////////
void TRI_DeleteContinuousSyncerReplication (void*);
////////////////////////////////////////////////////////////////////////////////
/// @brief static run method
////////////////////////////////////////////////////////////////////////////////
int TRI_RunContinuousSyncerReplication (void*);
#endif
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -36,14 +36,15 @@
#include "HttpServer/HttpServer.h"
#include "Replication/InitialSyncer.h"
#include "Rest/HttpRequest.h"
#include "Utils/CollectionGuard.h"
#include "Utils/transactions.h"
#include "VocBase/compactor.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-dump.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/update-policy.h"
#include "VocBase/index.h"
#include "Wal/LogfileManager.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/ClusterComm.h"
@ -394,7 +395,8 @@ void RestReplicationHandler::insertClient (TRI_voc_tick_t lastServedTick) {
TRI_server_id_t serverId = (TRI_server_id_t) StringUtils::uint64(value);
if (serverId > 0) {
TRI_UpdateClientReplicationLogger(_vocbase->_replicationLogger, serverId, lastServedTick);
// TODO: FIXME!!
// TRI_UpdateClientReplicationLogger(_vocbase->_replicationLogger, serverId, lastServedTick);
}
}
}
@ -469,15 +471,8 @@ uint64_t RestReplicationHandler::determineChunkSize () const {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandLoggerStart () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
int res = TRI_StartReplicationLogger(_vocbase->_replicationLogger);
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);
return;
}
// the logger in ArangoDB 2.2 is now the WAL...
// so the logger cannot be started but is always running
TRI_json_t result;
TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &result);
@ -534,19 +529,12 @@ void RestReplicationHandler::handleCommandLoggerStart () {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandLoggerStop () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
int res = TRI_StopReplicationLogger(_vocbase->_replicationLogger);
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);
return;
}
// the logger in ArangoDB 2.2 is now the WAL...
// so the logger cannot be stopped
TRI_json_t result;
TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &result);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, false));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, true));
generateResult(&result);
TRI_DestroyJson(TRI_CORE_MEM_ZONE, &result);
@ -643,17 +631,46 @@ void RestReplicationHandler::handleCommandLoggerStop () {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandLoggerState () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
TRI_json_t* json = TRI_JsonReplicationLogger(_vocbase->_replicationLogger);
if (json == 0) {
if (json == nullptr) {
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
// "state" part
TRI_json_t* state = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (state == nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
triagens::wal::LogfileManagerState const&& s = triagens::wal::LogfileManager::instance()->state();
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "running", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "lastLogTick", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, StringUtils::itoa(s.lastTick).c_str()));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "totalEvents", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, s.numEvents));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "time", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, s.timeString.c_str()));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "state", state);
// "server" part
TRI_json_t* server = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (server == nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, server, "version", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, TRI_VERSION));
char* serverIdString = TRI_StringUInt64(TRI_GetIdServer());
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, server, "serverId", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, serverIdString));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "server", server);
generateResult(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
////////////////////////////////////////////////////////////////////////////////
@ -704,23 +721,20 @@ void RestReplicationHandler::handleCommandLoggerState () {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandLoggerGetConfig () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
TRI_replication_logger_configuration_t config;
TRI_ReadLockReadWriteLock(&_vocbase->_replicationLogger->_statusLock);
TRI_CopyConfigurationReplicationLogger(&_vocbase->_replicationLogger->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&_vocbase->_replicationLogger->_statusLock);
TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config);
if (json == 0) {
if (json == nullptr) {
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "autoStart", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "logRemoteChanges", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "maxEvents", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, 0));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "maxEventsSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, 0));
generateResult(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
////////////////////////////////////////////////////////////////////////////////
@ -809,58 +823,6 @@ void RestReplicationHandler::handleCommandLoggerGetConfig () {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandLoggerSetConfig () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
TRI_replication_logger_configuration_t config;
// copy previous config
TRI_ReadLockReadWriteLock(&_vocbase->_replicationLogger->_statusLock);
TRI_CopyConfigurationReplicationLogger(&_vocbase->_replicationLogger->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&_vocbase->_replicationLogger->_statusLock);
TRI_json_t* json = parseJsonBody();
if (json == 0) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER);
return;
}
TRI_json_t const* value;
value = JsonHelper::getArrayElement(json, "autoStart");
if (JsonHelper::isBoolean(value)) {
config._autoStart = value->_value._boolean;
}
value = JsonHelper::getArrayElement(json, "logRemoteChanges");
if (JsonHelper::isBoolean(value)) {
config._logRemoteChanges = value->_value._boolean;
}
value = JsonHelper::getArrayElement(json, "maxEvents");
if (JsonHelper::isNumber(value)) {
config._maxEvents = (uint64_t) value->_value._number;
}
value = JsonHelper::getArrayElement(json, "maxEventsSize");
if (JsonHelper::isNumber(value)) {
config._maxEventsSize = (uint64_t) value->_value._number;
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
int res = TRI_ConfigureReplicationLogger(_vocbase->_replicationLogger, &config);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION) {
generateError(HttpResponse::BAD, res);
}
else {
generateError(HttpResponse::SERVER_ERROR, res);
}
return;
}
handleCommandLoggerGetConfig();
}
@ -1378,14 +1340,10 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
int res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize);
TRI_replication_logger_state_t state;
triagens::wal::LogfileManagerState state = triagens::wal::LogfileManager::instance()->state();
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state);
}
if (res == TRI_ERROR_NO_ERROR) {
const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state._lastLogTick);
const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state.lastTick);
// generate the result
const size_t length = TRI_LengthStringBuffer(dump._buffer);
@ -1410,11 +1368,11 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
_response->setHeader(TRI_REPLICATION_HEADER_LASTTICK,
strlen(TRI_REPLICATION_HEADER_LASTTICK),
StringUtils::itoa(state._lastLogTick));
StringUtils::itoa(state.lastTick));
_response->setHeader(TRI_REPLICATION_HEADER_ACTIVE,
strlen(TRI_REPLICATION_HEADER_ACTIVE),
state._active ? "true" : "false");
strlen(TRI_REPLICATION_HEADER_ACTIVE),
"true");
if (length > 0) {
// transfer ownership of the buffer contents
@ -1559,8 +1517,6 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandInventory () {
TRI_ASSERT(_vocbase->_replicationLogger != 0);
TRI_voc_tick_t tick = TRI_CurrentTickServer();
// include system collections?
@ -1583,7 +1539,7 @@ void RestReplicationHandler::handleCommandInventory () {
TRI_ASSERT(JsonHelper::isList(collections));
// sort collections by type, then name
const size_t n = collections->_value._objects._length;
size_t const n = collections->_value._objects._length;
if (n > 1) {
// sort by collection type (vertices before edges), then name
@ -1591,26 +1547,32 @@ void RestReplicationHandler::handleCommandInventory () {
}
TRI_replication_logger_state_t state;
int res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state);
if (res != TRI_ERROR_NO_ERROR) {
TRI_Free(TRI_CORE_MEM_ZONE, collections);
generateError(HttpResponse::SERVER_ERROR, res);
return;
}
TRI_json_t json;
TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &json);
char* tickString = TRI_StringUInt64(tick);
// add collections data
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "collections", collections);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "state", TRI_JsonStateReplicationLogger(&state));
// "state"
TRI_json_t* state = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
if (state == nullptr) {
TRI_DestroyJson(TRI_CORE_MEM_ZONE, &json);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
triagens::wal::LogfileManagerState const&& s = triagens::wal::LogfileManager::instance()->state();
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, true));
char* logTickString = TRI_StringUInt64(s.lastTick);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "lastLogTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, logTickString));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "totalEvents", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, s.numEvents));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, s.timeString.c_str()));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "state", state);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "tick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, tickString));
generateResult(&json);
@ -3158,7 +3120,7 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator () {
void RestReplicationHandler::handleCommandDump () {
char const* collection = _request->value("collection");
if (collection == 0) {
if (collection == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection parameter");
@ -3167,13 +3129,22 @@ void RestReplicationHandler::handleCommandDump () {
// determine start tick for dump
TRI_voc_tick_t tickStart = 0;
TRI_voc_tick_t tickEnd = (TRI_voc_tick_t) UINT64_MAX;
TRI_voc_tick_t tickEnd = static_cast<TRI_voc_tick_t>(UINT64_MAX);
bool flush = true; // flush WAL before dumping?
bool withTicks = true;
bool translateCollectionIds = true;
bool found;
char const* value;
// determine flush WAL value
value = _request->value("flush", found);
if (found) {
flush = StringUtils::boolean(value);
}
// determine start tick for dump
value = _request->value("from", found);
if (found) {
@ -3195,54 +3166,60 @@ void RestReplicationHandler::handleCommandDump () {
}
value = _request->value("ticks", found);
if (found) {
withTicks = StringUtils::boolean(value);
}
value = _request->value("translateIds", found);
if (found) {
translateCollectionIds = StringUtils::boolean(value);
}
const uint64_t chunkSize = determineChunkSize();
uint64_t const chunkSize = determineChunkSize();
TRI_vocbase_col_t* c = TRI_LookupCollectionByNameVocBase(_vocbase, collection);
if (c == 0) {
if (c == nullptr) {
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
return;
}
const TRI_voc_cid_t cid = c->_cid;
LOG_TRACE("requested collection dump for collection '%s', tickStart: %llu, tickEnd: %llu",
collection,
(unsigned long long) tickStart,
(unsigned long long) tickEnd);
LOG_DEBUG("requested collection dump for collection '%s', tickStart: %llu, tickEnd: %llu",
collection,
(unsigned long long) tickStart,
(unsigned long long) tickEnd);
int res = TRI_ERROR_NO_ERROR;
TRI_vocbase_col_status_e status;
TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid, status);
try {
if (flush) {
triagens::wal::LogfileManager::instance()->flush(true, true, false);
}
if (col == 0) {
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
return;
}
triagens::arango::CollectionGuard guard(_vocbase, c->_cid, false);
// initialise the dump container
TRI_replication_dump_t dump;
if (TRI_InitDumpReplication(&dump, _vocbase, (size_t) defaultChunkSize) != TRI_ERROR_NO_ERROR) {
TRI_ReleaseCollectionVocBase(_vocbase, col);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return;
}
TRI_vocbase_col_t* col = guard.collection();
TRI_ASSERT(col != nullptr);
// initialise the dump container
TRI_replication_dump_t dump;
res = TRI_InitDumpReplication(&dump, _vocbase, (size_t) defaultChunkSize);
int res = TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, chunkSize, withTicks, translateCollectionIds);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
res = TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, chunkSize, withTicks, translateCollectionIds);
TRI_ReleaseCollectionVocBase(_vocbase, col);
if (res != TRI_ERROR_NO_ERROR) {
TRI_DestroyDumpReplication(&dump);
THROW_ARANGO_EXCEPTION(res);
}
if (res == TRI_ERROR_NO_ERROR) {
// generate the result
const size_t length = TRI_LengthStringBuffer(dump._buffer);
size_t const length = TRI_LengthStringBuffer(dump._buffer);
if (length == 0) {
_response = createResponse(HttpResponse::NO_CONTENT);
@ -3267,12 +3244,19 @@ void RestReplicationHandler::handleCommandDump () {
// avoid double freeing
TRI_StealStringBuffer(dump._buffer);
TRI_DestroyDumpReplication(&dump);
}
else {
catch (triagens::arango::Exception const& ex) {
res = ex.code();
}
catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);
}
TRI_DestroyDumpReplication(&dump);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -285,7 +285,6 @@ ArangoServer::ArangoServer (int argc, char** argv)
_defaultWaitForSync(false),
_forceSyncProperties(true),
_unusedForceSyncShapes(false),
_disableReplicationLogger(false),
_disableReplicationApplier(false),
_removeOnDrop(true),
_server(nullptr) {
@ -417,6 +416,7 @@ void ArangoServer::buildApplicationServer () {
("ruby.action-directory", &ignoreOpt, "path to the Ruby action directory")
("ruby.modules-path", &ignoreOpt, "one or more directories separated by (semi-) colons")
("ruby.startup-directory", &ignoreOpt, "path to the directory containing alternate Ruby startup scripts")
("server.disable-replication-logger", &ignoreOpt, "start with replication logger turned off")
;
// .............................................................................
@ -539,7 +539,6 @@ void ArangoServer::buildApplicationServer () {
#ifdef TRI_HAVE_LINUX_SOCKETS
("server.disable-authentication-unix-sockets", &_disableAuthenticationUnixSockets, "disable authentication for requests via UNIX domain sockets")
#endif
("server.disable-replication-logger", &_disableReplicationLogger, "start with replication logger turned off")
("server.disable-replication-applier", &_disableReplicationApplier, "start with replication applier turned off")
("server.allow-use-database", &allowUseDatabaseInRESTActions, "allow change of database in REST actions, only needed for unittests")
;
@ -1114,7 +1113,6 @@ void ArangoServer::openDatabases (bool checkVersion,
_applicationV8->appPath().c_str(),
_applicationV8->devAppPath().c_str(),
&defaults,
_disableReplicationLogger,
_disableReplicationApplier,
iterateMarkersOnOpen);

View File

@ -404,26 +404,6 @@ namespace triagens {
bool _unusedForceSyncShapes;
////////////////////////////////////////////////////////////////////////////////
/// @brief disable the replication logger on server startup
///
/// @CMDOPT{\--server.disable-replication-logger @CA{flag}}
///
/// If @LIT{true} the server will start with the replication logger turned off,
/// even if the replication logger is configured with the `autoStart` option.
/// Using this option will not change the value of the `autoStart` option in
/// the logger configuration, but will suppress auto-starting the replication
/// logger just once.
///
/// If the option is not used, ArangoDB will read the logger configuration from
/// the file `REPLICATION-LOGGER-CONFIG` on startup, and use the value of the
/// `autoStart` attribute from this file.
///
/// The default is @LIT{false}.
////////////////////////////////////////////////////////////////////////////////
bool _disableReplicationLogger;
////////////////////////////////////////////////////////////////////////////////
/// @brief disable the replication applier on server startup
///

View File

@ -64,7 +64,6 @@
#include "VocBase/general-cursor.h"
#include "VocBase/key-generator.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/voc-shaper.h"
#include "VocBase/index.h"
@ -4564,66 +4563,6 @@ static v8::Handle<v8::Value> JS_DeleteCursor (v8::Arguments const& argv) {
// --SECTION-- REPLICATION
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication logger manually
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_StartLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_START()");
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (vocbase->_replicationLogger == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
int res = TRI_StartReplicationLogger(vocbase->_replicationLogger);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot start replication logger");
}
return scope.Close(v8::True());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication logger manually
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_StopLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_STOP()");
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (vocbase->_replicationLogger == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
int res = TRI_StopReplicationLogger(vocbase->_replicationLogger);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot stop replication logger");
}
return scope.Close(v8::True());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the state of the replication logger
////////////////////////////////////////////////////////////////////////////////
@ -4631,124 +4570,44 @@ static v8::Handle<v8::Value> JS_StopLoggerReplication (v8::Arguments const& argv
static v8::Handle<v8::Value> JS_StateLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_STATE()");
}
triagens::wal::LogfileManagerState s = triagens::wal::LogfileManager::instance()->state();
TRI_vocbase_t* vocbase = GetContextVocBase();
v8::Handle<v8::Object> result = v8::Object::New();
if (vocbase == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
v8::Handle<v8::Object> state = v8::Object::New();
state->Set(TRI_V8_STRING("running"), v8::True());
state->Set(TRI_V8_STRING("lastLogTick"), V8TickId(s.lastTick));
state->Set(TRI_V8_STRING("totalEvents"), v8::Number::New(s.numEvents));
state->Set(TRI_V8_STRING("time"), v8::String::New(s.timeString.c_str(), s.timeString.size()));
result->Set(TRI_V8_STRING("state"), state);
if (vocbase->_replicationLogger == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
TRI_json_t* json = TRI_JsonReplicationLogger(vocbase->_replicationLogger);
if (json == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_OUT_OF_MEMORY);
}
v8::Handle<v8::Value> result = TRI_ObjectJson(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
v8::Handle<v8::Object> server = v8::Object::New();
server->Set(TRI_V8_STRING("version"), v8::String::New(TRI_VERSION));
server->Set(TRI_V8_STRING("serverId"), v8::String::New(StringUtils::itoa(TRI_GetIdServer()).c_str()));
result->Set(TRI_V8_STRING("server"), server);
v8::Handle<v8::Object> clients = v8::Object::New();
result->Set(TRI_V8_STRING("clients"), clients);
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication logger manually
/// @brief return the configuration of the replication logger
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_ConfigureLoggerReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
TRI_vocbase_t* vocbase = GetContextVocBase();
// the replication logger is actually non-existing in ArangoDB 2.2 and higher
// as there is the WAL. To be downwards-compatible, we'll return dummy values
v8::Handle<v8::Object> result = v8::Object::New();
result->Set(TRI_V8_STRING("autostart"), v8::True());
result->Set(TRI_V8_STRING("logRemoteChanges"), v8::True());
result->Set(TRI_V8_STRING("maxEvents"), v8::Number::New(0));
result->Set(TRI_V8_STRING("maxEventsSize"), v8::Number::New(0));
if (vocbase == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (vocbase->_replicationLogger == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
if (argv.Length() == 0) {
// no argument: return the current configuration
TRI_replication_logger_configuration_t config;
TRI_ReadLockReadWriteLock(&vocbase->_replicationLogger->_statusLock);
TRI_CopyConfigurationReplicationLogger(&vocbase->_replicationLogger->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&vocbase->_replicationLogger->_statusLock);
TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config);
if (json == 0) {
TRI_V8_EXCEPTION_MEMORY(scope);
}
v8::Handle<v8::Value> result = TRI_ObjectJson(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
return scope.Close(result);
}
else {
// set the configuration
if (argv.Length() != 1 || ! argv[0]->IsObject()) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_CONFIGURE(<configuration>)");
}
TRI_replication_logger_configuration_t config;
// fill with previous configuration
TRI_ReadLockReadWriteLock(&vocbase->_replicationLogger->_statusLock);
TRI_CopyConfigurationReplicationLogger(&vocbase->_replicationLogger->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&vocbase->_replicationLogger->_statusLock);
// treat the argument as an object from now on
v8::Handle<v8::Object> object = v8::Handle<v8::Object>::Cast(argv[0]);
if (object->Has(TRI_V8_SYMBOL("autoStart"))) {
if (object->Get(TRI_V8_SYMBOL("autoStart"))->IsBoolean()) {
config._autoStart = TRI_ObjectToBoolean(object->Get(TRI_V8_SYMBOL("autoStart")));
}
}
if (object->Has(TRI_V8_SYMBOL("logRemoteChanges"))) {
if (object->Get(TRI_V8_SYMBOL("logRemoteChanges"))->IsBoolean()) {
config._logRemoteChanges = TRI_ObjectToBoolean(object->Get(TRI_V8_SYMBOL("logRemoteChanges")));
}
}
if (object->Has(TRI_V8_SYMBOL("maxEvents"))) {
config._maxEvents = TRI_ObjectToUInt64(object->Get(TRI_V8_SYMBOL("maxEvents")), true);
}
if (object->Has(TRI_V8_SYMBOL("maxEventsSize"))) {
config._maxEventsSize = TRI_ObjectToUInt64(object->Get(TRI_V8_SYMBOL("maxEventsSize")), true);
}
int res = TRI_ConfigureReplicationLogger(vocbase->_replicationLogger, &config);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config);
if (json == 0) {
TRI_V8_EXCEPTION_MEMORY(scope);
}
v8::Handle<v8::Value> result = TRI_ObjectJson(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
return scope.Close(result);
}
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
@ -5159,7 +5018,7 @@ static v8::Handle<v8::Value> JS_StateApplierReplication (v8::Arguments const& ar
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (vocbase->_replicationLogger == 0) {
if (vocbase->_replicationApplier == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
@ -10158,8 +10017,6 @@ void TRI_InitV8VocBridge (v8::Handle<v8::Context> context,
TRI_AddGlobalFunctionVocbase(context, "DELETE_CURSOR", JS_DeleteCursor, true);
// replication functions. not intended to be used by end users
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_START", JS_StartLoggerReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STOP", JS_StopLoggerReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STATE", JS_StateLoggerReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_CONFIGURE", JS_ConfigureLoggerReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_SYNCHRONISE", JS_SynchroniseReplication, true);

View File

@ -47,7 +47,6 @@
#include "VocBase/index.h"
#include "VocBase/key-generator.h"
#include "VocBase/primary-index.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/update-policy.h"
#include "VocBase/voc-shaper.h"

View File

@ -51,7 +51,6 @@
#include "Utils/Exception.h"
#include "VocBase/document-collection.h"
#include "VocBase/edge-collection.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/voc-shaper.h"
#include "Wal/LogfileManager.h"

View File

@ -42,6 +42,7 @@
#include "VocBase/server.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include "Replication/ContinuousSyncer.h"
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION APPLIER
@ -422,8 +423,9 @@ static int SetError (TRI_replication_applier_t* applier,
////////////////////////////////////////////////////////////////////////////////
void ApplyThread (void* data) {
TRI_RunContinuousSyncerReplication(data);
TRI_DeleteContinuousSyncerReplication(data);
triagens::arango::ContinuousSyncer* s = static_cast<triagens::arango::ContinuousSyncer*>(data);
s->run();
delete s;
}
////////////////////////////////////////////////////////////////////////////////
@ -451,10 +453,10 @@ static int StartApplier (TRI_replication_applier_t* applier,
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured");
}
fetcher = (void*) TRI_CreateContinuousSyncerReplication(applier->_vocbase,
&applier->_configuration,
initialTick,
useTick);
fetcher = (void*) new triagens::arango::ContinuousSyncer(applier->_vocbase,
&applier->_configuration,
initialTick,
useTick);
if (fetcher == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
@ -478,7 +480,8 @@ static int StartApplier (TRI_replication_applier_t* applier,
TRI_InitThread(&applier->_thread);
if (! TRI_StartThread(&applier->_thread, NULL, "[applier]", ApplyThread, fetcher)) {
TRI_DeleteContinuousSyncerReplication(fetcher);
triagens::arango::ContinuousSyncer* s = static_cast<triagens::arango::ContinuousSyncer*>(fetcher);
delete s;
return TRI_ERROR_INTERNAL;
}

View File

@ -33,7 +33,6 @@
#include "Basics/Common.h"
#include "BasicsC/locks.h"
#include "BasicsC/threads.h"
#include "Replication/replication-static.h"
#include "VocBase/replication-common.h"
#include "VocBase/voc-types.h"

File diff suppressed because it is too large Load Diff

View File

@ -1,213 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief replication logger
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_VOC_BASE_REPLICATION__LOGGER_H
#define ARANGODB_VOC_BASE_REPLICATION__LOGGER_H 1
#include "Basics/Common.h"
#include "BasicsC/associative.h"
#include "BasicsC/locks.h"
#include "BasicsC/vector.h"
#include "VocBase/replication-common.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
struct TRI_df_marker_s;
struct TRI_document_collection_t;
struct TRI_doc_mptr_t;
struct TRI_index_s;
struct TRI_json_s;
struct TRI_transaction_s;
struct TRI_transaction_collection_s;
struct TRI_vocbase_s;
struct TRI_vocbase_col_s;
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION LOGGER
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- public types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief logger configuration
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_logger_configuration_s {
uint64_t _maxEvents;
uint64_t _maxEventsSize;
bool _logRemoteChanges;
bool _autoStart;
}
TRI_replication_logger_configuration_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief state information about replication logging
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_logger_state_s {
TRI_voc_tick_t _lastLogTick;
uint64_t _totalEvents;
bool _active;
}
TRI_replication_logger_state_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief context information for replication logging
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_logger_s {
TRI_read_write_lock_t _statusLock;
TRI_spin_t _idLock;
TRI_spin_t _bufferLock;
TRI_vector_pointer_t _buffers;
TRI_read_write_lock_t _clientsLock;
TRI_associative_pointer_t _clients;
struct TRI_vocbase_s* _vocbase;
struct TRI_transaction_s* _trx;
struct TRI_transaction_collection_s* _trxCollection;
struct TRI_index_s* _cap; // cap constraint
TRI_replication_logger_state_t _state;
TRI_replication_logger_configuration_t _configuration;
TRI_server_id_t _localServerId;
char* _databaseName;
}
TRI_replication_logger_t;
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief create a replication logger
////////////////////////////////////////////////////////////////////////////////
TRI_replication_logger_t* TRI_CreateReplicationLogger (struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a replication logger
////////////////////////////////////////////////////////////////////////////////
void TRI_DestroyReplicationLogger (TRI_replication_logger_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief free a replication logger
////////////////////////////////////////////////////////////////////////////////
void TRI_FreeReplicationLogger (TRI_replication_logger_t*);
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief get a JSON representation of the replication logger configuration
////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_JsonConfigurationReplicationLogger (TRI_replication_logger_configuration_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication logger
////////////////////////////////////////////////////////////////////////////////
int TRI_ConfigureReplicationLogger (TRI_replication_logger_t*,
TRI_replication_logger_configuration_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief copy a logger configuration
////////////////////////////////////////////////////////////////////////////////
void TRI_CopyConfigurationReplicationLogger (TRI_replication_logger_configuration_t const*,
TRI_replication_logger_configuration_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the list of clients as a JSON array
////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_JsonClientsReplicationLogger (TRI_replication_logger_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief insert an applier action into an action list
////////////////////////////////////////////////////////////////////////////////
void TRI_UpdateClientReplicationLogger (TRI_replication_logger_t*,
TRI_server_id_t,
TRI_voc_tick_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication logger
////////////////////////////////////////////////////////////////////////////////
int TRI_StartReplicationLogger (TRI_replication_logger_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication logger
////////////////////////////////////////////////////////////////////////////////
int TRI_StopReplicationLogger (TRI_replication_logger_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief get the current replication state
////////////////////////////////////////////////////////////////////////////////
int TRI_StateReplicationLogger (TRI_replication_logger_t*,
TRI_replication_logger_state_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a JSON representation of a logger state
////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_JsonStateReplicationLogger (TRI_replication_logger_state_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief return a JSON representation of the replication logger
////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_JsonReplicationLogger (TRI_replication_logger_t*);
#endif
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -52,8 +52,8 @@ void TRI_InitMasterInfoReplication (TRI_replication_master_info_t* info,
info->_serverId = 0;
info->_majorVersion = 0;
info->_minorVersion = 0;
info->_state._lastLogTick = 0;
info->_state._active = false;
info->_lastLogTick = 0;
info->_active = false;
}
////////////////////////////////////////////////////////////////////////////////
@ -81,7 +81,7 @@ void TRI_LogMasterInfoReplication (TRI_replication_master_info_t const* info,
(unsigned long long) info->_serverId,
info->_majorVersion,
info->_minorVersion,
(unsigned long long) info->_state._lastLogTick);
(unsigned long long) info->_lastLogTick);
}
// -----------------------------------------------------------------------------

View File

@ -33,7 +33,6 @@
#include "Basics/Common.h"
#include "VocBase/replication-common.h"
#include "VocBase/replication-logger.h"
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION MASTER INFO
@ -52,7 +51,8 @@ typedef struct TRI_replication_master_info_s {
TRI_server_id_t _serverId;
int _majorVersion;
int _minorVersion;
TRI_replication_logger_state_t _state;
TRI_voc_tick_t _lastLogTick;
bool _active;
}
TRI_replication_master_info_t;

View File

@ -49,7 +49,6 @@
#include "Utils/Exception.h"
#include "VocBase/auth.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-logger.h"
#include "VocBase/vocbase.h"
#include "Wal/LogfileManager.h"
#include "Wal/Marker.h"
@ -1651,7 +1650,6 @@ int TRI_InitServer (TRI_server_t* server,
char const* appPath,
char const* devAppPath,
TRI_vocbase_defaults_t const* defaults,
bool disableLoggers,
bool disableAppliers,
bool iterateMarkersOnOpen) {
@ -1755,7 +1753,6 @@ int TRI_InitServer (TRI_server_t* server,
TRI_InitMutex(&server->_createLock);
server->_disableReplicationLoggers = disableLoggers;
server->_disableReplicationAppliers = disableAppliers;
server->_initialised = true;
@ -2125,11 +2122,9 @@ int TRI_CreateCoordinatorDatabaseServer (TRI_server_t* server,
TRI_ASSERT(vocbase != nullptr);
vocbase->_replicationLogger = TRI_CreateReplicationLogger(vocbase);
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
if (vocbase->_replicationLogger == nullptr ||
vocbase->_replicationApplier == nullptr) {
if (vocbase->_replicationApplier == nullptr) {
TRI_DestroyInitialVocBase(vocbase);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, vocbase);

View File

@ -69,7 +69,6 @@ typedef struct TRI_server_s {
char* _appPath;
char* _devAppPath;
bool _disableReplicationLoggers;
bool _disableReplicationAppliers;
bool _iterateMarkersOnOpen;
bool _hasCreatedSystemDatabase;
@ -104,7 +103,6 @@ int TRI_InitServer (TRI_server_t* server,
char const* appPath,
char const* devappPath,
TRI_vocbase_defaults_t const*,
bool disableReplicationLogger,
bool disableReplicationApplier,
bool iterateMarkersOnOpen);

View File

@ -36,7 +36,6 @@
#include "Utils/Exception.h"
#include "VocBase/collection.h"
#include "VocBase/document-collection.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/vocbase.h"
#include "Wal/DocumentOperation.h"

View File

@ -54,7 +54,6 @@
#include "VocBase/document-collection.h"
#include "VocBase/general-cursor.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-logger.h"
#include "VocBase/server.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase-defaults.h"
@ -1355,7 +1354,6 @@ TRI_vocbase_t* TRI_CreateInitialVocBase (TRI_vocbase_type_e type,
vocbase->_name = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, name);
vocbase->_authInfoLoaded = false;
vocbase->_hasCompactor = false;
vocbase->_replicationLogger = nullptr;
vocbase->_replicationApplier = nullptr;
vocbase->_oldTransactions = nullptr;
@ -1432,11 +1430,6 @@ void TRI_DestroyInitialVocBase (TRI_vocbase_t* vocbase) {
vocbase->_replicationApplier = nullptr;
}
if (vocbase->_replicationLogger != nullptr) {
TRI_FreeReplicationLogger(vocbase->_replicationLogger);
vocbase->_replicationLogger = nullptr;
}
if (vocbase->_oldTransactions == nullptr) {
delete vocbase->_oldTransactions;
}
@ -1526,27 +1519,6 @@ TRI_vocbase_t* TRI_OpenVocBase (TRI_server_t* server,
TRI_InitThread(&vocbase->_cleanup);
TRI_StartThread(&vocbase->_cleanup, NULL, "[cleanup]", TRI_CleanupVocBase, vocbase);
vocbase->_replicationLogger = TRI_CreateReplicationLogger(vocbase);
if (vocbase->_replicationLogger == NULL) {
// TODO
LOG_FATAL_AND_EXIT("initialising replication logger for database '%s' failed", name);
}
if (vocbase->_replicationLogger->_configuration._autoStart) {
if (server->_disableReplicationLoggers) {
LOG_INFO("replication logger explicitly deactivated for database '%s'", name);
}
else {
res = TRI_StartReplicationLogger(vocbase->_replicationLogger);
if (res != TRI_ERROR_NO_ERROR) {
// TODO
LOG_FATAL_AND_EXIT("unable to start replication logger for database '%s'", name);
}
}
}
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
if (vocbase->_replicationApplier == NULL) {
@ -1584,7 +1556,6 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
// stop replication
TRI_StopReplicationApplier(vocbase->_replicationApplier, false);
TRI_StopReplicationLogger(vocbase->_replicationLogger);
TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE);

View File

@ -310,7 +310,6 @@ typedef struct TRI_vocbase_s {
std::set<TRI_voc_tid_t>* _oldTransactions;
struct TRI_replication_logger_s* _replicationLogger;
struct TRI_replication_applier_s* _replicationApplier;
// state of the database

View File

@ -1210,6 +1210,20 @@ void LogfileManager::setCollectionDone (Logfile* logfile) {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the current state
////////////////////////////////////////////////////////////////////////////////
LogfileManagerState LogfileManager::state () {
LogfileManagerState state;
// now fill the state
_slots->statistics(state.lastTick, state.numEvents);
state.timeString = getTimeString();
return state;
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------

View File

@ -59,13 +59,23 @@ namespace triagens {
/// @brief state that is built up when scanning a WAL logfile during recovery
////////////////////////////////////////////////////////////////////////////////
struct RecoverState {
std::unordered_map<TRI_voc_cid_t, TRI_voc_tick_t> collections;
std::unordered_set<TRI_voc_tid_t> failedTransactions;
std::unordered_set<TRI_voc_cid_t> droppedCollections;
std::unordered_set<TRI_voc_tick_t> droppedDatabases;
TRI_voc_tick_t lastTick;
};
struct RecoverState {
std::unordered_map<TRI_voc_cid_t, TRI_voc_tick_t> collections;
std::unordered_set<TRI_voc_tid_t> failedTransactions;
std::unordered_set<TRI_voc_cid_t> droppedCollections;
std::unordered_set<TRI_voc_tick_t> droppedDatabases;
TRI_voc_tick_t lastTick;
};
// -----------------------------------------------------------------------------
// --SECTION-- LogfileManagerState
// -----------------------------------------------------------------------------
struct LogfileManagerState {
TRI_voc_tick_t lastTick;
uint64_t numEvents;
std::string timeString;
};
// -----------------------------------------------------------------------------
// --SECTION-- class LogfileManager
@ -217,6 +227,22 @@ struct RecoverState {
return _slots;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief allow or disallow writes to the WAL
////////////////////////////////////////////////////////////////////////////////
inline void allowWrites (bool value) {
_allowWrites = value;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are in the recovery mode
////////////////////////////////////////////////////////////////////////////////
inline bool isInRecovery () const {
return _inRecovery;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief registers a transaction
////////////////////////////////////////////////////////////////////////////////
@ -426,20 +452,10 @@ struct RecoverState {
void setCollectionDone (Logfile*);
////////////////////////////////////////////////////////////////////////////////
/// @brief allow or disallow writes to the WAL
/// @brief return the current state
////////////////////////////////////////////////////////////////////////////////
inline void allowWrites (bool value) {
_allowWrites = value;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are in the recovery mode
////////////////////////////////////////////////////////////////////////////////
inline bool isInRecovery () const {
return _inRecovery;
}
LogfileManagerState state ();
// -----------------------------------------------------------------------------
// --SECTION-- private methods

View File

@ -59,7 +59,9 @@ Slots::Slots (LogfileManager* logfileManager,
_handoutIndex(0),
_recycleIndex(0),
_logfile(nullptr),
_lastCommittedTick(0) {
_lastAssignedTick(0),
_lastCommittedTick(0),
_numEvents(0) {
}
////////////////////////////////////////////////////////////////////////////////
@ -76,6 +78,17 @@ Slots::~Slots () {
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief get the statistics of the slots
////////////////////////////////////////////////////////////////////////////////
void Slots::statistics (Slot::TickType& lastTick,
uint64_t& numEvents) {
MUTEX_LOCKER(_lock);
lastTick = _lastCommittedTick;
numEvents = _numEvents;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief execute a flush operation
////////////////////////////////////////////////////////////////////////////////
@ -236,6 +249,7 @@ void Slots::returnUsed (SlotInfo& slotInfo,
{
MUTEX_LOCKER(_lock);
slotInfo.slot->setReturned(waitForSync);
++_numEvents;
}
_logfileManager->signalSync();
@ -510,8 +524,9 @@ Slot::TickType Slots::handout () {
// wrap around
_handoutIndex = 0;
}
return static_cast<Slot::TickType>(TRI_NewTickServer());
_lastAssignedTick = static_cast<Slot::TickType>(TRI_NewTickServer());
return _lastAssignedTick;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -136,6 +136,13 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief get the statistics of the slots
////////////////////////////////////////////////////////////////////////////////
void statistics (Slot::TickType&,
uint64_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief execute a flush operation
////////////////////////////////////////////////////////////////////////////////
@ -281,12 +288,24 @@ namespace triagens {
Logfile* _logfile;
////////////////////////////////////////////////////////////////////////////////
/// @brief last assigned tick value
////////////////////////////////////////////////////////////////////////////////
Slot::TickType _lastAssignedTick;
////////////////////////////////////////////////////////////////////////////////
/// @brief last committed tick value
////////////////////////////////////////////////////////////////////////////////
Slot::TickType _lastCommittedTick;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of log events handled
////////////////////////////////////////////////////////////////////////////////
uint64_t _numEvents;
};
}

View File

@ -529,7 +529,7 @@ static int DumpCollection (int fd,
const string baseUrl = "/_api/replication/dump?collection=" + cid +
"&chunkSize=" + StringUtils::itoa(ChunkSize) +
"&ticks=false&translateIds=true";
"&ticks=false&translateIds=true&flush=false";
map<string, string> headers;

View File

@ -1,8 +1,7 @@
/*jslint indent: 2, nomen: true, maxlen: 120, vars: true, white: true, plusplus: true, nonpropdel: true, proto: true */
/*jslint sloppy: true, regexp: true */
/*global require, module, Module, ArangoError, SleepAndRequeue,
REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE,
REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_SYNCHRONISE, REPLICATION_SERVER_ID, CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS,
SYS_BASE64DECODE, SYS_BASE64ENCODE, SYS_DEBUG_SEGFAULT,
@ -264,24 +263,6 @@
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief startReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_START !== "undefined") {
exports.startReplicationLogger = REPLICATION_LOGGER_START;
delete REPLICATION_LOGGER_START;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stopReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_STOP !== "undefined") {
exports.stopReplicationLogger = REPLICATION_LOGGER_STOP;
delete REPLICATION_LOGGER_STOP;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationLogger
////////////////////////////////////////////////////////////////////////////////

View File

@ -1,8 +1,7 @@
/*jslint indent: 2, nomen: true, maxlen: 120, vars: true, white: true, plusplus: true, nonpropdel: true, proto: true */
/*jslint sloppy: true, regexp: true */
/*global require, module, Module, ArangoError, SleepAndRequeue,
REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE,
REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_SYNCHRONISE, REPLICATION_SERVER_ID, CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS,
SYS_BASE64DECODE, SYS_BASE64ENCODE, SYS_DEBUG_SEGFAULT,
@ -264,24 +263,6 @@
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief startReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_START !== "undefined") {
exports.startReplicationLogger = REPLICATION_LOGGER_START;
delete REPLICATION_LOGGER_START;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stopReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_STOP !== "undefined") {
exports.stopReplicationLogger = REPLICATION_LOGGER_STOP;
delete REPLICATION_LOGGER_STOP;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationLogger
////////////////////////////////////////////////////////////////////////////////

View File

@ -38,32 +38,33 @@ var internal = require("internal");
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
var logger = { };
var applier = { };
////////////////////////////////////////////////////////////////////////////////
/// @brief starts the replication logger
/// @brief starts the replication logger - this is a no-op in ArangoDB 2.2 and
/// higher
////////////////////////////////////////////////////////////////////////////////
logger.start = function () {
'use strict';
return internal.startReplicationLogger();
// the logger in ArangoDB 2.2 is now the WAL...
// so the logger cannot be started but is always running
return true;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the replication logger
/// @brief stops the replication logger - this is a no-op in ArangoDB 2.2 and
/// higher
////////////////////////////////////////////////////////////////////////////////
logger.stop = function () {
'use strict';
return internal.stopReplicationLogger();
// the logger in ArangoDB 2.2 is now the WAL...
// so the logger cannot be stopped
return false;
};
////////////////////////////////////////////////////////////////////////////////
@ -80,14 +81,10 @@ logger.state = function () {
/// @brief returns the configuration of the replication logger
////////////////////////////////////////////////////////////////////////////////
logger.properties = function (config) {
logger.properties = function () {
'use strict';
if (config === undefined) {
return internal.configureReplicationLogger();
}
return internal.configureReplicationLogger(config);
return internal.configureReplicationLogger();
};
////////////////////////////////////////////////////////////////////////////////
@ -148,19 +145,10 @@ applier.properties = function (config) {
return internal.configureReplicationApplier(config);
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- other functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief performs a one-time synchronisation with a remote endpoint
////////////////////////////////////////////////////////////////////////////////
@ -181,28 +169,15 @@ var serverId = function () {
return internal.serverId();
};
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- module exports
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoShell
/// @{
////////////////////////////////////////////////////////////////////////////////
exports.logger = logger;
exports.applier = applier;
exports.sync = sync;
exports.serverId = serverId;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------