1
0
Fork 0

In high concurrency situations double checking checksums is a must have (#2890)

This commit is contained in:
m0ppers 2017-07-28 00:43:21 +02:00 committed by Frank Celler
parent 56143bc186
commit 6fefe5ab31
10 changed files with 237 additions and 68 deletions

View File

@ -3317,6 +3317,27 @@ void MMFilesRestReplicationHandler::handleCommandAddFollower() {
return;
}
VPackSlice const checksum = body.get("checksum");
// optional while intoroducing this bugfix. should definately be required with 3.4
// and throw a 400 then
if (checksum.isObject()) {
auto result = col->compareChecksums(checksum);
if (result.fail()) {
auto errorNumber = result.errorNumber();
rest::ResponseCode code;
if (errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM ||
errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT) {
code = rest::ResponseCode::BAD;
} else {
code = rest::ResponseCode::SERVER_ERROR;
}
generateError(code,
errorNumber, result.errorMessage());
return;
}
}
col->followers()->add(followerId.copyString());
VPackBuilder b;

View File

@ -1911,6 +1911,27 @@ void RocksDBRestReplicationHandler::handleCommandAddFollower() {
return;
}
VPackSlice const checksum = body.get("checksum");
// optional while intoroducing this bugfix. should definately be required with 3.4
// and throw a 400 then
if (checksum.isObject()) {
auto result = col->compareChecksums(checksum);
if (result.fail()) {
auto errorNumber = result.errorNumber();
rest::ResponseCode code;
if (errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM ||
errorNumber == TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT) {
code = rest::ResponseCode::BAD;
} else {
code = rest::ResponseCode::SERVER_ERROR;
}
generateError(code,
errorNumber, result.errorMessage());
return;
}
}
col->followers()->add(followerId.copyString());
VPackBuilder b;

View File

@ -27,7 +27,6 @@
#include "Aql/QueryString.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/fasthash.h"
#include "Indexes/Index.h"
#include "StorageEngine/PhysicalCollection.h"
#include "Transaction/Helpers.h"
@ -371,73 +370,12 @@ static void JS_ChecksumCollection(
}
}
SingleCollectionTransaction trx(transaction::V8Context::Create(col->vocbase(), true),
col->cid(), AccessMode::Type::READ);
Result res = trx.begin();
if (!res.ok()) {
TRI_V8_THROW_EXCEPTION(res);
ChecksumResult result = col->checksum(withRevisions, withData);
if (!result.ok()) {
TRI_V8_THROW_EXCEPTION(result);
}
trx.pinData(col->cid()); // will throw when it fails
// get last tick
LogicalCollection* collection = trx.documentCollection();
auto physical = collection->getPhysical();
TRI_ASSERT(physical != nullptr);
std::string const revisionId = TRI_RidToString(physical->revision(&trx));
uint64_t hash = 0;
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(col->name(), [&hash, &withData, &withRevisions, &trx, &collection, &mmdr](DocumentIdentifierToken const& token) {
if (collection->readDocument(&trx, token, mmdr)) {
VPackSlice const slice(mmdr.vpack());
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
if (withRevisions) {
localHash += transaction::helpers::extractRevSliceFromDocument(slice).hash();
}
if (withData) {
// with data
uint64_t const n = slice.length() ^ 0xf00ba44ba5;
uint64_t seed = fasthash64_uint64(n, 0xdeadf054);
for (auto const& it : VPackObjectIterator(slice, false)) {
// loop over all attributes, but exclude _rev, _id and _key
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
// was already handled before
VPackValueLength keyLength;
char const* key = it.key.getString(keyLength);
if (keyLength >= 3 &&
key[0] == '_' &&
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
// exclude attribute
continue;
}
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
}
}
hash ^= localHash;
}
return true;
});
trx.finish(res);
std::string const hashString = std::to_string(hash);
v8::Handle<v8::Object> result = v8::Object::New(isolate);
result->Set(TRI_V8_ASCII_STRING("checksum"), TRI_V8_STD_STRING(hashString));
result->Set(TRI_V8_ASCII_STRING("revision"), TRI_V8_STD_STRING(revisionId));
TRI_V8_RETURN(result);
TRI_V8_RETURN(TRI_VPackToV8(isolate, result.builder().slice()));
TRI_V8_TRY_CATCH_END
}

View File

