Update the monitor code

Changes made to the monitor code to remove all warnings from
systemsupport.py.  Note there are warnings in oneUpMon, however these do
not appear to be valid, as some of them complain about attributes not
existing, when they clearly do (Qt.AlignBottom for instance)
This commit is contained in:
Jeff Curless
2025-10-19 19:36:25 -04:00
parent caec0f5671
commit 72de0bcea7
6 changed files with 37 additions and 504 deletions

View File

@@ -1,107 +0,0 @@
#!/usr/bin/env python3
import os
import time
class CPULoad:
'''
A class to help with obtaining the CPU load of the system. If there is more information
needed, we can add to this.
Note:
This code automatically attempts to load the data from the system to initialize the
object with names, and an initial set of data.
This may result in th first actual call return some not very consistent values, for
the time period being observed, but that difference is minimal. In otherwords if we
the period of time being measured is 1 second, and it's been a minute since this class
was initialized, the first period reported will be CPU load over the minute, not 1 second,
and the second period reported will be for a second...
This is usually not an issue.
'''
def __init__( self ):
self._previousData : dict[str,tuple] = self._getRawData()
self._names : list[str] = []
for item in self._previousData:
self._names.append( item )
def _getRawData( self ) -> dict[str : tuple]:
'''
Obtain the raw CPU data from the system (located in /prop/stat), and
return just the cpu0 -> cpux values. No assumption is made on the number of
cpus.
Returns:
A dictionary is returned, the format is name = (total, idle). The total
time and idle time are use to determine the percent utilization of the system.
'''
result = {}
with open( "/proc/stat", "r") as f:
allLines = f.readlines()
for line in allLines:
cpu = line.replace('\t', ' ').strip().split()
if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"):
total = 0
idle = 0
for i in range( 1, len(cpu)):
total += int(cpu[i])
if i == 4 or i == 5:
idle += int(cpu[i])
result[cpu[0]] = (total,idle)
return result
def getPercentages( self ) -> dict[ str : float ]:
'''
Obtain the percent CPU utilization of the system for a period of time.
This routine gets the current raw data from the system, and then performs
a delta from the prior time this function was called. This data is then run
through the following equation:
utilization = ((total - idle)/total) * 100
If the snapshots are taken at relativy consistent intervals, the CPU
utilization in percent, is reasonably lose to the actual percentage.
Returns:
A dictionary consisting of the name of the CPU, and a floating point
number representing the current utilization of that CPU.
'''
results = {}
current = self._getRawData()
for item in current:
total = current[item][0] - self._previousData[item][0]
idle = current[item][1] - self._previousData[item][1]
percent = ((total - idle)/total) * 100
results[item] = round(percent,2)
self._previousData = current
return results
@property
def cpuNames( self ) -> list[str]:
'''
Get a list of CPU names from the system.
Returns:
a list of strings
'''
return self._names
def __len__(self) -> int:
'''
handle getting the length (or count of CPU's).
Returns:
Number of CPU's
'''
return len(self._previousData)
if __name__ == "__main__":
load = CPULoad()
print( f"Number of CPU's = {len(load)}" )
while True:
time.sleep( 1 )
percentage : dict[str:float] = load.getPercentages()
print( f"percentage: {percentage}" )
for item in percentage:
print( f"{item} : {percentage[item]:.02f}" )

View File

