博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Webapi实现websocket实时通讯
阅读量:5327 次
发布时间:2019-06-14

本文共 10406 字,大约阅读时间需要 34 分钟。

  应用场景:前端页面发起一个websocket请求与后端进行实时通讯。后端监听某端口获取数据,将监听到的数据加工处理,通过websocket发送到前端。

  这里只提供后台的处理方案仅供参考。

  1.后端监听某端口,获取数据并数据处理。可以在Global中单独开启一个后台线程用来监听数据。数据处理交给datawatcher的单例对象来处理。由于是监控端口的工作,一般采用独立线程在项目启动的时候就进行监听,因此可以将代码放在Application_start中。
  2.datawatcher对象,它需要有个委托队列,允许外部进行注册和删除。原理同事件类似,但是采用委托队列并将其暴露主要是为了容错,因为可能会用到删除委托队列中的委托。严谨的事务逻辑应该用事件(事件有点久了,忘了事件怎么写了)
  3.webapi建立websocket。

  以下为代码部分:

      protected void Application_Start()        {     //webapi相关配置            AreaRegistration.RegisterAllAreas();            GlobalConfiguration.Configure(WebApiConfig.Register);            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);            RouteConfig.RegisterRoutes(RouteTable.Routes);            BundleConfig.RegisterBundles(BundleTable.Bundles);           //todo 初始化datawatcher 监听实时数据             //伪造实时数据            Thread bgthread = new Thread(Start);            bgthread.IsBackground = true;            bgthread.Start();        }    //除红色部分均为模拟实时数据,正常数据应是通过监听获取        private void Start()        {            Random r = new Random();            var datawatcher = DataWatcher.GetInit();            while (true)            {                dynamic dyobj = new ExpandoObject();                dyobj.stationid = "01";                dyobj.value = new { at = DateTime.Now.ToTimeStampMS(), value = (r.Next(0, 1024)) }.ToJson();                datawatcher.Updata(dyobj, CallBackType.Record);                Thread.Sleep(1000);            }        }
//使用单例是为了确保项目中使用的是同一个对象,防止注册的callback丢失 public class DataWatcher    {        private static DataWatcher _dataWatcher;        static readonly object locker = new object();        private DataWatcher()        {            CallBackAlarms = new List
(); CallBackRecords = new List
(); CallBackSpes = new List
(); } public static DataWatcher GetInit() { if (_dataWatcher == null) { lock (locker) { if (_dataWatcher == null) { _dataWatcher = new DataWatcher(); } } } return _dataWatcher; } public WebSocket WebSocket { get; set; } public List
CallBackSpes; public List
CallBackRecords; public List
CallBackAlarms; ///
/// 更新数据 /// ///
///
public void Updata(dynamic dyobj, CallBackType type) { switch (type) { case CallBackType.Spe: if (CallBackSpes.Count > 0) { foreach (var callback in CallBackSpes) { callback.Invoke(dyobj, WebSocket); } } //CallBackSpes?.Invoke(dyobj, WebSocket); break; case CallBackType.Record: if (CallBackRecords.Count > 0) { foreach (var callback in CallBackRecords) { callback.Invoke(dyobj, WebSocket); } } //CallBackRecords?.Invoke(dyobj, WebSocket); break; case CallBackType.Alarm: if (CallBackAlarms.Count > 0) { foreach (var callback in CallBackAlarms) { callback.Invoke(dyobj, WebSocket); } } //CallBackAlarms?.Invoke(dyobj, WebSocket); break; } } } public enum CallBackType { Spe, Record, Alarm } public delegate void DynamicDelegate(dynamic dynamic, WebSocket ws);
   public class subscribeController : ApiController    {        private DataWatcher dataWatcher;        public subscribeController()        {            dataWatcher = DataWatcher.GetInit();//获取Datawatcher实例            if (!dataWatcher.CallBackRecords.Contains(RealtimeDataCallBack))            {                dataWatcher.CallBackRecords.Add(RealtimeDataCallBack);          //注册委托,若无特殊的场景需求可以将委托列表封装为事件event            }        }        public HttpResponseMessage Get()        {       //对原请求返回101变更协议状态码,开启一个异步线程通过websocket进行数据传输            if (HttpContext.Current.IsWebSocketRequest)            {                HttpContext.Current.AcceptWebSocketRequest(CreateWS);            }            return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols);        }        private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext)        {            var socket = aspNetWebSocketContext.WebSocket;            dataWatcher.WebSocket = socket; //将上下文中的websocket注入到Datawatcher中            //防止后台线程将socket释放            while (true)            {                if (socket.State != WebSocketState.Open)                {                    break;                }            }        }        private void WriteJson(string res, WebSocket socket)        {            lock (this)            {                res = res.Replace("\\", "").Replace("\"{
", "{
").Replace("\"}", "}").Replace("\"[", "["); ArraySegment
bytes = new ArraySegment
(Encoding.UTF8.GetBytes(res)); if (socket != null && socket.State == WebSocketState.Open) { socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None); Thread.Sleep(1000); } } } private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws) { var templist = new List
(); templist.Add(dyobj.value); var temp = new Dictionary
(); temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson()); var res = new { datastreamUpdate = temp.ToJson() }.ToJson(); WriteJson(res, ws); } }

  Datawatcher的设计是作为一个中间人的角色,既要提供调用回调的方法,也要提供websocket 的注入接口。考虑到Datawatcher是单例,故采用属性的方式为websocket提供外部访问(此处的设计有待商榷)。在外部注入了websocket之后,在合适的位置进行回调。注意:数据的处理是在监听时进行一次封装处理,在进行回调的时候再进行一次封装处理,如果类型是固定统一的可以只在监听的时候进行封装处理即可。同时应注意对回调方法中websocket的空判断增强健壮性。

  以上代码为提供思路的简易demo,使用时需要根据自己场景需求相应修改。