@ -26,6 +26,7 @@
#include "Aql/PlanCache.h"
#include "Aql/QueryCache.h"
#include "Basics/fasthash.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/PerformanceLogScope.h"
#include "Basics/ReadLocker.h"
@ -1194,3 +1195,134 @@ VPackSlice LogicalCollection::keyOptions() const {
}
return VPackSlice(_keyOptions->data());
}
ChecksumResult LogicalCollection::checksum(bool withRevisions, bool withData) const {
auto transactionContext =
std::make_shared<transaction::StandaloneContext>(vocbase());
SingleCollectionTransaction trx(transactionContext, cid(), AccessMode::Type::READ);
Result res = trx.begin();
if (!res.ok()) {
return ChecksumResult(res);
}
trx.pinData(_cid); // will throw when it fails
// get last tick
LogicalCollection* collection = trx.documentCollection();
auto physical = collection->getPhysical();
TRI_ASSERT(physical != nullptr);
std::string const revisionId = TRI_RidToString(physical->revision(&trx));
uint64_t hash = 0;
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(name(), [&hash, &withData, &withRevisions, &trx, &collection, &mmdr](DocumentIdentifierToken const& token) {
if (collection->readDocument(&trx, token, mmdr)) {
VPackSlice const slice(mmdr.vpack());
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
if (withRevisions) {
localHash += transaction::helpers::extractRevSliceFromDocument(slice).hash();
}
if (withData) {
// with data
uint64_t const n = slice.length() ^ 0xf00ba44ba5;
uint64_t seed = fasthash64_uint64(n, 0xdeadf054);
for (auto const& it : VPackObjectIterator(slice, false)) {
// loop over all attributes, but exclude _rev, _id and _key
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
// was already handled before
VPackValueLength keyLength;
char const* key = it.key.getString(keyLength);
if (keyLength >= 3 &&
key[0] == '_' &&
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
// exclude attribute
continue;
}
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
}
}
hash ^= localHash;
}
return true;
});
trx.finish(res);
std::string const hashString = std::to_string(hash);
VPackBuilder b;
{
VPackObjectBuilder o(&b);
b.add("checksum", VPackValue(hashString));
b.add("revision", VPackValue(revisionId));
}
return ChecksumResult(b);
}
Result LogicalCollection::compareChecksums(VPackSlice checksumSlice) const {
if (!checksumSlice.isObject()) {
auto typeName = checksumSlice.typeName();
return Result(
TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,
std::string("Checksum must be an object but is ") + typeName
);
}
auto revision = checksumSlice.get("revision");
auto checksum = checksumSlice.get("checksum");
if (!revision.isString()) {
return Result(
TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,
"Property `revisionId` must be a string"
);
}
if (!checksum.isString()) {
return Result(
TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,
"Property `checksum` must be a string"
);
}
auto result = this->checksum(false, false);
if (!result.ok()) {
return Result(result);
}
auto referenceChecksumSlice = result.slice();
auto referenceChecksum = referenceChecksumSlice.get("checksum");
auto referenceRevision = referenceChecksumSlice.get("revision");
TRI_ASSERT(referenceChecksum.isString());
TRI_ASSERT(referenceRevision.isString());
if (!checksum.isEqualString(referenceChecksum.copyString())) {
return Result(
TRI_ERROR_REPLICATION_WRONG_CHECKSUM,
"`checksum` property is wrong. Expected: "
+ referenceChecksum.copyString()
+ ". Actual: " + checksum.copyString()
);
}
if (!revision.isEqualString(referenceRevision.copyString())) {
return Result(
TRI_ERROR_REPLICATION_WRONG_CHECKSUM,
"`checksum` property is wrong. Expected: "
+ revision.copyString()
+ ". Actual: " + referenceRevision.copyString()
);
}
return Result();
}

View File

