1
0
Fork 0

fixed non-working DistributeBlock

This commit is contained in:
Jan Steemann 2014-10-22 12:07:31 +02:00
parent d9cee46f9b
commit e2ef21c33a
6 changed files with 78 additions and 29 deletions

View File

@ -75,7 +75,7 @@ Collection::~Collection () {
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief count the LOCAL number of documents in the collection
/// @brief count the number of documents in the collection
////////////////////////////////////////////////////////////////////////////////
size_t Collection::count () const {
@ -100,6 +100,21 @@ size_t Collection::count () const {
return static_cast<size_t>(numDocuments);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection's plan id
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t Collection::getPlanId () const {
auto clusterInfo = triagens::arango::ClusterInfo::instance();
auto collectionInfo = clusterInfo->getCollection(std::string(vocbase->_name), name);
if (collectionInfo.get() == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "collection not found");
}
return collectionInfo.get()->id();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the shard ids of a collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -134,11 +134,16 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
/// @brief count the LOCAL number of documents in the collection
/// TODO: must be adjusted for clusters
////////////////////////////////////////////////////////////////////////////////
size_t count () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection's plan id
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t getPlanId() const;
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the shard ids of a collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -3805,9 +3805,16 @@ bool BlockWithClients::skipForShard (size_t number,
////////////////////////////////////////////////////////////////////////////////
size_t BlockWithClients::getClientId (std::string const& shardId) {
if (shardId.empty()) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "got empty shard id");
}
auto it = _shardIdMap.find(shardId);
if (it == _shardIdMap.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "AQL: unknown shard id");
std::string message("AQL: unknown shard id ");
message.append(shardId);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
}
return ((*it).second);
}
@ -3818,7 +3825,6 @@ size_t BlockWithClients::getClientId (std::string const& shardId) {
////////////////////////////////////////////////////////////////////////////////
bool BlockWithClients::preInitCursor () {
if (!_initOrShutdown) {
return false;
}
@ -4007,8 +4013,7 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine,
////////////////////////////////////////////////////////////////////////////////
int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
if (!preInitCursor()) {
if (! preInitCursor()) {
return TRI_ERROR_NO_ERROR;
}
@ -4220,14 +4225,18 @@ bool DistributeBlock::getBlockForClient (size_t atLeast,
////////////////////////////////////////////////////////////////////////////////
size_t DistributeBlock::sendToClient (AqlValue val) {
TRI_json_t const* json;
if (val._type == AqlValue::JSON) {
json = val._json->json();
}
/*
// TODO: if the DistributeBlock is supposed to run on coordinators only, it
// cannot have any AqlValues of type SHAPED. These are only present when there
// are physical collections
else if (val._type == AqlValue::SHAPED) {
json = val.toJson(_trx, _collection->documentCollection()).json();
}
*/
else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_FAILED,
"DistributeBlock: can only send JSON or SHAPED");
@ -4235,13 +4244,21 @@ size_t DistributeBlock::sendToClient (AqlValue val) {
std::string shardId;
bool usesDefaultShardingAttributes;
auto clusterInfo = triagens::arango::ClusterInfo::instance();
clusterInfo->getResponsibleShard( _collection->getName(),
json,
true,
shardId,
usesDefaultShardingAttributes);
auto const planId = triagens::basics::StringUtils::itoa(_collection->getPlanId());
int res = clusterInfo->getResponsibleShard(planId,
json,
true,
shardId,
usesDefaultShardingAttributes);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
TRI_ASSERT(!shardId.empty());
return getClientId(shardId);
}

View File

@ -615,15 +615,26 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
}
if (nodeType == ExecutionNode::GATHER) {
// we found a gather node
if (nodeType == ExecutionNode::GATHER ||
nodeType == ExecutionNode::DISTRIBUTE) {
// we found a gather or distribute node
TRI_ASSERT(remoteNode != nullptr);
// now we'll create a remote node for each shard and add it to the gather node
auto&& shardIds = static_cast<GatherNode const*>((*en))->collection()->shardIds();
// now we'll create a remote node for each shard and add it to the gather|distribute node
Collection const* collection = nullptr;
if (nodeType == ExecutionNode::GATHER) {
collection = static_cast<GatherNode const*>((*en))->collection();
}
else if (nodeType == ExecutionNode::DISTRIBUTE) {
collection = static_cast<DistributeNode const*>((*en))->collection();
}
else {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
auto&& shardIds = collection->shardIds();
for (auto const& shardId : shardIds) {
// TODO: pass actual queryId into RemoteBlock
auto it = queryIds.find(shardId);
if (it == queryIds.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list");

View File

@ -135,7 +135,7 @@ namespace triagens {
if (_root != nullptr) {
return _root->shutdown();
}
else return 0;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -628,13 +628,13 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
if (it2 == _collections.end()) {
// not yet, so create an entry for the database
DatabaseCollections empty;
_collections.insert(std::make_pair(database, empty));
_collections.emplace(std::make_pair(database, empty));
it2 = _collections.find(database);
}
TRI_json_t* json = (*it).second._json;
// steal the json
(*it).second._json = 0;
(*it).second._json = nullptr;
shared_ptr<CollectionInfo> collectionData (new CollectionInfo(json));
vector<string>* shardKeys = new vector<string>;
@ -647,16 +647,16 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) {
shards->push_back(it3->first);
}
_shards.insert(
make_pair(collection,shared_ptr<vector<string> >(shards)));
_shards.emplace(
std::make_pair(collection, shared_ptr<vector<string> >(shards)));
// insert the collection into the existing map, insert it under its
// ID as well as under its name, so that a lookup can be done with
// either of the two.
(*it2).second.insert(std::make_pair(collection, collectionData));
(*it2).second.insert(std::make_pair(collectionData->name(),
collectionData));
(*it2).second.emplace(std::make_pair(collection, collectionData));
(*it2).second.emplace(std::make_pair(collectionData->name(),
collectionData));
}
_collectionsValid = true;
@ -2074,11 +2074,12 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
{
// Get the sharding keys and the number of shards:
READ_LOCKER(_lock);
map<CollectionID, shared_ptr<vector<string> > >::iterator it
map<CollectionID, shared_ptr<vector<string>>>::iterator it
= _shards.find(collectionID);
if (it != _shards.end()) {
shards = it->second;
map<CollectionID, shared_ptr<vector<string> > >::iterator it2
map<CollectionID, shared_ptr<vector<string>>>::iterator it2
= _shardKeys.find(collectionID);
if (it2 != _shardKeys.end()) {
shardKeysPtr = it2->second;
@ -2098,7 +2099,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
}
loadPlannedCollections();
}
if (!found) {
if (! found) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}