1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
Michael Hackstein 2016-04-21 17:46:58 +02:00
commit 9a9781149d
11 changed files with 288 additions and 40 deletions

View File

@ -44,6 +44,17 @@
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
#include "Constituent.h"
#include "Agent.h"
#include "NotifierThread.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <iomanip>
#include <thread>
using namespace arangodb::consensus;
using namespace arangodb::rest;
using namespace arangodb::velocypack;
@ -73,7 +84,8 @@ Constituent::Constituent()
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(nullptr),
_votedFor((std::numeric_limits<uint32_t>::max)()) {
_votedFor((std::numeric_limits<uint32_t>::max)()),
_notifier(nullptr) {
_gen.seed(RandomGenerator::interval(UINT32_MAX));
}
@ -220,35 +232,32 @@ std::vector<std::string> const& Constituent::endpoints() const {
}
/// @brief Notify peers of updated endpoints
size_t Constituent::notifyAll() {
// Last process notifies everyone
std::stringstream path;
path << "/_api/agency_priv/notifyAll?term=" << _term << "&agencyId=" << _id;
// Body contains endpoints list
Builder body;
body.openObject();
body.add("endpoints", VPackValue(VPackValueType::Array));
for (auto const& i : endpoints()) {
body.add(Value(i));
}
body.close();
body.close();
void Constituent::notifyAll () {
std::vector<std::string> toNotify;
// Send request to all but myself
for (id_t i = 0; i < size(); ++i) {
if (i != _id) {
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, endpoint(i), GeneralRequest::RequestType::POST, path.str(),
std::make_shared<std::string>(body.toString()), headerFields, nullptr,
0.0, true);
toNotify.push_back(endpoint(i));
}
}
return size() - 1;
// Body contains endpoints list
auto body = std::make_shared<VPackBuilder>();
body->openObject();
body->add("endpoints", VPackValue(VPackValueType::Array));
for (auto const& i : endpoints()) {
body->add(Value(i));
}
body->close();
body->close();
// Last process notifies everyone
std::stringstream path;
path << "/_api/agency_priv/notifyAll?term=" << _term << "&agencyId=" << _id;
_notifier = std::make_unique<NotifierThread>(path.str(), body, toNotify);
_notifier->start();
}
/// @brief Vote
@ -361,11 +370,13 @@ void Constituent::callElection() {
}
}
void Constituent::beginShutdown() { Thread::beginShutdown(); }
void Constituent::beginShutdown() {
_notifier.reset();
Thread::beginShutdown();
}
bool Constituent::start(TRI_vocbase_t* vocbase) {
bool Constituent::start (TRI_vocbase_t* vocbase) {
_vocbase = vocbase;
return Thread::start();
}

View File

@ -30,6 +30,7 @@
#include "AgencyCommon.h"
#include "AgentConfiguration.h"
#include "NotifierThread.h"
#include "Basics/Common.h"
#include "Basics/Thread.h"
@ -130,7 +131,7 @@ private:
/// @brief Notify everyone, that we are good to go.
/// This is the task of the last process starting up.
/// Will be taken care of by gossip
size_t notifyAll();
void notifyAll();
/// @brief Sleep for how long
duration_t sleepFor(double, double);
@ -138,7 +139,6 @@ private:
TRI_vocbase_t* _vocbase;
aql::QueryRegistry* _queryRegistry;
term_t _term; /**< @brief term number */
std::atomic<bool> _cast; /**< @brief cast a vote this term */
std::atomic<state_t> _state; /**< @brief State (follower, candidate, leader)*/
@ -151,6 +151,8 @@ private:
Agent* _agent; /**< @brief My boss */
id_t _votedFor;
std::unique_ptr<NotifierThread> _notifier;
arangodb::basics::ConditionVariable _cv; // agency callbacks
mutable arangodb::Mutex _castLock;

View File

@ -0,0 +1,92 @@
#include "Basics/ConditionLocker.h"
#include "Agency/NotifierThread.h"
#include "Agency/NotifyCallback.h"
#include "Cluster/ClusterComm.h"
#include "VocBase/server.h"
using namespace arangodb::consensus;
NotifierThread::NotifierThread(const std::string& path, std::shared_ptr<VPackBuilder> body, const std::vector<std::string> &endpoints)
: Thread("AgencyNotifiaction"), _path(path), _body(body), _endpoints(endpoints)
{}
void NotifierThread::scheduleNotification(const std::string& endpoint) {
LOG(DEBUG) << "Scheduling " << endpoint << _path << " " << _body->toJson() << " " << endpoint;
auto cb = [this, endpoint] (bool result) {
LOG(DEBUG) << "Agencynotification for " << endpoint << ". Result: " << result;
CONDITION_LOCKER(guard, _cv);
_openResults.emplace_back(NotificationResult({result, endpoint}));
_cv.signal();
};
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >();
while (true) {
auto res = arangodb::ClusterComm::instance()->asyncRequest(
"", TRI_NewTickServer(), endpoint, GeneralRequest::RequestType::POST, _path,
std::make_shared<std::string>(_body->toJson()), headerFields,
std::make_shared<NotifyCallback>(cb), 5.0, true);
if (res.status == CL_COMM_SUBMITTED) {
break;
}
usleep(500000);
}
}
bool NotifierThread::start() {
return Thread::start();
}
void NotifierThread::run() {
try {
LOG(DEBUG) << "Starting Agencynotifications";
// mop: locker necessary because if scheduledNotifications may return earlier than this thread reaching the while
CONDITION_LOCKER(locker, _cv);
size_t numEndpoints = _endpoints.size();
for (auto& endpoint : _endpoints) {
scheduleNotification(endpoint);
}
while (numEndpoints > 0) {
LOG(DEBUG) << "WAITING " << numEndpoints;
locker.wait();
if (isStopping()) {
LOG(DEBUG) << "Agencynotifications stopping";
break;
}
for (auto& result: _openResults) {
if (result.success) {
numEndpoints--;
} else {
// mop: this is totally suboptimal under the lock but
// we don't want to commence a http bombardment
usleep(50000);
scheduleNotification(result.endpoint);
}
}
_openResults.clear();
}
LOG(DEBUG) << "Agencynotifications done";
} catch (std::exception& e) {
LOG(ERR) << "Couldn't notify agents: " << e.what();
} catch (...) {
LOG(ERR) << "Couldn't notify agents!";
}
}
void NotifierThread::beginShutdown() {
Thread::beginShutdown();
CONDITION_LOCKER(locker, _cv);
_cv.signal();
}
// Shutdown if not already
NotifierThread::~NotifierThread() {
shutdown();
}

View File

@ -0,0 +1,72 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Andreas Streichardt
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_CONSENSUS_NOTIFIER_COMMON_H
#define ARANGODB_CONSENSUS_NOTIFIER_COMMON_H
#include <string>
#include <vector>
#include <memory>
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace consensus {
struct NotificationResult {
bool success;
std::string endpoint;
};
class NotifierThread : public Thread {
public:
NotifierThread(const std::string& path, std::shared_ptr<VPackBuilder> body, const std::vector<std::string> &endpoints);
virtual ~NotifierThread();
void run() override;
bool start();
/// @brief Orderly shutdown of thread
void beginShutdown () override;
private:
void scheduleNotification(const std::string&);
arangodb::basics::ConditionVariable _cv;
std::string _path;
std::shared_ptr<VPackBuilder> _body;
std::vector<std::string> _endpoints;
std::vector<NotificationResult> _openResults;
};
}}
#endif // ARANGODB_CONSENSUS_NOTIFIER_COMMON_H

