1
0
Fork 0

Renewed restore-collection in RestReplicationHandler. It now uses new API to create Collections, doing all sanity-checks etc.

This commit is contained in:
Michael Hackstein 2017-01-16 13:36:46 +01:00
parent 9a4c7a508e
commit 4f67c8352c
3 changed files with 84 additions and 118 deletions

View File

@ -1619,11 +1619,20 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
}
// now re-create the collection
// dig out number of shards, explicit attribute takes precedence:
// Build up new information that we need to merge with the given one
VPackBuilder toMerge;
toMerge.openObject();
// We always need a new id
TRI_voc_tick_t newIdTick = ci->uniqid(1);
std::string&& newId = StringUtils::itoa(newIdTick);
toMerge.add("id", VPackValue(newId));
// Number of shards. Will be overwritten if not existent
VPackSlice const numberOfShardsSlice = parameters.get("numberOfShards");
if (numberOfShardsSlice.isInteger()) {
numberOfShards = numberOfShardsSlice.getNumericValue<uint64_t>();
} else {
if (!numberOfShardsSlice.isInteger()) {
// The information does not contain numberOfShards. Overwrite it.
VPackSlice const shards = parameters.get("shards");
if (shards.isObject()) {
numberOfShards = static_cast<uint64_t>(shards.length());
@ -1635,70 +1644,20 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
numberOfShards = 1;
}
}
TRI_ASSERT(numberOfShards > 0);
toMerge.add("numberOfShards", VPackValue(numberOfShards));
}
TRI_ASSERT(numberOfShards > 0);
// Replication Factor. Will be overwritten if not existent
VPackSlice const replFactorSlice = parameters.get("replicationFactor");
if (replFactorSlice.isInteger()) {
replicationFactor =
replFactorSlice.getNumericValue<decltype(replicationFactor)>();
}
if (replicationFactor == 0) {
replicationFactor = 1;
}
TRI_voc_tick_t newIdTick = ci->uniqid(1);
VPackBuilder toMerge;
std::string&& newId = StringUtils::itoa(newIdTick);
toMerge.openObject();
toMerge.add("id", VPackValue(newId));
// shard keys
VPackSlice const shardKeys = parameters.get("shardKeys");
if (!shardKeys.isObject()) {
// set default shard key
toMerge.add("shardKeys", VPackValue(VPackValueType::Array));
toMerge.add(VPackValue(StaticStrings::KeyString));
toMerge.close(); // end of shardKeys
}
// shards
std::vector<std::string> dbServers; // will be filled
std::unordered_map<std::string, std::vector<std::string>> shardDistribution =
arangodb::distributeShards(numberOfShards, replicationFactor, dbServers);
if (shardDistribution.empty()) {
errorMsg = "no database servers found in cluster";
return TRI_ERROR_INTERNAL;
}
toMerge.add(VPackValue("shards"));
{
VPackObjectBuilder guard(&toMerge);
for (auto const& p : shardDistribution) {
toMerge.add(VPackValue(p.first));
{
VPackArrayBuilder guard2(&toMerge);
for (std::string const& s : p.second) {
toMerge.add(VPackValue(s));
}
}
if (!replFactorSlice.isInteger()) {
if (replicationFactor == 0) {
replicationFactor = 1;
}
TRI_ASSERT(replicationFactor > 0);
toMerge.add("replicationFactor", VPackValue(replicationFactor));
}
toMerge.add("replicationFactor", VPackValue(replicationFactor));
// Now put in the primary and an edge index if needed:
toMerge.add("indexes", VPackValue(VPackValueType::Array));
// create a dummy primary index
{
arangodb::LogicalCollection* collection = nullptr;
std::unique_ptr<arangodb::PrimaryIndex> primaryIndex(
new arangodb::PrimaryIndex(collection));
toMerge.openObject();
primaryIndex->toVelocyPack(toMerge, false);
toMerge.close();
}
toMerge.close(); // TopLevel
VPackSlice const type = parameters.get("type");
TRI_col_type_e collectionType;
@ -1709,31 +1668,21 @@ int RestReplicationHandler::processRestoreCollectionCoordinator(
return TRI_ERROR_HTTP_BAD_PARAMETER;
}
if (collectionType == TRI_COL_TYPE_EDGE) {
// create a dummy edge index
std::unique_ptr<arangodb::EdgeIndex> edgeIndex(
new arangodb::EdgeIndex(newIdTick, nullptr));
toMerge.openObject();
edgeIndex->toVelocyPack(toMerge, false);
toMerge.close();
}
toMerge.close(); // indexes
toMerge.close(); // TopLevel
VPackSlice const sliceToMerge = toMerge.slice();
VPackBuilder mergedBuilder =
VPackCollection::merge(parameters, sliceToMerge, false);
VPackSlice const merged = mergedBuilder.slice();
int res = ci->createCollectionCoordinator(
dbName, newId, numberOfShards, replicationFactor, merged, errorMsg, 0.0);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg =
"unable to create collection: " + std::string(TRI_errno_string(res));
try {
auto col = ClusterMethods::createCollectionOnCoordinator(collectionType,
_vocbase, merged);
TRI_ASSERT(col != nullptr);
} catch (basics::Exception const& e) {
// Error, report it.
errorMsg = e.message();
return e.code();
}
return res;
// All other errors are thrown to the outside.
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -892,6 +892,47 @@ void LogicalCollection::getIndexesVPack(VPackBuilder& result,
result.close();
}
void LogicalCollection::getPropertiesVPack(VPackBuilder& result, bool translateCids) const {
TRI_ASSERT(result.isOpenObject());
result.add("id", VPackValue(std::to_string(_cid)));
result.add("name", VPackValue(_name));
result.add("type", VPackValue(static_cast<int>(_type)));
result.add("status", VPackValue(_status));
result.add("deleted", VPackValue(_isDeleted));
result.add("doCompact", VPackValue(_doCompact));
result.add("isSystem", VPackValue(_isSystem));
result.add("isVolatile", VPackValue(_isVolatile));
result.add("waitForSync", VPackValue(_waitForSync));
result.add("journalSize", VPackValue(_journalSize));
result.add("indexBuckets", VPackValue(_indexBuckets));
result.add("replicationFactor", VPackValue(_replicationFactor));
if (!_distributeShardsLike.empty()) {
if (translateCids) {
CollectionNameResolver resolver(_vocbase);
result.add("distributeShardsLike",
VPackValue(resolver.getCollectionNameCluster(
static_cast<TRI_voc_cid_t>(
basics::StringUtils::uint64(_distributeShardsLike)))));
} else {
result.add("distributeShardsLike", VPackValue(_distributeShardsLike));
}
}
if (_keyGenerator != nullptr) {
result.add(VPackValue("keyOptions"));
result.openObject();
_keyGenerator->toVelocyPack(result);
result.close();
}
result.add(VPackValue("shardKeys"));
result.openArray();
for (auto const& key : _shardKeys) {
result.add(VPackValue(key));
}
result.close(); // shardKeys
}
// SECTION: Replication
int LogicalCollection::replicationFactor() const {
return static_cast<int>(_replicationFactor);
@ -1068,7 +1109,7 @@ void LogicalCollection::setStatus(TRI_vocbase_col_status_e status) {
void LogicalCollection::toVelocyPackForAgency(VPackBuilder& result) {
_status = TRI_VOC_COL_STATUS_LOADED;
result.openObject();
toVelocyPackInObject(result);
toVelocyPackInObject(result, false);
result.close(); // Base Object
}
@ -1081,7 +1122,7 @@ void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result,
result.openObject();
result.add(VPackValue("parameters"));
result.openObject();
toVelocyPackInObject(result);
toVelocyPackInObject(result, true);
result.close();
result.add(VPackValue("indexes"));
getIndexesVPack(result, false);
@ -1091,7 +1132,7 @@ void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result,
void LogicalCollection::toVelocyPack(VPackBuilder& result,
bool withPath) const {
result.openObject();
toVelocyPackInObject(result);
toVelocyPackInObject(result, false);
result.add(
"cid",
VPackValue(std::to_string(_cid))); // export cid for compatibility, too
@ -1110,38 +1151,9 @@ void LogicalCollection::toVelocyPack(VPackBuilder& result,
// Internal helper that inserts VPack info into an existing object and leaves
// the object open
void LogicalCollection::toVelocyPackInObject(VPackBuilder& result) const {
result.add("id", VPackValue(std::to_string(_cid)));
result.add("name", VPackValue(_name));
result.add("type", VPackValue(static_cast<int>(_type)));
result.add("status", VPackValue(_status));
result.add("deleted", VPackValue(_isDeleted));
result.add("doCompact", VPackValue(_doCompact));
result.add("isSystem", VPackValue(_isSystem));
result.add("isVolatile", VPackValue(_isVolatile));
result.add("waitForSync", VPackValue(_waitForSync));
result.add("journalSize", VPackValue(_journalSize));
result.add("indexBuckets", VPackValue(_indexBuckets));
result.add("replicationFactor", VPackValue(_replicationFactor));
void LogicalCollection::toVelocyPackInObject(VPackBuilder& result, bool translateCids) const {
getPropertiesVPack(result, translateCids);
result.add("numberOfShards", VPackValue(_numberOfShards));
if (!_distributeShardsLike.empty()) {
result.add("distributeShardsLike", VPackValue(_distributeShardsLike));
}
if (_keyGenerator != nullptr) {
result.add(VPackValue("keyOptions"));
result.openObject();
_keyGenerator->toVelocyPack(result);
result.close();
}
result.add(VPackValue("shardKeys"));
result.openArray();
for (auto const& key : _shardKeys) {
result.add(VPackValue(key));
}
result.close(); // shardKeys
result.add(VPackValue("shards"));
result.openObject();
for (auto const& shards : *_shardIds) {

View File

@ -236,6 +236,11 @@ class LogicalCollection {
// is somehow protected. If it goes out of all scopes
// or it's indexes are freed the pointer returned will get invalidated.
arangodb::PrimaryIndex* primaryIndex() const;
// Adds all properties to the builder (has to be an open object)
// Does not add Shards or Indexes
void getPropertiesVPack(arangodb::velocypack::Builder&,
bool translateCids) const;
void getIndexesVPack(arangodb::velocypack::Builder&, bool) const;
// SECTION: Replication
@ -487,7 +492,7 @@ class LogicalCollection {
void increaseInternalVersion();
protected:
void toVelocyPackInObject(arangodb::velocypack::Builder& result) const;
void toVelocyPackInObject(arangodb::velocypack::Builder& result, bool translateCids) const;
// SECTION: Meta Information
//