Merge pull request #10 from JeffCurless/multiDriveSupport

Multi drive support
This commit is contained in:
Jeff Curless
2025-10-20 20:49:49 -04:00
committed by GitHub
2 changed files with 185 additions and 34 deletions

View File

@@ -8,13 +8,14 @@ Requires: PyQt5 (including QtCharts)
""" """
import sys import sys
from systemsupport import systemData, CPULoad from systemsupport import systemData, CPULoad, multiDriveStat
# -------------------------- # --------------------------
# Globals # Globals
# -------------------------- # --------------------------
sysdata = systemData() sysdata = systemData()
cpuload = CPULoad() cpuload = CPULoad()
multiDrive = multiDriveStat()
# -------------------------- # --------------------------
# UI # UI
@@ -277,12 +278,12 @@ class MonitorWindow(QMainWindow):
window=120 window=120
) )
series = [("CPU", None)]
for name in multiDrive.drives:
series.append( (name,None) )
self.cpu_chart = RollingChart( self.cpu_chart = RollingChart(
title="Temperature (°C)", title="Temperature (°C)",
series_defs=[ series_defs= series,
("CPU", None),
("NVMe", None),
],
y_min=20, y_max=80, y_min=20, y_max=80,
window=window window=window
) )
@@ -294,12 +295,14 @@ class MonitorWindow(QMainWindow):
window=window window=window
) )
series = []
for name in multiDrive.drives:
series.append( (f"{name} Read", None) )
series.append( (f"{name} Write", None ) )
self.io_chart = RollingChartDynamic( self.io_chart = RollingChartDynamic(
title="Disk I/O", title="Disk I/O",
series_defs=[ series_defs=series,
("Read", None),
("Write", None),
],
range_y=[("Bytes/s", 1),("KiB/s",1024),("MiB/s", 1024*1024),("GiB/s",1024*1024*1024)], range_y=[("Bytes/s", 1),("KiB/s",1024),("MiB/s", 1024*1024),("GiB/s",1024*1024*1024)],
window=window, window=window,
) )
@@ -325,11 +328,6 @@ class MonitorWindow(QMainWindow):
exception, so everything needs to be wrapped in a handler. exception, so everything needs to be wrapped in a handler.
''' '''
# Obtain the CPU temperature
try:
cpu_c = float(sysdata.CPUTemperature)
except Exception:
cpu_c = None
# Obtain the current fan speed # Obtain the current fan speed
try: try:
@@ -337,19 +335,28 @@ class MonitorWindow(QMainWindow):
except Exception: except Exception:
fan_speed = None fan_speed = None
temperatures = []
try:
temperatures.append( float(sysdata.CPUTemperature) )
except Exception:
temperatures.append( 0.0 )
# Obtain the NVMe device temperature # Obtain the NVMe device temperature
try: try:
nvme_c = sysdata.driveTemp for _drive in multiDrive.drives:
temperatures.append( multiDrive.driveTemp( _drive ) )
except Exception: except Exception:
nvme_c = None temperatures = [ 0.0 for _ in multiDrive.drives ]
# Obtain the NVMe Device read and write rates # Obtain the NVMe Device read and write rates
try: try:
read_mb, write_mb = sysdata.driveStats rwData = []
read_mb = float(read_mb) drives = multiDrive.readWriteBytes()
write_mb = float(write_mb) for drive in drives:
rwData.append( float(drives[drive][0]))
rwData.append( float(drives[drive][1]))
except Exception : except Exception :
read_mb, write_mb = None, None rwData = [ None, None ]
# Get the CPU load precentages # Get the CPU load precentages
try: try:
@@ -359,9 +366,9 @@ class MonitorWindow(QMainWindow):
values = [ None for name in cpuload.cpuNames ] values = [ None for name in cpuload.cpuNames ]
# Append to charts # Append to charts
self.cpu_chart.append([cpu_c,nvme_c]) self.cpu_chart.append( temperatures )
self.fan_chart.append([fan_speed]) self.fan_chart.append([fan_speed])
self.io_chart.append([read_mb, write_mb]) self.io_chart.append( rwData )
self.use_chart.append( values ) self.use_chart.append( values )
def main(): def main():

View File

