<acronym id="s8ci2"><small id="s8ci2"></small></acronym>
<rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
<acronym id="s8ci2"></acronym>
<acronym id="s8ci2"><center id="s8ci2"></center></acronym>
0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

一個注解實現WebSocket集群的方案

jf_ro2CN3Fa ? 來源:CSDN ? 2023-04-10 11:37 ? 次閱讀

介紹

WebSocket大家應該是再熟悉不過了,如果是單體應用確實不會有什么問題,但是當我們的項目使用微服務架構時,就可能會存在問題

比如服務A有兩個實例A1和A2,前端的WebSocket客戶端C通過網關的負載均衡連到了A1,這個時候當A2觸發消息發送的邏輯,需要將某個消息發送給所有的客戶端時,C就接受不到消息

這個時候我們很快就能想到一種最簡單的解決方案,就是把A2的消息轉發給A1,A1再把消息發送給C,這樣C就能收到A2發送的消息了

81f46d56-d6c8-11ed-bfe3-dac502259ad0.png

基于這個思路,我實現了一個庫,一個配置注解搞定一切

用法

接下來讓我們看看這個庫的用法

首先我們需要在啟動類上添加一個注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
publicclassAServiceApplication{

publicstaticvoidmain(String[]args){
SpringApplication.run(AServiceApplication.class,args);
}
}

接著我們在需要發送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨實例發消息啦

@RestController
@RequestMapping("/ws")
publicclassWsController{

@Autowired
privateWebSocketLoadBalanceConceptconcept;

@RequestMapping("/send")
publicvoidsend(@RequestParamStringmsg){
concept.send(msg);
}
}

是不是很簡單,有沒有覺得比自己集成單體應用的WebSocket還要簡單!

當你的同事還在頭疼要實現手動轉發時你已經通過一個配置注解實現了功能并開始泡茶喝

你的同事肯定對你刮目相看?。ㄓ帜荛_始摸魚了)

不知道大家看了之后是不是對具體實現已經有了一些思路呢

接下來我就來講講這個庫的實現流程

抽象思路

其實我之前有專門針對WebSocket實現過類似功能的模塊,只是當時的一些場景都是基于項目定死的,所以相對來說實現比較簡單,但是過于定制化不好擴展

有一天在和我的一個前同事聊天的過程中得知,他們在考慮讓設備和服務直連,并且服務要部署成多實例

設備和服務直連無非就是通過TCP這種長連接來實現,可以使用緩存來保存連接和服務地址的映射關系來實現點對點轉發的功能需求

聽到這里,是不是感覺似曾相識?當時就有一道光穿過我的腦瓜子,真相只有一個!這不就和WebSocket在集群模式下的問題一樣么

于是我從原來針對WebSocket的思考,變成了對各種長連接的思考,最終我將這個問題抽象成了:長連接的集群方案

而不管是WebSocket還是TCP都是長連接的一種具體實現

所以我們可以抽象一個頂級接口Connection,然后實現WebSocketConnection或者是TCPConnection

其實從抽象的角度來說不僅僅是長連接,短連接也在我們的抽象范圍之內,只不過類似HTTP等協議并不存在上述的問題,但是并不妨礙你實現一個HTTPConnection用于轉發消息,所以大家不要被先入為主的思維束縛住了

轉發思路

之前講到,這個庫的主要思路就是將消息轉發給其他的服務實例來達到一個單播或廣播的效果

所以消息轉發的設計就非常重要了

首先消息轉發需要憑借一些支持數據交互的技術手段

比如HTTP,MQ,TCP,WebSocket

說到這里。。。大家是不是。。。你TM原來自己就能搞定?。ㄏ谱溃?/p>

長連接不就是用來交互數據的嗎,所以完全可以自給自足啊

于是就有一個精妙的想法在我腦子里形成:

如果每個服務實例都把自己作為一個客戶端,連接到其他服務上呢?

WebSocket的場景下,我們將當前服務實例作為一個WebSocket客戶端去連接其他服務實例的WebSocket服務端

TCP的場景下,我們將當前服務實例作為一個TCP的客戶端去連接其他服務實例的TCP服務端

