1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2017-06-08 12:44:50 +02:00
commit e55d3e92a9
20 changed files with 133 additions and 39 deletions

View File

@ -28,6 +28,7 @@
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
#include "ApplicationFeatures/ApplicationServer.h"
#include "Aql/Function.h"
#include "Aql/Query.h"
#include "Basics/Exceptions.h"
@ -1758,9 +1759,10 @@ AqlValue Functions::Sleep(arangodb::aql::Query* query,
if (query->killed()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
} else if (application_features::ApplicationServer::isStopping()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
}
return AqlValue(arangodb::basics::VelocyPackHelper::NullValue());
}

View File

@ -23,7 +23,6 @@
#include "Query.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/AqlTransaction.h"
#include "Aql/ExecutionBlock.h"
@ -43,7 +42,6 @@
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "RestServer/AqlFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "Transaction/StandaloneContext.h"
@ -90,7 +88,6 @@ Query::Query(bool contextOwnedByExterior, TRI_vocbase_t* vocbase,
_trx(nullptr),
_warnings(),
_startTime(TRI_microtime()),
_queryRegistry(application_features::ApplicationServer::getFeature<arangodb::QueryRegistryFeature>("QueryRegistry")),
_part(part),
_contextOwnedByExterior(contextOwnedByExterior),
_killed(false),
@ -170,7 +167,6 @@ Query::Query(bool contextOwnedByExterior, TRI_vocbase_t* vocbase,
_trx(nullptr),
_warnings(),
_startTime(TRI_microtime()),
_queryRegistry(application_features::ApplicationServer::getFeature<arangodb::QueryRegistryFeature>("QueryRegistry")),
_part(part),
_contextOwnedByExterior(contextOwnedByExterior),
_killed(false),

View File

@ -55,8 +55,6 @@ namespace velocypack {
class Builder;
}
class QueryRegistryFeature;
namespace aql {
struct AstNode;
@ -342,8 +340,6 @@ class Query {
/// @brief query start time
double _startTime;
QueryRegistryFeature const* _queryRegistry;
/// @brief the query part
QueryPart const _part;

View File

@ -118,8 +118,7 @@ Query* QueryRegistry::open(TRI_vocbase_t* vocbase, QueryId id) {
auto m = _queries.find(vocbase->name());
if (m == _queries.end()) {
m = _queries.emplace(vocbase->name(),
std::unordered_map<QueryId, QueryInfo*>()).first;
return nullptr;
}
auto q = m->second.find(id);
if (q == m->second.end()) {
@ -290,6 +289,10 @@ void QueryRegistry::destroyAll() {
}
}
for (auto& p : allQueries) {
destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN);
try {
destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN);
} catch (...) {
// ignore any errors here
}
}
}

View File

@ -1138,6 +1138,11 @@ std::vector<communicator::Ticket> ClusterComm::activeServerTickets(std::vector<s
return tickets;
}
void ClusterComm::disable() {
_communicator->disable();
_communicator->abortRequests();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ClusterComm main loop
////////////////////////////////////////////////////////////////////////////////

View File

@ -518,6 +518,12 @@ class ClusterComm {
void addAuthorization(std::unordered_map<std::string, std::string>* headers);
std::string jwt() { return _jwt; };
//////////////////////////////////////////////////////////////////////////////
/// @brief abort and disable all communication
//////////////////////////////////////////////////////////////////////////////
void disable();
private:
size_t performSingleRequest(std::vector<ClusterCommRequest>& requests,
@ -616,6 +622,7 @@ class ClusterComm {
void cleanupAllQueues();
//////////////////////////////////////////////////////////////////////////////
/// @brief activeServerTickets for a list of servers
//////////////////////////////////////////////////////////////////////////////
@ -638,6 +645,7 @@ class ClusterComm {
bool _authenticationEnabled;
std::string _jwt;
std::string _jwtAuthorization;
};
////////////////////////////////////////////////////////////////////////////////

View File

@ -537,6 +537,7 @@ void ClusterFeature::unprepare() {
}
if (!_enableCluster) {
ClusterComm::instance()->disable();
ClusterComm::cleanup();
return;
}

View File

@ -30,7 +30,6 @@
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/TraverserEngineRegistryFeature.h"
using namespace arangodb;
using namespace arangodb::application_features;
@ -68,14 +67,13 @@ AqlFeature* AqlFeature::lease() {
void AqlFeature::unlease() {
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
AqlFeature* aql = AqlFeature::_AQL;
if (aql == nullptr) {
return;
}
TRI_ASSERT(aql != nullptr);
--aql->_numberLeases;
}
void AqlFeature::start() {
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
TRI_ASSERT(_AQL == nullptr);
_AQL = this;
LOG_TOPIC(DEBUG, Logger::QUERIES) << "AQL feature started";
}

View File

@ -1691,7 +1691,6 @@ uint64_t RocksDBCollection::recalculateCounts() {
// count documents
auto documentBounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
_numberDocuments = rocksutils::countKeyRange(globalRocksDB(), readOptions,
RocksDBColumnFamily::documents(),
documentBounds);
// update counter manager value

View File

@ -27,8 +27,8 @@
#include "Basics/RocksDBUtils.h"
#include "Basics/StringRef.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -134,7 +134,7 @@ RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx) {
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state);
}
RocksDBMethods* toRocksMethods(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
@ -193,7 +193,8 @@ std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(
}
std::size_t countKeyRange(rocksdb::DB* db, rocksdb::ReadOptions const& opts,
rocksdb::ColumnFamilyHandle* handle, RocksDBKeyBounds const& bounds) {
RocksDBKeyBounds const& bounds) {
rocksdb::ColumnFamilyHandle* handle = bounds.columnFamily();
rocksdb::Comparator const* cmp = db->GetOptions().comparator;
std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(opts, handle));
std::size_t count = 0;
@ -214,13 +215,13 @@ Result removeLargeRange(rocksdb::TransactionDB* db,
RocksDBKeyBounds const& bounds) {
LOG_TOPIC(DEBUG, Logger::FIXME) << "removing large range: " << bounds;
try {
rocksdb::ColumnFamilyHandle* handle = bounds.columnFamily();
// delete files in range lower..upper
rocksdb::Slice lower(bounds.start());
rocksdb::Slice upper(bounds.end());
{
rocksdb::Status status = rocksdb::DeleteFilesInRange(
db->GetBaseDB(), db->GetBaseDB()->DefaultColumnFamily(), &lower,
&upper);
rocksdb::Status status =
rocksdb::DeleteFilesInRange(db->GetBaseDB(), handle, &lower, &upper);
if (!status.ok()) {
// if file deletion failed, we will still iterate over the remaining
// keys, so we don't need to abort and raise an error here
@ -235,8 +236,7 @@ Result removeLargeRange(rocksdb::TransactionDB* db,
rocksdb::WriteBatch batch;
rocksdb::ReadOptions readOptions;
readOptions.fill_cache = false;
std::unique_ptr<rocksdb::Iterator> it(
db->NewIterator(readOptions));
std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(readOptions));
// TODO: split this into multiple batches if batches get too big
it->Seek(lower);
@ -253,7 +253,7 @@ Result removeLargeRange(rocksdb::TransactionDB* db,
<< "RocksDB key deletion failed: " << status.ToString();
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR;
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
@ -274,10 +274,13 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> collectionKVPairs(
TRI_voc_tick_t databaseId) {
std::vector<std::pair<RocksDBKey, RocksDBValue>> rv;
RocksDBKeyBounds bounds = RocksDBKeyBounds::DatabaseCollections(databaseId);
iterateBounds(bounds, [&rv](rocksdb::Iterator* it) {
rv.emplace_back(RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::Collection, it->value()));
}, arangodb::RocksDBColumnFamily::other());
iterateBounds(bounds,
[&rv](rocksdb::Iterator* it) {
rv.emplace_back(
RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::Collection, it->value()));
},
arangodb::RocksDBColumnFamily::other());
return rv;
}
@ -285,10 +288,13 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> viewKVPairs(
TRI_voc_tick_t databaseId) {
std::vector<std::pair<RocksDBKey, RocksDBValue>> rv;
RocksDBKeyBounds bounds = RocksDBKeyBounds::DatabaseViews(databaseId);
iterateBounds(bounds, [&rv](rocksdb::Iterator* it) {
rv.emplace_back(RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::View, it->value()));
}, arangodb::RocksDBColumnFamily::other());
iterateBounds(bounds,
[&rv](rocksdb::Iterator* it) {
rv.emplace_back(
RocksDBKey(it->key()),
RocksDBValue(RocksDBEntryType::View, it->value()));
},
arangodb::RocksDBColumnFamily::other());
return rv;
}

View File

@ -167,7 +167,7 @@ std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(uint64_t);
/// Iterator over all keys in range and count them
std::size_t countKeyRange(rocksdb::DB*, rocksdb::ReadOptions const&,
rocksdb::ColumnFamilyHandle*, RocksDBKeyBounds const&);
RocksDBKeyBounds const&);
/// @brief helper method to remove large ranges of data
/// Should mainly be used to implement the drop() call

View File

@ -24,6 +24,7 @@
#include "RocksDBKeyBounds.h"
#include "Basics/Exceptions.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBTypes.h"
@ -199,6 +200,28 @@ uint64_t RocksDBKeyBounds::objectId() const {
}
}
rocksdb::ColumnFamilyHandle* RocksDBKeyBounds::columnFamily() const {
RocksDBEntryType type = static_cast<RocksDBEntryType>(_internals._buffer[0]);
switch (type) {
case RocksDBEntryType::Document:
return RocksDBColumnFamily::documents();
case RocksDBEntryType::PrimaryIndexValue:
return RocksDBColumnFamily::primary();
case RocksDBEntryType::EdgeIndexValue:
return RocksDBColumnFamily::edge();
case RocksDBEntryType::IndexValue:
return RocksDBColumnFamily::index();
case RocksDBEntryType::UniqueIndexValue:
return RocksDBColumnFamily::uniqueIndex();
case RocksDBEntryType::FulltextIndexValue:
return RocksDBColumnFamily::fulltext();
case RocksDBEntryType::GeoIndexValue:
return RocksDBColumnFamily::geo();
default:
return RocksDBColumnFamily::other();
}
}
// constructor for an empty bound. do not use for anything but to
// default-construct a key bound!
RocksDBKeyBounds::RocksDBKeyBounds()

View File

@ -36,6 +36,10 @@
#include <iosfwd>
namespace rocksdb {
class ColumnFamilyHandle;
}
namespace arangodb {
class RocksDBKeyBounds {
@ -170,6 +174,17 @@ class RocksDBKeyBounds {
return _internals.end();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief Returns the column family from this Bound
///
/// All bounds iterators need to iterate over the correct column families
/// with this helper function it is made sure that correct column family
/// for bound is used.
//////////////////////////////////////////////////////////////////////////////
rocksdb::ColumnFamilyHandle* columnFamily() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Returns the object ID for these bounds
///

View File

@ -274,6 +274,7 @@
"SIMPLE_CLIENT_COULD_NOT_WRITE" : { "code" : 2002, "message" : "could not write to server" },
"SIMPLE_CLIENT_COULD_NOT_READ" : { "code" : 2003, "message" : "could not read from server" },
"COMMUNICATOR_REQUEST_ABORTED" : { "code" : 2100, "message" : "Request aborted" },
"COMMUNICATOR_DISABLED" : { "code" : 2101, "message" : "Communication was disabled" },
"ERROR_MALFORMED_MANIFEST_FILE" : { "code" : 3000, "message" : "failed to parse manifest file" },
"ERROR_INVALID_SERVICE_MANIFEST" : { "code" : 3001, "message" : "manifest file is invalid" },
"ERROR_SERVICE_FILES_MISSING" : { "code" : 3002, "message" : "service files missing" },

View File

@ -244,7 +244,13 @@ void ApplicationServer::beginShutdown() {
++it) {
if ((*it)->isEnabled()) {
LOG_TOPIC(TRACE, Logger::STARTUP) << (*it)->name() << "::beginShutdown";
(*it)->beginShutdown();
try {
(*it)->beginShutdown();
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught exception during beginShutdown of feature '" << (*it)->name() << "': " << ex.what();
} catch (...) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught unknown exception during beginShutdown of feature '" << (*it)->name() << "'";
}
}
}
@ -616,7 +622,13 @@ void ApplicationServer::stop() {
}
LOG_TOPIC(TRACE, Logger::STARTUP) << feature->name() << "::stop";
feature->stop();
try {
feature->stop();
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught exception during stop of feature '" << feature->name() << "': " << ex.what();
} catch (...) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught unknown exception during stop of feature '" << feature->name() << "'";
}
feature->state(FeatureState::STOPPED);
reportFeatureProgress(_state, feature->name());
}
@ -630,7 +642,13 @@ void ApplicationServer::unprepare() {
auto feature = *it;
LOG_TOPIC(TRACE, Logger::STARTUP) << feature->name() << "::unprepare";
feature->unprepare();
try {
feature->unprepare();
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught exception during unprepare of feature '" << feature->name() << "': " << ex.what();
} catch (...) {
LOG_TOPIC(ERR, Logger::STARTUP) << "caught unknown exception during unprepare of feature '" << feature->name() << "'";
}
feature->state(FeatureState::UNPREPARED);
reportFeatureProgress(_state, feature->name());
}

View File

@ -377,6 +377,7 @@ SIMPLE_CLIENT_COULD_NOT_READ,2003,"could not read from server","Will be raised w
################################################################################
COMMUNICATOR_REQUEST_ABORTED,2100,"Request aborted","Request was aborted."
COMMUNICATOR_DISABLED,2101,"Communication was disabled","Communication was disabled."
################################################################################
## Foxx management errors

View File

@ -270,6 +270,7 @@ void TRI_InitializeErrorMessages () {
REG_ERROR(SIMPLE_CLIENT_COULD_NOT_WRITE, "could not write to server");
REG_ERROR(SIMPLE_CLIENT_COULD_NOT_READ, "could not read from server");
REG_ERROR(COMMUNICATOR_REQUEST_ABORTED, "Request aborted");
REG_ERROR(COMMUNICATOR_DISABLED, "Communication was disabled");
REG_ERROR(ERROR_MALFORMED_MANIFEST_FILE, "failed to parse manifest file");
REG_ERROR(ERROR_INVALID_SERVICE_MANIFEST, "manifest file is invalid");
REG_ERROR(ERROR_SERVICE_FILES_MISSING, "service files missing");

View File

@ -647,6 +647,8 @@
/// Will be raised when the client could not read data.
/// - 2100: @LIT{Request aborted}
/// Request was aborted.
/// - 2101: @LIT{Communication was disabled}
/// Communication was disabled.
/// - 3000: @LIT{failed to parse manifest file}
/// The service manifest file is not well-formed JSON.
/// - 3001: @LIT{manifest file is invalid}
@ -3475,6 +3477,16 @@ void TRI_InitializeErrorMessages ();
#define TRI_COMMUNICATOR_REQUEST_ABORTED (2100)
////////////////////////////////////////////////////////////////////////////////
/// @brief 2101: COMMUNICATOR_DISABLED
///
/// Communication was disabled
///
/// Communication was disabled.
////////////////////////////////////////////////////////////////////////////////
#define TRI_COMMUNICATOR_DISABLED (2101)
////////////////////////////////////////////////////////////////////////////////
/// @brief 3000: ERROR_MALFORMED_MANIFEST_FILE
///

View File

@ -131,7 +131,7 @@ std::atomic_uint_fast64_t NEXT_TICKET_ID(static_cast<uint64_t>(0));
std::vector<char> urlDotSeparators{'/', '#', '?'};
}
Communicator::Communicator() : _curl(nullptr), _mc(CURLM_OK) {
Communicator::Communicator() : _curl(nullptr), _mc(CURLM_OK), _enabled(true) {
curl_global_init(CURL_GLOBAL_ALL);
_curl = curl_multi_init();
@ -252,6 +252,12 @@ void Communicator::wait() {
// -----------------------------------------------------------------------------
void Communicator::createRequestInProgress(NewRequest const& newRequest) {
if (!_enabled) {
LOG_TOPIC(DEBUG, arangodb::Logger::COMMUNICATION) << "Request to '" << newRequest._destination.url() << "' was not even started because communication is disabled";
newRequest._callbacks._onError(TRI_COMMUNICATOR_DISABLED, {nullptr});
return;
}
auto request = (HttpRequest*)newRequest._request.get();
TRI_ASSERT(request != nullptr);

View File

@ -120,6 +120,8 @@ class Communicator {
std::vector<RequestInProgress const*> requestsInProgress();
void abortRequest(Ticket ticketId);
void abortRequests();
void disable() { _enabled = false; };
void enable() { _enabled = true; };
private:
struct NewRequest {
@ -144,6 +146,7 @@ class Communicator {
#else
int _fds[2];
#endif
bool _enabled;
private:
void createRequestInProgress(NewRequest const& newRequest);