1.环境
python2.7
serial 库
2.AT command
什么是AT command:
https://baike.baidu.com/item/AT命令/3441555?fr=aladdin
3.实现
4.假设环境
有一款物联网设备,主要功能是连接 MQTT 服务器实现数据的推送,支持以下 AT 命令:
1.devices建立与服务器的新的MQTT和TCP连接
AT+NEW=<server>,<port>,<command_timeout>
【server : 0.0.0.0 MQTT 服务器地址】
【port: 5001 端口】
【command_timeout: 100ms 延时】
Response
a.OK 连接成功
b.ERROR 连接错误
c.Timeout 连接超时
2.devices发送MQTT断开数据包
AT + DISCON
Response
a.OK\r\n
b.ERROR\r\n
3.devices重启
AT + Reboot
Response
a.OK\r\n
测试案例:连接成功 10s后断开 重复100次。
5.代码实现,因为项目存在研发保密,测试脚本只体现大致思路
#!/usr/bin/env python
# -*- coding=utf-8 -*-
__author__ = 'man'
'''
AT Command 自动化测试实验;
'''
import sys
import time
import serial
class AT_Command(object):
    """Send AT commands over a serial port and wait for an expected reply.

    The port is opened in the constructor and kept on ``self.serial_port``.
    Both ``input_cmd`` and ``input_hex`` write a payload and then read the
    port until one of the expected reply tokens shows up in the received
    data, or until the device has stayed silent for too many polls.
    """

    # Seconds to pause after a read that returned no data.
    POLL_INTERVAL = 0.1

    def __init__(self, port, baudrate):
        """Open the serial port.

        Args:
            port: Serial device name, e.g. ``'COM1'``.
            baudrate: Line speed, passed to pyserial unchanged.

        Exits the process with status 1 if the port does not report being
        open after ``open()`` returns.
        """
        self.serial_port = serial.Serial()
        self.serial_port.baudrate = baudrate
        self.serial_port.port = port
        # Upper bound, in seconds, on how long a single read() may block.
        self.serial_port.timeout = 30
        self.serial_port.open()
        if not self.serial_port.is_open:
            print("Open Serial Port " + self.serial_port.port + " Failed!!!")
            # A failed open must not look like a successful run to the shell.
            sys.exit(1)
        print("Open Serial Port " + self.serial_port.port + " Passed!!!")

    @staticmethod
    def _to_bytes(text):
        """Return ``text`` as bytes so it can be written to / compared with port data."""
        if isinstance(text, bytes):
            return text
        return text.encode("utf-8")

    def _wait_for(self, seek, timeout):
        """Read the port until one entry of ``seek`` appears in the received data.

        Args:
            seek: List of expected reply tokens, e.g. ``['OK', 'ERROR']``.
            timeout: Maximum number of empty reads tolerated. Each empty read
                has already blocked for up to the port's own read timeout and
                is followed by a ``POLL_INTERVAL`` pause.

        Returns:
            The matching entry of ``seek`` (the first one, in ``seek`` order,
            found in the data received so far), or the string
            ``"not response"`` when the poll budget is used up.
        """
        expected = [(item, self._to_bytes(item)) for item in seek]
        received = b""
        idle_polls = 0
        while idle_polls < timeout:
            chunk = self.serial_port.read()
            if not chunk:
                # Only silence counts against the budget; a long reply
                # (echo, CR/LF framing) must not exhaust it byte by byte.
                idle_polls += 1
                time.sleep(self.POLL_INTERVAL)
                continue
            received += chunk
            # Search inside the buffer instead of comparing the whole buffer:
            # replies arrive framed (e.g. "OK\r\n") and would never be equal
            # to a bare token.
            for item, token in expected:
                if token in received:
                    print("response :  " + str(item))
                    return item
        print("not response")
        return "not response"

    def input_cmd(self, ATcomd, seek, timeout=20):
        """Write an AT command string and wait for one of the expected replies.

        Args:
            ATcomd: Command text, written to the port as given (no line
                terminator is appended).
            seek: List of reply tokens to look for.
            timeout: Maximum number of empty polls, see ``_wait_for``.

        Returns:
            The matched token from ``seek``, or ``"not response"``.
        """
        print("input command : " + str(ATcomd))
        self.serial_port.write(self._to_bytes(ATcomd))
        return self._wait_for(seek, timeout)

    def input_hex(self, send_data, seek, timeout=20):
        """Write hex-encoded binary data and wait for one of the expected replies.

        Args:
            send_data: Hexadecimal text such as ``'4154'``; it is decoded to
                raw bytes before being written.
            seek: List of reply tokens to look for.
            timeout: Maximum number of empty polls, see ``_wait_for``.

        Returns:
            The matched token from ``seek``, or ``"not response"``.
        """
        # Function-scope import keeps this block self-contained;
        # unhexlify works on both Python 2 and 3, unlike str.decode("hex").
        import binascii

        payload = binascii.unhexlify(send_data)
        self.serial_port.write(payload)
        print("Sended data:  " + str(payload))
        return self._wait_for(seek, timeout)
def testcase(port='COM1', baudrate='9000', cycles=100, hold_seconds=10):
    """Run the connect / hold / disconnect cycle repeatedly and tally results.

    Each cycle sends ``AT+NEW=1.1.1.1,80,10``; when the device answers ``OK``
    the connection is held for ``hold_seconds`` and then ``AT + DISCON`` is
    sent. A cycle passes only if both commands are answered with ``OK``.

    Args:
        port: Serial device name handed to ``AT_Command``.
        baudrate: Line speed handed to ``AT_Command``.
        cycles: Number of cycles to run.
        hold_seconds: Seconds to keep the connection up before disconnecting.

    Returns:
        Tuple ``(passed_number, fail_number)``.
    """
    conn_response = ['OK', 'ERROR', 'Timeout']
    dsconn_response = ['OK', 'ERROR']
    reboot_response = ['OK']

    atcommand = AT_Command(port, baudrate)
    passed_number = 0
    case_number = 0
    try:
        for _ in range(cycles):
            conn = atcommand.input_cmd('AT+NEW=1.1.1.1,80,10', conn_response)
            if conn == 'OK':
                time.sleep(hold_seconds)
                dsconn = atcommand.input_cmd('AT + DISCON', dsconn_response)
                if dsconn == 'OK':
                    print("Passed")
                    passed_number += 1
                elif dsconn == 'ERROR':
                    print("Failed; dscon fail")
                else:
                    print("Failed; dscon time out")
            elif conn == 'ERROR':
                print("Failed; con ERROR")
            elif conn == 'Timeout':
                print("Failed; con Timeout")
            else:
                print("Failed; con time out")
                # NOTE(review): the original listing lost its indentation, so
                # it is unclear whether the reboot ran only when the device
                # gave no reply (as here) or after every cycle - confirm.
                atcommand.input_cmd('AT + Reboot', reboot_response)
            case_number += 1
    finally:
        # Release the port even if a cycle raises, so a rerun can reopen it.
        atcommand.serial_port.close()

    fail_number = case_number - passed_number
    return passed_number, fail_number
if __name__ == "__main__":
    # testcase() returns (passed, failed); report the totals instead of
    # discarding them, and signal failures through the exit status.
    passed, failed = testcase()
    print("Passed: %d, Failed: %d" % (passed, failed))
    if failed:
        sys.exit(1)