1
0
Fork 0

include collection name in replication output

This commit is contained in:
Jan Steemann 2014-11-24 12:55:35 +01:00 committed by Frank Celler
parent e68869d23e
commit 0e2770996b
3 changed files with 97 additions and 54 deletions

View File

@ -307,6 +307,20 @@ int ContinuousSyncer::processDocument (TRI_replication_operation_e type,
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
// extract optional "cname"
TRI_json_t const* cnameJson = JsonHelper::getArrayElement(json, "cname");
if (JsonHelper::isString(cnameJson)) {
string const cnameString = JsonHelper::getStringValue(json, "cname", "");
if (! cnameString.empty() && cnameString[0] == '_') {
// system collection
TRI_vocbase_col_t* col = TRI_LookupCollectionByNameVocBase(_vocbase, cnameString.c_str());
if (col != nullptr && col->_cid != cid) {
// cid change? this may happen for system collections
cid = col->_cid;
}
}
}
// extract "key"
TRI_json_t const* keyJson = JsonHelper::getArrayElement(json, "key");
@ -724,8 +738,8 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response,
}
if (ignoreCount == 0) {
if (line.size() > 128) {
errorMsg += ", offending marker: " + line.substr(0, 128) + "...";
if (line.size() > 256) {
errorMsg += ", offending marker: " + line.substr(0, 256) + "...";
}
else {
errorMsg += ", offending marker: " + line;;

View File

@ -85,7 +85,7 @@ static int ReadTick (TRI_json_t const* json,
TRI_voc_tick_t* dst) {
TRI_json_t* tick;
TRI_ASSERT(json != NULL);
TRI_ASSERT(json != nullptr);
TRI_ASSERT(json->_type == TRI_JSON_ARRAY);
tick = TRI_LookupArrayJson(json, attributeName);
@ -117,32 +117,32 @@ static TRI_json_t* JsonConfiguration (TRI_replication_applier_configuration_t co
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 9);
if (json == NULL) {
return NULL;
if (json == nullptr) {
return nullptr;
}
if (config->_endpoint != NULL) {
if (config->_endpoint != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"endpoint",
TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_endpoint));
}
if (config->_database != NULL) {
if (config->_database != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"database",
TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_database));
}
if (config->_username != NULL) {
if (config->_username != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"username",
TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_username));
}
if (config->_password != NULL && includePassword) {
if (config->_password != nullptr && includePassword) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"password",
@ -159,12 +159,11 @@ static TRI_json_t* JsonConfiguration (TRI_replication_applier_configuration_t co
"connectTimeout",
TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, config->_connectTimeout));
/* TODO: decide about the fate of this...
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"ignoreErrors",
TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) config->_ignoreErrors));
*/
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"maxConnectRetries",
@ -215,11 +214,11 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
return TRI_ERROR_FILE_NOT_FOUND;
}
json = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, NULL);
json = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, nullptr);
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
if (! TRI_IsArrayJson(json)) {
if (json != NULL) {
if (json != nullptr) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
@ -228,21 +227,21 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
res = TRI_ERROR_NO_ERROR;
if (config->_endpoint != NULL) {
if (config->_endpoint != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_endpoint);
config->_endpoint = NULL;
config->_endpoint = nullptr;
}
if (config->_database != NULL) {
if (config->_database != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_database);
config->_database = NULL;
config->_database = nullptr;
}
if (config->_username != NULL) {
if (config->_username != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_username);
config->_username = NULL;
config->_username = nullptr;
}
if (config->_password != NULL) {
if (config->_password != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_password);
config->_password = NULL;
config->_password = nullptr;
}
// read the endpoint
@ -328,6 +327,20 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
if (TRI_IsBooleanJson(value)) {
config->_adaptivePolling = value->_value._boolean;
}
value = TRI_LookupArrayJson(json, "ignoreErrors");
if (TRI_IsNumberJson(value)) {
config->_ignoreErrors = value->_value._number;
}
else if (TRI_IsBooleanJson(value)) {
if (value->_value._boolean) {
config->_ignoreErrors = UINT64_MAX;
}
else {
config->_ignoreErrors = 0;
}
}
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
@ -354,8 +367,8 @@ static TRI_json_t* JsonApplyState (TRI_replication_applier_state_t const* state)
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 4);
if (json == NULL) {
return NULL;
if (json == nullptr) {
return nullptr;
}
lastProcessedContinuousTick = TRI_StringUInt64(state->_lastProcessedContinuousTick);
@ -390,7 +403,7 @@ static int SetError (TRI_replication_applier_t* applier,
TRI_replication_applier_state_t* state;
char const* realMsg;
if (msg == NULL || strlen(msg) == 0) {
if (msg == nullptr || strlen(msg) == 0) {
realMsg = TRI_errno_string(errorCode);
}
else {
@ -407,7 +420,7 @@ static int SetError (TRI_replication_applier_t* applier,
TRI_GetTimeStampReplication(state->_lastError._time, sizeof(state->_lastError._time) - 1);
if (state->_lastError._msg != NULL) {
if (state->_lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
}
@ -610,7 +623,7 @@ static TRI_json_t* JsonState (TRI_replication_applier_state_t const* state) {
progress = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 2);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, progress, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progressTime));
if (state->_progressMsg != NULL) {
if (state->_progressMsg != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, progress, "message", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progressMsg));
}
@ -625,11 +638,11 @@ static TRI_json_t* JsonState (TRI_replication_applier_state_t const* state) {
// lastError
error = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
if (error != NULL) {
if (error != nullptr) {
if (state->_lastError._code > 0) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, error, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_lastError._time));
if (state->_lastError._msg != NULL) {
if (state->_lastError._msg != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, error, "errorMessage", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_lastError._msg));
}
}
@ -903,12 +916,12 @@ int TRI_ConfigureReplicationApplier (TRI_replication_applier_t* applier,
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
if (config->_endpoint == NULL || strlen(config->_endpoint) == 0) {
if (config->_endpoint == nullptr || strlen(config->_endpoint) == 0) {
// no endpoint
return TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
}
if (config->_database == NULL || strlen(config->_database) == 0) {
if (config->_database == nullptr || strlen(config->_database) == 0) {
// no database
return TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
}
@ -955,20 +968,20 @@ int TRI_StateReplicationApplier (TRI_replication_applier_t* applier,
state->_totalEvents = applier->_state._totalEvents;
memcpy(&state->_lastError._time, &applier->_state._lastError._time, sizeof(state->_lastError._time));
if (applier->_state._progressMsg != NULL) {
if (applier->_state._progressMsg != nullptr) {
state->_progressMsg = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._progressMsg);
}
else {
state->_progressMsg = NULL;
state->_progressMsg = nullptr;
}
memcpy(&state->_progressTime, &applier->_state._progressTime, sizeof(state->_progressTime));
if (applier->_state._lastError._msg != NULL) {
if (applier->_state._lastError._msg != nullptr) {
state->_lastError._msg = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._lastError._msg);
}
else {
state->_lastError._msg = NULL;
state->_lastError._msg = nullptr;
}
TRI_ReadUnlockReadWriteLock(&applier->_statusLock);
@ -990,15 +1003,15 @@ TRI_json_t* TRI_JsonReplicationApplier (TRI_replication_applier_t* applier) {
res = TRI_StateReplicationApplier(applier, &state);
if (res != TRI_ERROR_NO_ERROR) {
return NULL;
return nullptr;
}
json = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
if (json == NULL) {
if (json == nullptr) {
TRI_DestroyStateReplicationApplier(&state);
return NULL;
return nullptr;
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "state", JsonState(&state));
@ -1006,7 +1019,7 @@ TRI_json_t* TRI_JsonReplicationApplier (TRI_replication_applier_t* applier) {
// add server info
server = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
if (server != NULL) {
if (server != nullptr) {
TRI_server_id_t serverId;
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, server, "version", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, TRI_VERSION));
@ -1023,11 +1036,11 @@ TRI_json_t* TRI_JsonReplicationApplier (TRI_replication_applier_t* applier) {
TRI_CopyConfigurationReplicationApplier(&applier->_configuration, &config);
TRI_ReadUnlockReadWriteLock(&applier->_statusLock);
if (config._endpoint != NULL) {
if (config._endpoint != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "endpoint", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config._endpoint));
}
if (config._database != NULL) {
if (config._database != nullptr) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "database", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config._database));
}
@ -1064,7 +1077,7 @@ void TRI_SetProgressReplicationApplier (TRI_replication_applier_t* applier,
copy = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, msg);
if (copy == NULL) {
if (copy == nullptr) {
return;
}
@ -1072,7 +1085,7 @@ void TRI_SetProgressReplicationApplier (TRI_replication_applier_t* applier,
TRI_WriteLockReadWriteLock(&applier->_statusLock);
}
if (applier->_state._progressMsg != NULL) {
if (applier->_state._progressMsg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, applier->_state._progressMsg);
}
@ -1096,7 +1109,7 @@ void TRI_InitStateReplicationApplier (TRI_replication_applier_state_t* state) {
state->_active = false;
state->_lastError._code = TRI_ERROR_NO_ERROR;
state->_lastError._msg = NULL;
state->_lastError._msg = nullptr;
state->_lastError._time[0] = '\0';
}
@ -1272,6 +1285,7 @@ void TRI_InitConfigurationReplicationApplier (TRI_replication_applier_configurat
config->_autoStart = false;
config->_chunkSize = 0;
config->_adaptivePolling = true;
config->_ignoreErrors = 0;
}
////////////////////////////////////////////////////////////////////////////////
@ -1306,32 +1320,32 @@ void TRI_DestroyConfigurationReplicationApplier (TRI_replication_applier_configu
void TRI_CopyConfigurationReplicationApplier (TRI_replication_applier_configuration_t const* src,
TRI_replication_applier_configuration_t* dst) {
if (src->_endpoint != NULL) {
if (src->_endpoint != nullptr) {
dst->_endpoint = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_endpoint);
}
else {
dst->_endpoint = NULL;
dst->_endpoint = nullptr;
}
if (src->_database != NULL) {
if (src->_database != nullptr) {
dst->_database = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_database);
}
else {
dst->_database = NULL;
dst->_database = nullptr;
}
if (src->_username != NULL) {
if (src->_username != nullptr) {
dst->_username = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_username);
}
else {
dst->_username = NULL;
dst->_username = nullptr;
}
if (src->_password != NULL) {
if (src->_password != nullptr) {
dst->_password = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_password);
}
else {
dst->_password = NULL;
dst->_password = nullptr;
}
dst->_requestTimeout = src->_requestTimeout;
@ -1358,7 +1372,7 @@ int TRI_RemoveConfigurationReplicationApplier (TRI_vocbase_t* vocbase) {
filename = GetConfigurationFilename(vocbase);
if (filename == NULL) {
if (filename == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -1391,7 +1405,7 @@ int TRI_SaveConfigurationReplicationApplier (TRI_vocbase_t* vocbase,
json = JsonConfiguration(config, true);
if (json == NULL) {
if (json == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}

View File

@ -130,7 +130,14 @@ char const* NameFromCid (TRI_replication_dump_t* dump,
if (name != nullptr) {
// insert into cache
dump->_collectionNames.insert(it, std::make_pair(cid, std::string(name)));
try {
dump->_collectionNames.emplace(std::make_pair(cid, std::string(name)));
}
catch (...) {
TRI_FreeString(TRI_UNKNOWN_MEM_ZONE, name);
return nullptr;
}
TRI_FreeString(TRI_UNKNOWN_MEM_ZONE, name);
// and look it up again
@ -267,6 +274,14 @@ static int AppendContext (TRI_replication_dump_t* dump,
APPEND_UINT64(dump->_buffer, databaseId);
APPEND_STRING(dump->_buffer, "\",\"cid\":\"");
APPEND_UINT64(dump->_buffer, collectionId);
// also include collection name
char const* cname = NameFromCid(dump, collectionId);
if (cname != nullptr) {
APPEND_STRING(dump->_buffer, "\",\"cname\":\"");
APPEND_STRING(dump->_buffer, cname);
}
APPEND_STRING(dump->_buffer, "\",");
return TRI_ERROR_NO_ERROR;