1
0
Fork 0

Miscellaneous fixes for named indices (#9100)

This commit is contained in:
Dan Larkin-York 2019-05-31 11:00:56 -04:00 committed by Jan
parent 6a56130ff8
commit 44a413a9af
11 changed files with 195 additions and 55 deletions

View File

@ -1,6 +1,11 @@
devel
-----
* enabled dropping an index by its name
* fixed lookup of index from collection by fully qualified name, e.g.
`db.testCollection.index('testCollection/primary')`.
* fix issue #9106: Sparse Skiplist Index on multiple fields not used for FILTER + SORT query
Allow AQL query optimizer to use sparse indexes in more cases, specifically when

View File

@ -361,45 +361,51 @@ bool Index::validateId(char const* key) {
}
}
/// @brief validate an index handle (collection name + / + index id)
bool Index::validateHandle(char const* key, size_t* split) {
/// @brief validate an index name
bool Index::validateName(char const* key) {
return TRI_vocbase_t::IsAllowedName(false, arangodb::velocypack::StringRef(key, strlen(key)));
}
namespace {
bool validatePrefix(char const* key, size_t* split) {
char const* p = key;
char c = *p;
// extract collection name
if (!(c == '_' || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z'))) {
return false;
}
++p;
// find divider
while (1) {
c = *p;
if ((c == '_') || (c == '-') || (c >= '0' && c <= '9') ||
(c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
++p;
continue;
if (c == '\0') {
return false;
}
if (c == '/') {
break;
}
return false;
}
if (static_cast<size_t>(p - key) > TRI_COL_NAME_LENGTH) {
return false;
p++;
}
// store split position
*split = p - key;
++p;
return TRI_vocbase_t::IsAllowedName(true, arangodb::velocypack::StringRef(key, *split));
}
} // namespace
/// @brief validate an index handle (collection name + / + index id)
bool Index::validateHandle(char const* key, size_t* split) {
bool ok = validatePrefix(key, split);
// validate index id
return validateId(p);
return ok && validateId(key + *split + 1);
}
/// @brief validate an index handle (collection name + / + index name)
bool Index::validateHandleName(char const* key, size_t* split) {
bool ok = validatePrefix(key, split);
// validate index id
return ok && validateName(key + *split + 1);
}
/// @brief generate a new index id

View File

@ -275,9 +275,15 @@ class Index {
/// @brief validate an index id
static bool validateId(char const*);
/// @brief validate an index name
static bool validateName(char const*);
/// @brief validate an index handle (collection name + / + index id)
static bool validateHandle(char const*, size_t*);
/// @brief validate an index handle (by name) (collection name + / + index name)
static bool validateHandleName(char const*, size_t*);
/// @brief generate a new index id
static TRI_idx_iid_t generateId();

View File

@ -752,8 +752,9 @@ void Syncer::createIndexInternal(VPackSlice const& idxDef, LogicalCollection& co
{
// check ID first
TRI_idx_iid_t iid = 0;
std::string name; // placeholder for now
CollectionNameResolver resolver(col.vocbase());
Result res = methods::Indexes::extractHandle(&col, &resolver, idxDef, iid);
Result res = methods::Indexes::extractHandle(&col, &resolver, idxDef, iid, name);
if (res.ok() && iid != 0) {
// lookup by id
auto byId = physical->lookupIndex(iid);
@ -769,9 +770,8 @@ void Syncer::createIndexInternal(VPackSlice const& idxDef, LogicalCollection& co
}
// now check name;
std::string name =
basics::VelocyPackHelper::getStringValue(idxDef,
StaticStrings::IndexName, "");
name = basics::VelocyPackHelper::getStringValue(idxDef, StaticStrings::IndexName,
"");
if (!name.empty()) {
// lookup by name
auto byName = physical->lookupIndex(name);

View File

@ -36,6 +36,15 @@
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
namespace {
bool startsWith(std::string const& str, std::string const& prefix) {
if (str.size() < prefix.size()) {
return false;
}
return str.substr(0, prefix.size()) == prefix;
}
} // namespace
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
@ -293,7 +302,11 @@ RestStatus RestIndexHandler::dropIndex() {
std::string const& iid = suffixes[1];
VPackBuilder idBuilder;
idBuilder.add(VPackValue(cName + TRI_INDEX_HANDLE_SEPARATOR_CHR + iid));
if (::startsWith(iid, cName + TRI_INDEX_HANDLE_SEPARATOR_CHR)) {
idBuilder.add(VPackValue(iid));
} else {
idBuilder.add(VPackValue(cName + TRI_INDEX_HANDLE_SEPARATOR_CHR + iid));
}
Result res = methods::Indexes::drop(coll.get(), idBuilder.slice());
if (res.ok()) {

View File

@ -59,8 +59,8 @@ using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::methods;
Result Indexes::getIndex(LogicalCollection const* collection,
VPackSlice const& indexId, VPackBuilder& out) {
Result Indexes::getIndex(LogicalCollection const* collection, VPackSlice const& indexId,
VPackBuilder& out, transaction::Methods* trx) {
// do some magic to parse the iid
std::string id; // will (eventually) be fully-qualified; "collection/identifier"
std::string name; // will be just name or id (no "collection/")
@ -86,7 +86,8 @@ Result Indexes::getIndex(LogicalCollection const* collection,
}
VPackBuilder tmp;
Result res = Indexes::getAll(collection, Index::makeFlags(), /*withHidden*/ true, tmp);
Result res =
Indexes::getAll(collection, Index::makeFlags(), /*withHidden*/ true, tmp, trx);
if (res.ok()) {
for (VPackSlice const& index : VPackArrayIterator(tmp.slice())) {
if (index.get(StaticStrings::IndexId).compareString(id) == 0 ||
@ -102,7 +103,8 @@ Result Indexes::getIndex(LogicalCollection const* collection,
/// @brief get all indexes, skips view links
arangodb::Result Indexes::getAll(LogicalCollection const* collection,
std::underlying_type<Index::Serialize>::type flags,
bool withHidden, VPackBuilder& result) {
bool withHidden, VPackBuilder& result,
transaction::Methods* inputTrx) {
VPackBuilder tmp;
if (ServerState::instance()->isCoordinator()) {
TRI_ASSERT(collection);
@ -145,18 +147,22 @@ arangodb::Result Indexes::getAll(LogicalCollection const* collection,
tmp.close();
} else {
SingleCollectionTransaction trx(transaction::StandaloneContext::Create(
collection->vocbase()),
*collection, AccessMode::Type::READ);
std::shared_ptr<transaction::Methods> trx;
if (inputTrx) {
trx = std::shared_ptr<transaction::Methods>(inputTrx, [](transaction::Methods*) {});
} else {
trx = std::make_shared<SingleCollectionTransaction>(
transaction::StandaloneContext::Create(collection->vocbase()),
*collection, AccessMode::Type::READ);
// we actually need this hint here, so that the collection is not
// loaded if it has status unloaded.
trx.addHint(transaction::Hints::Hint::NO_USAGE_LOCK);
// we actually need this hint here, so that the collection is not
// loaded if it has status unloaded.
trx->addHint(transaction::Hints::Hint::NO_USAGE_LOCK);
Result res = trx.begin();
if (!res.ok()) {
return res;
Result res = trx->begin();
if (!res.ok()) {
return res;
}
}
// get list of indexes
@ -170,7 +176,14 @@ arangodb::Result Indexes::getAll(LogicalCollection const* collection,
idx->toVelocyPack(tmp, flags);
}
tmp.close();
trx.finish(res);
if (!inputTrx) {
Result res;
res = trx->finish(res);
if (res.fail()) {
return res;
}
}
}
bool mergeEdgeIdxs = !ServerState::instance()->isDBServer();
@ -520,13 +533,42 @@ static bool ExtractIndexHandle(VPackSlice const& arg,
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if argument is an index name
////////////////////////////////////////////////////////////////////////////////
static bool ExtractIndexName(VPackSlice const& arg, std::string& collectionName,
std::string& name) {
TRI_ASSERT(collectionName.empty());
TRI_ASSERT(name.empty());
if (!arg.isString()) {
return false;
}
std::string str = arg.copyString();
size_t split;
if (arangodb::Index::validateHandleName(str.data(), &split)) {
collectionName = std::string(str.data(), split);
name = std::string(str.data() + split + 1, str.length() - split - 1);
return true;
}
if (arangodb::Index::validateName(str.data())) {
name = str;
return true;
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief looks up an index identifier
////////////////////////////////////////////////////////////////////////////////
Result Indexes::extractHandle(arangodb::LogicalCollection const* collection,
arangodb::CollectionNameResolver const* resolver,
VPackSlice const& val, TRI_idx_iid_t& iid) {
VPackSlice const& val, TRI_idx_iid_t& iid,
std::string& name) {
// reset the collection identifier
std::string collectionName;
@ -535,7 +577,8 @@ Result Indexes::extractHandle(arangodb::LogicalCollection const* collection,
// extract the index identifier from a string
if (val.isString() || val.isNumber()) {
if (!ExtractIndexHandle(val, collectionName, iid)) {
if (!ExtractIndexHandle(val, collectionName, iid) &&
!ExtractIndexName(val, collectionName, name)) {
return Result(TRI_ERROR_ARANGO_INDEX_HANDLE_BAD);
}
}
@ -543,9 +586,11 @@ Result Indexes::extractHandle(arangodb::LogicalCollection const* collection,
// extract the index identifier from an object
else if (val.isObject()) {
VPackSlice iidVal = val.get(StaticStrings::IndexId);
if (!ExtractIndexHandle(iidVal, collectionName, iid)) {
return Result(TRI_ERROR_ARANGO_INDEX_HANDLE_BAD);
VPackSlice nameVal = val.get(StaticStrings::IndexName);
if (!ExtractIndexName(nameVal, collectionName, name)) {
return Result(TRI_ERROR_ARANGO_INDEX_HANDLE_BAD);
}
}
}
@ -570,9 +615,11 @@ arangodb::Result Indexes::drop(LogicalCollection* collection, VPackSlice const&
}
TRI_idx_iid_t iid = 0;
if (ServerState::instance()->isCoordinator()) {
CollectionNameResolver resolver(collection->vocbase());
Result res = Indexes::extractHandle(collection, &resolver, indexArg, iid);
std::string name;
auto getHandle = [collection, &indexArg, &iid,
&name](CollectionNameResolver const* resolver,
transaction::Methods* trx = nullptr) -> Result {
Result res = Indexes::extractHandle(collection, resolver, indexArg, iid, name);
if (!res.ok()) {
events::DropIndex(collection->vocbase().name(), collection->name(), "",
@ -580,6 +627,34 @@ arangodb::Result Indexes::drop(LogicalCollection* collection, VPackSlice const&
return res;
}
if (iid == 0 && !name.empty()) {
VPackBuilder builder;
res = methods::Indexes::getIndex(collection, indexArg, builder, trx);
if (!res.ok()) {
events::DropIndex(collection->vocbase().name(), collection->name(), "",
res.errorNumber());
return res;
}
VPackSlice idSlice = builder.slice().get(StaticStrings::IndexId);
Result res = Indexes::extractHandle(collection, resolver, idSlice, iid, name);
if (!res.ok()) {
events::DropIndex(collection->vocbase().name(), collection->name(), "",
res.errorNumber());
}
}
return res;
};
if (ServerState::instance()->isCoordinator()) {
CollectionNameResolver resolver(collection->vocbase());
Result res = getHandle(&resolver);
if (!res.ok()) {
return res;
}
// flush estimates
collection->flushClusterIndexEstimates();
@ -608,10 +683,8 @@ arangodb::Result Indexes::drop(LogicalCollection* collection, VPackSlice const&
}
LogicalCollection* col = trx.documentCollection();
res = Indexes::extractHandle(collection, trx.resolver(), indexArg, iid);
res = getHandle(trx.resolver(), &trx);
if (!res.ok()) {
events::DropIndex(collection->vocbase().name(), collection->name(),
std::to_string(iid), res.errorNumber());
return res;
}

View File

@ -40,12 +40,14 @@ namespace methods {
/// Common code for ensureIndexes and api-index.js
struct Indexes {
static arangodb::Result getIndex(LogicalCollection const* collection,
velocypack::Slice const& indexId, velocypack::Builder&);
velocypack::Slice const& indexId, velocypack::Builder&,
transaction::Methods* trx = nullptr);
/// @brief get all indexes, skips view links
static arangodb::Result getAll(LogicalCollection const* collection,
std::underlying_type<Index::Serialize>::type,
bool withHidden, arangodb::velocypack::Builder&);
bool withHidden, arangodb::velocypack::Builder&,
transaction::Methods* trx = nullptr);
static arangodb::Result createIndex(LogicalCollection*, Index::IndexType,
std::vector<std::string> const&,
@ -60,7 +62,8 @@ struct Indexes {
static arangodb::Result extractHandle(LogicalCollection const* collection,
CollectionNameResolver const* resolver,
velocypack::Slice const& val, TRI_idx_iid_t& iid);
velocypack::Slice const& val,
TRI_idx_iid_t& iid, std::string& name);
private:
static arangodb::Result ensureIndexCoordinator(LogicalCollection const* collection,

View File

@ -685,6 +685,8 @@ ArangoCollection.prototype.index = function (id) {
ArangoCollection.prototype.dropIndex = function (id) {
if (id.hasOwnProperty('id')) {
id = id.id;
} else if (id.hasOwnProperty('name')) {
id = id.name;
}
var requestResult = this._database._connection.DELETE(this._database._indexurl(id, this.name()));

View File

@ -173,7 +173,7 @@ ArangoDatabase.prototype._indexurl = function (id, expectedName) {
if (typeof id === 'string') {
var pa = ArangoDatabase.indexRegex.exec(id);
if (pa === null && expectedName !== undefined) {
if (pa === null && expectedName !== undefined && !id.startsWith(expectedName + '/')) {
id = expectedName + '/' + id;
}
} else if (typeof id === 'number' && expectedName !== undefined) {

View File

@ -156,7 +156,7 @@ ArangoCollection.prototype.index = function (id) {
for (i = 0; i < indexes.length; ++i) {
var index = indexes[i];
if (index.id === id || index.name === id) {
if (index.id === id || index.name === id || this.name() + '/' + index.name === id) {
return index;
}
}

View File

@ -136,6 +136,10 @@ function indexSuite() {
idx = internal.db._index(fqn);
assertEqual(id.id, idx.id);
assertEqual(id.name, idx.name);
idx = collection.index(fqn);
assertEqual(id.id, idx.id);
assertEqual(id.name, idx.name);
},
////////////////////////////////////////////////////////////////////////////////
@ -193,6 +197,34 @@ function indexSuite() {
assertFalse(res);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief drop index by id string
////////////////////////////////////////////////////////////////////////////////
testDropIndexByName : function () {
// pick up the numeric part (starts after the slash)
var name = collection.ensureGeoIndex("a").name;
var res = collection.dropIndex(collection.name() + "/" + name);
assertTrue(res);
res = collection.dropIndex(collection.name() + "/" + name);
assertFalse(res);
name = collection.ensureGeoIndex("a").name;
res = collection.dropIndex(name);
assertTrue(res);
res = collection.dropIndex(name);
assertFalse(res);
name = collection.ensureGeoIndex("a").name;
res = internal.db._dropIndex(collection.name() + "/" + name);
assertTrue(res);
res = internal.db._dropIndex(collection.name() + "/" + name);
assertFalse(res);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief access a non-existing index
////////////////////////////////////////////////////////////////////////////////