1
0
Fork 0
arangodb/arangod/Replication/GlobalTailingSyncer.cpp

151 lines
5.1 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "GlobalTailingSyncer.h"
#include "Basics/Thread.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Replication/GlobalInitialSyncer.h"
#include "Replication/ReplicationFeature.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::httpclient;
/// @brief construct a tailing syncer that works on the global replication
/// applier obtained from ReplicationFeature::INSTANCE
///
/// All arguments are forwarded unchanged to the TailingSyncer base class.
/// The cuid -> collection name translation table starts out as "not yet
/// queried"; skipMarker() populates it on first use.
GlobalTailingSyncer::GlobalTailingSyncer(ReplicationApplierConfiguration const& configuration,
                                         TRI_voc_tick_t initialTick,
                                         bool useTick, TRI_voc_tick_t barrierId)
    : TailingSyncer(ReplicationFeature::INSTANCE->globalReplicationApplier(),
                    configuration, initialTick, useTick, barrierId),
      _queriedTranslations(false) {
  // the global syncer's state is bound to the system database name
  _state.databaseName = TRI_VOC_SYSTEM_DATABASE;

  // database markers are explicitly not ignored by this syncer
  _ignoreDatabaseMarkers = false;
}
/// @brief build the base URL for a global WAL access request
///
/// @param command  name of the WAL access sub-command; it is appended as a
///                 path segment after TailingSyncer::WalAccessUrl
/// @return WalAccessUrl + "/" + command + "?global=true&" (the trailing '&'
///         is part of the result)
/// @throws an ArangoDB exception with TRI_ERROR_NOT_IMPLEMENTED (after
///         logging the same message) if the master's version is below 3.3
std::string GlobalTailingSyncer::tailingBaseUrl(std::string const& command) {
  TRI_ASSERT(!_state.master.endpoint.empty());
  TRI_ASSERT(_state.master.serverId != 0);
  TRI_ASSERT(_state.master.majorVersion != 0);

  auto const& master = _state.master;

  // anything older than 3.3 is rejected
  bool const masterTooOld =
      master.majorVersion < 3 ||
      (master.majorVersion == 3 && master.minorVersion <= 2);

  if (masterTooOld) {
    std::string err =
        "You need >= 3.3 to perform the replication of an entire server";
    LOG_TOPIC("75fa1", ERR, Logger::REPLICATION) << err;
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED, err);
  }

  std::string url = TailingSyncer::WalAccessUrl;
  url += "/";
  url += command;
  url += "?global=true&";
  return url;
}
/// @brief save the current applier state
///
/// Delegates to the applier's persistStateResult(), passing `false` as its
/// argument, and hands the resulting Result back to the caller unchanged.
Result GlobalTailingSyncer::saveApplierState() {
  Result outcome = _applier->persistStateResult(false);
  return outcome;
}
/// @brief decide whether a marker should be skipped because it belongs to an
/// excluded collection
///
/// The marker is identified by its "cuid" (globally unique collection id)
/// attribute. On the first call that reaches the lookup stage, the master's
/// inventory is fetched once and a cuid -> collection name table is built
/// from the "parameters" of every collection in every database. The cuid is
/// then translated to a name and passed to isExcludedCollection().
///
/// @param slice  the marker; its "cuid" attribute is read
/// @return the result of isExcludedCollection() for the translated name;
///         false if the marker has no string "cuid", if the master is older
///         than 3.3, if the inventory could not be fetched or has an
///         unexpected shape, or if the cuid is unknown
///
/// Notes:
/// - _queriedTranslations is set as soon as getInventory() has returned, so
///   a failed or malformed inventory response is not fetched again. If an
///   exception is thrown before that point, the flag stays false and the
///   next call tries again.
/// - Collection entries without an object-valued "parameters" attribute, or
///   without string-valued "name" / "globallyUniqueId", are skipped. Calling
///   hasKey()/copyString() on such values would throw and abort the whole
///   table build half-way.
bool GlobalTailingSyncer::skipMarker(VPackSlice const& slice) {
  // we do not have a "cname" attribute in the marker...
  // now check for a globally unique id attribute ("cuid")
  // if it's present, then we will use our local cuid -> collection name
  // translation table
  VPackSlice const name = slice.get("cuid");
  if (!name.isString()) {
    return false;
  }

  if (_state.master.majorVersion < 3 ||
      (_state.master.majorVersion == 3 && _state.master.minorVersion <= 2)) {
    // globallyUniqueId only exists in 3.3 and higher
    return false;
  }

  if (!_queriedTranslations) {
    // no translations yet... query master inventory to find names of all
    // collections
    try {
      GlobalInitialSyncer init(_state.applier);
      VPackBuilder inventoryResponse;
      Result res = init.getInventory(inventoryResponse);
      // mark as queried regardless of the outcome, so that a failing
      // inventory request is not repeated for every marker
      _queriedTranslations = true;

      if (res.fail()) {
        LOG_TOPIC("e25ae", ERR, Logger::REPLICATION)
            << "got error while fetching master inventory for collection name "
               "translations: "
            << res.errorMessage();
        return false;
      }

      VPackSlice const inventory = inventoryResponse.slice();
      if (!inventory.isObject()) {
        return false;
      }

      VPackSlice const databases = inventory.get("databases");
      if (!databases.isObject()) {
        return false;
      }

      for (auto const& dbEntry : VPackObjectIterator(databases)) {
        VPackSlice const database = dbEntry.value;
        if (!database.isObject()) {
          continue;
        }

        VPackSlice const collections = database.get("collections");
        if (!collections.isArray()) {
          // unexpected inventory shape: give up on the remaining databases
          return false;
        }

        for (VPackSlice collection : VPackArrayIterator(collections)) {
          if (!collection.isObject()) {
            continue;
          }

          VPackSlice const parameters = collection.get("parameters");
          if (!parameters.isObject()) {
            // attribute lookups on a non-object slice would throw
            continue;
          }

          VPackSlice const collectionName = parameters.get("name");
          VPackSlice const globallyUniqueId = parameters.get("globallyUniqueId");
          if (collectionName.isString() && globallyUniqueId.isString()) {
            // we'll store everything for all databases in a global hash table,
            // as we expect the globally unique ids to be unique...
            _translations[globallyUniqueId.copyString()] =
                collectionName.copyString();
          }
        }
      }
    } catch (std::exception const& ex) {
      LOG_TOPIC("2c5c2", ERR, Logger::REPLICATION)
          << "got error while fetching inventory: " << ex.what();
      return false;
    }
  }

  // look up cuid in translations map
  auto it = _translations.find(name.copyString());
  if (it != _translations.end()) {
    return isExcludedCollection(it->second);
  }

  return false;
}