diff --git a/monitor/cpuload.py b/monitor/cpuload.py deleted file mode 100755 index ccdbe05..0000000 --- a/monitor/cpuload.py +++ /dev/null @@ -1,107 +0,0 @@ -#!/usr/bin/env python3 -import os -import time - -class CPULoad: - ''' - A class to help with obtaining the CPU load of the system. If there is more information - needed, we can add to this. - - Note: - This code automatically attempts to load the data from the system to initialize the - object with names, and an initial set of data. - - This may result in th first actual call return some not very consistent values, for - the time period being observed, but that difference is minimal. In otherwords if we - the period of time being measured is 1 second, and it's been a minute since this class - was initialized, the first period reported will be CPU load over the minute, not 1 second, - and the second period reported will be for a second... - - This is usually not an issue. - ''' - def __init__( self ): - self._previousData : dict[str,tuple] = self._getRawData() - self._names : list[str] = [] - for item in self._previousData: - self._names.append( item ) - - def _getRawData( self ) -> dict[str : tuple]: - ''' - Obtain the raw CPU data from the system (located in /prop/stat), and - return just the cpu0 -> cpux values. No assumption is made on the number of - cpus. - - Returns: - A dictionary is returned, the format is name = (total, idle). The total - time and idle time are use to determine the percent utilization of the system. - ''' - result = {} - with open( "/proc/stat", "r") as f: - allLines = f.readlines() - for line in allLines: - cpu = line.replace('\t', ' ').strip().split() - if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"): - total = 0 - idle = 0 - for i in range( 1, len(cpu)): - total += int(cpu[i]) - if i == 4 or i == 5: - idle += int(cpu[i]) - result[cpu[0]] = (total,idle) - return result - - def getPercentages( self ) -> dict[ str : float ]: - ''' - Obtain the percent CPU utilization of the system for a period of time. - - This routine gets the current raw data from the system, and then performs - a delta from the prior time this function was called. This data is then run - through the following equation: - - utilization = ((total - idle)/total) * 100 - - If the snapshots are taken at relativy consistent intervals, the CPU - utilization in percent, is reasonably lose to the actual percentage. - - Returns: - A dictionary consisting of the name of the CPU, and a floating point - number representing the current utilization of that CPU. - ''' - results = {} - current = self._getRawData() - for item in current: - total = current[item][0] - self._previousData[item][0] - idle = current[item][1] - self._previousData[item][1] - percent = ((total - idle)/total) * 100 - results[item] = round(percent,2) - self._previousData = current - return results - - @property - def cpuNames( self ) -> list[str]: - ''' - Get a list of CPU names from the system. - - Returns: - a list of strings - ''' - return self._names - - def __len__(self) -> int: - ''' - handle getting the length (or count of CPU's). - - Returns: - Number of CPU's - ''' - return len(self._previousData) - -if __name__ == "__main__": - load = CPULoad() - print( f"Number of CPU's = {len(load)}" ) - while True: - time.sleep( 1 ) - percentage : dict[str:float] = load.getPercentages() - print( f"percentage: {percentage}" ) - for item in percentage: - print( f"{item} : {percentage[item]:.02f}" ) diff --git a/monitor/monitor1up.py b/monitor/monitor1up.py deleted file mode 100755 index facd632..0000000 --- a/monitor/monitor1up.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/python3 -# -# Setup environment and pull in all of the items we need from gpiozero. The -# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect -# will be new direction for all prior version the RPIi.) -# -from gpiozero import CPUTemperature -import time -import os - -class DriveStat: - ''' - DriveStat class - - - This class gets the drive statistics from sysfs for the device passed - in. There are several statistics can can be obtained. Note that since - all of the data is pulled at the same time, it is upto the caller to - make sure all the stats needed are obtained at the same time. - - See: https://www.kernel.org/doc/html/latest/block/stat.html - - Parameters: - device - the name of the device to track - ''' - - READ_IOS = 0 - READ_MERGES = 1 - READ_SECTORS = 2 - READ_TICKS = 3 - WRITE_IOS = 4 - WRITE_MERGES = 5 - WRITE_SECTORS = 6 - WRITE_TICKS = 7 - IN_FLIGHT = 8 - IO_TICKS = 9 - TIME_IN_QUEUE = 10 - DISCARD_IOS = 11 - DISCARD_MERGES = 12 - DISCARD_SECTORS = 13 - DISCARD_TICS = 14 - FLUSH_IOS = 15 - FLUSH_TICKS = 16 - - def __init__( self, device:str ): - self._last : list[int] = [] - self._stats : list[int] = [] - self._device = device - self._readStats() - - def _readStats( self ): - ''' - Read the disk statistics. The stored statics in sysfs are stored as a single file - so that when the data is read, all of the stats correlate to the same time. The data - is from the time the device has come online. - - last and set to the old version of the data, and the latest data is stored in stats - - ''' - try: - self._last = self._stats - with open( f"/sys/block/{self._device}/stat", "r") as f: - curStats = f.readline().strip().split(" ") - self._stats = [int(l) for l in curStats if l] - except Exception as e: - print( f"Failure reading disk statistics for {self._device} error {e}" ) - - def _getStats( self ) -> list[int]: - ''' - Read the devices statistics from the device,and return it. - - Returns: - An array containing all of the data colleected about the device. - ''' - curData : list[int] = [] - - self._readStats() - if self._last == []: - curData = self._stats[:] - else: - curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ] - return curData - - def readAllStats( self ) -> list[int]: - ''' - read all of the drive statisics from sysfs for the device. - - Returns - A list of all of the device stats - ''' - return self._getStats() - - def readSectors( self )-> int: - return self._getStats()[DriveStat.READ_SECTORS] - - def writeSectors( self ) -> int: - return self._getStats()[DriveStat.WRITE_SECTORS] - - def discardSectors( self ) -> int: - return self._getStats()[DriveStat.DISCARD_SECTORS] - - def readWriteSectors( self ) -> tuple[int,int]: - curData = self._getStats() - return (curData[DriveStat.READ_SECTORS],curData[DriveStat.WRITE_SECTORS]) - - -def setupTemperatureObject(): - ''' - Get a cpu temperature object, and set the min and max range. - - When the ranges are set to the non-default values, if the temperature is - less than min_temp we get 0, and when the temperature reaches the max we get - a value of 1. This value can be used directly as a duty cycle for the fan. - - Return: - A CPU temperature object - ''' - cpuTempObj = None - try: - cpuTempObj = CPUTemperature() - except Exception as error: - print( f"Error creating CPU temperature object, error is {error}" ) - - return cpuTempObj - -def getFanSpeed() -> int: - ''' - Obtain the speed of the fan attached to the CPU. This is accomplished reading - the information from sysfs. - - NOTE: There is an assumption that the fan s hanging off of /hwmon/hwmon3. This may - or may not be the case in all situations. - ''' - fanSpeed = 0 - try: - command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) - fanSpeed = int(command.read().strip()) - except: - pass - return fanSpeed - -def getNVMETemp(device : str) -> float: - ''' - Obtain the temperature of the device passed in, using smartctl. - - Parameters : - device - A string containing the device name. - - Returns: - The temperature as a float - ''' - smartOutRaw = "" - cmd = f'smartctl -A /dev/{device}' - try: - command = os.popen( cmd ) - smartOutRaw = command.read() - except Exception as error: - print( f"Could not launch {cmd} error is {error}" ) - return 0.0 - finally: - command.close() - - smartOut = [ l for l in smartOutRaw.split('\n') if l] - for smartAttr in ["Temperature:","194","190"]: - try: - line = [l for l in smartOut if l.startswith(smartAttr)][0] - parts = [p for p in line.replace('\t',' ').split(' ') if p] - if smartAttr == "Temperature:": - return float(parts[1]) - else: - return float(parts[0]) - except IndexError: - pass - - return float(0.0) - -def argonsysinfo_kbstr(kbval): - suffixidx = 0 - suffixlist = ["B","KiB", "MiB", "GiB", "TiB"] - while kbval > 1023 and suffixidx < len(suffixlist): - kbval = kbval // 1024 - suffixidx = suffixidx + 1 - return f"{kbval} {suffixlist[suffixidx]}" - - -stats = DriveStat( 'nvme0n1' ) -cpuTemp = setupTemperatureObject() - -while True: - os.system( "clear ") - print( f"CPU : {cpuTemp.temperature}" ) - print( f"Fan : {getFanSpeed()}" ) - print( f"NVME : {getNVMETemp('nvme0n1')}" ) - data = stats.readWriteSectors() - print( f"Read : {argonsysinfo_kbstr(data[0]*512)}/s" ) - print( f"Write: {argonsysinfo_kbstr(data[1]*512)}/s" ) - time.sleep(1) diff --git a/monitor/oneUpMon.py b/monitor/oneUpMon.py index fc93d37..0dc83e7 100755 --- a/monitor/oneUpMon.py +++ b/monitor/oneUpMon.py @@ -7,14 +7,13 @@ Requires: PyQt5 (including QtCharts) """ -import os import sys from systemsupport import systemData, CPULoad # -------------------------- # Globals # -------------------------- -sysdata = None +sysdata = systemData() cpuload = CPULoad() # -------------------------- @@ -23,7 +22,7 @@ cpuload = CPULoad() from PyQt5.QtCore import Qt, QTimer from PyQt5.QtGui import QPainter -from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLabel +from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis class RollingChart(QWidget): @@ -39,7 +38,7 @@ class RollingChart(QWidget): def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None): super().__init__(parent) - self.window = window + self.pointWindow = window self.xpos = window - 1 self.chart = QChart() @@ -60,7 +59,7 @@ class RollingChart(QWidget): # want is the tick count etc, but NO lable on the axis. There does not # appear to be a way to do that. self.axis_x = QValueAxis() - self.axis_x.setRange(0, self.window) + self.axis_x.setRange(0, self.pointWindow) self.axis_x.setMinorTickCount( 2 ) self.axis_x.setTickCount( 10 ) self.axis_x.setLabelFormat("%d") @@ -108,7 +107,7 @@ class RollingChart(QWidget): pass # Trim series to rolling window - min_x_to_keep = max(0, self.xpos - self.window) + min_x_to_keep = max(0, self.xpos - self.pointWindow) self.axis_x.setRange(min_x_to_keep, self.xpos) for s in self.series: @@ -213,7 +212,7 @@ class RollingChartDynamic(RollingChart): pass # Trim series to rolling window - min_x_to_keep = max(0, self.xpos - self.window) + min_x_to_keep = max(0, self.xpos - self.pointWindow) self.axis_x.setRange(min_x_to_keep, self.xpos) if scaleUp: @@ -372,6 +371,5 @@ def main(): sys.exit(app.exec_()) if __name__ == "__main__": - sysdata = systemData() main() diff --git a/monitor/oneUpSupport.py b/monitor/oneUpSupport.py deleted file mode 100755 index e2d6651..0000000 --- a/monitor/oneUpSupport.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/python3 -# -# Setup environment and pull in all of the items we need from gpiozero. The -# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect -# will be new direction for all prior version the RPIi.) -# -from gpiozero import CPUTemperature -import time -import os - -class DriveStats: - ''' - DriveStat class - - - This class gets the drive statistics from sysfs for the device passed - in. There are several statistics can can be obtained. Note that since - all of the data is pulled at the same time, it is upto the caller to - make sure all the stats needed are obtained at the same time. - - See: https://www.kernel.org/doc/html/latest/block/stat.html - - Parameters: - device - the name of the device to track - ''' - - READ_IOS = 0 - READ_MERGES = 1 - READ_SECTORS = 2 - READ_TICKS = 3 - WRITE_IOS = 4 - WRITE_MERGES = 5 - WRITE_SECTORS = 6 - WRITE_TICKS = 7 - IN_FLIGHT = 8 - IO_TICKS = 9 - TIME_IN_QUEUE = 10 - DISCARD_IOS = 11 - DISCARD_MERGES = 12 - DISCARD_SECTORS = 13 - DISCARD_TICS = 14 - FLUSH_IOS = 15 - FLUSH_TICKS = 16 - - def __init__( self, device:str ): - self._last : list[int] = [] - self._stats : list[int] = [] - self._device = device - self._readStats() - - def _readStats( self ): - ''' - Read the disk statistics. The stored statics in sysfs are stored as a single file - so that when the data is read, all of the stats correlate to the same time. The data - is from the time the device has come online. - - last and set to the old version of the data, and the latest data is stored in stats - - ''' - try: - self._last = self._stats - with open( f"/sys/block/{self._device}/stat", "r") as f: - curStats = f.readline().strip().split(" ") - self._stats = [int(l) for l in curStats if l] - except Exception as e: - print( f"Failure reading disk statistics for {self._device} error {e}" ) - - def _getStats( self ) -> list[int]: - ''' - Read the devices statistics from the device,and return it. - - Returns: - An array containing all of the data colleected about the device. - ''' - curData : list[int] = [] - - self._readStats() - if self._last == []: - curData = self._stats[:] - else: - curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ] - return curData - - def readAllStats( self ) -> list[int]: - ''' - read all of the drive statisics from sysfs for the device. - - Returns - A list of all of the device stats - ''' - return self._getStats() - - def readSectors( self )-> int: - return self._getStats()[DriveStats.READ_SECTORS] - - def writeSectors( self ) -> int: - return self._getStats()[DriveStats.WRITE_SECTORS] - - def discardSectors( self ) -> int: - return self._getStats()[DriveStats.DISCARD_SECTORS] - - def readWriteSectors( self ) -> tuple[int,int]: - curData = self._getStats() - return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS]) - - -class systemData: - def __init__( self, drive : str = 'nvme0n1' ): - self._drive = drive - self._cpuTemp = CPUTemperature() - self._stats = DriveStats( self._drive ) - - @property - def CPUTemperature(self) -> int: - return self._cpuTemp.temperature - - @property - def fanSpeed( self ) -> int: - speed= 0 - try: - command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) - speed = int( command.read().strip()) - except Exception as error: - print( f"Could not determine fan speed, error {error}" ) - finally: - command.close() - - return speed - - @property - def driveTemp(self) -> float: - smartOutRaw = "" - cmd = f'sudo smartctl -A /dev/{self._drive}' - try: - command = os.popen( cmd ) - smartOutRaw = command.read() - except Exception as error: - print( f"Could not launch {cmd} error is {error}" ) - return 0.0 - finally: - command.close() - - smartOut = [ l for l in smartOutRaw.split('\n') if l] - for smartAttr in ["Temperature:","194","190"]: - try: - line = [l for l in smartOut if l.startswith(smartAttr)][0] - parts = [p for p in line.replace('\t',' ').split(' ') if p] - if smartAttr == "Temperature:": - return float(parts[1]) - else: - return float(parts[0]) - except IndexError: - pass - - return float(0.0) - - @property - def driveStats(self) -> tuple[float,float]: - data = self._stats.readWriteSectors() - readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0) - writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0) - return (readMB, writeMB ) - - -if __name__ == "__main__": - data = systemData() - print( f"CPU Temp : {data.CPUTemperature}" ) - print( f"Fan Speed: {data.fanSpeed}" ) - print( f"NVME Temp: {data.driveTemp}" ) - print( f"Stats : {data.driveStats}" ) - diff --git a/monitor/simple_monitor.py b/monitor/simple_monitor.py index 44ffe1d..62aaace 100755 --- a/monitor/simple_monitor.py +++ b/monitor/simple_monitor.py @@ -13,7 +13,7 @@ Replace the stub return values with your real implementations later. import sys from typing import Tuple -from oneUpSupport import systemData +from systemsupport import systemData, CPULoad sysdata = systemData() diff --git a/monitor/systemsupport.py b/monitor/systemsupport.py index 270cdb1..a1568a7 100755 --- a/monitor/systemsupport.py +++ b/monitor/systemsupport.py @@ -58,7 +58,7 @@ class DriveStats: ''' try: self._last = self._stats - with open( f"/sys/block/{self._device}/stat", "r") as f: + with open( f"/sys/block/{self._device}/stat", "r",encoding="utf8") as f: curStats = f.readline().strip().split(" ") self._stats = [int(l) for l in curStats if l] except Exception as e: @@ -114,7 +114,7 @@ class systemData: return self._cpuTemp.temperature @property - def fanSpeed( self ) -> int: + def fanSpeed( self ) -> float: speed= 0 try: command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' ) @@ -124,7 +124,7 @@ class systemData: finally: command.close() - return speed + return float(speed) @property def driveTemp(self) -> float: @@ -155,9 +155,9 @@ class systemData: @property def driveStats(self) -> tuple[float,float]: - data = self._stats.readWriteSectors() - readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0) - writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0) + _data = self._stats.readWriteSectors() + readMB = (float(_data[0]) * 512.0) #/ (1024.0 * 1024.0) + writeMB = (float(_data[1]) * 512.0) #/ (1024.0 * 1024.0) return (readMB, writeMB ) class CPULoad: @@ -178,12 +178,20 @@ class CPULoad: This is usually not an issue. ''' def __init__( self ): - self._previousData : dict[str,tuple] = self._getRawData() + # + # Get the current data + # + self._previousData : dict[str,tuple[int,int]] = self._getRawData() self._names : list[str] = [] - for item in self._previousData: - self._names.append( item ) + # + # For each CPU, reset the total and idle amount, and create the list + # of names + # + for _item in self._previousData: + self._previousData[_item] = (0,0) + self._names.append(_item) - def _getRawData( self ) -> dict[str : tuple]: + def _getRawData( self ) -> dict[str,tuple[int,int]]: ''' Obtain the raw CPU data from the system (located in /prop/stat), and return just the cpu0 -> cpux values. No assumption is made on the number of @@ -194,21 +202,21 @@ class CPULoad: time and idle time are use to determine the percent utilization of the system. ''' result = {} - with open( "/proc/stat", "r") as f: + with open( "/proc/stat", "r",encoding="utf8") as f: allLines = f.readlines() for line in allLines: cpu = line.replace('\t', ' ').strip().split() if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"): total = 0 idle = 0 - for i in range( 1, len(cpu)): - total += int(cpu[i]) - if i == 4 or i == 5: - idle += int(cpu[i]) + for _index in range( 1, len(cpu)): + total += int(cpu[_index]) + if _index == 4 or _index == 5: + idle += int(cpu[_index]) result[cpu[0]] = (total,idle) return result - def getPercentages( self ) -> dict[ str : float ]: + def getPercentages( self ) -> dict[str,float]: ''' Obtain the percent CPU utilization of the system for a period of time. @@ -227,11 +235,11 @@ class CPULoad: ''' results = {} current = self._getRawData() - for item in current: - total = current[item][0] - self._previousData[item][0] - idle = current[item][1] - self._previousData[item][1] + for _item in current: + total = current[_item][0] - self._previousData[_item][0] + idle = current[_item][1] - self._previousData[_item][1] percent = ((total - idle)/total) * 100 - results[item] = round(percent,2) + results[_item] = round(percent,2) self._previousData = current return results @@ -266,8 +274,8 @@ if __name__ == "__main__": print( f"Number of CPU's = {len(load)}" ) for i in range(10): time.sleep( 1 ) - percentage : dict[str:float] = load.getPercentages() + percentage : dict[str,float] = load.getPercentages() print( f"percentage: {percentage}" ) for item in percentage: print( f"{item} : {percentage[item]:.02f}" ) - \ No newline at end of file +