diff --git a/monitor/cpuload.py b/monitor/cpuload.py index ca3c058..ccdbe05 100755 --- a/monitor/cpuload.py +++ b/monitor/cpuload.py @@ -3,13 +3,38 @@ import os import time class CPULoad: + ''' + A class to help with obtaining the CPU load of the system. If there is more information + needed, we can add to this. + + Note: + This code automatically attempts to load the data from the system to initialize the + object with names, and an initial set of data. + + This may result in th first actual call return some not very consistent values, for + the time period being observed, but that difference is minimal. In otherwords if we + the period of time being measured is 1 second, and it's been a minute since this class + was initialized, the first period reported will be CPU load over the minute, not 1 second, + and the second period reported will be for a second... + + This is usually not an issue. + ''' def __init__( self ): - self._previousData = self._getRawData() - self._names = [] + self._previousData : dict[str,tuple] = self._getRawData() + self._names : list[str] = [] for item in self._previousData: self._names.append( item ) - def _getRawData( self ): + def _getRawData( self ) -> dict[str : tuple]: + ''' + Obtain the raw CPU data from the system (located in /prop/stat), and + return just the cpu0 -> cpux values. No assumption is made on the number of + cpus. + + Returns: + A dictionary is returned, the format is name = (total, idle). The total + time and idle time are use to determine the percent utilization of the system. + ''' result = {} with open( "/proc/stat", "r") as f: allLines = f.readlines() @@ -25,22 +50,50 @@ class CPULoad: result[cpu[0]] = (total,idle) return result - def getPercentages( self ): + def getPercentages( self ) -> dict[ str : float ]: + ''' + Obtain the percent CPU utilization of the system for a period of time. + + This routine gets the current raw data from the system, and then performs + a delta from the prior time this function was called. This data is then run + through the following equation: + + utilization = ((total - idle)/total) * 100 + + If the snapshots are taken at relativy consistent intervals, the CPU + utilization in percent, is reasonably lose to the actual percentage. + + Returns: + A dictionary consisting of the name of the CPU, and a floating point + number representing the current utilization of that CPU. + ''' results = {} current = self._getRawData() for item in current: total = current[item][0] - self._previousData[item][0] idle = current[item][1] - self._previousData[item][1] percent = ((total - idle)/total) * 100 - results[item] = percent + results[item] = round(percent,2) self._previousData = current return results @property - def cpuNames( self ): + def cpuNames( self ) -> list[str]: + ''' + Get a list of CPU names from the system. + + Returns: + a list of strings + ''' return self._names - def __len__(self): + def __len__(self) -> int: + ''' + handle getting the length (or count of CPU's). + + Returns: + Number of CPU's + ''' return len(self._previousData) if __name__ == "__main__": @@ -48,6 +101,7 @@ if __name__ == "__main__": print( f"Number of CPU's = {len(load)}" ) while True: time.sleep( 1 ) - percentage = load.getPercentages() + percentage : dict[str:float] = load.getPercentages() + print( f"percentage: {percentage}" ) for item in percentage: print( f"{item} : {percentage[item]:.02f}" ) diff --git a/monitor/oneUpMon.py b/monitor/oneUpMon.py index 891aa31..fc93d37 100755 --- a/monitor/oneUpMon.py +++ b/monitor/oneUpMon.py @@ -7,12 +7,9 @@ Requires: PyQt5 (including QtCharts) """ -import sys -from typing import Tuple, List -from gpiozero import CPUTemperature -from oneUpSupport import systemData -from cpuload import CPULoad import os +import sys +from systemsupport import systemData, CPULoad # -------------------------- # Globals @@ -30,25 +27,27 @@ from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLa from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis class RollingChart(QWidget): - """ + ''' A reusable chart widget with one or more QLineSeries and a rolling X window. - Args: - title: Chart title - series_defs: List of (name, color_qt_str or None) for each line - y_min, y_max: Fixed Y axis range - window: number of points to keep (points are 1 per tick by default) - """ - def __init__(self, title: str, series_defs: List[tuple], y_min: float, y_max: float, window: int = 120, parent=None): + Parameters: + title - Chart title. + series_defs - List of (name, color_qt_str or None) for each line. + y_min,y_max - Fixed Y axis range. + window - Number of points to keep (points are 1 per tick by default). + ''' + def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None): super().__init__(parent) + self.window = window - self.x = 0 - self.series: List[QLineSeries] = [] - self.chart = QChart() + self.xpos = window - 1 + self.chart = QChart() + self.chart.setTitle(title) self.chart.legend().setVisible(len(series_defs) > 1) self.chart.legend().setAlignment(Qt.AlignBottom) + self.series:list[QLineSeries] = [] for name, color in series_defs: s = QLineSeries() s.setName(name) @@ -57,14 +56,20 @@ class RollingChart(QWidget): self.series.append(s) self.chart.addSeries(s) - # Axes + # Setup X Axis... Note, setVisible disables all of this, however whatI + # want is the tick count etc, but NO lable on the axis. There does not + # appear to be a way to do that. self.axis_x = QValueAxis() self.axis_x.setRange(0, self.window) - #self.axis_x.setTitleText("Seconds") + self.axis_x.setMinorTickCount( 2 ) + self.axis_x.setTickCount( 10 ) self.axis_x.setLabelFormat("%d") + self.axis_x.setVisible(False) + # Setup Y Axis... self.axis_y = QValueAxis() self.axis_y.setRange(y_min, y_max) + self.axis_y.setLabelFormat( "%d" ) self.chart.addAxis(self.axis_x, Qt.AlignBottom) self.chart.addAxis(self.axis_y, Qt.AlignLeft) @@ -79,28 +84,33 @@ class RollingChart(QWidget): layout = QGridLayout(self) layout.setContentsMargins(0, 0, 0, 0) layout.addWidget(self.view, 0, 0) - - def append(self, values: List[float]): - """ + + def append(self, values: list[float]): + ''' Append one sample (for each series) at the next x value. Handles rolling window. values must match the number of series. - """ - self.x += 1 + + Parameters: + values - A list of floating point numbers, on per data series in the + chart. + ''' + self.xpos += 1 for s, v in zip(self.series, values): # Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip try: if v is None: continue # If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v)) - s.append(self.x, float(v)) - except Exception: + s.append(self.xpos, float(v)) + except Exception as error: # ignore bad data points + print( f"Exception error {error}" ) pass - + # Trim series to rolling window - min_x_to_keep = max(0, self.x - self.window) - self.axis_x.setRange(min_x_to_keep, self.x) - + min_x_to_keep = max(0, self.xpos - self.window) + self.axis_x.setRange(min_x_to_keep, self.xpos) + for s in self.series: # Efficient trim: remove points with x < min_x_to_keep # QLineSeries doesn't provide O(1) pop from front, so we rebuild if large @@ -116,11 +126,141 @@ class RollingChart(QWidget): hi = mid s.replace(points[lo:]) # keep tail only +class scaleValues: + def __init__( self, range_y ): + self.index = 0 + self.valueRange = range_y + + @property + def scale( self ): + return self.valueRange[self.index][1] + + def scaleValue(self, value : float ): + return value / self.scale + + def nextScale( self ): + if (self.index + 1) < len(self.valueRange): + self.index += 1 + #print( f"Switched scale to {self.valueRange[self.index]}") + + def prevScale( self ): + if self.index > 0: + self.index -= 1 + #print( f"Switches scale to {self.valueRange[self.index]}") + + def scalePointsDown( self, points ): + amount = self.valueRange[self.index][1] + for point in points: + point.setY(point.y() / amount) + + def scaleDown( self, value ): + return value / 1024 + + def scaleUp( self, value ): + return value * 1024 + + def scalePointsUp( self, points ): + amount = self.valueRange[self.index][1] + for point in points: + point.setY(point.y() * amount) + @property + def name( self ): + return self.valueRange[self.index][0] + + +class RollingChartDynamic(RollingChart): + def __init__(self, title : str, series_defs: list[tuple], range_y : list[tuple], window=120,parent=None): + self.maxY = 512 + super().__init__(title,series_defs,0,self.maxY,window,parent) + self.title = title + self.max = 0 + self.scale = scaleValues(range_y) + self.chart.setTitle( title+ f" ({self.scale.name})" ) + + def getBestFit( self, value ): + values = [4,8,16,32,64,128,256,512,1024] + for i in values: + if value < i: + return i + return 4 + def append(self, values: list[float]): + ''' + Append one sample (for each series) at the next x value. Handles rolling window. + values must match the number of series. + + Parameters: + values - A list of floating point numbers, on per data series in the + chart. + ''' + scaleUp = False + self.xpos += 1 + for s, v in zip(self.series, values): + # Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip + try: + if v is None: + continue + sv = self.scale.scaleValue(v) + if sv > 1024: + scaleUp = True + # If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v)) + #if v: + # print( f"value : {v} scaled: {sv} " ) + s.append(self.xpos, float(sv)) + except Exception as error: + # ignore bad data points + print( f"Exception error {error}" ) + pass + + # Trim series to rolling window + min_x_to_keep = max(0, self.xpos - self.window) + self.axis_x.setRange(min_x_to_keep, self.xpos) + + if scaleUp: + self.scale.nextScale() + self.chart.setTitle(self.title + f" ({self.scale.name})" ) + + maxV = 0 + for s in self.series: + drop = 0 + points = s.pointsVector() + for index, point in enumerate(points): + if point.x() < min_x_to_keep: + drop = index + if scaleUp: + point.setY( self.scale.scaleDown(point.y())) + if maxV < point.y(): + maxV = point.y() + s.replace( points[drop:] ) + + if maxV > 1: + self.axis_y.setRange( 0, self.getBestFit(maxV) ) + + #print( f"maxV left is {maxV}" ) + if maxV < 1: + self.scale.prevScale() + self.chart.setTitle( self.title + f" ({self.scale.name})") + for s in self.series: + points = s.pointsVector() + for point in points: + point.setY( self.scale.scaleUp(point.y())) + s.replace(points) + class MonitorWindow(QMainWindow): + ''' + Creating a window to monitor various system aspects. + + Parameters: + refresh_ms - Time between refreshes of data on screen, in milliseconds, the + default is 1 second. + window - How much data do we want to store in the graph? Each data point + is a data refresh period. + Parent - Owning parent of this window... default is None. + ''' def __init__(self, refresh_ms: int = 1000, window = 120, parent=None): super().__init__(parent) - self.setWindowTitle("Argon 1UP Monitor") + + self.setWindowTitle("System Monitor") self.setMinimumSize(900, 900) central = QWidget(self) @@ -155,14 +295,14 @@ class MonitorWindow(QMainWindow): window=window ) - self.io_chart = RollingChart( - title="NVMe I/O (MB/s)", + self.io_chart = RollingChartDynamic( + title="Disk I/O", series_defs=[ - ("Read MB/s", None), - ("Write MB/s", None), + ("Read", None), + ("Write", None), ], - y_min=0, y_max=1100, # adjust ceiling for your device - window=window + range_y=[("Bytes/s", 1),("KiB/s",1024),("MiB/s", 1024*1024),("GiB/s",1024*1024*1024)], + window=window, ) # Layout: 2x2 grid (CPU, NVMe on top; IO full width bottom) @@ -171,30 +311,40 @@ class MonitorWindow(QMainWindow): grid.addWidget(self.cpu_chart, 2, 0, 1, 1 ) grid.addWidget(self.fan_chart, 2, 1, 1, 1 ) + # Get the initial information from the syste + self.refresh_metrics() + # Timer self.timer = QTimer(self) self.timer.timeout.connect(self.refresh_metrics) self.timer.start(refresh_ms) - self.refresh_metrics() - def refresh_metrics(self): - # Gather metrics with safety + ''' + This routine is called periodically, as setup in the __init__ functon. Since this + routine calls out to other things, we want to make sure that there is no possible + exception, so everything needs to be wrapped in a handler. + + ''' + # Obtain the CPU temperature try: cpu_c = float(sysdata.CPUTemperature) except Exception: cpu_c = None + # Obtain the current fan speed try: fan_speed = sysdata.fanSpeed except Exception: fan_speed = None + # Obtain the NVMe device temperature try: nvme_c = sysdata.driveTemp except Exception: nvme_c = None + # Obtain the NVMe Device read and write rates try: read_mb, write_mb = sysdata.driveStats read_mb = float(read_mb) @@ -202,13 +352,12 @@ class MonitorWindow(QMainWindow): except Exception: read_mb, write_mb = None, None + # Get the CPU load precentages try: p = cpuload.getPercentages() - values = [] - for i in range( len(cpuload) ): - values.append( round( p[f'cpu{i}'], 2 ) ) + values = [p[name] for name in cpuload.cpuNames] except Exception: - values = [ None for i in range( len( cpuload) ) ] + values = [ None for name in cpuload.cpuNames ] # Append to charts self.cpu_chart.append([cpu_c,nvme_c]) diff --git a/monitor/oneUpSupport.py b/monitor/oneUpSupport.py index c384a83..e2d6651 100755 --- a/monitor/oneUpSupport.py +++ b/monitor/oneUpSupport.py @@ -113,7 +113,6 @@ class systemData: def CPUTemperature(self) -> int: return self._cpuTemp.temperature - @property def fanSpeed( self ) -> int: speed= 0 @@ -157,8 +156,8 @@ class systemData: @property def driveStats(self) -> tuple[float,float]: data = self._stats.readWriteSectors() - readMB = (float(data[0]) * 512.0) / (1024.0 * 1024.0) - writeMB = (float(data[1]) * 512.0) / (1024.0 * 1024.0) + readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0) + writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0) return (readMB, writeMB ) diff --git a/monitor/systemsupport.py b/monitor/systemsupport.py new file mode 100755 index 0000000..270cdb1 --- /dev/null +++ b/monitor/systemsupport.py @@ -0,0 +1,273 @@ +#!/usr/bin/python3 +# +# Setup environment and pull in all of the items we need from gpiozero. The +# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect +# will be new direction for all prior version the RPIi.) +# +from gpiozero import CPUTemperature +import time +import os + +class DriveStats: + ''' + DriveStat class - + + This class gets the drive statistics from sysfs for the device passed + in. There are several statistics can can be obtained. Note that since + all of the data is pulled at the same time, it is upto the caller to + make sure all the stats needed are obtained at the same time. + + See: https://www.kernel.org/doc/html/latest/block/stat.html + + Parameters: + device - the name of the device to track + ''' + + READ_IOS = 0 + READ_MERGES = 1 + READ_SECTORS = 2 + READ_TICKS = 3 + WRITE_IOS = 4 + WRITE_MERGES = 5 + WRITE_SECTORS = 6 + WRITE_TICKS = 7 + IN_FLIGHT = 8 + IO_TICKS = 9 + TIME_IN_QUEUE = 10 + DISCARD_IOS = 11 + DISCARD_MERGES = 12 + DISCARD_SECTORS = 13 + DISCARD_TICS = 14 + FLUSH_IOS = 15 + FLUSH_TICKS = 16 + + def __init__( self, device:str ): + self._last : list[int] = [] + self._stats : list[int] = [] + self._device = device + self._readStats() + + def _readStats( self ): + ''' + Read the disk statistics. The stored statics in sysfs are stored as a single file + so that when the data is read, all of the stats correlate to the same time. The data + is from the time the device has come online. + + last and set to the old version of the data, and the latest data is stored in stats + + ''' + try: + self._last = self._stats + with open( f"/sys/block/{self._device}/stat", "r") as f: + curStats = f.readline().strip().split(" ") + self._stats = [int(l) for l in curStats if l] + except Exception as e: + print( f"Failure reading disk statistics for {self._device} error {e}" ) + + def _getStats( self ) -> list[int]: + ''' + Read the devices statistics from the device,and return it. + + Returns: + An array containing all of the data colleected about the device. + ''' + curData : list[int] = [] + + self._readStats() + if self._last == []: + curData = self._stats[:] + else: + curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ] + return curData + + def readAllStats( self ) -> list[int]: + ''' + read all of the drive statisics from sysfs for the device. + + Returns + A list of all of the device stats + ''' + return self._getStats() + + def readSectors( self )-> int: + return self._getStats()[DriveStats.READ_SECTORS] + + def writeSectors( self ) -> int: + return self._getStats()[DriveStats.WRITE_SECTORS] + + def discardSectors( self ) -> int: + return self._getStats()[DriveStats.DISCARD_SECTORS] + + def readWriteSectors( self ) -> tuple[int,int]: + curData = self._getStats() + return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS]) + + +class systemData: + def __init__( self, drive : str = 'nvme0n1' ): + self._drive = drive + self._cpuTemp = CPUTemperature() + self._stats = DriveStats( self._drive ) + + @property + def CPUTemperature(self) -> int: + return self._cpuTemp.temperature + + @property + def fanSpeed( self ) -> int: + speed= 0 + try: + command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) + speed = int( command.read().strip()) + except Exception as error: + print( f"Could not determine fan speed, error {error}" ) + finally: + command.close() + + return speed + + @property + def driveTemp(self) -> float: + smartOutRaw = "" + cmd = f'sudo smartctl -A /dev/{self._drive}' + try: + command = os.popen( cmd ) + smartOutRaw = command.read() + except Exception as error: + print( f"Could not launch {cmd} error is {error}" ) + return 0.0 + finally: + command.close() + + smartOut = [ l for l in smartOutRaw.split('\n') if l] + for smartAttr in ["Temperature:","194","190"]: + try: + line = [l for l in smartOut if l.startswith(smartAttr)][0] + parts = [p for p in line.replace('\t',' ').split(' ') if p] + if smartAttr == "Temperature:": + return float(parts[1]) + else: + return float(parts[0]) + except IndexError: + pass + + return float(0.0) + + @property + def driveStats(self) -> tuple[float,float]: + data = self._stats.readWriteSectors() + readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0) + writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0) + return (readMB, writeMB ) + +class CPULoad: + ''' + A class to help with obtaining the CPU load of the system. If there is more information + needed, we can add to this. + + Note: + This code automatically attempts to load the data from the system to initialize the + object with names, and an initial set of data. + + This may result in th first actual call return some not very consistent values, for + the time period being observed, but that difference is minimal. In otherwords if we + the period of time being measured is 1 second, and it's been a minute since this class + was initialized, the first period reported will be CPU load over the minute, not 1 second, + and the second period reported will be for a second... + + This is usually not an issue. + ''' + def __init__( self ): + self._previousData : dict[str,tuple] = self._getRawData() + self._names : list[str] = [] + for item in self._previousData: + self._names.append( item ) + + def _getRawData( self ) -> dict[str : tuple]: + ''' + Obtain the raw CPU data from the system (located in /prop/stat), and + return just the cpu0 -> cpux values. No assumption is made on the number of + cpus. + + Returns: + A dictionary is returned, the format is name = (total, idle). The total + time and idle time are use to determine the percent utilization of the system. + ''' + result = {} + with open( "/proc/stat", "r") as f: + allLines = f.readlines() + for line in allLines: + cpu = line.replace('\t', ' ').strip().split() + if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"): + total = 0 + idle = 0 + for i in range( 1, len(cpu)): + total += int(cpu[i]) + if i == 4 or i == 5: + idle += int(cpu[i]) + result[cpu[0]] = (total,idle) + return result + + def getPercentages( self ) -> dict[ str : float ]: + ''' + Obtain the percent CPU utilization of the system for a period of time. + + This routine gets the current raw data from the system, and then performs + a delta from the prior time this function was called. This data is then run + through the following equation: + + utilization = ((total - idle)/total) * 100 + + If the snapshots are taken at relativy consistent intervals, the CPU + utilization in percent, is reasonably lose to the actual percentage. + + Returns: + A dictionary consisting of the name of the CPU, and a floating point + number representing the current utilization of that CPU. + ''' + results = {} + current = self._getRawData() + for item in current: + total = current[item][0] - self._previousData[item][0] + idle = current[item][1] - self._previousData[item][1] + percent = ((total - idle)/total) * 100 + results[item] = round(percent,2) + self._previousData = current + return results + + @property + def cpuNames( self ) -> list[str]: + ''' + Get a list of CPU names from the system. + + Returns: + a list of strings + ''' + return self._names + + def __len__(self) -> int: + ''' + handle getting the length (or count of CPU's). + + Returns: + Number of CPU's + ''' + return len(self._previousData) + + +if __name__ == "__main__": + data = systemData() + print( f"CPU Temp : {data.CPUTemperature}" ) + print( f"Fan Speed: {data.fanSpeed}" ) + print( f"NVME Temp: {data.driveTemp}" ) + print( f"Stats : {data.driveStats}" ) + + load = CPULoad() + print( f"Number of CPU's = {len(load)}" ) + for i in range(10): + time.sleep( 1 ) + percentage : dict[str:float] = load.getPercentages() + print( f"percentage: {percentage}" ) + for item in percentage: + print( f"{item} : {percentage[item]:.02f}" ) + \ No newline at end of file