In this project, we're going to create a UDP network communication monitor using Python (PyQt/PySide2). When developing a project with an Arduino, a Raspberry Pi or any other microcontroller, you'll certainly need to create a graphical interface for system management (debugging, observing measurements, launching actions, etc.). There are many tools available for creating graphical interfaces; here we use PyQt (PySide2).
Objective
For this project, we want to create a graphical interface in Python, under Windows, that behaves like a UDP communication monitor. To do this, the interface needs the following elements:
- IP address entry field
- Port input field
- Connection button
- Input field for the command to send
- Send button
- Console displaying received data
Recover the machine’s IP address
On the receiving machine, retrieve the IPv4 address (in our case 192.168.1.67):
ipconfig  # on a Windows machine
or
ip addr  # on a Linux machine
UDP communication monitor application
First of all, we’re going to create the application window, which we’ll call AcApp and which will be the basis of our application.
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""Generic application window: a central frame plus a status bar for debug messages."""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

# Window title used by main(); it was referenced but never defined in this step.
__title__="ACTerminal"


class AcApp(QMainWindow):
    """Main window that hosts one central frame and shows its debug messages.

    Args:
        title: Text displayed in the window title bar.
        mainFrame: Widget *class* (not an instance). It is instantiated
            without arguments and installed as the central widget. When the
            instance exposes a ``debugSignal`` signal carrying a ``str``,
            every emitted message is shown in the status bar.
    """

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # A plain QFrame (the default) has no debugSignal; connecting
        # unconditionally raised AttributeError, so only connect when present.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)

        # Debug bar: statusBar() creates the bar on first access
        self.statusBar().showMessage('Display debug messages')

        self.show()

    def debugMsg(self,val):
        """Show ``val`` (a str) in the status bar."""
        self.statusBar().showMessage(val)


def main():
    """Create the Qt application and the main window, then run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    # Keep a reference so the window is not garbage-collected while running.
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    # NOTE: the former app.quit() call placed here ran before the event loop
    # existed and therefore had no effect; it has been removed.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
Creating UDP communication management widgets
We will then create a class containing all the widgets we need to create and manage the UDP communication monitor (QLineEdit, QTextEdit, QPushButton, etc.). This widget will later be initialised with the class that manages the communication.
# Default values pre-filled in the address / port fields.
HOST = '192.168.1.67'
PORT = 8888


