C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列
创始人
2024-09-25 07:28:08
0

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService     {          public static string MainTopic = "DTSDAQ/";                    private Dictionary _modbusTcps;                    public DAqService(DAqOption option)         {             _modbusTcps = new Dictionary();            //...         }          ///          /// 启动服务         ///          public void Start()         {             MqttControllor = new MqttControllor(_option.MqttConfig);              foreach (var item in _deviceLinks)             {                 ModbusTcp modbusTcp = new ModbusTcp(item);                 modbusTcp.DoMonitor();                 modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;                 //将                 _modbusTcps.Add(item.UID, modbusTcp);                 MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);             }              if (_serviceConfig.IsPushScheduled)             {                 timer.Start();             }         }     } 

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

  /// 设备控制,反写 ///  ///  ///  private void DeviceControl(string topic, string msg) {     var message = JsonSerializer.Deserialize(msg);     //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值     if (message != null)     {         var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象         if (link != null)         {             var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象             //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值             foreach (var item in message.Data)             {                 var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                 if (point != null)                 {                     var parseMethod = point.Type.GetMethod(                         "Parse",                         BindingFlags.Public | BindingFlags.Static,                         new[] { typeof(string) }                     );                     point.WriteValue = parseMethod.Invoke(                         null,                         new object[] { item.Value.ToString() }                     ); //通过点位id找到对应的点位对象                 }                 modbusTcp.Write(point);             }         }     } } 

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp  {          ///           /// 写入队列          ///           private Queue _writeQueue = new Queue();                    ·          //写入值先加入一个队列           public void Write(RegisterPoint point)           {               _writeQueue.Enqueue(point);           }  } 

完整代码

public class DAqService {     public static string MainTopic = "DTSDAQ/";      private MqttControllor MqttControllor;     private Dictionary _modbusTcps;      private DAqOption _option;     private List _deviceLinks;     private ServiceConfig _serviceConfig;      private System.Timers.Timer timer;      public DAqService(DAqOption option)     {         _modbusTcps = new Dictionary();         _option = option;         _deviceLinks = option.DeviceLinks;         _serviceConfig = option.ServiceConfig;         timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);         timer.Elapsed += Timer_Elapsed;     }      ///      /// 启动服务     ///      public void Start()     {         MqttControllor = new MqttControllor(_option.MqttConfig);          foreach (var item in _deviceLinks)         {             ModbusTcp modbusTcp = new ModbusTcp(item);             modbusTcp.DoMonitor();             modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;             _modbusTcps.Add(item.UID, modbusTcp);             MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);         }          if (_serviceConfig.IsPushScheduled)         {             timer.Start();         }     }      ///      /// 设备控制,反写     ///      ///      ///      private void DeviceControl(string topic, string msg)     {         var message = JsonSerializer.Deserialize(msg);         //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值         if (message != null)         {             var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象             if (link != null)             {                 var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象                 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值                 foreach (var item in message.Data)                 {                     var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象                     if (point != null)                     {                         var parseMethod = point.Type.GetMethod(                             "Parse",                             BindingFlags.Public | BindingFlags.Static,                             new[] { typeof(string) }                         );                         point.WriteValue = parseMethod.Invoke(                             null,                             new object[] { item.Value.ToString() }                         ); //通过点位id找到对应的点位对象                     }                     modbusTcp.Write(point);                 }             }         }     }      private void Timer_Elapsed(object? sender, ElapsedEventArgs e)     {         foreach (var link in _deviceLinks)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = link.UID };                 foreach (RegisterPoint point in link.Points)                 {                     // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");                     device.Data.Add(point.UID, point.Value);                 }                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }     }      private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)     {         if (_serviceConfig.IsPushChanged)         {             try             {                 DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };                 device.Data.Add(point.UID, value);                 var data = JsonSerializer.Serialize(device);                 MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送             }             catch (Exception ex)             {                 Console.WriteLine(ex.Message);             }         }          Console.WriteLine($"Point:{point.UID}-->Value:{value}");     } } 

相关内容

热门资讯

专业讨论!德扑之星真破解套路(... 专业讨论!德扑之星真破解套路(辅助挂)软件透明挂(有挂了解)-哔哩哔哩;人气非常高,ai更新快且高清...
每日必看!智星德州菠萝外挂检测... 每日必看!智星德州菠萝外挂检测(辅助挂)软件透明挂(有挂教学)-哔哩哔哩1、玩家可以在智星德州菠萝外...
透视透明挂!轰趴十三水有后台(... 轰趴十三水有后台赢率提升策略‌;透视透明挂!轰趴十三水有后台(辅助挂)软件透明挂(有挂详情)-哔哩哔...
发现玩家!德扑ai助手软件(辅... 发现玩家!德扑ai助手软件(辅助挂)透视辅助(有挂教学)-哔哩哔哩;玩家在德扑ai助手软件中需先进行...
一分钟了解!x-poker辅助... 一分钟了解!x-poker辅助软件(辅助挂)辅助透视(有挂攻略)-哔哩哔哩1、每一步都需要思考,不同...
一分钟揭秘!德州最新辅助器(辅... 一分钟揭秘!德州最新辅助器(辅助挂)透视辅助(有挂攻略)-哔哩哔哩;德州最新辅助器最新版本免费下载安...
玩家攻略推荐!德州辅助(辅助挂... 玩家攻略推荐!德州辅助(辅助挂)辅助透视(有挂了解)-哔哩哔哩是由北京得德州辅助黑科技有限公司精心研...
揭秘真相!pokernow德州... 《揭秘真相!pokernow德州(辅助挂)辅助透视(有挂介绍)-哔哩哔哩》 pokernow德州软件...
五分钟了解!德州之星辅助器(辅... 五分钟了解!德州之星辅助器(辅助挂)辅助透视(有挂透明)-哔哩哔哩1、很好的工具软件,可以解锁游戏的...
推荐一款!pokermaste... 1、推荐一款!pokermaster有外挂(辅助挂)透视辅助(有挂教学)-哔哩哔哩;详细教程。2、p...