
port some changes from 3.4 (#8011)

Jan authored on 2019-01-29 09:26:57 +01:00; committed by GitHub
parent a578f2e82b
commit f7d94daba2
15 changed files with 215 additions and 64 deletions

View File

@ -701,19 +701,47 @@ int MMFilesCollection::close() {
{
// We also have to unload the indexes.
READ_LOCKER(guard, _indexesLock); /// TODO - DEADLOCK!?!?
WRITE_LOCKER(writeLocker, _dataLock);
READ_LOCKER_EVENTUAL(guard, _indexesLock);
for (auto& idx : _indexes) {
idx->unload();
}
}
// wait until ditches have been processed fully
while (_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_DROP) ||
_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_RENAME) ||
_ditches.contains(MMFilesDitch::TRI_DITCH_COMPACTION)) {
WRITE_UNLOCKER(unlocker, _logicalCollection.lock());
std::this_thread::sleep_for(std::chrono::milliseconds(20));
int tries = 0;
while (true) {
bool hasDocumentDitch = _ditches.contains(MMFilesDitch::TRI_DITCH_DOCUMENT);
bool hasOtherDitch = (_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_DROP) ||
_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_RENAME) ||
_ditches.contains(MMFilesDitch::TRI_DITCH_REPLICATION) ||
_ditches.contains(MMFilesDitch::TRI_DITCH_COMPACTION));
if (!hasDocumentDitch && !hasOtherDitch) {
// we can abort now
break;
}
// give the cleanup thread more time to clean up
{
WRITE_UNLOCKER(unlocker, _logicalCollection.lock());
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if ((++tries % 10) == 0) {
if (hasDocumentDitch) {
LOG_TOPIC(WARN, Logger::ENGINES) << "waiting for cleanup of document ditches for collection '" << _logicalCollection.name() << "'. has other: " << hasOtherDitch;
} else {
LOG_TOPIC(WARN, Logger::ENGINES) << "waiting for cleanup of ditches for collection '" << _logicalCollection.name() << "'";
}
}
if (tries == 60 && !hasOtherDitch) {
// give up after a minute - this will close the collection at all costs
break;
}
}
{
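
The rewritten loop above replaces an unbounded wait with a bounded, observable one: it re-checks the ditch registry each round, briefly drops the collection lock while sleeping so the cleanup thread can actually make progress, warns every 10 rounds, and after 60 rounds stops waiting anyway, provided only document ditches remain. A minimal standalone sketch of the unlock-while-polling idea (all names here are illustrative, not ArangoDB's API):

#include <chrono>
#include <mutex>
#include <thread>

// Wait for a condition while periodically releasing a held lock, so the
// thread responsible for making the condition true can acquire the lock.
// Returns with the lock held in both the success and the give-up case.
template <typename Predicate>
bool waitUnlocked(std::unique_lock<std::mutex>& held, Predicate done,
                  int maxTries, std::chrono::milliseconds pause) {
  for (int tries = 0; tries < maxTries; ++tries) {
    if (done()) {
      return true;  // condition satisfied
    }
    held.unlock();                       // let the other thread in
    std::this_thread::sleep_for(pause);  // give it time to run
    held.lock();                         // re-acquire before re-checking
  }
  return done();  // last chance; caller decides how to handle a timeout
}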

View File

@ -125,7 +125,6 @@ void MMFilesDitches::destroy() {
} else if (type == MMFilesDitch::TRI_DITCH_DOCUMENT) {
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
<< "logic error. shouldn't have document ditches on unload";
TRI_ASSERT(false);
} else {
LOG_TOPIC(ERR, arangodb::Logger::ENGINES) << "unknown ditch type";
}
@ -279,16 +278,7 @@ void MMFilesDitches::freeMMFilesDocumentDitch(MMFilesDocumentDitch* ditch, bool
TRI_ASSERT(ditch->usedByTransaction() == true);
}
{
MUTEX_LOCKER(mutexLocker, _lock);
unlink(ditch);
// decrease counter
--_numMMFilesDocumentMMFilesDitches;
}
delete ditch;
freeDitch(ditch);
}
/// @brief creates a new document ditch and links it
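
Both hunks above soften ditch disposal: destroy() no longer asserts when document ditches survive to unload (the error is still logged, and the new close() loop above may now legitimately give up with document ditches pending), and the hand-rolled unlink/counter/delete sequence at the call site is funneled through the shared freeDitch() routine. A sketch of why centralizing helps, with illustrative names and std::mutex standing in for ArangoDB's MUTEX_LOCKER:

#include <cstdint>
#include <mutex>

struct Ditch {
  enum Type { DOCUMENT, OTHER };
  Type type;
};

class DitchList {
 public:
  // One place owns the unlink / counter / delete sequence, so call sites
  // (like freeMMFilesDocumentDitch above) cannot get bookkeeping out of sync.
  void freeDitch(Ditch* ditch) {
    bool const isDocument = (ditch->type == Ditch::DOCUMENT);
    {
      std::lock_guard<std::mutex> guard(_lock);
      unlink(ditch);  // detach from the intrusive list
      if (isDocument) {
        --_numDocumentDitches;  // counter kept under the same lock
      }
    }
    delete ditch;
  }

 private:
  void unlink(Ditch* /*ditch*/) { /* detach from a doubly linked list */ }
  std::mutex _lock;
  uint64_t _numDocumentDitches = 0;
};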

View File

@ -3186,8 +3186,14 @@ char* MMFilesEngine::nextFreeMarkerPosition(LogicalCollection* collection, TRI_v
if (ditch == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
cache->addDitch(ditch);
try {
cache->addDitch(ditch);
} catch (...) {
// prevent leaking here
arangodb::MMFilesCollection::toMMFilesCollection(collection)->ditches()->freeDitch(ditch);
throw;
}
}
TRI_ASSERT(dst != nullptr);
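
The try/catch above plugs a leak: nextFreeMarkerPosition allocated the ditch, and if addDitch threw while registering it, nobody owned the pointer. The commit frees it and rethrows; the same handoff can also be written with a smart pointer that gives up ownership only after the container has accepted the raw pointer. A hedged sketch (hypothetical container, not the real ditch cache):

#include <memory>
#include <vector>

struct Ditch { /* payload elided */ };

// Ownership stays with the unique_ptr until push_back has succeeded; if
// push_back throws (e.g. allocation failure), the ditch is still freed.
void addDitchSafely(std::vector<Ditch*>& cache, std::unique_ptr<Ditch> ditch) {
  cache.push_back(ditch.get());  // may throw
  ditch.release();               // reached only on success: cache owns it now
}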

View File

@ -354,10 +354,6 @@ static int DumpCollection(MMFilesReplicationDumpContext* dump,
LogicalCollection* collection, TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId, TRI_voc_tick_t dataMin,
TRI_voc_tick_t dataMax, bool withTicks, bool useVst = false) {
LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION)
<< "dumping collection " << collection->id() << ", tick range " << dataMin
<< " - " << dataMax;
bool const isEdgeCollection = (collection->type() == TRI_COL_TYPE_EDGE);
// setup some iteration state
@ -415,8 +411,8 @@ static int DumpCollection(MMFilesReplicationDumpContext* dump,
dump->_bufferFull = false;
}
LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION)
<< "dumped collection " << collection->id() << ", tick range "
LOG_TOPIC(DEBUG, arangodb::Logger::REPLICATION)
<< "dumped collection '" << collection->name() << "', tick range "
<< dataMin << " - " << dataMax << ", markers: " << numMarkers
<< ", last found tick: " << dump->_lastFoundTick
<< ", hasMore: " << dump->_hasMore << ", buffer full: " << dump->_bufferFull;
@ -446,6 +442,9 @@ int MMFilesDumpCollectionReplication(MMFilesReplicationDumpContext* dump,
TRI_voc_tick_t dataMin,
TRI_voc_tick_t dataMax, bool withTicks) {
TRI_ASSERT(collection != nullptr);
LOG_TOPIC(DEBUG, arangodb::Logger::REPLICATION)
<< "dumping collection '" << collection->name() << "', tick range " << dataMin << " - " << dataMax;
// get a custom type handler
auto customTypeHandler = dump->_transactionContext->orderCustomTypeHandler();
@ -458,6 +457,9 @@ int MMFilesDumpCollectionReplication(MMFilesReplicationDumpContext* dump,
if (b == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
// always execute this
TRI_DEFER(mmfiles->ditches()->freeDitch(b));
// block compaction
int res;
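
TRI_DEFER above registers the freeDitch call right where the ditch is created, so every return path in the function releases it; the later hunk then deletes the single manual free near the end. TRI_DEFER is ArangoDB's scope-guard macro; a simplified sketch of the underlying idea (the real macro is more elaborate):

#include <utility>

// Run a callable when the enclosing scope exits, whether it is left through
// return, fall-through, or an exception.
template <typename F>
class ScopeGuard {
 public:
  explicit ScopeGuard(F fn) : _fn(std::move(fn)) {}
  ~ScopeGuard() { _fn(); }
  ScopeGuard(ScopeGuard const&) = delete;
  ScopeGuard& operator=(ScopeGuard const&) = delete;

 private:
  F _fn;
};

// usage (C++17 class template argument deduction):
//   auto* ditch = createDitch();
//   ScopeGuard guard([&]() { freeDitch(ditch); });  // runs on every exit path
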
@ -473,9 +475,6 @@ int MMFilesDumpCollectionReplication(MMFilesReplicationDumpContext* dump,
}
}
// always execute this
mmfiles->ditches()->freeDitch(b);
return res;
}
@ -484,7 +483,7 @@ int MMFilesDumpLogReplication(MMFilesReplicationDumpContext* dump,
std::unordered_set<TRI_voc_tid_t> const& transactionIds,
TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t tickMin,
TRI_voc_tick_t tickMax, bool outputAsArray) {
LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION)
LOG_TOPIC(DEBUG, arangodb::Logger::REPLICATION)
<< "dumping log, tick range " << tickMin << " - " << tickMax;
// get a custom type handler
@ -504,6 +503,7 @@ int MMFilesDumpLogReplication(MMFilesReplicationDumpContext* dump,
TRI_voc_cid_t lastCollectionId = 0;
bool hasMore = true;
bool bufferFull = false;
size_t numMarkers = 0;
try {
if (outputAsArray) {
@ -622,6 +622,8 @@ int MMFilesDumpLogReplication(MMFilesReplicationDumpContext* dump,
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
++numMarkers;
if (static_cast<uint64_t>(TRI_LengthStringBuffer(dump->_buffer)) >= dump->_chunkSize) {
// abort the iteration
@ -670,6 +672,12 @@ int MMFilesDumpLogReplication(MMFilesReplicationDumpContext* dump,
dump->_hasMore = false;
dump->_bufferFull = false;
}
LOG_TOPIC(DEBUG, arangodb::Logger::REPLICATION)
<< "dumped log, tick range "
<< tickMin << " - " << tickMax << ", markers: " << numMarkers
<< ", last found tick: " << dump->_lastFoundTick
<< ", hasMore: " << dump->_hasMore << ", buffer full: " << dump->_bufferFull;
}
return res;

View File

@ -227,6 +227,9 @@ void PregelFeature::start() {
void PregelFeature::beginShutdown() {
cleanupAll();
}
void PregelFeature::stop() {
Instance = nullptr;
}
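
The new stop() override clears the feature's static Instance pointer once shutdown reaches the stop phase, so late lookups observe that Pregel is gone instead of touching a dying object. A sketch of the pattern (illustrative class, not the actual ApplicationFeature interface):

#include <atomic>

class SingletonFeature {
 public:
  void start() { _instance.store(this); }
  void stop() { _instance.store(nullptr); }  // lookups now fail fast

  static SingletonFeature* instance() { return _instance.load(); }

 private:
  static std::atomic<SingletonFeature*> _instance;
};

std::atomic<SingletonFeature*> SingletonFeature::_instance{nullptr};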

View File

@ -52,6 +52,7 @@ class PregelFeature final : public application_features::ApplicationFeature {
void start() override final;
void beginShutdown() override final;
void stop() override final;
uint64_t createExecutionNumber();
void addConductor(std::unique_ptr<Conductor>&&, uint64_t executionNumber);
@ -87,4 +88,4 @@ class PregelFeature final : public application_features::ApplicationFeature {
} // namespace pregel
} // namespace arangodb
#endif
#endif

View File

@ -144,6 +144,11 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn
LOG_TOPIC(DEBUG, Logger::REPLICATION)
<< "client: getting master state to dump " << vocbase().name();
Result r;
r = sendFlush();
if (r.fail()) {
return r;
}
if (!_config.isChild()) {
r = _config.master.getState(_config.connection, _config.isChild());
@ -168,10 +173,6 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn
}
}
r = sendFlush();
if (r.fail()) {
return r;
}
if (!_config.isChild()) {
// create a WAL logfile barrier that prevents WAL logfile collection
@ -1231,8 +1232,8 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters,
std::string const& masterColl = !masterUuid.empty() ? masterUuid : itoa(masterCid);
auto res = incremental && getSize(*col) > 0
? fetchCollectionSync(col, masterColl, _config.master.lastUncommittedLogTick)
: fetchCollectionDump(col, masterColl, _config.master.lastUncommittedLogTick);
? fetchCollectionSync(col, masterColl, _config.master.lastLogTick)
: fetchCollectionDump(col, masterColl, _config.master.lastLogTick);
if (!res.ok()) {
return res;

View File

@ -80,11 +80,6 @@ class InitialSyncer : public Syncer {
/// @brief return the last log tick of the master at start
TRI_voc_tick_t getLastLogTick() const { return _state.master.lastLogTick; }
/// @brief return the last uncommitted log tick of the master at start
TRI_voc_tick_t getLastUncommittedLogTick() const {
return _state.master.lastUncommittedLogTick;
}
/// @brief return the collections that were synced
std::map<TRI_voc_cid_t, std::string> const& getProcessedCollections() const {
return _progress.processedCollections;

View File

@ -645,6 +645,7 @@ function readTestResult(path, rc) {
let msg = 'failed to read ' + jsonFN + " - " + x;
print(RED + msg + RESET);
return {
failed: 1,
status: false,
message: msg,
duration: -1
@ -686,6 +687,12 @@ function readTestResult(path, rc) {
return rc;
}
}
function writeTestResult(path, data) {
const jsonFN = fs.join(path, 'testresult.json');
fs.write(jsonFN, JSON.stringify(data));
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief runs a local unittest file using arangosh
// //////////////////////////////////////////////////////////////////////////////
@ -888,6 +895,7 @@ exports.makePathUnix = makePathUnix;
exports.makePathGeneric = makePathGeneric;
exports.performTests = performTests;
exports.readTestResult = readTestResult;
exports.writeTestResult = writeTestResult;
exports.filterTestcaseByOptions = filterTestcaseByOptions;
exports.splitBuckets = splitBuckets;
exports.doOnePathInner = doOnePathInner;

View File

@ -96,6 +96,7 @@ function runArangodRecovery (params) {
{
'log.foreground-tty': 'true',
'wal.ignore-logfile-errors': 'true',
'database.ignore-datafile-errors': 'false', // intentionally false!
'javascript.script-parameter': 'recovery'
}
)
@ -167,6 +168,12 @@ function recovery (options) {
print(BLUE + "running recovery of test " + count + " - " + test + RESET);
params.options.disableMonitor = options.disableMonitor;
params.setup = false;
tu.writeTestResult(params.args['temp.path'], {
failed: 1,
status: false,
message: "unable to run recovery test " + test,
duration: -1
});
runArangodRecovery(params);
results[test] = tu.readTestResult(

View File

@ -1197,6 +1197,9 @@ static ExternalProcess* getExternalProcess(TRI_pid_t pid) {
#ifndef _WIN32
static bool killProcess(ExternalProcess* pid, int signal) {
TRI_ASSERT(pid != nullptr);
if (signal == SIGKILL) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "sending SIGKILL signal to process: " << pid->_pid;
}
if (kill(pid->_pid, signal) == 0) {
return true;
}
@ -1417,11 +1420,12 @@ ExternalProcessStatus TRI_KillExternalProcess(ExternalId pid, int signal, bool i
return status;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
if (count >= 8) {
if (count >= 13) {
TRI_ASSERT(external != nullptr);
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "about to send SIGKILL signal to process: " << external->_pid << ", status: " << (int) status._status;
killProcess(external, SIGKILL);
}
if (count > 20) {
if (count > 25) {
return status;
}
count++;
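
The retuned constants above lengthen the escalation schedule: with one status poll per second, SIGKILL is now first sent after 13 polls instead of 8, the wait is abandoned after 25 polls instead of 20, and a warning is logged before the hard kill. The overall shape of the escalation, sketched for POSIX to match the #ifndef _WIN32 guard above (names illustrative):

#include <signal.h>
#include <sys/types.h>
#include <chrono>
#include <thread>

// Poll a child once per second; escalate to SIGKILL after killAfter polls,
// stop waiting entirely after giveUpAfter polls.
bool waitThenEscalate(pid_t pid, int killAfter = 13, int giveUpAfter = 25) {
  for (int count = 0;; ++count) {
    if (kill(pid, 0) != 0) {
      return true;  // process no longer exists
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
    if (count >= killAfter) {
      kill(pid, SIGKILL);  // hard kill, repeated each further round
    }
    if (count > giveUpAfter) {
      return false;  // give up; caller reports the last known status
    }
  }
}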
@ -1430,7 +1434,6 @@ ExternalProcessStatus TRI_KillExternalProcess(ExternalId pid, int signal, bool i
return TRI_CheckExternalProcess(pid, false);
}
#ifdef _WIN32
typedef LONG (NTAPI *NtSuspendProcess)(IN HANDLE ProcessHandle);
typedef LONG (NTAPI *NtResumeProcess)(IN HANDLE ProcessHandle);

View File

@ -89,7 +89,7 @@ function getClusterEndpoints() {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));
@ -107,7 +107,7 @@ function getLoggerState(endpoint) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'));
@ -122,7 +122,7 @@ function getApplierState(endpoint) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'));
@ -166,7 +166,7 @@ function checkData(server) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
@ -185,7 +185,7 @@ function readAgencyValue(path) {
bearer: jwtSuperuser,
},
body: JSON.stringify([[path]]),
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));

View File

@ -91,7 +91,7 @@ function getClusterEndpoints() {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));
@ -109,7 +109,7 @@ function getLoggerState(endpoint) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'));
@ -124,7 +124,7 @@ function getApplierState(endpoint) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'));
@ -168,7 +168,7 @@ function checkData(server) {
auth: {
bearer: jwtRoot,
},
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
@ -187,7 +187,7 @@ function readAgencyValue(path) {
bearer: jwtSuperuser,
},
body: JSON.stringify([[path]]),
timeout: 120
timeout: 300
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));
@ -259,7 +259,7 @@ function setReadOnly(endpoint, ro) {
},
body: {"mode" : str},
json: true,
timeout: 120
timeout: 300
});
print(JSON.stringify(res));

View File

@ -0,0 +1,103 @@
/* jshint globalstrict:false, strict:false, unused : false */
/* global fail, assertEqual, assertTrue */
// //////////////////////////////////////////////////////////////////////////////
// / @brief tests for dump/reload
// /
// / @file
// /
// / DISCLAIMER
// /
// / Copyright 2010-2012 triagens GmbH, Cologne, Germany
// /
// / Licensed under the Apache License, Version 2.0 (the "License")
// / you may not use this file except in compliance with the License.
// / You may obtain a copy of the License at
// /
// / http://www.apache.org/licenses/LICENSE-2.0
// /
// / Unless required by applicable law or agreed to in writing, software
// / distributed under the License is distributed on an "AS IS" BASIS,
// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// / See the License for the specific language governing permissions and
// / limitations under the License.
// /
// / Copyright holder is triAGENS GmbH, Cologne, Germany
// /
// / @author Jan Steemann
// / @author Copyright 2012, triAGENS GmbH, Cologne, Germany
// //////////////////////////////////////////////////////////////////////////////
var db = require('@arangodb').db;
var internal = require('internal');
var jsunity = require('jsunity');
function runSetup () {
'use strict';
internal.debugClearFailAt();
db._drop('UnitTestsRecovery');
var c = db._create('UnitTestsRecovery');
internal.wal.flush(true, true);
internal.wait(1, false);
internal.debugSetFailAt('BreakHeaderMarker');
for (var i = 0; i < 10; ++i) {
c.insert({ value: i });
}
internal.wal.flush(true, true);
internal.wal.waitForCollector('UnitTestsRecovery');
// internal.wait(5, false);
internal.debugSegfault('crashing server');
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief test suite
// //////////////////////////////////////////////////////////////////////////////
function recoverySuite () {
'use strict';
jsunity.jsUnity.attachAssertions();
return {
setUp: function () {},
tearDown: function () {},
// //////////////////////////////////////////////////////////////////////////////
// / @brief test whether we can start the server
// //////////////////////////////////////////////////////////////////////////////
testCorruptedCrcDatafile: function () {
if (require("internal").options()["database.ignore-datafile-errors"]) {
// counting must succeed in all cases
assertTrue(db._collection('UnitTestsRecovery').count() >= 0);
} else {
// if datafile errors are not ignored, then any counting must fail here
try {
db._collection('UnitTestsRecovery').count();
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_ARANGO_CORRUPTED_COLLECTION.code, err.errorNum);
}
}
}
};
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief executes the test suite
// //////////////////////////////////////////////////////////////////////////////
function main (argv) {
'use strict';
if (argv[1] === 'setup') {
runSetup();
return 0;
} else {
jsunity.run(recoverySuite);
return jsunity.writeDone().status ? 0 : 1;
}
}

View File

@ -1,5 +1,5 @@
/* jshint globalstrict:false, strict:false, unused : false */
/* global fail, assertEqual */
/* global fail, assertEqual, assertTrue */
// //////////////////////////////////////////////////////////////////////////////
// / @brief tests for dump/reload
@ -42,12 +42,13 @@ function runSetup () {
internal.wait(1, false);
internal.debugSetFailAt('BreakHeaderMarker');
internal.debugSetFailAt('LogfileManagerFlush');
internal.debugSetFailAt('CollectorThreadCollect');
for (var i = 0; i < 10; ++i) {
c.insert({ value: i });
}
internal.wal.flush(true, true);
internal.wal.flush(true);
internal.debugSetFailAt('LogfileManagerFlush');
internal.wait(5, false);
internal.debugSegfault('crashing server');
@ -70,12 +71,9 @@ function recoverySuite () {
// //////////////////////////////////////////////////////////////////////////////
testCorruptedCrc: function () {
try {
db._collection('UnitTestsRecovery').count();
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_ARANGO_CORRUPTED_COLLECTION.code, err.errorNum);
}
// counting must succeed in all cases, but we don't know how many
// documents we will recover
assertTrue(db._collection('UnitTestsRecovery').count() >= 0);
}
};