1
0
Fork 0

added collection.revision()

This commit is contained in:
Jan Steemann 2014-02-20 17:30:26 +01:00
parent caf7f6779e
commit 0480dbb404
3 changed files with 151 additions and 22 deletions

View File

@ -56,18 +56,15 @@ static T ExtractFigure (TRI_json_t const* json,
TRI_json_t const* g = TRI_LookupArrayJson(json, group);
if (! TRI_IsArrayJson(g)) {
std::cout << "COULD NOT FIND GROUP IN JSON: " << group << "-------------------\n";
return static_cast<T>(0);
}
TRI_json_t const* value = TRI_LookupArrayJson(g, name);
if (! TRI_IsNumberJson(value)) {
std::cout << "COULD NOT FIND ATT IN JSON: " << name << "-------------------\n";
return static_cast<T>(0);
}
return static_cast<T>(value->_value._number);
}
@ -173,6 +170,83 @@ bool shardKeysChanged (std::string const& dbname,
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int revisionOnCoordinator (std::string const& dbname,
std::string const& collname,
TRI_voc_rid_t& rid) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
// First determine the collection ID from the name:
TRI_shared_ptr<CollectionInfo> collinfo = ci->getCollection(dbname, collname);
if (collinfo->empty()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
rid = 0;
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
ClusterCommResult* res;
map<ShardID, ServerID> shards = collinfo->shardIds();
map<ShardID, ServerID>::iterator it;
CoordTransactionID coordTransactionID = TRI_NewTickServer();
for (it = shards.begin(); it != shards.end(); ++it) {
map<string, string>* headers = new map<string, string>;
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_db/" + dbname + "/_api/collection/" +
StringUtils::urlEncode(it->first)+ "/revision",
0, false, headers, NULL, 300.0);
delete res;
}
// Now listen to the results:
int count;
int nrok = 0;
for (count = shards.size(); count > 0; count--) {
res = cc->wait( "", coordTransactionID, 0, "", 0.0);
if (res->status == CL_COMM_RECEIVED) {
if (res->answer_code == triagens::rest::HttpResponse::OK) {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, res->answer->body());
if (JsonHelper::isArray(json)) {
TRI_json_t const* r = TRI_LookupArrayJson(json, "revision");
if (TRI_IsStringJson(r)) {
TRI_voc_rid_t cmp = StringUtils::uint64(r->_value._string.data);
if (cmp > rid) {
// get the maximum value
rid = cmp;
}
}
nrok++;
}
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
}
}
delete res;
}
if (nrok != (int) shards.size()) {
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
// the DBserver could have reported an error.
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////
@ -209,7 +283,7 @@ int figuresOnCoordinator (string const& dbname,
map<string, string>* headers = new map<string, string>;
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_db/"+dbname+"/_api/collection/" +
"/_db/" + dbname + "/_api/collection/" +
StringUtils::urlEncode(it->first)+ "/figures",
0, false, headers, NULL, 300.0);
delete res;
@ -301,7 +375,7 @@ int countOnCoordinator (
map<string, string>* headers = new map<string, string>;
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_db/"+dbname+"/_api/collection/"+
"/_db/" + dbname + "/_api/collection/"+
StringUtils::urlEncode(it->first)+"/count",
0, false, headers, NULL, 300.0);
delete res;

View File

@ -76,6 +76,14 @@ namespace triagens {
struct TRI_json_s const* newJson,
bool isPatch);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int revisionOnCoordinator (std::string const& dbname,
std::string const& collname,
TRI_voc_rid_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -6376,6 +6376,7 @@ static v8::Handle<v8::Value> JS_ExistsVocbaseCol (v8::Arguments const& argv) {
/// @brief fetch the figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
static TRI_doc_collection_info_t* GetFiguresCoordinator (TRI_vocbase_col_t* collection) {
assert(collection != 0);
@ -6392,7 +6393,8 @@ static TRI_doc_collection_info_t* GetFiguresCoordinator (TRI_vocbase_col_t* coll
}
return result;
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the figures for a local collection
@ -7223,6 +7225,52 @@ static v8::Handle<v8::Value> JS_ReplaceVocbaseCol (v8::Arguments const& argv) {
return ReplaceVocbaseCol(true, argv);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the revision for a local collection
////////////////////////////////////////////////////////////////////////////////
static int GetRevision (TRI_vocbase_col_t* collection,
TRI_voc_rid_t& rid) {
assert(collection != 0);
CollectionNameResolver resolver(collection->_vocbase);
ReadTransactionType trx(collection->_vocbase, resolver, collection->_cid);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// READ-LOCK start
trx.lockRead();
TRI_primary_collection_t* primary = collection->_collection;
rid = primary->base._info._revision;
trx.finish(res);
// READ-LOCK end
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
static int GetRevisionCoordinator (TRI_vocbase_col_t* collection,
TRI_voc_rid_t& rid) {
assert(collection != 0);
string const databaseName(collection->_dbName);
string const cid = StringUtils::itoa(collection->_cid);
int res = revisionOnCoordinator(databaseName, cid, rid);
return res;
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the revision id of a collection
///
@ -7249,26 +7297,25 @@ static v8::Handle<v8::Value> JS_RevisionVocbaseCol (v8::Arguments const& argv) {
if (collection == 0) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, collection);
CollectionNameResolver resolver(collection->_vocbase);
ReadTransactionType trx(collection->_vocbase, resolver, collection->_cid);
int res = trx.begin();
TRI_voc_rid_t rid;
int res;
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
res = GetRevisionCoordinator(collection, rid);
}
else {
res = GetRevision(collection, rid);
}
#else
res = GetRevision(collection, rid);
#endif
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot fetch revision");
TRI_V8_EXCEPTION(scope, res);
}
// READ-LOCK start
trx.lockRead();
TRI_primary_collection_t* primary = collection->_collection;
TRI_voc_rid_t rid = primary->base._info._revision;
trx.finish(res);
// READ-LOCK end
return scope.Close(V8RevisionId(rid));
}