In this project, we’re going to create a UDP network communication monitor using Python (PyQt). When developing a project with Arduino, Raspberry Pi or any other microcontroller, you’ll certainly need to create a graphical interface for system management (debugging, observing measurements, launching actions, etc.). There are many tools available for creating graphical interfaces. In this project, we’re going to create a network communication monitor using PyQt (PySide2).
Objective
For this project, we want to create a graphical interface in Python, under Windows, that behaves like a UDP communication monitor. To do this, the interface needs to provide the following elements:
- IP address entry field
- Port input field
- Connection button
- Writing area for order
- Send button
- Console displaying received data
Recover the machine’s IP address
On the receiving machine, retrieve the IPv4 address (in our case 192.168.1.67)
ipconfig #on a Windows machine
or
ip addr #on a Linux machine

UDP communication monitor application
First of all, we’re going to create the application window, which we’ll call AcApp and which will be the basis of our application.
#!/usr/bin/python3
# -*-coding:Utf-8 -*
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside
class AcApp(QMainWindow):
    """Generic main window hosting one central frame and a status bar.

    Args:
        title: Text shown in the window title bar.
        mainFrame: Widget class (not an instance). It is instantiated with
            no arguments and installed as the central widget. When the
            instance exposes a ``debugSignal`` signal, the strings it emits
            are displayed in the status bar.
    """

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)
        #connect signals
        # The default QFrame has no debugSignal; connecting unconditionally
        # raised AttributeError whenever no custom frame class was supplied.
        debugSignal=getattr(self.mainFrame,'debugSignal',None)
        if debugSignal is not None:
            debugSignal.connect(self.debugMsg)
        #General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #Debug bar (statusBar() creates the bar on first access)
        self.statusBar().showMessage('Display debug messages')
        self.show()

    def debugMsg(self,val):
        """Show ``val`` in the status bar."""
        self.statusBar().showMessage(val)
def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    # The original called app.quit() here, before the event loop was started.
    # It was removed because it read as if the application exited immediately.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

