Files
argon40-battery-display/monitor/oneUpMon.py
Jeff Curless 68f8057d8d Modify the code to remove single drive class
Remove the class for single drive data fetching, and so we can modify
the monitor code to make sure we can handle multiple drives
2025-10-26 12:12:30 -04:00

383 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Application that monitors current CPU and Drive temp, along with fan speed and IO utilization
Requires: PyQt5 (including QtCharts)
"""
import math
import sys

from systemsupport import CPUInfo, CPULoad, multiDriveStat
# --------------------------
# Globals
# --------------------------
# Shared data sources, created once when the module is loaded and polled by
# MonitorWindow. Their implementations live in the systemsupport module.
# cpuinfo    - read for the fan speed (CPUFanSpeed) and the CPU temperature.
# cpuload    - supplies the per-CPU series names (cpuNames) and load
#              percentages (getPercentages()).
# multiDrive - supplies the drive names (drives), per-drive temperature
#              (driveTemp()) and per-drive read/write counters
#              (readWriteBytes()).
cpuinfo = CPUInfo()
cpuload = CPULoad()
multiDrive = multiDriveStat()
# --------------------------
# UI
# --------------------------
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QPainter
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis
class RollingChart(QWidget):
    '''
    A reusable chart widget with one or more QLineSeries and a rolling X window.

    Parameters:
        title       - Chart title.
        series_defs - List of (name, color) for each line. A falsy color
                      (e.g. None) leaves the series color untouched; any other
                      value is passed straight to QLineSeries.setColor().
        y_min,y_max - Fixed Y axis range.
        window      - Number of points to keep (one point per append() call).
        parent      - Optional owning widget.
    '''
    def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None):
        super().__init__(parent)
        self.pointWindow = window
        # Start one step left of the window's right edge so that the first
        # append() lands exactly on x == window.
        self.xpos = window - 1

        self.chart = QChart()
        self.chart.setTitle(title)
        # A legend is only useful when there is more than one line to tell apart.
        self.chart.legend().setVisible(len(series_defs) > 1)
        self.chart.legend().setAlignment(Qt.AlignBottom)

        self.series: list[QLineSeries] = []
        for name, color in series_defs:
            s = QLineSeries()
            s.setName(name)
            if color:
                s.setColor(color)
            self.series.append(s)
            self.chart.addSeries(s)

        # Setup X Axis... setVisible(False) hides the whole axis, including
        # the tick configuration below. The intent was to keep the ticks and
        # hide only the labels.
        # NOTE(review): QAbstractAxis.setLabelsVisible(False) may provide
        # exactly that -- confirm before switching.
        self.axis_x = QValueAxis()
        self.axis_x.setRange(0, self.pointWindow)
        self.axis_x.setMinorTickCount( 2 )
        self.axis_x.setTickCount( 10 )
        self.axis_x.setLabelFormat("%d")
        self.axis_x.setVisible(False)

        # Setup Y Axis...
        self.axis_y = QValueAxis()
        self.axis_y.setRange(y_min, y_max)
        self.axis_y.setLabelFormat( "%d" )

        self.chart.addAxis(self.axis_x, Qt.AlignBottom)
        self.chart.addAxis(self.axis_y, Qt.AlignLeft)
        for s in self.series:
            s.attachAxis(self.axis_x)
            s.attachAxis(self.axis_y)

        self.view = QChartView(self.chart)
        self.view.setRenderHints(QPainter.RenderHint.Antialiasing)

        layout = QGridLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.addWidget(self.view, 0, 0)

    def append(self, values: list[float]):
        '''
        Append one sample (for each series) at the next x value and roll the
        visible window forward.

        Parameters:
            values - One number per data series, in series order. A None,
                     NaN or infinite entry is skipped, leaving a gap in that
                     series for this tick. Extra entries (or extra series)
                     are ignored because the two lists are zipped.
        '''
        self.xpos += 1
        for s, v in zip(self.series, values):
            if v is None:
                continue
            # This runs from a timer callback, so one bad sample must never
            # propagate: report it and carry on with the other series.
            try:
                y = float(v)
                # NaN/inf cannot be drawn; treat them like a missing sample.
                if math.isfinite(y):
                    s.append(self.xpos, y)
            except Exception as error:
                print( f"Exception error {error}" )

        # Trim series to rolling window
        min_x_to_keep = max(0, self.xpos - self.pointWindow)
        self.axis_x.setRange(min_x_to_keep, self.xpos)
        for s in self.series:
            # pointsVector() already copies every point, so a plain filter
            # costs no more than a search plus slice. Only rewrite the series
            # when its oldest point has actually left the window.
            points = s.pointsVector()
            if points and points[0].x() < min_x_to_keep:
                s.replace([p for p in points if p.x() >= min_x_to_keep])
class scaleValues:
    '''
    Keeps track of which entry in an ordered list of (name, divisor) pairs is
    currently active, and converts values and chart points accordingly.

    Parameters:
        range_y - Sequence of (name, divisor) pairs. The first entry is
                  active initially.
    '''
    def __init__( self, range_y ):
        self.valueRange = range_y
        self.index = 0

    @property
    def name( self ):
        '''Name (first element) of the active entry.'''
        active = self.valueRange[self.index]
        return active[0]

    @property
    def scale( self ):
        '''Divisor (second element) of the active entry.'''
        active = self.valueRange[self.index]
        return active[1]

    def scaleValue(self, value : float ):
        '''Return value divided by the active divisor.'''
        divisor = self.scale
        return value / divisor

    def nextScale( self ):
        '''Activate the following entry; does nothing on the last entry.'''
        last = len(self.valueRange) - 1
        if self.index >= last:
            return
        self.index += 1

    def prevScale( self ):
        '''Activate the preceding entry; does nothing on the first entry.'''
        if self.index <= 0:
            return
        self.index -= 1

    def scaleDown( self, value ):
        '''Return value divided by a fixed 1024 (not the active divisor).'''
        return value / 1024

    def scaleUp( self, value ):
        '''Return value multiplied by a fixed 1024 (not the active divisor).'''
        return value * 1024

    def scalePointsDown( self, points ):
        '''Divide the y of every point, in place, by the active divisor.'''
        divisor = self.scale
        for pt in points:
            pt.setY(pt.y() / divisor)

    def scalePointsUp( self, points ):
        '''Multiply the y of every point, in place, by the active divisor.'''
        factor = self.scale
        for pt in points:
            pt.setY(pt.y() * factor)
class RollingChartDynamic(RollingChart):
    '''
    A RollingChart whose Y axis range and unit adapt to the data.

    Each sample is divided by the active unit's divisor before it is plotted.
    When a scaled sample exceeds 1024 the chart steps to the next larger unit;
    when every visible point is below 1 it steps back to the next smaller
    unit. The stored points are rescaled on each step and the chart title
    shows the active unit name.

    Parameters:
        title       - Chart title; the active unit name is appended in
                      parentheses.
        series_defs - List of (name, color) for each line.
        range_y     - Ordered list of (unit name, divisor) pairs, smallest
                      unit first. Adjacent divisors are expected to differ by
                      a factor of 1024, because points are rescaled with
                      scaleValues.scaleUp()/scaleDown().
        window      - Number of points to keep.
        parent      - Optional owning widget.
    '''
    def __init__(self, title : str, series_defs: list[tuple], range_y : list[tuple], window=120,parent=None):
        self.maxY = 512
        super().__init__(title,series_defs,0,self.maxY,window,parent)
        self.title = title
        self.max = 0
        self.scale = scaleValues(range_y)
        self.chart.setTitle( title+ f" ({self.scale.name})" )

    def getBestFit( self, value ):
        '''
        Return the smallest power of two, starting at 4, that is strictly
        greater than value. Used as the upper bound of the Y axis.

        Values of 1024 and above keep doubling instead of falling back to 4,
        so the axis still fits the data when no larger unit is available.
        NaN and infinite values return 4.
        '''
        if not math.isfinite(value):
            return 4
        limit = 4
        while limit <= value:
            limit *= 2
        return limit

    def _stepScale( self, step ):
        '''
        Call step (self.scale.nextScale or self.scale.prevScale) and report
        whether the active unit really changed. The title is refreshed only
        on a change.
        '''
        before = self.scale.index
        step()
        changed = self.scale.index != before
        if changed:
            self.chart.setTitle( self.title + f" ({self.scale.name})" )
        return changed

    def append(self, values: list[float]):
        '''
        Append one sample (for each series) at the next x value, roll the
        window forward, and adjust the unit and Y axis to fit the data.

        Parameters:
            values - One number per data series, in series order, expressed
                     in the base unit (divisor 1). A None, NaN or infinite
                     entry is skipped for this tick.
        '''
        scaleUp = False
        self.xpos += 1
        for s, v in zip(self.series, values):
            if v is None:
                continue
            # Runs from a timer callback: report a bad sample and carry on.
            try:
                sv = float(self.scale.scaleValue(v))
                # NaN/inf cannot be drawn; treat them like a missing sample.
                if not math.isfinite(sv):
                    continue
                if sv > 1024:
                    scaleUp = True
                s.append(self.xpos, sv)
            except Exception as error:
                print( f"Exception error {error}" )

        # Trim series to rolling window
        min_x_to_keep = max(0, self.xpos - self.pointWindow)
        self.axis_x.setRange(min_x_to_keep, self.xpos)

        # Only rescale the stored points if the unit really changed; at the
        # largest unit nextScale() is a no-op and the data must stay as is.
        steppedUp = scaleUp and self._stepScale(self.scale.nextScale)

        maxV = 0
        for s in self.series:
            # Keep exactly the points inside the window (as the base class
            # does), so stale points do not influence the axis fit.
            kept = [p for p in s.pointsVector() if p.x() >= min_x_to_keep]
            for point in kept:
                if steppedUp:
                    point.setY( self.scale.scaleDown(point.y()) )
                if maxV < point.y():
                    maxV = point.y()
            s.replace( kept )

        # Everything visible is below one unit: try the next smaller unit.
        # At the smallest unit prevScale() is a no-op and nothing is rescaled.
        if maxV < 1 and self._stepScale(self.scale.prevScale):
            for s in self.series:
                points = s.pointsVector()
                for point in points:
                    point.setY( self.scale.scaleUp(point.y()) )
                s.replace(points)
            # Track the rescaled maximum so the axis is refitted right away.
            maxV = self.scale.scaleUp(maxV)

        if maxV > 1:
            self.axis_y.setRange( 0, self.getBestFit(maxV) )
class MonitorWindow(QMainWindow):
    '''
    Creating a window to monitor various system aspects: per-CPU utilization,
    disk I/O rates, CPU/drive temperatures and fan speed.

    Parameters:
        refresh_ms - Time between refreshes of data on screen, in milliseconds, the
                     default is 1 second.
        window     - How much data do we want to store in each graph? Each data
                     point is a data refresh period.
        parent     - Owning parent of this window... default is None.
    '''
    def __init__(self, refresh_ms: int = 1000, window = 120, parent=None):
        super().__init__(parent)
        self.setWindowTitle("System Monitor")
        self.setMinimumSize(900, 900)

        central = QWidget(self)
        grid = QGridLayout(central)
        grid.setContentsMargins(8, 8, 8, 8)
        grid.setHorizontalSpacing(8)
        grid.setVerticalSpacing(8)
        self.setCentralWidget(central)

        # Charts
        self.use_chart = RollingChart(
            title="CPU Utilization",
            series_defs=[ (name, None) for name in cpuload.cpuNames ],
            y_min=0, y_max=100,
            window=window
        )

        # One temperature line for the CPU, then one per drive. The order
        # here must match the order of the values built in refresh_metrics().
        series = [("CPU", None)]
        for name in multiDrive.drives:
            series.append( (name,None) )
        self.cpu_chart = RollingChart(
            title="Temperature (°C)",
            series_defs= series,
            y_min=20, y_max=80,
            window=window
        )

        self.fan_chart = RollingChart(
            title="Fan Speed",
            series_defs=[("RPM",None)],
            y_min=0,y_max=6000,
            window=window
        )

        # Two I/O lines (read, write) per drive.
        series = []
        for name in multiDrive.drives:
            series.append( (f"{name} Read", None) )
            series.append( (f"{name} Write", None ) )
        self.io_chart = RollingChartDynamic(
            title="Disk I/O",
            series_defs=series,
            range_y=[("Bytes/s", 1),("KiB/s",1024),("MiB/s", 1024*1024),("GiB/s",1024*1024*1024)],
            window=window,
        )

        # Layout: three rows. CPU utilization and disk I/O each span the full
        # width; temperature and fan speed share the bottom row.
        grid.addWidget(self.use_chart, 0, 0, 1, 2 )
        grid.addWidget(self.io_chart,  1, 0, 1, 2 )
        grid.addWidget(self.cpu_chart, 2, 0, 1, 1 )
        grid.addWidget(self.fan_chart, 2, 1, 1, 1 )

        # Get the initial information from the system
        self.refresh_metrics()

        # Timer
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.refresh_metrics)
        self.timer.start(refresh_ms)

    def refresh_metrics(self):
        '''
        This routine is called periodically, as set up in the __init__ function. Since this
        routine calls out to other things, we want to make sure that there is no possible
        exception, so everything needs to be wrapped in a handler.
        '''
        # Obtain the current fan speed
        try:
            fan_speed = cpuinfo.CPUFanSpeed
        except Exception:
            fan_speed = None

        # The CPU temperature always occupies slot 0 so that the values stay
        # aligned with the series order ("CPU" first, then one per drive).
        try:
            temperatures = [ float(cpuinfo.temperature) ]
        except Exception:
            temperatures = [ 0.0 ]

        try:
            drive_names = list(multiDrive.drives)
        except Exception:
            drive_names = []

        # Obtain the drive temperatures; a failing drive falls back to 0.0
        # on its own without disturbing the CPU entry or the other drives.
        for _drive in drive_names:
            try:
                drive_temp = multiDrive.driveTemp( _drive )
            except Exception:
                drive_temp = 0.0
            temperatures.append( drive_temp )

        # Obtain the drive read and write rates, two values per drive.
        # NOTE(review): assumes readWriteBytes() yields drives in the same
        # order as multiDrive.drives -- confirm against systemsupport.
        try:
            rwData = []
            drives = multiDrive.readWriteBytes()
            for drive in drives:
                rwData.append( float(drives[drive][0]))
                rwData.append( float(drives[drive][1]))
        except Exception :
            # None entries are skipped by the chart; provide one per series.
            rwData = [ None ] * (2 * len(drive_names))

        # Get the CPU load percentages
        try:
            p = cpuload.getPercentages()
            values = [p[name] for name in cpuload.cpuNames]
        except Exception:
            values = [ None for name in cpuload.cpuNames ]

        # Append to charts
        self.cpu_chart.append( temperatures )
        self.fan_chart.append([fan_speed])
        self.io_chart.append( rwData )
        self.use_chart.append( values )
def main():
    '''
    Create the Qt application, show the monitor window with a one second
    refresh, and exit the process with the event loop's return code.
    '''
    application = QApplication(sys.argv)
    monitor = MonitorWindow(refresh_ms=1000)
    monitor.show()
    exit_code = application.exec_()
    sys.exit(exit_code)
# Start the GUI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()