1
0
Fork 0

[3.5] clean up your crap, dbservers. alright, i'll do it. (#9722)

* clean up your crap, dbservers. alright, i'll do it.

* ts ts ts

* body is shared_ptr

* Update CHANGELOG

* revert callback bodies to API specification

* array needs be inside so that multiple unobserves to same key are possible
This commit is contained in:
Kaveh Vahedipour 2019-08-22 13:26:44 +02:00 committed by KVS85
parent 5bee042b65
commit 09d2745625
8 changed files with 177 additions and 25 deletions

View File

@ -1,6 +1,8 @@
v3.5.1 (XXXX-XX-XX)
-------------------
* Agents to remove callback entries when responded to with code 404.
* Fixed internal issue #622: Analyzer cache is now invalidated for dropped database.
* Show query string length and cacheability information in query explain output.

View File

@ -482,6 +482,44 @@ void AgencyCommResult::clear() {
VPackSlice AgencyCommResult::slice() const { return _vpack->slice(); }
void AgencyCommResult::toVelocyPack(VPackBuilder& builder) const {
{ VPackObjectBuilder dump(&builder);
builder.add("location", VPackValue(_location));
builder.add("message", VPackValue(_message));
builder.add("sent", VPackValue(_sent));
builder.add("body", VPackValue(_body));
if (_vpack != nullptr) {
if (_vpack->isClosed()) {
builder.add("vpack", _vpack->slice());
}
}
builder.add("statusCode", VPackValue(_statusCode));
builder.add(VPackValue("values"));
{ VPackObjectBuilder v(&builder);
for (auto const& value : _values) {
builder.add(VPackValue(value.first));
auto const& entry = value.second;
{ VPackObjectBuilder vv(&builder);
builder.add("index", VPackValue(entry._index));
builder.add("isDir", VPackValue(entry._isDir));
if (entry._vpack != nullptr && entry._vpack->isClosed()) {
builder.add("vpack", entry._vpack->slice());
}}}
}}
}
VPackBuilder AgencyCommResult::toVelocyPack() const {
VPackBuilder builder;
toVelocyPack(builder);
return builder;
}
namespace std {
ostream& operator<< (ostream& out, AgencyCommResult const& a) {
out << a.toVelocyPack().toJson();
return out;
}}
// -----------------------------------------------------------------------------
// --SECTION-- AgencyCommManager
// -----------------------------------------------------------------------------

View File

@ -271,6 +271,10 @@ class AgencyCommResult {
_vpack = vpack;
}
void toVelocyPack(VPackBuilder& builder) const;
VPackBuilder toVelocyPack() const;
public:
std::string _location;
std::string _message;
@ -699,4 +703,8 @@ class AgencyComm {
};
} // namespace arangodb
namespace std {
ostream& operator<<(ostream& o, arangodb::AgencyCommResult const& a);
}
#endif

View File

