C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列
创始人
2024-09-25 07:28:08
0

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService     {          public static string MainTopic = "DTSDAQ/";                    private Dictionary _modbusTcps;                    public DAqService(DAqOption option)         {             _modbusTcps = new Dictionary();            //...         }          ///          /// 启动服务         ///          public void Start()         {             MqttControllor = new MqttControllor(_option.MqttConfig);              foreach (var item in _deviceLinks)             {                 ModbusTcp modbusTcp = new ModbusTcp(item);                 modbusTcp.DoMonitor();                 modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;                 //将                 _modbusTcps.Add(item.UID, modbusTcp);                 MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);             }              if (_serviceConfig.IsPushScheduled)             {                 timer.Start();             }         }     } 

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

  /// 设备控制,反写 ///  ///  ///  private void DeviceControl(string topic, string msg) {     var message = JsonSerializer.Deserialize(msg);     //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值     if (message != null)     {         var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象         if (link != null)         {             var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象             //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值             foreach (var item in message.Data)             {                 var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                 if (point != null)                 {                     var parseMethod = point.Type.GetMethod(                         "Parse",                         BindingFlags.Public | BindingFlags.Static,                         new[] { typeof(string) }                     );                     point.WriteValue = parseMethod.Invoke(                         null,                         new object[] { item.Value.ToString() }                     ); //通过点位id找到对应的点位对象                 }                 modbusTcp.Write(point);             }         }     } } 

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp  {          ///           /// 写入队列          ///           private Queue _writeQueue = new Queue();                    ·          //写入值先加入一个队列           public void Write(RegisterPoint point)           {               _writeQueue.Enqueue(point);           }  } 

完整代码

public class DAqService {     public static string MainTopic = "DTSDAQ/";      private MqttControllor MqttControllor;     private Dictionary _modbusTcps;      private DAqOption _option;     private List _deviceLinks;     private ServiceConfig _serviceConfig;      private System.Timers.Timer timer;      public DAqService(DAqOption option)     {         _modbusTcps = new Dictionary();         _option = option;         _deviceLinks = option.DeviceLinks;         _serviceConfig = option.ServiceConfig;         timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);         timer.Elapsed += Timer_Elapsed;     }      ///      /// 启动服务     ///      public void Start()     {         MqttControllor = new MqttControllor(_option.MqttConfig);          foreach (var item in _deviceLinks)         {             ModbusTcp modbusTcp = new ModbusTcp(item);             modbusTcp.DoMonitor();             modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;             _modbusTcps.Add(item.UID, modbusTcp);             MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);         }          if (_serviceConfig.IsPushScheduled)         {             timer.Start();         }     }      ///      /// 设备控制,反写     ///      ///      ///      private void DeviceControl(string topic, string msg)     {         var message = JsonSerializer.Deserialize(msg);         //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值         if (message != null)         {             var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象             if (link != null)             {                 var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象                 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值                 foreach (var item in message.Data)                 {                     var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                     if (point != null)                     {                         var parseMethod = point.Type.GetMethod(                             "Parse",                             BindingFlags.Public | BindingFlags.Static,                             new[] { typeof(string) }                         );                         point.WriteValue = parseMethod.Invoke(                             null,                             new object[] { item.Value.ToString() }                         ); //通过点位id找到对应的点位对象                     }                     modbusTcp.Write(point);                 }             }         }     }      private void Timer_Elapsed(object? sender, ElapsedEventArgs e)     {         foreach (var link in _deviceLinks)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = link.UID };                 foreach (RegisterPoint point in link.Points)                 {                     // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");                     device.Data.Add(point.UID, point.Value);                 }                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }     }      private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)     {         if (_serviceConfig.IsPushChanged)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };                 device.Data.Add(point.UID, value);                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }          Console.WriteLine($"Point:{point.UID}-->Value:{value}");     } } 

相关内容

热门资讯

aapoker俱乐部(智星德州... aapoker俱乐部(智星德州扑克)wepoke德扑之星(透视辅助)总是真的有挂(有挂下载)-小红书...
一分钟掌握!微扑克有辅助插件(... 一分钟掌握!微扑克有辅助插件(黑科技)外挂透明挂辅助神器(2020已更新)(百度知乎)微扑克有辅助插...
透视辅助挂!德扑ai助手&qu... 透视辅助挂!德扑ai助手"wpk有"的确是有挂的(有挂代打)-知乎德扑ai助手软件透明挂微扑克wpk...
透明辅助(菠萝德州)外挂软件透... 透明辅助(菠萝德州)外挂软件透明插件(辅助挂)一贯真的有挂(2023已更新)(哔哩哔哩);1.菠萝德...
aapoker俱乐部(微扑克)... aapoker俱乐部(微扑克)德扑ai助手软件(黑科技)其实真的有挂(有挂打法)-百度1、任何微扑克...
黑科技辅助挂!德扑之星猫腻&q... 黑科技辅助挂!德扑之星猫腻"来玩德州app外挂"本来真的有挂(有挂私人房)-头条来玩德州app外挂辅...
3分钟普及!微扑克这软件有问题... 3分钟普及!微扑克这软件有问题(辅助挂)外挂透明挂辅助脚本(2024已更新)(知乎)1、完成微扑克这...
德州辅助(aApoker)外挂... 德州辅助(aApoker)外挂软件透明软件(智能ai代打)本来真的有挂(2021已更新)(知乎);1...
德扑之星有猫腻(德州竞技联盟)... 德扑之星有猫腻(德州竞技联盟)德州ai人工智能(透视)其实真的有挂(有挂俱乐部)-微博热搜1、让任何...
透视辅助!aapoker辅助工... 透视辅助!aapoker辅助工具"wpk辅助软件查得出来"果真真的有挂(有挂黑科技)-今日头条wpk...