mirror of https://gitee.com/bigwinds/arangodb
1564 lines
54 KiB
C++
1564 lines
54 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
|
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// @author Jan Steemann
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#include "MMFilesWalRecoverState.h"
|
|
#include "ApplicationFeatures/ApplicationServer.h"
|
|
#include "Basics/Exceptions.h"
|
|
#include "Basics/FileUtils.h"
|
|
#include "Basics/Result.h"
|
|
#include "Basics/VelocyPackHelper.h"
|
|
#include "Basics/conversions.h"
|
|
#include "Basics/files.h"
|
|
#include "Basics/memory-map.h"
|
|
#include "Basics/tri-strings.h"
|
|
#include "MMFiles/MMFilesCollection.h"
|
|
#include "MMFiles/MMFilesDatafileHelper.h"
|
|
#include "MMFiles/MMFilesEngine.h"
|
|
#include "MMFiles/MMFilesLogfileManager.h"
|
|
#include "MMFiles/MMFilesPersistentIndexFeature.h"
|
|
#include "MMFiles/MMFilesWalSlots.h"
|
|
#include "Rest/Version.h"
|
|
#include "RestServer/DatabaseFeature.h"
|
|
#include "StorageEngine/EngineSelectorFeature.h"
|
|
#include "Transaction/Helpers.h"
|
|
#include "Transaction/Hints.h"
|
|
#include "Transaction/StandaloneContext.h"
|
|
#include "Utils/OperationOptions.h"
|
|
#include "Utils/SingleCollectionTransaction.h"
|
|
#include "VocBase/LogicalCollection.h"
|
|
#include "VocBase/LogicalView.h"
|
|
|
|
#include <velocypack/Collection.h>
|
|
#include <velocypack/Parser.h>
|
|
#include <velocypack/velocypack-aliases.h>
|
|
|
|
using namespace arangodb;
|
|
|
|
namespace {
|
|
|
|
/// @brief convert a number slice into its numeric equivalent
|
|
template <typename T>
|
|
static inline T numericValue(VPackSlice const& slice, char const* attribute) {
|
|
if (!slice.isObject()) {
|
|
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
|
|
<< "invalid value type when looking for attribute '" << attribute
|
|
<< "': expecting object";
|
|
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
|
|
"invalid attribute value: expecting object");
|
|
}
|
|
VPackSlice v = slice.get(attribute);
|
|
if (v.isString()) {
|
|
return static_cast<T>(std::stoull(v.copyString()));
|
|
}
|
|
if (v.isNumber()) {
|
|
return v.getNumber<T>();
|
|
}
|
|
|
|
LOG_TOPIC(ERR, arangodb::Logger::ENGINES) << "invalid value for attribute '"
|
|
<< attribute << "'";
|
|
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
|
|
"invalid attribute value");
|
|
}
|
|
}
|
|
|
|
/// @brief creates the recover state
|
|
MMFilesWalRecoverState::MMFilesWalRecoverState(bool ignoreRecoveryErrors)
|
|
: databaseFeature(nullptr),
|
|
failedTransactions(),
|
|
lastTick(0),
|
|
logfilesToProcess(),
|
|
openedCollections(),
|
|
openedDatabases(),
|
|
emptyLogfiles(),
|
|
ignoreRecoveryErrors(ignoreRecoveryErrors),
|
|
errorCount(0),
|
|
maxRevisionId(0),
|
|
lastDatabaseId(0),
|
|
lastCollectionId(0) {
|
|
databaseFeature =
|
|
application_features::ApplicationServer::getFeature<DatabaseFeature>(
|
|
"Database");
|
|
}
|
|
|
|
/// @brief destroys the recover state
|
|
MMFilesWalRecoverState::~MMFilesWalRecoverState() { releaseResources(); }
|
|
|
|
/// @brief release opened collections and databases so they can be shut down
|
|
/// etc.
|
|
void MMFilesWalRecoverState::releaseResources() {
|
|
// release all collections
|
|
for (auto it = openedCollections.begin(); it != openedCollections.end();
|
|
++it) {
|
|
std::shared_ptr<arangodb::LogicalCollection>& collection = it->second;
|
|
collection->vocbase().releaseCollection(collection.get());
|
|
}
|
|
|
|
openedCollections.clear();
|
|
|
|
// release all databases
|
|
for (auto it = openedDatabases.begin(); it != openedDatabases.end(); ++it) {
|
|
(*it).second->release();
|
|
}
|
|
|
|
openedDatabases.clear();
|
|
}
|
|
|
|
/// @brief gets a database (and inserts it into the cache if not in it)
|
|
TRI_vocbase_t* MMFilesWalRecoverState::useDatabase(TRI_voc_tick_t databaseId) {
|
|
auto it = openedDatabases.find(databaseId);
|
|
|
|
if (it != openedDatabases.end()) {
|
|
return (*it).second;
|
|
}
|
|
|
|
TRI_vocbase_t* vocbase = databaseFeature->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
return nullptr;
|
|
}
|
|
|
|
openedDatabases.emplace(databaseId, vocbase);
|
|
return vocbase;
|
|
}
|
|
|
|
/// @brief release a database (so it can be dropped)
|
|
TRI_vocbase_t* MMFilesWalRecoverState::releaseDatabase(
|
|
TRI_voc_tick_t databaseId) {
|
|
auto it = openedDatabases.find(databaseId);
|
|
|
|
if (it == openedDatabases.end()) {
|
|
return nullptr;
|
|
}
|
|
|
|
TRI_vocbase_t* vocbase = (*it).second;
|
|
|
|
TRI_ASSERT(vocbase != nullptr);
|
|
|
|
// release all collections we ourselves have opened for this database
|
|
auto it2 = openedCollections.begin();
|
|
|
|
while (it2 != openedCollections.end()) {
|
|
std::shared_ptr<arangodb::LogicalCollection>& collection = it2->second;
|
|
|
|
TRI_ASSERT(collection != nullptr);
|
|
|
|
if (collection->vocbase().id() == databaseId) {
|
|
// correct database, now release the collection
|
|
TRI_ASSERT(vocbase == &(collection->vocbase()));
|
|
vocbase->releaseCollection(collection.get());
|
|
// get new iterator position
|
|
it2 = openedCollections.erase(it2);
|
|
} else {
|
|
// collection not found, advance in the loop
|
|
++it2;
|
|
}
|
|
}
|
|
|
|
vocbase->release();
|
|
openedDatabases.erase(databaseId);
|
|
|
|
return vocbase;
|
|
}
|
|
|
|
/// @brief release a collection (so it can be dropped)
|
|
std::shared_ptr<arangodb::LogicalCollection> MMFilesWalRecoverState::releaseCollection(
|
|
TRI_voc_cid_t collectionId) {
|
|
auto it = openedCollections.find(collectionId);
|
|
|
|
if (it == openedCollections.end()) {
|
|
return nullptr;
|
|
}
|
|
|
|
std::shared_ptr<arangodb::LogicalCollection>& collection = it->second;
|
|
TRI_ASSERT(collection != nullptr);
|
|
collection->vocbase().releaseCollection(collection.get());
|
|
openedCollections.erase(collectionId);
|
|
|
|
return collection;
|
|
}
|
|
|
|
/// @brief gets a collection (and inserts it into the cache if not in it)
|
|
arangodb::LogicalCollection* MMFilesWalRecoverState::useCollection(
|
|
TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, int& res) {
|
|
auto it = openedCollections.find(collectionId);
|
|
|
|
if (it != openedCollections.end()) {
|
|
res = TRI_ERROR_NO_ERROR;
|
|
return (*it).second.get();
|
|
}
|
|
|
|
TRI_set_errno(TRI_ERROR_NO_ERROR);
|
|
TRI_vocbase_col_status_e status; // ignored here
|
|
std::shared_ptr<arangodb::LogicalCollection> collection =
|
|
vocbase->useCollection(collectionId, status);
|
|
|
|
if (collection == nullptr) {
|
|
res = TRI_errno();
|
|
|
|
if (res == TRI_ERROR_ARANGO_CORRUPTED_COLLECTION) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "unable to open collection " << collectionId
|
|
<< ". Please check the logs above for errors.";
|
|
}
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
|
|
TRI_ASSERT(physical != nullptr);
|
|
// disable secondary indexes for the moment
|
|
physical->useSecondaryIndexes(false);
|
|
|
|
openedCollections.emplace(collectionId, collection);
|
|
res = TRI_ERROR_NO_ERROR;
|
|
return collection.get();
|
|
}
|
|
|
|
/// @brief looks up a collection
|
|
/// the collection will be opened after this call and inserted into a local
|
|
/// cache for faster lookups
|
|
/// returns nullptr if the collection does not exist
|
|
LogicalCollection* MMFilesWalRecoverState::getCollection(
|
|
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) {
|
|
TRI_vocbase_t* vocbase = useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "database " << databaseId
|
|
<< " not found";
|
|
return nullptr;
|
|
}
|
|
|
|
int res;
|
|
arangodb::LogicalCollection* collection =
|
|
useCollection(vocbase, collectionId, res);
|
|
|
|
if (collection == nullptr) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "collection " << collectionId
|
|
<< " of database " << databaseId
|
|
<< " not found";
|
|
return nullptr;
|
|
}
|
|
return collection;
|
|
}
|
|
|
|
/// @brief executes a single operation inside a transaction
|
|
int MMFilesWalRecoverState::executeSingleOperation(
|
|
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
|
|
MMFilesMarker const* marker, TRI_voc_fid_t fid,
|
|
std::function<int(SingleCollectionTransaction*, MMFilesMarkerEnvelope*)>
|
|
func) {
|
|
// first find the correct database
|
|
TRI_vocbase_t* vocbase = useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "database " << databaseId
|
|
<< " not found";
|
|
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
|
}
|
|
|
|
Result res;
|
|
int tmpres;
|
|
arangodb::LogicalCollection* collection =
|
|
useCollection(vocbase, collectionId, tmpres);
|
|
res.reset(tmpres);
|
|
|
|
if (collection == nullptr) {
|
|
if (res.errorNumber() == TRI_ERROR_ARANGO_CORRUPTED_COLLECTION) {
|
|
return res.errorNumber();
|
|
}
|
|
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
|
}
|
|
|
|
auto mmfiles = static_cast<MMFilesCollection*>(collection->getPhysical());
|
|
TRI_ASSERT(mmfiles);
|
|
TRI_voc_tick_t maxTick = mmfiles->maxTick();
|
|
if (marker->getTick() <= maxTick) {
|
|
// already transferred this marker
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
res = TRI_ERROR_INTERNAL;
|
|
|
|
try {
|
|
auto ctx = transaction::StandaloneContext::Create(*vocbase);
|
|
SingleCollectionTransaction trx(ctx, *collection, AccessMode::Type::WRITE);
|
|
|
|
trx.addHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
|
trx.addHint(transaction::Hints::Hint::NO_BEGIN_MARKER);
|
|
trx.addHint(transaction::Hints::Hint::NO_ABORT_MARKER);
|
|
trx.addHint(transaction::Hints::Hint::NO_THROTTLING);
|
|
trx.addHint(transaction::Hints::Hint::LOCK_NEVER);
|
|
trx.addHint(
|
|
transaction::Hints::Hint::RECOVERY); // to turn off waitForSync!
|
|
|
|
res = trx.begin();
|
|
|
|
if (!res.ok()) {
|
|
THROW_ARANGO_EXCEPTION(res);
|
|
}
|
|
|
|
MMFilesMarkerEnvelope envelope(marker, fid);
|
|
|
|
// execute the operation
|
|
res = func(&trx, &envelope);
|
|
|
|
if (!res.ok()) {
|
|
THROW_ARANGO_EXCEPTION(res);
|
|
}
|
|
|
|
// commit the operation
|
|
res = trx.commit();
|
|
} catch (arangodb::basics::Exception const& ex) {
|
|
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
|
|
<< "caught exception during recovery of marker type "
|
|
<< TRI_NameMarkerDatafile(marker) << ": " << ex.what();
|
|
res = ex.code();
|
|
} catch (std::exception const& ex) {
|
|
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
|
|
<< "caught exception during recovery of marker type "
|
|
<< TRI_NameMarkerDatafile(marker) << ": " << ex.what();
|
|
res = TRI_ERROR_INTERNAL;
|
|
} catch (...) {
|
|
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
|
|
<< "caught unknown exception during recovery of marker type "
|
|
<< TRI_NameMarkerDatafile(marker);
|
|
res = TRI_ERROR_INTERNAL;
|
|
}
|
|
|
|
return res.errorNumber();
|
|
}
|
|
|
|
/// @brief callback to handle one marker during recovery
|
|
/// this function only builds up state and does not change any data
|
|
bool MMFilesWalRecoverState::InitialScanMarker(MMFilesMarker const* marker,
|
|
void* data,
|
|
MMFilesDatafile* datafile) {
|
|
MMFilesWalRecoverState* state =
|
|
reinterpret_cast<MMFilesWalRecoverState*>(data);
|
|
|
|
TRI_ASSERT(marker != nullptr);
|
|
|
|
// note the marker's tick
|
|
TRI_voc_tick_t const tick = marker->getTick();
|
|
|
|
TRI_ASSERT(tick >= state->lastTick);
|
|
|
|
if (tick > state->lastTick) {
|
|
state->lastTick = tick;
|
|
}
|
|
|
|
MMFilesMarkerType const type = marker->getType();
|
|
|
|
switch (type) {
|
|
case TRI_DF_MARKER_VPACK_DOCUMENT: {
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
if (payloadSlice.isObject()) {
|
|
TRI_voc_rid_t revisionId =
|
|
transaction::helpers::extractRevFromDocument(payloadSlice);
|
|
if (revisionId != UINT64_MAX && revisionId > state->maxRevisionId) {
|
|
state->maxRevisionId = revisionId;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_BEGIN_TRANSACTION: {
|
|
// insert this transaction into the list of failed transactions
|
|
// we do this because if we don't find a commit marker for this
|
|
// transaction,
|
|
// we'll have it in the failed list at the end of the scan and can ignore
|
|
// it
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker);
|
|
state->failedTransactions.emplace(tid, std::make_pair(databaseId, false));
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION: {
|
|
// remove this transaction from the list of failed transactions
|
|
TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker);
|
|
state->failedTransactions.erase(tid);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_ABORT_TRANSACTION: {
|
|
// insert this transaction into the list of failed transactions
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker);
|
|
state->failedTransactions[tid] = std::make_pair(databaseId, true);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_DATABASE: {
|
|
// note that the database was dropped and doesn't need to be recovered
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
state->totalDroppedDatabases.emplace(databaseId);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_COLLECTION: {
|
|
// note that the collection was dropped and doesn't need to be recovered
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
state->totalDroppedCollections.emplace(collectionId);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_VIEW: {
|
|
// note that the view was dropped and doesn't need to be recovered
|
|
TRI_voc_cid_t const viewId = MMFilesDatafileHelper::ViewId(marker);
|
|
state->totalDroppedViews.emplace(viewId);
|
|
break;
|
|
}
|
|
|
|
default: {
|
|
// do nothing
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/// @brief callback to replay one marker during recovery
|
|
/// this function modifies indexes etc.
|
|
bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker,
|
|
void* data,
|
|
MMFilesDatafile* datafile) {
|
|
MMFilesWalRecoverState* state =
|
|
reinterpret_cast<MMFilesWalRecoverState*>(data);
|
|
|
|
#ifdef ARANGODB_ENABLE_FAILURE_TESTS
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "replaying marker of type "
|
|
<< TRI_NameMarkerDatafile(marker);
|
|
#endif
|
|
|
|
MMFilesMarkerType const type = marker->getType();
|
|
|
|
try {
|
|
switch (type) {
|
|
case TRI_DF_MARKER_PROLOGUE: {
|
|
// simply note the last state
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found prologue marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
state->resetCollection(databaseId, collectionId);
|
|
return true;
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// crud operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
case TRI_DF_MARKER_VPACK_DOCUMENT: {
|
|
// re-insert the document/edge into the collection
|
|
TRI_voc_tick_t const databaseId =
|
|
state->lastDatabaseId; // from prologue
|
|
TRI_voc_cid_t const collectionId =
|
|
state->lastCollectionId; // from prologue
|
|
|
|
if (state->isDropped(databaseId, collectionId)) {
|
|
return true;
|
|
}
|
|
|
|
TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker);
|
|
|
|
if (state->ignoreTransaction(tid)) {
|
|
// transaction was aborted
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found document marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId << ", transactionId: " << tid;
|
|
|
|
int res = state->executeSingleOperation(
|
|
databaseId, collectionId, marker, datafile->fid(),
|
|
[&](SingleCollectionTransaction* trx,
|
|
MMFilesMarkerEnvelope* envelope) -> int {
|
|
if (arangodb::MMFilesCollection::toMMFilesCollection(
|
|
trx->documentCollection())
|
|
->isVolatile()) {
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
MMFilesMarker const* marker =
|
|
static_cast<MMFilesMarker const*>(envelope->mem());
|
|
|
|
std::string const collectionName =
|
|
trx->documentCollection()->name();
|
|
uint8_t const* ptr = reinterpret_cast<uint8_t const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type);
|
|
|
|
OperationOptions options;
|
|
options.silent = true;
|
|
options.recoveryData = static_cast<void*>(envelope);
|
|
options.isRestore = true;
|
|
options.waitForSync = false;
|
|
options.ignoreRevs = true;
|
|
|
|
// try an insert first
|
|
TRI_ASSERT(VPackSlice(ptr).isObject());
|
|
OperationResult opRes =
|
|
trx->insert(collectionName, VPackSlice(ptr), options);
|
|
int res = opRes.errorNumber();
|
|
|
|
if (opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) {
|
|
// document/edge already exists, now make it a replace
|
|
opRes = trx->replace(collectionName, VPackSlice(ptr), options);
|
|
res = opRes.errorNumber();
|
|
}
|
|
|
|
return res;
|
|
});
|
|
|
|
if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_CONFLICT &&
|
|
res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND &&
|
|
res != TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "unable to insert document in collection " << collectionId
|
|
<< " of database " << databaseId << ": " << TRI_errno_string(res);
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_REMOVE: {
|
|
// re-apply the remove operation
|
|
TRI_voc_tick_t const databaseId =
|
|
state->lastDatabaseId; // from prologue
|
|
TRI_voc_cid_t const collectionId =
|
|
state->lastCollectionId; // from prologue
|
|
|
|
TRI_ASSERT(databaseId > 0);
|
|
TRI_ASSERT(collectionId > 0);
|
|
|
|
if (state->isDropped(databaseId, collectionId)) {
|
|
return true;
|
|
}
|
|
|
|
TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker);
|
|
|
|
if (state->ignoreTransaction(tid)) {
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found remove marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId << ", transactionId: " << tid;
|
|
|
|
int res = state->executeSingleOperation(
|
|
databaseId, collectionId, marker, datafile->fid(),
|
|
[&](SingleCollectionTransaction* trx,
|
|
MMFilesMarkerEnvelope* envelope) -> int {
|
|
|
|
if (arangodb::MMFilesCollection::toMMFilesCollection(
|
|
trx->documentCollection())
|
|
->isVolatile()) {
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
MMFilesMarker const* marker =
|
|
static_cast<MMFilesMarker const*>(envelope->mem());
|
|
|
|
std::string const collectionName =
|
|
trx->documentCollection()->name();
|
|
uint8_t const* ptr = reinterpret_cast<uint8_t const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type);
|
|
|
|
OperationOptions options;
|
|
options.silent = true;
|
|
options.recoveryData = static_cast<void*>(envelope);
|
|
options.waitForSync = false;
|
|
options.ignoreRevs = true;
|
|
|
|
try {
|
|
OperationResult opRes =
|
|
trx->remove(collectionName, VPackSlice(ptr), options);
|
|
if (opRes.is(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
|
// document to delete is not present. this error can be
|
|
// ignored
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
return opRes.errorNumber();
|
|
} catch (arangodb::basics::Exception const& ex) {
|
|
if (ex.code() == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
|
|
// document to delete is not present. this error can be
|
|
// ignored
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
return ex.code();
|
|
}
|
|
// should not get here...
|
|
return TRI_ERROR_INTERNAL;
|
|
});
|
|
|
|
if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_CONFLICT &&
|
|
res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND &&
|
|
res != TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND &&
|
|
res != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "unable to remove document in collection " << collectionId
|
|
<< " of database " << databaseId << ": " << TRI_errno_string(res);
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
break;
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// ddl
|
|
// -----------------------------------------------------------------------------
|
|
|
|
case TRI_DF_MARKER_VPACK_RENAME_COLLECTION: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot rename collection: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
if (state->isDropped(databaseId)) {
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found collection rename marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
auto collection = state->releaseCollection(collectionId);
|
|
if (!collection) {
|
|
collection = vocbase->lookupCollection(collectionId);
|
|
}
|
|
|
|
if (collection == nullptr) {
|
|
// if the underlying collection is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open collection "
|
|
<< collectionId;
|
|
return true;
|
|
}
|
|
|
|
VPackSlice nameSlice = payloadSlice.get("name");
|
|
if (!nameSlice.isString()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot rename collection " << collectionId << " in database "
|
|
<< databaseId << ": name attribute is no string";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
std::string name = nameSlice.copyString();
|
|
|
|
// check if other collection exist with target name
|
|
auto other = vocbase->lookupCollection(name);
|
|
|
|
if (other != nullptr) {
|
|
if (other->id() == collection->id()) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "collection " << collectionId << " in database "
|
|
<< databaseId << " already renamed; moving on";
|
|
break;
|
|
} else {
|
|
auto otherCid = other->id();
|
|
|
|
state->releaseCollection(otherCid);
|
|
vocbase->dropCollection(otherCid, true, -1.0);
|
|
}
|
|
}
|
|
|
|
auto res = vocbase->renameCollection(collection->id(), name, true);
|
|
|
|
if (!res.ok()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot rename collection " << collectionId << " in database "
|
|
<< databaseId << " to '" << name
|
|
<< "': " << res.errorMessage();
|
|
++state->errorCount;
|
|
|
|
return state->canContinue();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CHANGE_COLLECTION: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties of collection: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
if (state->isDropped(databaseId)) {
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found collection change marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
LogicalCollection* collection =
|
|
state->getCollection(databaseId, collectionId);
|
|
|
|
if (collection == nullptr) {
|
|
// if the underlying collection is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties of collection " << collectionId
|
|
<< " in database " << databaseId << ": "
|
|
<< TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
|
return true;
|
|
}
|
|
|
|
auto res = collection->properties(payloadSlice, false); // always a full-update
|
|
|
|
if (!res.ok()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties for collection " << collectionId
|
|
<< " in database " << databaseId << ": " << res.errorMessage();
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CHANGE_VIEW: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const viewId = MMFilesDatafileHelper::ViewId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties of view: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
if (state->isDropped(databaseId)) {
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found view change marker. databaseId: " << databaseId
|
|
<< ", viewId: " << viewId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
std::shared_ptr<arangodb::LogicalView> view =
|
|
vocbase->lookupView(viewId);
|
|
|
|
if (view == nullptr) {
|
|
// if the underlying collection is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties of view " << viewId
|
|
<< " in database " << databaseId << ": "
|
|
<< TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
|
return true;
|
|
}
|
|
|
|
VPackSlice nameSlice = payloadSlice.get("name");
|
|
|
|
if (nameSlice.isString() && !nameSlice.isEqualString(view->name())) {
|
|
std::string name = nameSlice.copyString();
|
|
// check if other view exists with target name
|
|
std::shared_ptr<arangodb::LogicalView> other = vocbase->lookupView(name);
|
|
|
|
if (other != nullptr) {
|
|
if (other->id() == view->id()) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "view " << viewId << " in database " << databaseId
|
|
<< " was already renamed; moving on";
|
|
break;
|
|
}
|
|
vocbase->dropView(other->id(), true);
|
|
}
|
|
|
|
auto res = view->rename(std::string(name));
|
|
|
|
if (!res.ok()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot rename view " << viewId << " in database "
|
|
<< databaseId << " to '" << name
|
|
<< "': " << res.errorMessage();
|
|
++state->errorCount;
|
|
|
|
return state->canContinue();
|
|
}
|
|
}
|
|
|
|
auto res = view->properties(payloadSlice, false); // always a full-update
|
|
|
|
if (!res.ok()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot change properties for view " << viewId
|
|
<< " in database " << databaseId << ": " << res.errorMessage();
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CREATE_INDEX: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create index for collection: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
TRI_idx_iid_t indexId = numericValue<TRI_idx_iid_t>(payloadSlice, "id");
|
|
|
|
if (state->isDropped(databaseId, collectionId)) {
|
|
return true;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found create index marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "cannot create index for collection " << collectionId
|
|
<< " in database " << databaseId << ": "
|
|
<< TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
|
return true;
|
|
}
|
|
|
|
auto col = vocbase->lookupCollection(collectionId);
|
|
|
|
if (col == nullptr) {
|
|
// if the underlying collection gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "cannot create index for collection " << collectionId
|
|
<< " in database " << databaseId << ": "
|
|
<< TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
|
return true;
|
|
}
|
|
|
|
auto physical = static_cast<MMFilesCollection*>(col->getPhysical());
|
|
TRI_ASSERT(physical != nullptr);
|
|
MMFilesPersistentIndexFeature::dropIndex(databaseId, collectionId,
|
|
indexId);
|
|
|
|
std::string const indexName("index-" + std::to_string(indexId) +
|
|
".json");
|
|
std::string const filename(arangodb::basics::FileUtils::buildFilename(
|
|
physical->path(), indexName));
|
|
|
|
bool const forceSync = state->willBeDropped(databaseId, collectionId);
|
|
bool ok = arangodb::basics::VelocyPackHelper::velocyPackToFile(
|
|
filename, payloadSlice, forceSync);
|
|
|
|
if (!ok) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create index " << indexId << ", collection "
|
|
<< collectionId << " in database " << databaseId;
|
|
++state->errorCount;
|
|
|
|
return state->canContinue();
|
|
} else {
|
|
auto ctx = transaction::StandaloneContext::Create(*vocbase);
|
|
arangodb::SingleCollectionTransaction trx(
|
|
ctx, *col, AccessMode::Type::WRITE
|
|
);
|
|
std::shared_ptr<arangodb::Index> unused;
|
|
int res = physical->restoreIndex(&trx, payloadSlice, unused);
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create index " << indexId << ", collection "
|
|
<< collectionId << " in database " << databaseId;
|
|
++state->errorCount;
|
|
|
|
return state->canContinue();
|
|
}
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CREATE_COLLECTION: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create collection: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found create collection marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
|
|
// remove the drop marker
|
|
state->droppedCollections.erase(collectionId);
|
|
|
|
if (state->isDropped(databaseId)) {
|
|
return true;
|
|
}
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
auto collection = state->releaseCollection(collectionId);
|
|
if (!collection) {
|
|
collection = vocbase->lookupCollection(collectionId);
|
|
}
|
|
|
|
if (collection != nullptr) {
|
|
// drop an existing collection
|
|
vocbase->dropCollection(collection->id(), true, -1.0);
|
|
}
|
|
|
|
MMFilesPersistentIndexFeature::dropCollection(databaseId, collectionId);
|
|
|
|
// check if there is another collection with the same name as the one
|
|
// that
|
|
// we attempt to create
|
|
VPackSlice const nameSlice = payloadSlice.get("name");
|
|
std::string name = "";
|
|
|
|
if (nameSlice.isString()) {
|
|
name = nameSlice.copyString();
|
|
collection = vocbase->lookupCollection(name);
|
|
|
|
if (collection != nullptr) {
|
|
auto otherCid = collection->id();
|
|
|
|
state->releaseCollection(otherCid);
|
|
vocbase->dropCollection(otherCid, true, -1.0);
|
|
}
|
|
} else {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "empty name attribute in create collection marker for "
|
|
"collection "
|
|
<< collectionId << " and database " << databaseId;
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
// fiddle "isSystem" value, which is not contained in the JSON file
|
|
bool isSystemValue = false;
|
|
if (!name.empty()) {
|
|
isSystemValue = name[0] == '_';
|
|
}
|
|
|
|
VPackBuilder bx;
|
|
bx.openObject();
|
|
bx.add("isSystem", VPackValue(isSystemValue));
|
|
bx.close();
|
|
VPackSlice isSystem = bx.slice();
|
|
VPackBuilder b2 = VPackCollection::merge(payloadSlice, isSystem, false);
|
|
|
|
int res = TRI_ERROR_NO_ERROR;
|
|
try {
|
|
if (state->willBeDropped(collectionId)) {
|
|
// in case we detect that this collection is going to be deleted
|
|
// anyway,
|
|
// set the sync properties to false temporarily
|
|
bool oldSync = state->databaseFeature->forceSyncProperties();
|
|
state->databaseFeature->forceSyncProperties(false);
|
|
// restore the old behavior afterwards
|
|
TRI_DEFER(state->databaseFeature->forceSyncProperties(oldSync));
|
|
|
|
collection = vocbase->createCollection(b2.slice());
|
|
} else {
|
|
// collection will be kept
|
|
collection = vocbase->createCollection(b2.slice());
|
|
}
|
|
TRI_ASSERT(collection != nullptr);
|
|
} catch (basics::Exception const& ex) {
|
|
res = ex.code();
|
|
} catch (...) {
|
|
res = TRI_ERROR_INTERNAL;
|
|
}
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create collection " << collectionId << " in database "
|
|
<< databaseId << ": " << TRI_errno_string(res);
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CREATE_VIEW: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const viewId = MMFilesDatafileHelper::ViewId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create view: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found create view marker. databaseId: " << databaseId
|
|
<< ", viewId: " << viewId;
|
|
|
|
// remove the drop marker
|
|
state->droppedViews.erase(viewId);
|
|
|
|
if (state->isDropped(databaseId)) {
|
|
return true;
|
|
}
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
std::shared_ptr<arangodb::LogicalView> view =
|
|
vocbase->lookupView(viewId);
|
|
|
|
if (view != nullptr) {
|
|
view->drop(); // drop an existing view
|
|
}
|
|
|
|
// check if there is another view with the same name as the one that
|
|
// we attempt to create
|
|
VPackSlice const nameSlice = payloadSlice.get("name");
|
|
std::string name = "";
|
|
|
|
if (nameSlice.isString()) {
|
|
name = nameSlice.copyString();
|
|
view = vocbase->lookupView(name);
|
|
|
|
if (view != nullptr) {
|
|
view->drop();
|
|
}
|
|
} else {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "empty name attribute in create view marker for "
|
|
"view "
|
|
<< viewId << " and database " << databaseId;
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
int res = TRI_ERROR_NO_ERROR;
|
|
|
|
try {
|
|
if (state->willViewBeDropped(viewId)) {
|
|
// in case we detect that this view is going to be deleted
|
|
// anyway,
|
|
// set the sync properties to false temporarily
|
|
bool oldSync = state->databaseFeature->forceSyncProperties();
|
|
state->databaseFeature->forceSyncProperties(false);
|
|
// restore the old behavior afterwards
|
|
TRI_DEFER(state->databaseFeature->forceSyncProperties(oldSync));
|
|
}
|
|
|
|
auto res = arangodb::LogicalView::create(view,*vocbase, payloadSlice);
|
|
TRI_ASSERT(res.ok());
|
|
TRI_ASSERT(view != nullptr);
|
|
TRI_ASSERT(view->id() == viewId); // otherwise this a corrupt marker
|
|
} catch (basics::Exception const& ex) {
|
|
res = ex.code();
|
|
} catch (...) {
|
|
res = TRI_ERROR_INTERNAL;
|
|
}
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create view " << viewId << " in database "
|
|
<< databaseId << ": " << TRI_errno_string(res);
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_CREATE_DATABASE: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create database: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found create database marker. databaseId: " << databaseId;
|
|
|
|
// remove the drop marker
|
|
state->droppedDatabases.erase(databaseId);
|
|
TRI_vocbase_t* vocbase = state->releaseDatabase(databaseId);
|
|
|
|
if (vocbase != nullptr) {
|
|
// remove already existing database
|
|
// TODO: how to signal a dropDatabase failure here?
|
|
state->databaseFeature->dropDatabase(databaseId, true, false);
|
|
}
|
|
|
|
VPackSlice const nameSlice = payloadSlice.get("name");
|
|
|
|
if (!nameSlice.isString()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot unpack database properties for database "
|
|
<< databaseId;
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
std::string nameString = nameSlice.copyString();
|
|
|
|
MMFilesEngine* engine = static_cast<MMFilesEngine*>(EngineSelectorFeature::ENGINE);
|
|
std::string const versionFile = engine->versionFilename(databaseId);
|
|
std::string const versionFileContent = std::string("{\"version\":") + std::to_string(rest::Version::getNumericServerVersion()) + ",\"tasks\":{}}";
|
|
|
|
// remove already existing database with same name
|
|
vocbase = state->databaseFeature->lookupDatabase(nameString);
|
|
|
|
if (vocbase != nullptr) {
|
|
TRI_voc_tick_t otherId = vocbase->id();
|
|
|
|
state->releaseDatabase(otherId);
|
|
// TODO: how to signal a dropDatabase failure here?
|
|
state->databaseFeature->dropDatabase(nameString, true, false);
|
|
}
|
|
|
|
MMFilesPersistentIndexFeature::dropDatabase(databaseId);
|
|
|
|
vocbase = nullptr;
|
|
int res = state->databaseFeature->createDatabase(databaseId, nameString,
|
|
vocbase);
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot create database " << databaseId << ": "
|
|
<< TRI_errno_string(res);
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
try {
|
|
basics::FileUtils::spit(versionFile, versionFileContent);
|
|
} catch (...) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "unable to store version file '" << versionFile
|
|
<< "' for database " << databaseId;
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_INDEX: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
VPackSlice const payloadSlice(reinterpret_cast<char const*>(marker) +
|
|
MMFilesDatafileHelper::VPackOffset(type));
|
|
|
|
if (!payloadSlice.isObject()) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "cannot drop index for collection: invalid marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
|
|
TRI_idx_iid_t indexId = numericValue<TRI_idx_iid_t>(payloadSlice, "id");
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found drop index marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId << ", indexId: " << indexId;
|
|
|
|
if (state->isDropped(databaseId, collectionId)) {
|
|
return true;
|
|
}
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// if the underlying database is gone, we can go on
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "cannot open database "
|
|
<< databaseId;
|
|
return true;
|
|
}
|
|
|
|
auto col = vocbase->lookupCollection(collectionId);
|
|
|
|
if (col == nullptr) {
|
|
// if the underlying collection gone, we can go on
|
|
return true;
|
|
}
|
|
|
|
// ignore any potential error returned by this call
|
|
auto physical = static_cast<MMFilesCollection*>(col->getPhysical());
|
|
TRI_ASSERT(physical != nullptr);
|
|
col->dropIndex(indexId);
|
|
|
|
MMFilesPersistentIndexFeature::dropIndex(databaseId, collectionId,
|
|
indexId);
|
|
|
|
// additionally remove the index file
|
|
std::string const indexName("index-" + std::to_string(indexId) +
|
|
".json");
|
|
std::string const filename(arangodb::basics::FileUtils::buildFilename(
|
|
physical->path(), indexName));
|
|
|
|
TRI_UnlinkFile(filename.c_str());
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_COLLECTION: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const collectionId =
|
|
MMFilesDatafileHelper::CollectionId(marker);
|
|
|
|
// insert the drop marker
|
|
state->droppedCollections.emplace(collectionId);
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found drop collection marker. databaseId: " << databaseId
|
|
<< ", collectionId: " << collectionId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// database already deleted - do nothing
|
|
return true;
|
|
}
|
|
|
|
// ignore any potential error returned by this call
|
|
auto collection = state->releaseCollection(collectionId);
|
|
|
|
if (collection == nullptr) {
|
|
collection = vocbase->lookupCollection(collectionId);
|
|
}
|
|
|
|
if (collection != nullptr) {
|
|
vocbase->dropCollection(collection->id(), true, -1.0);
|
|
}
|
|
MMFilesPersistentIndexFeature::dropCollection(databaseId, collectionId);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_VIEW: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
TRI_voc_cid_t const viewId = MMFilesDatafileHelper::ViewId(marker);
|
|
|
|
// insert the drop marker
|
|
state->droppedViews.emplace(viewId);
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found drop view marker. databaseId: " << databaseId
|
|
<< ", viewId: " << viewId;
|
|
|
|
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
|
|
|
|
if (vocbase == nullptr) {
|
|
// database already deleted - do nothing
|
|
return true;
|
|
}
|
|
|
|
// ignore any potential error returned by this call
|
|
std::shared_ptr<arangodb::LogicalView> view =
|
|
vocbase->lookupView(viewId);
|
|
|
|
if (view != nullptr) {
|
|
vocbase->dropView(view->id(), true);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_VPACK_DROP_DATABASE: {
|
|
TRI_voc_tick_t const databaseId =
|
|
MMFilesDatafileHelper::DatabaseId(marker);
|
|
|
|
// insert the drop marker
|
|
state->droppedDatabases.emplace(databaseId);
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "found drop database marker. databaseId: " << databaseId;
|
|
|
|
/*TRI_vocbase_t* vocbase = */ state->releaseDatabase(databaseId);
|
|
|
|
// ignore any potential error returned by this call
|
|
state->databaseFeature->dropDatabase(databaseId, true, state->isDropped(databaseId));
|
|
|
|
MMFilesPersistentIndexFeature::dropDatabase(databaseId);
|
|
break;
|
|
}
|
|
|
|
case TRI_DF_MARKER_HEADER:
|
|
case TRI_DF_MARKER_COL_HEADER:
|
|
case TRI_DF_MARKER_FOOTER: {
|
|
// new datafile or end of datafile. forget state!
|
|
state->resetCollection();
|
|
return true;
|
|
}
|
|
|
|
default: {
|
|
// do nothing
|
|
}
|
|
}
|
|
|
|
return true;
|
|
} catch (std::exception const& ex) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES) << "cannot replay marker: "
|
|
<< ex.what();
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
} catch (...) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES) << "cannot replay marker";
|
|
++state->errorCount;
|
|
return state->canContinue();
|
|
}
|
|
}
|
|
|
|
/// @brief replay a single logfile
|
|
int MMFilesWalRecoverState::replayLogfile(MMFilesWalLogfile* logfile,
|
|
int number) {
|
|
std::string const logfileName = logfile->filename();
|
|
|
|
int const n = static_cast<int>(logfilesToProcess.size());
|
|
|
|
LOG_TOPIC(INFO, arangodb::Logger::ENGINES)
|
|
<< "replaying WAL logfile '" << logfileName << "' (" << (number + 1)
|
|
<< " of " << n << ")";
|
|
|
|
MMFilesDatafile* df = logfile->df();
|
|
|
|
// Advise on sequential use:
|
|
df->sequentialAccess();
|
|
df->willNeed();
|
|
|
|
if (!TRI_IterateDatafile(df, &MMFilesWalRecoverState::ReplayMarker,
|
|
static_cast<void*>(this))) {
|
|
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
|
|
<< "WAL inspection failed when scanning logfile '" << logfileName
|
|
<< "'";
|
|
return TRI_ERROR_ARANGO_RECOVERY;
|
|
}
|
|
|
|
// Advise on random access use:
|
|
df->randomAccess();
|
|
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
/// @brief replay all logfiles
|
|
int MMFilesWalRecoverState::replayLogfiles() {
|
|
droppedCollections.clear();
|
|
droppedDatabases.clear();
|
|
|
|
int i = 0;
|
|
for (auto& it : logfilesToProcess) {
|
|
TRI_ASSERT(it != nullptr);
|
|
int res = replayLogfile(it, i++);
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
return res;
|
|
}
|
|
}
|
|
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
/// @brief abort open transactions
|
|
int MMFilesWalRecoverState::abortOpenTransactions() {
|
|
if (failedTransactions.empty()) {
|
|
// nothing to do
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "writing abort markers for still open transactions";
|
|
int res = TRI_ERROR_NO_ERROR;
|
|
|
|
VPackBuilder builder;
|
|
|
|
try {
|
|
// write abort markers for all transactions
|
|
for (auto it = failedTransactions.begin(); it != failedTransactions.end();
|
|
++it) {
|
|
TRI_voc_tid_t transactionId = (*it).first;
|
|
|
|
if ((*it).second.second) {
|
|
// already handled
|
|
continue;
|
|
}
|
|
|
|
TRI_voc_tick_t databaseId = (*it).second.first;
|
|
|
|
MMFilesTransactionMarker marker(TRI_DF_MARKER_VPACK_ABORT_TRANSACTION,
|
|
databaseId, transactionId);
|
|
MMFilesWalSlotInfoCopy slotInfo =
|
|
MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);
|
|
|
|
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
|
|
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
|
|
}
|
|
|
|
// recycle builder
|
|
builder.clear();
|
|
}
|
|
} catch (arangodb::basics::Exception const& ex) {
|
|
res = ex.code();
|
|
} catch (...) {
|
|
res = TRI_ERROR_INTERNAL;
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
/// @brief remove all empty logfiles found during logfile inspection
|
|
int MMFilesWalRecoverState::removeEmptyLogfiles() {
|
|
if (emptyLogfiles.empty()) {
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES) << "removing empty WAL logfiles";
|
|
|
|
for (auto it = emptyLogfiles.begin(); it != emptyLogfiles.end(); ++it) {
|
|
auto filename = (*it);
|
|
|
|
if (basics::FileUtils::remove(filename, nullptr)) {
|
|
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)
|
|
<< "removing empty WAL logfile '" << filename << "'";
|
|
}
|
|
}
|
|
|
|
emptyLogfiles.clear();
|
|
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|
|
|
|
/// @brief fill the secondary indexes of all collections used in recovery
|
|
int MMFilesWalRecoverState::fillIndexes() {
|
|
// release all collections
|
|
for (auto it = openedCollections.begin(); it != openedCollections.end();
|
|
++it) {
|
|
std::shared_ptr<arangodb::LogicalCollection>& collection = (*it).second;
|
|
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
|
|
|
|
TRI_ASSERT(physical != nullptr);
|
|
// activate secondary indexes
|
|
physical->useSecondaryIndexes(true);
|
|
|
|
auto ctx = transaction::StandaloneContext::Create(collection->vocbase());
|
|
arangodb::SingleCollectionTransaction trx(
|
|
ctx, *collection, AccessMode::Type::WRITE
|
|
);
|
|
int res = physical->fillAllIndexes(&trx);
|
|
|
|
if (res != TRI_ERROR_NO_ERROR) {
|
|
return res;
|
|
}
|
|
}
|
|
|
|
return TRI_ERROR_NO_ERROR;
|
|
}
|