1
0
Fork 0
arangodb/arangod/VocBase/Methods/Tasks.cpp

478 lines
15 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Jan Steemann
/// @author Dan Larkin-York
////////////////////////////////////////////////////////////////////////////////
#include "VocBase/Methods/Tasks.h"
#include <v8.h>
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/FunctionUtils.h"
#include "Basics/StringUtils.h"
#include "Basics/system-functions.h"
#include "Basics/tri-strings.h"
#include "Cluster/ServerState.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Transaction/Hints.h"
#include "Transaction/V8Context.h"
#include "Utils/DatabaseGuard.h"
#include "Utils/ExecContext.h"
#include "Utils/OperationOptions.h"
#include "Utils/SingleCollectionTransaction.h"
#include "V8/JavaScriptSecurityContext.h"
#include "V8/v8-conv.h"
#include "V8/v8-utils.h"
#include "V8/v8-vpack.h"
#include "V8Server/V8Context.h"
#include "V8Server/V8DealerFeature.h"
#include "VocBase/AccessMode.h"
#include "VocBase/ticks.h"
#include "VocBase/vocbase.h"
using namespace arangodb::basics;
// -----------------------------------------------------------------------------
// --SECTION-- task handling
// -----------------------------------------------------------------------------
namespace {
bool authorized(std::pair<std::string, std::shared_ptr<arangodb::Task>> const& task) {
arangodb::ExecContext const& exec = arangodb::ExecContext::current();
if (exec.isSuperuser()) {
return true;
}
return (task.first == exec.user());
}
} // namespace
namespace arangodb {
Mutex Task::_tasksLock;
std::unordered_map<std::string, std::pair<std::string, std::shared_ptr<Task>>> Task::_tasks;
std::shared_ptr<Task> Task::createTask(std::string const& id, std::string const& name,
TRI_vocbase_t* vocbase, std::string const& command,
bool allowUseDatabase, int& ec) {
if (id.empty()) {
ec = TRI_ERROR_TASK_INVALID_ID;
return nullptr;
}
TRI_ASSERT(nullptr != vocbase); // this check was previously in the
// DatabaseGuard constructor which on failure
// would fail Task constructor
if (vocbase->server().isStopping()) {
ec = TRI_ERROR_SHUTTING_DOWN;
return nullptr;
}
TRI_ASSERT(nullptr != vocbase); // this check was previously in the
// DatabaseGuard constructor which on failure
// would fail Task constructor
std::string const& user = ExecContext::current().user();
auto task = std::make_shared<Task>(id, name, *vocbase, command, allowUseDatabase);
MUTEX_LOCKER(guard, _tasksLock);
if (!_tasks.try_emplace(id, user, task).second) {
ec = TRI_ERROR_TASK_DUPLICATE_ID;
return {nullptr};
}
ec = TRI_ERROR_NO_ERROR;
return task;
}
int Task::unregisterTask(std::string const& id, bool cancel) {
if (id.empty()) {
return TRI_ERROR_TASK_INVALID_ID;
}
MUTEX_LOCKER(guard, _tasksLock);
auto itr = _tasks.find(id);
if (itr == _tasks.end() || !::authorized(itr->second)) {
return TRI_ERROR_TASK_NOT_FOUND;
}
if (cancel) {
itr->second.second->cancel();
}
_tasks.erase(itr);
return TRI_ERROR_NO_ERROR;
}
std::shared_ptr<VPackBuilder> Task::registeredTask(std::string const& id) {
MUTEX_LOCKER(guard, _tasksLock);
auto itr = _tasks.find(id);
if (itr == _tasks.end() || !::authorized(itr->second)) {
return nullptr;
}
return itr->second.second->toVelocyPack();
}
std::shared_ptr<VPackBuilder> Task::registeredTasks() {
auto builder = std::make_shared<VPackBuilder>();
try {
VPackArrayBuilder b1(builder.get());
MUTEX_LOCKER(guard, _tasksLock);
for (auto& it : _tasks) {
if (::authorized(it.second)) {
VPackObjectBuilder b2(builder.get());
it.second.second->toVelocyPack(*builder);
}
}
} catch (...) {
return std::make_shared<VPackBuilder>();
}
return builder;
}
void Task::shutdownTasks() {
{
MUTEX_LOCKER(guard, _tasksLock);
for (auto& it : _tasks) {
it.second.second->cancel();
}
}
// wait for the tasks to be cleaned up
int iterations = 0;
while (true) {
size_t size;
{
MUTEX_LOCKER(guard, _tasksLock);
size = _tasks.size();
}
if (size == 0) {
break;
}
if (++iterations % 10 == 0) {
LOG_TOPIC("3966b", INFO, Logger::FIXME) << "waiting for " << size << " task(s) to complete";
} else if (iterations >= 25) {
LOG_TOPIC("54653", INFO, Logger::FIXME) << "giving up waiting for unfinished tasks";
MUTEX_LOCKER(guard, _tasksLock);
_tasks.clear();
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
void Task::removeTasksForDatabase(std::string const& name) {
MUTEX_LOCKER(guard, _tasksLock);
for (auto it = _tasks.begin(); it != _tasks.end(); /* no hoisting */) {
if (!(*it).second.second->databaseMatches(name)) {
++it;
} else {
auto task = (*it).second.second;
task->cancel();
it = _tasks.erase(it);
}
}
}
bool Task::tryCompile(v8::Isolate* isolate, std::string const& command) {
v8::HandleScope scope(isolate);
// get built-in Function constructor (see ECMA-262 5th edition 15.3.2)
auto current = isolate->GetCurrentContext()->Global();
auto ctor = v8::Local<v8::Function>::Cast(
current->Get(TRI_V8_ASCII_STRING(isolate, "Function")));
// Invoke Function constructor to create function with the given body and no
// arguments
v8::Handle<v8::Value> args[2] = {TRI_V8_ASCII_STRING(isolate, "params"),
TRI_V8_STD_STRING(isolate, command)};
v8::Local<v8::Object> function = ctor->NewInstance(TRI_IGETC, 2, args).FromMaybe(v8::Local<v8::Object>());
v8::Handle<v8::Function> action = v8::Local<v8::Function>::Cast(function);
return (!action.IsEmpty());
}
bool Task::databaseMatches(std::string const& name) const {
return (_dbGuard->database().name() == name);
}
Task::Task(std::string const& id, std::string const& name, TRI_vocbase_t& vocbase,
std::string const& command, bool allowUseDatabase)
: _id(id),
_name(name),
_created(TRI_microtime()),
_dbGuard(new DatabaseGuard(vocbase)),
_command(command),
_allowUseDatabase(allowUseDatabase),
_offset(0),
_interval(0) {}
Task::~Task() = default;
void Task::setOffset(double offset) {
_offset = std::chrono::milliseconds(static_cast<long long>(offset * 1000));
_periodic.store(false);
}
void Task::setPeriod(double offset, double period) {
_offset = std::chrono::milliseconds(static_cast<long long>(offset * 1000));
_interval = std::chrono::milliseconds(static_cast<long long>(period * 1000));
_periodic.store(true);
}
void Task::setParameter(std::shared_ptr<arangodb::velocypack::Builder> const& parameters) {
_parameters = parameters;
}
void Task::setUser(std::string const& user) { _user = user; }
std::function<void(bool cancelled)> Task::callbackFunction() {
return [self = shared_from_this(), this](bool cancelled) {
if (cancelled) {
MUTEX_LOCKER(guard, _tasksLock);
auto itr = _tasks.find(_id);
if (itr != _tasks.end()) {
// remove task from list of tasks if it is still active
if (this == (*itr).second.second.get()) {
// still the same task. must remove from map
_tasks.erase(itr);
}
}
return;
}
// get the permissions to be used by this task
bool allowContinue = true;
std::shared_ptr<ExecContext> execContext;
if (!_user.empty()) { // not superuser
auto& dbname = _dbGuard->database().name();
execContext.reset(ExecContext::create(_user, dbname).release());
allowContinue = execContext->canUseDatabase(dbname, auth::Level::RW);
}
// permissions might have changed since starting this task
if (_dbGuard->database().server().isStopping() || !allowContinue) {
Task::unregisterTask(_id, true);
return;
}
// now do the work:
bool queued = basics::function_utils::retryUntilTimeout(
[this, self, execContext]() -> bool {
return SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [self, this, execContext] {
ExecContextScope scope(_user.empty() ? &ExecContext::superuser()
: execContext.get());
work(execContext.get());
if (_periodic.load() && !_dbGuard->database().server().isStopping()) {
// requeue the task
bool queued = basics::function_utils::retryUntilTimeout(
[this]() -> bool { return queue(_interval); }, Logger::FIXME,
"queue task");
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL,
"Failed to queue task for 5 minutes, gave up.");
}
} else {
// in case of one-off tasks or in case of a shutdown, simply
// remove the task from the list
Task::unregisterTask(_id, true);
}
});
},
Logger::FIXME, "queue task");
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL, "Failed to queue task for 5 minutes, gave up.");
}
};
}
void Task::start() {
ExecContext const& exec = ExecContext::current();
TRI_ASSERT(exec.isAdminUser() || (!_user.empty() && exec.user() == _user));
{
MUTEX_LOCKER(lock, _taskHandleMutex);
_taskHandle.reset();
}
if (_offset.count() <= 0) {
_offset = std::chrono::microseconds(1);
}
// initially queue the task
bool queued = basics::function_utils::retryUntilTimeout(
[this]() -> bool { return queue(_offset); }, Logger::FIXME, "queue task");
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL, "Failed to queue task for 5 minutes, gave up.");
}
}
bool Task::queue(std::chrono::microseconds offset) {
MUTEX_LOCKER(lock, _taskHandleMutex);
bool queued = false;
std::tie(queued, _taskHandle) =
SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, offset,
callbackFunction());
return queued;
}
void Task::cancel() {
// this will prevent the task from dispatching itself again
_periodic.store(false);
MUTEX_LOCKER(lock, _taskHandleMutex);
_taskHandle.reset();
}
std::shared_ptr<VPackBuilder> Task::toVelocyPack() const {
try {
auto builder = std::make_shared<VPackBuilder>();
{
VPackObjectBuilder b(builder.get());
toVelocyPack(*builder);
}
return builder;
} catch (...) {
return std::make_shared<VPackBuilder>();
}
}
void Task::toVelocyPack(VPackBuilder& builder) const {
builder.add("id", VPackValue(_id));
builder.add("name", VPackValue(_name));
builder.add("created", VPackValue(_created));
if (_periodic.load()) {
builder.add("type", VPackValue("periodic"));
builder.add("period", VPackValue(_interval.count() / 1000000.0));
} else {
builder.add("type", VPackValue("timed"));
}
builder.add("offset", VPackValue(_offset.count() / 1000000.0));
builder.add("command", VPackValue(_command));
builder.add("database", VPackValue(_dbGuard->database().name()));
}
void Task::work(ExecContext const* exec) {
JavaScriptSecurityContext securityContext =
_allowUseDatabase
? JavaScriptSecurityContext::createInternalContext() // internal context that may access internal data
: JavaScriptSecurityContext::createTaskContext(false /*_allowUseDatabase*/); // task context that has no access to dbs
V8ContextGuard guard(&(_dbGuard->database()), securityContext);
// now execute the function within this context
{
auto isolate = guard.isolate();
v8::HandleScope scope(isolate);
// get built-in Function constructor (see ECMA-262 5th edition 15.3.2)
auto current = isolate->GetCurrentContext()->Global();
auto ctor = v8::Local<v8::Function>::Cast(
current->Get(TRI_V8_ASCII_STRING(isolate, "Function")));
// Invoke Function constructor to create function with the given body and
// no
// arguments
v8::Handle<v8::Value> args[2] = {TRI_V8_ASCII_STRING(isolate, "params"),
TRI_V8_STD_STRING(isolate, _command)};
v8::Local<v8::Object> function = ctor->NewInstance(TRI_IGETC, 2, args).FromMaybe(v8::Local<v8::Object>());
v8::Handle<v8::Function> action = v8::Local<v8::Function>::Cast(function);
if (!action.IsEmpty()) {
// only go in here if action is a function
v8::Handle<v8::Value> fArgs;
if (_parameters != nullptr) {
fArgs = TRI_VPackToV8(isolate, _parameters->slice());
} else {
fArgs = v8::Undefined(isolate);
}
// call the function within a try/catch
try {
v8::TryCatch tryCatch(isolate);
action->Call(current, 1, &fArgs);
if (tryCatch.HasCaught()) {
if (tryCatch.CanContinue()) {
TRI_LogV8Exception(isolate, &tryCatch);
} else {
TRI_GET_GLOBALS();
v8g->_canceled = true;
LOG_TOPIC("131e8", WARN, arangodb::Logger::FIXME)
<< "caught non-catchable exception (aka termination) in job";
}
}
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC("d6729", ERR, arangodb::Logger::FIXME)
<< "caught exception in V8 user task: " << TRI_errno_string(ex.code())
<< " " << ex.what();
} catch (std::bad_alloc const&) {
LOG_TOPIC("bfe8a", ERR, arangodb::Logger::FIXME)
<< "caught exception in V8 user task: "
<< TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY);
} catch (...) {
LOG_TOPIC("342ec", ERR, arangodb::Logger::FIXME)
<< "caught unknown exception in V8 user task";
}
}
}
}
} // namespace arangodb