diff --git a/arangod/Aql/Collection.cpp b/arangod/Aql/Collection.cpp index 64789d9714..6876037cd1 100644 --- a/arangod/Aql/Collection.cpp +++ b/arangod/Aql/Collection.cpp @@ -75,7 +75,7 @@ Collection::~Collection () { // ----------------------------------------------------------------------------- //////////////////////////////////////////////////////////////////////////////// -/// @brief count the LOCAL number of documents in the collection +/// @brief count the number of documents in the collection //////////////////////////////////////////////////////////////////////////////// size_t Collection::count () const { @@ -100,6 +100,21 @@ size_t Collection::count () const { return static_cast(numDocuments); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection's plan id +//////////////////////////////////////////////////////////////////////////////// + +TRI_voc_cid_t Collection::getPlanId () const { + auto clusterInfo = triagens::arango::ClusterInfo::instance(); + auto collectionInfo = clusterInfo->getCollection(std::string(vocbase->_name), name); + + if (collectionInfo.get() == nullptr) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "collection not found"); + } + + return collectionInfo.get()->id(); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief returns the shard ids of a collection //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/Collection.h b/arangod/Aql/Collection.h index de34f007a2..4d0a413d8c 100644 --- a/arangod/Aql/Collection.h +++ b/arangod/Aql/Collection.h @@ -134,11 +134,16 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// /// @brief count the LOCAL number of documents in the collection -/// TODO: must be adjusted for clusters //////////////////////////////////////////////////////////////////////////////// size_t count () const; +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection's plan id +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_cid_t getPlanId() const; + //////////////////////////////////////////////////////////////////////////////// /// @brief returns the shard ids of a collection //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 9ed6da14f7..1e7cf5fde3 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -3805,9 +3805,16 @@ bool BlockWithClients::skipForShard (size_t number, //////////////////////////////////////////////////////////////////////////////// size_t BlockWithClients::getClientId (std::string const& shardId) { + if (shardId.empty()) { + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "got empty shard id"); + } + auto it = _shardIdMap.find(shardId); if (it == _shardIdMap.end()) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "AQL: unknown shard id"); + std::string message("AQL: unknown shard id "); + message.append(shardId); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message); } return ((*it).second); } @@ -3818,7 +3825,6 @@ size_t BlockWithClients::getClientId (std::string const& shardId) { //////////////////////////////////////////////////////////////////////////////// bool BlockWithClients::preInitCursor () { - if (!_initOrShutdown) { return false; } @@ -4007,8 +4013,7 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine, //////////////////////////////////////////////////////////////////////////////// int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { - - if (!preInitCursor()) { + if (! preInitCursor()) { return TRI_ERROR_NO_ERROR; } @@ -4220,14 +4225,18 @@ bool DistributeBlock::getBlockForClient (size_t atLeast, //////////////////////////////////////////////////////////////////////////////// size_t DistributeBlock::sendToClient (AqlValue val) { - TRI_json_t const* json; if (val._type == AqlValue::JSON) { json = val._json->json(); } + /* + // TODO: if the DistributeBlock is supposed to run on coordinators only, it + // cannot have any AqlValues of type SHAPED. These are only present when there + // are physical collections else if (val._type == AqlValue::SHAPED) { json = val.toJson(_trx, _collection->documentCollection()).json(); } + */ else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, "DistributeBlock: can only send JSON or SHAPED"); @@ -4235,13 +4244,21 @@ size_t DistributeBlock::sendToClient (AqlValue val) { std::string shardId; bool usesDefaultShardingAttributes; - auto clusterInfo = triagens::arango::ClusterInfo::instance(); - clusterInfo->getResponsibleShard( _collection->getName(), - json, - true, - shardId, - usesDefaultShardingAttributes); + auto const planId = triagens::basics::StringUtils::itoa(_collection->getPlanId()); + + int res = clusterInfo->getResponsibleShard(planId, + json, + true, + shardId, + usesDefaultShardingAttributes); + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + + TRI_ASSERT(!shardId.empty()); + return getClientId(shardId); } diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 2638d5e2af..7ad712d17e 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -615,15 +615,26 @@ struct CoordinatorInstanciator : public WalkerWorker { } } - if (nodeType == ExecutionNode::GATHER) { - // we found a gather node + if (nodeType == ExecutionNode::GATHER || + nodeType == ExecutionNode::DISTRIBUTE) { + // we found a gather or distribute node TRI_ASSERT(remoteNode != nullptr); - // now we'll create a remote node for each shard and add it to the gather node - auto&& shardIds = static_cast((*en))->collection()->shardIds(); + // now we'll create a remote node for each shard and add it to the gather|distribute node + Collection const* collection = nullptr; + if (nodeType == ExecutionNode::GATHER) { + collection = static_cast((*en))->collection(); + } + else if (nodeType == ExecutionNode::DISTRIBUTE) { + collection = static_cast((*en))->collection(); + } + else { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + + auto&& shardIds = collection->shardIds(); for (auto const& shardId : shardIds) { - // TODO: pass actual queryId into RemoteBlock auto it = queryIds.find(shardId); if (it == queryIds.end()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list"); diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index 1d81793361..35386d7ab7 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -135,7 +135,7 @@ namespace triagens { if (_root != nullptr) { return _root->shutdown(); } - else return 0; + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index f34d74e0c6..5feb98683d 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -628,13 +628,13 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { if (it2 == _collections.end()) { // not yet, so create an entry for the database DatabaseCollections empty; - _collections.insert(std::make_pair(database, empty)); + _collections.emplace(std::make_pair(database, empty)); it2 = _collections.find(database); } TRI_json_t* json = (*it).second._json; // steal the json - (*it).second._json = 0; + (*it).second._json = nullptr; shared_ptr collectionData (new CollectionInfo(json)); vector* shardKeys = new vector; @@ -647,16 +647,16 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) { shards->push_back(it3->first); } - _shards.insert( - make_pair(collection,shared_ptr >(shards))); + _shards.emplace( + std::make_pair(collection, shared_ptr >(shards))); // insert the collection into the existing map, insert it under its // ID as well as under its name, so that a lookup can be done with // either of the two. - (*it2).second.insert(std::make_pair(collection, collectionData)); - (*it2).second.insert(std::make_pair(collectionData->name(), - collectionData)); + (*it2).second.emplace(std::make_pair(collection, collectionData)); + (*it2).second.emplace(std::make_pair(collectionData->name(), + collectionData)); } _collectionsValid = true; @@ -2074,11 +2074,12 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID, { // Get the sharding keys and the number of shards: READ_LOCKER(_lock); - map > >::iterator it + map>>::iterator it = _shards.find(collectionID); + if (it != _shards.end()) { shards = it->second; - map > >::iterator it2 + map>>::iterator it2 = _shardKeys.find(collectionID); if (it2 != _shardKeys.end()) { shardKeysPtr = it2->second; @@ -2098,7 +2099,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID, } loadPlannedCollections(); } - if (!found) { + if (! found) { return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; }