Creating UDP communication management widgets
We will then create a class containing all the widgets we need to create and manage the UDP communication monitor (QLineEdit, QTextEdit, QPushButton, etc.). This widget will later be initialised with the class that manages the communication.
class EthernetInterface(QFrame):
    """Widget panel of the UDP monitor: address/port fields, buttons, consoles.

    ``debugSignal`` carries short status strings (e.g. which button was
    clicked). ``self.model`` is the communication object; while it is
    ``None`` the panel only updates its own widgets.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        # Fixed: the layout class and setter were misspelled
        # (QGridLayort / setLayort), which failed at construction time.
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        # No communication model attached yet.
        self.model=None
        if self.model is not None:
            self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create every child widget and place it in the group-box grid."""
        #group box widget container (fixed: was misspelled QGrorpBox)
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)

        #validator: four dot-separated fields, each matched by ipRange
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        #connection row
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        #buttons: each click is reported on debugSignal, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        #message entry, sent history and receive console
        titlelbl= QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")

        #addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
        self.fields.addWidget(self.button,0,4,1,1)
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)

    def debug(self,msg):
        """Emit ``msg`` on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)

    def clicked(self):
        """Report on debugSignal which button (or combo-box entry) was activated."""
        sender = self.sender()
        if isinstance(sender,QPushButton):
            self.debugSignal.emit(sender.text()+ " clicked")
        elif isinstance(sender,QComboBox):
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        """Toggle the connection: open it when idle, close it when running."""
        self.desc.clear()
        if self.model is None:
            return
        host=self.typeBox.text()
        port=self.baudBox.text()
        if self.button.text() == "Connect":
            # Trailing newline keeps the model's first message on its own line.
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host,port))
            try:
                self.model.connec(host,int(port))
            except (ValueError,OverflowError,OSError) as err:
                # Bad port text or bind failure: stay in the "Connect" state
                # instead of showing "Stop" for a connection that never opened.
                self.read(">> connection failed: {}".format(err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host,port))

    def read(self,msg):
        """Append ``msg`` to the console and scroll to the bottom."""
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        scrollBar=self.desc.verticalScrollBar()
        scrollBar.setValue(scrollBar.maximum())

    def send(self):
        """Log the entered text in the 'Sent' area and hand it to the model."""
        text=self.edit.text()
        if text != "":
            self.sent.setText(self.sent.toPlainText()+text+"\n")
            if self.model is not None:
                self.model.write(text)
Replace ex = AcApp(__title__) with ex = AcApp(__title__,EthernetInterface) in the main() function

Once our widgets are in place, we’re going to start using them.
Creating a QThread to manage UDP communication
In order not to block the graphical interface when packets are received or sent, we’re going to create a QThread to manage UDP communication.
#Ethernet connection
class EthModel(QThread):
    """Receive UDP datagrams in a worker thread and report them on debugSignal.

    Usage as seen from the interface: call ``connec()`` to bind the socket,
    ``start()`` to run the receive loop, then set ``quit_flag`` and call
    ``close()`` to stop. A message queued with ``write()`` is sent back to
    the sender of the next received datagram.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    # Seconds recvfrom() may block before the loop re-checks the stop flags.
    RECV_TIMEOUT=0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""
        # Defined up front so close()/read() are safe before connec().
        self.sock=None
        self._isRunning=False

    def run(self):
        """Thread body: keep reading until a stop is requested, then clean up."""
        # Also stop when close() was called without quit_flag; the original
        # loop would spin on read() returning immediately in that case.
        while self._isRunning and not self.quit_flag:
            self.read()
        self.close()
        self.quit()

    def connec(self,addr='192.168.1.10',port=7):
        """Create the UDP socket and bind it to ``(addr, port)``.

        Raises:
            OSError: if the socket cannot be bound (propagated to the caller).
        """
        # Create a UDP/IP socket (works with the ESP8266 UDP client)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # A timeout lets read() notice a stop request even when closing
            # the socket from another thread does not wake recvfrom().
            sock.settimeout(self.RECV_TIMEOUT)
            sock.bind((addr,port))
        except Exception:
            sock.close() # do not leak the descriptor when bind fails
            raise
        self.sock=sock
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True

    def read(self):
        """Receive datagrams until stopped, an empty datagram or a socket error."""
        while self._isRunning and not self.quit_flag:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue # nothing received; re-check the stop flags
            except OSError:
                print("socket closed")
                break
            print(data)
            if not data:
                print("no data > break loop")
                break
            reply=self.msgToEmit
            if reply!="":
                self.msgToEmit="" #clear message
                try:
                    self.sock.sendto(reply.encode('utf-8'),addr)
                except OSError as err:
                    self.debugSignal.emit("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self,msg):
        """Queue ``msg``; it is sent in reply to the next received datagram."""
        self.msgToEmit=msg

    def close(self):
        """Stop the receive loop and close the socket, if one was opened."""
        self._isRunning=False
        if self.sock is not None:
            self.sock.close()
Python UDP client code
To test your interface, you can run a code on the same computer using address 127.0.0.1 (local address). The following code will create a socket and send the value of a counter.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import time
HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024
# Seconds to wait for a reply. The monitor only answers when the user has
# queued a message, so most datagrams get no reply at all.
REPLY_TIMEOUT = 0.5
counter=0
while True:
    # Create a UDP socket at client side
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
        client.settimeout(REPLY_TIMEOUT)
        # Send to server using created UDP socket
        client.sendto(str(counter).encode(), (HOST,PORT))
        try:
            data,addr= client.recvfrom(bufferSize)
        except socket.timeout:
            # No reply: keep counting instead of blocking forever.
            pass
        except ConnectionResetError:
            # Some platforms report an unreachable port on the next receive.
            print("Server not reachable on {}:{}".format(HOST,PORT))
        else:
            msg = "Message from Server {}".format(data)
            print(msg)
    counter+=1
    time.sleep(0.1)
Code ESP8266 UDP client
The ESP8266 code for UDP communication is quite simple. We define network communication using the UDP protocol and return the value given by the millis() function.
#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

// UDP client sketch: prints any received packet on the serial port and
// sends the current millis() value to the destination every loop.

WiFiUDP udp;
char packetBuffer[256];
unsigned int localPort = 9999;
const char *ssid = "******";
const char *password = "******";

// Set your Static IP address
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);

