版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
1、UDP傳輸工具類(serverclient)UDP不適合傳輸大數(shù)據(jù),所以傳輸要盡量小。UDP傳輸中可能會(huì)丟包,如果需要可能多次發(fā)送同一個(gè)包保證包能安全到達(dá);接收端可以對(duì)收到的包進(jìn)行CRC校驗(yàn),javaviewplaincopypackageorg.sl.udp.beans;.DatagramPacket;/* 處理udp請(qǐng)求的接口* authorshanl*/publicinterfaceIUdpRequestHandler/*解析請(qǐng)求數(shù)據(jù)包*paramrequestPack*/voidparse(DatagramPacketrequestPack);javaviewplaincopypack
2、ageorg.sl.udp.beans;importjava.io.ByteArrayInputStream;importjava.io.ByteArrayOutputStream;importjava.io.ObjectInputStream;importjava.io.ObjectOutputStream;importjava.io.PrintStream;importjava.util.Arrays;importjava.util.Map;importjava.util.Properties;importjava.util.Set;* 傳輸包格式<br/>* 具備頭校驗(yàn)功能。
3、* authorshanl* /publicclassPacketFormat/*請(qǐng)求頭,主要用于校驗(yàn)*/publicstaticfinalbyteREQUEST_HEADER=S,H,A,N;privatePropertiesprop=null;/*構(gòu)造一個(gè)用于發(fā)送請(qǐng)求的包結(jié)構(gòu)*/publicPacketFormat()p=newProperties();/*解析并從請(qǐng)求中加載數(shù)據(jù)* parambuff數(shù)據(jù)包* paramoffset偏移量* paramlen長(zhǎng)度* returntrue:解析成功false:解析失敗*/publicbooleanparse(bytebuff,
4、intoffset,intlen)booleandone=false;ByteArrayInputStreamdataCache=null;ObjectInputStreamobjectIn=null;ByteArrayInputStreampropCache=null;trydataCache=newByteArrayInputStream(buff,offset,len);byted_header=newbyteREQUEST_HEADER.length;dataCache.read(d_header);/校驗(yàn)請(qǐng)求頭if(!Arrays.equals(d_header,REQUEST_HE
5、ADER)returnfalse;objectIn=newObjectInputStream(dataCache);/得到數(shù)據(jù)長(zhǎng)度shortdataLen=objectIn.readShort();bytepropBys=newbytedataLen;objectIn.read(propBys);propCache=newByteArrayInputStream(propBys);/加載數(shù)據(jù)p.load(propCache);done=true;catch(Exceptione)done=false;finallytryif(null!=propCache)propCache.
6、close();catch(Exceptionex)tryif(null!=objectIn)objectIn.close();catch(Exceptionex)tryif(null!=dataCache)dataCache.close();catch(Exceptionex)returndone;/*設(shè)置數(shù)據(jù)*paramkey*paramvalue*/publicvoidsetProperty(Stringkey,Stringvalue)p.setProperty(key,value);/*取數(shù)據(jù)*paramkey*return*/publicStringgetProper
7、ty(Stringkey)p.getProperty(key,);/*返回keyset*return*/publicSet<Object>keySet()p.keySet();/*將內(nèi)容轉(zhuǎn)換為byte數(shù)組*return*/publicbytetoBytes()bytedataCacheBys=null;ByteArrayOutputStreamdataCache=null;ObjectOutputStreamdataOut=null;StringBuilderpropContent=newStringBuilder();byt
8、epropBys=null;Set<Map.Entry<Object,Object>>items=p.entrySet();for(Map.Entry<Object,Object>i:items)propContent.append(String)i.getKey();propContent.append(=);propContent.append(String)i.getValue();propContent.append(n);propBys=propContent.toString().getBytes();trydataCache=n
9、ewByteArrayOutputStream();dataCache.write(REQUEST_HEADER);dataOut=newObjectOutputStream(dataCache);/寫(xiě)入數(shù)據(jù)長(zhǎng)度dataOut.writeShort(propBys.length);/寫(xiě)入數(shù)據(jù)dataOut.write(propBys,0,propBys.length);dataOut.flush();dataCacheBys=dataCache.toByteArray();catch(Exceptionex)thrownewRuntimeException(ex);finallytryif(n
10、ull!=dataOut)dataOut.close();catch(Exceptionex)if(null!=dataCache)dataCache.close();catch(Exceptionex)returndataCacheBys;/* 得到請(qǐng)求數(shù)據(jù)* parambuff數(shù)據(jù)緩存,緩存大小>=128* paramoffset偏移量* return包長(zhǎng)度=請(qǐng)求頭長(zhǎng)度+2+數(shù)據(jù)* /publicintgetBytes(bytebuff,intoffset)intlen=0;/ByteArrayOutputStreampropCache=null;ByteArrayOutputStr
11、eamdataCache=null;ObjectOutputStreamdataOut=null;StringBuilderpropContent=newStringBuilder();bytepropBys=null;/propCache=newByteArrayOutputStream(buff.length-REQUEST_HEADER.length-2);/p.list(newPrintStream(propCache);/propBys=propCache.toByteArray();/catch(Exceptionex)/thrownewRuntimeExcepti
12、on(ex);/finally/try/if(null!=propCache)propCache.close();/catch(Exceptionex)/這段代碼與上面這段功能相同,但Properties的list()會(huì)寫(xiě)入額外的字節(jié)Set<Map.Entry<Object,Object>>items=p.entrySet();for(Map.Entry<Object,Object>i:items)propContent.append(String)i.getKey();propContent.append(=);propContent.ap
13、pend(String)i.getValue();propContent.append(n);propBys=propContent.toString().getBytes();try/寫(xiě)入頭dataCache=newByteArrayOutputStream(buff.length);dataCache.write(REQUEST_HEADER);len+=REQUEST_HEADER.length;dataOut=newObjectOutputStream(dataCache);/寫(xiě)入數(shù)據(jù)長(zhǎng)度dataOut.writeShort(propBys.length);len+=2;/寫(xiě)入數(shù)據(jù)da
14、taOut.write(propBys,0,propBys.length);dataOut.flush();bytedataCacheBys=dataCache.toByteArray();System.arraycopy(dataCacheBys,0,buff,offset,dataCacheBys.length);len+=dataCacheBys.length;catch(Exceptionex)thrownewRuntimeException(ex);finallytryif(null!=dataOut)dataOut.close();catch(Exceptionex)tryif(n
15、ull!=dataCache)dataCache.close();catch(Exceptionex)returnlen;/* 返回crc32校驗(yàn)碼* return* /longcrc32()CRC32crc32=newCRC32();crc32.update(toBytes();returncrc32.getValue();javaviewplaincopypackageorg.sl.udp.client;importjava.io.IOException;.DatagramPacket;.DatagramSocket;.SocketException;importjava.util.Ran
16、dom;importjava.util.concurrent.BlockingDeque;importjava.util.concurrent.LinkedBlockingDeque;/* UDP發(fā)送請(qǐng)求服務(wù)* authorshanl*/publicclassUDPSenderServiceprivateBlockingDeque<RequestObject>requestPool=null;privatelongintervalTime=200L;privateDatagramSocketudpSender=null;privatebooleanshutdown=true;pri
17、vateObjectlockObj=newObject();publicUDPSenderService()requestPool=newLinkedBlockingDeque<RequestObject>();/* parampoolSize請(qǐng)求緩沖池上限* paramintervalTime間隔時(shí)間* /publicUDPSenderService(intpoolSize,intintervalTime)requestPool=newLinkedBlockingDeque<RequestObject>(poolSize);ervalTime=inte
18、rvalTime;/*過(guò)程*/voidprocess()dolongnow=System.currentTimeMillis();trysynchronized(lockObj)lockObj.wait(intervalTime);if(!requestPool.isEmpty()send(now);catch(Exceptione)e.printStackTrace();while(!shutdown);/intc=0;privatevoidsend(longtime)throwsIOExceptionRequestObjectro=null;tryfor(inti=0;i<10;i+
19、)ro=requestPool.poll();/ro=requestPool.take();/ro=requestPool.pollFirst();if(null!=ro)if(ro.getTime()<=time)/System.out.println(c+);udpSender.send(ro.getDataPacket();elserequestPool.put(ro);/requestPool.offerLast(ro);elsebreak;catch(Exceptione)e.printStackTrace();* 重復(fù)多送發(fā)送一個(gè)udp請(qǐng)求* paramrequest請(qǐng)求包*
20、 paramrepeatCount重復(fù)次數(shù)* paraminterval重復(fù)發(fā)送請(qǐng)求包間隔* /publicvoidaddRepeatingRequest(DatagramPacketrequest,intrepeatCount,longinterval)addImmediateRequest(request);/timingRequest(System.currentTimeMillis(),request);bytereqData=newbyterequest.getLength()-request.getOffset();System.arraycopy(request.getData(
21、),request.getOffset(),reqData,request.getOffset(),request.getLength();longnow=System.currentTimeMillis();DatagramPacketdpClone=null;for(longi=0,nextTime=interval;i<repeatCount;i+,nextTime+=interval)dpClone=newDatagramPacket(reqData,request.getOffset(),request.getLength();dpClone.setSocketAddress(
22、request.getSocketAddress();addTimingRequest(now+nextTime,dpClone);/* 添加一個(gè)定時(shí)的請(qǐng)求,在一個(gè)近似的時(shí)間執(zhí)行發(fā)送.<br/>* 如果這個(gè)請(qǐng)求為過(guò)期的請(qǐng)求,則會(huì)在下一個(gè)時(shí)間被執(zhí)行.* paramtime* paramrequest* /publicvoidaddTimingRequest(longtime,DatagramPacketrequest)try/requestPool.offerLast(newRequestObject(time,request),300,TimeUnit.MILLISECONDS);
23、requestPool.put(newRequestObject(time,request);/requestPool.offerLast(newRequestObject(time,request);catch(Exceptione)e.printStackTrace();synchronized(lockObj)lockObj.notify();/try/Thread.sleep(1);/catch(InterruptedExceptione)/* 立即發(fā)送一個(gè)請(qǐng)求* paramrequest* /publicvoidaddImmediateRequest(DatagramPacketre
24、quest)udpSender.send(request);catch(Exceptionex)ex.printStackTrace();/*啟動(dòng)服務(wù)*/publicvoidstart()if(this.shutdown)tryif(null=udpSender)udpSender=newDatagramSocket();catch(SocketExceptione1)e1.printStackTrace();return;tryThreadt=newThread(newProcessService(),UDPSenderService-+newRandom().nextInt(999);t.
25、start();catch(Exceptione)thrownewRuntimeException(e.getMessage();this.shutdown=false;/*中止服務(wù)*/publicvoidshutdown()this.shutdown=true;tryThread.sleep(1000*10);catch(Exceptionex)trythis.udpSender.disconnect();catch(Exceptionex)trythis.udpSender.close();catch(Exceptionex)publicvoidsetDatagramSocket(Data
26、gramSocketsender)this.udpSender=sender;privateclassProcessServiceimplementsRunnablepublicvoidrun()process();staticclassRequestObjectprivateLongtime;privateDatagramPacketdataPacket=null;publicRequestObject(DatagramPacketdataPacket)this.time=System.currentTimeMillis();publicRequestObject(Longtime,Data
27、gramPacketdataPacket)this.time=time;this.dataPacket=dataPacket;publicLonggetTime()returntime;publicDatagramPacketgetDataPacket()returndataPacket;javaviewplaincopypackageorg.sl.udp.server;.DatagramPacket;.DatagramSocket;.InetSocketAddress;importorg.sl.udp.beans.IUdpRequestHandler;/* udp接收器* authorsha
28、nl* /publicclassUDPReceptorimplementsRunnableprivateStringhostname=localhost;privateintport=7777;privateintrecePacketSize=512;privateIUdpRequestHandlerrequestHandler=null;/*過(guò)程*/publicvoidrun()DatagramSocketudpRece=null;DatagramPacketdataPack=null;bytebuff=null;tryudpRece=newDatagramSocket(newInetSoc
29、ketAddress(this.hostname,this.port);udpRece.setReceiveBufferSize(this.recePacketSize);for(;)buff=newbytethis.recePacketSize;dataPack=newDatagramPacket(buff,this.recePacketSize);udpRece.receive(dataPack);if(null!=requestHandler)requestHandler.parse(dataPack);catch(Exceptionex)ex.printStackTrace();/*注
30、入請(qǐng)求處理* paramrequestHandler請(qǐng)求處理*/publicvoidsetRequestHandler(IUdpRequestHandlerrequestHandler)this.requestHandler=requestHandler;/*設(shè)置接收包大小* paramudpPacketSize*/publicvoidsetRecePacketSize(intudpPacketSize)this.recePacketSize=udpPacketSize;publicvoidsetHostname(Stringhostname)this.hostname=hostname;pu
31、blicvoidsetPort(intport)this.port=port;javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;importjava.util.Set;importjava.util.concurrent.ExecutorService;importorg.sl.udp.beans.PacketFormat;importorg.sl.udp.beans.IUdpRequestHandler;publicclassDemoMultiThreadUDPRequestHandlerImplimplementsIUdpReq
32、uestHandlerprivateExecutorServicethreadPool=null;publicvoidparse(DatagramPacketrequestPack)threadPool.execute(newHandlerService(requestPack);publicvoidsetThreadPool(ExecutorServicethreadPool)this.threadPool=threadPool;privatestaticclassHandlerServiceextendsThreadprivateDatagramPacketrequestPack=null
33、;publicHandlerService(DatagramPacketrequestPack)this.requestPack=requestPack;publicvoidrun()tryPacketFormatreqPack=newPacketFormat();if(!reqPack.parse(requestPack.getData(),requestPack.getOffset(),requestPack.getLength()return;Set<Object>keyset=reqPack.keySet();System.out.println(value:+reqPac
34、k.getProperty(value);System.out.println(=time:+System.currentTimeMillis();for(Objectkey:keyset)System.out.println(key+:+reqPack.getProperty(String)key);catch(Exceptionex)ex.printStackTrace();javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;importjava.util.Set;importorg.sl.udp.beans.PacketForm
35、at;importorg.sl.udp.beans.IUdpRequestHandler;publicclassDemoUDPRequestHandlerImplimplementsIUdpRequestHandlerprivateDatagramPacketrequestPack=null;publicvoidparse(DatagramPacketrequestPack)this.requestPack=requestPack;process();privatevoidprocess()tryPacketFormatreqPack=newPacketFormat();if(!reqPack
36、.parse(requestPack.getData(),requestPack.getOffset(),requestPack.getLength()return;Set<Object>keyset=reqPack.keySet();synchronized(UDPReceptorDemo.iset)UDPReceptorDemo.iset.add(Integer.parseInt(reqPack.getProperty(value);/System.out.println(value:+reqPack.getProperty(value);/System.out.println
37、(=time:+System.currentTimeMillis();/for(Objectkey:keyset)/System.out.println(key+:+reqPack.getProperty(String)key);/catch(Exceptionex)ex.printStackTrace();javaviewplaincopypackageorg.sl.udp.demo;importjava.util.HashSet;importjava.util.Set;importjava.util.concurrent.Executors;importorg.sl.udp.beans.I
38、UdpRequestHandler;importorg.sl.udp.server.UDPReceptor;publicclassUDPReceptorDemopublicstaticvoidmain(Stringargs)/t1();t2();t3();publicstaticSet<Integer>iset=newHashSet<Integer>(10000);staticvoidt3()for(;)System.out.println(iset.size();tryThread.sleep(3000);catch(InterruptedExceptione)e.p
39、rintStackTrace();staticvoidt2()System.err.println(啟動(dòng)udp監(jiān)聽(tīng),端口7777.);IUdpRequestHandlerhandlerImpl=newDemoUDPRequestHandlerImpl();UDPReceptorrece=newUDPReceptor();rece.setRequestHandler(handlerImpl);rece.setHostname(3);rece.setPort(7777);Threadt=newThread(rece,udpRece_port7777);t.start();/*
40、已普通實(shí)例啟動(dòng)*/staticvoidt1()System.err.println(啟動(dòng)udp監(jiān)聽(tīng),端口7777.);DemoMultiThreadUDPRequestHandlerImplhandlerImpl=newDemoMultiThreadUDPRequestHandlerImpl();handlerImpl.setThreadPool(Executors.newFixedThreadPool(5);UDPReceptorrece=newUDPReceptor();rece.setRequestHandler(handlerImpl);rece.setHostname(192.168
41、.2.23);rece.setPort(7777);rece.run();javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;.DatagramSocket;.InetSocketAddress;.SocketException;importjava.util.Random;importorg.sl.udp.beans.PacketFormat;importorg.sl.udp.client.UDPSenderService;publicclassUDPSenderServiceDemopublicstaticvoidmain(Str
42、ingargs)/t1();/t2();/t3();t4();staticvoidt4()UDPSenderServiceudpSenderService=newUDPSenderService();udpSenderService.start();System.err.println(啟動(dòng)udp發(fā)送服務(wù));Randomrand=newRandom();bytebuff=null;longtime=System.currentTimeMillis();DatagramPacketreq=null;PacketFormatpf=null;for(inti=0,endi=300;i<endi
43、;i+)/time=(rand.nextInt(100)+1)*100L+System.currentTimeMillis();pf=newPacketFormat();pf.setProperty(msg,test);pf.setProperty(msg,test);pf.setProperty(type,cpu);pf.setProperty(value,+i);buff=newbyte512;intpackLen=pf.getBytes(buff,0);req=newDatagramPacket(buff,0,packLen);/System.out.println(value:+pf.
44、getProperty(value);req.setSocketAddress(newInetSocketAddress(3,7777);/有一定幾率丟包/for(intn=0;n<3;n+)/udpSenderService.addImmediateRequest(req);/多次重發(fā)保證可靠性/udpSenderService.addRepeatingRequest(req,2,1L);/有一定幾率丟包udpSenderService.addTimingRequest(time,req);/try/if(i%10=0)/Thread.sleep(3);/rand
45、.setSeed(time);/catch(Exceptione)/tryThread.sleep(1000*60);catch(Exceptionex)udpSenderService.shutdown();System.err.println(udp服務(wù)關(guān)閉.);staticvoidt3()UDPSenderServiceudpSenderService=newUDPSenderService();udpSenderService.start();System.err.println(啟動(dòng)udp發(fā)送服務(wù));Randomrand=newRandom();bytebuff=newbyte512
46、;longtime=0;for(inti=0,endi=3*100*100;i<endi;i+)PacketFormatpf=newPacketFormat();pf.setProperty(msg,test);pf.setProperty(msg,test);pf.setProperty(type,cpu);pf.setProperty(value,33.05);intpackLen=pf.getBytes(buff,0);DatagramPacketdata=newDatagramPacket(buff,0,packLen);data.setSocketAddress(newInet
47、SocketAddress(3,7777);time=(rand.nextInt(4)+1)*1000L+System.currentTimeMillis();/udpSenderService.timingRequest(time,data);udpSenderService.addTimingRequest(time,data);tryif(i%50=0)Thread.sleep(1);rand.setSeed(time);catch(Exceptione)tryThread.sleep(1000*10);catch(Exceptionex)udpSenderServ
48、ice.shutdown();System.err.println(udp服務(wù)關(guān)閉.);staticvoidt2()UDPSenderServiceudpSenderService=newUDPSenderService();try/InetSocketAddressserviceAddress=newInetSocketAddress(localhost,7777);/DatagramSocketudpSocket=newDatagramSocket(serviceAddress);/udpSenderService.setDatagramSocket(udpSocket);udpSenderService.start();System.err.printl
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 十六橋課件教學課件
- 04品牌授權塔吊品牌授權使用合同
- 2024年度汽車租賃與售后服務合同
- 2024年度道路照明工程燈具維修勞務分包合同
- 2024年屋面瓦鋪設工程項目合同
- 2024家庭裝飾裝修的合同模板
- 2024年度衛星導航系統應用合作協議
- 2024年度軟件開發與測試合同
- 2024年度知識產權許可合同.do
- 2024年度物流配送服務承包商的選取協議
- 醫師定期考核表格參考模板
- 英語人教版三年級上冊(教具)動物圖卡
- 泥水平衡頂管施工方案
- 民辦非企業單位(法人)登記申請表08669
- 霍蘭德人格六角形模型(共享內容)
- 寶鋼中央研究院創新戰略與運行機制研究
- 建筑CAD測試多選題
- 支座鑄造工藝設計
- 2022年學校禁毒工作計劃
- GB-T-30512-2014-汽車(chē)禁用物質(zhì)要求
- 生物相容性試驗檢測報告
評論
0/150
提交評論