mirror of https://gitee.com/bigwinds/arangodb
246 lines
8.2 KiB
C++
246 lines
8.2 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
|
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// @author Jan Steemann
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#ifndef ARANGOD_REPLICATION_SYNCER_H
|
|
#define ARANGOD_REPLICATION_SYNCER_H 1
|
|
|
|
#include "Basics/Common.h"
|
|
#include "Basics/ConditionVariable.h"
|
|
#include "Replication/ReplicationApplierConfiguration.h"
|
|
#include "Replication/SyncerId.h"
|
|
#include "Replication/common-defines.h"
|
|
#include "Replication/utilities.h"
|
|
#include "Utils/DatabaseGuard.h"
|
|
#include "VocBase/ticks.h"
|
|
|
|
struct TRI_vocbase_t;
|
|
|
|
namespace arangodb {
|
|
namespace httpclient {
|
|
class GeneralClientConnection;
|
|
class SimpleHttpClient;
|
|
class SimpleHttpResult;
|
|
} // namespace httpclient
|
|
|
|
namespace transaction {
|
|
class Methods;
|
|
}
|
|
|
|
namespace velocypack {
|
|
class Slice;
|
|
}
|
|
|
|
class Endpoint;
|
|
class LogicalCollection;
|
|
|
|
class Syncer : public std::enable_shared_from_this<Syncer> {
|
|
public:
|
|
/// @brief a helper object used for synchronization between the
|
|
/// dump apply thread and some helper job posted into the scheduler
|
|
/// for async fetching of the next dump results
|
|
class JobSynchronizer : public std::enable_shared_from_this<JobSynchronizer> {
|
|
public:
|
|
JobSynchronizer(JobSynchronizer const&) = delete;
|
|
JobSynchronizer& operator=(JobSynchronizer const&) = delete;
|
|
|
|
explicit JobSynchronizer(std::shared_ptr<Syncer const> const& syncer);
|
|
~JobSynchronizer();
|
|
|
|
/// @brief whether or not a response has arrived
|
|
bool gotResponse() const noexcept;
|
|
|
|
/// @brief will be called whenever a response for the job comes in
|
|
void gotResponse(std::unique_ptr<arangodb::httpclient::SimpleHttpResult> response, double time) noexcept;
|
|
|
|
/// @brief will be called whenever an error occurred
|
|
/// expects "res" to be an error!
|
|
void gotResponse(arangodb::Result&& res, double time = 0.0) noexcept;
|
|
|
|
/// @brief the calling Syncer will call and block inside this function until
|
|
/// there is a response or the syncer/server is shut down
|
|
arangodb::Result waitForResponse(std::unique_ptr<arangodb::httpclient::SimpleHttpResult>& response);
|
|
|
|
/// @brief post an async request to the scheduler
|
|
/// this will increase the number of inflight jobs, and count it down
|
|
/// when the posted request has finished
|
|
void request(std::function<void()> const& cb);
|
|
|
|
/// @brief notifies that a job was posted
|
|
/// returns false if job counter could not be increased (e.g. because
|
|
/// the syncer was stopped/aborted already)
|
|
bool jobPosted();
|
|
|
|
/// @brief notifies that a job was done
|
|
void jobDone();
|
|
|
|
/// @brief checks if there are jobs in flight (can be 0 or 1 job only)
|
|
bool hasJobInFlight() const noexcept;
|
|
|
|
/// @brief return the stored elapsed time for the job
|
|
double time() const noexcept { return _time; }
|
|
|
|
private:
|
|
/// @brief the shared syncer we use to check if sychronization was
|
|
/// externally aborted
|
|
std::shared_ptr<Syncer const> _syncer;
|
|
|
|
/// @brief condition variable used for synchronization
|
|
arangodb::basics::ConditionVariable mutable _condition;
|
|
|
|
/// @brief true if a response was received
|
|
bool _gotResponse;
|
|
|
|
/// @brief elapsed time for performing the job
|
|
double _time;
|
|
|
|
/// @brief the processing response of the job (indicates failure if no
|
|
/// response was received or if something went wrong)
|
|
arangodb::Result _res;
|
|
|
|
/// @brief the response received by the job (nullptr if no response
|
|
/// received)
|
|
std::unique_ptr<arangodb::httpclient::SimpleHttpResult> _response;
|
|
|
|
/// @brief number of posted jobs in flight
|
|
uint64_t _jobsInFlight;
|
|
};
|
|
|
|
struct SyncerState {
|
|
SyncerId syncerId;
|
|
|
|
/// @brief configuration
|
|
ReplicationApplierConfiguration applier;
|
|
|
|
/// @brief information about the replication barrier
|
|
replutils::BarrierInfo barrier{};
|
|
|
|
/// @brief object holding the HTTP client and all connection machinery
|
|
replutils::Connection connection;
|
|
|
|
/// @brief database name
|
|
std::string databaseName{};
|
|
|
|
/// Is this syncer allowed to handle its own batch
|
|
bool isChildSyncer{false};
|
|
|
|
/// @brief leaderId, this is used in the cluster to the unique ID of the
|
|
/// source server (the shard leader in this case). We need this information
|
|
/// to apply the changes locally to a shard, which is configured as a
|
|
/// follower and thus only accepts modifications that are replications
|
|
/// from the leader. Leave empty if there is no concept of a "leader".
|
|
std::string leaderId{};
|
|
|
|
/// @brief local server id
|
|
TRI_server_id_t localServerId{0};
|
|
|
|
/// @brief local server id
|
|
std::string localServerIdString{};
|
|
|
|
/// @brief information about the master state
|
|
replutils::MasterInfo master;
|
|
|
|
/// @brief lazy loaded list of vocbases
|
|
std::unordered_map<std::string, DatabaseGuard> vocbases{};
|
|
|
|
SyncerState(Syncer*, ReplicationApplierConfiguration const&);
|
|
};
|
|
|
|
Syncer(Syncer const&) = delete;
|
|
Syncer& operator=(Syncer const&) = delete;
|
|
|
|
explicit Syncer(ReplicationApplierConfiguration const&);
|
|
|
|
virtual ~Syncer();
|
|
|
|
/// @brief request location rewriter (injects database name)
|
|
static std::string rewriteLocation(void*, std::string const&);
|
|
|
|
/// @brief steal the barrier id from the syncer
|
|
TRI_voc_tick_t stealBarrier();
|
|
|
|
void setLeaderId(std::string const& leaderId) { _state.leaderId = leaderId; }
|
|
|
|
// TODO worker-safety
|
|
void setAborted(bool value);
|
|
|
|
// TODO worker-safety
|
|
virtual bool isAborted() const;
|
|
|
|
SyncerId syncerId() const noexcept;
|
|
|
|
protected:
|
|
/// @brief reload all users
|
|
// TODO worker safety
|
|
void reloadUsers();
|
|
|
|
/// @brief apply a single marker from the collection dump
|
|
// TODO worker-safety
|
|
Result applyCollectionDumpMarker(transaction::Methods&, LogicalCollection* coll,
|
|
TRI_replication_operation_e,
|
|
arangodb::velocypack::Slice const&);
|
|
|
|
/// @brief creates a collection, based on the VelocyPack provided
|
|
// TODO worker safety - create/drop phase
|
|
Result createCollection(TRI_vocbase_t& vocbase, arangodb::velocypack::Slice const& slice,
|
|
arangodb::LogicalCollection** dst);
|
|
|
|
/// @brief drops a collection, based on the VelocyPack provided
|
|
// TODO worker safety - create/drop phase
|
|
Result dropCollection(arangodb::velocypack::Slice const&, bool reportError);
|
|
|
|
/// @brief creates an index, based on the VelocyPack provided
|
|
Result createIndex(arangodb::velocypack::Slice const&);
|
|
|
|
/// @brief creates an index, or returns the existing matching index if there is one
|
|
void createIndexInternal(arangodb::velocypack::Slice const&, LogicalCollection&);
|
|
|
|
/// @brief drops an index, based on the VelocyPack provided
|
|
Result dropIndex(arangodb::velocypack::Slice const&);
|
|
|
|
/// @brief creates a view, based on the VelocyPack provided
|
|
Result createView(TRI_vocbase_t& vocbase, arangodb::velocypack::Slice const& slice);
|
|
|
|
/// @brief drops a view, based on the VelocyPack provided
|
|
Result dropView(arangodb::velocypack::Slice const&, bool reportError);
|
|
|
|
// TODO worker safety
|
|
virtual TRI_vocbase_t* resolveVocbase(velocypack::Slice const&);
|
|
|
|
// TODO worker safety
|
|
std::shared_ptr<LogicalCollection> resolveCollection(TRI_vocbase_t& vocbase,
|
|
arangodb::velocypack::Slice const& slice);
|
|
|
|
// TODO worker safety
|
|
std::unordered_map<std::string, DatabaseGuard> const& vocbases() const {
|
|
return _state.vocbases;
|
|
}
|
|
|
|
protected:
|
|
/// @brief state information for the syncer
|
|
SyncerState _state;
|
|
};
|
|
|
|
} // namespace arangodb
|
|
|
|
#endif
|