Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >分布式模式14-State Watch

分布式模式14-State Watch

作者头像
java达人
发布于 2021-12-21 08:18:41
发布于 2021-12-21 08:18:41
34100
代码可运行
举报
文章被收录于专栏:java达人java达人
运行总次数:0
代码可运行

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

服务器上的特定值更改时通知客户端

问题

客户端关注服务器上特定值的更改。如果客户需要不断地轮询服务器以查找更改,则很难构造其逻辑。如果客户端打开太多的服务器连接来监视更改,则可能使服务器不堪重负。

解决方案

允许客户端向服务器注册其关注点以进行特定状态更改。状态发生变化时,服务器会通知关注的客户端。客户端与服务器维护一个单一套接字通道。服务器在此通道上发送状态更改通知。客户端可能对多个值感兴趣,但是保持每个监视的连接可能会使服务器不堪重负。因此客户可以使用请求管道。

考虑一个在Consistent Core中使用的简单键值存储示例:当某个特定键的值更改或某个键被删除时,客户端可能会对此关注。该实现由两部分组成:客户端实现和服务器端实现。

客户端实现

客户端将接收键和函数,当客户端从服务器获取监视事件时函数被调用, 客户端存储方法对象以供以后调用。然后,它向服务器发送注册监视器的请求。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ConcurrentHashMap<String, Consumer<WatchEvent>> watches = new ConcurrentHashMap<>();


public void watch(String key, Consumer<WatchEvent> consumer) {
    watches.put(key, consumer);
    sendWatchRequest(key);
}

private void sendWatchRequest(String key) {
    requestSendingQueue.submit(new RequestOrResponse(RequestId.WatchRequest.getId(),
            JsonSerDes.serialize(new WatchRequest(key)),
            correlationId.getAndIncrement()));
}

当在连接上收到监视器事件时,相应的消费者触发调用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
this.pipelinedConnection = new PipelinedConnection(address, requestTimeoutMs, (r) -> {
    logger.info("Received response on the pipelined connection " + r);
    if (r.getRequestId() == RequestId.WatchRequest.getId()) {
        WatchEvent watchEvent = JsonSerDes.deserialize(r.getMessageBodyJson(), WatchEvent.class);
        Consumer<WatchEvent> watchEventConsumer = getConsumer(watchEvent.getKey());
        watchEventConsumer.accept(watchEvent);
        lastWatchedEventIndex = watchEvent.getIndex(); //capture last watched index, in case of connection failure.
    }
    completeRequestFutures(r);
});

服务端实现

当服务器收到监视器注册请求时,它将保留接收请求的管道连接以及keys的映射。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private Map<String, ClientConnection> watches = new HashMap<>();
private Map<ClientConnection, List<String>> connection2WatchKeys = new HashMap<>();

public void watch(String key, ClientConnection clientConnection) {
    logger.info("Setting watch for " + key);
    addWatch(key, clientConnection);
}

private synchronized void addWatch(String key, ClientConnection clientConnection) {
    mapWatchKey2Connection(key, clientConnection);
    watches.put(key, clientConnection);
}

private void mapWatchKey2Connection(String key, ClientConnection clientConnection) {
    List<String> keys = connection2WatchKeys.get(clientConnection);
    if (keys == null) {
        keys = new ArrayList<>();
        connection2WatchKeys.put(clientConnection, keys);
    }
    keys.add(key);
}

ClientConnection包装了与客户端连接的套接字。它具有以下结构。对于基于阻塞IO的服务器和基于非阻塞IO的服务器,此结构均相同。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface ClientConnection {
    void write(RequestOrResponse response);
    void close();
}

单个连接上可以注册多个监听器。因此,存储从连接到监视器键列表的映射很重要。关闭客户端连接时需要使用它来删除所有关联的监视器,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public void close(ClientConnection connection) {
        removeWatches(connection);
    }

    private synchronized void removeWatches(ClientConnection clientConnection) {
        List<String> watchedKeys = connection2WatchKeys.remove(clientConnection);
        if (watchedKeys == null) {
            return;
        }
        for (String key : watchedKeys) {
            watches.remove(key);
        }
    }

