1
0
Fork 0

Sort out AgencyReadTransactions and cleanup AgencyWriteTransactions.

This commit is contained in:
Max Neunhoeffer 2016-05-04 00:08:15 +02:00
parent eeb1e769da
commit b13b12960b
3 changed files with 170 additions and 132 deletions

View File

@ -142,7 +142,7 @@ void AgencyPrecondition::toVelocyPack(VPackBuilder& builder) const {
/// @brief converts the transaction to json
//////////////////////////////////////////////////////////////////////////////
std::string AgencyTransaction::toJson() const {
std::string AgencyWriteTransaction::toJson() const {
VPackBuilder builder;
{
VPackArrayBuilder guard(&builder);
@ -155,7 +155,7 @@ std::string AgencyTransaction::toJson() const {
/// @brief converts the transaction to velocypack
//////////////////////////////////////////////////////////////////////////////
void AgencyTransaction::toVelocyPack(VPackBuilder& builder) const {
void AgencyWriteTransaction::toVelocyPack(VPackBuilder& builder) const {
VPackArrayBuilder guard(&builder);
{
VPackObjectBuilder guard2(&builder);
@ -171,6 +171,33 @@ void AgencyTransaction::toVelocyPack(VPackBuilder& builder) const {
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the read transaction to json
//////////////////////////////////////////////////////////////////////////////
std::string AgencyReadTransaction::toJson() const {
VPackBuilder builder;
{
VPackArrayBuilder guard(&builder);
toVelocyPack(builder);
}
return builder.toJson();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the read transaction to velocypack
//////////////////////////////////////////////////////////////////////////////
void AgencyReadTransaction::toVelocyPack(VPackBuilder& builder) const {
VPackArrayBuilder guard(&builder);
{
VPackArrayBuilder guard2(&builder);
for (std::string const& key: keys) {
builder.add(VPackValue(key));
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates an agency endpoint
////////////////////////////////////////////////////////////////////////////////
@ -715,13 +742,12 @@ bool AgencyComm::tryInitializeStructure() {
LOG_TOPIC(TRACE, Logger::STARTUP)
<< "Initializing agency with " << builder.toJson();
AgencyCommResult result;
AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice());
AgencyTransaction initTransaction;
AgencyWriteTransaction initTransaction;
initTransaction.operations.push_back(initOperation);
sendTransactionWithFailover(result, initTransaction);
auto result = sendTransactionWithFailover(initTransaction);
return result.successful();
} catch (std::exception const& e) {
@ -743,8 +769,7 @@ bool AgencyComm::shouldInitializeStructure() {
double timeout = _globalConnectionOptions._requestTimeout;
// "InitDone" key should not previously exist
AgencyCommResult result = casValue("InitDone", builder.slice(), false,
60.0, timeout);
auto result = casValue("InitDone", builder.slice(), false, 60.0, timeout);
if (!result.successful() &&
result.httpCode() ==
@ -1111,10 +1136,9 @@ AgencyCommResult AgencyComm::sendServerState(double ttl) {
////////////////////////////////////////////////////////////////////////////////
std::string AgencyComm::getVersion() {
AgencyCommResult result;
sendWithFailover(arangodb::GeneralRequest::RequestType::GET,
_globalConnectionOptions._requestTimeout, result, "version",
AgencyCommResult result
=sendWithFailover(arangodb::GeneralRequest::RequestType::GET,
_globalConnectionOptions._requestTimeout, "version",
"", false);
if (result.successful()) {
@ -1134,32 +1158,27 @@ AgencyCommResult AgencyComm::createDirectory(std::string const& key) {
VPackObjectBuilder dir(&builder);
}
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::SET, builder.slice());
AgencyTransaction transaction(operation);
AgencyWriteTransaction transaction(operation);
sendTransactionWithFailover(result, transaction);
return result;
return sendTransactionWithFailover(transaction);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets a value in the backend
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::setValue(std::string const& key,
std::string const& value,
double ttl) {
VPackBuilder builder;
builder.add(VPackValue(value));
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::SET, builder.slice());
operation._ttl = static_cast<uint32_t>(ttl);
AgencyTransaction transaction(operation);
AgencyWriteTransaction transaction(operation);
sendTransactionWithFailover(result, transaction);
return result;
return sendTransactionWithFailover(transaction);
}
////////////////////////////////////////////////////////////////////////////////
@ -1170,14 +1189,11 @@ AgencyCommResult AgencyComm::setValue(std::string const& key,
arangodb::velocypack::Slice const& slice,
double ttl) {
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::SET, slice);
operation._ttl = static_cast<uint32_t>(ttl);
AgencyTransaction transaction(operation);
AgencyWriteTransaction transaction(operation);
sendTransactionWithFailover(result, transaction);
return result;
return sendTransactionWithFailover(transaction);
}
////////////////////////////////////////////////////////////////////////////////
@ -1195,15 +1211,12 @@ bool AgencyComm::exists(std::string const& key) {
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::increment(std::string const& key) {
AgencyCommResult result;
AgencyTransaction transaction(
AgencyWriteTransaction transaction(
AgencyOperation(key, AgencySimpleOperationType::INCREMENT_OP)
);
sendTransactionWithFailover(result, transaction);
return result;
return sendTransactionWithFailover(transaction);
}
////////////////////////////////////////////////////////////////////////////////
@ -1214,7 +1227,6 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
std::string url(buildUrl());
url += "/read";
AgencyCommResult result;
VPackBuilder builder;
{
VPackArrayBuilder root(&builder);
@ -1224,8 +1236,9 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
}
}
sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
_globalConnectionOptions._requestTimeout, result, url,
AgencyCommResult result
= sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
_globalConnectionOptions._requestTimeout, url,
builder.toJson(), false);
if (!result.successful()) {
@ -1340,7 +1353,6 @@ AgencyCommResult AgencyComm::getValues2(std::string const& key, bool recursive)
std::string url(buildUrl());
url += "/read";
AgencyCommResult result;
VPackBuilder builder;
{
VPackArrayBuilder root(&builder);
@ -1350,8 +1362,9 @@ AgencyCommResult AgencyComm::getValues2(std::string const& key, bool recursive)
}
}
sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
_globalConnectionOptions._requestTimeout, result, url,
AgencyCommResult result
= sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
_globalConnectionOptions._requestTimeout, url,
builder.toJson(), false);
if (!result.successful()) {
@ -1392,28 +1405,24 @@ AgencyCommResult AgencyComm::getValues2(std::string const& key, bool recursive)
AgencyCommResult AgencyComm::removeValues(std::string const& key,
bool recursive) {
AgencyCommResult result;
AgencyTransaction transaction(
AgencyWriteTransaction transaction(
AgencyOperation(key, AgencySimpleOperationType::DELETE_OP),
AgencyPrecondition(key, AgencyPrecondition::EMPTY, false)
);
sendTransactionWithFailover(result, transaction);
return result;
return sendTransactionWithFailover(transaction);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compares and swaps a single value in the backend
/// the CAS condition is whether or not a previous value existed for the key
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::casValue(std::string const& key,
arangodb::velocypack::Slice const& json,
bool prevExist, double ttl,
double timeout) {
AgencyCommResult result;
VPackBuilder newBuilder;
newBuilder.add(json);
@ -1423,19 +1432,8 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
operation._ttl = static_cast<uint32_t>(ttl);
}
std::string url(buildUrl());
url += "/write";
AgencyTransaction transaction(operation, precondition);
sendWithFailover(
arangodb::GeneralRequest::RequestType::POST,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result, url,
transaction.toJson(),
false);
return result;
AgencyWriteTransaction transaction(operation, precondition);
return sendTransactionWithFailover(transaction, timeout);
}
////////////////////////////////////////////////////////////////////////////////
@ -1448,8 +1446,6 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
VPackSlice const& oldJson,
VPackSlice const& newJson, double ttl,
double timeout) {
AgencyCommResult result;
VPackBuilder newBuilder;
newBuilder.add(newJson);
@ -1463,50 +1459,38 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
operation._ttl = static_cast<uint32_t>(ttl);
}
std::string url(buildUrl());
url += "/write";
AgencyTransaction transaction(operation, precondition);
sendWithFailover(
arangodb::GeneralRequest::RequestType::POST,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result, url,
transaction.toJson(),
false);
return result;
AgencyWriteTransaction transaction(operation, precondition);
return sendTransactionWithFailover(transaction, timeout);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief registers a callback on a key
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::registerCallback(std::string const& key, std::string const& endpoint) {
VPackBuilder builder;
builder.add(VPackValue(endpoint));
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::OBSERVE, builder.slice());
AgencyTransaction transaction(operation);
sendTransactionWithFailover(result, transaction);
AgencyWriteTransaction transaction(operation);
auto result = sendTransactionWithFailover(transaction);
return result.successful();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unregisters a callback on a key
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::unregisterCallback(std::string const& key, std::string const& endpoint) {
bool AgencyComm::unregisterCallback(std::string const& key,
std::string const& endpoint) {
VPackBuilder builder;
builder.add(VPackValue(endpoint));
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::UNOBSERVE, builder.slice());
AgencyTransaction transaction(operation);
sendTransactionWithFailover(result, transaction);
AgencyWriteTransaction transaction(operation);
auto result = sendTransactionWithFailover(transaction);
return result.successful();
}
@ -1839,16 +1823,16 @@ std::string AgencyComm::buildUrl() const {
//////////////////////////////////////////////////////////////////////////////
/// @brief sends a write HTTP request to the agency, handling failover
//////////////////////////////////////////////////////////////////////////////
bool AgencyComm::sendTransactionWithFailover(
AgencyCommResult& result,
AgencyTransaction const& transaction
) {
AgencyCommResult AgencyComm::sendTransactionWithFailover(
AgencyTransaction const& transaction, double timeout) {
std::string url(buildUrl());
url += "/write";
return sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
_globalConnectionOptions._requestTimeout, result, url,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, url,
transaction.toJson(), false);
}
@ -1856,11 +1840,12 @@ bool AgencyComm::sendTransactionWithFailover(
/// @brief sends an HTTP request to the agency, handling failover
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
AgencyCommResult AgencyComm::sendWithFailover(
arangodb::GeneralRequest::RequestType method,
double const timeout,
AgencyCommResult& result,
std::string const& url,
std::string const& body, bool isWatch) {
size_t numEndpoints;
{
@ -1868,7 +1853,10 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
numEndpoints = AgencyComm::_globalEndpoints.size();
if (numEndpoints == 0) {
return false;
AgencyCommResult result;
result._statusCode = 400;
result._message = "No endpoints for agency found.";
return result;
}
}
@ -1878,13 +1866,15 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
std::string realUrl = url;
std::string forceEndpoint;
AgencyCommResult result;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint);
TRI_ASSERT(agencyEndpoint != nullptr);
try {
send(agencyEndpoint->_connection, method, timeout, result, realUrl, body);
result = send(agencyEndpoint->_connection, method, timeout, realUrl, body);
} catch (...) {
result._connected = false;
result._statusCode = 0;
@ -1893,7 +1883,7 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
agencyEndpoint->_connection->disconnect();
requeueEndpoint(agencyEndpoint, true);
return false;
break;
}
if (result._statusCode ==
@ -1919,14 +1909,14 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
offset = 6;
} else {
// invalid endpoint, return an error
return false;
break;
}
size_t const delim = endpoint.find('/', offset);
if (delim == std::string::npos) {
// invalid location header
return false;
break;
}
realUrl = endpoint.substr(delim);
@ -1947,7 +1937,7 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
<< "'. Will not follow!";
// this is an error
return false;
break;
}
forceEndpoint = endpoint;
@ -1969,24 +1959,23 @@ bool AgencyComm::sendWithFailover(arangodb::GeneralRequest::RequestType method,
if (canAbort) {
// we're done
return true;
break;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sends data to the URL
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
AgencyCommResult AgencyComm::send(
arangodb::httpclient::GeneralClientConnection* connection,
arangodb::GeneralRequest::RequestType method,
double timeout, AgencyCommResult& result,
std::string const& url, std::string const& body) {
double timeout, std::string const& url, std::string const& body) {
TRI_ASSERT(connection != nullptr);
if (method == arangodb::GeneralRequest::RequestType::GET ||
@ -1997,6 +1986,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
TRI_ASSERT(!url.empty());
AgencyCommResult result;
result._connected = false;
result._statusCode = 0;
@ -2025,7 +2015,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
result._message = "could not send request to agency";
LOG(TRACE) << "sending request to agency failed";
return false;
return result;
}
if (!response->isComplete()) {
@ -2033,7 +2023,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
result._message = "sending request to agency failed";
LOG(TRACE) << "sending request to agency failed";
return false;
return result;
}
result._connected = true;
@ -2052,7 +2042,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
connection->disconnect();
result._message = "invalid agency response (header missing)";
return false;
return result;
}
}
@ -2073,9 +2063,9 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
<< result._body << "'";
if (result.successful()) {
return true;
return result;
}
connection->disconnect();
return false;
return result;
}

View File

@ -165,14 +165,14 @@ struct AgencyPrecondition {
AgencyPrecondition(std::string const& key, Type t, bool e);
AgencyPrecondition(std::string const& key, Type t, VPackSlice s);
AgencyPrecondition(std::string const& key, Type t, VPackSlice const s);
void toVelocyPack(arangodb::velocypack::Builder& builder) const;
std::string key;
Type type;
bool empty;
VPackSlice value;
VPackSlice const value;
};
struct AgencyOperation {
@ -190,7 +190,7 @@ struct AgencyOperation {
AgencyOperation(
std::string const& key,
AgencyValueOperationType opType,
VPackSlice value
VPackSlice const value
);
//////////////////////////////////////////////////////////////////////////////
@ -209,7 +209,17 @@ private:
VPackSlice _value;
};
//////////////////////////////////////////////////////////////////////////////
/// @brief AgencyTransaction base class
//////////////////////////////////////////////////////////////////////////////
struct AgencyTransaction {
virtual std::string toJson() const = 0;
virtual ~AgencyTransaction() {
}
};
struct AgencyWriteTransaction : public AgencyTransaction {
//////////////////////////////////////////////////////////////////////////////
/// @brief vector of preconditions
@ -233,13 +243,13 @@ struct AgencyTransaction {
/// @brief converts the transaction to json
//////////////////////////////////////////////////////////////////////////////
std::string toJson() const;
std::string toJson() const override final;
//////////////////////////////////////////////////////////////////////////////
/// @brief shortcut to create a transaction with one operation
//////////////////////////////////////////////////////////////////////////////
explicit AgencyTransaction(AgencyOperation const& operation) {
explicit AgencyWriteTransaction(AgencyOperation const& operation) {
operations.push_back(operation);
}
@ -248,14 +258,54 @@ struct AgencyTransaction {
/// precondition
//////////////////////////////////////////////////////////////////////////////
explicit AgencyTransaction(AgencyOperation const& operation,
explicit AgencyWriteTransaction(AgencyOperation const& operation,
AgencyPrecondition const& precondition) {
operations.push_back(operation);
preconditions.push_back(precondition);
}
explicit AgencyTransaction() {
//////////////////////////////////////////////////////////////////////////////
/// @brief default constructor
//////////////////////////////////////////////////////////////////////////////
AgencyWriteTransaction() = default;
};
struct AgencyReadTransaction : public AgencyTransaction {
//////////////////////////////////////////////////////////////////////////////
/// @brief vector of operations
//////////////////////////////////////////////////////////////////////////////
std::vector<std::string> keys;
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the transaction to velocypack
//////////////////////////////////////////////////////////////////////////////
void toVelocyPack(arangodb::velocypack::Builder& builder) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief converts the transaction to json
//////////////////////////////////////////////////////////////////////////////
std::string toJson() const override final;
//////////////////////////////////////////////////////////////////////////////
/// @brief shortcut to create a transaction with one operation
//////////////////////////////////////////////////////////////////////////////
explicit AgencyReadTransaction(std::string const& key) {
keys.push_back(key);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief default constructor
//////////////////////////////////////////////////////////////////////////////
AgencyReadTransaction() = default;
};
struct AgencyCommResult {
@ -619,9 +669,9 @@ class AgencyComm {
/// @brief sends a transaction to the agency, handling failover
//////////////////////////////////////////////////////////////////////////////
bool sendTransactionWithFailover(
AgencyCommResult&,
AgencyTransaction const&
AgencyCommResult sendTransactionWithFailover(
AgencyTransaction const&,
double timeout = 0.0
);
private:
@ -660,10 +710,9 @@ class AgencyComm {
/// @brief sends a write HTTP request to the agency, handling failover
//////////////////////////////////////////////////////////////////////////////
bool sendWithFailover(
AgencyCommResult sendWithFailover(
arangodb::GeneralRequest::RequestType,
double,
AgencyCommResult&,
std::string const&,
std::string const&,
bool
@ -673,9 +722,9 @@ class AgencyComm {
/// @brief sends data to the URL
//////////////////////////////////////////////////////////////////////////////
bool send(arangodb::httpclient::GeneralClientConnection*,
AgencyCommResult send(arangodb::httpclient::GeneralClientConnection*,
arangodb::GeneralRequest::RequestType, double,
AgencyCommResult&, std::string const&, std::string const&);
std::string const&, std::string const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief tries to establish a communication channel

View File

@ -1284,14 +1284,13 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
, AgencyPrecondition::EMPTY, true
);
AgencyTransaction transaction;
AgencyWriteTransaction transaction;
transaction.operations.push_back(createCollection);
transaction.operations.push_back(increaseVersion);
transaction.preconditions.push_back(precondition);
AgencyCommResult res;
ac.sendTransactionWithFailover(res, transaction);
AgencyCommResult res = ac.sendTransactionWithFailover(transaction);
if (!res.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,