
Merge branch 'json_agency_comm' into devel

This commit is contained in:
Andreas Streichardt 2016-05-06 17:21:41 +02:00
commit 9cbe13dd43
10 changed files with 255 additions and 355 deletions

View File

@ -315,10 +315,12 @@ else ()
option(USE_OPTIMIZE_FOR_ARCHITECTURE "try to determine CPU architecture" ON)
if (USE_OPTIMIZE_FOR_ARCHITECTURE)
include(OptimizeForArchitecture)
OptimizeForArchitecture()
if (NOT USE_OPTIMIZE_FOR_ARCHITECTURE)
# mop: core2 (merom) is our absolute minimum!
SET(TARGET_ARCHITECTURE "merom")
endif ()
include(OptimizeForArchitecture)
OptimizeForArchitecture()
set(BASE_FLAGS "${Vc_ARCHITECTURE_FLAGS} ${BASE_FLAGS}")
endif ()

View File

@ -99,34 +99,25 @@ static std::string extractErrorMessage(std::string const& shardId,
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates an empty collection info object
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::CollectionInfo() : _json(nullptr) {}
CollectionInfo::CollectionInfo()
: _vpack(std::make_shared<VPackBuilder>()) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from json
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::CollectionInfo(TRI_json_t* json) : _json(json) {}
CollectionInfo::CollectionInfo(std::shared_ptr<VPackBuilder> vpack, VPackSlice slice)
: _vpack(vpack), _slice(slice) {}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from another
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::CollectionInfo(CollectionInfo const& other)
: _json(other._json) {
if (other._json != nullptr) {
_json = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, other._json);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief move constructs a collection info object from another
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::CollectionInfo(CollectionInfo&& other) : _json(other._json) {
other._json = nullptr;
: _vpack(other._vpack), _slice(other._slice) {
}
////////////////////////////////////////////////////////////////////////////////
@ -134,29 +125,8 @@ CollectionInfo::CollectionInfo(CollectionInfo&& other) : _json(other._json) {
////////////////////////////////////////////////////////////////////////////////
CollectionInfo& CollectionInfo::operator=(CollectionInfo const& other) {
if (other._json != nullptr && this != &other) {
_json = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, other._json);
} else {
_json = nullptr;
}
return *this;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief move assigns a collection info object from another one
////////////////////////////////////////////////////////////////////////////////
CollectionInfo& CollectionInfo::operator=(CollectionInfo&& other) {
if (this == &other) {
return *this;
}
if (_json != nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _json);
}
_json = other._json;
other._json = nullptr;
_vpack = other._vpack;
_slice = other._slice;
return *this;
}
@ -166,9 +136,6 @@ CollectionInfo& CollectionInfo::operator=(CollectionInfo&& other) {
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::~CollectionInfo() {
if (_json != nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _json);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -339,7 +306,6 @@ void ClusterInfo::flush() {
loadCurrentCoordinators();
loadPlan();
loadCurrentDatabases();
loadPlannedCollections();
loadCurrentCollections();
}
@ -483,9 +449,15 @@ void ClusterInfo::loadPlan() {
if (planSlice.isObject()) {
decltype(_plannedDatabases) newDatabases;
decltype(_plannedCollections) newCollections;
decltype(_shards) newShards;
decltype(_shardKeys) newShardKeys;
bool swapDatabases = false;
auto databasesSlice = planSlice.get("Databases");
bool swapCollections = false;
VPackSlice databasesSlice;
databasesSlice = planSlice.get("Databases");
if (databasesSlice.isObject()) {
for (auto const& database : VPackObjectIterator(databasesSlice)) {
std::string const& name = database.key.copyString();
@ -495,11 +467,64 @@ void ClusterInfo::loadPlan() {
swapDatabases = true;
}
// mop: immediate children of collections are DATABASES, followed by their collections
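// Illustrative sketch of the layout assumed by the loop below (pieced together
// from the accessors used on each collection slice; names and values are
// hypothetical, actual agency contents may differ):
//
//   "Collections": {
//     "_system": {                          // database name
//       "82343": {                          // collection id
//         "id": "82343",
//         "name": "example",                // hypothetical collection name
//         "shardKeys": ["_key"],
//         "shards": { "s0000001": ["DBServer001"] }
//       }
//     }
//   }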
databasesSlice = planSlice.get("Collections");
if (databasesSlice.isObject()) {
for (auto const& databasePairSlice : VPackObjectIterator(databasesSlice)) {
VPackSlice const& collectionsSlice = databasePairSlice.value;
if (!collectionsSlice.isObject()) {
continue;
}
DatabaseCollections databaseCollections;
std::string const databaseName = databasePairSlice.key.copyString();
for (auto const& collectionPairSlice: VPackObjectIterator(collectionsSlice)) {
VPackSlice const& collectionSlice = collectionPairSlice.value;
if (!collectionSlice.isObject()) {
continue;
}
std::string const collectionId = collectionPairSlice.key.copyString();
auto newCollection = std::make_shared<CollectionInfo>(planBuilder, collectionSlice);
std::string const collectionName = newCollection->name();
// mop: register with name as well as with id
databaseCollections.emplace(
std::make_pair(collectionName, newCollection));
databaseCollections.emplace(std::make_pair(collectionId, newCollection));
auto shardKeys = std::make_shared<std::vector<std::string>>(
newCollection->shardKeys());
newShardKeys.insert(make_pair(collectionId, shardKeys));
auto shardIDs = newCollection->shardIds();
auto shards = std::make_shared<std::vector<std::string>>();
for (auto const& p : *shardIDs) {
shards->push_back(p.first);
}
// Sort by the number in the shard ID ("s0000001" for example):
std::sort(shards->begin(), shards->end(),
[](std::string const& a, std::string const& b) -> bool {
return std::strtol(a.c_str() + 1, nullptr, 10) <
std::strtol(b.c_str() + 1, nullptr, 10);
});
newShards.emplace(std::make_pair(collectionId, shards));
}
newCollections.emplace(std::make_pair(databaseName, databaseCollections));
swapCollections = true;
}
}
WRITE_LOCKER(writeLocker, _planProt.lock);
_plan = planBuilder;
if (swapDatabases) {
_plannedDatabases.swap(newDatabases);
}
if (swapCollections) {
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
}
_planProt.version++; // such that others notice our change
_planProt.isValid = true; // will never be reset to false
return;
@ -617,115 +642,6 @@ void ClusterInfo::loadCurrentDatabases() {
<< " body: " << result.body();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about collections from the agency
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
static std::string const prefixPlannedCollections = "Plan/Collections";
void ClusterInfo::loadPlannedCollections() {
uint64_t storedVersion = _plannedCollectionsProt.version;
MUTEX_LOCKER(mutexLocker, _plannedCollectionsProt.mutex);
if (_plannedCollectionsProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues2(prefixPlannedCollections);
} else {
LOG(ERR) << "Error while locking " << prefixPlannedCollections;
return;
}
}
if (result.successful()) {
velocypack::Slice databases =
result.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefixStripped(), "Plan", "Collections"}));
if (!databases.isNone()) {
decltype(_plannedCollections) newCollections;
decltype(_shards) newShards;
decltype(_shardKeys) newShardKeys;
for (auto const& db : VPackObjectIterator(databases)) {
std::string const database = db.key.copyString();
for (auto const& col : VPackObjectIterator(db.value)) {
std::string const collection = col.key.copyString();
// check whether we have created an entry for the database already
AllCollections::iterator it2 = newCollections.find(database);
if (it2 == newCollections.end()) {
// not yet, so create an entry for the database
DatabaseCollections empty;
newCollections.emplace(std::make_pair(database, empty));
it2 = newCollections.find(database);
}
// TODO: The Collection info has to store VPack instead of JSON
TRI_json_t* json = arangodb::basics::VelocyPackHelper::velocyPackToJson(
col.value);
auto collectionData = std::make_shared<CollectionInfo>(json);
auto shardKeys = std::make_shared<std::vector<std::string>>(
collectionData->shardKeys());
newShardKeys.insert(make_pair(collection, shardKeys));
auto shardIDs = collectionData->shardIds();
auto shards = std::make_shared<std::vector<std::string>>();
for (auto const& p : *shardIDs) {
shards->push_back(p.first);
}
// Sort by the number in the shard ID ("s0000001" for example):
std::sort(shards->begin(), shards->end(),
[](std::string const& a, std::string const& b) -> bool {
return std::strtol(a.c_str() + 1, nullptr, 10) <
std::strtol(b.c_str() + 1, nullptr, 10);
});
newShards.emplace(std::make_pair(collection, shards));
// insert the collection into the existing map, insert it under its
// ID as well as under its name, so that a lookup can be done with
// either of the two.
(*it2).second.emplace(std::make_pair(collection, collectionData));
(*it2).second.emplace(
std::make_pair(collectionData->name(), collectionData));
}
}
// Now set the new value:
{
WRITE_LOCKER(writeLocker, _plannedCollectionsProt.lock);
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
_plannedCollectionsProt.version++; // such that others notice our change
_plannedCollectionsProt.isValid = true; // will never be reset to false
}
return;
}
}
LOG(ERR) << "Error while loading " << prefixPlannedCollections
<< " httpCode: " << result.httpCode()
<< " errorCode: " << result.errorCode()
<< " errorMessage: " << result.errorMessage()
<< " body: " << result.body();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection
/// If it is not found in the cache, the cache is reloaded once
////////////////////////////////////////////////////////////////////////////////
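// A minimal usage sketch (assuming the usual ClusterInfo::instance() accessor;
// the database and collection identifiers are hypothetical):
//
//   ClusterInfo* ci = ClusterInfo::instance();
//   std::shared_ptr<CollectionInfo> info = ci->getCollection("mydb", "12345");
//   if (info->empty()) {
//     // unknown even after one reload of the Plan cache
//   }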
@ -734,14 +650,14 @@ std::shared_ptr<CollectionInfo> ClusterInfo::getCollection(
DatabaseID const& databaseID, CollectionID const& collectionID) {
int tries = 0;
if (!_plannedCollectionsProt.isValid) {
loadPlannedCollections();
if (!_planProt.isValid) {
loadPlan();
++tries;
}
while (true) { // left by break
{
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
// look up database by id
AllCollections::const_iterator it = _plannedCollections.find(databaseID);
@ -760,7 +676,7 @@ std::shared_ptr<CollectionInfo> ClusterInfo::getCollection(
}
// must load collections outside the lock
loadPlannedCollections();
loadPlan();
}
return std::make_shared<CollectionInfo>();
@ -795,9 +711,9 @@ std::vector<std::shared_ptr<CollectionInfo>> const ClusterInfo::getCollections(
std::vector<std::shared_ptr<CollectionInfo>> result;
// always reload
loadPlannedCollections();
loadPlan();
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
// look up database by id
AllCollections::const_iterator it = _plannedCollections.find(databaseID);
@ -1154,7 +1070,6 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
// Load our own caches:
loadPlan();
loadPlannedCollections();
// Now wait for it to appear and be complete:
res.clear();
@ -1200,9 +1115,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
{
// check if a collection with the same name is already planned
loadPlannedCollections();
loadPlan();
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
if (it != _plannedCollections.end()) {
std::string const name =
@ -1297,7 +1212,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
}
// Update our cache:
loadPlannedCollections();
loadPlan();
while (TRI_microtime() <= endTime) {
agencyCallback->executeByCallbackOrTimeout(interval);
@ -1389,7 +1304,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
}
// Update our own cache:
loadPlannedCollections();
loadPlan();
while (TRI_microtime() <= endTime) {
agencyCallback->executeByCallbackOrTimeout(interval);
@ -1472,7 +1387,7 @@ int ClusterInfo::setCollectionPropertiesCoordinator(
}
if (res.successful()) {
loadPlannedCollections();
loadPlan();
return TRI_ERROR_NO_ERROR;
}
@ -1550,7 +1465,7 @@ int ClusterInfo::setCollectionStatusCoordinator(
}
if (res.successful()) {
loadPlannedCollections();
loadPlan();
return TRI_ERROR_NO_ERROR;
}
@ -1707,7 +1622,7 @@ int ClusterInfo::ensureIndexCoordinator(
}
velocypack::Slice collection = database.get(collectionID);
loadPlannedCollections();
loadPlan();
// It is possible that between the fetching of the planned collections
// and the write lock we acquire below something has changed. Therefore
// we first get the previous value and then do a compare and swap operation.
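// Sketch of that read-then-compare-and-swap step, using only agency calls that
// appear elsewhere in this file (getValues2/casValue); the key and the zero
// ttl/timeout arguments are illustrative:
//
//   AgencyCommResult old = ac.getValues2(key);        // read current value
//   VPackSlice previous = /* slice extracted from old */;
//   AgencyCommResult res = ac.casValue(key, previous, newValue, 0.0, 0.0);
//   if (!res.successful()) {
//     // Plan changed concurrently; give up or retry
//   }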
@ -1718,7 +1633,7 @@ int ClusterInfo::ensureIndexCoordinator(
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
std::shared_ptr<VPackBuilder> collectionBuilder;
auto collectionBuilder = std::make_shared<VPackBuilder>();
{
std::shared_ptr<CollectionInfo> c =
getCollection(databaseName, collectionID);
@ -1728,14 +1643,14 @@ int ClusterInfo::ensureIndexCoordinator(
// that getCollection fetches the read lock and releases it before
// we get it again.
//
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
if (c->empty()) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
std::shared_ptr<VPackBuilder> tmp =
arangodb::basics::JsonHelper::toVelocyPack(c->getIndexes());
std::shared_ptr<VPackBuilder> tmp = std::make_shared<VPackBuilder>();
tmp->add(c->getIndexes());
MUTEX_LOCKER(guard, numberOfShardsMutex);
{
numberOfShards = c->numberOfShards();
@ -1781,8 +1696,7 @@ int ClusterInfo::ensureIndexCoordinator(
}
// now create a new index
collectionBuilder =
arangodb::basics::JsonHelper::toVelocyPack(c->getJson());
collectionBuilder->add(c->getSlice());
}
VPackSlice const collectionSlice = collectionBuilder->slice();
@ -1839,7 +1753,7 @@ int ClusterInfo::ensureIndexCoordinator(
}
// reload our own cache:
loadPlannedCollections();
loadPlan();
TRI_ASSERT(numberOfShards > 0);
@ -1958,7 +1872,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
loadPlannedCollections();
loadPlan();
// It is possible that between the fetching of the planned collections
// and the write lock we acquire below something has changed. Therefore
// we first get the previous value and then do a compare and swap operation.
@ -1969,93 +1883,71 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
TRI_json_t* collectionJson = nullptr;
TRI_json_t const* indexes = nullptr;
VPackSlice indexes;
{
std::shared_ptr<CollectionInfo> c =
getCollection(databaseName, collectionID);
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
if (c->empty()) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
indexes = c->getIndexes();
if (!TRI_IsArrayJson(indexes)) {
if (!indexes.isArray()) {
// no indexes present, so we can't delete our index
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
}
collectionJson = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, c->getJson());
MUTEX_LOCKER(guard, numberOfShardsMutex);
numberOfShards = c->numberOfShards();
}
if (collectionJson == nullptr) {
return setErrormsg(TRI_ERROR_OUT_OF_MEMORY, errorMsg);
}
TRI_ASSERT(TRI_IsArrayJson(indexes));
// delete previous indexes entry
TRI_DeleteObjectJson(TRI_UNKNOWN_MEM_ZONE, collectionJson, "indexes");
TRI_json_t* copy = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (copy == nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, collectionJson);
return setErrormsg(TRI_ERROR_OUT_OF_MEMORY, errorMsg);
}
bool found = false;
VPackBuilder newIndexes;
{
VPackArrayBuilder newIndexesArrayBuilder(&newIndexes);
// mop: eh....do we need a flag to mark it invalid until cache is renewed?
// TRI_DeleteObjectJson(TRI_UNKNOWN_MEM_ZONE, collectionJson, "indexes");
for (auto const& indexSlice: VPackArrayIterator(indexes)) {
VPackSlice id = indexSlice.get("id");
VPackSlice type = indexSlice.get("type");
// copy remaining indexes back into collection
size_t const n = TRI_LengthArrayJson(indexes);
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* v = TRI_LookupArrayJson(indexes, i);
TRI_json_t const* id = TRI_LookupObjectJson(v, "id");
TRI_json_t const* type = TRI_LookupObjectJson(v, "type");
if (!TRI_IsStringJson(id) || !TRI_IsStringJson(type)) {
continue;
}
if (idString == std::string(id->_value._string.data)) {
// found our index, ignore it when copying
found = true;
std::string const typeString(type->_value._string.data);
if (typeString == "primary" || typeString == "edge") {
// must not delete these index types
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, copy);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, collectionJson);
return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg);
if (!id.isString() || !type.isString()) {
continue;
}
if (idString == id.copyString()) {
// found our index, ignore it when copying
found = true;
continue;
std::string const typeString = type.copyString();
if (typeString == "primary" || typeString == "edge") {
return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg);
}
continue;
}
newIndexes.add(indexSlice);
}
TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy,
TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, v));
}
TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, collectionJson, "indexes",
copy);
if (!found) {
// did not find the sought index
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, collectionJson);
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
}
auto tmp = arangodb::basics::JsonHelper::toVelocyPack(collectionJson);
VPackBuilder newCollectionBuilder;
{
VPackObjectBuilder newCollectionObjectBuilder(&newCollectionBuilder);
for (auto const& property: VPackObjectIterator(previous)) {
if (property.key.copyString() == "indexes") {
newCollectionBuilder.add(property.key.copyString(), newIndexes.slice());
} else {
newCollectionBuilder.add(property.key.copyString(), property.value);
}
}
}
AgencyCommResult result =
ac.casValue(key, previous, tmp->slice(), 0.0, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, collectionJson);
ac.casValue(key, previous, newCollectionBuilder.slice(), 0.0, 0.0);
if (!result.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
@ -2064,7 +1956,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
}
// load our own cache:
loadPlannedCollections();
loadPlan();
{
MUTEX_LOCKER(guard, numberOfShardsMutex);
@ -2427,15 +2319,15 @@ std::shared_ptr<std::vector<ServerID>> ClusterInfo::getResponsibleServer(
std::shared_ptr<std::vector<ShardID>> ClusterInfo::getShardList(
CollectionID const& collectionID) {
if (!_plannedCollectionsProt.isValid) {
loadPlannedCollections();
if (!_planProt.isValid) {
loadPlan();
}
int tries = 0;
while (true) {
{
// Get the sharding keys and the number of shards:
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
// _shards is a map-type <CollectionId, shared_ptr<vector<string>>>
auto it = _shards.find(collectionID);
@ -2446,7 +2338,7 @@ std::shared_ptr<std::vector<ShardID>> ClusterInfo::getShardList(
if (++tries >= 2) {
return std::make_shared<std::vector<ShardID>>();
}
loadPlannedCollections();
loadPlan();
}
}
@ -2476,8 +2368,8 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
// Note that currently we take the number of shards and the shardKeys
// from Plan, since they are immutable. Later we will have to switch
// this to Current, when we allow to add and remove shards.
if (!_plannedCollectionsProt.isValid) {
loadPlannedCollections();
if (!_planProt.isValid) {
loadPlan();
}
int tries = 0;
@ -2488,7 +2380,7 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
while (true) {
{
// Get the sharding keys and the number of shards:
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
// _shards is a map-type <CollectionId, shared_ptr<vector<string>>>
auto it = _shards.find(collectionID);
@ -2509,7 +2401,7 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
if (++tries >= 2) {
break;
}
loadPlannedCollections();
loadPlan();
}
if (!found) {
@ -2554,8 +2446,8 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
// Note that currently we take the number of shards and the shardKeys
// from Plan, since they are immutable. Later we will have to switch
// this to Current, when we allow to add and remove shards.
if (!_plannedCollectionsProt.isValid) {
loadPlannedCollections();
if (!_planProt.isValid) {
loadPlan();
}
int tries = 0;
@ -2567,7 +2459,7 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
while (true) {
{
// Get the sharding keys and the number of shards:
READ_LOCKER(readLocker, _plannedCollectionsProt.lock);
READ_LOCKER(readLocker, _planProt.lock);
// _shards is a map-type <CollectionId, shared_ptr<vector<string>>>
auto it = _shards.find(collectionID);
@ -2593,7 +2485,7 @@ int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
if (++tries >= 2) {
break;
}
loadPlannedCollections();
loadPlan();
}
if (!found) {
@ -2661,8 +2553,8 @@ void ClusterInfo::invalidatePlan() {
_planProt.isValid = false;
}
{
WRITE_LOCKER(writeLocker, _plannedCollectionsProt.lock);
_plannedCollectionsProt.isValid = false;
WRITE_LOCKER(writeLocker, _planProt.lock);
_planProt.isValid = false;
}
}

View File

@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Basics/JsonHelper.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
#include "Cluster/AgencyComm.h"
@ -35,6 +36,8 @@
#include "VocBase/vocbase.h"
#include <velocypack/Slice.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
struct TRI_json_t;
@ -55,7 +58,7 @@ class CollectionInfo {
public:
CollectionInfo();
explicit CollectionInfo(struct TRI_json_t*);
CollectionInfo(std::shared_ptr<VPackBuilder>, VPackSlice);
CollectionInfo(CollectionInfo const&);
@ -72,14 +75,8 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
int replicationFactor () const {
TRI_json_t* const node
= arangodb::basics::JsonHelper::getObjectElement(_json,
"replicationFactor");
if (TRI_IsNumberJson(node)) {
return (int) (node->_value._number);
}
return 1;
return arangodb::basics::VelocyPackHelper::getNumericValue<int>(
_slice, "replicationFactor", 1);
}
//////////////////////////////////////////////////////////////////////////////
@ -87,7 +84,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool empty() const {
return (nullptr == _json); //|| (id() == 0);
return _slice.isNone();
}
//////////////////////////////////////////////////////////////////////////////
@ -95,7 +92,14 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t id() const {
return arangodb::basics::JsonHelper::stringUInt64(_json, "id");
if (!_slice.isObject()) {
return 0;
}
VPackSlice idSlice = _slice.get("id");
if (idSlice.isString()) {
return arangodb::basics::VelocyPackHelper::stringUInt64(idSlice);
}
return 0;
}
//////////////////////////////////////////////////////////////////////////////
@ -103,7 +107,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
std::string id_as_string() const {
return arangodb::basics::JsonHelper::getStringValue(_json, "id", "");
return arangodb::basics::VelocyPackHelper::getStringValue(_slice, "id", "");
}
//////////////////////////////////////////////////////////////////////////////
@ -111,7 +115,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
std::string name() const {
return arangodb::basics::JsonHelper::getStringValue(_json, "name", "");
return arangodb::basics::VelocyPackHelper::getStringValue(_slice, "name", "");
}
//////////////////////////////////////////////////////////////////////////////
@ -119,8 +123,8 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
TRI_col_type_e type() const {
return (TRI_col_type_e)arangodb::basics::JsonHelper::getNumericValue<int>(
_json, "type", (int)TRI_COL_TYPE_UNKNOWN);
return (TRI_col_type_e)arangodb::basics::VelocyPackHelper::getNumericValue<int>(
_slice, "type", (int)TRI_COL_TYPE_UNKNOWN);
}
//////////////////////////////////////////////////////////////////////////////
@ -129,8 +133,8 @@ class CollectionInfo {
TRI_vocbase_col_status_e status() const {
return (TRI_vocbase_col_status_e)
arangodb::basics::JsonHelper::getNumericValue<int>(
_json, "status", (int)TRI_VOC_COL_STATUS_CORRUPTED);
arangodb::basics::VelocyPackHelper::getNumericValue<int>(
_slice, "status", (int)TRI_VOC_COL_STATUS_CORRUPTED);
}
//////////////////////////////////////////////////////////////////////////////
@ -146,7 +150,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool deleted() const {
return arangodb::basics::JsonHelper::getBooleanValue(_json, "deleted",
return arangodb::basics::VelocyPackHelper::getBooleanValue(_slice, "deleted",
false);
}
@ -155,7 +159,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool doCompact() const {
return arangodb::basics::JsonHelper::getBooleanValue(_json, "doCompact",
return arangodb::basics::VelocyPackHelper::getBooleanValue(_slice, "doCompact",
false);
}
@ -164,7 +168,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool isSystem() const {
return arangodb::basics::JsonHelper::getBooleanValue(_json, "isSystem",
return arangodb::basics::VelocyPackHelper::getBooleanValue(_slice, "isSystem",
false);
}
@ -173,7 +177,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool isVolatile() const {
return arangodb::basics::JsonHelper::getBooleanValue(_json, "isVolatile",
return arangodb::basics::VelocyPackHelper::getBooleanValue(_slice, "isVolatile",
false);
}
@ -181,24 +185,22 @@ class CollectionInfo {
/// @brief returns the indexes
//////////////////////////////////////////////////////////////////////////////
TRI_json_t const* getIndexes() const {
return arangodb::basics::JsonHelper::getObjectElement(_json, "indexes");
VPackSlice const getIndexes() const {
if (_slice.isNone()) {
return VPackSlice();
}
return _slice.get("indexes");
}
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the key options
//////////////////////////////////////////////////////////////////////////////
TRI_json_t* keyOptions() const {
TRI_json_t const* keyOptions =
arangodb::basics::JsonHelper::getObjectElement(_json, "keyOptions");
if (keyOptions != nullptr) {
return TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, keyOptions);
VPackSlice const keyOptions() const {
if (_slice.isNone()) {
return VPackSlice();
}
return nullptr;
return _slice.get("keyOptions");
}
//////////////////////////////////////////////////////////////////////////////
@ -206,12 +208,10 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool allowUserKeys() const {
TRI_json_t const* keyOptions =
arangodb::basics::JsonHelper::getObjectElement(_json, "keyOptions");
if (keyOptions != nullptr) {
return arangodb::basics::JsonHelper::getBooleanValue(
keyOptions, "allowUserKeys", true);
VPackSlice keyOptionsSlice = keyOptions();
if (!keyOptionsSlice.isNone()) {
return arangodb::basics::VelocyPackHelper::getBooleanValue(
keyOptionsSlice, "allowUserKeys", true);
}
return true; // the default value
@ -222,7 +222,7 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool waitForSync() const {
return arangodb::basics::JsonHelper::getBooleanValue(_json, "waitForSync",
return arangodb::basics::VelocyPackHelper::getBooleanValue(_slice, "waitForSync",
false);
}
@ -231,8 +231,8 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
TRI_voc_size_t journalSize() const {
return arangodb::basics::JsonHelper::getNumericValue<TRI_voc_size_t>(
_json, "journalSize", 0);
return arangodb::basics::VelocyPackHelper::getNumericValue<TRI_voc_size_t>(
_slice, "journalSize", 0);
}
//////////////////////////////////////////////////////////////////////////////
@ -240,8 +240,8 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
uint32_t indexBuckets() const {
return arangodb::basics::JsonHelper::getNumericValue<uint32_t>(
_json, "indexBuckets", 1);
return arangodb::basics::VelocyPackHelper::getNumericValue<uint32_t>(
_slice, "indexBuckets", 1);
}
//////////////////////////////////////////////////////////////////////////////
@ -249,9 +249,19 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
std::vector<std::string> shardKeys() const {
TRI_json_t* const node =
arangodb::basics::JsonHelper::getObjectElement(_json, "shardKeys");
return arangodb::basics::JsonHelper::stringArray(node);
std::vector<std::string> shardKeys;
if (_slice.isNone()) {
return shardKeys;
}
auto shardKeysSlice = _slice.get("shardKeys");
if (shardKeysSlice.isArray()) {
for (auto const& shardKey: VPackArrayIterator(shardKeysSlice)) {
shardKeys.push_back(shardKey.copyString());
}
}
return shardKeys;
}
//////////////////////////////////////////////////////////////////////////////
@ -259,15 +269,18 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
bool usesDefaultShardKeys() const {
TRI_json_t* const node =
arangodb::basics::JsonHelper::getObjectElement(_json, "shardKeys");
if (TRI_LengthArrayJson(node) != 1) {
if (_slice.isNone()) {
return false;
}
TRI_json_t* firstKey = TRI_LookupArrayJson(node, 0);
TRI_ASSERT(TRI_IsStringJson(firstKey));
auto shardKeysSlice = _slice.get("shardKeys");
if (!shardKeysSlice.isArray() || shardKeysSlice.length() != 1) {
return false;
}
auto firstElement = shardKeysSlice.get(0);
TRI_ASSERT(firstElement.isString());
std::string shardKey =
arangodb::basics::JsonHelper::getStringValue(firstKey, "");
arangodb::basics::VelocyPackHelper::getStringValue(firstElement, "");
return shardKey == TRI_VOC_ATTRIBUTE_KEY;
}
@ -287,22 +300,18 @@ class CollectionInfo {
return res;
}
res.reset(new ShardMap());
TRI_json_t* const node =
arangodb::basics::JsonHelper::getObjectElement(_json, "shards");
if (node != nullptr && TRI_IsObjectJson(node)) {
size_t len = TRI_LengthVector(&node->_value._objects);
for (size_t i = 0; i < len; i += 2) {
auto key =
static_cast<TRI_json_t*>(TRI_AtVector(&node->_value._objects, i));
auto value = static_cast<TRI_json_t*>(
TRI_AtVector(&node->_value._objects, i + 1));
if (TRI_IsStringJson(key) && TRI_IsArrayJson(value)) {
ShardID shard = arangodb::basics::JsonHelper::getStringValue(key, "");
std::vector<ServerID> servers =
arangodb::basics::JsonHelper::stringArray(value);
if (shard != "") {
(*res).insert(make_pair(shard, servers));
auto shardsSlice = _slice.get("shards");
if (shardsSlice.isObject()) {
for (auto const& shardSlice: VPackObjectIterator(shardsSlice)) {
if (shardSlice.key.isString() && shardSlice.value.isArray()) {
ShardID shard = shardSlice.key.copyString();
std::vector<ServerID> servers;
for (auto const& serverSlice: VPackArrayIterator(shardSlice.value)) {
servers.push_back(serverSlice.copyString());
}
(*res).insert(make_pair(shard, servers));
}
}
}
@ -318,11 +327,13 @@ class CollectionInfo {
//////////////////////////////////////////////////////////////////////////////
int numberOfShards() const {
TRI_json_t* const node =
arangodb::basics::JsonHelper::getObjectElement(_json, "shards");
if (_slice.isNone()) {
return 0;
}
auto shardsSlice = _slice.get("shards");
if (TRI_IsObjectJson(node)) {
return (int)(TRI_LengthVector(&node->_value._objects) / 2);
if (shardsSlice.isObject()) {
return static_cast<int>(shardsSlice.length());
}
return 0;
}
@ -331,10 +342,16 @@ class CollectionInfo {
/// @brief returns the json
//////////////////////////////////////////////////////////////////////////////
TRI_json_t const* getJson() const { return _json; }
std::shared_ptr<VPackBuilder> const getVPack() const { return _vpack; }
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the slice
//////////////////////////////////////////////////////////////////////////////
VPackSlice const getSlice() const { return _slice; }
private:
TRI_json_t* _json;
std::shared_ptr<VPackBuilder> _vpack;
VPackSlice _slice;
// Only to protect the cache:
mutable Mutex _mutex;
@ -568,13 +585,6 @@ class ClusterInfo {
std::vector<DatabaseID> listDatabases(bool = false);
//////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned collections from the agency
/// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////
void loadPlannedCollections();
//////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about our plan
/// Usually one does not have to call this directly.
@ -911,7 +921,6 @@ class ClusterInfo {
// The Plan state:
AllCollections _plannedCollections; // from Plan/Collections/
ProtectionData _plannedCollectionsProt;
std::unordered_map<CollectionID,
std::shared_ptr<std::vector<std::string>>>
_shards; // from Plan/Collections/

View File

@ -790,7 +790,7 @@ static void JS_GetCollectionInfoClusterInfo(
}
result->Set(TRI_V8_ASCII_STRING("shards"), shardIds);
v8::Handle<v8::Value> indexes = TRI_ObjectJson(isolate, ci->getIndexes());
v8::Handle<v8::Value> indexes = TRI_VPackToV8(isolate, ci->getIndexes());
result->Set(TRI_V8_ASCII_STRING("indexes"), indexes);
TRI_V8_RETURN(result);

View File

@ -3065,9 +3065,7 @@ std::shared_ptr<Index> Transaction::indexForCollectionCoordinator(
name.c_str(), _vocbase->_name);
}
TRI_json_t const* json = (*collectionInfo).getIndexes();
auto indexBuilder = arangodb::basics::JsonHelper::toVelocyPack(json);
VPackSlice const slice = indexBuilder->slice();
VPackSlice const slice = (*collectionInfo).getIndexes();
if (slice.isArray()) {
for (auto const& v : VPackArrayIterator(slice)) {
@ -3129,9 +3127,7 @@ std::vector<std::shared_ptr<Index>> Transaction::indexesForCollectionCoordinator
name.c_str(), _vocbase->_name);
}
TRI_json_t const* json = collectionInfo->getIndexes();
auto indexBuilder = arangodb::basics::JsonHelper::toVelocyPack(json);
VPackSlice const slice = indexBuilder->slice();
VPackSlice const slice = collectionInfo->getIndexes();
if (slice.isArray()) {
size_t const n = static_cast<size_t>(slice.length());

View File

@ -1105,7 +1105,7 @@ static void CreateCollectionCoordinator(
if (myerrno != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(myerrno, errorMsg);
}
ci->loadPlannedCollections();
ci->loadPlan();
std::shared_ptr<CollectionInfo> c = ci->getCollection(databaseName, cid);
TRI_vocbase_col_t* newcoll = CoordinatorCollection(vocbase, *c);
@ -1276,9 +1276,7 @@ static void GetIndexesCoordinator(
v8::Handle<v8::Array> ret = v8::Array::New(isolate);
std::shared_ptr<VPackBuilder> tmp =
arangodb::basics::JsonHelper::toVelocyPack(c->getIndexes());
VPackSlice slice = tmp->slice();
VPackSlice slice = c->getIndexes();
if (slice.isArray()) {
uint32_t j = 0;

View File

@ -894,12 +894,13 @@ VocbaseCollectionInfo::VocbaseCollectionInfo(CollectionInfo const& other)
std::string const name = other.name();
memset(_name, 0, sizeof(_name));
memcpy(_name, name.c_str(), name.size());
VPackSlice keyOptionsSlice(other.keyOptions());
std::unique_ptr<TRI_json_t> otherOpts(other.keyOptions());
if (otherOpts != nullptr) {
std::shared_ptr<arangodb::velocypack::Builder> builder =
arangodb::basics::JsonHelper::toVelocyPack(otherOpts.get());
_keyOptions = builder->steal();
if (!keyOptionsSlice.isNone()) {
VPackBuilder builder;
builder.add(keyOptionsSlice);
_keyOptions = builder.steal();
}
}

View File

@ -79,8 +79,13 @@ ArangoStatement.prototype.execute = function () {
opts.cache = this._cache;
}
}
try {
var result = AQL_EXECUTE(this._query, this._bindVars, opts);
} catch (e) {
console.log("HASSHASSHASSHASS", this._query, e);
throw e;
}
return new GeneralArrayCursor(result.json, 0, null, result);
};

View File

@ -4,7 +4,7 @@ set -e
mkdir -p build-debian
cd build-debian
cmake -DASM_OPTIMIZATIONS=Off -DETCDIR=/etc -DCMAKE_INSTALL_PREFIX=/usr -DVARDIR=/var ..
cmake -DCMAKE_BUILD_TYPE=Release -DUSE_OPTIMIZE_FOR_ARCHITECTURE=Off -DETCDIR=/etc -DCMAKE_INSTALL_PREFIX=/usr -DVARDIR=/var ..
make -j12
cpack -G DEB --verbose
cd ..

View File

@ -1,9 +1,6 @@
#!/bin/bash
if [ -z "$XTERM" ] ; then
XTERM=x-terminal-emulator
fi
if [ -z "$XTERMOPTIONS" ] ; then
XTERMOPTIONS="--geometry=80x43"
XTERM=xterm
fi