------------------------------------------------------------------------------分隔符----------------------------------------------------------------------------

存疑1:webapi中的controller是否是单例?(当然Mvc等的控制器也是同问)

           如果是单例,Datawatcher的委托注册可以在控制器的构造方法中注册(前提是允许访问公有的构造方法);如果不是单例,可以在post或者get中注册委托,此时一定要判断委托是否被注册过,避免重复注册。这是采用委托列表,而不用事件的一个原因。事件的内部委托链表是不允许外部直接访问的,因此判断是否注册不是很方便。如果事件允许不判断,直接注册或注销委托不会抛出异常,即如果已经注册不会重复注册,保证委托唯一性,如果已经注销,再次注销不会抛出异常,那么Datawatcher中的设计应该采用事件,确保封装的安全性。(由于未对事件做深入了解,故此处存疑暂时保留)

释疑1:经测试,controller不是单例(起码webapi中不是)。测试方法,给控制器添加一个属性和一个公有构造方法,构造方法给属性赋一个初始值。写一个访问方法,每次给属性加1,并返回。通过页面进行访问,查看api返回结果。或者打一个断点在构造方法中,通过页面访问。

经测试,事件event中的委托链表在注销未注册的委托方法的时候不会抛出异常。注销的时候,会在委托链表中查找对应的方法,如果找到则注销掉,如果未找到也不会抛出异常。

参考资料:

存疑2:websocket的async Task 为什么使用死循环?

释疑2:websocket不是持续的,其中的代码只执行一次,为了保持实时性,他的持续需要一个死循环。循环的作用只在于保持连接不断开,真正的数据传输则是由Datawatcher来处理。每监听到一条数据,会调用一次Datawatcher的update,这个时候如果websocket是打开的,数据则会被发送,如果此时socket为空或者关闭,则放弃发送或者插入缓存中等下次打开连接再发送,这部分的处理则需要根据业务需求来判断。

强调:以上demo只提供思路,确认这种处理是可行的。关于存疑部分的思考,我会尽快验证。

 ------------------------------------------------------------------------------最终代码-------------------------------------------------------------------------------------------

