通过协调合作,在分布式应用程序的任务实例集合执行的操作,选举一个实例作为承担管理的其他实例责任的领导者。这个模式可以有助于确保任务实例不互相冲突,导致争用共享资源,或与其他的任务实例正在执行的工作无意中干扰。
一个典型的云应用包括行动协调的方式很多任务。这些任务都可以是实例运行相同的代码和需要访问相同的资源,或者它们可能是可并行工作,以执行复杂计算的各个部分。
任务实例可能为多的时间自主运行,但它也可能是必要的,以协调各实例的操作,以确保它们不发生冲突,导致争用共享资源,或无意中妨碍工作,其他的任务实例正在执行。例如:
由于任务实例都是同行,没有天生的领导者,能够充当协调员或聚合器。
单个任务实例应选充当领导者,这种情况下应协调其他从属任务实例的操作。如果所有的任务实例都运行相同的代码,他们可能都能够充当领导者。因此,选举过程必须谨慎管理,以防止两个或多个实例接管领导者的角色在同一时间。
该系统必须选择的领导者提供了一个坚固的机构。这种机制必须能够应付诸如网络中断或进程故障事件。在许多解决方案中,下属的工作情况监控的领导者,通过某种类型的心跳机制,或通过轮询。如果指定的领导者意外终止或网络故障使得领导者不通下属的工作情况,有必要为他们选出了新的领导人。
有可用于选举的领导者之间的一组任务在分布式环境中,包括几个策略:
在决定如何实现这个模式时,请考虑以下几点:
使用此模式时,在分布式应用程序的任务,比如云托管解决方案,需要认真协调,也没有天生的领导者。
注意: 避免使领导者在系统中的瓶颈。领导者的目的是协调的附属任务完成的工作,它不一定有机会参加这项工作本身,尽管它应该是有能力这样做,如果任务没有当选领导人。
这种模式可能不适合:
在列入可用于本指南中的示例代码中 LeaderElection 解决方案 DistributedMutex 项目展示了如何使用租赁在 Azure 存储 BLOB 提供了一种机制,实现共享的分布式互斥。此互斥锁可以用来选择在 Azure 云服务的领导者之间的一组角色的实例。第一个角色实例获得租约当选的领导人,并保持领先直至其租赁或直到它无法续租。其他角色实例可以继续监视在领导不再可用的情况下将 BLOB 租赁。
一个 BLOB 租赁是在一个 blob 的排他写锁。单个 BLOB 可以是一整租的在任何一个时间点的问题。角色实例可以请求租约在指定的斑点,而且将被授予租约,如果没有其他租赁在同一个斑点,是由这个或任何其他作用,比如举行,否则将抛出一个异常。
为了减少一个故障角色实例保留无限期租用的可能性,指定了一辈子的租约。当此期满后,租赁变为可用。然而,当一个角色实例持有的租赁也可以请求租约到期,并且将被授予租约的时间再延长。角色实例可以不断重复这一过程,如果它希望保留租约。
有关如何租用一个 blob 的详细信息,请参阅租赁斑点(REST API)在 MSDN 上。
public class BlobDistributedMutex { ... private readonly BlobSettings blobSettings; private readonly Func<CancellationToken, Task> taskToRunWhenLeaseAcquired; ... public BlobDistributedMutex(BlobSettings blobSettings, Func<CancellationToken, Task> taskToRunWhenLeaseAquired) { this.blobSettings = blobSettings; this.taskToRunWhenLeaseAquired = taskToRunWhenLeaseAquired; } public async Task RunTaskWhenMutexAcquired(CancellationToken token) { var leaseManager = new BlobLeaseManager(blobSettings); await this.RunTaskWhenBlobLeaseAcquired(leaseManager, token); } ...代码示例中的 RunTaskWhenMutexAquired 上述方法调用以下代码示例来实际获得的租赁所示的 RunTaskWhenBlobLeaseAcquired 方法。该 RunTaskWhenBlobLeaseAcquired 方法异步运行。如果租赁的成功获取,角色实例已经当选的领导者。该 taskToRunWhenLeaseAcquired 委托的目的是为了执行协调其它角色实例的工作。如果未取得租赁,另一个角色实例已当选为领导人和当前角色实例仍然是一个下属。注意,TryAcquireLeaseOrWait 方法是使用 BlobLeaseManager 对象获取租赁一个辅助方法。
private async Task RunTaskWhenBlobLeaseAcquired( BlobLeaseManager leaseManager, CancellationToken token) { while (!token.IsCancellationRequested) { // Try to acquire the blob lease. // Otherwise wait for a short time before trying again. string leaseId = await this.TryAquireLeaseOrWait(leaseManager, token); if (!string.IsNullOrEmpty(leaseId)) { // Create a new linked cancellation token source so that if either the // original token is cancelled or the lease cannot be renewed, the // leader task can be cancelled. using (var leaseCts = CancellationTokenSource.CreateLinkedTokenSource(new[] { token })) { // Run the leader task. var leaderTask = this.taskToRunWhenLeaseAquired.Invoke(leaseCts.Token); ... } } } }由领导开始的任务还异步执行。虽然这个任务正在运行,下面的代码示例所示的 RunTaskWhenBlobLeaseAquired方法周期性地尝试续订租约。这个动作有助于确保该角色实例保持领先。在简单解决方案中,续订请求之间的延迟小于对租赁期限,以防止另一角色实例当选的领导人指定的时间。
如果更新失败,以任何理由,任务被取消。如果租用未能被更新或任务被取消(可能为角色实例关停的结果),租赁被释放。在这一点上,这个或另一个角色实例可能被选为领导者。下面的代码提取物显示出此过程的一部分。
private async Task RunTaskWhenBlobLeaseAcquired( BlobLeaseManager leaseManager, CancellationToken token) { while (...) { ... if (...) { ... using (var leaseCts = ...) { ... // Keep renewing the lease in regular intervals. // If the lease cannot be renewed, then the task completes. var renewLeaseTask = this.KeepRenewingLease(leaseManager, leaseId, leaseCts.Token); // When any task completes (either the leader task itself or when it could // not renew the lease) then cancel the other task. await CancelAllWhenAnyCompletes(leaderTask, renewLeaseTask, leaseCts); } } } } ... }
该 KeepRenewingLease 方法是使用 BlobLeaseManager 对象续租另一个 helper 方法。该 CancelAllWhenAnyCompletes 方法取消指定为前两个参数的任务。
图 1 示出了 BlobDistributedMutex 类的功能。
var settings = new BlobSettings(CloudStorageAccount.DevelopmentStorageAccount, "leases", "MyLeaderCoordinatorTask"); var cts = new CancellationTokenSource(); var mutex = new BlobDistributedMutex(settings, MyLeaderCoordinatorTask); mutex.RunTaskWhenMutexAcquired(this.cts.Token); ... // Method that runs if the role instance is elected the leader private static async Task MyLeaderCoordinatorTask(CancellationToken token) { ... }
请注意有关样品溶液中的以下几点: