
Doc - Cluster Architecture Improvements (#7992)

sleto-it 2019-02-27 16:24:42 +01:00 committed by GitHub
parent a76f21735a
commit ef03234331
2 changed files with 198 additions and 95 deletions

@@ -1,8 +1,11 @@
Cluster Architecture
====================
The Cluster architecture of ArangoDB is a _CP_ master/master model with no
single point of failure.
With "CP" in terms of the [CAP theorem](https://en.wikipedia.org/wiki/CAP_theorem)
we mean that in the presence of a
network partition, the database prefers internal consistency over
availability. With "master/master" we mean that clients can send their
requests to an arbitrary node, and experience the same view on the
@@ -10,7 +13,7 @@ database regardless. "No single point of failure" means that the cluster
can continue to serve requests, even if one machine fails completely.
In this way, ArangoDB has been designed as a distributed multi-model
database. This section gives a short outline on the Cluster architecture and
how the above features and capabilities are achieved.
Structure of an ArangoDB Cluster
@@ -18,12 +21,14 @@ Structure of an ArangoDB Cluster
An ArangoDB Cluster consists of a number of ArangoDB instances
which talk to each other over the network. They play different roles,
which will be explained in detail below.
The current configuration
of the Cluster is held in the _Agency_, which is a highly-available
resilient key/value store based on an odd number of ArangoDB instances
running [Raft Consensus Protocol](https://raft.github.io/).
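As an illustration only, here is a sketch of how one of the three _Agents_ of
such an _Agency_ could be started by hand. Host names, port and directory are
example assumptions; deployment tools like the _Starter_ normally do this for you:

```bash
# Sketch: first of three Agents, started manually with plain arangod
# options (addresses are placeholders). Run the analogous command on the
# other two machines, adjusting --agency.my-address accordingly.
arangod --server.endpoint tcp://0.0.0.0:8531 \
  --agency.activate true \
  --agency.size 3 \
  --agency.my-address tcp://agent1.example.com:8531 \
  --agency.endpoint tcp://agent1.example.com:8531 \
  --agency.endpoint tcp://agent2.example.com:8531 \
  --agency.endpoint tcp://agent3.example.com:8531 \
  --agency.supervision true \
  --database.directory agent1
```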
For the various instances in an ArangoDB Cluster there are three distinct
roles:
- _Agents_
@@ -63,14 +68,16 @@ and restarted as needed.
### DBServers
_DBServers_ are the instances where the data is actually hosted. They
host _shards_ of data, and using synchronous replication, a _DBServer_ may
be either _leader_ or _follower_ for a shard.
They should not be accessed from the outside but only indirectly through the
_Coordinators_. They may also execute queries in part or as a whole when
asked by a _Coordinator_.
See [Sharding](#sharding) below for more information.
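If in doubt which role a running instance has, it can be asked directly. A
sketch (the address is an example; note that _DBServers_ report the historical
role name `PRIMARY`):

```bash
# Ask any instance for its role in the Cluster (example address).
curl http://192.168.1.1:8529/_admin/server/role
# A Coordinator would answer with something like:
# {"error":false,"code":200,"role":"COORDINATOR"}
```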
Many sensible configurations
----------------------------
@@ -95,9 +102,31 @@ which are suitable for different usage scenarios:
latency. Essentially, this moves some of the database distribution
logic to the machine where the client runs.
As you can see, the _Coordinator_ layer can be scaled and deployed independently
from the _DBServer_ layer.
{% hint 'warning' %}
It is a best practice and a recommended approach to run _Agent_ instances
on different machines than the _DBServer_ instances.

When deploying using the tool [_Starter_](../../../Deployment/ArangoDBStarter/README.md)
this can be achieved by using the options `--cluster.start-dbserver=false` and
`--cluster.start-coordinator=false` on the first three machines where the _Starter_
is started, if the desired _Agency_ size is 3, or on the first five machines
if the desired _Agency_ size is 5.
{% endhint %}
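As a sketch, assuming three machines `machA`, `machB` and `machC` for an
_Agency_ of size 3 (the `--cluster.start-agent` option on the remaining
machines is an assumption about your _Starter_ version):

```bash
# On machA, machB and machC: run only an Agent on these machines.
arangodb --starter.join machA,machB,machC \
  --cluster.start-dbserver=false \
  --cluster.start-coordinator=false

# On all further machines: run DBServer and Coordinator, but no Agent.
arangodb --starter.join machA,machB,machC --cluster.start-agent=false
```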
{% hint 'info' %}
The different instances that form a Cluster are supposed to be run in the same
_Data Center_ (DC), with a reliable and high-speed network connection between
all the machines participating in the Cluster.

Multi-datacenter Clusters, where the entire structure and content of a Cluster located
in a specific DC is replicated to other Clusters located in different DCs, are
possible as well. See [Datacenter to datacenter replication](../DC2DC/README.md)
(DC2DC) for further details.
{% endhint %}
Cluster ID
----------
@@ -144,17 +173,34 @@ is synchronous.
Synchronous replication works on a per-shard basis. Using the option _replicationFactor_,
one configures for each _collection_ how many copies of each _shard_ are kept in the Cluster.
{% hint 'danger' %}
If a collection has a _replication factor_ of 1, its data is **not**
replicated to other _DBServers_. This exposes you to a risk of data loss, if
the machine running the _DBServer_ with the only copy of the data fails permanently.
The _replication factor_ has to be set to a value equal to or higher than 2
to achieve minimal data redundancy via synchronous replication.

A _replication factor_ of 2 or higher has to be set **explicitly**
when the collection is created, or can be set later at run time if it was
forgotten at creation time.

When using a Cluster, please make sure all the collections that are important
(and should not be lost in any case) have a _replication factor_ of 2 or
higher.
{% endhint %}
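In _arangosh_, both variants might look like this sketch (the collection name
is just an example):

```js
// Set the replication factor explicitly when creating the collection:
db._create("important", { "replicationFactor": 2 });

// Or raise it later at run time, if it was forgotten at creation time:
db.important.properties({ "replicationFactor": 2 });
```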
At any given time, one of the copies is declared to be the _leader_ and
all other replicas are _followers_. Internally, write operations for this _shard_
are always sent to the _DBServer_ which happens to hold the _leader_ copy,
which in turn replicates the changes to all _followers_ before the operation
is considered to be done and reported back to the _Coordinator_.
Internally, read operations are all served by the _DBServer_ holding the _leader_ copy;
this allows the database to provide snapshot semantics for complex transactions.
Using synchronous replication alone will guarantee consistency and high availability
at the cost of reduced performance: write requests will have a higher latency
(due to every write-request having to be executed on the _followers_) and
read requests will not scale out as only the _leader_ is being asked.
In a Cluster, synchronous replication will be managed by the _Coordinators_ for the client.
@@ -163,19 +209,21 @@ The data will always be stored on the _DBServers_.
The following example will give you an idea of how synchronous operation
has been implemented in ArangoDB Cluster:
1. Connect to a _Coordinator_ via [_arangosh_](../../../Programs/Arangosh/README.md)
2. Create a collection
```
127.0.0.1:8530@_system> db._create("test", {"replicationFactor": 2})
```
3. The _Coordinator_ will figure out a *leader* and one *follower* and create
one *shard* (as this is the default)
4. Insert data
127.0.0.1:8530@_system> db.test.insert({"replication": "😎"})
127.0.0.1:8530@_system> db.test.insert({"foo": "bar"})
5. The _Coordinator_ will write the data to the _leader_, which in turn will
replicate it to the _follower_.
6. Only when both operations were successful is the result reported to be successful:
```json
{
"_id" : "test/7987",
@ -183,66 +231,119 @@ replicate it to the follower.
"_rev" : "7987"
}
```
Automatic failover
------------------
### Failure of a follower
If a _DBServer_ that holds a _follower_ copy of a _shard_ fails, then the _leader_
can no longer synchronize its changes to that _follower_. After a short timeout
(3 seconds), the _leader_ gives up on the _follower_ and declares it to be
out of sync.
One of the following two cases can happen:
**a)** If another _DBServer_ (that does not hold a _replica_ for this _shard_ already)
is available in the Cluster, a new _follower_ will automatically
be created on this other _DBServer_ (so the _replication factor_ constraint is
satisfied again).
**b)** If no other _DBServer_ (that does not hold a _replica_ for this _shard_ already)
is available, the service continues with one _follower_ less than the number
prescribed by the _replication factor_.
If the old _DBServer_ with the _follower_ copy comes back, one of the following
two cases can happen:
**a)** If previously we were in case a), the _DBServer_ recognizes that there is a new
_follower_ that was created in the meantime, so it will no longer be a _follower_
for that _shard_.
**b)** If previously we were in case b), the _DBServer_ automatically resynchronizes its
data with the _leader_. The _replication factor_ constraint is now satisfied again
and order is restored.
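One way to observe this from the outside is the cluster API of a _Coordinator_,
which reports the planned and the current distribution of all _shards_,
including which _DBServer_ is _leader_ or _follower_ for each of them
(a sketch; the address is an example):

```bash
# Planned vs. current shard distribution, as seen by a Coordinator.
curl http://192.168.1.1:8529/_admin/cluster/shardDistribution
```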
### Failure of a leader
If a _DBServer_ that holds a _leader_ copy of a shard fails, then the _leader_
can no longer serve any requests. It will no longer send a heartbeat to
the _Agency_. Therefore, a _supervision_ process running in the _Raft_ leader
of the _Agency_ can take the necessary action (after 15 seconds of missing
heartbeats), namely to promote one of the _DBServers_ that hold in-sync
replicas of the _shard_ to _leader_ for that _shard_. This involves a
reconfiguration in the _Agency_ and leads to the fact that _Coordinators_
now contact a different _DBServer_ for requests to this _shard_. Service
resumes. The other surviving _replicas_ automatically resynchronize their
data with the new _leader_.
In addition to the above, one of the following two cases can happen:
**a)** If another _DBServer_ (that does not hold a _replica_ for this _shard_ already)
is available in the Cluster, a new _follower_ will automatically
be created on this other _DBServer_ (so the _replication factor_ constraint is
satisfied again).

**b)** If no other _DBServer_ (that does not hold a _replica_ for this _shard_ already)
is available, the service continues with one _follower_ less than the number
prescribed by the _replication factor_.
When the _DBServer_ with the original _leader_ copy comes back, it recognizes
that a new _leader_ was elected in the meantime, and one of the following
two cases can happen:
**a)** If previously we were in case a), since a new _follower_ was already created and
the _replication factor_ constraint is satisfied, the _DBServer_ will no
longer be a _follower_ for that _shard_.

**b)** If previously we were in case b), the _DBServer_ notices that it now holds
a _follower_ _replica_ of that _shard_ and it resynchronizes its data with the
new _leader_. The _replication factor_ constraint is now satisfied again,
and order is restored.
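During such a _failover_, the _supervision's_ view of all instances, including
failed ones, can be queried from a _Coordinator_ (a sketch; the address is an
example):

```bash
# Health of all Agents, Coordinators and DBServers in the Cluster.
curl http://192.168.1.1:8529/_admin/cluster/health
```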
The following example will give you an idea of how _failover_
has been implemented in ArangoDB Cluster:
1. The _leader_ of a _shard_ (let's name it _DBServer001_) is going down.
2. A _Coordinator_ is asked to return a document:
```
127.0.0.1:8530@_system> db.test.document("100069")
```
3. The _Coordinator_ determines which server is responsible for this document
and finds _DBServer001_.
4. The _Coordinator_ tries to contact _DBServer001_ and times out because it is
not reachable.
5. After a short while, the _supervision_ (running in parallel on the _Agency_)
will see that _heartbeats_ from _DBServer001_ are not coming in.
6. The _supervision_ promotes one of the _followers_ (say _DBServer002_), that
is in sync, to be _leader_ and makes _DBServer001_ a _follower_.
7. As the _Coordinator_ continues trying to fetch the document, it will see that
the _leader_ changed to _DBServer002_.
8. The _Coordinator_ tries to contact the new _leader_ (_DBServer002_) and returns
the result:
```json
{
"_key" : "100069",
"_id" : "test/100069",
"_rev" : "513",
"replication" : "😎"
"foo" : "bar"
}
```
9. After a while the _supervision_ declares _DBServer001_ to be completely dead.
10. A new _follower_ is determined from the pool of _DBServers_.
11. The new _follower_ syncs its data from the _leader_ and order is restored.
Please note that there may still be timeouts. Depending on when exactly
the request has been done (with regard to the _supervision_) and depending
on the time needed to reconfigure the Cluster, the _Coordinator_ might fail
with a timeout error.
Shard movement and resynchronization
------------------------------------
@@ -260,14 +361,6 @@ All these operations can be triggered via a REST/JSON API or via the
graphical web UI. All fail-over operations are completely handled within
the ArangoDB Cluster.
Microservices and zero administration
------------------------------------
@@ -286,44 +379,15 @@ being able to use a single technology for various roles in a project.
To simplify the life of the _devops_ in such a scenario, we try as much as
possible to use a _zero administration_ approach for ArangoDB. A running
ArangoDB Cluster is resilient against failures and essentially repairs
itself in case of temporary failures.
Deployment
----------
An ArangoDB Cluster can be deployed in several ways, e.g. by manually
starting all the needed instances, by using the tool
[_Starter_](../../../Programs/Starter/README.md), in
Docker, in Mesos or DC/OS, and in Kubernetes.
See the [Cluster Deployment](../../../Deployment/Cluster/README.md)
chapter for instructions.
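For instance, a basic _Starter_-based deployment on three machines `A`, `B`
and `C` is just the same command run on every machine; each machine then runs
one _Agent_, one _DBServer_ and one _Coordinator_ (a sketch with placeholder
addresses):

```bash
# Run the very same command on machine A, B and C:
arangodb --starter.data-dir=./db --starter.join A,B,C
```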


@@ -0,0 +1,39 @@
Apache Mesos integration
------------------------
For the distributed setup, we use the Apache Mesos infrastructure by default.
ArangoDB is a fully certified package for DC/OS and can thus
be deployed essentially with a few mouse clicks or a single command, once
you have an existing DC/OS cluster. But even on a plain Apache Mesos cluster
one can deploy ArangoDB via Marathon with a single API call and some JSON
configuration.
The advantage of this approach is that we can not only implement the
initial deployment, but also the later management of automatic
replacement of failed instances and the scaling of the ArangoDB cluster
(triggered manually or even automatically). Since all manipulations are
done either via the graphical web UI or via JSON/REST calls, one can even
implement auto-scaling very easily.
A DC/OS cluster is a very natural environment to deploy microservice
architectures, since it is so convenient to deploy various services,
including potentially multiple ArangoDB cluster instances within the
same DC/OS cluster. The built-in service discovery makes it extremely
simple to connect the various microservices and Mesos automatically
takes care of the distribution and deployment of the various tasks.
See the [Deployment](../../../Deployment/README.md) chapter and its subsections
for instructions.
It is possible to deploy an ArangoDB cluster by simply launching a bunch of
Docker containers with the right command line options to link them up,
or even on a single machine starting multiple ArangoDB processes. In that
case, synchronous replication will work within the deployed ArangoDB cluster,
and automatic fail-over in the sense that the duties of a failed server will
automatically be assigned to another, surviving one. However, since the
ArangoDB cluster cannot within itself launch additional instances, replacement
of failed nodes is not automatic and scaling up and down has to be managed
manually. This is why we do not recommend this setup for production
deployment.
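To give an idea of such a setup anyway, a hand-rolled Docker deployment might
look like the following sketch (image, addresses and ports are examples,
authentication is switched off for brevity, and a running _Agency_ at
`agent1.example.com:8531` is assumed):

```bash
# A DBServer joining the assumed Agency, using plain arangod options:
docker run -d -e ARANGO_NO_AUTH=1 arangodb/arangodb arangod \
  --server.endpoint tcp://0.0.0.0:8530 \
  --cluster.my-role DBSERVER \
  --cluster.my-address tcp://dbs1.example.com:8530 \
  --cluster.agency-endpoint tcp://agent1.example.com:8531

# A Coordinator, analogously:
docker run -d -e ARANGO_NO_AUTH=1 arangodb/arangodb arangod \
  --server.endpoint tcp://0.0.0.0:8529 \
  --cluster.my-role COORDINATOR \
  --cluster.my-address tcp://coord1.example.com:8529 \
  --cluster.agency-endpoint tcp://agent1.example.com:8531
```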