@ -1289,6 +1289,9 @@ void Agent::run() {
// Check whether we can advance _commitIndex
advanceCommitIndex();
// Empty store callback trash bin
emptyCbTrashBin();
bool commenceService = false;
{
READ_LOCKER(oLocker, _outputLock);
@ -1626,7 +1629,7 @@ arangodb::consensus::index_t Agent::readDB(VPackBuilder& builder) const {
TRI_ASSERT(builder.isOpenObject());
uint64_t commitIndex = 0;
{ READ_LOCKER(oLocker, _outputLock);
commitIndex = _commitIndex;
@ -1637,10 +1640,10 @@ arangodb::consensus::index_t Agent::readDB(VPackBuilder& builder) const {
// key-value store {}
builder.add(VPackValue("agency"));
_readDB.get().toBuilder(builder, true); }
// replicated log []
_state.toVelocyPack(commitIndex, builder);
return commitIndex;
}
@ -1904,6 +1907,72 @@ bool Agent::ready() const {
return _ready;
}
void Agent::trashStoreCallback(std::string const& url, query_t const& body) {
auto const& slice = body->slice();
TRI_ASSERT(slice.isObject());
// body consists of object holding keys index, term and the observed keys
// we'll remove observation on every key and according observer url
for (auto const& i : VPackObjectIterator(slice)) {
if (!i.key.isEqualString("term") && !i.key.isEqualString("index")) {
MUTEX_LOCKER(lock, _cbtLock);
_callbackTrashBin[i.key.copyString()].emplace(url);
}
}
}
void Agent::emptyCbTrashBin() {
using clock = std::chrono::steady_clock;
auto envelope = std::make_shared<VPackBuilder>();
{
_cbtLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(lock, _cbtLock);
auto early =
std::chrono::duration_cast<std::chrono::seconds>(
clock::now() - _callbackLastPurged).count() < 10;
if (early || _callbackTrashBin.empty()) {
return;
}
{
VPackArrayBuilder trxs(envelope.get());
for (auto const& i : _callbackTrashBin) {
for (auto const& j : i.second) {
{
VPackArrayBuilder trx(envelope.get());
{
VPackObjectBuilder ak(envelope.get());
envelope->add(VPackValue(i.first));
{
VPackObjectBuilder oper(envelope.get());
envelope->add("op", VPackValue("unobserve"));
envelope->add("url", VPackValue(j));
}
}
}
}
}
}
_callbackTrashBin.clear();
_callbackLastPurged = std::chrono::steady_clock::now();
}
LOG_TOPIC("12ad3", DEBUG, Logger::AGENCY) << "unobserving: " << envelope->toJson();
// Best effort. Will be retried anyway
auto wres = write(envelope);
}
query_t Agent::buildDB(arangodb::consensus::index_t index) {
Store store(this);
index_t oldIndex;

View File

@ -145,7 +145,14 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// @brief Resign leadership
void resign(term_t otherTerm = 0);
/// @brief collect store callbacks for removal
void trashStoreCallback(std::string const& url, query_t const& body);
private:
/// @brief empty callback trash bin
void emptyCbTrashBin();
/// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
void sendAppendEntriesRPC();
@ -402,6 +409,11 @@ class Agent final : public arangodb::Thread, public AgentInterface {
///
mutable arangodb::Mutex _ioLock;
/// @brief Callback trash bin lock
/// _callbackTrashBin
///
mutable arangodb::Mutex _cbtLock;
/// @brief RAFT consistency lock:
/// _readDB and _commitIndex
/// Allows reading from one or both if used alone.
@ -455,6 +467,10 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// since the epoch of the steady clock.
std::atomic<int64_t> _leaderSince;
/// @brief Container for callbacks for removal
std::unordered_map<std::string, std::unordered_set<std::string>> _callbackTrashBin;
std::chrono::time_point<std::chrono::steady_clock> _callbackLastPurged;
/// @brief Ids of ongoing transactions, used for inquire:
std::unordered_set<std::string> _ongoingTrxs;

View File

@ -326,26 +326,32 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
// Callback
for (auto const& url : urls) {
Builder body; // host
auto body = std::make_shared<VPackBuilder>(); // host
{
VPackObjectBuilder b(&body);
body.add("term", VPackValue(term));
body.add("index", VPackValue(index));
VPackObjectBuilder b(body.get());
body->add("term", VPackValue(term));
body->add("index", VPackValue(index));
auto ret = in.equal_range(url);
std::map<std::string,std::map<std::string, std::string>> result;
// key -> (modified -> op)
// using the map to make sure no double key entries end up in document
std::map<std::string,std::map<std::string, std::string>> result;
for (auto it = ret.first; it != ret.second; ++it) {
result[it->second->key][it->second->modified] = it->second->oper;
}
// Work the map into JSON
for (auto const& m : result) {
body.add(VPackValue(m.first));
body->add(VPackValue(m.first));
{
VPackObjectBuilder guard(&body);
VPackObjectBuilder guard(body.get());
for (auto const& m2 : m.second) {
body.add(VPackValue(m2.first));
body->add(VPackValue(m2.first));
{
VPackObjectBuilder guard2(&body);
body.add("op", VPackValue(m2.second));
VPackObjectBuilder guard2(body.get());
body->add("op", VPackValue(m2.second));
}
}
}
@ -359,8 +365,8 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
arangodb::ClusterComm::instance()->asyncRequest(
coordinatorTransactionID, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), hf,
std::make_shared<StoreCallback>(path, body.toJson()), 1.0, true, 0.01);
std::make_shared<std::string>(body->toString()), hf,
std::make_shared<StoreCallback>(path, body, _agent), 1.0, true, 0.01);
} else {
LOG_TOPIC("76aca", WARN, Logger::AGENCY) << "Malformed URL " << url;

View File

@ -21,18 +21,28 @@
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "Agent.h"
#include "StoreCallback.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
StoreCallback::StoreCallback(std::string const& path, std::string const& body)
: _path(path), _body(body) {}
StoreCallback::StoreCallback(
std::string const& url, query_t const& body, Agent* agent)
: _url(url), _body(body), _agent(agent) {}
bool StoreCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status != CL_COMM_SENT) {
LOG_TOPIC("7c4cc", DEBUG, Logger::AGENCY) << res->endpoint + _path << "(" << res->status
<< ", " << res->errorMessage << "): " << _body;
if (res->status == CL_COMM_ERROR) {
LOG_TOPIC("9sdbf0", TRACE, Logger::AGENCY)
<< _url << "(" << res->status << ", " << res->errorMessage
<< "): " << _body->toJson();
if (res->result->getHttpReturnCode() == 404 && _agent != nullptr) {
LOG_TOPIC("9sdbf0", DEBUG, Logger::AGENCY) << "dropping dead callback at " << _url;
_agent->trashStoreCallback(_url, _body);
}
}
return true;
}

View File

@ -29,15 +29,18 @@
namespace arangodb {
namespace consensus {
class Agent;
class StoreCallback : public arangodb::ClusterCommCallback {
public:
StoreCallback(std::string const&, std::string const&);
public:
StoreCallback(std::string const&, query_t const&, Agent* agent);
bool operator()(arangodb::ClusterCommResult*) override final;
private:
std::string _path;
std::string _body;
private:
std::string _url;
query_t _body;
Agent* _agent;
};
} // namespace consensus
} // namespace arangodb