public class DataWatcher    {        private static DataWatcher _dataWatcher;        static readonly object locker = new object();        private DataWatcher()        {        }        public static DataWatcher GetInit()        {            if (_dataWatcher == null)            {                lock (locker)                {                    if (_dataWatcher == null)                    {                        _dataWatcher = new DataWatcher();                    }                }            }            return _dataWatcher;        }        public WebSocket WebSocket { get; set; }     //委托列表被事件替代        public event DynamicDelegate CallbackRecoedEvent;        public event DynamicDelegate CallbackAlarmEvent;        public event DynamicDelegate CallbackSpeEvent;        ///         /// 更新谱图        ///         ///         ///         public void Update(dynamic dyobj, CallBackType type)        {            switch (type)            {                case CallBackType.Spe:                  CallbackSpeEvent?.Invoke(dyobj,WebSocket);                    break;                case CallBackType.Record:                   CallbackRecoedEvent?.Invoke(dyobj,WebSocket);                    break;                case CallBackType.Alarm:                   CallbackAlarmEvent?.Invoke(dyobj,WebSocket);                    break;            }        }           }
public class subscribeController : ApiController    {        public HttpResponseMessage Get()        {            if (HttpContext.Current.IsWebSocketRequest)            {                HttpContext.Current.AcceptWebSocketRequest(CreateWS);            }            return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols);        }        private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext)        {            var socket = aspNetWebSocketContext.WebSocket;            var dataWatcher = DataWatcher.GetInit();            dataWatcher.WebSocket = socket;            //            dataWatcher.CallbackRecoedEvent -= RealtimeDataCallBack;            dataWatcher.CallbackRecoedEvent += RealtimeDataCallBack;            //防止后台线程将socket释放            while (true)            {                if (socket.State != WebSocketState.Open)                {                    break;                }            }        }        private void WriteJson(string res, WebSocket socket)        {            lock (this)            {                res = res.Replace("\\", "").Replace("\"{
", "{
").Replace("\"}", "}").Replace("\"[", "["); ArraySegment
bytes = new ArraySegment
(Encoding.UTF8.GetBytes(res)); if (socket != null && socket.State == WebSocketState.Open) { socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None); Thread.Sleep(1000); } } } private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws) { var templist = new List
(); templist.Add(dyobj.value); var temp = new Dictionary
(); temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson()); var res = new { datastreamUpdate = temp.ToJson() }.ToJson(); WriteJson(res, ws); } }

  由于确定了controller非单例,所以没有必要在构造中为Datawatcher注册事件,可以在其他合适的地方注册。

转载于:https://www.cnblogs.com/cnDqf/p/7263310.html

你可能感兴趣的文章
罗振宇“时间的朋友”跨年演讲:为做事的人服务 准确抓住小趋势
查看>>
nginx日志切割脚本
查看>>
洛谷 P1618 三连击(升级版)
查看>>
[容斥][dp][快速幂] Jzoj P5862 孤独
查看>>
Reflect反编译C#程序
查看>>
DSAPI 字符串和文件转Md5字符串
查看>>
Lucene 学习之二:数值类型的索引和范围查询分析
查看>>
软件开发工作模型
查看>>
20165301 2017-2018-2 《Java程序设计》第九周学习总结
查看>>
jquery验证图片类型与大小
查看>>
tomcat启动时出现了Failed to start component [StandardEngine[Catalina].StandardHost[localhost]]
查看>>
基础测试jmeter5.0+badboy(从小白到入门)
查看>>
Java基础之字符串匹配大全
查看>>
SGA和PGA的分配原则及更改大小
查看>>
面向对象
查看>>
lintcode83- Single Number II- midium
查看>>
HTML5学习笔记简明版(2):新元素之section,article,aside
查看>>
移动端 响应式、自适应、适配 实现方法分析(和其他基础知识拓展)
查看>>
我在使用Spring Gateway时遇到的一些坑
查看>>
谈谈分享邀请奖励机制(附iOS实现代码)
查看>>