1
0
Fork 0

switch order of insertion into indexes & datafile for updates

updates are now done in the indexes first, then written to the datafile
This commit is contained in:
Jan Steemann 2013-03-18 13:03:05 +01:00
parent 0ef011d408
commit 99522ccb84
5 changed files with 364 additions and 414 deletions

View File

@ -229,7 +229,7 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
rev3 = doc.parsed_response['_rev'] rev3 = doc.parsed_response['_rev']
rev3.should be_kind_of(String) rev3.should be_kind_of(String)
@ -257,8 +257,8 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
doc.parsed_response['_rev'].should_not eq(rev3) doc.parsed_response['_rev'].should_not eq(rev2)
# unload collection # unload collection
cmd4 = "/_api/collection/#{@cn}/unload" cmd4 = "/_api/collection/#{@cn}/unload"
@ -282,8 +282,8 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
doc.parsed_response['_rev'].should_not eq(rev3) doc.parsed_response['_rev'].should_not eq(rev2)
end end
end end

View File

@ -229,7 +229,7 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
rev3 = doc.parsed_response['_rev'] rev3 = doc.parsed_response['_rev']
rev3.should be_kind_of(String) rev3.should be_kind_of(String)
@ -257,8 +257,8 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
doc.parsed_response['_rev'].should_not eq(rev3) doc.parsed_response['_rev'].should_not eq(rev2)
# unload collection # unload collection
cmd4 = "/_api/collection/#{@cn}/unload" cmd4 = "/_api/collection/#{@cn}/unload"
@ -282,8 +282,8 @@ describe ArangoDB do
doc.parsed_response['a'].should eq(1) doc.parsed_response['a'].should eq(1)
doc.parsed_response['b'].should eq(1) doc.parsed_response['b'].should eq(1)
doc.parsed_response['_id'].should eq(id1) doc.parsed_response['_id'].should eq(id1)
doc.parsed_response['_rev'].should_not eq(rev1) doc.parsed_response['_rev'].should eq(rev1)
doc.parsed_response['_rev'].should_not eq(rev3) doc.parsed_response['_rev'].should_not eq(rev2)
end end
end end

View File

