1
0
Fork 0

make collection revision id a real revision id

This commit is contained in:
Jan Steemann 2013-08-02 09:37:46 +02:00
parent 83b279116e
commit d8690e5fa6
12 changed files with 199 additions and 48 deletions

View File

@ -2323,7 +2323,7 @@ static v8::Handle<v8::Value> JS_ChecksumCollection (v8::Arguments const& argv) {
trx.lockRead();
// get last tick
const string tick = StringUtils::itoa(primary->base._info._tick);
const string rid = StringUtils::itoa(primary->base._info._revision);
if (withData) {
TRI_InitStringBuffer(&helper._buffer, TRI_CORE_MEM_ZONE);
@ -2344,7 +2344,7 @@ static v8::Handle<v8::Value> JS_ChecksumCollection (v8::Arguments const& argv) {
v8::Handle<v8::Object> result = v8::Object::New();
result->Set(v8::String::New("checksum"), v8::Number::New(helper._checksum));
result->Set(v8::String::New("revision"), v8::String::New(tick.c_str(), tick.size()));
result->Set(v8::String::New("revision"), v8::String::New(rid.c_str(), rid.size()));
return scope.Close(result);
}

View File

@ -6572,12 +6572,12 @@ static v8::Handle<v8::Value> JS_RevisionVocbaseCol (v8::Arguments const& argv) {
// READ-LOCK start
trx.lockRead();
TRI_primary_collection_t* primary = collection->_collection;
TRI_voc_tick_t tick = primary->base._info._tick;
TRI_voc_rid_t rid = primary->base._info._revision;
trx.finish(res);
// READ-LOCK end
return scope.Close(V8RevisionId(tick));
return scope.Close(V8RevisionId(rid));
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -897,7 +897,7 @@ void TRI_InitCollectionInfo (TRI_vocbase_t* vocbase,
parameter->_version = TRI_COL_VERSION;
parameter->_type = type;
parameter->_cid = 0;
parameter->_tick = 0;
parameter->_revision = 0;
parameter->_deleted = false;
parameter->_doCompact = true;
@ -929,7 +929,7 @@ void TRI_CopyCollectionInfo (TRI_col_info_t* dst, const TRI_col_info_t* const sr
dst->_version = src->_version;
dst->_type = src->_type;
dst->_cid = src->_cid;
dst->_tick = src->_tick;
dst->_revision = src->_revision;
dst->_deleted = src->_deleted;
dst->_doCompact = src->_doCompact;

View File

@ -255,7 +255,7 @@ typedef struct TRI_col_info_s {
TRI_col_version_t _version; // collection version
TRI_col_type_e _type; // collection type
TRI_voc_cid_t _cid; // collection identifier
TRI_voc_tick_t _tick; // last tick
TRI_voc_rid_t _revision; // last revision id written
TRI_voc_size_t _maximalSize; // maximal size of memory mapped file
char _name[TRI_COL_PATH_LENGTH]; // name of the collection

View File

@ -112,12 +112,13 @@ static inline bool IsVisible (TRI_doc_mptr_t const* header) {
/// @brief set the collection tick with the marker's tick value
////////////////////////////////////////////////////////////////////////////////
static inline void SetTick (TRI_document_collection_t* document,
TRI_voc_tick_t tick) {
static inline void SetRevision (TRI_document_collection_t* document,
TRI_voc_rid_t rid,
bool force) {
TRI_col_info_t* info = &document->base.base._info;
if (tick > info->_tick) {
info->_tick = tick;
if (force || rid > info->_revision) {
info->_revision = rid;
}
}
@ -864,7 +865,7 @@ static int WriteInsertMarker (TRI_document_collection_t* document,
assert(totalSize == marker->base._size);
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync);
if (res == TRI_ERROR_NO_ERROR) {
// writing the element into the datafile has succeeded
TRI_doc_datafile_info_t* dfi;
@ -883,6 +884,9 @@ static int WriteInsertMarker (TRI_document_collection_t* document,
dfi->_numberAlive++;
dfi->_sizeAlive += (int64_t) marker->base._size;
}
// update tick
SetRevision(document, marker->_rid, false);
}
return res;
@ -1033,6 +1037,9 @@ static int WriteRemoveMarker (TRI_document_collection_t* document,
if (dfi != NULL) {
dfi->_numberDeletion++;
}
// update tick
SetRevision(document, marker->_rid, false);
}
return res;
@ -1206,6 +1213,9 @@ static int WriteUpdateMarker (TRI_document_collection_t* document,
dfi->_numberDead++;
dfi->_sizeDead += size;
}
// update tick
SetRevision(document, marker->_rid, false);
}
return res;
@ -1970,15 +1980,15 @@ static int OpenIteratorApplyInsert (open_iterator_state_t* state,
marker = operation->_marker;
d = (TRI_doc_document_key_marker_t const*) marker;
if (state->_fid != operation->_fid) {
// update the state
state->_fid = operation->_fid;
state->_dfi = TRI_FindDatafileInfoPrimaryCollection(primary, operation->_fid, true);
}
SetTick(document, marker->_tick);
SetRevision(document, d->_rid, false);
#ifdef TRI_ENABLE_LOGGER
if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
LOG_TRACE("document: fid %llu, key %s, rid %llu, _offsetJson %lu, _offsetKey %lu",
@ -2119,7 +2129,7 @@ static int OpenIteratorApplyRemove (open_iterator_state_t* state,
marker = operation->_marker;
d = (TRI_doc_deletion_key_marker_t const*) marker;
SetTick(document, marker->_tick);
SetRevision(document, d->_rid, false);
if (state->_fid != operation->_fid) {
// update the state
@ -3220,9 +3230,6 @@ int TRI_WriteMarkerDocumentCollection (TRI_document_collection_t* document,
if (forceSync) {
WaitSync(document, journal, ((char const*) *result) + totalSize);
}
// update tick
SetTick(document, (*result)->_tick);
}
else {
// writing the element into the datafile has failed
@ -6340,12 +6347,13 @@ int TRI_DeleteDocumentDocumentCollection (TRI_transaction_collection_t* trxColle
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the collection tick
/// @brief set the collection revision
////////////////////////////////////////////////////////////////////////////////
void TRI_SetTickDocumentCollection (TRI_document_collection_t* document,
TRI_voc_tick_t tick) {
SetTick(document, tick);
void TRI_SetRevisionDocumentCollection (TRI_document_collection_t* document,
TRI_voc_rid_t rid,
bool force) {
SetRevision(document, rid, force);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -720,11 +720,12 @@ int TRI_DeleteDocumentDocumentCollection (struct TRI_transaction_collection_s*,
TRI_doc_mptr_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief set the collection tick
/// @brief set the collection revision
////////////////////////////////////////////////////////////////////////////////
void TRI_SetTickDocumentCollection (TRI_document_collection_t*,
TRI_voc_tick_t);
void TRI_SetRevisionDocumentCollection (TRI_document_collection_t*,
TRI_voc_rid_t,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief rotate the current journal of the collection

View File

@ -962,7 +962,7 @@ static int StartReplicationLogger (TRI_replication_logger_t* logger) {
CreateCap(logger);
}
logger->_state._lastLogTick = ((TRI_collection_t*) collection->_collection)->_info._tick;
logger->_state._lastLogTick = ((TRI_collection_t*) collection->_collection)->_info._revision;
logger->_state._active = true;
LOG_INFO("started replication logger for database '%s', last tick: %llu",
@ -1054,7 +1054,7 @@ static int GetStateInactive (TRI_vocbase_t* vocbase,
primary = (TRI_primary_collection_t*) col->_collection;
dst->_active = false;
dst->_lastLogTick = primary->base._info._tick;
dst->_lastLogTick = primary->base._info._revision;
TRI_ReleaseCollectionVocBase(vocbase, col);

View File

@ -496,6 +496,7 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection,
size_t totalSize) {
TRI_transaction_operation_t trxOperation;
TRI_document_collection_t* document;
TRI_voc_rid_t rid;
int res;
TRI_DEBUG_INTENTIONAL_FAIL_IF("AddCollectionOperation-OOM") {
@ -530,16 +531,24 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection,
}
document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
rid = 0;
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
rid = ((TRI_doc_document_key_marker_t*) marker)->_rid;
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
document->_headers->moveBack(document->_headers, newHeader, oldData);
rid = ((TRI_doc_document_key_marker_t*) marker)->_rid;
}
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
document->_headers->unlink(document->_headers, oldHeader);
rid = ((TRI_doc_deletion_key_marker_t*) marker)->_rid;
}
// update collection tick
TRI_SetTickDocumentCollection(document, marker->_tick);
// update collection revision
if (rid > 0) {
TRI_SetRevisionDocumentCollection(document, rid, false);
}
return TRI_ERROR_NO_ERROR;
}
@ -1053,7 +1062,7 @@ static int RollbackCollectionOperations (TRI_transaction_collection_t* trxCollec
}
TRI_SetTickDocumentCollection(document, trxCollection->_originalTick);
TRI_SetRevisionDocumentCollection(document, trxCollection->_originalRevision, true);
return res;
}
@ -1207,7 +1216,7 @@ static TRI_transaction_collection_t* CreateCollection (TRI_transaction_t* trx,
trxCollection->_globalInstance = globalInstance;
#endif
trxCollection->_operations = NULL;
trxCollection->_originalTick = 0;
trxCollection->_originalRevision = 0;
trxCollection->_locked = false;
trxCollection->_compactionLocked = false;
trxCollection->_waitForSync = false;
@ -1899,8 +1908,8 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
trx = trxCollection->_transaction;
primary = trxCollection->_collection->_collection;
if (trxCollection->_originalTick == 0) {
trxCollection->_originalTick = primary->base._info._tick;
if (trxCollection->_originalRevision == 0) {
trxCollection->_originalRevision = primary->base._info._revision;
}
if (trx->_hints & ((TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION)) {

View File

@ -339,7 +339,7 @@ typedef struct TRI_transaction_collection_s {
TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance
#endif
TRI_vector_t* _operations; // buffered CRUD operations
TRI_voc_tick_t _originalTick; // collection revision at trx start
TRI_voc_rid_t _originalRevision; // collection revision at trx start
bool _locked; // collection lock flag
bool _compactionLocked; // was the compaction lock grabbed for the collection?
bool _waitForSync; // whether or not the collection has waitForSync

View File

@ -2308,7 +2308,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
if (tmpFile != NULL) {
if (TRI_ExistsFile(tmpFile)) {
TRI_UnlinkFile(tmpFile);
LOG_WARNING("removing dangling temporary file '%s'", tmpFile);
LOG_DEBUG("removing dangling temporary file '%s'", tmpFile);
}
TRI_FreeString(TRI_CORE_MEM_ZONE, tmpFile);
}

View File

@ -28,10 +28,27 @@
var jsunity = require("jsunity");
var arangodb = require("org/arangodb");
var internal = require("internal");
var ArangoCollection = arangodb.ArangoCollection;
var db = arangodb.db;
var ERRORS = arangodb.errors;
var compareStringIds = function (l, r) {
if (l.length != r.length) {
return l.length - r.length < 0 ? -1 : 1;
}
// length is equal
for (i = 0; i < l.length; ++i) {
if (l[i] != r[i]) {
return l[i] < r[i] ? -1 : 1;
}
}
return 0;
};
// -----------------------------------------------------------------------------
// --SECTION-- collection methods
@ -971,7 +988,7 @@ function CollectionSuite () {
/// @brief test revision id
////////////////////////////////////////////////////////////////////////////////
testRevision : function () {
testRevision1 : function () {
var cn = "example";
db._drop(cn);
@ -979,32 +996,115 @@ function CollectionSuite () {
var r1 = c1.revision();
assertTypeOf("string", r1);
assertTrue(r1 !== "");
assertTrue(r1.match(/^[0-9]+$/));
c1.save({ a : 1 });
var r2 = c1.revision();
assertTrue(r1 != r2);
assertTypeOf("string", r2);
assertTrue(r2 !== "");
assertTrue(r2.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r2, r1));
c1.save({ a : 2 });
var r3 = c1.revision();
assertTrue(r1 != r3);
assertTrue(r2 != r3);
assertTypeOf("string", r3);
assertTrue(r3 !== "");
assertTrue(r3.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r3, r2));
// unload
c1.unload();
c1 = null;
internal.wait(5);
c1 = db._collection(cn);
var r4 = c1.revision();
assertTypeOf("string", r4);
assertEqual(r3, r4);
assertEqual(0, compareStringIds(r3, r4));
db._drop(cn);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test revision id
////////////////////////////////////////////////////////////////////////////////
testRevision2 : function () {
var cn = "example";
db._drop(cn);
var c1 = db._create(cn);
var r1 = c1.revision();
assertTrue(r1.match(/^[0-9]+$/));
c1.save({ _key: "abc" });
var r2 = c1.revision();
assertTrue(r2.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r2, r1));
c1.save({ _key: "123" });
c1.save({ _key: "456" });
c1.save({ _key: "789" });
var r3 = c1.revision();
assertTrue(r3.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r3, r2));
c1.remove("123");
var r4 = c1.revision();
assertTrue(r4.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r4, r3));
c1.truncate();
var r5 = c1.revision();
assertTrue(r5.match(/^[0-9]+$/));
assertEqual(1, compareStringIds(r5, r4));
// unload
c1.unload();
c1 = null;
internal.wait(5);
// compare rev
c1 = db._collection(cn);
var r6 = c1.revision();
assertTrue(r6.match(/^[0-9]+$/));
assertEqual(0, compareStringIds(r6, r5));
for (var i = 0; i < 10; ++i) {
c1.save({ _key: "test" + i });
assertTrue(c1.revision().match(/^[0-9]+$/));
assertEqual(1, compareStringIds(c1.revision(), r6));
r6 = c1.revision();
}
// unload
c1.unload();
c1 = null;
internal.wait(5);
// compare rev
c1 = db._collection(cn);
var r7 = c1.revision();
assertTrue(r7.match(/^[0-9]+$/));
assertEqual(0, compareStringIds(r7, r6));
c1.truncate();
var r8 = c1.revision();
// unload
c1.unload();
c1 = null;
internal.wait(5);
// compare rev
c1 = db._collection(cn);
var r9 = c1.revision();
assertTrue(r9.match(/^[0-9]+$/));
assertEqual(0, compareStringIds(r9, r8));
db._drop(cn);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test first
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,22 @@ var arangodb = require("org/arangodb");
var helper = require("org/arangodb/aql-helper");
var db = arangodb.db;
var compareStringIds = function (l, r) {
if (l.length != r.length) {
return l.length - r.length < 0 ? -1 : 1;
}
// length is equal
for (i = 0; i < l.length; ++i) {
if (l[i] != r[i]) {
return l[i] < r[i] ? -1 : 1;
}
}
return 0;
};
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
@ -1955,14 +1971,31 @@ function transactionRollbackSuite () {
write: [ cn1 ]
},
action : function () {
var _r = r;
c1.save({ _key: "tom" });
assertTrue(c1.revision() > r);
assertEqual(1, compareStringIds(c1.revision(), _r));
_r = c1.revision();
c1.save({ _key: "tim" });
assertTrue(c1.revision() > r);
assertEqual(1, compareStringIds(c1.revision(), _r));
_r = c1.revision();
c1.save({ _key: "tam" });
assertTrue(c1.revision() > r);
assertEqual(1, compareStringIds(c1.revision(), _r));
_r = c1.revision();
c1.remove("tam");
assertEqual(1, compareStringIds(c1.revision(), _r));
_r = c1.revision();
c1.update("tom", { "bom" : true });
assertEqual(1, compareStringIds(c1.revision(), _r));
_r = c1.revision();
c1.remove("tom");
assertEqual(1, compareStringIds(c1.revision(), _r));
//_r = c1.revision();
throw "rollback";
}