Merge pull request #11 from JeffCurless/cleanup_drive_code

Modify the code to remove single drive class
This commit is contained in:
Jeff Curless
2025-10-26 12:15:15 -04:00
committed by GitHub
2 changed files with 51 additions and 96 deletions

View File

@@ -8,12 +8,12 @@ Requires: PyQt5 (including QtCharts)
""" """
import sys import sys
from systemsupport import systemData, CPULoad, multiDriveStat from systemsupport import CPUInfo, CPULoad, multiDriveStat
# -------------------------- # --------------------------
# Globals # Globals
# -------------------------- # --------------------------
sysdata = systemData() cpuinfo = CPUInfo()
cpuload = CPULoad() cpuload = CPULoad()
multiDrive = multiDriveStat() multiDrive = multiDriveStat()
@@ -331,13 +331,13 @@ class MonitorWindow(QMainWindow):
# Obtain the current fan speed # Obtain the current fan speed
try: try:
fan_speed = sysdata.fanSpeed fan_speed = cpuinfo.CPUFanSpeed
except Exception: except Exception:
fan_speed = None fan_speed = None
temperatures = [] temperatures = []
try: try:
temperatures.append( float(sysdata.CPUTemperature) ) temperatures.append( float(cpuinfo.temperature) )
except Exception: except Exception:
temperatures.append( 0.0 ) temperatures.append( 0.0 )

View File

