diff --git a/monitor/cpuload.py b/monitor/cpuload.py new file mode 100755 index 0000000..ca3c058 --- /dev/null +++ b/monitor/cpuload.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +import os +import time + +class CPULoad: + def __init__( self ): + self._previousData = self._getRawData() + self._names = [] + for item in self._previousData: + self._names.append( item ) + + def _getRawData( self ): + result = {} + with open( "/proc/stat", "r") as f: + allLines = f.readlines() + for line in allLines: + cpu = line.replace('\t', ' ').strip().split() + if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"): + total = 0 + idle = 0 + for i in range( 1, len(cpu)): + total += int(cpu[i]) + if i == 4 or i == 5: + idle += int(cpu[i]) + result[cpu[0]] = (total,idle) + return result + + def getPercentages( self ): + results = {} + current = self._getRawData() + for item in current: + total = current[item][0] - self._previousData[item][0] + idle = current[item][1] - self._previousData[item][1] + percent = ((total - idle)/total) * 100 + results[item] = percent + self._previousData = current + return results + + @property + def cpuNames( self ): + return self._names + + def __len__(self): + return len(self._previousData) + +if __name__ == "__main__": + load = CPULoad() + print( f"Number of CPU's = {len(load)}" ) + while True: + time.sleep( 1 ) + percentage = load.getPercentages() + for item in percentage: + print( f"{item} : {percentage[item]:.02f}" ) diff --git a/monitor/oneUpMon.py b/monitor/oneUpMon.py new file mode 100755 index 0000000..891aa31 --- /dev/null +++ b/monitor/oneUpMon.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Application that monitors current CPU and Drive temp, along with fan speed and IO utilization +Requires: PyQt5 (including QtCharts) + +""" + +import sys +from typing import Tuple, List +from gpiozero import CPUTemperature +from oneUpSupport import systemData +from cpuload import CPULoad +import os + +# -------------------------- +# Globals +# -------------------------- +sysdata = None +cpuload = CPULoad() + +# -------------------------- +# UI +# -------------------------- + +from PyQt5.QtCore import Qt, QTimer +from PyQt5.QtGui import QPainter +from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLabel +from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis + +class RollingChart(QWidget): + """ + A reusable chart widget with one or more QLineSeries and a rolling X window. + + Args: + title: Chart title + series_defs: List of (name, color_qt_str or None) for each line + y_min, y_max: Fixed Y axis range + window: number of points to keep (points are 1 per tick by default) + """ + def __init__(self, title: str, series_defs: List[tuple], y_min: float, y_max: float, window: int = 120, parent=None): + super().__init__(parent) + self.window = window + self.x = 0 + self.series: List[QLineSeries] = [] + self.chart = QChart() + self.chart.setTitle(title) + self.chart.legend().setVisible(len(series_defs) > 1) + self.chart.legend().setAlignment(Qt.AlignBottom) + + for name, color in series_defs: + s = QLineSeries() + s.setName(name) + if color: + s.setColor(color) # QColor or string like "#RRGGBB" + self.series.append(s) + self.chart.addSeries(s) + + # Axes + self.axis_x = QValueAxis() + self.axis_x.setRange(0, self.window) + #self.axis_x.setTitleText("Seconds") + self.axis_x.setLabelFormat("%d") + + self.axis_y = QValueAxis() + self.axis_y.setRange(y_min, y_max) + + self.chart.addAxis(self.axis_x, Qt.AlignBottom) + self.chart.addAxis(self.axis_y, Qt.AlignLeft) + + for s in self.series: + s.attachAxis(self.axis_x) + s.attachAxis(self.axis_y) + + self.view = QChartView(self.chart) + self.view.setRenderHints(QPainter.RenderHint.Antialiasing) + + layout = QGridLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + layout.addWidget(self.view, 0, 0) + + def append(self, values: List[float]): + """ + Append one sample (for each series) at the next x value. Handles rolling window. + values must match the number of series. + """ + self.x += 1 + for s, v in zip(self.series, values): + # Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip + try: + if v is None: + continue + # If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v)) + s.append(self.x, float(v)) + except Exception: + # ignore bad data points + pass + + # Trim series to rolling window + min_x_to_keep = max(0, self.x - self.window) + self.axis_x.setRange(min_x_to_keep, self.x) + + for s in self.series: + # Efficient trim: remove points with x < min_x_to_keep + # QLineSeries doesn't provide O(1) pop from front, so we rebuild if large + points = s.pointsVector() + if points and points[0].x() < min_x_to_keep: + # binary search for first index >= min_x_to_keep + lo, hi = 0, len(points) + while lo < hi: + mid = (lo + hi) // 2 + if points[mid].x() < min_x_to_keep: + lo = mid + 1 + else: + hi = mid + s.replace(points[lo:]) # keep tail only + + +class MonitorWindow(QMainWindow): + def __init__(self, refresh_ms: int = 1000, window = 120, parent=None): + super().__init__(parent) + self.setWindowTitle("Argon 1UP Monitor") + self.setMinimumSize(900, 900) + + central = QWidget(self) + grid = QGridLayout(central) + grid.setContentsMargins(8, 8, 8, 8) + grid.setHorizontalSpacing(8) + grid.setVerticalSpacing(8) + self.setCentralWidget(central) + + # Charts + self.use_chart = RollingChart( + title="CPU Utilization", + series_defs=[ (name, None) for name in cpuload.cpuNames ], + y_min=0, y_max=100, + window=120 + ) + + self.cpu_chart = RollingChart( + title="Temperature (°C)", + series_defs=[ + ("CPU", None), + ("NVMe", None), + ], + y_min=20, y_max=80, + window=window + ) + + self.fan_chart = RollingChart( + title="Fan Speed", + series_defs=[("RPM",None)], + y_min=0,y_max=6000, + window=window + ) + + self.io_chart = RollingChart( + title="NVMe I/O (MB/s)", + series_defs=[ + ("Read MB/s", None), + ("Write MB/s", None), + ], + y_min=0, y_max=1100, # adjust ceiling for your device + window=window + ) + + # Layout: 2x2 grid (CPU, NVMe on top; IO full width bottom) + grid.addWidget(self.use_chart, 0, 0, 1, 2 ) + grid.addWidget(self.io_chart, 1, 0, 1, 2 ) + grid.addWidget(self.cpu_chart, 2, 0, 1, 1 ) + grid.addWidget(self.fan_chart, 2, 1, 1, 1 ) + + # Timer + self.timer = QTimer(self) + self.timer.timeout.connect(self.refresh_metrics) + self.timer.start(refresh_ms) + + self.refresh_metrics() + + def refresh_metrics(self): + # Gather metrics with safety + try: + cpu_c = float(sysdata.CPUTemperature) + except Exception: + cpu_c = None + + try: + fan_speed = sysdata.fanSpeed + except Exception: + fan_speed = None + + try: + nvme_c = sysdata.driveTemp + except Exception: + nvme_c = None + + try: + read_mb, write_mb = sysdata.driveStats + read_mb = float(read_mb) + write_mb = float(write_mb) + except Exception: + read_mb, write_mb = None, None + + try: + p = cpuload.getPercentages() + values = [] + for i in range( len(cpuload) ): + values.append( round( p[f'cpu{i}'], 2 ) ) + except Exception: + values = [ None for i in range( len( cpuload) ) ] + + # Append to charts + self.cpu_chart.append([cpu_c,nvme_c]) + self.fan_chart.append([fan_speed]) + self.io_chart.append([read_mb, write_mb]) + self.use_chart.append( values ) + +def main(): + app = QApplication(sys.argv) + w = MonitorWindow(refresh_ms=1000) + w.show() + sys.exit(app.exec_()) + +if __name__ == "__main__": + sysdata = systemData() + main() + diff --git a/monitor/oneUpSupport.py b/monitor/oneUpSupport.py new file mode 100755 index 0000000..c384a83 --- /dev/null +++ b/monitor/oneUpSupport.py @@ -0,0 +1,171 @@ +#!/usr/bin/python3 +# +# Setup environment and pull in all of the items we need from gpiozero. The +# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect +# will be new direction for all prior version the RPIi.) +# +from gpiozero import CPUTemperature +import time +import os + +class DriveStats: + ''' + DriveStat class - + + This class gets the drive statistics from sysfs for the device passed + in. There are several statistics can can be obtained. Note that since + all of the data is pulled at the same time, it is upto the caller to + make sure all the stats needed are obtained at the same time. + + See: https://www.kernel.org/doc/html/latest/block/stat.html + + Parameters: + device - the name of the device to track + ''' + + READ_IOS = 0 + READ_MERGES = 1 + READ_SECTORS = 2 + READ_TICKS = 3 + WRITE_IOS = 4 + WRITE_MERGES = 5 + WRITE_SECTORS = 6 + WRITE_TICKS = 7 + IN_FLIGHT = 8 + IO_TICKS = 9 + TIME_IN_QUEUE = 10 + DISCARD_IOS = 11 + DISCARD_MERGES = 12 + DISCARD_SECTORS = 13 + DISCARD_TICS = 14 + FLUSH_IOS = 15 + FLUSH_TICKS = 16 + + def __init__( self, device:str ): + self._last : list[int] = [] + self._stats : list[int] = [] + self._device = device + self._readStats() + + def _readStats( self ): + ''' + Read the disk statistics. The stored statics in sysfs are stored as a single file + so that when the data is read, all of the stats correlate to the same time. The data + is from the time the device has come online. + + last and set to the old version of the data, and the latest data is stored in stats + + ''' + try: + self._last = self._stats + with open( f"/sys/block/{self._device}/stat", "r") as f: + curStats = f.readline().strip().split(" ") + self._stats = [int(l) for l in curStats if l] + except Exception as e: + print( f"Failure reading disk statistics for {self._device} error {e}" ) + + def _getStats( self ) -> list[int]: + ''' + Read the devices statistics from the device,and return it. + + Returns: + An array containing all of the data colleected about the device. + ''' + curData : list[int] = [] + + self._readStats() + if self._last == []: + curData = self._stats[:] + else: + curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ] + return curData + + def readAllStats( self ) -> list[int]: + ''' + read all of the drive statisics from sysfs for the device. + + Returns + A list of all of the device stats + ''' + return self._getStats() + + def readSectors( self )-> int: + return self._getStats()[DriveStats.READ_SECTORS] + + def writeSectors( self ) -> int: + return self._getStats()[DriveStats.WRITE_SECTORS] + + def discardSectors( self ) -> int: + return self._getStats()[DriveStats.DISCARD_SECTORS] + + def readWriteSectors( self ) -> tuple[int,int]: + curData = self._getStats() + return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS]) + + +class systemData: + def __init__( self, drive : str = 'nvme0n1' ): + self._drive = drive + self._cpuTemp = CPUTemperature() + self._stats = DriveStats( self._drive ) + + @property + def CPUTemperature(self) -> int: + return self._cpuTemp.temperature + + + @property + def fanSpeed( self ) -> int: + speed= 0 + try: + command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) + speed = int( command.read().strip()) + except Exception as error: + print( f"Could not determine fan speed, error {error}" ) + finally: + command.close() + + return speed + + @property + def driveTemp(self) -> float: + smartOutRaw = "" + cmd = f'sudo smartctl -A /dev/{self._drive}' + try: + command = os.popen( cmd ) + smartOutRaw = command.read() + except Exception as error: + print( f"Could not launch {cmd} error is {error}" ) + return 0.0 + finally: + command.close() + + smartOut = [ l for l in smartOutRaw.split('\n') if l] + for smartAttr in ["Temperature:","194","190"]: + try: + line = [l for l in smartOut if l.startswith(smartAttr)][0] + parts = [p for p in line.replace('\t',' ').split(' ') if p] + if smartAttr == "Temperature:": + return float(parts[1]) + else: + return float(parts[0]) + except IndexError: + pass + + return float(0.0) + + @property + def driveStats(self) -> tuple[float,float]: + data = self._stats.readWriteSectors() + readMB = (float(data[0]) * 512.0) / (1024.0 * 1024.0) + writeMB = (float(data[1]) * 512.0) / (1024.0 * 1024.0) + return (readMB, writeMB ) + + +if __name__ == "__main__": + data = systemData() + print( f"CPU Temp : {data.CPUTemperature}" ) + print( f"Fan Speed: {data.fanSpeed}" ) + print( f"NVME Temp: {data.driveTemp}" ) + print( f"Stats : {data.driveStats}" ) + diff --git a/monitor/simple_monitor.py b/monitor/simple_monitor.py new file mode 100755 index 0000000..44ffe1d --- /dev/null +++ b/monitor/simple_monitor.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +PyQt5 CPU/NVMe Monitor + +- getCPUTemp() -> float (°C) +- getDriveTemp() -> float (°C) +- getIORate() -> tuple[float, float] in MB/s as (read_mb_s, write_mb_s) + +Replace the stub return values with your real implementations later. +""" + +import sys +from typing import Tuple +from oneUpSupport import systemData + +sysdata = systemData() + +# -------------------------- +# Metrics function stubs +# -------------------------- +def getCPUTemp() -> float: + """Return current CPU temperature in °C.""" + return float( sysdata.CPUTemperature ) + +def getDriveTemp() -> float: + """Return current NVMe drive temperature in °C.""" + return sysdata.driveTemp + +def getIORate() -> Tuple[float, float]: + """Return current NVMe IO rates (read_MBps, write_MBps).""" + return sysdata.driveStats + + +# -------------------------- +# UI +# -------------------------- +from PyQt5.QtCore import Qt, QTimer, QDateTime +from PyQt5.QtWidgets import ( + QApplication, QMainWindow, QWidget, QGridLayout, QLabel, QProgressBar, QHBoxLayout +) + +class MetricRow(QWidget): + """A compact row with a label, numeric value, unit, and optional progress bar.""" + def __init__(self, title: str, show_bar: bool = False, bar_min: int = 0, bar_max: int = 110, parent=None): + super().__init__(parent) + self.title_lbl = QLabel(title) + self.title_lbl.setStyleSheet("font-weight: 600;") + self.value_lbl = QLabel("--") + self.value_lbl.setAlignment(Qt.AlignRight | Qt.AlignVCenter) + self.unit_lbl = QLabel("") + self.unit_lbl.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) + + layout = QGridLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + layout.addWidget(self.title_lbl, 0, 0, 1, 1) + layout.addWidget(self.value_lbl, 0, 1, 1, 1) + layout.addWidget(self.unit_lbl, 0, 2, 1, 1) + + self.bar = None + if show_bar: + self.bar = QProgressBar() + self.bar.setMinimum(bar_min) + self.bar.setMaximum(bar_max) + self.bar.setTextVisible(False) + layout.addWidget(self.bar, 1, 0, 1, 3) + + layout.setColumnStretch(0, 1) + layout.setColumnStretch(1, 0) + layout.setColumnStretch(2, 0) + + def set_value(self, value: float, unit: str = "", bar_value: float = None): + self.value_lbl.setText(f"{value:.1f}") + self.unit_lbl.setText(unit) + if self.bar is not None and bar_value is not None: + self.bar.setValue(int(bar_value)) + + +class MonitorWindow(QMainWindow): + def __init__(self, refresh_ms: int = 1000, parent=None): + super().__init__(parent) + self.setWindowTitle("CPU & NVMe Monitor") + self.setMinimumWidth(420) + + central = QWidget(self) + grid = QGridLayout(central) + grid.setContentsMargins(16, 16, 16, 16) + grid.setVerticalSpacing(12) + self.setCentralWidget(central) + + # Rows + self.cpu_row = MetricRow("CPU Temperature", show_bar=True, bar_max=90) + self.nvme_row = MetricRow("NVMe Temperature", show_bar=True, bar_max=90) + + # IO row: two side-by-side values + self.io_title = QLabel("NVMe I/O Rate") + self.io_title.setStyleSheet("font-weight: 600;") + self.io_read = QLabel("Read: -- MB/s") + self.io_write = QLabel("Write: -- MB/s") + io_box = QHBoxLayout() + io_box.addWidget(self.io_read, 1) + io_box.addWidget(self.io_write, 1) + + # Last updated + self.updated_lbl = QLabel("Last updated: --") + self.updated_lbl.setAlignment(Qt.AlignRight | Qt.AlignVCenter) + self.updated_lbl.setStyleSheet("color: #666; font-size: 11px;") + + # Layout + grid.addWidget(self.cpu_row, 0, 0, 1, 2) + grid.addWidget(self.nvme_row, 1, 0, 1, 2) + grid.addWidget(self.io_title, 2, 0, 1, 2) + grid.addLayout(io_box, 3, 0, 1, 2) + grid.addWidget(self.updated_lbl, 4, 0, 1, 2) + + # Timer + self.timer = QTimer(self) + self.timer.timeout.connect(self.refresh_metrics) + self.timer.start(refresh_ms) + + # Initial fill + self.refresh_metrics() + + def refresh_metrics(self): + try: + cpu_c = float(getCPUTemp()) + except Exception: + cpu_c = float("nan") + + try: + nvme_c = float(getDriveTemp()) + except Exception: + nvme_c = float("nan") + + try: + read_mb, write_mb = getIORate() + read_mb = float(read_mb) + write_mb = float(write_mb) + except Exception: + read_mb, write_mb = float("nan"), float("nan") + + # Update rows + self.cpu_row.set_value(cpu_c if cpu_c == cpu_c else 0.0, "°C", bar_value=cpu_c if cpu_c == cpu_c else 0) + self.nvme_row.set_value(nvme_c if nvme_c == nvme_c else 0.0, "°C", bar_value=nvme_c if nvme_c == nvme_c else 0) + self.io_read.setText(f"Read: {read_mb:.1f} MB/s" if read_mb == read_mb else "Read: -- MB/s") + self.io_write.setText(f"Write: {write_mb:.1f} MB/s" if write_mb == write_mb else "Write: -- MB/s") + + self.updated_lbl.setText(f"Last updated: {QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss')}") + +def main(): + app = QApplication(sys.argv) + w = MonitorWindow(refresh_ms=1000) + w.show() + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() +