1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
hkernbach 2016-06-14 15:09:32 +02:00
commit dbf3dafdde
7 changed files with 172 additions and 73 deletions

View File

@ -372,7 +372,7 @@ write_ret_t Agent::write(query_t const& query) {
std::vector<bool> applied;
std::vector<index_t> indices;
index_t maxind = 0;
// Only leader else redirect
if (!_constituent.leading()) {
return write_ret_t(false, _constituent.leaderID());

View File

@ -79,13 +79,10 @@ Constituent::Constituent()
_term(0),
_leaderID((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_id(0),
// XXX #warning KAVEH use RandomGenerator
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(nullptr),
_votedFor((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_notifier(nullptr) {
_gen.seed(RandomGenerator::interval(UINT32_MAX));
}
@ -109,8 +106,9 @@ bool Constituent::waitForSync() const {
/// Random sleep times in election process
duration_t Constituent::sleepFor(double min_t, double max_t) {
dist_t dis(min_t, max_t);
return duration_t((long)std::round(dis(_gen) * 1000.0));
int32_t left = 1000*min_t, right = 1000*max_t;
return duration_t(
static_cast<long>(RandomGenerator::interval(left, right)));
}
@ -483,8 +481,8 @@ void Constituent::run() {
_cast = false; // New round set not cast vote
}
dist_t dis(config().minPing, config().maxPing);
long rand_wait = static_cast<long>(dis(_gen) * 1000000.0);
int32_t left = 1000000*config().minPing, right = 1000000*config().maxPing;
long rand_wait = static_cast<long>(RandomGenerator::interval(left, right));
{
CONDITION_LOCKER(guardv, _cv);

View File

@ -24,8 +24,6 @@
#ifndef ARANGOD_CONSENSUS_CONSTITUENT_H
#define ARANGOD_CONSENSUS_CONSTITUENT_H 1
#include <random>
#include "AgencyCommon.h"
#include "AgentConfiguration.h"
#include "NotifierThread.h"
@ -48,8 +46,6 @@ class Agent;
/// @brief RAFT leader election
class Constituent : public arangodb::Thread {
public:
/// @brief Distribution type
typedef std::uniform_real_distribution<double> dist_t;
/// @brief Default ctor
Constituent();
@ -140,7 +136,6 @@ class Constituent : public arangodb::Thread {
arangodb::consensus::id_t _leaderID; /**< @brief Current leader */
arangodb::consensus::id_t _id; /**< @brief My own id */
std::mt19937 _gen; /**< @brief Random number generator */
role_t _role; /**< @brief My role */
Agent* _agent; /**< @brief My boss */
arangodb::consensus::id_t _votedFor;

View File

@ -484,7 +484,13 @@ bool Node::applieOp(VPackSlice const& slice) {
std::string oper = slice.get("op").copyString();
if (oper == "delete") {
return _parent->removeChild(_node_name);
if (_parent == nullptr) { // root node
_children.clear();
_value.clear();
return true;
} else {
return _parent->removeChild(_node_name);
}
} else if (oper == "set") { // "op":"set"
return handle<SET>(slice);
} else if (oper == "increment") { // "op":"increment"
@ -513,7 +519,7 @@ bool Node::applieOp(VPackSlice const& slice) {
// Apply slice to this node
bool Node::applies(VPackSlice const& slice) {
if (slice.isObject()) {
// Object is special case json

View File

@ -119,6 +119,26 @@ HttpHandler::status_t RestAgencyHandler::handleWrite() {
return HttpHandler::status_t(HANDLER_DONE);
}
if (!query->slice().isArray()) {
Builder body;
body.openObject();
body.add("message",
VPackValue("Excpecting array of arrays as outermost structure"));
body.close();
generateResult(GeneralResponse::ResponseCode::BAD, body.slice());
return HttpHandler::status_t(HANDLER_DONE);
}
if (query->slice().length() == 0) {
Builder body;
body.openObject();
body.add(
"message", VPackValue("Empty request."));
body.close();
generateResult(GeneralResponse::ResponseCode::BAD, body.slice());
return HttpHandler::status_t(HANDLER_DONE);
}
write_ret_t ret = _agent->write(query);
if (ret.accepted) { // We're leading and handling the request
@ -177,12 +197,6 @@ HttpHandler::status_t RestAgencyHandler::handleWrite() {
return HttpHandler::status_t(HANDLER_DONE);
}
/*inline HttpHandler::status_t RestAgencyHandler::handleReplicate () {
if (_request->requestType() == GeneralRequest::RequestType::POST) {
}
}*/
inline HttpHandler::status_t RestAgencyHandler::handleRead() {
arangodb::velocypack::Options options;
if (_request->requestType() == GeneralRequest::RequestType::POST) {

View File

@ -151,28 +151,33 @@ std::vector<bool> Store::apply(query_t const& query) {
std::vector<bool> applied;
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : VPackArrayIterator(query->slice())) {
switch (i.length()) {
case 1:
applied.push_back(applies(i[0]));
break; // no precond
case 2:
if (check(i[1])) { // precondition
try {
for (auto const& i : VPackArrayIterator(query->slice())) {
switch (i.length()) {
case 1:
applied.push_back(applies(i[0]));
} else { // precondition failed
LOG_TOPIC(TRACE, Logger::AGENCY) << "Precondition failed!";
break; // no precond
case 2:
if (check(i[1])) { // precondition
applied.push_back(applies(i[0]));
} else { // precondition failed
LOG_TOPIC(TRACE, Logger::AGENCY) << "Precondition failed!";
applied.push_back(false);
}
break;
default: // wrong
LOG_TOPIC(ERR, Logger::AGENCY)
<< "We can only handle log entry with or without precondition!";
applied.push_back(false);
break;
}
break;
default: // wrong
LOG_TOPIC(ERR, Logger::AGENCY)
<< "We can only handle log entry with or without precondition!";
applied.push_back(false);
break;
}
}
_cv.signal();
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< __FILE__ << ":" << __LINE__ << " " << e.what();
}
return applied;
}

View File

@ -189,7 +189,7 @@ function agencyTestSuite () {
////////////////////////////////////////////////////////////////////////////////
/// @brief test a document
/// @brief test document/transaction assignment
////////////////////////////////////////////////////////////////////////////////
testDocument : function () {
@ -198,6 +198,10 @@ function agencyTestSuite () {
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test multiple transaction
////////////////////////////////////////////////////////////////////////////////
testTransaction : function () {
writeAndCheck([[{"a":{"b":{"c":[1,2,4]},"e":12},"d":false}],
[{"a":{"b":{"c":[1,2,3]}}}]]);
@ -205,6 +209,10 @@ function agencyTestSuite () {
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "new" operator
////////////////////////////////////////////////////////////////////////////////
testOpSetNew : function () {
writeAndCheck([[{"a/z":{"op":"set","new":12}}]]);
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":12}}]);
@ -228,6 +236,10 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "push" operator
////////////////////////////////////////////////////////////////////////////////
testOpPush : function () {
writeAndCheck([[{"/a/b/c":{"op":"push","new":"max"}}]]);
assertEqual(readAndCheck([["/a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]);
@ -242,11 +254,19 @@ function agencyTestSuite () {
[{a:{euler:[2.71828182845904523536]}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "remove" operator
////////////////////////////////////////////////////////////////////////////////
testOpRemove : function () {
writeAndCheck([[{"/a/euler":{"op":"delete"}}]]);
assertEqual(readAndCheck([["/a/euler"]]), [{a:{}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "prepend" operator
////////////////////////////////////////////////////////////////////////////////
testOpPrepend : function () {
writeAndCheck([[{"/a/b/c":{"op":"prepend","new":3.141592653589793}}]]);
assertEqual(readAndCheck([["/a/b/c"]]),
@ -268,6 +288,10 @@ function agencyTestSuite () {
[{a:{euler:[1.25,2.71828182845904523536]}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "shift" operator
////////////////////////////////////////////////////////////////////////////////
testOpShift : function () {
writeAndCheck([[{"/a/f":{"op":"shift"}}]]); // none before
assertEqual(readAndCheck([["/a/f"]]), [{a:{f:[]}}]);
@ -279,6 +303,10 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["/a/b/d"]]), [{a:{b:{d:[]}}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "pop" operator
////////////////////////////////////////////////////////////////////////////////
testOpPop : function () {
writeAndCheck([[{"/a/f":{"op":"pop"}}]]); // none before
assertEqual(readAndCheck([["/a/f"]]), [{a:{f:[]}}]);
@ -291,6 +319,10 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["/a/b/d"]]), [{a:{b:{d:[]}}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "increment" operator
////////////////////////////////////////////////////////////////////////////////
testOpIncrement : function () {
writeAndCheck([[{"/version":{"op":"delete"}}]]);
writeAndCheck([[{"/version":{"op":"increment"}}]]); // none before
@ -299,6 +331,10 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["version"]]), [{version:2}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "decrement" operator
////////////////////////////////////////////////////////////////////////////////
testOpDecrement : function () {
writeAndCheck([[{"/version":{"op":"delete"}}]]);
writeAndCheck([[{"/version":{"op":"decrement"}}]]); // none before
@ -307,6 +343,10 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["version"]]), [{version:-2}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test "op" keyword in other places than as operator
////////////////////////////////////////////////////////////////////////////////
testOpInStrangePlaces : function () {
writeAndCheck([[{"/op":12}]]);
assertEqual(readAndCheck([["/op"]]), [{op:12}]);
@ -334,45 +374,86 @@ function agencyTestSuite () {
writeAndCheck([[{"/op/a/b/d/ttl":{"op":"decrement"}}]]);
assertEqual(readAndCheck([["/op/a/b/d"]]), [{op:{a:{b:{d:{ttl:14}}}}}]);
},
testInit: function() {
// mop: wait until leader has been determined
let leaderEndpoint;
let res;
if (agencyServers.length > 1) {
let leaderTests = 0;
while (true) {
res = request.get(agencyServers[whoseTurn] + "/_api/agency/config");
var config = JSON.parse(res.body);
////////////////////////////////////////////////////////////////////////////////
/// @brief op delete on top node
////////////////////////////////////////////////////////////////////////////////
// mop: leader election still in progress
if (config.leaderId > 127) {
if (leaderTests++ > 10) {
throw new Error("Agency doesn't report a valid leader after 10 checks. Bailing out!");
}
} else {
leaderEndpoint = config.configuration.endpoints[config.leaderId].replace("tcp", "http");
break;
}
wait(0.05);
}
} else {
leaderEndpoint = agencyServers[0].replace("tcp", "http");
}
testOperatorsOnRootNode : function () {
writeAndCheck([[{"/":{"op":"delete"}}]]);
assertEqual(readAndCheck([["/"]]), [{}]);
writeAndCheck([[{"/":{"op":"increment"}}]]);
assertEqual(readAndCheck([["/"]]), [1]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
writeAndCheck([[{"/":{"op":"decrement"}}]]);
assertEqual(readAndCheck([["/"]]), [-1]);
writeAndCheck([[{"/":{"op":"push","new":"Hello"}}]]);
assertEqual(readAndCheck([["/"]]), [["Hello"]]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
writeAndCheck([[{"/":{"op":"push","new":"Hello"}}]]);
assertEqual(readAndCheck([["/"]]), [["Hello"]]);
writeAndCheck([[{"/":{"op":"pop"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"pop"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"push","new":"Hello"}}]]);
assertEqual(readAndCheck([["/"]]), [["Hello"]]);
writeAndCheck([[{"/":{"op":"shift"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"shift"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"prepend","new":"Hello"}}]]);
assertEqual(readAndCheck([["/"]]), [["Hello"]]);
writeAndCheck([[{"/":{"op":"shift"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"pop"}}]]);
assertEqual(readAndCheck([["/"]]), [[]]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
assertEqual(readAndCheck([["/"]]), [{}]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
assertEqual(readAndCheck([["/"]]), [{}]);
},
var requests = [
["/_api/agency/write", [[{"/arango/Plan/DBServers/DBServer001":{"new":"none","op":"set"}}]]],
["/_api/agency/read", [["/arango/Plan/DBServers"]]],
];
requests.forEach(requestStruct => {
res = request.post(leaderEndpoint + requestStruct[0], {body: JSON.stringify(requestStruct[1]), headers: {"Content-Type": "application/json"}});
assertEqual(res.statusCode, 200);
});
////////////////////////////////////////////////////////////////////////////////
/// @brief Test that order should not matter
////////////////////////////////////////////////////////////////////////////////
assertEqual(res.body, JSON.stringify([{"arango":{"Plan":{"DBServers":{"DBServer001":"none"}}}}]));
testOrder : function () {
writeAndCheck([[{"a":{"b":{"c":[1,2,3]},"e":12},"d":false}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
writeAndCheck([[{"d":false, "a":{"b":{"c":[1,2,3]},"e":12}}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"d":false, "a":{"e":12,"b":{"c":[1,2,3]}}}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"d":false, "a":{"e":12,"b":{"c":[1,2,3]}}}]]);
assertEqual(readAndCheck([["a/e"],["a/b","d"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test nasty willful attempt to break
////////////////////////////////////////////////////////////////////////////////
testOrder : function () {
writeAndCheck([[{"a":{"b":{"c":[1,2,3]},"e":12},"d":false}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
writeAndCheck([[{"d":false, "a":{"b":{"c":[1,2,3]},"e":12}}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"d":false, "a":{"e":12,"b":{"c":[1,2,3]}}}]]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
writeAndCheck([[{"d":false, "a":{"e":12,"b":{"c":[1,2,3]}}}]]);
assertEqual(readAndCheck([["a/e"],["a/b","d"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
}
};
}