Merge pull request #4 from JeffCurless/addMonitor

Add system monitoring code
This commit is contained in:
Jeff Curless
2025-10-12 23:08:26 -04:00
committed by GitHub
4 changed files with 611 additions and 0 deletions

53
monitor/cpuload.py Executable file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
import os
import time
class CPULoad:
def __init__( self ):
self._previousData = self._getRawData()
self._names = []
for item in self._previousData:
self._names.append( item )
def _getRawData( self ):
result = {}
with open( "/proc/stat", "r") as f:
allLines = f.readlines()
for line in allLines:
cpu = line.replace('\t', ' ').strip().split()
if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"):
total = 0
idle = 0
for i in range( 1, len(cpu)):
total += int(cpu[i])
if i == 4 or i == 5:
idle += int(cpu[i])
result[cpu[0]] = (total,idle)
return result
def getPercentages( self ):
results = {}
current = self._getRawData()
for item in current:
total = current[item][0] - self._previousData[item][0]
idle = current[item][1] - self._previousData[item][1]
percent = ((total - idle)/total) * 100
results[item] = percent
self._previousData = current
return results
@property
def cpuNames( self ):
return self._names
def __len__(self):
return len(self._previousData)
if __name__ == "__main__":
load = CPULoad()
print( f"Number of CPU's = {len(load)}" )
while True:
time.sleep( 1 )
percentage = load.getPercentages()
for item in percentage:
print( f"{item} : {percentage[item]:.02f}" )

