En este proyecto, vamos a crear un monitor de comunicación de red UDP usando Python (PyQt). Cuando desarrolles un proyecto con Arduino, Raspberry Pi o cualquier otro microcontrolador, seguramente necesitarás crear una interfaz gráfica para gestionar el sistema (depurar, observar medidas, lanzar acciones, etc.). Hay muchas herramientas disponibles para crear interfaces gráficas. En este proyecto, vamos a crear un monitor de comunicación de red utilizando PyQt(PySide2).
Objetivo
Para este proyecto, queremos crear una interfaz gráfica en Python para Windows que se comporte como un monitor de comunicaciones UDP. Para ello, necesitaremos realizar las siguientes funciones
- Casilla de introducción de la dirección IP
- Campo de entrada del puerto
- Botón de conexión
- Zona de escritura de comandos
- Botón Enviar
- Consola que muestra los datos recibidos
Recuperar la dirección IP de la máquina
En la máquina receptora, recupere la dirección IPv4 (en nuestro caso 192.168.1.67)
ipconfig  # en la máquina Windows
o
ip addr  # en la máquina Linux
Aplicación de monitorización de comunicaciones UDP
En primer lugar, vamos a crear la ventana de la aplicación, que llamaremos AcApp y que será la base de nuestra aplicación.
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""Generic application window used as the base of the UDP monitor."""
import os
import sys

# PyQt5 can be used instead of PySide2 by swapping the three imports below
# for their PyQt5 equivalents (PyQt5.QtWidgets, PyQt5.QtCore, PyQt5.QtGui).
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *

pyqtSignal = Signal  # translate pyqt to Pyside

# Window title; the snippet is run on its own, so it must be defined here.
__title__ = "ACTerminal"


class AcApp(QMainWindow):
    """Main window hosting a central frame and a status bar for debug text.

    Parameters
    ----------
    title : str
        Text shown in the window title bar.
    mainFrame : type
        Widget class (not an instance) instantiated with no arguments and
        used as the central widget. If it exposes a ``debugSignal`` signal,
        that signal is routed to the status bar.
    """

    def __init__(self, title='AcApp', mainFrame=QFrame):
        super().__init__()
        self.title = title
        self.mainFrame = mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire its debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # Connect signals. A plain QFrame (the default) has no debugSignal,
        # so only connect when the frame actually provides one.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)

        # Debug bar
        self.statusBar().showMessage('Display debug messages')
        self.show()

    def debugMsg(self, val):
        """Display *val* in the status bar."""
        self.statusBar().showMessage(val)


def main():
    """Create the Qt application, show the window and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    # Keep a reference so the window is not garbage-collected.
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    # NOTE: the former app.quit() call was dropped; quitting before the event
    # loop is started has no effect.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
Creación de widgets de gestión de comunicaciones UDP
Crearemos entonces una clase que contenga todos los widgets que necesitamos para crear y gestionar el monitor de comunicación UDP (QLineEdit, QTextEdit, QPushButton, etc.). Este widget se inicializará con la clase que gestionará la comunicación.
# Default connection settings shown in the form when the window opens.
HOST = '192.168.1.67'
PORT = 8888