@@ -1,196 +0,0 @@
#!/usr/bin/python3
#
# Setup environment and pull in all of the items we need from gpiozero. The
# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect
# will be new direction for all prior version the RPIi.)
#
from gpiozero import CPUTemperature
import time
import os
class DriveStat:
'''
DriveStat class -
This class gets the drive statistics from sysfs for the device passed
in. There are several statistics can can be obtained. Note that since
all of the data is pulled at the same time, it is upto the caller to
make sure all the stats needed are obtained at the same time.
See: https://www.kernel.org/doc/html/latest/block/stat.html
Parameters:
device - the name of the device to track
'''
READ_IOS = 0
READ_MERGES = 1
READ_SECTORS = 2
READ_TICKS = 3
WRITE_IOS = 4
WRITE_MERGES = 5
WRITE_SECTORS = 6
WRITE_TICKS = 7
IN_FLIGHT = 8
IO_TICKS = 9
TIME_IN_QUEUE = 10
DISCARD_IOS = 11
DISCARD_MERGES = 12
DISCARD_SECTORS = 13
DISCARD_TICS = 14
FLUSH_IOS = 15
FLUSH_TICKS = 16
def __init__( self, device:str ):
self._last : list[int] = []
self._stats : list[int] = []
self._device = device
self._readStats()
def _readStats( self ):
'''
Read the disk statistics. The stored statics in sysfs are stored as a single file
so that when the data is read, all of the stats correlate to the same time. The data
is from the time the device has come online.
last and set to the old version of the data, and the latest data is stored in stats
'''
try:
self._last = self._stats
with open( f"/sys/block/{self._device}/stat", "r") as f:
curStats = f.readline().strip().split(" ")
self._stats = [int(l) for l in curStats if l]
except Exception as e:
print( f"Failure reading disk statistics for {self._device} error {e}" )
def _getStats( self ) -> list[int]:
'''
Read the devices statistics from the device,and return it.
Returns:
An array containing all of the data colleected about the device.
'''
curData : list[int] = []
self._readStats()
if self._last == []:
curData = self._stats[:]
else:
curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ]
return curData
def readAllStats( self ) -> list[int]:
'''
read all of the drive statisics from sysfs for the device.
Returns
A list of all of the device stats
'''
return self._getStats()
def readSectors( self )-> int:
return self._getStats()[DriveStat.READ_SECTORS]
def writeSectors( self ) -> int:
return self._getStats()[DriveStat.WRITE_SECTORS]
def discardSectors( self ) -> int:
return self._getStats()[DriveStat.DISCARD_SECTORS]
def readWriteSectors( self ) -> tuple[int,int]:
curData = self._getStats()
return (curData[DriveStat.READ_SECTORS],curData[DriveStat.WRITE_SECTORS])
def setupTemperatureObject():
'''
Get a cpu temperature object, and set the min and max range.
When the ranges are set to the non-default values, if the temperature is
less than min_temp we get 0, and when the temperature reaches the max we get
a value of 1. This value can be used directly as a duty cycle for the fan.
Return:
A CPU temperature object
'''
cpuTempObj = None
try:
cpuTempObj = CPUTemperature()
except Exception as error:
print( f"Error creating CPU temperature object, error is {error}" )
return cpuTempObj
def getFanSpeed() -> int:
'''
Obtain the speed of the fan attached to the CPU. This is accomplished reading
the information from sysfs.
NOTE: There is an assumption that the fan s hanging off of /hwmon/hwmon3. This may
or may not be the case in all situations.
'''
fanSpeed = 0
try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
fanSpeed = int(command.read().strip())
except:
pass
return fanSpeed
def getNVMETemp(device : str) -> float:
'''
Obtain the temperature of the device passed in, using smartctl.
Parameters :
device - A string containing the device name.
Returns:
The temperature as a float
'''
smartOutRaw = ""
cmd = f'smartctl -A /dev/{device}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
def argonsysinfo_kbstr(kbval):
suffixidx = 0
suffixlist = ["B","KiB", "MiB", "GiB", "TiB"]
while kbval > 1023 and suffixidx < len(suffixlist):
kbval = kbval // 1024
suffixidx = suffixidx + 1
return f"{kbval} {suffixlist[suffixidx]}"
stats = DriveStat( 'nvme0n1' )
cpuTemp = setupTemperatureObject()
while True:
os.system( "clear ")
print( f"CPU : {cpuTemp.temperature}" )
print( f"Fan : {getFanSpeed()}" )
print( f"NVME : {getNVMETemp('nvme0n1')}" )
data = stats.readWriteSectors()
print( f"Read : {argonsysinfo_kbstr(data[0]*512)}/s" )
print( f"Write: {argonsysinfo_kbstr(data[1]*512)}/s" )
time.sleep(1)

