Modify the code to remove single drive class

Remove the class for single drive data fetching, and so we can modify
the monitor code to make sure we can handle multiple drives
This commit is contained in:
Jeff Curless
2025-10-26 12:12:30 -04:00
parent cf678772d6
commit 68f8057d8d
2 changed files with 51 additions and 96 deletions

View File

@@ -8,12 +8,12 @@ Requires: PyQt5 (including QtCharts)
""" """
import sys import sys
from systemsupport import systemData, CPULoad, multiDriveStat from systemsupport import CPUInfo, CPULoad, multiDriveStat
# -------------------------- # --------------------------
# Globals # Globals
# -------------------------- # --------------------------
sysdata = systemData() cpuinfo = CPUInfo()
cpuload = CPULoad() cpuload = CPULoad()
multiDrive = multiDriveStat() multiDrive = multiDriveStat()
@@ -331,13 +331,13 @@ class MonitorWindow(QMainWindow):
# Obtain the current fan speed # Obtain the current fan speed
try: try:
fan_speed = sysdata.fanSpeed fan_speed = cpuinfo.CPUFanSpeed
except Exception: except Exception:
fan_speed = None fan_speed = None
temperatures = [] temperatures = []
try: try:
temperatures.append( float(sysdata.CPUTemperature) ) temperatures.append( float(cpuinfo.temperature) )
except Exception: except Exception:
temperatures.append( 0.0 ) temperatures.append( 0.0 )

View File

