C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列
创始人
2024-09-25 07:28:08
0

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService     {          public static string MainTopic = "DTSDAQ/";                    private Dictionary _modbusTcps;                    public DAqService(DAqOption option)         {             _modbusTcps = new Dictionary();            //...         }          ///          /// 启动服务         ///          public void Start()         {             MqttControllor = new MqttControllor(_option.MqttConfig);              foreach (var item in _deviceLinks)             {                 ModbusTcp modbusTcp = new ModbusTcp(item);                 modbusTcp.DoMonitor();                 modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;                 //将                 _modbusTcps.Add(item.UID, modbusTcp);                 MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);             }              if (_serviceConfig.IsPushScheduled)             {                 timer.Start();             }         }     } 

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

  /// 设备控制,反写 ///  ///  ///  private void DeviceControl(string topic, string msg) {     var message = JsonSerializer.Deserialize(msg);     //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值     if (message != null)     {         var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象         if (link != null)         {             var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象             //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值             foreach (var item in message.Data)             {                 var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                 if (point != null)                 {                     var parseMethod = point.Type.GetMethod(                         "Parse",                         BindingFlags.Public | BindingFlags.Static,                         new[] { typeof(string) }                     );                     point.WriteValue = parseMethod.Invoke(                         null,                         new object[] { item.Value.ToString() }                     ); //通过点位id找到对应的点位对象                 }                 modbusTcp.Write(point);             }         }     } } 

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp  {          ///           /// 写入队列          ///           private Queue _writeQueue = new Queue();                    ·          //写入值先加入一个队列           public void Write(RegisterPoint point)           {               _writeQueue.Enqueue(point);           }  } 

完整代码

public class DAqService {     public static string MainTopic = "DTSDAQ/";      private MqttControllor MqttControllor;     private Dictionary _modbusTcps;      private DAqOption _option;     private List _deviceLinks;     private ServiceConfig _serviceConfig;      private System.Timers.Timer timer;      public DAqService(DAqOption option)     {         _modbusTcps = new Dictionary();         _option = option;         _deviceLinks = option.DeviceLinks;         _serviceConfig = option.ServiceConfig;         timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);         timer.Elapsed += Timer_Elapsed;     }      ///      /// 启动服务     ///      public void Start()     {         MqttControllor = new MqttControllor(_option.MqttConfig);          foreach (var item in _deviceLinks)         {             ModbusTcp modbusTcp = new ModbusTcp(item);             modbusTcp.DoMonitor();             modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;             _modbusTcps.Add(item.UID, modbusTcp);             MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);         }          if (_serviceConfig.IsPushScheduled)         {             timer.Start();         }     }      ///      /// 设备控制,反写     ///      ///      ///      private void DeviceControl(string topic, string msg)     {         var message = JsonSerializer.Deserialize(msg);         //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值         if (message != null)         {             var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象             if (link != null)             {                 var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象                 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值                 foreach (var item in message.Data)                 {                     var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                     if (point != null)                     {                         var parseMethod = point.Type.GetMethod(                             "Parse",                             BindingFlags.Public | BindingFlags.Static,                             new[] { typeof(string) }                         );                         point.WriteValue = parseMethod.Invoke(                             null,                             new object[] { item.Value.ToString() }                         ); //通过点位id找到对应的点位对象                     }                     modbusTcp.Write(point);                 }             }         }     }      private void Timer_Elapsed(object? sender, ElapsedEventArgs e)     {         foreach (var link in _deviceLinks)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = link.UID };                 foreach (RegisterPoint point in link.Points)                 {                     // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");                     device.Data.Add(point.UID, point.Value);                 }                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }     }      private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)     {         if (_serviceConfig.IsPushChanged)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };                 device.Data.Add(point.UID, value);                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }          Console.WriteLine($"Point:{point.UID}-->Value:{value}");     } } 

相关内容

热门资讯

程序员教你!(洛阳麻将)外挂辅... 程序员教你!(洛阳麻将)外挂辅助插件(辅助挂)外挂透视辅助识别(2020已更新)(哔哩哔哩);超受欢...
终于清楚!白金岛三打哈辅助器(... 终于清楚!白金岛三打哈辅助器(辅助挂)!外挂透明挂辅助助手(2021已更新)(哔哩哔哩)终于清楚!白...
关于!(德扑扑克)外挂透明挂辅... 亲,德扑扑克这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的牌特别好,总...
玩家必看科普!(Wepoke控... 玩家必看科普!(Wepoke控制)外挂透明挂辅助助手(辅助透视)软件透明挂(2024已更新)(哔哩哔...
解密关于!((Wepokeap... 解密关于!((Wepokeapp))有挂确实有辅助挂,太过分了其实真的是有挂(2025已更新)(哔哩...
重要通知!(抚远麻将)外挂辅助... 《抚远麻将软件透明挂》是一款多人竞技的抚远麻将辅助透视游戏,你将微扑克对手来到同一个战场,为至高无上...
专业讨论!途游斗地主脚本(辅助... 专业讨论!途游斗地主脚本(辅助挂)!外挂辅助器计算器(2025已更新)(哔哩哔哩);详细途游斗地主脚...
技术分享!(Wepoke黑科技... 技术分享!(Wepoke黑科技)外挂透明挂辅助透视(透视)详细教程(2021已更新)(哔哩哔哩);小...
玩家必备科普!(微扑克智能)外... 玩家必备科普!(微扑克智能)外挂透明挂辅助测试(透视辅助)软件透明挂(2025已更新)(哔哩哔哩);...
玩家必看分享!((wepoKe... 玩家必看分享!((wepoKe))有挂确实有辅助挂,太嚣张了原来真的确实是有挂(2022已更新)(哔...