View File

@@ -7,14 +7,13 @@ Requires: PyQt5 (including QtCharts)
""" """
import os
import sys import sys
from systemsupport import systemData, CPULoad from systemsupport import systemData, CPULoad
# -------------------------- # --------------------------
# Globals # Globals
# -------------------------- # --------------------------
sysdata = None sysdata = systemData()
cpuload = CPULoad() cpuload = CPULoad()
# -------------------------- # --------------------------
@@ -23,7 +22,7 @@ cpuload = CPULoad()
from PyQt5.QtCore import Qt, QTimer from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QPainter from PyQt5.QtGui import QPainter
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLabel from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis
class RollingChart(QWidget): class RollingChart(QWidget):
@@ -39,7 +38,7 @@ class RollingChart(QWidget):
def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None): def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None):
super().__init__(parent) super().__init__(parent)
self.window = window self.pointWindow = window
self.xpos = window - 1 self.xpos = window - 1
self.chart = QChart() self.chart = QChart()
@@ -60,7 +59,7 @@ class RollingChart(QWidget):
# want is the tick count etc, but NO lable on the axis. There does not # want is the tick count etc, but NO lable on the axis. There does not
# appear to be a way to do that. # appear to be a way to do that.
self.axis_x = QValueAxis() self.axis_x = QValueAxis()
self.axis_x.setRange(0, self.window) self.axis_x.setRange(0, self.pointWindow)
self.axis_x.setMinorTickCount( 2 ) self.axis_x.setMinorTickCount( 2 )
self.axis_x.setTickCount( 10 ) self.axis_x.setTickCount( 10 )
self.axis_x.setLabelFormat("%d") self.axis_x.setLabelFormat("%d")
@@ -108,7 +107,7 @@ class RollingChart(QWidget):
pass pass
# Trim series to rolling window # Trim series to rolling window
min_x_to_keep = max(0, self.xpos - self.window) min_x_to_keep = max(0, self.xpos - self.pointWindow)
self.axis_x.setRange(min_x_to_keep, self.xpos) self.axis_x.setRange(min_x_to_keep, self.xpos)
for s in self.series: for s in self.series:
@@ -213,7 +212,7 @@ class RollingChartDynamic(RollingChart):
pass pass
# Trim series to rolling window # Trim series to rolling window
min_x_to_keep = max(0, self.xpos - self.window) min_x_to_keep = max(0, self.xpos - self.pointWindow)
self.axis_x.setRange(min_x_to_keep, self.xpos) self.axis_x.setRange(min_x_to_keep, self.xpos)
if scaleUp: if scaleUp:
@@ -372,6 +371,5 @@ def main():
sys.exit(app.exec_()) sys.exit(app.exec_())
if __name__ == "__main__": if __name__ == "__main__":
sysdata = systemData()
main() main()

View File

