
Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
Andreas Streichardt 2016-06-14 13:04:30 +02:00
commit f46096d693
29 changed files with 538 additions and 367 deletions


@ -61,6 +61,7 @@ set(ARANGODB_VERSION
# for NSIS
set(ARANGODB_DISPLAY_NAME "ArangoDB")
set(ARANGODB_URL_INFO_ABOUT "https://www.arangodb.com")
set(ARANGODB_HELP_LINK "https://docs.arangodb.com/${ARANGODB_VERSION_MAJOR}.${ARANGODB_VERSION_MINOR}/")
set(ARANGODB_CONTACT "hackers@arangodb.com")
set(ARANGODB_FRIENDLY_STRING "ArangoDB - the multi-model database")


@ -1,11 +1,35 @@
!CHAPTER Linux
You can find binary packages for the most common Linux distributions
[here](http://www.arangodb.com/install/).
- Visit the official [ArangoDB install page](https://www.arangodb.com/install)
and download the correct package for your Linux distribution. You can find
binary packages for the most common distributions there.
- Follow the instructions to use your favorite package manager for the
major distributions. After setting up the ArangoDB repository you can
easily install ArangoDB using yum, aptitude, urpmi or zypper.
- Alternatively, see [Compiling](Compiling.md) if you want to build ArangoDB
yourself.
- Start up the database server, normally this is done by
  executing `/etc/init.d/arangod start`. The exact command
  depends on your Linux distribution.
After these steps there should be a running instance of *_arangod_* -
the ArangoDB database server.
unix> ps auxw | fgrep arangod
arangodb 14536 0.1 0.6 5307264 23464 s002 S 1:21pm 0:00.18 /usr/local/sbin/arangod
If there is no such process, check the log file
*/var/log/arangodb/arangod.log* for errors. If you see a log message like
2012-12-03T11:35:29Z [12882] ERROR Database directory version (1) is lower than server version (1.2).
2012-12-03T11:35:29Z [12882] ERROR It seems like you have upgraded the ArangoDB binary. If this is what you wanted to do, please restart with the --database.upgrade option to upgrade the data in the database directory.
2012-12-03T11:35:29Z [12882] FATAL Database version check failed. Please start the server with the --database.upgrade option
make sure to start the server once with the *--database.upgrade* option.
Note that you may have to enable logging first. If you start the server
in a shell, you should see errors logged there as well.
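As a minimal sketch of checking for this condition, you can grep the log for the version-check message before restarting with *--database.upgrade*. The sample line below is copied from the excerpt above; in practice you would grep */var/log/arangodb/arangod.log* itself:

```shell
# Detect the version-mismatch FATAL message (sample line taken from the
# log excerpt above; normally you would grep the log file instead)
line='2012-12-03T11:35:29Z [12882] FATAL Database version check failed. Please start the server with the --database.upgrade option'
if echo "$line" | grep -q 'Database version check failed'; then
  echo 'upgrade needed'   # then restart once with the --database.upgrade option
fi
```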
!SUBSECTION Linux Mint


@ -18,6 +18,8 @@ installed as:
/usr/local/sbin/arangod
You can start the server by running the command `/usr/local/sbin/arangod &`.
The ArangoDB shell will be installed as:
/usr/local/bin/arangosh
@ -41,6 +43,7 @@ also need to update homebrew:
brew update
!SUBSECTION Known issues
- Performance - the LLVM delivered as of Mac OS X El Capitan builds slow binaries. Use GCC instead
  until this issue has been fixed by Apple.
- The command-line argument parsing doesn't accept blanks in filenames; the CLI version below does.


@ -2,8 +2,10 @@
This chapter describes how to install ArangoDB under various operating systems.
First of all download and install the corresponding RPM or Debian package or use homebrew on MacOS X.
You can find packages for various operating systems at our [install](https://www.arangodb.com/download) section.
First of all, download and install the corresponding RPM or Debian package or use
homebrew on MacOS X. You can find packages for various operating systems at our
[install](https://www.arangodb.com/download) section, including an installer
for Windows.
In this chapter you will also learn how to compile ArangoDB from scratch.


@ -1,6 +1,6 @@
!CHAPTER Windows
The default installation directory is *C:\Program Files\ArangoDB-3.x.y*. During the
The default installation directory is *C:\Program Files\ArangoDB-3.x.x*. During the
installation process you may change this. In the following description we will assume
that ArangoDB has been installed in the location *<ROOTDIR>*.
@ -16,28 +16,28 @@ Installing for multiple users: Keep the default directory. After the
installation edit the file *<ROOTDIR>\etc\ArangoDB\arangod.conf*. Adjust the
*directory* and *app-path* so that these paths point into your home directory.
[database]
directory = @HOMEDRIVE@\@HOMEPATH@\arangodb\databases
[javascript]
app-path = @HOMEDRIVE@\@HOMEPATH@\arangodb\apps
Create the directories for each user that wants to use ArangoDB.
Installing as Service: Keep the default directory. After the installation open
a command line as administrator (search for *cmd* and right-click *run as
administrator*).
cmd> arangod --install-service
INFO: adding service 'ArangoDB - the multi-model database' (internal 'ArangoDB')
INFO: added service with command line '"C:\Program Files (x86)\ArangoDB 1.4.4\bin\arangod.exe" --start-service'
cmd> arangod --install-service
INFO: adding service 'ArangoDB - the multi-model database' (internal 'ArangoDB')
INFO: added service with command line '"C:\Program Files (x86)\ArangoDB 3.x.x\bin\arangod.exe" --start-service'
Open the service manager and start ArangoDB. In order to enable logging
edit the file "<ROOTDIR>\etc\arangodb\arangod.conf" and uncomment the file
option.
[log]
file = @ROOTDIR@\var\log\arangodb\arangod.log
!SUBSECTION Client, Server and Lock-Files


@ -1,40 +0,0 @@
!CHAPTER Getting familiar with ArangoDB
First of all download and install the corresponding RPM or Debian package or use
homebrew on MacOS X. See the [installation manual](Installing/README.md) for more details.
!SUBSECTION For Linux
* Visit the official [ArangoDB install page](https://www.arangodb.com/install)
and download the correct package for your Linux distribution
* Install the package using your favorite package manager
* Start up the database server, normally this is done by
executing */etc/init.d/arangod start*. The exact command
depends on your Linux distribution
!SUBSECTION For MacOS X
* Execute *brew install arangodb*
* And start the server using */usr/local/sbin/arangod &*
!SUBSECTION For Microsoft Windows
* Visit the official [ArangoDB install page](https://www.arangodb.com/install)
and download the installer for Windows
* Start up the database server
After these steps there should be a running instance of *_arangod_* -
the ArangoDB database server.
unix> ps auxw | fgrep arangod
arangodb 14536 0.1 0.6 5307264 23464 s002 S 1:21pm 0:00.18 /usr/local/sbin/arangod
If there is no such process, check the log file
*/var/log/arangodb/arangod.log* for errors. If you see a log message
like
2012-12-03T11:35:29Z [12882] ERROR Database directory version (1) is lower than server version (1.2).
2012-12-03T11:35:29Z [12882] ERROR It seems like you have upgraded the ArangoDB binary. If this is what you wanted to do, please restart with the --database.upgrade option to upgrade the data in the database directory.
2012-12-03T11:35:29Z [12882] FATAL Database version check failed. Please start the server with the --database.upgrade option
make sure to start the server once with the *--database.upgrade* option.


@ -62,7 +62,7 @@ is set to *true*.
Each plan in the result is a JSON object with the following attributes:
- *nodes*: the array of execution nodes of the plan. The array of available node types
can be found [here](../Aql/Optimizer.md)
can be found [here](../../AQL/Optimizer.html)
- *estimatedCost*: the total estimated cost for the plan. If there are multiple
plans, the optimizer will choose the plan with the lowest total cost.
@ -70,7 +70,7 @@ Each plan in the result is a JSON object with the following attributes:
- *collections*: an array of collections used in the query
- *rules*: an array of rules the optimizer applied. An overview of the
available rules can be found [here](../Aql/Optimizer.md)
available rules can be found [here](../../AQL/Optimizer.html)
- *variables*: array of variables used in the query (note: this may contain
internal variables created by the optimizer)
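For illustration only, a response with the attributes listed above can be probed for a plan's *estimatedCost* like this (the JSON fragment and its values are invented; a real response comes from the explain endpoint):

```shell
# Hypothetical explain response fragment; attribute names follow the list
# above, values are made up for illustration
response='{"plans":[{"nodes":[],"estimatedCost":1,"collections":[],"rules":[],"variables":[]}]}'
echo "$response" | grep -o '"estimatedCost":[0-9]*'   # prints "estimatedCost":1
```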


@ -135,7 +135,7 @@ If the query specification is complete, the server will process the query. If an
error occurs during query processing, the server will respond with *HTTP 400*.
Again, the body of the response will contain details about the error.
A [list of query errors can be found here](../ErrorCodes/README.md).
A [list of query errors can be found here](../../Manual/Appendix/ErrorCodes.html).
@RESTRETURNCODE{404}


@ -21,7 +21,7 @@ is applied before the *limit* restriction. (optional)
This will find all documents matching a given example.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
@RESTRETURNCODES


@ -32,12 +32,12 @@ query specified in *query*.
In order to use the *fulltext* operator, a fulltext index must be defined
for the collection and the specified attribute.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
Note: the *fulltext* simple query is **deprecated** as of ArangoDB 2.6.
This API may be removed in future versions of ArangoDB. The preferred
way for retrieving documents from a collection using the fulltext operator is
to issue an AQL query using the *FULLTEXT* [AQL function](../Aql/FulltextFunctions.md)
to issue an AQL query using the *FULLTEXT* [AQL function](../../AQL/Functions/Fulltext.html)
as follows:
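As a sketch of assembling such a request body (the collection name *products* and attribute *text* are placeholders; the body would be POSTed to the cursor API):

```shell
# Build a hypothetical AQL request body replacing the deprecated fulltext
# simple query; 'products' and 'text' are invented names
query='FOR doc IN FULLTEXT(products, \"text\", \"word\") RETURN doc'
body="{\"query\": \"${query}\"}"
echo "$body"
```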


@ -40,7 +40,7 @@ for the document. If you have more than one geo-spatial index, you can use
the *geo* field to select a particular index.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
Note: the *near* simple query is **deprecated** as of ArangoDB 2.6.
This API may be removed in future versions of ArangoDB. The preferred


@ -32,7 +32,7 @@ is applied before the *limit* restriction. (optional)
This will find all documents within a given range. In order to execute a
range query, a skip-list index on the queried attribute must be present.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
Note: the *range* simple query is **deprecated** as of ArangoDB 2.6.
The function may be removed in future versions of ArangoDB. The preferred


@ -41,7 +41,7 @@ coordinates for the document. If you have more than one geo-spatial index,
you can use the *geo* field to select a particular index.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
Note: the *within* simple query is **deprecated** as of ArangoDB 2.6.
This API may be removed in future versions of ArangoDB. The preferred


@ -39,7 +39,7 @@ the collection. This index also defines which attribute holds the
coordinates for the document. If you have more than one geo-spatial index,
you can use the *geo* field to select a particular index.
Returns a cursor containing the result, see [Http Cursor](../HttpAqlQueryCursor/README.md) for details.
Returns a cursor containing the result, see [Http Cursor](../AqlQueryCursor/README.md) for details.
@RESTRETURNCODES


@ -281,7 +281,6 @@ winXX-build:
packXX:
if test ! -d ../b/js; then ./Installation/file-copy-js.sh . ../b; fi
cd ../b; rm -f ArangoDB-*.exe ArangoDB*.nsi
cd ../b && find -name cmake_install.cmake -exec sed -i {} -e "s;(Configuration);{CMAKE_INSTALL_CONFIG_NAME};" \;
cd ../b && cpack -G NSIS -C $(BUILD_TARGET)
cd ../b && cpack -G ZIP -C $(BUILD_TARGET)
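The sed in the packXX target rewrites Visual Studio's `$(Configuration)` placeholder into CMake's `${CMAKE_INSTALL_CONFIG_NAME}`; a standalone sketch of the same substitution, applied to an invented `cmake_install.cmake` line:

```shell
# Apply the Makefile's sed expression to a made-up install-script line;
# the file path and contents are illustrative only
line='file(INSTALL FILES "bin/$(Configuration)/arangod.exe")'
echo "$line" | sed -e 's;(Configuration);{CMAKE_INSTALL_CONFIG_NAME};'
```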


@ -25,7 +25,7 @@
!define TRI_UNINSTALL_REG_PATH "Software\Microsoft\Windows\CurrentVersion\Uninstall\@CPACK_PACKAGE_INSTALL_REGISTRY_KEY@";
!define TRI_SVC_NAME 'ArangoDB'
; Put some of the more custom ones in from the CMakeFile:
@CPACK_NSIS_DEFINES@
@CPACK_ARANGODB_NSIS_DEFINES@
;--------------------------------
;Variables


@ -29,11 +29,6 @@ echo "$0: compiling ArangoDB"
(cd build && make -j1)
echo
echo "$0: linting ArangoDB JS"
./utils/jslint.sh
echo
echo "$0: testing ArangoDB"


@ -36,50 +36,47 @@
namespace arangodb {
namespace consensus {
typedef uint64_t term_t; // Term type
typedef uint32_t id_t; // Id type
enum role_t { // Role
FOLLOWER,
CANDIDATE,
LEADER
};
/// @brief Term type
typedef uint64_t term_t;
const std::vector<std::string> roleStr ({"Follower", "Candidate", "Leader"});
struct constituent_t { // Constituent type
id_t id;
std::string endpoint;
};
/// @brief Id type
typedef uint32_t id_t;
typedef std::vector<constituent_t> constituency_t; // Constituency type
typedef uint32_t state_t; // State type
typedef std::chrono::duration<long, std::ratio<1, 1000>>
duration_t; // Duration type
/// @brief Agent roles
enum role_t {FOLLOWER, CANDIDATE, LEADER};
/// @brief Duration type
typedef std::chrono::duration<long, std::ratio<1, 1000>> duration_t;
/// @brief Log query type
using query_t = std::shared_ptr<arangodb::velocypack::Builder>;
struct vote_ret_t {
query_t result;
explicit vote_ret_t(query_t res) : result(res) {}
};
/// @brief Log entry index type
typedef uint64_t index_t;
/// @brief Read request return type
struct read_ret_t {
bool accepted; // Query processed
id_t redirect; // Otherwise redirect to
std::vector<bool> success;
query_t result; // Result
bool accepted; ///< @brief Query accepted (i.e. we are leader)
id_t redirect; ///< @brief If not accepted redirect id
std::vector<bool> success; ///< @brief Query's precond OK
query_t result; ///< @brief Query result
read_ret_t(bool a, id_t id, std::vector<bool> suc = std::vector<bool>(),
query_t res = nullptr)
: accepted(a), redirect(id), success(suc), result(res) {}
};
typedef uint64_t index_t;
typedef std::initializer_list<index_t> index_list_t;
/// @brief Write request return type
struct write_ret_t {
bool accepted; // Query processed
id_t redirect; // Otherwise redirect to
bool accepted; ///< @brief Query accepted (i.e. we are leader)
id_t redirect; ///< @brief If not accepted redirect id
std::vector<bool> applied;
std::vector<index_t> indices; // Indices of log entries (if any) to wait for
write_ret_t() : accepted(false), redirect(0) {}
@ -89,54 +86,41 @@ struct write_ret_t {
: accepted(a), redirect(id), applied(app), indices(idx) {}
};
using namespace std::chrono;
/// @brief Buffer type
using buffer_t = std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>>;
/**
* @brief State entry
*/
/// @brief State entry
struct log_t {
index_t index;
term_t term;
id_t leaderId;
buffer_t entry;
milliseconds timestamp;
index_t index; ///< @brief Log index
term_t term; ///< @brief Log term
id_t leaderId; ///< @brief Leader's ID
buffer_t entry; ///< @brief To log
std::chrono::milliseconds timestamp; ///< @brief Timestamp
log_t(index_t idx, term_t t, id_t lid, buffer_t const& e)
: index(idx),
term(t),
leaderId(lid),
entry(e),
timestamp(duration_cast<milliseconds>(
system_clock::now().time_since_epoch())) {}
: index(idx), term(t), leaderId(lid), entry(e),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << l.leaderId << " "
<< l.entry->toString() << " " << l.timestamp.count();
return o;
}
};
enum agencyException { QUERY_NOT_APPLICABLE };
struct append_entries_t {
term_t term;
bool success;
append_entries_t(term_t t, bool s) : term(t), success(s) {}
};
struct collect_ret_t {
index_t prev_log_index;
term_t prev_log_term;
std::vector<index_t> indices;
collect_ret_t() : prev_log_index(0), prev_log_term(0) {}
collect_ret_t(index_t pli, term_t plt, std::vector<index_t> const& idx)
: prev_log_index(pli), prev_log_term(plt), indices(idx) {}
size_t size() const { return indices.size(); }
};
/// @brief Private RPC return type
struct priv_rpc_ret_t {
bool success;
term_t term;
priv_rpc_ret_t(bool s, term_t t) : success(s), term(t) {}
};
}
}


@ -40,7 +40,8 @@ using namespace arangodb::velocypack;
namespace arangodb {
namespace consensus {
// Agent configuration
/// Agent configuration
Agent::Agent(config_t const& config)
: Thread("Agent"),
_config(config),
@ -51,42 +52,62 @@ Agent::Agent(config_t const& config)
_confirmed.resize(size(), 0); // agency's size and reset to 0
}
// This agent's id
arangodb::consensus::id_t Agent::id() const { return _config.id; }
// Shutdown
Agent::~Agent() { shutdown(); }
/// This agent's id
arangodb::consensus::id_t Agent::id() const {
return _config.id;
}
// State machine
State const& Agent::state() const { return _state; }
// Start all agent thread
/// Dtor shuts down thread
Agent::~Agent() {
shutdown();
}
/// State machine
State const& Agent::state() const {
return _state;
}
/// Start all agent threads
bool Agent::start() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting agency comm worker.";
Thread::start();
return true;
}
// This agent's term
term_t Agent::term() const { return _constituent.term(); }
// Agency size
inline size_t Agent::size() const { return _config.size(); }
/// This agent's term
term_t Agent::term() const {
return _constituent.term();
}
// My endpoint
std::string const& Agent::endpoint() const { return _config.endpoint; }
// Handle vote request
priv_rpc_ret_t Agent::requestVote(term_t t, arangodb::consensus::id_t id,
index_t lastLogIndex, index_t lastLogTerm,
query_t const& query) {
/// Agency size
inline size_t Agent::size() const {
return _config.size();
}
/// My endpoint
std::string const& Agent::endpoint() const {
return _config.endpoint;
}
/// Handle voting
priv_rpc_ret_t Agent::requestVote(
term_t t, arangodb::consensus::id_t id, index_t lastLogIndex,
index_t lastLogTerm, query_t const& query) {
/// Are we receiving new endpoints
if (query != nullptr) { // record new endpoints
if (query->slice().hasKey("endpoints") &&
query->slice().get("endpoints").isArray()) {
size_t j = 0;
for (auto const& i :
VPackArrayIterator(query->slice().get("endpoints"))) {
for (auto const& i : VPackArrayIterator(query->slice().get("endpoints"))) {
_config.endpoints[j++] = i.copyString();
}
}
@ -95,29 +116,35 @@ priv_rpc_ret_t Agent::requestVote(term_t t, arangodb::consensus::id_t id,
/// Constituent handles this
return priv_rpc_ret_t(_constituent.vote(t, id, lastLogIndex, lastLogTerm),
this->term());
}
// Get configuration
config_t const& Agent::config() const { return _config; }
// Leader's id
/// Get configuration
config_t const& Agent::config() const {
return _config;
}
/// Leader's id
arangodb::consensus::id_t Agent::leaderID() const {
return _constituent.leaderID();
}
// Are we leading?
bool Agent::leading() const { return _constituent.leading(); }
// Persist term and id we vote for
void Agent::persist(term_t t, arangodb::consensus::id_t i) {
// _state.persist(t, i);
/// Are we leading?
bool Agent::leading() const {
return _constituent.leading();
}
// Waits here for confirmation of log's commits up to index.
// Timeout in seconds
bool Agent::waitFor(index_t index, double timeout) {
if (size() == 1) // single host agency
if (size() == 1) { // single host agency
return true;
}
CONDITION_LOCKER(guard, _waitForCV);
@ -137,48 +164,63 @@ bool Agent::waitFor(index_t index, double timeout) {
return false;
}
}
// We should never get here
TRI_ASSERT(false);
}
// AgentCallback reports id of follower and its highest processed index
void Agent::reportIn(arangodb::consensus::id_t id, index_t index) {
MUTEX_LOCKER(mutexLocker, _ioLock);
if (index > _confirmed[id]) // progress this follower?
if (index > _confirmed[id]) { // progress this follower?
_confirmed[id] = index;
}
if (index > _lastCommitIndex) { // progress last commit?
size_t n = 0;
for (size_t i = 0; i < size(); ++i) {
n += (_confirmed[i] >= index);
}
if (n > size() / 2) { // catch up read database and commit index
// catch up read database and commit index
if (n > size() / 2) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Critical mass for committing "
<< _lastCommitIndex + 1 << " through "
<< index << " to read db";
_readDB.apply(_state.slices(_lastCommitIndex + 1, index));
_lastCommitIndex = index;
if (_lastCommitIndex >= _nextCompationAfter) {
_state.compact(_lastCommitIndex);
_nextCompationAfter += _config.compactionStepSize;
}
}
}
CONDITION_LOCKER(guard, _waitForCV);
_waitForCV.broadcast(); // wake up REST handlers
}
// Followers' append entries
/// Followers' append entries
bool Agent::recvAppendEntriesRPC(term_t term,
arangodb::consensus::id_t leaderId,
index_t prevIndex, term_t prevTerm,
index_t leaderCommitIndex,
query_t const& queries) {
// Update commit index
// Update commit index
if (queries->slice().type() != VPackValueType::Array) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Received malformed entries for appending. Discarding!";
@ -213,10 +255,8 @@ bool Agent::recvAppendEntriesRPC(term_t term,
<< " entries to state machine.";
/* bool success = */
_state.log(queries, term, leaderId, prevIndex, prevTerm);
} else {
// heart-beat
}
}
// appendEntries 5. If leaderCommit > commitIndex, set commitIndex =
// min(leaderCommit, index of last new entry)
if (leaderCommitIndex > lastCommitIndex) {
@ -226,58 +266,69 @@ bool Agent::recvAppendEntriesRPC(term_t term,
return true;
}
// Leader's append entries
append_entries_t Agent::sendAppendEntriesRPC(
/// Leader's append entries
priv_rpc_ret_t Agent::sendAppendEntriesRPC(
arangodb::consensus::id_t follower_id) {
index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
MUTEX_LOCKER(mutexLocker, _ioLock);
term_t t = this->term();
term_t t(0);
{
MUTEX_LOCKER(mutexLocker, _ioLock);
t = this->term();
}
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" << id()
<< "&prevLogIndex=" << unconfirmed[0].index
<< "&prevLogTerm=" << unconfirmed[0].term
<< "&prevLogIndex=" << unconfirmed.front().index
<< "&prevLogTerm=" << unconfirmed.front().term
<< "&leaderCommit=" << _lastCommitIndex;
// Headers
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
// Highest unconfirmed
index_t last = unconfirmed[0].index;
// Body
Builder builder;
index_t last = unconfirmed[0].index;
builder.add(VPackValue(VPackValueType::Array));
for (size_t i = 1; i < unconfirmed.size(); ++i) {
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(unconfirmed[i].index));
builder.add("query", VPackSlice(unconfirmed[i].entry->data()));
builder.add("index", VPackValue(entry.index));
builder.add("query", VPackSlice(entry.entry->data()));
builder.close();
last = unconfirmed[i].index;
last = entry.index;
}
builder.close();
// Send request
// Verbose output
if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
<< " entries up to index " << last
<< " to follower " << follower_id;
}
// Send request
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, _config.endpoints[follower_id],
arangodb::GeneralRequest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, follower_id, last), 1, true);
return append_entries_t(t, true);
return priv_rpc_ret_t(true, t);
}
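The RPC path assembled by the stringstream above can be sketched in shell form (the term, ids and indices below are invented sample values; only the query layout mirrors the C++ code):

```shell
# Rebuild the appendEntries RPC path from sample values; all numbers here
# are illustrative, the parameter layout follows the C++ stringstream above
term=3; leaderId=0; prevLogIndex=41; prevLogTerm=3; leaderCommit=40
path="/_api/agency_priv/appendEntries?term=${term}&leaderId=${leaderId}"
path="${path}&prevLogIndex=${prevLogIndex}&prevLogTerm=${prevLogTerm}&leaderCommit=${leaderCommit}"
echo "$path"
```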
// @brief load persistent state
/// Load persistent state
bool Agent::load() {
DatabaseFeature* database =
ApplicationServer::getFeature<DatabaseFeature>("Database");
@ -314,72 +365,85 @@ bool Agent::load() {
return true;
}
// Write new entries to replicated state and store
/// Write new entries to replicated state and store
write_ret_t Agent::write(query_t const& query) {
std::vector<bool> applied;
std::vector<index_t> indices;
index_t maxind = 0;
if (_constituent.leading()) { // Only leader
std::vector<bool> applied;
std::vector<index_t> indices;
index_t maxind = 0;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
applied = _spearhead.apply(query); // Apply to spearhead
indices = _state.log(query, applied, term(), id()); // Log w/ indicies
}
if (!indices.empty()) {
maxind = *std::max_element(indices.begin(), indices.end());
}
// _appendCV.signal(); // Wake up run
reportIn(id(), maxind);
return write_ret_t(true, id(), applied,
indices); // Indices to wait for to rest
} else { // Else we redirect
// Only leader else redirect
if (!_constituent.leading()) {
return write_ret_t(false, _constituent.leaderID());
}
// Apply to spearhead and get indices for log entries
{
MUTEX_LOCKER(mutexLocker, _ioLock);
applied = _spearhead.apply(query);
indices = _state.log(query, applied, term(), id());
}
// Maximum log index
if (!indices.empty()) {
maxind = *std::max_element(indices.begin(), indices.end());
}
// Report that leader has persisted
reportIn(id(), maxind);
return write_ret_t(true, id(), applied, indices);
}
// Read from store
/// Read from store
read_ret_t Agent::read(query_t const& query) const {
if (_constituent.leading()) { // Only working as leader
query_t result = std::make_shared<arangodb::velocypack::Builder>();
std::vector<bool> success = _readDB.read(query, result);
return read_ret_t(true, _constituent.leaderID(), success, result);
} else { // Else We redirect
// Only leader else redirect
if (!_constituent.leading()) {
return read_ret_t(false, _constituent.leaderID());
}
// Retrieve data from readDB
query_t result = std::make_shared<arangodb::velocypack::Builder>();
std::vector<bool> success = _readDB.read(query, result);
return read_ret_t(true, _constituent.leaderID(), success, result);
}
// Repeated append entries
/// Send out append entries to followers regularly or on event
void Agent::run() {
CONDITION_LOCKER(guard, _appendCV);
while (!this->isStopping() && size() > 1) { // need only to run in multi-host
// Only run in case we are in multi-host mode
while (!this->isStopping() && size() > 1) {
if (leading())
_appendCV.wait(25000); // Only if leading
else
_appendCV.wait(); // Just sit there doing nothing
// Collect all unacknowledged
if (leading()) { // Only if leading
_appendCV.wait(25000);
} else {
_appendCV.wait(); // Else wait for our moment in the sun
}
// Append entries to followers
for (arangodb::consensus::id_t i = 0; i < size(); ++i) {
if (i != id()) {
sendAppendEntriesRPC(i);
}
}
}
}
// Orderly shutdown
/// Orderly shutdown
void Agent::beginShutdown() {
// Personal hygiene
Thread::beginShutdown();
// Stop supervision
@ -392,12 +456,13 @@ void Agent::beginShutdown() {
_spearhead.beginShutdown();
_readDB.beginShutdown();
// Wake up all waiting REST handler (waitFor)
// Wake up all waiting rest handlers
{
CONDITION_LOCKER(guardW, _waitForCV);
guardW.broadcast();
}
// Wake up run method
// Wake up run
{
CONDITION_LOCKER(guardA, _appendCV);
guardA.broadcast();
@ -405,46 +470,72 @@ void Agent::beginShutdown() {
}
// Becoming leader
/// Becoming leader
bool Agent::lead() {
// Key value stores
rebuildDBs();
// Wake up run
_appendCV.signal();
CONDITION_LOCKER(guard, _appendCV);
guard.signal();
return true;
}
// Rebuild key value stores
bool Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(_state.slices());
_readDB.apply(_state.slices());
return true;
}
// Last log entry
/// Last commit index
arangodb::consensus::index_t Agent::lastCommited() const {
return _lastCommitIndex;
}
/// Last log entry
log_t const& Agent::lastLog() const { return _state.lastLog(); }
// Get spearhead
/// Get spearhead
Store const& Agent::spearhead() const { return _spearhead; }
// Get readdb
/// Get readdb
Store const& Agent::readDB() const { return _readDB; }
/// Rebuild from persisted state
Agent& Agent::operator=(VPackSlice const& compaction) {
// Catch up with compacted state
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead = compaction.get("readDB");
_readDB = compaction.get("readDB");
// Catch up with commit
try {
_lastCommitIndex = std::stoul(compaction.get("_key").copyString());
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
}
// Schedule next compaction
_nextCompationAfter = _lastCommitIndex + _config.compactionStepSize;
return *this;
}
}
}
}} // namespace


@ -70,6 +70,9 @@ class Agent : public arangodb::Thread {
/// @brief Are we fit to run?
bool fitness() const;
/// @brief Last commit index
arangodb::consensus::index_t lastCommited() const;
/// @brief Leader ID
arangodb::consensus::id_t leaderID() const;
@ -96,7 +99,7 @@ class Agent : public arangodb::Thread {
/// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
append_entries_t sendAppendEntriesRPC(arangodb::consensus::id_t slave_id);
priv_rpc_ret_t sendAppendEntriesRPC(arangodb::consensus::id_t slave_id);
/// @brief 1. Deal with appendEntries to slaves.
/// 2. Report success of write processes.
@ -120,9 +123,6 @@ class Agent : public arangodb::Thread {
/// @brief Last log entry
log_t const& lastLog() const;
/// @brief Persist term
void persist(term_t, arangodb::consensus::id_t);
/// @brief State machine
State const& state() const;


@ -50,10 +50,16 @@ using namespace arangodb::rest;
using namespace arangodb::velocypack;
using namespace arangodb;
// Configure with agent's configuration
/// Raft role names for display purposes
const std::vector<std::string> roleStr ({"Follower", "Candidate", "Leader"});
/// Configure with agent's configuration
void Constituent::configure(Agent* agent) {
_agent = agent;
TRI_ASSERT(_agent != nullptr);
if (size() == 1) {
_role = LEADER;
} else {
@ -62,50 +68,63 @@ void Constituent::configure(Agent* agent) {
notifyAll();
}
}
}
// Default ctor
Constituent::Constituent()
: Thread("Constituent"),
_vocbase(nullptr),
_queryRegistry(nullptr),
_term(0),
_leaderID((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_id(0),
// XXX #warning KAVEH use RandomGenerator
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(nullptr),
_votedFor((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_notifier(nullptr) {
: Thread("Constituent"),
_vocbase(nullptr),
_queryRegistry(nullptr),
_term(0),
_leaderID((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_id(0),
// XXX #warning KAVEH use RandomGenerator
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(nullptr),
_votedFor((std::numeric_limits<arangodb::consensus::id_t>::max)()),
_notifier(nullptr) {
_gen.seed(RandomGenerator::interval(UINT32_MAX));
}
/// Shutdown if not already
Constituent::~Constituent() {
shutdown();
}
// Shutdown if not already
Constituent::~Constituent() { shutdown(); }
// Configuration
config_t const& Constituent::config() const { return _agent->config(); }
/// Configuration
config_t const& Constituent::config() const {
return _agent->config();
}
// Wait for sync
bool Constituent::waitForSync() const { return _agent->config().waitForSync; }
// Random sleep times in election process
/// Wait for sync
bool Constituent::waitForSync() const {
return _agent->config().waitForSync;
}
/// Random sleep times in election process
duration_t Constituent::sleepFor(double min_t, double max_t) {
dist_t dis(min_t, max_t);
return duration_t((long)std::round(dis(_gen) * 1000.0));
}
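The randomized election sleep can be sketched as follows (the millisecond bounds are invented; `sleepFor` above takes seconds and returns a `duration_t`, and `RANDOM` here stands in for the `_gen` engine):

```shell
# Pick a random timeout in [min_ms, max_ms]; the bounds are illustrative
# and RANDOM is a bash-ism standing in for the C++ random engine
min_ms=500; max_ms=1000
timeout=$(( min_ms + RANDOM % (max_ms - min_ms + 1) ))
echo "$timeout"
```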
// Get my term
/// Get my term
term_t Constituent::term() const {
MUTEX_LOCKER(guard, _castLock);
return _term;
}
// Update my term
void Constituent::term(term_t t) {
term_t tmp;
/// Update my term
void Constituent::term(term_t t) {
term_t tmp;
{
MUTEX_LOCKER(guard, _castLock);
tmp = _term;
@@ -113,6 +132,7 @@ void Constituent::term(term_t t) {
}
if (tmp != t) {
LOG_TOPIC(INFO, Logger::AGENCY) << roleStr[_role] << " term " << t;
Builder body;
@@ -141,86 +161,116 @@ void Constituent::term(term_t t) {
options.silent = true;
OperationResult result = trx.insert("election", body.slice(), options);
/*res = */ trx.finish(result.code); // OMG
trx.finish(result.code);
}
}
/// @brief My role
/// My role
role_t Constituent::role() const {
MUTEX_LOCKER(guard, _castLock);
return _role;
}
/// @brief Become follower in term
/// Become follower in term
void Constituent::follow(term_t t) {
MUTEX_LOCKER(guard, _castLock);
if (_role != FOLLOWER) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converting to follower in term " << t;
<< "Role change: Converting to follower in term " << t;
}
_term = t;
_role = FOLLOWER;
}
/// @brief Become leader
/// Become leader
void Constituent::lead() {
MUTEX_LOCKER(guard, _castLock);
if (_role != LEADER) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converted to leader in term " << _term;
_agent->lead(); // We need to rebuild spear_head and read_db;
}
_role = LEADER;
_role = LEADER;
_leaderID = _id;
}
/// @brief Become follower
/// Become follower
void Constituent::candidate() {
MUTEX_LOCKER(guard, _castLock);
if (_role != CANDIDATE)
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converted to candidate in term " << _term;
_role = CANDIDATE;
}
/// @brief Leading?
/// Leading?
bool Constituent::leading() const {
MUTEX_LOCKER(guard, _castLock);
return _role == LEADER;
}
/// @brief Following?
/// Following?
bool Constituent::following() const {
MUTEX_LOCKER(guard, _castLock);
return _role == FOLLOWER;
}
/// @brief Running as candidate?
/// Running as candidate?
bool Constituent::running() const {
MUTEX_LOCKER(guard, _castLock);
return _role == CANDIDATE;
}
/// @brief Get current leader's id
arangodb::consensus::id_t Constituent::leaderID() const { return _leaderID; }
/// @brief Agency size
size_t Constituent::size() const { return config().size(); }
/// Get current leader's id
arangodb::consensus::id_t Constituent::leaderID() const {
return _leaderID;
}
/// @brief Get endpoint to an id
/// Agency size
size_t Constituent::size() const {
return config().size();
}
/// Get endpoint to an id
std::string const& Constituent::endpoint(arangodb::consensus::id_t id) const {
return config().endpoints[id];
}
/// @brief Get all endpoints
/// Get all endpoints
std::vector<std::string> const& Constituent::endpoints() const {
return config().endpoints;
}
/// @brief Notify peers of updated endpoints
/// Notify peers of updated endpoints
void Constituent::notifyAll() {
std::vector<std::string> toNotify;
// Send request to all but myself
std::vector<std::string> toNotify;
for (arangodb::consensus::id_t i = 0; i < size(); ++i) {
if (i != _id) {
toNotify.push_back(endpoint(i));
@@ -231,9 +281,11 @@ void Constituent::notifyAll() {
auto body = std::make_shared<VPackBuilder>();
body->openObject();
body->add("endpoints", VPackValue(VPackValueType::Array));
for (auto const& i : endpoints()) {
body->add(Value(i));
}
body->close();
body->close();
@@ -244,6 +296,7 @@ void Constituent::notifyAll() {
_notifier = std::make_unique<NotifierThread>(path.str(), body, toNotify);
_notifier->start();
}
/// @brief Vote
@@ -267,7 +320,6 @@ bool Constituent::vote(term_t term, arangodb::consensus::id_t id,
if (_role > FOLLOWER) {
follow(_term);
}
_agent->persist(term, id);
{
CONDITION_LOCKER(guard, _cv);
_cv.signal();
@@ -359,9 +411,13 @@ void Constituent::callElection() {
} else {
follow(_term);
}
}
/// Start clean shutdown
void Constituent::beginShutdown() {
_notifier.reset();
Thread::beginShutdown();
@@ -370,20 +426,28 @@ void Constituent::beginShutdown() {
}
/// Start operation
bool Constituent::start(TRI_vocbase_t* vocbase,
aql::QueryRegistry* queryRegistry) {
_vocbase = vocbase;
_queryRegistry = queryRegistry;
return Thread::start();
}
/// Get persisted information and run election process
void Constituent::run() {
TRI_ASSERT(_vocbase != nullptr);
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
// Query
// Most recent vote
std::string const aql("FOR l IN election SORT l._key DESC LIMIT 1 RETURN l");
arangodb::aql::Query query(false, _vocbase, aql.c_str(), aql.size(), bindVars,
nullptr, arangodb::aql::PART_MAIN);
@@ -410,6 +474,7 @@ void Constituent::run() {
// Always start off as follower
while (!this->isStopping() && size() > 1) {
if (_role == FOLLOWER) {
bool cast = false;
@@ -423,20 +488,22 @@ void Constituent::run() {
{
CONDITION_LOCKER(guardv, _cv);
/*bool timedout =*/_cv.wait(rand_wait);
_cv.wait(rand_wait);
}
{
MUTEX_LOCKER(guard, _castLock);
cast = _cast;
}
if (!cast) {
candidate(); // Next round, we are running
}
} else {
callElection(); // Run for office
}
}
}
View File
@@ -136,11 +136,10 @@ class Constituent : public arangodb::Thread {
term_t _term; /**< @brief term number */
std::atomic<bool> _cast; /**< @brief cast a vote this term */
std::atomic<state_t> _state; /**< @brief State (follower, candidate, leader)*/
arangodb::consensus::id_t _leaderID; /**< @brief Current leader */
arangodb::consensus::id_t _id; /**< @brief My own id */
constituency_t _constituency; /**< @brief List of constituents */
std::mt19937 _gen; /**< @brief Random number generator */
role_t _role; /**< @brief My role */
Agent* _agent; /**< @brief My boss */
View File
@@ -175,16 +175,17 @@ std::vector<log_t> State::get(arangodb::consensus::index_t start,
return entries;
}
std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
std::vector<VPackSlice> State::slices(
arangodb::consensus::index_t start, arangodb::consensus::index_t end) const {
std::vector<VPackSlice> slices;
MUTEX_LOCKER(mutexLocker, _logLock);
if (start < _log.front().index) {
if (start < _log.front().index) { // no start specified
start = _log.front().index;
}
if (start > _log.back().index) {
if (start > _log.back().index) { // no end specified
return slices;
}
@@ -192,7 +193,7 @@ std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start,
end = _log.back().index;
}
for (size_t i = start - _cur; i <= end - _cur; ++i) { // TODO:: Check bounds
for (size_t i = start - _cur; i <= end - _cur; ++i) {
try {
slices.push_back(VPackSlice(_log.at(i).entry->data()));
} catch (std::exception const&) {
View File
@@ -51,32 +51,34 @@ struct Empty {
/// @brief Split strings by separator
inline std::vector<std::string> split(const std::string& value,
char separator) {
std::vector<std::string> result;
std::string::size_type p = (value.find(separator) == 0) ? 1 : 0;
std::string::size_type q;
std::vector<std::string> res;
std::string::size_type q, p = (value.find(separator) == 0) ? 1 : 0;
while ((q = value.find(separator, p)) != std::string::npos) {
result.emplace_back(value, p, q - p);
res.emplace_back(value, p, q - p);
p = q + 1;
}
result.emplace_back(value, p);
result.erase(std::find_if(result.rbegin(), result.rend(), NotEmpty()).base(),
result.end());
return result;
res.emplace_back(value, p);
res.erase(
std::find_if(res.rbegin(), res.rend(), NotEmpty()).base(), res.end());
return res;
}
// Build endpoint from URL
inline static bool endpointPathFromUrl(std::string const& url,
std::string& endpoint,
std::string& path) {
inline static bool endpointPathFromUrl(
std::string const& url, std::string& endpoint, std::string& path) {
std::stringstream ep;
path = "/";
size_t pos = 7;
if (url.find("http://") == 0) {
if (url.compare(0, pos, "http://") == 0) {
ep << "tcp://";
} else if (url.find("https://") == 0) {
} else if (url.compare(0, ++pos, "https://") == 0) {
ep << "ssl://";
++pos;
} else {
return false;
}
@@ -96,6 +98,7 @@ inline static bool endpointPathFromUrl(std::string const& url,
endpoint = ep.str();
return true;
}
@@ -145,6 +148,7 @@ Store::~Store() {
// Apply queries multiple queries to store
std::vector<bool> Store::apply(query_t const& query) {
std::vector<bool> applied;
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : VPackArrayIterator(query->slice())) {
@@ -179,15 +183,18 @@ std::string const& Store::name() const {
}
// template<class T, class U> std::multimap<std::string, std::string>
std::ostream& operator<<(std::ostream& os,
std::multimap<std::string, std::string> const& m) {
std::ostream& operator<<(
std::ostream& os, std::multimap<std::string, std::string> const& m) {
for (auto const& i : m) {
os << i.first << ": " << i.second << std::endl;
}
return os;
}
// Apply external
// Notification type
struct notify_t {
std::string key;
std::string modified;
@@ -248,8 +255,8 @@ std::vector<bool> Store::apply(
for (auto const& url : urls) {
Builder body; // host
body.openObject();
body.add("term", VPackValue(0));
body.add("index", VPackValue(0));
body.add("term", VPackValue(_agent->term()));
body.add("index", VPackValue(_agent->lastCommited()));
auto ret = in.equal_range(url);
for (auto it = ret.first; it != ret.second; ++it) {
View File
@@ -82,6 +82,9 @@ class Store : public arangodb::Thread {
/// @brief Notify observers
void notifyObservers() const;
/// @brief See how far the path matches anything in store
size_t matchPath(std::vector<std::string> const& pv) const;
/// @brief Get node specified by path vector
Node operator()(std::vector<std::string> const& pv);
/// @brief Get node specified by path vector
View File
@@ -84,8 +84,8 @@ std::vector<check_t> Supervision::checkDBServers() {
for (auto const& machine : machinesPlanned) {
bool good = false;
std::string lastHeartbeatTime, lastHeartbeatStatus, lastHeartbeatAcked,
lastStatus, heartbeatTime, heartbeatStatus, serverID;
std::string lastHeartbeatTime, lastHeartbeatAcked, lastStatus, heartbeatTime,
heartbeatStatus, serverID;
serverID = machine.first;
heartbeatTime = _snapshot(syncPrefix + serverID + "/time").toJson();
@@ -195,8 +195,8 @@ std::vector<check_t> Supervision::checkCoordinators() {
for (auto const& machine : machinesPlanned) {
bool good = false;
std::string lastHeartbeatTime, lastHeartbeatStatus, lastHeartbeatAcked,
lastStatus, heartbeatTime, heartbeatStatus, serverID;
std::string lastHeartbeatTime, lastHeartbeatAcked, lastStatus, heartbeatTime,
heartbeatStatus, serverID;
serverID = machine.first;
heartbeatTime = _snapshot(syncPrefix + serverID + "/time").toJson();
@@ -208,8 +208,6 @@ std::vector<check_t> Supervision::checkCoordinators() {
try { // Existing
lastHeartbeatTime =
_snapshot(healthPrefix + serverID + "/LastHeartbeatSent").toJson();
lastHeartbeatStatus =
_snapshot(healthPrefix + serverID + "/LastHeartbeatStatus").toJson();
lastStatus = _snapshot(healthPrefix + serverID + "/Status").toJson();
if (lastHeartbeatTime != heartbeatTime) { // Update
good = true;
View File
@@ -339,7 +339,7 @@ if (MSVC)
DIRECTORY "${PROJECT_SOURCE_DIR}/Installation/Windows/Icons"
DESTINATION ${TRI_RESOURCEDIR})
set(CPACK_NSIS_DEFINES "
set(CPACK_ARANGODB_NSIS_DEFINES "
!define BITS ${BITS}
!define TRI_FRIENDLY_SVC_NAME '${ARANGODB_FRIENDLY_STRING}'
!define TRI_AARDVARK_URL 'http://127.0.0.1:8529'
View File
@@ -34,11 +34,10 @@
img.icon {
background-color: $c-white;
border-radius: 3px;
height: auto;
max-height: 162px;
max-width: 162px;
box-sizing: border-box;
height: 100%;
width: 100%;
padding: 10px;
width: auto;
}
}
View File
@@ -959,38 +959,66 @@ function cleanupCurrentCollections (plannedCollections, currentCollections,
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lock key space
////////////////////////////////////////////////////////////////////////////////
function lockSyncKeyspace() {
while (!global.KEY_SET_CAS("shardSynchronization", "lock", 1, null)) {
wait(0.001);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unlock key space
////////////////////////////////////////////////////////////////////////////////
function unlockSyncKeyspace() {
global.KEY_SET("shardSynchronization", "lock", null);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief launch a scheduled job if needed
////////////////////////////////////////////////////////////////////////////////
function launchJob() {
function tryLaunchJob() {
const registerTask = require("internal").registerTask;
var jobs = global.KEYSPACE_GET("shardSynchronization");
if (jobs.running === null) {
var shards = Object.keys(jobs.scheduled);
if (shards.length > 0) {
var jobInfo = jobs.scheduled[shards[0]];
try {
registerTask({
database: jobInfo.database,
params: {database: jobInfo.database, shard: jobInfo.shard,
planId: jobInfo.planId, leader: jobInfo.leader},
command: function(params) {
require("@arangodb/cluster").synchronizeOneShard(
params.database, params.shard, params.planId, params.leader);
}});
} catch (err) {
if (! require("internal").isStopping()) {
console.error("Could not registerTask for shard synchronization.");
var isStopping = require("internal").isStopping;
if (isStopping()) {
return;
}
lockSyncKeyspace();
try {
var jobs = global.KEYSPACE_GET("shardSynchronization");
if (jobs.running === null) {
var shards = Object.keys(jobs.scheduled);
if (shards.length > 0) {
var jobInfo = jobs.scheduled[shards[0]];
try {
registerTask({
database: jobInfo.database,
params: {database: jobInfo.database, shard: jobInfo.shard,
planId: jobInfo.planId, leader: jobInfo.leader},
command: function(params) {
require("@arangodb/cluster").synchronizeOneShard(
params.database, params.shard, params.planId, params.leader);
}});
} catch (err) {
if (! require("internal").isStopping()) {
console.error("Could not registerTask for shard synchronization.");
}
return;
}
return;
global.KEY_SET("shardSynchronization", "running", jobInfo);
console.debug("scheduleOneShardSynchronization: have launched job", jobInfo);
delete jobs.scheduled[shards[0]];
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
}
global.KEY_SET("shardSynchronization", "running", jobInfo);
console.debug("scheduleOneShardSynchronization: have launched job", jobInfo);
delete jobs.scheduled[shards[0]];
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
}
}
finally {
unlockSyncKeyspace();
}
}
////////////////////////////////////////////////////////////////////////////////
@@ -1105,10 +1133,16 @@ function synchronizeOneShard(database, shard, planId, leader) {
}
}
// Tell others that we are done:
global.KEY_SET("shardSynchronization", "running", null);
if (!isStopping()) {
launchJob(); // start a new one if needed
lockSyncKeyspace();
try {
global.KEY_SET("shardSynchronization", "running", null);
}
finally {
unlockSyncKeyspace();
}
tryLaunchJob(); // start a new one if needed
console.info("synchronizeOneShard: done, %s/%s, %s/%s",
database, shard, database, planId);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1118,32 +1152,36 @@ function synchronizeOneShard(database, shard, planId, leader) {
function scheduleOneShardSynchronization(database, shard, planId, leader) {
console.debug("scheduleOneShardSynchronization:", database, shard, planId,
leader);
var jobs;
try {
jobs = global.KEYSPACE_GET("shardSynchronization");
global.KEY_GET("shardSynchronization", "lock");
}
catch (e) {
global.KEYSPACE_CREATE("shardSynchronization");
global.KEY_SET("shardSynchronization", "scheduled", {});
global.KEY_SET("shardSynchronization", "running", null);
jobs = { scheduled: {}, running: null };
global.KEY_SET("shardSynchronization", "lock", null);
}
if ((jobs.running !== null && jobs.running.shard === shard) ||
jobs.scheduled.hasOwnProperty(shard)) {
console.debug("task is already running or scheduled,",
"ignoring scheduling request");
return false;
}
lockSyncKeyspace();
try {
var jobs = global.KEYSPACE_GET("shardSynchronization");
if ((jobs.running !== null && jobs.running.shard === shard) ||
jobs.scheduled.hasOwnProperty(shard)) {
console.debug("task is already running or scheduled,",
"ignoring scheduling request");
return false;
}
// If we reach this, we actually have to schedule a new task:
var jobInfo = { database, shard, planId, leader };
jobs.scheduled[shard] = jobInfo;
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
console.debug("scheduleOneShardSynchronization: have scheduled job", jobInfo);
if (jobs.running === null) { // no job scheduled, so start it:
launchJob();
// If we reach this, we actually have to schedule a new task:
var jobInfo = { database, shard, planId, leader };
jobs.scheduled[shard] = jobInfo;
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
console.debug("scheduleOneShardSynchronization: have scheduled job", jobInfo);
}
finally {
unlockSyncKeyspace();
}
tryLaunchJob();
return true;
}