1
0
Fork 0
This commit is contained in:
hkernbach 2015-08-04 17:34:51 +02:00
commit ae17532ee3
6 changed files with 122 additions and 27 deletions

View File

@ -228,15 +228,15 @@ void ArangoServer::defineHandlers (HttpHandlerFactory* factory) {
static TRI_vocbase_t* LookupDatabaseFromRequest (triagens::rest::HttpRequest* request,
TRI_server_t* server) {
// get the request endpoint
ConnectionInfo ci = request->connectionInfo();
const string& endpoint = ci.endpoint;
ConnectionInfo const& ci = request->connectionInfo();
std::string const& endpoint = ci.endpoint;
// get the databases mapped to the endpoint
ApplicationEndpointServer* s = static_cast<ApplicationEndpointServer*>(server->_applicationEndpointServer);
const vector<string> databases = s->getEndpointMapping(endpoint);
std::vector<std::string> const& databases = s->getEndpointMapping(endpoint);
// get database name from request
string dbName = request->databaseName();
std::string dbName = request->databaseName();
if (databases.empty()) {
// no databases defined. this means all databases are accessible via the endpoint
@ -248,11 +248,12 @@ static TRI_vocbase_t* LookupDatabaseFromRequest (triagens::rest::HttpRequest* re
}
}
else {
// only some databases are allowed for this endpoint
if (dbName.empty()) {
// no specific database requested, so use first mapped database
dbName = databases.at(0);
TRI_ASSERT(! databases.empty());
dbName = databases[0];
request->setDatabaseName(dbName);
}
else {
@ -260,6 +261,7 @@ static TRI_vocbase_t* LookupDatabaseFromRequest (triagens::rest::HttpRequest* re
for (size_t i = 0; i < databases.size(); ++i) {
if (dbName == databases.at(i)) {
request->setDatabaseName(dbName);
found = true;
break;
}

View File

@ -1326,16 +1326,18 @@ int TRI_BeginTransaction (TRI_transaction_t* trx,
if (nestingLevel == 0) {
TRI_ASSERT(trx->_status == TRI_TRANSACTION_CREATED);
auto logfileManager = triagens::wal::LogfileManager::instance();
if (! HasHint(trx, TRI_TRANSACTION_HINT_NO_THROTTLING) &&
trx->_type == TRI_TRANSACTION_WRITE &&
triagens::wal::LogfileManager::instance()->canBeThrottled()) {
logfileManager->canBeThrottled()) {
// write-throttling?
static uint64_t const WaitTime = 50000;
uint64_t const maxIterations = triagens::wal::LogfileManager::instance()->maxThrottleWait() / (WaitTime / 1000);
uint64_t const maxIterations = logfileManager->maxThrottleWait() / (WaitTime / 1000);
uint64_t iterations = 0;
while (triagens::wal::LogfileManager::instance()->isThrottled()) {
while (logfileManager->isThrottled()) {
if (++iterations == maxIterations) {
return TRI_ERROR_ARANGO_WRITE_THROTTLE_TIMEOUT;
}
@ -1344,14 +1346,18 @@ int TRI_BeginTransaction (TRI_transaction_t* trx,
}
}
// get a new id
trx->_id = TRI_NewTickServer();
// set hints
trx->_hints = hints;
// get a new id
trx->_id = TRI_NewTickServer();
// register a protector
triagens::wal::LogfileManager::instance()->registerTransaction(trx->_id);
int res = logfileManager->registerTransaction(trx->_id);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
else {
TRI_ASSERT(trx->_status == TRI_TRANSACTION_RUNNING);

View File

@ -177,6 +177,9 @@ LogfileManager::LogfileManager (TRI_server_t* server,
if (res != 0) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not compile regex");
}
_transactions.reserve(32);
_failedTransactions.reserve(32);
}
////////////////////////////////////////////////////////////////////////////////
@ -414,9 +417,12 @@ bool LogfileManager::open () {
// note all failed transactions that we found plus the list
// of collections and databases that we can ignore
{
WRITE_LOCKER(_logfilesLock);
for (auto it = _recoverState->failedTransactions.begin(); it != _recoverState->failedTransactions.end(); ++it) {
_failedTransactions.insert((*it).first);
WRITE_LOCKER(_transactionsLock);
_failedTransactions.reserve(_recoverState->failedTransactions.size());
for (auto const& it : _recoverState->failedTransactions) {
_failedTransactions.emplace(it.first);
}
_droppedDatabases = _recoverState->droppedDatabases;
@ -593,14 +599,28 @@ void LogfileManager::stop () {
/// @brief registers a transaction
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::registerTransaction (TRI_voc_tid_t transactionId) {
int LogfileManager::registerTransaction (TRI_voc_tid_t transactionId) {
auto lastCollectedId = _lastCollectedId.load();
auto lastSealedId = _lastSealedId.load();
WRITE_LOCKER(_transactionsLock);
TRI_IF_FAILURE("LogfileManagerRegisterTransactionOom") {
// intentionally fail here
return TRI_ERROR_OUT_OF_MEMORY;
}
_transactions.emplace(transactionId, std::make_pair(lastCollectedId, lastSealedId));
TRI_ASSERT_EXPENSIVE(lastCollectedId <= lastSealedId);
try {
auto p = std::make_pair(lastCollectedId, lastSealedId);
WRITE_LOCKER(_transactionsLock);
_transactions.emplace(transactionId, std::move(p));
TRI_ASSERT_EXPENSIVE(lastCollectedId <= lastSealedId);
return TRI_ERROR_NO_ERROR;
}
catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
}
////////////////////////////////////////////////////////////////////////////////
@ -612,7 +632,7 @@ void LogfileManager::unregisterTransaction (TRI_voc_tid_t transactionId,
WRITE_LOCKER(_transactionsLock);
_transactions.erase(transactionId);
if (markAsFailed) {
_failedTransactions.emplace(transactionId);
}
@ -626,7 +646,7 @@ std::unordered_set<TRI_voc_tid_t> LogfileManager::getFailedTransactions () {
std::unordered_set<TRI_voc_tid_t> failedTransactions;
{
READ_LOCKER(_logfilesLock);
READ_LOCKER(_transactionsLock);
failedTransactions = _failedTransactions;
}
@ -670,7 +690,7 @@ std::unordered_set<TRI_voc_tick_t> LogfileManager::getDroppedDatabases () {
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::unregisterFailedTransactions (std::unordered_set<TRI_voc_tid_t> const& failedTransactions) {
WRITE_LOCKER(_logfilesLock);
WRITE_LOCKER(_transactionsLock);
std::for_each(failedTransactions.begin(), failedTransactions.end(), [&] (TRI_voc_tid_t id) {
_failedTransactions.erase(id);
@ -1330,7 +1350,7 @@ Logfile* LogfileManager::getCollectableLogfile () {
READ_LOCKER(_transactionsLock);
// iterate over all active transactions and find their minimum used logfile id
for (auto& it : _transactions) {
for (auto const& it : _transactions) {
Logfile::IdType lastWrittenId = it.second.second;
if (lastWrittenId < minId) {
@ -1377,7 +1397,7 @@ Logfile* LogfileManager::getRemovableLogfile () {
READ_LOCKER(_transactionsLock);
// iterate over all active readers and find their minimum used logfile id
for (auto& it : _transactions) {
for (auto const& it : _transactions) {
Logfile::IdType lastCollectedId = it.second.first;
if (lastCollectedId < minId) {

View File

@ -367,7 +367,7 @@ namespace triagens {
/// @brief registers a transaction
////////////////////////////////////////////////////////////////////////////////
void registerTransaction (TRI_voc_tid_t);
int registerTransaction (TRI_voc_tid_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief unregisters a transaction
@ -1135,7 +1135,7 @@ namespace triagens {
/// @brief currently ongoing transactions
////////////////////////////////////////////////////////////////////////////////
std::map<TRI_voc_tid_t, std::pair<Logfile::IdType, Logfile::IdType>> _transactions;
std::unordered_map<TRI_voc_tid_t, std::pair<Logfile::IdType, Logfile::IdType>> _transactions;
////////////////////////////////////////////////////////////////////////////////
/// @brief set of failed transactions

View File

@ -0,0 +1,38 @@
'use strict';
const semver = require('semver');
const db = require('org/arangodb').db;
function deprecated(version, graceReleases, message) {
if (typeof version === 'number') {
version = String(version);
}
if (typeof version === 'string' && !semver.valid(version)) {
if (semver.valid(version + '.0.0')) {
version += '.0.0';
} else if (semver.valid(version + '.0')) {
version += '.0';
}
}
const arangoVersion = db._version();
const arangoMajor = semver.major(arangoVersion);
const arangoMinor = semver.minor(arangoVersion);
const deprecateMajor = semver.major(version);
const deprecateMinor = semver.minor(version);
if (!message && typeof graceReleases === 'string') {
message = graceReleases;
graceReleases = 1;
}
if (!message) {
message = 'This feature is deprecated.';
}
if (arangoMajor >= deprecateMajor) {
if (arangoMajor > deprecateMajor || arangoMinor >= deprecateMinor) {
throw new Error(`DEPRECATED: ${message}`);
}
if (arangoMinor >= (deprecateMinor - graceReleases)) {
console.warn(`DEPRECATED: ${message}`);
}
}
}
module.exports = deprecated;

View File

@ -4263,6 +4263,35 @@ function transactionServerFailuresSuite () {
c = null;
internal.wait(0);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test: rollback in case of starting a transaction fails
////////////////////////////////////////////////////////////////////////////////
testBeginTransactionFailure : function () {
internal.debugClearFailAt();
db._drop(cn);
c = db._create(cn);
internal.debugSetFailAt("LogfileManagerRegisterTransactionOom");
try {
db._executeTransaction({
collections: {
write: cn
},
action: function () {
c.save({ value: 1 });
fail();
}
});
fail();
}
catch (err) {
assertEqual(internal.errors.ERROR_OUT_OF_MEMORY.code, err.errorNum);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test: rollback in case of a server-side fail