1
0
Fork 0

agency compaction to c++ database api

This commit is contained in:
Kaveh Vahedipour 2016-05-09 08:12:22 +02:00
parent 642fc70082
commit 220c286a11
6 changed files with 90 additions and 31 deletions

View File

@ -46,7 +46,7 @@ Agent::Agent (config_t const& config)
_config(config),
_lastCommitIndex(0) {
_state.setEndPoint(_config.endpoint);
_state.configure(this);
_constituent.configure(this);
_confirmed.resize(size(),0); // agency's size and reset to 0
}
@ -83,6 +83,11 @@ inline size_t Agent::size() const {
return _config.size();
}
// My endpoint
std::string const& Agent::endpoint () const {
return _config.endpoint;
}
// Handle vote request
priv_rpc_ret_t Agent::requestVote(term_t t, arangodb::consensus::id_t id, index_t lastLogIndex,
index_t lastLogTerm, query_t const& query) {

View File

@ -61,8 +61,10 @@ public:
/// @brief Start thread
bool start();
/// @brief My endpoint
std::string const& endpoint() const;
/// @brief Verbose print of myself
////
void print(arangodb::LoggerStream&) const;
/// @brief Are we fit to run?

View File

@ -21,6 +21,7 @@
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "Agent.h"
#include "State.h"
#include <velocypack/Buffer.h>
@ -49,21 +50,25 @@ using namespace arangodb::velocypack;
using namespace arangodb::rest;
State::State(std::string const& endpoint)
: _vocbase(nullptr),
_endpoint(endpoint),
_collectionsChecked(false),
_collectionsLoaded(false) {
: _agent(nullptr),
_vocbase(nullptr),
_endpoint(endpoint),
_collectionsChecked(false),
_collectionsLoaded(false),
_compaction_step(1000) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
if (!_log.size()) {
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), arangodb::consensus::id_t(0), buf));
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0),
arangodb::consensus::id_t(0), buf));
}
}
State::~State() {}
bool State::persist(arangodb::consensus::index_t index, term_t term, arangodb::consensus::id_t lid,
bool State::persist(arangodb::consensus::index_t index, term_t term,
arangodb::consensus::id_t lid,
arangodb::velocypack::Slice const& entry) {
Builder body;
body.add(VPackValue(VPackValueType::Object));
@ -95,7 +100,8 @@ bool State::persist(arangodb::consensus::index_t index, term_t term, arangodb::c
//Leader
std::vector<arangodb::consensus::index_t> State::log (
query_t const& query, std::vector<bool> const& appl, term_t term, arangodb::consensus::id_t lid) {
query_t const& query, std::vector<bool> const& appl, term_t term,
arangodb::consensus::id_t lid) {
std::vector<arangodb::consensus::index_t> idx(appl.size());
std::vector<bool> good = appl;
@ -110,6 +116,9 @@ std::vector<arangodb::consensus::index_t> State::log (
idx[j] = _log.back().index + 1;
_log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM
persist(idx[j], term, lid, i[0]); // log to disk
if (idx[j] > 0 && (idx[j] % _compaction_step) == 0) {
compact(idx[j]);
}
++j;
}
}
@ -117,20 +126,26 @@ std::vector<arangodb::consensus::index_t> State::log (
}
// Follower
bool State::log(query_t const& queries, term_t term, arangodb::consensus::id_t lid,
arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc
bool State::log(query_t const& queries, term_t term,
arangodb::consensus::id_t lid,
arangodb::consensus::index_t prevLogIndex,
term_t prevLogTerm) { // TODO: Throw exc
if (queries->slice().type() != VPackValueType::Array) {
return false;
}
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
for (auto const& i : VPackArrayIterator(queries->slice())) {
try {
auto idx = i.get("index").getUInt();
std::shared_ptr<Buffer<uint8_t>> buf =
std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i.get("query").begin(),
i.get("query").byteSize());
_log.push_back(log_t(i.get("index").getUInt(), term, lid, buf));
persist(i.get("index").getUInt(), term, lid, i.get("query")); // to disk
_log.push_back(log_t(idx, term, lid, buf));
persist(idx, term, lid, i.get("query")); // to disk
if (idx > 0 && (idx % _compaction_step) == 0) {
compact(idx);
}
} catch (std::exception const& e) {
LOG(ERR) << e.what();
}
@ -140,7 +155,8 @@ bool State::log(query_t const& queries, term_t term, arangodb::consensus::id_t l
}
// Get log entries from indices "start" to "end"
std::vector<log_t> State::get(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const {
std::vector<log_t> State::get(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
std::vector<log_t> entries;
MUTEX_LOCKER(mutexLocker, _logLock);
if (end == (std::numeric_limits<uint64_t>::max)()) end = _log.size() - 1;
@ -150,7 +166,8 @@ std::vector<log_t> State::get(arangodb::consensus::index_t start, arangodb::cons
return entries;
}
std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const {
std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
std::vector<VPackSlice> slices;
MUTEX_LOCKER(mutexLocker, _logLock);
if (end == (std::numeric_limits<uint64_t>::max)()) end = _log.size() - 1;
@ -170,8 +187,9 @@ log_t const& State::lastLog() const {
return _log.back();
}
bool State::setEndPoint(std::string const& endpoint) {
_endpoint = endpoint;
bool State::configure(Agent* agent) {
_agent = agent;
_endpoint = agent->endpoint();
_collectionsChecked = false;
return true;
};
@ -186,7 +204,8 @@ bool State::checkCollections() {
bool State::createCollections() {
if (!_collectionsChecked) {
return (createCollection("log") && createCollection("election"));
return (createCollection("log") && createCollection("election") &&
createCollection("compact"));
}
return _collectionsChecked;
}
@ -257,7 +276,8 @@ bool State::loadCollection(std::string const& name) {
_log.push_back(
log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()),
static_cast<term_t>(i.get("term").getUInt()),
static_cast<arangodb::consensus::id_t>(i.get("leader").getUInt()), tmp));
static_cast<arangodb::consensus::id_t>(
i.get("leader").getUInt()), tmp));
}
}
@ -278,13 +298,39 @@ bool State::find (arangodb::consensus::index_t prevIndex, term_t prevTerm) {
return _log.at(prevIndex).term == prevTerm;
}
bool State::compact () {
bool State::compact (arangodb::consensus::index_t cind) {
// get read db at lastcommit % n == 0
// save read db with key 10
// update offset in logs
// delete
if (checkCollection("compact")) {
return true;
Builder store;
store.openObject();
store.add("spearhead", VPackValue(VPackValueType::Array));
_agent->spearhead().dumpToBuilder(store);
store.close();
std::stringstream i_str;
i_str << std::setw(20) << std::setfill('0') << cind;
store.add("_key", VPackValue(i_str.str()));
store.close();
TRI_ASSERT(_vocbase != nullptr);
auto transactionContext =
std::make_shared<StandaloneTransactionContext>(_vocbase);
SingleCollectionTransaction trx (
transactionContext, "compact", TRI_TRANSACTION_WRITE);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
auto result = trx.insert("compact", store.slice(), _options);
res = trx.finish(result.code);
return (res == TRI_ERROR_NO_ERROR);
}
LOG_TOPIC (ERR, Logger::AGENCY) << "Compaction failed!";
return false;
}

View File

@ -93,7 +93,7 @@ public:
/// @brief Set endpoint
bool setEndPoint (std::string const&);
bool configure (Agent* agent);
/// @brief Load persisted data from above or start with empty log
@ -132,7 +132,9 @@ private:
/// @brief Create collection
bool createCollection(std::string const& name);
bool compact ();
bool compact (arangodb::consensus::index_t cind);
Agent* _agent;
TRI_vocbase_t* _vocbase;
@ -142,9 +144,12 @@ private:
bool _collectionsChecked; /**< @brief Collections checked */
bool _collectionsLoaded;
size_t _compaction_step;
OperationOptions _options;
};
}}

View File

@ -214,7 +214,8 @@ std::vector<bool> Store::apply (
std::string endpoint, path;
if (endpointPathFromUrl (url,endpoint,path)) {
auto headerFields = std::make_unique<std::unordered_map<std::string, std::string>>();
auto headerFields = std::make_unique<std::unordered_map<std::string,
std::string>>();
ClusterCommResult res =
arangodb::ClusterComm::instance()->asyncRequest(

View File

@ -190,19 +190,19 @@ function agencyTestSuite () {
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":12}}]);
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
sleep(1100);
sleep(1250);
assertEqual(readAndCheck([["a/y"]]), [{a:{}}]);
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
writeAndCheck([[{"a/y":{"op":"set","new":12}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
sleep(1100);
sleep(1250);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12}}}]]);
assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{"bar":{"baz":12}}}]);
assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{"bar":{"baz":12}}}]);
assertEqual(readAndCheck([["/foo"]]), [{"foo":{"bar":{"baz":12}}}]);
writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12},"ttl":1}}]]);
sleep(1100);
sleep(1250);
assertEqual(readAndCheck([["/foo"]]), [{"foo":{}}]);
assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{}}]);
assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]);