1
0
Fork 0

inquire interface and clientids

This commit is contained in:
Kaveh Vahedipour 2017-01-17 17:33:12 +01:00
parent d6adaad61d
commit 54dbf0a814
8 changed files with 242 additions and 46 deletions

View File

@ -84,22 +84,35 @@ struct trans_ret_t {
failed(f), result(res) {}
};
struct inquire_ret_t {
bool accepted; // Query accepted (i.e. we are leader)
std::string redirect; // If not accepted redirect id
query_t result;
inquire_ret_t() : accepted(false), redirect("") {}
inquire_ret_t(bool a, std::string const& id) : accepted(a), redirect(id){}
inquire_ret_t(bool a, std::string const& id, query_t const& res) :
accepted(a), redirect(id), result(res) {}
};
struct log_t {
index_t index; // Log index
term_t term; // Log term
buffer_t entry; // To log
std::string clientId; // Client ID
std::chrono::milliseconds timestamp; // Timestamp
log_t(index_t idx, term_t t, buffer_t const& e)
log_t(index_t idx, term_t t, buffer_t const& e,
std::string const& clientId = std::string())
: index(idx),
term(t),
entry(e),
clientId(clientId),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson()
<< " " << l.timestamp.count();
<< " " << " " << l.clientId << " "<< l.timestamp.count();
return o;
}
};

View File

