Dans ce projet, nous allons créer un moniteur de communication réseau UDP à l’aide de Python (PyQt). Lors de développement de projet avec Arduino, Raspberry Pi ou n’importe quel microcontrôleur vous serez certainement amené à créer une interface graphique pour la gestion du système (debugger, observer des mesure, lancer des actions, etc.). Ils existent beaucoup d’outils pour créer des interfaces graphiques. Dans ce projet, nous allons créer un moniteur de communication réseau avec PyQt(PySide2).
Objectif
Pour ce projet, nous souhaitons créer une interface graphique en Python, sous Windows, se comportant comme un moniteur de communication UDP. Pour cela, il va nous falloir réaliser les fonctions suivantes
- Zone de saisie de l’adresse IP
- Zone de saisie du port
- Bouton connexion
- Zone d’écriture pour la commande
- Bouton envoyer
- Console affichant les données reçues
Récupérer l’adresse IP de la machine
Sur la machine de réception récupérer l’adresse IPv4 (dans notre cas 192.168.1.67)
ipconfig #sur la machine windows
ou
ip addr #sur machine linux
Application moniteur de communication UDP
Nous allons d’abord créer la fenêtre de l’application que nous appelons AcApp et qui sera la base de notre application.
#!/usr/bin/python3 # -*-coding:Utf-8 -* import sys,os #from PyQt5.QtWidgets import * #from PyQt5.QtCore import * #from PyQt5.QtGui import * from PySide2.QtWidgets import * from PySide2.QtCore import * from PySide2.QtGui import * pyqtSignal=Signal #translate pyqt to Pyside class AcApp(QMainWindow): def __init__(self,title='AcApp',mainFrame=QFrame): super().__init__() self.title=title self.mainFrame=mainFrame() self.initUI() def initUI(self): self.setCentralWidget(self.mainFrame) #connect signals self.mainFrame.debugSignal.connect(self.debugMsg) #General configuration #self.resize(self.width, self.height) self.setWindowTitle(self.title) self.setGeometry(300, 300, 850, 450) #self.setWindowIcon(QIcon(__icon__)) #Debug bar self.statusBar() self.statusBar().showMessage('Display debug messages') self.show() def debugMsg(self,val): self.statusBar().showMessage(val) def main(): app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(True) ex = AcApp(__title__) #ex = AcApp(__title__,EthernetInterface) app.quit() sys.exit(app.exec_()) if __name__ == '__main__': main()
Création des Widgets de gestion de la communication UDP
Nous allons ensuite créer une classe qui contiendra tous les Widgets dont nous avons besoin pour créer et gérer le moniteur de communication UDP (QLineEdit,QTextEdit,QButton, etc.). Ce Widget sera initialisé avec la classe qui va gérer la communication
#self.model=EthModel()
class EthernetInterface(QFrame): debugSignal=pyqtSignal(str) #define debug signal def __init__(self,parent=None): super(EthernetInterface,self).__init__(parent) self.grid=QGridLayout() self.setLayout(self.grid) self.defineWidgets() self.model=None #self.model=EthModel() if self.model is not None: self.model.debugSignal.connect(self.read) def defineWidgets(self): #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;} #QGroupBox::title {padding:1 5px;}""") #grooupbox widget container self.grp=QGroupBox(self) self.grp.setTitle("Connection Configuration") self.fields=QGridLayout() self.grp.setLayout(self.fields) self.grid.addWidget(self.grp,0,0) #Define widget UI #validator ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression # Regulare expression ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$") ipValidator = QRegExpValidator(ipRegex, self) #label self.selectlbl = QLabel("IP Address:") self.typeBox = QLineEdit(HOST) #self.typeBox.setInputMask("0.0.0.0"); self.typeBox.setValidator(ipValidator); self.baudlbl = QLabel("Port:") self.baudBox = QLineEdit("{}".format(PORT)) #btn self.button = QPushButton("Connect") self.button.clicked.connect(self.clicked) self.button.clicked.connect(self.connec) sendBtn = QPushButton("send") sendBtn.clicked.connect(self.clicked) sendBtn.clicked.connect(self.send) titlelbl= QLabel("Enter") self.edit = QLineEdit("") sentlbl=QLabel("Sent") self.sent = QTextEdit("") desclbl=QLabel("Console") self.desc = QTextEdit("") #row, column[, rowSpan=1[, columnSpan=1[ self.fields.addWidget(self.selectlbl,0,0,1,1) self.fields.addWidget(self.typeBox,0,1,1,1) self.fields.addWidget(self.baudlbl,0,2,1,1) self.fields.addWidget(self.baudBox,0,3,1,1) self.fields.addWidget(self.button,0,4,1,1) self.fields.addWidget(titlelbl,1,0,1,1) self.fields.addWidget(self.edit,1,1,1,3) self.fields.addWidget(sendBtn,1,4,1,1) self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.sent,2,1,1,3) self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.desc,3,1,1,3) def debug(self,msg): sender = self.sender() self.debugSignal.emit(sender.__class__.__name__+" : "+msg) def clicked(self): sender = self.sender() if sender.__class__.__name__=="QPushButton": self.debugSignal.emit(sender.text()+ " clicked") if sender.__class__.__name__=="QComboBox": self.debugSignal.emit(sender.currentText()+ " selected") def connec(self): #self.desc.setText("") self.desc.clear() if self.model is not None: if self.button.text() == "Connect": self.desc.setText(">> trying to connect to address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) print("Started") self.button.setText("Stop") #self.model = EthModel() self.model.connec(self.typeBox.text(),int(self.baudBox.text())) self.model.start() else: self.model.quit_flag = True print("Stop sent") self.model.close()#self.model.wait() print("Stopped") self.button.setText("Connect") self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) def read(self,msg): self.desc.setText(self.desc.toPlainText()+msg+"\n") self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum()); def send(self): if self.edit.text() != "": self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n") if self.model is not None: self.model.write(self.edit.text())
Remplacer ex = AcApp(__title__) par ex = AcApp(__title__,EthernetInterface) dans la fonction main()
Une fois nos widgets en place, nous allons pouvoir les exploiter.
Création d’un QThread de gestion de la communication UDP
Afin de ne pas bloquer l’interface graphique lors de la réception ou l’émission de paquets, nous allons créer un QThread qui va permettre la gestion de la communication UDP
#Ethernet connection class EthModel(QThread): """Handle Ethernet connexion with remote device and connect to interface EthInterface""" debugSignal=pyqtSignal(str) #define debug signal def __init__(self): super(EthModel, self).__init__() self.quit_flag = False self.msgToEmit="" def run(self): while True: if not self.quit_flag: self.read() #time.sleep(1) else: self.close() break self.quit() #self.wait() def connec(self,addr='192.168.1.10',port=7): # Create a UDP/IP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock.bind((addr,port)) # Listen for incoming connections #self.sock.listen(1) #stream print('starting up on {} port {}'.format(addr,port)) self.debugSignal.emit('starting up on {} port {}'.format(addr,port)) self.quit_flag = False self._isRunning=True def read(self): while self._isRunning: #data = self.sock.recv(1024) try: data, addr = self.sock.recvfrom(1024) print(data) except: print("socket closed") data=False if not data: print("no data > break loop") break #self.sock.sendto("received OK".encode('utf-8'),addr) if self.msgToEmit!="": self.sock.sendto(self.msgToEmit.encode('utf-8'),addr) self.msgToEmit="" #clear message self.debugSignal.emit(str(data)) def write(self,msg): self.msgToEmit=msg def close(self): self._isRunning=False self.sock.close()
Code Python client UDP
Pour tester votre interface, vous pouvez lancer un code sur le même ordinateur en utilisant l’adresse 127.0.0.1 (adresse locale). Le code suivant va créer un socket et va envoyer la valeur d’un compteur.
#!/usr/bin/env python # -*- coding: utf-8 -*- import socket import time HOST = "127.0.0.1" PORT = 8888 bufferSize = 1024 counter=0 while True: # Create a UDP socket at client side with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client: # Send to server using created UDP socket client.sendto(str.encode(str(counter)), (HOST,PORT)) data,addr= client.recvfrom(bufferSize) msg = "Message from Server {}".format(data) print(msg) counter+=1 time.sleep(0.1)
Code ESP8266 client UDP
Le code ESP8266 pour la communication UDP est assez simple, nous définissons la communication réseau avec le protocole UDP et nous renvoyons la valeur donnée par la fonction millis()
#include <ESP8266WiFi.h> #include <WiFiUdp.h> WiFiUDP udp; char packetBuffer[256]; unsigned int localPort = 9999; const char *ssid = "******"; const char *password = "******"; // Set your Static IP address IPAddress local_IP(192, 168, 1, 80); IPAddress gateway(192, 168, 1, 1); IPAddress subnet(255, 255, 0, 0); // Set destination IP address const char *destaddr = "192.168.1.67"; unsigned int destPort = 8888; void setup() { Serial.begin(115200); WiFi.config(local_IP, gateway, subnet); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } udp.begin(localPort); Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP()); } void loop() { int packetSize = udp.parsePacket(); Serial.print(" Received packet from : "); Serial.println(udp.remoteIP()); Serial.print(" Size : "); Serial.println(packetSize); Serial.print(" Data : "); packetBuffer[0] = '0'; //reset buffer if (packetSize) { int len = udp.read(packetBuffer, 256); Serial.print(packetBuffer); udp.flush(); } Serial.println("\n"); delay(500); Serial.print("[Client Connected] "); Serial.println(WiFi.localIP()); udp.beginPacket(destaddr, destPort); udp.write("Send millis: "); char buf[20]; unsigned long testID = millis(); sprintf(buf, "%lu", testID); Serial.print(" Sent : "); Serial.println(buf); udp.write(buf); udp.write("\r\n"); udp.endPacket(); }
N.B.: Pour ce projet, nous utilisons un esp8266 mais vous pouvez adapter le code pour n’importe quel appareil utilisant le protocole UDP
Résultats
Pour utiliser le code client Python, entrez l’adresse IP 127.0.0.1
Pour tester la communication avec l’ESP8266, utilisez l’adresse IP de votre ordinateur (ici, 192.168.1.67)
Nous avons créer un moniteur de communication UDP à l’aide de Python qui peut permettre de s’interfacer avec des appareils distants comme ESP8266, ESP32, Raspberry Pi ou autres ordinateurs. Vous pouvez à présent améliorer l’interface pour qu’elle corresponde à votre besoin
Code complet
#!/usr/bin/python3 # -*-coding:Utf-8 -* """ Created on Thu Nov 17 16:59:13 2022 @author: X.Wiedmer AC windows Define the application window """ import sys,os #from PyQt5.QtWidgets import * #from PyQt5.QtCore import * #from PyQt5.QtGui import * from PySide2.QtWidgets import * from PySide2.QtCore import * from PySide2.QtGui import * pyqtSignal=Signal #translate pyqt to Pyside import socket import time """ App configuration """ __title__="ACTerminal" __version__="v0.1" HOST = '192.168.1.67' PORT = 8888 #Ethernet connection class EthModel(QThread): """Handle Ethernet connexion with remote device and connect to interface EthInterface""" debugSignal=pyqtSignal(str) #define debug signal def __init__(self): super(EthModel, self).__init__() self.quit_flag = False self.msgToEmit="" def run(self): while True: if not self.quit_flag: self.read() #time.sleep(1) else: self.close() break self.quit() self.exit() #self.wait() def connec(self,addr='192.168.1.10',port=7): # Create a UDP/IP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock.bind((addr,port)) # Listen for incoming connections #self.sock.listen(1) #stream print('starting up on {} port {}'.format(addr,port)) self.debugSignal.emit('starting up on {} port {}'.format(addr,port)) self.quit_flag = False self._isRunning=True def read(self): while self._isRunning: #data = self.sock.recv(1024) try: data, addr = self.sock.recvfrom(1024) print(data) except: print("socket closed") data=False if not data: print("no data > break loop") break #self.sock.sendto("received OK".encode('utf-8'),addr) if self.msgToEmit!="": self.sock.sendto(self.msgToEmit.encode('utf-8'),addr) self.msgToEmit="" #clear message self.debugSignal.emit(str(data)) def write(self,msg): self.msgToEmit=msg def close(self): self._isRunning=False self.sock.close() #define GUI class EthernetInterface(QFrame): debugSignal=pyqtSignal(str) #define debug signal def __init__(self,parent=None): super(EthernetInterface,self).__init__(parent) self.grid=QGridLayout() self.setLayout(self.grid) self.defineWidgets() self.model=EthModel()#self.model=None self.model.debugSignal.connect(self.read) def defineWidgets(self): #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;} #QGroupBox::title {padding:1 5px;}""") #grooupbox widget container self.grp=QGroupBox(self) self.grp.setTitle("Connection Configuration") self.fields=QGridLayout() self.grp.setLayout(self.fields) self.grid.addWidget(self.grp,0,0) #Define widget UI #validator ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression # Regulare expression ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$") ipValidator = QRegExpValidator(ipRegex, self) #label self.selectlbl = QLabel("IP Address:") self.typeBox = QLineEdit(HOST) #self.typeBox.setInputMask("0.0.0.0"); self.typeBox.setValidator(ipValidator); self.baudlbl = QLabel("Port:") self.baudBox = QLineEdit("{}".format(PORT)) #btn self.button = QPushButton("Connect") self.button.clicked.connect(self.clicked) self.button.clicked.connect(self.connec) sendBtn = QPushButton("send") sendBtn.clicked.connect(self.clicked) sendBtn.clicked.connect(self.send) titlelbl= QLabel("Enter") self.edit = QLineEdit("") sentlbl=QLabel("Sent") self.sent = QTextEdit("") desclbl=QLabel("Console") self.desc = QTextEdit("") #row, column[, rowSpan=1[, columnSpan=1[ self.fields.addWidget(self.selectlbl,0,0,1,1) self.fields.addWidget(self.typeBox,0,1,1,1) self.fields.addWidget(self.baudlbl,0,2,1,1) self.fields.addWidget(self.baudBox,0,3,1,1) self.fields.addWidget(self.button,0,4,1,1) self.fields.addWidget(titlelbl,1,0,1,1) self.fields.addWidget(self.edit,1,1,1,3) self.fields.addWidget(sendBtn,1,4,1,1) self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.sent,2,1,1,3) self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.desc,3,1,1,3) def debug(self,msg): sender = self.sender() self.debugSignal.emit(sender.__class__.__name__+" : "+msg) def clicked(self): sender = self.sender() if sender.__class__.__name__=="QPushButton": self.debugSignal.emit(sender.text()+ " clicked") if sender.__class__.__name__=="QComboBox": self.debugSignal.emit(sender.currentText()+ " selected") def connec(self): #self.desc.setText("") self.desc.clear() if self.model is not None: if self.button.text() == "Connect": self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(self.typeBox.text(),self.baudBox.text())) print("Started") self.button.setText("Stop") self.model.connec(self.typeBox.text(),int(self.baudBox.text())) self.model.start() else: self.model.quit_flag = True print("Stop sent") self.model.close()#self.model.wait() print("Stopped") self.button.setText("Connect") self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) def read(self,msg): self.desc.setText(self.desc.toPlainText()+msg+"\n") self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum()); def send(self): if self.edit.text() != "": self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n") if self.model is not None: self.model.write(self.edit.text()) # Generic app container class AcApp(QMainWindow): def __init__(self,title='AcApp',mainFrame=QFrame): super().__init__() self.title=title self.mainFrame=mainFrame() self.initUI() def initUI(self): self.setCentralWidget(self.mainFrame) #connect signals self.mainFrame.debugSignal.connect(self.debugMsg) #General configuration #self.resize(self.width, self.height) self.setWindowTitle(self.title) self.setGeometry(300, 300, 850, 450) #self.setWindowIcon(QIcon(__icon__)) #Debug bar self.statusBar() self.statusBar().showMessage('Display debug messages') self.show() def debugMsg(self,val): self.statusBar().showMessage(val) def main(): app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(True) #app.setStyleSheet(open("style.txt").read()); #set style according to file #ex = AcApp(__title__) ex = AcApp(__title__,EthernetInterface) app.quit() sys.exit(app.exec_()) if __name__ == '__main__': main()