@@ -123,32 +123,16 @@ class multiDriveStat():
If there is a missing drive from the filter, that drive is eliminated. If there is a missing drive from the filter, that drive is eliminated.
Parameters:
driveList - if None, generate a list by asking the system
- Otherwise remove all drives from the generated list that
are not in the monitor list.
''' '''
def __init__(self,driveList:list[str] | None = None): def __init__(self):
# #
# Get all drives # Get all drives
# #
with os.popen( 'lsblk -o NAME,SIZE,TYPE | grep disk') as command: self._drives = []
with os.popen( 'ls -1 /sys/block | grep -v -e loop -e ram') as command:
lsblk_raw = command.read() lsblk_raw = command.read()
lsblk_out = [ l for l in lsblk_raw.split('\n') if l] self._drives = [ l for l in lsblk_raw.split('\n') if l]
self._driveInfo = {} self._stats = [ DriveStats(_) for _ in self._drives ]
for l in lsblk_out:
_item = l.split()
self._driveInfo[_item[0]] = _item[1]
# filter out drives
if driveList is not None:
_temp = {}
for _drive in driveList:
try:
_temp[_drive] = self._driveInfo[_drive]
except:
print( f"Filtering out drive {_drive}, not currently connected to system." )
self._driveInfo = _temp
self._stats = [ DriveStats(_) for _ in self._driveInfo ]
@property @property
def drives(self) -> list[str]: def drives(self) -> list[str]:
@@ -158,8 +142,7 @@ class multiDriveStat():
Returns: Returns:
A list of drives A list of drives
''' '''
drives = [ _ for _ in self._driveInfo] return self._drives
return drives
def driveSize( self, _drive ) -> int: def driveSize( self, _drive ) -> int:
''' '''
@@ -172,22 +155,11 @@ class multiDriveStat():
The size in bytes, or 0 if the drive does not exist The size in bytes, or 0 if the drive does not exist
''' '''
try: try:
factor = self._driveInfo[_drive][-1:] byteCount = 0
size = float(self._driveInfo[_drive][:-1]) with os.popen(f'cat /sys/block/{_drive}/size') as command:
match factor: sectorCount = command.read().strip()
case 'T': byteCount = int(sectorCount) * 512
size *= 1024 * 1024 * 1024 * 1024 return byteCount
case 'G':
size *= 1024 * 1024 * 1024
case 'M':
size *= 1024 * 1024
case 'K':
size *= 1024
case _:
pass
size = int(size/512)
size *= 512
return size
except: except:
return 0 return 0
@@ -211,7 +183,7 @@ class multiDriveStat():
if smartAttr == "Temperature:": if smartAttr == "Temperature:":
return float(parts[1]) return float(parts[1])
else: else:
return float(parts[0]) return float(parts[9])
except IndexError: except IndexError:
pass pass
@@ -241,18 +213,34 @@ class multiDriveStat():
curData[_.name] = _.readWriteBytes() curData[_.name] = _.readWriteBytes()
return curData return curData
class systemData: class CPUInfo:
def __init__( self, _drive : str = 'nvme0n1' ): '''
self._drive = _drive This class deals with getting data about a Raspberry PI CPU
self._cpuTemp = CPUTemperature()
self._stats = DriveStats( self._drive ) '''
def __init__( self ):
self._cputemp = CPUTemperature()
@property @property
def CPUTemperature(self) -> int: def temperature( self ) -> float:
return self._cpuTemp.temperature '''
Obtain the temperature of the CPU. This utilizes a GPIO call to obtain
the CPU temp, via the CPUTemperature object from gpiozero
Returns:
A floating point number represetning the temperature in degrees C
'''
return self._cputemp.temperature
@property @property
def fanSpeed( self ) -> float: def CPUFanSpeed( self ) -> float:
'''
Obtain the speed of the CPU fan. This is based on monitoring the hardware
monitor, assuming that fan1_input is the fan connected to the CPU.
Return:
The fanspeed as a floating point number
'''
speed= 0 speed= 0
try: try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
@@ -264,40 +252,6 @@ class systemData:
return float(speed) return float(speed)
@property
def driveTemp(self) -> float:
smartOutRaw = ""
cmd = f'sudo smartctl -A /dev/{self._drive}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
@property
def driveStats(self) -> tuple[float,float]:
_data = self._stats.readWriteSectors()
readMB = (float(_data[0]) * 512.0)
writeMB = (float(_data[1]) * 512.0)
return (readMB, writeMB )
class CPULoad: class CPULoad:
''' '''
A class to help with obtaining the CPU load of the system. If there is more information A class to help with obtaining the CPU load of the system. If there is more information
@@ -321,6 +275,8 @@ class CPULoad:
# #
self._previousData : dict[str,tuple[int,int]] = self._getRawData() self._previousData : dict[str,tuple[int,int]] = self._getRawData()
self._names : list[str] = [] self._names : list[str] = []
self._cputemp : float = CPUTemperature()
# #
# For each CPU, reset the total and idle amount, and create the list # For each CPU, reset the total and idle amount, and create the list
# of names # of names
@@ -402,22 +358,21 @@ class CPULoad:
if __name__ == "__main__": if __name__ == "__main__":
data = systemData()
print( f"CPU Temp : {data.CPUTemperature}" )
print( f"Fan Speed: {data.fanSpeed}" )
print( f"NVME Temp: {data.driveTemp}" )
print( f"Stats : {data.driveStats}" )
load = CPULoad() load = CPULoad()
print( f"Number of CPU's = {len(load)}" ) print( f"Number of CPU's = {len(load)}" )
for i in range(2): for i in range(3):
time.sleep( 1 ) time.sleep( 1 )
percentage : dict[str,float] = load.getPercentages() percentage : dict[str,float] = load.getPercentages()
print( f"percentage: {percentage}" ) print( f"percentage: {percentage}" )
for item in percentage: for item in percentage:
print( f"{item} : {percentage[item]:.02f}" ) print( f"{item} : {percentage[item]:.02f}" )
test = multiDriveStat(["nvme0n1","sda","sdb"]) cpuinfo = CPUInfo()
print( f"CPU Temperature = {cpuinfo.temperature}" )
print( f"CPU Fan Speed = {cpuinfo.CPUFanSpeed}" )
test = multiDriveStat()
print( test.drives ) print( test.drives )
for drive in test.drives: for drive in test.drives:
print( f"Drive {drive} size is {test.driveSize( drive )}" ) print( f"Drive {drive} size is {test.driveSize( drive )}" )