@ -43,16 +43,16 @@ namespace consensus {
/// Agent configuration
Agent::Agent(config_t const& config)
: Thread("Agent"),
_config(config),
_lastCommitIndex(0),
_spearhead(this),
_readDB(this),
_vacillant(this),
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)),
_activator(nullptr),
_ready(false) {
: Thread("Agent"),
_config(config),
_lastCommitIndex(0),
_spearhead(this),
_readDB(this),
_transient(this),
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)),
_activator(nullptr),
_ready(false) {
_state.configure(this);
_constituent.configure(this);
}
@ -368,6 +368,7 @@ void Agent::sendAppendEntriesRPC() {
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.add("clientId", VPackValue(entry.clientId));
builder.close();
highest = entry.index;
}
@ -411,6 +412,7 @@ bool Agent::active() const {
return (find(active.begin(), active.end(), id()) != active.end());
}
// Activate with everything I need to know
query_t Agent::activate(query_t const& everything) {
@ -649,8 +651,38 @@ trans_ret_t Agent::transact(query_t const& queries) {
// Non-persistent write to non-persisted key-value store
write_ret_t Agent::vacillant(query_t const& query) {
write_ret_t Agent::transient(query_t const& query) {
write_ret_t ret;
return ret;
}
inquire_ret_t Agent::inquire(query_t const& query) {
inquire_ret_t ret;
auto leader = _constituent.leaderID();
if (leader != id()) {
return inquire_ret_t(false, leader);
}
MUTEX_LOCKER(mutexLocker, _ioLock);
auto si = _state.inquire(query);
auto builder = std::make_shared<VPackBuilder>();
{
VPackArrayBuilder b(builder.get());
for (auto const& i : si) {
VPackObjectBuilder bb(builder.get());
builder->add("index", VPackValue(i.index));
builder->add("term", VPackValue(i.term));
builder->add("query", VPackSlice(i.entry->data()));
builder->add("index", VPackValue(i.index));
}
}
ret.result = builder;
return ret;
}

View File

@ -94,7 +94,7 @@ class Agent : public arangodb::Thread {
bool load();
/// @brief Unpersisted key-value-store
write_ret_t vacillant(query_t const&);
write_ret_t transient(query_t const&);
/// @brief Attempt write
write_ret_t write(query_t const&);
@ -102,6 +102,9 @@ class Agent : public arangodb::Thread {
/// @brief Read from agency
read_ret_t read(query_t const&);
/// @brief Inquire success of logs given clientIds
inquire_ret_t inquire(query_t const&);
/// @brief Attempt read/write transaction
trans_ret_t transact(query_t const&);
@ -263,7 +266,7 @@ class Agent : public arangodb::Thread {
Store _readDB;
/// @brief Committed (read) kv-store
Store _vacillant;
Store _transient;
/// @brief Condition variable for appendEntries
arangodb::basics::ConditionVariable _appendCV;

View File

@ -86,7 +86,7 @@ void RestAgencyHandler::redirectRequest(std::string const& leaderId) {
}
}
RestStatus RestAgencyHandler::handleVacillant() {
RestStatus RestAgencyHandler::handleTransient() {
// Must be a POST request
if (_request->requestType() != rest::RequestType::POST) {
@ -134,7 +134,7 @@ RestStatus RestAgencyHandler::handleVacillant() {
write_ret_t ret;
try {
ret = _agent->vacillant(query);
ret = _agent->transient(query);
} catch (std::exception const& e) {
Builder body;
body.openObject();
@ -426,6 +426,70 @@ RestStatus RestAgencyHandler::handleTransact() {
}
inline RestStatus RestAgencyHandler::handleInquire() {
if (_request->requestType() != rest::RequestType::POST) {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, 405);
}
arangodb::velocypack::Options options;
query_t query;
// Get query from body
try {
query = _request->toVelocyPackBuilderPtr(&options);
} catch (std::exception const& e) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< e.what() << " " << __FILE__ << ":" << __LINE__;
generateError(rest::ResponseCode::BAD, 400);
return RestStatus::DONE;
}
// Leadership established?
auto s = std::chrono::system_clock::now();
std::chrono::duration<double> timeout(_agent->config().minPing());
while (_agent->size() > 1 && _agent->leaderID() == NO_LEADER) {
if ((std::chrono::system_clock::now() - s) > timeout) {
Builder body;
body.openObject();
body.add("message", VPackValue("No leader"));
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice());
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return RestStatus::DONE;
}
std::this_thread::sleep_for(duration_t(100));
}
inquire_ret_t ret = _agent->inquire(query);
if (ret.accepted) { // I am leading
generateResult(rest::ResponseCode::OK, ret.result->slice());
} else { // Redirect to leader
if (_agent->leaderID() == NO_LEADER) {
Builder body;
body.openObject();
body.add("message", VPackValue("No leader"));
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice());
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return RestStatus::DONE;
} else {
TRI_ASSERT(ret.redirect != _agent->id());
redirectRequest(ret.redirect);
}
return RestStatus::DONE;
}
return RestStatus::DONE;
}
inline RestStatus RestAgencyHandler::handleRead() {
arangodb::velocypack::Options options;
@ -524,6 +588,7 @@ RestStatus RestAgencyHandler::handleState() {
body.add("index", VPackValue(i.index));
body.add("term", VPackValue(i.term));
body.add("query", VPackSlice(i.entry->data()));
body.add("clientId", VPackValue(i.clientId));
body.close();
}
body.close();

View File

@ -53,7 +53,8 @@ class RestAgencyHandler : public RestBaseHandler {
RestStatus handleConfig();
RestStatus reportMethodNotAllowed();
RestStatus handleState();
RestStatus handleVacillant();
RestStatus handleTransient();
RestStatus handleInquire();
void redirectRequest(std::string const& leaderId);
consensus::Agent* _agent;

View File

@ -77,12 +77,14 @@ inline static std::string stringify(arangodb::consensus::index_t index) {
/// Persist one entry
bool State::persist(arangodb::consensus::index_t index, term_t term,
arangodb::velocypack::Slice const& entry) const {
arangodb::velocypack::Slice const& entry,
std::string const& clientId) const {
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("_key", Value(stringify(index)));
body.add("term", Value(term));
body.add("request", entry);
body.add("clientId", Value(clientId));
body.close();
TRI_ASSERT(_vocbase != nullptr);
@ -128,14 +130,21 @@ std::vector<arangodb::consensus::index_t> State::log(
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
for (auto const& i : VPackArrayIterator(slice)) {
TRI_ASSERT(i.isArray());
std::string clientId;
if (good[j]) {
std::shared_ptr<Buffer<uint8_t>> buf =
std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i[0].begin(), i[0].byteSize());
LOG(WARN) << i.length();
if (i.length()==3) {
clientId = i[2].copyString();
}
TRI_ASSERT(!_log.empty()); // log must not ever be empty
idx[j] = _log.back().index + 1;
_log.push_back(log_t(idx[j], term, buf)); // log to RAM
persist(idx[j], term, i[0]); // log to disk
_log.push_back(log_t(idx[j], term, buf, clientId)); // log to RAM
_clientIdLookupTable.emplace(
std::pair<std::string, arangodb::consensus::index_t>(clientId, idx[j]));
persist(idx[j], term, i[0], clientId); // log to disk
++j;
}
}
@ -147,6 +156,7 @@ std::vector<arangodb::consensus::index_t> State::log(
arangodb::consensus::index_t State::log(
velocypack::Slice const& slice, term_t term) {
std::string clientId;
arangodb::consensus::index_t idx = 0;
auto buf = std::make_shared<Buffer<uint8_t>>();
@ -154,8 +164,8 @@ arangodb::consensus::index_t State::log(
buf->append((char const*)slice.begin(), slice.byteSize());
TRI_ASSERT(!_log.empty()); // log must not ever be empty
idx = _log.back().index + 1;
_log.push_back(log_t(idx, term, buf)); // log to RAM
persist(idx, term, slice); // log to disk
_log.push_back(log_t(idx, term, buf, clientId)); // log to RAM
persist(idx, term, slice, clientId); // log to disk
return _log.back().index;
@ -173,21 +183,28 @@ arangodb::consensus::index_t State::log(query_t const& transactions,
TRI_ASSERT(nqs > ndups);
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
std::string clientId;
for (size_t i = ndups; i < nqs; ++i) {
VPackSlice slice = slices[i];
try {
auto idx = slice.get("index").getUInt();
auto trm = slice.get("term").getUInt();
auto buf = std::make_shared<Buffer<uint8_t>>();
clientId = slice.get("clientId").copyString();
auto buf = std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)slice.get("query").begin(),
slice.get("query").byteSize());
// to RAM
_log.push_back(log_t(idx, trm, buf));
_log.push_back(log_t(idx, trm, buf, clientId));
_clientIdLookupTable.emplace(
std::pair<std::string, arangodb::consensus::index_t>(clientId, idx));
// to disk
persist(idx, trm, slice.get("query"));
persist(idx, trm, slice.get("query"), clientId);
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
@ -409,7 +426,6 @@ bool State::loadCollections(TRI_vocbase_t* vocbase,
_options.waitForSync = waitForSync;
_options.silent = true;
if (loadPersisted()) {
MUTEX_LOCKER(logLock, _logLock);
if (_log.empty()) {
@ -417,8 +433,9 @@ bool State::loadCollections(TRI_vocbase_t* vocbase,
std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), buf));
persist(0, 0, value);
_log.push_back(
log_t(arangodb::consensus::index_t(0), term_t(0), buf, std::string()));
persist(0, 0, value, std::string());
}
return true;
}
@ -587,16 +604,22 @@ bool State::loadRemaining() {
if (result.isArray()) {
_log.clear();
std::string clientId;
for (auto const& i : VPackArrayIterator(result)) {
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
auto ii = i.resolveExternals();
auto req = ii.get("request");
tmp->append(req.startAs<char const>(), req.byteSize());
clientId = req.hasKey("clientId") ?
req.get("clientId").copyString() : std::string();
try {
_log.push_back(
log_t(std::stoi(ii.get(StaticStrings::KeyString).copyString()),
static_cast<term_t>(ii.get("term").getUInt()), tmp));
static_cast<term_t>(ii.get("term").getUInt()), tmp, clientId));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
@ -815,3 +838,33 @@ query_t State::allLogs() const {
return everything;
}
std::vector<log_t> State::inquire(query_t const& query) const {
std::vector<log_t> result;
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
if (!query->slice().isArray()) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Inquire interface handles [<clientIds>]. We got " << query->toJson();
return result;
}
for (auto const& i : VPackArrayIterator(query->slice())) {
if (!i.isString()) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "ClientIds must be strings. We got " << i.toJson();
continue;
}
auto ret = _clientIdLookupTable.equal_range(i.copyString());
for (auto it = ret.first; it != ret.second; ++it) {
result.push_back(_log.at(it->second-_cur));
}
}
return result;
}

View File

@ -32,6 +32,7 @@
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
struct TRI_vocbase_t;
@ -77,6 +78,9 @@ class State {
std::vector<log_t> get(
index_t = 0, index_t = (std::numeric_limits<uint64_t>::max)()) const;
/// @brief Get log entries by client Id
std::vector<log_t> inquire(query_t const&) const;
/// @brief Get complete logged commands by lower and upper bounds.
/// Default: [first, last]
std::vector<VPackSlice> slices(
@ -119,8 +123,8 @@ class State {
private:
/// @brief Save currentTerm, votedFor, log entries
bool persist(index_t index, term_t term,
arangodb::velocypack::Slice const& entry) const;
bool persist(index_t, term_t, arangodb::velocypack::Slice const&,
std::string const&) const;
bool saveCompacted();
@ -168,6 +172,7 @@ class State {
std::string _endpoint; /**< @brief persistence end point */
bool _collectionsChecked; /**< @brief Collections checked */
bool _collectionsLoaded;
std::multimap<std::string,arangodb::consensus::index_t> _clientIdLookupTable;
/// @brief Our query registry
aql::QueryRegistry* _queryRegistry;

View File

@ -31,6 +31,20 @@
var jsunity = require("jsunity");
var wait = require("internal").wait;
////////////////////////////////////////////////////////////////////////////////
/// @brief bogus UUIDs
////////////////////////////////////////////////////////////////////////////////
function guid() {
function s4() {
return Math.floor((1 + Math.random()) * 0x10000)
.toString(16)
.substring(1);
}
return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
s4() + '-' + s4() + s4() + s4();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
@ -137,33 +151,34 @@ function agencyTestSuite () {
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test nasty willful attempt to break
/// @brief Test transact interface
////////////////////////////////////////////////////////////////////////////////
/*
TODO kaveh...fixup test
testTransact : function () {
var res;
var cur = accessAgency("write",[[{"/": {"op":"delete"}}]]).
bodyParsed.results[0];
assertEqual(readAndCheck([["/x"]]), [{}]);
var res = transactAndCheck([["/x"],[{"/x":12}]],200);
assertEqual(res, [{},++cur]);
res = transactAndCheck([["/x"],[{"/x":12}]],200);
assertEqual(res, [{},2]);
res = transactAndCheck([["/x"],[{"/x":12}]],200);
assertEqual(res, [{x:12},3]);
assertEqual(res, [{x:12},++cur]);
res = transactAndCheck([["/x"],[{"/x":12}],["/x"]],200);
assertEqual(res, [{x:12},4,{x:12}]);
assertEqual(res, [{x:12},++cur,{x:12}]);
res = transactAndCheck([["/x"],[{"/x":12}],["/x"]],200);
assertEqual(res, [{x:12},5,{x:12}]);
assertEqual(res, [{x:12},++cur,{x:12}]);
res = transactAndCheck([["/x"],[{"/x":{"op":"increment"}}],["/x"]],200);
assertEqual(res, [{x:12},6,{x:13}]);
assertEqual(res, [{x:12},++cur,{x:13}]);
res = transactAndCheck(
[["/x"],[{"/x":{"op":"increment"}}],["/x"],[{"/x":{"op":"increment"}}]],
200);
assertEqual(res, [{x:13},7,{x:14},8]);
assertEqual(res, [{x:13},++cur,{x:14},++cur]);
res = transactAndCheck(
[[{"/x":{"op":"increment"}}],[{"/x":{"op":"increment"}}],["/x"]],200);
assertEqual(res, [9,10,{x:17}]);
assertEqual(res, [++cur,++cur,{x:17}]);
writeAndCheck([[{"/":{"op":"delete"}}]]);
},
*/
////////////////////////////////////////////////////////////////////////////////
/// @brief test to write a single top level key
@ -274,6 +289,15 @@ function agencyTestSuite () {
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test clientIds
////////////////////////////////////////////////////////////////////////////////
testClientIds : function () {
writeAndCheck([[{"a":12},{},guid()]]);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test document/transaction assignment
////////////////////////////////////////////////////////////////////////////////