1
0
Fork 0

pass transaction collection into CRUD operations

This commit is contained in:
Jan Steemann 2013-03-28 13:26:49 +01:00
parent 014d8e4a6d
commit 6b3b181930
16 changed files with 518 additions and 482 deletions

View File

@ -99,7 +99,8 @@ static int InsertCapConstraint (TRI_index_t* idx,
/// @brief post processing of insert
////////////////////////////////////////////////////////////////////////////////
static int PostInsertCapConstraint (TRI_index_t* idx,
static int PostInsertCapConstraint (TRI_transaction_collection_t* trxCollection,
TRI_index_t* idx,
TRI_doc_mptr_t const* doc) {
TRI_cap_constraint_t* cap;
TRI_primary_collection_t* primary;
@ -108,7 +109,6 @@ static int PostInsertCapConstraint (TRI_index_t* idx,
primary = idx->_collection;
while (cap->_size < cap->_array._array._nrUsed) {
TRI_doc_operation_context_t rollbackContext;
TRI_doc_mptr_t* oldest;
int res;
@ -122,8 +122,7 @@ static int PostInsertCapConstraint (TRI_index_t* idx,
LOG_DEBUG("removing document '%s' because of cap constraint", (char*) oldest->_key);
TRI_InitContextPrimaryCollection(&rollbackContext, primary);
res = TRI_DeleteDocumentDocumentCollection(&rollbackContext, NULL, oldest);
res = TRI_DeleteDocumentDocumentCollection(trxCollection, NULL, oldest);
if (res != TRI_ERROR_NO_ERROR) {
LOG_WARNING("cannot cap collection: %s", TRI_last_error());

View File

@ -405,6 +405,7 @@ bool RestDocumentHandler::createDocument () {
TRI_doc_mptr_t document;
res = trx.createDocument(&document, json, waitForSync, true);
const bool wasSynchronous = trx.synchronous();
res = trx.finish(res);
// .............................................................................
@ -419,7 +420,7 @@ bool RestDocumentHandler::createDocument () {
assert(document._key != 0);
// generate result
if (trx.synchronous()) {
if (wasSynchronous) {
generateCreated(cid, document._key, document._rid);
}
else {
@ -1175,6 +1176,8 @@ bool RestDocumentHandler::modifyDocument (bool isPatch) {
res = trx.updateDocument(key, &document, json, policy, waitForSync, revision, &rid, true);
}
const bool wasSynchronous = trx.synchronous();
res = trx.finish(res);
// .............................................................................
@ -1188,7 +1191,7 @@ bool RestDocumentHandler::modifyDocument (bool isPatch) {
}
// generate result
if (trx.synchronous()) {
if (wasSynchronous) {
generateCreated(cid, (TRI_voc_key_t) key.c_str(), document._rid);
}
else {
@ -1313,6 +1316,8 @@ bool RestDocumentHandler::deleteDocument () {
TRI_voc_rid_t rid = 0;
res = trx.deleteDocument(key, policy, waitForSync, revision, &rid, false);
const bool wasSynchronous = trx.synchronous();
if (res == TRI_ERROR_NO_ERROR) {
res = trx.commit();
}
@ -1329,7 +1334,7 @@ bool RestDocumentHandler::deleteDocument () {
return false;
}
if (trx.synchronous()) {
if (wasSynchronous) {
generateDeleted(cid, (TRI_voc_key_t) key.c_str(), rid);
}
else {

View File

@ -212,6 +212,7 @@ bool RestEdgeHandler::createDocument () {
// will hold the result
TRI_doc_mptr_t document;
res = trx.createEdge(&document, json, waitForSync, &edge, true);
const bool wasSynchronous = trx.synchronous();
res = trx.finish(res);
// .............................................................................
@ -226,7 +227,7 @@ bool RestEdgeHandler::createDocument () {
assert(document._key != 0);
// generate result
if (trx.synchronous()) {
if (wasSynchronous) {
generateCreated(cid, document._key, document._rid);
}
else {

View File

@ -28,6 +28,8 @@
#ifndef TRIAGENS_UTILS_DOCUMENT_HELPER_H
#define TRIAGENS_UTILS_DOCUMENT_HELPER_H 1
#include "BasicsC/json.h"
namespace triagens {
namespace arango {
@ -89,6 +91,36 @@ namespace triagens {
return collectionName + TRI_DOCUMENT_HANDLE_SEPARATOR_STR + key;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the "_key" attribute from a JSON object
////////////////////////////////////////////////////////////////////////////////
static int getKey (TRI_json_t const* json, TRI_voc_key_t* key) {
*key = 0;
// check type of json
if (json == 0 || json->_type != TRI_JSON_ARRAY) {
return TRI_ERROR_NO_ERROR;
}
// check _key is there
const TRI_json_t* k = TRI_LookupArrayJson((TRI_json_t*) json, "_key");
if (k == 0) {
return TRI_ERROR_NO_ERROR;
}
if (k->_type != TRI_JSON_STRING) {
// _key is there but not a string
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;
}
// _key is there and a string
*key = k->_value._string.data;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -103,19 +103,31 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief get the underlying transaction collection
////////////////////////////////////////////////////////////////////////////////
inline TRI_transaction_collection_t* trxCollection () {
TRI_ASSERT_DEBUG(_cid > 0);
TRI_transaction_collection_t* trxCollection = TRI_GetCollectionTransaction(this->_trx, this->_cid, _accessType);
TRI_ASSERT_DEBUG(trxCollection != 0);
return trxCollection;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the underlying primary collection
////////////////////////////////////////////////////////////////////////////////
inline TRI_primary_collection_t* primaryCollection () {
TRI_ASSERT_DEBUG(_cid > 0);
TRI_transaction_collection_t* trxCollection = this->trxCollection();
TRI_vocbase_col_t* collection = TRI_GetCollectionTransaction(this->getTrx(), this->_cid, _accessType);
TRI_ASSERT_DEBUG(collection != 0);
TRI_ASSERT_DEBUG(collection->_collection != 0);
TRI_ASSERT_DEBUG(trxCollection != 0);
TRI_ASSERT_DEBUG(trxCollection->_collection != 0);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != 0);
return collection->_collection;
return trxCollection->_collection->_collection;
}
////////////////////////////////////////////////////////////////////////////////
@ -126,22 +138,12 @@ namespace triagens {
return _cid;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection's barrier list
////////////////////////////////////////////////////////////////////////////////
inline TRI_barrier_list_t* barrierList () {
return &primaryCollection()->_barrierList;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief explicitly lock the underlying collection for read access
////////////////////////////////////////////////////////////////////////////////
int lockRead () {
TRI_primary_collection_t* primary = primaryCollection();
return this->lockExplicit(primary, TRI_TRANSACTION_READ);
return this->lock(this->trxCollection(), TRI_TRANSACTION_READ);
}
////////////////////////////////////////////////////////////////////////////////
@ -149,29 +151,23 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int lockWrite () {
TRI_primary_collection_t* primary = primaryCollection();
return this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
return this->lock(this->trxCollection(), TRI_TRANSACTION_WRITE);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief read any (random) document within a transaction
////////////////////////////////////////////////////////////////////////////////
int read (TRI_doc_mptr_t* mptr, TRI_barrier_t** barrier) {
TRI_primary_collection_t* const primary = primaryCollection();
return this->readCollectionAny(primary, mptr, barrier);
inline int read (TRI_doc_mptr_t* mptr, TRI_barrier_t** barrier) {
return this->readAny(this->trxCollection(), mptr, barrier);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief read a document within a transaction
////////////////////////////////////////////////////////////////////////////////
int read (TRI_doc_mptr_t* mptr, const string& key, const bool lock) {
TRI_primary_collection_t* const primary = primaryCollection();
return this->readCollectionDocument(primary, mptr, key, lock);
inline int read (TRI_doc_mptr_t* mptr, const string& key, const bool lock) {
return this->readSingle(this->trxCollection(), mptr, key, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -179,9 +175,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int read (vector<string>& ids) {
TRI_primary_collection_t* primary = primaryCollection();
return this->readCollectionDocuments(primary, ids);
return this->readAll(this->trxCollection(), ids);
}
////////////////////////////////////////////////////////////////////////////////
@ -193,9 +187,8 @@ namespace triagens {
TRI_voc_ssize_t skip,
TRI_voc_size_t limit,
uint32_t* total) {
TRI_primary_collection_t* primary = primaryCollection();
return this->readCollectionPointers(primary, docs, barrier, skip, limit, total);
return this->readSlice(this->trxCollection(), docs, barrier, skip, limit, total);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -72,8 +72,7 @@ namespace triagens {
const triagens::arango::CollectionNameResolver& resolver,
const TRI_transaction_cid_t cid) :
SingleCollectionTransaction<T>(vocbase, resolver, cid, TRI_TRANSACTION_WRITE),
_numWrites(0),
_synchronous(false) {
_numWrites(0) {
if (N == 1) {
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION);
@ -88,8 +87,7 @@ namespace triagens {
const triagens::arango::CollectionNameResolver& resolver,
const string& name) :
SingleCollectionTransaction<T>(vocbase, resolver, resolver.getCollectionId(name), TRI_TRANSACTION_WRITE),
_numWrites(0),
_synchronous(false) {
_numWrites(0) {
if (N == 1) {
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION);
@ -119,11 +117,11 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief return whether the last write in a transaction was synchronous
/// @brief return whether a write in the transaction was synchronous
////////////////////////////////////////////////////////////////////////////////
inline bool synchronous () const {
return _synchronous;
return TRI_WasSynchronousCollectionTransaction(this->_trx, this->cid());
}
////////////////////////////////////////////////////////////////////////////////
@ -131,9 +129,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int lockWrite () {
TRI_primary_collection_t* primary = this->primaryCollection();
return this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
return this->lock(this->trxCollection(), TRI_TRANSACTION_WRITE);
}
////////////////////////////////////////////////////////////////////////////////
@ -148,10 +144,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->createCollectionDocument(primary, TRI_DOC_MARKER_KEY_DOCUMENT, mptr, json, 0, forceSync, lock);
return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_DOCUMENT, mptr, json, 0, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -167,10 +160,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->createCollectionDocument(primary, TRI_DOC_MARKER_KEY_EDGE, mptr, json, data, forceSync, lock);
return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_EDGE, mptr, json, data, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -186,10 +176,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->createCollectionShaped(primary, TRI_DOC_MARKER_KEY_DOCUMENT, key, mptr, shaped, 0, forceSync, lock);
return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_DOCUMENT, key, mptr, shaped, 0, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -206,10 +193,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->createCollectionShaped(primary, TRI_DOC_MARKER_KEY_EDGE, key, mptr, shaped, data, forceSync, lock);
return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_EDGE, key, mptr, shaped, data, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -229,10 +213,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->updateCollectionDocument(primary, key, mptr, json, policy, expectedRevision, actualRevision, forceSync, lock);
return this->update(this->trxCollection(), key, mptr, json, policy, expectedRevision, actualRevision, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -252,10 +233,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->updateCollectionShaped(primary, key, mptr, shaped, policy, expectedRevision, actualRevision, forceSync, lock);
return this->update(this->trxCollection(), key, mptr, shaped, policy, expectedRevision, actualRevision, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -272,10 +250,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->deleteCollectionDocument(primary, key, policy, expectedRevision, actualRevision, forceSync, lock);
return this->remove(this->trxCollection(), key, policy, expectedRevision, actualRevision, forceSync, lock);
}
////////////////////////////////////////////////////////////////////////////////
@ -287,10 +262,7 @@ namespace triagens {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
TRI_primary_collection_t* primary = this->primaryCollection();
_synchronous = forceSync || primary->base._info._waitForSync;
return this->truncateCollection(primary, forceSync);
return this->removeAll(this->trxCollection(), forceSync);
}
////////////////////////////////////////////////////////////////////////////////
@ -316,12 +288,6 @@ namespace triagens {
uint64_t _numWrites;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether of not the last write action was executed synchronously
////////////////////////////////////////////////////////////////////////////////
bool _synchronous;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -33,12 +33,14 @@
#include "VocBase/transaction.h"
#include "VocBase/update-policy.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-shaper.h"
#include "BasicsC/random.h"
#include "BasicsC/strings.h"
#include "Logger/Logger.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/DocumentHelper.h"
namespace triagens {
namespace arango {
@ -86,11 +88,11 @@ namespace triagens {
Transaction (TRI_vocbase_t* const vocbase,
const triagens::arango::CollectionNameResolver& resolver) :
T(),
_trx(0),
_setupState(TRI_ERROR_NO_ERROR),
_nestingLevel(0),
_readOnly(true),
_hints(0),
_trx(0),
_vocbase(vocbase),
_resolver(resolver) {
@ -250,6 +252,32 @@ namespace triagens {
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief return a collection's primary collection
////////////////////////////////////////////////////////////////////////////////
TRI_primary_collection_t* primaryCollection (TRI_transaction_collection_t const* trxCollection) const {
TRI_ASSERT_DEBUG(_trx != 0);
TRI_ASSERT_DEBUG(getStatus() == TRI_TRANSACTION_RUNNING);
TRI_ASSERT_DEBUG(trxCollection->_collection != 0);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != 0);
return trxCollection->_collection->_collection;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return a collection's shaper
////////////////////////////////////////////////////////////////////////////////
TRI_shaper_t* shaper (TRI_transaction_collection_t const* trxCollection) const {
TRI_ASSERT_DEBUG(_trx != 0);
TRI_ASSERT_DEBUG(getStatus() == TRI_TRANSACTION_RUNNING);
TRI_ASSERT_DEBUG(trxCollection->_collection != 0);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != 0);
return trxCollection->_collection->_collection->_shaper;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a collection by id
////////////////////////////////////////////////////////////////////////////////
@ -311,13 +339,14 @@ namespace triagens {
/// @brief read- or write-lock a collection
////////////////////////////////////////////////////////////////////////////////
int lockExplicit (TRI_primary_collection_t* const primary,
const TRI_transaction_type_e type) {
int lock (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e type) {
if (_trx == 0 || getStatus() != TRI_TRANSACTION_RUNNING) {
return TRI_ERROR_TRANSACTION_INVALID_STATE;
}
int res = TRI_LockCollectionTransaction(_trx, (TRI_transaction_cid_t) primary->base._info._cid, type, _nestingLevel);
int res = TRI_LockCollectionTransaction(trxCollection, type, _nestingLevel);
return res;
}
@ -326,13 +355,14 @@ namespace triagens {
/// @brief read- or write-unlock a collection
////////////////////////////////////////////////////////////////////////////////
int unlockExplicit (TRI_primary_collection_t* const primary,
const TRI_transaction_type_e type) {
int unlock (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e type) {
if (_trx == 0 || getStatus() != TRI_TRANSACTION_RUNNING) {
return TRI_ERROR_TRANSACTION_INVALID_STATE;
}
int res = TRI_UnlockCollectionTransaction(_trx, (TRI_transaction_cid_t) primary->base._info._cid, type, _nestingLevel);
int res = TRI_UnlockCollectionTransaction(trxCollection, type, _nestingLevel);
return res;
}
@ -341,13 +371,13 @@ namespace triagens {
/// @brief read- or write-unlock a collection
////////////////////////////////////////////////////////////////////////////////
bool isLocked (TRI_primary_collection_t* const primary,
bool isLocked (TRI_transaction_collection_t* const trxCollection,
const TRI_transaction_type_e type) {
if (_trx == 0 || getStatus() != TRI_TRANSACTION_RUNNING) {
return false;
}
const bool locked = TRI_IsLockedCollectionTransaction(_trx, (TRI_transaction_cid_t) primary->base._info._cid, type, _nestingLevel);
const bool locked = TRI_IsLockedCollectionTransaction(trxCollection, type, _nestingLevel);
return locked;
}
@ -356,16 +386,19 @@ namespace triagens {
/// @brief read any (random) document
////////////////////////////////////////////////////////////////////////////////
int readCollectionAny (TRI_primary_collection_t* const primary,
TRI_doc_mptr_t* mptr,
TRI_barrier_t** barrier) {
int readAny (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* mptr,
TRI_barrier_t** barrier) {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
*barrier = TRI_CreateBarrierElement(&primary->_barrierList);
if (*barrier == 0) {
return TRI_ERROR_OUT_OF_MEMORY;
}
// READ-LOCK START
this->lockExplicit(primary, TRI_TRANSACTION_READ);
this->lock(trxCollection, TRI_TRANSACTION_READ);
if (primary->_primaryIndex._nrUsed == 0) {
TRI_FreeBarrier(*barrier);
@ -387,7 +420,7 @@ namespace triagens {
*mptr = *((TRI_doc_mptr_t*) beg[pos]);
}
this->unlockExplicit(primary, TRI_TRANSACTION_READ);
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
return TRI_ERROR_NO_ERROR;
@ -397,17 +430,17 @@ namespace triagens {
/// @brief read a single document, identified by key
////////////////////////////////////////////////////////////////////////////////
int readCollectionDocument (TRI_primary_collection_t* const primary,
TRI_doc_mptr_t* mptr,
const string& key,
const bool lock) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
int readSingle (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* mptr,
const string& key,
const bool lock) {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->read(&context,
int res = primary->read(trxCollection,
(TRI_voc_key_t) key.c_str(),
mptr,
lock && ! isLocked(primary, TRI_TRANSACTION_READ));
lock && ! isLocked(trxCollection, TRI_TRANSACTION_READ));
return res;
}
@ -416,13 +449,13 @@ namespace triagens {
/// @brief read all documents
////////////////////////////////////////////////////////////////////////////////
int readCollectionDocuments (TRI_primary_collection_t* const primary,
vector<string>& ids) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
int readAll (TRI_transaction_collection_t* trxCollection,
vector<string>& ids) {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
// READ-LOCK START
this->lockExplicit(primary, TRI_TRANSACTION_READ);
this->lock(trxCollection, TRI_TRANSACTION_READ);
if (primary->_primaryIndex._nrUsed > 0) {
void** ptr = primary->_primaryIndex._table;
@ -439,7 +472,7 @@ namespace triagens {
}
}
this->unlockExplicit(primary, TRI_TRANSACTION_READ);
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
return TRI_ERROR_NO_ERROR;
@ -449,14 +482,14 @@ namespace triagens {
/// @brief read all master pointers, using skip and limit
////////////////////////////////////////////////////////////////////////////////
int readCollectionPointers (TRI_primary_collection_t* const primary,
vector<TRI_doc_mptr_t>& docs,
TRI_barrier_t** barrier,
TRI_voc_ssize_t skip,
TRI_voc_size_t limit,
uint32_t* total) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
int readSlice (TRI_transaction_collection_t* trxCollection,
vector<TRI_doc_mptr_t>& docs,
TRI_barrier_t** barrier,
TRI_voc_ssize_t skip,
TRI_voc_size_t limit,
uint32_t* total) {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
if (limit == 0) {
// nothing to do
@ -464,12 +497,12 @@ namespace triagens {
}
// READ-LOCK START
this->lockExplicit(primary, TRI_TRANSACTION_READ);
this->lock(trxCollection, TRI_TRANSACTION_READ);
if (primary->_primaryIndex._nrUsed == 0) {
// nothing to do
this->unlockExplicit(primary, TRI_TRANSACTION_READ);
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
return TRI_ERROR_NO_ERROR;
}
@ -534,7 +567,7 @@ namespace triagens {
}
}
this->unlockExplicit(primary, TRI_TRANSACTION_READ);
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
if (count == 0) {
@ -549,36 +582,37 @@ namespace triagens {
/// @brief create a single document, using JSON
////////////////////////////////////////////////////////////////////////////////
int createCollectionDocument (TRI_primary_collection_t* const primary,
const TRI_df_marker_type_e markerType,
TRI_doc_mptr_t* mptr,
TRI_json_t const* json,
void const* data,
const bool forceSync,
const bool lock) {
int create (TRI_transaction_collection_t* trxCollection,
const TRI_df_marker_type_e markerType,
TRI_doc_mptr_t* mptr,
TRI_json_t const* json,
void const* data,
const bool forceSync,
const bool lock) {
TRI_voc_key_t key = 0;
int res = this->getKey(json, &key);
int res = DocumentHelper::getKey(json, &key);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(primary->_shaper, json);
TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(shaper(trxCollection), json);
if (shaped == 0) {
return TRI_ERROR_ARANGO_SHAPER_FAILED;
}
res = createCollectionShaped(primary,
markerType,
key,
mptr,
shaped,
data,
forceSync,
lock);
res = create(trxCollection,
markerType,
key,
mptr,
shaped,
data,
forceSync,
lock);
TRI_FreeShapedJson(primary->_shaper, shaped);
TRI_FreeShapedJson(shaper(trxCollection), shaped);
return res;
}
@ -587,25 +621,24 @@ namespace triagens {
/// @brief create a single document, using shaped json
////////////////////////////////////////////////////////////////////////////////
inline int createCollectionShaped (TRI_primary_collection_t* const primary,
const TRI_df_marker_type_e markerType,
TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t const* shaped,
void const* data,
const bool forceSync,
const bool lock) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
int res = primary->insert(&context,
inline int create (TRI_transaction_collection_t* trxCollection,
const TRI_df_marker_type_e markerType,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t const* shaped,
void const* data,
const bool forceSync,
const bool lock) {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->insert(trxCollection,
key,
mptr,
markerType,
shaped,
data,
(lock && ! isLocked(primary, TRI_TRANSACTION_WRITE)),
(lock && ! isLocked(trxCollection, TRI_TRANSACTION_WRITE)),
forceSync);
return res;
@ -615,33 +648,33 @@ namespace triagens {
/// @brief update a single document, using JSON
////////////////////////////////////////////////////////////////////////////////
int updateCollectionDocument (TRI_primary_collection_t* const primary,
const string& key,
TRI_doc_mptr_t* mptr,
TRI_json_t* const json,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
int update (TRI_transaction_collection_t* trxCollection,
const string& key,
TRI_doc_mptr_t* mptr,
TRI_json_t* const json,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(primary->_shaper, json);
TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(shaper(trxCollection), json);
if (shaped == 0) {
return TRI_ERROR_ARANGO_SHAPER_FAILED;
}
int res = updateCollectionShaped(primary,
key,
mptr,
shaped,
policy,
expectedRevision,
actualRevision,
forceSync,
lock);
int res = update(trxCollection,
key,
mptr,
shaped,
policy,
expectedRevision,
actualRevision,
forceSync,
lock);
TRI_FreeShapedJson(primary->_shaper, shaped);
TRI_FreeShapedJson(shaper(trxCollection), shaped);
return res;
}
@ -650,28 +683,27 @@ namespace triagens {
/// @brief update a single document, using shaped json
////////////////////////////////////////////////////////////////////////////////
inline int updateCollectionShaped (TRI_primary_collection_t* const primary,
const string& key,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t* const shaped,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
inline int update (TRI_transaction_collection_t* const trxCollection,
const string& key,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t* const shaped,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
TRI_doc_update_policy_t updatePolicy;
TRI_InitUpdatePolicy(&updatePolicy, policy, expectedRevision, actualRevision);
int res = primary->update(&context,
(TRI_voc_key_t) key.c_str(),
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->update(trxCollection,
(const TRI_voc_key_t) key.c_str(),
mptr,
shaped,
&updatePolicy,
(lock && ! isLocked(primary, TRI_TRANSACTION_WRITE)),
(lock && ! isLocked(trxCollection, TRI_TRANSACTION_WRITE)),
forceSync);
return res;
@ -681,24 +713,24 @@ namespace triagens {
/// @brief delete a single document
////////////////////////////////////////////////////////////////////////////////
int deleteCollectionDocument (TRI_primary_collection_t* const primary,
const string& key,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
int remove (TRI_transaction_collection_t* trxCollection,
const string& key,
const TRI_doc_update_policy_e policy,
const TRI_voc_rid_t expectedRevision,
TRI_voc_rid_t* actualRevision,
const bool forceSync,
const bool lock) {
TRI_doc_update_policy_t updatePolicy;
TRI_InitUpdatePolicy(&updatePolicy, policy, expectedRevision, actualRevision);
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->destroy(&context,
(TRI_voc_key_t) key.c_str(),
&updatePolicy,
(lock && ! isLocked(primary, TRI_TRANSACTION_WRITE)),
forceSync);
int res = primary->remove(trxCollection,
(TRI_voc_key_t) key.c_str(),
&updatePolicy,
(lock && ! isLocked(trxCollection, TRI_TRANSACTION_WRITE)),
forceSync);
return res;
}
@ -707,38 +739,36 @@ namespace triagens {
/// @brief truncate a collection
////////////////////////////////////////////////////////////////////////////////
int truncateCollection (TRI_primary_collection_t* const primary,
const bool forceSync) {
int removeAll (TRI_transaction_collection_t* const trxCollection,
const bool forceSync) {
vector<string> ids;
int res = readAll(trxCollection, ids);
int res = readCollectionDocuments(primary, ids);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
TRI_doc_update_policy_t updatePolicy;
TRI_InitUpdatePolicy(&updatePolicy, TRI_DOC_UPDATE_LAST_WRITE, 0, NULL);
size_t n = ids.size();
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
res = TRI_ERROR_NO_ERROR;
// WRITE-LOCK START
this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
// TODO: fix locks
this->lock(trxCollection, TRI_TRANSACTION_WRITE);
for (size_t i = 0; i < n; ++i) {
const string& id = ids[i];
res = primary->destroy(&context,
(TRI_voc_key_t) id.c_str(),
&updatePolicy,
false,
forceSync);
res = primary->remove(trxCollection,
(TRI_voc_key_t) id.c_str(),
&updatePolicy,
false,
forceSync);
if (res != TRI_ERROR_NO_ERROR) {
// halt on first error
@ -746,21 +776,12 @@ namespace triagens {
}
}
this->unlockExplicit(primary, TRI_TRANSACTION_WRITE);
this->unlock(trxCollection, TRI_TRANSACTION_WRITE);
// WRITE-LOCK END
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the pointer to the C transaction struct
/// DEPRECATED
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* getTrx () {
return this->_trx;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -909,36 +930,6 @@ namespace triagens {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the "_key" attribute from a JSON object
/// TODO: move out of this class
////////////////////////////////////////////////////////////////////////////////
int getKey (TRI_json_t const* json, TRI_voc_key_t* key) {
*key = 0;
// check type of json
if (json == 0 || json->_type != TRI_JSON_ARRAY) {
return TRI_ERROR_NO_ERROR;
}
// check _key is there
const TRI_json_t* k = TRI_LookupArrayJson((TRI_json_t*) json, "_key");
if (k == 0) {
return TRI_ERROR_NO_ERROR;
}
if (k->_type != TRI_JSON_STRING) {
// _key is there but not a string
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;
}
// _key is there and a string
*key = k->_value._string.data;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
@ -955,12 +946,6 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the C transaction struct
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* _trx;
////////////////////////////////////////////////////////////////////////////////
/// @brief error that occurred on transaction initialisation (before begin())
////////////////////////////////////////////////////////////////////////////////
@ -1000,6 +985,12 @@ namespace triagens {
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief the C transaction struct
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* _trx;
////////////////////////////////////////////////////////////////////////////////
/// @brief the vocbase
////////////////////////////////////////////////////////////////////////////////

View File

@ -1819,7 +1819,6 @@ static v8::Handle<v8::Value> JS_ByExampleQuery (v8::Arguments const& argv) {
// setup result
v8::Handle<v8::Object> result = v8::Object::New();
v8::Handle<v8::Array> documents = v8::Array::New();
result->Set(v8::String::New("documents"), documents);
@ -1827,14 +1826,10 @@ static v8::Handle<v8::Value> JS_ByExampleQuery (v8::Arguments const& argv) {
// inside a read transaction
// .............................................................................
TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary);
trx.lockRead();
// find documents by example
TRI_vector_t filtered = TRI_SelectByExample(&context, n, pids, values);
TRI_vector_t filtered = TRI_SelectByExample(trx.trxCollection(), n, pids, values);
// convert to list of shaped jsons
size_t total = filtered._length;

View File

@ -809,7 +809,7 @@ static v8::Handle<v8::Value> DocumentVocbaseCol (const bool useCollection,
return scope.Close(v8::ThrowException(TRI_CreateErrorObject(res, "cannot fetch document", true)));
}
TRI_barrier_t* barrier = TRI_CreateBarrierElement(trx.barrierList());
TRI_barrier_t* barrier = TRI_CreateBarrierElement(&(trx.primaryCollection()->_barrierList));
if (barrier == 0) {
return scope.Close(v8::ThrowException(TRI_CreateErrorObject(TRI_ERROR_OUT_OF_MEMORY)));
}

View File

@ -250,8 +250,7 @@ static size_t LengthDataMasterPointer (const TRI_doc_mptr_t* const mptr) {
/// @brief checks whether a header is visible in the current context
////////////////////////////////////////////////////////////////////////////////
static bool IsVisible (TRI_doc_mptr_t const* header,
const TRI_doc_operation_context_t* const context) {
static bool IsVisible (TRI_doc_mptr_t const* header) {
return (header != NULL && header->_validTo == 0);
}
@ -704,6 +703,26 @@ static int WriteElement (TRI_document_collection_t* document,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not to write synchronised
////////////////////////////////////////////////////////////////////////////////
static bool RealSync (TRI_transaction_collection_t* trxCollection,
const bool syncRequested) {
if (syncRequested) {
// explicit request to sync
return true;
}
if (trxCollection->_waitForSync) {
// collection has waitForSync property
return true;
}
// default is false
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -764,18 +783,20 @@ static int WriteMarker (TRI_document_collection_t* document,
/// the collection must be held
////////////////////////////////////////////////////////////////////////////////
static int InsertDocument (TRI_document_collection_t* document,
static int InsertDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_document_key_marker_t* marker,
TRI_doc_mptr_t* header,
TRI_voc_size_t totalSize,
bool forceSync,
TRI_doc_mptr_t* mptr) {
TRI_document_collection_t* document;
TRI_df_marker_t* result;
TRI_voc_fid_t fid;
int res;
bool writeSynced;
TRI_ASSERT_DEBUG(header != NULL);
document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
// .............................................................................
// insert into indexes
@ -803,14 +824,17 @@ static int InsertDocument (TRI_document_collection_t* document,
// .............................................................................
// insert into datafile
// .............................................................................
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, forceSync);
writeSynced = RealSync(trxCollection, forceSync);
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, writeSynced);
if (res == TRI_ERROR_NO_ERROR) {
// writing the element into the datafile has succeeded
TRI_doc_datafile_info_t* dfi;
size_t i, n;
TRI_IncreaseWritesCollectionTransaction(trxCollection, writeSynced);
// update the header with the correct fid and the positions in the datafile
header->_fid = fid;
header->_data = ((char*) result);
@ -838,7 +862,7 @@ static int InsertDocument (TRI_document_collection_t* document,
idx = document->_allIndexes._buffer[i];
if (idx->postInsert != NULL) {
idx->postInsert(idx, header);
idx->postInsert(trxCollection, idx, header);
}
}
}
@ -861,7 +885,7 @@ static int InsertDocument (TRI_document_collection_t* document,
/// the collection must be held
////////////////////////////////////////////////////////////////////////////////
static int DeleteDocument (TRI_doc_operation_context_t* context,
static int RemoveDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_update_policy_t const* policy,
TRI_doc_deletion_key_marker_t* marker,
const TRI_voc_size_t totalSize,
@ -872,14 +896,15 @@ static int DeleteDocument (TRI_doc_operation_context_t* context,
TRI_df_marker_t* result;
TRI_voc_fid_t fid;
int res;
bool writeSynced;
primary = context->_collection;
primary = trxCollection->_collection->_collection;
document = (TRI_document_collection_t*) primary;
// get the existing header pointer
header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, ((char*) marker) + marker->_offsetKey);
if (! IsVisible(header, context)) {
if (! IsVisible(header)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
@ -922,12 +947,15 @@ static int DeleteDocument (TRI_doc_operation_context_t* context,
// find and select a journal
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, forceSync);
writeSynced = RealSync(trxCollection, forceSync);
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, writeSynced);
if (res == TRI_ERROR_NO_ERROR) {
TRI_doc_datafile_info_t* dfi;
size_t i, n;
TRI_IncreaseWritesCollectionTransaction(trxCollection, writeSynced);
// update the datafile info
dfi = TRI_FindDatafileInfoPrimaryCollection(primary, header->_fid);
@ -961,7 +989,7 @@ static int DeleteDocument (TRI_doc_operation_context_t* context,
idx = document->_allIndexes._buffer[i];
if (idx->postRemove != NULL) {
idx->postRemove(idx, header);
idx->postRemove(trxCollection, idx, header);
}
}
@ -1002,18 +1030,21 @@ static void UpdateHeader (TRI_voc_fid_t fid,
/// @brief updates an existing document splitted into marker and body to file
////////////////////////////////////////////////////////////////////////////////
static int UpdateDocument (TRI_document_collection_t* document,
static int UpdateDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* oldHeader,
TRI_doc_document_key_marker_t* marker,
const TRI_voc_size_t totalSize,
const bool forceSync,
TRI_doc_mptr_t* mptr) {
TRI_document_collection_t* document;
TRI_doc_mptr_t* newHeader;
TRI_doc_mptr_t oldData;
TRI_df_marker_t* result;
TRI_voc_fid_t fid;
int res;
bool writeSynced;
document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
// save the old data, remember
oldData = *oldHeader;
@ -1066,12 +1097,15 @@ static int UpdateDocument (TRI_document_collection_t* document,
// write datafile
// .............................................................................
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, forceSync);
writeSynced = RealSync(trxCollection, forceSync);
res = WriteMarker(document, &marker->base, totalSize, &fid, &result, writeSynced);
if (res == TRI_ERROR_NO_ERROR) {
TRI_doc_datafile_info_t* dfi;
TRI_primary_collection_t* primary;
size_t i, n;
TRI_IncreaseWritesCollectionTransaction(trxCollection, writeSynced);
// update the header with the correct fid and the positions in the datafile
newHeader->_fid = fid;
@ -1114,7 +1148,7 @@ static int UpdateDocument (TRI_document_collection_t* document,
idx = document->_allIndexes._buffer[i];
if (idx->postUpdate != NULL) {
idx->postUpdate(idx, newHeader, oldHeader, &oldData);
idx->postUpdate(trxCollection, idx, newHeader, oldHeader, &oldData);
}
}
@ -1372,7 +1406,7 @@ static int NotifyTransaction (TRI_primary_collection_t* primary,
/// note: key might be NULL. in this case, a key is auto-generated
////////////////////////////////////////////////////////////////////////////////
static int InsertShapedJson (TRI_doc_operation_context_t* context,
static int InsertShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
TRI_df_marker_type_e markerType,
@ -1389,7 +1423,7 @@ static int InsertShapedJson (TRI_doc_operation_context_t* context,
TRI_voc_size_t totalSize;
int res;
primary = context->_collection;
primary = trxCollection->_collection->_collection;
TRI_ASSERT_DEBUG(primary != NULL);
TRI_ASSERT_DEBUG(shaped != NULL);
@ -1418,7 +1452,7 @@ static int InsertShapedJson (TRI_doc_operation_context_t* context,
res = CreateHeader(document, marker, 0, &header);
if (res == TRI_ERROR_NO_ERROR) {
res = InsertDocument(document, marker, header, totalSize, forceSync, mptr);
res = InsertDocument(trxCollection, marker, header, totalSize, forceSync, mptr);
}
if (res != TRI_ERROR_NO_ERROR) {
@ -1452,14 +1486,14 @@ static int InsertShapedJson (TRI_doc_operation_context_t* context,
/// @brief reads an element from the document collection
////////////////////////////////////////////////////////////////////////////////
static int ReadShapedJson (TRI_doc_operation_context_t* context,
static int ReadShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
const bool lock) {
TRI_primary_collection_t* primary;
TRI_doc_mptr_t const* header;
primary = context->_collection;
primary = trxCollection->_collection->_collection;
if (lock) {
primary->beginRead(primary);
@ -1467,7 +1501,7 @@ static int ReadShapedJson (TRI_doc_operation_context_t* context,
header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key);
if (! IsVisible(header, context)) {
if (! IsVisible(header)) {
if (lock) {
primary->endRead(primary);
}
@ -1499,7 +1533,7 @@ static int ReadShapedJson (TRI_doc_operation_context_t* context,
/// @brief updates a document in the collection from shaped json
////////////////////////////////////////////////////////////////////////////////
static int UpdateShapedJson (TRI_doc_operation_context_t* context,
static int UpdateShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr,
TRI_shaped_json_t const* shaped,
@ -1512,6 +1546,8 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
TRI_voc_size_t totalSize;
int res;
primary = trxCollection->_collection->_collection;
TRI_ASSERT_DEBUG(mptr != NULL);
// initialise the result
@ -1520,8 +1556,6 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
marker = NULL;
primary = context->_collection;
if (lock) {
primary->beginWrite(primary);
}
@ -1531,7 +1565,7 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
// get the header pointer of the previous revision
header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key);
if (IsVisible(header, context)) {
if (IsVisible(header)) {
// document found, now check revision
res = TRI_CheckUpdatePolicy(policy, header->_rid);
}
@ -1549,7 +1583,7 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
res = CloneDocumentMarker(original, &marker, &totalSize, original->_type, shaped);
if (res == TRI_ERROR_NO_ERROR) {
res = UpdateDocument((TRI_document_collection_t*) primary, header, marker, totalSize, forceSync, mptr);
res = UpdateDocument(trxCollection, header, marker, totalSize, forceSync, mptr);
}
}
@ -1577,7 +1611,7 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
/// @brief deletes a json document given the identifier
////////////////////////////////////////////////////////////////////////////////
static int DeleteShapedJson (TRI_doc_operation_context_t* context,
static int RemoveShapedJson (TRI_transaction_collection_t* trxCollection,
const TRI_voc_key_t key,
TRI_doc_update_policy_t const* policy,
const bool lock,
@ -1587,6 +1621,7 @@ static int DeleteShapedJson (TRI_doc_operation_context_t* context,
TRI_voc_size_t totalSize;
int res;
primary = trxCollection->_collection->_collection;
TRI_ASSERT_DEBUG(key != NULL);
marker = NULL;
@ -1598,13 +1633,11 @@ static int DeleteShapedJson (TRI_doc_operation_context_t* context,
TRI_ASSERT_DEBUG(marker != NULL);
primary = context->_collection;
if (lock) {
primary->beginWrite(primary);
}
res = DeleteDocument(context, policy, marker, totalSize, forceSync);
res = RemoveDocument(trxCollection, policy, marker, totalSize, forceSync);
if (res == TRI_ERROR_NO_ERROR) {
DecreaseDocumentCount(primary);
@ -2217,7 +2250,7 @@ static bool InitDocumentCollection (TRI_document_collection_t* collection,
collection->base.insert = InsertShapedJson;
collection->base.read = ReadShapedJson;
collection->base.update = UpdateShapedJson;
collection->base.destroy = DeleteShapedJson;
collection->base.remove = RemoveShapedJson;
collection->cleanupIndexes = CleanupIndexes;
@ -2701,7 +2734,6 @@ static TRI_json_t* ExtractFieldValues (TRI_json_t* jsonIndex, size_t* fieldCount
static int FillIndex (TRI_document_collection_t* document, TRI_index_t* idx) {
TRI_doc_mptr_t const* mptr;
TRI_primary_collection_t* primary;
TRI_doc_operation_context_t context;
uint64_t inserted;
void** end;
void** ptr;
@ -2709,8 +2741,6 @@ static int FillIndex (TRI_document_collection_t* document, TRI_index_t* idx) {
primary = &document->base;
TRI_InitContextPrimaryCollection(&context, primary);
// update index
ptr = primary->_primaryIndex._table;
end = ptr + primary->_primaryIndex._nrAlloc;
@ -2718,7 +2748,7 @@ static int FillIndex (TRI_document_collection_t* document, TRI_index_t* idx) {
inserted = 0;
for (; ptr < end; ++ptr) {
if (IsVisible(*ptr, &context)) {
if (IsVisible(*ptr)) {
mptr = *ptr;
res = idx->insert(idx, mptr, false);
@ -5163,7 +5193,7 @@ static bool IsExampleMatch (TRI_shaper_t* shaper,
/// @brief executes a select-by-example query
////////////////////////////////////////////////////////////////////////////////
TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t* context,
TRI_vector_t TRI_SelectByExample (TRI_transaction_collection_t* trxCollection,
size_t length,
TRI_shape_pid_t* pids,
TRI_shaped_json_t** values) {
@ -5173,7 +5203,7 @@ TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t* context,
TRI_doc_mptr_t const** end;
TRI_vector_t filtered;
primary = context->_collection;
primary = trxCollection->_collection->_collection;
// use filtered to hold copies of the master pointer
TRI_InitVector(&filtered, TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_doc_mptr_t));
@ -5185,7 +5215,7 @@ TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t* context,
end = (TRI_doc_mptr_t const**) ptr + primary->_primaryIndex._nrAlloc;
for (; ptr < end; ++ptr) {
if (IsVisible(*ptr, context)) {
if (IsVisible(*ptr)) {
if (IsExampleMatch(shaper, *ptr, length, pids, values)) {
TRI_PushBackVector(&filtered, *ptr);
}
@ -5199,11 +5229,11 @@ TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t* context,
/// @brief deletes a documet given by a master pointer
////////////////////////////////////////////////////////////////////////////////
int TRI_DeleteDocumentDocumentCollection (TRI_doc_operation_context_t* context,
int TRI_DeleteDocumentDocumentCollection (TRI_transaction_collection_t* trxCollection,
TRI_doc_update_policy_t const* policy,
TRI_doc_mptr_t* doc) {
// no extra locking here as the collection is already locked
return DeleteShapedJson(context, doc->_key, policy, false, false);
return RemoveShapedJson(trxCollection, doc->_key, policy, false, false);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -620,7 +620,7 @@ struct TRI_index_s* TRI_EnsurePriorityQueueIndexDocumentCollection (TRI_document
/// @brief executes a select-by-example query
////////////////////////////////////////////////////////////////////////////////
TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t*,
TRI_vector_t TRI_SelectByExample (struct TRI_transaction_collection_s*,
size_t,
TRI_shape_pid_t*,
TRI_shaped_json_t**);
@ -629,7 +629,7 @@ TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t*,
/// @brief deletes a documet given by a master pointer
////////////////////////////////////////////////////////////////////////////////
int TRI_DeleteDocumentDocumentCollection (TRI_doc_operation_context_t*,
int TRI_DeleteDocumentDocumentCollection (struct TRI_transaction_collection_s*,
struct TRI_doc_update_policy_s const*,
TRI_doc_mptr_t*);

View File

@ -134,9 +134,9 @@ typedef struct TRI_index_s {
int (*remove) (struct TRI_index_s*, struct TRI_doc_mptr_s const*, const bool);
// NULL by default. will only be called if non-NULL
int (*postInsert) (struct TRI_index_s*, struct TRI_doc_mptr_s const*);
int (*postRemove) (struct TRI_index_s*, struct TRI_doc_mptr_s const*);
int (*postUpdate) (struct TRI_index_s*, struct TRI_doc_mptr_s const*, struct TRI_doc_mptr_s const*, struct TRI_doc_mptr_s const*);
int (*postInsert) (struct TRI_transaction_collection_s*, struct TRI_index_s*, struct TRI_doc_mptr_s const*);
int (*postRemove) (struct TRI_transaction_collection_s*, struct TRI_index_s*, struct TRI_doc_mptr_s const*);
int (*postUpdate) (struct TRI_transaction_collection_s*, struct TRI_index_s*, struct TRI_doc_mptr_s const*, struct TRI_doc_mptr_s const*, struct TRI_doc_mptr_s const*);
// a garbage collection function for the index
int (*cleanup) (struct TRI_index_s*);

View File

@ -568,15 +568,6 @@ bool TRI_CloseCompactorPrimaryCollection (TRI_primary_collection_t* primary,
return CloseJournalPrimaryCollection(primary, position, true);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a new operation context
////////////////////////////////////////////////////////////////////////////////
void TRI_InitContextPrimaryCollection (TRI_doc_operation_context_t* const context,
TRI_primary_collection_t* const primary) {
context->_collection = primary;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -101,17 +101,6 @@ struct TRI_primary_collection_s;
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief typedef for arbitrary collection operation parameters
///
/// the context struct needs to be passed as a parameter for CRUD operations
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_doc_operation_context_s {
struct TRI_primary_collection_s* _collection; // collection to be used
}
TRI_doc_operation_context_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief master pointer
////////////////////////////////////////////////////////////////////////////////
@ -296,12 +285,12 @@ typedef struct TRI_primary_collection_s {
int (*notifyTransaction) (struct TRI_primary_collection_s*, TRI_transaction_status_e);
int (*insert) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_df_marker_type_e, TRI_shaped_json_t const*, void const*, const bool, const bool);
int (*insert) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_df_marker_type_e, TRI_shaped_json_t const*, void const*, const bool, const bool);
int (*read) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, const bool);
int (*read) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, const bool);
int (*update) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*destroy) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*update) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*remove) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, struct TRI_doc_update_policy_s const*, const bool, const bool);
TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection);
TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection);
@ -466,13 +455,6 @@ TRI_datafile_t* TRI_CreateCompactorPrimaryCollection (TRI_primary_collection_t*)
bool TRI_CloseCompactorPrimaryCollection (TRI_primary_collection_t*,
size_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a new operation context
////////////////////////////////////////////////////////////////////////////////
void TRI_InitContextPrimaryCollection (TRI_doc_operation_context_t* const,
TRI_primary_collection_t* const);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -395,74 +395,77 @@ static TRI_transaction_collection_t* FindCollection (const TRI_transaction_t* co
/// @brief create a transaction collection container
////////////////////////////////////////////////////////////////////////////////
static TRI_transaction_collection_t* CreateCollection (TRI_transaction_context_t* context,
static TRI_transaction_collection_t* CreateCollection (TRI_transaction_t* trx,
const TRI_transaction_cid_t cid,
const TRI_transaction_type_e accessType,
const int nestingLevel) {
TRI_transaction_collection_t* collection;
TRI_transaction_collection_t* trxCollection;
TRI_transaction_collection_global_t* globalInstance;
globalInstance = GetGlobalInstance(context, cid, true);
globalInstance = GetGlobalInstance(trx->_context, cid, true);
if (globalInstance == NULL) {
return NULL;
}
collection = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_t), false);
trxCollection = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_t), false);
if (collection == NULL) {
if (trxCollection == NULL) {
// OOM
return NULL;
}
// initialise collection properties
collection->_cid = cid;
collection->_accessType = accessType;
collection->_nestingLevel = nestingLevel;
collection->_collection = NULL;
collection->_locked = false;
collection->_globalInstance = globalInstance;
trxCollection->_transaction = trx;
trxCollection->_cid = cid;
trxCollection->_accessType = accessType;
trxCollection->_nestingLevel = nestingLevel;
trxCollection->_collection = NULL;
trxCollection->_globalInstance = globalInstance;
trxCollection->_numWrites = 0;
trxCollection->_locked = false;
trxCollection->_waitForSync = false;
trxCollection->_hadSyncedWrites = false;
return collection;
return trxCollection;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free a transaction collection container
////////////////////////////////////////////////////////////////////////////////
static void FreeCollection (TRI_transaction_collection_t* collection) {
TRI_ASSERT_DEBUG(collection != NULL);
static void FreeCollection (TRI_transaction_collection_t* trxCollection) {
TRI_ASSERT_DEBUG(trxCollection != NULL);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, trxCollection);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lock a collection
////////////////////////////////////////////////////////////////////////////////
static int LockCollection (TRI_transaction_t* trx,
TRI_transaction_collection_t* collection,
static int LockCollection (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e type,
const int nestingLevel) {
TRI_primary_collection_t* primary;
TRI_ASSERT_DEBUG(collection != NULL);
TRI_ASSERT_DEBUG(collection->_collection != NULL);
TRI_ASSERT_DEBUG(collection->_collection->_collection != NULL);
TRI_ASSERT_DEBUG(collection->_locked == false);
TRI_ASSERT_DEBUG(trxCollection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_locked == false);
primary = collection->_collection->_collection;
primary = trxCollection->_collection->_collection;
if (type == TRI_TRANSACTION_READ) {
LOG_TRX(trx, nestingLevel, "read-locking collection %llu", (unsigned long long) collection->_cid);
LOG_TRX(trxCollection->_transaction, nestingLevel, "read-locking collection %llu", (unsigned long long) trxCollection->_cid);
primary->beginRead(primary);
}
else {
LOG_TRX(trx, nestingLevel, "write-locking collection %llu", (unsigned long long) collection->_cid);
LOG_TRX(trxCollection->_transaction, nestingLevel, "write-locking collection %llu", (unsigned long long) trxCollection->_cid);
primary->beginWrite(primary);
}
collection->_locked = true;
trxCollection->_locked = true;
return TRI_ERROR_NO_ERROR;
}
@ -471,29 +474,28 @@ static int LockCollection (TRI_transaction_t* trx,
/// @brief unlock a collection
////////////////////////////////////////////////////////////////////////////////
static int UnlockCollection (TRI_transaction_t* trx,
TRI_transaction_collection_t* collection,
static int UnlockCollection (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e type,
const int nestingLevel) {
TRI_primary_collection_t* primary;
TRI_ASSERT_DEBUG(collection != NULL);
TRI_ASSERT_DEBUG(collection->_collection != NULL);
TRI_ASSERT_DEBUG(collection->_collection->_collection != NULL);
TRI_ASSERT_DEBUG(collection->_locked == true);
TRI_ASSERT_DEBUG(trxCollection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_locked == true);
primary = collection->_collection->_collection;
primary = trxCollection->_collection->_collection;
if (type == TRI_TRANSACTION_READ) {
LOG_TRX(trx, nestingLevel, "read-unlocking collection %llu", (unsigned long long) collection->_cid);
LOG_TRX(trxCollection->_transaction, nestingLevel, "read-unlocking collection %llu", (unsigned long long) trxCollection->_cid);
primary->endRead(primary);
}
else {
LOG_TRX(trx, nestingLevel, "write-unlocking collection %llu", (unsigned long long) collection->_cid);
LOG_TRX(trxCollection->_transaction, nestingLevel, "write-unlocking collection %llu", (unsigned long long) trxCollection->_cid);
primary->endWrite(primary);
}
collection->_locked = false;
trxCollection->_locked = false;
return TRI_ERROR_NO_ERROR;
}
@ -509,17 +511,17 @@ static int NotifyCollections (TRI_transaction_t* const trx,
n = trx->_collections._length;
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
TRI_transaction_collection_t* trxCollection;
collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
trxCollection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
if (collection->_collection != NULL &&
collection->_collection->_collection != NULL) {
if (trxCollection->_collection != NULL &&
trxCollection->_collection->_collection != NULL) {
TRI_primary_collection_t* primary;
bool lock;
primary = collection->_collection->_collection;
lock = ! collection->_locked;
primary = trxCollection->_collection->_collection;
lock = ! trxCollection->_locked;
if (lock) {
primary->beginRead(primary);
@ -549,30 +551,39 @@ static int UseCollections (TRI_transaction_t* const trx,
// process collections in forward order
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
TRI_transaction_collection_t* trxCollection;
collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
trxCollection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
if (collection->_nestingLevel != nestingLevel) {
if (trxCollection->_nestingLevel != nestingLevel) {
// only process our own collections
continue;
}
if (collection->_collection == NULL) {
LOG_TRX(trx, nestingLevel, "using collection %llu", (unsigned long long) collection->_cid);
collection->_collection = TRI_UseCollectionByIdVocBase(trx->_context->_vocbase, collection->_cid);
if (trxCollection->_collection == NULL) {
LOG_TRX(trx, nestingLevel, "using collection %llu", (unsigned long long) trxCollection->_cid);
trxCollection->_collection = TRI_UseCollectionByIdVocBase(trx->_context->_vocbase, trxCollection->_cid);
if (collection->_collection == NULL) {
if (trxCollection->_collection == NULL) {
// something went wrong
return TRI_errno();
}
if (trxCollection->_collection->_collection == NULL) {
// something went wrong
return TRI_errno();
}
// store the waitForSync property
trxCollection->_waitForSync = trxCollection->_collection->_collection->base._info._waitForSync;
}
TRI_ASSERT_DEBUG(collection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection != NULL);
TRI_ASSERT_DEBUG(trxCollection->_collection->_collection != NULL);
if (! collection->_locked &&
if (! trxCollection->_locked &&
((trx->_hints & (TRI_transaction_hint_t) TRI_TRANSACTION_HINT_LOCK_ENTIRELY) != 0)) {
int res = LockCollection(trx, collection, collection->_accessType, nestingLevel);
int res = LockCollection(trxCollection, trxCollection->_accessType, nestingLevel);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -598,24 +609,24 @@ static int ReleaseCollections (TRI_transaction_t* const trx,
// process collections in reverse order
while (i-- > 0) {
TRI_transaction_collection_t* collection;
TRI_transaction_collection_t* trxCollection;
collection = TRI_AtVectorPointer(&trx->_collections, i);
trxCollection = TRI_AtVectorPointer(&trx->_collections, i);
if (collection->_locked &&
(nestingLevel == 0 || collection->_nestingLevel == nestingLevel)) {
if (trxCollection->_locked &&
(nestingLevel == 0 || trxCollection->_nestingLevel == nestingLevel)) {
// unlock our own locks
UnlockCollection(trx, collection, collection->_accessType, nestingLevel);
UnlockCollection(trxCollection, trxCollection->_accessType, nestingLevel);
}
// the top level transaction releases all collections
if (nestingLevel == 0 && collection->_collection != NULL) {
if (nestingLevel == 0 && trxCollection->_collection != NULL) {
// unuse collection
LOG_TRX(trx, nestingLevel, "unusing collection %llu", (unsigned long long) collection->_cid);
TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, collection->_collection);
LOG_TRX(trx, nestingLevel, "unusing collection %llu", (unsigned long long) trxCollection->_cid);
TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, trxCollection->_collection);
collection->_locked = false;
collection->_collection = NULL;
trxCollection->_locked = false;
trxCollection->_collection = NULL;
}
}
@ -738,91 +749,103 @@ void TRI_FreeTransaction (TRI_transaction_t* const trx) {
/// @brief request a lock for a collection
////////////////////////////////////////////////////////////////////////////////
int TRI_LockCollectionTransaction (TRI_transaction_t* const trx,
const TRI_transaction_cid_t cid,
int TRI_LockCollectionTransaction (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e accessType,
const int nestingLevel) {
TRI_transaction_collection_t* collection = FindCollection(trx, cid, NULL);
if (collection == NULL || collection->_collection == NULL) {
return TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION;
}
if (accessType == TRI_TRANSACTION_WRITE && collection->_accessType != TRI_TRANSACTION_WRITE) {
if (accessType == TRI_TRANSACTION_WRITE && trxCollection->_accessType != TRI_TRANSACTION_WRITE) {
// wrong lock type
return TRI_ERROR_INTERNAL;
}
if (collection->_locked) {
if (trxCollection->_locked) {
// already locked
return TRI_ERROR_NO_ERROR;
}
return LockCollection(trx, collection, accessType, nestingLevel);
return LockCollection(trxCollection, accessType, nestingLevel);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief request an unlock for a collection
////////////////////////////////////////////////////////////////////////////////
int TRI_UnlockCollectionTransaction (TRI_transaction_t* const trx,
const TRI_transaction_cid_t cid,
int TRI_UnlockCollectionTransaction (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e accessType,
const int nestingLevel) {
TRI_transaction_collection_t* collection = FindCollection(trx, cid, NULL);
if (collection == NULL || collection->_collection == NULL) {
return TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION;
}
if (accessType == TRI_TRANSACTION_WRITE && collection->_accessType != TRI_TRANSACTION_WRITE) {
if (accessType == TRI_TRANSACTION_WRITE && trxCollection->_accessType != TRI_TRANSACTION_WRITE) {
// wrong lock type
return TRI_ERROR_INTERNAL;
}
if (! collection->_locked) {
if (! trxCollection->_locked) {
// already unlocked
return TRI_ERROR_NO_ERROR;
}
return UnlockCollection(trx, collection, accessType, nestingLevel);
return UnlockCollection(trxCollection, accessType, nestingLevel);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check if a collection is locked in a transaction
////////////////////////////////////////////////////////////////////////////////
bool TRI_IsLockedCollectionTransaction (TRI_transaction_t* const trx,
const TRI_transaction_cid_t cid,
bool TRI_IsLockedCollectionTransaction (TRI_transaction_collection_t* trxCollection,
const TRI_transaction_type_e accessType,
const int nestingLevel) {
TRI_transaction_collection_t* collection = FindCollection(trx, cid, NULL);
if (collection == NULL || collection->_collection == NULL) {
// unknown collection
LOG_WARNING("logic error. checking lock status for a non-registered collection");
return false;
}
if (accessType == TRI_TRANSACTION_WRITE && collection->_accessType != TRI_TRANSACTION_WRITE) {
if (accessType == TRI_TRANSACTION_WRITE && trxCollection->_accessType != TRI_TRANSACTION_WRITE) {
// wrong lock type
LOG_WARNING("logic error. checking wrong lock type");
return false;
}
return collection->_locked;
return trxCollection->_locked;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check if a collection is contained in a transaction and return it
/// @brief increase the number of writes done for a collection
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_col_t* TRI_GetCollectionTransaction (TRI_transaction_t* const trx,
const TRI_transaction_cid_t cid,
const TRI_transaction_type_e accessType) {
void TRI_IncreaseWritesCollectionTransaction (TRI_transaction_collection_t* trxCollection,
bool wasSynchronous) {
if (wasSynchronous) {
trxCollection->_hadSyncedWrites = true;
}
trxCollection->_numWrites++;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection from a transaction
////////////////////////////////////////////////////////////////////////////////
bool TRI_WasSynchronousCollectionTransaction (TRI_transaction_t const* trx,
const TRI_transaction_cid_t cid) {
TRI_transaction_collection_t* collection;
TRI_ASSERT_DEBUG(trx->_status == TRI_TRANSACTION_RUNNING ||
trx->_status == TRI_TRANSACTION_ABORTED ||
trx->_status == TRI_TRANSACTION_COMMITTED);
collection = FindCollection(trx, cid, NULL);
if (collection == NULL || collection->_collection == NULL) {
// not found or not opened. probably a mistake made by the caller
return false;
}
return collection->_hadSyncedWrites;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection from a transaction
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_collection_t* TRI_GetCollectionTransaction (TRI_transaction_t const* trx,
const TRI_transaction_cid_t cid,
const TRI_transaction_type_e accessType) {
TRI_transaction_collection_t* collection;
@ -832,16 +855,17 @@ TRI_vocbase_col_t* TRI_GetCollectionTransaction (TRI_transaction_t* const trx,
collection = FindCollection(trx, cid, NULL);
if (collection == NULL || collection->_collection == NULL) {
// not found or not opened. probably a mistake made by the caller
return NULL;
}
// check if access type matches
if (accessType == TRI_TRANSACTION_WRITE && collection->_accessType == TRI_TRANSACTION_READ) {
// type doesn't match
// type doesn't match. probably also a mistake by the caller
return NULL;
}
return collection->_collection;
return collection;
}
////////////////////////////////////////////////////////////////////////////////
@ -904,7 +928,7 @@ int TRI_AddCollectionTransaction (TRI_transaction_t* const trx,
}
// collection was not contained. now create and insert it
collection = CreateCollection(trx->_context, cid, accessType, nestingLevel);
collection = CreateCollection(trx, cid, accessType, nestingLevel);
if (collection == NULL) {
// out of memory
@ -935,7 +959,18 @@ int TRI_BeginTransaction (TRI_transaction_t* const trx,
if (nestingLevel == 0) {
TRI_ASSERT_DEBUG(trx->_status == TRI_TRANSACTION_CREATED);
// get a new id
trx->_id = TRI_NewTickVocBase();
// update hints
if (trx->_collections._length == 1) {
hints |= (TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_COLLECTION;
}
if (trx->_type == TRI_TRANSACTION_READ) {
hints |= (TRI_transaction_hint_t) TRI_TRANSACTION_HINT_READ_ONLY;
}
trx->_hints = hints;
}
else {

View File

@ -161,7 +161,6 @@ TRI_transaction_collection_stats_t;
typedef struct TRI_transaction_collection_global_s {
TRI_transaction_cid_t _cid;
TRI_read_write_lock_t _lock;
TRI_transaction_collection_stats_t _stats;
@ -239,20 +238,6 @@ int TRI_StatsCollectionTransactionContext (TRI_transaction_context_t*,
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief collection used in a transaction
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_collection_s {
TRI_transaction_cid_t _cid; // collection id
TRI_transaction_type_e _accessType; // access type (read|write)
int _nestingLevel; // the transaction level that added this collection
struct TRI_vocbase_col_s* _collection; // vocbase collection pointer
TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance
bool _locked; // collection lock flag
}
TRI_transaction_collection_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief typedef for transaction hints
////////////////////////////////////////////////////////////////////////////////
@ -264,9 +249,11 @@ typedef uint32_t TRI_transaction_hint_t;
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_HINT_NONE = 0,
TRI_TRANSACTION_HINT_SINGLE_OPERATION = 1,
TRI_TRANSACTION_HINT_LOCK_ENTIRELY = 2
TRI_TRANSACTION_HINT_NONE = 0,
TRI_TRANSACTION_HINT_SINGLE_OPERATION = 1,
TRI_TRANSACTION_HINT_LOCK_ENTIRELY = 2,
TRI_TRANSACTION_HINT_READ_ONLY = 4,
TRI_TRANSACTION_HINT_SINGLE_COLLECTION = 8
}
TRI_transaction_hint_e;
@ -285,6 +272,24 @@ typedef struct TRI_transaction_s {
}
TRI_transaction_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief collection used in a transaction
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_collection_s {
TRI_transaction_t* _transaction; // the transaction
TRI_transaction_cid_t _cid; // collection id
TRI_transaction_type_e _accessType; // access type (read|write)
int _nestingLevel; // the transaction level that added this collection
struct TRI_vocbase_col_s* _collection; // vocbase collection pointer
TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance
uint64_t _numWrites; // number of writes
bool _locked; // collection lock flag
bool _waitForSync; // whether or not the collection has waitForSync
bool _hadSyncedWrites; // whether or not synced writes happened
}
TRI_transaction_collection_t;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -324,12 +329,26 @@ void TRI_FreeTransaction (TRI_transaction_t* const);
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief check if a collection is contained in a transaction and return it
/// @brief increase the number of writes done for a collection
////////////////////////////////////////////////////////////////////////////////
struct TRI_vocbase_col_s* TRI_GetCollectionTransaction (TRI_transaction_t* const,
const TRI_transaction_cid_t,
const TRI_transaction_type_e);
void TRI_IncreaseWritesCollectionTransaction (TRI_transaction_collection_t*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection from a transaction
////////////////////////////////////////////////////////////////////////////////
bool TRI_WasSynchronousCollectionTransaction (TRI_transaction_t const*,
const TRI_transaction_cid_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection from a transaction
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_collection_t* TRI_GetCollectionTransaction (TRI_transaction_t const*,
const TRI_transaction_cid_t,
const TRI_transaction_type_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief add a collection to a transaction
@ -344,8 +363,7 @@ int TRI_AddCollectionTransaction (TRI_transaction_t* const,
/// @brief request a lock for a collection
////////////////////////////////////////////////////////////////////////////////
int TRI_LockCollectionTransaction (TRI_transaction_t* const,
const TRI_transaction_cid_t,
int TRI_LockCollectionTransaction (TRI_transaction_collection_t*,
const TRI_transaction_type_e,
const int);
@ -353,8 +371,7 @@ int TRI_LockCollectionTransaction (TRI_transaction_t* const,
/// @brief request an unlock for a collection
////////////////////////////////////////////////////////////////////////////////
int TRI_UnlockCollectionTransaction (TRI_transaction_t* const,
const TRI_transaction_cid_t,
int TRI_UnlockCollectionTransaction (TRI_transaction_collection_t*,
const TRI_transaction_type_e,
const int);
@ -362,8 +379,7 @@ int TRI_UnlockCollectionTransaction (TRI_transaction_t* const,
/// @brief check whether a collection is locked in a transaction
////////////////////////////////////////////////////////////////////////////////
bool TRI_IsLockedCollectionTransaction (TRI_transaction_t* const,
const TRI_transaction_cid_t,
bool TRI_IsLockedCollectionTransaction (TRI_transaction_collection_t*,
const TRI_transaction_type_e,
const int);