228
monitor/oneUpMon.py Executable file
View File

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Application that monitors current CPU and Drive temp, along with fan speed and IO utilization
Requires: PyQt5 (including QtCharts)
"""
import sys
from typing import Tuple, List
from gpiozero import CPUTemperature
from oneUpSupport import systemData
from cpuload import CPULoad
import os
# --------------------------
# Globals
# --------------------------
sysdata = None
cpuload = CPULoad()
# --------------------------
# UI
# --------------------------
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QPainter
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLabel
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis
class RollingChart(QWidget):
"""
A reusable chart widget with one or more QLineSeries and a rolling X window.
Args:
title: Chart title
series_defs: List of (name, color_qt_str or None) for each line
y_min, y_max: Fixed Y axis range
window: number of points to keep (points are 1 per tick by default)
"""
def __init__(self, title: str, series_defs: List[tuple], y_min: float, y_max: float, window: int = 120, parent=None):
super().__init__(parent)
self.window = window
self.x = 0
self.series: List[QLineSeries] = []
self.chart = QChart()
self.chart.setTitle(title)
self.chart.legend().setVisible(len(series_defs) > 1)
self.chart.legend().setAlignment(Qt.AlignBottom)
for name, color in series_defs:
s = QLineSeries()
s.setName(name)
if color:
s.setColor(color) # QColor or string like "#RRGGBB"
self.series.append(s)
self.chart.addSeries(s)
# Axes
self.axis_x = QValueAxis()
self.axis_x.setRange(0, self.window)
#self.axis_x.setTitleText("Seconds")
self.axis_x.setLabelFormat("%d")
self.axis_y = QValueAxis()
self.axis_y.setRange(y_min, y_max)
self.chart.addAxis(self.axis_x, Qt.AlignBottom)
self.chart.addAxis(self.axis_y, Qt.AlignLeft)
for s in self.series:
s.attachAxis(self.axis_x)
s.attachAxis(self.axis_y)
self.view = QChartView(self.chart)
self.view.setRenderHints(QPainter.RenderHint.Antialiasing)
layout = QGridLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(self.view, 0, 0)
def append(self, values: List[float]):
"""
Append one sample (for each series) at the next x value. Handles rolling window.
values must match the number of series.
"""
self.x += 1
for s, v in zip(self.series, values):
# Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip
try:
if v is None:
continue
# If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v))
s.append(self.x, float(v))
except Exception:
# ignore bad data points
pass
# Trim series to rolling window
min_x_to_keep = max(0, self.x - self.window)
self.axis_x.setRange(min_x_to_keep, self.x)
for s in self.series:
# Efficient trim: remove points with x < min_x_to_keep
# QLineSeries doesn't provide O(1) pop from front, so we rebuild if large
points = s.pointsVector()
if points and points[0].x() < min_x_to_keep:
# binary search for first index >= min_x_to_keep
lo, hi = 0, len(points)
while lo < hi:
mid = (lo + hi) // 2
if points[mid].x() < min_x_to_keep:
lo = mid + 1
else:
hi = mid
s.replace(points[lo:]) # keep tail only
class MonitorWindow(QMainWindow):
def __init__(self, refresh_ms: int = 1000, window = 120, parent=None):
super().__init__(parent)
self.setWindowTitle("Argon 1UP Monitor")
self.setMinimumSize(900, 900)
central = QWidget(self)
grid = QGridLayout(central)
grid.setContentsMargins(8, 8, 8, 8)
grid.setHorizontalSpacing(8)
grid.setVerticalSpacing(8)
self.setCentralWidget(central)
# Charts
self.use_chart = RollingChart(
title="CPU Utilization",
series_defs=[ (name, None) for name in cpuload.cpuNames ],
y_min=0, y_max=100,
window=120
)
self.cpu_chart = RollingChart(
title="Temperature (°C)",
series_defs=[
("CPU", None),
("NVMe", None),
],
y_min=20, y_max=80,
window=window
)
self.fan_chart = RollingChart(
title="Fan Speed",
series_defs=[("RPM",None)],
y_min=0,y_max=6000,
window=window
)
self.io_chart = RollingChart(
title="NVMe I/O (MB/s)",
series_defs=[
("Read MB/s", None),
("Write MB/s", None),
],
y_min=0, y_max=1100, # adjust ceiling for your device
window=window
)
# Layout: 2x2 grid (CPU, NVMe on top; IO full width bottom)
grid.addWidget(self.use_chart, 0, 0, 1, 2 )
grid.addWidget(self.io_chart, 1, 0, 1, 2 )
grid.addWidget(self.cpu_chart, 2, 0, 1, 1 )
grid.addWidget(self.fan_chart, 2, 1, 1, 1 )
# Timer
self.timer = QTimer(self)
self.timer.timeout.connect(self.refresh_metrics)
self.timer.start(refresh_ms)
self.refresh_metrics()
def refresh_metrics(self):
# Gather metrics with safety
try:
cpu_c = float(sysdata.CPUTemperature)
except Exception:
cpu_c = None
try:
fan_speed = sysdata.fanSpeed
except Exception:
fan_speed = None
try:
nvme_c = sysdata.driveTemp
except Exception:
nvme_c = None
try:
read_mb, write_mb = sysdata.driveStats
read_mb = float(read_mb)
write_mb = float(write_mb)
except Exception:
read_mb, write_mb = None, None
try:
p = cpuload.getPercentages()
values = []
for i in range( len(cpuload) ):
values.append( round( p[f'cpu{i}'], 2 ) )
except Exception:
values = [ None for i in range( len( cpuload) ) ]
# Append to charts
self.cpu_chart.append([cpu_c,nvme_c])
self.fan_chart.append([fan_speed])
self.io_chart.append([read_mb, write_mb])
self.use_chart.append( values )
def main():
app = QApplication(sys.argv)
w = MonitorWindow(refresh_ms=1000)
w.show()
sys.exit(app.exec_())
if __name__ == "__main__":
sysdata = systemData()
main()

171
monitor/oneUpSupport.py Executable file
View File

@@ -0,0 +1,171 @@
#!/usr/bin/python3
#
# Setup environment and pull in all of the items we need from gpiozero. The
# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect
# will be new direction for all prior version the RPIi.)
#
from gpiozero import CPUTemperature
import time
import os
class DriveStats:
'''
DriveStat class -
This class gets the drive statistics from sysfs for the device passed
in. There are several statistics can can be obtained. Note that since
all of the data is pulled at the same time, it is upto the caller to
make sure all the stats needed are obtained at the same time.
See: https://www.kernel.org/doc/html/latest/block/stat.html
Parameters:
device - the name of the device to track
'''
READ_IOS = 0
READ_MERGES = 1
READ_SECTORS = 2
READ_TICKS = 3
WRITE_IOS = 4
WRITE_MERGES = 5
WRITE_SECTORS = 6
WRITE_TICKS = 7
IN_FLIGHT = 8
IO_TICKS = 9
TIME_IN_QUEUE = 10
DISCARD_IOS = 11
DISCARD_MERGES = 12
DISCARD_SECTORS = 13
DISCARD_TICS = 14
FLUSH_IOS = 15
FLUSH_TICKS = 16
def __init__( self, device:str ):
self._last : list[int] = []
self._stats : list[int] = []
self._device = device
self._readStats()
def _readStats( self ):
'''
Read the disk statistics. The stored statics in sysfs are stored as a single file
so that when the data is read, all of the stats correlate to the same time. The data
is from the time the device has come online.
last and set to the old version of the data, and the latest data is stored in stats
'''
try:
self._last = self._stats
with open( f"/sys/block/{self._device}/stat", "r") as f:
curStats = f.readline().strip().split(" ")
self._stats = [int(l) for l in curStats if l]
except Exception as e:
print( f"Failure reading disk statistics for {self._device} error {e}" )
def _getStats( self ) -> list[int]:
'''
Read the devices statistics from the device,and return it.
Returns:
An array containing all of the data colleected about the device.
'''
curData : list[int] = []
self._readStats()
if self._last == []:
curData = self._stats[:]
else:
curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ]
return curData
def readAllStats( self ) -> list[int]:
'''
read all of the drive statisics from sysfs for the device.
Returns
A list of all of the device stats
'''
return self._getStats()
def readSectors( self )-> int:
return self._getStats()[DriveStats.READ_SECTORS]
def writeSectors( self ) -> int:
return self._getStats()[DriveStats.WRITE_SECTORS]
def discardSectors( self ) -> int:
return self._getStats()[DriveStats.DISCARD_SECTORS]
def readWriteSectors( self ) -> tuple[int,int]:
curData = self._getStats()
return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS])
class systemData:
def __init__( self, drive : str = 'nvme0n1' ):
self._drive = drive
self._cpuTemp = CPUTemperature()
self._stats = DriveStats( self._drive )
@property
def CPUTemperature(self) -> int:
return self._cpuTemp.temperature
@property
def fanSpeed( self ) -> int:
speed= 0
try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
speed = int( command.read().strip())
except Exception as error:
print( f"Could not determine fan speed, error {error}" )
finally:
command.close()
return speed
@property
def driveTemp(self) -> float:
smartOutRaw = ""
cmd = f'sudo smartctl -A /dev/{self._drive}'
try:
command = os.popen( cmd )
smartOutRaw = command.read()
except Exception as error:
print( f"Could not launch {cmd} error is {error}" )
return 0.0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
@property
def driveStats(self) -> tuple[float,float]:
data = self._stats.readWriteSectors()
readMB = (float(data[0]) * 512.0) / (1024.0 * 1024.0)
writeMB = (float(data[1]) * 512.0) / (1024.0 * 1024.0)
return (readMB, writeMB )
if __name__ == "__main__":
data = systemData()
print( f"CPU Temp : {data.CPUTemperature}" )
print( f"Fan Speed: {data.fanSpeed}" )
print( f"NVME Temp: {data.driveTemp}" )
print( f"Stats : {data.driveStats}" )

159
monitor/simple_monitor.py Executable file
View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PyQt5 CPU/NVMe Monitor
- getCPUTemp() -> float (°C)
- getDriveTemp() -> float (°C)
- getIORate() -> tuple[float, float] in MB/s as (read_mb_s, write_mb_s)
Replace the stub return values with your real implementations later.
"""
import sys
from typing import Tuple
from oneUpSupport import systemData
sysdata = systemData()
# --------------------------
# Metrics function stubs
# --------------------------
def getCPUTemp() -> float:
"""Return current CPU temperature in °C."""
return float( sysdata.CPUTemperature )
def getDriveTemp() -> float:
"""Return current NVMe drive temperature in °C."""
return sysdata.driveTemp
def getIORate() -> Tuple[float, float]:
"""Return current NVMe IO rates (read_MBps, write_MBps)."""
return sysdata.driveStats
# --------------------------
# UI
# --------------------------
from PyQt5.QtCore import Qt, QTimer, QDateTime
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QGridLayout, QLabel, QProgressBar, QHBoxLayout
)
class MetricRow(QWidget):
"""A compact row with a label, numeric value, unit, and optional progress bar."""
def __init__(self, title: str, show_bar: bool = False, bar_min: int = 0, bar_max: int = 110, parent=None):
super().__init__(parent)
self.title_lbl = QLabel(title)
self.title_lbl.setStyleSheet("font-weight: 600;")
self.value_lbl = QLabel("--")
self.value_lbl.setAlignment(Qt.AlignRight | Qt.AlignVCenter)
self.unit_lbl = QLabel("")
self.unit_lbl.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
layout = QGridLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(self.title_lbl, 0, 0, 1, 1)
layout.addWidget(self.value_lbl, 0, 1, 1, 1)
layout.addWidget(self.unit_lbl, 0, 2, 1, 1)
self.bar = None
if show_bar:
self.bar = QProgressBar()
self.bar.setMinimum(bar_min)
self.bar.setMaximum(bar_max)
self.bar.setTextVisible(False)
layout.addWidget(self.bar, 1, 0, 1, 3)
layout.setColumnStretch(0, 1)
layout.setColumnStretch(1, 0)
layout.setColumnStretch(2, 0)
def set_value(self, value: float, unit: str = "", bar_value: float = None):
self.value_lbl.setText(f"{value:.1f}")
self.unit_lbl.setText(unit)
if self.bar is not None and bar_value is not None:
self.bar.setValue(int(bar_value))
class MonitorWindow(QMainWindow):
def __init__(self, refresh_ms: int = 1000, parent=None):
super().__init__(parent)
self.setWindowTitle("CPU & NVMe Monitor")
self.setMinimumWidth(420)
central = QWidget(self)
grid = QGridLayout(central)
grid.setContentsMargins(16, 16, 16, 16)
grid.setVerticalSpacing(12)
self.setCentralWidget(central)
# Rows
self.cpu_row = MetricRow("CPU Temperature", show_bar=True, bar_max=90)
self.nvme_row = MetricRow("NVMe Temperature", show_bar=True, bar_max=90)
# IO row: two side-by-side values
self.io_title = QLabel("NVMe I/O Rate")
self.io_title.setStyleSheet("font-weight: 600;")
self.io_read = QLabel("Read: -- MB/s")
self.io_write = QLabel("Write: -- MB/s")
io_box = QHBoxLayout()
io_box.addWidget(self.io_read, 1)
io_box.addWidget(self.io_write, 1)
# Last updated
self.updated_lbl = QLabel("Last updated: --")
self.updated_lbl.setAlignment(Qt.AlignRight | Qt.AlignVCenter)
self.updated_lbl.setStyleSheet("color: #666; font-size: 11px;")
# Layout
grid.addWidget(self.cpu_row, 0, 0, 1, 2)
grid.addWidget(self.nvme_row, 1, 0, 1, 2)
grid.addWidget(self.io_title, 2, 0, 1, 2)
grid.addLayout(io_box, 3, 0, 1, 2)
grid.addWidget(self.updated_lbl, 4, 0, 1, 2)
# Timer
self.timer = QTimer(self)
self.timer.timeout.connect(self.refresh_metrics)
self.timer.start(refresh_ms)
# Initial fill
self.refresh_metrics()
def refresh_metrics(self):
try:
cpu_c = float(getCPUTemp())
except Exception:
cpu_c = float("nan")
try:
nvme_c = float(getDriveTemp())
except Exception:
nvme_c = float("nan")
try:
read_mb, write_mb = getIORate()
read_mb = float(read_mb)
write_mb = float(write_mb)
except Exception:
read_mb, write_mb = float("nan"), float("nan")
# Update rows
self.cpu_row.set_value(cpu_c if cpu_c == cpu_c else 0.0, "°C", bar_value=cpu_c if cpu_c == cpu_c else 0)
self.nvme_row.set_value(nvme_c if nvme_c == nvme_c else 0.0, "°C", bar_value=nvme_c if nvme_c == nvme_c else 0)
self.io_read.setText(f"Read: {read_mb:.1f} MB/s" if read_mb == read_mb else "Read: -- MB/s")
self.io_write.setText(f"Write: {write_mb:.1f} MB/s" if write_mb == write_mb else "Write: -- MB/s")
self.updated_lbl.setText(f"Last updated: {QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss')}")
def main():
app = QApplication(sys.argv)
w = MonitorWindow(refresh_ms=1000)
w.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()