C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列
创始人
2024-09-25 07:28:08
0

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService     {          public static string MainTopic = "DTSDAQ/";                    private Dictionary _modbusTcps;                    public DAqService(DAqOption option)         {             _modbusTcps = new Dictionary();            //...         }          ///          /// 启动服务         ///          public void Start()         {             MqttControllor = new MqttControllor(_option.MqttConfig);              foreach (var item in _deviceLinks)             {                 ModbusTcp modbusTcp = new ModbusTcp(item);                 modbusTcp.DoMonitor();                 modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;                 //将                 _modbusTcps.Add(item.UID, modbusTcp);                 MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);             }              if (_serviceConfig.IsPushScheduled)             {                 timer.Start();             }         }     } 

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

  /// 设备控制,反写 ///  ///  ///  private void DeviceControl(string topic, string msg) {     var message = JsonSerializer.Deserialize(msg);     //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值     if (message != null)     {         var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象         if (link != null)         {             var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象             //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值             foreach (var item in message.Data)             {                 var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                 if (point != null)                 {                     var parseMethod = point.Type.GetMethod(                         "Parse",                         BindingFlags.Public | BindingFlags.Static,                         new[] { typeof(string) }                     );                     point.WriteValue = parseMethod.Invoke(                         null,                         new object[] { item.Value.ToString() }                     ); //通过点位id找到对应的点位对象                 }                 modbusTcp.Write(point);             }         }     } } 

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp  {          ///           /// 写入队列          ///           private Queue _writeQueue = new Queue();                    ·          //写入值先加入一个队列           public void Write(RegisterPoint point)           {               _writeQueue.Enqueue(point);           }  } 

完整代码

public class DAqService {     public static string MainTopic = "DTSDAQ/";      private MqttControllor MqttControllor;     private Dictionary _modbusTcps;      private DAqOption _option;     private List _deviceLinks;     private ServiceConfig _serviceConfig;      private System.Timers.Timer timer;      public DAqService(DAqOption option)     {         _modbusTcps = new Dictionary();         _option = option;         _deviceLinks = option.DeviceLinks;         _serviceConfig = option.ServiceConfig;         timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);         timer.Elapsed += Timer_Elapsed;     }      ///      /// 启动服务     ///      public void Start()     {         MqttControllor = new MqttControllor(_option.MqttConfig);          foreach (var item in _deviceLinks)         {             ModbusTcp modbusTcp = new ModbusTcp(item);             modbusTcp.DoMonitor();             modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;             _modbusTcps.Add(item.UID, modbusTcp);             MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);         }          if (_serviceConfig.IsPushScheduled)         {             timer.Start();         }     }      ///      /// 设备控制,反写     ///      ///      ///      private void DeviceControl(string topic, string msg)     {         var message = JsonSerializer.Deserialize(msg);         //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值         if (message != null)         {             var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象             if (link != null)             {                 var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象                 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值                 foreach (var item in message.Data)                 {                     var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                     if (point != null)                     {                         var parseMethod = point.Type.GetMethod(                             "Parse",                             BindingFlags.Public | BindingFlags.Static,                             new[] { typeof(string) }                         );                         point.WriteValue = parseMethod.Invoke(                             null,                             new object[] { item.Value.ToString() }                         ); //通过点位id找到对应的点位对象                     }                     modbusTcp.Write(point);                 }             }         }     }      private void Timer_Elapsed(object? sender, ElapsedEventArgs e)     {         foreach (var link in _deviceLinks)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = link.UID };                 foreach (RegisterPoint point in link.Points)                 {                     // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");                     device.Data.Add(point.UID, point.Value);                 }                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }     }      private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)     {         if (_serviceConfig.IsPushChanged)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };                 device.Data.Add(point.UID, value);                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }          Console.WriteLine($"Point:{point.UID}-->Value:{value}");     } } 

相关内容

热门资讯

一分钟内幕!科乐吉林麻将系统发... 一分钟内幕!科乐吉林麻将系统发牌规律,福建大玩家确实真的是有挂,技巧教程(有挂ai代打);所有人都在...
一分钟揭秘!微扑克辅助软件(透... 一分钟揭秘!微扑克辅助软件(透视辅助)确实是有挂(2024已更新)(哔哩哔哩);1、用户打开应用后不...
五分钟发现!广东雀神麻雀怎么赢... 五分钟发现!广东雀神麻雀怎么赢,朋朋棋牌都是是真的有挂,高科技教程(有挂方法)1、广东雀神麻雀怎么赢...
每日必看!人皇大厅吗(透明挂)... 每日必看!人皇大厅吗(透明挂)好像存在有挂(2026已更新)(哔哩哔哩);人皇大厅吗辅助器中分为三种...
重大科普!新华棋牌有挂吗(透视... 重大科普!新华棋牌有挂吗(透视)一直是有挂(2021已更新)(哔哩哔哩)1、完成新华棋牌有挂吗的残局...
二分钟内幕!微信小程序途游辅助... 二分钟内幕!微信小程序途游辅助器,掌中乐游戏中心其实存在有挂,微扑克教程(有挂规律)二分钟内幕!微信...
科技揭秘!jj斗地主系统控牌吗... 科技揭秘!jj斗地主系统控牌吗(透视)本来真的是有挂(2025已更新)(哔哩哔哩)1、科技揭秘!jj...
1分钟普及!哈灵麻将攻略小,微... 1分钟普及!哈灵麻将攻略小,微信小程序十三张好像存在有挂,规律教程(有挂技巧)哈灵麻将攻略小是一种具...
9分钟教程!科乐麻将有挂吗,传... 9分钟教程!科乐麻将有挂吗,传送屋高防版辅助(总是存在有挂)1、完成传送屋高防版辅助透视辅助安装,帮...
每日必看教程!兴动游戏辅助器下... 每日必看教程!兴动游戏辅助器下载(辅助)真是真的有挂(2025已更新)(哔哩哔哩)1、打开软件启动之...