From 68f8057d8dcb41a91b0607d968b1f424c947edfe Mon Sep 17 00:00:00 2001 From: Jeff Curless Date: Sun, 26 Oct 2025 12:12:30 -0400 Subject: [PATCH] Modify the code to remove single drive class Remove the class for single drive data fetching, and so we can modify the monitor code to make sure we can handle multiple drives --- monitor/oneUpMon.py | 8 +-- monitor/systemsupport.py | 139 +++++++++++++-------------------------- 2 files changed, 51 insertions(+), 96 deletions(-) diff --git a/monitor/oneUpMon.py b/monitor/oneUpMon.py index 2c7df31..6301b3b 100755 --- a/monitor/oneUpMon.py +++ b/monitor/oneUpMon.py @@ -8,12 +8,12 @@ Requires: PyQt5 (including QtCharts) """ import sys -from systemsupport import systemData, CPULoad, multiDriveStat +from systemsupport import CPUInfo, CPULoad, multiDriveStat # -------------------------- # Globals # -------------------------- -sysdata = systemData() +cpuinfo = CPUInfo() cpuload = CPULoad() multiDrive = multiDriveStat() @@ -331,13 +331,13 @@ class MonitorWindow(QMainWindow): # Obtain the current fan speed try: - fan_speed = sysdata.fanSpeed + fan_speed = cpuinfo.CPUFanSpeed except Exception: fan_speed = None temperatures = [] try: - temperatures.append( float(sysdata.CPUTemperature) ) + temperatures.append( float(cpuinfo.temperature) ) except Exception: temperatures.append( 0.0 ) diff --git a/monitor/systemsupport.py b/monitor/systemsupport.py index b42c377..b7fb60f 100755 --- a/monitor/systemsupport.py +++ b/monitor/systemsupport.py @@ -123,32 +123,16 @@ class multiDriveStat(): If there is a missing drive from the filter, that drive is eliminated. - Parameters: - driveList - if None, generate a list by asking the system - - Otherwise remove all drives from the generated list that - are not in the monitor list. ''' - def __init__(self,driveList:list[str] | None = None): + def __init__(self): # # Get all drives # - with os.popen( 'lsblk -o NAME,SIZE,TYPE | grep disk') as command: + self._drives = [] + with os.popen( 'ls -1 /sys/block | grep -v -e loop -e ram') as command: lsblk_raw = command.read() - lsblk_out = [ l for l in lsblk_raw.split('\n') if l] - self._driveInfo = {} - for l in lsblk_out: - _item = l.split() - self._driveInfo[_item[0]] = _item[1] - # filter out drives - if driveList is not None: - _temp = {} - for _drive in driveList: - try: - _temp[_drive] = self._driveInfo[_drive] - except: - print( f"Filtering out drive {_drive}, not currently connected to system." ) - self._driveInfo = _temp - self._stats = [ DriveStats(_) for _ in self._driveInfo ] + self._drives = [ l for l in lsblk_raw.split('\n') if l] + self._stats = [ DriveStats(_) for _ in self._drives ] @property def drives(self) -> list[str]: @@ -158,8 +142,7 @@ class multiDriveStat(): Returns: A list of drives ''' - drives = [ _ for _ in self._driveInfo] - return drives + return self._drives def driveSize( self, _drive ) -> int: ''' @@ -172,22 +155,11 @@ class multiDriveStat(): The size in bytes, or 0 if the drive does not exist ''' try: - factor = self._driveInfo[_drive][-1:] - size = float(self._driveInfo[_drive][:-1]) - match factor: - case 'T': - size *= 1024 * 1024 * 1024 * 1024 - case 'G': - size *= 1024 * 1024 * 1024 - case 'M': - size *= 1024 * 1024 - case 'K': - size *= 1024 - case _: - pass - size = int(size/512) - size *= 512 - return size + byteCount = 0 + with os.popen(f'cat /sys/block/{_drive}/size') as command: + sectorCount = command.read().strip() + byteCount = int(sectorCount) * 512 + return byteCount except: return 0 @@ -211,7 +183,7 @@ class multiDriveStat(): if smartAttr == "Temperature:": return float(parts[1]) else: - return float(parts[0]) + return float(parts[9]) except IndexError: pass @@ -240,19 +212,35 @@ class multiDriveStat(): for _ in self._stats: curData[_.name] = _.readWriteBytes() return curData - -class systemData: - def __init__( self, _drive : str = 'nvme0n1' ): - self._drive = _drive - self._cpuTemp = CPUTemperature() - self._stats = DriveStats( self._drive ) - + +class CPUInfo: + ''' + This class deals with getting data about a Raspberry PI CPU + + ''' + def __init__( self ): + self._cputemp = CPUTemperature() + @property - def CPUTemperature(self) -> int: - return self._cpuTemp.temperature - + def temperature( self ) -> float: + ''' + Obtain the temperature of the CPU. This utilizes a GPIO call to obtain + the CPU temp, via the CPUTemperature object from gpiozero + + Returns: + A floating point number represetning the temperature in degrees C + ''' + return self._cputemp.temperature + @property - def fanSpeed( self ) -> float: + def CPUFanSpeed( self ) -> float: + ''' + Obtain the speed of the CPU fan. This is based on monitoring the hardware + monitor, assuming that fan1_input is the fan connected to the CPU. + + Return: + The fanspeed as a floating point number + ''' speed= 0 try: command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) @@ -264,40 +252,6 @@ class systemData: return float(speed) - @property - def driveTemp(self) -> float: - smartOutRaw = "" - cmd = f'sudo smartctl -A /dev/{self._drive}' - try: - command = os.popen( cmd ) - smartOutRaw = command.read() - except Exception as error: - print( f"Could not launch {cmd} error is {error}" ) - return 0.0 - finally: - command.close() - - smartOut = [ l for l in smartOutRaw.split('\n') if l] - for smartAttr in ["Temperature:","194","190"]: - try: - line = [l for l in smartOut if l.startswith(smartAttr)][0] - parts = [p for p in line.replace('\t',' ').split(' ') if p] - if smartAttr == "Temperature:": - return float(parts[1]) - else: - return float(parts[0]) - except IndexError: - pass - - return float(0.0) - - @property - def driveStats(self) -> tuple[float,float]: - _data = self._stats.readWriteSectors() - readMB = (float(_data[0]) * 512.0) - writeMB = (float(_data[1]) * 512.0) - return (readMB, writeMB ) - class CPULoad: ''' A class to help with obtaining the CPU load of the system. If there is more information @@ -321,6 +275,8 @@ class CPULoad: # self._previousData : dict[str,tuple[int,int]] = self._getRawData() self._names : list[str] = [] + self._cputemp : float = CPUTemperature() + # # For each CPU, reset the total and idle amount, and create the list # of names @@ -402,22 +358,21 @@ class CPULoad: if __name__ == "__main__": - data = systemData() - print( f"CPU Temp : {data.CPUTemperature}" ) - print( f"Fan Speed: {data.fanSpeed}" ) - print( f"NVME Temp: {data.driveTemp}" ) - print( f"Stats : {data.driveStats}" ) load = CPULoad() print( f"Number of CPU's = {len(load)}" ) - for i in range(2): + for i in range(3): time.sleep( 1 ) percentage : dict[str,float] = load.getPercentages() print( f"percentage: {percentage}" ) for item in percentage: print( f"{item} : {percentage[item]:.02f}" ) - test = multiDriveStat(["nvme0n1","sda","sdb"]) + cpuinfo = CPUInfo() + print( f"CPU Temperature = {cpuinfo.temperature}" ) + print( f"CPU Fan Speed = {cpuinfo.CPUFanSpeed}" ) + + test = multiDriveStat() print( test.drives ) for drive in test.drives: print( f"Drive {drive} size is {test.driveSize( drive )}" )