1
0
Fork 0

Refactoring in RestReplicationHandler. Preparation for Enterprise Restore version.

This commit is contained in:
Michael Hackstein 2017-01-06 18:09:11 +01:00
parent a307ee57c4
commit b612e34aa7
2 changed files with 207 additions and 165 deletions

View File

@ -2445,21 +2445,9 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() {
return;
}
// We need to distribute the documents we get over the shards:
auto shardIdsMap = col->shardIds();
std::unordered_map<std::string, size_t> shardTab;
std::vector<std::string> shardIds;
for (auto const& p : *shardIdsMap) {
shardTab.emplace(p.first, shardIds.size());
shardIds.push_back(p.first);
}
std::vector<StringBuffer*> bufs;
for (size_t j = 0; j < shardIds.size(); j++) {
auto b = std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE);
bufs.push_back(b.get());
b.release();
}
std::unordered_map<std::string, std::unique_ptr<StringBuffer>> shardTab;
prepareShardTable(ci, col.get(), shardTab);
std::string const invalidMsg =
std::string("received invalid JSON data for collection ") + name;
@ -2497,35 +2485,17 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() {
res = restoreDataParser(ptr, pos, invalidMsg, false, errorMsg, key,
builder, doc, type);
if (res != TRI_ERROR_NO_ERROR) {
// We might need to clean up buffers
break;
}
if (!doc.isNone() && type != REPLICATION_MARKER_REMOVE) {
ShardID responsibleShard;
bool usesDefaultSharding;
res = ci->getResponsibleShard(col.get(), doc, true, responsibleShard,
usesDefaultSharding);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "error during determining responsible shard";
res = TRI_ERROR_INTERNAL;
break;
} else {
auto it2 = shardTab.find(responsibleShard);
if (it2 == shardTab.end()) {
errorMsg = "cannot find responsible shard";
res = TRI_ERROR_INTERNAL;
break;
} else {
bufs[it2->second]->appendText(ptr, pos - ptr);
bufs[it2->second]->appendText("\n");
}
}
res =
insertDocInBuffer(ci, col.get(), doc, shardTab, ptr, pos, errorMsg);
} else if (type == REPLICATION_MARKER_REMOVE) {
// A remove marker, this has to be appended to all!
for (auto& buf : bufs) {
buf->appendText(ptr, pos - ptr);
buf->appendText("\n");
for (auto& it2 : shardTab) {
it2.second->appendText(ptr, pos - ptr);
it2.second->appendText("\n");
}
} else {
// How very strange!
@ -2540,137 +2510,11 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() {
}
if (res == TRI_ERROR_NO_ERROR) {
// Set a few variables needed for our work:
ClusterComm* cc = ClusterComm::instance();
// Send a synchronous request to that shard using ClusterComm:
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::string forceopt;
std::string const& value = _request->value("force");
if (!value.empty()) {
bool force = StringUtils::boolean(value);
if (force) {
forceopt = "&force=true";
}
}
for (auto const& p : *shardIdsMap) {
auto it = shardTab.find(p.first);
if (it == shardTab.end()) {
errorMsg = "cannot find shard";
res = TRI_ERROR_INTERNAL;
} else {
auto headers =
std::make_unique<std::unordered_map<std::string, std::string>>();
size_t j = it->second;
auto body = std::make_shared<std::string const>(bufs[j]->c_str(),
bufs[j]->length());
cc->asyncRequest("", coordTransactionID, "shard:" + p.first,
arangodb::rest::RequestType::PUT,
"/_db/" + StringUtils::urlEncode(dbName) +
"/_api/replication/restore-data?collection=" +
p.first + forceopt,
body, headers, nullptr, 300.0);
}
}
// Now listen to the results:
unsigned int count;
unsigned int nrok = 0;
for (count = (int)(*shardIdsMap).size(); count > 0; count--) {
auto result = cc->wait("", coordTransactionID, 0, "", 0.0);
if (result.status == CL_COMM_RECEIVED) {
if (result.answer_code == rest::ResponseCode::OK ||
result.answer_code == rest::ResponseCode::CREATED) {
// copy default options
VPackOptions options = VPackOptions::Defaults;
options.checkAttributeUniqueness = true;
VPackSlice answer;
try {
answer = result.answer->payload(&options);
} catch (VPackException const& e) {
// Only log this error and try the next doc
LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'";
continue;
}
if (answer.isObject()) {
VPackSlice const result = answer.get("result");
if (result.isBoolean()) {
if (result.getBoolean()) {
nrok++;
} else {
LOG(ERR) << "some shard result not OK";
}
} else {
VPackSlice const errorMessage = answer.get("errorMessage");
if (errorMessage.isString()) {
errorMsg.append(errorMessage.copyString());
errorMsg.push_back(':');
}
}
} else {
LOG(ERR) << "result body is no object";
}
} else if (result.answer_code == rest::ResponseCode::SERVER_ERROR) {
// copy default options
VPackOptions options = VPackOptions::Defaults;
options.checkAttributeUniqueness = true;
VPackSlice answer;
try {
answer = result.answer->payload(&options);
} catch (VPackException const& e) {
// Only log this error and try the next doc
LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'";
continue;
}
if (answer.isObject()) {
VPackSlice const errorMessage = answer.get("errorMessage");
if (errorMessage.isString()) {
errorMsg.append(errorMessage.copyString());
errorMsg.push_back(':');
}
}
} else {
LOG(ERR) << "Bad answer code from shard: " << (int)result.answer_code;
}
} else {
LOG(ERR) << "Bad status from DBServer: " << result.status
<< ", msg: " << result.errorMessage
<< ", shard: " << result.shardID;
if (result.status >= CL_COMM_SENT) {
if (result.result.get() == nullptr) {
LOG(ERR) << "result.result is nullptr";
} else {
auto msg = result.result->getResultTypeMessage();
LOG(ERR) << "Bad HTTP return code: "
<< result.result->getHttpReturnCode() << ", msg: " << msg;
auto body = result.result->getBodyVelocyPack();
msg = body->toString();
LOG(ERR) << "Body: " << msg;
}
}
}
}
if (nrok != shardIdsMap->size()) {
errorMsg.append("some shard(s) produced error(s)");
res = TRI_ERROR_INTERNAL;
}
}
// Free all the string buffers:
for (size_t j = 0; j < shardIds.size(); j++) {
delete bufs[j];
res = sendBuffersToShards(shardTab, dbName, errorMsg);
}
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_Message(res, errorMessage);
THROW_ARANGO_EXCEPTION_MESSAGE(res, errorMsg);
}
VPackBuilder result;
@ -3945,6 +3789,172 @@ void RestReplicationHandler::handleCommandGetIdForReadLockCollection() {
generateResult(rest::ResponseCode::OK, b.slice());
}
/// @brief Send content of buffers to each shard and wait for results
int RestReplicationHandler::sendBuffersToShards(
std::unordered_map<std::string, std::unique_ptr<StringBuffer>> const&
shardTab,
std::string const& dbName, std::string& errorMsg) const {
// Set a few variables needed for our work:
ClusterComm* cc = ClusterComm::instance();
// Send a synchronous request to that shard using ClusterComm:
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::string forceopt;
std::string const& value = _request->value("force");
if (!value.empty()) {
bool force = StringUtils::boolean(value);
if (force) {
forceopt = "&force=true";
}
}
int res = TRI_ERROR_NO_ERROR;
for (auto const& it : shardTab) {
auto headers =
std::make_unique<std::unordered_map<std::string, std::string>>();
auto body = std::make_shared<std::string const>(it.second->c_str(),
it.second->length());
cc->asyncRequest("", coordTransactionID, "shard:" + it.first,
arangodb::rest::RequestType::PUT,
"/_db/" + StringUtils::urlEncode(dbName) +
"/_api/replication/restore-data?collection=" +
it.first + forceopt,
body, headers, nullptr, 300.0);
}
// Now listen to the results:
size_t count;
size_t nrok = 0;
for (count = shardTab.size(); count > 0; count--) {
auto result = cc->wait("", coordTransactionID, 0, "", 0.0);
if (result.status == CL_COMM_RECEIVED) {
if (result.answer_code == rest::ResponseCode::OK ||
result.answer_code == rest::ResponseCode::CREATED) {
// copy default options
VPackOptions options = VPackOptions::Defaults;
options.checkAttributeUniqueness = true;
VPackSlice answer;
try {
answer = result.answer->payload(&options);
} catch (VPackException const& e) {
// Only log this error and try the next doc
LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'";
continue;
}
if (answer.isObject()) {
VPackSlice const result = answer.get("result");
if (result.isBoolean()) {
if (result.getBoolean()) {
nrok++;
} else {
LOG(ERR) << "some shard result not OK";
}
} else {
VPackSlice const errorMessage = answer.get("errorMessage");
if (errorMessage.isString()) {
errorMsg.append(errorMessage.copyString());
errorMsg.push_back(':');
}
}
} else {
LOG(ERR) << "result body is no object";
}
} else if (result.answer_code == rest::ResponseCode::SERVER_ERROR) {
// copy default options
VPackOptions options = VPackOptions::Defaults;
options.checkAttributeUniqueness = true;
VPackSlice answer;
try {
answer = result.answer->payload(&options);
} catch (VPackException const& e) {
// Only log this error and try the next doc
LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'";
continue;
}
if (answer.isObject()) {
VPackSlice const errorMessage = answer.get("errorMessage");
if (errorMessage.isString()) {
errorMsg.append(errorMessage.copyString());
errorMsg.push_back(':');
}
}
} else {
LOG(ERR) << "Bad answer code from shard: " << (int)result.answer_code;
}
} else {
LOG(ERR) << "Bad status from DBServer: " << result.status
<< ", msg: " << result.errorMessage
<< ", shard: " << result.shardID;
if (result.status >= CL_COMM_SENT) {
if (result.result.get() == nullptr) {
LOG(ERR) << "result.result is nullptr";
} else {
auto msg = result.result->getResultTypeMessage();
LOG(ERR) << "Bad HTTP return code: "
<< result.result->getHttpReturnCode() << ", msg: " << msg;
auto body = result.result->getBodyVelocyPack();
msg = body->toString();
LOG(ERR) << "Body: " << msg;
}
}
}
}
if (nrok != shardTab.size()) {
errorMsg.append("some shard(s) produced error(s)");
res = TRI_ERROR_INTERNAL;
}
return res;
}
/// @brief Insert a NON-REMOVE marker into the shardTab.
int RestReplicationHandler::insertDocInBuffer(
ClusterInfo* ci, LogicalCollection* col, VPackSlice doc,
std::unordered_map<std::string, std::unique_ptr<StringBuffer>> const&
shardTab,
char const* ptr, char const* pos, std::string& errorMsg) const {
ShardID responsibleShard;
bool usesDefaultSharding;
int res = ci->getResponsibleShard(col, doc, true, responsibleShard,
usesDefaultSharding);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "error during determining responsible shard";
return TRI_ERROR_INTERNAL;
} else {
auto it2 = shardTab.find(responsibleShard);
if (it2 == shardTab.end()) {
errorMsg = "cannot find responsible shard";
return TRI_ERROR_INTERNAL;
} else {
it2->second->appendText(ptr, pos - ptr);
it2->second->appendText("\n");
}
}
return TRI_ERROR_NO_ERROR;
}
void RestReplicationHandler::prepareShardTable(
ClusterInfo* ci, LogicalCollection* col,
std::unordered_map<std::string, std::unique_ptr<StringBuffer>>&
shardTab) const {
// We need to distribute the documents we get over the shards:
auto shardIdsMap = col->shardIds();
for (auto const& p : *shardIdsMap) {
shardTab.emplace(p.first, std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE));
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief condition locker to wake up holdReadLockCollection jobs
//////////////////////////////////////////////////////////////////////////////

View File

@ -30,6 +30,7 @@
#include "VocBase/replication-common.h"
namespace arangodb {
class ClusterInfo;
class CollectionNameResolver;
class LogicalCollection;
class Transaction;
@ -353,6 +354,37 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
void handleCommandGetIdForReadLockCollection();
//////////////////////////////////////////////////////////////////////////////
/// @brief Send content of buffers to each shard and wait for results
//////////////////////////////////////////////////////////////////////////////
int sendBuffersToShards(
std::unordered_map<std::string,
std::unique_ptr<arangodb::basics::StringBuffer>> const&
shardTab,
std::string const& dbName, std::string& errorMsg) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Insert a NON-REMOVE marker into the shardTab.
//////////////////////////////////////////////////////////////////////////////
int insertDocInBuffer(
arangodb::ClusterInfo* ci, arangodb::LogicalCollection* col,
arangodb::velocypack::Slice doc,
std::unordered_map<std::string,
std::unique_ptr<arangodb::basics::StringBuffer>> const&
shardTab,
char const* ptr, char const* pos, std::string& errorMsg) const;
/// @brief Prepare the ShardTable mapping a ShardID => StringBuffer
void prepareShardTable(
arangodb::ClusterInfo* ci, arangodb::LogicalCollection* col,
std::unordered_map<std::string,
std::unique_ptr<arangodb::basics::StringBuffer>>&
shardTab) const;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief minimum chunk size