class EthernetInterface(QFrame):
    """Form used to configure, start/stop and observe a UDP exchange.

    The widget holds an address field, a port field, a Connect/Stop button,
    a line to type a message, a "Sent" history and a "Console" showing what
    was received. Communication itself is delegated to ``self.model``.
    """

    debugSignal = pyqtSignal(str)  # define debug signal

    def __init__(self, parent=None):
        super(EthernetInterface, self).__init__(parent)
        self.grid = QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        # No communication model yet at this stage of the tutorial; when one
        # is assigned, its debugSignal feeds the console.
        self.model = None
        if self.model is not None:
            self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create every widget and place it in the group-box grid."""
        # Group box widget container
        self.grp = QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields = QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp, 0, 0)

        # Validator restricting the address field to a dotted IPv4 address
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"  # one octet
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Address and port fields
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)

        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        # Buttons: each click is reported to the status bar, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)

        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        titlelbl = QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl = QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl = QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl, 0, 0, 1, 1)
        self.fields.addWidget(self.typeBox, 0, 1, 1, 1)
        self.fields.addWidget(self.baudlbl, 0, 2, 1, 1)
        self.fields.addWidget(self.baudBox, 0, 3, 1, 1)
        self.fields.addWidget(self.button, 0, 4, 1, 1)

        self.fields.addWidget(titlelbl, 1, 0, 1, 1)
        self.fields.addWidget(self.edit, 1, 1, 1, 3)
        self.fields.addWidget(sendBtn, 1, 4, 1, 1)

        self.fields.addWidget(sentlbl, 2, 0, 1, 1, Qt.AlignTop)
        self.fields.addWidget(self.sent, 2, 1, 1, 3)
        self.fields.addWidget(desclbl, 3, 0, 1, 1, Qt.AlignTop)
        self.fields.addWidget(self.desc, 3, 1, 1, 3)

    def debug(self, msg):
        """Emit *msg* on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__ + " : " + msg)

    def clicked(self):
        """Report which button (or combo box entry) triggered the slot."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text() + " clicked")
        if isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText() + " selected")

    def connec(self):
        """Toggle the connection: open the socket and start, or stop and close."""
        self.desc.clear()
        if self.model is None:
            return

        address = self.typeBox.text()
        port_text = self.baudBox.text()

        if self.button.text() == "Connect":
            # Reject a non-numeric or out-of-range port before touching the socket.
            try:
                port = int(port_text)
            except ValueError:
                port = -1
            if not 0 <= port <= 65535:
                self.desc.setText(">> invalid port '{}': expected an integer between 0 and 65535".format(port_text))
                return

            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(address, port_text))
            try:
                self.model.connec(address, port)
            except OSError as err:
                # Binding failed: stay in the "Connect" state and say why.
                self.desc.setText(">> unable to open {} on port {} ({})".format(address, port_text, err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(address, port_text))

    def _appendLine(self, widget, text):
        """Append *text* plus a newline to *widget* without re-rendering its content."""
        widget.moveCursor(QTextCursor.End)
        widget.insertPlainText(text + "\n")

    def read(self, msg):
        """Append a received message to the console and scroll to the bottom."""
        self._appendLine(self.desc, msg)
        scrollbar = self.desc.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    def send(self):
        """Log the typed message in "Sent" and hand it to the model, if any."""
        message = self.edit.text()
        if message != "":
            self._appendLine(self.sent, message)
            if self.model is not None:
                self.model.write(message)
Sustituya ex = AcApp(__title__) por ex = AcApp(__title__,EthernetInterface) en la función main()
Una vez colocados nuestros widgets, vamos a empezar a utilizarlos.
Creación de un QThread para gestionar la comunicación UDP
Para no bloquear la interfaz gráfica cuando se reciben o envían paquetes, vamos a crear un QThread para gestionar la comunicación UDP.
# Ethernet connection
import socket  # needed by EthModel; harmless if already imported


class EthModel(QThread):
    """Handle Ethernet connexion with remote device and connect to interface EthInterface.

    The thread binds a UDP socket, emits every received datagram on
    ``debugSignal`` and, when a message has been queued with ``write``,
    sends it back to the sender of the datagram just received.
    """

    debugSignal = pyqtSignal(str)  # define debug signal

    # Seconds recvfrom may block before the stop flags are checked again.
    RECV_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit = ""
        # Defined up front so close()/start() are safe before connec().
        self.sock = None
        self._isRunning = False

    def run(self):
        """Thread body: keep reading until a stop is requested, then close."""
        while not self.quit_flag and self._isRunning:
            self.read()
        self.close()

    def connec(self, addr='192.168.1.10', port=7):
        """Create the UDP socket and bind it to (*addr*, *port*).

        Raises OSError when the address cannot be bound; the socket is
        closed again in that case.
        """
        if self.sock is not None:
            self.sock.close()  # drop a socket left over from a previous session

        # Create a UDP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((addr, port))
            # A timeout lets the reading loop notice a stop request even
            # when no datagram arrives.
            sock.settimeout(self.RECV_TIMEOUT)
        except OSError:
            sock.close()
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr, port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr, port))
        self.quit_flag = False
        self._isRunning = True

    def read(self):
        """Receive datagrams until stopped, an empty datagram or a socket error."""
        while self._isRunning:
            sock = self.sock
            if sock is None:
                break
            try:
                data, addr = sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet; re-check the stop flag
            except OSError as err:
                if self._isRunning:
                    print("socket error: {}".format(err))
                    QThread.msleep(100)  # avoid spinning on a persistent error
                else:
                    print("socket closed")
                break
            print(data)

            if not data:
                print("no data > break loop")
                break

            # Reply with the queued message, if any, to whoever just wrote to us.
            pending = self.msgToEmit
            if pending != "":
                self.msgToEmit = ""  # clear message
                try:
                    sock.sendto(pending.encode('utf-8'), addr)
                except OSError as err:
                    print("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self, msg):
        """Queue *msg*; it is sent when the next datagram is received."""
        self.msgToEmit = msg

    def close(self):
        """Stop the reading loop and close the socket (safe to call repeatedly)."""
        self._isRunning = False
        if self.sock is not None:
            self.sock.close()
Cliente UDP Código Python
Para probar su interfaz, puede ejecutar un código en el mismo ordenador utilizando la dirección 127.0.0.1 (dirección local). El siguiente código creará un socket y enviará el valor de un contador.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""UDP test client: sends an increasing counter and prints any reply."""
import socket
import time

HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024


def run_client(host=HOST, port=PORT, count=None, interval=0.1, reply_timeout=0.1):
    """Send the counter values 0, 1, 2, ... to (*host*, *port*) over UDP.

    After each datagram the client waits at most *reply_timeout* seconds for
    an answer and prints it. The server is not required to answer, so a
    missing reply must not stall the counter.

    Parameters
    ----------
    host, port : destination of the datagrams.
    count : number of datagrams to send; ``None`` means run forever.
    interval : pause in seconds between two datagrams.
    reply_timeout : maximum time in seconds to wait for a reply.

    Returns the number of datagrams sent.
    """
    counter = 0
    # Create a single UDP socket at client side and reuse it for every packet
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
        client.settimeout(reply_timeout)
        while count is None or counter < count:
            # Send to server using created UDP socket
            client.sendto(str(counter).encode(), (host, port))
            try:
                data, addr = client.recvfrom(bufferSize)
            except (socket.timeout, ConnectionResetError):
                # No reply (or, on Windows, nobody listening yet): keep counting.
                pass
            else:
                msg = "Message from Server {}".format(data)
                print(msg)
            counter += 1
            time.sleep(interval)
    return counter


if __name__ == "__main__":
    run_client()
Código ESP8266 Cliente UDP
El código del ESP8266 para la comunicación UDP es bastante sencillo. Definimos la comunicación de red utilizando el protocolo UDP y devolvemos el valor dado por la función millis().
#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

WiFiUDP udp;
char packetBuffer[256];           // receive buffer, always kept NUL-terminated
unsigned int localPort = 9999;    // UDP port this board listens on
const char *ssid = "******";
const char *password = "******";

// Set your Static IP address
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);  // NOTE(review): confirm this mask matches your network

// Set destination IP address
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

// Join the WiFi network with the static address and open the UDP port.
void setup() {
  Serial.begin(115200);
  WiFi.config(local_IP, gateway, subnet);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  udp.begin(localPort);
  Serial.print(F("UDP Client : "));
  Serial.println(WiFi.localIP());
}

// Print any pending incoming packet, then send the current millis() value.
void loop() {
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : ");
  Serial.println(udp.remoteIP());
  Serial.print(" Size : ");
  Serial.println(packetSize);
  Serial.print(" Data : ");
  packetBuffer[0] = '\0';  // reset buffer to an empty string
  if (packetSize) {
    // Leave one byte free so the payload can be terminated before printing.
    int len = udp.read(packetBuffer, sizeof(packetBuffer) - 1);
    if (len < 0) {
      len = 0;
    }
    packetBuffer[len] = '\0';
    Serial.print(packetBuffer);
    udp.flush();
  }
  Serial.println("\n");
  delay(500);

  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  snprintf(buf, sizeof(buf), "%lu", testID);  // bounded formatting
  Serial.print(" Sent : ");
  Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}
