1
0
Fork 0

Merge branch 'engine-api' of https://github.com/arangodb/arangodb into engine-api

This commit is contained in:
jsteemann 2017-02-27 14:38:35 +01:00
commit e37e94adbe
13 changed files with 215 additions and 133 deletions

View File

@ -1140,12 +1140,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
VPackBuilder builder;
builder.add(json);
AgencyOperation createCollection(
"Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, builder.slice());
AgencyValueOperationType::SET, json);
AgencyOperation increaseVersion("Plan/Version",
AgencySimpleOperationType::INCREMENT_OP);
@ -1549,7 +1546,7 @@ int ClusterInfo::ensureIndexCoordinator(
auto numberOfShardsMutex = std::make_shared<Mutex>();
auto numberOfShards = std::make_shared<int>(0);
auto resBuilder = std::make_shared<VPackBuilder>();
auto collectionBuilder = std::make_shared<VPackBuilder>();
VPackBuilder collectionBuilder;
{
std::shared_ptr<LogicalCollection> c =
@ -1607,9 +1604,14 @@ int ClusterInfo::ensureIndexCoordinator(
}
// now create a new index
c->toVelocyPackForAgency(*collectionBuilder);
std::unordered_set<std::string> const ignoreKeys{
"allowUserKeys", "cid", /* cid really ignore?*/
"count", "planId", "version",
};
c->setStatus(TRI_VOC_COL_STATUS_LOADED);
collectionBuilder = c->toVelocyPackIgnore(ignoreKeys, false);
}
VPackSlice const collectionSlice = collectionBuilder->slice();
VPackSlice const collectionSlice = collectionBuilder.slice();
auto newBuilder = std::make_shared<VPackBuilder>();
if (!collectionSlice.isObject()) {

View File

@ -2242,8 +2242,12 @@ ClusterMethods::persistCollectionInAgency(LogicalCollection* col) {
}
col->setShardMap(shards);
VPackBuilder velocy;
col->toVelocyPackForAgency(velocy);
std::unordered_set<std::string> const ignoreKeys{
"allowUserKeys", "cid", /* cid really ignore?*/
"count", "planId", "version",
};
col->setStatus(TRI_VOC_COL_STATUS_LOADED);
VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, false);
std::string errorMsg;
int myerrno = ci->createCollectionCoordinator(

View File

@ -111,7 +111,8 @@ static DatafileStatisticsContainer* FindDatafileStats(
} // namespace
int MMFilesCollection::updateProperties(VPackSlice const& slice, bool doSync){
CollectionResult MMFilesCollection::updateProperties(VPackSlice const& slice,
bool doSync) {
// validation
if (isVolatile() &&
arangodb::basics::VelocyPackHelper::getBooleanValue(
@ -128,6 +129,19 @@ int MMFilesCollection::updateProperties(VPackSlice const& slice, bool doSync){
TRI_ERROR_BAD_PARAMETER,
"isVolatile option cannot be changed at runtime");
}
auto journalSlice = slice.get("journalSize");
if (journalSlice.isNone()) {
// In some apis maximalSize is allowed instead
journalSlice = slice.get("maximalSize");
}
if (!journalSlice.isNone()) {
TRI_voc_size_t toUpdate = journalSlice.getNumericValue<TRI_voc_size_t>();
if (toUpdate < TRI_JOURNAL_MINIMAL_SIZE) {
return {TRI_ERROR_BAD_PARAMETER, "<properties>.journalSize too small"};
}
}
if (slice.hasKey("journalSize")) {
_journalSize = Helper::getNumericValue<TRI_voc_size_t>(slice, "journalSize",
@ -137,7 +151,8 @@ int MMFilesCollection::updateProperties(VPackSlice const& slice, bool doSync){
_journalSize);
}
_doCompact = Helper::getBooleanValue(slice, "doCompact", _doCompact);
return TRI_ERROR_NO_ERROR;
return CollectionResult{TRI_ERROR_NO_ERROR};
}
int MMFilesCollection::persistProperties() noexcept {

View File

@ -129,7 +129,7 @@ class MMFilesCollection final : public PhysicalCollection {
_path = path;
};
virtual int updateProperties(VPackSlice const& slice, bool doSync) override;
CollectionResult updateProperties(VPackSlice const& slice, bool doSync) override;
virtual int persistProperties() noexcept override;
virtual PhysicalCollection* clone(LogicalCollection*, PhysicalCollection*) override;

View File

@ -732,12 +732,11 @@ bool MMFilesWalRecoverState::ReplayMarker(TRI_df_marker_t const* marker,
// be
// dropped later
bool const forceSync = state->willBeDropped(databaseId, collectionId);
int res = collection->updateProperties(payloadSlice, forceSync);
if (res != TRI_ERROR_NO_ERROR) {
CollectionResult res = collection->updateProperties(payloadSlice, forceSync);
if (res.successful()) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "cannot change collection properties for collection "
<< collectionId << " in database " << databaseId << ": "
<< TRI_errno_string(res);
<< res.errorMessage;
++state->errorCount;
return state->canContinue();
}

View File

@ -784,7 +784,7 @@ int ContinuousSyncer::changeCollection(VPackSlice const& slice) {
arangodb::CollectionGuard guard(_vocbase, cid);
bool doSync = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database")->forceSyncProperties();
return guard.collection()->updateProperties(data, doSync);
return guard.collection()->updateProperties(data, doSync).code;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -1651,7 +1651,7 @@ int InitialSyncer::changeCollection(arangodb::LogicalCollection* col,
"Database")
->forceSyncProperties();
return guard.collection()->updateProperties(slice, doSync);
return guard.collection()->updateProperties(slice, doSync).code;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -1268,9 +1268,6 @@ static void JS_PropertiesVocbaseCol(
std::shared_ptr<LogicalCollection> info =
ClusterInfo::instance()->getCollection(
databaseName, collection->cid_as_string());
auto physical = static_cast<MMFilesCollection*>(info->getPhysical());
TRI_ASSERT(physical != nullptr);
if (0 < args.Length()) {
v8::Handle<v8::Value> par = args[0];
@ -1285,38 +1282,11 @@ static void JS_PropertiesVocbaseCol(
}
VPackSlice const slice = builder.slice();
if (slice.hasKey("journalSize")) {
VPackSlice maxSizeSlice = slice.get("journalSize");
TRI_voc_size_t maximalSize =
maxSizeSlice.getNumericValue<TRI_voc_size_t>();
if (maximalSize < TRI_JOURNAL_MINIMAL_SIZE) {
TRI_V8_THROW_EXCEPTION_PARAMETER(
"<properties>.journalSize too small");
}
}
if (physical->isVolatile() !=
arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "isVolatile", physical->isVolatile())) {
TRI_V8_THROW_EXCEPTION_PARAMETER(
"isVolatile option cannot be changed at runtime");
}
if (physical->isVolatile() && info->waitForSync()) {
TRI_V8_THROW_EXCEPTION_PARAMETER(
"volatile collections do not support the waitForSync option");
}
uint32_t tmp =
arangodb::basics::VelocyPackHelper::getNumericValue<uint32_t>(
slice, "indexBuckets",
2 /*Just for validation, this default Value passes*/);
if (tmp == 0 || tmp > 1024) {
TRI_V8_THROW_EXCEPTION_PARAMETER(
"indexBuckets must be a two-power between 1 and 1024");
}
int res = info->updateProperties(slice, false);
CollectionResult res = info->updateProperties(slice, false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
if (!res.successful()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res.code, res.errorMessage);
}
}
@ -1325,10 +1295,11 @@ static void JS_PropertiesVocbaseCol(
auto c = ClusterInfo::instance()->getCollection(
databaseName, StringUtils::itoa(collection->cid()));
VPackBuilder vpackProperties;
vpackProperties.openObject();
c->toVelocyPackForV8(vpackProperties);
vpackProperties.close();
std::unordered_set<std::string> const ignoreKeys{
"allowUserKeys", "cid", "count", "deleted", "id",
"indexes", "name", "path", "planId", "shards",
"status", "type", "version"};
VPackBuilder vpackProperties = c->toVelocyPackIgnore(ignoreKeys, true);
// return the current parameter set
v8::Handle<v8::Object> result =
@ -1351,8 +1322,6 @@ static void JS_PropertiesVocbaseCol(
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
// check if we want to change some parameters
if (isModification) {
@ -1370,12 +1339,14 @@ static void JS_PropertiesVocbaseCol(
// try to write new parameter to file
bool doSync = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database")->forceSyncProperties();
res = collection->updateProperties(slice, doSync);
CollectionResult updateRes = collection->updateProperties(slice, doSync);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
if (!updateRes.successful()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(updateRes.code, updateRes.errorMessage);
}
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
res = physical->persistProperties();
if (res != TRI_ERROR_NO_ERROR) {
@ -1385,10 +1356,14 @@ static void JS_PropertiesVocbaseCol(
}
}
}
VPackBuilder vpackProperties;
vpackProperties.openObject();
collection->toVelocyPackForV8(vpackProperties);
vpackProperties.close();
std::unordered_set<std::string> const ignoreKeys{
"allowUserKeys", "cid", "count", "deleted", "id", "indexes", "name",
"path", "planId", "shards", "status", "type", "version",
/* These are only relevant for cluster */
"distributeShardsLike", "isSmart", "numberOfShards", "replicationFactor",
"shardKeys"};
VPackBuilder vpackProperties = collection->toVelocyPackIgnore(ignoreKeys, true);
// return the current parameter set
v8::Handle<v8::Object> result =

View File

@ -573,7 +573,7 @@ TRI_vocbase_col_status_e LogicalCollection::tryFetchStatus(bool& didFetch) {
}
/// @brief returns a translation of a collection status
std::string LogicalCollection::statusString() {
std::string LogicalCollection::statusString() const {
READ_LOCKER(readLocker, _lock);
switch (_status) {
case TRI_VOC_COL_STATUS_UNLOADED:
@ -802,49 +802,6 @@ void LogicalCollection::setStatus(TRI_vocbase_col_status_e status) {
}
}
void LogicalCollection::toVelocyPackForV8(VPackBuilder& result) const {
getPropertiesVPack(result, false, true);
// TODO We have to properly unify the VPack creation functions...
if (ServerState::instance()->isCoordinator()) {
result.add("numberOfShards", VPackValue(_numberOfShards));
if (isSatellite()) {
result.add("replicationFactor", VPackValue("satelite"));
} else {
result.add("replicationFactor", VPackValue(_replicationFactor));
}
if (!_distributeShardsLike.empty()) {
CollectionNameResolver resolver(_vocbase);
result.add("distributeShardsLike",
VPackValue(resolver.getCollectionNameCluster(
static_cast<TRI_voc_cid_t>(
basics::StringUtils::uint64(_distributeShardsLike)))));
}
result.add(VPackValue("shardKeys"));
result.openArray();
for (auto const& key : _shardKeys) {
result.add(VPackValue(key));
}
result.close(); // shardKeys
if (!_avoidServers.empty()) {
result.add(VPackValue("avoidServers"));
result.openArray();
for (auto const& server : _avoidServers) {
result.add(VPackValue(server));
}
result.close();
}
}
}
void LogicalCollection::toVelocyPackForAgency(VPackBuilder& result) {
_status = TRI_VOC_COL_STATUS_LOADED;
result.openObject();
toVelocyPackInObject(result, false);
result.close(); // Base Object
}
void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result,
bool useSystem) const {
if (_isSystem && !useSystem) {
@ -880,6 +837,106 @@ void LogicalCollection::toVelocyPack(VPackBuilder& result,
result.close();
}
void LogicalCollection::toVelocyPack2(VPackBuilder& result, bool translateCids) const {
// We write into an open object
TRI_ASSERT(result.isOpenObject());
// Collection Meta Information
result.add("cid", VPackValue(std::to_string(_cid)));
result.add("id", VPackValue(std::to_string(_cid)));
result.add("name", VPackValue(_name));
result.add("type", VPackValue(static_cast<int>(_type)));
result.add("status", VPackValue(_status));
result.add("statusString", VPackValue(statusString()));
result.add("version", VPackValue(_version));
// Collection Flags
result.add("deleted", VPackValue(_isDeleted));
result.add("isSystem", VPackValue(_isSystem));
result.add("waitForSync", VPackValue(_waitForSync));
// TODO is this still releveant or redundant in keyGenerator?
result.add("allowUserKeys", VPackValue(_allowUserKeys));
// Physical Information
getPhysical()->getPropertiesVPack(result);
// TODO
result.add("count", VPackValue(_physical->initialCount()));
result.add("indexBuckets", VPackValue(_indexBuckets)); //MMFiles
// ODOT
// Indexes
result.add(VPackValue("indexes"));
getIndexesVPack(result, false);
// Cluster Specific
result.add("isSmart", VPackValue(_isSmart));
result.add("planId", VPackValue(std::to_string(_planId)));
result.add("numberOfShards", VPackValue(_numberOfShards));
result.add(VPackValue("shards"));
result.openObject();
for (auto const& shards : *_shardIds) {
result.add(VPackValue(shards.first));
result.openArray();
for (auto const& servers : shards.second) {
result.add(VPackValue(servers));
}
result.close(); // server array
}
result.close(); // shards
if (isSatellite()) {
result.add("replicationFactor", VPackValue("satelite"));
} else {
result.add("replicationFactor", VPackValue(_replicationFactor));
}
if (!_distributeShardsLike.empty()) {
if (translateCids) {
CollectionNameResolver resolver(_vocbase);
result.add("distributeShardsLike",
VPackValue(resolver.getCollectionNameCluster(
static_cast<TRI_voc_cid_t>(
basics::StringUtils::uint64(_distributeShardsLike)))));
} else {
result.add("distributeShardsLike", VPackValue(_distributeShardsLike));
}
}
result.add(VPackValue("shardKeys"));
result.openArray();
for (auto const& key : _shardKeys) {
result.add(VPackValue(key));
}
result.close(); // shardKeys
if (!_avoidServers.empty()) {
result.add(VPackValue("avoidServers"));
result.openArray();
for (auto const& server : _avoidServers) {
result.add(VPackValue(server));
}
result.close();
}
includeVelocyPackEnterprise(result);
TRI_ASSERT(result.isOpenObject());
// We leave the object open
}
VPackBuilder LogicalCollection::toVelocyPackIgnore(std::unordered_set<std::string> const& ignoreKeys, bool translateCids) const {
VPackBuilder full;
full.openObject();
toVelocyPack2(full, translateCids);
full.close();
return VPackCollection::remove(full.slice(), ignoreKeys);
}
void LogicalCollection::includeVelocyPackEnterprise(VPackBuilder&) const {
// We ain't no enterprise
}
// Internal helper that inserts VPack info into an existing object and leaves
// the object open
void LogicalCollection::getFullProperties(VPackBuilder& result, bool translateCids) const {
@ -932,7 +989,8 @@ void LogicalCollection::toVelocyPack(VPackBuilder& builder, bool includeIndexes,
void LogicalCollection::increaseInternalVersion() { ++_internalVersion; }
int LogicalCollection::updateProperties(VPackSlice const& slice, bool doSync) {
CollectionResult LogicalCollection::updateProperties(VPackSlice const& slice,
bool doSync) {
// the following collection properties are intentionally not updated as
// updating
// them would be very complicated:
@ -949,9 +1007,8 @@ int LogicalCollection::updateProperties(VPackSlice const& slice, bool doSync) {
slice, "indexBuckets",
2 /*Just for validation, this default Value passes*/);
if (tmp == 0 || tmp > 1024) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"indexBuckets must be a two-power between 1 and 1024");
return {TRI_ERROR_BAD_PARAMETER,
"indexBuckets must be a two-power between 1 and 1024"};
}
// The physical may first reject illegal properties.
@ -965,8 +1022,9 @@ int LogicalCollection::updateProperties(VPackSlice const& slice, bool doSync) {
if (!_isLocal) {
// We need to inform the cluster as well
return ClusterInfo::instance()->setCollectionPropertiesCoordinator(
int tmp = ClusterInfo::instance()->setCollectionPropertiesCoordinator(
_vocbase->name(), cid_as_string(), this);
return CollectionResult{tmp};
}
int64_t count = arangodb::basics::VelocyPackHelper::getNumericValue<int64_t>(
@ -977,7 +1035,7 @@ int LogicalCollection::updateProperties(VPackSlice const& slice, bool doSync) {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
engine->changeCollection(_vocbase, _cid, this, doSync);
return TRI_ERROR_NO_ERROR;
return {};
}
/// @brief return the figures for a collection

View File

@ -62,6 +62,32 @@ namespace transaction {
class Methods;
}
struct CollectionResult {
CollectionResult() : code(TRI_ERROR_NO_ERROR) {}
explicit CollectionResult(int code) : code(code) {
if (code != TRI_ERROR_NO_ERROR) {
errorMessage = TRI_errno_string(code);
}
}
CollectionResult(int code, std::string const& message)
: code(code), errorMessage(message) {
TRI_ASSERT(code != TRI_ERROR_NO_ERROR);
}
bool successful() const {
return code == TRI_ERROR_NO_ERROR;
}
bool failed() const {
return !successful();
}
int code;
std::string errorMessage;
};
class LogicalCollection {
friend struct ::TRI_vocbase_t;
@ -150,7 +176,7 @@ class LogicalCollection {
/// if the boolean is false, the return value is always
/// TRI_VOC_COL_STATUS_CORRUPTED
TRI_vocbase_col_status_e tryFetchStatus(bool&);
std::string statusString();
std::string statusString() const;
uint64_t numberDocuments() const;
@ -215,12 +241,17 @@ class LogicalCollection {
virtual void setStatus(TRI_vocbase_col_status_e);
// SECTION: Serialisation
void toVelocyPack(velocypack::Builder&, bool withPath) const;
virtual void toVelocyPackForV8(velocypack::Builder&) const;
void toVelocyPack2(velocypack::Builder&, bool translateCids) const;
velocypack::Builder toVelocyPackIgnore(
std::unordered_set<std::string> const& ignoreKeys,
bool translateCids) const;
void toVelocyPack(velocypack::Builder&, bool withPath) const;
virtual void toVelocyPackForAgency(velocypack::Builder&);
virtual void toVelocyPackForClusterInventory(velocypack::Builder&,
bool useSystem) const;
// virtual void toVelocyPackForClusterCollectionInfo(velocypack::Builder&) const;
/// @brief transform the information for this collection to velocypack
/// The builder has to be an opened Type::Object
@ -229,7 +260,7 @@ class LogicalCollection {
inline TRI_vocbase_t* vocbase() const { return _vocbase; }
// Update this collection.
virtual int updateProperties(velocypack::Slice const&, bool);
virtual CollectionResult updateProperties(velocypack::Slice const&, bool);
/// @brief return the figures for a collection
virtual std::shared_ptr<velocypack::Builder> figures();
@ -321,6 +352,7 @@ private:
void increaseInternalVersion();
protected:
virtual void includeVelocyPackEnterprise(velocypack::Builder& result) const;
void getFullProperties(velocypack::Builder& result,
bool translateCids) const;

View File

@ -37,6 +37,7 @@ namespace transaction {
class Methods;
}
struct CollectionResult;
struct DocumentIdentifierToken;
class Index;
class IndexIterator;
@ -57,7 +58,8 @@ class PhysicalCollection {
virtual std::string const& path() const = 0;
virtual void setPath(std::string const&) = 0; // should be set during collection creation
// creation happens atm in engine->createCollection
virtual int updateProperties(arangodb::velocypack::Slice const& slice, bool doSync) = 0;
virtual CollectionResult updateProperties(
arangodb::velocypack::Slice const& slice, bool doSync) = 0;
virtual int persistProperties() noexcept = 0;
virtual PhysicalCollection* clone(LogicalCollection*, PhysicalCollection*) = 0;

View File

@ -337,8 +337,7 @@ bool TRI_vocbase_t::DropCollectionCallback(arangodb::LogicalCollection* collecti
/// @brief creates a new collection, worker function
arangodb::LogicalCollection* TRI_vocbase_t::createCollectionWorker(
VPackSlice parameters, TRI_voc_cid_t& cid,
bool writeMarker, VPackBuilder& builder) {
VPackSlice parameters, TRI_voc_cid_t& cid) {
std::string name = arangodb::basics::VelocyPackHelper::getStringValue(parameters, "name" , "");
TRI_ASSERT(!name.empty());
@ -369,10 +368,6 @@ arangodb::LogicalCollection* TRI_vocbase_t::createCollectionWorker(
collection->setStatus(TRI_VOC_COL_STATUS_LOADED);
// set collection version to 3.1, as the collection is just created
collection->setVersion(LogicalCollection::VERSION_31);
if (writeMarker) {
collection->toVelocyPack(builder, false);
}
events::CreateCollection(name, TRI_ERROR_NO_ERROR);
return collection;
} catch (...) {
@ -648,10 +643,10 @@ int TRI_vocbase_t::dropCollectionWorker(arangodb::LogicalCollection* collection,
VPackBuilder builder;
StorageEngine* engine = EngineSelectorFeature::ENGINE;
engine->getCollectionInfo(this, collection->cid(), builder, false, 0);
int res = collection->updateProperties(builder.slice().get("parameters"), doSync);
CollectionResult res = collection->updateProperties(builder.slice().get("parameters"), doSync);
if (res != TRI_ERROR_NO_ERROR) {
return res;
if (!res.successful()) {
return res.code;
}
collection->setStatus(TRI_VOC_COL_STATUS_DELETED);
@ -797,6 +792,7 @@ std::shared_ptr<VPackBuilder> TRI_vocbase_t::inventory(TRI_voc_tick_t maxTick,
builder->close();
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "toVpack( true, maxTick): " << builder->slice().toJson();
return builder;
}
@ -892,12 +888,11 @@ arangodb::LogicalCollection* TRI_vocbase_t::createCollection(
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_NAME);
}
VPackBuilder builder;
READ_LOCKER(readLocker, _inventoryLock);
// note: cid may be modified by this function call
arangodb::LogicalCollection* collection = createCollectionWorker(parameters, cid, writeMarker, builder);
arangodb::LogicalCollection* collection = createCollectionWorker(parameters, cid);
if (!writeMarker) {
return collection;
@ -908,6 +903,7 @@ arangodb::LogicalCollection* TRI_vocbase_t::createCollection(
return nullptr;
}
VPackBuilder builder = collection->toVelocyPackIgnore({"statusString"}, true);
VPackSlice const slice = builder.slice();
TRI_ASSERT(cid != 0);

View File

@ -320,8 +320,7 @@ struct TRI_vocbase_t {
/// @brief creates a new collection, worker function
arangodb::LogicalCollection* createCollectionWorker(
arangodb::velocypack::Slice parameters, TRI_voc_cid_t& cid,
bool writeMarker, VPackBuilder& builder);
arangodb::velocypack::Slice parameters, TRI_voc_cid_t& cid);
/// @brief drops a collection, worker function
int dropCollectionWorker(arangodb::LogicalCollection* collection,