1
0
Fork 0

added db.<collection>.getResponsibleShard() (#8683)

This commit is contained in:
Jan 2019-04-08 16:10:45 +02:00 committed by GitHub
parent 44b25e2e88
commit 5897baa984
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 458 additions and 50 deletions

View File

@ -118,16 +118,9 @@ class StringRef {
return operator[](index);
}
size_t find(char c) const {
char const* p =
static_cast<char const*>(memchr(static_cast<void const*>(_data), c, _length));
size_t find(char c) const;
if (p == nullptr) {
return std::string::npos;
}
return (p - _data);
}
size_t rfind(char c) const;
int compare(std::string const& other) const noexcept {
int res = memcmp(_data, other.data(), (std::min)(_length, other.size()));

View File

@ -120,7 +120,11 @@ void Builder::sortObjectIndexLong(uint8_t* objBase,
}
VELOCYPACK_ASSERT(_sortEntries.size() == n);
std::sort(_sortEntries.begin(), _sortEntries.end(), [](SortEntry const& a,
SortEntry const& b) noexcept(checkOverflow(UINT64_MAX)) {
SortEntry const& b)
#ifdef VELOCYPACK_64BIT
noexcept
#endif
{
// return true iff a < b:
uint64_t sizea = a.nameSize;
uint64_t sizeb = b.nameSize;

View File

@ -31,6 +31,28 @@
using namespace arangodb::velocypack;
namespace {
void* memrchrSwitch(void const* block, int c, size_t size) {
#ifdef __linux__
return const_cast<void*>(memrchr(block, c, size));
#else
/// naive memrchr overlay for Windows or other platforms, which don't implement it
if (size) {
unsigned char const* p = static_cast<unsigned char const*>(block);
for (p += size - 1; size; p--, size--) {
if (*p == c) {
return const_cast<void*>(static_cast<void const*>(p));
}
}
}
return nullptr;
#endif
}
} // namespace
StringRef::StringRef(Slice slice) {
VELOCYPACK_ASSERT(slice.isString());
ValueLength l;
@ -47,6 +69,28 @@ StringRef& StringRef::operator=(Slice slice) {
return *this;
}
size_t StringRef::find(char c) const {
char const* p =
static_cast<char const*>(memchr(static_cast<void const*>(_data), c, _length));
if (p == nullptr) {
return std::string::npos;
}
return (p - _data);
}
size_t StringRef::rfind(char c) const {
char const* p =
static_cast<char const*>(::memrchrSwitch(static_cast<void const*>(_data), c, _length));
if (p == nullptr) {
return std::string::npos;
}
return (p - _data);
}
namespace arangodb {
namespace velocypack {

View File

@ -1,6 +1,10 @@
devel
-----
* added function `db.<collection>.getResponsibleShard()` to find out which is the
responsible shard for a given document. Note that this function is only available
in a cluster coordinator.
* updated bundled version of jemalloc memory allocator to 5.2.0.
* don't create per-database system collection `_frontend` automatically.

View File

@ -13,6 +13,8 @@ Getting Information about a Collection
<!-- js/actions/api-collection.js -->
@startDocuBlock get_api_collection_figures
@startDocuBlock get_api_collection_getResponsibleShard
<!-- js/actions/api-collection.js -->
@startDocuBlock get_api_collection_revision

View File

@ -10,6 +10,7 @@ The following methods exist on the collection object (returned by *db.name*):
* [collection.count()](../../DataModeling/Documents/DocumentMethods.md#count)
* [collection.drop()](../../DataModeling/Collections/CollectionMethods.md#drop)
* [collection.figures()](../../DataModeling/Collections/CollectionMethods.md#figures)
* [collection.getResponsibleShard()](../../DataModeling/Collections/CollectionMethods.md#getresponsibleshard)
* [collection.load()](../../DataModeling/Collections/CollectionMethods.md#load)
* [collection.properties(options)](../../DataModeling/Collections/CollectionMethods.md#properties)
* [collection.revision()](../../DataModeling/Collections/CollectionMethods.md#revision)

View File

@ -213,6 +213,21 @@ used as a lower bound approximation of the disk usage.
@endDocuBlock collectionFigures
GetResponsibleShard
-------------------
<!-- arangod/V8Server/v8-collection.cpp -->
returns the responsible shard for the given document.
`collection.getResponsibleShard(document)`
Returns a string with the responsible shard's ID. Note that the
returned shard ID is the ID of responsible shard for the document's
shard key values, and it will be returned even if no such document exists.
**Note**: this function can only be used on a coordinator in a cluster.
Load
----

View File

@ -254,6 +254,8 @@ documents in a single collection at once can be controlled by the startup option
HTTP API extensions
-------------------
### Extended index API
The HTTP API for creating indexes at POST `/_api/index` has been extended two-fold:
* to create a TTL (time-to-live) index, it is now possible to specify a value of `ttl`
@ -263,6 +265,22 @@ The HTTP API for creating indexes at POST `/_api/index` has been extended two-fo
* to create an index in background, the attribute `inBackground` can be set to `true`.
### API for querying the responsible shard
The HTTP API for collections has got an additional route for retrieving the responsible
shard for a document at PUT `/_api/collection/<name>/responsibleShard`.
When calling this route, the request body is supposed to contain the document for which
the responsible shard should be determined. The response will contain an attribute `shardId`
containing the ID of the shard that is responsible for that document.
A method `collection.getResponsibleShard(document)` was added to the JS API as well.
It does not matter if the document actually exists or not, as the shard responsibility
is determined from the document's attribute values only.
Please note that this API is only meaningful and available on a cluster coordinator.
Web interface
-------------
@ -390,14 +408,18 @@ them as well.
### Fewer system collections
The system collections `_routing` and `_modules` are not created anymore for new
new databases, as both are only needed for legacy functionality.
The system collections `_frontend`, `_modules` and `_routing` are not created
anymore for new databases by default.
`_modules` and `_routing` are only needed for legacy functionality.
Existing `_routing` collections will not be touched as they may contain user-defined
entries, and will continue to work.
Existing `_modules` collections will also remain functional.
The `_frontend` collection may still be required for actions triggered by the
web interface, but it will automatically be created lazily if needed.
### Named indices
Indices now have an additional `name` field, which allows for more useful

View File

@ -0,0 +1,35 @@
@startDocuBlock get_api_collection_getResponsibleShard
@brief Return the responsible shard for a document
@RESTHEADER{PUT /_api/collection/{collection-name}/responsibleShard, Return responsible shard for a document}
@RESTURLPARAMETERS
@RESTURLPARAM{collection-name,string,required}
The name of the collection.
@RESTDESCRIPTION
Returns the ID of the shard that is responsible for the given document
(if the document exists) or that would be responsible if such document
existed.
The response is a JSON object with a *shardId* attribute, which will
contain the ID of the responsible shard.
**Note** : This method is only available in a cluster coordinator.
@RESTRETURNCODES
@RESTRETURNCODE{200}
Returns the ID of the responsible shard:
@RESTRETURNCODE{400}
If the *collection-name* is missing, then a *HTTP 400* is
returned.
@RESTRETURNCODE{404}
If the *collection-name* is unknown, then an *HTTP 404*
is returned.
@endDocuBlock

View File

@ -246,7 +246,7 @@ void RestCollectionHandler::handleCommandGet() {
skipGenerate = true;
this->generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND,
"expecting one of the resources 'checksum', 'count', "
"'figures', 'properties', 'revision', 'shards'");
"'figures', 'properties', 'responsibleShard', 'revision', 'shards'");
}
});
@ -392,6 +392,19 @@ void RestCollectionHandler::handleCommandPut() {
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
} else if (sub == "responsibleShard") {
if (!ServerState::instance()->isCoordinator()) {
res.reset(TRI_ERROR_CLUSTER_ONLY_ON_COORDINATOR);
} else {
std::string shardId;
res = coll->getResponsibleShard(body, false, shardId);
if (res.ok()) {
builder.openObject();
builder.add("shardId", VPackValue(shardId));
builder.close();
}
}
} else if (sub == "truncate") {
{
OperationOptions opts;

View File

@ -148,7 +148,7 @@ rocksdb::Status RocksDBReadOnlyMethods::Get(rocksdb::ColumnFamilyHandle* cf,
TRI_ASSERT(cf != nullptr);
rocksdb::ReadOptions const& ro = _state->_rocksReadOptions;
TRI_ASSERT(ro.snapshot != nullptr ||
_state->isReadOnlyTransaction() && _state->isSingleOperation());
(_state->isReadOnlyTransaction() && _state->isSingleOperation()));
return _db->Get(ro, cf, key, val);
}
@ -158,7 +158,7 @@ rocksdb::Status RocksDBReadOnlyMethods::Get(rocksdb::ColumnFamilyHandle* cf,
TRI_ASSERT(cf != nullptr);
rocksdb::ReadOptions const& ro = _state->_rocksReadOptions;
TRI_ASSERT(ro.snapshot != nullptr ||
_state->isReadOnlyTransaction() && _state->isSingleOperation());
(_state->isReadOnlyTransaction() && _state->isSingleOperation()));
return _db->Get(ro, cf, key, val);
}

View File

@ -69,48 +69,43 @@ template <bool returnNullSlice>
VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part,
VPackBuilder& temporaryBuilder, bool splitSlash) {
if (sub.isString()) {
switch (part) {
case Part::ALL: {
if (splitSlash) {
arangodb::velocypack::StringRef key(sub);
if (splitSlash) {
size_t pos = key.find('/');
if (pos != std::string::npos) {
// We have an _id. Split it.
key = key.substr(pos + 1);
temporaryBuilder.clear();
temporaryBuilder.add(VPackValue(key.toString()));
return temporaryBuilder.slice();
}
}
return sub;
switch (part) {
case Part::ALL: {
// by adding the key to the builder, we may invalidate the original key...
// however, this is safe here as the original key is not used after we have
// added to the builder
return VPackSlice(temporaryBuilder.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)));
}
case Part::FRONT: {
arangodb::velocypack::StringRef prefix(sub);
size_t pos;
if (splitSlash) {
pos = prefix.find('/');
size_t pos = key.find(':');
if (pos != std::string::npos) {
// We have an _id. Split it.
prefix = prefix.substr(pos + 1);
}
}
pos = prefix.find(':');
if (pos != std::string::npos) {
prefix = prefix.substr(0, pos);
temporaryBuilder.clear();
temporaryBuilder.add(VPackValue(prefix.toString()));
return temporaryBuilder.slice();
key = key.substr(0, pos);
// by adding the key to the builder, we may invalidate the original key...
// however, this is safe here as the original key is not used after we have
// added to the builder
return VPackSlice(temporaryBuilder.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)));
}
// fall-through to returning null or original slice
break;
}
case Part::BACK: {
std::string prefix = sub.copyString();
size_t pos = prefix.rfind(':');
size_t pos = key.rfind(':');
if (pos != std::string::npos) {
temporaryBuilder.clear();
temporaryBuilder.add(VPackValue(prefix.substr(pos + 1)));
return temporaryBuilder.slice();
key = key.substr(pos + 1);
// by adding the key to the builder, we may invalidate the original key...
// however, this is safe here as the original key is not used after we have
// added to the builder
return VPackSlice(temporaryBuilder.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)));
}
// fall-through to returning null or original slice
break;
}
}
@ -129,13 +124,16 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector<std::string> const&
error = TRI_ERROR_NO_ERROR;
slice = slice.resolveExternal();
if (slice.isObject()) {
VPackBuilder temporaryBuilder;
for (auto const& attr : attributes) {
temporaryBuilder.clear();
arangodb::velocypack::StringRef realAttr;
::Part part;
::parseAttributeAndPart(attr, realAttr, part);
VPackSlice sub = slice.get(realAttr).resolveExternal();
VPackBuilder temporaryBuilder;
if (sub.isNone()) {
// shard key attribute not present in document
if (realAttr == StaticStrings::KeyString && !key.empty()) {
temporaryBuilder.add(VPackValue(key));
sub = temporaryBuilder.slice();
@ -147,6 +145,9 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector<std::string> const&
sub = VPackSlice::nullSlice();
}
}
// buildTemporarySlice may append data to the builder, which may invalidate
// the original "sub" value. however, "sub" is reassigned immediately with
// a new value, so it does not matter in reality
sub = ::buildTemporarySlice<returnNullSlice>(sub, part, temporaryBuilder, false);
hash = sub.normalizedHash(hash);
}

View File

@ -961,6 +961,42 @@ static void JS_FiguresVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args
TRI_V8_TRY_CATCH_END
}
static void JS_GetResponsibleShardVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (!ServerState::instance()->isCoordinator()) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_CLUSTER_ONLY_ON_COORDINATOR);
}
auto* collection = UnwrapCollection(isolate, args.Holder());
if (!collection) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
if (args.Length() < 1) {
TRI_V8_THROW_EXCEPTION_USAGE("getResponsibleShard(<data>)");
}
VPackBuilder builder;
int res = TRI_V8ToVPack(isolate, builder, args[0], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
std::string shardId;
res = collection->getResponsibleShard(builder.slice(), false, shardId);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
v8::Handle<v8::Value> result = TRI_V8_STD_STRING(isolate, shardId);
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock collectionLoad
////////////////////////////////////////////////////////////////////////////////
@ -2481,6 +2517,7 @@ void TRI_InitV8Collections(v8::Handle<v8::Context> context, TRI_vocbase_t* vocba
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "drop"), JS_DropVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "exists"), JS_ExistsVocbaseVPack);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "figures"), JS_FiguresVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "getResponsibleShard"), JS_GetResponsibleShardVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "insert"), JS_InsertVocbaseCol);
TRI_AddMethodVocbase(isolate, rt,
TRI_V8_ASCII_STRING(isolate, "_binaryInsert"),

View File

@ -431,6 +431,23 @@ ArangoCollection.prototype.figures = function () {
return requestResult.figures;
};
// //////////////////////////////////////////////////////////////////////////////
// / @brief gets the responsible shard for a specific value
// //////////////////////////////////////////////////////////////////////////////
ArangoCollection.prototype.getResponsibleShard = function (data) {
if (data === undefined || data === null) {
data = {};
} else if (typeof data === 'string' || typeof data === 'number') {
data = { _key: String(data) };
}
var requestResult = this._database._connection.PUT(this._baseurl('responsibleShard'), data);
arangosh.checkRequestResult(requestResult);
return requestResult.shardId;
};
// //////////////////////////////////////////////////////////////////////////////
// / @brief gets the checksum of a collection
// //////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,220 @@
/*jshint globalstrict:false, strict:false */
/*global assertEqual, assertNotEqual, assertNull, assertTrue, assertUndefined, fail */
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
let jsunity = require("jsunity");
let arangodb = require("@arangodb");
let internal = require("internal");
let db = arangodb.db;
function ResponsibleShardSuite() {
const cn = "UnitTestsCollection1";
return {
setUp : function () {
db._drop(cn);
},
tearDown : function () {
db._drop(cn);
},
testCheckMissingParameter : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["_key"] });
let key = c.insert({})._key;
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard(key));
assertEqual(expected, c.getResponsibleShard({ _key: key }));
},
testCheckBrokenParameter : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["_key"] });
assertNotEqual("", c.getResponsibleShard(12345));
},
testCheckKeyNotGiven : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["_key"] });
try {
c.getResponsibleShard({ test: "foo" });
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN.code, err.errorNum);
}
},
testCheckWithShardKeyKey : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["_key"] });
c.insert({ _key: "meow" });
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard("meow"));
assertEqual(expected, c.getResponsibleShard({ _key: "meow" }));
},
testCheckWithShardKeyOther : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["test"] });
c.insert({ test: "abc" });
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard({ test: "abc" }));
try {
c.getResponsibleShard({});
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN.code, err.errorNum);
}
},
testCheckWithShardKeysMultiple : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["test1", "test2"] });
c.insert({ test1: "abc", test2: "nnsn" });
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard({ test1: "abc", test2: "nnsn" }));
},
testCheckWithShardKeysPartial : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["test1", "test2"] });
c.insert({ test1: "abc" });
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard({ test1: "abc", test2: null }));
try {
c.getResponsibleShard({ test1: "abc" });
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN.code, err.errorNum);
}
},
testCheckWithShardKeysPartialOther : function () {
let c = db._create(cn, { numberOfShards: 5, shardKeys: ["test1", "test2"] });
c.insert({ test2: "abc", test1: null });
let shardCounts = c.count(true);
let total = 0;
let expected = null;
assertEqual(5, Object.keys(shardCounts).length);
Object.keys(shardCounts).forEach(function(key) {
let count = shardCounts[key];
total += count;
if (count > 0) {
expected = key;
}
});
assertEqual(1, total);
assertEqual(expected, c.getResponsibleShard({ test2: "abc", test1: null }));
try {
c.getResponsibleShard({ test2: "abc" });
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN.code, err.errorNum);
}
},
};
}
jsunity.run(ResponsibleShardSuite);
return jsunity.done();