1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-04-26 10:23:01 +02:00
commit 5cf3787c8d
4 changed files with 25 additions and 24 deletions

View File

@ -30,7 +30,6 @@
#include "Graph/ShortestPathResult.h"
#include "Transaction/Methods.h"
#include "Utils/OperationCursor.h"
#include "VocBase/EdgeCollectionInfo.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/ticks.h"
@ -48,7 +47,7 @@ ShortestPathBlock::ShortestPathBlock(ExecutionEngine* engine,
_vertexReg(ExecutionNode::MaxRegisterId),
_edgeVar(nullptr),
_edgeReg(ExecutionNode::MaxRegisterId),
_opts(nullptr),
_opts(static_cast<ShortestPathOptions*>(ep->options()),
_posInPath(0),
_pathLength(0),
_path(nullptr),
@ -58,8 +57,7 @@ ShortestPathBlock::ShortestPathBlock(ExecutionEngine* engine,
_useTargetRegister(false),
_usedConstant(false),
_engines(nullptr) {
_opts = static_cast<ShortestPathOptions*>(ep->options());
_mmdr.reset(new ManagedDocumentResult);
TRI_ASSERT(_opts != nullptr);
if (!ep->usesStartInVariable()) {
_startVertexId = ep->getStartVertex();

View File

@ -36,17 +36,11 @@ class ShortestPathFinder;
class ShortestPathResult;
}
namespace traverser {
class EdgeCollectionInfo;
}
namespace aql {
class ShortestPathNode;
class ShortestPathBlock : public ExecutionBlock {
friend struct EdgeWeightExpanderLocal;
friend struct EdgeWeightExpanderCluster;
public:
ShortestPathBlock(ExecutionEngine* engine, ShortestPathNode const* ep);
@ -96,8 +90,6 @@ class ShortestPathBlock : public ExecutionBlock {
/// @brief Register for the edge output
RegisterId _edgeReg;
std::unique_ptr<ManagedDocumentResult> _mmdr;
/// @brief options to compute the shortest path
graph::ShortestPathOptions* _opts;

View File

@ -1123,6 +1123,19 @@ void ClusterComm::addAuthorization(std::unordered_map<std::string, std::string>*
}
}
std::vector<Ticket> ClusterComm::activeServerTickets(std::vector<std::string> const& servers) {
std::vector<Ticket> tickets;
CONDITION_LOCKER(locker, somethingReceived);
for (auto const& it: responses) {
for (auto const& server: servers) {
if (it.second.result && it.second.result->serverID == server) {
tickets.push_back(it.first);
}
}
}
return tickets;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ClusterComm main loop
////////////////////////////////////////////////////////////////////////////////
@ -1130,18 +1143,10 @@ void ClusterComm::addAuthorization(std::unordered_map<std::string, std::string>*
void ClusterCommThread::abortRequestsToFailedServers() {
ClusterInfo* ci = ClusterInfo::instance();
auto failedServers = ci->getFailedServers();
std::vector<std::string> failedServerEndpoints;
failedServerEndpoints.reserve(failedServers.size());
for (auto const& failedServer: failedServers) {
failedServerEndpoints.push_back(_cc->createCommunicatorDestination(ci->getServerEndpoint(failedServer), "/").url());
}
for (auto const& request: _cc->communicator()->requestsInProgress()) {
for (auto const& failedServerEndpoint: failedServerEndpoints) {
if (request->_destination.url().substr(0, failedServerEndpoint.length()) == failedServerEndpoint) {
_cc->communicator()->abortRequest(request->_ticketId);
}
if (failedServers.size() > 0) {
auto ticketIds = _cc->activeServerTickets(failedServers);
for (auto const& ticketId: ticketIds) {
_cc->communicator()->abortRequest(ticketId);
}
}
}

View File

@ -616,6 +616,12 @@ class ClusterComm {
void cleanupAllQueues();
//////////////////////////////////////////////////////////////////////////////
/// @brief activeServerTickets for a list of servers
//////////////////////////////////////////////////////////////////////////////
std::vector<Ticket> activeServerTickets(std::vector<std::string> const& servers);
//////////////////////////////////////////////////////////////////////////////
/// @brief our background communications thread
//////////////////////////////////////////////////////////////////////////////