From d65ee0a2414f865b2613a257cc00569e925d40c0 Mon Sep 17 00:00:00 2001 From: sleto-it <31849787+sleto-it@users.noreply.github.com> Date: Wed, 27 Feb 2019 16:49:48 +0100 Subject: [PATCH] Doc - Cluster Architecture Improvements (#8285) --- .../DeploymentModes/Cluster/Architecture.md | 254 +++++++++++------- .../Books/stash/ClusterArchitecture.md | 39 +++ 2 files changed, 198 insertions(+), 95 deletions(-) create mode 100644 Documentation/Books/stash/ClusterArchitecture.md diff --git a/Documentation/Books/Manual/Architecture/DeploymentModes/Cluster/Architecture.md b/Documentation/Books/Manual/Architecture/DeploymentModes/Cluster/Architecture.md index 841a19353c..7552ffc404 100644 --- a/Documentation/Books/Manual/Architecture/DeploymentModes/Cluster/Architecture.md +++ b/Documentation/Books/Manual/Architecture/DeploymentModes/Cluster/Architecture.md @@ -1,8 +1,11 @@ Cluster Architecture ==================== -The cluster architecture of ArangoDB is a _CP_ master/master model with no -single point of failure. With "CP" we mean that in the presence of a +The Cluster architecture of ArangoDB is a _CP_ master/master model with no +single point of failure. + +With "CP" in terms of the [CAP theorem](https://en.wikipedia.org/wiki/CAP_theorem) +we mean that in the presence of a network partition, the database prefers internal consistency over availability. With "master/master" we mean that clients can send their requests to an arbitrary node, and experience the same view on the @@ -10,7 +13,7 @@ database regardless. "No single point of failure" means that the cluster can continue to serve requests, even if one machine fails completely. In this way, ArangoDB has been designed as a distributed multi-model -database. This section gives a short outline on the cluster architecture and +database. This section gives a short outline on the Cluster architecture and how the above features and capabilities are achieved. Structure of an ArangoDB Cluster @@ -18,12 +21,14 @@ Structure of an ArangoDB Cluster An ArangoDB Cluster consists of a number of ArangoDB instances which talk to each other over the network. They play different roles, -which will be explained in detail below. The current configuration +which will be explained in detail below. + +The current configuration of the Cluster is held in the _Agency_, which is a highly-available resilient key/value store based on an odd number of ArangoDB instances running [Raft Consensus Protocol](https://raft.github.io/). -For the various instances in an ArangoDB Cluster there are 3 distinct +For the various instances in an ArangoDB Cluster there are three distinct roles: - _Agents_ @@ -63,14 +68,16 @@ and restarted as needed. ### DBServers -DBservers are the ones where the data is actually hosted. They -host shards of data and using synchronous replication a DBServer may -either be leader or follower for a shard. +_DBservers_ are the ones where the data is actually hosted. They +host shards of data and using synchronous replication a _DBServer_ may +either be _leader_ or _follower_ for a shard. They should not be accessed from the outside but indirectly through the _Coordinators_. They may also execute queries in part or as a whole when asked by a _Coordinator_. +See [Sharding](#sharding) below for more information. + Many sensible configurations ---------------------------- @@ -95,9 +102,31 @@ which are suitable for different usage scenarios: latency. Essentially, this moves some of the database distribution logic to the machine where the client runs. 
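+
+Whichever of these configurations is chosen, it can be handy to check which
+role a given instance is actually running. The following is a minimal
+_arangosh_ sketch, assuming the `/_admin/server/role` administration endpoint
+is available; it is meant as an illustration only:
+
+```js
+// Start arangosh against the instance to inspect, for example:
+//   arangosh --server.endpoint tcp://127.0.0.1:8529
+// and ask the instance for its role. The role names mentioned below
+// ("COORDINATOR", "PRIMARY" for a DBServer, "AGENT") are assumptions
+// based on the role split described above.
+var response = arango.GET("/_admin/server/role");
+print("This instance is running as: " + response.role);
+```
+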
-As you acn see, the _Coordinator_ layer can be scaled and deployed independently
+As you can see, the _Coordinator_ layer can be scaled and deployed independently
 from the _DBServer_ layer.

+{% hint 'warning' %}
+It is a best practice to run _Agent_ instances on machines other than the
+ones running _DBServer_ instances.
+
+When deploying with the tool [_Starter_](../../../Deployment/ArangoDBStarter/README.md),
+this can be achieved by using the options `--cluster.start-dbserver=false` and
+`--cluster.start-coordinator=false` on the first three machines where the _Starter_
+is started, if the desired _Agency_ _size_ is 3, or on the first five machines
+if the desired _Agency_ _size_ is 5.
+{% endhint %}
+
+{% hint 'info' %}
+The different instances that form a Cluster are supposed to be run in the same
+_Data Center_ (DC), with a reliable and high-speed network connection between
+all the machines participating in the Cluster.
+
+Multi-datacenter Clusters, where the entire structure and content of a Cluster located
+in a specific DC is replicated to other Clusters located in different DCs, are
+possible as well. See [Datacenter to datacenter replication](../DC2DC/README.md)
+(DC2DC) for further details.
+{% endhint %}
+
 Cluster ID
 ----------

@@ -144,17 +173,34 @@ is synchronous. Synchronous replication works on a per-shard basis.
 Using the option _replicationFactor_, one configures for each _collection_
 how many copies of each _shard_ are kept in the Cluster.

+{% hint 'danger' %}
+If a collection has a _replication factor_ of 1, its data is **not**
+replicated to other _DBServers_. This exposes you to a risk of data loss if
+the machine running the _DBServer_ with the only copy of the data fails permanently.
+
+The _replication factor_ has to be set to a value of 2 or higher to achieve
+minimal data redundancy via synchronous replication.
+
+A _replication factor_ of 2 or higher has to be set **explicitly** when the
+collection is created, or it can be set later at run time if it was not set
+at creation time.
+
+When using a Cluster, please make sure all the collections that are important
+(and should not be lost in any case) have a _replication factor_ of 2 or
+higher.
+{% endhint %}
+
 At any given time, one of the copies is declared to be the _leader_ and
-all other replicas are _followers_. Write operations for this _shard_
+all other replicas are _followers_. Internally, write operations for this _shard_
 are always sent to the _DBServer_ which happens to hold the _leader_ copy,
 which in turn replicates the changes to all _followers_ before the operation
 is considered to be done and reported back to the _Coordinator_.
-Read operations are all served by the server holding the _leader_ copy,
+Internally, read operations are all served by the _DBServer_ holding the _leader_ copy,
 this allows to provide snapshot semantics for complex transactions.

-Using synchronous replication alone will guarantee consistency and high availabilty
+Using synchronous replication alone will guarantee consistency and high availability
 at the cost of reduced performance: write requests will have a higher latency
-(due to every write-request having to be executed on the followers) and
+(due to every write-request having to be executed on the _followers_) and
 read requests will not scale out as only the _leader_ is being asked.

 In a Cluster, synchronous replication will be managed by the _Coordinators_ for the client.
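+
+For example, a minimal _arangosh_ sketch of creating a collection with an
+explicit _replication factor_ could look as follows (the collection name
+`important` and the number of shards are just examples):
+
+```js
+// Create a collection whose shards are kept in two copies (a leader and one follower).
+db._create("important", { "numberOfShards": 3, "replicationFactor": 2 });
+
+// Read the collection properties back to verify the replication factor.
+var props = db.important.properties();
+print(props.replicationFactor);   // expected to print 2 when connected to a Coordinator
+```
+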
The data will always be stored on the _DBServers_.

 The following example will give you an idea of how synchronous operation
 has been implemented in ArangoDB Cluster:

-1. Connect to a coordinator via arangosh
+1. Connect to a _Coordinator_ via [_arangosh_](../../../Programs/Arangosh/README.md)
 2. Create a collection

     127.0.0.1:8530@_system> db._create("test", {"replicationFactor": 2})

-3. the coordinator will figure out a *leader* and 1 *follower* and create 1 *shard* (as this is the default)
+3. The _Coordinator_ will figure out a *leader* and one *follower* and create
+   one *shard* (as this is the default)
 4. Insert data

-    127.0.0.1:8530@_system> db.test.insert({"replication": "😎"})
+    127.0.0.1:8530@_system> db.test.insert({"foo": "bar"})
+
+5. The _Coordinator_ will write the data to the _leader_, which in turn will
+replicate it to the _follower_.
+6. Only when both operations have succeeded is the result reported to be successful:

-5. The coordinator will write the data to the leader, which in turn will
-replicate it to the follower.
-6. Only when both were successful the result is reported to be successful
 ```json
 {
     "_id" : "test/7987",
@@ -183,66 +231,119 @@ replicate it to the follower.
     "_rev" : "7987"
 }
 ```
-    When a follower fails, the leader will give up on it after 3 seconds
-    and proceed with the operation. As soon as the follower (or the network
-    connection to the leader) is back up, the two will resynchronize and
-    synchronous replication is resumed. This happens all transparently
-    to the client.
+
+Obviously, synchronous replication comes at the cost of increased latency for
+write operations, simply because there is one more network hop within the
+Cluster for every request. Therefore the user can set the _replicationFactor_
+to 1, which means that only one copy of each shard is kept, thereby
+switching off synchronous replication. This is a suitable setting for
+less important or easily recoverable data for which low latency write
+operations matter.

 Automatic failover
 ------------------

+### Failure of a follower
+
 If a _DBServer_ that holds a _follower_ copy of a _shard_ fails, then the _leader_
 can no longer synchronize its changes to that _follower_. After a short timeout
-(3 seconds), the _leader_ gives up on the _follower_, declares it to be
-out of sync, and continues service without the _follower_. When the server
-with the _follower_ copy comes back, it automatically resynchronizes its
-data with the _leader_ and synchronous replication is restored.
+(3 seconds), the _leader_ gives up on the _follower_ and declares it to be
+out of sync.

-If a _DBserver_ that holds a _leader_ copy of a shard fails, then the _leader_
+One of the following two cases can happen:
+
+**a)** If another _DBServer_ (that does not hold a _replica_ for this _shard_ already)
+   is available in the Cluster, a new _follower_ will automatically
+   be created on this other _DBServer_ (so the _replication factor_ constraint is
+   satisfied again).
+
+**b)** If no other _DBServer_ (that does not hold a _replica_ for this _shard_ already)
+   is available, the service continues with one _follower_ less than the number
+   prescribed by the _replication factor_.
+
+If the old _DBServer_ with the _follower_ copy comes back, one of the following
+two cases can happen:
+
+**a)** If previously we were in case a), the _DBServer_ recognizes that there is a new
+   _follower_ that was created in the meantime, so it will no longer be a _follower_
+   for that _shard_.
+
+**b)** If previously we were in case b), the _DBServer_ automatically resynchronizes its
+   data with the _leader_. The _replication factor_ constraint is now satisfied again
+   and order is restored.
+
+### Failure of a leader
+
+If a _DBServer_ that holds a _leader_ copy of a shard fails, then the _leader_
 can no longer serve any requests. It will no longer send a heartbeat to
-the _Agency_. Therefore, a _supervision_ process running in the Raft leader
+the _Agency_. Therefore, a _supervision_ process running in the _Raft_ _leader_
 of the Agency, can take the necessary action (after 15 seconds of missing
-heartbeats), namely to promote one of the servers that hold in-sync
-replicas of the shard to leader for that shard. This involves a
-reconfiguration in the Agency and leads to the fact that coordinators
-now contact a different DBserver for requests to this shard. Service
-resumes. The other surviving replicas automatically resynchronize their
-data with the new leader. When the DBserver with the original leader
-copy comes back, it notices that it now holds a follower replica,
-resynchronizes its data with the new leader and order is restored.
+heartbeats), namely to promote one of the _DBServers_ that hold in-sync
+replicas of the _shard_ to _leader_ for that _shard_. This involves a
+reconfiguration in the _Agency_ and leads to the fact that _Coordinators_
+now contact a different _DBServer_ for requests to this _shard_. Service
+resumes. The other surviving _replicas_ automatically resynchronize their
+data with the new _leader_.

-The following example will give you an idea of how failover
+In addition to the above, one of the following two cases can happen:
+
+**a)** If another _DBServer_ (that does not hold a _replica_ for this _shard_ already)
+   is available in the Cluster, a new _follower_ will automatically
+   be created on this other _DBServer_ (so the _replication factor_ constraint is
+   satisfied again).
+**b)** If no other _DBServer_ (that does not hold a _replica_ for this _shard_ already)
+   is available, the service continues with one _follower_ less than the number
+   prescribed by the _replication factor_.
+
+When the _DBServer_ with the original _leader_ copy comes back, it recognizes
+that a new _leader_ was elected in the meantime, and one of the following
+two cases can happen:
+
+**a)** If previously we were in case a), since a new _follower_ was created and
+   the _replication factor_ constraint is satisfied, the _DBServer_ will no
+   longer be a _follower_ for that _shard_.
+**b)** If previously we were in case b), the _DBServer_ notices that it now holds
+   a _follower_ _replica_ of that _shard_ and it resynchronizes its data with the
+   new _leader_. The _replication factor_ constraint is now satisfied again,
+   and order is restored.
+
+The following example will give you an idea of how _failover_
 has been implemented in ArangoDB Cluster:

-1. The _leader_ of a _shard_ (lets name it _DBServer001_) is going down.
+1. The _leader_ of a _shard_ (let's name it _DBServer001_) is going down.
 2. A _Coordinator_ is asked to return a document:

     127.0.0.1:8530@_system> db.test.document("100069")

-3. The _Coordinator_ determines which server is responsible for this document and finds _DBServer001_
-4. The _Coordinator_ tries to contact _DBServer001_ and timeouts because it is not reachable.
-5. After a short while the _supervision_ (running in parallel on the _Agency_) will see that _heartbeats_ from _DBServer001_ are not coming in
-6. The _supervision_ promotes one of the _followers_ (say _DBServer002_), that is in sync, to be _leader_ and makes _DBServer001_ a _follower_.
-7. As the _Coordinator_ continues trying to fetch the document it will see that the _leader_ changed to _DBServer002_
-8. The _Coordinator_ tries to contact the new _leader_ (_DBServer002_) and returns the result:
+3. The _Coordinator_ determines which server is responsible for this document
+   and finds _DBServer001_
+4. The _Coordinator_ tries to contact _DBServer001_ and times out because it is
+   not reachable.
+5. After a short while the _supervision_ (running in parallel on the _Agency_)
+   will see that _heartbeats_ from _DBServer001_ are not coming in
+6. The _supervision_ promotes one of the _followers_ (say _DBServer002_), that
+   is in sync, to be _leader_ and makes _DBServer001_ a _follower_.
+7. As the _Coordinator_ continues trying to fetch the document it will see that
+   the _leader_ changed to _DBServer002_
+8. The _Coordinator_ tries to contact the new _leader_ (_DBServer002_) and returns
+   the result:
 ```json
 {
     "_key" : "100069",
     "_id" : "test/100069",
     "_rev" : "513",
-    "replication" : "😎"
+    "foo" : "bar"
 }
 ```
 9. After a while the _supervision_ declares _DBServer001_ to be completely dead.
 10. A new _follower_ is determined from the pool of _DBservers_.
-11. The new _follower_ syncs its data from the _leade_r and order is restored.
+11. The new _follower_ syncs its data from the _leader_ and order is restored.

 Please note that there may still be timeouts. Depending on when exactly
 the request has been done (in regard to the _supervision_) and depending on the
 time needed to reconfigure the Cluster the _Coordinator_ might fail
-with a timeout error!
+with a timeout error (a small client-side retry sketch is shown further below).

 Shard movement and resynchronization
 ------------------------------------
@@ -260,14 +361,6 @@ All these operations can be triggered via a REST/JSON API or via the
 graphical web UI. All fail-over operations are completely handled within
 the ArangoDB Cluster.

-Obviously, synchronous replication involves a certain increased latency for
-write operations, simply because there is one more network hop within the
-Cluster for every request. Therefore the user can set the _replicationFactor_
-to 1, which means that only one copy of each shard is kept, thereby
-switching off synchronous replication. This is a suitable setting for
-less important or easily recoverable data for which low latency write
-operations matter.
-
 Microservices and zero administation
 ------------------------------------

@@ -286,44 +379,15 @@ being able to use a single technology for various roles in a project.

 To simplify life of the _devops_ in such a scenario we try as much as
 possible to use a _zero administration_ approach for ArangoDB. A running
 ArangoDB Cluster is resilient against failures and essentially repairs
-itself in case of temporary failures. See the next section for further
-capabilities in this direction.
+itself in case of temporary failures.

-Apache Mesos integration
-------------------------
+Deployment
+----------

-For the distributed setup, we use the Apache Mesos infrastructure by default.
+An ArangoDB Cluster can be deployed in several ways, e.g. by manually
+starting all the needed instances, by using the tool
+[_Starter_](../../../Programs/Starter/README.md), in
+Docker, in Mesos or DC/OS, and in Kubernetes.
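+
+As noted in the fail-over example above, a _Coordinator_ may answer with a
+timeout error while the Cluster is being reconfigured. The following is a
+minimal _arangosh_ sketch of a client-side retry loop (illustrative only: the
+collection and document key are the ones used in the example above, and the
+number of attempts and the pause between them are arbitrary):
+
+```js
+// Try to read the document a few times, pausing between attempts, so that a
+// temporary fail-over window does not immediately surface as an error.
+var doc = null;
+for (var attempt = 1; attempt <= 5 && doc === null; attempt++) {
+  try {
+    doc = db.test.document("100069");   // may time out during a fail-over
+  } catch (err) {
+    print("Attempt " + attempt + " failed, retrying: " + err);
+    require("internal").wait(2);        // wait two seconds before the next attempt
+  }
+}
+print(doc);
+```
+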
-ArangoDB is a fully certified package for DC/OS and can thus -be deployed essentially with a few mouse clicks or a single command, once -you have an existing DC/OS cluster. But even on a plain Apache Mesos cluster -one can deploy ArangoDB via Marathon with a single API call and some JSON -configuration. - -The advantage of this approach is that we can not only implement the -initial deployment, but also the later management of automatic -replacement of failed instances and the scaling of the ArangoDB cluster -(triggered manually or even automatically). Since all manipulations are -either via the graphical web UI or via JSON/REST calls, one can even -implement auto-scaling very easily. - -A DC/OS cluster is a very natural environment to deploy microservice -architectures, since it is so convenient to deploy various services, -including potentially multiple ArangoDB cluster instances within the -same DC/OS cluster. The built-in service discovery makes it extremely -simple to connect the various microservices and Mesos automatically -takes care of the distribution and deployment of the various tasks. - -See the [Deployment](../../../Deployment/README.md) chapter and its subsections -for instructions. - -It is possible to deploy an ArangoDB cluster by simply launching a bunch of -Docker containers with the right command line options to link them up, -or even on a single machine starting multiple ArangoDB processes. In that -case, synchronous replication will work within the deployed ArangoDB cluster, -and automatic fail-over in the sense that the duties of a failed server will -automatically be assigned to another, surviving one. However, since the -ArangoDB cluster cannot within itself launch additional instances, replacement -of failed nodes is not automatic and scaling up and down has to be managed -manually. This is why we do not recommend this setup for production -deployment. +See the [Cluster Deployment](../../../Deployment/Cluster/README.md) +chapter for instructions. diff --git a/Documentation/Books/stash/ClusterArchitecture.md b/Documentation/Books/stash/ClusterArchitecture.md new file mode 100644 index 0000000000..dcdec1987b --- /dev/null +++ b/Documentation/Books/stash/ClusterArchitecture.md @@ -0,0 +1,39 @@ + +Apache Mesos integration +------------------------ + +For the distributed setup, we use the Apache Mesos infrastructure by default. + +ArangoDB is a fully certified package for DC/OS and can thus +be deployed essentially with a few mouse clicks or a single command, once +you have an existing DC/OS cluster. But even on a plain Apache Mesos cluster +one can deploy ArangoDB via Marathon with a single API call and some JSON +configuration. + +The advantage of this approach is that we can not only implement the +initial deployment, but also the later management of automatic +replacement of failed instances and the scaling of the ArangoDB cluster +(triggered manually or even automatically). Since all manipulations are +either via the graphical web UI or via JSON/REST calls, one can even +implement auto-scaling very easily. + +A DC/OS cluster is a very natural environment to deploy microservice +architectures, since it is so convenient to deploy various services, +including potentially multiple ArangoDB cluster instances within the +same DC/OS cluster. The built-in service discovery makes it extremely +simple to connect the various microservices and Mesos automatically +takes care of the distribution and deployment of the various tasks. 
+ +See the [Deployment](../../../Deployment/README.md) chapter and its subsections +for instructions. + +It is possible to deploy an ArangoDB cluster by simply launching a bunch of +Docker containers with the right command line options to link them up, +or even on a single machine starting multiple ArangoDB processes. In that +case, synchronous replication will work within the deployed ArangoDB cluster, +and automatic fail-over in the sense that the duties of a failed server will +automatically be assigned to another, surviving one. However, since the +ArangoDB cluster cannot within itself launch additional instances, replacement +of failed nodes is not automatic and scaling up and down has to be managed +manually. This is why we do not recommend this setup for production +deployment.