diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index d7901f002c..2bc4e7f911 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -41,10 +41,6 @@ namespace consensus { typedef uint64_t term_t; -/// @brief Id type -typedef std::string id_t; - - /// @brief Agent roles enum role_t {FOLLOWER, CANDIDATE, LEADER}; @@ -64,10 +60,10 @@ typedef uint64_t index_t; /// @brief Read request return type struct read_ret_t { bool accepted; ///< @brief Query accepted (i.e. we are leader) - id_t redirect; ///< @brief If not accepted redirect id + std::string redirect; ///< @brief If not accepted redirect id std::vector success; ///< @brief Query's precond OK query_t result; ///< @brief Query result - read_ret_t(bool a, id_t id, std::vector suc = std::vector(), + read_ret_t(bool a, std::string id, std::vector suc = std::vector(), query_t res = nullptr) : accepted(a), redirect(id), success(suc), result(res) {} }; @@ -76,12 +72,12 @@ struct read_ret_t { /// @brief Write request return type struct write_ret_t { bool accepted; ///< @brief Query accepted (i.e. we are leader) - id_t redirect; ///< @brief If not accepted redirect id + std::string redirect; ///< @brief If not accepted redirect id std::vector applied; std::vector indices; // Indices of log entries (if any) to wait for - write_ret_t() : accepted(false), redirect(0) {} - write_ret_t(bool a, id_t id) : accepted(a), redirect(id) {} - write_ret_t(bool a, id_t id, std::vector const& app, + write_ret_t() : accepted(false), redirect("") {} + write_ret_t(bool a, std::string id) : accepted(a), redirect(id) {} + write_ret_t(bool a, std::string id, std::vector const& app, std::vector const& idx) : accepted(a), redirect(id), applied(app), indices(idx) {} }; diff --git a/arangod/Agency/AgencyFeature.cpp b/arangod/Agency/AgencyFeature.cpp index 0fade13c08..fce14c3755 100644 --- a/arangod/Agency/AgencyFeature.cpp +++ b/arangod/Agency/AgencyFeature.cpp @@ -39,6 +39,7 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server) : ApplicationFeature(server, "Agency"), _activated(false), _size(1), + _poolSize(1), _minElectionTimeout(0.5), _maxElectionTimeout(2.5), _supervision(false), @@ -83,6 +84,9 @@ void AgencyFeature::collectOptions(std::shared_ptr options) { options->addOption("--agency.endpoint", "agency endpoints", new VectorParameter(&_agencyEndpoints)); + + options->addOption("--agency.my-address", "which address to advertise to the outside", + new StringParameter(&_agencyMyAddress)); options->addOption("--agency.supervision", "perform arangodb cluster supervision", @@ -165,6 +169,15 @@ void AgencyFeature::validateOptions(std::shared_ptr options) { << " " << __FILE__ << __LINE__; } + if (!_agencyMyAddress.empty()) { + std::string const unified = Endpoint::unifiedForm(_agencyMyAddress); + + if (unified.empty()) { + LOG_TOPIC(FATAL, Logger::AGENCY) << "invalid endpoint '" + << _agencyMyAddress << "' specified for --agency.my-address"; + FATAL_ERROR_EXIT(); + } + } } void AgencyFeature::prepare() { @@ -179,22 +192,27 @@ void AgencyFeature::start() { // TODO: Port this to new options handling std::string endpoint; - std::string port = "8529"; - - EndpointFeature* endpointFeature = - ApplicationServer::getFeature("Endpoint"); - auto endpoints = endpointFeature->httpEndpoints(); - - if (!endpoints.empty()) { - std::string const& tmp = endpoints.front(); - size_t pos = tmp.find(':',10); - - if (pos != std::string::npos) { - port = tmp.substr(pos + 1, tmp.size() - pos); + + if (_agencyMyAddress.empty()) { + std::string port = "8529"; + + EndpointFeature* endpointFeature = + ApplicationServer::getFeature("Endpoint"); + auto endpoints = endpointFeature->httpEndpoints(); + + if (!endpoints.empty()) { + std::string const& tmp = endpoints.front(); + size_t pos = tmp.find(':',10); + + if (pos != std::string::npos) { + port = tmp.substr(pos + 1, tmp.size() - pos); + } } + + endpoint = std::string("tcp://localhost:" + port); + } else { + endpoint = _agencyMyAddress; } - - endpoint = std::string("tcp://localhost:" + port); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Agency endpoint " << endpoint; _agent.reset( diff --git a/arangod/Agency/AgencyFeature.h b/arangod/Agency/AgencyFeature.h index 000b0083f1..cee475eba7 100644 --- a/arangod/Agency/AgencyFeature.h +++ b/arangod/Agency/AgencyFeature.h @@ -53,6 +53,7 @@ class AgencyFeature : virtual public application_features::ApplicationFeature { bool _waitForSync; double _supervisionFrequency; uint64_t _compactionStepSize; + std::string _agencyMyAddress; std::vector _agencyEndpoints; public: diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 116b1a7018..324ef24a86 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -64,7 +64,7 @@ std::string Agent::id() const { /// Agent's id is set once from state machine -bool Agent::id(arangodb::consensus::id_t const& id) { +bool Agent::id(std::string const& id) { bool success; if ((success = _config.setId(id))) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "My id is " << id; @@ -123,7 +123,7 @@ std::string Agent::endpoint() const { /// Handle voting priv_rpc_ret_t Agent::requestVote( - term_t t, arangodb::consensus::id_t id, index_t lastLogIndex, + term_t t, std::string const& id, index_t lastLogIndex, index_t lastLogTerm, query_t const& query) { return priv_rpc_ret_t( _constituent.vote(t, id, lastLogIndex, lastLogTerm), this->term()); @@ -137,7 +137,7 @@ config_t const Agent::config() const { /// Leader's id -arangodb::consensus::id_t Agent::leaderID() const { +std::string Agent::leaderID() const { return _constituent.leaderID(); } @@ -196,7 +196,7 @@ bool Agent::waitFor(index_t index, double timeout) { // AgentCallback reports id of follower and its highest processed index -void Agent::reportIn(arangodb::consensus::id_t id, index_t index) { +void Agent::reportIn(std::string const& id, index_t index) { MUTEX_LOCKER(mutexLocker, _ioLock); @@ -240,11 +240,9 @@ void Agent::reportIn(arangodb::consensus::id_t id, index_t index) { /// Followers' append entries -bool Agent::recvAppendEntriesRPC(term_t term, - arangodb::consensus::id_t leaderId, - index_t prevIndex, term_t prevTerm, - index_t leaderCommitIndex, - query_t const& queries) { +bool Agent::recvAppendEntriesRPC( + term_t term, std::string const& leaderId, index_t prevIndex, term_t prevTerm, + index_t leaderCommitIndex, query_t const& queries) { // Update commit index if (queries->slice().type() != VPackValueType::Array) { @@ -297,8 +295,7 @@ bool Agent::recvAppendEntriesRPC(term_t term, /// Leader's append entries -priv_rpc_ret_t Agent::sendAppendEntriesRPC( - arangodb::consensus::id_t follower_id) { +priv_rpc_ret_t Agent::sendAppendEntriesRPC(std::string const& follower_id) { term_t t(0); { @@ -646,7 +643,7 @@ query_t Agent::gossip(query_t const& in, bool isCallback) { THROW_ARANGO_EXCEPTION_MESSAGE( 20002, "Gossip message must contain string parameter 'id'"); } - std::string id = slice.get("id").copyString(); + //std::string id = slice.get("id").copyString(); if (!slice.hasKey("endpoint") || !slice.get("endpoint").isString()) { THROW_ARANGO_EXCEPTION_MESSAGE( diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index f8d8198f8c..7c6db395ca 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -50,10 +50,10 @@ class Agent : public arangodb::Thread { term_t term() const; /// @brief Get current term - arangodb::consensus::id_t id() const; + std::string id() const; /// @brief Vote request - priv_rpc_ret_t requestVote(term_t, arangodb::consensus::id_t, index_t, + priv_rpc_ret_t requestVote(term_t, std::string const&, index_t, index_t, query_t const&); /// @brief Provide configuration @@ -75,7 +75,7 @@ class Agent : public arangodb::Thread { arangodb::consensus::index_t lastCommitted() const; /// @brief Leader ID - arangodb::consensus::id_t leaderID() const; + std::string leaderID() const; /// @brief Are we leading? bool leading() const; @@ -94,13 +94,13 @@ class Agent : public arangodb::Thread { /// @brief Received by followers to replicate log entries ($5.3); /// also used as heartbeat ($5.2). - bool recvAppendEntriesRPC(term_t term, arangodb::consensus::id_t leaderId, - index_t prevIndex, term_t prevTerm, - index_t lastCommitIndex, query_t const& queries); + bool recvAppendEntriesRPC( + term_t term, std::string const& leaderId, index_t prevIndex, + term_t prevTerm, index_t lastCommitIndex, query_t const& queries); /// @brief Invoked by leader to replicate log entries ($5.3); /// also used as heartbeat ($5.2). - priv_rpc_ret_t sendAppendEntriesRPC(arangodb::consensus::id_t slave_id); + priv_rpc_ret_t sendAppendEntriesRPC(std::string const& slave_id); /// @brief 1. Deal with appendEntries to slaves. /// 2. Report success of write processes. @@ -125,7 +125,7 @@ class Agent : public arangodb::Thread { void beginShutdown() override final; /// @brief Report appended entries from AgentCallback - void reportIn(arangodb::consensus::id_t id, index_t idx); + void reportIn(std::string const& id, index_t idx); /// @brief Wait for slaves to confirm appended entries bool waitFor(index_t last_entry, double timeout = 2.0); @@ -165,7 +165,7 @@ class Agent : public arangodb::Thread { Agent& operator=(VPackSlice const&); /// @brief Get current term - bool id(arangodb::consensus::id_t const&); + bool id(std::string const&); /// @brief Get current term bool mergeConfiguration(VPackSlice const&); diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp index c6e9cb5169..ba22cde7ca 100644 --- a/arangod/Agency/AgentCallback.cpp +++ b/arangod/Agency/AgentCallback.cpp @@ -28,11 +28,11 @@ using namespace arangodb::consensus; using namespace arangodb::velocypack; -AgentCallback::AgentCallback() : _agent(0), _last(0), _slaveID(0) {} +AgentCallback::AgentCallback() : _agent(0), _last(0) {} -AgentCallback::AgentCallback(Agent* agent, arangodb::consensus::id_t slaveID, - index_t last) - : _agent(agent), _last(last), _slaveID(slaveID) {} +AgentCallback::AgentCallback( + Agent* agent, std::string const& slaveID, index_t last) + : _agent(agent), _last(last), _slaveID(slaveID) {} void AgentCallback::shutdown() { _agent = 0; } diff --git a/arangod/Agency/AgentCallback.h b/arangod/Agency/AgentCallback.h index f9eadc9342..0670ab14ac 100644 --- a/arangod/Agency/AgentCallback.h +++ b/arangod/Agency/AgentCallback.h @@ -36,7 +36,7 @@ class AgentCallback : public arangodb::ClusterCommCallback { public: AgentCallback(); - AgentCallback(Agent*, arangodb::consensus::id_t, index_t); + AgentCallback(Agent*, std::string const&, index_t); virtual bool operator()(arangodb::ClusterCommResult*) override final; @@ -45,7 +45,7 @@ public: private: Agent* _agent; index_t _last; - arangodb::consensus::id_t _slaveID; + std::string _slaveID; }; } } // namespace diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 059328aeef..c0ac999204 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -454,7 +454,7 @@ void Constituent::run() { std::vector act = _agent->config().active(); while(!this->isStopping() && - ((find(act.begin(), act.end(), _id) - act.begin()) >= size())) { + ((size_t)(find(act.begin(), act.end(), _id) - act.begin()) >= size())) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } diff --git a/arangod/Agency/GossipCallback.cpp b/arangod/Agency/GossipCallback.cpp index c68921c33b..fb79b4fe2b 100644 --- a/arangod/Agency/GossipCallback.cpp +++ b/arangod/Agency/GossipCallback.cpp @@ -27,7 +27,7 @@ using namespace arangodb::consensus; using namespace arangodb::velocypack; -GossipCallback::GossipCallback(Agent*) {} +GossipCallback::GossipCallback(Agent*) : _agent(nullptr) {} bool GossipCallback::operator()(arangodb::ClusterCommResult* res) { if (res->status == CL_COMM_SENT && res->result->getHttpReturnCode() == 200) { diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index c6347c14ea..0347c5cf6d 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -56,7 +56,7 @@ void Inception::run() { TRI_ASSERT(_agent != nullptr); auto s = std::chrono::system_clock::now(); - std::chrono::seconds timeout(5); + std::chrono::seconds timeout(120); size_t i = 0; //bool cs = false; while (!this->isStopping()) { @@ -82,7 +82,7 @@ void Inception::run() { auto hf = std::make_unique>(); arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, p, GeneralRequest::RequestType::POST, path, + clientid, 1, p, GeneralRequest::RequestType::POST, path, std::make_shared(out->toJson()), hf, std::make_shared(_agent), 1.0, true); } diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index 1a90f6257c..ee0b28e3a2 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -54,7 +54,6 @@ public: private: - bool _done; Agent* _agent; }; diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index ea1312d1c1..fa85601e47 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -71,7 +71,7 @@ inline RestHandler::status RestAgencyHandler::reportUnknownMethod() { return status::DONE; } -void RestAgencyHandler::redirectRequest(arangodb::consensus::id_t leaderId) { +void RestAgencyHandler::redirectRequest(std::string const& leaderId) { try { std::string url = Endpoint::uriForm(_agent->config().poolAt(leaderId)) + diff --git a/arangod/Agency/RestAgencyHandler.h b/arangod/Agency/RestAgencyHandler.h index d215d9cf62..592b090dae 100644 --- a/arangod/Agency/RestAgencyHandler.h +++ b/arangod/Agency/RestAgencyHandler.h @@ -52,7 +52,7 @@ class RestAgencyHandler : public RestBaseHandler { status reportMethodNotAllowed(); status handleState(); - void redirectRequest(arangodb::consensus::id_t leaderId); + void redirectRequest(std::string const& leaderId); consensus::Agent* _agent; }; } diff --git a/arangod/Agency/RestAgencyPrivHandler.cpp b/arangod/Agency/RestAgencyPrivHandler.cpp index f5888cbd8e..b6565ff781 100644 --- a/arangod/Agency/RestAgencyPrivHandler.cpp +++ b/arangod/Agency/RestAgencyPrivHandler.cpp @@ -91,8 +91,7 @@ RestHandler::status RestAgencyPrivHandler::execute() { } else { term_t term = 0; term_t prevLogTerm = 0; - arangodb::consensus::id_t - id; // leaderId for appendEntries, cadidateId for requestVote + std::string id; // leaderId for appendEntries, cadidateId for requestVote arangodb::consensus::index_t prevLogIndex, leaderCommit; if (_request->suffix()[0] == "appendEntries") { // appendEntries if (_request->requestType() != GeneralRequest::RequestType::POST) { diff --git a/js/common/modules/@arangodb/general-graph.js b/js/common/modules/@arangodb/general-graph.js index 869a389292..64ea372eac 100644 --- a/js/common/modules/@arangodb/general-graph.js +++ b/js/common/modules/@arangodb/general-graph.js @@ -277,9 +277,13 @@ var startInAllCollections = function (collections) { return `UNION(${collections.map(c => `(FOR x IN ${c} RETURN x)`).join(", ")})`; }; -var buildEdgeCollectionRestriction = function (collections) { - if (!Array.isArray(collections)) { - collections = [ collections ]; +var buildEdgeCollectionRestriction = function (collections, bindVars, graph) { + if (typeof collections === "string") { + collections = [collections]; + } + if (!Array.isArray(collections) || collections.length === 0) { + bindVars.graphName = graph.__name; + return "GRAPH @graphName"; } return collections.map(collection => '`' + collection + '`').join(','); }; @@ -1186,20 +1190,12 @@ Graph.prototype._OUTEDGES = function (vertexId) { Graph.prototype._edges = function (vertexExample, options) { var bindVars = {}; options = options || {}; - if (options.edgeCollectionRestriction) { - if (!Array.isArray(options.edgeCollectionRestriction)) { - options.edgeCollectionRestriction = [ options.edgeCollectionRestriction ]; - } - } var query = ` ${transformExampleToAQL(vertexExample, Object.keys(this.__vertexCollections), bindVars, "start")} FOR v, e IN ${options.minDepth || 1}..${options.maxDepth || 1} ${options.direction || "ANY"} start - ${Array.isArray(options.edgeCollectionRestriction) && options.edgeCollectionRestriction.length > 0 ? buildEdgeCollectionRestriction(options.edgeCollectionRestriction) : "GRAPH @graphName"} + ${buildEdgeCollectionRestriction(options.edgeCollectionRestriction, bindVars, this)} ${buildFilter(options.edgeExamples, bindVars, "e")} RETURN DISTINCT ${options.includeData === true ? "e" : "e._id"}`; - if (!Array.isArray(options.edgeCollectionRestriction) || options.edgeCollectionRestriction.length === 0) { - bindVars.graphName = this.__name; - } return db._query(query, bindVars).toArray(); }; @@ -1302,23 +1298,17 @@ Graph.prototype._neighbors = function (vertexExample, options) { options.vertexCollectionRestriction = [ options.vertexCollectionRestriction ]; } } - if (options.edgeCollectionRestriction) { - if (!Array.isArray(options.edgeCollectionRestriction)) { - options.edgeCollectionRestriction = [ options.edgeCollectionRestriction ]; - } - } var bindVars = {}; var query = ` ${transformExampleToAQL(vertexExample, Object.keys(this.__vertexCollections), bindVars, "start")} - FOR v, e IN ${options.minDepth || 1}..${options.maxDepth || 1} ${options.direction || "ANY"} start ${Array.isArray(options.edgeCollectionRestriction) && options.edgeCollectionRestriction.length > 0 ? buildEdgeCollectionRestriction(options.edgeCollectionRestriction) : "GRAPH @graphName"} OPTIONS {bfs: true} + FOR v, e IN ${options.minDepth || 1}..${options.maxDepth || 1} ${options.direction || "ANY"} start + ${buildEdgeCollectionRestriction(options.edgeCollectionRestriction, bindVars, this)} + OPTIONS {bfs: true} ${buildFilter(options.neighborExamples, bindVars, "v")} ${buildFilter(options.edgeExamples, bindVars, "e")} ${Array.isArray(options.vertexCollectionRestriction) && options.vertexCollectionRestriction.length > 0 ? buildVertexCollectionRestriction(options.vertexCollectionRestriction,"v") : ""} RETURN DISTINCT ${options.includeData === true ? "v" : "v._id"}`; - if (!Array.isArray(options.edgeCollectionRestriction) || options.edgeCollectionRestriction.length === 0) { - bindVars.graphName = this.__name; - } return db._query(query, bindVars).toArray(); }; @@ -1343,13 +1333,15 @@ Graph.prototype._commonNeighbors = function (vertex1Example, vertex2Example, opt var query = ` ${transformExampleToAQL(vertex1Example, Object.keys(this.__vertexCollections), bindVars, "left")} LET leftNeighbors = (FOR v IN ${optionsVertex1.minDepth || 1}..${optionsVertex1.maxDepth || 1} ${optionsVertex1.direction || "ANY"} left - GRAPH @graphName OPTIONS {bfs: true, uniqueVertices: "global"} + ${buildEdgeCollectionRestriction(optionsVertex1.edgeCollectionRestriction, bindVars, this)} + OPTIONS {bfs: true, uniqueVertices: "global"} ${Array.isArray(optionsVertex1.vertexCollectionRestriction) && optionsVertex1.vertexCollectionRestriction.length > 0 ? buildVertexCollectionRestriction(optionsVertex1.vertexCollectionRestriction,"v") : ""} RETURN v) ${transformExampleToAQL(vertex2Example, Object.keys(this.__vertexCollections), bindVars, "right")} FILTER right != left LET rightNeighbors = (FOR v IN ${optionsVertex2.minDepth || 1}..${optionsVertex2.maxDepth || 1} ${optionsVertex2.direction || "ANY"} right - GRAPH @graphName OPTIONS {bfs: true, uniqueVertices: "global"} + ${buildEdgeCollectionRestriction(optionsVertex2.edgeCollectionRestriction, bindVars, this)} + OPTIONS {bfs: true, uniqueVertices: "global"} ${Array.isArray(optionsVertex2.vertexCollectionRestriction) && optionsVertex2.vertexCollectionRestriction.length > 0 ? buildVertexCollectionRestriction(optionsVertex2.vertexCollectionRestriction,"v") : ""} RETURN v) LET neighbors = INTERSECTION(leftNeighbors, rightNeighbors) @@ -1359,7 +1351,6 @@ Graph.prototype._commonNeighbors = function (vertex1Example, vertex2Example, opt } else { query += `RETURN {left : left._id, right: right._id, neighbors: neighbors[*]._id}`; } - bindVars.graphName = this.__name; return db._query(query, bindVars).toArray(); };