1
0
Fork 0

Fixed babies creation in cluster with undefined keys.

This commit is contained in:
Michael Hackstein 2016-04-08 10:53:11 +02:00
parent d792432d95
commit 12d973504c
5 changed files with 61 additions and 17 deletions

View File

@ -2418,7 +2418,8 @@ std::shared_ptr<std::vector<ShardID>> ClusterInfo::getShardList(
int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
VPackSlice slice, bool docComplete,
ShardID& shardID,
bool& usesDefaultShardingAttributes) {
bool& usesDefaultShardingAttributes,
std::string const& key) {
// Note that currently we take the number of shards and the shardKeys
// from Plan, since they are immutable. Later we will have to switch
// this to Current, when we allow to add and remove shards.
@ -2464,7 +2465,7 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
int error = TRI_ERROR_NO_ERROR;
uint64_t hash = arangodb::basics::VelocyPackHelper::hashByAttributes(
slice, *shardKeysPtr, docComplete, error);
slice, *shardKeysPtr, docComplete, error, key);
static char const* magicPhrase =
"Foxx you have stolen the goose, give she back again!";
static size_t const len = 52;

View File

@ -800,7 +800,8 @@ class ClusterInfo {
int getResponsibleShard(CollectionID const&, arangodb::velocypack::Slice,
bool docComplete, ShardID& shardID,
bool& usesDefaultShardingAttributes);
bool& usesDefaultShardingAttributes,
std::string const& key = "");
//////////////////////////////////////////////////////////////////////////////

View File

@ -568,8 +568,8 @@ int createDocumentOnCoordinator(
}
std::string const collid = StringUtils::itoa(collinfo->id());
std::unordered_map<ShardID, std::vector<VPackValueLength>> shardMap;
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
std::unordered_map<ShardID, std::vector<std::pair<VPackValueLength, std::string>>> shardMap;
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
bool useMultiple = slice.isArray();
auto workOnOneNode = [&shardMap, &ci, &collid, &collinfo, &reverseMapping](
@ -597,8 +597,15 @@ int createDocumentOnCoordinator(
// Now find the responsible shard:
bool usesDefaultShardingAttributes;
ShardID shardID;
int error = ci->getResponsibleShard(collid, node, true, shardID,
usesDefaultShardingAttributes);
int error TRI_ERROR_NO_ERROR;
if (userSpecifiedKey) {
error = ci->getResponsibleShard(collid, node, true, shardID,
usesDefaultShardingAttributes);
} else {
error = ci->getResponsibleShard(collid, node, true, shardID,
usesDefaultShardingAttributes,
_key);
}
if (error == TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
return TRI_ERROR_CLUSTER_SHARD_GONE;
}
@ -610,10 +617,19 @@ int createDocumentOnCoordinator(
// We found the responsible shard. Add it to the list.
auto it = shardMap.find(shardID);
if (it == shardMap.end()) {
std::vector<VPackValueLength> counter({index});
shardMap.emplace(shardID, counter);
if (userSpecifiedKey) {
std::vector<std::pair<VPackValueLength, std::string>> counter({{index, ""}});
shardMap.emplace(shardID, counter);
} else {
std::vector<std::pair<VPackValueLength, std::string>> counter({{index, _key}});
shardMap.emplace(shardID, counter);
}
} else {
it->second.emplace_back(index);
if (userSpecifiedKey) {
it->second.emplace_back(index, "");
} else {
it->second.emplace_back(index, _key);
}
}
reverseMapping.emplace_back(shardID, index);
return TRI_ERROR_NO_ERROR;
@ -649,12 +665,30 @@ int createDocumentOnCoordinator(
auto body = std::make_shared<std::string>();
for (auto const& it : shardMap) {
if (!useMultiple) {
body->assign(slice.toJson());
TRI_ASSERT(it.second.size() == 1);
auto idx = it.second.front();
if (idx.second.empty()) {
body->assign(slice.toJson());
} else {
reqBuilder.clear();
reqBuilder.openObject();
TRI_SanitizeObject(slice, reqBuilder);
reqBuilder.add(TRI_VOC_ATTRIBUTE_KEY, VPackValue(idx.second));
reqBuilder.close();
body->assign(reqBuilder.slice().toJson());
}
} else {
reqBuilder.clear();
reqBuilder.openArray();
for (auto idx : it.second) {
reqBuilder.add(slice.at(idx));
for (auto const& idx : it.second) {
if (idx.second.empty()) {
reqBuilder.add(slice.at(idx.first));
} else {
reqBuilder.openObject();
TRI_SanitizeObject(slice.at(idx.first), reqBuilder);
reqBuilder.add(TRI_VOC_ATTRIBUTE_KEY, VPackValue(idx.second));
reqBuilder.close();
}
}
reqBuilder.close();
body->assign(reqBuilder.slice().toJson());

View File

@ -700,14 +700,22 @@ double VelocyPackHelper::toDouble(VPackSlice const& slice, bool& failed) {
uint64_t VelocyPackHelper::hashByAttributes(
VPackSlice slice, std::vector<std::string> const& attributes,
bool docComplete, int& error) {
bool docComplete, int& error, std::string const& key) {
error = TRI_ERROR_NO_ERROR;
uint64_t hash = TRI_FnvHashBlockInitial();
if (slice.isObject()) {
for (auto const& attr: attributes) {
VPackSlice sub = slice.get(attr);
if (sub.isNone() && !docComplete) {
error = TRI_ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN;
if (sub.isNone()) {
if (attr == "_key" && !key.empty()) {
VPackBuilder temporaryBuilder;
temporaryBuilder.add(VPackValue(key));
temporaryBuilder.slice().normalizedHash(hash);
continue;
}
if (!docComplete) {
error = TRI_ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN;
}
}
sub.normalizedHash(hash);
}

View File

@ -225,7 +225,7 @@ class VelocyPackHelper {
static double toDouble(VPackSlice const&, bool&);
static uint64_t hashByAttributes(VPackSlice, std::vector<std::string> const&,
bool, int&);
bool, int&, std::string const& key = "");
static arangodb::velocypack::Slice NullValue();
static arangodb::velocypack::Slice TrueValue();