1
0
Fork 0

Sort out transactions in cluster, part I.

This commit is contained in:
Max Neunhoeffer 2014-10-17 15:25:46 +02:00
parent 7a25f20e23
commit cb75b0b18a
5 changed files with 26 additions and 54 deletions

View File

@ -473,7 +473,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
jsonNodesList.set("variables", query->ast()->variables()->toJson(TRI_UNKNOWN_MEM_ZONE));
result.set("plan", jsonNodesList);
result.set("part", Json("main")); // TODO: set correct query type
result.set("part", Json("dependent")); // TODO: set correct query type
Json optimizerOptionsRules(Json::List);
Json optimizerOptions(Json::Array);
@ -527,12 +527,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
triagens::basics::Json response(TRI_UNKNOWN_MEM_ZONE, triagens::basics::JsonHelper::fromString(res->answer->body()));
std::string queryId = triagens::basics::JsonHelper::getStringValue(response.json(), "queryId", "");
// std::cout << "DB SERVER ANSWERED WITHOUT ERROR: " << res->answer->body() << ", SHARDID:" << res->shardID << ", QUERYID: " << queryId << "\n";
std::cout << "DB SERVER ANSWERED WITHOUT ERROR: " << res->answer->body() << ", SHARDID:" << res->shardID << ", QUERYID: " << queryId << "\n";
queryIds.emplace(std::make_pair(res->shardID, queryId));
}
else {
// std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
}
}
delete res;

View File

@ -39,7 +39,7 @@
#include "Basics/tri-strings.h"
#include "Utils/AqlTransaction.h"
#include "Utils/Exception.h"
#include "Utils/V8TransactionContext.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/vocbase.h"
using namespace triagens::aql;
@ -386,7 +386,7 @@ QueryResult Query::prepare (QueryRegistry* registry) {
// std::cout << "AST: " << triagens::basics::JsonHelper::toString(parser->ast()->toJson(TRI_UNKNOWN_MEM_ZONE, false)) << "\n";
}
auto trx = new triagens::arango::AqlTransaction(new triagens::arango::V8TransactionContext(true), _vocbase, _collections.collections(), _part == PART_MAIN);
auto trx = new triagens::arango::AqlTransaction(new triagens::arango::StandaloneTransactionContext(), _vocbase, _collections.collections(), _part == PART_MAIN);
_trx = trx; // Save the transaction in our object
bool planRegisters;
@ -595,7 +595,7 @@ QueryResult Query::explain () {
// std::cout << "AST: " << triagens::basics::JsonHelper::toString(parser.ast()->toJson(TRI_UNKNOWN_MEM_ZONE)) << "\n";
// create the transaction object, but do not start it yet
auto trx = new triagens::arango::AqlTransaction(new triagens::arango::V8TransactionContext(true), _vocbase, _collections.collections(), true);
auto trx = new triagens::arango::AqlTransaction(new triagens::arango::StandaloneTransactionContext(), _vocbase, _collections.collections(), true);
_trx = trx; // save the pointer in this
// we have an AST

View File

@ -98,13 +98,8 @@ void QueryRegistry::insert (TRI_vocbase_t* vocbase,
TRI_ASSERT_EXPENSIVE(_queries.find(vocbase->_name)->second.find(id) != _queries.find(vocbase->_name)->second.end());
if (query->part() == PART_MAIN) {
// A query that is being shelved must unregister its transaction
// with the current context:
query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
}
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
}
else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -136,14 +131,8 @@ Query* QueryRegistry::open (TRI_vocbase_t* vocbase,
}
qi->_isOpen = true;
// A query that is being opened must register its transaction
// with the current context:
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count up the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
// We need to count up the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
return qi->_query;
}
@ -171,13 +160,8 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
"query with given vocbase and id is not open");
}
// A query that is being closed must unregister its transaction
// with the current context:
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
// We need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
qi->_isOpen = false;
qi->_expires = TRI_microtime() + qi->_timeToLive;
@ -205,11 +189,8 @@ void QueryRegistry::destroy (std::string const& vocbase, QueryId id) {
// to register the transaction with the current context and adjust
// the debugging counters for transactions:
if (! qi->_isOpen) {
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
// We need to count up the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
// Now we can delete it:

View File

@ -113,6 +113,8 @@ void RestAqlHandler::createQueryFromJson () {
if (queryJson.isEmpty()) {
return;
}
std::cout << "createQueryFromJson" << queryJson.toString() << std::endl;
Json plan;
Json options;
@ -163,7 +165,7 @@ void RestAqlHandler::createQueryFromJson () {
answerBody("queryId", Json(StringUtils::itoa(_qId)))
("ttl", Json(ttl));
//std::cout << "RESPONSE BODY IS: " << answerBody.toString() << "\n";
std::cout << "RESPONSE BODY IS: " << answerBody.toString() << "\n";
_response->body().appendText(answerBody.toString());
}
@ -434,6 +436,11 @@ void RestAqlHandler::useQuery (std::string const& operation,
_queryRegistry->close(_vocbase, _qId);
return;
}
std::cout << "useQuery op:" << operation << "," << idString << std::endl
<< queryJson.toString() << std::endl;
}
else {
std::cout << "useQuery shutdown" << std::endl;
}
try {
@ -467,6 +474,8 @@ void RestAqlHandler::useQuery (std::string const& operation,
TRI_ERROR_HTTP_SERVER_ERROR,
"an unknown exception occurred");
}
std::cout << "Response of useQuery:" << _response->body().c_str() <<
std::endl;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -36,7 +36,7 @@
#include "Cluster/ServerState.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/Transaction.h"
#include "Utils/V8TransactionContext.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include <v8.h>
@ -165,24 +165,6 @@ namespace triagens {
return trxColl->_collection->_collection;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief registerTransaction
////////////////////////////////////////////////////////////////////////////////
int registerTransactionWithContext () {
// This calls the method in the V8TransactionContext
return this->_transactionContext->registerTransaction(this->_trx);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unregisterTransaction
////////////////////////////////////////////////////////////////////////////////
int unregisterTransactionWithContext () {
// This calls the method in the V8TransactionContext
return this->_transactionContext->unregisterTransaction();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief clone, used to make daughter transactions for parts of a distributed
/// AQL query running on the coordinator
@ -190,7 +172,7 @@ namespace triagens {
triagens::arango::AqlTransaction* clone () const {
return new triagens::arango::AqlTransaction(
new triagens::arango::V8TransactionContext(true),
new triagens::arango::StandaloneTransactionContext(),
this->_vocbase,
&_collections, false);
}