@ -58,6 +58,23 @@ namespace transaction {
class Methods;
}
class ChecksumResult: public Result {
public:
ChecksumResult(Result result) : Result(result) {}
ChecksumResult(VPackBuilder builder): Result(TRI_ERROR_NO_ERROR), _builder(builder) {}
VPackBuilder builder() {
return _builder;
}
VPackSlice slice() {
return _builder.slice();
}
private:
VPackBuilder _builder;
};
class LogicalCollection {
friend struct ::TRI_vocbase_t;
@ -305,6 +322,10 @@ class LogicalCollection {
// Caller is not allowed to free it.
inline KeyGenerator* keyGenerator() const { return _keyGenerator.get(); }
ChecksumResult checksum(bool, bool) const;
Result compareChecksums(velocypack::Slice) const;
private:
void prepareIndexes(velocypack::Slice indexesSlice);

View File

@ -129,6 +129,8 @@
"ERROR_REPLICATION_APPLIER_STOPPED" : { "code" : 1412, "message" : "replication stopped" },
"ERROR_REPLICATION_NO_START_TICK" : { "code" : 1413, "message" : "no start tick" },
"ERROR_REPLICATION_START_TICK_NOT_PRESENT" : { "code" : 1414, "message" : "start tick not present" },
"ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT" : { "code" : 1415, "message" : "the checksum format is wrong" },
"ERROR_REPLICATION_WRONG_CHECKSUM" : { "code" : 1416, "message" : "wrong checksum" },
"ERROR_CLUSTER_NO_AGENCY" : { "code" : 1450, "message" : "could not connect to agency" },
"ERROR_CLUSTER_NO_COORDINATOR_HEADER" : { "code" : 1451, "message" : "missing coordinator header" },
"ERROR_CLUSTER_COULD_NOT_LOCK_PLAN" : { "code" : 1452, "message" : "could not lock plan in agency" },

View File

@ -217,7 +217,8 @@ function addShardFollower (endpoint, database, shard) {
console.topic('heartbeat=debug', 'addShardFollower: tell the leader to put us into the follower list...');
var url = endpointToURL(endpoint) + '/_db/' + database +
'/_api/replication/addFollower';
var body = {followerId: ArangoServerState.id(), shard};
let db = require('internal').db;
var body = {followerId: ArangoServerState.id(), shard, checksum: db._collection(shard).checksum()};
var r = request({url, body: JSON.stringify(body), method: 'PUT'});
if (r.status !== 200) {
console.topic('heartbeat=error', "addShardFollower: could not add us to the leader's follower list.", r);
@ -590,7 +591,12 @@ function synchronizeOneShard (database, shard, planId, leader) {
shard, sy2);
ok = false;
} else {
ok = addShardFollower(ep, database, shard);
try {
ok = addShardFollower(ep, database, shard);
} catch (err4) {
db._drop(shard);
throw err4;
}
}
} catch (err3) {
console.topic('heartbeat=error', 'synchronizeOneshard: exception in',

View File

@ -160,6 +160,8 @@ ERROR_REPLICATION_RUNNING,1411,"cannot perform operation while applier is runnin
ERROR_REPLICATION_APPLIER_STOPPED,1412,"replication stopped","Special error code used to indicate the replication applier was stopped by a user."
ERROR_REPLICATION_NO_START_TICK,1413,"no start tick","Will be raised when the replication applier is started without a known start tick value."
ERROR_REPLICATION_START_TICK_NOT_PRESENT,1414,"start tick not present","Will be raised when the replication applier fetches data using a start tick, but that start tick is not present on the logger server anymore."
ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT,1415,"the checksum format is wrong", "Will be raised when the format of the checksum is wrong")
ERROR_REPLICATION_WRONG_CHECKSUM,1416,"wrong checksum","Will be raised when a new born follower submits a wrong checksum"
################################################################################
## ArangoDB cluster errors

View File

@ -125,6 +125,8 @@ void TRI_InitializeErrorMessages () {
REG_ERROR(ERROR_REPLICATION_APPLIER_STOPPED, "replication stopped");
REG_ERROR(ERROR_REPLICATION_NO_START_TICK, "no start tick");
REG_ERROR(ERROR_REPLICATION_START_TICK_NOT_PRESENT, "start tick not present");
REG_ERROR(ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT, "the checksum format is wrong");
REG_ERROR(ERROR_REPLICATION_WRONG_CHECKSUM, "wrong checksum");
REG_ERROR(ERROR_CLUSTER_NO_AGENCY, "could not connect to agency");
REG_ERROR(ERROR_CLUSTER_NO_COORDINATOR_HEADER, "missing coordinator header");
REG_ERROR(ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, "could not lock plan in agency");

View File

@ -288,6 +288,10 @@
/// - 1414: @LIT{start tick not present}
/// Will be raised when the replication applier fetches data using a start
/// tick, but that start tick is not present on the logger server anymore.
/// - 1415: @LIT{the checksum format is wrong}
/// "Will be raised when the format of the checksum is wrong")
/// - 1416: @LIT{wrong checksum}
/// Will be raised when a new born follower submits a wrong checksum
/// - 1450: @LIT{could not connect to agency}
/// Will be raised when none of the agency servers can be connected to.
/// - 1451: @LIT{missing coordinator header}
@ -1985,6 +1989,26 @@ void TRI_InitializeErrorMessages ();
#define TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT (1414)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1415: ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT
///
/// the checksum format is wrong
///
/// "Will be raised when the format of the checksum is wrong")
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_REPLICATION_WRONG_CHECKSUM_FORMAT (1415)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1416: ERROR_REPLICATION_WRONG_CHECKSUM
///
/// wrong checksum
///
/// Will be raised when a new born follower submits a wrong checksum
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_REPLICATION_WRONG_CHECKSUM (1416)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1450: ERROR_CLUSTER_NO_AGENCY
///