极乐门资源网 Design By www.ioogu.com
这个属于我项目中一个函数,跟大家分享一下我的思路及最终实现
在编写串口通信工具中,需要实现一个函数,自动找到对应com 口,并且连接该com口,保证后续通信正常
作为初始化过程的一部分。
思路
在win 设备管理器中,经常会出现多个com 口,但并不是每个com 口都是目标设备所链接的。
尝试打开每个com 口,输入enter 按键, 正确的com 口,会有ack log 返回,表明通信 正常
否则,没有任何log 返回,则判断为非目标设备所连接的com 口。
实现
尝试去打开所有com 口,然后发送enter, 如果在一段时间内有返回值,检查com 口收到的字节数,如果非零,则表明找到了对应的com 口。
完整测试代码如下:
import serial import serial.tools.list_ports import threading import binascii import time from datetime import datetime # default value baunRate = 115200 is_read = False is_write = False write_buff = [] sys_buff = [] mSerial = None callback = None is_opened = 0 is_registed = 0 class SerialPort: def __init__(self,port,buand): self.port = serial.Serial(port,buand) self.port.close() if not self.port.isOpen(): self.port.open() #the index of data_bytes for read operation,私有属性 #only used in read lines self.__read_ptr = 0 self.__read_head = 0 #store all read bytes # used in read date, read lines self.__data_bytes = bytearray() def port_open(self): if not self.port.isOpen(): self.port.open() def port_close(self): self.port.close() def send(self): global is_write global write_buff while is_write: if len(write_buff): msg = write_buff.pop(0) msg = msg+"\n" cmd = msg.encode() try: self.port.write(cmd) except: write_buff.clear() is_write = False write_buff.clear() def read_data(self): global is_read global is_opened byte_cnt = 0 while is_read: try: count = self.port.inWaiting() if count > 0: rec_str = self.port.read(count) self.__data_bytes = self.__data_bytes+rec_str #print("receive:",rec_str.decode()) #print(rec_str) byte_cnt += count if not is_opened: is_opened = 1 #print("累计收到:",byte_cnt) #time.sleep(0.5) self.read_lines() except: deinit() #将当前所有的数据都读出,读取位置不变,每次读取指针依次移动,不漏数据, 读取行为一直在进行 def read_lines(self): #reset line_cnt = 0 data_len = len(self.__data_bytes) #print ("") #print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len) if self.__read_ptr >=data_len: return #get all lines in current data_bytes while(self.__read_ptr < data_len-1): if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d): tmp = bytearray() tmp = self.__data_bytes[self.__read_head:self.__read_ptr] try: line = tmp.decode() except: self.__read_head = self.__read_ptr + 2 self.__read_ptr = self.__read_head continue iprint(line) line_cnt += 1 self.__read_head = self.__read_ptr + 2 self.__read_ptr = self.__read_head else: self.__read_ptr = self.__read_ptr + 1 def auto_open_serial(): global baunRate global mSerial global callback global is_registed global is_opened #reset deinit() # 列出所有当前的com口 port_list = list(serial.tools.list_ports.comports()) port_list_name = [] #get all com if len(port_list) <= 0: iprint("the serial port can't find!") return False else: for itms in port_list: port_list_name.append(itms.device) #try open #print(port_list_name) for i in port_list_name: try: mSerial = SerialPort(i,baunRate) iprint("try open %s"%i) start_task() send("") #return True time.sleep(1) if is_opened: iprint("connect %s successfully"%i) return True else: deinit() if i == port_list_name[len(port_list_name)-1]: iprint("uart don't open") break continue except: iprint(" uart don't open") deinit() return False def deinit(): global mSerial global is_write global is_read global write_buff global is_opened if mSerial: mSerial.port_close() is_opened = 0 is_read = False is_write = False write_buff = [] mSerial = None time.sleep(1) def init(): global mSerial global callback global is_registed global is_opened global is_read global is_write #retry retry_time = 0 while not auto_open_serial(): if not is_opened: iprint("wait for uart connect, retry %s"%str(retry_time)) else: return True retry_time += 1 time.sleep(2) if retry_time == 10: iprint(" open uart fail") return False def send(msg): global mSerial global is_write global write_buff if is_write: write_buff.append(msg) def start_task(): global mSerial global is_write global is_read if mSerial: is_write = True t1 = threading.Thread (target=mSerial.send) t1.setDaemon (False) t1.start () is_read = True t2 = threading.Thread (target=mSerial.read_data) t2.setDaemon (False) t2.start () def iprint(msg): global callback global is_registed msg = "[Uart] "+str(msg) if is_registed: callback.append(msg) else: print(msg) def start_sys_cmd(): global is_registed if is_registed: t3 = threading.Thread (target=process_receive_sys_cmd) t3.setDaemon (False) t3.start() def process_receive_sys_cmd(): global sys_buff global is_registed global callback #print("process_receive_sys_cmd") while is_registed: #print ("wait,process_receive_sys_cmd") if len(sys_buff): #print ("receive,process_receive_sys_cmd") line = sys_buff.pop(0) if "init" in line: if is_opened and is_read and is_write: iprint("already open uart") break iprint("start init") init() if is_opened: break iprint("Eixt uart sys thread") def register_cback(list): global callback global is_registed callback = list is_registed = 1 def unregister_cback(): global callback callback.clear() if __name__ == '__main__': receive = [] register_cback(receive) sys_buff.append("init") start_sys_cmd() def process_receive_msg(): global receive while True: #print("wait") if len(receive): #print("receive") print(receive.pop(0)) t = threading.Thread(target=process_receive_msg) t.setDaemon(False) t.start()
极乐门资源网 Design By www.ioogu.com
极乐门资源网
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件!
如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
极乐门资源网 Design By www.ioogu.com
暂无python 自动识别并连接串口的实现的评论...
更新日志
2024年11月15日
2024年11月15日
- s13wbg战队成员都有谁 wbg战队s13成员名单一览
- 网友热议《幻兽帕鲁》停售、赔千万日元:那也赚麻了 不亏
- 这谁顶得住 《最终幻想7:重生》尤菲3D作品穿上终极芙蕾娜衣服
- 外媒称PS5pro违背承诺:《蜘蛛侠2》根本没法4K60帧
- 群星.2007-中文十大金曲30周年纪念专辑【RTHK】【WAV+CUE】
- 杨林.1989-留一点爱来爱自己【综一唱片】【WAV+CUE】
- 南合文斗.2007-陪君醉笑三千尘鸟人唱片】【FLAC+CUE】
- 群星《我们的歌第六季 第1期》[320K/MP3][90.72MB]
- 群星《我们的歌第六季 第1期》[FLAC/分轨][456.01MB]
- 李雨寰《Break Free》[320K/MP3][98.39MB]
- 草蜢.2001-《真经典》环球唱片[WAV+CUE]
- 群星.2009-第4届2008十大发烧唱片精选[FLAC+CUE]
- 丁红《女儿情》[DTS-WAV]
- 陈惠婷.2017-Voyager3【亚神音乐】【FLAC分轨】
- 群星.2001-华纳歌声魅影【华纳】【WAV+CUE】