這樣其他服務實例就可以把消息發到這些偽裝的客戶端上,當服務實例上偽裝的客戶端接收到消息之后就可以再轉發給自己管理的真正的客戶端

撒花家人們,自閉(自我閉環)了屬于是

所以我們首先需要先讓服務實例之間相互連接上

連接流程

讓我們來看看互相建立連接是怎么設計的

82155278-d6c8-11ed-bfe3-dac502259ad0.png

我定義了一個ConnectionSubscriber的接口,大家可以理解為我們的服務實例要去訂閱監聽其他服務發送的消息

同時提供了默認實現,就是基于自身的協議進行連接和消息的發送

當然也能夠靈活的支持其他方式,只需要自定義一個ConnectionSubscriber就可以了,如果使用MQ的方式就可以實現一個MQConnectionSubscriber或者使用HTTP就可以實現一個HTTPConnectionSubscriber

只不過使用自身的協議就可以不用依賴其他的庫或是中間件了,當然如果你對消息的丟失率有比較嚴格的要求也可以使用MQ作為消息轉發的中介,而以我之前參與過的項目來說,一般普通的WebSocket場景基本上還是能忍受一定的丟失率的

獲取服務實例信息

那么我們怎么知道要去連接哪些實例呢

我定義了一個ConnectionServerManager的接口用來管理服務信息

當然我們完全可以自己實現一個,比如通過配置文件來配置服務實例信息

不過我們有更方便的方式,那就是依賴Spring Cloud的服務發現組件了,不管是Eureka還是Nacos還是其他的注冊中心相當于都支持了,這就是抽象的魅力啊

我們可以通過DiscoveryClient#getInstances(Registration.getServiceId())來獲得所有的實例,排除掉自身就是需要連接的服務實例了

當我們的服務實例連接上其他的服務實例之后,發送一個自身實例信息的消息過去,其他的服務實例接收到對應的消息之后反過來連接我們的服務實例,保證一定的連接及時性,這樣雙方的連接就搭建起來了,可以互相轉發消息了

同時我還添加了心跳檢測和自動重連,當一段時間沒有收到心跳回復后就會斷開連接,并且每隔一段時間就會重新查詢一遍實例信息,如果發現存在某個服務實例沒有對應的連接,就會重新進行連接,這樣就能在某些偶爾網絡不好的情況下有一定的容錯

到目前為止,我們基本的框架已經建立了,當我們啟動服務之后,服務間就會自動建立連接

連接區分和管理

基于上述的思路,我們肯定需要區分真實的客戶端和用來轉發的客戶端

于是我就把這些連接做了一個分類

類別 說明
Client 普通的連接
Subscriber 服務實例偽裝的連接,用于接受需要轉發的消息
Observable 服務實例偽裝的連接,用于發送需要轉發的消息

然后對于這些連接進行一個統一的管理

826dec12-d6c8-11ed-bfe3-dac502259ad0.png

通過連接工廠ConnectionFactory我們可以將任意的連接適配成Connection對象,并實現各種連接間的消息轉發

每個連接都會配置一個MessageEncoder和MessageDecoder用于消息的編碼和解碼,而且不同類別的連接對應的編碼器和解碼器肯定是不一樣的,比如轉發的消息和發給真實客戶端的消息很大程度上都是有區別的,所以額外定義了一個MessageCodecAdapter用來適配不同類型的編解碼器,也能讓大家在自定義時方便管理

消息發送

現在當我們發送某條消息之后,消息就會被轉發到其他的服務實例,所有的客戶端就都能收到了

不對啊,在有些情況下我們不想讓所有客戶端都收到啊,能不能我們想讓誰收到就讓誰收到啊

真麻煩,來,我把所有的連接都給你,你自己選吧

連接選擇

我們需要在消息發送時確定發送給哪些連接

82cd30be-d6c8-11ed-bfe3-dac502259ad0.png

于是我就定義了一個連接選擇器ConnectionSelector

每次要發送消息的時候,我都會匹配一個連接選擇器,然后通過選擇器來獲得需要發送消息的連接,而我們可以通過自定義連接選擇器來實現我們消息的精準發送

這里其實就是我為什么會取名WebSocketLoadBalanceConcept的原因,為什么要叫LoadBalance呢

Ribbon通過IRule來選擇一個Server

