由于猫每次在一个临时测试点此测试一大片服务器的延迟和丢包, 一个个去跑太蛋疼, 于是用PyQt做了这么个小工具来测试各种线路的延迟丢包等信息.
这是我的第二个PyQt作品= =|||
这个工具我已经初步实现了跨平台(在以上截图环境下运行正常), 在编写过程中, 我有如下的收获:
- 不可以在子线程中直接操作UI, 以免引起资源冲突导致Segmentation Fault
- 使用Queue类可以初步实现子线程与UI线程更新界面的通信. Signal方面, 我实例了一个QTimer, 每隔一定时间处理一次消息队列.
- QTableView比QTableWidget效率高得多, 尤其是在Win32平台下. 因此应尽量采用 QTableView + QStandardItemModel 的搭配来做Table
注: 代码中已经内置了一份测试IP列表, 可以根据需要添加/删除. 第一次运行会生成 ips.conf 文件, 以后需要修改IP列表, 只需要编辑此文件.
再注: 这个程序写的确实很丑, 欢迎各种拍砖 = =
下面贴上全部的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# -*- coding: utf-8 -*- import sys import time import subprocess from threading import Thread import re from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject from Queue import Queue #from Ui_main import Ui_MainWindow try: _fromUtf8 = QString.fromUtf8 except AttributeError: _fromUtf8 = lambda s: s class Ui_MainWindow(object): def setupUi(self, MainWindow): MainWindow.setObjectName(_fromUtf8("MainWindow")) MainWindow.resize(500, 435) self.centralWidget = QWidget(MainWindow) self.centralWidget.setObjectName(_fromUtf8("centralWidget")) self.verticalLayout = QVBoxLayout(self.centralWidget) self.verticalLayout.setObjectName(_fromUtf8("verticalLayout")) self.tableView = QTableView(self.centralWidget) self.tableView.setObjectName(_fromUtf8("tableView")) self.verticalLayout.addWidget(self.tableView) MainWindow.setCentralWidget(self.centralWidget) self.retranslateUi(MainWindow) QMetaObject.connectSlotsByName(MainWindow) def retranslateUi(self, MainWindow): MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8)) if sys.platform.startswith('linux'): getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms") pingstr = ["ping", "-n", "-i 0.2"] filtered = "Packet filtered" delaytime = 200 else: getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)") pingstr = ["ping.exe", "-t"] timeout = "Request timed out." delaytime = 500 try: with open("ips.conf", "r") as f: t_node = f.read().decode('utf-8') if not t_node: raise IOError except IOError: with open("ips.conf", "w") as f: t_node = u""" 210.51.176.182-北京联通BGP 202.97.224.68-黑龙江网通 202.99.96.68-天津网通 202.99.160.68-河北网通 202.96.69.38-大连网通 221.208.172.1-哈尔滨网通 202.99.192.68-山西网通 202.99.160.68-河北网通 210.52.149.2-湖北网通 218.30.64.121-湖南电信 202.100.4.15-陕西电信 202.96.199.133-上海电信 219.150.150.150-河南电信 219.146.0.130-黑龙江电信 202.98.96.68-四川电信 59.66.4.50-北京教育网 202.112.26.246-上海教育网 202.114.0.254-武汉教育网 211.161.159.9-武汉长城宽带 143.90.12.170-日本东京odn 202.160.123.5-新加坡Host1Plus 60.199.17.138-台湾固网 220.128.152.1-台湾HINET 168.95.1.1-台湾中华电信 202.65.207.1-香港DYX 202.181.171.1-香港HKCIX 203.169.186.1-香港NTT.HKNET 59.148.193.38-香港HKCTI 220.232.214.1-香港Supernet 59.152.208.1-香港WTT 210.56.48.1-香港HKSUN 202.76.56.1-香港CPCNET 218.213.250.130-香港SNL 69.162.132.1-美国芝加哥Level3 205.185.112.131-美国佛里蒙特HE 204.152.221.1-美国洛杉矶nLayer 178.63.62.208-德国Nuremburg 94.23.202.83-法国OVH 69.163.43.73-美国波特兰HE 216.189.1.14-美国RockHill 69.172.231.1-美国洛杉矶Peer1 184.22.112.34-美国迈阿密HE 199.48.146.37-美国圣琼斯 216.245.208.1-美国达拉斯 109.74.207.9-英国伦敦 """ f.write(t_node.encode('utf-8')) node = [] for line in t_node.split('\n'): try: ip, desc = line.strip().split("-") node.append((ip, desc)) except ValueError: pass nodecount = len(node) class MainWindow(QMainWindow, Ui_MainWindow): """ Class documentation goes here. """ def __init__(self, parent = None): """ Constructor """ QMainWindow.__init__(self, parent) self.setupUi(self) self.model = QStandardItemModel() self.model.setColumnCount(6) self.model.setRowCount(nodecount) self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"]) for i, (ip, desc) in enumerate(node): self.setitem(i, 0, ip) self.setitem(i, 1, desc) self.setitem(i, 2, "") self.setitem(i, 3, "") self.setitem(i, 4, "") self.setitem(i, 5, "") self.tableView.setModel(self.model) for i in range(len(node)): self.tableView.setRowHeight(i, 18) self.resizetable() self.timer = QTimer(self) self.connect(self.timer, SIGNAL("timeout()"), self.checkitems) self.timer.start(delaytime) def checkitems(self): while not q.empty(): item = q.get() self.chgtxt(*item) q.task_done() self.resizetable() def resizetable(self): self.tableView.resizeColumnsToContents() def chgtxt(self, x, y, value): self.model.item(x, y).setText(value) def setitem(self, x, y, value): self.model.setItem(x, y, QStandardItem(value)) app = QApplication(sys.argv) ui = MainWindow() ui.show() q = Queue() def pinger(i, ip, desc): s = "" avgping = 0 count = 0 timeoutcount = 0 ret = subprocess.Popen(pingstr + [ip], stdout=subprocess.PIPE) while True: try: s += ret.stdout.read(1) tryfind = getdata.findall(s) if sys.platform.startswith('linux'): if len(tryfind) > 0: req, ttl, crtping = tryfind[-1] avgping += float(crtping) count += 1 q.put((i, 3, crtping + "ms")) q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms")) q.put((i, 5, ttl)) q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req)))) s = "" elif filtered in s: q.put((i, 2, "Failed")) q.put((i, 3, "Failed")) q.put((i, 4, "Failed")) q.put((i, 5, "Failed")) ret.kill() s = "" else: if len(tryfind) > 0: crtping, ttl = tryfind[-1] avgping += float(crtping) count += 1 q.put((i, 3, crtping + "ms")) q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms")) q.put((i, 5, ttl)) q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount)))) elif timeout in s: timeoutcount += 1 q.put((i, 2, "-")) q.put((i, 3, "-")) if count: q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount)))) else: q.put((i, 5, "-")) s = "" except IOError: print s break def startworkers(): for i, (ip, desc) in enumerate(node): worker = Thread(target=pinger, args=(i, ip, desc)) worker.setDaemon(True) worker.start() time.sleep(delaytime / 10000.0) startthread = Thread(target=startworkers) startthread.setDaemon(True) startthread.start() sys.exit(app.exec_()) |
学习你了代码,借用你的代码在Stackflow求到了如何正常关闭subprocess。
给作者你反馈下,谢谢你的分享
啊, 谢谢 😀
if is Windows cmd -> taskkill /F /IM “PING.EXE”
很不错,就是有些地方没看懂。
一关终端就喷了…… 话说那个列表不能排序啊。
貌似是因为死掉的时候没清 thread。
而且貌似有次死掉之后 ping 都没杀掉……
这个我就不清楚了=.=排序是还不知道怎么做…..
sort=。=
同步问题…界面排序和输出排序….我这个程序结构相当不合理的
怎么喷了= =是不是出了错误提示XD
Exception in thread Thread-7:
Traceback (most recent call last):
File “/usr/lib/python2.6/threading.py”, line 532, in __bootstrap_inner
self.run()
File “/usr/lib/python2.6/threading.py”, line 484, in run
self.__target(*self.__args, **self.__kwargs)
File “multiping.py”, line 154, in pinger
tryfind = getdata.findall(s)
AttributeError: ‘NoneType’ object has no attribute ‘findall’
然后 ping 没杀掉……这个太严重了
没有想到合理的方法去手动杀…求教 0.0
我是小白一个……对 QT 跟 python 的 thread 都不熟……
我也是初学者呢= =||||||
话说这种情况下 ping 是不是应该收到 SIGPIPE 然后挂掉?
Thread 7 == any of those threads.
线程退出前调用一下.kill()保证子进程退出。
酱紫啊..谢谢PT酱!!
哈哈 我蛮喜欢的
第一次在猫窝看到Rex呢= =|||