文章目录
购买 发送邮件的设备

我们购买的是这款,端口是串行的
使用说明:
发送了一条,返回SMS_SND_SUCESS后再发下一条,返回了SMS_SND_FAIL,也可以再发下一条,什么都不返回,等20s,这一条视为发送失败,继续发送其它短信。
官方给的Python实例如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import cgi import serial import time import request #参数接收 #web访问链接示例 http://localhost:8101/dtu.py?a=s&p=13389467776&c=你好world! a = request.REQUEST.get('a') #获取动作 p = request.REQUEST.get('p') #获取接收号码 c = request.REQUEST.get('c') #获取发送内容 #web页面头 print("Content-Type: text/html\n") #判断电话号码和内容不为空 if p and c: s=serial.Serial('com4',9600,timeout=1) s.close()#端口关闭,防止已经打开 s.open()#打开端口 myinput = ('%s:0:%s'%(p,c)).encode("GBK") #print(myinput) s.write(myinput) #端口缓冲5秒 time.sleep(5) #接收返回数据 n=s.inWaiting() if n: data= str(s.read(n)) print(data) s.close() #关闭端口 |
优化版本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
class SendTelMessage(): def __init__(self): self.port_name = None self.is_ready = False port_list = list(serial.tools.list_ports.comports()) for i in port_list: if i.description.startswith('Prolific USB-to-Serial Comm Port') or 'USB-Serial' in i.description: self.port_name = i.name break try: self.s = serial.Serial('/dev/' + self.port_name, 9600, timeout=1) self.s.close() except Exception as e: logger.error(f"未发现串口设备: {e}") else: self.is_ready = True def send_message_tel(self,phone, msg): """ 发送信息 """ myinput = ('86%s:0:%s' % (phone, msg)).encode("GBK") try: if not self.s.is_open: self.s.open() self.s.write(myinput) # 端口缓冲5秒 time.sleep(3) # 接收返回数据 n = self.s.inWaiting() if n: data = str(self.s.read(n)) self.s.close() # 关闭端口 if 'SMS_SEND_SUCESS' in data: return True return False except Exception as e: logger.error(f"send msg error {e}") if self.s.is_open: self.s.close() return False @staticmethod def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs): # 增加一个 超时限制 mustend = time.time() + timeout while time.time() < mustend: if somepredicate(*args, **kwargs): return True time.sleep(period) logger.info("超过 最大时间限制 仍未发送成功") return False def sendMsg(self,phone, msg): return self.wait_until(self.send_message_tel,21,period=0.25,phone=phone,msg=msg) send_msg_obj = SendTelMessage() if __name__ == '__main__': stm = SendTelMessage() for i in range(10): stm.sendMsg("15xx0593","hello world {i}") logger.info(f"发送成功 {i}") |
测试如下图