// Set destination IP address
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

void setup() {
  Serial.begin(115200);
  WiFi.config(local_IP, gateway, subnet);
  WiFi.begin(ssid, password);
  // Wait until the WiFi connection is established
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  udp.begin(localPort);
  Serial.print(F("UDP Client : "));
  Serial.println(WiFi.localIP());
}

void loop() {
  // Report the incoming packet, if any
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : ");
  Serial.println(udp.remoteIP());
  Serial.print(" Size : ");
  Serial.println(packetSize);
  Serial.print(" Data : ");
  packetBuffer[0] = '\0'; // reset buffer (was the character '0')
  if (packetSize) {
    // Leave room for the terminator and NUL-terminate what was read, so
    // Serial.print() never runs past the received data.
    int len = udp.read(packetBuffer, sizeof(packetBuffer) - 1);
    if (len < 0) {
      len = 0;
    }
    packetBuffer[len] = '\0';
    Serial.print(packetBuffer);
    udp.flush();
  }
  Serial.println("\n");
  delay(500);

  // Send the current uptime to the destination
  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  snprintf(buf, sizeof(buf), "%lu", testID);
  Serial.print(" Sent : ");
  Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}
N.B.: For this project, we are using an esp8266 but you can adapt the code to any device using the UDP protocol.
Results
To use the Python client code, enter the IP address 127.0.0.1
To test communication with the ESP8266, use the IP address of your computer (here, 192.168.1.67).