View File

@ -0,0 +1,14 @@
#include "NotifyCallback.h"
using namespace arangodb::consensus;
NotifyCallback::NotifyCallback(std::function<void(bool)> cb)
: _cb(cb)
{
}
bool NotifyCallback::operator()(arangodb::ClusterCommResult* res) {
_cb(res->status == CL_COMM_SENT && res->result->getHttpReturnCode() == 200);
return true;
}

View File

@ -0,0 +1,50 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Andreas Streichardt
////////////////////////////////////////////////////////////////////////////////
#ifndef __ARANGODB_CONSENSUS_NOTIFY_CALLBACK__
#define __ARANGODB_CONSENSUS_NOTIFY_CALLBACK__
#include "Cluster/ClusterComm.h"
#include "AgencyCommon.h"
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace consensus {
class NotifyCallback : public arangodb::ClusterCommCallback {
public:
NotifyCallback(std::function<void(bool)>);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
void shutdown();
private:
std::function<void(bool)> _cb;
};
}} // namespace
#endif

View File

@ -1697,6 +1697,17 @@ bool AstNode::isAttributeAccessForVariable(
return true;
}
/// @brief recursively clear flags
void AstNode::clearFlagsRecursive() {
clearFlags();
size_t const n = numMembers();
for (size_t i = 0; i < n; ++i) {
auto member = getMemberUnchecked(i);
member->clearFlagsRecursive();
}
}
/// @brief whether or not a node is simple enough to be used in a simple
/// expression
bool AstNode::isSimple() const {

View File

@ -319,6 +319,9 @@ struct AstNode {
/// @brief reset flags in case a node is changed drastically
inline void clearFlags() { flags = 0; }
/// @brief recursively clear flags
void clearFlagsRecursive();
/// @brief set a flag for the node
inline void setFlag(AstNodeFlagType flag) const {

View File

@ -205,6 +205,7 @@ void Expression::replaceVariableReference(Variable const* variable,
// must rebuild the expression completely, as it may have changed drastically
_built = false;
_type = UNPROCESSED;
_node->clearFlagsRecursive(); // recursively delete the node's flags
}
const_cast<AstNode*>(_node)->clearFlags();
@ -224,7 +225,7 @@ void Expression::invalidate() {
_func = nullptr;
_built = false;
}
}
}
// we do not need to invalidate the other expression type
// expression data will be freed in the destructor
}

View File

@ -73,6 +73,8 @@ add_executable(${BIN_ARANGOD}
Agency/Agent.cpp
Agency/AgentCallback.cpp
Agency/Constituent.cpp
Agency/NotifierThread.cpp
Agency/NotifyCallback.cpp
Agency/Node.cpp
Agency/Supervision.cpp
Agency/RestAgencyHandler.cpp

View File

@ -1270,12 +1270,8 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
res.clear();
res = ac.getValues(where, true);
LOG(TRACE) << "CREATE OYOYOYOY " << where;
if (res.successful() && res.parse(where + "/", false)) {
LOG(TRACE) << "CREATE IS SUCCESS " << where;
if (res._values.size() == (size_t)numberOfShards) {
LOG(TRACE) << "CREATE has number " << where;
std::string tmpMsg = "";
bool tmpHaveError = false;
for (auto const& p : res._values) {
@ -1297,23 +1293,17 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
}
}
}
LOG(TRACE) << "CREATE PRE LOAD has number " << where;
loadCurrentCollections();
LOG(TRACE) << "CREATE POST LOAD has number " << where;
if (tmpHaveError) {
errorMsg = "Error in creation of collection:" + tmpMsg;
LOG(TRACE) << "CREATE KAP0TT " << where;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
}
LOG(TRACE) << "CREATE OK " << where;
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res.clear();
LOG(TRACE) << "JASSSSS " << interval;
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
LOG(TRACE) << "NNNNJASSSSS " << interval;
}
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;