C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列
创始人
2024-09-25 07:28:08
0

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService     {          public static string MainTopic = "DTSDAQ/";                    private Dictionary _modbusTcps;                    public DAqService(DAqOption option)         {             _modbusTcps = new Dictionary();            //...         }          ///          /// 启动服务         ///          public void Start()         {             MqttControllor = new MqttControllor(_option.MqttConfig);              foreach (var item in _deviceLinks)             {                 ModbusTcp modbusTcp = new ModbusTcp(item);                 modbusTcp.DoMonitor();                 modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;                 //将                 _modbusTcps.Add(item.UID, modbusTcp);                 MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);             }              if (_serviceConfig.IsPushScheduled)             {                 timer.Start();             }         }     } 

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

  /// 设备控制,反写 ///  ///  ///  private void DeviceControl(string topic, string msg) {     var message = JsonSerializer.Deserialize(msg);     //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值     if (message != null)     {         var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象         if (link != null)         {             var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象             //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值             foreach (var item in message.Data)             {                 var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                 if (point != null)                 {                     var parseMethod = point.Type.GetMethod(                         "Parse",                         BindingFlags.Public | BindingFlags.Static,                         new[] { typeof(string) }                     );                     point.WriteValue = parseMethod.Invoke(                         null,                         new object[] { item.Value.ToString() }                     ); //通过点位id找到对应的点位对象                 }                 modbusTcp.Write(point);             }         }     } } 

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp  {          ///           /// 写入队列          ///           private Queue _writeQueue = new Queue();                    ·          //写入值先加入一个队列           public void Write(RegisterPoint point)           {               _writeQueue.Enqueue(point);           }  } 

完整代码

public class DAqService {     public static string MainTopic = "DTSDAQ/";      private MqttControllor MqttControllor;     private Dictionary _modbusTcps;      private DAqOption _option;     private List _deviceLinks;     private ServiceConfig _serviceConfig;      private System.Timers.Timer timer;      public DAqService(DAqOption option)     {         _modbusTcps = new Dictionary();         _option = option;         _deviceLinks = option.DeviceLinks;         _serviceConfig = option.ServiceConfig;         timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);         timer.Elapsed += Timer_Elapsed;     }      ///      /// 启动服务     ///      public void Start()     {         MqttControllor = new MqttControllor(_option.MqttConfig);          foreach (var item in _deviceLinks)         {             ModbusTcp modbusTcp = new ModbusTcp(item);             modbusTcp.DoMonitor();             modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;             _modbusTcps.Add(item.UID, modbusTcp);             MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);         }          if (_serviceConfig.IsPushScheduled)         {             timer.Start();         }     }      ///      /// 设备控制,反写     ///      ///      ///      private void DeviceControl(string topic, string msg)     {         var message = JsonSerializer.Deserialize(msg);         //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值         if (message != null)         {             var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象             if (link != null)             {                 var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象                 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值                 foreach (var item in message.Data)                 {                     var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                     if (point != null)                     {                         var parseMethod = point.Type.GetMethod(                             "Parse",                             BindingFlags.Public | BindingFlags.Static,                             new[] { typeof(string) }                         );                         point.WriteValue = parseMethod.Invoke(                             null,                             new object[] { item.Value.ToString() }                         ); //通过点位id找到对应的点位对象                     }                     modbusTcp.Write(point);                 }             }         }     }      private void Timer_Elapsed(object? sender, ElapsedEventArgs e)     {         foreach (var link in _deviceLinks)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = link.UID };                 foreach (RegisterPoint point in link.Points)                 {                     // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");                     device.Data.Add(point.UID, point.Value);                 }                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }     }      private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)     {         if (_serviceConfig.IsPushChanged)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };                 device.Data.Add(point.UID, value);                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }          Console.WriteLine($"Point:{point.UID}-->Value:{value}");     } } 

相关内容

热门资讯

三分钟了解!德扑之星带入记分牌... 《德扑之星带入软件透明挂》是一款多人竞技的德扑之星带入辅助透视游戏,你将微扑克对手来到同一个战场,为...
4分钟了解!德州辅助软件(透视... 您好,德州这款游戏可以开挂的,确实是有挂的,需要了解加微【841106723】很多玩家在这款游戏中打...
3分钟了解!pokerx智能软... 您好,pokerx智能软件这款游戏可以开挂的,确实是有挂的,需要了解加微【757446909】很多玩...
玩家必备教程!线上德州后台可以... 玩家必备教程!线上德州后台可以操控,乐平中至麻将是有问题,可靠教程(有挂技巧)-哔哩哔哩;最新版20...
六分钟了解!wpk微扑克辅助(... 您好,微扑克辅助这款游戏可以开挂的,确实是有挂的,需要了解加微【757446909】很多玩家在这款游...
分享个大家!wpk透视辅助可测... 分享个大家!wpk透视辅助可测试,开元金花透明辅助,曝光教程(有挂技巧)-哔哩哔哩是一款可以让一直输...
五分钟了解!fishpoker... 五分钟了解!fishpoker扑克辅助(辅助挂)外挂透明挂辅助挂(2025已更新)(哔哩哔哩)是一款...
七分钟了解!aapoker设置... 七分钟了解!aapoker设置牌局(透视)外挂透明挂辅助下载(2020已更新)(哔哩哔哩)是一款可以...
一分钟揭秘!wepoke软件透... 一分钟揭秘!wepoke软件透明挂辅助,浙江宝宝游戏老是输,必备教程(有挂秘笈)-哔哩哔哩;超受欢迎...
三分钟了解!德扑之星的发牌是有... 《德扑之星软件透明挂》是一款多人竞技的德扑之星辅助透视游戏,你将微扑克对手来到同一个战场,为至高无上...