1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into generic-col-types

This commit is contained in:
jsteemann 2016-09-06 14:40:51 +02:00
commit b718f67ff6
12 changed files with 299 additions and 17 deletions

View File

@ -0,0 +1,50 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "ActivationCallback.h"
#include "Agent.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
ActivationCallback::ActivationCallback() : _agent(0), _last(0) {}
ActivationCallback::ActivationCallback(Agent* agent, std::string const& slaveID,
index_t last)
: _agent(agent), _last(last), _slaveID(slaveID) {}
void ActivationCallback::shutdown() { _agent = 0; }
bool ActivationCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
_agent->reportIn(_slaveID, _last);
}
} else {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "comm_status(" << res->status
<< "), last(" << _last << "), follower("
<< _slaveID << ")";
}
return true;
}

View File

@ -0,0 +1,53 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CONSENSUS_ACTIVATION_CALLBACK_H
#define ARANGOD_CONSENSUS_ACTIVATION_CALLBACK_H 1
#include "Agency/AgencyCommon.h"
#include "Cluster/ClusterComm.h"
namespace arangodb {
namespace consensus {
class Agent;
class ActivationCallback : public arangodb::ClusterCommCallback {
public:
ActivationCallback();
ActivationCallback(Agent*, std::string const&, index_t);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
void shutdown();
private:
Agent* _agent;
index_t _last;
std::string _slaveID;
};
}
} // namespace
#endif

View File

@ -101,6 +101,10 @@ struct log_t {
}
};
static std::string const pubApiPrefix = "/api/agency/";
static std::string const privApiPrefix = "/api/agency_priv/";
/// @brief Private RPC return type
struct priv_rpc_ret_t {
bool success;

View File

@ -113,6 +113,12 @@ std::string Agent::leaderID() const { return _constituent.leaderID(); }
/// Are we leading?
bool Agent::leading() const { return _constituent.leading(); }
/// Activate a standby agent
bool Agent::activateStandbyAgent() {
return true;
}
/// Start constituent personality
void Agent::startConstituent() {
activateAgency();

View File

@ -25,6 +25,7 @@
#define ARANGOD_CONSENSUS_AGENT_H 1
#include "Agency/AgencyCommon.h"
#include "Agency/AgentActivator.h"
#include "Agency/AgentCallback.h"
#include "Agency/AgentConfiguration.h"
#include "Agency/Constituent.h"
@ -117,9 +118,8 @@ class Agent : public arangodb::Thread {
/// @brief Gossip in
bool activeAgency();
/// @brief Startup process of detection of agent pool, active agency, gossip
/// etc
// void inception();
/// @brief Gossip in
bool activeStandbyAgent();
/// @brief Start orderly shutdown of threads
void beginShutdown() override final;
@ -170,6 +170,9 @@ class Agent : public arangodb::Thread {
/// @brief Activate this agent in single agent mode.
bool activateAgency();
/// @brief Activate new agent in pool to replace failed agent
bool activateStandbyAgent();
/// @brief Assignment of persisted state
Agent& operator=(VPackSlice const&);
@ -236,6 +239,7 @@ class Agent : public arangodb::Thread {
std::map<std::string, bool> _gossipTmp;
std::unique_ptr<Inception> _inception;
std::unique_ptr<AgentActivator> _activator;
};
}
}

View File

@ -0,0 +1,72 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "AgentActivator.h"
#include "Agency/Agent.h"
#include "Agency/ActivationCallback.h"
#include "Basics/ConditionLocker.h"
#include <chrono>
#include <thread>
using namespace arangodb::consensus;
AgentActivator::AgentActivator() : Thread("AgentActivator"), _agent(nullptr) {}
AgentActivator::AgentActivator(Agent* agent, std::string const& peerId)
: Thread("AgentActivator"), _agent(agent), _peerId(peerId) {}
// Shutdown if not already
AgentActivator::~AgentActivator() { shutdown(); }
void AgentActivator::beginShutdown() { Thread::beginShutdown(); }
bool AgentActivator::start() { return Thread::start(); }
void AgentActivator::run() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _peerId;
std::string const path = privApiPrefix + "activate";
while (!this->isStopping()) {
auto const& pool = _agent->config().pool();
Builder builder;
size_t highest = 0;
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, pool.at(_peerId), GeneralRequest::RequestType::POST,
path, std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<ActivationCallback>(_agent, _peerId, highest), 5.0, true,
1.0);
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _peerId;
}

View File

@ -0,0 +1,62 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CONSENSUS_AGENT_ACTIVATOR_H
#define ARANGOD_CONSENSUS_AGENT_ACTIVATOR_H 1
#include <memory>
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace consensus {
class Agent;
class AgentActivator : public Thread {
public:
AgentActivator();
AgentActivator(Agent*, std::string const&);
virtual ~AgentActivator();
void run() override;
bool start();
/// @brief Orderly shutdown of thread
void beginShutdown() override;
private:
Agent* _agent;
std::string _peerId;
};
}
}
#endif

