1
0
Fork 0

port from 3.4 (#8275)

This commit is contained in:
Jan 2019-02-28 14:36:29 +01:00 committed by GitHub
parent bed5a5f9a2
commit 5d2ab0c901
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 243 additions and 159 deletions

View File

@ -47,6 +47,7 @@
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/Parser.h> #include <velocypack/Parser.h>
#include <velocypack/Slice.h> #include <velocypack/Slice.h>
#include <velocypack/StringRef.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
using namespace arangodb; using namespace arangodb;
@ -54,6 +55,10 @@ using namespace arangodb::basics;
using namespace arangodb::httpclient; using namespace arangodb::httpclient;
using namespace arangodb::rest; using namespace arangodb::rest;
namespace {
arangodb::velocypack::StringRef const cuidRef("cuid");
}
DatabaseTailingSyncer::DatabaseTailingSyncer(TRI_vocbase_t& vocbase, DatabaseTailingSyncer::DatabaseTailingSyncer(TRI_vocbase_t& vocbase,
ReplicationApplierConfiguration const& configuration, ReplicationApplierConfiguration const& configuration,
TRI_voc_tick_t initialTick, TRI_voc_tick_t initialTick,
@ -238,7 +243,7 @@ bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) {
// now check for a globally unique id attribute ("cuid") // now check for a globally unique id attribute ("cuid")
// if its present, then we will use our local cuid -> collection name // if its present, then we will use our local cuid -> collection name
// translation table // translation table
VPackSlice const name = slice.get("cuid"); VPackSlice const name = slice.get(::cuidRef);
if (!name.isString()) { if (!name.isString()) {
return false; return false;
} }

View File

@ -41,11 +41,13 @@
#include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/PhysicalCollection.h"
#include "StorageEngine/StorageEngine.h" #include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionState.h" #include "StorageEngine/TransactionState.h"
#include "Transaction/Helpers.h"
#include "Transaction/StandaloneContext.h" #include "Transaction/StandaloneContext.h"
#include "Utils/CollectionGuard.h" #include "Utils/CollectionGuard.h"
#include "Utils/OperationOptions.h" #include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h" #include "Utils/OperationResult.h"
#include "Utils/SingleCollectionTransaction.h" #include "Utils/SingleCollectionTransaction.h"
#include "VocBase/LocalDocumentId.h"
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalView.h" #include "VocBase/LogicalView.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
@ -54,10 +56,16 @@
#include <velocypack/Builder.h> #include <velocypack/Builder.h>
#include <velocypack/Collection.h> #include <velocypack/Collection.h>
#include <velocypack/Slice.h> #include <velocypack/Slice.h>
#include <velocypack/StringRef.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
namespace { namespace {
arangodb::velocypack::StringRef const cuidRef("cuid");
arangodb::velocypack::StringRef const dbRef("db");
arangodb::velocypack::StringRef const databaseRef("database");
arangodb::velocypack::StringRef const globallyUniqueIdRef("globallyUniqueId");
/// @brief extract the collection id from VelocyPack /// @brief extract the collection id from VelocyPack
TRI_voc_cid_t getCid(arangodb::velocypack::Slice const& slice) { TRI_voc_cid_t getCid(arangodb::velocypack::Slice const& slice) {
return arangodb::basics::VelocyPackHelper::extractIdValue(slice); return arangodb::basics::VelocyPackHelper::extractIdValue(slice);
@ -130,20 +138,43 @@ arangodb::Result applyCollectionDumpMarkerInternal(
options.indexOperationMode = arangodb::Index::OperationMode::internal; options.indexOperationMode = arangodb::Index::OperationMode::internal;
try { try {
OperationResult opRes;
bool useReplace = false;
VPackSlice keySlice = arangodb::transaction::helpers::extractKeyFromDocument(slice);
// if we are about to process a single document marker (outside of a multi-document
// transaction), we first check if the target document exists. if yes, we don't
// try an insert (which would fail anyway) but carry on with a replace.
if (trx.isSingleOperationTransaction() && keySlice.isString()) {
arangodb::LocalDocumentId const oldDocumentId =
coll->getPhysical()->lookupKey(&trx, keySlice);
if (oldDocumentId.isSet()) {
useReplace = true;
opRes.result.reset(TRI_ERROR_NO_ERROR, keySlice.copyString());
}
}
if (!useReplace) {
// try insert first // try insert first
OperationResult opRes = trx.insert(coll->name(), slice, options); opRes = trx.insert(coll->name(), slice, options);
if (opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) { if (opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) {
bool useReplace = true; useReplace = true;
}
}
if (useReplace) {
// conflicting key is contained in opRes.errorMessage() now // conflicting key is contained in opRes.errorMessage() now
VPackSlice keySlice = slice.get(arangodb::StaticStrings::KeyString);
if (keySlice.isString()) { if (keySlice.isString()) {
// let's check if the key we have got is the same as the one // let's check if the key we have got is the same as the one
// that we would like to insert // that we would like to insert
if (keySlice.copyString() != opRes.errorMessage()) { if (keySlice.copyString() != opRes.errorMessage()) {
// different key // different key
if (trx.isSingleOperationTransaction()) {
// return a special error code from here, with the key of
// the conflicting document as the error message :-|
return Result(TRI_ERROR_ARANGO_TRY_AGAIN, opRes.errorMessage());
}
VPackBuilder tmp; VPackBuilder tmp;
tmp.add(VPackValue(opRes.errorMessage())); tmp.add(VPackValue(opRes.errorMessage()));
@ -429,9 +460,9 @@ TRI_vocbase_t* Syncer::resolveVocbase(VPackSlice const& slice) {
std::string name; std::string name;
if (slice.isObject()) { if (slice.isObject()) {
VPackSlice tmp; VPackSlice tmp;
if ((tmp = slice.get("db")).isString()) { // wal access protocol if ((tmp = slice.get(::dbRef)).isString()) { // wal access protocol
name = tmp.copyString(); name = tmp.copyString();
} else if ((tmp = slice.get("database")).isString()) { // pre 3.3 } else if ((tmp = slice.get(::databaseRef)).isString()) { // pre 3.3
name = tmp.copyString(); name = tmp.copyString();
} }
} else if (slice.isString()) { } else if (slice.isString()) {
@ -471,9 +502,9 @@ std::shared_ptr<LogicalCollection> Syncer::resolveCollection(
if (!_state.master.simulate32Client() || cid == 0) { if (!_state.master.simulate32Client() || cid == 0) {
VPackSlice uuid; VPackSlice uuid;
if ((uuid = slice.get("cuid")).isString()) { if ((uuid = slice.get(::cuidRef)).isString()) {
return vocbase.lookupCollectionByUuid(uuid.copyString()); return vocbase.lookupCollectionByUuid(uuid.copyString());
} else if ((uuid = slice.get("globallyUniqueId")).isString()) { } else if ((uuid = slice.get(::globallyUniqueIdRef)).isString()) {
return vocbase.lookupCollectionByUuid(uuid.copyString()); return vocbase.lookupCollectionByUuid(uuid.copyString());
} }
} }
@ -854,9 +885,9 @@ Result Syncer::dropView(arangodb::velocypack::Slice const& slice, bool reportErr
return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
VPackSlice guidSlice = slice.get("globallyUniqueId"); VPackSlice guidSlice = slice.get(::globallyUniqueIdRef);
if (guidSlice.isNone()) { if (guidSlice.isNone()) {
guidSlice = slice.get("cuid"); guidSlice = slice.get(::cuidRef);
} }
if (!guidSlice.isString() || guidSlice.getStringLength() == 0) { if (!guidSlice.isString() || guidSlice.getStringLength() == 0) {
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,

View File

@ -55,6 +55,7 @@
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/Parser.h> #include <velocypack/Parser.h>
#include <velocypack/Slice.h> #include <velocypack/Slice.h>
#include <velocypack/StringRef.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
using namespace arangodb; using namespace arangodb;
@ -64,6 +65,10 @@ using namespace arangodb::rest;
namespace { namespace {
static arangodb::velocypack::StringRef const cnameRef("cname");
static arangodb::velocypack::StringRef const dataRef("data");
static arangodb::velocypack::StringRef const tickRef("tick");
bool hasHeader(std::unique_ptr<httpclient::SimpleHttpResult> const& response, bool hasHeader(std::unique_ptr<httpclient::SimpleHttpResult> const& response,
std::string const& name) { std::string const& name) {
return response->hasHeaderField(name); return response->hasHeaderField(name);
@ -104,7 +109,6 @@ TailingSyncer::TailingSyncer(ReplicationApplier* applier,
_usersModified(false), _usersModified(false),
_useTick(useTick), _useTick(useTick),
_requireFromPresent(configuration._requireFromPresent), _requireFromPresent(configuration._requireFromPresent),
_supportsSingleOperations(false),
_ignoreRenameCreateDrop(false), _ignoreRenameCreateDrop(false),
_ignoreDatabaseMarkers(true), _ignoreDatabaseMarkers(true),
_workInParallel(false) { _workInParallel(false) {
@ -115,7 +119,6 @@ TailingSyncer::TailingSyncer(ReplicationApplier* applier,
// FIXME: move this into engine code // FIXME: move this into engine code
std::string const& engineName = EngineSelectorFeature::ENGINE->typeName(); std::string const& engineName = EngineSelectorFeature::ENGINE->typeName();
_supportsSingleOperations = (engineName == "mmfiles");
// Replication for RocksDB expects only one open transaction at a time // Replication for RocksDB expects only one open transaction at a time
_supportsMultipleOpenTransactions = (engineName != "rocksdb"); _supportsMultipleOpenTransactions = (engineName != "rocksdb");
@ -161,22 +164,14 @@ void TailingSyncer::abortOngoingTransactions() noexcept {
} }
/// @brief whether or not a marker should be skipped /// @brief whether or not a marker should be skipped
bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const& slice) { bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const& slice,
TRI_voc_tick_t actualMarkerTick, TRI_replication_operation_e type) {
TRI_ASSERT(slice.isObject()); TRI_ASSERT(slice.isObject());
bool tooOld = false; bool tooOld = (actualMarkerTick < firstRegularTick);
VPackSlice tickSlice = slice.get("tick");
if (tickSlice.isString() && tickSlice.getStringLength() > 0) {
VPackValueLength len = 0;
char const* str = tickSlice.getStringUnchecked(len);
tooOld = (NumberUtils::atoi_zero<TRI_voc_tick_t>(str, str + len) < firstRegularTick);
if (tooOld) { if (tooOld) {
int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0);
// handle marker type // handle marker type
TRI_replication_operation_e type =
static_cast<TRI_replication_operation_e>(typeValue);
if (type == REPLICATION_MARKER_DOCUMENT || type == REPLICATION_MARKER_REMOVE || if (type == REPLICATION_MARKER_DOCUMENT || type == REPLICATION_MARKER_REMOVE ||
type == REPLICATION_TRANSACTION_START || type == REPLICATION_TRANSACTION_ABORT || type == REPLICATION_TRANSACTION_START || type == REPLICATION_TRANSACTION_ABORT ||
@ -184,7 +179,8 @@ bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const
// read "tid" entry from marker // read "tid" entry from marker
VPackSlice tidSlice = slice.get("tid"); VPackSlice tidSlice = slice.get("tid");
if (tidSlice.isString() && tidSlice.getStringLength() > 0) { if (tidSlice.isString() && tidSlice.getStringLength() > 0) {
str = tidSlice.getStringUnchecked(len); VPackValueLength len;
char const* str = tidSlice.getStringUnchecked(len);
TRI_voc_tid_t tid = NumberUtils::atoi_zero<TRI_voc_tid_t>(str, str + len); TRI_voc_tid_t tid = NumberUtils::atoi_zero<TRI_voc_tid_t>(str, str + len);
if (tid > 0 && _ongoingTransactions.find(tid) != _ongoingTransactions.end()) { if (tid > 0 && _ongoingTransactions.find(tid) != _ongoingTransactions.end()) {
@ -195,11 +191,6 @@ bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const
} }
} }
} }
}
if (tooOld) {
return true;
}
// the transient applier state is just used for one shard / collection // the transient applier state is just used for one shard / collection
if (_state.applier._restrictCollections.empty()) { if (_state.applier._restrictCollections.empty()) {
@ -211,7 +202,7 @@ bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const
return false; return false;
} }
VPackSlice const name = slice.get("cname"); VPackSlice const name = slice.get(::cnameRef);
if (name.isString()) { if (name.isString()) {
return isExcludedCollection(name.copyString()); return isExcludedCollection(name.copyString());
} }
@ -346,10 +337,11 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
} }
bool isSystem = coll->system(); bool const isSystem = coll->system();
bool const isUsers = coll->name() == TRI_COL_NAME_USERS;
// extract "data" // extract "data"
VPackSlice const data = slice.get("data"); VPackSlice const data = slice.get(::dataRef);
if (!data.isObject()) { if (!data.isObject()) {
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
@ -374,8 +366,8 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
} }
// extract "tid" // extract "tid"
std::string const transactionId = arangodb::velocypack::StringRef const transactionId =
VelocyPackHelper::getStringValue(slice, "tid", ""); VelocyPackHelper::getStringRef(slice, "tid", "");
TRI_voc_tid_t tid = 0; TRI_voc_tid_t tid = 0;
if (!transactionId.empty()) { if (!transactionId.empty()) {
// operation is part of a transaction // operation is part of a transaction
@ -415,12 +407,13 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
trx->addCollectionAtRuntime(coll->id(), coll->name(), AccessMode::Type::EXCLUSIVE); trx->addCollectionAtRuntime(coll->id(), coll->name(), AccessMode::Type::EXCLUSIVE);
Result r = applyCollectionDumpMarker(*trx, coll, type, applySlice); Result r = applyCollectionDumpMarker(*trx, coll, type, applySlice);
TRI_ASSERT(!r.is(TRI_ERROR_ARANGO_TRY_AGAIN));
if (r.errorNumber() == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && isSystem) { if (r.errorNumber() == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && isSystem) {
// ignore unique constraint violations for system collections // ignore unique constraint violations for system collections
r.reset(); r.reset();
} }
if (r.ok() && coll->name() == TRI_COL_NAME_USERS) { if (r.ok() && isUsers) {
_usersModified = true; _usersModified = true;
} }
@ -428,13 +421,37 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
} }
// standalone operation // standalone operation
// this variable will store the key of a conflicting document we will have to remove first
// it is initially empty, and may be populated by a failing operation
std::string conflictDocumentKey;
// normally we will go into this while loop just once. only in the very exceptional case
// that there is a unique constraint violation in one of the secondary indexes we will
// get into the while loop a second time
int tries = 0;
while (tries++ < 10) {
if (!conflictDocumentKey.empty()) {
// a rather exceptional case in which we have to remove a conflicting document,
// which is conflicting with the to-be-inserted document in one the unique
// secondary indexes
// intentionally ignore the return code here, as the operation will be followed
// by yet another insert/replace
removeSingleDocument(coll, conflictDocumentKey);
conflictDocumentKey.clear();
}
// update the apply tick for all standalone operations // update the apply tick for all standalone operations
SingleCollectionTransaction trx(transaction::StandaloneContext::Create(*vocbase), SingleCollectionTransaction trx(transaction::StandaloneContext::Create(*vocbase),
*coll, AccessMode::Type::EXCLUSIVE); *coll, AccessMode::Type::EXCLUSIVE);
if (_supportsSingleOperations) { // we will always check if the target document already exists and then either
// carry out an insert or a replace.
// so we will be carrying out either a read-then-insert or a read-then-replace
// operation, which is a single write operation. and for MMFiles this is also
// safe as we have the exclusive lock on the underlying collection anyway
trx.addHint(transaction::Hints::Hint::SINGLE_OPERATION); trx.addHint(transaction::Hints::Hint::SINGLE_OPERATION);
}
Result res = trx.begin(); Result res = trx.begin();
@ -445,9 +462,19 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
res.errorMessage()); res.errorMessage());
} }
std::string collectionName = trx.name();
res = applyCollectionDumpMarker(trx, coll, type, applySlice); res = applyCollectionDumpMarker(trx, coll, type, applySlice);
if (res.is(TRI_ERROR_ARANGO_TRY_AGAIN)) {
// TRY_AGAIN we will only be getting when there is a conflicting document.
// the key of the conflicting document can be found in the errorMessage
// of the result :-|
TRI_ASSERT(type != REPLICATION_MARKER_REMOVE);
TRI_ASSERT(!res.errorMessage().empty());
conflictDocumentKey = std::move(res.errorMessage());
// restart the while loop above
continue;
}
if (res.errorNumber() == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && isSystem) { if (res.errorNumber() == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && isSystem) {
// ignore unique constraint violations for system collections // ignore unique constraint violations for system collections
res.reset(); res.reset();
@ -457,7 +484,7 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
if (res.ok()) { if (res.ok()) {
res = trx.commit(); res = trx.commit();
if (res.ok() && collectionName == TRI_COL_NAME_USERS) { if (res.ok() && isUsers) {
_usersModified = true; _usersModified = true;
} }
} }
@ -465,6 +492,37 @@ Result TailingSyncer::processDocument(TRI_replication_operation_e type,
return res; return res;
} }
return Result(TRI_ERROR_INTERNAL, "invalid state reached in processDocument");
}
Result TailingSyncer::removeSingleDocument(LogicalCollection* coll, std::string const& key) {
SingleCollectionTransaction trx(transaction::StandaloneContext::Create(coll->vocbase()),
*coll, AccessMode::Type::EXCLUSIVE);
trx.addHint(transaction::Hints::Hint::SINGLE_OPERATION);
Result res = trx.begin();
if (res.fail()) {
return res;
}
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
options.waitForSync = false;
VPackBuilder tmp;
tmp.add(VPackValue(key));
OperationResult opRes = trx.remove(coll->name(), tmp.slice(), options);
if (opRes.fail()) {
return opRes.result;
}
return trx.commit();
}
/// @brief starts a transaction, based on the VelocyPack provided /// @brief starts a transaction, based on the VelocyPack provided
Result TailingSyncer::startTransaction(VPackSlice const& slice) { Result TailingSyncer::startTransaction(VPackSlice const& slice) {
// {"type":2200,"tid":"230920705812199", "database": "123", // {"type":2200,"tid":"230920705812199", "database": "123",
@ -794,29 +852,8 @@ Result TailingSyncer::changeView(VPackSlice const& slice) {
/// @brief apply a single marker from the continuous log /// @brief apply a single marker from the continuous log
Result TailingSyncer::applyLogMarker(VPackSlice const& slice, TRI_voc_tick_t firstRegularTick, Result TailingSyncer::applyLogMarker(VPackSlice const& slice, TRI_voc_tick_t firstRegularTick,
TRI_voc_tick_t& markerTick) { TRI_voc_tick_t markerTick, TRI_replication_operation_e type) {
// reset found tick value to 0
markerTick = 0;
if (!slice.isObject()) {
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
"marker slice is no object");
}
// fetch marker "type"
int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0);
// fetch "tick"
VPackSlice tickSlice = slice.get("tick");
if (tickSlice.isString()) {
VPackValueLength length;
char const* p = tickSlice.getStringUnchecked(length);
// update the caller's tick
markerTick = NumberUtils::atoi_zero<TRI_voc_tick_t>(p, p + length);
}
// handle marker type // handle marker type
TRI_replication_operation_e type = static_cast<TRI_replication_operation_e>(typeValue);
if (type == REPLICATION_MARKER_DOCUMENT || type == REPLICATION_MARKER_REMOVE) { if (type == REPLICATION_MARKER_DOCUMENT || type == REPLICATION_MARKER_REMOVE) {
try { try {
return processDocument(type, slice); return processDocument(type, slice);
@ -963,7 +1000,7 @@ Result TailingSyncer::applyLog(SimpleHttpResult* response, TRI_voc_tick_t firstR
auto builder = std::make_shared<VPackBuilder>(); auto builder = std::make_shared<VPackBuilder>();
while (p < end) { while (p < end) {
char const* q = strchr(p, '\n'); char const* q = static_cast<char const*>(memchr(p, '\n', (end - p)));
if (q == nullptr) { if (q == nullptr) {
q = end; q = end;
@ -985,8 +1022,9 @@ Result TailingSyncer::applyLog(SimpleHttpResult* response, TRI_voc_tick_t firstR
try { try {
VPackParser parser(builder); VPackParser parser(builder);
parser.parse(p, static_cast<size_t>(q - p)); parser.parse(p, static_cast<size_t>(q - p));
} catch (std::exception const& ex) {
return Result(TRI_ERROR_HTTP_CORRUPTED_JSON, ex.what());
} catch (...) { } catch (...) {
// TODO: improve error reporting
return Result(TRI_ERROR_OUT_OF_MEMORY); return Result(TRI_ERROR_OUT_OF_MEMORY);
} }
@ -999,18 +1037,24 @@ Result TailingSyncer::applyLog(SimpleHttpResult* response, TRI_voc_tick_t firstR
"received invalid JSON data"); "received invalid JSON data");
} }
Result res; int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0);
bool skipped; TRI_replication_operation_e markerType = static_cast<TRI_replication_operation_e>(typeValue);
TRI_voc_tick_t markerTick = 0; TRI_voc_tick_t markerTick = 0;
if (skipMarker(firstRegularTick, slice)) { VPackSlice tickSlice = slice.get(::tickRef);
// entry is skipped
skipped = true; if (tickSlice.isString() && tickSlice.getStringLength() > 0) {
} else { VPackValueLength len = 0;
res = applyLogMarker(slice, firstRegularTick, markerTick); char const* str = tickSlice.getStringUnchecked(len);
skipped = false; markerTick = NumberUtils::atoi_zero<TRI_voc_tick_t>(str, str + len);
} }
// entry is skipped?
bool skipped = skipMarker(firstRegularTick, slice, markerTick, markerType);
if (!skipped) {
Result res = applyLogMarker(slice, firstRegularTick, markerTick, markerType);
if (res.fail()) { if (res.fail()) {
// apply error // apply error
std::string errorMsg = res.errorMessage(); std::string errorMsg = res.errorMessage();
@ -1032,6 +1076,7 @@ Result TailingSyncer::applyLog(SimpleHttpResult* response, TRI_voc_tick_t firstR
<< "ignoring replication error for database '" << _state.databaseName << "ignoring replication error for database '" << _state.databaseName
<< "': " << errorMsg; << "': " << errorMsg;
} }
}
// update tick value // update tick value
WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock); WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock);
@ -1785,9 +1830,12 @@ Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer>
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "applyLog. bumping tick"; LOG_TOPIC(DEBUG, Logger::REPLICATION) << "applyLog. bumping tick";
} }
TRI_voc_tick_t lastAppliedTick;
{ {
WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock); WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock);
_applier->_state._lastAvailableContinuousTick = tick; _applier->_state._lastAvailableContinuousTick = tick;
lastAppliedTick = _applier->_state._lastAppliedContinuousTick;
} }
if (!fromIncluded && fetchTick > 0 && if (!fromIncluded && fetchTick > 0 &&
@ -1819,13 +1867,6 @@ Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer>
}); });
} }
TRI_voc_tick_t lastAppliedTick;
{
READ_LOCKER(locker, _applier->_statusLock);
lastAppliedTick = _applier->_state._lastAppliedContinuousTick;
}
uint64_t processedMarkers = 0; uint64_t processedMarkers = 0;
Result r = applyLog(response.get(), firstRegularTick, processedMarkers, ignoreCount); Result r = applyLog(response.get(), firstRegularTick, processedMarkers, ignoreCount);

View File

@ -69,7 +69,8 @@ class TailingSyncer : public Syncer {
void abortOngoingTransactions() noexcept; void abortOngoingTransactions() noexcept;
/// @brief whether or not a collection should be excluded /// @brief whether or not a collection should be excluded
bool skipMarker(TRI_voc_tick_t, arangodb::velocypack::Slice const&); bool skipMarker(TRI_voc_tick_t firstRegulaTick, arangodb::velocypack::Slice const& slice,
TRI_voc_tick_t actualMarkerTick, TRI_replication_operation_e type);
/// @brief whether or not a collection should be excluded /// @brief whether or not a collection should be excluded
bool isExcludedCollection(std::string const&) const; bool isExcludedCollection(std::string const&) const;
@ -105,7 +106,8 @@ class TailingSyncer : public Syncer {
/// @brief apply a single marker from the continuous log /// @brief apply a single marker from the continuous log
Result applyLogMarker(arangodb::velocypack::Slice const& slice, Result applyLogMarker(arangodb::velocypack::Slice const& slice,
TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t& markerTick); TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t markerTick,
TRI_replication_operation_e type);
/// @brief apply the data from the continuous log /// @brief apply the data from the continuous log
Result applyLog(httpclient::SimpleHttpResult*, TRI_voc_tick_t firstRegularTick, Result applyLog(httpclient::SimpleHttpResult*, TRI_voc_tick_t firstRegularTick,
@ -148,6 +150,8 @@ class TailingSyncer : public Syncer {
/// @brief determines if we can work in parallel on master and slave /// @brief determines if we can work in parallel on master and slave
void checkParallel(); void checkParallel();
arangodb::Result removeSingleDocument(arangodb::LogicalCollection* coll, std::string const& key);
protected: protected:
virtual bool skipMarker(arangodb::velocypack::Slice const& slice) = 0; virtual bool skipMarker(arangodb::velocypack::Slice const& slice) = 0;
@ -180,9 +184,6 @@ class TailingSyncer : public Syncer {
/// data from a master /// data from a master
bool _requireFromPresent; bool _requireFromPresent;
/// @brief whether we can use single operation transactions
bool _supportsSingleOperations;
/// @brief ignore rename, create and drop operations for collections /// @brief ignore rename, create and drop operations for collections
bool _ignoreRenameCreateDrop; bool _ignoreRenameCreateDrop;

View File

@ -797,8 +797,8 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
// note that we don't need it for this engine // note that we don't need it for this engine
resultMarkerTick = 0; resultMarkerTick = 0;
LocalDocumentId const documentId = LocalDocumentId::create(); bool const isEdgeCollection = (TRI_COL_TYPE_EDGE == _logicalCollection.type());
auto isEdgeCollection = (TRI_COL_TYPE_EDGE == _logicalCollection.type());
transaction::BuilderLeaser builder(trx); transaction::BuilderLeaser builder(trx);
Result res(newObjectForInsert(trx, slice, isEdgeCollection, *builder.get(), Result res(newObjectForInsert(trx, slice, isEdgeCollection, *builder.get(),
options.isRestore, revisionId)); options.isRestore, revisionId));
@ -825,9 +825,9 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
// check // check
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(newSlice); VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(newSlice);
if (keySlice.isString()) { if (keySlice.isString()) {
LocalDocumentId const documentId = LocalDocumentId const oldDocumentId =
primaryIndex()->lookupKey(trx, arangodb::velocypack::StringRef(keySlice)); primaryIndex()->lookupKey(trx, arangodb::velocypack::StringRef(keySlice));
if (documentId.isSet()) { if (oldDocumentId.isSet()) {
if (options.indexOperationMode == Index::OperationMode::internal) { if (options.indexOperationMode == Index::OperationMode::internal) {
// need to return the key of the conflict document // need to return the key of the conflict document
return res.reset(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, return res.reset(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED,
@ -838,6 +838,8 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
} }
} }
LocalDocumentId const documentId = LocalDocumentId::create();
RocksDBSavePoint guard(trx, TRI_VOC_DOCUMENT_OPERATION_INSERT); RocksDBSavePoint guard(trx, TRI_VOC_DOCUMENT_OPERATION_INSERT);
auto state = RocksDBTransactionState::toState(trx); auto state = RocksDBTransactionState::toState(trx);
@ -988,8 +990,7 @@ Result RocksDBCollection::replace(transaction::Methods* trx,
std::function<Result(void)> callbackDuringLock) { std::function<Result(void)> callbackDuringLock) {
resultMarkerTick = 0; resultMarkerTick = 0;
LocalDocumentId const documentId = LocalDocumentId::create(); bool const isEdgeCollection = (TRI_COL_TYPE_EDGE == _logicalCollection.type());
auto isEdgeCollection = (TRI_COL_TYPE_EDGE == _logicalCollection.type());
// get the previous revision // get the previous revision
VPackSlice key = newSlice.get(StaticStrings::KeyString); VPackSlice key = newSlice.get(StaticStrings::KeyString);
@ -1025,6 +1026,8 @@ Result RocksDBCollection::replace(transaction::Methods* trx,
} }
} }
LocalDocumentId const documentId = LocalDocumentId::create();
// merge old and new values // merge old and new values
TRI_voc_rid_t revisionId; TRI_voc_rid_t revisionId;
transaction::BuilderLeaser builder(trx); transaction::BuilderLeaser builder(trx);

View File

@ -434,16 +434,18 @@ Result RocksDBPrimaryIndex::insert(transaction::Methods& trx, RocksDBMethods* mt
LocalDocumentId const& documentId, LocalDocumentId const& documentId,
velocypack::Slice const& slice, velocypack::Slice const& slice,
Index::OperationMode mode) { Index::OperationMode mode) {
Result res; VPackSlice keySlice;
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(slice); TRI_voc_rid_t revision;
transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revision);
TRI_ASSERT(keySlice.isString()); TRI_ASSERT(keySlice.isString());
RocksDBKeyLeaser key(&trx); RocksDBKeyLeaser key(&trx);
key->constructPrimaryIndexValue(_objectId, arangodb::velocypack::StringRef(keySlice)); key->constructPrimaryIndexValue(_objectId, arangodb::velocypack::StringRef(keySlice));
rocksdb::PinnableSlice val; rocksdb::PinnableSlice val;
rocksdb::Status s = mthd->Get(_cf, key->string(), &val); rocksdb::Status s = mthd->Get(_cf, key->string(), &val);
Result res;
if (s.ok()) { // detected conflicting primary key if (s.ok()) { // detected conflicting primary key
std::string existingId = keySlice.copyString(); std::string existingId = keySlice.copyString();
@ -459,7 +461,6 @@ Result RocksDBPrimaryIndex::insert(transaction::Methods& trx, RocksDBMethods* mt
blackListKey(key->string().data(), static_cast<uint32_t>(key->string().size())); blackListKey(key->string().data(), static_cast<uint32_t>(key->string().size()));
TRI_voc_rid_t revision = transaction::helpers::extractRevFromDocument(slice);
auto value = RocksDBValue::PrimaryIndexValue(documentId, revision); auto value = RocksDBValue::PrimaryIndexValue(documentId, revision);
s = mthd->Put(_cf, key.ref(), value.string()); s = mthd->Put(_cf, key.ref(), value.string());

View File

@ -47,7 +47,6 @@
#include <rocksdb/options.h> #include <rocksdb/options.h>
#include <rocksdb/status.h> #include <rocksdb/status.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h> #include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/transaction_db.h> #include <rocksdb/utilities/transaction_db.h>
#include <rocksdb/utilities/write_batch_with_index.h> #include <rocksdb/utilities/write_batch_with_index.h>

View File

@ -122,7 +122,7 @@ bool PhysicalCollection::hasIndexOfType(arangodb::Index::IndexType type) const {
} }
VPackValueLength len; VPackValueLength len;
char const* str = value.getString(len); char const* str = value.getStringUnchecked(len);
arangodb::Index::IndexType const type = arangodb::Index::type(str, len); arangodb::Index::IndexType const type = arangodb::Index::type(str, len);
for (auto const& idx : indexes) { for (auto const& idx : indexes) {
if (idx->type() == type) { if (idx->type() == type) {
@ -336,7 +336,7 @@ Result PhysicalCollection::newObjectForInsert(transaction::Methods* trx,
return Result(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD); return Result(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD);
} else { } else {
VPackValueLength l; VPackValueLength l;
char const* p = s.getString(l); char const* p = s.getStringUnchecked(l);
// validate and track the key just used // validate and track the key just used
auto res = _logicalCollection.keyGenerator()->validate(p, l, isRestore); auto res = _logicalCollection.keyGenerator()->validate(p, l, isRestore);
@ -393,7 +393,7 @@ Result PhysicalCollection::newObjectForInsert(transaction::Methods* trx,
if (s.isString()) { if (s.isString()) {
builder.add(StaticStrings::RevString, s); builder.add(StaticStrings::RevString, s);
VPackValueLength l; VPackValueLength l;
char const* p = s.getString(l); char const* p = s.getStringUnchecked(l);
revisionId = TRI_StringToRid(p, l, false); revisionId = TRI_StringToRid(p, l, false);
handled = true; handled = true;
} }
@ -482,7 +482,7 @@ Result PhysicalCollection::newObjectForReplace(transaction::Methods* trx,
if (s.isString()) { if (s.isString()) {
builder.add(StaticStrings::RevString, s); builder.add(StaticStrings::RevString, s);
VPackValueLength l; VPackValueLength l;
char const* p = s.getString(l); char const* p = s.getStringUnchecked(l);
revisionId = TRI_StringToRid(p, l, false); revisionId = TRI_StringToRid(p, l, false);
handled = true; handled = true;
} }

View File

@ -279,7 +279,7 @@ class LogicalCollection : public LogicalDataSource {
Result read(transaction::Methods* trx, arangodb::velocypack::StringRef const& key, Result read(transaction::Methods* trx, arangodb::velocypack::StringRef const& key,
ManagedDocumentResult& mdr, bool lock); ManagedDocumentResult& mdr, bool lock);
Result read(transaction::Methods*, arangodb::velocypack::Slice const&, Result read(transaction::Methods*, arangodb::velocypack::Slice const&,
ManagedDocumentResult& result, bool); ManagedDocumentResult& result, bool lock);
/// @brief processes a truncate operation /// @brief processes a truncate operation
Result truncate(transaction::Methods& trx, OperationOptions& options); Result truncate(transaction::Methods& trx, OperationOptions& options);

View File

@ -53,6 +53,9 @@ using VelocyPackHelper = arangodb::basics::VelocyPackHelper;
namespace { namespace {
static arangodb::velocypack::StringRef const idRef("id");
static arangodb::velocypack::StringRef const cidRef("cid");
static std::unique_ptr<VPackAttributeTranslator> translator; static std::unique_ptr<VPackAttributeTranslator> translator;
static std::unique_ptr<VPackAttributeExcludeHandler> excludeHandler; static std::unique_ptr<VPackAttributeExcludeHandler> excludeHandler;
static std::unique_ptr<VPackCustomTypeHandler>customTypeHandler; static std::unique_ptr<VPackCustomTypeHandler>customTypeHandler;
@ -976,10 +979,10 @@ uint64_t VelocyPackHelper::extractIdValue(VPackSlice const& slice) {
if (!slice.isObject()) { if (!slice.isObject()) {
return 0; return 0;
} }
VPackSlice id = slice.get("id"); VPackSlice id = slice.get(::idRef);
if (id.isNone()) { if (id.isNone()) {
// pre-3.1 compatibility // pre-3.1 compatibility
id = slice.get("cid"); id = slice.get(::cidRef);
} }
if (id.isString()) { if (id.isString()) {