
Bug fix 3.4/optimizations 210918 (#6574)

Jan authored on 2018-09-25 09:50:25 +02:00 (committed by GitHub)
parent 058b5072b3
commit a8cd72f502
4 changed files with 96 additions and 50 deletions

View File

@@ -1423,6 +1423,7 @@ Result RestReplicationHandler::processRestoreDataBatch(
   options.ignoreRevs = true;
   options.isRestore = true;
   options.waitForSync = false;
+  options.overwrite = true;
   opRes = trx.insert(collectionName, requestSlice, options);
   if (opRes.fail()) {
     return opRes.result;

View File

@@ -841,8 +841,7 @@ bool RocksDBCollection::readDocumentWithCallback(
     transaction::Methods* trx, LocalDocumentId const& documentId,
     IndexIterator::DocumentCallback const& cb) const {
   if (documentId.isSet()) {
-    auto res = lookupDocumentVPack(documentId, trx, cb, true);
-    return res.ok();
+    return lookupDocumentVPack(documentId, trx, cb, true).ok();
   }
   return false;
 }
@@ -870,6 +869,32 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
   }
   VPackSlice newSlice = builder->slice();

+  if (options.overwrite) {
+    // special optimization for the overwrite case:
+    // in case the operation is a RepSert (insert with overwrite), we first
+    // check if the specified primary key exists. if it does, we can abort this
+    // low-level insert early, before any modification to the data has been
+    // done. this saves us from creating a RocksDB transaction SavePoint.
+    // if we don't do the check here, we will always create a SavePoint first
+    // and insert the new document. when inserting the key into the primary
+    // index then detects a unique constraint violation, the transaction is
+    // rolled back to the SavePoint state, which rebuilds *all* data in the
+    // WriteBatch up to the SavePoint. this can be super-expensive for bigger
+    // transactions.
+    // to keep things simple, we do not check for unique constraint violations
+    // in secondary indexes here, but defer that to the regular index insertion
+    // check.
+    VPackSlice keySlice = newSlice.get(StaticStrings::KeyString);
+    if (keySlice.isString()) {
+      LocalDocumentId const documentId = primaryIndex()->lookupKey(trx, StringRef(keySlice));
+      if (documentId.isSet()) {
+        if (options.indexOperationMode == Index::OperationMode::internal) {
+          // need to return the key of the conflicting document
+          return Result(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, keySlice.copyString());
+        }
+        return Result(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
+      }
+    }
+  }
+
   auto state = RocksDBTransactionState::toState(trx);
   auto mthds = RocksDBTransactionState::toMethods(trx);
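
Note: the early primary-key check above serves the RepSert path ("insert with overwrite") that the restore handler now enables via options.overwrite = true in the first hunk. A minimal arangosh sketch of the user-visible semantics, assuming a scratch collection named "example" (the name is hypothetical):

let db = require("@arangodb").db;
let c = db._create("example");

c.insert({ _key: "doc1", value: 1 });                      // plain insert
c.insert({ _key: "doc1", value: 2 }, { overwrite: true }); // RepSert: replaces doc1

try {
  c.insert({ _key: "doc1", value: 3 });                    // no overwrite: conflicts
} catch (err) {
  // err.errorNum is ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, i.e. the error
  // that the hunk above now returns before ever creating a RocksDB SavePoint
}

db._drop("example");

The gain is that a RepSert hitting an existing key can now bail out before the WriteBatch is touched, so no SavePoint has to be created or rolled back.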

View File

@@ -721,7 +721,7 @@ arangodb::Result processInputDirectory(
   auto queueStats = jobQueue.statistics();
   // periodically report current status, but do not spam user
   LOG_TOPIC(INFO, Logger::RESTORE)
-      << "# Worker progress summary: restored " << stats.restoredCollections
+      << "# Current restore progress: restored " << stats.restoredCollections
       << " of " << stats.totalCollections << " collection(s), read " << stats.totalRead << " byte(s) from datafiles, "
       << "sent " << stats.totalBatches << " data batch(es) of " << stats.totalSent << " byte(s) total size"
       << ", queued jobs: " << std::get<0>(queueStats) << ", workers: " << std::get<1>(queueStats);
@@ -1043,9 +1043,7 @@ void RestoreFeature::start() {
   // set up threads and workers
   _clientTaskQueue.spawnWorkers(_clientManager, _options.threadCount);
-  if (_options.progress) {
-    LOG_TOPIC(INFO, Logger::RESTORE) << "Using " << _options.threadCount << " worker thread(s)";
-  }
+  LOG_TOPIC(DEBUG, Logger::RESTORE) << "Using " << _options.threadCount << " worker thread(s)";
   // run the actual restore
   try {

View File

@@ -28,8 +28,19 @@ var jsunity = require("jsunity");
 var arangodb = require("@arangodb");
 var internal = require("internal");
-var db = arangodb.db;
-var ArangoCollection = require("@arangodb/arango-collection").ArangoCollection;
+let db = arangodb.db;
+let ArangoCollection = require("@arangodb/arango-collection").ArangoCollection;
+let tasks = require("@arangodb/tasks");
+
+let cleanTasks = function () {
+  tasks.get().forEach(function(task) {
+    if (task.id.match(/^UnitTest/) || task.name.match(/^UnitTest/)) {
+      try {
+        tasks.unregister(task);
+      } catch (err) {}
+    }
+  });
+};

 function ThrowCollectionNotLoadedSuite() {
   'use strict';
@@ -47,13 +58,14 @@ function ThrowCollectionNotLoadedSuite() {
       db._drop(cn);
       // restore old settings
       internal.throwOnCollectionNotLoaded(old);
+      cleanTasks();
     },

     // test regular loading of collection
     testLoad: function() {
       internal.throwOnCollectionNotLoaded(false);
-      var c = db._create(cn);
+      let c = db._create(cn);
       c.save({
         value: 1
       });
@@ -94,17 +106,16 @@ function ThrowCollectionNotLoadedSuite() {
     // test parallel loading of collection
     testLoadParallel: function() {
       internal.throwOnCollectionNotLoaded(false);
-      var tasks = require("@arangodb/tasks");
-      var c = db._create(cn);
+      let c = db._create(cn);

-      for (var i = 0; i < 10000; ++i) {
-        c.save({
-          value: 1
-        });
+      let docs = [];
+      for (let i = 0; i < 10000; ++i) {
+        docs.push({ value: i });
       }
+      c.insert(docs);

-      db._drop(cn + "Collect");
-      var cnCollect = cn + "Collect";
+      let cnCollect = cn + "Collect";
+      db._drop(cnCollect);
       db._create(cnCollect);
       c.unload();
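
Note: the rewritten loop above replaces 10,000 individual save() calls with a single batch insert(); in arangosh, insert() also accepts an array of documents and stores them in one operation, which keeps the test setup fast. The same pattern in isolation, with a hypothetical collection name:

let db = require("@arangodb").db;
let c = db._create("batchDemo"); // hypothetical name
let docs = [];
for (let i = 0; i < 10000; ++i) {
  docs.push({ value: i });
}
c.insert(docs); // one batch operation instead of 10000 single saves
db._drop("batchDemo");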
@@ -113,20 +124,20 @@ function ThrowCollectionNotLoadedSuite() {
       while (db._collection(cn).status() !== ArangoCollection.STATUS_UNLOADED) {
         db._collection(cn).unload();
-        internal.wait(0.5);
+        internal.wait(0.25, false);
       }

-      var task = {
+      let task = {
         offset: 0,
         params: {
-          cn: cn
+          cn
         },
         command: function(params) {
-          var db = require('internal').db;
-          var result = db._collection(params.cn + "Collect");
+          let db = require('internal').db;
+          let result = db._collection(params.cn + "Collect");
           try {
-            for (var i = 0; i < 100; ++i) {
+            for (let i = 0; i < 100; ++i) {
               db._collection(params.cn).load();
               db._collection(params.cn).unload();
             }
@@ -146,8 +157,8 @@ function ThrowCollectionNotLoadedSuite() {
       // spawn a few tasks that load and unload
       let iter = 20;
-      for (i = 0; i < iter; ++i) {
-        task.id = "loadtest" + i;
+      for (let i = 0; i < iter; ++i) {
+        task.id = "UnitTest" + i;
         tasks.register(task);
       }
@@ -155,13 +166,13 @@ function ThrowCollectionNotLoadedSuite() {
       let rc = db._collection(cnCollect);
       while (rc.count() < iter) {
-        internal.wait(0.5);
+        internal.wait(0.5, false);
       }

       // check for errors
-      var errors = internal.errors;
-      var found = rc.byExample({
+      let errors = internal.errors;
+      let found = rc.byExample({
         err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
       }).toArray();

       db._drop(cnCollect);
@@ -173,17 +184,16 @@ function ThrowCollectionNotLoadedSuite() {
     // test parallel loading of collection, with flag
     testLoadParallelWithFlag: function() {
       internal.throwOnCollectionNotLoaded(true);
-      var tasks = require("@arangodb/tasks");
-      var c = db._create(cn);
+      let c = db._create(cn);

-      for (var i = 0; i < 10000; ++i) {
-        c.save({
-          value: 1
-        });
+      let docs = [];
+      for (let i = 0; i < 50000; ++i) {
+        docs.push({ value: i });
       }
+      c.insert(docs);

-      db._drop(cn + "Collect");
-      var cnCollect = cn + "Collect";
+      let cnCollect = cn + "Collect";
+      db._drop(cnCollect);
       db._create(cnCollect);
       c.unload();
@@ -192,25 +202,30 @@ function ThrowCollectionNotLoadedSuite() {
       while (db._collection(cn).status() !== ArangoCollection.STATUS_UNLOADED) {
         db._collection(cn).unload();
-        internal.wait(0.5);
+        internal.wait(0.25, false);
       }

-      var task = {
+      let task = {
         offset: 0,
         params: {
-          cn: cn
+          cn
         },
         command: function(params) {
-          var db = require('internal').db;
-          var result = db._collection(params.cn + "Collect");
+          let db = require('internal').db;
+          let result;
           try {
-            for (var i = 0; i < 500; ++i) {
+            for (let i = 0; i < 500; ++i) {
+              result = db._collection(params.cn + "Collect");
+              if (result === null) {
+                // test is over already
+                return;
+              }
               db._collection(params.cn).load();
               db._collection(params.cn).unload();
             }
           } catch (err) {
-            db._collection(params.cn + "Collect").save({
+            result.save({
               err: err.errorNum
             });
             return;
@@ -225,22 +240,29 @@ function ThrowCollectionNotLoadedSuite() {
       // spawn a few tasks that load and unload
       let iter = 20;
-      for (i = 0; i < iter; ++i) {
-        task.id = "loadtest" + i;
+      for (let i = 0; i < iter; ++i) {
+        task.id = "UnitTest" + i;
         tasks.register(task);
       }

       // wait for tasks to join
       let rc = db._collection(cnCollect);
+      // check for errors
+      let errors = internal.errors;
       while (rc.count() < iter) {
-        internal.wait(0.5);
+        if (db._collection(cnCollect).firstExample({
+          err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
+        }) !== null) {
+          // already got one failure, so we can stop here
+          break;
+        }
+        internal.wait(0.25, false);
       }
+      cleanTasks();

       // check for errors
-      var errors = internal.errors;
-      var found = db._collection(cnCollect).byExample({
+      let found = db._collection(cnCollect).byExample({
         err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
       }).toArray();

       db._drop(cnCollect);