mirror of https://gitee.com/bigwinds/arangodb
Collection properties update in rocksdb (#3523)
* Allow changing replicationFactor on coordinator * Fixing logic * Allowing change of replication factor * Additional input validation * grrr * Testing invalid inputs * Better document various methods, fixing properties update in rocksdb * Removing invalid parameter before creating shards * Changing sleeps to dynamic check in Current
This commit is contained in:
parent
4d223597db
commit
d684eaa7f8
|
@ -1196,6 +1196,7 @@ bool MMFilesCollection::closeDatafiles(
|
|||
return result;
|
||||
}
|
||||
|
||||
/// @brief export properties
|
||||
void MMFilesCollection::getPropertiesVPack(velocypack::Builder& result) const {
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
result.add("count", VPackValue(_initialCount));
|
||||
|
@ -1208,6 +1209,7 @@ void MMFilesCollection::getPropertiesVPack(velocypack::Builder& result) const {
|
|||
TRI_ASSERT(result.isOpenObject());
|
||||
}
|
||||
|
||||
/// @brief used for updating properties
|
||||
void MMFilesCollection::getPropertiesVPackCoordinator(
|
||||
velocypack::Builder& result) const {
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
|
|
|
@ -157,7 +157,9 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
TRI_voc_tick_t maxTick() const { return _maxTick; }
|
||||
void maxTick(TRI_voc_tick_t value) { _maxTick = value; }
|
||||
|
||||
/// @brief export properties
|
||||
void getPropertiesVPack(velocypack::Builder&) const override;
|
||||
/// @brief used for updating properties
|
||||
void getPropertiesVPackCoordinator(velocypack::Builder&) const override;
|
||||
|
||||
// datafile management
|
||||
|
|
|
@ -145,7 +145,7 @@ void RocksDBCollection::setPath(std::string const&) {
|
|||
arangodb::Result RocksDBCollection::updateProperties(VPackSlice const& slice,
|
||||
bool doSync) {
|
||||
_cacheEnabled = basics::VelocyPackHelper::readBooleanValue(
|
||||
slice, "cacheEnabled", !_logicalCollection->isSystem());
|
||||
slice, "cacheEnabled", _cacheEnabled);
|
||||
primaryIndex()->setCacheEnabled(_cacheEnabled);
|
||||
if (_cacheEnabled) {
|
||||
createCache();
|
||||
|
@ -169,17 +169,21 @@ PhysicalCollection* RocksDBCollection::clone(LogicalCollection* logical) {
|
|||
return new RocksDBCollection(logical, this);
|
||||
}
|
||||
|
||||
/// @brief export properties
|
||||
void RocksDBCollection::getPropertiesVPack(velocypack::Builder& result) const {
|
||||
// objectId might be undefined on the coordinator
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
result.add("objectId", VPackValue(std::to_string(_objectId)));
|
||||
result.add("cacheEnabled", VPackValue(_cacheEnabled));
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
}
|
||||
|
||||
/// @brief used for updating properties
|
||||
void RocksDBCollection::getPropertiesVPackCoordinator(
|
||||
velocypack::Builder& result) const {
|
||||
getPropertiesVPack(result);
|
||||
// objectId might be undefined on the coordinator
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
result.add("cacheEnabled", VPackValue(_cacheEnabled));
|
||||
TRI_ASSERT(result.isOpenObject());
|
||||
}
|
||||
|
||||
/// @brief closes an open collection
|
||||
|
|
|
@ -71,7 +71,9 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
|
||||
virtual PhysicalCollection* clone(LogicalCollection*) override;
|
||||
|
||||
/// @brief export properties
|
||||
void getPropertiesVPack(velocypack::Builder&) const override;
|
||||
/// @brief used for updating properties
|
||||
void getPropertiesVPackCoordinator(velocypack::Builder&) const override;
|
||||
|
||||
/// @brief closes an open collection
|
||||
|
|
|
@ -68,6 +68,7 @@ class PhysicalCollection {
|
|||
|
||||
/// @brief export properties
|
||||
virtual void getPropertiesVPack(velocypack::Builder&) const = 0;
|
||||
/// @brief used for updating properties
|
||||
virtual void getPropertiesVPackCoordinator(velocypack::Builder&) const = 0;
|
||||
|
||||
/// @brief return the figures for a collection
|
||||
|
|
|
@ -958,38 +958,36 @@ arangodb::Result LogicalCollection::updateProperties(VPackSlice const& slice,
|
|||
|
||||
WRITE_LOCKER(writeLocker, _infoLock);
|
||||
|
||||
VPackSlice rf = slice.get("replicationFactor");
|
||||
if (!_isLocal && !rf.isNone()) {
|
||||
size_t rf = _replicationFactor;
|
||||
VPackSlice rfSl = slice.get("replicationFactor");
|
||||
if (!rfSl.isNone()) {
|
||||
if (!rfSl.isInteger() || rfSl.getInt() <= 0 || rfSl.getUInt() > 10) {
|
||||
return Result(TRI_ERROR_BAD_PARAMETER, "bad value replicationFactor");
|
||||
}
|
||||
rf = rfSl.getNumber<size_t>();
|
||||
if (!_isLocal && rf != _replicationFactor) { // sanity checks
|
||||
if (!_distributeShardsLike.empty()) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "Cannot change replicationFactor, please change " << _distributeShardsLike;
|
||||
return Result(TRI_ERROR_FORBIDDEN, "Cannot change replicationFactor, "
|
||||
"please change " + _distributeShardsLike);
|
||||
} else if (_type == TRI_COL_TYPE_EDGE && _isSmart) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "Changing replicationFactor not supported for smart edge collections";
|
||||
return Result(TRI_ERROR_NOT_IMPLEMENTED, "Changing replicationFactor "
|
||||
"not supported for smart edge collections");
|
||||
} else if (isSatellite()) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "Satellite collection cannot have replicationFactor";
|
||||
return Result(TRI_ERROR_FORBIDDEN, "Satellite collection "
|
||||
"cannot have replicationFactor");
|
||||
}
|
||||
if (!rf.isInteger() || rf.getInt() <= 0 || rf.getUInt() > 10) {
|
||||
return Result(TRI_ERROR_BAD_PARAMETER, "bad value replicationFactor");
|
||||
}
|
||||
}
|
||||
|
||||
// The physical may first reject illegal properties.
|
||||
// After this call it either has thrown or the properties are stored
|
||||
Result res = getPhysical()->updateProperties(slice, doSync);
|
||||
|
||||
if (!res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
_waitForSync = Helper::getBooleanValue(slice, "waitForSync", _waitForSync);
|
||||
if (rf.isNumber()) {
|
||||
_replicationFactor = rf.getNumber<size_t>();
|
||||
}
|
||||
_replicationFactor = rf;
|
||||
|
||||
if (!_isLocal) {
|
||||
// We need to inform the cluster as well
|
||||
|
|
|
@ -800,6 +800,10 @@ function executePlanForCollections(plannedCollections) {
|
|||
let save = {id: collectionInfo.id, name: collectionInfo.name};
|
||||
delete collectionInfo.id; // must not
|
||||
delete collectionInfo.name;
|
||||
if (collectionInfo.hasOwnProperty('objectId')) {
|
||||
console.warn('unexpected objectId in %s', JSON.stringify(collectionInfo));
|
||||
}
|
||||
delete collectionInfo.objectId;
|
||||
try {
|
||||
if (collectionInfo.type === ArangoCollection.TYPE_EDGE) {
|
||||
db._createEdgeCollection(shardName, collectionInfo);
|
||||
|
|
|
@ -37,8 +37,9 @@ const db = require("@arangodb").db;
|
|||
const cn1 = "UnitTestPropertiesLeader";
|
||||
const cn2 = "UnitTestPropertiesFollower";
|
||||
|
||||
// check whether all shards have the right amount of followers
|
||||
function checkReplicationFactor(name, fac) {
|
||||
let current = ArangoAgency.get('Current/Collections/_system');
|
||||
// first we need the plan id of the collection
|
||||
let plan = ArangoAgency.get('Plan/Collections/_system');
|
||||
let collectionId = Object.values(plan.arango.Plan.Collections['_system']).reduce((result, collectionDef) => {
|
||||
if (result) {
|
||||
|
@ -49,9 +50,22 @@ function checkReplicationFactor(name, fac) {
|
|||
}
|
||||
}, undefined);
|
||||
|
||||
Object.values(current.arango.Current.Collections['_system'][collectionId]).forEach(entry => {
|
||||
expect(entry.servers).to.have.lengthOf(fac);
|
||||
for (let i = 0; i < 120; i++) {
|
||||
let current = ArangoAgency.get('Current/Collections/_system');
|
||||
let shards = Object.values(current.arango.Current.Collections['_system'][collectionId]);
|
||||
let finished = 0;
|
||||
shards.forEach(entry => {
|
||||
finished += entry.servers.length == fac ? 1 : 0;
|
||||
});
|
||||
if (shards.length > 0 && finished == shards.length) {
|
||||
return;
|
||||
}
|
||||
internal.sleep(0.5);
|
||||
}
|
||||
let current = ArangoAgency.get('Current/Collections/_system');
|
||||
let val = current.arango.Current.Collections['_system'][collectionId];
|
||||
throw "replicationFactor is not reflected properly in " +
|
||||
"/Current/Collections/_system/" + collectionId + ": "+ JSON.stringify(val);
|
||||
};
|
||||
|
||||
describe('Update collection properties', function() {
|
||||
|
@ -77,9 +91,6 @@ describe('Update collection properties', function() {
|
|||
let props = coll.properties({replicationFactor: 2});
|
||||
expect(props.replicationFactor).to.equal(2);
|
||||
|
||||
// FIXME: do not wait for a fixed time
|
||||
internal.sleep(5);
|
||||
|
||||
checkReplicationFactor(cn1, 2);
|
||||
});
|
||||
|
||||
|
@ -93,8 +104,6 @@ describe('Update collection properties', function() {
|
|||
let props = coll.properties({replicationFactor: 1});
|
||||
expect(props.replicationFactor).to.equal(1);
|
||||
|
||||
internal.sleep(5);
|
||||
|
||||
checkReplicationFactor(cn1, 1);
|
||||
});
|
||||
|
||||
|
@ -163,9 +172,6 @@ describe('Update collection properties with distributeShardsLike, ', function()
|
|||
let props = leader.properties({replicationFactor: 2});
|
||||
expect(props.replicationFactor).to.equal(2);
|
||||
|
||||
// FIXME: do not wait for a fixed time
|
||||
internal.sleep(5);
|
||||
|
||||
checkReplicationFactor(cn1, 2);
|
||||
checkReplicationFactor(cn2, 2);
|
||||
});
|
||||
|
@ -182,9 +188,6 @@ describe('Update collection properties with distributeShardsLike, ', function()
|
|||
let props = leader.properties({replicationFactor: 1});
|
||||
expect(props.replicationFactor).to.equal(1);
|
||||
|
||||
// FIXME: do not wait for a fixed time
|
||||
internal.sleep(5);
|
||||
|
||||
checkReplicationFactor(cn1, 1);
|
||||
checkReplicationFactor(cn2, 1);
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue