1
0
Fork 0

Overwriting ttls

This commit is contained in:
Kaveh Vahedipour 2016-04-01 14:24:30 +02:00
parent 5363703846
commit 8ace3c10a3
7 changed files with 89 additions and 22 deletions

View File

@ -332,5 +332,12 @@ log_t const& Agent::lastLog() const {
return _state.lastLog();
}
Store const& Agent::spearhead () const {
return _spearhead;
}
Store const& Agent::readDB () const {
return _read_db;
}
}}

View File

@ -126,6 +126,12 @@ class Agent : public arangodb::Thread {
/// @brief State machine
State const& state() const;
/// @brief Get read store
Store const& readDB() const;
/// @brief Get spearhead store
Store const& spearhead() const;
private:
Constituent _constituent; /**< @brief Leader election delegate */
State _state; /**< @brief Log replica */

View File

@ -217,12 +217,11 @@ bool State::createCollection(std::string const& name) {
}
bool State::loadCollections() {
loadCollection("log");
return true;
return loadCollection("log");
}
bool State::loadCollection(std::string const& name) {
if (checkCollections()) {
if (checkCollection(name)) {
// Path
std::string path("/_api/cursor");
@ -255,10 +254,11 @@ bool State::loadCollection(std::string const& name) {
}
}
}
return true;
} else {
LOG_TOPIC (INFO, Logger::AGENCY) << "Couldn't find persisted log";
createCollections();
return false;
}
}

View File

@ -30,6 +30,8 @@
#include <Basics/ConditionLocker.h>
#include <ctime>
#include <iomanip>
#include <iostream>
using namespace arangodb::consensus;
@ -96,7 +98,6 @@ Node& Node::operator= (VPackSlice const& slice) { // Assign value (become leaf)
Node& Node::operator= (Node const& node) { // Assign node
_node_name = node._node_name;
_type = node._type;
_value = node._value;
_children = node._children;
return *this;
@ -394,6 +395,16 @@ std::ostream& Node::print (std::ostream& o) const {
} else {
o << ((slice().type() == ValueType::None) ? "NONE" : slice().toJson()) << std::endl;
}
if (_time_table.size()) {
for (auto const& i : _time_table) {
o << i.second.get() << std::endl;
}
}
if (_table_time.size()) {
for (auto const& i : _table_time) {
o << i.first.get() << std::endl;
}
}
return o;
}
@ -560,9 +571,8 @@ void Store::beginShutdown() {
}
void Store::clearTimeTable () {
size_t deleted = 0;
// std::vector<std::shared_ptr<Node>> deleted;
for (auto it = _time_table.cbegin(); it != _time_table.cend(); ++it) {
// Remove expired from front
if (it->first < std::chrono::system_clock::now()) {
query_t tmp = std::make_shared<Builder>();
tmp->openArray(); tmp->openArray(); tmp->openObject();
@ -570,18 +580,42 @@ void Store::clearTimeTable () {
tmp->add("op",VPackValue("delete"));
tmp->close(); tmp->close(); tmp->close(); tmp->close();
_agent->write(tmp);
it->second->remove();
deleted++;
// _time_table.erase(it++);
} else {
break;
}
}
for (size_t i = 0; i < deleted; ++i) {
/* for (size_t i = 0; i < deleted; ++i) {
try {
_table_time.erase(_table_time.find(_time_table.cbegin()->second));
_time_table.erase(_time_table.cbegin());
} catch (std::exception const& e) {
std::cout << e.what() << std::endl;
}*/
}
void Store::dumpToBuilder (Builder& builder) const {
MUTEX_LOCKER(storeLocker, _storeLock);
toBuilder(builder);
{
VPackObjectBuilder guard(&builder);
for (auto const& i : _time_table) {
auto in_time_t = std::chrono::system_clock::to_time_t(i.first);
std::stringstream ss;
ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %X");
builder.add(ss.str(), VPackValue((size_t)i.second.get()));
}
}
{
VPackObjectBuilder guard(&builder);
for (auto const& i : _table_time) {
auto in_time_t = std::chrono::system_clock::to_time_t(i.second);
std::stringstream ss;
ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %X");
builder.add(std::to_string((size_t)i.first.get()), VPackValue(ss.str()));
}
}
}
bool Store::start () {
@ -601,6 +635,3 @@ void Store::run() {
clearTimeTable();
}
}

View File

@ -158,7 +158,6 @@ protected:
TableTime _table_time;
Buffer<uint8_t> _value;
NodeType _type;
std::string _node_name;
};
@ -201,6 +200,9 @@ public:
/// @brief Set name
void name (std::string const& name);
/// @brief Dump everything to builder
void dumpToBuilder (Builder&) const;
private:
/// @brief Read individual entry specified in slice into builder
bool read (arangodb::velocypack::Slice const&,

View File

@ -79,7 +79,25 @@ void RestAgencyHandler::redirectRequest (id_t leaderId) {
_response->setHeader("Location", rendpoint);
}
inline HttpHandler::status_t RestAgencyHandler::handleWrite () {
HttpHandler::status_t RestAgencyHandler::handleStores () {
if (_request->requestType() == GeneralRequest::RequestType::GET) {
Builder body;
body.openObject();
body.add("spearhead", VPackValue(VPackValueType::Array));
_agent->spearhead().dumpToBuilder(body);
body.close();
body.add("read_db", VPackValue(VPackValueType::Array));
_agent->readDB().dumpToBuilder(body);
body.close();
body.close();
generateResult(body.slice());
} else {
generateError(HttpResponse::BAD,400);
}
return HttpHandler::status_t(HANDLER_DONE);
}
HttpHandler::status_t RestAgencyHandler::handleWrite () {
arangodb::velocypack::Options options; // TODO: User not wait.
if (_request->requestType() == GeneralRequest::RequestType::POST) {
@ -215,7 +233,7 @@ HttpHandler::status_t RestAgencyHandler::execute() {
} else if (_request->suffix().size() > 1) { // path size >= 2
return reportTooManySuffices();
} else {
if (_request->suffix()[0] == "write") {
if (_request->suffix()[0] == "write") {
return handleWrite();
} else if (_request->suffix()[0] == "read") {
return handleRead();
@ -224,14 +242,16 @@ HttpHandler::status_t RestAgencyHandler::execute() {
return reportMethodNotAllowed();
}
return handleConfig();
} else if (_request->suffix()[0] == "state") {
} else if (_request->suffix()[0] == "state") {
if (_request->requestType() != GeneralRequest::RequestType::GET) {
return reportMethodNotAllowed();
}
return handleState();
} else {
} else if (_request->suffix()[0] == "stores") {
return handleStores();
} else {
return reportUnknownMethod();
}
}
}
} catch (...) {
// Ignore this error

View File

@ -45,6 +45,7 @@ class RestAgencyHandler : public RestBaseHandler {
status_t reportErrorEmptyRequest();
status_t reportTooManySuffices();
status_t reportUnknownMethod();
status_t handleStores();
status_t handleRead();
status_t handleWrite();
status_t handleConfig();