mirror of https://gitee.com/bigwinds/arangodb

Resilience test failure points (#6545)

parent e37576e03c
commit 3c965ee48a
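The commit's central tool is a failure point: a named switch that a test can flip at runtime to force an error on an otherwise healthy code path (ArangoDB's TRI_IF_FAILURE macro). A minimal sketch of such a facility follows; all names in it are illustrative, not ArangoDB's actual implementation.

// Minimal failure-point sketch in the spirit of TRI_IF_FAILURE.
// All names here are illustrative stand-ins.
#include <cassert>
#include <mutex>
#include <set>
#include <string>

namespace failpoints {
std::mutex mtx;
std::set<std::string> active;

// Register a failure point by name (tests call this, e.g. over HTTP).
void add(std::string const& name) {
  std::lock_guard<std::mutex> lock(mtx);
  active.insert(name);
}

void remove(std::string const& name) {
  std::lock_guard<std::mutex> lock(mtx);
  active.erase(name);
}

bool shouldFail(char const* name) {
  std::lock_guard<std::mutex> lock(mtx);
  return active.count(name) > 0;
}
}  // namespace failpoints

// Expands to a never-taken branch unless failure testing is compiled in.
#ifdef USE_FAILURE_TESTS
#define IF_FAILURE(name) if (failpoints::shouldFail(name))
#else
#define IF_FAILURE(name) if (false)
#endif

int main() {
  failpoints::add("LogicalCollection::truncate");
  bool fired = false;
  IF_FAILURE("LogicalCollection::truncate") { fired = true; }
  (void)fired;
#ifdef USE_FAILURE_TESTS
  assert(fired);  // the guarded branch ran only because the point is active
#endif
  failpoints::remove("LogicalCollection::truncate");
}

The hunks below add guards of exactly this shape, e.g. TRI_IF_FAILURE("LogicalCollection::truncate") { return Result(TRI_ERROR_DEBUG); }, and the new JavaScript test at the bottom of this page flips the switches over HTTP via /_admin/debug/failat.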
@@ -3307,23 +3307,18 @@ void ClusterInfo::loadCurrentMappings() {
     if (mappings.isObject()) {
       decltype(_coordinatorIdMap) newCoordinatorIdMap;
-      decltype(_dbserverIdMap) newDBServerIdMap;
-      decltype(_nameMap) newNameMap;

       for (auto const& mapping : VPackObjectIterator(mappings)) {
         ServerID fullId = mapping.key.copyString();
         auto mapObject = mapping.value;
         if (mapObject.isObject()) {
           ServerShortName shortName = mapObject.get("ShortName").copyString();
-          newNameMap.emplace(shortName, fullId);

           ServerShortID shortId = mapObject.get("TransactionID").getNumericValue<ServerShortID>();
           static std::string const expectedPrefix{"Coordinator"};
           if (shortName.size() > expectedPrefix.size() &&
               shortName.substr(0, expectedPrefix.size()) == expectedPrefix) {
             newCoordinatorIdMap.emplace(shortId, fullId);
-          } else {
-            newDBServerIdMap.emplace(shortId, fullId);
           }
         }
       }

@@ -3331,9 +3326,7 @@ void ClusterInfo::loadCurrentMappings() {
       // Now set the new value:
       {
         WRITE_LOCKER(writeLocker, _mappingsProt.lock);
-        _nameMap.swap(newNameMap);
         _coordinatorIdMap.swap(newCoordinatorIdMap);
-        _dbserverIdMap.swap(newDBServerIdMap);
         _mappingsProt.doneVersion = storedVersion;
         _mappingsProt.isValid = true;
       }

@@ -3594,50 +3587,6 @@ ServerID ClusterInfo::getCoordinatorByShortID(ServerShortID shortId) {
   return result;
 }
-
-////////////////////////////////////////////////////////////////////////////////
-/// @brief lookup full dbserver ID from short ID
-////////////////////////////////////////////////////////////////////////////////
-
-ServerID ClusterInfo::getDBServerByShortID(ServerShortID shortId) {
-  ServerID result;
-
-  if (!_mappingsProt.isValid) {
-    loadCurrentMappings();
-  }
-
-  // return a consistent state of servers
-  READ_LOCKER(readLocker, _mappingsProt.lock);
-
-  auto it = _dbserverIdMap.find(shortId);
-  if (it != _dbserverIdMap.end()) {
-    result = it->second;
-  }
-
-  return result;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-/// @brief lookup full server ID from short name
-////////////////////////////////////////////////////////////////////////////////
-
-ServerID ClusterInfo::getServerByShortName(ServerShortName const& shortName) {
-  ServerID result;
-
-  if (!_mappingsProt.isValid) {
-    loadCurrentMappings();
-  }
-
-  // return a consistent state of servers
-  READ_LOCKER(readLocker, _mappingsProt.lock);
-
-  auto it = _nameMap.find(shortName);
-  if (it != _nameMap.end()) {
-    result = it->second;
-  }
-
-  return result;
-}

 //////////////////////////////////////////////////////////////////////////////
 /// @brief invalidate plan
 //////////////////////////////////////////////////////////////////////////////
@@ -548,18 +548,6 @@ class ClusterInfo {

   ServerID getCoordinatorByShortID(ServerShortID);

-  //////////////////////////////////////////////////////////////////////////////
-  /// @brief lookup a full dbserver ID by short ID
-  //////////////////////////////////////////////////////////////////////////////
-
-  ServerID getDBServerByShortID(ServerShortID);
-
-  //////////////////////////////////////////////////////////////////////////////
-  /// @brief lookup a full server ID by short name
-  //////////////////////////////////////////////////////////////////////////////
-
-  ServerID getServerByShortName(ServerShortName const&);
-
   //////////////////////////////////////////////////////////////////////////////
   /// @brief invalidate planned
   //////////////////////////////////////////////////////////////////////////////

@@ -715,8 +703,6 @@ class ClusterInfo {

   // Mappings between short names/IDs and full server IDs
   std::unordered_map<ServerShortID, ServerID> _coordinatorIdMap;
-  std::unordered_map<ServerShortID, ServerID> _dbserverIdMap;
-  std::unordered_map<ServerShortName, ServerID> _nameMap;
   ProtectionData _mappingsProt;

   std::shared_ptr<VPackBuilder> _plan;
@@ -167,7 +167,7 @@ arangodb::Result getReadLockId (
 }

-arangodb::Result count(
+arangodb::Result collectionCount(
   std::shared_ptr<arangodb::LogicalCollection> const& col, uint64_t& c) {

   std::string collectionName(col->name());

@@ -227,16 +227,22 @@ arangodb::Result addShardFollower (
     return arangodb::Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, errorMsg);
   }

-  uint64_t c;
-  count(collection, c);
+  uint64_t docCount = 0;
+  Result res = collectionCount(collection, docCount);
+  if (res.fail()) {
+    return res;
+  }
   VPackBuilder body;
   { VPackObjectBuilder b(&body);
     body.add(FOLLOWER_ID, VPackValue(arangodb::ServerState::instance()->getId()));
     body.add(SHARD, VPackValue(shard));
-    body.add("checksum", VPackValue(std::to_string(c)));
+    body.add("checksum", VPackValue(std::to_string(docCount)));
     if (lockJobId != 0) {
-      body.add("readLockId", VPackValue(lockJobId));
-    }}
+      body.add("readLockId", VPackValue(std::to_string(lockJobId)));
+    } else {
+      TRI_ASSERT(docCount == 0);
+    }
+  }

   auto comres = cc->syncRequest(
     clientId, 1, endpoint, rest::RequestType::PUT,

@@ -723,8 +729,8 @@ bool SynchronizeShard::first() {
   }

   auto ep = clusterInfo->getServerEndpoint(leader);
-  uint64_t c;
-  if (!count(collection, c).ok()) {
+  uint64_t docCount;
+  if (!collectionCount(collection, docCount).ok()) {
     std::stringstream error;
     error << "failed to get a count on leader " << shard;
     LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeShard " << error.str();

@@ -732,7 +738,7 @@ bool SynchronizeShard::first() {
     return false;
   }

-  if (c == 0) {
+  if (docCount == 0) {
     // We have a short cut:
     LOG_TOPIC(DEBUG, Logger::MAINTENANCE) <<
       "synchronizeOneShard: trying short cut to synchronize local shard '" <<
@@ -478,9 +478,9 @@ void ClusterCollection::invokeOnAllElements(
 // -- SECTION DML Operations --
 ///////////////////////////////////

-void ClusterCollection::truncate(transaction::Methods* trx,
-                                 OperationOptions& options) {
-  THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
+Result ClusterCollection::truncate(transaction::Methods* trx,
+                                   OperationOptions& options) {
+  return Result(TRI_ERROR_NOT_IMPLEMENTED);
 }

 LocalDocumentId ClusterCollection::lookupKey(transaction::Methods* trx,
@@ -132,7 +132,7 @@ class ClusterCollection final : public PhysicalCollection {
 // -- SECTION DML Operations --
 ///////////////////////////////////

-  void truncate(transaction::Methods* trx, OperationOptions& options) override;
+  Result truncate(transaction::Methods* trx, OperationOptions&) override;

   void deferDropCollection(
     std::function<bool(LogicalCollection&)> const& callback
@@ -2736,8 +2736,8 @@ int MMFilesCollection::unlockWrite(bool useDeadlockDetector, TransactionState co
   return TRI_ERROR_NO_ERROR;
 }

-void MMFilesCollection::truncate(transaction::Methods* trx,
-                                 OperationOptions& options) {
+Result MMFilesCollection::truncate(transaction::Methods* trx,
+                                   OperationOptions& options) {
   auto primaryIdx = primaryIndex();

   options.ignoreRevs = true;

@@ -2766,7 +2766,15 @@ void MMFilesCollection::truncate(transaction::Methods* trx,

     return true;
   };
-  primaryIdx->invokeOnAllElementsForRemoval(callback);
+  try {
+    primaryIdx->invokeOnAllElementsForRemoval(callback);
+  } catch(basics::Exception const& e) {
+    return Result(e.code(), e.message());
+  } catch(std::exception const& e) {
+    return Result(TRI_ERROR_INTERNAL, e.what());
+  } catch(...) {
+    return Result(TRI_ERROR_INTERNAL, "unknown error during truncate");
+  }

   READ_LOCKER(guard, _indexesLock);
   auto indexes = _indexes;

@@ -2777,6 +2785,7 @@ void MMFilesCollection::truncate(transaction::Methods* trx,
     TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
     idx->afterTruncate();
   }
+  return Result();
 }

 LocalDocumentId MMFilesCollection::reuseOrCreateLocalDocumentId(OperationOptions const& options) const {
@@ -329,7 +329,7 @@ class MMFilesCollection final : public PhysicalCollection {
 // -- SECTION DML Operations --
 ///////////////////////////////////

-  void truncate(transaction::Methods* trx, OperationOptions& options) override;
+  Result truncate(transaction::Methods* trx, OperationOptions&) override;

   /// @brief Defer a callback to be executed when the collection
   /// can be dropped. The callback is supposed to drop
@@ -2041,7 +2041,7 @@ void RestReplicationHandler::handleCommandApplierDeleteState() {
 ////////////////////////////////////////////////////////////////////////////////

 void RestReplicationHandler::handleCommandAddFollower() {
-  TRI_ASSERT(ServerState::instance()->isDBServer());
+  TRI_ASSERT(ServerState::instance()->isDBServer());

   bool success = false;
   VPackSlice const body = this->parseVPackBody(success);

@@ -2055,26 +2055,29 @@ void RestReplicationHandler::handleCommandAddFollower() {
                   "and 'shard'");
     return;
   }
-  VPackSlice const followerId = body.get("followerId");
-  VPackSlice const readLockId = body.get("readLockId");
-  VPackSlice const shard = body.get("shard");
-  if (!followerId.isString() || !shard.isString()) {
+  VPackSlice const followerIdSlice = body.get("followerId");
+  VPackSlice const readLockIdSlice = body.get("readLockId");
+  VPackSlice const shardSlice = body.get("shard");
+  VPackSlice const checksumSlice = body.get("checksum");
+  if (!followerIdSlice.isString() ||
+      !shardSlice.isString() ||
+      !checksumSlice.isString()) {
     generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
-                  "'followerId' and 'shard' attributes must be strings");
+                  "'followerId', 'shard' and 'checksum' attributes must be strings");
     return;
   }

-  auto col = _vocbase.lookupCollection(shard.copyString());
-
+  auto col = _vocbase.lookupCollection(shardSlice.copyString());
   if (col == nullptr) {
     generateError(rest::ResponseCode::SERVER_ERROR,
                   TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND,
                   "did not find collection");
     return;
   }

-  if (readLockId.isNone()) {
-    // Short cut for the case that the collection is empty
-
+  const std::string followerId = followerIdSlice.copyString();
+  // Short cut for the case that the collection is empty
+  if (readLockIdSlice.isNone()) {
     auto ctx = transaction::StandaloneContext::Create(_vocbase);
     SingleCollectionTransaction trx(ctx, *col, AccessMode::Type::EXCLUSIVE);
     auto res = trx.begin();

@@ -2085,9 +2088,8 @@ void RestReplicationHandler::handleCommandAddFollower() {
     if (countRes.ok()) {
       VPackSlice nrSlice = countRes.slice();
       uint64_t nr = nrSlice.getNumber<uint64_t>();

-      if (nr == 0) {
-        col->followers()->add(followerId.copyString());
+      if (nr == 0 && checksumSlice.isEqualString("0")) {
+        col->followers()->add(followerId);

         VPackBuilder b;
         {

@@ -2107,55 +2109,57 @@ void RestReplicationHandler::handleCommandAddFollower() {
     return;
   }

-  VPackSlice const checksum = body.get("checksum");
-  // optional while introducing this bugfix. should definitely be required with
-  // 3.4
-  // and throw a 400 then when no checksum is provided
-  if (checksum.isString() && readLockId.isString()) {
-    std::string referenceChecksum;
-    {
-      CONDITION_LOCKER(locker, _condVar);
-      auto it = _holdReadLockJobs.find(readLockId.copyString());
-      if (it == _holdReadLockJobs.end()) {
-        // Entry has been removed since, so we cancel the whole thing
-        // right away and generate an error:
-        generateError(rest::ResponseCode::SERVER_ERROR,
-                      TRI_ERROR_TRANSACTION_INTERNAL,
-                      "read transaction was cancelled");
-        return;
-      }
+  if (!readLockIdSlice.isString() || readLockIdSlice.getStringLength() == 0) {
+    generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
+                  "'readLockId' is not a string or empty");
+    return;
+  }
+  // previous versions < 3.3x might not send the checksum, if mixed clusters
+  // get into trouble here we may need to be more lenient
+  TRI_ASSERT(checksumSlice.isString() && readLockIdSlice.isString());
+
+  const std::string readLockId = readLockIdSlice.copyString();
+  const std::string checksum = checksumSlice.copyString();

-      auto trx = it->second;
-      if (!trx) {
-        generateError(rest::ResponseCode::SERVER_ERROR,
-                      TRI_ERROR_TRANSACTION_INTERNAL,
-                      "Read lock not yet acquired!");
-        return;
-      }
-
-      // referenceChecksum is the stringified number of documents in the
-      // collection
-      uint64_t num = col->numberDocuments(trx.get(), transaction::CountType::Normal);
-      referenceChecksum = std::to_string(num);
-    }
-
-    auto result = col->compareChecksums(checksum, referenceChecksum);
-
-    if (result.fail()) {
-      auto errorNumber = result.errorNumber();
-      rest::ResponseCode code;
-      if (errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM ||
-          errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT) {
-        code = rest::ResponseCode::BAD;
-      } else {
-        code = rest::ResponseCode::SERVER_ERROR;
-      }
-      generateError(code, errorNumber, result.errorMessage());
-
-  col->followers()->add(followerId.copyString());
+  std::string referenceChecksum;
+  {
+    CONDITION_LOCKER(locker, _condVar);
+    auto it = _holdReadLockJobs.find(readLockId);
+    if (it == _holdReadLockJobs.end()) {
+      // Entry has been removed since, so we cancel the whole thing
+      // right away and generate an error:
+      generateError(rest::ResponseCode::SERVER_ERROR,
+                    TRI_ERROR_TRANSACTION_INTERNAL,
+                    "read transaction was cancelled");
+      return;
+    }
+
+    auto trx = it->second;
+    if (!trx) {
+      generateError(rest::ResponseCode::SERVER_ERROR,
+                    TRI_ERROR_TRANSACTION_INTERNAL,
+                    "Read lock not yet acquired!");
+      return;
+    }
+
+    // referenceChecksum is the stringified number of documents in the
+    // collection
+    uint64_t num = col->numberDocuments(trx.get(), transaction::CountType::Normal);
+    referenceChecksum = std::to_string(num);
+  }
+
+  if (!checksumSlice.isEqualString(referenceChecksum)) {
+    const std::string checksum = checksumSlice.copyString();
+    LOG_TOPIC(WARN, Logger::REPLICATION) << "Cannot add follower, mismatching checksums. "
+      << "Expected: " << referenceChecksum << " Actual: " << checksum;
+    generateError(rest::ResponseCode::BAD, TRI_ERROR_REPLICATION_WRONG_CHECKSUM,
+                  "'checksum' is wrong. Expected: "
+                  + referenceChecksum
+                  + ". Actual: " + checksum);
+    return;
+  }
+
+  col->followers()->add(followerId);

   VPackBuilder b;
   {
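Note on the handshake above: the "checksum" a prospective follower submits is simply its stringified document count, and the leader compares it against its own count taken while the follower's read lock is held. A self-contained sketch of that agreement check, with stand-in types (the real values travel as VelocyPack over HTTP):

// Sketch of the add-follower checksum handshake. Stand-in types only.
#include <cassert>
#include <cstdint>
#include <string>

// Follower side: report the local document count as the "checksum".
std::string makeChecksum(uint64_t localDocumentCount) {
  return std::to_string(localDocumentCount);
}

// Leader side: accept the follower only if its count matches the leader's
// count, read under the read lock the follower requested beforehand.
bool acceptFollower(std::string const& followerChecksum,
                    uint64_t leaderDocumentCount) {
  std::string referenceChecksum = std::to_string(leaderDocumentCount);
  return followerChecksum == referenceChecksum;
}

int main() {
  // In-sync shards agree...
  assert(acceptFollower(makeChecksum(42), 42));
  // ...a stale follower is rejected and has to resynchronize first.
  assert(!acceptFollower(makeChecksum(41), 42));
}

The empty-shard shortcut above is the special case checksum == "0": an empty follower may be added without the full read-lock dance, and otherwise TRI_ERROR_REPLICATION_SHARD_NONEMPTY-style rejection applies.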
@@ -627,8 +627,8 @@ void RocksDBCollection::invokeOnAllElements(
 // -- SECTION DML Operations --
 ///////////////////////////////////

-void RocksDBCollection::truncate(transaction::Methods* trx,
-                                 OperationOptions& options) {
+Result RocksDBCollection::truncate(transaction::Methods* trx,
+                                   OperationOptions& options) {
   TRI_ASSERT(_objectId != 0);
   auto state = RocksDBTransactionState::toState(trx);
   RocksDBMethods* mthds = state->rocksdbMethods();

@@ -650,18 +650,18 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
         _logicalCollection.id(), _objectId);
     rocksdb::Status s = batch.PutLogData(log.slice());
     if (!s.ok()) {
-      THROW_ARANGO_EXCEPTION(rocksutils::convertStatus(s));
+      return rocksutils::convertStatus(s);
     }

     TRI_IF_FAILURE("RocksDBRemoveLargeRangeOn") {
-      THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
+      return Result(TRI_ERROR_DEBUG);
     }

     // delete documents
     RocksDBKeyBounds bounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
     s = batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
     if (!s.ok()) {
-      THROW_ARANGO_EXCEPTION(rocksutils::convertStatus(s));
+      return rocksutils::convertStatus(s);
     }

     // delete indexes

@@ -672,7 +672,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
       bounds = ridx->getBounds();
       s = batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
       if (!s.ok()) {
-        THROW_ARANGO_EXCEPTION(rocksutils::convertStatus(s));
+        return rocksutils::convertStatus(s);
       }
       idx->afterTruncate(); // clears caches (if applicable)
     }

@@ -681,7 +681,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
     rocksdb::WriteOptions wo;
     s = rocksutils::globalRocksDB()->Write(wo, &batch);
     if (!s.ok()) {
-      THROW_ARANGO_EXCEPTION(rocksutils::convertStatus(s));
+      return rocksutils::convertStatus(s);
     }
     uint64_t prevCount = _numberDocuments;
     _numberDocuments = 0; // protected by collection lock

@@ -690,11 +690,11 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
       // also compact the ranges in order to speed up all further accesses
       compact();
     }
-    return;
-  } else {
-    TRI_IF_FAILURE("RocksDBRemoveLargeRangeOff") {
-      THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
-    }
+    return Result{};
   }

+  TRI_IF_FAILURE("RocksDBRemoveLargeRangeOff") {
+    return TRI_ERROR_DEBUG;
+  }
+
   // normal transactional truncate

@@ -737,9 +737,8 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
     LocalDocumentId const docId = RocksDBKey::documentId(iter->key());
     auto res = removeDocument(trx, docId, doc, options);

-    if (res.fail()) {
-      // Failed to remove document in truncate. Throw
-      THROW_ARANGO_EXCEPTION(res);
+    if (res.fail()) { // Failed to remove document in truncate.
+      return res;
     }

     bool hasPerformedIntermediateCommit = false;

@@ -747,9 +746,8 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
     res = state->addOperation(_logicalCollection.id(), docId.id(),
        TRI_VOC_DOCUMENT_OPERATION_REMOVE, hasPerformedIntermediateCommit);

-    if (res.fail()) {
-      // This should never happen...
-      THROW_ARANGO_EXCEPTION(res);
+    if (res.fail()) { // This should never happen...
+      return res;
     }
     guard.finish(hasPerformedIntermediateCommit);

@@ -778,11 +776,12 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
 #endif

   TRI_IF_FAILURE("FailAfterAllCommits") {
-    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
+    return Result(TRI_ERROR_DEBUG);
   }
   TRI_IF_FAILURE("SegfaultAfterAllCommits") {
     TRI_SegfaultDebugging("SegfaultAfterAllCommits");
   }
+  return Result{};
 }

 LocalDocumentId RocksDBCollection::lookupKey(transaction::Methods* trx,
@@ -118,7 +118,7 @@ class RocksDBCollection final : public PhysicalCollection {
 // -- SECTION DML Operations --
 ///////////////////////////////////

-  void truncate(transaction::Methods* trx, OperationOptions& options) override;
+  Result truncate(transaction::Methods* trx, OperationOptions&) override;

   void deferDropCollection(
     std::function<bool(LogicalCollection&)> const& callback
@@ -140,8 +140,8 @@ class PhysicalCollection {
 // -- SECTION DML Operations --
 ///////////////////////////////////

-  virtual void truncate(transaction::Methods* trx,
-                        OperationOptions& options) = 0;
+  virtual Result truncate(transaction::Methods* trx,
+                          OperationOptions&) = 0;

   /// @brief Defer a callback to be executed when the collection
   /// can be dropped. The callback is supposed to drop
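The signature change above is the heart of the refactor: truncate() now reports failures in its return value instead of throwing, so a failure point can abort the operation without an exception crossing the storage-engine boundary. A compact, runnable sketch of the conversion pattern, with simplified stand-in types (this is not ArangoDB's Result class):

// Sketch: converting a throwing API into a Result-returning one,
// the pattern this commit applies to truncate(). Types are stand-ins.
#include <iostream>
#include <stdexcept>
#include <string>

struct Result {
  int code = 0;          // 0 means success
  std::string message;
  bool fail() const { return code != 0; }
};

// Old style: failure escapes as an exception.
void truncateThrowing(bool failAt) {
  if (failAt) throw std::runtime_error("intentional failure point");
}

// New style: wrap the throwing body once at the boundary, return a Result.
Result truncateReturning(bool failAt) {
  try {
    truncateThrowing(failAt);
  } catch (std::exception const& e) {
    return {1, e.what()};  // TRI_ERROR_INTERNAL in the real code
  } catch (...) {
    return {1, "unknown error during truncate"};
  }
  return {};
}

int main() {
  Result r = truncateReturning(true);
  if (r.fail()) std::cout << "truncate failed: " << r.message << "\n";
}

MMFilesCollection::truncate above applies exactly this wrap-at-the-boundary shape around invokeOnAllElementsForRemoval(), while the RocksDB variant replaces each THROW_ARANGO_EXCEPTION with a return.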
@@ -1786,45 +1786,42 @@ OperationResult transaction::Methods::insertLocal(
     if (cc != nullptr) {
       // nullptr only happens on controlled shutdown
       size_t nrDone = 0;
-      size_t nrGood = cc->performRequests(requests,
-                                          chooseTimeout(count, body->size()*followers->size()),
-                                          nrDone, Logger::REPLICATION, false);
-      if (nrGood < followers->size()) {
-        // If any would-be-follower refused to follow there must be a
-        // new leader in the meantime, in this case we must not allow
-        // this operation to succeed, we simply return with a refusal
-        // error (note that we use the follower version, since we have
-        // lost leadership):
-        if (findRefusal(requests)) {
-          return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
-        }
+      cc->performRequests(requests, chooseTimeout(count, body->size()*followers->size()),
+                          nrDone, Logger::REPLICATION, false);
+      // If any would-be-follower refused to follow there must be a
+      // new leader in the meantime, in this case we must not allow
+      // this operation to succeed, we simply return with a refusal
+      // error (note that we use the follower version, since we have
+      // lost leadership):
+      if (findRefusal(requests)) {
+        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
+      }

-        // Otherwise we drop all followers that were not successful:
-        for (size_t i = 0; i < followers->size(); ++i) {
-          bool replicationWorked =
-              requests[i].done &&
-              requests[i].result.status == CL_COMM_RECEIVED &&
-              (requests[i].result.answer_code ==
-                   rest::ResponseCode::ACCEPTED ||
-               requests[i].result.answer_code == rest::ResponseCode::CREATED);
-          if (replicationWorked) {
-            bool found;
-            requests[i].result.answer->header(StaticStrings::ErrorCodes,
-                                              found);
-            replicationWorked = !found;
-          }
-          if (!replicationWorked) {
-            auto const& followerInfo = collection->followers();
-            if (followerInfo->remove((*followers)[i])) {
-              LOG_TOPIC(WARN, Logger::REPLICATION)
-                  << "insertLocal: dropping follower " << (*followers)[i]
-                  << " for shard " << collectionName;
-            } else {
-              LOG_TOPIC(ERR, Logger::REPLICATION)
-                  << "insertLocal: could not drop follower "
-                  << (*followers)[i] << " for shard " << collectionName;
-              THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
-            }
+      // Otherwise we drop all followers that were not successful:
+      for (size_t i = 0; i < followers->size(); ++i) {
+        bool replicationWorked =
+            requests[i].done &&
+            requests[i].result.status == CL_COMM_RECEIVED &&
+            (requests[i].result.answer_code ==
+                 rest::ResponseCode::ACCEPTED ||
+             requests[i].result.answer_code == rest::ResponseCode::CREATED);
+        if (replicationWorked) {
+          bool found;
+          requests[i].result.answer->header(StaticStrings::ErrorCodes,
+                                            found);
+          replicationWorked = !found;
+        }
+        if (!replicationWorked) {
+          auto const& followerInfo = collection->followers();
+          if (followerInfo->remove((*followers)[i])) {
+            LOG_TOPIC(WARN, Logger::REPLICATION)
+                << "insertLocal: dropping follower " << (*followers)[i]
+                << " for shard " << collectionName;
+          } else {
+            LOG_TOPIC(ERR, Logger::REPLICATION)
+                << "insertLocal: could not drop follower "
+                << (*followers)[i] << " for shard " << collectionName;
+            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
+          }
+        }
       }
     }
@@ -2164,45 +2161,42 @@ OperationResult transaction::Methods::modifyLocal(
                             path, body);
     }
     size_t nrDone = 0;
-    size_t nrGood = cc->performRequests(requests,
-                                        chooseTimeout(count, body->size()*followers->size()),
-                                        nrDone, Logger::REPLICATION, false);
-    if (nrGood < followers->size()) {
-      // If any would-be-follower refused to follow there must be a
-      // new leader in the meantime, in this case we must not allow
-      // this operation to succeed, we simply return with a refusal
-      // error (note that we use the follower version, since we have
-      // lost leadership):
-      if (findRefusal(requests)) {
-        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
-      }
+    cc->performRequests(requests, chooseTimeout(count, body->size()*followers->size()),
+                        nrDone, Logger::REPLICATION, false);
+    // If any would-be-follower refused to follow there must be a
+    // new leader in the meantime, in this case we must not allow
+    // this operation to succeed, we simply return with a refusal
+    // error (note that we use the follower version, since we have
+    // lost leadership):
+    if (findRefusal(requests)) {
+      return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
+    }

-      // Otherwise we drop all followers that were not successful:
-      for (size_t i = 0; i < followers->size(); ++i) {
-        bool replicationWorked =
-            requests[i].done &&
-            requests[i].result.status == CL_COMM_RECEIVED &&
-            (requests[i].result.answer_code ==
-                 rest::ResponseCode::ACCEPTED ||
-             requests[i].result.answer_code == rest::ResponseCode::OK);
-        if (replicationWorked) {
-          bool found;
-          requests[i].result.answer->header(StaticStrings::ErrorCodes,
-                                            found);
-          replicationWorked = !found;
-        }
-        if (!replicationWorked) {
-          auto const& followerInfo = collection->followers();
-          if (followerInfo->remove((*followers)[i])) {
-            LOG_TOPIC(WARN, Logger::REPLICATION)
-                << "modifyLocal: dropping follower " << (*followers)[i]
-                << " for shard " << collectionName;
-          } else {
-            LOG_TOPIC(ERR, Logger::REPLICATION)
-                << "modifyLocal: could not drop follower "
-                << (*followers)[i] << " for shard " << collectionName;
-            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
-          }
+    // Otherwise we drop all followers that were not successful:
+    for (size_t i = 0; i < followers->size(); ++i) {
+      bool replicationWorked =
+          requests[i].done &&
+          requests[i].result.status == CL_COMM_RECEIVED &&
+          (requests[i].result.answer_code ==
+               rest::ResponseCode::ACCEPTED ||
+           requests[i].result.answer_code == rest::ResponseCode::OK);
+      if (replicationWorked) {
+        bool found;
+        requests[i].result.answer->header(StaticStrings::ErrorCodes,
+                                          found);
+        replicationWorked = !found;
+      }
+      if (!replicationWorked) {
+        auto const& followerInfo = collection->followers();
+        if (followerInfo->remove((*followers)[i])) {
+          LOG_TOPIC(WARN, Logger::REPLICATION)
+              << "modifyLocal: dropping follower " << (*followers)[i]
+              << " for shard " << collectionName;
+        } else {
+          LOG_TOPIC(ERR, Logger::REPLICATION)
+              << "modifyLocal: could not drop follower "
+              << (*followers)[i] << " for shard " << collectionName;
+          THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
+        }
+      }
     }
   }
@@ -2459,45 +2453,42 @@ OperationResult transaction::Methods::removeLocal(
                             body);
     }
     size_t nrDone = 0;
-    size_t nrGood = cc->performRequests(requests,
-                                        chooseTimeout(count, body->size()*followers->size()),
-                                        nrDone, Logger::REPLICATION, false);
-    if (nrGood < followers->size()) {
-      // If any would-be-follower refused to follow there must be a
-      // new leader in the meantime, in this case we must not allow
-      // this operation to succeed, we simply return with a refusal
-      // error (note that we use the follower version, since we have
-      // lost leadership):
-      if (findRefusal(requests)) {
-        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
-      }
+    cc->performRequests(requests, chooseTimeout(count, body->size()*followers->size()),
+                        nrDone, Logger::REPLICATION, false);
+    // If any would-be-follower refused to follow there must be a
+    // new leader in the meantime, in this case we must not allow
+    // this operation to succeed, we simply return with a refusal
+    // error (note that we use the follower version, since we have
+    // lost leadership):
+    if (findRefusal(requests)) {
+      return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
+    }

-      // we drop all followers that were not successful:
-      for (size_t i = 0; i < followers->size(); ++i) {
-        bool replicationWorked =
-            requests[i].done &&
-            requests[i].result.status == CL_COMM_RECEIVED &&
-            (requests[i].result.answer_code ==
-                 rest::ResponseCode::ACCEPTED ||
-             requests[i].result.answer_code == rest::ResponseCode::OK);
-        if (replicationWorked) {
-          bool found;
-          requests[i].result.answer->header(StaticStrings::ErrorCodes,
-                                            found);
-          replicationWorked = !found;
-        }
-        if (!replicationWorked) {
-          auto const& followerInfo = collection->followers();
-          if (followerInfo->remove((*followers)[i])) {
-            LOG_TOPIC(WARN, Logger::REPLICATION)
-                << "removeLocal: dropping follower " << (*followers)[i]
-                << " for shard " << collectionName;
-          } else {
-            LOG_TOPIC(ERR, Logger::REPLICATION)
-                << "removeLocal: could not drop follower "
-                << (*followers)[i] << " for shard " << collectionName;
-            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
-          }
+    // we drop all followers that were not successful:
+    for (size_t i = 0; i < followers->size(); ++i) {
+      bool replicationWorked =
+          requests[i].done &&
+          requests[i].result.status == CL_COMM_RECEIVED &&
+          (requests[i].result.answer_code ==
+               rest::ResponseCode::ACCEPTED ||
+           requests[i].result.answer_code == rest::ResponseCode::OK);
+      if (replicationWorked) {
+        bool found;
+        requests[i].result.answer->header(StaticStrings::ErrorCodes,
+                                          found);
+        replicationWorked = !found;
+      }
+      if (!replicationWorked) {
+        auto const& followerInfo = collection->followers();
+        if (followerInfo->remove((*followers)[i])) {
+          LOG_TOPIC(WARN, Logger::REPLICATION)
+              << "removeLocal: dropping follower " << (*followers)[i]
+              << " for shard " << collectionName;
+        } else {
+          LOG_TOPIC(ERR, Logger::REPLICATION)
+              << "removeLocal: could not drop follower "
+              << (*followers)[i] << " for shard " << collectionName;
+          THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
+        }
+      }
     }
   }
@@ -2657,18 +2648,12 @@ OperationResult transaction::Methods::truncateLocal(

   TRI_ASSERT(isLocked(collection, AccessMode::Type::WRITE));

-  try {
-    collection->truncate(this, options);
-  } catch (basics::Exception const& ex) {
+  Result res = collection->truncate(this, options);
+  if (res.fail()) {
     if (lockResult.is(TRI_ERROR_LOCKED)) {
       unlockRecursive(cid, AccessMode::Type::WRITE);
     }
-    return OperationResult(Result(ex.code(), ex.what()));
-  } catch (std::exception const& ex) {
-    if (lockResult.is(TRI_ERROR_LOCKED)) {
-      unlockRecursive(cid, AccessMode::Type::WRITE);
-    }
-    return OperationResult(Result(TRI_ERROR_INTERNAL, ex.what()));
+    return OperationResult(res);
   }

   // Now see whether or not we have to do synchronous replication:
@@ -2699,44 +2684,40 @@ OperationResult transaction::Methods::truncateLocal(
       }

       size_t nrDone = 0;
-      size_t nrGood = cc->performRequests(requests, 120.0,
-                                          nrDone, Logger::REPLICATION, false);
-      if (nrGood < followers->size()) {
-        // If any would-be-follower refused to follow there must be a
-        // new leader in the meantime, in this case we must not allow
-        // this operation to succeed, we simply return with a refusal
-        // error (note that we use the follower version, since we have
-        // lost leadership):
-        if (findRefusal(requests)) {
-          return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
-        }
-        // we drop all followers that were not successful:
-        for (size_t i = 0; i < followers->size(); ++i) {
-          bool replicationWorked =
-              requests[i].done &&
-              requests[i].result.status == CL_COMM_RECEIVED &&
-              (requests[i].result.answer_code ==
-                   rest::ResponseCode::ACCEPTED ||
-               requests[i].result.answer_code == rest::ResponseCode::OK);
-          if (!replicationWorked) {
-            auto const& followerInfo = collection->followers();
-            if (followerInfo->remove((*followers)[i])) {
-              LOG_TOPIC(WARN, Logger::REPLICATION)
-                  << "truncateLocal: dropping follower " << (*followers)[i]
-                  << " for shard " << collectionName;
-            } else {
-              LOG_TOPIC(ERR, Logger::REPLICATION)
-                  << "truncateLocal: could not drop follower "
-                  << (*followers)[i] << " for shard " << collectionName;
-              THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
-            }
+      cc->performRequests(requests, 120.0, nrDone, Logger::REPLICATION, false);
+      // If any would-be-follower refused to follow there must be a
+      // new leader in the meantime, in this case we must not allow
+      // this operation to succeed, we simply return with a refusal
+      // error (note that we use the follower version, since we have
+      // lost leadership):
+      if (findRefusal(requests)) {
+        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
+      }
+      // we drop all followers that were not successful:
+      for (size_t i = 0; i < followers->size(); ++i) {
+        bool replicationWorked =
+            requests[i].done &&
+            requests[i].result.status == CL_COMM_RECEIVED &&
+            (requests[i].result.answer_code ==
+                 rest::ResponseCode::ACCEPTED ||
+             requests[i].result.answer_code == rest::ResponseCode::OK);
+        if (!replicationWorked) {
+          auto const& followerInfo = collection->followers();
+          if (followerInfo->remove((*followers)[i])) {
+            LOG_TOPIC(WARN, Logger::REPLICATION)
+                << "truncateLocal: dropping follower " << (*followers)[i]
+                << " for shard " << collectionName;
+          } else {
+            LOG_TOPIC(ERR, Logger::REPLICATION)
+                << "truncateLocal: could not drop follower "
+                << (*followers)[i] << " for shard " << collectionName;
+            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
+          }
         }
       }
     }
   }

   Result res;
   if (lockResult.is(TRI_ERROR_LOCKED)) {
     res = unlockRecursive(cid, AccessMode::Type::WRITE);
   }
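The four hunks above repeat one policy: fire the replication requests, refuse the whole operation if any follower refused (leadership was lost in the meantime), otherwise drop every follower whose replication did not fully succeed. A simplified sketch of that policy with stand-in types; refusal detection is reduced here to a single illustrative status code:

// Simplified sketch of the follower-drop policy in insertLocal/modifyLocal/
// removeLocal/truncateLocal. Types and codes are illustrative stand-ins.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct RequestResult {
  bool done = false;
  bool received = false;
  int answerCode = 0;              // HTTP-style status of the follower's answer
  bool hasErrorCodesHeader = false; // per-document errors reported by follower
};

// Returns false if leadership was lost (caller must refuse the operation).
bool handleFollowerResponses(std::vector<std::string> const& followers,
                             std::vector<RequestResult> const& requests) {
  int const kRefusedCode = 406;  // illustrative "follower refused" signal
  for (auto const& r : requests) {
    if (r.received && r.answerCode == kRefusedCode) {
      return false;  // a refusal means a new leader took over
    }
  }
  // Otherwise drop every follower whose replication did not fully succeed.
  for (std::size_t i = 0; i < followers.size(); ++i) {
    RequestResult const& r = requests[i];
    bool worked = r.done && r.received &&
                  (r.answerCode == 201 || r.answerCode == 202) &&
                  !r.hasErrorCodesHeader;
    if (!worked) {
      std::cout << "dropping follower " << followers[i] << "\n";
    }
  }
  return true;
}

int main() {
  std::vector<std::string> followers{"PRMR-a", "PRMR-b"};
  std::vector<RequestResult> requests{{true, true, 202, false},
                                      {true, true, 500, false}};
  std::cout << (handleFollowerResponses(followers, requests) ? "ok" : "resigned")
            << "\n";  // drops PRMR-b, keeps leadership
}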
@@ -893,11 +893,17 @@ void LogicalCollection::deferDropCollection(
 /// @brief reads an element from the document collection
 Result LogicalCollection::read(transaction::Methods* trx, StringRef const& key,
                                ManagedDocumentResult& result, bool lock) {
+  TRI_IF_FAILURE("LogicalCollection::read") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   return getPhysical()->read(trx, key, result, lock);
 }

 Result LogicalCollection::read(transaction::Methods* trx, arangodb::velocypack::Slice const& key,
                                ManagedDocumentResult& result, bool lock) {
+  TRI_IF_FAILURE("LogicalCollection::read") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   return getPhysical()->read(trx, key, result, lock);
 }

@@ -906,9 +912,12 @@ Result LogicalCollection::read(transaction::Methods* trx, arangodb::velocypack::
 /// the read-cache
 ////////////////////////////////////////////////////////////////////////////////

-void LogicalCollection::truncate(transaction::Methods* trx,
-                                 OperationOptions& options) {
-  getPhysical()->truncate(trx, options);
+Result LogicalCollection::truncate(transaction::Methods* trx,
+                                   OperationOptions& options) {
+  TRI_IF_FAILURE("LogicalCollection::truncate") {
+    return Result(TRI_ERROR_DEBUG);
+  }
+  return getPhysical()->truncate(trx, options);
 }

 ////////////////////////////////////////////////////////////////////////////////

@@ -921,6 +930,9 @@ Result LogicalCollection::insert(transaction::Methods* trx,
                                  OperationOptions& options,
                                  TRI_voc_tick_t& resultMarkerTick, bool lock,
                                  TRI_voc_tick_t& revisionId) {
+  TRI_IF_FAILURE("LogicalCollection::insert") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   resultMarkerTick = 0;
   return getPhysical()->insert(trx, slice, result, options, resultMarkerTick,
                                lock, revisionId);

@@ -934,6 +946,9 @@ Result LogicalCollection::update(transaction::Methods* trx,
                                  TRI_voc_tick_t& resultMarkerTick, bool lock,
                                  TRI_voc_rid_t& prevRev,
                                  ManagedDocumentResult& previous) {
+  TRI_IF_FAILURE("LogicalCollection::update") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   resultMarkerTick = 0;

   if (!newSlice.isObject()) {

@@ -959,6 +974,9 @@ Result LogicalCollection::replace(transaction::Methods* trx,
                                   TRI_voc_tick_t& resultMarkerTick, bool lock,
                                   TRI_voc_rid_t& prevRev,
                                   ManagedDocumentResult& previous) {
+  TRI_IF_FAILURE("LogicalCollection::replace") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   resultMarkerTick = 0;

   if (!newSlice.isObject()) {

@@ -977,6 +995,9 @@ Result LogicalCollection::remove(transaction::Methods* trx,
                                  TRI_voc_tick_t& resultMarkerTick, bool lock,
                                  TRI_voc_rid_t& prevRev,
                                  ManagedDocumentResult& previous) {
+  TRI_IF_FAILURE("LogicalCollection::remove") {
+    return Result(TRI_ERROR_DEBUG);
+  }
   resultMarkerTick = 0;
   TRI_voc_rid_t revisionId = 0;
   return getPhysical()->remove(trx, slice, previous, options, resultMarkerTick, lock, prevRev, revisionId);

@@ -1079,25 +1100,3 @@ ChecksumResult LogicalCollection::checksum(bool withRevisions, bool withData) co

   return ChecksumResult(std::move(b));
 }
-
-Result LogicalCollection::compareChecksums(VPackSlice checksumSlice, std::string const& referenceChecksum) const {
-  if (!checksumSlice.isString()) {
-    return Result(
-      TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,
-      std::string("Checksum must be a string but is ") + checksumSlice.typeName()
-    );
-  }
-
-  auto checksum = checksumSlice.copyString();
-
-  if (checksum != referenceChecksum) {
-    return Result(
-      TRI_ERROR_REPLICATION_WRONG_CHECKSUM,
-      "'checksum' is wrong. Expected: "
-      + referenceChecksum
-      + ". Actual: " + checksum
-    );
-  }
-
-  return Result();
-}
@@ -279,8 +279,7 @@ class LogicalCollection: public LogicalDataSource {
                 ManagedDocumentResult& result, bool);

   /// @brief processes a truncate operation
-  /// NOTE: This function throws on error
-  void truncate(transaction::Methods* trx, OperationOptions&);
+  Result truncate(transaction::Methods* trx, OperationOptions&);

   Result insert(transaction::Methods*, velocypack::Slice const,
                 ManagedDocumentResult& result, OperationOptions&,

@@ -341,10 +340,6 @@ class LogicalCollection: public LogicalDataSource {

   ChecksumResult checksum(bool, bool) const;

-  // compares the checksum value passed in the Slice (must be of type String)
-  // with the checksum provided in the reference checksum
-  Result compareChecksums(velocypack::Slice checksumSlice, std::string const& referenceChecksum) const;
-
   std::unique_ptr<FollowerInfo> const& followers() const;

  protected:
@@ -131,7 +131,6 @@
   "ERROR_REPLICATION_APPLIER_STOPPED" : { "code" : 1412, "message" : "replication stopped" },
   "ERROR_REPLICATION_NO_START_TICK" : { "code" : 1413, "message" : "no start tick" },
   "ERROR_REPLICATION_START_TICK_NOT_PRESENT" : { "code" : 1414, "message" : "start tick not present" },
-  "ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT" : { "code" : 1415, "message" : "the checksum format is wrong" },
   "ERROR_REPLICATION_WRONG_CHECKSUM" : { "code" : 1416, "message" : "wrong checksum" },
   "ERROR_REPLICATION_SHARD_NONEMPTY" : { "code" : 1417, "message" : "shard not empty" },
   "ERROR_CLUSTER_NO_AGENCY" : { "code" : 1450, "message" : "could not connect to agency" },
@@ -252,51 +252,6 @@ global.DEFINE_MODULE('internal', (function () {
     delete global.SYS_BASE64ENCODE;
   }

-  // //////////////////////////////////////////////////////////////////////////////
-  // / @brief debugSegfault
-  // //////////////////////////////////////////////////////////////////////////////
-
-  if (global.SYS_DEBUG_SEGFAULT) {
-    exports.debugSegfault = global.SYS_DEBUG_SEGFAULT;
-    delete global.SYS_DEBUG_SEGFAULT;
-  }
-
-  // //////////////////////////////////////////////////////////////////////////////
-  // / @brief debugSetFailAt
-  // //////////////////////////////////////////////////////////////////////////////
-
-  if (global.SYS_DEBUG_SET_FAILAT) {
-    exports.debugSetFailAt = global.SYS_DEBUG_SET_FAILAT;
-    delete global.SYS_DEBUG_SET_FAILAT;
-  }
-
-  // //////////////////////////////////////////////////////////////////////////////
-  // / @brief debugRemoveFailAt
-  // //////////////////////////////////////////////////////////////////////////////
-
-  if (global.SYS_DEBUG_REMOVE_FAILAT) {
-    exports.debugRemoveFailAt = global.SYS_DEBUG_REMOVE_FAILAT;
-    delete global.SYS_DEBUG_REMOVE_FAILAT;
-  }
-
-  // //////////////////////////////////////////////////////////////////////////////
-  // / @brief debugClearFailAt
-  // //////////////////////////////////////////////////////////////////////////////
-
-  if (global.SYS_DEBUG_CLEAR_FAILAT) {
-    exports.debugClearFailAt = global.SYS_DEBUG_CLEAR_FAILAT;
-    delete global.SYS_DEBUG_CLEAR_FAILAT;
-  }
-
-  // //////////////////////////////////////////////////////////////////////////////
-  // / @brief debugCanUseFailAt
-  // //////////////////////////////////////////////////////////////////////////////
-
-  if (global.SYS_DEBUG_CAN_USE_FAILAT) {
-    exports.debugCanUseFailAt = global.SYS_DEBUG_CAN_USE_FAILAT;
-    delete global.SYS_DEBUG_CAN_USE_FAILAT;
-  }
-
   // //////////////////////////////////////////////////////////////////////////////
   // / @brief download
   // //////////////////////////////////////////////////////////////////////////////
@@ -405,4 +405,50 @@
     exports.getCollectionShardDistribution = global.SYS_CLUSTER_COLLECTION_SHARD_DISTRIBUTION;
     delete global.SYS_CLUSTER_COLLECTION_SHARD_DISTRIBUTION;
   }
+
+  // //////////////////////////////////////////////////////////////////////////////
+  // / @brief debugSegfault
+  // //////////////////////////////////////////////////////////////////////////////
+
+  if (global.SYS_DEBUG_SEGFAULT) {
+    exports.debugSegfault = global.SYS_DEBUG_SEGFAULT;
+    delete global.SYS_DEBUG_SEGFAULT;
+  }
+
+  // //////////////////////////////////////////////////////////////////////////////
+  // / @brief debugSetFailAt
+  // //////////////////////////////////////////////////////////////////////////////
+
+  if (global.SYS_DEBUG_SET_FAILAT) {
+    exports.debugSetFailAt = global.SYS_DEBUG_SET_FAILAT;
+    delete global.SYS_DEBUG_SET_FAILAT;
+  }
+
+  // //////////////////////////////////////////////////////////////////////////////
+  // / @brief debugRemoveFailAt
+  // //////////////////////////////////////////////////////////////////////////////
+
+  if (global.SYS_DEBUG_REMOVE_FAILAT) {
+    exports.debugRemoveFailAt = global.SYS_DEBUG_REMOVE_FAILAT;
+    delete global.SYS_DEBUG_REMOVE_FAILAT;
+  }
+
+  // //////////////////////////////////////////////////////////////////////////////
+  // / @brief debugClearFailAt
+  // //////////////////////////////////////////////////////////////////////////////
+
+  if (global.SYS_DEBUG_CLEAR_FAILAT) {
+    exports.debugClearFailAt = global.SYS_DEBUG_CLEAR_FAILAT;
+    delete global.SYS_DEBUG_CLEAR_FAILAT;
+  }
+
+  // //////////////////////////////////////////////////////////////////////////////
+  // / @brief debugCanUseFailAt
+  // //////////////////////////////////////////////////////////////////////////////
+
+  if (global.SYS_DEBUG_CAN_USE_FAILAT) {
+    exports.debugCanUseFailAt = global.SYS_DEBUG_CAN_USE_FAILAT;
+    delete global.SYS_DEBUG_CAN_USE_FAILAT;
+  }
+
 }());
@@ -164,7 +164,6 @@ ERROR_REPLICATION_RUNNING,1411,"cannot perform operation while applier is runnin
 ERROR_REPLICATION_APPLIER_STOPPED,1412,"replication stopped","Special error code used to indicate the replication applier was stopped by a user."
 ERROR_REPLICATION_NO_START_TICK,1413,"no start tick","Will be raised when the replication applier is started without a known start tick value."
 ERROR_REPLICATION_START_TICK_NOT_PRESENT,1414,"start tick not present","Will be raised when the replication applier fetches data using a start tick, but that start tick is not present on the logger server anymore."
-ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,1415,"the checksum format is wrong", "Will be raised when the format of the checksum is wrong")
 ERROR_REPLICATION_WRONG_CHECKSUM,1416,"wrong checksum","Will be raised when a new born follower submits a wrong checksum"
 ERROR_REPLICATION_SHARD_NONEMPTY,1417,"shard not empty","Will be raised when a shard is not empty and the follower tries a shortcut"
@@ -130,7 +130,6 @@ void TRI_InitializeErrorMessages() {
   REG_ERROR(ERROR_REPLICATION_APPLIER_STOPPED, "replication stopped");
   REG_ERROR(ERROR_REPLICATION_NO_START_TICK, "no start tick");
   REG_ERROR(ERROR_REPLICATION_START_TICK_NOT_PRESENT, "start tick not present");
-  REG_ERROR(ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT, "the checksum format is wrong");
   REG_ERROR(ERROR_REPLICATION_WRONG_CHECKSUM, "wrong checksum");
   REG_ERROR(ERROR_REPLICATION_SHARD_NONEMPTY, "shard not empty");
   REG_ERROR(ERROR_CLUSTER_NO_AGENCY, "could not connect to agency");
@@ -660,11 +660,6 @@ constexpr int TRI_ERROR_REPLICATION_NO_START_TICK
 /// tick, but that start tick is not present on the logger server anymore.
 constexpr int TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT = 1414;

-/// 1415: ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT
-/// "the checksum format is wrong"
-/// "Will be raised when the format of the checksum is wrong")
-constexpr int TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT = 1415;
-
 /// 1416: ERROR_REPLICATION_WRONG_CHECKSUM
 /// "wrong checksum"
 /// Will be raised when a new born follower submits a wrong checksum
@@ -885,9 +885,10 @@ void PhysicalCollectionMock::setPath(std::string const& value) {
   physicalPath = value;
 }

-void PhysicalCollectionMock::truncate(arangodb::transaction::Methods* trx, arangodb::OperationOptions& options) {
+arangodb::Result PhysicalCollectionMock::truncate(arangodb::transaction::Methods*, arangodb::OperationOptions&) {
   before();
   documents.clear();
+  return arangodb::Result();
 }

 arangodb::Result PhysicalCollectionMock::update(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const newSlice, arangodb::ManagedDocumentResult& result, arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev, arangodb::ManagedDocumentResult& previous, arangodb::velocypack::Slice const key) {
@@ -92,7 +92,7 @@ class PhysicalCollectionMock: public arangodb::PhysicalCollection {
   virtual int restoreIndex(arangodb::transaction::Methods*, arangodb::velocypack::Slice const&, std::shared_ptr<arangodb::Index>&) override;
   virtual TRI_voc_rid_t revision(arangodb::transaction::Methods* trx) const override;
   virtual void setPath(std::string const&) override;
-  virtual void truncate(arangodb::transaction::Methods* trx, arangodb::OperationOptions& options) override;
+  virtual arangodb::Result truncate(arangodb::transaction::Methods* trx, arangodb::OperationOptions&) override;
   virtual arangodb::Result update(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const newSlice, arangodb::ManagedDocumentResult& result, arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev, arangodb::ManagedDocumentResult& previous, arangodb::velocypack::Slice const key) override;
   virtual void load() override {}
   virtual void unload() override {}
@@ -35,7 +35,6 @@ const _ = require("lodash");
 const wait = require("internal").wait;
 const suspendExternal = require("internal").suspendExternal;
 const continueExternal = require("internal").continueExternal;
-const download = require('internal').download;

 ////////////////////////////////////////////////////////////////////////////////
 /// @brief test suite
@ -0,0 +1,684 @@
|
|||
/*jshint globalstrict:false, strict:false */
|
||||
/*global assertTrue, assertFalse, assertEqual, fail, instanceInfo */
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test synchronous replication in the cluster
|
||||
///
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2016-2016 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Max Neunhoeffer
|
||||
/// @author Copyright 2016, ArangoDB GmbH, Cologne, Germany
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
const jsunity = require("jsunity");
|
||||
|
||||
const arangodb = require("@arangodb");
|
||||
const db = arangodb.db;
|
||||
const ERRORS = arangodb.errors;
|
||||
const _ = require("lodash");
|
||||
const wait = require("internal").wait;
|
||||
const request = require('@arangodb/request');
|
||||
const suspendExternal = require("internal").suspendExternal;
|
||||
const continueExternal = require("internal").continueExternal;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test suite
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function SynchronousReplicationSuite() {
|
||||
'use strict';
|
||||
var cn = "UnitTestSyncRep";
|
||||
var c;
|
||||
var cinfo;
|
||||
var ccinfo;
|
||||
var shards;
|
||||
|
||||
if (!require('internal').debugSetFailAt) {
|
||||
console.info("Failure Tests disabled, Skipping...");
|
||||
return {};
|
||||
}
|
||||
|
||||
function baseUrl(endpoint) { // arango.getEndpoint()
|
||||
return endpoint.replace(/^tcp:/, 'http:').replace(/^ssl:/, 'https:');
|
||||
};
|
||||
|
||||
/// @brief set failure point
|
||||
function debugSetFailAt(endpoint, failAt) {
|
||||
assertTrue(failAt !== undefined);
|
||||
let res = request.put({
|
||||
url: baseUrl(endpoint) + '/_admin/debug/failat/' + failAt,
|
||||
body: ""
|
||||
});
|
||||
if (res.status !== 200) {
|
||||
throw "Error seting failure point";
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief remove failure point
|
||||
function debugRemoveFailAt(endpoint, failAt) {
|
||||
assertTrue(failAt !== undefined);
|
||||
let res = request.delete({
|
||||
url: baseUrl(endpoint) + '/_admin/debug/failat/' + failAt,
|
||||
body: ""
|
||||
});
|
||||
if (res.status !== 200) {
|
||||
throw "Error seting failure point";
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief remove all failure points
|
||||
function debugClearFailAt(endpoint) {
|
||||
let res = request.delete({
|
||||
url: baseUrl(endpoint) + '/_admin/debug/failat',
|
||||
body: ""
|
||||
});
|
||||
if (res.status !== 200) {
|
||||
throw "Error seting failure point";
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief find out servers for the system collections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function findCollectionServers(database, collection) {
|
||||
var cinfo = global.ArangoClusterInfo.getCollectionInfo(database, collection);
|
||||
var shard = Object.keys(cinfo.shards)[0];
|
||||
return cinfo.shards[shard];
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait for synchronous replication
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
function waitForSynchronousReplication(database) {
|
||||
console.info("Waiting for synchronous replication to settle...");
|
||||
global.ArangoClusterInfo.flush();
|
||||
cinfo = global.ArangoClusterInfo.getCollectionInfo(database, cn);
|
||||
shards = Object.keys(cinfo.shards);
|
||||
var count = 0;
|
||||
var replicas;
|
||||
while (++count <= 300) {
|
||||
ccinfo = shards.map(
|
||||
s => global.ArangoClusterInfo.getCollectionInfoCurrent(database, cn, s)
|
||||
);
|
||||
console.info("Plan:", cinfo.shards, "Current:", ccinfo.map(s => s.servers));
|
||||
replicas = ccinfo.map(s => s.servers.length);
|
||||
if (replicas.every(x => x > 1)) {
|
||||
console.info("Replication up and running!");
|
||||
return true;
|
||||
}
|
||||
wait(0.5);
|
||||
global.ArangoClusterInfo.flush();
|
||||
}
|
||||
console.error("Replication did not finish");
|
||||
return false;
|
||||
}

////////////////////////////////////////////////////////////////////////////////
/// @brief fail the follower
////////////////////////////////////////////////////////////////////////////////

function failFollower(failAt) {
  var follower = cinfo.shards[shards[0]][1];
  var endpoint = global.ArangoClusterInfo.getServerEndpoint(follower);

  // Now look for instanceInfo:
  var pos = _.findIndex(global.instanceInfo.arangods,
    x => x.endpoint === endpoint);
  assertTrue(pos >= 0);
  if (failAt) {
    debugSetFailAt(endpoint, failAt);
    console.info("Have added failure in follower", follower, "at", failAt);
  } else {
    assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
    console.info("Have failed follower", follower);
  }
}

////////////////////////////////////////////////////////////////////////////////
/// @brief heal the follower
////////////////////////////////////////////////////////////////////////////////

function healFollower(failAt) {
  var follower = cinfo.shards[shards[0]][1];
  var endpoint = global.ArangoClusterInfo.getServerEndpoint(follower);
  // Now look for instanceInfo:
  var pos = _.findIndex(global.instanceInfo.arangods,
    x => x.endpoint === endpoint);
  assertTrue(pos >= 0);
  if (failAt) {
    debugRemoveFailAt(endpoint, failAt);
    console.info("Have removed failure in follower", follower, "at", failAt);
  } else {
    assertTrue(continueExternal(global.instanceInfo.arangods[pos].pid));
    console.info("Have healed follower", follower);
  }
}

////////////////////////////////////////////////////////////////////////////////
/// @brief fail the leader
////////////////////////////////////////////////////////////////////////////////

function failLeader(failAt) {
  var leader = cinfo.shards[shards[0]][0];
  var endpoint = global.ArangoClusterInfo.getServerEndpoint(leader);
  // Now look for instanceInfo:
  var pos = _.findIndex(global.instanceInfo.arangods,
    x => x.endpoint === endpoint);
  assertTrue(pos >= 0);
  if (failAt) {
    debugSetFailAt(endpoint, failAt);
    console.info("Have failed leader", leader, "at", failAt);
  } else {
    assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
    console.info("Have failed leader", leader);
  }
  return leader;
}

////////////////////////////////////////////////////////////////////////////////
/// @brief heal the leader
////////////////////////////////////////////////////////////////////////////////

function healLeader(failAt) {
  var leader = cinfo.shards[shards[0]][0];
  var endpoint = global.ArangoClusterInfo.getServerEndpoint(leader);
  // Now look for instanceInfo:
  var pos = _.findIndex(global.instanceInfo.arangods,
    x => x.endpoint === endpoint);
  assertTrue(pos >= 0);
  if (failAt) {
    debugRemoveFailAt(endpoint, failAt);
    console.info("Have removed failure in leader", leader, "at", failAt);
  } else {
    assertTrue(continueExternal(global.instanceInfo.arangods[pos].pid));
    console.info("Have healed leader", leader);
  }
}
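
// Pattern shared by the four helpers above: with a failAt argument they
// toggle a server-side failure point on the affected server; without one
// they suspend/resume the whole arangod process via suspendExternal and
// continueExternal.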

////////////////////////////////////////////////////////////////////////////////
/// @brief basic operations, with various failure modes:
////////////////////////////////////////////////////////////////////////////////

function runBasicOperations(monkeyFn) {
  monkeyFn(1);

  // Insert with check:
  var id = c.insert({ Hallo: 12 });
  assertEqual(1, c.count());

  monkeyFn(2);

  var doc = c.document(id._key);
  assertEqual(12, doc.Hallo);

  monkeyFn(3);

  var ids = c.insert([{ Hallo: 13 }, { Hallo: 14 }]);
  assertEqual(3, c.count());
  assertEqual(2, ids.length);

  monkeyFn(4);

  var docs = c.document([ids[0]._key, ids[1]._key]);
  assertEqual(2, docs.length);
  assertEqual(13, docs[0].Hallo);
  assertEqual(14, docs[1].Hallo);

  monkeyFn(5);

  // Replace with check:
  c.replace(id._key, { "Hallo": 100 });

  monkeyFn(6);

  doc = c.document(id._key);
  assertEqual(100, doc.Hallo);

  monkeyFn(7);

  c.replace([ids[0]._key, ids[1]._key], [{ Hallo: 101 }, { Hallo: 102 }]);

  monkeyFn(8);

  docs = c.document([ids[0]._key, ids[1]._key]);
  assertEqual(2, docs.length);
  assertEqual(101, docs[0].Hallo);
  assertEqual(102, docs[1].Hallo);

  monkeyFn(9);

  // Update with check:
  c.update(id._key, { "Hallox": 105 });

  monkeyFn(10);

  doc = c.document(id._key);
  assertEqual(100, doc.Hallo);
  assertEqual(105, doc.Hallox);

  monkeyFn(11);

  c.update([ids[0]._key, ids[1]._key], [{ Hallox: 106 }, { Hallox: 107 }]);

  monkeyFn(12);

  docs = c.document([ids[0]._key, ids[1]._key]);
  assertEqual(2, docs.length);
  assertEqual(101, docs[0].Hallo);
  assertEqual(102, docs[1].Hallo);
  assertEqual(106, docs[0].Hallox);
  assertEqual(107, docs[1].Hallox);

  monkeyFn(13);

  // AQL:
  var q = db._query(`FOR x IN @@cn
                       FILTER x.Hallo > 0
                       SORT x.Hallo
                       RETURN {"Hallo": x.Hallo}`, { "@cn": cn });
  docs = q.toArray();
  assertEqual(3, docs.length);
  assertEqual([{ Hallo: 100 }, { Hallo: 101 }, { Hallo: 102 }], docs);

  monkeyFn(14);

  // Remove with check:
  c.remove(id._key);

  monkeyFn(15);

  try {
    doc = c.document(id._key);
    fail();
  }
  catch (e1) {
    assertEqual(ERRORS.ERROR_ARANGO_DOCUMENT_NOT_FOUND.code, e1.errorNum);
  }

  monkeyFn(16);

  assertEqual(2, c.count());

  monkeyFn(17);

  c.remove([ids[0]._key, ids[1]._key]);

  monkeyFn(18);

  docs = c.document([ids[0]._key, ids[1]._key]);
  assertEqual(2, docs.length);
  assertTrue(docs[0].error);
  assertTrue(docs[1].error);

  monkeyFn(19);
}
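
// Map of the hook points above, for reading the tests below: monkeyFn is
// called at places 1-19; 1-2 bracket the single insert and read, 3-4 the
// batch insert and read, 5-8 the replaces, 9-12 the updates, 13 precedes
// the AQL query, 14-18 bracket the removes, and 19 runs at the end. The
// tests arm and heal failure points at exactly these places.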

////////////////////////////////////////////////////////////////////////////////
/// @brief the actual tests
////////////////////////////////////////////////////////////////////////////////

return {

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief set up
  ////////////////////////////////////////////////////////////////////////////////

  setUp: function () {
    var systemCollServers = findCollectionServers("_system", "_graphs");
    console.info("System collections use servers:", systemCollServers);
    while (true) {
      db._drop(cn);
      c = db._create(cn, {
        numberOfShards: 1, replicationFactor: 2,
        avoidServers: systemCollServers
      });
      var servers = findCollectionServers("_system", cn);
      console.info("Test collection uses servers:", servers);
      if (_.intersection(systemCollServers, servers).length === 0) {
        return;
      }
      console.info("Need to recreate collection to avoid system collection servers.");
      //waitForSynchronousReplication("_system");
      console.info("Dropping and recreating the collection.");
    }
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief tear down
  ////////////////////////////////////////////////////////////////////////////////

  tearDown: function () {
    var servers = global.ArangoClusterInfo.getDBServers();
    servers.forEach(s => {
      let endpoint = global.ArangoClusterInfo.getServerEndpoint(s.serverId);
      debugClearFailAt(endpoint);
    });
    db._drop(cn);
    //global.ArangoAgency.set('Target/FailedServers', {});
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief check whether we have access to global.instanceInfo
  ////////////////////////////////////////////////////////////////////////////////

  testCheckInstanceInfo: function () {
    assertTrue(global.instanceInfo !== undefined);
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief check if a synchronously replicated collection gets online
  ////////////////////////////////////////////////////////////////////////////////

  testSetup: function () {
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 1
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail1: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 1) {
        failFollower("LogicalCollection::insert");
      } else if (place === 18) {
        healFollower("LogicalCollection::insert");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 2
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail2: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::read");
      } else if (place === 18) {
        healFollower("LogicalCollection::read");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 3
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail3: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::insert");
      } else if (place === 18) {
        healFollower("LogicalCollection::insert");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 5
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail5: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::replace");
      } else if (place === 18) {
        healFollower("LogicalCollection::replace");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 7
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail7: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::replace");
      } else if (place === 18) {
        healFollower("LogicalCollection::replace");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 9
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail9: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::update");
      } else if (place === 18) {
        healFollower("LogicalCollection::update");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief fail in place 14
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsFollowerFail14: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 2) {
        failFollower("LogicalCollection::remove");
      } else if (place === 18) {
        healFollower("LogicalCollection::remove");
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 1 until 2, leader fail in 3
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail1_2_3: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 1) {
        failFollower("LogicalCollection::insert"); // replication fails
      } else if (place === 2) {
        healFollower("LogicalCollection::insert");
      } else if (place === 3) {
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },
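
  // Shape of the combined tests in this block: arm a follower failure point
  // just before the operation under test, heal it at the next place, wait
  // until the shard is back in sync, then suspend the leader process, and
  // resume it at place 19, so a failover and a failback both happen mid-run.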

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 3 until 4, leader fails in 4 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail3_4: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 3) {
        failFollower("LogicalCollection::insert"); // replication fails
      } else if (place === 4) {
        healFollower("LogicalCollection::insert");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 5 until 6, leader fails in 6 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail5_6: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 5) {
        failFollower("LogicalCollection::replace"); // replication fails
      } else if (place === 6) {
        healFollower("LogicalCollection::replace");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 7 until 8, leader fails in 8 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail7_8: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 7) {
        failFollower("LogicalCollection::replace"); // replication fails
      } else if (place === 8) {
        healFollower("LogicalCollection::replace");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 9 until 10, leader fails in 10 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail9_10: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 9) {
        failFollower("LogicalCollection::update"); // replication fails
      } else if (place === 10) {
        healFollower("LogicalCollection::update");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 11 until 12, leader fails in 12 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail11_12: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 11) {
        failFollower("LogicalCollection::update"); // replication fails
      } else if (place === 12) {
        healFollower("LogicalCollection::update");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 14 until 15, leader fails in 15 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail14_15: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 14) {
        failFollower("LogicalCollection::remove"); // replication fails
      } else if (place === 15) {
        healFollower("LogicalCollection::remove");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief follower fail in place 17 until 18, leader fails in 18 after in-sync
  ////////////////////////////////////////////////////////////////////////////////

  testBasicOperationsCombinedFail17_18: function () {
    assertTrue(waitForSynchronousReplication("_system"));
    runBasicOperations((place) => {
      if (place === 17) {
        failFollower("LogicalCollection::remove"); // replication fails
      } else if (place === 18) {
        healFollower("LogicalCollection::remove");
        assertTrue(waitForSynchronousReplication("_system"));
        failLeader();
      } else if (place === 19) {
        healLeader();
      }
    });
    assertTrue(waitForSynchronousReplication("_system"));
  },

  ////////////////////////////////////////////////////////////////////////////////
  /// @brief just to allow a trailing comma at the end of the last test
  ////////////////////////////////////////////////////////////////////////////////

  testDummy: function () {
    assertEqual(12, 12);
  }

};
}

////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////

jsunity.run(SynchronousReplicationSuite);

return jsunity.done();