@@ -43,8 +43,8 @@ class DriveStats:
def __init__( self, device:str ): def __init__( self, device:str ):
self._last : list[int] = [] self._last : list[int] = []
self._stats : list[int] = []
self._device = device self._device = device
self._stats : list[int] = []
self._readStats() self._readStats()
def _readStats( self ): def _readStats( self ):
@@ -80,6 +80,10 @@ class DriveStats:
curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ] curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ]
return curData return curData
@property
def name(self) -> str:
return self._device
def readAllStats( self ) -> list[int]: def readAllStats( self ) -> list[int]:
''' '''
read all of the drive statisics from sysfs for the device. read all of the drive statisics from sysfs for the device.
@@ -102,10 +106,144 @@ class DriveStats:
curData = self._getStats() curData = self._getStats()
return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS]) return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS])
def readWriteBytes( self ) -> tuple[int,int]:
curData = self._getStats()
return (curData[DriveStats.READ_SECTORS]*512,curData[DriveStats.WRITE_SECTORS]*512)
class multiDriveStat():
'''
This class allow for monitoring multiple drives at the same time. There are
two mechanisms used to create this class. The first is with no parameters,
in this case, the system will automatically grab all of the drives and setup
to monitor them.
The second method is to provide this class with a list of drives to monitor.
In this cased all of the drives that are NOT on that list are filtered out and
only the drives left will be processed.
If there is a missing drive from the filter, that drive is eliminated.
Parameters:
driveList - if None, generate a list by asking the system
- Otherwise remove all drives from the generated list that
are not in the monitor list.
'''
def __init__(self,driveList:list[str] | None = None):
#
# Get all drives
#
with os.popen( 'lsblk -o NAME,SIZE,TYPE | grep disk') as command:
lsblk_raw = command.read()
lsblk_out = [ l for l in lsblk_raw.split('\n') if l]
self._driveInfo = {}
for l in lsblk_out:
_item = l.split()
self._driveInfo[_item[0]] = _item[1]
# filter out drives
if driveList is not None:
_temp = {}
for _drive in driveList:
try:
_temp[_drive] = self._driveInfo[_drive]
except:
print( f"Filtering out drive {_drive}, not currently connected to system." )
self._driveInfo = _temp
self._stats = [ DriveStats(_) for _ in self._driveInfo ]
@property
def drives(self) -> list[str]:
'''
This attribute is used to list all of the drives that are being monitored
Returns:
A list of drives
'''
drives = [ _ for _ in self._driveInfo]
return drives
def driveSize( self, _drive ) -> int:
'''
This function is called to obtain the size of the drive requested.
Parameters:
_drive - the drive to lookup
Returns:
The size in bytes, or 0 if the drive does not exist
'''
try:
factor = self._driveInfo[_drive][-1:]
size = float(self._driveInfo[_drive][:-1])
match factor:
case 'T':
size *= 1024 * 1024 * 1024 * 1024
case 'G':
size *= 1024 * 1024 * 1024
case 'M':
size *= 1024 * 1024
case 'K':
size *= 1024
case _:
pass
size = int(size/512)
size *= 512
return size
except:
return 0
def driveTemp(self,_drive) -> float:
smartOutRaw = ""
cmd = f'sudo smartctl -A /dev/{_drive}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
def readWriteSectors( self )-> dict[str,tuple[int,int]]:
'''
Obtain the number of sectors read and written since the last
time this function as called.
Returns:
A dictionary of the data, they key is the drive name, the value is the
read/write tuple.
'''
curData = {}
for _ in self._stats:
curData[_.name] = _.readWriteSectors()
return curData
def readWriteBytes( self ) -> dict[str,tuple[int,int]]:
'''
Just like the readWriteSectors function but returns the data in Bytes
'''
curData = {}
for _ in self._stats:
curData[_.name] = _.readWriteBytes()
return curData
class systemData: class systemData:
def __init__( self, drive : str = 'nvme0n1' ): def __init__( self, _drive : str = 'nvme0n1' ):
self._drive = drive self._drive = _drive
self._cpuTemp = CPUTemperature() self._cpuTemp = CPUTemperature()
self._stats = DriveStats( self._drive ) self._stats = DriveStats( self._drive )
@@ -156,8 +294,8 @@ class systemData:
@property @property
def driveStats(self) -> tuple[float,float]: def driveStats(self) -> tuple[float,float]:
_data = self._stats.readWriteSectors() _data = self._stats.readWriteSectors()
readMB = (float(_data[0]) * 512.0) #/ (1024.0 * 1024.0) readMB = (float(_data[0]) * 512.0)
writeMB = (float(_data[1]) * 512.0) #/ (1024.0 * 1024.0) writeMB = (float(_data[1]) * 512.0)
return (readMB, writeMB ) return (readMB, writeMB )
class CPULoad: class CPULoad:
@@ -177,7 +315,7 @@ class CPULoad:
This is usually not an issue. This is usually not an issue.
''' '''
def __init__( self ): def __init__( self ) -> None:
# #
# Get the current data # Get the current data
# #
@@ -272,10 +410,16 @@ if __name__ == "__main__":
load = CPULoad() load = CPULoad()
print( f"Number of CPU's = {len(load)}" ) print( f"Number of CPU's = {len(load)}" )
for i in range(10): for i in range(2):
time.sleep( 1 ) time.sleep( 1 )
percentage : dict[str,float] = load.getPercentages() percentage : dict[str,float] = load.getPercentages()
print( f"percentage: {percentage}" ) print( f"percentage: {percentage}" )
for item in percentage: for item in percentage:
print( f"{item} : {percentage[item]:.02f}" ) print( f"{item} : {percentage[item]:.02f}" )
test = multiDriveStat(["nvme0n1","sda","sdb"])
print( test.drives )
for drive in test.drives:
print( f"Drive {drive} size is {test.driveSize( drive )}" )
print( test.readWriteSectors() )