From b612e34aa7ff106dff7121f1149d6ac6ef9140ba Mon Sep 17 00:00:00 2001 From: Michael Hackstein Date: Fri, 6 Jan 2017 18:09:11 +0100 Subject: [PATCH] Refactoring in RestReplicationHandler. Preparation for Enterprise Restore version. --- .../RestHandler/RestReplicationHandler.cpp | 340 +++++++++--------- arangod/RestHandler/RestReplicationHandler.h | 32 ++ 2 files changed, 207 insertions(+), 165 deletions(-) diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index af1495af12..ed7f1330da 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2445,21 +2445,9 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() { return; } - // We need to distribute the documents we get over the shards: - auto shardIdsMap = col->shardIds(); - std::unordered_map shardTab; - std::vector shardIds; - for (auto const& p : *shardIdsMap) { - shardTab.emplace(p.first, shardIds.size()); - shardIds.push_back(p.first); - } - std::vector bufs; - for (size_t j = 0; j < shardIds.size(); j++) { - auto b = std::make_unique(TRI_UNKNOWN_MEM_ZONE); - bufs.push_back(b.get()); - b.release(); - } + std::unordered_map> shardTab; + prepareShardTable(ci, col.get(), shardTab); std::string const invalidMsg = std::string("received invalid JSON data for collection ") + name; @@ -2497,35 +2485,17 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() { res = restoreDataParser(ptr, pos, invalidMsg, false, errorMsg, key, builder, doc, type); if (res != TRI_ERROR_NO_ERROR) { - // We might need to clean up buffers break; } if (!doc.isNone() && type != REPLICATION_MARKER_REMOVE) { - ShardID responsibleShard; - bool usesDefaultSharding; - res = ci->getResponsibleShard(col.get(), doc, true, responsibleShard, - usesDefaultSharding); - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "error during determining responsible shard"; - res = TRI_ERROR_INTERNAL; - break; - } else { - auto it2 = shardTab.find(responsibleShard); - if (it2 == shardTab.end()) { - errorMsg = "cannot find responsible shard"; - res = TRI_ERROR_INTERNAL; - break; - } else { - bufs[it2->second]->appendText(ptr, pos - ptr); - bufs[it2->second]->appendText("\n"); - } - } + res = + insertDocInBuffer(ci, col.get(), doc, shardTab, ptr, pos, errorMsg); } else if (type == REPLICATION_MARKER_REMOVE) { // A remove marker, this has to be appended to all! - for (auto& buf : bufs) { - buf->appendText(ptr, pos - ptr); - buf->appendText("\n"); + for (auto& it2 : shardTab) { + it2.second->appendText(ptr, pos - ptr); + it2.second->appendText("\n"); } } else { // How very strange! @@ -2540,137 +2510,11 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() { } if (res == TRI_ERROR_NO_ERROR) { - // Set a few variables needed for our work: - ClusterComm* cc = ClusterComm::instance(); - - // Send a synchronous request to that shard using ClusterComm: - CoordTransactionID coordTransactionID = TRI_NewTickServer(); - - std::string forceopt; - std::string const& value = _request->value("force"); - - if (!value.empty()) { - bool force = StringUtils::boolean(value); - - if (force) { - forceopt = "&force=true"; - } - } - - for (auto const& p : *shardIdsMap) { - auto it = shardTab.find(p.first); - if (it == shardTab.end()) { - errorMsg = "cannot find shard"; - res = TRI_ERROR_INTERNAL; - } else { - auto headers = - std::make_unique>(); - size_t j = it->second; - auto body = std::make_shared(bufs[j]->c_str(), - bufs[j]->length()); - cc->asyncRequest("", coordTransactionID, "shard:" + p.first, - arangodb::rest::RequestType::PUT, - "/_db/" + StringUtils::urlEncode(dbName) + - "/_api/replication/restore-data?collection=" + - p.first + forceopt, - body, headers, nullptr, 300.0); - } - } - - // Now listen to the results: - unsigned int count; - unsigned int nrok = 0; - for (count = (int)(*shardIdsMap).size(); count > 0; count--) { - auto result = cc->wait("", coordTransactionID, 0, "", 0.0); - if (result.status == CL_COMM_RECEIVED) { - if (result.answer_code == rest::ResponseCode::OK || - result.answer_code == rest::ResponseCode::CREATED) { - // copy default options - VPackOptions options = VPackOptions::Defaults; - options.checkAttributeUniqueness = true; - - VPackSlice answer; - try { - answer = result.answer->payload(&options); - } catch (VPackException const& e) { - // Only log this error and try the next doc - LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'"; - continue; - } - - if (answer.isObject()) { - VPackSlice const result = answer.get("result"); - if (result.isBoolean()) { - if (result.getBoolean()) { - nrok++; - } else { - LOG(ERR) << "some shard result not OK"; - } - } else { - VPackSlice const errorMessage = answer.get("errorMessage"); - if (errorMessage.isString()) { - errorMsg.append(errorMessage.copyString()); - errorMsg.push_back(':'); - } - } - } else { - LOG(ERR) << "result body is no object"; - } - } else if (result.answer_code == rest::ResponseCode::SERVER_ERROR) { - // copy default options - VPackOptions options = VPackOptions::Defaults; - options.checkAttributeUniqueness = true; - VPackSlice answer; - try { - answer = result.answer->payload(&options); - } catch (VPackException const& e) { - // Only log this error and try the next doc - LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'"; - continue; - } - - if (answer.isObject()) { - VPackSlice const errorMessage = answer.get("errorMessage"); - if (errorMessage.isString()) { - errorMsg.append(errorMessage.copyString()); - errorMsg.push_back(':'); - } - } - } else { - LOG(ERR) << "Bad answer code from shard: " << (int)result.answer_code; - } - } else { - LOG(ERR) << "Bad status from DBServer: " << result.status - << ", msg: " << result.errorMessage - << ", shard: " << result.shardID; - if (result.status >= CL_COMM_SENT) { - if (result.result.get() == nullptr) { - LOG(ERR) << "result.result is nullptr"; - } else { - auto msg = result.result->getResultTypeMessage(); - LOG(ERR) << "Bad HTTP return code: " - << result.result->getHttpReturnCode() << ", msg: " << msg; - auto body = result.result->getBodyVelocyPack(); - msg = body->toString(); - LOG(ERR) << "Body: " << msg; - } - } - } - } - - if (nrok != shardIdsMap->size()) { - errorMsg.append("some shard(s) produced error(s)"); - res = TRI_ERROR_INTERNAL; - } - } - - // Free all the string buffers: - for (size_t j = 0; j < shardIds.size(); j++) { - delete bufs[j]; + res = sendBuffersToShards(shardTab, dbName, errorMsg); } if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_Message(res, errorMessage); + THROW_ARANGO_EXCEPTION_MESSAGE(res, errorMsg); } VPackBuilder result; @@ -3945,6 +3789,172 @@ void RestReplicationHandler::handleCommandGetIdForReadLockCollection() { generateResult(rest::ResponseCode::OK, b.slice()); } +/// @brief Send content of buffers to each shard and wait for results + +int RestReplicationHandler::sendBuffersToShards( + std::unordered_map> const& + shardTab, + std::string const& dbName, std::string& errorMsg) const { + // Set a few variables needed for our work: + ClusterComm* cc = ClusterComm::instance(); + + // Send a synchronous request to that shard using ClusterComm: + CoordTransactionID coordTransactionID = TRI_NewTickServer(); + + std::string forceopt; + std::string const& value = _request->value("force"); + + if (!value.empty()) { + bool force = StringUtils::boolean(value); + + if (force) { + forceopt = "&force=true"; + } + } + + int res = TRI_ERROR_NO_ERROR; + + for (auto const& it : shardTab) { + auto headers = + std::make_unique>(); + auto body = std::make_shared(it.second->c_str(), + it.second->length()); + cc->asyncRequest("", coordTransactionID, "shard:" + it.first, + arangodb::rest::RequestType::PUT, + "/_db/" + StringUtils::urlEncode(dbName) + + "/_api/replication/restore-data?collection=" + + it.first + forceopt, + body, headers, nullptr, 300.0); + } + + // Now listen to the results: + size_t count; + size_t nrok = 0; + for (count = shardTab.size(); count > 0; count--) { + auto result = cc->wait("", coordTransactionID, 0, "", 0.0); + if (result.status == CL_COMM_RECEIVED) { + if (result.answer_code == rest::ResponseCode::OK || + result.answer_code == rest::ResponseCode::CREATED) { + // copy default options + VPackOptions options = VPackOptions::Defaults; + options.checkAttributeUniqueness = true; + + VPackSlice answer; + try { + answer = result.answer->payload(&options); + } catch (VPackException const& e) { + // Only log this error and try the next doc + LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'"; + continue; + } + + if (answer.isObject()) { + VPackSlice const result = answer.get("result"); + if (result.isBoolean()) { + if (result.getBoolean()) { + nrok++; + } else { + LOG(ERR) << "some shard result not OK"; + } + } else { + VPackSlice const errorMessage = answer.get("errorMessage"); + if (errorMessage.isString()) { + errorMsg.append(errorMessage.copyString()); + errorMsg.push_back(':'); + } + } + } else { + LOG(ERR) << "result body is no object"; + } + } else if (result.answer_code == rest::ResponseCode::SERVER_ERROR) { + // copy default options + VPackOptions options = VPackOptions::Defaults; + options.checkAttributeUniqueness = true; + VPackSlice answer; + try { + answer = result.answer->payload(&options); + } catch (VPackException const& e) { + // Only log this error and try the next doc + LOG(DEBUG) << "failed to parse json object: '" << e.what() << "'"; + continue; + } + + if (answer.isObject()) { + VPackSlice const errorMessage = answer.get("errorMessage"); + if (errorMessage.isString()) { + errorMsg.append(errorMessage.copyString()); + errorMsg.push_back(':'); + } + } + } else { + LOG(ERR) << "Bad answer code from shard: " << (int)result.answer_code; + } + } else { + LOG(ERR) << "Bad status from DBServer: " << result.status + << ", msg: " << result.errorMessage + << ", shard: " << result.shardID; + if (result.status >= CL_COMM_SENT) { + if (result.result.get() == nullptr) { + LOG(ERR) << "result.result is nullptr"; + } else { + auto msg = result.result->getResultTypeMessage(); + LOG(ERR) << "Bad HTTP return code: " + << result.result->getHttpReturnCode() << ", msg: " << msg; + auto body = result.result->getBodyVelocyPack(); + msg = body->toString(); + LOG(ERR) << "Body: " << msg; + } + } + } + } + + if (nrok != shardTab.size()) { + errorMsg.append("some shard(s) produced error(s)"); + res = TRI_ERROR_INTERNAL; + } + return res; +} + +/// @brief Insert a NON-REMOVE marker into the shardTab. + +int RestReplicationHandler::insertDocInBuffer( + ClusterInfo* ci, LogicalCollection* col, VPackSlice doc, + std::unordered_map> const& + shardTab, + char const* ptr, char const* pos, std::string& errorMsg) const { + ShardID responsibleShard; + bool usesDefaultSharding; + int res = ci->getResponsibleShard(col, doc, true, responsibleShard, + usesDefaultSharding); + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "error during determining responsible shard"; + return TRI_ERROR_INTERNAL; + } else { + auto it2 = shardTab.find(responsibleShard); + if (it2 == shardTab.end()) { + errorMsg = "cannot find responsible shard"; + return TRI_ERROR_INTERNAL; + } else { + it2->second->appendText(ptr, pos - ptr); + it2->second->appendText("\n"); + } + } + return TRI_ERROR_NO_ERROR; +} + +void RestReplicationHandler::prepareShardTable( + ClusterInfo* ci, LogicalCollection* col, + std::unordered_map>& + shardTab) const { + // We need to distribute the documents we get over the shards: + auto shardIdsMap = col->shardIds(); + for (auto const& p : *shardIdsMap) { + shardTab.emplace(p.first, std::make_unique(TRI_UNKNOWN_MEM_ZONE)); + } +} + + + ////////////////////////////////////////////////////////////////////////////// /// @brief condition locker to wake up holdReadLockCollection jobs ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h index 6f0e670d13..ce9b225d3a 100644 --- a/arangod/RestHandler/RestReplicationHandler.h +++ b/arangod/RestHandler/RestReplicationHandler.h @@ -30,6 +30,7 @@ #include "VocBase/replication-common.h" namespace arangodb { +class ClusterInfo; class CollectionNameResolver; class LogicalCollection; class Transaction; @@ -353,6 +354,37 @@ class RestReplicationHandler : public RestVocbaseBaseHandler { void handleCommandGetIdForReadLockCollection(); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Send content of buffers to each shard and wait for results + ////////////////////////////////////////////////////////////////////////////// + + int sendBuffersToShards( + std::unordered_map> const& + shardTab, + std::string const& dbName, std::string& errorMsg) const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Insert a NON-REMOVE marker into the shardTab. + ////////////////////////////////////////////////////////////////////////////// + + int insertDocInBuffer( + arangodb::ClusterInfo* ci, arangodb::LogicalCollection* col, + arangodb::velocypack::Slice doc, + std::unordered_map> const& + shardTab, + char const* ptr, char const* pos, std::string& errorMsg) const; + + /// @brief Prepare the ShardTable mapping a ShardID => StringBuffer + + void prepareShardTable( + arangodb::ClusterInfo* ci, arangodb::LogicalCollection* col, + std::unordered_map>& + shardTab) const; + private: ////////////////////////////////////////////////////////////////////////////// /// @brief minimum chunk size