-
Notifications
You must be signed in to change notification settings - Fork 17
/
rtt2uart.py
229 lines (190 loc) · 7.53 KB
/
rtt2uart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import logging
import pylink
import time
import serial
import threading
import socket
logging.basicConfig(level=logging.NOTSET,
format='%(asctime)s - [%(levelname)s] (%(filename)s:%(lineno)d) - %(message)s')
logger = logging.getLogger(__name__)
class rtt_to_serial():
def __init__(self, jlink, connect_inf='USB', connect_para=None, device=None, port=None, baudrate=115200, interface=pylink.enums.JLinkInterfaces.SWD, speed=12000, reset=False):
# jlink接入方式
self._connect_inf = connect_inf
# jlink接入参数
self._connect_para = connect_para
# 目标芯片名字
self.device = device
# 调试口
self._interface = interface
# 连接速率
self._speed = speed
# 复位标志
self._reset = reset
# 串口参数
self.port = port
self.baudrate = baudrate
self.jlink = jlink
# 线程
self._write_lock = threading.Lock()
try:
self.serial = serial.Serial()
except:
logger.error('Creat serial object failed', exc_info=True)
raise
self.socket = None
self.timer = None
self.rtt2uart = None
self.uart2rtt = None
def __del__(self):
logger.debug('close app')
self.stop()
def start(self):
logger.debug('start rtt2uart')
try:
if self._connect_inf != 'EXISTING' and self.jlink.connected() == False:
# 加载jlinkARM.dll
if self._connect_inf == 'USB':
self.jlink.open(serial_no=self._connect_para)
else:
self.jlink.open(ip_addr=self._connect_para)
# 设置连接速率
if self.jlink.set_speed(self._speed) == False:
logger.error('Set speed failed', exc_info=True)
raise Exception("Set jlink speed failed")
# 设置连接接口为SWD
if self.jlink.set_tif(self._interface) == False:
logger.error('Set interface failed', exc_info=True)
raise Exception("Set jlink interface failed")
try:
if self._reset == True:
# 复位一下目标芯片,复位后不要停止芯片,保证后续操作的稳定性
self.jlink.reset(halt=False)
# 连接目标芯片
self.jlink.connect(self.device)
except pylink.errors.JLinkException:
logger.error('Connect target failed', exc_info=True)
raise
except pylink.errors.JLinkException as errors:
logger.error('Open jlink failed', exc_info=True)
raise
try:
if self.serial.isOpen() == False:
# 设置串口参数并打开串口
self.serial.port = self.port
self.serial.baudrate = self.baudrate
self.serial.timeout = 3
self.serial.write_timeout = 3
self.serial.open()
except:
logger.error('Open serial failed', exc_info=True)
raise
self.socket = self.doConnect('localhost', 19021)
if self.socket:
self.thread_switch = True
self.rtt2uart = threading.Thread(target=self.rtt_to_uart)
self.rtt2uart.setDaemon(True)
self.rtt2uart.name = 'rtt->serial'
self.rtt2uart.start()
self.uart2rtt = threading.Thread(target=self.uart_to_rtt)
self.uart2rtt.setDaemon(True)
self.uart2rtt.name = 'serial->rtt'
self.uart2rtt.start()
else:
raise Exception("Connect or config RTT server failed")
def stop(self):
logger.debug('stop rtt2uart')
if self.timer:
self.timer.cancel()
self.thread_switch = False
if self.rtt2uart:
self.rtt2uart.join(0.5)
if self.uart2rtt:
self.uart2rtt.join(0.5)
if self._connect_inf == 'USB':
try:
if self.jlink.connected() == True:
# 使用完后停止RTT
self.jlink.rtt_stop()
# 释放之前加载的jlinkARM.dll
self.jlink.close()
except pylink.errors.JLinkException:
logger.error('Disconnect target failed', exc_info=True)
pass
try:
if self.serial.isOpen() == True:
self.serial.close()
except:
logger.error('Close serial failed', exc_info=True)
pass
if self.socket:
self.socket.close()
def rtt_to_uart(self):
while self.thread_switch == True:
try:
rtt_recv = self.socket.recv(1024)
if self._connect_para == True and rtt_recv == b'':
logger.debug('telnel server close')
self.thread_switch = False
self.socket.close()
# telnet服务器已经关闭,开启定时查询服务器状态
self.timer = threading.Timer(0.5, self.check_socket_status)
self.timer.start()
except socket.error as msg:
logger.error(msg, exc_info=True)
# probably got disconnected
raise Exception("Jlink rtt read error")
try:
if rtt_recv:
self.serial.write(rtt_recv)
except:
raise Exception("Serial write error")
def uart_to_rtt(self):
while self.thread_switch == True:
try:
data = self.serial.read(self.serial.in_waiting or 1)
if data:
with self._write_lock:
self.socket.sendall(data)
except socket.error as msg:
logger.error(msg, exc_info=True)
# probably got disconnected
raise Exception("Jlink rtt write error")
def doConnect(self, host, port):
socketlocal = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
socketlocal.connect((host, port))
except socket.error:
socketlocal.close()
socketlocal = None
pass
return socketlocal
def check_socket_status(self):
self.timer = threading.Timer(1, self.check_socket_status)
# 连接到RTT Telnet Server
self.socket = self.doConnect('localhost', 19021)
if self.socket:
logger.debug('reconnect success')
self.rtt2uart.join(0.5)
self.uart2rtt.join(0.5)
# 连接成功,重建线程
self.thread_switch = True
self.rtt2uart = threading.Thread(target=self.rtt_to_uart)
self.rtt2uart.setDaemon(True)
self.rtt2uart.name = 'rtt->serial'
self.rtt2uart.start()
self.uart2rtt = threading.Thread(target=self.uart_to_rtt)
self.uart2rtt.setDaemon(True)
self.uart2rtt.name = 'serial->rtt'
self.uart2rtt.start()
self.timer.cancel()
else:
# 连接失败,继续尝试连接
self.timer.start()
logger.debug("try to reconnect")
if __name__ == "__main__":
serial_name = input("请输入虚拟串口对中的串口名字,如COM26:")
if '' == serial_name:
serial_name = 'COM26'
test = rtt_to_serial('AMAPH1KK-KBR', serial_name, 115200)
test.start()