
Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Michael Hackstein 2014-01-23 08:14:22 +01:00
commit eeca60897a
12 changed files with 975 additions and 301 deletions

View File

@@ -30,6 +30,7 @@
#include "BasicsC/conversions.h"
#include "BasicsC/json.h"
#include "BasicsC/json-utilities.h"
#include "BasicsC/logging.h"
#include "Basics/JsonHelper.h"
#include "Basics/ReadLocker.h"
@@ -550,6 +551,7 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
WRITE_LOCKER(_lock);
_collections.clear();
_shards.clear();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@@ -583,6 +585,20 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
(*it).second._json = 0;
const CollectionInfo collectionData(json);
vector<string>* shardKeys = new vector<string>;
*shardKeys = collectionData.shardKeys();
_shardKeys.insert(
make_pair<CollectionID, TRI_shared_ptr<vector<string> > >
(collection, TRI_shared_ptr<vector<string> > (shardKeys)));
map<ShardID, ServerID> shardIDs = collectionData.shardIds();
vector<string>* shards = new vector<string>;
map<ShardID, ServerID>::iterator it3;
for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) {
shards->push_back(it3->first);
}
_shards.insert(
make_pair<CollectionID, TRI_shared_ptr<vector<string> > >
(collection,TRI_shared_ptr<vector<string> >(shards)));
// insert the collection into the existing map
@@ -969,22 +985,23 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
res = ac.removeValues("Plan/Databases/" + name, false);
if (! res.successful()) {
if (res.httpCode() == (int) rest::HttpResponse::NOT_FOUND) {
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN,
errorMsg);
}
res = ac.removeValues("Plan/Collections/" + name, true);
if (! res.successful() && res.httpCode() != (int) rest::HttpResponse::NOT_FOUND) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN,
errorMsg);
}
}
// Now wait for it to appear and be complete:
@@ -1203,6 +1220,134 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set collection properties in coordinator
////////////////////////////////////////////////////////////////////////////////
int ClusterInfo::setCollectionPropertiesCoordinator (string const& databaseName,
string const& collectionID,
TRI_col_info_t const* info) {
AgencyComm ac;
AgencyCommResult res;
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN;
}
if (! ac.exists("Plan/Databases/" + databaseName)) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID, false);
if (! res.successful()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
res.parse("", false);
std::map<std::string, AgencyCommResultEntry>::const_iterator it = res._values.begin();
if (it == res._values.end()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_json_t* json = (*it).second._json;
if (json == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_json_t* copy = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, json);
if (copy == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "doCompact");
TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "journalSize");
TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "waitForSync");
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "doCompact", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, info->_doCompact));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "journalSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, info->_maximalSize));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "waitForSync", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, info->_waitForSync));
res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID, copy, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, copy);
if (res.successful()) {
loadPlannedCollections(false);
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_INTERNAL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set collection status in coordinator
////////////////////////////////////////////////////////////////////////////////
int ClusterInfo::setCollectionStatusCoordinator (string const& databaseName,
string const& collectionID,
TRI_vocbase_col_status_e status) {
AgencyComm ac;
AgencyCommResult res;
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN;
}
if (! ac.exists("Plan/Databases/" + databaseName)) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID, false);
if (! res.successful()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
res.parse("", false);
std::map<std::string, AgencyCommResultEntry>::const_iterator it = res._values.begin();
if (it == res._values.end()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_json_t* json = (*it).second._json;
if (json == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_vocbase_col_status_e old = triagens::basics::JsonHelper::getNumericValue<TRI_vocbase_col_status_e>(json, "status", TRI_VOC_COL_STATUS_CORRUPTED);
if (old == status) {
// no status change
return TRI_ERROR_NO_ERROR;
}
TRI_json_t* copy = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, json);
if (copy == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "status");
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "status", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, status));
res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID, copy, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, copy);
if (res.successful()) {
loadPlannedCollections(false);
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_INTERNAL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about servers from the agency
/// Usually one does not have to call this directly.
@@ -1408,6 +1553,69 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
return ServerID("");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief find the shard that is responsible for a document, which is given
/// as a TRI_json_t const*.
///
/// There are two modes: the first assumes that the document is given as a
/// whole (`docComplete` == `true`); in this case, the absence of
/// values for some of the sharding attributes is silently ignored
/// and treated as if these values were `null`. In the second mode
/// (`docComplete` == `false`), a missing sharding attribute leads to an
/// error, which is reported by returning an empty string as the shardID.
////////////////////////////////////////////////////////////////////////////////
ShardID ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
TRI_json_t const* json,
bool docComplete) {
// Note that currently we take the number of shards and the shardKeys
// from Plan, since they are immutable. Later we will have to switch
// this to Current, when we allow to add and remove shards.
if (!_collectionsValid) {
loadPlannedCollections();
}
int tries = 0;
TRI_shared_ptr<vector<string> > shardKeysPtr;
char const** shardKeys = 0;
int nrShardKeys = 0;
TRI_shared_ptr<vector<ShardID> > shards;
while (++tries <= 2) {
{
// Get the sharding keys and the number of shards:
READ_LOCKER(_lock);
map<CollectionID, TRI_shared_ptr<vector<string> > >::iterator it
= _shards.find(collectionID);
if (it != _shards.end()) {
shards = it->second;
map<CollectionID, TRI_shared_ptr<vector<string> > >::iterator it2
= _shardKeys.find(collectionID);
if (it2 != _shardKeys.end()) {
shardKeysPtr = it2->second;
shardKeys = new char const * [shardKeysPtr->size()];
if (shardKeys != 0) {
size_t i;
for (i = 0; i < shardKeysPtr->size(); ++i) {
shardKeys[i] = shardKeysPtr->at(i).c_str();
}
nrShardKeys = (int) shardKeysPtr->size(); // record the count for the hash call below
break; // all OK
}
}
}
}
loadPlannedCollections();
}
if (0 == shardKeys) {
return string("");
}
uint64_t hash = TRI_HashJsonByAttributes(json, shardKeys, nrShardKeys);
delete[] shardKeys;
return shards->at(hash % shards->size());
}
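To make the sharding scheme above concrete: the responsible shard is found by hashing the values of the shard-key attributes with 64-bit FNV and taking the result modulo the number of shards. The following self-contained sketch mirrors that selection logic under simplifying assumptions (documents reduced to a string map; `Document`, `HashByAttributes` and the other helper names are illustrative, not ArangoDB API; the FNV constants are the standard 64-bit ones also used by TRI_FnvHashBlock):

#include <stdint.h>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// FNV-1a over a byte block, seeded with a previous hash value.
static uint64_t FnvHashBlock (uint64_t hash, char const* buffer, size_t length) {
  static const uint64_t MagicPrime = 0x00000100000001b3ULL;
  unsigned char const* p = (unsigned char const*) buffer;
  for (size_t i = 0; i < length; ++i) {
    hash = (hash ^ p[i]) * MagicPrime;
  }
  return hash;
}

// Illustrative stand-in for a parsed document: attribute name -> string value.
typedef std::map<std::string, std::string> Document;

// Hash only the shard-key attributes; a missing attribute hashes like "null",
// matching the docComplete == true behaviour described above.
static uint64_t HashByAttributes (Document const& doc,
                                  std::vector<std::string> const& shardKeys) {
  uint64_t hash = 0xcbf29ce484222325ULL;  // FNV offset basis
  for (size_t i = 0; i < shardKeys.size(); ++i) {
    Document::const_iterator it = doc.find(shardKeys[i]);
    std::string const value = (it == doc.end()) ? "null" : it->second;
    hash = FnvHashBlock(hash, value.c_str(), value.size());
  }
  return hash;
}

int main () {
  std::vector<std::string> shardKeys;
  shardKeys.push_back("_key");

  std::vector<std::string> shards;
  shards.push_back("s1001");
  shards.push_back("s1002");
  shards.push_back("s1003");

  Document doc;
  doc["_key"] = "foo";
  doc["value"] = "42";  // not a shard key: does not influence placement

  uint64_t hash = HashByAttributes(doc, shardKeys);
  std::cout << "responsible shard: " << shards[hash % shards.size()] << "\n";
  return 0;
}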
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"

View File

@@ -837,6 +837,22 @@ namespace triagens {
string& errorMsg,
double timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief set collection properties in coordinator
////////////////////////////////////////////////////////////////////////////////
int setCollectionPropertiesCoordinator (string const& databaseName,
string const& collectionID,
TRI_col_info_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief set collection status in coordinator
////////////////////////////////////////////////////////////////////////////////
int setCollectionStatusCoordinator (string const& databaseName,
string const& collectionID,
TRI_vocbase_col_status_e status);
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about all DBservers from the agency
/// Usually one does not have to call this directly.
@@ -881,6 +897,13 @@ namespace triagens {
ServerID getResponsibleServer (ShardID const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief find the shard that is responsible for a document
////////////////////////////////////////////////////////////////////////////////
ShardID getResponsibleShard (CollectionID const&, TRI_json_t const*,
bool docComplete);
private:
////////////////////////////////////////////////////////////////////////////////
@@ -948,7 +971,14 @@ namespace triagens {
// from Current/DBServers
bool _DBServersValid;
std::map<ShardID, ServerID> _shardIds;
// from Current/Collections/
std::map<CollectionID, TRI_shared_ptr<std::vector<std::string> > >
_shards;
// from Plan/Collections/
// (may later come from Current/Collections/ )
std::map<CollectionID, TRI_shared_ptr<std::vector<std::string> > >
_shardKeys;
// from Plan/Collections/
// -----------------------------------------------------------------------------
// --SECTION-- private static variables

View File

@@ -37,6 +37,12 @@
#include "VocBase/vocbase.h"
#include "Utils/Barrier.h"
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ServerState.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterComm.h"
#endif
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
@@ -317,6 +323,12 @@ bool RestDocumentHandler::createDocument () {
return false;
}
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
return createDocumentCoordinator(collection, waitForSync, json);
}
#endif
if (! checkCreateCollection(collection, getCollectionType())) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return false;
@@ -384,6 +396,23 @@ bool RestDocumentHandler::createDocument () {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
bool RestDocumentHandler::createDocumentCoordinator (char const* collection,
bool waitForSync,
TRI_json_t* json) {
// Find collectionID from collection, which is the name
// ask ClusterInfo for the responsible shard
// send a synchronous request to that shard using ClusterComm
// if not successful prepare error and return false
// prepare successful answer (created or accepted depending on waitForSync)
return true;
}
#endif
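Since the body above is still a stub, here is a hedged sketch of the flow its comments describe, with hypothetical stand-ins where the real implementation would go through ClusterInfo and ClusterComm (none of the helper names below are actual ArangoDB API):

#include <iostream>
#include <string>

// Hypothetical stand-ins for the cluster infrastructure the stub will use.
struct ShardID { std::string name; };

static std::string lookupCollectionID (std::string const& name) {
  return "12345";  // would resolve the collection name via ClusterInfo
}

static ShardID responsibleShard (std::string const& collectionID,
                                 std::string const& documentJson) {
  ShardID s; s.name = "s1001";  // would call ClusterInfo::getResponsibleShard
  return s;
}

static bool forwardToShard (ShardID const& shard, std::string const& body) {
  return true;  // would do a synchronous round-trip via ClusterComm
}

// Mirrors the commented steps in createDocumentCoordinator.
static bool createDocumentCoordinatorSketch (std::string const& collection,
                                             bool waitForSync,
                                             std::string const& json) {
  std::string collectionID = lookupCollectionID(collection);
  ShardID shard = responsibleShard(collectionID, json);
  if (! forwardToShard(shard, json)) {
    return false;  // would prepare an error response here
  }
  // created (201) if waitForSync, otherwise accepted (202)
  std::cout << "answer: " << (waitForSync ? 201 : 202) << "\n";
  return true;
}

int main () {
  return createDocumentCoordinatorSketch("test", false, "{\"a\":1}") ? 0 : 1;
}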
////////////////////////////////////////////////////////////////////////////////
/// @brief reads a single or all documents
///

View File

@@ -113,6 +113,14 @@ namespace triagens {
return TRI_COL_TYPE_DOCUMENT;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
bool createDocumentCoordinator (char const* collection,
bool waitForSync,
TRI_json_t* json);
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document
////////////////////////////////////////////////////////////////////////////////

View File

@@ -2031,13 +2031,13 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
char const* databaseName = GetCurrentDatabaseName();
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_FreeCollectionInfoOptions(&parameter);
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
v8::Handle<v8::Value> result = CreateCollectionCoordinator(argv, collectionType, databaseName, parameter, vocbase);
TRI_FreeCollectionInfoOptions(&parameter);
return scope.Close(result);
@@ -4902,7 +4902,12 @@ static v8::Handle<v8::Value> JS_CountVocbaseCol (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
// TODO: fix this
return scope.Close(v8::Number::New(0));
}
#endif
CollectionNameResolver resolver(collection->_vocbase);
ReadTransactionType trx(collection->_vocbase, resolver, collection->_cid);
@@ -6114,6 +6119,31 @@ static v8::Handle<v8::Value> JS_GetIndexesVocbaseCol (v8::Arguments const& argv)
static v8::Handle<v8::Value> JS_LoadVocbaseCol (v8::Arguments const& argv) {
v8::HandleScope scope;
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
TRI_vocbase_col_t const* collection = TRI_UnwrapClass<TRI_vocbase_col_t>(argv.Holder(), WRP_VOCBASE_COL_TYPE);
if (collection == 0) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
string const databaseName(collection->_dbName);
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
const std::string cid = StringUtils::itoa(collection->_cid);
int res = ClusterInfo::instance()->setCollectionStatusCoordinator(databaseName, cid, TRI_VOC_COL_STATUS_LOADED);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
return scope.Close(v8::Undefined());
}
#endif
v8::Handle<v8::Object> err;
TRI_vocbase_col_t const* collection = UseCollection(argv.Holder(), &err);
@@ -6251,8 +6281,64 @@ static v8::Handle<v8::Value> JS_PropertiesVocbaseCol (v8::Arguments const& argv)
#ifdef TRI_ENABLE_CLUSTER
if (! collection->_isLocal) {
std::string const databaseName = std::string(collection->_dbName);
TRI_col_info_t info = ClusterInfo::instance()->getCollectionProperties(databaseName, StringUtils::itoa(collection->_cid));
if (0 < argv.Length()) {
v8::Handle<v8::Value> par = argv[0];
if (par->IsObject()) {
v8::Handle<v8::Object> po = par->ToObject();
// extract doCompact flag
if (po->Has(v8g->DoCompactKey)) {
info._doCompact = TRI_ObjectToBoolean(po->Get(v8g->DoCompactKey));
}
// extract sync flag
if (po->Has(v8g->WaitForSyncKey)) {
info._waitForSync = TRI_ObjectToBoolean(po->Get(v8g->WaitForSyncKey));
}
// extract the journal size
if (po->Has(v8g->JournalSizeKey)) {
info._maximalSize = (TRI_voc_size_t) TRI_ObjectToUInt64(po->Get(v8g->JournalSizeKey), false);
if (info._maximalSize < TRI_JOURNAL_MINIMAL_SIZE) {
if (info._keyOptions != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions);
}
TRI_V8_EXCEPTION_PARAMETER(scope, "<properties>.journalSize too small");
}
}
if (po->Has(v8g->IsVolatileKey)) {
if (TRI_ObjectToBoolean(po->Get(v8g->IsVolatileKey)) != info._isVolatile) {
if (info._keyOptions != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions);
}
TRI_V8_EXCEPTION_PARAMETER(scope, "isVolatile option cannot be changed at runtime");
}
}
if (info._isVolatile && info._waitForSync) {
if (info._keyOptions != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions);
}
TRI_V8_EXCEPTION_PARAMETER(scope, "volatile collections do not support the waitForSync option");
}
}
int res = ClusterInfo::instance()->setCollectionPropertiesCoordinator(databaseName, StringUtils::itoa(collection->_cid), &info);
if (res != TRI_ERROR_NO_ERROR) {
if (info._keyOptions != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions);
}
TRI_V8_EXCEPTION(scope, res);
}
}
// return the current parameter set
v8::Handle<v8::Object> result = v8::Object::New();
@@ -7015,13 +7101,13 @@ static v8::Handle<v8::Value> JS_StatusVocbaseCol (v8::Arguments const& argv) {
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
std::string const databaseName = std::string(collection->_dbName);
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
CollectionInfo const& ci = ClusterInfo::instance()->getCollection(databaseName, StringUtils::itoa(collection->_cid));
return scope.Close(v8::Number::New((int) ci.status()));
}
// fallthru intentional
@@ -7138,13 +7224,13 @@ static v8::Handle<v8::Value> JS_TypeVocbaseCol (v8::Arguments const& argv) {
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
std::string const databaseName = std::string(collection->_dbName);
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
CollectionInfo const& ci = ClusterInfo::instance()->getCollection(databaseName, StringUtils::itoa(collection->_cid));
return scope.Close(v8::Number::New((int) ci.type()));
}
// fallthru intentional
@@ -7180,12 +7266,29 @@ static v8::Handle<v8::Value> JS_UnloadVocbaseCol (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
int res;
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
string const databaseName(collection->_dbName);
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
res = ClusterInfo::instance()->setCollectionStatusCoordinator(databaseName, StringUtils::itoa(collection->_cid), TRI_VOC_COL_STATUS_UNLOADED);
}
else {
res = TRI_UnloadCollectionVocBase(collection->_vocbase, collection, false);
}
#else
res = TRI_UnloadCollectionVocBase(collection->_vocbase, collection, false);
#endif
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
return scope.Close(v8::Undefined());
@@ -7512,13 +7615,13 @@ static v8::Handle<v8::Value> JS_CollectionsVocbase (v8::Arguments const& argv) {
// if we are a coordinator, we need to fetch the collection info from the agency
if (ServerState::instance()->isCoordinator()) {
char const* databaseName = GetCurrentDatabaseName();
if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
colls = GetCollectionsCluster(vocbase, databaseName);
}
else {
colls = TRI_CollectionsVocBase(vocbase);
@@ -7573,10 +7676,10 @@ static v8::Handle<v8::Value> JS_CompletionsVocbase (v8::Arguments const& argv) {
#ifdef TRI_ENABLE_CLUSTER
TRI_vector_string_t names;
if (ServerState::instance()->isCoordinator()) {
char const* databaseName = GetCurrentDatabaseName();
if (ClusterInfo::instance()->doesDatabaseExist(databaseName)) {
names = GetCollectionNamesCluster(vocbase, databaseName);
}
else {
TRI_InitVectorString(&names, TRI_UNKNOWN_MEM_ZONE);

View File

@@ -17,8 +17,8 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Per
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}' || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}' || exit 1
echo
echo start arangod with:

View File

@@ -1,5 +1,5 @@
/*jslint indent: 2, nomen: true, maxlen: 150, sloppy: true, vars: true, white: true, plusplus: true, stupid: true */
/*global require, ArangoAgency */
////////////////////////////////////////////////////////////////////////////////
/// @brief database management
@@ -30,6 +30,7 @@
var arangodb = require("org/arangodb");
var actions = require("org/arangodb/actions");
var cluster = require("org/arangodb/cluster");
var API = "_api/database";
@@ -179,6 +180,13 @@ function get_api_database (req, res) {
result = arangodb.db._listDatabases(username, password, auth);
}
else if (req.suffix[0] === 'current') {
if (cluster.isCoordinator()) {
// fetch database information from Agency
var values = ArangoAgency.get("Plan/Databases/" + req.originalDatabase, false);
// TODO: check if this information is sufficient
result = values["Plan/Databases/" + req.originalDatabase];
}
else {
// information about the current database
result = {
name: arangodb.db._name(),
@@ -187,6 +195,7 @@
isSystem: arangodb.db._isSystem()
};
}
}
else {
actions.resultBad(req, res, arangodb.ERROR_HTTP_BAD_PARAMETER);
return;

View File

@@ -37,7 +37,26 @@ var ArangoCollection = arangodb.ArangoCollection;
/// @brief get values from Plan or Current by a prefix
////////////////////////////////////////////////////////////////////////////////
function getByPrefix (values, prefix) {
var result = { };
var a;
var n = prefix.length;
for (a in values) {
if (values.hasOwnProperty(a)) {
if (a.substr(0, n) === prefix) {
result[a.substr(n)] = values[a];
}
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get values from Plan or Current by a prefix, grouping the results
/// into a two-level hierarchy by the first two segments of the remaining key
////////////////////////////////////////////////////////////////////////////////
function getByPrefix3d (values, prefix) {
var result = { };
var a;
var n = prefix.length;
@@ -46,16 +65,41 @@ function getByPrefix (values, prefix, multiDimensional) {
if (values.hasOwnProperty(a)) {
if (a.substr(0, n) === prefix) {
var key = a.substr(n);
var parts = key.split('/');
if (parts.length >= 2) {
if (! result.hasOwnProperty(parts[0])) {
result[parts[0]] = { };
}
result[parts[0]][parts[1]] = values[a];
}
}
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get values from Plan or Current by a prefix, grouping the results
/// into a three-level hierarchy by the first three segments of the remaining key
////////////////////////////////////////////////////////////////////////////////
function getByPrefix4d (values, prefix) {
var result = { };
var a;
var n = prefix.length;
for (a in values) {
if (values.hasOwnProperty(a)) {
if (a.substr(0, n) === prefix) {
var key = a.substr(n);
var parts = key.split('/');
if (parts.length >= 3) {
if (! result.hasOwnProperty(parts[0])) {
result[parts[0]] = { };
}
if (! result[parts[0]].hasOwnProperty(parts[1])) {
result[parts[0]][parts[1]] = { };
}
result[parts[0]][parts[1]][parts[2]] = values[a];
}
}
}
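The three getByPrefix* variants differ only in how many path segments of the remaining key they split off. For readers following along in another language, here is a C++ rendering of the getByPrefix3d grouping (the types and names are illustrative, not ArangoDB code):

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Split "a/b/c" into its path segments.
static std::vector<std::string> splitKey (std::string const& key) {
  std::vector<std::string> parts;
  size_t start = 0, pos;
  while ((pos = key.find('/', start)) != std::string::npos) {
    parts.push_back(key.substr(start, pos - start));
    start = pos + 1;
  }
  parts.push_back(key.substr(start));
  return parts;
}

typedef std::map<std::string, std::string> Flat;
typedef std::map<std::string, std::map<std::string, std::string> > TwoLevel;

// Equivalent of getByPrefix3d: group flat agency keys by the first two
// path segments after the prefix.
static TwoLevel getByPrefix3d (Flat const& values, std::string const& prefix) {
  TwoLevel result;
  for (Flat::const_iterator it = values.begin(); it != values.end(); ++it) {
    if (it->first.compare(0, prefix.size(), prefix) == 0) {
      std::vector<std::string> parts = splitKey(it->first.substr(prefix.size()));
      if (parts.size() >= 2) {
        result[parts[0]][parts[1]] = it->second;
      }
    }
  }
  return result;
}

int main () {
  Flat values;
  values["Plan/Databases/mydb/Pavel"] = "{\"name\":\"mydb\"}";
  values["Plan/Databases/mydb/Perry"] = "{\"name\":\"mydb\"}";
  TwoLevel byDb = getByPrefix3d(values, "Plan/Databases/");
  std::cout << byDb["mydb"].size() << " entries for mydb\n";  // prints 2
  return 0;
}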
@@ -190,32 +234,33 @@ function createLocalDatabases (plannedDatabases) {
// check which databases need to be created locally
for (name in plannedDatabases) {
if (plannedDatabases.hasOwnProperty(name)) {
var payload = plannedDatabases[name];
payload.error = false;
payload.errorNum = 0;
payload.errorMessage = "no error";
if (! localDatabases.hasOwnProperty(name)) {
// must create database
// TODO: handle options and user information
console.info("creating local database '%s'", payload.name);
try {
db._createDatabase(payload.name);
}
catch (err) {
payload.error = true;
payload.errorNum = err.errorNum;
payload.errorMessage = err.errorMessage;
}
}
writeLocked({ part: "Current" },
createDatabaseAgency,
[ payload ]);
}
}
}
////////////////////////////////////////////////////////////////////////////////
@@ -275,7 +320,7 @@ function cleanupCurrentDatabases () {
db._useDatabase("_system");
var all = ArangoAgency.get("Current/Databases", true);
var currentDatabases = getByPrefix3d(all, "Current/Databases/");
var localDatabases = getLocalDatabases();
var name;
@@ -286,7 +331,7 @@ function cleanupCurrentDatabases () {
if (currentDatabases[name].hasOwnProperty(ourselves)) {
// we are entered for a database that we don't have locally
console.info("cleaning up entry for unknown database '%s'", name);
writeLocked({ part: "Current" },
dropDatabaseAgency,
@@ -387,6 +432,29 @@ function createLocalCollections (plannedCollections) {
[ database, shard, payload ]);
}
else {
if (localCollections[shard].status !== payload.status) {
console.info("detected status change for local shard '%s/%s'",
database,
shard);
if (payload.status === ArangoCollection.STATUS_UNLOADED) {
console.info("unloading local shard '%s/%s'",
database,
shard);
db._collection(shard).unload();
}
else if (payload.status === ArangoCollection.STATUS_LOADED) {
console.info("loading local shard '%s/%s'",
database,
shard);
db._collection(shard).load();
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, payload ]);
}
// collection exists, now compare collection properties
var properties = { };
var cmp = [ "journalSize", "waitForSync", "doCompact" ], i;
@@ -408,7 +476,6 @@ function createLocalCollections (plannedCollections) {
payload.error = false;
payload.errorNum = 0;
payload.errorMessage = "no error";
}
catch (err3) {
payload.error = true;
@@ -509,8 +576,60 @@ function dropLocalCollections (plannedCollections) {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief clean up what's in Current/Collections for ourselves
////////////////////////////////////////////////////////////////////////////////
function cleanupCurrentCollections (plannedCollections) {
var ourselves = ArangoServerState.id();
var dropCollectionAgency = function (database, collection, shardID) {
try {
ArangoAgency.remove("Current/Collections/" + database + "/" + collection + "/" + shardID);
}
catch (err) {
// ignore errors
}
};
db._useDatabase("_system");
var all = ArangoAgency.get("Current/Collections", true);
var currentCollections = getByPrefix4d(all, "Current/Collections/");
var shardMap = getShardMap(plannedCollections);
var database;
for (database in currentCollections) {
if (currentCollections.hasOwnProperty(database)) {
var collections = currentCollections[database];
var collection;
for (collection in collections) {
if (collections.hasOwnProperty(collection)) {
var shards = collections[collection];
var shard;
for (shard in shards) {
if (shards.hasOwnProperty(shard)) {
if (! shardMap.hasOwnProperty(shard) ||
shardMap[shard] !== ourselves) {
console.info("cleaning up entry for unknown shard '%s' of '%s/%s",
shard,
database,
collection);
writeLocked({ part: "Current" },
dropCollectionAgency,
[ database, collection, shard ]);
}
}
}
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
@@ -518,11 +637,11 @@ function cleanupCurrentCollections () {
////////////////////////////////////////////////////////////////////////////////
function handleCollectionChanges (plan, current) {
var plannedCollections = getByPrefix3d(plan, "Plan/Collections/");
createLocalCollections(plannedCollections);
dropLocalCollections(plannedCollections);
cleanupCurrentCollections(plannedCollections);
}
////////////////////////////////////////////////////////////////////////////////

View File

@@ -47,6 +47,7 @@
var fs = require("fs");
var console = require("console");
var userManager = require("org/arangodb/users");
var cluster = require("org/arangodb/cluster");
var db = internal.db;
// whether or not we are initialising an empty / a new database
@@ -259,11 +260,14 @@
});
}
if (! cluster.isCoordinator()) {
// set up the collection _users
addTask("setupUsers", "setup _users collection", function () {
return createSystemCollection("_users", { waitForSync : true });
});
}
if (! cluster.isCoordinator()) {
// create a unique index on "user" attribute in _users // create a unique index on "user" attribute in _users
addTask("createUsersIndex", addTask("createUsersIndex",
"create index on 'user' attribute in _users collection", "create index on 'user' attribute in _users collection",
@@ -276,8 +280,11 @@
users.ensureUniqueConstraint("user");
return true;
}
);
}
if (! cluster.isCoordinator()) {
// add a default root user with no passwd
addTask("addDefaultUser", "add default root user", function () {
var users = getCollection("_users");
@@ -298,12 +305,16 @@
return true;
});
}
if (! cluster.isCoordinator()) {
// set up the collection _graphs
addTask("setupGraphs", "setup _graphs collection", function () {
return createSystemCollection("_graphs", { waitForSync : true });
});
}
if (! cluster.isCoordinator()) {
// create a unique index on name attribute in _graphs
addTask("createGraphsIndex",
"create index on name attribute in _graphs collection",
@@ -317,8 +328,11 @@
graphs.ensureUniqueConstraint("name");
return true;
}
);
}
if (! cluster.isCoordinator()) {
// make distinction between document and edge collections
addUpgradeTask("addCollectionVersion",
"set new collection type for edge collections and update collection version",
@@ -375,19 +389,26 @@
}
return true;
}
);
}
if (! cluster.isCoordinator()) {
// create the _modules collection
addTask("createModules", "setup _modules collection", function () {
return createSystemCollection("_modules");
});
}
if (! cluster.isCoordinator()) {
// create the _routing collection
addTask("createRouting", "setup _routing collection", function () {
// needs to be big enough for assets
return createSystemCollection("_routing", { journalSize: 32 * 1024 * 1024 });
});
}
if (! cluster.isCoordinator()) {
// create the default route in the _routing collection
addTask("insertRedirectionsAll", "insert default routes for admin interface", function () {
var routing = getCollection("_routing");
@@ -425,7 +446,9 @@
return true;
});
}
if (! cluster.isCoordinator()) {
// update markers in all collection datafiles to key markers
addUpgradeTask("upgradeMarkers12", "update markers in all collection datafiles", function () {
var collections = db._collections();
@@ -462,7 +485,9 @@
return true;
});
}
if (! cluster.isCoordinator()) {
// set up the collection _aal
addTask("setupAal", "setup _aal collection", function () {
return createSystemCollection("_aal", { waitForSync : true });
@@ -482,22 +507,30 @@
return true;
});
}
if (! cluster.isCoordinator()) {
// set up the collection _aqlfunctions
addTask("setupAqlFunctions", "setup _aqlfunctions collection", function () {
return createSystemCollection("_aqlfunctions");
});
}
if (! cluster.isCoordinator()) {
// set up the collection _trx
addTask("setupTrx", "setup _trx collection", function () {
return createSystemCollection("_trx", { waitForSync : false });
});
}
if (! cluster.isCoordinator()) {
// set up the collection _replication
addTask("setupReplication", "setup _replication collection", function () {
return createSystemCollection("_replication", { waitForSync : false });
});
}
if (! cluster.isCoordinator()) {
// migration aql function names
addUpgradeTask("migrateAqlFunctions", "migrate _aqlfunctions name", function () {
var funcs = getCollection('_aqlfunctions');
@@ -531,7 +564,9 @@
return result;
});
}
if (! cluster.isCoordinator()) {
addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () { addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () {
var potentialFoxxes = getCollection('_routing'); var potentialFoxxes = getCollection('_routing');
@@ -544,7 +579,7 @@
return true;
});
}
// loop through all tasks and execute them

View File

@@ -88,6 +88,15 @@ namespace triagens {
};
}
// -----------------------------------------------------------------------------
// --SECTION-- modern C++ stuff
// -----------------------------------------------------------------------------
#include <boost/shared_ptr.hpp>
#define TRI_shared_ptr boost::shared_ptr
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
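A brief note on why shared pointers (rather than plain vectors) back the new _shards and _shardKeys maps in ClusterInfo: a reader can copy the TRI_shared_ptr while holding the read lock and keep using the vector after the lock is released, even if a plan reload replaces the map entry in the meantime. A minimal sketch of that pattern:

#include <boost/shared_ptr.hpp>
#include <iostream>
#include <string>
#include <vector>

#define TRI_shared_ptr boost::shared_ptr

int main () {
  // Writer publishes a vector of shard IDs behind a shared_ptr.
  TRI_shared_ptr<std::vector<std::string> > shards(new std::vector<std::string>);
  shards->push_back("s1001");

  // A reader takes its own reference (in ClusterInfo this happens
  // under READ_LOCKER before the lock is released).
  TRI_shared_ptr<std::vector<std::string> > mine = shards;

  // The writer replaces the published vector, e.g. on a plan reload.
  shards.reset(new std::vector<std::string>);

  // The reader's snapshot is still valid: the old vector is kept
  // alive by the reader's reference and freed when it goes away.
  std::cout << mine->at(0) << "\n";  // prints s1001
  return 0;
}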

View File

@@ -27,6 +27,7 @@
#include "BasicsC/json-utilities.h"
#include "BasicsC/string-buffer.h"
#include "BasicsC/hashes.h"
// -----------------------------------------------------------------------------
// --SECTION-- private functions
@@ -766,6 +767,114 @@ TRI_json_t* TRI_MergeJson (TRI_memory_zone_t* zone,
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a hash value for a JSON document, starting with a given
/// initial hash value. Note that a NULL pointer for json hashes to the
/// same value as a json pointer that points to a JSON value `null`.
////////////////////////////////////////////////////////////////////////////////
static uint64_t HashJsonRecursive (uint64_t hash, TRI_json_t const* object) {
size_t n;
size_t i;
uint64_t tmphash;
TRI_json_t const* subjson;
if (0 == object) {
return TRI_FnvHashBlock(hash, "null", 4); // strlen("null")
}
switch (object->_type) {
case TRI_JSON_UNUSED: {
return hash;
}
case TRI_JSON_NULL: {
return TRI_FnvHashBlock(hash, "null", 4); // strlen("null")
}
case TRI_JSON_BOOLEAN: {
if (object->_value._boolean) {
return TRI_FnvHashBlock(hash, "true", 4); // strlen("true")
}
else {
return TRI_FnvHashBlock(hash, "false", 5); // strlen("true")
}
}
case TRI_JSON_NUMBER: {
return TRI_FnvHashBlock(hash, (char const*) &(object->_value._number),
sizeof(object->_value._number));
}
case TRI_JSON_STRING:
case TRI_JSON_STRING_REFERENCE: {
return TRI_FnvHashBlock(hash, object->_value._string.data,
object->_value._string.length);
}
case TRI_JSON_ARRAY: {
n = object->_value._objects._length;
tmphash = hash;
for (i = 0; i < n; i += 2) {
subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects, i);
assert(TRI_IsStringJson(subjson));
tmphash ^= HashJsonRecursive(hash, subjson);
subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects,
i+1);
tmphash ^= HashJsonRecursive(hash, subjson);
}
return tmphash;
}
case TRI_JSON_LIST: {
n = object->_value._objects._length;
for (i = 0; i < n; ++i) {
subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects, i);
hash = HashJsonRecursive(hash, subjson);
}
return hash;
}
}
return hash; // never reached
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a hash value for a JSON document. Note that a NULL pointer
/// for json hashes to the same value as a json pointer that points to a
/// JSON value `null`.
////////////////////////////////////////////////////////////////////////////////
uint64_t TRI_HashJson (TRI_json_t const* json) {
return HashJsonRecursive(TRI_FnvHashBlockInitial(), json);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a hash value for a JSON document depending on a list
/// of attributes. This is used for sharding to map documents to shards.
///
/// The attributes array `attributes` has to contain exactly `nrAttributes`
/// pointers to zero-terminated strings.
/// Note that all JSON values given for `json` that are not JSON arrays
/// hash to the same value; this value differs from the hash of a JSON
/// array that contains none of the specified attributes.
////////////////////////////////////////////////////////////////////////////////
uint64_t TRI_HashJsonByAttributes (TRI_json_t const* json,
char const *attributes[],
int nrAttributes) {
int i;
TRI_json_t const* subjson;
uint64_t hash;
hash = TRI_FnvHashBlockInitial();
if (TRI_IsArrayJson(json)) {
for (i = 0; i < nrAttributes; i++) {
subjson = TRI_LookupArrayJson(json, attributes[i]);
hash = HashJsonRecursive(hash, subjson);
}
}
return hash;
}
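One subtlety in HashJsonRecursive above is the `tmphash ^=` combination in the TRI_JSON_ARRAY case: each key/value pair is hashed independently from the same seed and folded in with XOR, so an array hashes the same regardless of the order of its members, while TRI_JSON_LIST elements are chained and therefore order-sensitive. A self-contained sketch of that combining strategy, simplified to string keys and values (the FNV constants are the standard 64-bit ones):

#include <stdint.h>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

static uint64_t FnvHashBlock (uint64_t hash, std::string const& s) {
  static const uint64_t MagicPrime = 0x00000100000001b3ULL;
  for (size_t i = 0; i < s.size(); ++i) {
    hash = (hash ^ (unsigned char) s[i]) * MagicPrime;
  }
  return hash;
}

typedef std::vector<std::pair<std::string, std::string> > Object;

// Each pair is hashed from the same seed and XOR-folded, so the
// result does not depend on the order of the pairs.
static uint64_t HashObject (uint64_t hash, Object const& obj) {
  uint64_t tmphash = hash;
  for (size_t i = 0; i < obj.size(); ++i) {
    tmphash ^= FnvHashBlock(hash, obj[i].first);
    tmphash ^= FnvHashBlock(hash, obj[i].second);
  }
  return tmphash;
}

int main () {
  uint64_t seed = 0xcbf29ce484222325ULL;  // FNV offset basis

  Object a, b;
  a.push_back(std::make_pair("a", "1"));
  a.push_back(std::make_pair("b", "2"));
  b.push_back(std::make_pair("b", "2"));
  b.push_back(std::make_pair("a", "1"));

  // Same members, different order: identical hashes.
  std::cout << (HashObject(seed, a) == HashObject(seed, b)) << "\n";  // 1
  return 0;
}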
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@@ -137,6 +137,21 @@ TRI_json_t* TRI_MergeJson (TRI_memory_zone_t*,
const TRI_json_t* const,
const bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a hash value for a JSON document.
////////////////////////////////////////////////////////////////////////////////
uint64_t TRI_HashJson (TRI_json_t const* json);
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a hash value for a JSON document depending on a list
/// of attributes.
////////////////////////////////////////////////////////////////////////////////
uint64_t TRI_HashJsonByAttributes (TRI_json_t const* json,
char const *attributes[],
int nrAttributes);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////