使用Reactive Streams (https://www.reactive-streams.org/) 此处的示例显示将事件直接写入管道连接。在应用程序级别具有某种类型的背压非常有用。如果产生了很多事件,则控制事件的发送速率非常重要。使事件的生产者和消费者保持同步是重要的考虑因素。etcd中此问题的一个示例(https://github.com/etcd-io/etcd/issues/11906)说明这些注意事项在生产中很重要。[reactive-streams] 使用背压作为首要概念,使代码更容易编写。像rsocket这样的协议提供了一种结构化的方法来实现此目的。

当服务器上发生诸如为key设置值之类的特定事件时,服务器会通过构造相关的WatchEvent来通知所有注册的客户端

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private synchronized void notifyWatchers(SetValueCommand setValueCommand, Long entryId) {
    if (!hasWatchesFor(setValueCommand.getKey())) {
        return;
    }
    String watchedKey = setValueCommand.getKey();
    WatchEvent watchEvent = new WatchEvent(watchedKey,
                                setValueCommand.getValue(),
                                EventType.KEY_ADDED, entryId);
    notify(watchEvent, watchedKey);
}

private void notify(WatchEvent watchEvent, String watchedKey) {
    List<ClientConnection> watches = getAllWatchersFor(watchedKey);
    for (ClientConnection pipelinedClientConnection : watches) {
        try {
            String serializedEvent = JsonSerDes.serialize(watchEvent);
            getLogger().trace("Notifying watcher of event "
                    + watchEvent +
                    " from "
                    + server.getServerId());
            pipelinedClientConnection
                    .write(new RequestOrResponse(RequestId.WatchRequest.getId(),
                            serializedEvent));
        } catch (NetworkException e) {
            removeWatches(pipelinedClientConnection); //remove watch if network connection fails.
        }
    }
}

要注意的关键事情之一是,可以从客户端请求处理代码和客户端连接处理代码(关闭连接)中并发访问与监视相关的状态。因此,所有访问监视状态的方法都需要使用锁进行保护。

在层次结构存储中的监控

Consistent Core主要支持分层存储。可以在父节点或键的前缀上设置监视器。对子节点的任何更改都会触发在父节点上设置的监视器。对于每个事件,Consistent Core会遍历路径以检查父路径上是否有监视器设置,并将事件发送给所有这些监视器。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
List<ClientConnection> getAllWatchersFor(String key) {
    List<ClientConnection> affectedWatches = new ArrayList<>();
    String[] paths = key.split("/");
    String currentPath = paths[0];
    addWatch(currentPath, affectedWatches);
    for (int i = 1; i < paths.length; i++) {
        currentPath = currentPath + "/" + paths[i];
        addWatch(currentPath, affectedWatches);
    }
    return affectedWatches;
}

private void addWatch(String currentPath, List<ClientConnection> affectedWatches) {
    ClientConnection clientConnection = watches.get(currentPath);
    if (clientConnection != null) {
        affectedWatches.add(clientConnection);
    }
}

这样就可以将监视器设置在key前缀(例如“servers”)上。使用此前缀创建的任何key(例如“ servers / 1”,“ servers / 2”)都将触发此监视器。

由于要调用的函数的映射是使用键前缀存储的,因此遍历层次结构以找到客户端接收到事件时要调用的函数也很重要。一种替代方法是将事件触发的路径与事件一起发送,以便客户端知道导致事件发送的监视器。

处理连接失败

客户端和服务器之间的连接随时可能失败。在某些用例中,这是有问题的,因为客户端在断开连接时可能会错过某些事件。例如,集群控制器可能会对某些节点是否发生故障感兴趣,这由某些key的删除事件指示。客户端需要将收到的最后一个事件告知服务器。当客户端再次重置监视器时,客户端发送最后收到的事件编号。服务器应从该事件号开始发送其已记录的所有事件。

在Consistent Core客户端中,可以在客户端重新建立与leader的连接时完成。

Kafka中基于拉模式的设计 在监视器的典型设计中,服务器将监视器事件推送给客户端。[kafka]遵循端到端基于拉模式的设计。在其新架构中,Kafka broker将定期从Controller Quorum(它本身就是Consistent Core的一个例子)中提取元数据日志。基于偏移量的拉取机制允许客户端像其他任何Kafka消费者一样,从最后一个已知偏移量中读取事件,从而避免事件丢失。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void connectToLeader(List<InetAddressAndPort> servers) {
    while (isDisconnected()) {
        logger.info("Trying to connect to next server");
        waitForPossibleLeaderElection();
        establishConnectionToLeader(servers);
    }
    setWatchesOnNewLeader();
}

private void setWatchesOnNewLeader() {
    for (String watchKey : watches.keySet()) {
        sendWatchResetRequest(watchKey);
    }
}

private void sendWatchResetRequest(String key) {
    pipelinedConnection.send(new RequestOrResponse(RequestId.SetWatchRequest.getId(),
            JsonSerDes.serialize(new SetWatchRequest(key, lastWatchedEventIndex)), correlationId.getAndIncrement()));
}

服务器对发生的每个事件进行编号。例如,如果服务器是Consistent Core,则它以严格的顺序存储所有状态更改,并且每个更改都用“Write-Ahead Log”中讨论的日志索引编号,然后客户端可以从特定指标以下位置开始请求事件。

从键值存储库派生事件

查看键值存储库的当前状态来生成事件,如果它还对发生的每个更改进行编号并存储每个编号值。

当客户端重新建立与服务器的连接时,它可以再次设置监视器,并发送最后看到的更改编号。然后,服务器可以将其与存储的值进行比较,如果它大于客户端发送的值,则服务器可以将事件重新发送给客户端。从键值存储中派生事件可能会有些尴尬,因为需要猜测事件。它可能会错过一些事件。-例如,如果创建了key然后又将其删除-在客户端断开连接时,将丢失create事件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private synchronized void eventsFromStoreState(String key, long stateChangesSince) {
    List<StoredValue> values = getValuesForKeyPrefix(key);
    for (StoredValue value : values) {
        if (values == null) {
            //the key was probably deleted send deleted event
            notify(new WatchEvent(key, EventType.KEY_DELETED), key);
        } else if (value.index > stateChangesSince) {
            //the key/value was created/updated after the last event client knows about
            notify(new WatchEvent(key, value.getValue(), EventType.KEY_ADDED, value.getIndex()), key);
        }
    }
}

[zookeeper]使用这种方法。默认情况下,zookeeper中的监视器也是一次性触发器。触发事件后,如果客户端想要接收其他事件,则需要再次设置监视器。在重新设置监视器之前,可能会错过一些事件,因此客户端需要确保他们读到了最近状态,这样他们不会错过任何更新。

存储事件历史

保留过去事件的历史记录并从事件历史记录中回复客户端会更容易。这种方法的问题在于需要将事件历史记录限制为比如1000个事件。如果客户端断开连接的时间较长,则可能会错过超过1,000个事件窗口的事件。使用Google guava的EvictingQueue的简单实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class EventHistory implements Logging {
    Queue<WatchEvent> events = EvictingQueue.create(1000);
    public void addEvent(WatchEvent e) {
        getLogger().info("Adding " + e);
        events.add(e);
    }

    public List<WatchEvent> getEvents(String key, Long stateChangesSince) {
        return this.events.stream()
                .filter(e -> e.getIndex() > stateChangesSince && e.getKey().equals(key))
                .collect(Collectors.toList());
    }
}

当客户端重新建立起连接,重新设置监控时,事件可以从历史中发送。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void sendEventsFromHistory(String key, long stateChangesSince) {
    List<WatchEvent> events = eventHistory.getEvents(key, stateChangesSince);
    for (WatchEvent event : events) {
        notify(event, event.getKey());
    }
}

使用多版本存储

跟踪所有更改,可以使用多版本存储。它跟踪每个key的所有版本,并可以轻松地从所请求的版本中获取所有更改。

[etcd]版本3开始使用此方法

例子

[zookeeper]可以在节点上设置监视器。

诸如[kafka]之类的产品将其用于组成员身份和集群成员的故障检测。

[etcd]具有监视器实现,在[kubernetes]资源监视实现中有大量使用。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
分布式系统模式13-Consistent Core
维护较小的集群可提供更强的一致性,以允许大型数据集群协调服务器活动,而无需实现基于 quorum 的算法。
java达人
2021/04/09
5600
redis 分布式锁的 5个坑,真是又大又深
最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。
程序员小富
2020/04/22
2.3K0
利用Zookeeper实现分布式锁及服务注册中心
对于Zookeeper的定义以及原理,网上已经有很多的优秀文章对其进行了详细的介绍,所以本文不再进行这方面的阐述。 本文主要介绍一些基本的准备工作以及zookeeper.net的使用。 本文源代码github地址:https://github.com/Mike-Zrw/ZookeeperHelper zookeeper下载地址:https://archive.apache.org/dist/zookeeper/ ZooInspector下载地址:https://issues.apache.org/jira/
蓝夏
2018/04/28
7010
利用Zookeeper实现分布式锁及服务注册中心
基于Zookeeper的分布式锁
实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开。我们来看下使用Zookeeper如何实现分布式锁。 什么是Zookeeper? Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐、低延迟同时还要保持一致性和可用性,实际上非常困难。因此zookeeper提供了这些功能,开发者在zoo
CSDN技术头条
2018/02/06
9930
基于Zookeeper的分布式锁
分布式之Zookeeper一(分布式锁与Zookeeper集群)
说到分布式开发,不得不说的就是zookeeper了;zookeeper官网说到Apache ZooKeeper致力于开发和维护可实现高度可靠的分布式协调的开源服务器。那么zk作为一个协调者的存在,是分布式比不可少的一部分。废话不多说,直接上干货
Java_老男孩
2019/12/02
4790
分布式系统模式10-Request Pipeline
来源: https://martinfowler.com/articles/patterns-of-distributed-systems/
java达人
2021/01/05
5920
分布式系统模式10-Request Pipeline
用ZooKeeper实现分布式配置中心
比如我们线上有很多微服务分布在很多服务器上,有一天其中一个微服务比如user-service的ip地址需要变更,而user-service需要对很多其他程序提供服务,这个时候如果没有一个统一协调的东西,每个用到user-service的应用程序都要做相应的ip地址修改,这将是一件很麻烦的事情!
行百里er
2020/12/02
1.5K0
用ZooKeeper实现分布式配置中心
Zookeeper 分布式技术入门
Zookeeper 是一个开源的分布式(多台服务器干一件事)的,为分布式应用提供协调服务的 Apache 项目
RendaZhang
2020/10/26
9170
深入浅出Zookeeper源码(四):Watch实现剖析
用过zookeeper的同学都知道watch是一个非常好用的机制,今天我们就来看看它的实现原理。
泊浮目
2023/12/25
2340
分布式系统模式5-Leader和Follower
来源: https://martinfowler.com/articles/patterns-of-distributed-systems/
java达人
2020/12/16
1.1K0
分布式系统模式5-Leader和Follower
Zookeeper 操作练习
与ZooKeeper集合进行交互的应用程序称为 ZooKeeper客户端或简称客户端。 Znode是ZooKeeper集合的核心组件,ZooKeeper API提供了一小组方法使用ZooKeeper集合来操纵znode的所有细节。
名字是乱打的
2021/12/22
3270
Zookeeper 操作练习
分布式系统模式9-Single Socket Channel
来源: https://martinfowler.com/articles/patterns-of-distributed-systems/
java达人
2021/01/05
5800
分布式系统模式9-Single Socket Channel
分布式系统模式8-Singular Update Queue
来源: https://martinfowler.com/articles/patterns-of-distributed-systems/
java达人
2021/01/05
6690
分布式系统模式8-Singular Update Queue
zookeeper实现分布式锁的两种方式
jdk提供的synchronized和ReentrantLock可以帮助我们在单进程中解决资源共享数据一致性,但是在分布式系统中是多进程多线程,这个时候仅仅使用jdk实现的锁解决不了资源共享的问题,比如某商城中数据库有10个商品,A用户想要买走6个,B用户想买走5个。如果系统运行在单台机器上,我们使用Jdk提供的锁,可以保证数据的一致性,但是当系统运行在多台机器中,JDK实现的锁就会失效,这个时候就应该使用分布式锁,每次只能保证一台机器在请求资源。分布式锁有三种不同的方式实现,分别是数据库提供的分布式锁、redis、zookeeper实现,今天主要讲zookeeper实现分布式锁
全栈程序员站长
2022/07/04
5300
zookeeper实现分布式锁的两种方式
聊聊 分布式 WebSocket 集群解决方案
最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。
猫头虎
2024/04/07
2.1K0
聊聊 分布式 WebSocket 集群解决方案
Redis分布式锁的正确实现方式(Java版)
https://wudashan.cn/2017/10/23/Redis-Distributed-Lock-Implement/
全栈程序员站长
2022/09/05
1.4K0
Zookeeper分布式锁
在单体项目中jvm中的锁即可完成需要,但是微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个jvm之间无法通过常用的jvm锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。例如在订单服务中,我们需要根据日期来生成订单号流水,就有可能产生相同的时间日期,从而出现重复订单号。(jdk8使用LocalDateTime线程安全,不会存在这样的问题)
海向
2020/06/17
4790
基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架
RPC(Remote Procedure Call,远程过程调用 )允许程序调用远程计算机上的服务或函数,而无需显式编写网络通信代码,就像调用本地函数一样方便地调用远程服务的函数。
_小羊_
2025/05/03
1880
基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架
zookeeper分布式协调机制及创建分布式锁
要了解zookeeper如何创建分布式锁,先了解一下zookeeper。zookeeper官网给出解释:Apache ZooKeeper致力于开发和维护开源服务器,实现高度可靠的分布式协调。
神秘的寇先森
2018/09/29
5690
Zookeeper 通知更新可靠吗? 解读源码找答案!
遇到Keepper通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到zk Watch的注册以及触发的机制,本地调试运行模拟zk更新的不可靠的场景以及得出相应的解决方案。
特鲁门
2018/07/17
3.4K12
相关推荐
分布式系统模式13-Consistent Core
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验