我通過ConnectionSelector來選擇一個Connection集合

是不是有異曲同工之妙

繼續來說自定義選擇器

準備工作:

我們的Connection有一個metadata字段用于存放自定義屬性

我們的Message有一個headers字段用于存放消息頭

給指定用戶發送消息

很多場景下我們需要給指定的用戶發送消息

首先當客戶端連接上來時,可以通過參數或者主動發送一個消息將userId發給服務端,然后服務端將得到的userId存在Connection的metadata中

接著我們給需要發送的Message添加一個header,將對應的userId作為消息頭

這樣我們就可以自定義一個連接選擇器通過判斷Message是否包含userId消息頭來作為匹配的條件,當Message的headers中存在userId時,對Connection中的metadata進行userId的匹配來篩選需要發送消息的連接

由于userId是唯一的,當我們自身服務連上來的客戶端中已經匹配到就不需要再轉發了,如果沒有匹配到就通過其他服務實例的客戶端進行消息轉發

庫中已經實現了對應的UserSelector和UserMessage,可以使用配置開啟并通過在連接路徑上添加userId參數來標記用戶

當然我們也可以借用緩存來精確的判斷需不需要轉發或者是需要轉發給哪幾個服務,把userId和服務的instanceId等一些具有唯一性的數據緩存在Redis中,當給用戶發送消息時,從Redis中獲得用戶對應的服務實例的instanceId或是具有唯一性的數據,如果經過匹配就是當前服務就可以直接下發,如果是其他服務就轉發給那個對應的服務就行了

給指定路徑發送消息

還有一種場景也比較常見就是類似主題訂閱,如訂閱設備狀態更新的數據,就要給每一個對應路徑的連接發送消息了

我們可以使用不同的路徑來表示不同主題,然后自定義一個連接選擇器來匹配連接的路徑和消息頭中指定的路徑

當然庫中也已經實現了對應的PathSelector和PathMessage,可以通過配置開啟

結束

最后請允許我發表一點對于抽象的拙見

抽象其實就和 “道生一,一生二,二生三,三生萬物” 一樣,根據你的頂級接口(也就是核心功能)不斷的向外展開,你的頂級接口就是道(狹義的來講)

以這個庫為例,ConnectionLoadBalanceConcept就是這個庫的道,他的核心功能就是發送消息,至于怎么發,發給誰,不確定,像是一個混沌的狀態

那么什么是一,二,三呢,我們發送消息需要載體于是就有了Connection和Message,我們需要對Connection進行管理于是就有了ConnectionRepository,我們需要轉發消息于是就有了ConnectionSubscriber等等

而萬物就像是具體的實現,是能落實的,基于Spring Cloud服務發現的連接管理器DiscoveryConnectionServerManager,基于路徑的連接選擇器PathSelector,基于Reactive的WebSocket連接ReactiveWebSocketConnection

就像是你創造的世界,不斷的衍生出各種各樣的規則,這些規則相輔相成,讓你的世界平穩的運行。






審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • HTTP協議
    +關注

    關注

    0

    文章

    55

    瀏覽量

    9645
  • TCP通信
    +關注

    關注

    0

    文章

    145

    瀏覽量

    4154
  • WebSocket
    +關注

    關注

    0

    文章

    24

    瀏覽量

    3673