@@ -123,32 +123,16 @@ class multiDriveStat():
If there is a missing drive from the filter, that drive is eliminated. If there is a missing drive from the filter, that drive is eliminated.
Parameters:
driveList - if None, generate a list by asking the system
- Otherwise remove all drives from the generated list that
are not in the monitor list.
''' '''
def __init__(self,driveList:list[str] | None = None): def __init__(self):
# #
# Get all drives # Get all drives
# #
with os.popen( 'lsblk -o NAME,SIZE,TYPE | grep disk') as command: self._drives = []
with os.popen( 'ls -1 /sys/block | grep -v -e loop -e ram') as command:
lsblk_raw = command.read() lsblk_raw = command.read()
lsblk_out = [ l for l in lsblk_raw.split('\n') if l] self._drives = [ l for l in lsblk_raw.split('\n') if l]
self._driveInfo = {} self._stats = [ DriveStats(_) for _ in self._drives ]
for l in lsblk_out:
_item = l.split()
self._driveInfo[_item[0]] = _item[1]
# filter out drives
if driveList is not None:
_temp = {}
for _drive in driveList:
try:
_temp[_drive] = self._driveInfo[_drive]
except:
print( f"Filtering out drive {_drive}, not currently connected to system." )
self._driveInfo = _temp
self._stats = [ DriveStats(_) for _ in self._driveInfo ]
@property @property
def drives(self) -> list[str]: def drives(self) -> list[str]:
@@ -158,8 +142,7 @@ class multiDriveStat():
Returns: Returns:
A list of drives A list of drives
''' '''
drives = [ _ for _ in self._driveInfo] return self._drives
return drives
def driveSize( self, _drive ) -> int: def driveSize( self, _drive ) -> int:
''' '''
@@ -172,22 +155,11 @@ class multiDriveStat():
The size in bytes, or 0 if the drive does not exist The size in bytes, or 0 if the drive does not exist
''' '''
try: try:
factor = self._driveInfo[_drive][-1:] byteCount = 0
size = float(self._driveInfo[_drive][:-1]) with os.popen(f'cat /sys/block/{_drive}/size') as command:
match factor: sectorCount = command.read().strip()
case 'T': byteCount = int(sectorCount) * 512
size *= 1024 * 1024 * 1024 * 1024 return byteCount
case 'G':
size *= 1024 * 1024 * 1024
case 'M':
size *= 1024 * 1024
case 'K':
size *= 1024
case _:
pass
size = int(size/512)
size *= 512
return size
except: except:
return 0 return 0
@@ -211,7 +183,7 @@ class multiDriveStat():
if smartAttr == "Temperature:": if smartAttr == "Temperature:":
return float(parts[1]) return float(parts[1])
else: else:
return float(parts[0]) return float(parts[9])
except IndexError: except IndexError:
pass pass
@@ -240,19 +212,35 @@ class multiDriveStat():
for _ in self._stats: for _ in self._stats:
curData[_.name] = _.readWriteBytes() curData[_.name] = _.readWriteBytes()
return curData return curData
class systemData: class CPUInfo:
def __init__( self, _drive : str = 'nvme0n1' ): '''
self._drive = _drive This class deals with getting data about a Raspberry PI CPU
self._cpuTemp = CPUTemperature()
self._stats = DriveStats( self._drive ) '''
def __init__( self ):
self._cputemp = CPUTemperature()
@property @property
def CPUTemperature(self) -> int: def temperature( self ) -> float:
return self._cpuTemp.temperature '''
Obtain the temperature of the CPU. This utilizes a GPIO call to obtain
the CPU temp, via the CPUTemperature object from gpiozero
Returns:
A floating point number represetning the temperature in degrees C
'''
return self._cputemp.temperature
@property @property
def fanSpeed( self ) -> float: def CPUFanSpeed( self ) -> float:
'''
Obtain the speed of the CPU fan. This is based on monitoring the hardware
monitor, assuming that fan1_input is the fan connected to the CPU.
Return:
The fanspeed as a floating point number
'''
speed= 0 speed= 0
try: try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
@@ -264,40 +252,6 @@ class systemData:
return float(speed) return float(speed)
@property
def driveTemp(self) -> float:
smartOutRaw = ""
cmd = f'sudo smartctl -A /dev/{self._drive}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
@property
def driveStats(self) -> tuple[float,float]:
_data = self._stats.readWriteSectors()
readMB = (float(_data[0]) * 512.0)
writeMB = (float(_data[1]) * 512.0)
return (readMB, writeMB )
class CPULoad: class CPULoad:
''' '''
A class to help with obtaining the CPU load of the system. If there is more information A class to help with obtaining the CPU load of the system. If there is more information
@@ -321,6 +275,8 @@ class CPULoad:
# #
self._previousData : dict[str,tuple[int,int]] = self._getRawData() self._previousData : dict[str,tuple[int,int]] = self._getRawData()
self._names : list[str] = [] self._names : list[str] = []
self._cputemp : float = CPUTemperature()
# #
# For each CPU, reset the total and idle amount, and create the list # For each CPU, reset the total and idle amount, and create the list
# of names # of names
@@ -402,22 +358,21 @@ class CPULoad:
if __name__ == "__main__": if __name__ == "__main__":
data = systemData()
print( f"CPU Temp : {data.CPUTemperature}" )
print( f"Fan Speed: {data.fanSpeed}" )
print( f"NVME Temp: {data.driveTemp}" )
print( f"Stats : {data.driveStats}" )
load = CPULoad() load = CPULoad()
print( f"Number of CPU's = {len(load)}" ) print( f"Number of CPU's = {len(load)}" )
for i in range(2): for i in range(3):
time.sleep( 1 ) time.sleep( 1 )
percentage : dict[str,float] = load.getPercentages() percentage : dict[str,float] = load.getPercentages()
print( f"percentage: {percentage}" ) print( f"percentage: {percentage}" )
for item in percentage: for item in percentage:
print( f"{item} : {percentage[item]:.02f}" ) print( f"{item} : {percentage[item]:.02f}" )
test = multiDriveStat(["nvme0n1","sda","sdb"]) cpuinfo = CPUInfo()
print( f"CPU Temperature = {cpuinfo.temperature}" )
print( f"CPU Fan Speed = {cpuinfo.CPUFanSpeed}" )
test = multiDriveStat()
print( test.drives ) print( test.drives )
for drive in test.drives: for drive in test.drives:
print( f"Drive {drive} size is {test.driveSize( drive )}" ) print( f"Drive {drive} size is {test.driveSize( drive )}" )