From 16bf152c8fb321891ebc13414dbccb6d4adbb8ae Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Sat, 21 Jun 2014 00:32:47 +0200 Subject: [PATCH] fully removed replication logger --- arangod/CMakeLists.txt | 2 - arangod/Makefile.files | 2 - arangod/Replication/InitialSyncer.cpp | 2 +- arangod/Replication/InitialSyncer.h | 2 +- arangod/Replication/Syncer.cpp | 4 +- arangod/Replication/replication-static.cpp | 86 -- arangod/Replication/replication-static.h | 78 - .../RestHandler/RestReplicationHandler.cpp | 270 ++-- arangod/RestServer/ArangoServer.cpp | 4 +- arangod/RestServer/ArangoServer.h | 20 - arangod/V8Server/v8-vocbase.cpp | 193 +-- arangod/VocBase/document-collection.cpp | 1 - arangod/VocBase/index.cpp | 1 - arangod/VocBase/replication-applier.cpp | 17 +- arangod/VocBase/replication-applier.h | 1 - arangod/VocBase/replication-logger.cpp | 1288 ----------------- arangod/VocBase/replication-logger.h | 213 --- arangod/VocBase/replication-master.cpp | 6 +- arangod/VocBase/replication-master.h | 4 +- arangod/VocBase/server.cpp | 7 +- arangod/VocBase/server.h | 2 - arangod/VocBase/transaction.cpp | 1 - arangod/VocBase/vocbase.cpp | 29 - arangod/VocBase/vocbase.h | 1 - arangod/Wal/LogfileManager.cpp | 14 + arangod/Wal/LogfileManager.h | 54 +- arangod/Wal/Slots.cpp | 21 +- arangod/Wal/Slots.h | 19 + arangosh/V8Client/arangodump.cpp | 2 +- .../frontend/js/bootstrap/module-internal.js | 21 +- js/common/bootstrap/module-internal.js | 21 +- js/server/modules/org/arangodb/replication.js | 51 +- 32 files changed, 275 insertions(+), 2162 deletions(-) delete mode 100644 arangod/Replication/replication-static.cpp delete mode 100644 arangod/Replication/replication-static.h delete mode 100644 arangod/VocBase/replication-logger.cpp delete mode 100644 arangod/VocBase/replication-logger.h diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 2aa7fad3a8..373a4bdb89 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -82,7 +82,6 @@ add_executable( HashIndex/hash-index.cpp IndexIterators/index-iterator.cpp IndexOperators/index-operator.cpp - Replication/replication-static.cpp Replication/ContinuousSyncer.cpp Replication/InitialSyncer.cpp Replication/Syncer.cpp @@ -125,7 +124,6 @@ add_executable( VocBase/replication-applier.cpp VocBase/replication-common.cpp VocBase/replication-dump.cpp - VocBase/replication-logger.cpp VocBase/replication-master.cpp VocBase/server.cpp VocBase/transaction.cpp diff --git a/arangod/Makefile.files b/arangod/Makefile.files index 88356ce11b..53390db176 100644 --- a/arangod/Makefile.files +++ b/arangod/Makefile.files @@ -63,7 +63,6 @@ arangod_libarangod_a_SOURCES = \ arangod/HashIndex/hash-index.cpp \ arangod/IndexIterators/index-iterator.cpp \ arangod/IndexOperators/index-operator.cpp \ - arangod/Replication/replication-static.cpp \ arangod/Replication/ContinuousSyncer.cpp \ arangod/Replication/InitialSyncer.cpp \ arangod/Replication/Syncer.cpp \ @@ -106,7 +105,6 @@ arangod_libarangod_a_SOURCES = \ arangod/VocBase/replication-applier.cpp \ arangod/VocBase/replication-common.cpp \ arangod/VocBase/replication-dump.cpp \ - arangod/VocBase/replication-logger.cpp \ arangod/VocBase/replication-master.cpp \ arangod/VocBase/server.cpp \ arangod/VocBase/transaction.cpp \ diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 77eb8b1e50..ff713b8033 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -738,7 +738,7 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); } else { - res = handleCollectionDump(trxCollection, masterName, _masterInfo._state._lastLogTick, errorMsg); + res = handleCollectionDump(trxCollection, masterName, _masterInfo._lastLogTick, errorMsg); } if (res == TRI_ERROR_NO_ERROR) { diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index 704bac6b84..4248e22ea1 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -116,7 +116,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// TRI_voc_tick_t getLastLogTick () const { - return _masterInfo._state._lastLogTick; + return _masterInfo._lastLogTick; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index d1d56c067d..5200b18bb3 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -709,8 +709,8 @@ int Syncer::handleStateResponse (TRI_json_t const* json, _masterInfo._majorVersion = major; _masterInfo._minorVersion = minor; _masterInfo._serverId = masterId; - _masterInfo._state._lastLogTick = lastLogTick; - _masterInfo._state._active = running; + _masterInfo._lastLogTick = lastLogTick; + _masterInfo._active = running; TRI_LogMasterInfoReplication(&_masterInfo, "connected to"); diff --git a/arangod/Replication/replication-static.cpp b/arangod/Replication/replication-static.cpp deleted file mode 100644 index 7c3143d819..0000000000 --- a/arangod/Replication/replication-static.cpp +++ /dev/null @@ -1,86 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// @brief replication data fetcher -/// -/// @file -/// -/// DISCLAIMER -/// -/// Copyright 2014 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany -/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany -//////////////////////////////////////////////////////////////////////////////// - -#include "replication-static.h" -#include "Replication/ContinuousSyncer.h" - -#include "VocBase/vocbase.h" - -using namespace std; -using namespace triagens::arango; - -// ----------------------------------------------------------------------------- -// --SECTION-- public functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static create method -//////////////////////////////////////////////////////////////////////////////// - -void* TRI_CreateContinuousSyncerReplication (TRI_vocbase_t* vocbase, - TRI_replication_applier_configuration_t const* configuration, - TRI_voc_tick_t initialTick, - bool useTick) { - - ContinuousSyncer* s = new ContinuousSyncer(vocbase, - configuration, - initialTick, - useTick); - - return (void*) s; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static free method -//////////////////////////////////////////////////////////////////////////////// - -void TRI_DeleteContinuousSyncerReplication (void* ptr) { - ContinuousSyncer* s = static_cast(ptr); - - delete s; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static run method -//////////////////////////////////////////////////////////////////////////////// - -int TRI_RunContinuousSyncerReplication (void* ptr) { - ContinuousSyncer* s = static_cast(ptr); - - return s->run(); -} - -// ----------------------------------------------------------------------------- -// --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- - -// Local Variables: -// mode: outline-minor -// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}" -// End: diff --git a/arangod/Replication/replication-static.h b/arangod/Replication/replication-static.h deleted file mode 100644 index c600620913..0000000000 --- a/arangod/Replication/replication-static.h +++ /dev/null @@ -1,78 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// @brief replication data fetcher -/// -/// @file -/// -/// DISCLAIMER -/// -/// Copyright 2014 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany -/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGODB_REPLICATION_REPLICATION__STATIC_H -#define ARANGODB_REPLICATION_REPLICATION__STATIC_H 1 - -#include "Basics/Common.h" - -#include "VocBase/voc-types.h" - -// ----------------------------------------------------------------------------- -// --SECTION-- forward declarations -// ----------------------------------------------------------------------------- - -struct TRI_replication_applier_configuration_s; -struct TRI_vocbase_s; - -// ----------------------------------------------------------------------------- -// --SECTION-- public functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static create method -//////////////////////////////////////////////////////////////////////////////// - -void* TRI_CreateContinuousSyncerReplication (struct TRI_vocbase_s*, - struct TRI_replication_applier_configuration_s const*, - TRI_voc_tick_t initialTick, - bool); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static free method -//////////////////////////////////////////////////////////////////////////////// - -void TRI_DeleteContinuousSyncerReplication (void*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief static run method -//////////////////////////////////////////////////////////////////////////////// - -int TRI_RunContinuousSyncerReplication (void*); - -#endif - -// ----------------------------------------------------------------------------- -// --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- - -// Local Variables: -// mode: outline-minor -// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}" -// End: diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 280df4e5de..159d62d951 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -36,14 +36,15 @@ #include "HttpServer/HttpServer.h" #include "Replication/InitialSyncer.h" #include "Rest/HttpRequest.h" +#include "Utils/CollectionGuard.h" #include "Utils/transactions.h" #include "VocBase/compactor.h" #include "VocBase/replication-applier.h" #include "VocBase/replication-dump.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/update-policy.h" #include "VocBase/index.h" +#include "Wal/LogfileManager.h" #include "Cluster/ClusterMethods.h" #include "Cluster/ClusterComm.h" @@ -394,7 +395,8 @@ void RestReplicationHandler::insertClient (TRI_voc_tick_t lastServedTick) { TRI_server_id_t serverId = (TRI_server_id_t) StringUtils::uint64(value); if (serverId > 0) { - TRI_UpdateClientReplicationLogger(_vocbase->_replicationLogger, serverId, lastServedTick); + // TODO: FIXME!! +// TRI_UpdateClientReplicationLogger(_vocbase->_replicationLogger, serverId, lastServedTick); } } } @@ -469,15 +471,8 @@ uint64_t RestReplicationHandler::determineChunkSize () const { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandLoggerStart () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); - - int res = TRI_StartReplicationLogger(_vocbase->_replicationLogger); - - if (res != TRI_ERROR_NO_ERROR) { - generateError(HttpResponse::SERVER_ERROR, res); - return; - } - + // the logger in ArangoDB 2.2 is now the WAL... + // so the logger cannot be started but is always running TRI_json_t result; TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &result); @@ -534,19 +529,12 @@ void RestReplicationHandler::handleCommandLoggerStart () { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandLoggerStop () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); - - int res = TRI_StopReplicationLogger(_vocbase->_replicationLogger); - - if (res != TRI_ERROR_NO_ERROR) { - generateError(HttpResponse::SERVER_ERROR, res); - return; - } - + // the logger in ArangoDB 2.2 is now the WAL... + // so the logger cannot be stopped TRI_json_t result; TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &result); - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, false)); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &result, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, true)); generateResult(&result); TRI_DestroyJson(TRI_CORE_MEM_ZONE, &result); @@ -643,17 +631,46 @@ void RestReplicationHandler::handleCommandLoggerStop () { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandLoggerState () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); - TRI_json_t* json = TRI_JsonReplicationLogger(_vocbase->_replicationLogger); - - if (json == 0) { + if (json == nullptr) { generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); return; } + // "state" part + TRI_json_t* state = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + + if (state == nullptr) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); + return; + } + + triagens::wal::LogfileManagerState const&& s = triagens::wal::LogfileManager::instance()->state(); + + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "running", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "lastLogTick", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, StringUtils::itoa(s.lastTick).c_str())); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "totalEvents", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, s.numEvents)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, state, "time", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, s.timeString.c_str())); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "state", state); + + // "server" part + TRI_json_t* server = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + + if (server == nullptr) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); + return; + } + + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, server, "version", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, TRI_VERSION)); + char* serverIdString = TRI_StringUInt64(TRI_GetIdServer()); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, server, "serverId", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, serverIdString)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "server", server); + generateResult(json); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } //////////////////////////////////////////////////////////////////////////////// @@ -704,23 +721,20 @@ void RestReplicationHandler::handleCommandLoggerState () { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandLoggerGetConfig () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); - TRI_replication_logger_configuration_t config; - - TRI_ReadLockReadWriteLock(&_vocbase->_replicationLogger->_statusLock); - TRI_CopyConfigurationReplicationLogger(&_vocbase->_replicationLogger->_configuration, &config); - TRI_ReadUnlockReadWriteLock(&_vocbase->_replicationLogger->_statusLock); - - TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config); - - if (json == 0) { + if (json == nullptr) { generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); return; } + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "autoStart", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "logRemoteChanges", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "maxEvents", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, 0)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "maxEventsSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, 0)); + generateResult(json); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } //////////////////////////////////////////////////////////////////////////////// @@ -809,58 +823,6 @@ void RestReplicationHandler::handleCommandLoggerGetConfig () { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandLoggerSetConfig () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); - - TRI_replication_logger_configuration_t config; - - // copy previous config - TRI_ReadLockReadWriteLock(&_vocbase->_replicationLogger->_statusLock); - TRI_CopyConfigurationReplicationLogger(&_vocbase->_replicationLogger->_configuration, &config); - TRI_ReadUnlockReadWriteLock(&_vocbase->_replicationLogger->_statusLock); - - TRI_json_t* json = parseJsonBody(); - - if (json == 0) { - generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER); - return; - } - - TRI_json_t const* value; - - value = JsonHelper::getArrayElement(json, "autoStart"); - if (JsonHelper::isBoolean(value)) { - config._autoStart = value->_value._boolean; - } - - value = JsonHelper::getArrayElement(json, "logRemoteChanges"); - if (JsonHelper::isBoolean(value)) { - config._logRemoteChanges = value->_value._boolean; - } - - value = JsonHelper::getArrayElement(json, "maxEvents"); - if (JsonHelper::isNumber(value)) { - config._maxEvents = (uint64_t) value->_value._number; - } - - value = JsonHelper::getArrayElement(json, "maxEventsSize"); - if (JsonHelper::isNumber(value)) { - config._maxEventsSize = (uint64_t) value->_value._number; - } - - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - - int res = TRI_ConfigureReplicationLogger(_vocbase->_replicationLogger, &config); - - if (res != TRI_ERROR_NO_ERROR) { - if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION) { - generateError(HttpResponse::BAD, res); - } - else { - generateError(HttpResponse::SERVER_ERROR, res); - } - return; - } - handleCommandLoggerGetConfig(); } @@ -1378,14 +1340,10 @@ void RestReplicationHandler::handleCommandLoggerFollow () { int res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize); - TRI_replication_logger_state_t state; + triagens::wal::LogfileManagerState state = triagens::wal::LogfileManager::instance()->state(); if (res == TRI_ERROR_NO_ERROR) { - res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state); - } - - if (res == TRI_ERROR_NO_ERROR) { - const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state._lastLogTick); + const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state.lastTick); // generate the result const size_t length = TRI_LengthStringBuffer(dump._buffer); @@ -1410,11 +1368,11 @@ void RestReplicationHandler::handleCommandLoggerFollow () { _response->setHeader(TRI_REPLICATION_HEADER_LASTTICK, strlen(TRI_REPLICATION_HEADER_LASTTICK), - StringUtils::itoa(state._lastLogTick)); + StringUtils::itoa(state.lastTick)); _response->setHeader(TRI_REPLICATION_HEADER_ACTIVE, - strlen(TRI_REPLICATION_HEADER_ACTIVE), - state._active ? "true" : "false"); + strlen(TRI_REPLICATION_HEADER_ACTIVE), + "true"); if (length > 0) { // transfer ownership of the buffer contents @@ -1559,8 +1517,6 @@ void RestReplicationHandler::handleCommandLoggerFollow () { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandInventory () { - TRI_ASSERT(_vocbase->_replicationLogger != 0); - TRI_voc_tick_t tick = TRI_CurrentTickServer(); // include system collections? @@ -1583,7 +1539,7 @@ void RestReplicationHandler::handleCommandInventory () { TRI_ASSERT(JsonHelper::isList(collections)); // sort collections by type, then name - const size_t n = collections->_value._objects._length; + size_t const n = collections->_value._objects._length; if (n > 1) { // sort by collection type (vertices before edges), then name @@ -1591,26 +1547,32 @@ void RestReplicationHandler::handleCommandInventory () { } - TRI_replication_logger_state_t state; - - int res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_Free(TRI_CORE_MEM_ZONE, collections); - - generateError(HttpResponse::SERVER_ERROR, res); - return; - } - TRI_json_t json; - TRI_InitArrayJson(TRI_CORE_MEM_ZONE, &json); char* tickString = TRI_StringUInt64(tick); // add collections data TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "collections", collections); - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "state", TRI_JsonStateReplicationLogger(&state)); + + // "state" + TRI_json_t* state = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE); + + if (state == nullptr) { + TRI_DestroyJson(TRI_CORE_MEM_ZONE, &json); + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); + return; + } + + triagens::wal::LogfileManagerState const&& s = triagens::wal::LogfileManager::instance()->state(); + + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, true)); + char* logTickString = TRI_StringUInt64(s.lastTick); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "lastLogTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, logTickString)); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "totalEvents", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, s.numEvents)); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, state, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, s.timeString.c_str())); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "state", state); + TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, &json, "tick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, tickString)); generateResult(&json); @@ -3158,7 +3120,7 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator () { void RestReplicationHandler::handleCommandDump () { char const* collection = _request->value("collection"); - if (collection == 0) { + if (collection == nullptr) { generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid collection parameter"); @@ -3167,13 +3129,22 @@ void RestReplicationHandler::handleCommandDump () { // determine start tick for dump TRI_voc_tick_t tickStart = 0; - TRI_voc_tick_t tickEnd = (TRI_voc_tick_t) UINT64_MAX; + TRI_voc_tick_t tickEnd = static_cast(UINT64_MAX); + bool flush = true; // flush WAL before dumping? bool withTicks = true; bool translateCollectionIds = true; bool found; char const* value; + + // determine flush WAL value + value = _request->value("flush", found); + if (found) { + flush = StringUtils::boolean(value); + } + + // determine start tick for dump value = _request->value("from", found); if (found) { @@ -3195,54 +3166,60 @@ void RestReplicationHandler::handleCommandDump () { } value = _request->value("ticks", found); + if (found) { withTicks = StringUtils::boolean(value); } value = _request->value("translateIds", found); + if (found) { translateCollectionIds = StringUtils::boolean(value); } - const uint64_t chunkSize = determineChunkSize(); + uint64_t const chunkSize = determineChunkSize(); TRI_vocbase_col_t* c = TRI_LookupCollectionByNameVocBase(_vocbase, collection); - if (c == 0) { + if (c == nullptr) { generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); return; } - const TRI_voc_cid_t cid = c->_cid; + LOG_TRACE("requested collection dump for collection '%s', tickStart: %llu, tickEnd: %llu", + collection, + (unsigned long long) tickStart, + (unsigned long long) tickEnd); - LOG_DEBUG("requested collection dump for collection '%s', tickStart: %llu, tickEnd: %llu", - collection, - (unsigned long long) tickStart, - (unsigned long long) tickEnd); + int res = TRI_ERROR_NO_ERROR; - TRI_vocbase_col_status_e status; - TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid, status); + try { + if (flush) { + triagens::wal::LogfileManager::instance()->flush(true, true, false); + } - if (col == 0) { - generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); - return; - } + triagens::arango::CollectionGuard guard(_vocbase, c->_cid, false); - // initialise the dump container - TRI_replication_dump_t dump; - if (TRI_InitDumpReplication(&dump, _vocbase, (size_t) defaultChunkSize) != TRI_ERROR_NO_ERROR) { - TRI_ReleaseCollectionVocBase(_vocbase, col); - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); - return; - } + TRI_vocbase_col_t* col = guard.collection(); + TRI_ASSERT(col != nullptr); + + // initialise the dump container + TRI_replication_dump_t dump; + res = TRI_InitDumpReplication(&dump, _vocbase, (size_t) defaultChunkSize); - int res = TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, chunkSize, withTicks, translateCollectionIds); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + + res = TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, chunkSize, withTicks, translateCollectionIds); - TRI_ReleaseCollectionVocBase(_vocbase, col); + if (res != TRI_ERROR_NO_ERROR) { + TRI_DestroyDumpReplication(&dump); + THROW_ARANGO_EXCEPTION(res); + } - if (res == TRI_ERROR_NO_ERROR) { // generate the result - const size_t length = TRI_LengthStringBuffer(dump._buffer); + size_t const length = TRI_LengthStringBuffer(dump._buffer); if (length == 0) { _response = createResponse(HttpResponse::NO_CONTENT); @@ -3267,12 +3244,19 @@ void RestReplicationHandler::handleCommandDump () { // avoid double freeing TRI_StealStringBuffer(dump._buffer); + + TRI_DestroyDumpReplication(&dump); } - else { + catch (triagens::arango::Exception const& ex) { + res = ex.code(); + } + catch (...) { + res = TRI_ERROR_INTERNAL; + } + + if (res != TRI_ERROR_NO_ERROR) { generateError(HttpResponse::SERVER_ERROR, res); } - - TRI_DestroyDumpReplication(&dump); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 3a3ea35351..719c713bf7 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -285,7 +285,6 @@ ArangoServer::ArangoServer (int argc, char** argv) _defaultWaitForSync(false), _forceSyncProperties(true), _unusedForceSyncShapes(false), - _disableReplicationLogger(false), _disableReplicationApplier(false), _removeOnDrop(true), _server(nullptr) { @@ -417,6 +416,7 @@ void ArangoServer::buildApplicationServer () { ("ruby.action-directory", &ignoreOpt, "path to the Ruby action directory") ("ruby.modules-path", &ignoreOpt, "one or more directories separated by (semi-) colons") ("ruby.startup-directory", &ignoreOpt, "path to the directory containing alternate Ruby startup scripts") + ("server.disable-replication-logger", &ignoreOpt, "start with replication logger turned off") ; // ............................................................................. @@ -539,7 +539,6 @@ void ArangoServer::buildApplicationServer () { #ifdef TRI_HAVE_LINUX_SOCKETS ("server.disable-authentication-unix-sockets", &_disableAuthenticationUnixSockets, "disable authentication for requests via UNIX domain sockets") #endif - ("server.disable-replication-logger", &_disableReplicationLogger, "start with replication logger turned off") ("server.disable-replication-applier", &_disableReplicationApplier, "start with replication applier turned off") ("server.allow-use-database", &allowUseDatabaseInRESTActions, "allow change of database in REST actions, only needed for unittests") ; @@ -1114,7 +1113,6 @@ void ArangoServer::openDatabases (bool checkVersion, _applicationV8->appPath().c_str(), _applicationV8->devAppPath().c_str(), &defaults, - _disableReplicationLogger, _disableReplicationApplier, iterateMarkersOnOpen); diff --git a/arangod/RestServer/ArangoServer.h b/arangod/RestServer/ArangoServer.h index 584c1ee974..16c82ca15b 100644 --- a/arangod/RestServer/ArangoServer.h +++ b/arangod/RestServer/ArangoServer.h @@ -404,26 +404,6 @@ namespace triagens { bool _unusedForceSyncShapes; -//////////////////////////////////////////////////////////////////////////////// -/// @brief disable the replication logger on server startup -/// -/// @CMDOPT{\--server.disable-replication-logger @CA{flag}} -/// -/// If @LIT{true} the server will start with the replication logger turned off, -/// even if the replication logger is configured with the `autoStart` option. -/// Using this option will not change the value of the `autoStart` option in -/// the logger configuration, but will suppress auto-starting the replication -/// logger just once. -/// -/// If the option is not used, ArangoDB will read the logger configuration from -/// the file `REPLICATION-LOGGER-CONFIG` on startup, and use the value of the -/// `autoStart` attribute from this file. -/// -/// The default is @LIT{false}. -//////////////////////////////////////////////////////////////////////////////// - - bool _disableReplicationLogger; - //////////////////////////////////////////////////////////////////////////////// /// @brief disable the replication applier on server startup /// diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 01bcfe445f..961c0cad3f 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -64,7 +64,6 @@ #include "VocBase/general-cursor.h" #include "VocBase/key-generator.h" #include "VocBase/replication-applier.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/voc-shaper.h" #include "VocBase/index.h" @@ -4564,66 +4563,6 @@ static v8::Handle JS_DeleteCursor (v8::Arguments const& argv) { // --SECTION-- REPLICATION // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief start the replication logger manually -//////////////////////////////////////////////////////////////////////////////// - -static v8::Handle JS_StartLoggerReplication (v8::Arguments const& argv) { - v8::HandleScope scope; - - if (argv.Length() != 0) { - TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_START()"); - } - - TRI_vocbase_t* vocbase = GetContextVocBase(); - - if (vocbase == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - } - - if (vocbase->_replicationLogger == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); - } - - int res = TRI_StartReplicationLogger(vocbase->_replicationLogger); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot start replication logger"); - } - - return scope.Close(v8::True()); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stop the replication logger manually -//////////////////////////////////////////////////////////////////////////////// - -static v8::Handle JS_StopLoggerReplication (v8::Arguments const& argv) { - v8::HandleScope scope; - - if (argv.Length() != 0) { - TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_STOP()"); - } - - TRI_vocbase_t* vocbase = GetContextVocBase(); - - if (vocbase == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - } - - if (vocbase->_replicationLogger == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); - } - - int res = TRI_StopReplicationLogger(vocbase->_replicationLogger); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot stop replication logger"); - } - - return scope.Close(v8::True()); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief get the state of the replication logger //////////////////////////////////////////////////////////////////////////////// @@ -4631,124 +4570,44 @@ static v8::Handle JS_StopLoggerReplication (v8::Arguments const& argv static v8::Handle JS_StateLoggerReplication (v8::Arguments const& argv) { v8::HandleScope scope; - if (argv.Length() != 0) { - TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_STATE()"); - } + triagens::wal::LogfileManagerState s = triagens::wal::LogfileManager::instance()->state(); - TRI_vocbase_t* vocbase = GetContextVocBase(); + v8::Handle result = v8::Object::New(); - if (vocbase == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - } + v8::Handle state = v8::Object::New(); + state->Set(TRI_V8_STRING("running"), v8::True()); + state->Set(TRI_V8_STRING("lastLogTick"), V8TickId(s.lastTick)); + state->Set(TRI_V8_STRING("totalEvents"), v8::Number::New(s.numEvents)); + state->Set(TRI_V8_STRING("time"), v8::String::New(s.timeString.c_str(), s.timeString.size())); + result->Set(TRI_V8_STRING("state"), state); - if (vocbase->_replicationLogger == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); - } - - TRI_json_t* json = TRI_JsonReplicationLogger(vocbase->_replicationLogger); - - if (json == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_OUT_OF_MEMORY); - } - - v8::Handle result = TRI_ObjectJson(json); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + v8::Handle server = v8::Object::New(); + server->Set(TRI_V8_STRING("version"), v8::String::New(TRI_VERSION)); + server->Set(TRI_V8_STRING("serverId"), v8::String::New(StringUtils::itoa(TRI_GetIdServer()).c_str())); + result->Set(TRI_V8_STRING("server"), server); + + v8::Handle clients = v8::Object::New(); + result->Set(TRI_V8_STRING("clients"), clients); return scope.Close(result); } //////////////////////////////////////////////////////////////////////////////// -/// @brief configure the replication logger manually +/// @brief return the configuration of the replication logger //////////////////////////////////////////////////////////////////////////////// static v8::Handle JS_ConfigureLoggerReplication (v8::Arguments const& argv) { v8::HandleScope scope; - TRI_vocbase_t* vocbase = GetContextVocBase(); + // the replication logger is actually non-existing in ArangoDB 2.2 and higher + // as there is the WAL. To be downwards-compatible, we'll return dummy values + v8::Handle result = v8::Object::New(); + result->Set(TRI_V8_STRING("autostart"), v8::True()); + result->Set(TRI_V8_STRING("logRemoteChanges"), v8::True()); + result->Set(TRI_V8_STRING("maxEvents"), v8::Number::New(0)); + result->Set(TRI_V8_STRING("maxEventsSize"), v8::Number::New(0)); - if (vocbase == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - } - - if (vocbase->_replicationLogger == 0) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); - } - - if (argv.Length() == 0) { - // no argument: return the current configuration - - TRI_replication_logger_configuration_t config; - - TRI_ReadLockReadWriteLock(&vocbase->_replicationLogger->_statusLock); - TRI_CopyConfigurationReplicationLogger(&vocbase->_replicationLogger->_configuration, &config); - TRI_ReadUnlockReadWriteLock(&vocbase->_replicationLogger->_statusLock); - - TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config); - - if (json == 0) { - TRI_V8_EXCEPTION_MEMORY(scope); - } - - v8::Handle result = TRI_ObjectJson(json); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); - - return scope.Close(result); - } - - else { - // set the configuration - - if (argv.Length() != 1 || ! argv[0]->IsObject()) { - TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_LOGGER_CONFIGURE()"); - } - - TRI_replication_logger_configuration_t config; - - // fill with previous configuration - TRI_ReadLockReadWriteLock(&vocbase->_replicationLogger->_statusLock); - TRI_CopyConfigurationReplicationLogger(&vocbase->_replicationLogger->_configuration, &config); - TRI_ReadUnlockReadWriteLock(&vocbase->_replicationLogger->_statusLock); - - // treat the argument as an object from now on - v8::Handle object = v8::Handle::Cast(argv[0]); - - if (object->Has(TRI_V8_SYMBOL("autoStart"))) { - if (object->Get(TRI_V8_SYMBOL("autoStart"))->IsBoolean()) { - config._autoStart = TRI_ObjectToBoolean(object->Get(TRI_V8_SYMBOL("autoStart"))); - } - } - - if (object->Has(TRI_V8_SYMBOL("logRemoteChanges"))) { - if (object->Get(TRI_V8_SYMBOL("logRemoteChanges"))->IsBoolean()) { - config._logRemoteChanges = TRI_ObjectToBoolean(object->Get(TRI_V8_SYMBOL("logRemoteChanges"))); - } - } - - if (object->Has(TRI_V8_SYMBOL("maxEvents"))) { - config._maxEvents = TRI_ObjectToUInt64(object->Get(TRI_V8_SYMBOL("maxEvents")), true); - } - - if (object->Has(TRI_V8_SYMBOL("maxEventsSize"))) { - config._maxEventsSize = TRI_ObjectToUInt64(object->Get(TRI_V8_SYMBOL("maxEventsSize")), true); - } - - int res = TRI_ConfigureReplicationLogger(vocbase->_replicationLogger, &config); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_V8_EXCEPTION(scope, res); - } - - TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&config); - - if (json == 0) { - TRI_V8_EXCEPTION_MEMORY(scope); - } - - v8::Handle result = TRI_ObjectJson(json); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); - - return scope.Close(result); - } + return scope.Close(result); } //////////////////////////////////////////////////////////////////////////////// @@ -5159,7 +5018,7 @@ static v8::Handle JS_StateApplierReplication (v8::Arguments const& ar TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationLogger == 0) { + if (vocbase->_replicationApplier == 0) { TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } @@ -10158,8 +10017,6 @@ void TRI_InitV8VocBridge (v8::Handle context, TRI_AddGlobalFunctionVocbase(context, "DELETE_CURSOR", JS_DeleteCursor, true); // replication functions. not intended to be used by end users - TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_START", JS_StartLoggerReplication, true); - TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STOP", JS_StopLoggerReplication, true); TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_STATE", JS_StateLoggerReplication, true); TRI_AddGlobalFunctionVocbase(context, "REPLICATION_LOGGER_CONFIGURE", JS_ConfigureLoggerReplication, true); TRI_AddGlobalFunctionVocbase(context, "REPLICATION_SYNCHRONISE", JS_SynchroniseReplication, true); diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 22d36d010f..0246a63ea4 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -47,7 +47,6 @@ #include "VocBase/index.h" #include "VocBase/key-generator.h" #include "VocBase/primary-index.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/update-policy.h" #include "VocBase/voc-shaper.h" diff --git a/arangod/VocBase/index.cpp b/arangod/VocBase/index.cpp index 382b1885a6..195cfc13a5 100644 --- a/arangod/VocBase/index.cpp +++ b/arangod/VocBase/index.cpp @@ -51,7 +51,6 @@ #include "Utils/Exception.h" #include "VocBase/document-collection.h" #include "VocBase/edge-collection.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/voc-shaper.h" #include "Wal/LogfileManager.h" diff --git a/arangod/VocBase/replication-applier.cpp b/arangod/VocBase/replication-applier.cpp index 9f36e41236..f914d466ee 100644 --- a/arangod/VocBase/replication-applier.cpp +++ b/arangod/VocBase/replication-applier.cpp @@ -42,6 +42,7 @@ #include "VocBase/server.h" #include "VocBase/transaction.h" #include "VocBase/vocbase.h" +#include "Replication/ContinuousSyncer.h" // ----------------------------------------------------------------------------- // --SECTION-- REPLICATION APPLIER @@ -422,8 +423,9 @@ static int SetError (TRI_replication_applier_t* applier, //////////////////////////////////////////////////////////////////////////////// void ApplyThread (void* data) { - TRI_RunContinuousSyncerReplication(data); - TRI_DeleteContinuousSyncerReplication(data); + triagens::arango::ContinuousSyncer* s = static_cast(data); + s->run(); + delete s; } //////////////////////////////////////////////////////////////////////////////// @@ -451,10 +453,10 @@ static int StartApplier (TRI_replication_applier_t* applier, return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured"); } - fetcher = (void*) TRI_CreateContinuousSyncerReplication(applier->_vocbase, - &applier->_configuration, - initialTick, - useTick); + fetcher = (void*) new triagens::arango::ContinuousSyncer(applier->_vocbase, + &applier->_configuration, + initialTick, + useTick); if (fetcher == NULL) { return TRI_ERROR_OUT_OF_MEMORY; @@ -478,7 +480,8 @@ static int StartApplier (TRI_replication_applier_t* applier, TRI_InitThread(&applier->_thread); if (! TRI_StartThread(&applier->_thread, NULL, "[applier]", ApplyThread, fetcher)) { - TRI_DeleteContinuousSyncerReplication(fetcher); + triagens::arango::ContinuousSyncer* s = static_cast(fetcher); + delete s; return TRI_ERROR_INTERNAL; } diff --git a/arangod/VocBase/replication-applier.h b/arangod/VocBase/replication-applier.h index 7d99bc194b..cac62fb96e 100644 --- a/arangod/VocBase/replication-applier.h +++ b/arangod/VocBase/replication-applier.h @@ -33,7 +33,6 @@ #include "Basics/Common.h" #include "BasicsC/locks.h" #include "BasicsC/threads.h" -#include "Replication/replication-static.h" #include "VocBase/replication-common.h" #include "VocBase/voc-types.h" diff --git a/arangod/VocBase/replication-logger.cpp b/arangod/VocBase/replication-logger.cpp deleted file mode 100644 index 9932b71267..0000000000 --- a/arangod/VocBase/replication-logger.cpp +++ /dev/null @@ -1,1288 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// @brief replication logger -/// -/// @file -/// -/// DISCLAIMER -/// -/// Copyright 2014 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany -/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany -//////////////////////////////////////////////////////////////////////////////// - -#include "replication-logger.h" - -#include "BasicsC/conversions.h" -#include "BasicsC/files.h" -#include "BasicsC/json.h" -#include "BasicsC/logging.h" -#include "BasicsC/string-buffer.h" -#include "BasicsC/tri-strings.h" - -#include "VocBase/collection.h" -#include "VocBase/datafile.h" -#include "VocBase/document-collection.h" -#include "VocBase/server.h" -#include "VocBase/transaction.h" -#include "VocBase/vocbase.h" -#include "VocBase/voc-shaper.h" - -// ----------------------------------------------------------------------------- -// --SECTION-- REPLICATION LOGGER -// ----------------------------------------------------------------------------- - -// ----------------------------------------------------------------------------- -// --SECTION-- CLIENT HANDLING -// ----------------------------------------------------------------------------- - -// ----------------------------------------------------------------------------- -// --SECTION-- private types -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief struct to hold a client action -//////////////////////////////////////////////////////////////////////////////// - -typedef struct logger_client_s { - TRI_server_id_t _serverId; - TRI_voc_tick_t _lastServedTick; - char _stamp[24]; -} -logger_client_t; - -// ----------------------------------------------------------------------------- -// --SECTION-- private functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief hashes a client id -//////////////////////////////////////////////////////////////////////////////// - -static uint64_t HashKeyClient (TRI_associative_pointer_t* array, - void const* key) { - TRI_server_id_t const* k = static_cast(key); - - return (uint64_t) *k; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief hashes a client struct -//////////////////////////////////////////////////////////////////////////////// - -static uint64_t HashElementClient (TRI_associative_pointer_t* array, - void const* element) { - logger_client_t const* e = static_cast(element); - - return (uint64_t) e->_serverId; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief compares a client key and a client struct -//////////////////////////////////////////////////////////////////////////////// - -static bool IsEqualKeyClient (TRI_associative_pointer_t* array, - void const* key, - void const* element) { - TRI_server_id_t const* k = static_cast(key); - logger_client_t const* e = static_cast(element); - - return *k == e->_serverId; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free a single registered client -//////////////////////////////////////////////////////////////////////////////// - -static void FreeClient (logger_client_t* client) { - TRI_Free(TRI_UNKNOWN_MEM_ZONE, client); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free registered clients -//////////////////////////////////////////////////////////////////////////////// - -static void FreeClients (TRI_replication_logger_t* logger) { - uint32_t i, n; - - n = logger->_clients._nrAlloc; - - for (i = 0; i < n; ++i) { - logger_client_t* client = static_cast(logger->_clients._table[i]); - - if (client != NULL) { - FreeClient(client); - } - } -} - -// ----------------------------------------------------------------------------- -// --SECTION-- LOGGING -// ----------------------------------------------------------------------------- - -// ----------------------------------------------------------------------------- -// --SECTION-- private defines -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief shortcut function -//////////////////////////////////////////////////////////////////////////////// - -#define FAIL_IFNOT(func, buffer, val) \ - if (func(buffer, val) != TRI_ERROR_NO_ERROR) { \ - return false; \ - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a string-buffer function name -//////////////////////////////////////////////////////////////////////////////// - -#define APPEND_FUNC(name) TRI_ ## name ## StringBuffer - -//////////////////////////////////////////////////////////////////////////////// -/// @brief append a character to a string-buffer or fail -//////////////////////////////////////////////////////////////////////////////// - -#define APPEND_CHAR(buffer, c) FAIL_IFNOT(APPEND_FUNC(AppendChar), buffer, c) - -//////////////////////////////////////////////////////////////////////////////// -/// @brief append a string to a string-buffer or fail -//////////////////////////////////////////////////////////////////////////////// - -#define APPEND_STRING(buffer, str) FAIL_IFNOT(APPEND_FUNC(AppendString), buffer, str) - -//////////////////////////////////////////////////////////////////////////////// -/// @brief append uint64 to a string-buffer or fail -//////////////////////////////////////////////////////////////////////////////// - -#define APPEND_UINT64(buffer, val) FAIL_IFNOT(APPEND_FUNC(AppendUInt64), buffer, val) - -//////////////////////////////////////////////////////////////////////////////// -/// @brief append json to a string-buffer or fail -//////////////////////////////////////////////////////////////////////////////// - -#define APPEND_JSON(buffer, json) FAIL_IFNOT(TRI_StringifyJson, buffer, json) - -// ----------------------------------------------------------------------------- -// --SECTION-- private variables -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief number of pre-allocated string buffers for logging -//////////////////////////////////////////////////////////////////////////////// - -static size_t NumBuffers = 16; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief pre-allocated size for each log buffer -//////////////////////////////////////////////////////////////////////////////// - -static size_t BufferSize = 256; - -// ----------------------------------------------------------------------------- -// --SECTION-- private functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free the logger's cap constraint -/// the function must called under the statusLock -//////////////////////////////////////////////////////////////////////////////// - -static void FreeCap (TRI_replication_logger_t* logger) { - TRI_ASSERT(logger != NULL); - - if (logger->_cap != NULL) { - - TRI_ASSERT(logger->_trxCollection != NULL); - TRI_ASSERT(logger->_trxCollection->_collection != NULL); - - TRI_document_collection_t* document = logger->_trxCollection->_collection->_collection; - TRI_ASSERT(document != NULL); - - // TODO: this wont work anymore - remove function altogether - TRI_DropIndexDocumentCollection(document, - logger->_cap->_iid, - TRI_GetIdServer()); - - logger->_cap = NULL; - } -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a cap constraint for the logger -/// the function must called under the statusLock -//////////////////////////////////////////////////////////////////////////////// - -static bool CreateCap (TRI_replication_logger_t* logger) { - TRI_index_t* idx; - size_t maxEvents; - int64_t maxEventsSize; - - if (logger->_configuration._maxEvents == 0ULL && - logger->_configuration._maxEventsSize == 0ULL) { - return true; - } - - TRI_document_collection_t* document = logger->_trxCollection->_collection->_collection; - TRI_ASSERT(document != NULL); - - TRI_ASSERT(logger->_configuration._maxEvents > 0ULL || - logger->_configuration._maxEventsSize > 0ULL); - - LOG_TRACE("creating cap constraint for replication logger. maxEvents: %llu, maxEventsSize: %llu", - (unsigned long long) logger->_configuration._maxEvents, - (unsigned long long) logger->_configuration._maxEventsSize); - - // need to convert to (possibly) lower types - if (logger->_configuration._maxEvents > (uint64_t) SIZE_MAX) { - maxEvents = SIZE_MAX; - } - else { - maxEvents = (size_t) logger->_configuration._maxEvents; - } - - if (logger->_configuration._maxEventsSize > (uint64_t) INT64_MAX) { - maxEventsSize = INT64_MAX; - } - else { - maxEventsSize = (int64_t) logger->_configuration._maxEventsSize; - } - - idx = TRI_EnsureCapConstraintDocumentCollection(document, - 0, - maxEvents, - maxEventsSize, - NULL, - TRI_GetIdServer()); - - if (idx == NULL) { - LOG_WARNING("creating cap constraint for '%s' failed", TRI_COL_NAME_REPLICATION); - return false; - } - - logger->_cap = idx; - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get a buffer to write an event in -//////////////////////////////////////////////////////////////////////////////// - -static TRI_string_buffer_t* GetBuffer (TRI_replication_logger_t* logger) { - size_t n; - - TRI_ASSERT(logger != NULL); - TRI_string_buffer_t* buffer = NULL; - - // locked section - // --------------------------------------- - TRI_LockSpin(&logger->_bufferLock); - - n = logger->_buffers._length; - - if (n > 0) { - buffer = static_cast(TRI_RemoveVectorPointer(&logger->_buffers, (size_t) (n - 1))); - } - - TRI_UnlockSpin(&logger->_bufferLock); - // --------------------------------------- - // locked section end - - TRI_ASSERT(buffer != NULL); - - return buffer; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief returns a buffer to the list of available buffers -//////////////////////////////////////////////////////////////////////////////// - -static void ReturnBuffer (TRI_replication_logger_t* logger, - TRI_string_buffer_t* buffer) { - TRI_ASSERT(logger != NULL); - TRI_ASSERT(buffer != NULL); - - // make the buffer usable again - if (buffer->_buffer == NULL) { - TRI_InitSizedStringBuffer(buffer, TRI_CORE_MEM_ZONE, BufferSize); - } - else { - TRI_ResetStringBuffer(buffer); - } - - // locked section - // --------------------------------------- - TRI_LockSpin(&logger->_bufferLock); - - TRI_PushBackVectorPointer(&logger->_buffers, buffer); - TRI_ASSERT(logger->_buffers._length <= NumBuffers); - - TRI_UnlockSpin(&logger->_bufferLock); - // --------------------------------------- - // locked section end -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief logs a replication event contained in the buffer -/// the function will always free the buffer passed -//////////////////////////////////////////////////////////////////////////////// - -static int LogEvent (TRI_replication_logger_t* logger, - TRI_voc_tid_t tid, - bool isStandaloneOperation, - TRI_replication_operation_e type, - TRI_string_buffer_t* buffer) { - TRI_memory_zone_t* zone; - TRI_shaped_json_t* shaped; - TRI_json_t json; - TRI_doc_mptr_copy_t mptr; - size_t len; - int res; - bool forceSync; - bool withTid; - bool lock; - - TRI_ASSERT(logger != NULL); - TRI_ASSERT(buffer != NULL); - - len = TRI_LengthStringBuffer(buffer); - - if (len < 1) { - // buffer is empty - ReturnBuffer(logger, buffer); - - return TRI_ERROR_NO_ERROR; - } - - // create fake transaction to prevent assertion error. TODO: FIXME - triagens::arango::TransactionBase fake(true); - - // do we have a transaction id? - withTid = (tid > 0); - - // this type of operation will be synced. all other operations will not be synced. - forceSync = (type == REPLICATION_STOP); - - TRI_InitArray2Json(TRI_CORE_MEM_ZONE, &json, withTid ? 3 : 2); - - // add "type" attribute - { - TRI_json_t typeAttribute; - TRI_InitNumberJson(&typeAttribute, (double) type); - - TRI_Insert4ArrayJson(TRI_CORE_MEM_ZONE, - &json, - (char*) "type", - 4, // strlen("type") - &typeAttribute, - true); - } - - // "tid" attribute - if (withTid) { - TRI_json_t tidAttribute; - TRI_InitStringJson(&tidAttribute, TRI_StringUInt64(tid)); - - TRI_Insert4ArrayJson(TRI_CORE_MEM_ZONE, - &json, - (char*) "tid", - 3, // strlen("tid") - &tidAttribute, - true); - } - - // "data" attribute - { - TRI_json_t dataAttribute; - // pass the string-buffer buffer pointer to the JSON - TRI_InitStringReference2Json(&dataAttribute, TRI_BeginStringBuffer(buffer), TRI_LengthStringBuffer(buffer)); - - TRI_Insert4ArrayJson(TRI_CORE_MEM_ZONE, - &json, - (char*) "data", - 4, // strlen("data") - &dataAttribute, - true); - } - - LOG_TRACE("logging replication event, type: %d, tid: %llu, sync: %d, data: %s", - (int) type, - (unsigned long long) tid, - (int) forceSync, - TRI_BeginStringBuffer(buffer)); - - lock = isStandaloneOperation; - - TRI_document_collection_t* document = logger->_trxCollection->_collection->_collection; - zone = document->getShaper()->_memoryZone; // ONLY IN INDEX, PROTECTED by RUNTIME - shaped = TRI_ShapedJsonJson(document->getShaper(), &json, true, ! lock); // ONLY IN INDEX, PROTECTED by RUNTIME - TRI_DestroyJson(TRI_CORE_MEM_ZONE, &json); - - ReturnBuffer(logger, buffer); - - if (shaped == NULL) { - return TRI_ERROR_ARANGO_SHAPER_FAILED; - } - - res = TRI_InsertShapedJsonDocumentCollection(logger->_trxCollection, - NULL, - 0, - &mptr, - shaped, - NULL, - lock, - forceSync, - false); - - TRI_FreeShapedJson(zone, shaped); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - - // TRI_ASSERT the write was successful - TRI_ASSERT(mptr.getDataPtr() != NULL); // PROTECTED by trx in logger - - // update the last tick that we've logged - TRI_LockSpin(&logger->_idLock); - - logger->_state._lastLogTick = ((TRI_df_marker_t*) mptr.getDataPtr())->_tick; // PROTECTED by trx in logger - logger->_state._totalEvents++; - - TRI_UnlockSpin(&logger->_idLock); - - return TRI_ERROR_NO_ERROR; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stringify a "replication" operation with a tick -//////////////////////////////////////////////////////////////////////////////// - -static bool StringifyTickReplication (TRI_string_buffer_t* buffer, - TRI_voc_tick_t tick) { - if (buffer == NULL) { - return false; - } - - APPEND_STRING(buffer, "{\"lastTick\":\""); - APPEND_UINT64(buffer, (uint64_t) tick); - APPEND_STRING(buffer, "\"}"); - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get current state from the replication logger -/// note: must hold the lock when calling this -//////////////////////////////////////////////////////////////////////////////// - -static int GetStateReplicationLogger (TRI_replication_logger_t* logger, - TRI_replication_logger_state_t* dst) { - TRI_ASSERT(logger->_state._active); - - TRI_LockSpin(&logger->_idLock); - memcpy(dst, &logger->_state, sizeof(TRI_replication_logger_state_t)); - TRI_UnlockSpin(&logger->_idLock); - - return TRI_ERROR_NO_ERROR; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief start the replication logger -/// note: must hold the lock when calling this -//////////////////////////////////////////////////////////////////////////////// - -static int StartReplicationLogger (TRI_replication_logger_t* logger) { - TRI_transaction_t* trx; - TRI_vocbase_col_t* collection; - TRI_vocbase_t* vocbase; - TRI_transaction_hint_t hint; - TRI_string_buffer_t* buffer; - TRI_voc_cid_t cid; - int res; - - if (logger->_state._active) { - return TRI_ERROR_INTERNAL; - } - - TRI_ASSERT(logger->_trx == NULL); - TRI_ASSERT(logger->_trxCollection == NULL); - TRI_ASSERT(logger->_state._lastLogTick == 0); - - vocbase = logger->_vocbase; - collection = TRI_LookupCollectionByNameVocBase(vocbase, TRI_COL_NAME_REPLICATION); - - if (collection == NULL) { - // try to create _replication collection on the fly - TRI_col_info_t parameter; - - TRI_InitCollectionInfo(vocbase, - ¶meter, - TRI_COL_NAME_REPLICATION, - (TRI_col_type_e) TRI_COL_TYPE_DOCUMENT, - (TRI_voc_size_t) vocbase->_settings.defaultMaximalSize, - NULL); - - parameter._isSystem = true; - - collection = TRI_CreateCollectionVocBase(vocbase, ¶meter, 0, TRI_GetIdServer()); - TRI_FreeCollectionInfoOptions(¶meter); - - if (collection != NULL) { - LOG_INFO("created collection '" TRI_COL_NAME_REPLICATION "'"); - } - } - - if (collection == NULL) { - LOG_ERROR("could not open collection '" TRI_COL_NAME_REPLICATION "'"); - - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } - - cid = collection->_cid; - // create fake transaction to prevent assertion error. TODO: FIXME - triagens::arango::TransactionBase fake(true); - - trx = TRI_CreateTransaction(vocbase, TRI_GetIdServer(), false, 0.0, false); - - if (trx == NULL) { - return TRI_ERROR_OUT_OF_MEMORY; - } - - res = TRI_AddCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE, TRI_TRANSACTION_TOP_LEVEL); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_FreeTransaction(trx); - - return TRI_ERROR_INTERNAL; - } - - // the SINGLE_OPERATION hint is actually a hack: - // the logger does not write just one operation, but it is used to prevent locking the collection - // for the entire duration of the transaction - hint = (TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION; - res = TRI_BeginTransaction(trx, hint, TRI_TRANSACTION_TOP_LEVEL); - - if (res != TRI_ERROR_NO_ERROR) { - TRI_FreeTransaction(trx); - - return TRI_ERROR_INTERNAL; - } - - logger->_trxCollection = TRI_GetCollectionTransaction(trx, cid, TRI_TRANSACTION_WRITE); - logger->_trx = trx; - - TRI_ASSERT(logger->_trxCollection != NULL); - TRI_ASSERT(logger->_trxCollection->_collection != NULL); - TRI_ASSERT(logger->_trxCollection->_collection->_collection != NULL); - TRI_ASSERT(logger->_state._active == false); - - TRI_ASSERT(logger->_cap == NULL); - // create cap constraint? - if (logger->_configuration._maxEvents > 0ULL || - logger->_configuration._maxEventsSize > 0ULL) { - - CreateCap(logger); - } - - logger->_state._lastLogTick = ((TRI_collection_t*) collection->_collection)->_info._revision; - logger->_state._active = true; - - LOG_INFO("started replication logger for database '%s', last tick: %llu", - logger->_databaseName, - (unsigned long long) logger->_state._lastLogTick); - - buffer = GetBuffer(logger); - - if (! StringifyTickReplication(buffer, logger->_state._lastLogTick)) { - ReturnBuffer(logger, buffer); - - return TRI_ERROR_OUT_OF_MEMORY; - } - - res = LogEvent(logger, 0, true, REPLICATION_START, buffer); - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stop the replication logger -/// note: must hold the lock when calling this -//////////////////////////////////////////////////////////////////////////////// - -static int StopReplicationLogger (TRI_replication_logger_t* logger) { - TRI_string_buffer_t* buffer; - TRI_voc_tick_t lastTick; - int res; - - if (! logger->_state._active) { - return TRI_ERROR_INTERNAL; - } - - TRI_LockSpin(&logger->_idLock); - lastTick = logger->_state._lastLogTick; - TRI_UnlockSpin(&logger->_idLock); - - TRI_ASSERT(logger->_trx != NULL); - TRI_ASSERT(logger->_trxCollection != NULL); - - buffer = GetBuffer(logger); - - if (! StringifyTickReplication(buffer, lastTick)) { - ReturnBuffer(logger, buffer); - - return TRI_ERROR_OUT_OF_MEMORY; - } - - res = LogEvent(logger, 0, true, REPLICATION_STOP, buffer); - - // destroy cap constraint - FreeCap(logger); - - TRI_CommitTransaction(logger->_trx, 0); - - TRI_FreeTransaction(logger->_trx); - - LOG_INFO("stopped replication logger for database '%s', last tick: %llu", - logger->_databaseName, - (unsigned long long) lastTick); - - logger->_trx = NULL; - logger->_trxCollection = NULL; - logger->_state._lastLogTick = 0; - logger->_state._active = false; - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get the state of the _replication collection for a non-running -/// replication logger -/// note: must hold the lock when calling this -//////////////////////////////////////////////////////////////////////////////// - -static int GetStateInactive (TRI_replication_logger_t* logger, - TRI_replication_logger_state_t* dst) { - TRI_vocbase_t* vocbase; - TRI_vocbase_col_t* col; - - vocbase = logger->_vocbase; - - if (vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) { - dst->_lastLogTick = 0; - dst->_totalEvents = 0; - dst->_active = false; - return TRI_ERROR_NO_ERROR; - } - - TRI_vocbase_col_status_e status; - col = TRI_UseCollectionByNameVocBase(vocbase, TRI_COL_NAME_REPLICATION, status); - - if (col == NULL || col->_collection == NULL) { - LOG_ERROR("could not open collection '" TRI_COL_NAME_REPLICATION "'"); - - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } - - TRI_document_collection_t* document = col->_collection; - - dst->_lastLogTick = document->_info._revision; - dst->_totalEvents = 0; - dst->_active = false; - - TRI_ReleaseCollectionVocBase(vocbase, col); - - TRI_LockSpin(&logger->_idLock); - dst->_totalEvents = logger->_state._totalEvents; - TRI_UnlockSpin(&logger->_idLock); - - return TRI_ERROR_NO_ERROR; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free all allocated buffers -//////////////////////////////////////////////////////////////////////////////// - -static void FreeBuffers (TRI_replication_logger_t* logger) { - size_t i, n; - - LOG_TRACE("freeing buffers"); - - n = logger->_buffers._length; - - for (i = 0; i < n; ++i) { - TRI_string_buffer_t* buffer = (TRI_string_buffer_t*) TRI_AtVectorPointer(&logger->_buffers, i); - - TRI_ASSERT(buffer != NULL); - TRI_FreeStringBuffer(TRI_CORE_MEM_ZONE, buffer); - } - - TRI_DestroyVectorPointer(&logger->_buffers); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief initialise buffers -//////////////////////////////////////////////////////////////////////////////// - -static int InitBuffers (TRI_replication_logger_t* logger) { - size_t i; - int res; - - TRI_ASSERT(NumBuffers > 0); - - LOG_TRACE("initialising buffers"); - - res = TRI_InitVectorPointer2(&logger->_buffers, TRI_CORE_MEM_ZONE, NumBuffers); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - - for (i = 0; i < NumBuffers; ++i) { - TRI_string_buffer_t* buffer = TRI_CreateSizedStringBuffer(TRI_CORE_MEM_ZONE, BufferSize); - - if (buffer == NULL) { - FreeBuffers(logger); - - return TRI_ERROR_OUT_OF_MEMORY; - } - - TRI_PushBackVectorPointer(&logger->_buffers, buffer); - } - - TRI_ASSERT(logger->_buffers._length == NumBuffers); - - return TRI_ERROR_NO_ERROR; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get the filename of the replication logger configuration file -//////////////////////////////////////////////////////////////////////////////// - -static char* GetConfigurationFilename (TRI_vocbase_t* vocbase) { - return TRI_Concatenate2File(vocbase->_path, "REPLICATION-LOGGER-CONFIG"); -} - -// ----------------------------------------------------------------------------- -// --SECTION-- constructors / destructors -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a replication logger -//////////////////////////////////////////////////////////////////////////////// - -TRI_replication_logger_t* TRI_CreateReplicationLogger (TRI_vocbase_t* vocbase) { - TRI_replication_logger_t* logger = static_cast(TRI_Allocate(TRI_CORE_MEM_ZONE, sizeof(TRI_replication_logger_t), false)); - - if (logger == NULL) { - return NULL; - } - - // init string buffers - int res = InitBuffers(logger); - - if (res != TRI_ERROR_NO_ERROR) { - // out of memory - TRI_Free(TRI_CORE_MEM_ZONE, logger); - - return NULL; - } - - res = TRI_InitAssociativePointer(&logger->_clients, - TRI_UNKNOWN_MEM_ZONE, - HashKeyClient, - HashElementClient, - IsEqualKeyClient, - NULL); - - if (res != TRI_ERROR_NO_ERROR) { - // out of memory - FreeBuffers(logger); - TRI_Free(TRI_CORE_MEM_ZONE, logger); - - return NULL; - } - - TRI_InitReadWriteLock(&logger->_statusLock); - TRI_InitReadWriteLock(&logger->_clientsLock); - TRI_InitSpin(&logger->_idLock); - TRI_InitSpin(&logger->_bufferLock); - - logger->_vocbase = vocbase; - logger->_trx = NULL; - logger->_trxCollection = NULL; - logger->_cap = NULL; - - logger->_state._lastLogTick = 0; - logger->_state._totalEvents = 0; - logger->_state._active = false; - logger->_configuration._logRemoteChanges = false; - logger->_configuration._maxEvents = (uint64_t) TRI_REPLICATION_LOGGER_EVENTS_DEFAULT; - logger->_configuration._maxEventsSize = (uint64_t) TRI_REPLICATION_LOGGER_SIZE_DEFAULT; - logger->_configuration._autoStart = false; - - logger->_localServerId = TRI_GetIdServer(); - logger->_databaseName = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name); - - TRI_ASSERT(logger->_databaseName != NULL); - - // check if there is a configuration file to load - char* filename = GetConfigurationFilename(vocbase); - - if (filename != NULL) { - LOG_TRACE("looking for replication logger configuration in '%s'", filename); - - if (TRI_ExistsFile(filename)) { - TRI_json_t* json; - - LOG_TRACE("loading replication logger configuration from '%s'", filename); - - json = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, NULL); - - if (TRI_IsArrayJson(json)) { - TRI_json_t const* value; - - value = TRI_LookupArrayJson(json, "autoStart"); - - if (value != NULL && value->_type == TRI_JSON_BOOLEAN) { - logger->_configuration._autoStart = value->_value._boolean; - } - - value = TRI_LookupArrayJson(json, "logRemoteChanges"); - - if (value != NULL && value->_type == TRI_JSON_BOOLEAN) { - logger->_configuration._logRemoteChanges = value->_value._boolean; - } - - value = TRI_LookupArrayJson(json, "maxEvents"); - - if (value != NULL && value->_type == TRI_JSON_NUMBER) { - logger->_configuration._maxEvents = (uint64_t) value->_value._number; - } - - value = TRI_LookupArrayJson(json, "maxEventsSize"); - - if (value != NULL && value->_type == TRI_JSON_NUMBER) { - logger->_configuration._maxEventsSize = (uint64_t) value->_value._number; - } - } - - if (json != NULL) { - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); - } - } - - TRI_FreeString(TRI_CORE_MEM_ZONE, filename); - } - - return logger; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief destroy a replication logger -//////////////////////////////////////////////////////////////////////////////// - -void TRI_DestroyReplicationLogger (TRI_replication_logger_t* logger) { - TRI_StopReplicationLogger(logger); - - FreeBuffers(logger); - - FreeClients(logger); - TRI_DestroyAssociativePointer(&logger->_clients); - - TRI_FreeString(TRI_CORE_MEM_ZONE, logger->_databaseName); - TRI_DestroySpin(&logger->_bufferLock); - TRI_DestroySpin(&logger->_idLock); - TRI_DestroyReadWriteLock(&logger->_clientsLock); - TRI_DestroyReadWriteLock(&logger->_statusLock); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free a replication logger -//////////////////////////////////////////////////////////////////////////////// - -void TRI_FreeReplicationLogger (TRI_replication_logger_t* logger) { - TRI_DestroyReplicationLogger(logger); - TRI_Free(TRI_CORE_MEM_ZONE, logger); -} - -// ----------------------------------------------------------------------------- -// --SECTION-- public functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get a JSON representation of the replication logger configuration -//////////////////////////////////////////////////////////////////////////////// - -TRI_json_t* TRI_JsonConfigurationReplicationLogger (TRI_replication_logger_configuration_t const* config) { - TRI_json_t* json; - - json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 3); - - if (json == NULL) { - return NULL; - } - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, - json, - "autoStart", - TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, config->_autoStart)); - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, - json, - "logRemoteChanges", - TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, config->_logRemoteChanges)); - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, - json, - "maxEvents", - TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) config->_maxEvents)); - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, - json, - "maxEventsSize", - TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) config->_maxEventsSize)); - - return json; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief configure the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_ConfigureReplicationLogger (TRI_replication_logger_t* logger, - TRI_replication_logger_configuration_t const* config) { - uint64_t oldMaxEvents; - uint64_t oldMaxEventsSize; - char* filename; - int res; - - if (logger->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) { - return TRI_ERROR_CLUSTER_UNSUPPORTED; - } - - res = TRI_ERROR_NO_ERROR; - - if (config->_maxEventsSize != 0 || config->_maxEvents != 0) { - if (config->_maxEvents > 0 && config->_maxEvents < TRI_REPLICATION_LOGGER_EVENTS_MIN) { - return TRI_ERROR_REPLICATION_INVALID_LOGGER_CONFIGURATION; - } - - if (config->_maxEventsSize > 0 && config->_maxEventsSize < TRI_REPLICATION_LOGGER_SIZE_MIN) { - return TRI_ERROR_REPLICATION_INVALID_LOGGER_CONFIGURATION; - } - } - - // valid - - TRI_WriteLockReadWriteLock(&logger->_statusLock); - oldMaxEvents = logger->_configuration._maxEvents; - oldMaxEventsSize = logger->_configuration._maxEventsSize; - - if (config->_maxEvents != oldMaxEvents || - config->_maxEventsSize != oldMaxEventsSize) { - // configuration change. free existing cap - if (logger->_state._active) { - FreeCap(logger); - } - - // set new limits and re-create cap if necessary - logger->_configuration._maxEvents = config->_maxEvents; - logger->_configuration._maxEventsSize = config->_maxEventsSize; - - TRI_ASSERT(logger->_cap == NULL); - - if (logger->_state._active) { - CreateCap(logger); - } - } - - logger->_configuration._logRemoteChanges = config->_logRemoteChanges; - logger->_configuration._autoStart = config->_autoStart; - - // now save configuration to file - filename = GetConfigurationFilename(logger->_vocbase); - - if (filename != NULL) { - TRI_json_t* json = TRI_JsonConfigurationReplicationLogger(&logger->_configuration); - - if (json != NULL) { - TRI_SaveJson(filename, json, true); - TRI_FreeJson(TRI_CORE_MEM_ZONE, json); - } - - TRI_Free(TRI_CORE_MEM_ZONE, filename); - } - - TRI_WriteUnlockReadWriteLock(&logger->_statusLock); - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief copy a logger configuration -//////////////////////////////////////////////////////////////////////////////// - -void TRI_CopyConfigurationReplicationLogger (TRI_replication_logger_configuration_t const* src, - TRI_replication_logger_configuration_t* dst) { - memcpy(dst, src, sizeof(TRI_replication_logger_configuration_t)); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return the list of clients as a JSON array -//////////////////////////////////////////////////////////////////////////////// - -TRI_json_t* TRI_JsonClientsReplicationLogger (TRI_replication_logger_t* logger) { - TRI_json_t* json; - uint32_t i, n; - - json = TRI_CreateListJson(TRI_CORE_MEM_ZONE); - - if (json == NULL) { - return NULL; - } - - TRI_ReadLockReadWriteLock(&logger->_clientsLock); - - n = logger->_clients._nrAlloc; - - for (i = 0; i < n; ++i) { - logger_client_t* client = static_cast(logger->_clients._table[i]); - - if (client != NULL) { - TRI_json_t* element = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE); - - if (element != NULL) { - char* value; - - value = TRI_StringUInt64(client->_serverId); - - if (value != NULL) { - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, element, "serverId", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, value)); - } - - value = TRI_StringUInt64(client->_lastServedTick); - - if (value != NULL) { - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, element, "lastServedTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, value)); - } - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, element, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, client->_stamp)); - - TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, json, element); - } - } - } - - TRI_ReadUnlockReadWriteLock(&logger->_clientsLock); - - return json; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief insert an applier action into an action list -//////////////////////////////////////////////////////////////////////////////// - -void TRI_UpdateClientReplicationLogger (TRI_replication_logger_t* logger, - TRI_server_id_t serverId, - TRI_voc_tick_t lastServedTick) { - - logger_client_t* client = static_cast(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(logger_client_t), false)); - - if (client == NULL) { - return; - } - - TRI_GetTimeStampReplication(client->_stamp, sizeof(client->_stamp) - 1); - - client->_serverId = serverId; - client->_lastServedTick = lastServedTick; - - TRI_WriteLockReadWriteLock(&logger->_clientsLock); - - void* found = TRI_RemoveKeyAssociativePointer(&logger->_clients, &client->_serverId); - - if (found != NULL) { - FreeClient((logger_client_t*) found); - } - - found = TRI_InsertKeyAssociativePointer(&logger->_clients, &client->_serverId, (void*) client, false); - TRI_ASSERT(found == NULL); - - TRI_WriteUnlockReadWriteLock(&logger->_clientsLock); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief start the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StartReplicationLogger (TRI_replication_logger_t* logger) { - int res; - - if (logger->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) { - return TRI_ERROR_CLUSTER_UNSUPPORTED; - } - - res = TRI_ERROR_NO_ERROR; - - TRI_WriteLockReadWriteLock(&logger->_statusLock); - - if (! logger->_state._active) { - res = StartReplicationLogger(logger); - } - - TRI_WriteUnlockReadWriteLock(&logger->_statusLock); - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stop the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StopReplicationLogger (TRI_replication_logger_t* logger) { - int res; - - if (logger->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) { - return TRI_ERROR_CLUSTER_UNSUPPORTED; - } - - res = TRI_ERROR_NO_ERROR; - - TRI_WriteLockReadWriteLock(&logger->_statusLock); - - if (logger->_state._active) { - res = StopReplicationLogger(logger); - } - - TRI_WriteUnlockReadWriteLock(&logger->_statusLock); - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get the current replication logger state -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StateReplicationLogger (TRI_replication_logger_t* logger, - TRI_replication_logger_state_t* state) { - int res; - - TRI_ReadLockReadWriteLock(&logger->_statusLock); - - if (logger->_state._active) { - // use state from logger - res = GetStateReplicationLogger(logger, state); - } - else { - // read first/last directly from collection - res = GetStateInactive(logger, state); - } - - TRI_ReadUnlockReadWriteLock(&logger->_statusLock); - - return res; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get a JSON representation of a logger state -//////////////////////////////////////////////////////////////////////////////// - -TRI_json_t* TRI_JsonStateReplicationLogger (TRI_replication_logger_state_t const* state) { - TRI_json_t* json; - char* value; - char timeString[24]; - - json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 3); - - if (json == NULL) { - return NULL; - } - - // add replication state - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active)); - - value = TRI_StringUInt64(state->_lastLogTick); - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastLogTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, value)); - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "totalEvents", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) state->_totalEvents)); - - TRI_GetTimeStampReplication(timeString, sizeof(timeString) - 1); - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, timeString)); - - return json; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return a JSON representation of the replication logger -//////////////////////////////////////////////////////////////////////////////// - -TRI_json_t* TRI_JsonReplicationLogger (TRI_replication_logger_t* logger) { - TRI_replication_logger_state_t state; - TRI_json_t* json; - TRI_json_t* server; - TRI_json_t* clients; - int res; - - res = TRI_StateReplicationLogger(logger, &state); - - if (res != TRI_ERROR_NO_ERROR) { - return NULL; - } - - json = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE); - - if (json == NULL) { - return NULL; - } - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "state", TRI_JsonStateReplicationLogger(&state)); - - // add server info - server = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE); - - if (server != NULL) { - TRI_server_id_t serverId; - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, server, "version", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, TRI_VERSION)); - - serverId = TRI_GetIdServer(); - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, server, "serverId", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, TRI_StringUInt64(serverId))); - - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "server", server); - } - - clients = TRI_JsonClientsReplicationLogger(logger); - - if (clients != NULL) { - TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "clients", clients); - } - - return json; -} - -// ----------------------------------------------------------------------------- -// --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- - -// Local Variables: -// mode: outline-minor -// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}" -// End: diff --git a/arangod/VocBase/replication-logger.h b/arangod/VocBase/replication-logger.h deleted file mode 100644 index 4c9f24e615..0000000000 --- a/arangod/VocBase/replication-logger.h +++ /dev/null @@ -1,213 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// @brief replication logger -/// -/// @file -/// -/// DISCLAIMER -/// -/// Copyright 2014 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany -/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGODB_VOC_BASE_REPLICATION__LOGGER_H -#define ARANGODB_VOC_BASE_REPLICATION__LOGGER_H 1 - -#include "Basics/Common.h" - -#include "BasicsC/associative.h" -#include "BasicsC/locks.h" -#include "BasicsC/vector.h" - -#include "VocBase/replication-common.h" -#include "VocBase/vocbase.h" -#include "VocBase/voc-types.h" - -// ----------------------------------------------------------------------------- -// --SECTION-- forward declarations -// ----------------------------------------------------------------------------- - -struct TRI_df_marker_s; -struct TRI_document_collection_t; -struct TRI_doc_mptr_t; -struct TRI_index_s; -struct TRI_json_s; -struct TRI_transaction_s; -struct TRI_transaction_collection_s; -struct TRI_vocbase_s; -struct TRI_vocbase_col_s; - -// ----------------------------------------------------------------------------- -// --SECTION-- REPLICATION LOGGER -// ----------------------------------------------------------------------------- - -// ----------------------------------------------------------------------------- -// --SECTION-- public types -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief logger configuration -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_replication_logger_configuration_s { - uint64_t _maxEvents; - uint64_t _maxEventsSize; - bool _logRemoteChanges; - bool _autoStart; -} -TRI_replication_logger_configuration_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief state information about replication logging -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_replication_logger_state_s { - TRI_voc_tick_t _lastLogTick; - uint64_t _totalEvents; - bool _active; -} -TRI_replication_logger_state_t; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief context information for replication logging -//////////////////////////////////////////////////////////////////////////////// - -typedef struct TRI_replication_logger_s { - TRI_read_write_lock_t _statusLock; - TRI_spin_t _idLock; - TRI_spin_t _bufferLock; - TRI_vector_pointer_t _buffers; - TRI_read_write_lock_t _clientsLock; - TRI_associative_pointer_t _clients; - - struct TRI_vocbase_s* _vocbase; - struct TRI_transaction_s* _trx; - struct TRI_transaction_collection_s* _trxCollection; - struct TRI_index_s* _cap; // cap constraint - - TRI_replication_logger_state_t _state; - TRI_replication_logger_configuration_t _configuration; - TRI_server_id_t _localServerId; - - char* _databaseName; -} -TRI_replication_logger_t; - -// ----------------------------------------------------------------------------- -// --SECTION-- constructors / destructors -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a replication logger -//////////////////////////////////////////////////////////////////////////////// - -TRI_replication_logger_t* TRI_CreateReplicationLogger (struct TRI_vocbase_s*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief destroy a replication logger -//////////////////////////////////////////////////////////////////////////////// - -void TRI_DestroyReplicationLogger (TRI_replication_logger_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief free a replication logger -//////////////////////////////////////////////////////////////////////////////// - -void TRI_FreeReplicationLogger (TRI_replication_logger_t*); - -// ----------------------------------------------------------------------------- -// --SECTION-- public functions -// ----------------------------------------------------------------------------- - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get a JSON representation of the replication logger configuration -//////////////////////////////////////////////////////////////////////////////// - -struct TRI_json_s* TRI_JsonConfigurationReplicationLogger (TRI_replication_logger_configuration_t const*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief configure the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_ConfigureReplicationLogger (TRI_replication_logger_t*, - TRI_replication_logger_configuration_t const*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief copy a logger configuration -//////////////////////////////////////////////////////////////////////////////// - -void TRI_CopyConfigurationReplicationLogger (TRI_replication_logger_configuration_t const*, - TRI_replication_logger_configuration_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return the list of clients as a JSON array -//////////////////////////////////////////////////////////////////////////////// - -struct TRI_json_s* TRI_JsonClientsReplicationLogger (TRI_replication_logger_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief insert an applier action into an action list -//////////////////////////////////////////////////////////////////////////////// - -void TRI_UpdateClientReplicationLogger (TRI_replication_logger_t*, - TRI_server_id_t, - TRI_voc_tick_t); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief start the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StartReplicationLogger (TRI_replication_logger_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stop the replication logger -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StopReplicationLogger (TRI_replication_logger_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get the current replication state -//////////////////////////////////////////////////////////////////////////////// - -int TRI_StateReplicationLogger (TRI_replication_logger_t*, - TRI_replication_logger_state_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief get a JSON representation of a logger state -//////////////////////////////////////////////////////////////////////////////// - -struct TRI_json_s* TRI_JsonStateReplicationLogger (TRI_replication_logger_state_t const*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return a JSON representation of the replication logger -//////////////////////////////////////////////////////////////////////////////// - -struct TRI_json_s* TRI_JsonReplicationLogger (TRI_replication_logger_t*); - -#endif - -// ----------------------------------------------------------------------------- -// --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- - -// Local Variables: -// mode: outline-minor -// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}" -// End: diff --git a/arangod/VocBase/replication-master.cpp b/arangod/VocBase/replication-master.cpp index 46fab1d401..044970f796 100644 --- a/arangod/VocBase/replication-master.cpp +++ b/arangod/VocBase/replication-master.cpp @@ -52,8 +52,8 @@ void TRI_InitMasterInfoReplication (TRI_replication_master_info_t* info, info->_serverId = 0; info->_majorVersion = 0; info->_minorVersion = 0; - info->_state._lastLogTick = 0; - info->_state._active = false; + info->_lastLogTick = 0; + info->_active = false; } //////////////////////////////////////////////////////////////////////////////// @@ -81,7 +81,7 @@ void TRI_LogMasterInfoReplication (TRI_replication_master_info_t const* info, (unsigned long long) info->_serverId, info->_majorVersion, info->_minorVersion, - (unsigned long long) info->_state._lastLogTick); + (unsigned long long) info->_lastLogTick); } // ----------------------------------------------------------------------------- diff --git a/arangod/VocBase/replication-master.h b/arangod/VocBase/replication-master.h index 373b7c797f..5950e7d1f3 100644 --- a/arangod/VocBase/replication-master.h +++ b/arangod/VocBase/replication-master.h @@ -33,7 +33,6 @@ #include "Basics/Common.h" #include "VocBase/replication-common.h" -#include "VocBase/replication-logger.h" // ----------------------------------------------------------------------------- // --SECTION-- REPLICATION MASTER INFO @@ -52,7 +51,8 @@ typedef struct TRI_replication_master_info_s { TRI_server_id_t _serverId; int _majorVersion; int _minorVersion; - TRI_replication_logger_state_t _state; + TRI_voc_tick_t _lastLogTick; + bool _active; } TRI_replication_master_info_t; diff --git a/arangod/VocBase/server.cpp b/arangod/VocBase/server.cpp index f89cc73f6d..e6c1e5b709 100644 --- a/arangod/VocBase/server.cpp +++ b/arangod/VocBase/server.cpp @@ -49,7 +49,6 @@ #include "Utils/Exception.h" #include "VocBase/auth.h" #include "VocBase/replication-applier.h" -#include "VocBase/replication-logger.h" #include "VocBase/vocbase.h" #include "Wal/LogfileManager.h" #include "Wal/Marker.h" @@ -1651,7 +1650,6 @@ int TRI_InitServer (TRI_server_t* server, char const* appPath, char const* devAppPath, TRI_vocbase_defaults_t const* defaults, - bool disableLoggers, bool disableAppliers, bool iterateMarkersOnOpen) { @@ -1755,7 +1753,6 @@ int TRI_InitServer (TRI_server_t* server, TRI_InitMutex(&server->_createLock); - server->_disableReplicationLoggers = disableLoggers; server->_disableReplicationAppliers = disableAppliers; server->_initialised = true; @@ -2125,11 +2122,9 @@ int TRI_CreateCoordinatorDatabaseServer (TRI_server_t* server, TRI_ASSERT(vocbase != nullptr); - vocbase->_replicationLogger = TRI_CreateReplicationLogger(vocbase); vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase); - if (vocbase->_replicationLogger == nullptr || - vocbase->_replicationApplier == nullptr) { + if (vocbase->_replicationApplier == nullptr) { TRI_DestroyInitialVocBase(vocbase); TRI_Free(TRI_UNKNOWN_MEM_ZONE, vocbase); diff --git a/arangod/VocBase/server.h b/arangod/VocBase/server.h index a6c4d1057d..2287b23f66 100644 --- a/arangod/VocBase/server.h +++ b/arangod/VocBase/server.h @@ -69,7 +69,6 @@ typedef struct TRI_server_s { char* _appPath; char* _devAppPath; - bool _disableReplicationLoggers; bool _disableReplicationAppliers; bool _iterateMarkersOnOpen; bool _hasCreatedSystemDatabase; @@ -104,7 +103,6 @@ int TRI_InitServer (TRI_server_t* server, char const* appPath, char const* devappPath, TRI_vocbase_defaults_t const*, - bool disableReplicationLogger, bool disableReplicationApplier, bool iterateMarkersOnOpen); diff --git a/arangod/VocBase/transaction.cpp b/arangod/VocBase/transaction.cpp index 12e940f227..a13399def4 100644 --- a/arangod/VocBase/transaction.cpp +++ b/arangod/VocBase/transaction.cpp @@ -36,7 +36,6 @@ #include "Utils/Exception.h" #include "VocBase/collection.h" #include "VocBase/document-collection.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/vocbase.h" #include "Wal/DocumentOperation.h" diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index e502213fa7..158b13ad5e 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -54,7 +54,6 @@ #include "VocBase/document-collection.h" #include "VocBase/general-cursor.h" #include "VocBase/replication-applier.h" -#include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/transaction.h" #include "VocBase/vocbase-defaults.h" @@ -1355,7 +1354,6 @@ TRI_vocbase_t* TRI_CreateInitialVocBase (TRI_vocbase_type_e type, vocbase->_name = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, name); vocbase->_authInfoLoaded = false; vocbase->_hasCompactor = false; - vocbase->_replicationLogger = nullptr; vocbase->_replicationApplier = nullptr; vocbase->_oldTransactions = nullptr; @@ -1432,11 +1430,6 @@ void TRI_DestroyInitialVocBase (TRI_vocbase_t* vocbase) { vocbase->_replicationApplier = nullptr; } - if (vocbase->_replicationLogger != nullptr) { - TRI_FreeReplicationLogger(vocbase->_replicationLogger); - vocbase->_replicationLogger = nullptr; - } - if (vocbase->_oldTransactions == nullptr) { delete vocbase->_oldTransactions; } @@ -1526,27 +1519,6 @@ TRI_vocbase_t* TRI_OpenVocBase (TRI_server_t* server, TRI_InitThread(&vocbase->_cleanup); TRI_StartThread(&vocbase->_cleanup, NULL, "[cleanup]", TRI_CleanupVocBase, vocbase); - vocbase->_replicationLogger = TRI_CreateReplicationLogger(vocbase); - - if (vocbase->_replicationLogger == NULL) { - // TODO - LOG_FATAL_AND_EXIT("initialising replication logger for database '%s' failed", name); - } - - if (vocbase->_replicationLogger->_configuration._autoStart) { - if (server->_disableReplicationLoggers) { - LOG_INFO("replication logger explicitly deactivated for database '%s'", name); - } - else { - res = TRI_StartReplicationLogger(vocbase->_replicationLogger); - - if (res != TRI_ERROR_NO_ERROR) { - // TODO - LOG_FATAL_AND_EXIT("unable to start replication logger for database '%s'", name); - } - } - } - vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase); if (vocbase->_replicationApplier == NULL) { @@ -1584,7 +1556,6 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) { // stop replication TRI_StopReplicationApplier(vocbase->_replicationApplier, false); - TRI_StopReplicationLogger(vocbase->_replicationLogger); TRI_InitVectorPointer(&collections, TRI_UNKNOWN_MEM_ZONE); diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index 4c67d6b743..5a03ea4ac7 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h @@ -310,7 +310,6 @@ typedef struct TRI_vocbase_s { std::set* _oldTransactions; - struct TRI_replication_logger_s* _replicationLogger; struct TRI_replication_applier_s* _replicationApplier; // state of the database diff --git a/arangod/Wal/LogfileManager.cpp b/arangod/Wal/LogfileManager.cpp index 3675a8c61f..a672d1598e 100644 --- a/arangod/Wal/LogfileManager.cpp +++ b/arangod/Wal/LogfileManager.cpp @@ -1210,6 +1210,20 @@ void LogfileManager::setCollectionDone (Logfile* logfile) { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the current state +//////////////////////////////////////////////////////////////////////////////// + +LogfileManagerState LogfileManager::state () { + LogfileManagerState state; + + // now fill the state + _slots->statistics(state.lastTick, state.numEvents); + state.timeString = getTimeString(); + + return state; +} + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- diff --git a/arangod/Wal/LogfileManager.h b/arangod/Wal/LogfileManager.h index b1a3970af9..5e9926be75 100644 --- a/arangod/Wal/LogfileManager.h +++ b/arangod/Wal/LogfileManager.h @@ -59,13 +59,23 @@ namespace triagens { /// @brief state that is built up when scanning a WAL logfile during recovery //////////////////////////////////////////////////////////////////////////////// -struct RecoverState { - std::unordered_map collections; - std::unordered_set failedTransactions; - std::unordered_set droppedCollections; - std::unordered_set droppedDatabases; - TRI_voc_tick_t lastTick; -}; + struct RecoverState { + std::unordered_map collections; + std::unordered_set failedTransactions; + std::unordered_set droppedCollections; + std::unordered_set droppedDatabases; + TRI_voc_tick_t lastTick; + }; + +// ----------------------------------------------------------------------------- +// --SECTION-- LogfileManagerState +// ----------------------------------------------------------------------------- + + struct LogfileManagerState { + TRI_voc_tick_t lastTick; + uint64_t numEvents; + std::string timeString; + }; // ----------------------------------------------------------------------------- // --SECTION-- class LogfileManager @@ -217,6 +227,22 @@ struct RecoverState { return _slots; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief allow or disallow writes to the WAL +//////////////////////////////////////////////////////////////////////////////// + + inline void allowWrites (bool value) { + _allowWrites = value; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not we are in the recovery mode +//////////////////////////////////////////////////////////////////////////////// + + inline bool isInRecovery () const { + return _inRecovery; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief registers a transaction //////////////////////////////////////////////////////////////////////////////// @@ -426,20 +452,10 @@ struct RecoverState { void setCollectionDone (Logfile*); //////////////////////////////////////////////////////////////////////////////// -/// @brief allow or disallow writes to the WAL +/// @brief return the current state //////////////////////////////////////////////////////////////////////////////// - inline void allowWrites (bool value) { - _allowWrites = value; - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not we are in the recovery mode -//////////////////////////////////////////////////////////////////////////////// - - inline bool isInRecovery () const { - return _inRecovery; - } + LogfileManagerState state (); // ----------------------------------------------------------------------------- // --SECTION-- private methods diff --git a/arangod/Wal/Slots.cpp b/arangod/Wal/Slots.cpp index e137cfd726..a65c828d55 100644 --- a/arangod/Wal/Slots.cpp +++ b/arangod/Wal/Slots.cpp @@ -59,7 +59,9 @@ Slots::Slots (LogfileManager* logfileManager, _handoutIndex(0), _recycleIndex(0), _logfile(nullptr), - _lastCommittedTick(0) { + _lastAssignedTick(0), + _lastCommittedTick(0), + _numEvents(0) { } //////////////////////////////////////////////////////////////////////////////// @@ -76,6 +78,17 @@ Slots::~Slots () { // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the statistics of the slots +//////////////////////////////////////////////////////////////////////////////// + +void Slots::statistics (Slot::TickType& lastTick, + uint64_t& numEvents) { + MUTEX_LOCKER(_lock); + lastTick = _lastCommittedTick; + numEvents = _numEvents; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief execute a flush operation //////////////////////////////////////////////////////////////////////////////// @@ -236,6 +249,7 @@ void Slots::returnUsed (SlotInfo& slotInfo, { MUTEX_LOCKER(_lock); slotInfo.slot->setReturned(waitForSync); + ++_numEvents; } _logfileManager->signalSync(); @@ -510,8 +524,9 @@ Slot::TickType Slots::handout () { // wrap around _handoutIndex = 0; } - - return static_cast(TRI_NewTickServer()); + + _lastAssignedTick = static_cast(TRI_NewTickServer()); + return _lastAssignedTick; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/Slots.h b/arangod/Wal/Slots.h index 3a015e0794..59bf983b74 100644 --- a/arangod/Wal/Slots.h +++ b/arangod/Wal/Slots.h @@ -136,6 +136,13 @@ namespace triagens { public: +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the statistics of the slots +//////////////////////////////////////////////////////////////////////////////// + + void statistics (Slot::TickType&, + uint64_t&); + //////////////////////////////////////////////////////////////////////////////// /// @brief execute a flush operation //////////////////////////////////////////////////////////////////////////////// @@ -281,12 +288,24 @@ namespace triagens { Logfile* _logfile; +//////////////////////////////////////////////////////////////////////////////// +/// @brief last assigned tick value +//////////////////////////////////////////////////////////////////////////////// + + Slot::TickType _lastAssignedTick; + //////////////////////////////////////////////////////////////////////////////// /// @brief last committed tick value //////////////////////////////////////////////////////////////////////////////// Slot::TickType _lastCommittedTick; +//////////////////////////////////////////////////////////////////////////////// +/// @brief number of log events handled +//////////////////////////////////////////////////////////////////////////////// + + uint64_t _numEvents; + }; } diff --git a/arangosh/V8Client/arangodump.cpp b/arangosh/V8Client/arangodump.cpp index 5b31d241f1..8840a297c5 100644 --- a/arangosh/V8Client/arangodump.cpp +++ b/arangosh/V8Client/arangodump.cpp @@ -529,7 +529,7 @@ static int DumpCollection (int fd, const string baseUrl = "/_api/replication/dump?collection=" + cid + "&chunkSize=" + StringUtils::itoa(ChunkSize) + - "&ticks=false&translateIds=true"; + "&ticks=false&translateIds=true&flush=false"; map headers; diff --git a/js/apps/system/aardvark/frontend/js/bootstrap/module-internal.js b/js/apps/system/aardvark/frontend/js/bootstrap/module-internal.js index 72121fbf11..96796f6367 100644 --- a/js/apps/system/aardvark/frontend/js/bootstrap/module-internal.js +++ b/js/apps/system/aardvark/frontend/js/bootstrap/module-internal.js @@ -1,8 +1,7 @@ /*jslint indent: 2, nomen: true, maxlen: 120, vars: true, white: true, plusplus: true, nonpropdel: true, proto: true */ /*jslint sloppy: true, regexp: true */ /*global require, module, Module, ArangoError, SleepAndRequeue, - REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE, - REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START, + REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START, REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE, REPLICATION_SYNCHRONISE, REPLICATION_SERVER_ID, CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS, SYS_BASE64DECODE, SYS_BASE64ENCODE, SYS_DEBUG_SEGFAULT, @@ -264,24 +263,6 @@ // --SECTION-- public functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief startReplicationLogger -//////////////////////////////////////////////////////////////////////////////// - - if (typeof REPLICATION_LOGGER_START !== "undefined") { - exports.startReplicationLogger = REPLICATION_LOGGER_START; - delete REPLICATION_LOGGER_START; - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stopReplicationLogger -//////////////////////////////////////////////////////////////////////////////// - - if (typeof REPLICATION_LOGGER_STOP !== "undefined") { - exports.stopReplicationLogger = REPLICATION_LOGGER_STOP; - delete REPLICATION_LOGGER_STOP; - } - //////////////////////////////////////////////////////////////////////////////// /// @brief getStateReplicationLogger //////////////////////////////////////////////////////////////////////////////// diff --git a/js/common/bootstrap/module-internal.js b/js/common/bootstrap/module-internal.js index 72121fbf11..96796f6367 100644 --- a/js/common/bootstrap/module-internal.js +++ b/js/common/bootstrap/module-internal.js @@ -1,8 +1,7 @@ /*jslint indent: 2, nomen: true, maxlen: 120, vars: true, white: true, plusplus: true, nonpropdel: true, proto: true */ /*jslint sloppy: true, regexp: true */ /*global require, module, Module, ArangoError, SleepAndRequeue, - REPLICATION_LOGGER_START, REPLICATION_LOGGER_STOP, REPLICATION_LOGGER_STATE, - REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START, + REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START, REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE, REPLICATION_SYNCHRONISE, REPLICATION_SERVER_ID, CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS, SYS_BASE64DECODE, SYS_BASE64ENCODE, SYS_DEBUG_SEGFAULT, @@ -264,24 +263,6 @@ // --SECTION-- public functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief startReplicationLogger -//////////////////////////////////////////////////////////////////////////////// - - if (typeof REPLICATION_LOGGER_START !== "undefined") { - exports.startReplicationLogger = REPLICATION_LOGGER_START; - delete REPLICATION_LOGGER_START; - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief stopReplicationLogger -//////////////////////////////////////////////////////////////////////////////// - - if (typeof REPLICATION_LOGGER_STOP !== "undefined") { - exports.stopReplicationLogger = REPLICATION_LOGGER_STOP; - delete REPLICATION_LOGGER_STOP; - } - //////////////////////////////////////////////////////////////////////////////// /// @brief getStateReplicationLogger //////////////////////////////////////////////////////////////////////////////// diff --git a/js/server/modules/org/arangodb/replication.js b/js/server/modules/org/arangodb/replication.js index 6378e1cdd1..ddef42f70b 100644 --- a/js/server/modules/org/arangodb/replication.js +++ b/js/server/modules/org/arangodb/replication.js @@ -38,32 +38,33 @@ var internal = require("internal"); // --SECTION-- private functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup ArangoShell -/// @{ -//////////////////////////////////////////////////////////////////////////////// - var logger = { }; var applier = { }; //////////////////////////////////////////////////////////////////////////////// -/// @brief starts the replication logger +/// @brief starts the replication logger - this is a no-op in ArangoDB 2.2 and +/// higher //////////////////////////////////////////////////////////////////////////////// logger.start = function () { 'use strict'; - return internal.startReplicationLogger(); + // the logger in ArangoDB 2.2 is now the WAL... + // so the logger cannot be started but is always running + return true; }; //////////////////////////////////////////////////////////////////////////////// -/// @brief stops the replication logger +/// @brief stops the replication logger - this is a no-op in ArangoDB 2.2 and +/// higher //////////////////////////////////////////////////////////////////////////////// logger.stop = function () { 'use strict'; - - return internal.stopReplicationLogger(); + + // the logger in ArangoDB 2.2 is now the WAL... + // so the logger cannot be stopped + return false; }; //////////////////////////////////////////////////////////////////////////////// @@ -80,14 +81,10 @@ logger.state = function () { /// @brief returns the configuration of the replication logger //////////////////////////////////////////////////////////////////////////////// -logger.properties = function (config) { +logger.properties = function () { 'use strict'; - if (config === undefined) { - return internal.configureReplicationLogger(); - } - - return internal.configureReplicationLogger(config); + return internal.configureReplicationLogger(); }; //////////////////////////////////////////////////////////////////////////////// @@ -148,19 +145,10 @@ applier.properties = function (config) { return internal.configureReplicationApplier(config); }; -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- other functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup ArangoShell -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief performs a one-time synchronisation with a remote endpoint //////////////////////////////////////////////////////////////////////////////// @@ -181,28 +169,15 @@ var serverId = function () { return internal.serverId(); }; -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- module exports // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup ArangoShell -/// @{ -//////////////////////////////////////////////////////////////////////////////// - exports.logger = logger; exports.applier = applier; exports.sync = sync; exports.serverId = serverId; -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // -----------------------------------------------------------------------------