1
0
Fork 0

we have ttl

This commit is contained in:
Kaveh Vahedipour 2016-03-17 21:04:12 +01:00
parent 92bcc0c376
commit 0051c9d3d4
3 changed files with 118 additions and 36 deletions

View File

@ -51,6 +51,7 @@ Agent::~Agent () {
void Agent::start() {
_constituent.start();
_spear_head.start();
}
term_t Agent::term () const {

View File

@ -28,6 +28,8 @@
#include <velocypack/velocypack-aliases.h>
#include <velocypack/Hexdump.h>
#include <Basics/ConditionLocker.h>
#include <iostream>
using namespace arangodb::consensus;
@ -102,6 +104,11 @@ bool Node::remove (std::string const& path) {
}
}
bool Node::remove () {
Node& parent = *_parent;
return parent.removeChild(_name);
}
bool Node::removeChild (std::string const& key) {
auto found = _children.find(key);
if (found == _children.end())
@ -117,16 +124,6 @@ Node& Node::operator [](std::string name) {
return *_children[name];
}
bool Node::append (std::string const name, std::shared_ptr<Node> const node) {
if (node != nullptr) {
_children[name] = node;
} else {
_children[name] = std::make_shared<Node>(name);
}
_children[name]->_parent = this;
return true;
}
Node& Node::operator ()(std::vector<std::string>& pv) {
if (pv.size()) {
std::string const key = pv[0];
@ -164,14 +161,24 @@ Node& Node::operator ()(std::string const& path) {
return this->operator()(pv);
}
Node const& Node::read (std::string const& path) const {
PathType pv = split(path,'/');
return this->operator()(pv);
std::ostream& operator<< (
std::ostream& o, std::chrono::system_clock::time_point const& t) {
std::time_t tmp = std::chrono::system_clock::to_time_t(t);
o << std::ctime(&tmp);
return o;
}
template<class S, class T>
std::ostream& operator<< (std::ostream& o, std::map<S,T> const& d) {
for (auto const& i : d)
o << i.first << ":" << i.second << std::endl;
return o;
}
Node& Node::write (std::string const& path) {
PathType pv = split(path,'/');
return this->operator()(pv);
bool Node::addTimeToLive (long millis) {
_time_table[
std::chrono::system_clock::now() + std::chrono::milliseconds(millis)] =
_parent->_children[_name];
return true;
}
bool Node::applies (arangodb::velocypack::Slice const& slice) {
@ -188,12 +195,15 @@ bool Node::applies (arangodb::velocypack::Slice const& slice) {
Slice const& self = this->slice();
if (oper == "delete") {
return _parent->removeChild(_name);
} else if (oper == "set") {
} else if (oper == "set") { //
if (!slice.hasKey("new")) {
LOG(WARN) << "Operator set without new value";
LOG(WARN) << slice.toJson();
return false;
}
if (slice.hasKey("ttl")) {
addTimeToLive ((long)slice.get("ttl").getDouble()*1000);
}
*this = slice.get("new");
return true;
} else if (oper == "increment") { // Increment
@ -328,7 +338,7 @@ void Node::toBuilder (Builder& builder) const {
}
}
Store::Store (std::string const& name) : Node(name) {}
Store::Store (std::string const& name) : Node(name), Thread(name) {}
Store::~Store () {}
@ -353,6 +363,8 @@ std::vector<bool> Store::apply (query_t const& query) {
break;
}
}
_cv.signal(); // Wake up run
return applied;
}
@ -459,5 +471,31 @@ bool Store::read (arangodb::velocypack::Slice const& query, Builder& ret) const
return true;
}
void Store::beginShutdown() {
Thread::beginShutdown();
}
void Store::clearTimeTable () {
for (auto it = _time_table.cbegin(); it != _time_table.cend() ;) {
if (it->first < std::chrono::system_clock::now()) {
std::cout << "clearing time table: " << it->second->name() << std::endl;
it->second->remove();
_time_table.erase(it++);
} else {
break;
}
}
}
void Store::run() {
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) { // Check timetable and remove overage entries
_cv.wait(1000000); // better wait to next known time point
clearTimeTable();
}
}

View File

@ -39,6 +39,8 @@
#include <Basics/Mutex.h>
#include <Basics/MutexLocker.h>
#include <Basics/Thread.h>
#include <Basics/ConditionVariable.h>
#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
@ -59,51 +61,64 @@ private:
enum NODE_EXCEPTION {PATH_NOT_FOUND};
/// @brief Simple tree implementation
class Node {
public:
typedef std::vector<std::string> PathType;
typedef std::map<std::string, std::shared_ptr<Node>> Children;
typedef std::chrono::system_clock::time_point TimePoint;
typedef std::map<TimePoint, std::shared_ptr<Node>> TimeTable;
/// @brief Construct with name
Node (std::string const& name);
/// @brief Construct with name and introduce to tree under parent
Node (std::string const& name, Node* parent);
/// @brief Default dtor
virtual ~Node ();
/// @brief Get name
std::string const& name() const;
/// @brief Apply rhs to this node (deep copy of rhs)
Node& operator= (Node const& node);
/// @brief Apply value slice to this node
Node& operator= (arangodb::velocypack::Slice const&);
/// @brief Check equality with slice
bool operator== (arangodb::velocypack::Slice const&) const;
/// @brief Type of this node (LEAF / NODE)
NodeType type() const;
/// @brief Get child specified by name
Node& operator [](std::string name);
Node const& operator [](std::string name) const;
bool append (std::string const name,
std::shared_ptr<Node> const node = nullptr);
/// @brief Get node specified by path vector
Node& operator ()(std::vector<std::string>& pv);
Node const& operator ()(std::vector<std::string>& pv) const;
/// @brief Get node specified by path string
Node& operator ()(std::string const& path);
Node const& operator ()(std::string const& path) const;
Node const& read (std::string const& path) const;
Node& write (std::string const& path);
/// @brief Remove node with absolute path
bool remove (std::string const& path);
/// @brief Remove child
bool removeChild (std::string const& key);
/// @brief Remove this node
bool remove();
/// @brief Dump
friend std::ostream& operator<<(std::ostream& os, const Node& n) {
Node* par = n._parent;
Node const* par = n._parent;
while (par != 0) {
par = par->_parent;
os << " ";
@ -119,19 +134,23 @@ public:
return os;
}
/// @brief Apply single slice
bool applies (arangodb::velocypack::Slice const&);
/// @brief Create Builder representing this store
void toBuilder (Builder&) const;
Buffer<uint8_t> const& value() const {
return _value;
}
/// @brief Create slice from value
Slice slice() const;
protected:
/// @brief Add time to live entry
bool addTimeToLive (long millis);
Node* _parent;
Children _children;
TimeTable _time_table;
Buffer<uint8_t> _value;
std::chrono::system_clock::time_point _ttl;
@ -141,20 +160,44 @@ protected:
};
class Store : public Node { // Root node
/// @brief Key value tree
class Store : public Node, public arangodb::Thread {
public:
/// @brief Construct with name
Store (std::string const& name = "root");
/// @brief Destruct
virtual ~Store ();
/// @brief Apply entry in query
std::vector<bool> apply (query_t const& query);
query_t read (query_t const& query) const;
/// @brief Read specified query from store
query_t read (query_t const& query) const;
private:
/// @brief Read individual entry specified in slice into builder
bool read (arangodb::velocypack::Slice const&,
arangodb::velocypack::Builder&) const;
/// @brief Check precondition
bool check (arangodb::velocypack::Slice const&) const;
/// @brief Clear entries, whose time to live has expired
void clearTimeTable ();
/// @brief Begin shutdown of thread
void beginShutdown () override;
/// @brief Run thread
void run () override final;
/// @brief Condition variable guarding removal of expired entries
arangodb::basics::ConditionVariable _cv;
/// @brief Read/Write mutex on database
mutable arangodb::Mutex _storeLock;
};