




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
比如,我們會采用WebSocket長連接,來替代之前的HTTP輪詢方式,并且會加上一些課程中有講到的相對高級的功能,如應用層心跳、ACK機制等。希望通過期末整體技術實現(xiàn)上的升級,你能更深刻地體會到IM系統(tǒng)升級前后,對使用方和 現(xiàn)關于這次期末實戰(zhàn),希望你能夠完成的功能主要包括以下幾個部支持基于WebSocket的長連接消息收發(fā)均通過長連接進行通支持消息推送的ACK機制和重推機支持客戶端的心跳機制和雙端的idle超時斷連支持客戶端斷線后的自動重接下來,我們就針對以上這些需要升級的功能和新增的主要功能,來進行實現(xiàn)上的拆WebSocket長連首先,期末實戰(zhàn)一個比較大的改變就是,將之前HTTP輪詢的實現(xiàn),改造成真正的長連接。為了方便Web端的演示,這里我建議你可以使用WebSocket來實現(xiàn)。對于WebSocket,我們在客戶端JS(JavaScript)里主要是使用HTML5原生API實代if(window.WebSocket)websocket=newwebsocket.onmessage=function(event) 6//連接建立后websocket.onopen=function()
//連接關閉后websocket.onclose=function() //連接出現(xiàn)websocket.onerror=function() }elsealert("您的瀏覽器不支持WebSocket頁面打開時,JS先通過服務端的WebSocket地址建立長連接。要注意這里服務端連接的地址是ws://開頭的,不是http://的了;如果是使用加密的WebSocket協(xié)議,那么相應的地址應該是以wss://開頭的。建立長連之后,要針對創(chuàng)建的WebSocket對象進行的,我們只需要在各種比如,API主要支持的幾種有:長連接通道建立完成后,通過onopen來進行用戶信息的上報綁定;通過onmessage,對接收到的所有該連接上的數(shù)據進行處理,這個也是我們最的消息推送的處理邏輯;另外,在長連接通道發(fā)生異常錯誤,或者連接被關閉時,可以分別通過onerror和onclose兩個來進行處理。WebSocket連接,向服務器發(fā)送數(shù)據(消息)。這個功能在實現(xiàn)上也非常簡單,你只需要調用WebSocket對象的send方法就OK了。通過長連接發(fā)送消息的代碼設計如代1varsendMsgJson='{"type":3,"data":{"senderUid":'+sender_id+23此外,針對WebSocket在服務端的實現(xiàn),如果你是使用JVM(JavaVirtualMachine,Java虛擬機)系列語言的話,我推薦你使用比較成JavaNIO框架Netty來做實現(xiàn)。因為Netty本身對WebSocket的支持就很完善了,各種編器和WebSocket的處理采用Netty實現(xiàn)WebSocketServer的代碼,你可以參考下面的示例代碼代EventLoopGroupbossGroupnewEpollEventLoopGroup(serverConfig.bossThreads,new3EventLoopGroupworkerGroupnewEpollEventLoopGroup(serverConfig.workerThreads,new67ServerBootstrapserverBootstrap=newServerBootstrap().group(bossGroup,8ChannelInitializer<SocketChannel>initializer=newprotectedvoidinitChannel(SocketChannelch)throwsExceptionChannelPipelinepipeline=//先添加WebSocket相關的編器和協(xié)議處理pipeline.addLast(newpipeline.addLast(newpipeline.addLast(newpipeline.addLast(newWebSocketServerProtocolHandler("/",null,//再添加服務端業(yè)務消息//服務端添加一個idle處理器,如果一段時間Socket中沒有消息傳輸,服pipeline.addLast(newIdleStateHandler(0,0, 24首先創(chuàng)建服務器的ServerBootstrap對象。Netty作為服務端,從ServerBootstrap啟動,ServerBootstrap對象主要用于在服務端的某一個端口進行,并接受客戶端的連接著,通過ChannelInitializer對象,初始化連接管道中用于處理數(shù)據的各種編器和業(yè)務邏輯處理器。比如這里,我們就需要添加為了處理WebSocket協(xié)議相關的編器,還要添加服務端接收到客戶端發(fā)送的消息的業(yè)務邏輯處理器,并且還加上了用于idle超時管理的處理器最后,把這個管道處理器鏈掛到ServerBootstrap,再通過bindsync法,啟ServerBootstrap的端口進行就可以了消息收發(fā)建立好WebSocket長連接后,我們再來看一下最的消息收發(fā)是怎么處理的剛才講到,客戶端發(fā)送消息的功能,在實現(xiàn)上其實比較簡單。我們只需要通過對象的send方法,就可以把消息通過長連接發(fā)送到服務端那么,下面我們就來看一下服務端接收到消息后的邏輯處的代碼邏輯在WebSocketRouterHandler個處理器中,消息接收處理的相關代碼代 2protectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)throws3如果是文本類型的WebSocket數(shù)4(frameinstanceofTextWebSocketFrame)5//先解析出具體的6Stringmsg=((TextWebSocketFrame)7//再用JSON來對這些數(shù)據內容進8JSONObjectmsgJson=9inttype=JSONObjectdata=longsenderUid=longrecipientUid=Stringcontent=intmsgType=//調用業(yè)務層的Service來進行真正的發(fā)消息MessageVOmessageContent=messageService.sendNewMsg(senderUid,recipientUid,(messageContent!=null)JSONObjectjsonObject=newjsonObject.put("type",26
jsonObject.put("data",JSONObject.toJSON(messageContent));ctx.writeAndFlush(newTextWebSocketFrame(JSONObject.toJSONStrin}}這里的WebSocketRouterHandler,我們也是采用機制來實現(xiàn)。由于這里需要處理“接收到”的消息,所以我們只需要實現(xiàn)channelRead0方法就可以。面的管道處理器鏈中,因為添加了WebSocket相關的編器,所以這里WebSocketRouterHandler接收到的都是WebSocketFrame格式的數(shù)據接下來,我們從WebSocketFrame式的數(shù)據中,解析出文本類型的收發(fā)雙方UID發(fā)最后,把需要返回給消息發(fā)送方的客戶端的信息,再通過writeAndFlush法寫回去,就不過,以上的代碼只是處理消息的發(fā)送,那么針對消息下推的邏輯處理又是如何實現(xiàn)的呢因此,我們可以在messageService.sendNewMsg方法中,等待消息、未讀變更都完你可以參考下面的代碼代1privatestaticfinalConcurrentHashMap<Long,Channel>userChannel=new2 protectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)5//處理上線請6longloginUid=7userChannel.put(loginUid,8}9publicvoidpushMsg(longrecipientUid,JSONObjectmessage)14
Channelchannel=if(channel!=null&&channel.isActive()&&{channel.writeAndFlush(new}首先,我們在處理用戶建連上線的請求時,會先在網關機內存記錄一個“當前連接用戶和對代publicclassNewMessageListenerimplementsMessageListenerpublicvoidonMessage(Messagemessage,byte[]pattern)Stringtopic=//從訂閱到的Redis的消息里解析出真正需要的業(yè)務數(shù)StringjsonMsg=("MessageReceived-->pattern:{},topic:{},message:{}",newJSONObjectmsgJson=//解析出消息接收人的longotherUid=JSONObjectpushJson=newpushJson.put("type",pushJson.put("data",//最終調用網關層處理器將消息真正websocketRouterHandler.pushMsg(otherUid, 19publicMessageVOsendNewMsg(longsenderUid,longrecipientUid,Stringcontent,int//先對發(fā)送消息進行、加未讀等操//然后將待推送消息發(fā)布到redisTemte.convertAndSend(Constants.WEBSOCKET_MSG_TOPIC,然后,我們可以基于Redis的發(fā)布/訂閱,實現(xiàn)一個消息推送的發(fā)布訂閱在業(yè)務層進行發(fā)送消息邏輯處理的最后,會將這條消息發(fā)布到Redis的一個Topic中,這個Topic被NewMessageListener一直著,如果有消息發(fā)布,那么器會馬上感知到,然后再將消息提交給WebSocketRouterHandler,來進行最終消息的下推。消息推送的我在“04|ACK機制:如何保證消息的可靠投遞?”中有講到,當系統(tǒng)有消息下推后,我們會依賴客戶端響應的ACK包,來保證消息推送的可靠性。如果消息下推后一段時間,服務端沒有收到客戶端的ACK包,那么服務端會認為這條消息沒有正常投遞下去,就會觸發(fā)關于ACK機制相應的服務端代碼,你可以參考下面的示代publicvoidpushMsg(longrecipientUid,JSONObjectmessage)channel.writeAndFlush(new//消息推送下去后,將這條消息加入到待ACK列表addMsgToAckBuffer(channel,5publicvoidaddMsgToAckBuffer(Channelchannel,JSONObjectmsgJson)nonAcked.put(msgJson.getLong("tid"),//定時器針對下推的這條消息在5s后進行"是否ACK"executorService.schedule(()->if(channel.isActive())//檢查是否被ACK,如果沒有收到ACK回包,會checkAndResend(channel, },5000,15longtid=privatevoidcheckAndResend(Channelchannel,JSONObjectmsgJson)longtid=//重推2inttryTimes=while(tryTimes>0)if(nonAcked.containsKey(tid)&&tryTimes>0)channel.writeAndFlush(newtry}catch(InterruptedExceptione) tryTimes-- 33用戶在上線完成后,服務端會在這個連接維度的里,初始化一個起始值為0的序(tid),每當有消息推送給客戶端時,服務端會針對這個序號進行加1操作,下推消息時消息推送后,服務端會將當前消息加入到一個“待ACKBuffer”中,這個ACKBuffer的實現(xiàn),我們可以簡單地用一個ConcurrentHashMap來實現(xiàn),Key就是這條消息攜帶的序號,Value是消息本身。當消息加入到這個“待ACKBuffer”時,服務端會同時創(chuàng)建一個定時器,在一定的時間后,會觸發(fā)“檢查當前消息是否被ACK”的邏輯;如果客戶端有回ACK,那么服務端就會從這個“待ACKBuffer”中移除這條消息,否則如果這條消息沒有被ACK,那么就會觸發(fā)應用層在了解了如何通過WebSocket長連接,來完成最的消息收發(fā)功能之后,我們再來看應用層心跳的作用,我在第8“智能心跳機制:解決網絡的不確定性”中也有講到過,代//每2分鐘發(fā)送一次心跳包,接收到消息或者服務端的響應又會重置varheartBeat= timeout:timeoutObj:serverTimeoutObj:reset:function() start:function()varself=this.timeoutObj=setTimeout(function()varsender_id=varsendMsgJson='{"type":0,"data":{"uid":'+sender_id+self.serverTimeoutObj=setTimeout(function()$("#ws_status").text("失去連接},},23客戶端通過一個定時器,每2分鐘通過長連接給服務端發(fā)送一次心跳包,如果在2分鐘內接收到服務端的消息或者響應,那么客戶端的下次2分鐘定時器的計時,會進行重置,重新計算;如果發(fā)送的心跳包在2分鐘后沒有收到服務端的響應,客戶端會斷開當前代protectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)throwslonguid=longtimeout=("[heart
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 廠區(qū)道路橫平豎直施工方案
- 湖南舊鋼煙囪防腐施工方案
- 帶視頻的數(shù)學試卷
- 電纜線下作業(yè)施工方案
- 杭州日式屋頂花園施工方案
- 數(shù)控加工工藝與編程技術基礎 教案 模塊二 項目三 自動編程(3-4)
- 智能制造與傳統(tǒng)制造的區(qū)別
- 石油化工靜電接地的接地網設計
- 健全公共衛(wèi)生體系的策略及實施路徑
- 環(huán)保與可持續(xù)發(fā)展在新型城鎮(zhèn)化中的作用
- DB37∕T 5107-2018 城鎮(zhèn)排水管道檢測與評估技術規(guī)程
- 2022新冠疫苗疑似預防接種異常反應監(jiān)測和處置方案
- 電磁學第三版趙凱華答案
- 酒精溶液體積濃度、質量濃度與密度對照表
- 主要腸內營養(yǎng)制劑成分比較
- 老年人各系統(tǒng)的老化改變
- 小學五年級綜合實踐課教案
- 煤礦井下供電常用計算公式及系數(shù)
- ISO14001:2015中文版(20211205141421)
- 汽車總裝車間板鏈輸送線的應用研究
- 工作日志模板
評論
0/150
提交評論