@ -459,7 +459,7 @@ namespace triagens {
this->lockExplicit(primary, TRI_TRANSACTION_READ); this->lockExplicit(primary, TRI_TRANSACTION_READ);
} }
int res = primary->read(&context, mptr, (TRI_voc_key_t) key.c_str()); int res = primary->read(&context, (TRI_voc_key_t) key.c_str(), mptr);
if (lock) { if (lock) {
this->unlockExplicit(primary, TRI_TRANSACTION_READ); this->unlockExplicit(primary, TRI_TRANSACTION_READ);
@ -649,7 +649,7 @@ namespace triagens {
TRI_InitContextPrimaryCollection(&context, primary, TRI_DOC_UPDATE_LAST_WRITE, forceSync); TRI_InitContextPrimaryCollection(&context, primary, TRI_DOC_UPDATE_LAST_WRITE, forceSync);
// TODO: set transaction lock here // TODO: set transaction lock here
return primary->insert(&context, markerType, key, mptr, shaped, data, lock, forceSync); return primary->insert(&context, key, mptr, markerType, shaped, data, lock, forceSync);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -671,22 +671,7 @@ namespace triagens {
return TRI_ERROR_ARANGO_SHAPER_FAILED; return TRI_ERROR_ARANGO_SHAPER_FAILED;
} }
TRI_doc_operation_context_t context; int res = this->updateCollectionShaped(primary, key, mptr, shaped, policy, expectedRevision, actualRevision, forceSync, lock);
TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync);
context._expectedRid = expectedRevision;
context._previousRid = actualRevision;
if (lock) {
// WRITE-LOCK START
this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
}
int res = primary->update(&context, mptr, shaped, (TRI_voc_key_t) key.c_str());
if (lock) {
this->unlockExplicit(primary, TRI_TRANSACTION_WRITE);
// WRITE-LOCK END
}
TRI_FreeShapedJson(primary->_shaper, shaped); TRI_FreeShapedJson(primary->_shaper, shaped);
@ -697,7 +682,7 @@ namespace triagens {
/// @brief update a single document, using shaped json /// @brief update a single document, using shaped json
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int updateCollectionShaped (TRI_primary_collection_t* const primary, inline int updateCollectionShaped (TRI_primary_collection_t* const primary,
const string& key, const string& key,
TRI_doc_mptr_t* mptr, TRI_doc_mptr_t* mptr,
TRI_shaped_json_t* const shaped, TRI_shaped_json_t* const shaped,
@ -706,22 +691,14 @@ namespace triagens {
TRI_voc_rid_t* actualRevision, TRI_voc_rid_t* actualRevision,
const bool forceSync, const bool forceSync,
const bool lock) { const bool lock) {
TRI_doc_operation_context_t context; TRI_doc_operation_context_t context;
TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync); TRI_InitContextPrimaryCollection(&context, primary, policy, forceSync);
context._expectedRid = expectedRevision; context._expectedRid = expectedRevision;
context._previousRid = actualRevision; context._previousRid = actualRevision;
if (lock) { // TODO: set transaction lock here
// WRITE-LOCK START int res = primary->update(&context, (TRI_voc_key_t) key.c_str(), mptr, shaped, lock, forceSync);
this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
}
int res = primary->update(&context, mptr, shaped, (TRI_voc_key_t) key.c_str());
if (lock) {
this->unlockExplicit(primary, TRI_TRANSACTION_WRITE);
// WRITE-LOCK END
}
return res; return res;
} }
@ -745,7 +722,7 @@ namespace triagens {
this->lockExplicit(primary, TRI_TRANSACTION_WRITE); this->lockExplicit(primary, TRI_TRANSACTION_WRITE);
// TODO: fix locks // TODO: fix locks
int res = primary->destroy(&context, (TRI_voc_key_t) key.c_str(), (TRI_voc_size_t) key.size(), false, forceSync); int res = primary->destroy(&context, (TRI_voc_key_t) key.c_str(), false, forceSync);
this->unlockExplicit(primary, TRI_TRANSACTION_WRITE); this->unlockExplicit(primary, TRI_TRANSACTION_WRITE);
// WRITE-LOCK END // WRITE-LOCK END
@ -780,7 +757,7 @@ namespace triagens {
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
const string& id = ids[i]; const string& id = ids[i];
res = primary->destroy(&context, (TRI_voc_key_t) id.c_str(), (TRI_voc_size_t) id.size(), false, forceSync); res = primary->destroy(&context, (TRI_voc_key_t) id.c_str(), false, forceSync);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
// halt on first error // halt on first error

View File

@ -59,17 +59,6 @@ static int DeletePrimaryIndex (TRI_document_collection_t*,
static int DeleteSecondaryIndexes (TRI_document_collection_t*, static int DeleteSecondaryIndexes (TRI_document_collection_t*,
TRI_doc_mptr_t const*); TRI_doc_mptr_t const*);
static int UpdateDocument (TRI_doc_operation_context_t*,
TRI_doc_mptr_t const*,
TRI_doc_document_key_marker_t*,
TRI_voc_size_t,
void const*,
TRI_voc_size_t,
void const*,
TRI_voc_size_t,
TRI_df_marker_t**,
TRI_doc_mptr_t*);
static int CapConstraintFromJson (TRI_document_collection_t*, static int CapConstraintFromJson (TRI_document_collection_t*,
TRI_json_t*, TRI_json_t*,
TRI_idx_iid_t); TRI_idx_iid_t);
@ -116,15 +105,11 @@ static int PriorityQueueFromJson (TRI_document_collection_t*,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static size_t LengthDataMasterPointer (const TRI_doc_mptr_t* const mptr) { static size_t LengthDataMasterPointer (const TRI_doc_mptr_t* const mptr) {
if (mptr != NULL) {
void const* data = mptr->_data; void const* data = mptr->_data;
if (((TRI_df_marker_t const*) data)->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { if (((TRI_df_marker_t const*) data)->_type == TRI_DOC_MARKER_KEY_DOCUMENT ||
return ((TRI_df_marker_t*) data)->_size - ((TRI_doc_document_key_marker_t const*) data)->_offsetJson; ((TRI_df_marker_t const*) data)->_type == TRI_DOC_MARKER_KEY_EDGE) {
} return ((TRI_df_marker_t*) data)->_size;
else if (((TRI_df_marker_t const*) data)->_type == TRI_DOC_MARKER_KEY_EDGE) {
return ((TRI_df_marker_t*) data)->_size - ((TRI_doc_edge_key_marker_t const*) data)->base._offsetJson;
}
} }
return 0; return 0;
@ -188,6 +173,99 @@ static int CreateDeletionMarker (TRI_doc_deletion_key_marker_t** result,
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new document or edge marker in memory, based on another
/// existing marker
////////////////////////////////////////////////////////////////////////////////
static int CloneDocumentMarker (TRI_df_marker_t const* original,
TRI_doc_document_key_marker_t** result,
TRI_voc_size_t* totalSize,
const TRI_df_marker_type_e markerType,
TRI_shaped_json_t const* shaped,
const bool calcCrc) {
TRI_doc_document_key_marker_t* marker;
TRI_voc_tick_t tick;
size_t baseLength;
*result = NULL;
if (markerType != original->_type) {
// cannot clone a different marker type
return TRI_ERROR_INTERNAL;
}
// calculate the basic marker size
if (markerType == TRI_DOC_MARKER_KEY_DOCUMENT) {
// document marker
TRI_doc_document_key_marker_t const* o = (TRI_doc_document_key_marker_t const*) original;
baseLength = o->_offsetJson;
TRI_ASSERT_DEBUG(baseLength > sizeof(TRI_doc_document_key_marker_t));
}
else if (markerType == TRI_DOC_MARKER_KEY_EDGE) {
// edge marker
TRI_doc_edge_key_marker_t const* o = (TRI_doc_edge_key_marker_t const*) original;
baseLength = o->base._offsetJson;
TRI_ASSERT_DEBUG(baseLength > sizeof(TRI_doc_edge_key_marker_t));
}
else {
// invalid marker type
LOG_WARNING("invalid marker type %d", (int) markerType);
return TRI_ERROR_INTERNAL;
}
// calculate the total size for the marker (= marker base size + key(s) + shaped json)
*totalSize = baseLength + shaped->_data.length;
marker = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, *totalSize * sizeof(char), false);
if (marker == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
}
// copy non-changed data (e.g. key(s)) from old marker into new marker
memcpy(marker, original, baseLength);
// set the marker type, size, revision id etc.
tick = TRI_NewTickVocBase();
TRI_InitMarker(&marker->base, markerType, *totalSize, tick);
marker->_rid = tick;
marker->_shape = shaped->_sid;
// copy shaped json into the marker
memcpy(((char*) marker) + baseLength, (char*) shaped->_data.data, shaped->_data.length);
// no need to adjust _offsetKey, _offsetJson etc. as we copied it from the old marker
#ifdef TRI_ENABLE_MAINTAINER_MODE
TRI_ASSERT_DEBUG(marker->_offsetKey == ((TRI_doc_document_key_marker_t const*) original)->_offsetKey);
TRI_ASSERT_DEBUG(marker->_offsetJson == ((TRI_doc_document_key_marker_t const*) original)->_offsetJson);
if (markerType == TRI_DOC_MARKER_KEY_EDGE) {
TRI_doc_edge_key_marker_t const* o = (TRI_doc_edge_key_marker_t const*) original;
TRI_doc_edge_key_marker_t const* c = (TRI_doc_edge_key_marker_t const*) marker;
TRI_ASSERT_DEBUG(c->_toCid == o->_toCid);
TRI_ASSERT_DEBUG(c->_fromCid == o->_fromCid);
TRI_ASSERT_DEBUG(c->_offsetToKey == o->_offsetToKey);
TRI_ASSERT_DEBUG(c->_offsetFromKey == o->_offsetFromKey);
}
#endif
if (calcCrc) {
// calculate crc checksum
TRI_CrcMarker(&marker->base, *totalSize);
}
*result = marker;
return TRI_ERROR_NO_ERROR;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new document or edge marker in memory /// @brief creates a new document or edge marker in memory
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -206,7 +284,7 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
char* position; char* position;
char keyBuffer[TRI_VOC_KEY_MAX_LENGTH + 1]; char keyBuffer[TRI_VOC_KEY_MAX_LENGTH + 1];
TRI_voc_size_t keyBodySize; TRI_voc_size_t keyBodySize;
TRI_voc_tick_t markerId; TRI_voc_tick_t tick;
size_t markerSize; size_t markerSize;
size_t keySize; size_t keySize;
size_t fromSize; size_t fromSize;
@ -214,7 +292,7 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
int res; int res;
*result = NULL; *result = NULL;
markerId = TRI_NewTickVocBase(); tick = TRI_NewTickVocBase();
// generate the key // generate the key
keyGenerator = (TRI_key_generator_t*) primary->_keyGenerator; keyGenerator = (TRI_key_generator_t*) primary->_keyGenerator;
@ -223,7 +301,7 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
// create key using key generator // create key using key generator
res = keyGenerator->generate(keyGenerator, res = keyGenerator->generate(keyGenerator,
TRI_VOC_KEY_MAX_LENGTH, TRI_VOC_KEY_MAX_LENGTH,
markerId, tick,
key, key,
(char*) &keyBuffer, (char*) &keyBuffer,
&keySize); &keySize);
@ -238,12 +316,14 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
// calculate the basic marker size // calculate the basic marker size
if (markerType == TRI_DOC_MARKER_KEY_DOCUMENT) { if (markerType == TRI_DOC_MARKER_KEY_DOCUMENT) {
// document marker
fromSize = 0; fromSize = 0;
toSize = 0; toSize = 0;
keyBodySize = TRI_DF_ALIGN_BLOCK(keySize); keyBodySize = TRI_DF_ALIGN_BLOCK(keySize);
markerSize = sizeof(TRI_doc_document_key_marker_t); markerSize = sizeof(TRI_doc_document_key_marker_t);
} }
else if (markerType == TRI_DOC_MARKER_KEY_EDGE) { else if (markerType == TRI_DOC_MARKER_KEY_EDGE) {
// edge marker
TRI_document_edge_t const* edge = data; TRI_document_edge_t const* edge = data;
fromSize = strlen(edge->_fromKey) + 1; fromSize = strlen(edge->_fromKey) + 1;
@ -258,14 +338,18 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
} }
// calculate the total size for the marker (= marker base size + key(s) + shaped json)
*totalSize = markerSize + keyBodySize + shaped->_data.length; *totalSize = markerSize + keyBodySize + shaped->_data.length;
marker = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, *totalSize * sizeof(char), false); marker = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, *totalSize * sizeof(char), false);
if (marker == NULL) { if (marker == NULL) {
return TRI_ERROR_OUT_OF_MEMORY; return TRI_ERROR_OUT_OF_MEMORY;
} }
TRI_InitMarker(&marker->base, markerType, markerSize, markerId); // set the marker type, size, revision id etc.
TRI_InitMarker(&marker->base, markerType, *totalSize, tick);
marker->_rid = tick;
marker->_shape = shaped->_sid; marker->_shape = shaped->_sid;
*keyBody = ((char*) marker) + markerSize; *keyBody = ((char*) marker) + markerSize;
@ -275,6 +359,7 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
memcpy(position, (char*) &keyBuffer, keySize); memcpy(position, (char*) &keyBuffer, keySize);
if (markerType == TRI_DOC_MARKER_KEY_EDGE) { if (markerType == TRI_DOC_MARKER_KEY_EDGE) {
// additional attributes for an edge marker
TRI_doc_edge_key_marker_t* edgeMarker = (TRI_doc_edge_key_marker_t*) marker; TRI_doc_edge_key_marker_t* edgeMarker = (TRI_doc_edge_key_marker_t*) marker;
TRI_document_edge_t const* edge = data; TRI_document_edge_t const* edge = data;
@ -289,15 +374,17 @@ static int CreateDocumentMarker (TRI_primary_collection_t* primary,
edgeMarker->_toCid = edge->_toCid; edgeMarker->_toCid = edge->_toCid;
} }
// copy shaped json into the marker
position = ((char*) marker) + markerSize + keyBodySize; position = ((char*) marker) + markerSize + keyBodySize;
memcpy(position, (char*) shaped->_data.data, shaped->_data.length); memcpy(position, (char*) shaped->_data.data, shaped->_data.length);
// set the offsets for _key and shaped json
marker->_offsetKey = markerSize; marker->_offsetKey = markerSize;
marker->_offsetJson = markerSize + keyBodySize; marker->_offsetJson = markerSize + keyBodySize;
marker->base._size = markerSize + keyBodySize + shaped->_data.length;
if (calcCrc) { if (calcCrc) {
TRI_CrcMarker(&marker->base, markerSize + keyBodySize + shaped->_data.length); // calculate crc checksum
TRI_CrcMarker(&marker->base, *totalSize);
} }
*result = marker; *result = marker;
@ -466,7 +553,7 @@ static void WaitSync (TRI_document_collection_t* document,
static int WriteElement (TRI_document_collection_t* document, static int WriteElement (TRI_document_collection_t* document,
TRI_datafile_t* journal, TRI_datafile_t* journal,
TRI_df_marker_t* marker, TRI_df_marker_t const* marker,
TRI_voc_size_t markerSize, TRI_voc_size_t markerSize,
void const* keyBody, void const* keyBody,
TRI_voc_size_t keyBodySize, TRI_voc_size_t keyBodySize,
@ -570,7 +657,7 @@ static int InsertDocument (TRI_document_collection_t* document,
journal = SelectJournal(document, totalSize, &result); journal = SelectJournal(document, totalSize, &result);
if (journal == NULL) { if (journal == NULL) {
res = TRI_ERROR_INTERNAL; res = TRI_ERROR_ARANGO_NO_JOURNAL;
} }
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
@ -774,7 +861,7 @@ static int DeleteDocument (TRI_doc_operation_context_t* context,
/// @brief updates an existing header /// @brief updates an existing header
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static void UpdateHeader (TRI_datafile_t* datafile, static void UpdateHeader (TRI_voc_fid_t fid,
TRI_df_marker_t const* m, TRI_df_marker_t const* m,
TRI_doc_mptr_t* newHeader, TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t const* oldHeader) { TRI_doc_mptr_t const* oldHeader) {
@ -783,7 +870,7 @@ static void UpdateHeader (TRI_datafile_t* datafile,
marker = (TRI_doc_document_key_marker_t const*) m; marker = (TRI_doc_document_key_marker_t const*) m;
newHeader->_rid = marker->_rid; newHeader->_rid = marker->_rid;
newHeader->_fid = datafile->_fid; newHeader->_fid = fid;
newHeader->_data = marker; newHeader->_data = marker;
newHeader->_key = ((char*) marker) + marker->_offsetKey; newHeader->_key = ((char*) marker) + marker->_offsetKey;
@ -791,183 +878,114 @@ static void UpdateHeader (TRI_datafile_t* datafile,
newHeader->_validTo = oldHeader->_validTo; // TODO: fix for trx newHeader->_validTo = oldHeader->_validTo; // TODO: fix for trx
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief rolls back an update
////////////////////////////////////////////////////////////////////////////////
static int RollbackUpdate (TRI_primary_collection_t* primary,
TRI_doc_mptr_t const* header,
TRI_df_marker_t const* originalMarker,
TRI_df_marker_t** result) {
TRI_doc_document_key_marker_t* marker;
char* data;
TRI_voc_size_t dataLength;
TRI_voc_size_t markerLength;
TRI_doc_operation_context_t rollbackContext;
char* keyData;
TRI_voc_size_t keyDataLength;
TRI_doc_document_key_marker_t documentUpdate;
TRI_doc_edge_key_marker_t edgeUpdate;
if (originalMarker->_type != TRI_DOC_MARKER_KEY_DOCUMENT &&
originalMarker->_type != TRI_DOC_MARKER_KEY_EDGE) {
// invalid marker type
LOG_WARNING("rollback operation called for unexpected marker type");
return TRI_ERROR_INTERNAL;
}
if (originalMarker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
// document is a document
TRI_doc_document_key_marker_t* o = (TRI_doc_document_key_marker_t*) originalMarker;
memcpy(&documentUpdate, originalMarker, sizeof(TRI_doc_document_key_marker_t));
marker = &documentUpdate;
markerLength = sizeof(TRI_doc_document_key_marker_t);
keyData = ((char*) originalMarker) + o->_offsetKey;
keyDataLength = o->_offsetJson - o->_offsetKey;
data = ((char*) originalMarker) + marker->_offsetJson;
dataLength = originalMarker->_size - marker->_offsetJson;
}
else {
// document is an edge
TRI_doc_edge_key_marker_t* o = (TRI_doc_edge_key_marker_t*) originalMarker;
memcpy(&edgeUpdate, originalMarker, sizeof(TRI_doc_edge_key_marker_t));
marker = &edgeUpdate.base;
markerLength = sizeof(TRI_doc_edge_key_marker_t);
keyData = ((char*) originalMarker) + o->base._offsetKey;
keyDataLength = o->base._offsetJson - o->base._offsetKey;
data = ((char*) originalMarker) + o->base._offsetJson;
dataLength = originalMarker->_size - o->base._offsetJson;
}
// create a rollback context that does not rollback itself
TRI_InitContextPrimaryCollection(&rollbackContext, primary, TRI_DOC_UPDATE_LAST_WRITE, false);
rollbackContext._expectedRid = header->_rid;
rollbackContext._allowRollback = false;
return UpdateDocument(&rollbackContext,
header,
marker,
markerLength,
keyData,
keyDataLength,
data,
dataLength,
result,
NULL);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief updates an existing document splitted into marker and body to file /// @brief updates an existing document splitted into marker and body to file
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int UpdateDocument (TRI_doc_operation_context_t* context, static int UpdateDocument (TRI_document_collection_t* document,
TRI_doc_mptr_t const* oldHeader, TRI_doc_mptr_t* oldHeader,
TRI_doc_document_key_marker_t* marker, TRI_doc_document_key_marker_t const* marker,
TRI_voc_size_t markerSize, const TRI_voc_size_t totalSize,
void const* keyBody, const bool forceSync,
TRI_voc_size_t keyBodySize,
void const* body,
TRI_voc_size_t bodySize,
TRI_df_marker_t** result,
TRI_doc_mptr_t* mptr) { TRI_doc_mptr_t* mptr) {
TRI_datafile_t* journal; TRI_datafile_t* journal;
TRI_doc_datafile_info_t* dfi; TRI_doc_mptr_t* newHeader;
TRI_doc_mptr_t oldData; TRI_doc_mptr_t oldData;
TRI_doc_mptr_t* newHeader; // note: IDENTICAL to oldHeader for non-transactional collections TRI_df_marker_t* result;
TRI_document_collection_t* document;
TRI_primary_collection_t* primary;
TRI_voc_size_t total;
int res; int res;
size_t i;
size_t n;
if (mptr != NULL) {
mptr->_key = NULL;
mptr->_data = NULL;
}
// .............................................................................
// check the revision
// .............................................................................
res = TRI_RevisionCheck(context, oldHeader->_rid);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// extract the collection
primary = context->_collection;
document = (TRI_document_collection_t*) primary;
// save the old data, remember // save the old data, remember
oldData = *oldHeader; oldData = *oldHeader;
// ............................................................................. // .............................................................................
// remove from all indexes // update indexes
// ............................................................................. // .............................................................................
// remove old document from secondary indexes
// (it will stay in the primary index as the key won't change)
res = DeleteSecondaryIndexes(document, oldHeader); res = DeleteSecondaryIndexes(document, oldHeader);
// reenter the document in case of failure, ignore errors during rollback
if (context->_allowRollback && res != TRI_ERROR_NO_ERROR) {
res = InsertSecondaryIndexes(document, oldHeader);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("encountered error '%s' during rollback of update", TRI_errno_string(res)); // re-enter the document in case of failure, ignore errors during rollback
int resRollback;
resRollback = InsertSecondaryIndexes(document, oldHeader);
if (resRollback != TRI_ERROR_NO_ERROR) {
LOG_DEBUG("encountered error '%s' during rollback of update", TRI_errno_string(resRollback));
} }
return res; return res;
} }
// ............................................................................. // .............................................................................
// update header // update header
// ............................................................................. // .............................................................................
// generate a new tick // TODO: this will be identical for non-transactional collections only
marker->_rid = marker->base._tick = TRI_NewTickVocBase(); newHeader = CONST_CAST(oldHeader);
// find and select a journal // update the header. this will modify oldHeader !!!
total = markerSize + keyBodySize + bodySize; UpdateHeader(0, &marker->base, newHeader, oldHeader);
journal = SelectJournal(document, total, result);
if (journal == NULL) {
return TRI_ERROR_ARANGO_NO_JOURNAL;
}
// ............................................................................. // insert new document into secondary indexes
// write document blob res = InsertSecondaryIndexes(document, newHeader);
// .............................................................................
// generate crc
TRI_FillCrcMarkerDatafile(journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize);
// and write marker and blob
// TODO: update
res = WriteElement(document, journal, &marker->base, markerSize, keyBody, keyBodySize, body, bodySize, *result);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("cannot write element"); // rollback
int resRollback;
resRollback = DeleteSecondaryIndexes(document, newHeader);
if (resRollback != TRI_ERROR_NO_ERROR) {
LOG_DEBUG("encountered error '%s' during rollback of update", TRI_errno_string(resRollback));
}
// copy back old header data
*oldHeader = oldData;
resRollback = InsertSecondaryIndexes(document, oldHeader);
if (resRollback != TRI_ERROR_NO_ERROR) {
LOG_ERROR("encountered error '%s' during rollback of update", TRI_errno_string(resRollback));
}
return res; return res;
} }
// ............................................................................. // .............................................................................
// update indexes // write datafile
// ............................................................................. // .............................................................................
// TODO: this will be identical for non-transactional collections only
newHeader = CONST_CAST(oldHeader);
// update the header // find and select a journal
UpdateHeader(journal, *result, newHeader, oldHeader); journal = SelectJournal(document, totalSize, &result);
if (journal == NULL) {
res = TRI_ERROR_ARANGO_NO_JOURNAL;
}
if (res == TRI_ERROR_NO_ERROR) {
res = WriteElement(document, journal, &marker->base, totalSize, NULL, 0, NULL, 0, result);
if (res == TRI_ERROR_NO_ERROR) {
TRI_doc_datafile_info_t* dfi;
TRI_primary_collection_t* primary;
size_t i, n;
// update the header with the correct fid and the positions in the datafile
newHeader->_fid = journal->_fid;
newHeader->_data = ((char*) result);
newHeader->_key = ((char*) result) + marker->_offsetKey;
primary = (TRI_primary_collection_t*) document;
// update the datafile info // update the datafile info
dfi = TRI_FindDatafileInfoPrimaryCollection(primary, oldData._fid); dfi = TRI_FindDatafileInfoPrimaryCollection(primary, oldData._fid);
@ -981,37 +999,16 @@ static int UpdateDocument (TRI_doc_operation_context_t* context,
dfi->_sizeDead += length; dfi->_sizeDead += length;
} }
if (oldData._fid != journal->_fid) {
// only select new journal if it different from the old
dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid); dfi = TRI_FindDatafileInfoPrimaryCollection(primary, journal->_fid);
}
if (dfi != NULL) { if (dfi != NULL) {
dfi->_numberAlive += 1; dfi->_numberAlive += 1;
dfi->_sizeAlive += LengthDataMasterPointer(newHeader); dfi->_sizeAlive += LengthDataMasterPointer(newHeader);
} }
// update immediate indexes
res = InsertSecondaryIndexes(document, newHeader);
// check for constraint error
if (context->_allowRollback && res != TRI_ERROR_NO_ERROR) {
int resUpd;
LOG_DEBUG("encountered index violating during update, rolling back");
resUpd = RollbackUpdate(primary, oldHeader, oldData._data, result);
if (resUpd != TRI_ERROR_NO_ERROR) {
LOG_ERROR("encountered error '%s' during rollback of update", TRI_errno_string(resUpd));
}
}
// .............................................................................
// create result
// .............................................................................
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// ............................................................................. // .............................................................................
// post process update // post process update
// ............................................................................. // .............................................................................
@ -1028,16 +1025,43 @@ static int UpdateDocument (TRI_doc_operation_context_t* context,
} }
} }
if (mptr != NULL) {
*mptr = *((TRI_doc_mptr_t*) oldHeader);
}
// wait for sync // wait for sync
if (context->_sync) { if (forceSync) {
WaitSync(document, journal, ((char const*) *result) + markerSize + bodySize); WaitSync(document, journal, ((char const*) result) + totalSize);
} }
return TRI_ERROR_NO_ERROR; // write new header into result
*mptr = *((TRI_doc_mptr_t*) newHeader);
TRI_ASSERT_DEBUG(res == TRI_ERROR_NO_ERROR);
}
else {
// writing the element into the datafile has failed
LOG_ERROR("cannot write element into datafile: '%s'", TRI_last_error());
}
}
if (res != TRI_ERROR_NO_ERROR) {
// rollback index insertion
int resRollback;
resRollback = DeleteSecondaryIndexes(document, newHeader);
if (resRollback != TRI_ERROR_NO_ERROR) {
LOG_DEBUG("encountered error '%s' during rollback of update", TRI_errno_string(resRollback));
}
// copy back old header data
*oldHeader = oldData;
resRollback = InsertSecondaryIndexes(document, oldHeader);
if (resRollback != TRI_ERROR_NO_ERROR) {
LOG_DEBUG("encountered error '%s' during rollback of update", TRI_errno_string(resRollback));
}
}
return res;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1236,36 +1260,15 @@ static void DebugHeaderDocumentCollection (TRI_document_collection_t* collection
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a document marker with common attributes
////////////////////////////////////////////////////////////////////////////////
static void InitDocumentMarker (TRI_doc_document_key_marker_t* marker,
const TRI_df_marker_type_t type,
TRI_shaped_json_t const* json,
const bool generateRid) {
marker->base._type = type;
// generate a new tick
if (generateRid) {
marker->_rid = marker->base._tick = TRI_NewTickVocBase();
}
assert(json->_sid != 0);
marker->_sid = 0;
marker->_shape = json->_sid;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief insert a shaped-json document into the collection /// @brief insert a shaped-json document into the collection
/// note: key might be NULL. in this case, a key is auto-generated /// note: key might be NULL. in this case, a key is auto-generated
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int InsertShapedJson (TRI_doc_operation_context_t* context, static int InsertShapedJson (TRI_doc_operation_context_t* context,
TRI_df_marker_type_e markerType,
const TRI_voc_key_t key, const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr, TRI_doc_mptr_t* mptr,
TRI_df_marker_type_e markerType,
TRI_shaped_json_t const* shaped, TRI_shaped_json_t const* shaped,
void const* data, void const* data,
const bool lock, const bool lock,
@ -1332,8 +1335,8 @@ static int InsertShapedJson (TRI_doc_operation_context_t* context,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int ReadShapedJson (TRI_doc_operation_context_t* context, static int ReadShapedJson (TRI_doc_operation_context_t* context,
TRI_doc_mptr_t* mptr, const TRI_voc_key_t key,
const TRI_voc_key_t key) { TRI_doc_mptr_t* mptr) {
TRI_primary_collection_t* primary; TRI_primary_collection_t* primary;
TRI_doc_mptr_t const* header; TRI_doc_mptr_t const* header;
@ -1359,99 +1362,68 @@ static int ReadShapedJson (TRI_doc_operation_context_t* context,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int UpdateShapedJson (TRI_doc_operation_context_t* context, static int UpdateShapedJson (TRI_doc_operation_context_t* context,
const TRI_voc_key_t key,
TRI_doc_mptr_t* mptr, TRI_doc_mptr_t* mptr,
TRI_shaped_json_t const* json, TRI_shaped_json_t const* shaped,
TRI_voc_key_t key) { const bool lock,
TRI_df_marker_t const* original; const bool forceSync) {
TRI_df_marker_t* result;
TRI_primary_collection_t* primary; TRI_primary_collection_t* primary;
TRI_doc_mptr_t const* header; TRI_doc_document_key_marker_t* marker;
char* keyBody; TRI_doc_mptr_t* header;
size_t keyBodyLength; TRI_voc_size_t totalSize;
int res;
TRI_ASSERT_DEBUG(mptr != NULL);
// initialise the result
mptr->_key = NULL;
mptr->_data = NULL;
marker = NULL;
primary = context->_collection; primary = context->_collection;
// get an existing header pointer if (lock) {
primary->beginWrite(primary);
}
TRI_ASSERT_DEBUG(key != NULL);
// get the header pointer of the previous revision
header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key); header = TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key);
if (! IsVisible(header, context)) { if (IsVisible(header, context)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; // document found, now check revision
res = TRI_RevisionCheck(context, header->_rid);
} }
else {
// document not found
res = TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
if (res == TRI_ERROR_NO_ERROR) {
TRI_df_marker_t const* original;
original = header->_data; original = header->_data;
if (original->_type != TRI_DOC_MARKER_KEY_DOCUMENT && // TODO: make calcCrc flag (last parameter) dynamic, based on the collection properties
original->_type != TRI_DOC_MARKER_KEY_EDGE) { res = CloneDocumentMarker(original, &marker, &totalSize, original->_type, shaped, true);
// invalid marker type
return TRI_ERROR_INTERNAL; if (res == TRI_ERROR_NO_ERROR) {
res = UpdateDocument((TRI_document_collection_t*) primary, header, marker, totalSize, forceSync, mptr);
}
} }
if (lock) {
if (original->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { primary->endWrite(primary);
// the original is a document
TRI_doc_document_key_marker_t marker;
TRI_doc_document_key_marker_t const* o;
o = header->_data;
// create an update
memset(&marker, 0, sizeof(marker));
InitDocumentMarker(&marker, o->base._type, json, false);
keyBody = ((char*) original) + o->_offsetKey;
keyBodyLength = o->_offsetJson - o->_offsetKey;
marker._offsetJson = o->_offsetJson;
marker._offsetKey = o->_offsetKey;
marker.base._size = sizeof(marker) + keyBodyLength + json->_data.length;
return UpdateDocument(context,
header,
&marker,
sizeof(marker),
keyBody,
keyBodyLength,
json->_data.data,
json->_data.length,
&result,
mptr);
} }
// the original is an edge if (marker != NULL) {
else { TRI_Free(TRI_UNKNOWN_MEM_ZONE, marker);
TRI_doc_edge_key_marker_t marker;
TRI_doc_edge_key_marker_t const* o;
o = header->_data;
// create an update
memset(&marker, 0, sizeof(marker));
InitDocumentMarker(&marker.base, o->base.base._type, json, false);
marker._fromCid = o->_fromCid;
marker._toCid = o->_toCid;
keyBody = ((char*) o) + o->base._offsetKey;
keyBodyLength = o->base._offsetJson - o->base._offsetKey;
marker.base._offsetJson = o->base._offsetJson;
marker.base._offsetKey = o->base._offsetKey;
marker._offsetFromKey = o->_offsetFromKey;
marker._offsetToKey = o->_offsetToKey;
marker.base.base._size = sizeof(marker) + keyBodyLength + json->_data.length;
return UpdateDocument(context,
header,
&marker.base,
sizeof(marker),
keyBody,
keyBodyLength,
json->_data.data,
json->_data.length,
&result,
mptr);
} }
return res;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1460,7 +1432,6 @@ static int UpdateShapedJson (TRI_doc_operation_context_t* context,
static int DeleteShapedJson (TRI_doc_operation_context_t* context, static int DeleteShapedJson (TRI_doc_operation_context_t* context,
const TRI_voc_key_t key, const TRI_voc_key_t key,
const TRI_voc_size_t keyLength,
const bool lock, const bool lock,
const bool forceSync) { const bool forceSync) {
TRI_primary_collection_t* primary; TRI_primary_collection_t* primary;
@ -1468,9 +1439,11 @@ static int DeleteShapedJson (TRI_doc_operation_context_t* context,
TRI_voc_size_t totalSize; TRI_voc_size_t totalSize;
int res; int res;
TRI_ASSERT_DEBUG(key != NULL);
// TODO: make calcCrc dynamic // TODO: make calcCrc dynamic
marker = NULL; marker = NULL;
res = CreateDeletionMarker(&marker, &totalSize, key, keyLength, true); res = CreateDeletionMarker(&marker, &totalSize, key, strlen(key), true);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
@ -1680,7 +1653,7 @@ static bool OpenIterator (TRI_df_marker_t const* marker, void* data, TRI_datafil
newHeader = CONST_CAST(found); newHeader = CONST_CAST(found);
// update the header info // update the header info
UpdateHeader(datafile, marker, newHeader, found); UpdateHeader(datafile->_fid, marker, newHeader, found);
newHeader->_validTo = 0; newHeader->_validTo = 0;
@ -5197,7 +5170,7 @@ TRI_vector_t TRI_SelectByExample (TRI_doc_operation_context_t* context,
int TRI_DeleteDocumentDocumentCollection (TRI_doc_operation_context_t* context, int TRI_DeleteDocumentDocumentCollection (TRI_doc_operation_context_t* context,
TRI_doc_mptr_t* doc) { TRI_doc_mptr_t* doc) {
// no extra locking here as the collection is already locked // no extra locking here as the collection is already locked
return DeleteShapedJson(context, doc->_key, strlen(doc->_key), false, false); return DeleteShapedJson(context, doc->_key, false, false);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -320,12 +320,12 @@ typedef struct TRI_primary_collection_s {
void (*createHeader) (struct TRI_primary_collection_s*, TRI_datafile_t*, TRI_df_marker_t const*, size_t, TRI_doc_mptr_t*, void const* data); void (*createHeader) (struct TRI_primary_collection_s*, TRI_datafile_t*, TRI_df_marker_t const*, size_t, TRI_doc_mptr_t*, void const* data);
void (*updateHeader) (struct TRI_primary_collection_s*, TRI_datafile_t*, TRI_df_marker_t const*, size_t, TRI_doc_mptr_t const*, TRI_doc_mptr_t*); void (*updateHeader) (struct TRI_primary_collection_s*, TRI_datafile_t*, TRI_df_marker_t const*, size_t, TRI_doc_mptr_t const*, TRI_doc_mptr_t*);
int (*insert) (struct TRI_doc_operation_context_s*, TRI_df_marker_type_e, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, void const*, const bool, const bool); int (*insert) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_df_marker_type_e, TRI_shaped_json_t const*, void const*, const bool, const bool);
int (*read) (struct TRI_doc_operation_context_s*, TRI_doc_mptr_t*, const TRI_voc_key_t); int (*read) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*);
int (*update) (struct TRI_doc_operation_context_s*, TRI_doc_mptr_t*, TRI_shaped_json_t const*, TRI_voc_key_t); int (*update) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, const bool, const bool);
int (*destroy) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, const TRI_voc_size_t, const bool, const bool); int (*destroy) (struct TRI_doc_operation_context_s*, const TRI_voc_key_t, const bool, const bool);
TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection); TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection);
TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection); TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection);