1
0
Fork 0

Readded temporary logical collections in cluster. Otherwise it is entirely unclear when which collection has to be deleted.

This commit is contained in:
Michael Hackstein 2016-08-18 11:38:16 +02:00
parent 41d70c1df5
commit bb91925903
7 changed files with 80 additions and 18 deletions

View File

@ -659,9 +659,9 @@ std::shared_ptr<LogicalCollection> ClusterInfo::getCollection(
/// @brief ask about all collections
////////////////////////////////////////////////////////////////////////////////
std::vector<LogicalCollection*> const ClusterInfo::getCollections(
std::vector<std::shared_ptr<LogicalCollection>> const ClusterInfo::getCollections(
DatabaseID const& databaseID) {
std::vector<LogicalCollection*> result;
std::vector<std::shared_ptr<LogicalCollection>> result;
// always reload
loadPlan();
@ -681,7 +681,7 @@ std::vector<LogicalCollection*> const ClusterInfo::getCollections(
if (c < '0' || c > '9') {
// skip collections indexed by id
result.push_back((*it2).second.get());
result.push_back((*it2).second);
}
++it2;

View File

@ -305,7 +305,8 @@ class ClusterInfo {
/// @brief ask about all collections
//////////////////////////////////////////////////////////////////////////////
std::vector<LogicalCollection*> const getCollections(DatabaseID const&);
std::vector<std::shared_ptr<LogicalCollection>> const getCollections(
DatabaseID const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about current collections from the agency

View File

@ -175,7 +175,11 @@ static int ParseDocumentOrDocumentHandle(v8::Isolate* isolate,
ClusterInfo* ci = ClusterInfo::instance();
std::shared_ptr<LogicalCollection> col =
ci->getCollection(vocbase->name(), collectionName);
collection = col.get();
if (col == nullptr) {
// collection not found
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
collection = new LogicalCollection(col);
} else {
collection = resolver->getCollectionStruct(collectionName);
}
@ -184,7 +188,6 @@ static int ParseDocumentOrDocumentHandle(v8::Isolate* isolate,
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
}
TRI_ASSERT(collection != nullptr);
return TRI_ERROR_NO_ERROR;
@ -256,6 +259,26 @@ static arangodb::LogicalCollection const* UseCollection(
return nullptr;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get all cluster collections
////////////////////////////////////////////////////////////////////////////////
static std::vector<LogicalCollection*> GetCollectionsCluster(
TRI_vocbase_t* vocbase) {
std::vector<LogicalCollection*> result;
std::vector<std::shared_ptr<LogicalCollection>> const collections =
ClusterInfo::instance()->getCollections(vocbase->name());
for (auto& collection : collections) {
auto c = std::make_unique<LogicalCollection>(collection);
result.emplace_back(c.get());
c.release();
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get all cluster collection names
////////////////////////////////////////////////////////////////////////////////
@ -264,7 +287,7 @@ static std::vector<std::string> GetCollectionNamesCluster(
TRI_vocbase_t* vocbase) {
std::vector<std::string> result;
std::vector<LogicalCollection*> const collections =
std::vector<std::shared_ptr<LogicalCollection>> const collections =
ClusterInfo::instance()->getCollections(vocbase->name());
for (auto& collection : collections) {
@ -2679,8 +2702,7 @@ static void JS_CollectionVocbase(
TRI_V8_RETURN_NULL();
}
// TODO collection has to be ci.
// collection = CoordinatorCollection(vocbase, *ci);
collection = new LogicalCollection(ci);
} else {
collection = GetCollectionFromArgument(vocbase, val);
}
@ -2719,7 +2741,7 @@ static void JS_CollectionsVocbase(
// if we are a coordinator, we need to fetch the collection info from the
// agency
if (ServerState::instance()->isCoordinator()) {
colls = ClusterInfo::instance()->getCollections(vocbase->name());
colls = GetCollectionsCluster(vocbase);
} else {
colls = vocbase->collections();
}

View File

@ -1887,8 +1887,11 @@ static void MapGetVocBase(v8::Local<v8::String> const name,
ClusterInfo::instance()->getCollection(vocbase->name(),
std::string(key));
// TODO do we leak?
collection = ci.get();
if (ci == nullptr) {
TRI_V8_RETURN(v8::Handle<v8::Value>());
}
collection = new LogicalCollection(ci);
} else {
collection = vocbase->lookupCollection(std::string(key));
}

View File

@ -1087,7 +1087,10 @@ static void CreateCollectionCoordinator(
ci->loadPlan();
std::shared_ptr<LogicalCollection> c = ci->getCollection(databaseName, cid);
TRI_V8_RETURN(WrapCollection(isolate, c.get()));
// If we get a nullptr here the create collection should have failed before.
TRI_ASSERT(c != nullptr);
auto newCol = std::make_unique<LogicalCollection>(c);
TRI_V8_RETURN(WrapCollection(isolate, newCol.release()));
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -70,7 +70,7 @@ static std::shared_ptr<arangodb::velocypack::Buffer<uint8_t> const> CopySliceVal
return nullptr;
}
info = info.get(name);
if (!info.isObject()) {
if (info.isNone()) {
return nullptr;
}
return VPackBuilder::clone(info).steal();
@ -119,6 +119,37 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase,
_collection(nullptr),
_lock() {}
/// @brief This the "copy" constructor used in the cluster
/// it is required to create objects that survive plan
/// modifications and can be freed
/// Can only be given to V8, cannot be used for functionality.
LogicalCollection::LogicalCollection(
std::shared_ptr<LogicalCollection> const other)
: _internalVersion(0),
_cid(other->_cid),
_planId(other->_planId),
_type(other->_type),
_name(other->_name),
_status(other->_status),
_isLocal(false),
_isDeleted(other->_isDeleted),
_doCompact(other->_doCompact),
_isSystem(other->_isSystem),
_isVolatile(other->_isVolatile),
_waitForSync(other->_waitForSync),
_keyOptions(nullptr), // Not needed
_indexBuckets(other->_indexBuckets),
_indexes(nullptr), // Not needed
_replicationFactor(other->_replicationFactor),
_numberOfShards(other->_numberOfShards),
_allowUserKeys(other->_allowUserKeys),
_shardIds(new ShardMap()), // Not needed
_vocbase(other->_vocbase),
_physical(nullptr),
_collection(nullptr),
_lock() {
}
// @brief Constructor used in coordinator case.
// The Slice contains the part of the plan that
// is relevant for this collection.
@ -206,13 +237,13 @@ TRI_col_type_e LogicalCollection::type() const {
return _type;
}
std::string const& LogicalCollection::name() const {
std::string LogicalCollection::name() const {
// TODO Activate this lock. Right now we have some locks outside.
// READ_LOCKER(readLocker, _lock);
return _name;
}
std::string const& LogicalCollection::dbName() const {
std::string LogicalCollection::dbName() const {
TRI_ASSERT(_vocbase != nullptr);
return _vocbase->name();
}

View File

@ -52,6 +52,8 @@ class LogicalCollection {
LogicalCollection(TRI_vocbase_t*, arangodb::velocypack::Slice);
LogicalCollection(std::shared_ptr<LogicalCollection> const);
~LogicalCollection();
LogicalCollection(LogicalCollection const&) = delete;
@ -68,8 +70,8 @@ class LogicalCollection {
TRI_col_type_e type() const;
std::string const& name() const;
std::string const& dbName() const;
std::string name() const;
std::string dbName() const;
std::string const& path() const;
TRI_vocbase_col_status_e status();