原文標題:一個注解實現 WebSocket 集群方案,這樣玩才爽!

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    鴻蒙原生應用開發-網絡管理WebSocket連接

    。使用該功能需要申請ohos.permission.INTERNET權限。具體接口說明如下表。 三、開發步驟 1.導入需要的webSocket模塊。 2.創建WebSocket連接
    發表于 04-07 09:46

    三菱M70 PLC增加注解的三種方法簡析

    三菱M70 PLC增加注解的三種方法
    的頭像 發表于 02-26 09:59 ?310次閱讀
    三菱M70 PLC增加<b class='flag-5'>注解</b>的三種方法簡析

    恒訊科技帶大家深入理解:WebSocket服務器的工作原理

    WebSocket是一種在單個TCP連接上進行全雙工通信的通信協議。它的設計目標是在Web瀏覽器和服務器之間提供低延遲、高效的雙向通信。下面是深入理解WebSocket服務器工作原理的一些關鍵概念
    的頭像 發表于 01-29 16:48 ?229次閱讀

    一文詳解WebSocket協議

    WebSocket出現之前,一個Web應用(即時聊天、多人協作)的客戶端和服務端之間常見的雙向數據交換方式有短輪詢、長輪詢、SSE(Server-Sent Events,服務器發送事件)。這些方式
    的頭像 發表于 01-07 11:26 ?2498次閱讀
    一文詳解<b class='flag-5'>WebSocket</b>協議

    redis集群中的hash一致性算法的理解

    Redis集群是一種為了增強Redis的可擴展性和高可用性而設計的集群方案。在Redis集群中,一致性哈希算法被廣泛地應用于數據分片和負載均衡。 一、Redis
    的頭像 發表于 12-04 10:45 ?407次閱讀

    SpringBoot核心注解由幾個注解組成

    簡化應用程序開發的注解,其中核心注解包括 @SpringBootApplication、@RestController、@RequestMapping、@Autowired、@ComponentScan
    的頭像 發表于 12-03 15:09 ?393次閱讀

    springboot核心注解

    Spring Boot 是基于 Spring 框架的開源框架,它可以幫助開發者快速構建、部署和運行獨立的、生產級的 Spring 應用程序。Spring Boot 提供了一系列核心注解,這些注解可以
    的頭像 發表于 11-23 09:23 ?332次閱讀

    springmvc常用5種注解

    SpringMVC是一種基于Java的Web框架,使用注解可以更加方便靈活地開發和管理控制器,實現請求的映射和處理。在SpringMVC中,有許多常用的注解,本文將詳細介紹其中的五種注解
    的頭像 發表于 11-22 16:51 ?419次閱讀

    websocket協議的原理

    WebSocket協議是基于TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。 WebSocket通信協議于2011年被IETF
    的頭像 發表于 11-09 15:13 ?506次閱讀
    <b class='flag-5'>websocket</b>協議的原理

    如何通過注解來優化我們的Java代碼

    Java注解可以說是我們編碼過程中最常用的。本篇文章將給大家介紹Java注解的概念、作用以及如何使用注解來提升代碼的可讀性和靈活性,并介紹如何通過注解來優化我們的Java代碼。 1、什
    的頭像 發表于 09-30 11:39 ?381次閱讀

    【中秋國慶不斷更】HarmonyOS網絡管理開發—HTTP與WebSocket

    、網絡管理開發概述 網絡管理模塊主要提供以下功能: ● ??HTTP數據請求??:通過HTTP發起數據請求。 ● ??WebSocket連接??:使用
    發表于 09-27 15:40

    LabView工具包WebSocket-API主機下載

    LabView工具包WebSocket-API主機下載
    發表于 09-19 09:15 ?65次下載

    WebSocket的6種集成方式介紹

    由于前段時間我實現了一個庫【Spring Cloud】一個配置注解實現 WebSocket 集群方案
    的頭像 發表于 09-02 16:52 ?975次閱讀

    物聯網與ESP32、Node-RED和InfluxDB與WebSocket

    電子發燒友網站提供《物聯網與ESP32、Node-RED和InfluxDB與WebSocket.zip》資料免費下載
    發表于 07-13 09:51 ?0次下載
    物聯網與ESP32、Node-RED和InfluxDB與<b class='flag-5'>WebSocket</b>

    如何實現一個注解進行數據脫敏

    ? 什么是數據脫敏 開胃菜 使用 Hutool 工具類實現數據掩碼 使用 Jackson 進行數據序列化脫敏 注解實現數據脫敏 1、定義一個注解 2、創建一個枚舉類 3、創建我們的自定
    的頭像 發表于 06-14 09:37 ?707次閱讀
    如何<b class='flag-5'>實現</b>一個<b class='flag-5'>注解</b>進行數據脫敏
    亚洲欧美日韩精品久久_久久精品AⅤ无码中文_日本中文字幕有码在线播放_亚洲视频高清不卡在线观看
    <acronym id="s8ci2"><small id="s8ci2"></small></acronym>
    <rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
    <acronym id="s8ci2"></acronym>
    <acronym id="s8ci2"><center id="s8ci2"></center></acronym>