1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Simon Grätzer 2017-01-20 14:42:19 +01:00
commit d71ebf7c1d
52 changed files with 2576 additions and 1245 deletions

View File

@ -106,9 +106,15 @@ class Builder {
checkOverflow(_pos + len); checkOverflow(_pos + len);
#endif #endif
_buffer->prealloc(len); // copy builder pointer into local variable
_start = _buffer->data(); // this avoids accessing the shared pointer repeatedly, which has
_size = _buffer->size(); // a small but non-negligible cost
Buffer<uint8_t>* buffer = _buffer.get();
VELOCYPACK_ASSERT(buffer != nullptr);
buffer->prealloc(len);
_start = buffer->data();
_size = buffer->size();
} }
// Sort the indices by attribute name: // Sort the indices by attribute name:

View File

@ -233,9 +233,15 @@ void Parser::parseString() {
// insert 8 bytes for the length as soon as we reach 127 bytes // insert 8 bytes for the length as soon as we reach 127 bytes
// in the VPack representation. // in the VPack representation.
ValueLength const base = _b->_pos; // copy builder pointer into local variable
_b->reserveSpace(1); // this avoids accessing the shared pointer repeatedly, which has
_b->_start[_b->_pos++] = 0x40; // correct this later // a small but non-negligible cost
Builder* builder = _b.get();
VELOCYPACK_ASSERT(builder != nullptr);
ValueLength const base = builder->_pos;
builder->reserveSpace(1);
builder->_start[builder->_pos++] = 0x40; // correct this later
bool large = false; // set to true when we reach 128 bytes bool large = false; // set to true when we reach 128 bytes
uint32_t highSurrogate = 0; // non-zero if high-surrogate was seen uint32_t highSurrogate = 0; // non-zero if high-surrogate was seen
@ -243,42 +249,42 @@ void Parser::parseString() {
while (true) { while (true) {
size_t remainder = _size - _pos; size_t remainder = _size - _pos;
if (remainder >= 16) { if (remainder >= 16) {
_b->reserveSpace(remainder); builder->reserveSpace(remainder);
size_t count; size_t count;
// Note that the SSE4.2 accelerated string copying functions might // Note that the SSE4.2 accelerated string copying functions might
// peek up to 15 bytes over the given end, because they use 128bit // peek up to 15 bytes over the given end, because they use 128bit
// registers. Therefore, we have to subtract 15 from remainder // registers. Therefore, we have to subtract 15 from remainder
// to be on the safe side. Further bytes will be processed below. // to be on the safe side. Further bytes will be processed below.
if (options->validateUtf8Strings) { if (options->validateUtf8Strings) {
count = JSONStringCopyCheckUtf8(_b->_start + _b->_pos, _start + _pos, count = JSONStringCopyCheckUtf8(builder->_start + builder->_pos, _start + _pos,
remainder - 15); remainder - 15);
} else { } else {
count = JSONStringCopy(_b->_start + _b->_pos, _start + _pos, count = JSONStringCopy(builder->_start + builder->_pos, _start + _pos,
remainder - 15); remainder - 15);
} }
_pos += count; _pos += count;
_b->_pos += count; builder->_pos += count;
} }
int i = getOneOrThrow("Unfinished string"); int i = getOneOrThrow("Unfinished string");
if (!large && _b->_pos - (base + 1) > 126) { if (!large && builder->_pos - (base + 1) > 126) {
large = true; large = true;
_b->reserveSpace(8); builder->reserveSpace(8);
ValueLength len = _b->_pos - (base + 1); ValueLength len = builder->_pos - (base + 1);
memmove(_b->_start + base + 9, _b->_start + base + 1, checkOverflow(len)); memmove(builder->_start + base + 9, builder->_start + base + 1, checkOverflow(len));
_b->_pos += 8; builder->_pos += 8;
} }
switch (i) { switch (i) {
case '"': case '"':
ValueLength len; ValueLength len;
if (!large) { if (!large) {
len = _b->_pos - (base + 1); len = builder->_pos - (base + 1);
_b->_start[base] = 0x40 + static_cast<uint8_t>(len); builder->_start[base] = 0x40 + static_cast<uint8_t>(len);
// String is ready // String is ready
} else { } else {
len = _b->_pos - (base + 9); len = builder->_pos - (base + 9);
_b->_start[base] = 0xbf; builder->_start[base] = 0xbf;
for (ValueLength i = 1; i <= 8; i++) { for (ValueLength i = 1; i <= 8; i++) {
_b->_start[base + i] = len & 0xff; builder->_start[base + i] = len & 0xff;
len >>= 8; len >>= 8;
} }
} }
@ -293,33 +299,33 @@ void Parser::parseString() {
case '"': case '"':
case '/': case '/':
case '\\': case '\\':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = static_cast<uint8_t>(i); builder->_start[builder->_pos++] = static_cast<uint8_t>(i);
highSurrogate = 0; highSurrogate = 0;
break; break;
case 'b': case 'b':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = '\b'; builder->_start[builder->_pos++] = '\b';
highSurrogate = 0; highSurrogate = 0;
break; break;
case 'f': case 'f':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = '\f'; builder->_start[builder->_pos++] = '\f';
highSurrogate = 0; highSurrogate = 0;
break; break;
case 'n': case 'n':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = '\n'; builder->_start[builder->_pos++] = '\n';
highSurrogate = 0; highSurrogate = 0;
break; break;
case 'r': case 'r':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = '\r'; builder->_start[builder->_pos++] = '\r';
highSurrogate = 0; highSurrogate = 0;
break; break;
case 't': case 't':
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = '\t'; builder->_start[builder->_pos++] = '\t';
highSurrogate = 0; highSurrogate = 0;
break; break;
case 'u': { case 'u': {
@ -342,23 +348,23 @@ void Parser::parseString() {
} }
} }
if (v < 0x80) { if (v < 0x80) {
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = static_cast<uint8_t>(v); builder->_start[builder->_pos++] = static_cast<uint8_t>(v);
highSurrogate = 0; highSurrogate = 0;
} else if (v < 0x800) { } else if (v < 0x800) {
_b->reserveSpace(2); builder->reserveSpace(2);
_b->_start[_b->_pos++] = 0xc0 + (v >> 6); builder->_start[builder->_pos++] = 0xc0 + (v >> 6);
_b->_start[_b->_pos++] = 0x80 + (v & 0x3f); builder->_start[builder->_pos++] = 0x80 + (v & 0x3f);
highSurrogate = 0; highSurrogate = 0;
} else if (v >= 0xdc00 && v < 0xe000 && highSurrogate != 0) { } else if (v >= 0xdc00 && v < 0xe000 && highSurrogate != 0) {
// Low surrogate, put the two together: // Low surrogate, put the two together:
v = 0x10000 + ((highSurrogate - 0xd800) << 10) + v - 0xdc00; v = 0x10000 + ((highSurrogate - 0xd800) << 10) + v - 0xdc00;
_b->_pos -= 3; builder->_pos -= 3;
_b->reserveSpace(4); builder->reserveSpace(4);
_b->_start[_b->_pos++] = 0xf0 + (v >> 18); builder->_start[builder->_pos++] = 0xf0 + (v >> 18);
_b->_start[_b->_pos++] = 0x80 + ((v >> 12) & 0x3f); builder->_start[builder->_pos++] = 0x80 + ((v >> 12) & 0x3f);
_b->_start[_b->_pos++] = 0x80 + ((v >> 6) & 0x3f); builder->_start[builder->_pos++] = 0x80 + ((v >> 6) & 0x3f);
_b->_start[_b->_pos++] = 0x80 + (v & 0x3f); builder->_start[builder->_pos++] = 0x80 + (v & 0x3f);
highSurrogate = 0; highSurrogate = 0;
} else { } else {
if (v >= 0xd800 && v < 0xdc00) { if (v >= 0xd800 && v < 0xdc00) {
@ -367,10 +373,10 @@ void Parser::parseString() {
} else { } else {
highSurrogate = 0; highSurrogate = 0;
} }
_b->reserveSpace(3); builder->reserveSpace(3);
_b->_start[_b->_pos++] = 0xe0 + (v >> 12); builder->_start[builder->_pos++] = 0xe0 + (v >> 12);
_b->_start[_b->_pos++] = 0x80 + ((v >> 6) & 0x3f); builder->_start[builder->_pos++] = 0x80 + ((v >> 6) & 0x3f);
_b->_start[_b->_pos++] = 0x80 + (v & 0x3f); builder->_start[builder->_pos++] = 0x80 + (v & 0x3f);
} }
break; break;
} }
@ -386,13 +392,13 @@ void Parser::parseString() {
throw Exception(Exception::UnexpectedControlCharacter); throw Exception(Exception::UnexpectedControlCharacter);
} }
highSurrogate = 0; highSurrogate = 0;
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = static_cast<uint8_t>(i); builder->_start[builder->_pos++] = static_cast<uint8_t>(i);
} else { } else {
if (!options->validateUtf8Strings) { if (!options->validateUtf8Strings) {
highSurrogate = 0; highSurrogate = 0;
_b->reserveSpace(1); builder->reserveSpace(1);
_b->_start[_b->_pos++] = static_cast<uint8_t>(i); builder->_start[builder->_pos++] = static_cast<uint8_t>(i);
} else { } else {
// multi-byte UTF-8 sequence! // multi-byte UTF-8 sequence!
int follow = 0; int follow = 0;
@ -412,14 +418,14 @@ void Parser::parseString() {
} }
// validate follow up characters // validate follow up characters
_b->reserveSpace(1 + follow); builder->reserveSpace(1 + follow);
_b->_start[_b->_pos++] = static_cast<uint8_t>(i); builder->_start[builder->_pos++] = static_cast<uint8_t>(i);
for (int j = 0; j < follow; ++j) { for (int j = 0; j < follow; ++j) {
i = getOneOrThrow("scanString: truncated UTF-8 sequence"); i = getOneOrThrow("scanString: truncated UTF-8 sequence");
if ((i & 0xc0) != 0x80) { if ((i & 0xc0) != 0x80) {
throw Exception(Exception::InvalidUtf8Sequence); throw Exception(Exception::InvalidUtf8Sequence);
} }
_b->_start[_b->_pos++] = static_cast<uint8_t>(i); builder->_start[builder->_pos++] = static_cast<uint8_t>(i);
} }
highSurrogate = 0; highSurrogate = 0;
} }
@ -430,13 +436,19 @@ void Parser::parseString() {
} }
void Parser::parseArray() { void Parser::parseArray() {
_b->addArray(); // copy builder pointer into local variable
// this avoids accessing the shared pointer repeatedly, which has
// a small but non-negligible cost
Builder* builder = _b.get();
VELOCYPACK_ASSERT(builder != nullptr);
builder->addArray();
int i = skipWhiteSpace("Expecting item or ']'"); int i = skipWhiteSpace("Expecting item or ']'");
if (i == ']') { if (i == ']') {
// empty array // empty array
++_pos; // the closing ']' ++_pos; // the closing ']'
_b->close(); builder->close();
return; return;
} }
@ -444,13 +456,13 @@ void Parser::parseArray() {
while (true) { while (true) {
// parse array element itself // parse array element itself
_b->reportAdd(); builder->reportAdd();
parseJson(); parseJson();
i = skipWhiteSpace("Expecting ',' or ']'"); i = skipWhiteSpace("Expecting ',' or ']'");
if (i == ']') { if (i == ']') {
// end of array // end of array
++_pos; // the closing ']' ++_pos; // the closing ']'
_b->close(); builder->close();
decreaseNesting(); decreaseNesting();
return; return;
} }
@ -466,7 +478,13 @@ void Parser::parseArray() {
} }
void Parser::parseObject() { void Parser::parseObject() {
_b->addObject(); // copy builder pointer into local variable
// this avoids accessing the shared pointer repeatedly, which has
// a small but non-negligible cost
Builder* builder = _b.get();
VELOCYPACK_ASSERT(builder != nullptr);
builder->addObject();
int i = skipWhiteSpace("Expecting item or '}'"); int i = skipWhiteSpace("Expecting item or '}'");
if (i == '}') { if (i == '}') {
@ -475,7 +493,7 @@ void Parser::parseObject() {
if (_nesting != 0 || !options->keepTopLevelOpen) { if (_nesting != 0 || !options->keepTopLevelOpen) {
// only close if we've not been asked to keep top level open // only close if we've not been asked to keep top level open
_b->close(); builder->close();
} }
return; return;
} }
@ -490,22 +508,22 @@ void Parser::parseObject() {
// get past the initial '"' // get past the initial '"'
++_pos; ++_pos;
_b->reportAdd(); builder->reportAdd();
bool excludeAttribute = false; bool excludeAttribute = false;
auto const lastPos = _b->_pos; auto const lastPos = builder->_pos;
if (options->attributeExcludeHandler == nullptr) { if (options->attributeExcludeHandler == nullptr) {
parseString(); parseString();
} else { } else {
parseString(); parseString();
if (options->attributeExcludeHandler->shouldExclude( if (options->attributeExcludeHandler->shouldExclude(
Slice(_b->_start + lastPos), _nesting)) { Slice(builder->_start + lastPos), _nesting)) {
excludeAttribute = true; excludeAttribute = true;
} }
} }
if (!excludeAttribute && options->attributeTranslator != nullptr) { if (!excludeAttribute && options->attributeTranslator != nullptr) {
// check if a translation for the attribute name exists // check if a translation for the attribute name exists
Slice key(_b->_start + lastPos); Slice key(builder->_start + lastPos);
if (key.isString()) { if (key.isString()) {
ValueLength keyLength; ValueLength keyLength;
@ -517,8 +535,8 @@ void Parser::parseObject() {
// found translation... now reset position to old key position // found translation... now reset position to old key position
// and simply overwrite the existing key with the numeric translation // and simply overwrite the existing key with the numeric translation
// id // id
_b->_pos = lastPos; builder->_pos = lastPos;
_b->addUInt(Slice(translated).getUInt()); builder->addUInt(Slice(translated).getUInt());
} }
} }
} }
@ -533,7 +551,7 @@ void Parser::parseObject() {
parseJson(); parseJson();
if (excludeAttribute) { if (excludeAttribute) {
_b->removeLast(); builder->removeLast();
} }
i = skipWhiteSpace("Expecting ',' or '}'"); i = skipWhiteSpace("Expecting ',' or '}'");
@ -542,7 +560,7 @@ void Parser::parseObject() {
++_pos; // the closing '}' ++_pos; // the closing '}'
if (_nesting != 1 || !options->keepTopLevelOpen) { if (_nesting != 1 || !options->keepTopLevelOpen) {
// only close if we've not been asked to keep top level open // only close if we've not been asked to keep top level open
_b->close(); builder->close();
} }
decreaseNesting(); decreaseNesting();
return; return;

View File

@ -224,7 +224,7 @@ void AgencyWriteTransaction::toVelocyPack(VPackBuilder& builder) const {
VPackObjectBuilder guard3(&builder); VPackObjectBuilder guard3(&builder);
} }
builder.add(VPackValue(transactionId)); // Transactions builder.add(VPackValue(clientId)); // Transactions
} }
bool AgencyWriteTransaction::validate(AgencyCommResult const& result) const { bool AgencyWriteTransaction::validate(AgencyCommResult const& result) const {
@ -283,7 +283,7 @@ void AgencyGeneralTransaction::toVelocyPack(VPackBuilder& builder) const {
} else { } else {
std::get<0>(operation).toGeneralBuilder(builder); std::get<0>(operation).toGeneralBuilder(builder);
std::get<1>(operation).toGeneralBuilder(builder); std::get<1>(operation).toGeneralBuilder(builder);
builder.add(VPackValue(transactionId)); builder.add(VPackValue(clientId));
} }
} }
} }
@ -328,18 +328,23 @@ AgencyCommResult::AgencyCommResult()
_body(), _body(),
_values(), _values(),
_statusCode(0), _statusCode(0),
_connected(false) {} _connected(false),
_clientId("") {}
AgencyCommResult::AgencyCommResult(int code, std::string const& message) AgencyCommResult::AgencyCommResult(
int code, std::string const& message, std::string const& clientId)
: _location(), : _location(),
_message(message), _message(message),
_body(), _body(),
_values(), _values(),
_statusCode(code), _statusCode(code),
_connected(false) {} _connected(false),
_clientId(clientId) {}
bool AgencyCommResult::connected() const { return _connected; } bool AgencyCommResult::connected() const { return _connected; }
std::string AgencyCommResult::clientId() const { return _clientId; }
int AgencyCommResult::httpCode() const { return _statusCode; } int AgencyCommResult::httpCode() const { return _statusCode; }
int AgencyCommResult::errorCode() const { int AgencyCommResult::errorCode() const {
@ -1070,9 +1075,9 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(
AgencyCommResult result = sendWithFailover( AgencyCommResult result = sendWithFailover(
arangodb::rest::RequestType::POST, arangodb::rest::RequestType::POST,
(timeout == 0.0 ? AgencyCommManager::CONNECTION_OPTIONS._requestTimeout (timeout == 0.0) ?
: timeout), AgencyCommManager::CONNECTION_OPTIONS._requestTimeout : timeout,
url, builder.slice().toJson()); url, builder.slice().toJson(), transaction.getClientId());
if (!result.successful() && result.httpCode() != if (!result.successful() && result.httpCode() !=
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
@ -1285,7 +1290,8 @@ void AgencyComm::updateEndpoints(arangodb::velocypack::Slice const& current) {
AgencyCommResult AgencyComm::sendWithFailover( AgencyCommResult AgencyComm::sendWithFailover(
arangodb::rest::RequestType method, double const timeout, arangodb::rest::RequestType method, double const timeout,
std::string const& initialUrl, std::string const& body) { std::string const& initialUrl, std::string const& body,
std::string const& clientId) {
std::string endpoint; std::string endpoint;
std::unique_ptr<GeneralClientConnection> connection = std::unique_ptr<GeneralClientConnection> connection =
@ -1326,7 +1332,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
++tries; ++tries;
if (connection == nullptr) { if (connection == nullptr) {
AgencyCommResult result(400, "No endpoints for agency found."); AgencyCommResult result(400, "No endpoints for agency found.", clientId);
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << result._message; LOG_TOPIC(ERR, Logger::AGENCYCOMM) << result._message;
return result; return result;
} }
@ -1343,7 +1349,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
// try to send; if we fail completely, do not retry // try to send; if we fail completely, do not retry
try { try {
result = send(connection.get(), method, conTimeout, url, body); result = send(connection.get(), method, conTimeout, url, body, clientId);
} catch (...) { } catch (...) {
AgencyCommManager::MANAGER->failed(std::move(connection), endpoint); AgencyCommManager::MANAGER->failed(std::move(connection), endpoint);
endpoint.clear(); endpoint.clear();
@ -1403,7 +1409,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
AgencyCommResult AgencyComm::send( AgencyCommResult AgencyComm::send(
arangodb::httpclient::GeneralClientConnection* connection, arangodb::httpclient::GeneralClientConnection* connection,
arangodb::rest::RequestType method, double timeout, std::string const& url, arangodb::rest::RequestType method, double timeout, std::string const& url,
std::string const& body) { std::string const& body, std::string const& clientId) {
TRI_ASSERT(connection != nullptr); TRI_ASSERT(connection != nullptr);
if (method == arangodb::rest::RequestType::GET || if (method == arangodb::rest::RequestType::GET ||
@ -1417,6 +1423,9 @@ AgencyCommResult AgencyComm::send(
AgencyCommResult result; AgencyCommResult result;
result._connected = false; result._connected = false;
result._statusCode = 0; result._statusCode = 0;
if (!clientId.empty()) {
result._clientId = clientId;
}
LOG_TOPIC(TRACE, Logger::AGENCYCOMM) LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "sending " << arangodb::HttpRequest::translateMethod(method) << "sending " << arangodb::HttpRequest::translateMethod(method)

View File

@ -218,7 +218,9 @@ class AgencyOperation {
class AgencyCommResult { class AgencyCommResult {
public: public:
AgencyCommResult(); AgencyCommResult();
AgencyCommResult(int code, std::string const& message); AgencyCommResult(int code, std::string const& message,
std::string const& transactionId = std::string());
~AgencyCommResult() = default; ~AgencyCommResult() = default;
public: public:
@ -230,6 +232,8 @@ class AgencyCommResult {
int errorCode() const; int errorCode() const;
std::string clientId() const;
std::string errorMessage() const; std::string errorMessage() const;
std::string errorDetails() const; std::string errorDetails() const;
@ -256,6 +260,9 @@ class AgencyCommResult {
private: private:
std::shared_ptr<velocypack::Builder> _vpack; std::shared_ptr<velocypack::Builder> _vpack;
public:
std::string _clientId;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -272,6 +279,7 @@ public:
virtual std::string toJson() const = 0; virtual std::string toJson() const = 0;
virtual void toVelocyPack(arangodb::velocypack::Builder&) const = 0; virtual void toVelocyPack(arangodb::velocypack::Builder&) const = 0;
virtual std::string const& path() const = 0; virtual std::string const& path() const = 0;
virtual std::string getClientId() const = 0;
virtual bool validate(AgencyCommResult const& result) const = 0; virtual bool validate(AgencyCommResult const& result) const = 0;
@ -285,14 +293,14 @@ struct AgencyGeneralTransaction : public AgencyTransaction {
explicit AgencyGeneralTransaction( explicit AgencyGeneralTransaction(
std::pair<AgencyOperation,AgencyPrecondition> const& operation) : std::pair<AgencyOperation,AgencyPrecondition> const& operation) :
transactionId(to_string(boost::uuids::random_generator()())) { clientId(to_string(boost::uuids::random_generator()())) {
operations.push_back(operation); operations.push_back(operation);
} }
explicit AgencyGeneralTransaction( explicit AgencyGeneralTransaction(
std::vector<std::pair<AgencyOperation,AgencyPrecondition>> const& _opers) : std::vector<std::pair<AgencyOperation,AgencyPrecondition>> const& _opers) :
operations(_opers), operations(_opers),
transactionId(to_string(boost::uuids::random_generator()())) {} clientId(to_string(boost::uuids::random_generator()())) {}
AgencyGeneralTransaction() = default; AgencyGeneralTransaction() = default;
@ -305,12 +313,16 @@ struct AgencyGeneralTransaction : public AgencyTransaction {
void push_back(std::pair<AgencyOperation,AgencyPrecondition> const&); void push_back(std::pair<AgencyOperation,AgencyPrecondition> const&);
inline std::string const& path() const override final { inline virtual std::string const& path() const override final {
return AgencyTransaction::TypeUrl[2]; return AgencyTransaction::TypeUrl[2];
} }
inline virtual std::string getClientId() const override final {
return clientId;
}
virtual bool validate(AgencyCommResult const& result) const override final; virtual bool validate(AgencyCommResult const& result) const override final;
std::string transactionId; std::string clientId;
}; };
@ -323,24 +335,24 @@ struct AgencyWriteTransaction : public AgencyTransaction {
public: public:
explicit AgencyWriteTransaction(AgencyOperation const& operation) : explicit AgencyWriteTransaction(AgencyOperation const& operation) :
transactionId(to_string(boost::uuids::random_generator()())) { clientId(to_string(boost::uuids::random_generator()())) {
operations.push_back(operation); operations.push_back(operation);
} }
explicit AgencyWriteTransaction (std::vector<AgencyOperation> const& _opers) : explicit AgencyWriteTransaction (std::vector<AgencyOperation> const& _opers) :
operations(_opers), operations(_opers),
transactionId(to_string(boost::uuids::random_generator()())) {} clientId(to_string(boost::uuids::random_generator()())) {}
AgencyWriteTransaction(AgencyOperation const& operation, AgencyWriteTransaction(AgencyOperation const& operation,
AgencyPrecondition const& precondition) : AgencyPrecondition const& precondition) :
transactionId(to_string(boost::uuids::random_generator()())) { clientId(to_string(boost::uuids::random_generator()())) {
operations.push_back(operation); operations.push_back(operation);
preconditions.push_back(precondition); preconditions.push_back(precondition);
} }
AgencyWriteTransaction(std::vector<AgencyOperation> const& _operations, AgencyWriteTransaction(std::vector<AgencyOperation> const& _operations,
AgencyPrecondition const& precondition) : AgencyPrecondition const& precondition) :
transactionId(to_string(boost::uuids::random_generator()())) { clientId(to_string(boost::uuids::random_generator()())) {
for (auto const& op : _operations) { for (auto const& op : _operations) {
operations.push_back(op); operations.push_back(op);
} }
@ -349,7 +361,7 @@ public:
AgencyWriteTransaction(std::vector<AgencyOperation> const& opers, AgencyWriteTransaction(std::vector<AgencyOperation> const& opers,
std::vector<AgencyPrecondition> const& precs) : std::vector<AgencyPrecondition> const& precs) :
transactionId(to_string(boost::uuids::random_generator()())) { clientId(to_string(boost::uuids::random_generator()())) {
for (auto const& op : opers) { for (auto const& op : opers) {
operations.push_back(op); operations.push_back(op);
} }
@ -365,15 +377,19 @@ public:
std::string toJson() const override final; std::string toJson() const override final;
inline std::string const& path() const override final { inline virtual std::string const& path() const override final {
return AgencyTransaction::TypeUrl[1]; return AgencyTransaction::TypeUrl[1];
} }
inline virtual std::string getClientId() const override final {
return clientId;
}
virtual bool validate(AgencyCommResult const& result) const override final; virtual bool validate(AgencyCommResult const& result) const override final;
std::vector<AgencyPrecondition> preconditions; std::vector<AgencyPrecondition> preconditions;
std::vector<AgencyOperation> operations; std::vector<AgencyOperation> operations;
std::string transactionId; std::string clientId;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -427,6 +443,10 @@ public:
return AgencyTransaction::TypeUrl[3]; return AgencyTransaction::TypeUrl[3];
} }
inline virtual std::string getClientId() const override final {
return std::string();
}
virtual bool validate(AgencyCommResult const& result) const override final; virtual bool validate(AgencyCommResult const& result) const override final;
std::vector<AgencyPrecondition> preconditions; std::vector<AgencyPrecondition> preconditions;
@ -454,10 +474,14 @@ public:
std::string toJson() const override final; std::string toJson() const override final;
inline std::string const& path() const override final { inline virtual std::string const& path() const override final {
return AgencyTransaction::TypeUrl[0]; return AgencyTransaction::TypeUrl[0];
} }
inline virtual std::string getClientId() const override final {
return std::string();
}
virtual bool validate(AgencyCommResult const& result) const override final; virtual bool validate(AgencyCommResult const& result) const override final;
std::vector<std::string> keys; std::vector<std::string> keys;
@ -614,7 +638,8 @@ class AgencyComm {
bool ensureStructureInitialized(); bool ensureStructureInitialized();
AgencyCommResult sendWithFailover(arangodb::rest::RequestType, double, AgencyCommResult sendWithFailover(arangodb::rest::RequestType, double,
std::string const&, std::string const&); std::string const&, std::string const&,
std::string const& clientId = std::string());
private: private:
bool lock(std::string const&, double, double, bool lock(std::string const&, double, double,
@ -623,7 +648,8 @@ class AgencyComm {
bool unlock(std::string const&, arangodb::velocypack::Slice const&, double); bool unlock(std::string const&, arangodb::velocypack::Slice const&, double);
AgencyCommResult send(httpclient::GeneralClientConnection*, rest::RequestType, AgencyCommResult send(httpclient::GeneralClientConnection*, rest::RequestType,
double, std::string const&, std::string const&); double, std::string const&, std::string const&,
std::string const& clientId = std::string());
bool tryInitializeStructure(std::string const& jwtSecret); bool tryInitializeStructure(std::string const& jwtSecret);

View File

@ -810,7 +810,6 @@ void Agent::run() {
} else { } else {
_appendCV.wait(1000000); _appendCV.wait(1000000);
updateConfiguration();
} }
} }
@ -910,9 +909,9 @@ void Agent::detectActiveAgentFailures() {
system_clock::now() - lastAcked.at(id)).count(); system_clock::now() - lastAcked.at(id)).count();
if (ds > 180.0) { if (ds > 180.0) {
std::string repl = _config.nextAgentInLine(); std::string repl = _config.nextAgentInLine();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Active agent " << id << " has failed. << " LOG_TOPIC(DEBUG, Logger::AGENCY)
<< repl << " will be promoted to active agency membership"; << "Active agent " << id << " has failed. << " << repl
// Guarded in :: << " will be promoted to active agency membership";
_activator = std::make_unique<AgentActivator>(this, id, repl); _activator = std::make_unique<AgentActivator>(this, id, repl);
_activator->start(); _activator->start();
return; return;
@ -923,13 +922,6 @@ void Agent::detectActiveAgentFailures() {
} }
void Agent::updateConfiguration() {
// First ask last know leader
}
/// Orderly shutdown /// Orderly shutdown
void Agent::beginShutdown() { void Agent::beginShutdown() {
Thread::beginShutdown(); Thread::beginShutdown();

View File

@ -223,9 +223,6 @@ class Agent : public arangodb::Thread {
/// @brief persist agency configuration in RAFT /// @brief persist agency configuration in RAFT
void persistConfiguration(term_t t); void persistConfiguration(term_t t);
/// @brief Update my configuration as passive agent
void updateConfiguration();
/// @brief Find out, if we've had acknowledged RPCs recent enough /// @brief Find out, if we've had acknowledged RPCs recent enough
bool challengeLeadership(); bool challengeLeadership();

View File

@ -252,8 +252,7 @@ JOB_STATUS FailedFollower::status() {
Node const& planned = _snapshot(planPath); Node const& planned = _snapshot(planPath);
Node const& current = _snapshot(curPath); Node const& current = _snapshot(curPath);
if (planned.slice() == current.slice()) { if (compareServerLists(planned.slice(), current.slice())) {
// Remove shard from /arango/Target/FailedServers/<server> array // Remove shard from /arango/Target/FailedServers/<server> array
Builder del; Builder del;
del.openArray(); del.openArray();

View File

@ -25,6 +25,28 @@
using namespace arangodb::consensus; using namespace arangodb::consensus;
bool arangodb::consensus::compareServerLists(Slice plan, Slice current) {
if (!plan.isArray() || !current.isArray()) {
return false;
}
std::vector<std::string> planv, currv;
for (auto const& srv : VPackArrayIterator(plan)) {
if (srv.isString()) {
planv.push_back(srv.copyString());
}
}
for (auto const& srv : VPackArrayIterator(current)) {
if (srv.isString()) {
currv.push_back(srv.copyString());
}
}
bool equalLeader = !planv.empty() && !currv.empty() &&
planv.front() == currv.front();
std::sort(planv.begin(), planv.end());
std::sort(currv.begin(), currv.end());
return equalLeader && currv == planv;
}
Job::Job(Node const& snapshot, Agent* agent, std::string const& jobId, Job::Job(Node const& snapshot, Agent* agent, std::string const& jobId,
std::string const& creator, std::string const& agencyPrefix) : std::string const& creator, std::string const& agencyPrefix) :
_snapshot(snapshot), _snapshot(snapshot),

View File

@ -37,6 +37,12 @@
namespace arangodb { namespace arangodb {
namespace consensus { namespace consensus {
// This is intended for lists of servers with the first being the leader
// and all others followers. Both arguments must be arrays. Returns true,
// if the first items in both slice are equal and if both arrays contain
// the same set of strings.
bool compareServerLists(Slice plan, Slice current);
enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND }; enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/", const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
"/Target/Finished/", "/Target/Failed/"}); "/Target/Finished/", "/Target/Failed/"});

View File

@ -190,7 +190,7 @@ bool MoveShard::start() {
} }
} }
// Are we ditributeShardsLiked by others? // Are we distributeShardsLiked by others?
// Invoke moveShard here with others // Invoke moveShard here with others
auto collections = _snapshot(planColPrefix + _database).children(); auto collections = _snapshot(planColPrefix + _database).children();
std::vector<std::string> colsLikeMe; std::vector<std::string> colsLikeMe;
@ -430,16 +430,7 @@ JOB_STATUS MoveShard::status() {
Slice current = _snapshot(curPath).slice(); Slice current = _snapshot(curPath).slice();
Slice plan = _snapshot(planPath).slice(); Slice plan = _snapshot(planPath).slice();
std::vector<std::string> planv, currv; if (compareServerLists(plan, current)) {
for (auto const& srv : VPackArrayIterator(plan)) {
planv.push_back(srv.copyString());
}
std::sort(planv.begin(), planv.end());
for (auto const& srv : VPackArrayIterator(current)) {
currv.push_back(srv.copyString());
}
std::sort(currv.begin(), currv.end());
if (currv == planv) {
if (current[0].copyString() == if (current[0].copyString() ==
std::string("_") + _from) { // Retired leader std::string("_") + _from) { // Retired leader

View File

@ -318,14 +318,33 @@ std::vector<bool> Store::apply(
body.add("index", VPackValue(lastCommitIndex)); body.add("index", VPackValue(lastCommitIndex));
auto ret = in.equal_range(url); auto ret = in.equal_range(url);
// mop: XXX not exactly sure what is supposed to happen here
// if there are multiple subobjects being updates at the same time
// e.g.
// /hans/wurst
// /hans/wurst/peter: 1
// /hans/wurst
// /hans/wurst/uschi: 2
// we are generating invalid json...not sure if this here is a
// valid fix...it is most likely broken :S
std::string currentKey;
for (auto it = ret.first; it != ret.second; ++it) { for (auto it = ret.first; it != ret.second; ++it) {
body.add(it->second->key, VPackValue(VPackValueType::Object)); if (currentKey != it->second->key) {
if (!currentKey.empty()) {
body.close();
}
body.add(it->second->key, VPackValue(VPackValueType::Object));
currentKey = it->second->key;
}
// mop: XXX maybe there are duplicates here as well?
// e.g. a key is set and deleted in the same transaction?
body.add(it->second->modified, VPackValue(VPackValueType::Object)); body.add(it->second->modified, VPackValue(VPackValueType::Object));
body.add("op", VPackValue(it->second->oper)); body.add("op", VPackValue(it->second->oper));
body.close(); body.close();
}
if (!currentKey.empty()) {
body.close(); body.close();
} }
body.close(); body.close();
std::string endpoint, path; std::string endpoint, path;

View File

@ -103,13 +103,14 @@ void Supervision::upgradeAgency() {
// Check all DB servers, guarded above doChecks // Check all DB servers, guarded above doChecks
std::vector<check_t> Supervision::checkDBServers() { std::vector<check_t> Supervision::checkDBServers() {
std::vector<check_t> ret; std::vector<check_t> ret;
Node::Children const& machinesPlanned = Node::Children const& machinesPlanned =
_snapshot(planDBServersPrefix).children(); _snapshot(planDBServersPrefix).children();
Node::Children const serversRegistered = Node::Children const serversRegistered =
_snapshot(currentServersRegisteredPrefix).children(); _snapshot(currentServersRegisteredPrefix).children();
bool reportPersistent; bool reportPersistent = false;
std::vector<std::string> todelete; std::vector<std::string> todelete;
for (auto const& machine : _snapshot(healthPrefix).children()) { for (auto const& machine : _snapshot(healthPrefix).children()) {
@ -148,6 +149,8 @@ std::vector<check_t> Supervision::checkDBServers() {
good = true; good = true;
} }
reportPersistent = (heartbeatStatus != lastStatus);
query_t report = std::make_shared<Builder>(); query_t report = std::make_shared<Builder>();
report->openArray(); report->openArray();
report->openArray(); report->openArray();
@ -174,9 +177,6 @@ std::vector<check_t> Supervision::checkDBServers() {
if (good) { if (good) {
if (lastStatus != Supervision::HEALTH_STATUS_GOOD) {
reportPersistent = true;
}
report->add( report->add(
"LastHeartbeatAcked", "LastHeartbeatAcked",
VPackValue(timepointToString(std::chrono::system_clock::now()))); VPackValue(timepointToString(std::chrono::system_clock::now())));
@ -210,7 +210,6 @@ std::vector<check_t> Supervision::checkDBServers() {
// for at least grace period // for at least grace period
if (t.count() > _gracePeriod && secondsSinceLeader > _gracePeriod) { if (t.count() > _gracePeriod && secondsSinceLeader > _gracePeriod) {
if (lastStatus == "BAD") { if (lastStatus == "BAD") {
reportPersistent = true;
report->add("Status", VPackValue("FAILED")); report->add("Status", VPackValue("FAILED"));
FailedServer fsj(_snapshot, _agent, std::to_string(_jobId++), FailedServer fsj(_snapshot, _agent, std::to_string(_jobId++),
"supervision", _agencyPrefix, serverID); "supervision", _agencyPrefix, serverID);
@ -257,6 +256,9 @@ std::vector<check_t> Supervision::checkDBServers() {
// Check all coordinators, guarded above doChecks // Check all coordinators, guarded above doChecks
std::vector<check_t> Supervision::checkCoordinators() { std::vector<check_t> Supervision::checkCoordinators() {
bool reportPersistent = false;
std::vector<check_t> ret; std::vector<check_t> ret;
Node::Children const& machinesPlanned = Node::Children const& machinesPlanned =
_snapshot(planCoordinatorsPrefix).children(); _snapshot(planCoordinatorsPrefix).children();
@ -305,6 +307,8 @@ std::vector<check_t> Supervision::checkCoordinators() {
good = true; good = true;
} }
reportPersistent = (heartbeatStatus != lastStatus);
query_t report = std::make_shared<Builder>(); query_t report = std::make_shared<Builder>();
report->openArray(); report->openArray();
report->openArray(); report->openArray();
@ -329,6 +333,7 @@ std::vector<check_t> Supervision::checkCoordinators() {
} }
if (good) { if (good) {
if (goodServerId.empty()) { if (goodServerId.empty()) {
goodServerId = serverID; goodServerId = serverID;
} }
@ -359,6 +364,9 @@ std::vector<check_t> Supervision::checkCoordinators() {
report->close(); report->close();
if (!this->isStopping()) { if (!this->isStopping()) {
_agent->transient(report); _agent->transient(report);
if (reportPersistent) { // STATUS changes should be persisted
_agent->write(report);
}
} }
} }

View File

@ -62,145 +62,148 @@ class Optimizer {
// lower level values mean earlier rule execution // lower level values mean earlier rule execution
// note that levels must be unique // note that levels must be unique
initial = 100,
// "Pass 1": moving nodes "up" (potentially outside loops): // "Pass 1": moving nodes "up" (potentially outside loops):
pass1 = 100, // ========================================================
// determine the "right" type of CollectNode and // determine the "right" type of CollectNode and
// add a sort node for each COLLECT (may be removed later) // add a sort node for each COLLECT (may be removed later)
specializeCollectRule_pass1 = 105, specializeCollectRule_pass1,
inlineSubqueriesRule_pass1 = 106, inlineSubqueriesRule_pass1,
// split and-combined filters into multiple smaller filters // split and-combined filters into multiple smaller filters
splitFiltersRule_pass1 = 110, splitFiltersRule_pass1,
// move calculations up the dependency chain (to pull them out of // move calculations up the dependency chain (to pull them out of
// inner loops etc.) // inner loops etc.)
moveCalculationsUpRule_pass1 = 120, moveCalculationsUpRule_pass1,
// move filters up the dependency chain (to make result sets as small // move filters up the dependency chain (to make result sets as small
// as possible as early as possible) // as possible as early as possible)
moveFiltersUpRule_pass1 = 130, moveFiltersUpRule_pass1,
// remove calculations that are repeatedly used in a query // remove calculations that are repeatedly used in a query
removeRedundantCalculationsRule_pass1 = 140, removeRedundantCalculationsRule_pass1,
// "Pass 2": try to remove redundant or unnecessary nodes
// ======================================================
/// "Pass 2": try to remove redundant or unnecessary nodes
pass2 = 200,
// remove filters from the query that are not necessary at all // remove filters from the query that are not necessary at all
// filters that are always true will be removed entirely // filters that are always true will be removed entirely
// filters that are always false will be replaced with a NoResults node // filters that are always false will be replaced with a NoResults node
removeUnnecessaryFiltersRule_pass2 = 210, removeUnnecessaryFiltersRule_pass2,
// remove calculations that are never necessary // remove calculations that are never necessary
removeUnnecessaryCalculationsRule_pass2 = 220, removeUnnecessaryCalculationsRule_pass2,
// remove redundant sort blocks // remove redundant sort blocks
removeRedundantSortsRule_pass2 = 230, removeRedundantSortsRule_pass2,
/// "Pass 3": interchange EnumerateCollection nodes in all possible ways // "Pass 3": interchange EnumerateCollection nodes in all possible ways
/// this is level 500, please never let new plans from higher // this is level 500, please never let new plans from higher
/// levels go back to this or lower levels! // levels go back to this or lower levels!
pass3 = 500, // ======================================================
interchangeAdjacentEnumerationsRule_pass3 = 510,
interchangeAdjacentEnumerationsRule_pass3,
// "Pass 4": moving nodes "up" (potentially outside loops) (second try): // "Pass 4": moving nodes "up" (potentially outside loops) (second try):
pass4 = 600, // ======================================================
// move calculations up the dependency chain (to pull them out of // move calculations up the dependency chain (to pull them out of
// inner loops etc.) // inner loops etc.)
moveCalculationsUpRule_pass4 = 610, moveCalculationsUpRule_pass4,
// move filters up the dependency chain (to make result sets as small // move filters up the dependency chain (to make result sets as small
// as possible as early as possible) // as possible as early as possible)
moveFiltersUpRule_pass4 = 620, moveFiltersUpRule_pass4,
/// "Pass 5": try to remove redundant or unnecessary nodes (second try) /// "Pass 5": try to remove redundant or unnecessary nodes (second try)
// remove filters from the query that are not necessary at all // remove filters from the query that are not necessary at all
// filters that are always true will be removed entirely // filters that are always true will be removed entirely
// filters that are always false will be replaced with a NoResults node // filters that are always false will be replaced with a NoResults node
pass5 = 700, // ======================================================
// remove redundant sort blocks // remove redundant sort blocks
removeRedundantSortsRule_pass5 = 710, removeRedundantSortsRule_pass5,
// remove SORT RAND() if appropriate // remove SORT RAND() if appropriate
removeSortRandRule_pass5 = 720, removeSortRandRule_pass5,
// remove INTO for COLLECT if appropriate // remove INTO for COLLECT if appropriate
removeCollectVariablesRule_pass5 = 740, removeCollectVariablesRule_pass5,
// propagate constant attributes in FILTERs // propagate constant attributes in FILTERs
propagateConstantAttributesRule_pass5 = 750, propagateConstantAttributesRule_pass5,
// remove unused out variables for data-modification queries // remove unused out variables for data-modification queries
removeDataModificationOutVariablesRule_pass5 = 760, removeDataModificationOutVariablesRule_pass5,
/// "Pass 6": use indexes if possible for FILTER and/or SORT nodes /// "Pass 6": use indexes if possible for FILTER and/or SORT nodes
pass6 = 800, // ======================================================
// replace simple OR conditions with IN // replace simple OR conditions with IN
replaceOrWithInRule_pass6 = 810, replaceOrWithInRule_pass6,
// remove redundant OR conditions // remove redundant OR conditions
removeRedundantOrRule_pass6 = 820, removeRedundantOrRule_pass6,
applyGeoIndexRule = 825, applyGeoIndexRule,
useIndexesRule_pass6 = 830, useIndexesRule_pass6,
// try to remove filters covered by index ranges // try to remove filters covered by index ranges
removeFiltersCoveredByIndexRule_pass6 = 840, removeFiltersCoveredByIndexRule_pass6,
removeUnnecessaryFiltersRule_pass6 = 850, removeUnnecessaryFiltersRule_pass6,
// try to find sort blocks which are superseeded by indexes // try to find sort blocks which are superseeded by indexes
useIndexForSortRule_pass6 = 860, useIndexForSortRule_pass6,
// sort values used in IN comparisons of remaining filters // sort values used in IN comparisons of remaining filters
sortInValuesRule_pass6 = 865, sortInValuesRule_pass6,
// remove calculations that are never necessary // remove calculations that are never necessary
removeUnnecessaryCalculationsRule_pass6 = 870, removeUnnecessaryCalculationsRule_pass6,
// merge filters into graph traversals // merge filters into graph traversals
optimizeTraversalsRule_pass6 = 880, optimizeTraversalsRule_pass6,
prepareTraversalsRule_pass6 = 881, prepareTraversalsRule_pass6,
/// Pass 9: push down calculations beyond FILTERs and LIMITs /// Pass 9: push down calculations beyond FILTERs and LIMITs
moveCalculationsDownRule_pass9 = 900, moveCalculationsDownRule_pass9,
/// Pass 9: patch update statements /// Pass 9: patch update statements
patchUpdateStatementsRule_pass9 = 902, patchUpdateStatementsRule_pass9,
/// "Pass 10": final transformations for the cluster /// "Pass 10": final transformations for the cluster
// make operations on sharded collections use distribute // make operations on sharded collections use distribute
distributeInClusterRule_pass10 = 1000, distributeInClusterRule_pass10,
// make operations on sharded collections use scatter / gather / remote // make operations on sharded collections use scatter / gather / remote
scatterInClusterRule_pass10 = 1010, scatterInClusterRule_pass10,
// move FilterNodes & Calculation nodes in between // move FilterNodes & Calculation nodes in between
// scatter(remote) <-> gather(remote) so they're // scatter(remote) <-> gather(remote) so they're
// distributed to the cluster nodes. // distributed to the cluster nodes.
distributeFilternCalcToClusterRule_pass10 = 1020, distributeFilternCalcToClusterRule_pass10,
// move SortNodes into the distribution. // move SortNodes into the distribution.
// adjust gathernode to also contain the sort criteria. // adjust gathernode to also contain the sort criteria.
distributeSortToClusterRule_pass10 = 1030, distributeSortToClusterRule_pass10,
// try to get rid of a RemoteNode->ScatterNode combination which has // try to get rid of a RemoteNode->ScatterNode combination which has
// only a SingletonNode and possibly some CalculationNodes as dependencies // only a SingletonNode and possibly some CalculationNodes as dependencies
removeUnnecessaryRemoteScatterRule_pass10 = 1040, removeUnnecessaryRemoteScatterRule_pass10,
// remove any superflous satellite collection joins... // remove any superflous satellite collection joins...
// put it after Scatter rule because we would do // put it after Scatter rule because we would do
// the work twice otherwise // the work twice otherwise
removeSatelliteJoinsRule_pass10 = 1045, removeSatelliteJoinsRule_pass10,
// recognize that a RemoveNode can be moved to the shards // recognize that a RemoveNode can be moved to the shards
undistributeRemoveAfterEnumCollRule_pass10 = 1050 undistributeRemoveAfterEnumCollRule_pass10
}; };
public: public:

View File

@ -3928,10 +3928,6 @@ void arangodb::aql::inlineSubqueriesRule(Optimizer* opt,
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// GEO RULE /////////////////////////////////////////////////////////////////// // GEO RULE ///////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
//
// Description of what this Rule tries to achieve:
// https://docs.google.com/document/d/1G57UP08ZFywUXKi5cLvEIKpZP-AUKGwG9oAnFOX8LLo
//
struct GeoIndexInfo{ struct GeoIndexInfo{
operator bool() const { return distanceNode && valid; } operator bool() const { return distanceNode && valid; }

View File

@ -78,8 +78,6 @@ SET(ARANGOD_SOURCES
Actions/actions.cpp Actions/actions.cpp
Agency/ActivationCallback.cpp Agency/ActivationCallback.cpp
Agency/AddFollower.cpp Agency/AddFollower.cpp
Agency/AgencyCallback.cpp
Agency/AgencyCallbackRegistry.cpp
Agency/AgencyComm.cpp Agency/AgencyComm.cpp
Agency/AgencyFeature.cpp Agency/AgencyFeature.cpp
Agency/Agent.cpp Agency/Agent.cpp
@ -176,6 +174,8 @@ SET(ARANGOD_SOURCES
Aql/VariableGenerator.cpp Aql/VariableGenerator.cpp
Aql/grammar.cpp Aql/grammar.cpp
Aql/tokens.cpp Aql/tokens.cpp
Cluster/AgencyCallback.cpp
Cluster/AgencyCallbackRegistry.cpp
Cluster/ClusterComm.cpp Cluster/ClusterComm.cpp
Cluster/ClusterEdgeCursor.cpp Cluster/ClusterEdgeCursor.cpp
Cluster/ClusterFeature.cpp Cluster/ClusterFeature.cpp

View File

@ -24,7 +24,7 @@
#ifndef CLUSTER_AGENCYCALLACKREGISTRY_H #ifndef CLUSTER_AGENCYCALLACKREGISTRY_H
#define CLUSTER_AGENCYCALLACKREGISTRY_H 1 #define CLUSTER_AGENCYCALLACKREGISTRY_H 1
#include "Agency/AgencyCallback.h" #include "Cluster/AgencyCallback.h"
#include "Basics/ReadWriteLock.h" #include "Basics/ReadWriteLock.h"
namespace arangodb { namespace arangodb {

View File

@ -903,7 +903,8 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
return setErrormsg(TRI_ERROR_ARANGO_DUPLICATE_NAME, errorMsg); return setErrormsg(TRI_ERROR_ARANGO_DUPLICATE_NAME, errorMsg);
} }
errorMsg = std::string("Failed to create database in ") + __FILE__ + ":" + std::to_string(__LINE__); errorMsg = std::string("Failed to create database with ") +
res._clientId + " at " + __FILE__ + ":" + std::to_string(__LINE__);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN, return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
errorMsg); errorMsg);
} }
@ -1171,6 +1172,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!"; LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
} }
} else { } else {
errorMsg += std::string("\nClientId ") + res._clientId;
errorMsg += std::string("\n") + __FILE__ + std::to_string(__LINE__); errorMsg += std::string("\n") + __FILE__ + std::to_string(__LINE__);
errorMsg += std::string("\n") + res.errorMessage(); errorMsg += std::string("\n") + res.errorMessage();
errorMsg += std::string("\n") + res.errorDetails(); errorMsg += std::string("\n") + res.errorDetails();
@ -1295,9 +1297,9 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
res = ac.sendTransactionWithFailover(trans); res = ac.sendTransactionWithFailover(trans);
if (!res.successful()) { if (!res.successful()) {
LOG(ERR) << "###################### WAS ERLAUBE? ####################";
AgencyCommResult ag = ac.getValues(""); AgencyCommResult ag = ac.getValues("");
if (ag.successful()) { if (ag.successful()) {
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClientId: " << res._clientId;
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n" LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
<< ag.slice().toJson(); << ag.slice().toJson();
} else { } else {
@ -1552,11 +1554,6 @@ int ClusterInfo::ensureIndexCoordinator(
std::shared_ptr<LogicalCollection> c = std::shared_ptr<LogicalCollection> c =
getCollection(databaseName, collectionID); getCollection(databaseName, collectionID);
// Note that nobody is removing this collection in the plan, since
// we hold the write lock in the agency, therefore it does not matter
// that getCollection fetches the read lock and releases it before
// we get it again.
//
READ_LOCKER(readLocker, _planProt.lock); READ_LOCKER(readLocker, _planProt.lock);
if (c == nullptr) { if (c == nullptr) {
@ -1761,6 +1758,7 @@ int ClusterInfo::ensureIndexCoordinator(
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) { if (!result.successful()) {
errorMsg += "ClientId: " + result._clientId;
errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__); errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__);
resultBuilder = *resBuilder; resultBuilder = *resBuilder;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN; return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN;
@ -1981,6 +1979,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) { if (!result.successful()) {
errorMsg += "ClientId: " + result._clientId;
errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__); errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__);
events::DropIndex(collectionID, idString, events::DropIndex(collectionID, idString,
TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN); TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN);
@ -2056,7 +2055,7 @@ void ClusterInfo::loadServers() {
std::vector<std::string>( std::vector<std::string>(
{AgencyCommManager::path(), "Target", "MapUniqueToShortID"})); {AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));
if (serversRegistered.isObject()) { if (serversRegistered.isObject()) {
decltype(_servers) newServers; decltype(_servers) newServers;
decltype(_serverAliases) newAliases; decltype(_serverAliases) newAliases;
@ -2094,11 +2093,11 @@ void ClusterInfo::loadServers() {
} }
LOG_TOPIC(DEBUG, Logger::CLUSTER) LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Error while loading " << prefixServers << "Error while loading " << prefixServers
<< " httpCode: " << result.httpCode() << " httpCode: " << result.httpCode()
<< " errorCode: " << result.errorCode() << " errorCode: " << result.errorCode()
<< " errorMessage: " << result.errorMessage() << " errorMessage: " << result.errorMessage()
<< " body: " << result.body(); << " body: " << result.body();
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -31,7 +31,7 @@
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
#include "Agency/AgencyCallbackRegistry.h" #include "Cluster/AgencyCallbackRegistry.h"
#include "Agency/AgencyComm.h" #include "Agency/AgencyComm.h"
#include "Basics/Mutex.h" #include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h" #include "Basics/ReadWriteLock.h"

View File

@ -100,6 +100,12 @@ static VPackBuilder newShardEntry(VPackSlice oldValue, ServerID const& sid,
void FollowerInfo::add(ServerID const& sid) { void FollowerInfo::add(ServerID const& sid) {
MUTEX_LOCKER(locker, _mutex); MUTEX_LOCKER(locker, _mutex);
// First check if there is anything to do:
for (auto const& s : *_followers) {
if (s == sid) {
return; // Do nothing, if follower already there
}
}
// Fully copy the vector: // Fully copy the vector:
auto v = std::make_shared<std::vector<ServerID>>(*_followers); auto v = std::make_shared<std::vector<ServerID>>(*_followers);
v->push_back(sid); // add a single entry v->push_back(sid); // add a single entry
@ -180,11 +186,25 @@ void FollowerInfo::add(ServerID const& sid) {
void FollowerInfo::remove(ServerID const& sid) { void FollowerInfo::remove(ServerID const& sid) {
MUTEX_LOCKER(locker, _mutex); MUTEX_LOCKER(locker, _mutex);
// First check if there is anything to do:
bool found = false;
for (auto const& s : *_followers) {
if (s == sid) {
found = true;
break;
}
}
if (!found) {
return; // nothing to do
}
auto v = std::make_shared<std::vector<ServerID>>(); auto v = std::make_shared<std::vector<ServerID>>();
v->reserve(_followers->size() - 1); if (_followers->size() > 0) {
for (auto const& i : *_followers) { v->reserve(_followers->size() - 1);
if (i != sid) { for (auto const& i : *_followers) {
v->push_back(i); if (i != sid) {
v->push_back(i);
}
} }
} }
_followers = v; // will cast to std::vector<ServerID> const _followers = v; // will cast to std::vector<ServerID> const

View File

@ -37,11 +37,12 @@ class FollowerInfo {
std::shared_ptr<std::vector<ServerID> const> _followers; std::shared_ptr<std::vector<ServerID> const> _followers;
Mutex _mutex; Mutex _mutex;
arangodb::LogicalCollection* _docColl; arangodb::LogicalCollection* _docColl;
bool _isLeader;
public: public:
explicit FollowerInfo(arangodb::LogicalCollection* d) explicit FollowerInfo(arangodb::LogicalCollection* d)
: _followers(new std::vector<ServerID>()), _docColl(d) { } : _followers(new std::vector<ServerID>()), _docColl(d), _isLeader(false) { }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard. /// @brief get information about current followers of a shard.
@ -74,6 +75,22 @@ class FollowerInfo {
void clear(); void clear();
//////////////////////////////////////////////////////////////////////////////
/// @brief report if we are the leader
//////////////////////////////////////////////////////////////////////////////
bool isLeader() const {
return _isLeader;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief set leadership
//////////////////////////////////////////////////////////////////////////////
void setLeader(bool b) {
_isLeader = b;
}
}; };
} // end namespace arangodb } // end namespace arangodb

View File

@ -23,7 +23,7 @@
#include "RestAgencyCallbacksHandler.h" #include "RestAgencyCallbacksHandler.h"
#include "Agency/AgencyCallbackRegistry.h" #include "Cluster/AgencyCallbackRegistry.h"
#include "Rest/HttpRequest.h" #include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h" #include "Rest/HttpResponse.h"

View File

@ -24,12 +24,12 @@
#include <stdexcept> #include <stdexcept>
#include "Agency/AgencyCallbackRegistry.h"
#include "Agency/AgencyFeature.h" #include "Agency/AgencyFeature.h"
#include "Agency/RestAgencyHandler.h" #include "Agency/RestAgencyHandler.h"
#include "Agency/RestAgencyPrivHandler.h" #include "Agency/RestAgencyPrivHandler.h"
#include "Aql/RestAqlHandler.h" #include "Aql/RestAqlHandler.h"
#include "Basics/StringUtils.h" #include "Basics/StringUtils.h"
#include "Cluster/AgencyCallbackRegistry.h"
#include "Cluster/ClusterComm.h" #include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterFeature.h"
#include "Cluster/RestAgencyCallbacksHandler.h" #include "Cluster/RestAgencyCallbacksHandler.h"

View File

@ -115,10 +115,10 @@ bool RestDocumentHandler::createDocument() {
VPackSlice body = parsedBody->slice(); VPackSlice body = parsedBody->slice();
arangodb::OperationOptions opOptions; arangodb::OperationOptions opOptions;
opOptions.isRestore = extractBooleanParameter("isRestore", false); opOptions.isRestore = extractBooleanParameter(StaticStrings::IsRestoreString, false);
opOptions.waitForSync = extractBooleanParameter("waitForSync", false); opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
opOptions.returnNew = extractBooleanParameter("returnNew", false); opOptions.returnNew = extractBooleanParameter(StaticStrings::ReturnNewString, false);
opOptions.silent = extractBooleanParameter("silent", false); opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
// find and load collection given by name or identifier // find and load collection given by name or identifier
auto transactionContext(StandaloneTransactionContext::Create(_vocbase)); auto transactionContext(StandaloneTransactionContext::Create(_vocbase));
@ -380,12 +380,12 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
} }
OperationOptions opOptions; OperationOptions opOptions;
opOptions.isRestore = extractBooleanParameter("isRestore", false); opOptions.isRestore = extractBooleanParameter(StaticStrings::IsRestoreString, false);
opOptions.ignoreRevs = extractBooleanParameter("ignoreRevs", true); opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
opOptions.waitForSync = extractBooleanParameter("waitForSync", false); opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
opOptions.returnNew = extractBooleanParameter("returnNew", false); opOptions.returnNew = extractBooleanParameter(StaticStrings::ReturnNewString, false);
opOptions.returnOld = extractBooleanParameter("returnOld", false); opOptions.returnOld = extractBooleanParameter(StaticStrings::ReturnOldString, false);
opOptions.silent = extractBooleanParameter("silent", false); opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
// extract the revision, if single document variant and header given: // extract the revision, if single document variant and header given:
std::shared_ptr<VPackBuilder> builder; std::shared_ptr<VPackBuilder> builder;
@ -442,8 +442,8 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
OperationResult result(TRI_ERROR_NO_ERROR); OperationResult result(TRI_ERROR_NO_ERROR);
if (isPatch) { if (isPatch) {
// patching an existing document // patching an existing document
opOptions.keepNull = extractBooleanParameter("keepNull", true); opOptions.keepNull = extractBooleanParameter(StaticStrings::KeepNullString, true);
opOptions.mergeObjects = extractBooleanParameter("mergeObjects", true); opOptions.mergeObjects = extractBooleanParameter(StaticStrings::MergeObjectsString, true);
result = trx.update(collectionName, body, opOptions); result = trx.update(collectionName, body, opOptions);
} else { } else {
result = trx.replace(collectionName, body, opOptions); result = trx.replace(collectionName, body, opOptions);
@ -507,10 +507,10 @@ bool RestDocumentHandler::deleteDocument() {
} }
OperationOptions opOptions; OperationOptions opOptions;
opOptions.returnOld = extractBooleanParameter("returnOld", false); opOptions.returnOld = extractBooleanParameter(StaticStrings::ReturnOldString, false);
opOptions.ignoreRevs = extractBooleanParameter("ignoreRevs", true); opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
opOptions.waitForSync = extractBooleanParameter("waitForSync", false); opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
opOptions.silent = extractBooleanParameter("silent", false); opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
auto transactionContext(StandaloneTransactionContext::Create(_vocbase)); auto transactionContext(StandaloneTransactionContext::Create(_vocbase));
@ -600,7 +600,7 @@ bool RestDocumentHandler::readManyDocuments() {
std::string const& collectionName = suffixes[0]; std::string const& collectionName = suffixes[0];
OperationOptions opOptions; OperationOptions opOptions;
opOptions.ignoreRevs = extractBooleanParameter("ignoreRevs", true); opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
auto transactionContext(StandaloneTransactionContext::Create(_vocbase)); auto transactionContext(StandaloneTransactionContext::Create(_vocbase));
SingleCollectionTransaction trx(transactionContext, collectionName, SingleCollectionTransaction trx(transactionContext, collectionName,

View File

@ -3447,7 +3447,6 @@ void RestReplicationHandler::handleCommandRemoveFollower() {
"did not find collection"); "did not find collection");
return; return;
} }
col->followers()->remove(followerId.copyString()); col->followers()->remove(followerId.copyString());
VPackBuilder b; VPackBuilder b;

View File

@ -608,7 +608,7 @@ TRI_voc_rid_t RestVocbaseBaseHandler::extractRevision(char const* header,
/// @brief extracts a boolean parameter value /// @brief extracts a boolean parameter value
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool RestVocbaseBaseHandler::extractBooleanParameter(char const* name, bool RestVocbaseBaseHandler::extractBooleanParameter(std::string const& name,
bool def) const { bool def) const {
bool found; bool found;
std::string const& value = _request->value(name, found); std::string const& value = _request->value(name, found);

View File

@ -264,7 +264,11 @@ class RestVocbaseBaseHandler : public RestBaseHandler {
/// @brief extracts a boolean parameter value /// @brief extracts a boolean parameter value
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
bool extractBooleanParameter(char const* name, bool def) const; bool extractBooleanParameter(std::string const& name, bool def) const;
bool extractBooleanParameter(char const* name, bool def) const {
return extractBooleanParameter(std::string(name), def);
}
protected: protected:
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////

View File

@ -1199,7 +1199,7 @@ void MMFilesCollection::removeRevision(TRI_voc_rid_t revisionId, bool updateStat
TRI_ASSERT(revisionId != 0); TRI_ASSERT(revisionId != 0);
if (updateStats) { if (updateStats) {
MMFilesDocumentPosition const old = _revisionsCache.fetchAndRemove(revisionId); MMFilesDocumentPosition const old = _revisionsCache.fetchAndRemove(revisionId);
if (old && !old.pointsToWal()) { if (old && !old.pointsToWal() && old.fid() != 0) {
TRI_ASSERT(old.dataptr() != nullptr); TRI_ASSERT(old.dataptr() != nullptr);
uint8_t const* vpack = static_cast<uint8_t const*>(old.dataptr()); uint8_t const* vpack = static_cast<uint8_t const*>(old.dataptr());
int64_t size = MMFilesDatafileHelper::AlignedSize<int64_t>(arangodb::MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT) + VPackSlice(vpack).byteSize()); int64_t size = MMFilesDatafileHelper::AlignedSize<int64_t>(arangodb::MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT) + VPackSlice(vpack).byteSize());

View File

@ -374,16 +374,11 @@ int MMFilesCollectorThread::collectLogfiles(bool& worked) {
try { try {
int res = collect(logfile); int res = collect(logfile);
// LOG_TOPIC(TRACE, Logger::COLLECTOR) << "collected logfile: " << // logfile->id() << ". result: " // LOG_TOPIC(TRACE, Logger::COLLECTOR) << "collected logfile: " << logfile->id() << ". result: " << res;
// << res;
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// reset collector status // reset collector status
{ broadcastCollectorResult(res);
CONDITION_LOCKER(guard, _collectorResultCondition);
_collectorResult = TRI_ERROR_NO_ERROR;
_collectorResultCondition.broadcast();
}
RocksDBFeature::syncWal(); RocksDBFeature::syncWal();
@ -393,11 +388,7 @@ int MMFilesCollectorThread::collectLogfiles(bool& worked) {
_logfileManager->forceStatus(logfile, wal::Logfile::StatusType::SEALED); _logfileManager->forceStatus(logfile, wal::Logfile::StatusType::SEALED);
// set error in collector // set error in collector
{ broadcastCollectorResult(res);
CONDITION_LOCKER(guard, _collectorResultCondition);
_collectorResult = res;
_collectorResultCondition.broadcast();
}
} }
return res; return res;
@ -979,3 +970,9 @@ int MMFilesCollectorThread::updateDatafileStatistics(
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
void MMFilesCollectorThread::broadcastCollectorResult(int res) {
CONDITION_LOCKER(guard, _collectorResultCondition);
_collectorResult = res;
_collectorResultCondition.broadcast();
}

View File

@ -103,6 +103,8 @@ class MMFilesCollectorThread final : public Thread {
/// @brief update a collection's datafile information /// @brief update a collection's datafile information
int updateDatafileStatistics(LogicalCollection*, MMFilesCollectorCache*); int updateDatafileStatistics(LogicalCollection*, MMFilesCollectorCache*);
void broadcastCollectorResult(int res);
private: private:
/// @brief the logfile manager /// @brief the logfile manager
wal::LogfileManager* _logfileManager; wal::LogfileManager* _logfileManager;

View File

@ -39,26 +39,6 @@ MMFilesDocumentOperation::MMFilesDocumentOperation(LogicalCollection* collection
} }
MMFilesDocumentOperation::~MMFilesDocumentOperation() { MMFilesDocumentOperation::~MMFilesDocumentOperation() {
TRI_ASSERT(_status != StatusType::INDEXED);
if (_status == StatusType::HANDLED) {
try {
if (_type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
_type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
// remove old, now unused revision
TRI_ASSERT(!_oldRevision.empty());
TRI_ASSERT(!_newRevision.empty());
_collection->removeRevision(_oldRevision._revisionId, true);
} else if (_type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
// remove old, now unused revision
TRI_ASSERT(!_oldRevision.empty());
TRI_ASSERT(_newRevision.empty());
_collection->removeRevision(_oldRevision._revisionId, true);
}
} catch (...) {
// never throw here because of destructor
}
}
} }
MMFilesDocumentOperation* MMFilesDocumentOperation::swap() { MMFilesDocumentOperation* MMFilesDocumentOperation::swap() {
@ -109,15 +89,12 @@ void MMFilesDocumentOperation::setRevisions(DocumentDescriptor const& oldRevisio
void MMFilesDocumentOperation::revert(arangodb::Transaction* trx) { void MMFilesDocumentOperation::revert(arangodb::Transaction* trx) {
TRI_ASSERT(trx != nullptr); TRI_ASSERT(trx != nullptr);
if (_status == StatusType::CREATED || if (_status == StatusType::SWAPPED || _status == StatusType::REVERTED) {
_status == StatusType::SWAPPED ||
_status == StatusType::REVERTED) {
return; return;
} }
TRI_ASSERT(_status == StatusType::INDEXED || _status == StatusType::HANDLED); // fetch old status and set it to reverted now
StatusType status = _status;
// set to reverted now
_status = StatusType::REVERTED; _status = StatusType::REVERTED;
TRI_voc_rid_t oldRevisionId = 0; TRI_voc_rid_t oldRevisionId = 0;
@ -136,39 +113,82 @@ void MMFilesDocumentOperation::revert(arangodb::Transaction* trx) {
newDoc = VPackSlice(_newRevision._vpack); newDoc = VPackSlice(_newRevision._vpack);
} }
try { // clear caches so the following operations all use
_collection->rollbackOperation(trx, _type, oldRevisionId, oldDoc, newRevisionId, newDoc); if (oldRevisionId != 0) {
} catch (...) { _collection->removeRevisionCacheEntry(oldRevisionId);
// TODO: decide whether we should rethrow here }
if (newRevisionId != 0) {
_collection->removeRevisionCacheEntry(newRevisionId);
} }
if (_type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { if (_type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
TRI_ASSERT(_oldRevision.empty()); TRI_ASSERT(_oldRevision.empty());
TRI_ASSERT(!_newRevision.empty()); TRI_ASSERT(!_newRevision.empty());
if (status != StatusType::CREATED) {
// remove revision from indexes
try {
_collection->rollbackOperation(trx, _type, oldRevisionId, oldDoc, newRevisionId, newDoc);
} catch (...) {
}
}
// remove now obsolete new revision // remove now obsolete new revision
try { try {
_collection->removeRevision(newRevisionId, true); _collection->removeRevision(newRevisionId, true);
} catch (...) { } catch (...) {
// operation probably was never inserted // operation probably was never inserted
// TODO: decide whether we should rethrow here
} }
} else if (_type == TRI_VOC_DOCUMENT_OPERATION_UPDATE || } else if (_type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
_type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) { _type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
TRI_ASSERT(!_oldRevision.empty()); TRI_ASSERT(!_oldRevision.empty());
TRI_ASSERT(!_newRevision.empty()); TRI_ASSERT(!_newRevision.empty());
try {
// re-insert the old revision
_collection->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true);
} catch (...) {
}
if (status != StatusType::CREATED) {
try {
// restore the old index state
_collection->rollbackOperation(trx, _type, oldRevisionId, oldDoc, newRevisionId, newDoc);
} catch (...) {
}
}
// let the primary index entry point to the correct document
SimpleIndexElement* element = _collection->primaryIndex()->lookupKeyRef(trx, Transaction::extractKeyFromDocument(newDoc)); SimpleIndexElement* element = _collection->primaryIndex()->lookupKeyRef(trx, Transaction::extractKeyFromDocument(newDoc));
if (element != nullptr && element->revisionId() != 0) { if (element != nullptr && element->revisionId() != 0) {
VPackSlice keySlice(Transaction::extractKeyFromDocument(oldDoc)); VPackSlice keySlice(Transaction::extractKeyFromDocument(oldDoc));
element->updateRevisionId(oldRevisionId, static_cast<uint32_t>(keySlice.begin() - oldDoc.begin())); element->updateRevisionId(oldRevisionId, static_cast<uint32_t>(keySlice.begin() - oldDoc.begin()));
} }
_collection->updateRevision(oldRevisionId, oldDoc.begin(), 0, false);
// remove now obsolete new revision // remove now obsolete new revision
if (oldRevisionId != newRevisionId) {
// we need to check for the same revision id here
try {
_collection->removeRevision(newRevisionId, true);
} catch (...) {
}
}
} else if (_type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
TRI_ASSERT(!_oldRevision.empty());
TRI_ASSERT(_newRevision.empty());
try { try {
_collection->removeRevision(newRevisionId, true); _collection->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true);
} catch (...) { } catch (...) {
// operation probably was never inserted }
// TODO: decide whether we should rethrow here
if (status != StatusType::CREATED) {
try {
// remove from indexes again
_collection->rollbackOperation(trx, _type, oldRevisionId, oldDoc, newRevisionId, newDoc);
} catch (...) {
}
} }
} }
} }

View File

@ -21,8 +21,8 @@
/// @author Jan Steemann /// @author Jan Steemann
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_WAL_DOCUMENT_OPERATION_H #ifndef ARANGOD_MMFILES_DOCUMENT_OPERATION_H
#define ARANGOD_WAL_DOCUMENT_OPERATION_H 1 #define ARANGOD_MMFILES_DOCUMENT_OPERATION_H 1
#include "Basics/Common.h" #include "Basics/Common.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
@ -71,10 +71,6 @@ struct MMFilesDocumentOperation {
_status = StatusType::HANDLED; _status = StatusType::HANDLED;
} }
void done() noexcept {
_status = StatusType::SWAPPED;
}
void revert(arangodb::Transaction*); void revert(arangodb::Transaction*);
private: private:

View File

@ -945,7 +945,7 @@ static void JS_FiguresVocbaseCol(
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock collectionLoad /// @brief was docuBlock leaderResign
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static void JS_LeaderResign(v8::FunctionCallbackInfo<v8::Value> const& args) { static void JS_LeaderResign(v8::FunctionCallbackInfo<v8::Value> const& args) {
@ -981,13 +981,156 @@ static void JS_LeaderResign(v8::FunctionCallbackInfo<v8::Value> const& args) {
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res); TRI_V8_THROW_EXCEPTION(res);
} }
trx.documentCollection()->followers()->clear(); // do not reset followers at this time...we are still the only source of truth
// to trust...
//trx.documentCollection()->followers()->clear();
trx.documentCollection()->followers()->setLeader(false);
} }
TRI_V8_RETURN_UNDEFINED(); TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock assumeLeadership
////////////////////////////////////////////////////////////////////////////////
static void JS_AssumeLeadership(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (ServerState::instance()->isDBServer()) {
arangodb::LogicalCollection const* collection =
TRI_UnwrapClass<arangodb::LogicalCollection>(args.Holder(),
WRP_VOCBASE_COL_TYPE);
if (collection == nullptr) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
TRI_vocbase_t* vocbase = collection->vocbase();
std::string collectionName = collection->name();
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
auto transactionContext = std::make_shared<V8TransactionContext>(vocbase, true);
SingleCollectionTransaction trx(transactionContext, collectionName,
TRI_TRANSACTION_READ);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
trx.documentCollection()->followers()->clear();
trx.documentCollection()->followers()->setLeader(true);
}
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock isLeader
////////////////////////////////////////////////////////////////////////////////
static void JS_IsLeader(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
bool b = false;
if (ServerState::instance()->isDBServer()) {
arangodb::LogicalCollection const* collection =
TRI_UnwrapClass<arangodb::LogicalCollection>(args.Holder(),
WRP_VOCBASE_COL_TYPE);
if (collection == nullptr) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
TRI_vocbase_t* vocbase = collection->vocbase();
std::string collectionName = collection->name();
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
auto realCollection = vocbase->lookupCollection(collectionName);
if (realCollection == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
}
b = realCollection->followers()->isLeader();
}
if (b) {
TRI_V8_RETURN_TRUE();
} else {
TRI_V8_RETURN_FALSE();
}
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock getFollowers
////////////////////////////////////////////////////////////////////////////////
static void JS_GetFollowers(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
v8::Handle<v8::Array> list = v8::Array::New(isolate);
if (ServerState::instance()->isDBServer()) {
arangodb::LogicalCollection const* collection =
TRI_UnwrapClass<arangodb::LogicalCollection>(args.Holder(),
WRP_VOCBASE_COL_TYPE);
if (collection == nullptr) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
TRI_vocbase_t* vocbase = collection->vocbase();
std::string collectionName = collection->name();
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
auto transactionContext = std::make_shared<V8TransactionContext>(vocbase, true);
SingleCollectionTransaction trx(transactionContext, collectionName,
TRI_TRANSACTION_READ);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
std::unique_ptr<arangodb::FollowerInfo> const& followerInfo = trx.documentCollection()->followers();
std::shared_ptr<std::vector<ServerID> const> followers = followerInfo->get();
uint32_t i = 0;
for (auto const& n : *followers) {
list->Set(i++, TRI_V8_STD_STRING(n));
}
}
TRI_V8_RETURN(list);
TRI_V8_TRY_CATCH_END
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock collectionLoad /// @brief was docuBlock collectionLoad
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -3213,6 +3356,12 @@ void TRI_InitV8Collection(v8::Handle<v8::Context> context,
JS_InsertVocbaseCol); JS_InsertVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("leaderResign"), TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("leaderResign"),
JS_LeaderResign, true); JS_LeaderResign, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("assumeLeadership"),
JS_AssumeLeadership, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("isLeader"),
JS_IsLeader, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("getFollowers"),
JS_GetFollowers, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("load"), TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("load"),
JS_LoadVocbaseCol); JS_LoadVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("name"), TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("name"),

View File

@ -2282,9 +2282,15 @@ int LogicalCollection::update(Transaction* trx, VPackSlice const newSlice,
try { try {
insertRevision(revisionId, marker->vpack(), 0, true); insertRevision(revisionId, marker->vpack(), 0, true);
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()), operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
DocumentDescriptor(revisionId, newDoc.begin())); DocumentDescriptor(revisionId, newDoc.begin()));
if (oldRevisionId == revisionId) {
// update with same revision id => can happen if isRestore = true
result.clear(0);
}
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc, res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
operation, marker, options.waitForSync); operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) { } catch (basics::Exception const& ex) {
@ -2439,9 +2445,15 @@ int LogicalCollection::replace(Transaction* trx, VPackSlice const newSlice,
try { try {
insertRevision(revisionId, marker->vpack(), 0, true); insertRevision(revisionId, marker->vpack(), 0, true);
operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()), operation.setRevisions(DocumentDescriptor(oldRevisionId, oldDoc.begin()),
DocumentDescriptor(revisionId, newDoc.begin())); DocumentDescriptor(revisionId, newDoc.begin()));
if (oldRevisionId == revisionId) {
// update with same revision id => can happen if isRestore = true
result.clear(0);
}
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc, res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
operation, marker, options.waitForSync); operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) { } catch (basics::Exception const& ex) {
@ -2455,6 +2467,10 @@ int LogicalCollection::replace(Transaction* trx, VPackSlice const newSlice,
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
operation.revert(trx); operation.revert(trx);
} else { } else {
if (oldRevisionId == revisionId) {
// update with same revision id => can happen if isRestore = true
result.clear(0);
}
readRevision(trx, result, revisionId); readRevision(trx, result, revisionId);
if (options.waitForSync) { if (options.waitForSync) {
@ -2591,6 +2607,11 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
try {
removeRevision(oldRevisionId, true);
} catch (...) {
}
TRI_IF_FAILURE("RemoveDocumentNoOperationExcept") { TRI_IF_FAILURE("RemoveDocumentNoOperationExcept") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
@ -2684,6 +2705,11 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
operation.indexed(); operation.indexed();
try {
removeRevision(oldRevisionId, true);
} catch (...) {
}
TRI_IF_FAILURE("RemoveDocumentNoOperation") { TRI_IF_FAILURE("RemoveDocumentNoOperation") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
@ -2725,17 +2751,11 @@ int LogicalCollection::rollbackOperation(arangodb::Transaction* trx,
TRI_ASSERT(newRevisionId != 0); TRI_ASSERT(newRevisionId != 0);
TRI_ASSERT(!newDoc.isNone()); TRI_ASSERT(!newDoc.isNone());
removeRevisionCacheEntry(newRevisionId);
// ignore any errors we're getting from this // ignore any errors we're getting from this
deletePrimaryIndex(trx, newRevisionId, newDoc); deletePrimaryIndex(trx, newRevisionId, newDoc);
deleteSecondaryIndexes(trx, newRevisionId, newDoc, true); deleteSecondaryIndexes(trx, newRevisionId, newDoc, true);
// remove new revision
try {
removeRevision(newRevisionId, false);
} catch (...) {
// TODO: decide whether we should rethrow here
}
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -2745,6 +2765,10 @@ int LogicalCollection::rollbackOperation(arangodb::Transaction* trx,
TRI_ASSERT(!oldDoc.isNone()); TRI_ASSERT(!oldDoc.isNone());
TRI_ASSERT(newRevisionId != 0); TRI_ASSERT(newRevisionId != 0);
TRI_ASSERT(!newDoc.isNone()); TRI_ASSERT(!newDoc.isNone());
removeRevisionCacheEntry(oldRevisionId);
removeRevisionCacheEntry(newRevisionId);
// remove the current values from the indexes // remove the current values from the indexes
deleteSecondaryIndexes(trx, newRevisionId, newDoc, true); deleteSecondaryIndexes(trx, newRevisionId, newDoc, true);
// re-insert old state // re-insert old state
@ -2758,6 +2782,8 @@ int LogicalCollection::rollbackOperation(arangodb::Transaction* trx,
TRI_ASSERT(newRevisionId == 0); TRI_ASSERT(newRevisionId == 0);
TRI_ASSERT(newDoc.isNone()); TRI_ASSERT(newDoc.isNone());
removeRevisionCacheEntry(oldRevisionId);
int res = insertPrimaryIndex(trx, oldRevisionId, oldDoc); int res = insertPrimaryIndex(trx, oldRevisionId, oldDoc);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
@ -3292,8 +3318,6 @@ int LogicalCollection::updateDocument(
// rollback // rollback
deleteSecondaryIndexes(trx, newRevisionId, newDoc, true); deleteSecondaryIndexes(trx, newRevisionId, newDoc, true);
insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true); insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true);
removeRevision(newRevisionId, false);
return res; return res;
} }
@ -3309,6 +3333,16 @@ int LogicalCollection::updateDocument(
operation.indexed(); operation.indexed();
if (oldRevisionId != newRevisionId) {
try {
removeRevision(oldRevisionId, true);
} catch (...) {
}
} else {
// clear readcache entry for the revision
removeRevisionCacheEntry(oldRevisionId);
}
TRI_IF_FAILURE("UpdateDocumentNoOperation") { return TRI_ERROR_DEBUG; } TRI_IF_FAILURE("UpdateDocumentNoOperation") { return TRI_ERROR_DEBUG; }
TRI_IF_FAILURE("UpdateDocumentNoOperationExcept") { TRI_IF_FAILURE("UpdateDocumentNoOperationExcept") {
@ -3770,13 +3804,18 @@ bool LogicalCollection::updateRevisionConditional(
void LogicalCollection::removeRevision(TRI_voc_rid_t revisionId, void LogicalCollection::removeRevision(TRI_voc_rid_t revisionId,
bool updateStats) { bool updateStats) {
// clean up cache entry // clean up cache entry
TRI_ASSERT(_revisionsCache); removeRevisionCacheEntry(revisionId);
_revisionsCache->removeRevision(revisionId);
// and remove from storage engine // and remove from storage engine
getPhysical()->removeRevision(revisionId, updateStats); getPhysical()->removeRevision(revisionId, updateStats);
} }
void LogicalCollection::removeRevisionCacheEntry(TRI_voc_rid_t revisionId) {
// clean up cache entry
TRI_ASSERT(_revisionsCache);
_revisionsCache->removeRevision(revisionId);
}
/// @brief a method to skip certain documents in AQL write operations, /// @brief a method to skip certain documents in AQL write operations,
/// this is only used in the enterprise edition for smart graphs /// this is only used in the enterprise edition for smart graphs
#ifndef USE_ENTERPRISE #ifndef USE_ENTERPRISE

View File

@ -390,6 +390,7 @@ class LogicalCollection {
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal); void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal);
bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal); bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal);
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats); void removeRevision(TRI_voc_rid_t revisionId, bool updateStats);
void removeRevisionCacheEntry(TRI_voc_rid_t revisionId);
private: private:
// SECTION: Index creation // SECTION: Index creation

View File

@ -193,7 +193,6 @@ static void FreeOperations(arangodb::Transaction* activeTrx, TRI_transaction_t*
try { try {
op->revert(activeTrx); op->revert(activeTrx);
} catch (...) { } catch (...) {
// TODO: decide whether we should rethrow here
} }
delete op; delete op;
} }
@ -201,10 +200,7 @@ static void FreeOperations(arangodb::Transaction* activeTrx, TRI_transaction_t*
// no rollback. simply delete all operations // no rollback. simply delete all operations
for (auto it = trxCollection->_operations->rbegin(); for (auto it = trxCollection->_operations->rbegin();
it != trxCollection->_operations->rend(); ++it) { it != trxCollection->_operations->rend(); ++it) {
MMFilesDocumentOperation* op = (*it); delete (*it);
//op->done(); // set to done so dtor of DocumentOperation won't fail
delete op;
} }
} }

View File

@ -121,14 +121,18 @@ struct DocumentDescriptor {
DocumentDescriptor(TRI_voc_rid_t revisionId, uint8_t const* vpack) : _revisionId(revisionId), _vpack(vpack) {} DocumentDescriptor(TRI_voc_rid_t revisionId, uint8_t const* vpack) : _revisionId(revisionId), _vpack(vpack) {}
bool empty() const { return _vpack == nullptr; } bool empty() const { return _vpack == nullptr; }
void reset(DocumentDescriptor const& other) { void reset(DocumentDescriptor const& other) {
_revisionId = other._revisionId; _revisionId = other._revisionId;
_vpack = other._vpack; _vpack = other._vpack;
} }
/*
void reset(TRI_voc_rid_t revisionId, uint8_t const* vpack) { void reset(TRI_voc_rid_t revisionId, uint8_t const* vpack) {
_revisionId = revisionId; _revisionId = revisionId;
_vpack = vpack; _vpack = vpack;
} }
*/
void clear() { void clear() {
_revisionId = 0; _revisionId = 0;
_vpack = nullptr; _vpack = nullptr;

View File

@ -869,10 +869,11 @@ int LogfileManager::flush(bool waitForSync, bool waitForCollector,
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// we need to wait for the collector... // we need to wait for the collector...
// LOG(TRACE) << "entering waitForCollector with lastOpenLogfileId " << // // LOG(TRACE) << "entering waitForCollector with lastOpenLogfileId " << lastOpenLogfileId;
// (unsigned long long) lastOpenLogfileId;
res = this->waitForCollector(lastOpenLogfileId, maxWaitTime); res = this->waitForCollector(lastOpenLogfileId, maxWaitTime);
if (res == TRI_ERROR_LOCK_TIMEOUT) { if (res == TRI_ERROR_LOCK_TIMEOUT) {
LOG(ERR) << "got lock timeout when waiting for WAL flush. lastOpenLogfileId: " << lastOpenLogfileId;
} }
} else if (res == TRI_ERROR_ARANGO_DATAFILE_EMPTY) { } else if (res == TRI_ERROR_ARANGO_DATAFILE_EMPTY) {
// current logfile is empty and cannot be collected // current logfile is empty and cannot be collected
@ -881,6 +882,10 @@ int LogfileManager::flush(bool waitForSync, bool waitForCollector,
if (lastSealedLogfileId > 0) { if (lastSealedLogfileId > 0) {
res = this->waitForCollector(lastSealedLogfileId, maxWaitTime); res = this->waitForCollector(lastSealedLogfileId, maxWaitTime);
if (res == TRI_ERROR_LOCK_TIMEOUT) {
LOG(ERR) << "got lock timeout when waiting for WAL flush. lastSealedLogfileId: " << lastSealedLogfileId;
}
} }
} }
} }
@ -1305,10 +1310,6 @@ Logfile* LogfileManager::getLogfile(Logfile::IdType id,
int LogfileManager::getWriteableLogfile(uint32_t size, int LogfileManager::getWriteableLogfile(uint32_t size,
Logfile::StatusType& status, Logfile::StatusType& status,
Logfile*& result) { Logfile*& result) {
static uint64_t const SleepTime = 10 * 1000;
double const end = TRI_microtime() + 15.0;
size_t iterations = 0;
// always initialize the result // always initialize the result
result = nullptr; result = nullptr;
@ -1317,6 +1318,9 @@ int LogfileManager::getWriteableLogfile(uint32_t size,
return TRI_ERROR_DEBUG; return TRI_ERROR_DEBUG;
} }
size_t iterations = 0;
double const end = TRI_microtime() + 15.0;
while (true) { while (true) {
{ {
WRITE_LOCKER(writeLocker, _logfilesLock); WRITE_LOCKER(writeLocker, _logfilesLock);
@ -1363,7 +1367,7 @@ int LogfileManager::getWriteableLogfile(uint32_t size,
_allocatorThread->signal(size); _allocatorThread->signal(size);
} }
int res = _allocatorThread->waitForResult(SleepTime); int res = _allocatorThread->waitForResult(15000);
if (res != TRI_ERROR_LOCK_TIMEOUT && res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_LOCK_TIMEOUT && res != TRI_ERROR_NO_ERROR) {
TRI_ASSERT(result == nullptr); TRI_ASSERT(result == nullptr);
@ -1680,43 +1684,42 @@ void LogfileManager::waitForCollector() {
// wait until a specific logfile has been collected // wait until a specific logfile has been collected
int LogfileManager::waitForCollector(Logfile::IdType logfileId, int LogfileManager::waitForCollector(Logfile::IdType logfileId,
double maxWaitTime) { double maxWaitTime) {
static int64_t const SingleWaitPeriod = 50 * 1000; if (maxWaitTime <= 0.0) {
maxWaitTime = 24.0 * 3600.0; // wait "forever"
int64_t maxIterations = INT64_MAX; // wait forever
if (maxWaitTime > 0.0) {
// if specified, wait for a shorter period of time
maxIterations = static_cast<int64_t>(maxWaitTime * 1000000.0 /
(double)SingleWaitPeriod);
LOG(TRACE) << "will wait for max. " << maxWaitTime
<< " seconds for collector to finish";
} }
LOG(TRACE) << "waiting for collector thread to collect logfile " << logfileId; LOG(TRACE) << "waiting for collector thread to collect logfile " << logfileId;
// wait for the collector thread to finish the collection // wait for the collector thread to finish the collection
int64_t iterations = 0; double const end = TRI_microtime() + maxWaitTime;
while (++iterations < maxIterations) { while (true) {
if (_lastCollectedId >= logfileId) { if (_lastCollectedId >= logfileId) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
int res = _collectorThread->waitForResult(SingleWaitPeriod); int res = _collectorThread->waitForResult(50 * 1000);
// LOG(TRACE) << "still waiting for collector. logfileId: " << logfileId << // LOG(TRACE) << "still waiting for collector. logfileId: " << logfileId <<
// " lastCollected: // " lastCollected: " << _lastCollectedId << ", result: " << res;
// " << // _lastCollectedId << ", result: " << res;
if (res != TRI_ERROR_LOCK_TIMEOUT && res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_LOCK_TIMEOUT && res != TRI_ERROR_NO_ERROR) {
// some error occurred // some error occurred
return res; return res;
} }
double const now = TRI_microtime();
if (now > end) {
break;
}
usleep(20000);
// try again // try again
} }
// TODO: remove debug info here // TODO: remove debug info here
LOG(ERR) << "going into lock timeout. having waited for logfile: " << logfileId << ", maxIterations: " << maxIterations << ", maxWaitTime: " << maxWaitTime; LOG(ERR) << "going into lock timeout. having waited for logfile: " << logfileId << ", maxWaitTime: " << maxWaitTime;
logStatus(); logStatus();
// waited for too long // waited for too long

View File

@ -34,6 +34,8 @@ const functionsDocumentation = {
'authentication_parameters': 'authentication parameters tests', 'authentication_parameters': 'authentication parameters tests',
'boost': 'boost test suites', 'boost': 'boost test suites',
'config': 'checks the config file parsing', 'config': 'checks the config file parsing',
'client_resilience': 'client resilience tests',
'cluster_sync': 'cluster sync tests',
'dump': 'dump tests', 'dump': 'dump tests',
'dump_authentication': 'dump tests with authentication', 'dump_authentication': 'dump tests with authentication',
'dfdb': 'start test', 'dfdb': 'start test',
@ -48,7 +50,6 @@ const functionsDocumentation = {
'replication_static': 'replication static tests', 'replication_static': 'replication static tests',
'replication_sync': 'replication sync tests', 'replication_sync': 'replication sync tests',
'resilience': 'resilience tests', 'resilience': 'resilience tests',
'client_resilience': 'client resilience tests',
'shell_client': 'shell client tests', 'shell_client': 'shell client tests',
'shell_replication': 'shell replication tests', 'shell_replication': 'shell replication tests',
'shell_server': 'shell server tests', 'shell_server': 'shell server tests',
@ -158,7 +159,9 @@ const optionsDefaults = {
'loopEternal': false, 'loopEternal': false,
'loopSleepSec': 1, 'loopSleepSec': 1,
'loopSleepWhen': 1, 'loopSleepWhen': 1,
'minPort': 1024,
'maxPort': 32768, 'maxPort': 32768,
'mochaGrep': undefined,
'onlyNightly': false, 'onlyNightly': false,
'password': '', 'password': '',
'replication': false, 'replication': false,
@ -627,20 +630,32 @@ function cleanupDBDirectories (options) {
// / @brief finds a free port // / @brief finds a free port
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
function findFreePort (maxPort) { function findFreePort (minPort, maxPort, usedPorts) {
if (typeof maxPort !== 'number') { if (typeof maxPort !== 'number') {
maxPort = 32768; maxPort = 32768;
} }
if (maxPort < 2048) {
maxPort = 2048; if (maxPort - minPort < 0) {
throw new Error('minPort ' + minPort + ' is smaller than maxPort ' + maxPort);
} }
let tries = 0;
while (true) { while (true) {
const port = Math.floor(Math.random() * (maxPort - 1024)) + 1024; const port = Math.floor(Math.random() * (maxPort - minPort)) + minPort;
tries++;
if (tries > 20) {
throw new Error('Couldn\'t find a port after ' + tries + ' tries. portrange of ' + minPort + ', ' + maxPort + ' too narrow?');
}
if (Array.isArray(usedPorts) && usedPorts.indexOf(port) >= 0) {
continue;
}
const free = testPort('tcp://0.0.0.0:' + port); const free = testPort('tcp://0.0.0.0:' + port);
if (free) { if (free) {
return port; return port;
} }
require('internal').wait(0.1);
} }
} }
@ -667,13 +682,13 @@ function makePathGeneric (path) {
function runThere (options, instanceInfo, file) { function runThere (options, instanceInfo, file) {
try { try {
let testCode; let testCode;
let mochaGrep = options.mochaGrep ? ', ' + JSON.stringify(options.mochaGrep) : '';
if (file.indexOf('-spec') === -1) { if (file.indexOf('-spec') === -1) {
testCode = 'const runTest = require("jsunity").runTest; ' + testCode = 'const runTest = require("jsunity").runTest; ' +
'return runTest(' + JSON.stringify(file) + ', true);'; 'return runTest(' + JSON.stringify(file) + ', true);';
} else { } else {
testCode = 'const runTest = require("@arangodb/mocha-runner"); ' + testCode = 'const runTest = require("@arangodb/mocha-runner"); ' +
'return runTest(' + JSON.stringify(file) + ', true);'; 'return runTest(' + JSON.stringify(file) + ', true' + mochaGrep + ');';
} }
if (options.propagateInstanceInfo) { if (options.propagateInstanceInfo) {
@ -1323,10 +1338,13 @@ function startInstanceCluster (instanceInfo, protocol, options,
options.agencyWaitForSync = false; options.agencyWaitForSync = false;
startInstanceAgency(instanceInfo, protocol, options, ...makeArgs('agency', 'agency', {})); startInstanceAgency(instanceInfo, protocol, options, ...makeArgs('agency', 'agency', {}));
let usedPorts = [];
let agencyEndpoint = instanceInfo.endpoint; let agencyEndpoint = instanceInfo.endpoint;
let i; let i;
for (i = 0; i < options.dbServers; i++) { for (i = 0; i < options.dbServers; i++) {
let endpoint = protocol + '://127.0.0.1:' + findFreePort(options.maxPort); let port = findFreePort(options.minPort, options.maxPort, usedPorts);
usedPorts.push(port);
let endpoint = protocol + '://127.0.0.1:' + port;
let primaryArgs = _.clone(options.extraArgs); let primaryArgs = _.clone(options.extraArgs);
primaryArgs['server.endpoint'] = endpoint; primaryArgs['server.endpoint'] = endpoint;
primaryArgs['cluster.my-address'] = endpoint; primaryArgs['cluster.my-address'] = endpoint;
@ -1338,7 +1356,9 @@ function startInstanceCluster (instanceInfo, protocol, options,
} }
for (i=0;i<options.coordinators;i++) { for (i=0;i<options.coordinators;i++) {
let endpoint = protocol + '://127.0.0.1:' + findFreePort(options.maxPort); let port = findFreePort(options.minPort, options.maxPort, usedPorts);
usedPorts.push(port);
let endpoint = protocol + '://127.0.0.1:' + port;
let coordinatorArgs = _.clone(options.extraArgs); let coordinatorArgs = _.clone(options.extraArgs);
coordinatorArgs['server.endpoint'] = endpoint; coordinatorArgs['server.endpoint'] = endpoint;
coordinatorArgs['cluster.my-address'] = endpoint; coordinatorArgs['cluster.my-address'] = endpoint;
@ -1392,7 +1412,7 @@ function startArango (protocol, options, addArgs, rootDir, role) {
let port; let port;
if (!addArgs['server.endpoint']) { if (!addArgs['server.endpoint']) {
port = findFreePort(options.maxPort); port = findFreePort(options.minPort, options.maxPort);
endpoint = protocol + '://127.0.0.1:' + port; endpoint = protocol + '://127.0.0.1:' + port;
} else { } else {
endpoint = addArgs['server.endpoint']; endpoint = addArgs['server.endpoint'];
@ -1454,6 +1474,7 @@ function startInstanceAgency (instanceInfo, protocol, options, addArgs, rootDir)
} }
const wfs = options.agencyWaitForSync; const wfs = options.agencyWaitForSync;
let usedPorts = [];
for (let i = 0; i < N; i++) { for (let i = 0; i < N; i++) {
let instanceArgs = _.clone(addArgs); let instanceArgs = _.clone(addArgs);
instanceArgs['log.file'] = fs.join(rootDir, 'log' + String(i)); instanceArgs['log.file'] = fs.join(rootDir, 'log' + String(i));
@ -1463,7 +1484,8 @@ function startInstanceAgency (instanceInfo, protocol, options, addArgs, rootDir)
instanceArgs['agency.wait-for-sync'] = String(wfs); instanceArgs['agency.wait-for-sync'] = String(wfs);
instanceArgs['agency.supervision'] = String(S); instanceArgs['agency.supervision'] = String(S);
instanceArgs['database.directory'] = dataDir + String(i); instanceArgs['database.directory'] = dataDir + String(i);
const port = findFreePort(options.maxPort); const port = findFreePort(options.minPort, options.maxPort, usedPorts);
usedPorts.push(port);
instanceArgs['server.endpoint'] = protocol + '://127.0.0.1:' + port; instanceArgs['server.endpoint'] = protocol + '://127.0.0.1:' + port;
instanceArgs['agency.my-address'] = protocol + '://127.0.0.1:' + port; instanceArgs['agency.my-address'] = protocol + '://127.0.0.1:' + port;
instanceArgs['agency.supervision-grace-period'] = '5'; instanceArgs['agency.supervision-grace-period'] = '5';
@ -1807,6 +1829,7 @@ function findTests () {
testsCases.resilience = doOnePath('js/server/tests/resilience'); testsCases.resilience = doOnePath('js/server/tests/resilience');
testsCases.client_resilience = doOnePath('js/client/tests/resilience'); testsCases.client_resilience = doOnePath('js/client/tests/resilience');
testsCases.cluster_sync = doOnePath('js/server/tests/cluster-sync');
testsCases.server = testsCases.common.concat(testsCases.server_only); testsCases.server = testsCases.common.concat(testsCases.server_only);
testsCases.client = testsCases.common.concat(testsCases.client_only); testsCases.client = testsCases.common.concat(testsCases.client_only);
@ -3552,6 +3575,28 @@ testFuncs.shell_server = function (options) {
return performTests(options, testsCases.server, 'shell_server', runThere); return performTests(options, testsCases.server, 'shell_server', runThere);
}; };
// //////////////////////////////////////////////////////////////////////////////
// / @brief TEST: cluster_sync
// //////////////////////////////////////////////////////////////////////////////
testFuncs.cluster_sync = function (options) {
if (options.cluster) {
// may sound strange but these are actually pure logic tests
// and should not be executed on the cluster
return {
'cluster_sync': {
'status': true,
'message': 'skipped because of cluster',
'skipped': true
}
};
}
findTests();
options.propagateInstanceInfo = true;
return performTests(options, testsCases.cluster_sync, 'cluster_sync', runThere);
};
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
// / @brief TEST: shell_server_aql // / @brief TEST: shell_server_aql
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
@ -3610,10 +3655,10 @@ testFuncs.endpoints = function(options) {
let endpoints = { let endpoints = {
'tcpv4': function() { 'tcpv4': function() {
return 'tcp://127.0.0.1:' + findFreePort(options.maxPort); return 'tcp://127.0.0.1:' + findFreePort(options.minPort, options.maxPort);
}, },
'tcpv6': function() { 'tcpv6': function() {
return 'tcp://[::1]:' + findFreePort(options.maxPort); return 'tcp://[::1]:' + findFreePort(options.minPort, options.maxPort);
}, },
'unix': function() { 'unix': function() {
if (platform.substr(0, 3) === 'win') { if (platform.substr(0, 3) === 'win') {
@ -3820,7 +3865,7 @@ testFuncs.upgrade = function (options) {
fs.makeDirectoryRecursive(tmpDataDir); fs.makeDirectoryRecursive(tmpDataDir);
const appDir = fs.join(tmpDataDir, 'app'); const appDir = fs.join(tmpDataDir, 'app');
const port = findFreePort(options.maxPort); const port = findFreePort(options.minPort, options.maxPort);
let args = makeArgsArangod(options, appDir); let args = makeArgsArangod(options, appDir);
args['server.endpoint'] = 'tcp://127.0.0.1:' + port; args['server.endpoint'] = 'tcp://127.0.0.1:' + port;

View File

@ -34,8 +34,8 @@ const colors = require('internal').COLORS;
const $_MODULE_CONTEXT = Symbol.for('@arangodb/module.context'); const $_MODULE_CONTEXT = Symbol.for('@arangodb/module.context');
module.exports = function (files, returnJson) { module.exports = function (files, returnJson, grep) {
const results = runTests(run, files, 'suite'); const results = runTests(run, files, 'suite', grep);
print(); print();
logSuite(results); logSuite(results);
logStats(results.stats); logStats(results.stats);

View File

@ -53,7 +53,7 @@ exports.reporters = {
default: DefaultReporter default: DefaultReporter
}; };
exports.run = function runMochaTests (run, files, reporterName) { exports.run = function runMochaTests (run, files, reporterName, grep) {
if (!Array.isArray(files)) { if (!Array.isArray(files)) {
files = [files]; files = [files];
} }
@ -85,6 +85,9 @@ exports.run = function runMochaTests (run, files, reporterName) {
return this; return this;
} }
}; };
if (grep) {
mocha.grep(grep);
}
// Clean up after chai.should(), etc // Clean up after chai.should(), etc
var globals = Object.getOwnPropertyNames(global); var globals = Object.getOwnPropertyNames(global);

File diff suppressed because it is too large Load Diff

View File

@ -29,6 +29,7 @@
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
var internal = require('internal'); var internal = require('internal');
var ERRORS = internal.errors;
var endpointToURL = require('@arangodb/cluster').endpointToURL; var endpointToURL = require('@arangodb/cluster').endpointToURL;
var request; var request;
if (ArangoServerState.role() === 'PRIMARY') { if (ArangoServerState.role() === 'PRIMARY') {
@ -272,7 +273,15 @@ function syncCollectionFinalize (database, collname, from, config) {
coll.remove(entry.data._key); coll.remove(entry.data._key);
} catch (errx) { } catch (errx) {
console.error('syncCollectionFinalize: remove', entry, JSON.stringify(errx)); console.error('syncCollectionFinalize: remove', entry, JSON.stringify(errx));
throw errx; if (errx.errorNum !== ERRORS.ERROR_ARANGO_DOCUMENT_NOT_FOUND.code) {
throw errx;
}
// We swallow the NOT FOUND error here. It is possible that
// a follower tries to synchronize to a leader with whom it
// is already in sync. In that case there could have been a
// synchronously replicated removal operation that has happened
// whilst we were resynchronizing the shard. In this case, the
// removal would have happened already.
} }
} else if (entry.type === mType.REPLICATION_TRANSACTION_START) { } else if (entry.type === mType.REPLICATION_TRANSACTION_START) {
transactions[entry.tid] = []; transactions[entry.tid] = [];

View File

@ -0,0 +1,757 @@
/* global describe, it, before, beforeEach, afterEach */
// //////////////////////////////////////////////////////////////////////////////
// / @brief JavaScript cluster functionality
// /
// / @file
// /
// / DISCLAIMER
// /
// / Copyright 2017 ArangoDB GmbH, Cologne, Germany
// /
// / Licensed under the Apache License, Version 2.0 (the "License")
// / you may not use this file except in compliance with the License.
// / You may obtain a copy of the License at
// /
// / http://www.apache.org/licenses/LICENSE-2.0
// /
// / Unless required by applicable law or agreed to in writing, software
// / distributed under the License is distributed on an "AS IS" BASIS,
// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// / See the License for the specific language governing permissions and
// / limitations under the License.
// /
// / Copyright holder is ArangoDB GmbH, Cologne, Germany
// /
// / @author Andreas Streichardt
// //////////////////////////////////////////////////////////////////////////////
const db = require('internal').db;
const cluster = require('@arangodb/cluster');
const expect = require('chai').expect;
const ArangoCollection = require('@arangodb/arango-collection').ArangoCollection;
describe('Cluster sync', function() {
describe('Databaseplan to local', function() {
before(function() {
require('@arangodb/sync-replication-debug').setup();
});
beforeEach(function() {
db._databases().forEach(database => {
if (database !== '_system') {
db._dropDatabase(database);
}
});
});
it('should create a planned database', function() {
let plan = {
"Databases": {
"test": {
"id": 1,
"name": "test"
}
}
};
let errors = cluster.executePlanForDatabases(plan.Databases);
let databases = db._databases();
expect(databases).to.have.lengthOf(2);
expect(databases).to.contain('test');
expect(errors).to.be.empty;
});
it('should leave everything in place if a planned database already exists', function() {
let plan = {
Databases: {
"test": {
"id": 1,
"name": "test"
}
}
};
db._createDatabase('test');
let errors = cluster.executePlanForDatabases(plan.Databases);
let databases = db._databases();
expect(databases).to.have.lengthOf(2);
expect(databases).to.contain('test');
expect(errors).to.be.empty;
});
it('should delete a database if it is not used anymore', function() {
db._createDatabase('peng');
let plan = {
Databases: {
}
};
cluster.executePlanForDatabases(plan.Databases);
let databases = db._databases();
expect(databases).to.have.lengthOf(1);
expect(databases).to.contain('_system');
});
});
describe('Collection plan to local', function() {
let numSystemCollections;
before(function() {
require('@arangodb/sync-replication-debug').setup();
});
beforeEach(function() {
db._databases().forEach(database => {
if (database !== '_system') {
db._dropDatabase(database);
}
});
db._createDatabase('test');
db._useDatabase('test');
numSystemCollections = db._collections().length;
});
afterEach(function() {
db._useDatabase('_system');
});
it('should create and load a collection if it does not exist', function() {
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 3,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let collections = db._collections();
expect(collections.map(collection => collection.name())).to.contain('s100001');
expect(db._collection('s100001').status()).to.equal(ArangoCollection.STATUS_LOADED);
});
it('should create a collection if it does not exist (unloaded case)', function() {
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let collections = db._collections();
expect(collections.map(collection => collection.name())).to.contain('s100001');
expect(db._collection('s100001').status()).to.equal(ArangoCollection.STATUS_UNLOADED);
});
it('should unload an existing collection', function() {
db._create('s100001');
expect(db._collection('s100001').status()).to.equal(ArangoCollection.STATUS_LOADED);
let plan = {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
};
cluster.executePlanForCollections(plan);
db._useDatabase('test');
expect(db._collection('s100001').status()).to.equal(ArangoCollection.STATUS_UNLOADED);
});
it('should delete a stale collection', function() {
db._create('s100001');
let plan = {
Collections: {
test: {
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let collections = db._collections();
expect(collections).to.have.lengthOf(numSystemCollections);
});
it('should ignore a collection for which it is not responsible', function() {
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"swag",
]
},
"status": 3,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let collections = db._collections();
expect(collections).to.have.lengthOf(numSystemCollections);
});
it('should delete a collection for which it lost responsibility', function() {
db._create('s100001');
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"debug-follower", // this is a different server than we are
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let collections = db._collections();
expect(collections).to.have.lengthOf(numSystemCollections);
});
it('should create an additional index if instructed to do so', function() {
db._create('s100001');
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
},
{
"error": false,
"errorMessage": "",
"errorNum": 0,
"fields": [
"user"
],
"id": "100005",
"sparse": true,
"type": "hash",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
""
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let indexes = db._collection('s100001').getIndexes();
expect(indexes).to.have.lengthOf(2);
});
it('should remove an additional index if instructed to do so', function() {
db._create('s100001');
db._collection('s100001').ensureIndex({ type: "hash", fields: [ "name" ] });
let plan = {
Databases: {
"_system": {
"id": 1,
"name": "_system"
},
"test": {
"id": 2,
"name": "test"
}
},
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
}
};
cluster.executePlanForCollections(plan.Collections);
db._useDatabase('test');
let indexes = db._collection('s100001').getIndexes();
expect(indexes).to.have.lengthOf(1);
});
it('should report an error when collection creation failed', function() {
let plan = {
Collections: {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"Möter": [
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
}
};
let errors = cluster.executePlanForCollections(plan.Collections);
expect(errors).to.be.an('object');
expect(errors).to.have.property('Möter');
});
it('should be leading a collection when ordered to be leader', function() {
let plan = {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 3,
"type": 2,
"waitForSync": false
}
}
};
let errors = cluster.executePlanForCollections(plan);
db._useDatabase('test');
expect(db._collection('s100001').isLeader()).to.equal(true);
});
it('should be following a leader when ordered to be follower', function() {
let plan = {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"the leader-leader",
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
};
let errors = cluster.executePlanForCollections(plan);
db._useDatabase('test');
expect(db._collection('s100001').isLeader()).to.equal(false);
});
it('should be able to switch from leader to follower', function() {
let plan = {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
};
let errors = cluster.executePlanForCollections(plan);
plan.test['100001'].shards['s100001'].unshift('der-hund');
cluster.executePlanForCollections(plan);
db._useDatabase('test');
expect(db._collection('s100001').isLeader()).to.equal(false);
});
it('should be able to switch from follower to leader', function() {
let plan = {
test: {
"100001": {
"deleted": false,
"doCompact": true,
"id": "100001",
"indexBuckets": 8,
"indexes": [
{
"fields": [
"_key"
],
"id": "0",
"sparse": false,
"type": "primary",
"unique": true
}
],
"isSystem": false,
"isVolatile": false,
"journalSize": 1048576,
"keyOptions": {
"allowUserKeys": true,
"type": "traditional"
},
"name": "test",
"numberOfShards": 1,
"replicationFactor": 2,
"shardKeys": [
"_key"
],
"shards": {
"s100001": [
"old-leader",
"",
]
},
"status": 2,
"type": 2,
"waitForSync": false
}
}
};
let errors = cluster.executePlanForCollections(plan);
plan.test['100001'].shards['s100001'] = [""];
cluster.executePlanForCollections(plan);
db._useDatabase('test');
expect(db._collection('s100001').isLeader()).to.equal(true);
});
});
describe('Update current', function() {
beforeEach(function() {
db._databases().forEach(database => {
if (database !== '_system') {
db._dropDatabase(database);
}
});
});
it('should report a new database', function() {
let Current = {
Databases: {},
};
});
});
});

View File

@ -157,7 +157,6 @@ function MovingShardsSuite () {
wait(1.0); wait(1.0);
global.ArangoClusterInfo.flush(); global.ArangoClusterInfo.flush();
var servers = findCollectionServers("_system", c[i].name()); var servers = findCollectionServers("_system", c[i].name());
console.info("Seeing servers:", i, c[i].name(), servers);
if (servers.indexOf(id) === -1) { if (servers.indexOf(id) === -1) {
// Now check current as well: // Now check current as well:
var collInfo = var collInfo =

View File

@ -64,6 +64,237 @@ var sortedKeys = function (col) {
return keys; return keys;
}; };
function transactionRevisionsSuite () {
'use strict';
var cn = "UnitTestsTransaction";
var c = null;
return {
setUp : function () {
internal.debugClearFailAt();
db._drop(cn);
c = db._create(cn);
},
tearDown : function () {
internal.debugClearFailAt();
if (c !== null) {
c.drop();
}
c = null;
internal.wait(0);
},
testInsertUniqueFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.insert({ _key: "test", value: 2 });
}
});
fail();
} catch (err) {
}
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.count());
assertEqual(1, c.toArray().length);
assertEqual(1, c.document("test").value);
},
testInsertUniqueSingleFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
c.insert({ _key: "test", value: 2 });
fail();
} catch (err) {
}
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.count());
assertEqual(1, c.toArray().length);
assertEqual(1, c.document("test").value);
},
testInsertTransactionFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.insert({ _key: "test2", value: 2 });
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
},
testRemoveTransactionFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.remove("test");
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
},
testRemoveInsertWithSameRev : function () {
var doc = c.insert({ _key: "test", value: 1 });
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.remove("test");
c.insert({ _key: "test", _rev: doc._rev, value: 2 }, { isRestore: true });
}
});
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(2, c.document("test").value);
},
testUpdateWithSameRev : function () {
var doc = c.insert({ _key: "test", value: 1 });
c.update("test", { _key: "test", _rev: doc._rev, value: 2 }, { isRestore: true });
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(2, c.document("test").value);
},
testUpdateWithSameRevTransaction : function () {
var doc = c.insert({ _key: "test", value: 1 });
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.update("test", { _key: "test", _rev: doc._rev, value: 2 }, { isRestore: true });
}
});
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(2, c.document("test").value);
},
testUpdateFailingWithSameRev : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.update("test", { _key: "test", _rev: doc._rev, value: 2 }, { isRestore: true });
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
},
testUpdateFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.update({ _key: "test", value: 2 });
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
},
testUpdateAndInsertFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.update({ _key: "test", value: 2 });
c.insert({ _key: "test", value: 3 });
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
},
testRemoveAndInsert : function () {
var doc = c.insert({ _key: "test", value: 1 });
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.remove("test");
c.insert({ _key: "test", value: 2 });
}
});
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(2, c.document("test").value);
},
testRemoveAndInsertFailing : function () {
var doc = c.insert({ _key: "test", value: 1 });
try {
db._executeTransaction({
collections: { write: c.name() },
action: function() {
c.remove("test");
c.insert({ _key: "test", value: 3 });
throw "foo";
}
});
fail();
} catch (err) {
}
assertEqual(1, c.toArray().length);
assertEqual(1, c.figures().revisions.count);
assertEqual(1, c.document("test").value);
}
};
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief test suite /// @brief test suite
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -5157,6 +5388,9 @@ function transactionServerFailuresSuite () {
/// @brief executes the test suites /// @brief executes the test suites
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
jsunity.run(transactionRevisionsSuite);
jsunity.run(transactionRollbackSuite);
// only run this test suite if server-side failures are enabled // only run this test suite if server-side failures are enabled
if (internal.debugCanUseFailAt()) { if (internal.debugCanUseFailAt()) {
jsunity.run(transactionServerFailuresSuite); jsunity.run(transactionServerFailuresSuite);
@ -5167,7 +5401,6 @@ jsunity.run(transactionCollectionsSuite);
jsunity.run(transactionOperationsSuite); jsunity.run(transactionOperationsSuite);
jsunity.run(transactionBarriersSuite); jsunity.run(transactionBarriersSuite);
jsunity.run(transactionGraphSuite); jsunity.run(transactionGraphSuite);
jsunity.run(transactionRollbackSuite);
jsunity.run(transactionCountSuite); jsunity.run(transactionCountSuite);
jsunity.run(transactionCrossCollectionSuite); jsunity.run(transactionCrossCollectionSuite);
jsunity.run(transactionConstraintsSuite); jsunity.run(transactionConstraintsSuite);

View File

@ -45,6 +45,16 @@ std::string const StaticStrings::RevString("_rev");
std::string const StaticStrings::FromString("_from"); std::string const StaticStrings::FromString("_from");
std::string const StaticStrings::ToString("_to"); std::string const StaticStrings::ToString("_to");
// URL parameter names
std::string const StaticStrings::IgnoreRevsString("ignoreRevs");
std::string const StaticStrings::IsRestoreString("isRestore");
std::string const StaticStrings::KeepNullString("keepNull");
std::string const StaticStrings::MergeObjectsString("mergeObjects");
std::string const StaticStrings::ReturnNewString("returnNew");
std::string const StaticStrings::ReturnOldString("returnOld");
std::string const StaticStrings::SilentString("silent");
std::string const StaticStrings::WaitForSyncString("waitForSync");
// database and collection names // database and collection names
std::string const StaticStrings::SystemDatabase("_system"); std::string const StaticStrings::SystemDatabase("_system");

View File

@ -51,6 +51,16 @@ class StaticStrings {
static std::string const FromString; static std::string const FromString;
static std::string const ToString; static std::string const ToString;
// URL parameter names
static std::string const IgnoreRevsString;
static std::string const IsRestoreString;
static std::string const KeepNullString;
static std::string const MergeObjectsString;
static std::string const ReturnNewString;
static std::string const ReturnOldString;
static std::string const SilentString;
static std::string const WaitForSyncString;
// database and collection names // database and collection names
static std::string const SystemDatabase; static std::string const SystemDatabase;

View File

@ -350,7 +350,7 @@ SIMPLE_CLIENT_COULD_NOT_READ,2003,"could not read from server","Will be raised w
## Communicator errors ## Communicator errors
################################################################################ ################################################################################
COMMUNICATOR_REQUEST_ABORTED,2100,"Request aborted", "Request was aborted." COMMUNICATOR_REQUEST_ABORTED,2100,"Request aborted","Request was aborted."
################################################################################ ################################################################################
## Foxx management errors ## Foxx management errors

View File

@ -33,8 +33,12 @@ TRANSPORT="tcp"
LOG_LEVEL="INFO" LOG_LEVEL="INFO"
LOG_LEVEL_AGENCY="" LOG_LEVEL_AGENCY=""
LOG_LEVEL_CLUSTER="" LOG_LEVEL_CLUSTER=""
XTERM="x-terminal-emulator" if [ -z "$XTERM" ] ; then
XTERMOPTIONS="--geometry=80x43" XTERM="x-terminal-emulator"
fi
if [ -z "$XTERMOPTIONS" ] ; then
XTERMOPTIONS="--geometry=80x43"
fi
SECONDARIES=0 SECONDARIES=0
BUILD="build" BUILD="build"
JWT_SECRET="" JWT_SECRET=""
@ -227,7 +231,7 @@ start() {
--server.endpoint $TRANSPORT://0.0.0.0:$PORT \ --server.endpoint $TRANSPORT://0.0.0.0:$PORT \
--cluster.my-role $ROLE \ --cluster.my-role $ROLE \
--log.file cluster/$PORT.log \ --log.file cluster/$PORT.log \
--log.level info \ --log.level $LOG_LEVEL \
--server.statistics true \ --server.statistics true \
--server.threads 5 \ --server.threads 5 \
--javascript.startup-directory ./js \ --javascript.startup-directory ./js \
@ -250,7 +254,7 @@ startTerminal() {
PORT=$2 PORT=$2
mkdir cluster/data$PORT mkdir cluster/data$PORT
echo Starting $TYPE on port $PORT echo Starting $TYPE on port $PORT
$XTERM $XTERMOPTIONS -e ${BUILD}/bin/arangod \ $XTERM $XTERMOPTIONS -e "${BUILD}/bin/arangod \
-c none \ -c none \
--database.directory cluster/data$PORT \ --database.directory cluster/data$PORT \
--cluster.agency-endpoint $TRANSPORT://127.0.0.1:$BASE \ --cluster.agency-endpoint $TRANSPORT://127.0.0.1:$BASE \
@ -258,7 +262,7 @@ startTerminal() {
--server.endpoint $TRANSPORT://0.0.0.0:$PORT \ --server.endpoint $TRANSPORT://0.0.0.0:$PORT \
--cluster.my-role $ROLE \ --cluster.my-role $ROLE \
--log.file cluster/$PORT.log \ --log.file cluster/$PORT.log \
--log.level info \ --log.level $LOG_LEVEL \
--server.statistics true \ --server.statistics true \
--server.threads 5 \ --server.threads 5 \
--javascript.startup-directory ./js \ --javascript.startup-directory ./js \
@ -266,7 +270,7 @@ startTerminal() {
--javascript.app-path ./js/apps \ --javascript.app-path ./js/apps \
$AUTHENTICATION \ $AUTHENTICATION \
$SSLKEYFILE \ $SSLKEYFILE \
--console & --console" &
} }
startDebugger() { startDebugger() {
@ -287,7 +291,7 @@ startDebugger() {
--server.endpoint $TRANSPORT://0.0.0.0:$PORT \ --server.endpoint $TRANSPORT://0.0.0.0:$PORT \
--cluster.my-role $ROLE \ --cluster.my-role $ROLE \
--log.file cluster/$PORT.log \ --log.file cluster/$PORT.log \
--log.level info \ --log.level $LOG_LEVEL \
--server.statistics false \ --server.statistics false \
--server.threads 5 \ --server.threads 5 \
--javascript.startup-directory ./js \ --javascript.startup-directory ./js \
@ -295,7 +299,7 @@ startDebugger() {
--javascript.app-path ./js/apps \ --javascript.app-path ./js/apps \
$SSLKEYFILE \ $SSLKEYFILE \
$AUTHENTICATION & $AUTHENTICATION &
$XTERM $XTERMOPTIONS -e gdb ${BUILD}/bin/arangod -p $! & $XTERM $XTERMOPTIONS -e "gdb ${BUILD}/bin/arangod -p $!" &
} }
startRR() { startRR() {
@ -308,7 +312,7 @@ startRR() {
PORT=$2 PORT=$2
mkdir cluster/data$PORT mkdir cluster/data$PORT
echo Starting $TYPE on port $PORT with rr tracer echo Starting $TYPE on port $PORT with rr tracer
$XTERM $XTERMOPTIONS -e rr ${BUILD}/bin/arangod \ $XTERM $XTERMOPTIONS -e "rr ${BUILD}/bin/arangod \
-c none \ -c none \
--database.directory cluster/data$PORT \ --database.directory cluster/data$PORT \
--cluster.agency-endpoint $TRANSPORT://127.0.0.1:$BASE \ --cluster.agency-endpoint $TRANSPORT://127.0.0.1:$BASE \
@ -316,7 +320,7 @@ startRR() {
--server.endpoint $TRANSPORT://0.0.0.0:$PORT \ --server.endpoint $TRANSPORT://0.0.0.0:$PORT \
--cluster.my-role $ROLE \ --cluster.my-role $ROLE \
--log.file cluster/$PORT.log \ --log.file cluster/$PORT.log \
--log.level info \ --log.level $LOG_LEVEL \
--server.statistics true \ --server.statistics true \
--server.threads 5 \ --server.threads 5 \
--javascript.startup-directory ./js \ --javascript.startup-directory ./js \
@ -324,7 +328,7 @@ startRR() {
--javascript.app-path ./js/apps \ --javascript.app-path ./js/apps \
$AUTHENTICATION \ $AUTHENTICATION \
$SSLKEYFILE \ $SSLKEYFILE \
--console & --console" &
} }
PORTTOPDB=`expr 8629 + $NRDBSERVERS - 1` PORTTOPDB=`expr 8629 + $NRDBSERVERS - 1`