mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'agency' into devel
This commit is contained in:
commit
0ed24b7465
|
@ -141,12 +141,15 @@ add_executable(${BIN_ARANGOD}
|
|||
Aql/grammar.cpp
|
||||
Aql/tokens.cpp
|
||||
Cluster/AgencyComm.cpp
|
||||
Cluster/AgencyCallback.cpp
|
||||
Cluster/AgencyCallbackRegistry.cpp
|
||||
Cluster/ApplicationCluster.cpp
|
||||
Cluster/ClusterComm.cpp
|
||||
Cluster/ClusterInfo.cpp
|
||||
Cluster/ClusterMethods.cpp
|
||||
Cluster/ClusterTraverser.cpp
|
||||
Cluster/HeartbeatThread.cpp
|
||||
Cluster/RestAgencyCallbacksHandler.cpp
|
||||
Cluster/RestShardHandler.cpp
|
||||
Cluster/ServerJob.cpp
|
||||
Cluster/ServerState.cpp
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "AgencyCallback.h"
|
||||
#include <velocypack/Exception.h>
|
||||
#include <velocypack/Parser.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include "Basics/MutexLocker.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key, std::function<bool(VPackSlice const&)> const& cb)
|
||||
: key(key),
|
||||
_agency(agency),
|
||||
_cb(cb)
|
||||
{
|
||||
refetchAndUpdate();
|
||||
}
|
||||
|
||||
void AgencyCallback::refetchAndUpdate() {
|
||||
AgencyCommResult result = _agency.getValues(key, true);
|
||||
|
||||
if (!result.successful()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!result.parse("", false)) {
|
||||
LOG(ERR) << "Cannot parse body " << result.body();
|
||||
return;
|
||||
}
|
||||
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
|
||||
result._values.begin();
|
||||
|
||||
if (it == result._values.end()) {
|
||||
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
|
||||
checkValue(newData);
|
||||
} else {
|
||||
checkValue(it->second._vpack);
|
||||
}
|
||||
}
|
||||
|
||||
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
|
||||
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
|
||||
LOG(DEBUG) << "Got new value" << newData->toJson();
|
||||
if (execute(newData)) {
|
||||
_lastData = newData;
|
||||
} else {
|
||||
LOG(DEBUG) << "Callback was not successful for " << newData->toJson();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
|
||||
LOG(DEBUG) << "Executing";
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
return _cb(newData->slice());
|
||||
}
|
||||
|
||||
void AgencyCallback::waitWithFailover(double timeout) {
|
||||
// mop: todo thread safe? check with max
|
||||
std::shared_ptr<VPackBuilder> beginData = _lastData;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(timeout * 1000)));
|
||||
|
||||
if (!_lastData || _lastData->slice().equals(beginData->slice())) {
|
||||
LOG(DEBUG) << "Waiting done and nothing happended. Refetching to be sure";
|
||||
// mop: watches have not triggered during our sleep...recheck to be sure
|
||||
refetchAndUpdate();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
#include "Basics/Mutex.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class AgencyCallback {
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ctor
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
AgencyCallback(AgencyComm&, std::string const&, std::function<bool(VPackSlice const&)> const&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait a specified timeout. execute cb if watch didn't fire
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
void waitWithFailover(double timeout);
|
||||
|
||||
std::string const key;
|
||||
|
||||
void refetchAndUpdate();
|
||||
|
||||
|
||||
private:
|
||||
arangodb::Mutex _lock;
|
||||
AgencyComm& _agency;
|
||||
std::function<bool(VPackSlice const&)> const _cb;
|
||||
std::shared_ptr<VPackBuilder> _lastData;
|
||||
|
||||
bool execute(std::shared_ptr<VPackBuilder>);
|
||||
void checkValue(std::shared_ptr<VPackBuilder>);
|
||||
};
|
||||
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "AgencyCallbackRegistry.h"
|
||||
#include "Basics/ReadLocker.h"
|
||||
#include "Basics/random.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Endpoint/Endpoint.h"
|
||||
#include <ctime>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
AgencyCallbackRegistry::AgencyCallbackRegistry(std::string const& callbackBasePath)
|
||||
: _agency(),
|
||||
_callbackBasePath(callbackBasePath) {
|
||||
}
|
||||
|
||||
AgencyCallbackRegistry::~AgencyCallbackRegistry() {
|
||||
}
|
||||
|
||||
bool AgencyCallbackRegistry::registerCallback(std::shared_ptr<AgencyCallback> cb) {
|
||||
uint32_t rand;
|
||||
{
|
||||
WRITE_LOCKER(locker, _lock);
|
||||
while (true) {
|
||||
rand = TRI_UInt32Random();
|
||||
if (_endpoints.count(rand) == 0) {
|
||||
_endpoints.emplace(rand, cb);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ok = false;
|
||||
try {
|
||||
ok = _agency.registerCallback(cb->key, getEndpointUrl(rand));
|
||||
if (!ok) {
|
||||
LOG(ERR) << "Registering callback failed";
|
||||
}
|
||||
} catch (std::exception const& e) {
|
||||
LOG(ERR) << "Couldn't register callback " << e.what();
|
||||
} catch (...) {
|
||||
LOG(ERR) << "Couldn't register callback. Unknown exception";
|
||||
}
|
||||
if (!ok) {
|
||||
WRITE_LOCKER(locker, _lock);
|
||||
_endpoints.erase(rand);
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
std::shared_ptr<AgencyCallback> AgencyCallbackRegistry::getCallback(uint32_t id) {
|
||||
READ_LOCKER(locker, _lock);
|
||||
auto it = _endpoints.find(id);
|
||||
|
||||
if (it == _endpoints.end()) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT);
|
||||
}
|
||||
return (*it).second;
|
||||
}
|
||||
|
||||
bool AgencyCallbackRegistry::unregisterCallback(std::shared_ptr<AgencyCallback> cb) {
|
||||
WRITE_LOCKER(locker, _lock);
|
||||
|
||||
for (auto it: _endpoints) {
|
||||
if (it.second == cb) {
|
||||
_endpoints.erase(it.first);
|
||||
_agency.unregisterCallback(cb->key, getEndpointUrl(it.first));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string AgencyCallbackRegistry::getEndpointUrl(uint32_t endpoint) {
|
||||
std::stringstream url;
|
||||
url << Endpoint::uriForm(ServerState::instance()->getAddress()) << _callbackBasePath << "/" << endpoint;
|
||||
|
||||
return url.str();
|
||||
}
|
||||
|
||||
void AgencyCallbackRegistry::awaitNextChange(std::string const& key, double timeout) {
|
||||
auto maxWait = std::chrono::milliseconds(static_cast<int>(timeout * 1000));
|
||||
|
||||
std::condition_variable cv;
|
||||
|
||||
std::function<bool(VPackSlice const& result)> notify = [&](VPackSlice const& result) {
|
||||
LOG(DEBUG) << "Notifying change!";
|
||||
cv.notify_one();
|
||||
return true;
|
||||
};
|
||||
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, key, notify);
|
||||
|
||||
std::mutex mtx;
|
||||
std::unique_lock<std::mutex> lck(mtx);
|
||||
// mop: hmmm if callback registering failed this will just wait for the timeout, which
|
||||
// should be ok I think?
|
||||
registerCallback(agencyCallback);
|
||||
LOG(DEBUG) << "Awaiting change!";
|
||||
if (cv.wait_for(lck, maxWait) == std::cv_status::timeout) {
|
||||
LOG(DEBUG) << "Reached timeout!";
|
||||
}
|
||||
unregisterCallback(agencyCallback);
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Cluster/AgencyCallback.h"
|
||||
#include "Basics/ReadWriteLock.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class AgencyCallbackRegistry {
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ctor
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
explicit AgencyCallbackRegistry(std::string const&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief dtor
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
~AgencyCallbackRegistry();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief register a callback
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
bool registerCallback(std::shared_ptr<AgencyCallback>);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregister a callback
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
bool unregisterCallback(std::shared_ptr<AgencyCallback>);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get a callback by its key
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
std::shared_ptr<AgencyCallback> getCallback(uint32_t);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait for a change of the value or timeout
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
void awaitNextChange(std::string const& key, double timeout);
|
||||
|
||||
private:
|
||||
std::string getEndpointUrl(uint32_t);
|
||||
|
||||
AgencyComm _agency;
|
||||
|
||||
arangodb::basics::ReadWriteLock _lock;
|
||||
|
||||
std::string const _callbackBasePath;
|
||||
|
||||
std::unordered_map<uint32_t, std::shared_ptr<AgencyCallback>> _endpoints;
|
||||
};
|
||||
|
||||
}
|
|
@ -87,7 +87,12 @@ std::shared_ptr<VPackBuilder> AgencyOperation::toVelocyPack() {
|
|||
VPackObjectBuilder valueOperation(builder.get());
|
||||
builder->add("op", VPackValue(_opType.toString()));
|
||||
if (_opType.type == AgencyOperationType::VALUE) {
|
||||
builder->add("new", _value);
|
||||
if (_opType.value == AgencyValueOperationType::OBSERVE
|
||||
|| _opType.value == AgencyValueOperationType::UNOBSERVE) {
|
||||
builder->add("url", _value);
|
||||
} else {
|
||||
builder->add("new", _value);
|
||||
}
|
||||
if (_ttl > 0) {
|
||||
builder->add("ttl", VPackValue(_ttl));
|
||||
}
|
||||
|
@ -1532,16 +1537,35 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief blocks on a change of a single value in the backend
|
||||
/// @brief registers a callback on a key
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
bool AgencyComm::registerCallback(std::string const& key, std::string const& endpoint) {
|
||||
VPackBuilder builder;
|
||||
builder.add(VPackValue(endpoint));
|
||||
|
||||
AgencyCommResult result;
|
||||
AgencyOperation operation(key, AgencyValueOperationType::OBSERVE, builder.slice());
|
||||
AgencyTransaction transaction(operation);
|
||||
|
||||
sendTransactionWithFailover(result, transaction);
|
||||
|
||||
AgencyCommResult AgencyComm::watchValue(std::string const& key,
|
||||
uint64_t waitIndex, double timeout,
|
||||
bool recursive) {
|
||||
AgencyCommResult result = getValues(key, recursive);
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
usleep(1000);
|
||||
return result;
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregisters a callback on a key
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
bool AgencyComm::unregisterCallback(std::string const& key, std::string const& endpoint) {
|
||||
VPackBuilder builder;
|
||||
builder.add(VPackValue(endpoint));
|
||||
|
||||
AgencyCommResult result;
|
||||
AgencyOperation operation(key, AgencyValueOperationType::UNOBSERVE, builder.slice());
|
||||
AgencyTransaction transaction(operation);
|
||||
|
||||
sendTransactionWithFailover(result, transaction);
|
||||
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -98,6 +98,8 @@ struct AgencyCommResultEntry {
|
|||
|
||||
enum class AgencyValueOperationType {
|
||||
SET,
|
||||
OBSERVE,
|
||||
UNOBSERVE,
|
||||
PUSH,
|
||||
PREPEND
|
||||
};
|
||||
|
@ -124,6 +126,10 @@ struct AgencyOperationType {
|
|||
switch(value) {
|
||||
case AgencyValueOperationType::SET:
|
||||
return "set";
|
||||
case AgencyValueOperationType::OBSERVE:
|
||||
return "observe";
|
||||
case AgencyValueOperationType::UNOBSERVE:
|
||||
return "unobserve";
|
||||
case AgencyValueOperationType::PUSH:
|
||||
return "push";
|
||||
case AgencyValueOperationType::PREPEND:
|
||||
|
@ -296,7 +302,7 @@ struct AgencyCommResult {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string const location() const { return _location; }
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the body (might be empty)
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -329,6 +335,7 @@ struct AgencyCommResult {
|
|||
std::string _location;
|
||||
std::string _message;
|
||||
std::string _body;
|
||||
std::string _realBody;
|
||||
|
||||
std::map<std::string, AgencyCommResultEntry> _values;
|
||||
uint64_t _index;
|
||||
|
@ -559,10 +566,14 @@ class AgencyComm {
|
|||
AgencyCommResult uniqid(std::string const&, uint64_t, double);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief blocks on a change of a single value in the back end
|
||||
/// @brief registers a callback on a key
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult watchValue(std::string const&, uint64_t, double, bool);
|
||||
bool registerCallback(std::string const& key, std::string const& endpoint);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregisters a callback on a key
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
bool unregisterCallback(std::string const& key, std::string const& endpoint);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief acquire a read lock
|
||||
|
|
|
@ -43,11 +43,13 @@ using namespace arangodb::basics;
|
|||
|
||||
ApplicationCluster::ApplicationCluster(
|
||||
TRI_server_t* server, arangodb::rest::ApplicationDispatcher* dispatcher,
|
||||
ApplicationV8* applicationV8)
|
||||
ApplicationV8* applicationV8,
|
||||
arangodb::AgencyCallbackRegistry* agencyCallbackRegistry)
|
||||
: ApplicationFeature("Sharding"),
|
||||
_server(server),
|
||||
_dispatcher(dispatcher),
|
||||
_applicationV8(applicationV8),
|
||||
_agencyCallbackRegistry(agencyCallbackRegistry),
|
||||
_heartbeat(nullptr),
|
||||
_heartbeatInterval(0),
|
||||
_agencyEndpoints(),
|
||||
|
@ -99,6 +101,7 @@ void ApplicationCluster::setupOptions(
|
|||
}
|
||||
|
||||
bool ApplicationCluster::prepare() {
|
||||
ClusterInfo::createInstance(_agencyCallbackRegistry);
|
||||
// set authentication data
|
||||
ServerState::instance()->setAuthentication(_username, _password);
|
||||
|
||||
|
@ -345,6 +348,7 @@ bool ApplicationCluster::start() {
|
|||
|
||||
// start heartbeat thread
|
||||
_heartbeat = new HeartbeatThread(_server, _dispatcher, _applicationV8,
|
||||
_agencyCallbackRegistry,
|
||||
_heartbeatInterval * 1000, 5);
|
||||
|
||||
if (_heartbeat == nullptr) {
|
||||
|
|
|
@ -35,6 +35,7 @@ class ApplicationDispatcher;
|
|||
}
|
||||
|
||||
class ApplicationV8;
|
||||
class AgencyCallbackRegistry;
|
||||
class HeartbeatThread;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -48,7 +49,7 @@ class ApplicationCluster : public rest::ApplicationFeature {
|
|||
|
||||
public:
|
||||
ApplicationCluster(TRI_server_t*, arangodb::rest::ApplicationDispatcher*,
|
||||
arangodb::ApplicationV8*);
|
||||
arangodb::ApplicationV8*, arangodb::AgencyCallbackRegistry*);
|
||||
|
||||
~ApplicationCluster();
|
||||
|
||||
|
@ -96,6 +97,12 @@ class ApplicationCluster : public rest::ApplicationFeature {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
arangodb::ApplicationV8* _applicationV8;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the agency callback registry
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
arangodb::AgencyCallbackRegistry* _agencyCallbackRegistry;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief thread for heartbeat
|
||||
|
|
|
@ -52,12 +52,7 @@ using namespace arangodb;
|
|||
|
||||
using arangodb::basics::JsonHelper;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief single instance of ClusterInfo - will live as long as the server is
|
||||
/// running
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static ClusterInfo Instance;
|
||||
static std::unique_ptr<ClusterInfo> _instance;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief a local helper to report errors and messages
|
||||
|
@ -267,17 +262,26 @@ void CollectionInfoCurrent::copyAllJsons() {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create the clusterinfo instance
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterInfo::createInstance(AgencyCallbackRegistry* agencyCallbackRegistry) {
|
||||
_instance.reset(new ClusterInfo(agencyCallbackRegistry));
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns an instance of the cluster info class
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterInfo* ClusterInfo::instance() { return &Instance; }
|
||||
ClusterInfo* ClusterInfo::instance() { return _instance.get(); }
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a cluster info object
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterInfo::ClusterInfo() : _agency(), _uniqid() {
|
||||
ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry)
|
||||
: _agency(), _agencyCallbackRegistry(agencyCallbackRegistry), _uniqid() {
|
||||
_uniqid._currentValue = _uniqid._upperValue = 0ULL;
|
||||
|
||||
// Actual loading into caches is postponed until necessary
|
||||
|
@ -1060,8 +1064,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
std::vector<ServerID> DBServers = getCurrentDBServers();
|
||||
int count = 0; // this counts, when we have to reload the DBServers
|
||||
|
||||
|
@ -1101,11 +1103,10 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
|
|||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index,
|
||||
getReloadServerListTimeout() / interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", getReloadServerListTimeout() / interval);
|
||||
|
||||
if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) {
|
||||
// We update the list of DBServers every minute in case one of them
|
||||
// was taken away since we last looked. This also helps (slightly)
|
||||
|
@ -1177,7 +1178,6 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
std::string where = "Current/Databases/" + name;
|
||||
while (TRI_microtime() <= endTime) {
|
||||
|
@ -1199,8 +1199,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
|
|||
}
|
||||
}
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index, interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
}
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
}
|
||||
|
@ -1270,7 +1269,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
std::string const where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
|
@ -1310,8 +1308,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index, interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
}
|
||||
|
||||
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
|
||||
|
@ -1367,7 +1364,6 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
// monitor the entry for the collection
|
||||
std::string const where =
|
||||
|
@ -1397,8 +1393,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index, interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
}
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
}
|
||||
|
@ -1741,7 +1736,6 @@ int ClusterInfo::ensureIndexCoordinator(
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
std::string where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
|
@ -1813,8 +1807,7 @@ int ClusterInfo::ensureIndexCoordinator(
|
|||
}
|
||||
}
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index, interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
}
|
||||
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
|
@ -1959,7 +1952,6 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
uint64_t index = res._index;
|
||||
|
||||
std::string where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
|
@ -2001,8 +1993,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
|||
}
|
||||
|
||||
res.clear();
|
||||
res = ac.watchValue("Current/Version", index, interval, false);
|
||||
index = res._index;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
}
|
||||
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "Basics/Mutex.h"
|
||||
#include "Basics/ReadWriteLock.h"
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include "Cluster/AgencyCallbackRegistry.h"
|
||||
#include "VocBase/voc-types.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
|
@ -540,7 +541,7 @@ class ClusterInfo {
|
|||
/// @brief creates library
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterInfo();
|
||||
explicit ClusterInfo(AgencyCallbackRegistry*);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shuts down library
|
||||
|
@ -549,6 +550,7 @@ class ClusterInfo {
|
|||
~ClusterInfo();
|
||||
|
||||
public:
|
||||
static void createInstance(AgencyCallbackRegistry*);
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get the unique instance
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -867,6 +869,8 @@ class ClusterInfo {
|
|||
|
||||
AgencyComm _agency;
|
||||
|
||||
AgencyCallbackRegistry* _agencyCallbackRegistry;
|
||||
|
||||
// Cached data from the agency, we reload whenever necessary:
|
||||
|
||||
// We group the data, each group has an atomic "valid-flag"
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include "VocBase/auth.h"
|
||||
#include "VocBase/server.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include <functional>
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
|
@ -51,15 +52,17 @@ volatile sig_atomic_t HeartbeatThread::HasRunOnce = 0;
|
|||
|
||||
HeartbeatThread::HeartbeatThread(
|
||||
TRI_server_t* server, arangodb::rest::ApplicationDispatcher* dispatcher,
|
||||
ApplicationV8* applicationV8, uint64_t interval,
|
||||
uint64_t maxFailsBeforeWarning)
|
||||
ApplicationV8* applicationV8, AgencyCallbackRegistry* agencyCallbackRegistry,
|
||||
uint64_t interval, uint64_t maxFailsBeforeWarning)
|
||||
: Thread("Heartbeat"),
|
||||
_server(server),
|
||||
_dispatcher(dispatcher),
|
||||
_applicationV8(applicationV8),
|
||||
_agencyCallbackRegistry(agencyCallbackRegistry),
|
||||
_statusLock(),
|
||||
_agency(),
|
||||
_condition(),
|
||||
_dispatchedPlanVersion(),
|
||||
_refetchUsers(),
|
||||
_myId(ServerState::instance()->getId()),
|
||||
_interval(interval),
|
||||
|
@ -109,20 +112,54 @@ void HeartbeatThread::runDBServer() {
|
|||
// convert timeout to seconds
|
||||
double const interval = (double)_interval / 1000.0 / 1000.0;
|
||||
|
||||
// last value of plan which we have noticed:
|
||||
uint64_t lastPlanVersionNoticed = 0;
|
||||
|
||||
// last value of plan for which a job was successfully completed
|
||||
// on a coordinator, only this is used and not lastPlanVersionJobScheduled
|
||||
uint64_t lastPlanVersionJobSuccess = 0;
|
||||
|
||||
// value of Sync/Commands/my-id at startup
|
||||
uint64_t lastCommandIndex = getLastCommandIndex();
|
||||
|
||||
uint64_t agencyIndex = 0;
|
||||
|
||||
std::function<bool(VPackSlice const& result)> updatePlan = [&](VPackSlice const& result) {
|
||||
bool mustHandlePlanChange = false;
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
if (_numDispatchedJobs == -1) {
|
||||
LOG(DEBUG) << "Dispatched plan changed returned";
|
||||
// mop: unblock dispatching
|
||||
_numDispatchedJobs = 0;
|
||||
if (_lastDispatchedJobResult) {
|
||||
LOG(DEBUG) << "...and was successful";
|
||||
// mop: the dispatched version is still the same => we aer finally uptodate
|
||||
if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) {
|
||||
LOG(DEBUG) << "Version is correct :)";
|
||||
return true;
|
||||
}
|
||||
// mop: meanwhile we got an updated version and must reschedule
|
||||
}
|
||||
}
|
||||
if (_numDispatchedJobs == 0) {
|
||||
LOG(DEBUG) << "Will dispatch plan change " << result;
|
||||
mustHandlePlanChange = true;
|
||||
_dispatchedPlanVersion.clear();
|
||||
_dispatchedPlanVersion.add(result);
|
||||
}
|
||||
}
|
||||
if (mustHandlePlanChange) {
|
||||
// mop: a dispatched task has returned
|
||||
handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result));
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, "Plan/Version", updatePlan);
|
||||
|
||||
bool registered = false;
|
||||
while (!registered) {
|
||||
registered = _agencyCallbackRegistry->registerCallback(agencyCallback);
|
||||
if (!registered) {
|
||||
LOG(ERR) << "Couldn't register plan change in agency!";
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
while (!isStopping()) {
|
||||
LOG(TRACE) << "sending heartbeat to agency";
|
||||
LOG(DEBUG) << "sending heartbeat to agency";
|
||||
|
||||
double const start = TRI_microtime();
|
||||
|
||||
|
@ -147,75 +184,12 @@ void HeartbeatThread::runDBServer() {
|
|||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// The following loop will run until the interval has passed, at which
|
||||
// time a break will be called.
|
||||
while (true) {
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
|
||||
if (remain <= 0.0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// First see whether a previously scheduled job has done some good:
|
||||
double timeout = remain;
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
if (_numDispatchedJobs == -1) {
|
||||
if (_lastDispatchedJobResult) {
|
||||
lastPlanVersionJobSuccess = _versionThatTriggeredLastJob;
|
||||
LOG(INFO) << "Found result of successful handleChangesDBServer "
|
||||
"job, have now version "
|
||||
<< lastPlanVersionJobSuccess << ".";
|
||||
}
|
||||
_numDispatchedJobs = 0;
|
||||
} else if (_numDispatchedJobs > 0) {
|
||||
timeout = (std::min)(0.1, remain);
|
||||
// Only wait for at most this much, because
|
||||
// we want to see the result of the running job
|
||||
// in time
|
||||
}
|
||||
}
|
||||
|
||||
// get the current version of the Plan, or watch for a change:
|
||||
AgencyCommResult result;
|
||||
result.clear();
|
||||
|
||||
result =
|
||||
_agency.watchValue("Plan/Version", agencyIndex + 1, timeout, false);
|
||||
|
||||
if (result.successful()) {
|
||||
agencyIndex = result.index();
|
||||
result.parse("", false);
|
||||
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it =
|
||||
result._values.begin();
|
||||
|
||||
if (it != result._values.end()) {
|
||||
// there is a plan version
|
||||
uint64_t planVersion =
|
||||
arangodb::basics::VelocyPackHelper::stringUInt64(
|
||||
it->second._vpack->slice());
|
||||
|
||||
if (planVersion > lastPlanVersionNoticed) {
|
||||
lastPlanVersionNoticed = planVersion;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
agencyIndex = 0;
|
||||
}
|
||||
|
||||
if (lastPlanVersionNoticed > lastPlanVersionJobSuccess &&
|
||||
!hasPendingJob()) {
|
||||
handlePlanChangeDBServer(lastPlanVersionNoticed);
|
||||
}
|
||||
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
agencyCallback->waitWithFailover(remain);
|
||||
}
|
||||
|
||||
|
||||
_agencyCallbackRegistry->unregisterCallback(agencyCallback);
|
||||
// Wait until any pending job is finished
|
||||
int count = 0;
|
||||
while (count++ < 10000) {
|
||||
|
@ -231,15 +205,6 @@ void HeartbeatThread::runDBServer() {
|
|||
LOG(TRACE) << "stopped heartbeat thread (DBServer version)";
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief check whether a job is still running or does not have reported yet
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool HeartbeatThread::hasPendingJob() {
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
return _numDispatchedJobs != 0;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief heartbeat main loop, coordinator version
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "Basics/Thread.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include "Cluster/AgencyCallbackRegistry.h"
|
||||
|
||||
struct TRI_server_t;
|
||||
struct TRI_vocbase_t;
|
||||
|
@ -48,7 +49,7 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
public:
|
||||
HeartbeatThread(TRI_server_t*, arangodb::rest::ApplicationDispatcher*,
|
||||
ApplicationV8*, uint64_t, uint64_t);
|
||||
ApplicationV8*, AgencyCallbackRegistry*, uint64_t, uint64_t);
|
||||
|
||||
~HeartbeatThread();
|
||||
|
||||
|
@ -78,12 +79,6 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
void removeDispatchedJob(bool success);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief check whether a job is still running or does not have reported yet
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool hasPendingJob();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief whether or not the thread has run at least once.
|
||||
/// this is used on the coordinator only
|
||||
|
@ -166,6 +161,12 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
ApplicationV8* _applicationV8;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief AgencyCallbackRegistry
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCallbackRegistry* _agencyCallbackRegistry;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief status lock
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -184,6 +185,8 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
arangodb::basics::ConditionVariable _condition;
|
||||
|
||||
VPackBuilder _dispatchedPlanVersion;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief users for these databases will be re-fetched the next time the
|
||||
/// heartbeat thread runs
|
||||
|
@ -238,6 +241,7 @@ class HeartbeatThread : public Thread {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::atomic<bool> _ready;
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief whether or not the heartbeat thread has run at least once
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RestAgencyCallbacksHandler.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Rest/HttpResponse.h"
|
||||
#include "Cluster/AgencyCallbackRegistry.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rest;
|
||||
|
||||
RestAgencyCallbacksHandler::RestAgencyCallbacksHandler(arangodb::HttpRequest* request,
|
||||
arangodb::AgencyCallbackRegistry* agencyCallbackRegistry)
|
||||
: RestVocbaseBaseHandler(request),
|
||||
_agencyCallbackRegistry(agencyCallbackRegistry) {
|
||||
}
|
||||
|
||||
bool RestAgencyCallbacksHandler::isDirect() const { return true; }
|
||||
|
||||
arangodb::rest::HttpHandler::status_t RestAgencyCallbacksHandler::execute() {
|
||||
std::vector<std::string> const& suffix = _request->suffix();
|
||||
|
||||
if (suffix.size() != 1) {
|
||||
generateError(GeneralResponse::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid callback");
|
||||
return status_t(HANDLER_DONE);
|
||||
}
|
||||
|
||||
// extract the sub-request type
|
||||
auto const type = _request->requestType();
|
||||
if (type != GeneralRequest::RequestType::POST) {
|
||||
generateError(GeneralResponse::ResponseCode::METHOD_NOT_ALLOWED,
|
||||
TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
|
||||
return status_t(HANDLER_DONE);
|
||||
}
|
||||
|
||||
bool parseSuccess = true;
|
||||
VPackOptions options = VPackOptions::Defaults;
|
||||
options.checkAttributeUniqueness = true;
|
||||
std::shared_ptr<VPackBuilder> parsedBody =
|
||||
parseVelocyPackBody(&options, parseSuccess);
|
||||
if (!parseSuccess) {
|
||||
generateError(GeneralResponse::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid JSON");
|
||||
return status_t(HANDLER_DONE);
|
||||
}
|
||||
|
||||
try {
|
||||
std::stringstream ss(suffix.at(0));
|
||||
uint32_t index;
|
||||
ss >> index;
|
||||
|
||||
auto callback = _agencyCallbackRegistry->getCallback(index);
|
||||
LOG(DEBUG) << "Agency callback has been triggered. refetching!";
|
||||
callback->refetchAndUpdate();
|
||||
createResponse(arangodb::GeneralResponse::ResponseCode::ACCEPTED);
|
||||
} catch (arangodb::basics::Exception const&) {
|
||||
// mop: not found...expected
|
||||
createResponse(arangodb::GeneralResponse::ResponseCode::NOT_FOUND);
|
||||
}
|
||||
return status_t(HANDLER_DONE);
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Andreas Streichardt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "RestHandler/RestVocbaseBaseHandler.h"
|
||||
|
||||
namespace arangodb {
|
||||
class AgencyCallbackRegistry;
|
||||
|
||||
namespace rest {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief shard control request handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class RestAgencyCallbacksHandler : public RestVocbaseBaseHandler {
|
||||
public:
|
||||
RestAgencyCallbacksHandler(HttpRequest* request, AgencyCallbackRegistry* agencyCallbackRegistry);
|
||||
|
||||
public:
|
||||
bool isDirect() const override;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief executes the handler
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
status_t execute() override;
|
||||
private:
|
||||
AgencyCallbackRegistry* _agencyCallbackRegistry;
|
||||
};
|
||||
}
|
||||
}
|
|
@ -544,67 +544,6 @@ static void JS_SetAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
|||
TRI_V8_TRY_CATCH_END
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief watches a value in the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static void JS_WatchAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
||||
TRI_V8_TRY_CATCH_BEGIN(isolate);
|
||||
v8::HandleScope scope(isolate);
|
||||
|
||||
if (args.Length() < 1) {
|
||||
TRI_V8_THROW_EXCEPTION_USAGE(
|
||||
"watch(<key>, <waitIndex>, <timeout>, <recursive>)");
|
||||
}
|
||||
|
||||
std::string const key = TRI_ObjectToString(args[0]);
|
||||
double timeout = 1.0;
|
||||
uint64_t waitIndex = 0;
|
||||
bool recursive = false;
|
||||
|
||||
if (args.Length() > 1) {
|
||||
waitIndex = TRI_ObjectToUInt64(args[1], true);
|
||||
}
|
||||
if (args.Length() > 2) {
|
||||
timeout = TRI_ObjectToDouble(args[2]);
|
||||
}
|
||||
if (args.Length() > 3) {
|
||||
recursive = TRI_ObjectToBoolean(args[3]);
|
||||
}
|
||||
|
||||
AgencyComm comm;
|
||||
AgencyCommResult result = comm.watchValue(key, waitIndex, timeout, recursive);
|
||||
|
||||
if (result._statusCode == 0) {
|
||||
// watch timed out
|
||||
TRI_V8_RETURN_FALSE();
|
||||
}
|
||||
|
||||
if (!result.successful()) {
|
||||
THROW_AGENCY_EXCEPTION(result);
|
||||
}
|
||||
|
||||
result.parse("", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
|
||||
result._values.begin();
|
||||
|
||||
v8::Handle<v8::Object> l = v8::Object::New(isolate);
|
||||
|
||||
while (it != result._values.end()) {
|
||||
std::string const key = (*it).first;
|
||||
VPackSlice const slice = it->second._vpack->slice();
|
||||
|
||||
if (!slice.isNone()) {
|
||||
l->Set(TRI_V8_STD_STRING(key), TRI_VPackToV8(isolate, slice));
|
||||
}
|
||||
|
||||
++it;
|
||||
}
|
||||
|
||||
TRI_V8_RETURN(l);
|
||||
TRI_V8_TRY_CATCH_END
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns the agency endpoints
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2105,8 +2044,6 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle<v8::Context> context) {
|
|||
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("remove"),
|
||||
JS_RemoveAgency);
|
||||
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("set"), JS_SetAgency);
|
||||
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("watch"),
|
||||
JS_WatchAgency);
|
||||
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("endpoints"),
|
||||
JS_EndpointsAgency);
|
||||
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("prefix"),
|
||||
|
|
|
@ -54,6 +54,7 @@
|
|||
#include "Cluster/ApplicationCluster.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/HeartbeatThread.h"
|
||||
#include "Cluster/RestAgencyCallbacksHandler.h"
|
||||
#include "Cluster/RestShardHandler.h"
|
||||
#include "Dispatcher/ApplicationDispatcher.h"
|
||||
#include "Dispatcher/Dispatcher.h"
|
||||
|
@ -375,6 +376,7 @@ ArangoServer::ArangoServer(int argc, char** argv)
|
|||
_applicationDispatcher(nullptr),
|
||||
_applicationEndpointServer(nullptr),
|
||||
_applicationCluster(nullptr),
|
||||
_agencyCallbackRegistry(nullptr),
|
||||
_applicationAgency(nullptr),
|
||||
_jobManager(nullptr),
|
||||
_applicationV8(nullptr),
|
||||
|
@ -431,6 +433,7 @@ ArangoServer::~ArangoServer() {
|
|||
Nonce::destroy();
|
||||
|
||||
delete _applicationServer;
|
||||
delete _agencyCallbackRegistry;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -790,6 +793,12 @@ void ArangoServer::defineHandlers(HttpHandlerFactory* factory) {
|
|||
"/_api/shard-comm",
|
||||
RestHandlerCreator<RestShardHandler>::createData<Dispatcher*>,
|
||||
_applicationDispatcher->dispatcher());
|
||||
|
||||
// add "/agency-callbacks" handler
|
||||
factory->addPrefixHandler(
|
||||
getAgencyCallbacksPath(),
|
||||
RestHandlerCreator<RestAgencyCallbacksHandler>::createData<AgencyCallbackRegistry*>,
|
||||
_agencyCallbackRegistry);
|
||||
|
||||
// add "/aql" handler
|
||||
factory->addPrefixHandler(
|
||||
|
@ -1052,9 +1061,13 @@ void ArangoServer::buildApplicationServer() {
|
|||
// .............................................................................
|
||||
// cluster options
|
||||
// .............................................................................
|
||||
_agencyCallbackRegistry = new AgencyCallbackRegistry(
|
||||
getAgencyCallbacksPath()
|
||||
);
|
||||
|
||||
_applicationCluster =
|
||||
new ApplicationCluster(_server, _applicationDispatcher, _applicationV8);
|
||||
new ApplicationCluster(_server, _applicationDispatcher, _applicationV8,
|
||||
_agencyCallbackRegistry);
|
||||
_applicationServer->addFeature(_applicationCluster);
|
||||
|
||||
// .............................................................................
|
||||
|
|
|
@ -54,6 +54,7 @@ class HttpHandlerFactory;
|
|||
|
||||
class ApplicationV8;
|
||||
class ApplicationCluster;
|
||||
class AgencyCallbackRegistry;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ArangoDB server
|
||||
|
@ -171,6 +172,14 @@ class ArangoServer {
|
|||
int startupDaemon();
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get agency callback path
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::string const getAgencyCallbacksPath() {
|
||||
return "/_api/agency/agency-callbacks";
|
||||
};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief run arbitrary checks at startup
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -269,6 +278,12 @@ class ArangoServer {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
arangodb::ApplicationCluster* _applicationCluster;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cluster application feature
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
arangodb::AgencyCallbackRegistry* _agencyCallbackRegistry;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cluster application feature
|
||||
|
|
|
@ -148,6 +148,7 @@
|
|||
"ERROR_CLUSTER_COULD_NOT_DETERMINE_ID" : { "code" : 1476, "message" : "could not determine my ID from my local info" },
|
||||
"ERROR_CLUSTER_ONLY_ON_DBSERVER" : { "code" : 1477, "message" : "this operation is only valid on a DBserver in a cluster" },
|
||||
"ERROR_CLUSTER_BACKEND_UNAVAILABLE" : { "code" : 1478, "message" : "A cluster backend which was required for the operation could not be reached" },
|
||||
"ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT" : { "code" : 1479, "message" : "An endpoint couldn't be found" },
|
||||
"ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" },
|
||||
"ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" },
|
||||
"ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" },
|
||||
|
|
|
@ -32,8 +32,8 @@
|
|||
.globl TRI_BlockCrc32_SSE42
|
||||
.globl _TRI_BlockCrc32_SSE42
|
||||
#ifndef __APPLE__
|
||||
.type TRI_BlockCrc32_SSE42, @function
|
||||
.type _TRI_BlockCrc32_SSE42, @function
|
||||
// .type TRI_BlockCrc32_SSE42, @function
|
||||
// .type _TRI_BlockCrc32_SSE42, @function
|
||||
#endif
|
||||
TRI_BlockCrc32_SSE42:
|
||||
_TRI_BlockCrc32_SSE42:
|
||||
|
@ -57,7 +57,7 @@ crca4:
|
|||
crca9:
|
||||
ret
|
||||
#ifndef __APPLE__
|
||||
.size TRI_BlockCrc32_SSE42, .-TRI_BlockCrc32_SSE42
|
||||
.size _TRI_BlockCrc32_SSE42, .-_TRI_BlockCrc32_SSE42
|
||||
// .size TRI_BlockCrc32_SSE42, .-TRI_BlockCrc32_SSE42
|
||||
// .size _TRI_BlockCrc32_SSE42, .-_TRI_BlockCrc32_SSE42
|
||||
#endif
|
||||
/* end of TRI_BlockCrc32_SSE42 */
|
||||
|
|
|
@ -184,6 +184,7 @@ ERROR_ARANGO_DOCUMENT_NOT_FOUND_OR_SHARDING_ATTRIBUTES_CHANGED,1475,"document no
|
|||
ERROR_CLUSTER_COULD_NOT_DETERMINE_ID,1476,"could not determine my ID from my local info","Will be raised if a cluster server at startup could not determine its own ID from the local info provided."
|
||||
ERROR_CLUSTER_ONLY_ON_DBSERVER,1477,"this operation is only valid on a DBserver in a cluster","Will be raised if there is an attempt to run a DBserver-only operation on a different type of node."
|
||||
ERROR_CLUSTER_BACKEND_UNAVAILABLE,1478,"A cluster backend which was required for the operation could not be reached","Will be raised if a required db server can't be reached."
|
||||
ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT,1479,"An endpoint couldn't be found", "An endpoint couldn't be found"
|
||||
|
||||
################################################################################
|
||||
## ArangoDB query errors
|
||||
|
|
|
@ -144,6 +144,7 @@ void TRI_InitializeErrorMessages () {
|
|||
REG_ERROR(ERROR_CLUSTER_COULD_NOT_DETERMINE_ID, "could not determine my ID from my local info");
|
||||
REG_ERROR(ERROR_CLUSTER_ONLY_ON_DBSERVER, "this operation is only valid on a DBserver in a cluster");
|
||||
REG_ERROR(ERROR_CLUSTER_BACKEND_UNAVAILABLE, "A cluster backend which was required for the operation could not be reached");
|
||||
REG_ERROR(ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT, "An endpoint couldn't be found");
|
||||
REG_ERROR(ERROR_QUERY_KILLED, "query killed");
|
||||
REG_ERROR(ERROR_QUERY_PARSE, "%s");
|
||||
REG_ERROR(ERROR_QUERY_EMPTY, "query is empty");
|
||||
|
|
|
@ -353,6 +353,8 @@
|
|||
/// a different type of node.
|
||||
/// - 1478: @LIT{A cluster backend which was required for the operation could not be reached}
|
||||
/// Will be raised if a required db server can't be reached.
|
||||
/// - 1479: @LIT{An endpoint couldn't be found}
|
||||
/// "An endpoint couldn't be found"
|
||||
/// - 1500: @LIT{query killed}
|
||||
/// Will be raised when a running query is killed by an explicit admin
|
||||
/// command.
|
||||
|
@ -2080,6 +2082,16 @@ void TRI_InitializeErrorMessages ();
|
|||
|
||||
#define TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE (1478)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 1479: ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT
|
||||
///
|
||||
/// An endpoint couldn't be found
|
||||
///
|
||||
/// "An endpoint couldn't be found"
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define TRI_ERROR_CLUSTER_UNKNOWN_CALLBACK_ENDPOINT (1479)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 1500: ERROR_QUERY_KILLED
|
||||
///
|
||||
|
|
|
@ -71,6 +71,7 @@ start() {
|
|||
--cluster.my-local-info $TYPE:127.0.0.1:$PORT \
|
||||
--cluster.my-role $ROLE \
|
||||
--log.file cluster/$PORT.log \
|
||||
--log.buffered false \
|
||||
--log.level info \
|
||||
--log.requests-file cluster/$PORT.req \
|
||||
--server.disable-statistics true \
|
||||
|
@ -197,7 +198,7 @@ testServer() {
|
|||
PORT=$1
|
||||
while true ; do
|
||||
sleep 1
|
||||
curl -s -X GET "http://127.0.0.1:$PORT/_api/version" > /dev/null 2>&1
|
||||
curl -s -f -X GET "http://127.0.0.1:$PORT/_api/version" > /dev/null 2>&1
|
||||
if [ "$?" != "0" ] ; then
|
||||
echo Server on port $PORT does not answer yet.
|
||||
else
|
||||
|
|
Loading…
Reference in New Issue