View File

@ -463,6 +463,7 @@ bool State::loadCompacted() {
return true;
}
/// Load persisted configuration
bool State::loadOrPersistConfiguration() {
auto bindVars = std::make_shared<VPackBuilder>();

View File

@ -100,15 +100,17 @@ class State {
return os;
}
/// @brief compact state machine
bool compact(arangodb::consensus::index_t cind);
/// @brief Remove RAFT conflicts. i.e. All indices, where higher term version
/// exists are overwritten
size_t removeConflicts(query_t const&);
/// @brief Persist active agency in pool
bool persistActiveAgents(query_t const& active, query_t const& pool);
private:
bool snapshot();
/// @brief Save currentTerm, votedFor, log entries
bool persist(index_t index, term_t term,
arangodb::velocypack::Slice const& entry);
@ -131,13 +133,22 @@ class State {
/// @brief Create collection
bool createCollection(std::string const& name);
/// @brief Compact persisted logs
bool compactPersisted(arangodb::consensus::index_t cind);
/// @brief Compact RAM logs
bool compactVolatile(arangodb::consensus::index_t cind);
/// @brief Remove obsolete logs
bool removeObsolete(arangodb::consensus::index_t cind);
/// @brief Persist read database
bool persistReadDB(arangodb::consensus::index_t cind);
/// @brief Our agent
Agent* _agent;
/// @brief Our vocbase
TRI_vocbase_t* _vocbase;
mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */
@ -146,11 +157,16 @@ class State {
bool _collectionsChecked; /**< @brief Collections checked */
bool _collectionsLoaded;
/// @brief Our query registry
aql::QueryRegistry* _queryRegistry;
/// @brief Compaction step
size_t _compaction_step;
/// @brief Current log offset
size_t _cur;
/// @brief Operation options
OperationOptions _options;
};
}

View File

@ -76,8 +76,10 @@ add_executable(${BIN_ARANGOD}
Actions/ActionFeature.cpp
Actions/RestActionHandler.cpp
Actions/actions.cpp
Agency/ActivationCallback.cpp
Agency/AgencyFeature.cpp
Agency/Agent.cpp
Agency/AgentActivator.cpp
Agency/AgentCallback.cpp
Agency/AgentConfiguration.cpp
Agency/Constituent.cpp

View File

@ -49,8 +49,6 @@ function agencyTestSuite () {
var whoseTurn = 0;
var request = require("@arangodb/request");
wait(3.0);
function readAgency(list) {
// We simply try all agency servers in turn until one gives us an HTTP
// response:
@ -99,6 +97,24 @@ function agencyTestSuite () {
tearDown : function () {
},
////////////////////////////////////////////////////////////////////////////////
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
testStartup : function () {
while (true) {
var res = request({
url: agencyServers[whoseTurn] + "/_api/agency/config",
method: "GET"
});
res.bodyParsed = JSON.parse(res.body);
if (res.bodyParsed.leaderId != "") {
break;
}
wait(0.1);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test to write a single top level key
////////////////////////////////////////////////////////////////////////////////
@ -269,12 +285,12 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":12}}]);
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
wait(1.50);
wait(2.0);
assertEqual(readAndCheck([["a/y"]]), [{a:{}}]);
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
writeAndCheck([[{"a/y":{"op":"set","new":12}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
wait(1.50);
wait(2.0);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12}}}]]);
assertEqual(readAndCheck([["/foo/bar/baz"]]),
@ -282,7 +298,7 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{"bar":{"baz":12}}}]);
assertEqual(readAndCheck([["/foo"]]), [{"foo":{"bar":{"baz":12}}}]);
writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12},"ttl":1}}]]);
wait(1.50);
wait(2.0);
assertEqual(readAndCheck([["/foo"]]), [{"foo":{}}]);
assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{}}]);
assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]);
@ -596,14 +612,8 @@ function agencyTestSuite () {
var res = writeAgency([[{"/bumms":{"op":"set","new":"fallera"}, "/bummsfallera": {"op":"set","new":"lalalala"}}]]);
assertEqual(res.statusCode, 200);
assertEqual(readAndCheck([["/bumms", "/bummsfallera"]]), [{bumms:"fallera", bummsfallera: "lalalala"}]);
},
testThousand: function() {
var i;
for (i = 0; i < 1000; i++) {
writeAndCheck([[{x:12}]]);
}
}
};
}

View File

@ -133,6 +133,7 @@ class Logger {
static LogTopic COMPACTOR;
static LogTopic COMMUNICATION;
static LogTopic CONFIG;
static LogTopic CLUSTER;
static LogTopic DATAFILES;
static LogTopic HEARTBEAT;
static LogTopic MMAP;
@ -141,6 +142,7 @@ class Logger {
static LogTopic REPLICATION;
static LogTopic REQUESTS;
static LogTopic STARTUP;
static LogTopic SUPERVISION;
static LogTopic THREADS;
static LogTopic V8;