@@ -1,170 +0,0 @@
#!/usr/bin/python3
#
# Setup environment and pull in all of the items we need from gpiozero. The
# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect
# will be new direction for all prior version the RPIi.)
#
from gpiozero import CPUTemperature
import time
import os
class DriveStats:
'''
DriveStat class -
This class gets the drive statistics from sysfs for the device passed
in. There are several statistics can can be obtained. Note that since
all of the data is pulled at the same time, it is upto the caller to
make sure all the stats needed are obtained at the same time.
See: https://www.kernel.org/doc/html/latest/block/stat.html
Parameters:
device - the name of the device to track
'''
READ_IOS = 0
READ_MERGES = 1
READ_SECTORS = 2
READ_TICKS = 3
WRITE_IOS = 4
WRITE_MERGES = 5
WRITE_SECTORS = 6
WRITE_TICKS = 7
IN_FLIGHT = 8
IO_TICKS = 9
TIME_IN_QUEUE = 10
DISCARD_IOS = 11
DISCARD_MERGES = 12
DISCARD_SECTORS = 13
DISCARD_TICS = 14
FLUSH_IOS = 15
FLUSH_TICKS = 16
def __init__( self, device:str ):
self._last : list[int] = []
self._stats : list[int] = []
self._device = device
self._readStats()
def _readStats( self ):
'''
Read the disk statistics. The stored statics in sysfs are stored as a single file
so that when the data is read, all of the stats correlate to the same time. The data
is from the time the device has come online.
last and set to the old version of the data, and the latest data is stored in stats
'''
try:
self._last = self._stats
with open( f"/sys/block/{self._device}/stat", "r") as f:
curStats = f.readline().strip().split(" ")
self._stats = [int(l) for l in curStats if l]
except Exception as e:
print( f"Failure reading disk statistics for {self._device} error {e}" )
def _getStats( self ) -> list[int]:
'''
Read the devices statistics from the device,and return it.
Returns:
An array containing all of the data colleected about the device.
'''
curData : list[int] = []
self._readStats()
if self._last == []:
curData = self._stats[:]
else:
curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ]
return curData
def readAllStats( self ) -> list[int]:
'''
read all of the drive statisics from sysfs for the device.
Returns
A list of all of the device stats
'''
return self._getStats()
def readSectors( self )-> int:
return self._getStats()[DriveStats.READ_SECTORS]
def writeSectors( self ) -> int:
return self._getStats()[DriveStats.WRITE_SECTORS]
def discardSectors( self ) -> int:
return self._getStats()[DriveStats.DISCARD_SECTORS]
def readWriteSectors( self ) -> tuple[int,int]:
curData = self._getStats()
return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS])
class systemData:
def __init__( self, drive : str = 'nvme0n1' ):
self._drive = drive
self._cpuTemp = CPUTemperature()
self._stats = DriveStats( self._drive )
@property
def CPUTemperature(self) -> int:
return self._cpuTemp.temperature
@property
def fanSpeed( self ) -> int:
speed= 0
try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
speed = int( command.read().strip())
except Exception as error:
print( f"Could not determine fan speed, error {error}" )
finally:
command.close()
return speed
@property
def driveTemp(self) -> float:
smartOutRaw = ""
cmd = f'sudo smartctl -A /dev/{self._drive}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
@property
def driveStats(self) -> tuple[float,float]:
data = self._stats.readWriteSectors()
readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0)
writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0)
return (readMB, writeMB )
if __name__ == "__main__":
data = systemData()
print( f"CPU Temp : {data.CPUTemperature}" )
print( f"Fan Speed: {data.fanSpeed}" )
print( f"NVME Temp: {data.driveTemp}" )
print( f"Stats : {data.driveStats}" )

View File

@@ -13,7 +13,7 @@ Replace the stub return values with your real implementations later.
import sys import sys
from typing import Tuple from typing import Tuple
from oneUpSupport import systemData from systemsupport import systemData, CPULoad
sysdata = systemData() sysdata = systemData()

View File