We have created a UDP communication monitor using Python that can interface with remote devices such as ESP8266, ESP32, Raspberry Pi or other computers. You can now enhance the interface to suit your needs.
Complete code
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022
@author: X.Wiedmer
AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside
import socket
import time
"""
App configuration
"""
# Window title passed to AcApp in main().
__title__="ACTerminal"
# Application version string (not referenced elsewhere in the visible code).
__version__="v0.1"
# Default IP address pre-filled in the "IP Address" field of the interface.
HOST = '192.168.1.67'
# Default UDP port pre-filled in the "Port" field of the interface.
PORT = 8888
#Ethernet connection
class EthModel(QThread):
    """Receive UDP datagrams in a worker thread and report them on debugSignal.

    Usage as seen from the interface: call ``connec()`` to bind the socket,
    ``start()`` to run the receive loop, then set ``quit_flag`` and call
    ``close()`` to stop. A message queued with ``write()`` is sent back to
    the sender of the next received datagram.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    # Seconds recvfrom() may block before the loop re-checks the stop flags.
    RECV_TIMEOUT=0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""
        # Defined up front so close()/read() are safe before connec().
        self.sock=None
        self._isRunning=False

    def run(self):
        """Thread body: keep reading until a stop is requested, then clean up."""
        # Also stop when close() was called without quit_flag; the original
        # loop would spin on read() returning immediately in that case.
        while self._isRunning and not self.quit_flag:
            self.read()
        self.close()
        # quit() is exit(0), so the original's extra exit() call was redundant.
        self.quit()

    def connec(self,addr='192.168.1.10',port=7):
        """Create the UDP socket and bind it to ``(addr, port)``.

        Raises:
            OSError: if the socket cannot be bound (propagated to the caller).
        """
        # Create a UDP/IP socket (works with the ESP8266 UDP client)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # A timeout lets read() notice a stop request even when closing
            # the socket from another thread does not wake recvfrom().
            sock.settimeout(self.RECV_TIMEOUT)
            sock.bind((addr,port))
        except Exception:
            sock.close() # do not leak the descriptor when bind fails
            raise
        self.sock=sock
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True

    def read(self):
        """Receive datagrams until stopped, an empty datagram or a socket error."""
        while self._isRunning and not self.quit_flag:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue # nothing received; re-check the stop flags
            except OSError:
                print("socket closed")
                break
            print(data)
            if not data:
                print("no data > break loop")
                break
            reply=self.msgToEmit
            if reply!="":
                self.msgToEmit="" #clear message
                try:
                    self.sock.sendto(reply.encode('utf-8'),addr)
                except OSError as err:
                    self.debugSignal.emit("send failed: {}".format(err))
            self.debugSignal.emit(str(data))

    def write(self,msg):
        """Queue ``msg``; it is sent in reply to the next received datagram."""
        self.msgToEmit=msg

    def close(self):
        """Stop the receive loop and close the socket, if one was opened."""
        self._isRunning=False
        if self.sock is not None:
            self.sock.close()
#define GUI
class EthernetInterface(QFrame):
    """Widget panel of the UDP monitor: address/port fields, buttons, consoles.

    ``debugSignal`` carries short status strings (e.g. which button was
    clicked). ``self.model`` is the EthModel instance whose ``debugSignal``
    messages are appended to the console.
    """
    debugSignal=pyqtSignal(str) #define debug signal

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        # Fixed: the layout class and setter were misspelled
        # (QGridLayort / setLayort), which failed at construction time.
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        self.model=EthModel()
        self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create every child widget and place it in the group-box grid."""
        #group box widget container (fixed: was misspelled QGrorpBox)
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)

        #validator: four dot-separated fields, each matched by ipRange
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        #connection row
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        self.typeBox.setValidator(ipValidator)
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        #buttons: each click is reported on debugSignal, then handled
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        #message entry, sent history and receive console
        titlelbl= QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")

        #addWidget(widget, row, column, rowSpan, columnSpan[, alignment])
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
        self.fields.addWidget(self.button,0,4,1,1)
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)

    def debug(self,msg):
        """Emit ``msg`` on debugSignal, prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)

    def clicked(self):
        """Report on debugSignal which button (or combo-box entry) was activated."""
        sender = self.sender()
        if isinstance(sender,QPushButton):
            self.debugSignal.emit(sender.text()+ " clicked")
        elif isinstance(sender,QComboBox):
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        """Toggle the connection: open it when idle, close it when running."""
        self.desc.clear()
        if self.model is None:
            return
        host=self.typeBox.text()
        port=self.baudBox.text()
        if self.button.text() == "Connect":
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host,port))
            try:
                self.model.connec(host,int(port))
            except (ValueError,OverflowError,OSError) as err:
                # Bad port text or bind failure: stay in the "Connect" state
                # instead of showing "Stop" for a connection that never opened.
                self.read(">> connection failed: {}".format(err))
                return
            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host,port))

    def read(self,msg):
        """Append ``msg`` to the console and scroll to the bottom."""
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        scrollBar=self.desc.verticalScrollBar()
        scrollBar.setValue(scrollBar.maximum())

    def send(self):
        """Log the entered text in the 'Sent' area and hand it to the model."""
        text=self.edit.text()
        if text != "":
            self.sent.setText(self.sent.toPlainText()+text+"\n")
            if self.model is not None:
                self.model.write(text)
# Generic app container
class AcApp(QMainWindow):
    """Generic main window hosting one central frame and a status bar.

    Args:
        title: Text shown in the window title bar.
        mainFrame: Widget class (not an instance). It is instantiated with
            no arguments and installed as the central widget. When the
            instance exposes a ``debugSignal`` signal, the strings it emits
            are displayed in the status bar.
    """

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)
        #connect signals
        # The default QFrame has no debugSignal; connecting unconditionally
        # raised AttributeError whenever no custom frame class was supplied.
        debugSignal=getattr(self.mainFrame,'debugSignal',None)
        if debugSignal is not None:
            debugSignal.connect(self.debugMsg)
        #General configuration
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #Debug bar (statusBar() creates the bar on first access)
        self.statusBar().showMessage('Display debug messages')
        self.show()

    def debugMsg(self,val):
        """Show ``val`` in the status bar."""
        self.statusBar().showMessage(val)
def main():
    """Create the Qt application, show the UDP monitor and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    #app.setStyleSheet(open("style.txt").read()) #set style according to file
    ex = AcApp(__title__,EthernetInterface)
    # The original called app.quit() here, before the event loop was started.
    # It was removed because it read as if the application exited immediately.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()