1
0
Fork 0

Start cluster mode for arangodump. Unfinished.

This commit is contained in:
Max Neunhoeffer 2014-05-07 16:46:41 +02:00
parent a312d496a9
commit f6bd2bbe91
1 changed files with 242 additions and 9 deletions

View File

@ -148,6 +148,12 @@ static uint64_t TickEnd = 0;
static uint64_t BatchId = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief cluster mode flag
////////////////////////////////////////////////////////////////////////////////
static bool clusterMode = false;
////////////////////////////////////////////////////////////////////////////////
/// @brief statistics
////////////////////////////////////////////////////////////////////////////////
@ -190,6 +196,7 @@ static void ParseProgramOptions (int argc, char* argv[]) {
("progress", &Progress, "show progress")
("tick-start", &TickStart, "only include data after this tick")
("tick-end", &TickEnd, "last tick to be included in data dump")
("cluster", &clusterMode, "dump data from a cluster via a coordinator")
;
BaseClient.setupGeneral(description);
@ -809,6 +816,226 @@ static int RunDump (string& errorMsg) {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from cluster via a coordinator
////////////////////////////////////////////////////////////////////////////////
static int RunClusterDump (string& errorMsg) {
map<string, string> headers;
const string url = "/_api/replication/clusterInventory?includeSystem=" +
string(IncludeSystemCollections ? "true" : "false");
SimpleHttpResult* response = Client->request(HttpRequest::HTTP_REQUEST_GET,
url,
0,
0,
headers);
if (response == 0 || ! response->isComplete()) {
errorMsg = "got invalid response from server: " + Client->getErrorMessage();
if (response != 0) {
delete response;
}
return TRI_ERROR_INTERNAL;
}
if (response->wasHttpError()) {
errorMsg = "got invalid response from server: HTTP " +
StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
delete response;
return TRI_ERROR_INTERNAL;
}
const StringBuffer& data = response->getBody();
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.c_str());
delete response;
if (! JsonHelper::isArray(json)) {
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
TRI_json_t const* collections = JsonHelper::getArrayElement(json, "collections");
if (! JsonHelper::isList(collections)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
const string tickString = JsonHelper::getStringValue(json, "tick", "");
if (tickString == "") {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
cout << "Last tick provided by server is: " << tickString << endl;
// read the server's max tick value
uint64_t maxTick = StringUtils::uint64(tickString);
// check if the user specific a max tick value
if (TickEnd > 0 && maxTick > TickEnd) {
maxTick = TickEnd;
}
// create a lookup table for collections
map<string, bool> restrictList;
for (size_t i = 0; i < Collections.size(); ++i) {
restrictList.insert(pair<string, bool>(Collections[i], true));
}
// iterate over collections
const size_t n = collections->_value._objects._length;
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* collection = (TRI_json_t const*) TRI_AtVector(&collections->_value._objects, i);
if (! JsonHelper::isArray(collection)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
TRI_json_t const* parameters = JsonHelper::getArrayElement(collection, "parameters");
if (! JsonHelper::isArray(parameters)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
const string cid = JsonHelper::getStringValue(parameters, "cid", "");
const string name = JsonHelper::getStringValue(parameters, "name", "");
const bool deleted = JsonHelper::getBooleanValue(parameters, "deleted", false);
if (cid == "" || name == "") {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
errorMsg = "got malformed JSON response from server";
return TRI_ERROR_INTERNAL;
}
if (deleted) {
continue;
}
if (name[0] == '_' && ! IncludeSystemCollections) {
continue;
}
if (restrictList.size() > 0 &&
restrictList.find(name) == restrictList.end()) {
// collection name not in list
continue;
}
// found a collection!
if (Progress) {
cout << "dumping collection '" << name << "'..." << endl;
}
// now save the collection meta data and/or the actual data
Stats._totalCollections++;
{
// save meta data
string fileName;
fileName = OutputDirectory + TRI_DIR_SEPARATOR_STR + name + ".structure.json";
int fd;
// remove an existing file first
if (TRI_ExistsFile(fileName.c_str())) {
TRI_UnlinkFile(fileName.c_str());
}
fd = TRI_CREATE(fileName.c_str(), O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return TRI_ERROR_CANNOT_WRITE_FILE;
}
const string collectionInfo = JsonHelper::toString(collection);
if (! TRI_WritePointer(fd, collectionInfo.c_str(), collectionInfo.size())) {
TRI_CLOSE(fd);
errorMsg = "cannot write to file '" + fileName + "'";
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return TRI_ERROR_CANNOT_WRITE_FILE;
}
TRI_CLOSE(fd);
}
if (DumpData) {
// save the actual data
string fileName;
fileName = OutputDirectory + TRI_DIR_SEPARATOR_STR + name + ".data.json";
int fd;
// remove an existing file first
if (TRI_ExistsFile(fileName.c_str())) {
TRI_UnlinkFile(fileName.c_str());
}
fd = TRI_CREATE(fileName.c_str(), O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return TRI_ERROR_CANNOT_WRITE_FILE;
}
ExtendBatch();
int res = DumpCollection(fd, cid, name, parameters, maxTick, errorMsg);
TRI_CLOSE(fd);
if (res != TRI_ERROR_NO_ERROR) {
if (errorMsg.empty()) {
errorMsg = "cannot write to file '" + fileName + "'";
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return res;
}
}
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief request location rewriter (injects database name)
////////////////////////////////////////////////////////////////////////////////
@ -971,17 +1198,23 @@ int main (int argc, char* argv[]) {
string errorMsg = "";
int res = StartBatch(errorMsg);
if (res != TRI_ERROR_NO_ERROR && Force) {
res = TRI_ERROR_NO_ERROR;
}
int res;
if (!clusterMode) {
res = StartBatch(errorMsg);
if (res != TRI_ERROR_NO_ERROR && Force) {
res = TRI_ERROR_NO_ERROR;
}
if (res == TRI_ERROR_NO_ERROR) {
res = RunDump(errorMsg);
}
if (res == TRI_ERROR_NO_ERROR) {
res = RunDump(errorMsg);
}
if (BatchId > 0) {
EndBatch();
if (BatchId > 0) {
EndBatch();
}
}
else { // clusterMode == true
res = RunClusterDump(errorMsg);
}
if (Progress) {