class EthernetInterface(QFrame):
    """Form holding the connection settings, a command line and two consoles.

    Widgets: IP address and port fields, a Connect/Stop button, a command
    field with its send button, a "Sent" log and a "Console" log for received
    data. Debug text is published through ``debugSignal`` (str).

    In this step ``self.model`` is ``None``: the buttons only update the
    widgets. Once a model object is assigned, it is expected to provide
    ``connec(addr, port)``, ``start()``, ``close()``, ``write(msg)``, a
    ``quit_flag`` attribute and a ``debugSignal`` (these are the members used
    below).
    """
    debugSignal=pyqtSignal(str) #define debug signal

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        # Placeholder: the communication model is plugged in later.
        self.model=None
        if self.model is not None:
            self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create all widgets and place them in a grid inside a group box."""
        # Group box used as widget container
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)

        # Validator: four dot-separated numbers, each in 0..255
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Connection settings
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
        # Restrict the port field to the valid port range
        self.baudBox.setValidator(QIntValidator(0, 65535, self))

        # Buttons: each click is reported on debugSignal, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        titlelbl= QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
        self.fields.addWidget(self.button,0,4,1,1)
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)

    def debug(self,msg):
        """Emit ``msg`` on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)

    def clicked(self):
        """Report which button was clicked (or which combo entry was selected)."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text()+ " clicked")
        elif isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        """Toggle the connection: open it on "Connect", close it on "Stop"."""
        self.desc.clear()
        if self.model is None:
            return

        host = self.typeBox.text()
        port_text = self.baudBox.text()
        if self.button.text() == "Connect":
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host,port_text))
            try:
                self.model.connec(host,int(port_text))
            except (ValueError, OverflowError, OSError) as err:
                # Empty/invalid port or bind failure: report it and leave the
                # button on "Connect" instead of pretending to be connected.
                self.desc.setText(">> connection failed: {}".format(err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host,port_text))

    def read(self,msg):
        """Append a received message to the console and scroll to the bottom."""
        # Insert at the end as plain text: avoids re-setting the whole
        # document for every message and never interprets data as rich text.
        self.desc.moveCursor(QTextCursor.End)
        self.desc.insertPlainText(msg+"\n")
        bar = self.desc.verticalScrollBar()
        bar.setValue(bar.maximum())

    def send(self):
        """Log the command typed in the edit field and hand it to the model."""
        text = self.edit.text()
        if text == "":
            return
        self.sent.moveCursor(QTextCursor.End)
        self.sent.insertPlainText(text+"\n")
        if self.model is not None:
            self.model.write(text)
Replace ex = AcApp(__title__) with ex = AcApp(__title__,EthernetInterface) in the main() function
Once our widgets are in place, we’re going to start using them.
Creating a QThread to manage UDP communication
In order not to block the graphical interface when packets are received or sent, we’re going to create a QThread to manage UDP communication.
import socket  # required by EthModel; add it next to the other imports


#Ethernet connection
class EthModel(QThread):
    """Worker thread that receives UDP datagrams and forwards them to the GUI.

    Every received datagram is emitted as ``str(data)`` on ``debugSignal``.
    A message queued with :meth:`write` is sent back to the sender of the
    next datagram received (there is no other destination address).

    Attributes:
        quit_flag: Set to True to ask the worker loop to stop.
        msgToEmit: Pending outgoing message, "" when there is none.
        sock: The bound UDP socket, or None before the first ``connec()``.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    # Seconds recvfrom() may block before the stop flags are re-checked.
    RECV_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""
        # Defined up front so close()/read() are safe before connec().
        self.sock = None
        self._isRunning = False

    def run(self):
        """Thread body: keep reading until a stop is requested, then clean up."""
        while self._isRunning and not self.quit_flag:
            self.read()
        self.close()

    def connec(self,addr='192.168.1.10',port=7):
        """Bind a UDP socket to ``(addr, port)`` and arm the worker loop.

        Call ``start()`` afterwards to begin receiving.

        Raises:
            OSError: If the address/port cannot be bound.
            OverflowError: If ``port`` is outside 0-65535.
        """
        # Stop any previous session first, so that an old worker that is
        # still winding down cannot close the socket created below.
        self.quit_flag = True
        self.close()
        if self.isRunning():
            self.wait(2000)

        # Create a UDP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            #works with esp8266 udp client
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # Closing a socket from another thread is not guaranteed to wake
            # a blocked recvfrom(); a timeout lets read() poll the stop flags.
            sock.settimeout(self.RECV_TIMEOUT)
            sock.bind((addr,port))
        except Exception:
            sock.close()  # do not leak the descriptor when setup fails
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True

    def read(self):
        """Receive datagrams until stopped, a socket error or an empty datagram."""
        while self._isRunning:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet: re-check _isRunning
            except OSError:
                print("socket closed")
                break
            print(data)
            if not data:
                print("no data > break loop")
                break

            # Reply to the sender with the queued message, if any.
            pending = self.msgToEmit
            if pending != "":
                self.msgToEmit="" #clear message
                try:
                    self.sock.sendto(pending.encode('utf-8'),addr)
                except OSError as err:
                    self.debugSignal.emit("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self,msg):
        """Queue ``msg`` (str); it is sent when the next datagram arrives."""
        self.msgToEmit=msg

    def close(self):
        """Stop the read loop and close the socket (safe to call repeatedly)."""
        self._isRunning=False
        if self.sock is not None:
            self.sock.close()
Python UDP client code
To test your interface, you can run a code on the same computer using address 127.0.0.1 (local address). The following code will create a socket and send the value of a counter.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""UDP test client: sends an increasing counter and prints any reply received."""
import socket
import time

HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024
# Seconds to wait for a reply before sending the next counter value.
REPLY_TIMEOUT = 0.1


def exchange(client, counter, server=(HOST, PORT)):
    """Send ``counter`` as text to ``server`` and return the reply, if any.

    Args:
        client: A UDP socket (a receive timeout should be set on it).
        counter: Value to send; it is converted with ``str()`` and UTF-8 encoded.
        server: ``(host, port)`` destination.

    Returns:
        The reply payload as bytes, or None when no reply arrived in time or
        the destination reported that nobody is listening.
    """
    client.sendto(str(counter).encode("utf-8"), server)
    try:
        data, _addr = client.recvfrom(bufferSize)
    except (socket.timeout, ConnectionResetError):
        # The monitor only answers when the user has queued a message, so a
        # missing reply is the normal case and must not block the counter.
        return None
    return data


def main():
    """Send the counter forever, one datagram every 0.1 s plus reply wait."""
    counter = 0
    # One socket for the whole session: the source port stays the same, so a
    # late reply can still be received on a later iteration.
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
        client.settimeout(REPLY_TIMEOUT)
        while True:
            data = exchange(client, counter)
            if data is not None:
                msg = "Message from Server {}".format(data)
                print(msg)
            counter += 1
            time.sleep(0.1)


if __name__ == "__main__":
    main()
ESP8266 UDP client code
The ESP8266 code for UDP communication is quite simple. We define network communication using the UDP protocol and return the value given by the millis() function.
#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

WiFiUDP udp;
char packetBuffer[256];
unsigned int localPort = 9999;
const char *ssid = "******";
const char *password = "******";

// Set your static IP address
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);

// Set destination IP address (the computer running the monitor)
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

// Connect to the WiFi network with a static IP and open the local UDP port.
void setup() {
  Serial.begin(115200);
  if (!WiFi.config(local_IP, gateway, subnet)) {
    Serial.println("Static IP configuration failed");
  }
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  udp.begin(localPort);
  Serial.print(F("UDP Client : "));
  Serial.println(WiFi.localIP());
}

// Print any received packet, then send the current millis() value to the
// destination; one cycle every 500 ms.
void loop() {
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : ");
  Serial.println(udp.remoteIP());
  Serial.print(" Size : ");
  Serial.println(packetSize);
  Serial.print(" Data : ");
  packetBuffer[0] = '\0'; // reset buffer (empty C string)
  if (packetSize) {
    // Keep one byte free for the terminator, then terminate the string so
    // that Serial.print() does not print stale bytes from a previous packet.
    int len = udp.read(packetBuffer, sizeof(packetBuffer) - 1);
    if (len < 0) {
      len = 0;
    }
    packetBuffer[len] = '\0';
    Serial.print(packetBuffer);
    udp.flush();
  }
  Serial.println("\n");
  delay(500);

  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  snprintf(buf, sizeof(buf), "%lu", testID); // bounded formatting
  Serial.print(" Sent : ");
  Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}
N.B.: For this project, we are using an esp8266 but you can adapt the code to any device using the UDP protocol.
Results
To use the Python client code, enter the IP address 127.0.0.1
To test communication with the ESP8266, use the IP address of your computer (here, 192.168.1.67).
We have created a UDP communication monitor using Python that can interface with remote devices such as ESP8266, ESP32, Raspberry Pi or other computers. You can now enhance the interface to suit your needs.
Complete code
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022

@author: X.Wiedmer

AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside
import socket
import time

# App configuration
__title__="ACTerminal"
__version__="v0.1"
HOST = '192.168.1.67'
PORT = 8888


#Ethernet connection
class EthModel(QThread):
    """Worker thread that receives UDP datagrams and forwards them to the GUI.

    Every received datagram is emitted as ``str(data)`` on ``debugSignal``.
    A message queued with :meth:`write` is sent back to the sender of the
    next datagram received (there is no other destination address).

    Attributes:
        quit_flag: Set to True to ask the worker loop to stop.
        msgToEmit: Pending outgoing message, "" when there is none.
        sock: The bound UDP socket, or None before the first ``connec()``.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    # Seconds recvfrom() may block before the stop flags are re-checked.
    RECV_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""
        # Defined up front so close()/read() are safe before connec().
        self.sock = None
        self._isRunning = False

    def run(self):
        """Thread body: keep reading until a stop is requested, then clean up."""
        # run() is overridden without starting an event loop, so the former
        # quit()/exit() calls had nothing to stop and were dropped.
        while self._isRunning and not self.quit_flag:
            self.read()
        self.close()

    def connec(self,addr='192.168.1.10',port=7):
        """Bind a UDP socket to ``(addr, port)`` and arm the worker loop.

        Call ``start()`` afterwards to begin receiving.

        Raises:
            OSError: If the address/port cannot be bound.
            OverflowError: If ``port`` is outside 0-65535.
        """
        # Stop any previous session first, so that an old worker that is
        # still winding down cannot close the socket created below.
        self.quit_flag = True
        self.close()
        if self.isRunning():
            self.wait(2000)

        # Create a UDP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            #works with esp8266 udp client
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # Closing a socket from another thread is not guaranteed to wake
            # a blocked recvfrom(); a timeout lets read() poll the stop flags.
            sock.settimeout(self.RECV_TIMEOUT)
            sock.bind((addr,port))
        except Exception:
            sock.close()  # do not leak the descriptor when setup fails
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True

    def read(self):
        """Receive datagrams until stopped, a socket error or an empty datagram."""
        while self._isRunning:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet: re-check _isRunning
            except OSError:
                print("socket closed")
                break
            print(data)
            if not data:
                print("no data > break loop")
                break

            # Reply to the sender with the queued message, if any.
            pending = self.msgToEmit
            if pending != "":
                self.msgToEmit="" #clear message
                try:
                    self.sock.sendto(pending.encode('utf-8'),addr)
                except OSError as err:
                    self.debugSignal.emit("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self,msg):
        """Queue ``msg`` (str); it is sent when the next datagram arrives."""
        self.msgToEmit=msg

    def close(self):
        """Stop the read loop and close the socket (safe to call repeatedly)."""
        self._isRunning=False
        if self.sock is not None:
            self.sock.close()


#define GUI
class EthernetInterface(QFrame):
    """Form holding the connection settings, a command line and two consoles.

    Widgets: IP address and port fields, a Connect/Stop button, a command
    field with its send button, a "Sent" log and a "Console" log that shows
    every message emitted by the ``EthModel`` worker. Debug text is published
    through ``debugSignal`` (str).
    """
    debugSignal=pyqtSignal(str) #define debug signal

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        # Communication worker; its messages are displayed in the console.
        self.model=EthModel()
        self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create all widgets and place them in a grid inside a group box."""
        # Group box used as widget container
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)

        # Validator: four dot-separated numbers, each in 0..255
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Connection settings
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
        # Restrict the port field to the valid port range
        self.baudBox.setValidator(QIntValidator(0, 65535, self))

        # Buttons: each click is reported on debugSignal, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        titlelbl= QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
        self.fields.addWidget(self.button,0,4,1,1)
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)

    def debug(self,msg):
        """Emit ``msg`` on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)

    def clicked(self):
        """Report which button was clicked (or which combo entry was selected)."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text()+ " clicked")
        elif isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        """Toggle the connection: open it on "Connect", close it on "Stop"."""
        self.desc.clear()
        if self.model is None:
            return

        host = self.typeBox.text()
        port_text = self.baudBox.text()
        if self.button.text() == "Connect":
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host,port_text))
            try:
                self.model.connec(host,int(port_text))
            except (ValueError, OverflowError, OSError) as err:
                # Empty/invalid port or bind failure: report it and leave the
                # button on "Connect" instead of pretending to be connected.
                self.desc.setText(">> connection failed: {}".format(err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host,port_text))

    def read(self,msg):
        """Append a received message to the console and scroll to the bottom."""
        # Insert at the end as plain text: avoids re-setting the whole
        # document for every message and never interprets data as rich text.
        self.desc.moveCursor(QTextCursor.End)
        self.desc.insertPlainText(msg+"\n")
        bar = self.desc.verticalScrollBar()
        bar.setValue(bar.maximum())

    def send(self):
        """Log the command typed in the edit field and hand it to the model."""
        text = self.edit.text()
        if text == "":
            return
        self.sent.moveCursor(QTextCursor.End)
        self.sent.insertPlainText(text+"\n")
        if self.model is not None:
            self.model.write(text)


# Generic app container
class AcApp(QMainWindow):
    """Main window that hosts one central frame and shows its debug messages.

    Args:
        title: Text displayed in the window title bar.
        mainFrame: Widget *class* (not an instance). It is instantiated
            without arguments and installed as the central widget. When the
            instance exposes a ``debugSignal`` signal carrying a ``str``,
            every emitted message is shown in the status bar.
    """

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # A plain QFrame (the default) has no debugSignal; connecting
        # unconditionally raised AttributeError, so only connect when present.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)

        # Debug bar: statusBar() creates the bar on first access
        self.statusBar().showMessage('Display debug messages')

        self.show()

    def debugMsg(self,val):
        """Show ``val`` (a str) in the status bar."""
        self.statusBar().showMessage(val)


def main():
    """Create the Qt application and the monitor window, then run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    #app.setStyleSheet(open("style.txt").read()); #set style according to file
    # Keep a reference so the window is not garbage-collected while running.
    #ex = AcApp(__title__)
    ex = AcApp(__title__,EthernetInterface)
    # NOTE: the former app.quit() call placed here ran before the event loop
    # existed and therefore had no effect; it has been removed.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()