@@ -58,7 +58,7 @@ class DriveStats:
''' '''
try: try:
self._last = self._stats self._last = self._stats
with open( f"/sys/block/{self._device}/stat", "r") as f: with open( f"/sys/block/{self._device}/stat", "r",encoding="utf8") as f:
curStats = f.readline().strip().split(" ") curStats = f.readline().strip().split(" ")
self._stats = [int(l) for l in curStats if l] self._stats = [int(l) for l in curStats if l]
except Exception as e: except Exception as e:
@@ -114,7 +114,7 @@ class systemData:
return self._cpuTemp.temperature return self._cpuTemp.temperature
@property @property
def fanSpeed( self ) -> int: def fanSpeed( self ) -> float:
speed= 0 speed= 0
try: try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
@@ -124,7 +124,7 @@ class systemData:
finally: finally:
command.close() command.close()
return speed return float(speed)
@property @property
def driveTemp(self) -> float: def driveTemp(self) -> float:
@@ -155,9 +155,9 @@ class systemData:
@property @property
def driveStats(self) -> tuple[float,float]: def driveStats(self) -> tuple[float,float]:
data = self._stats.readWriteSectors() _data = self._stats.readWriteSectors()
readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0) readMB = (float(_data[0]) * 512.0) #/ (1024.0 * 1024.0)
writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0) writeMB = (float(_data[1]) * 512.0) #/ (1024.0 * 1024.0)
return (readMB, writeMB ) return (readMB, writeMB )
class CPULoad: class CPULoad:
@@ -178,12 +178,20 @@ class CPULoad:
This is usually not an issue. This is usually not an issue.
''' '''
def __init__( self ): def __init__( self ):
self._previousData : dict[str,tuple] = self._getRawData() #
# Get the current data
#
self._previousData : dict[str,tuple[int,int]] = self._getRawData()
self._names : list[str] = [] self._names : list[str] = []
for item in self._previousData: #
self._names.append( item ) # For each CPU, reset the total and idle amount, and create the list
# of names
#
for _item in self._previousData:
self._previousData[_item] = (0,0)
self._names.append(_item)
def _getRawData( self ) -> dict[str : tuple]: def _getRawData( self ) -> dict[str,tuple[int,int]]:
''' '''
Obtain the raw CPU data from the system (located in /prop/stat), and Obtain the raw CPU data from the system (located in /prop/stat), and
return just the cpu0 -> cpux values. No assumption is made on the number of return just the cpu0 -> cpux values. No assumption is made on the number of
@@ -194,21 +202,21 @@ class CPULoad:
time and idle time are use to determine the percent utilization of the system. time and idle time are use to determine the percent utilization of the system.
''' '''
result = {} result = {}
with open( "/proc/stat", "r") as f: with open( "/proc/stat", "r",encoding="utf8") as f:
allLines = f.readlines() allLines = f.readlines()
for line in allLines: for line in allLines:
cpu = line.replace('\t', ' ').strip().split() cpu = line.replace('\t', ' ').strip().split()
if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"): if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"):
total = 0 total = 0
idle = 0 idle = 0
for i in range( 1, len(cpu)): for _index in range( 1, len(cpu)):
total += int(cpu[i]) total += int(cpu[_index])
if i == 4 or i == 5: if _index == 4 or _index == 5:
idle += int(cpu[i]) idle += int(cpu[_index])
result[cpu[0]] = (total,idle) result[cpu[0]] = (total,idle)
return result return result
def getPercentages( self ) -> dict[ str : float ]: def getPercentages( self ) -> dict[str,float]:
''' '''
Obtain the percent CPU utilization of the system for a period of time. Obtain the percent CPU utilization of the system for a period of time.
@@ -227,11 +235,11 @@ class CPULoad:
''' '''
results = {} results = {}
current = self._getRawData() current = self._getRawData()
for item in current: for _item in current:
total = current[item][0] - self._previousData[item][0] total = current[_item][0] - self._previousData[_item][0]
idle = current[item][1] - self._previousData[item][1] idle = current[_item][1] - self._previousData[_item][1]
percent = ((total - idle)/total) * 100 percent = ((total - idle)/total) * 100
results[item] = round(percent,2) results[_item] = round(percent,2)
self._previousData = current self._previousData = current
return results return results
@@ -266,7 +274,7 @@ if __name__ == "__main__":
print( f"Number of CPU's = {len(load)}" ) print( f"Number of CPU's = {len(load)}" )
for i in range(10): for i in range(10):
time.sleep( 1 ) time.sleep( 1 )
percentage : dict[str:float] = load.getPercentages() percentage : dict[str,float] = load.getPercentages()
print( f"percentage: {percentage}" ) print( f"percentage: {percentage}" )
for item in percentage: for item in percentage:
print( f"{item} : {percentage[item]:.02f}" ) print( f"{item} : {percentage[item]:.02f}" )