N.B.: Para este proyecto, estamos utilizando un esp8266, pero puedes adaptar el código a cualquier dispositivo que utilice el protocolo UDP.
Resultados
Para utilizar el código de cliente Python, introduzca la dirección IP 127.0.0.1
Para probar la comunicación con el ESP8266, utilice la dirección IP de su ordenador (aquí, 192.168.1.67).
Hemos creado un monitor de comunicación UDP usando Python que puede interactuar con dispositivos remotos como ESP8266, ESP32, Raspberry Pi u otros ordenadores. Ahora puedes mejorar la interfaz para adaptarla a tus necesidades.
Código completo
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022

@author: X.Wiedmer

AC windows
Define the application window
"""
import os
import socket
import sys
import time

# PyQt5 can be used instead of PySide2 by swapping the three imports below
# for their PyQt5 equivalents (PyQt5.QtWidgets, PyQt5.QtCore, PyQt5.QtGui).
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *

pyqtSignal = Signal  # translate pyqt to Pyside

# App configuration
__title__ = "ACTerminal"
__version__ = "v0.1"

HOST = '192.168.1.67'
PORT = 8888


# Ethernet connection
class EthModel(QThread):
    """Handle Ethernet connexion with remote device and connect to interface EthInterface.

    The thread binds a UDP socket, emits every received datagram on
    ``debugSignal`` and, when a message has been queued with ``write``,
    sends it back to the sender of the datagram just received.
    """

    debugSignal = pyqtSignal(str)  # define debug signal

    # Seconds recvfrom may block before the stop flags are checked again.
    RECV_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit = ""
        # Defined up front so close()/start() are safe before connec().
        self.sock = None
        self._isRunning = False

    def run(self):
        """Thread body: keep reading until a stop is requested, then close."""
        while not self.quit_flag and self._isRunning:
            self.read()
        self.close()

    def connec(self, addr='192.168.1.10', port=7):
        """Create the UDP socket and bind it to (*addr*, *port*).

        Raises OSError when the address cannot be bound; the socket is
        closed again in that case.
        """
        if self.sock is not None:
            self.sock.close()  # drop a socket left over from a previous session

        # Create a UDP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((addr, port))
            # A timeout lets the reading loop notice a stop request even
            # when no datagram arrives.
            sock.settimeout(self.RECV_TIMEOUT)
        except OSError:
            sock.close()
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr, port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr, port))
        self.quit_flag = False
        self._isRunning = True

    def read(self):
        """Receive datagrams until stopped, an empty datagram or a socket error."""
        while self._isRunning:
            sock = self.sock
            if sock is None:
                break
            try:
                data, addr = sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet; re-check the stop flag
            except OSError as err:
                if self._isRunning:
                    print("socket error: {}".format(err))
                    QThread.msleep(100)  # avoid spinning on a persistent error
                else:
                    print("socket closed")
                break
            print(data)

            if not data:
                print("no data > break loop")
                break

            # Reply with the queued message, if any, to whoever just wrote to us.
            pending = self.msgToEmit
            if pending != "":
                self.msgToEmit = ""  # clear message
                try:
                    sock.sendto(pending.encode('utf-8'), addr)
                except OSError as err:
                    print("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self, msg):
        """Queue *msg*; it is sent when the next datagram is received."""
        self.msgToEmit = msg

    def close(self):
        """Stop the reading loop and close the socket (safe to call repeatedly)."""
        self._isRunning = False
        if self.sock is not None:
            self.sock.close()


# define GUI
class EthernetInterface(QFrame):
    """Form used to configure, start/stop and observe a UDP exchange.

    The widget holds an address field, a port field, a Connect/Stop button,
    a line to type a message, a "Sent" history and a "Console" showing what
    was received. Communication itself is delegated to an ``EthModel``.
    """

    debugSignal = pyqtSignal(str)  # define debug signal

    def __init__(self, parent=None):
        super(EthernetInterface, self).__init__(parent)
        self.grid = QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        # Every message emitted by the model is appended to the console.
        self.model = EthModel()
        self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create every widget and place it in the group-box grid."""
        # Group box widget container
        self.grp = QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields = QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp, 0, 0)

        # Validator restricting the address field to a dotted IPv4 address
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"  # one octet
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Address and port fields
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)

        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        # Buttons: each click is reported to the status bar, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)

        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        titlelbl = QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl = QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl = QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl, 0, 0, 1, 1)
        self.fields.addWidget(self.typeBox, 0, 1, 1, 1)
        self.fields.addWidget(self.baudlbl, 0, 2, 1, 1)
        self.fields.addWidget(self.baudBox, 0, 3, 1, 1)
        self.fields.addWidget(self.button, 0, 4, 1, 1)

        self.fields.addWidget(titlelbl, 1, 0, 1, 1)
        self.fields.addWidget(self.edit, 1, 1, 1, 3)
        self.fields.addWidget(sendBtn, 1, 4, 1, 1)

        self.fields.addWidget(sentlbl, 2, 0, 1, 1, Qt.AlignTop)
        self.fields.addWidget(self.sent, 2, 1, 1, 3)
        self.fields.addWidget(desclbl, 3, 0, 1, 1, Qt.AlignTop)
        self.fields.addWidget(self.desc, 3, 1, 1, 3)

    def debug(self, msg):
        """Emit *msg* on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__ + " : " + msg)

    def clicked(self):
        """Report which button (or combo box entry) triggered the slot."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text() + " clicked")
        if isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText() + " selected")

    def connec(self):
        """Toggle the connection: open the socket and start, or stop and close."""
        self.desc.clear()
        if self.model is None:
            return

        address = self.typeBox.text()
        port_text = self.baudBox.text()

        if self.button.text() == "Connect":
            # Reject a non-numeric or out-of-range port before touching the socket.
            try:
                port = int(port_text)
            except ValueError:
                port = -1
            if not 0 <= port <= 65535:
                self.desc.setText(">> invalid port '{}': expected an integer between 0 and 65535".format(port_text))
                return

            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(address, port_text))
            try:
                self.model.connec(address, port)
            except OSError as err:
                # Binding failed: stay in the "Connect" state and say why.
                self.desc.setText(">> unable to open {} on port {} ({})".format(address, port_text, err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(address, port_text))

    def _appendLine(self, widget, text):
        """Append *text* plus a newline to *widget* without re-rendering its content."""
        widget.moveCursor(QTextCursor.End)
        widget.insertPlainText(text + "\n")

    def read(self, msg):
        """Append a received message to the console and scroll to the bottom."""
        self._appendLine(self.desc, msg)
        scrollbar = self.desc.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    def send(self):
        """Log the typed message in "Sent" and hand it to the model, if any."""
        message = self.edit.text()
        if message != "":
            self._appendLine(self.sent, message)
            if self.model is not None:
                self.model.write(message)


# Generic app container
class AcApp(QMainWindow):
    """Main window hosting a central frame and a status bar for debug text.

    Parameters
    ----------
    title : str
        Text shown in the window title bar.
    mainFrame : type
        Widget class (not an instance) instantiated with no arguments and
        used as the central widget. If it exposes a ``debugSignal`` signal,
        that signal is routed to the status bar.
    """

    def __init__(self, title='AcApp', mainFrame=QFrame):
        super().__init__()
        self.title = title
        self.mainFrame = mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire its debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # Connect signals. A plain QFrame (the default) has no debugSignal,
        # so only connect when the frame actually provides one.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)

        # Debug bar
        self.statusBar().showMessage('Display debug messages')
        self.show()

    def debugMsg(self, val):
        """Display *val* in the status bar."""
        self.statusBar().showMessage(val)


def main():
    """Create the Qt application, show the monitor and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    # Keep a reference so the window is not garbage-collected.
    ex = AcApp(__title__, EthernetInterface)
    # NOTE: the former app.quit() call was dropped; quitting before the event
    # loop is started has no effect.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()