1
0
Fork 0

try to not fail hard when a collection is dropped while the WAL is tailed (#4225)

This commit is contained in:
Jan 2018-01-04 16:31:17 +01:00 committed by GitHub
parent 8908d39031
commit b8c043945f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 47 additions and 42 deletions

View File

@ -25,6 +25,7 @@
#include "ClusterInfo.h"
#include "Basics/ConditionLocker.h"
#include "Basics/Exceptions.h"
#include "Basics/MutexLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
@ -782,6 +783,7 @@ void ClusterInfo::loadCurrent() {
/// @brief ask about a collection
/// If it is not found in the cache, the cache is reloaded once
/// if the collection is not found afterwards, this method will throw an exception
std::shared_ptr<LogicalCollection> ClusterInfo::getCollection(
DatabaseID const& databaseID, CollectionID const& collectionID) {
@ -1407,7 +1409,6 @@ int ClusterInfo::dropCollectionCoordinator(
// First check that no other collection has a distributeShardsLike
// entry pointing to us:
auto coll = getCollection(databaseName, collectionID);
// not used # std::string id = std::to_string(coll->cid());
auto colls = getCollections(databaseName);
std::vector<std::string> clones;
for (std::shared_ptr<LogicalCollection> const& p : colls) {
@ -1418,8 +1419,8 @@ int ClusterInfo::dropCollectionCoordinator(
}
if (!clones.empty()){
errorMsg += "Collection must not be dropped while it is sharding "
"prototype for collection[s]";
errorMsg += "Collection must not be dropped while it is a sharding "
"prototype for collection(s)";
for (auto const& i : clones) {
errorMsg += std::string(" ") + i;
}
@ -1692,9 +1693,15 @@ int ClusterInfo::ensureIndexCoordinator(
}
std::string const idString = arangodb::basics::StringUtils::itoa(iid);
int errorCode = ensureIndexCoordinatorWithoutRollback(
databaseName, collectionID, idString, slice, create, compare, resultBuilder, errorMsg, timeout);
int errorCode;
try {
errorCode = ensureIndexCoordinatorWithoutRollback(
databaseName, collectionID, idString, slice, create, compare, resultBuilder, errorMsg, timeout);
} catch (basics::Exception const& ex) {
errorCode = ex.code();
} catch (...) {
errorCode = TRI_ERROR_INTERNAL;
}
if (errorCode == TRI_ERROR_NO_ERROR || application_features::ApplicationServer::isStopping()) {
return errorCode;
}
@ -1800,11 +1807,6 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
{
std::shared_ptr<LogicalCollection> c =
getCollection(databaseName, collectionID);
if (c == nullptr) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
std::shared_ptr<VPackBuilder> tmp = std::make_shared<VPackBuilder>();
c->getIndexesVPack(*(tmp.get()), false, false);
{
@ -2174,9 +2176,6 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
READ_LOCKER(readLocker, _planProt.lock);
if (c == nullptr) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
c->getIndexesVPack(tmp, false, false);
indexes = tmp.slice();

View File

@ -306,6 +306,8 @@ class ClusterInfo {
/// @brief ask about a collection
/// If it is not found in the cache, the cache is reloaded once. The second
/// argument can be a collection ID or a collection name (both cluster-wide).
/// if the collection is not found afterwards, this method will throw an
/// exception
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<LogicalCollection> getCollection(DatabaseID const&,

View File

@ -222,11 +222,6 @@ AqlValue MMFilesAqlFunctions::Fulltext(
return AqlValue(AqlValueHintNull());
}
// NOTE: The shared_ptr is protected by trx lock.
// It is save to use the raw pointer directly.
// We are NOT allowed to delete the index.
arangodb::MMFilesFulltextIndex* fulltextIndex = nullptr;
// split requested attribute name on '.' character to create a proper
// vector of AttributeNames
std::vector<std::vector<arangodb::basics::AttributeName>> search;
@ -234,8 +229,15 @@ AqlValue MMFilesAqlFunctions::Fulltext(
for (auto const& it : basics::StringUtils::split(attributeName, '.')) {
search.back().emplace_back(it, false);
}
// NOTE: The shared_ptr is protected by trx lock.
// It is safe to use the raw pointer directly.
// We are NOT allowed to delete the index.
arangodb::MMFilesFulltextIndex* fulltextIndex = nullptr;
for (auto const& idx : collection->getIndexes()) {
auto indexes = collection->getIndexes();
for (auto const& idx : indexes) {
if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
// test if index is on the correct field
if (arangodb::basics::AttributeName::isIdentical(idx->fields(), search,
@ -291,7 +293,6 @@ AqlValue MMFilesAqlFunctions::Fulltext(
return AqlValue(builder.get());
}
/// @brief function NEAR
AqlValue MMFilesAqlFunctions::Near(arangodb::aql::Query* query,
transaction::Methods* trx,

View File

@ -483,7 +483,7 @@ static void resolveInfo(
ClusterInfo* ci = ClusterInfo::instance();
std::shared_ptr<LogicalCollection> lc =
ci->getCollection(vocbase->name(), collectionID);
if (!lc || lc->deleted()) {
if (lc->deleted()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND,
collectionID);
}

View File

@ -112,12 +112,17 @@ int Utils::resolveShard(WorkerConfig const* config,
auto const& it = planIDMap.find(collectionName);
if (it != planIDMap.end()) {
info = ci->getCollection(config->database(), it->second); // might throw
if (info == nullptr) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
} else {
LOG_TOPIC(ERR, Logger::PREGEL)
<< "The collection could not be translated to a planID";
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_ASSERT(info != nullptr);
bool usesDefaultShardingAttributes;
VPackBuilder partial;
partial.openObject();

View File

@ -227,16 +227,17 @@ std::string CollectionNameResolver::getCollectionNameCluster(
int tries = 0;
while (tries++ < 2) {
auto ci = ClusterInfo::instance()->getCollection(
_vocbase->name(), arangodb::basics::StringUtils::itoa(cid));
if (ci == nullptr) {
try {
auto ci = ClusterInfo::instance()->getCollection(
_vocbase->name(), arangodb::basics::StringUtils::itoa(cid));
name = ci->name();
_resolvedIds.emplace(cid, name);
return name;
} catch (...) {
// most likely collection not found. now try again
ClusterInfo::instance()->flush();
continue;
}
name = ci->name();
_resolvedIds.emplace(cid, name);
return name;
}
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "CollectionNameResolver: was not able to resolve id " << cid;

View File

@ -88,15 +88,13 @@ Result methods::Collections::lookup(TRI_vocbase_t* vocbase,
if (ServerState::instance()->isCoordinator()) {
try {
auto coll = ClusterInfo::instance()->getCollection(vocbase->name(), name);
if (coll) {
// check authentication after ensuring the collection exists
if (exec != nullptr &&
!exec->canUseCollection(vocbase->name(), coll->name(), AuthLevel::RO)) {
return Result(TRI_ERROR_FORBIDDEN, "No access to collection '" + name + "'");
}
func(coll.get());
return Result();
// check authentication after ensuring the collection exists
if (exec != nullptr &&
!exec->canUseCollection(vocbase->name(), coll->name(), AuthLevel::RO)) {
return Result(TRI_ERROR_FORBIDDEN, "No access to collection '" + name + "'");
}
func(coll.get());
return Result();
} catch (basics::Exception const& ex) {
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {

View File

@ -106,17 +106,16 @@ arangodb::Result Indexes::getAll(LogicalCollection const* collection,
//std::string const cid = collection->cid_as_string();
std::string const& cid = collection->name();
auto c = ClusterInfo::instance()->getCollection(databaseName, cid);
// add code for estimates here
std::unordered_map<std::string,double> estimates;
int rv = selectivityEstimatesOnCoordinator(databaseName,cid,estimates);
int rv = selectivityEstimatesOnCoordinator(databaseName, cid, estimates);
if (rv != TRI_ERROR_NO_ERROR){
return Result(rv, "could not retrieve estimates");
}
VPackBuilder tmpInner;
auto c = ClusterInfo::instance()->getCollection(databaseName, cid);
c->getIndexesVPack(tmpInner, withFigures, false);
tmp.openArray();
@ -347,7 +346,6 @@ Result Indexes::ensureIndex(LogicalCollection* collection,
VPackSlice indexDef = defBuilder.slice();
if (ServerState::instance()->isCoordinator()) {
TRI_ASSERT(indexDef.isObject());
auto c = ClusterInfo::instance()->getCollection(dbname, collname);
// check if there is an attempt to create a unique index on non-shard keys
if (create) {
@ -371,6 +369,7 @@ Result Indexes::ensureIndex(LogicalCollection* collection,
if (v.isBoolean() && v.getBoolean()) {
// unique index, now check if fields and shard keys match
auto c = ClusterInfo::instance()->getCollection(dbname, collname);
VPackSlice flds = indexDef.get("fields");
if (flds.isArray() && c->numberOfShards() > 1) {
std::vector<std::string> const& shardKeys = c->shardKeys();