Merge pull request #8 from JeffCurless/upateMonitor
Update system monitor.
This commit is contained in:
@@ -3,13 +3,38 @@ import os
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
class CPULoad:
|
class CPULoad:
|
||||||
|
'''
|
||||||
|
A class to help with obtaining the CPU load of the system. If there is more information
|
||||||
|
needed, we can add to this.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
This code automatically attempts to load the data from the system to initialize the
|
||||||
|
object with names, and an initial set of data.
|
||||||
|
|
||||||
|
This may result in th first actual call return some not very consistent values, for
|
||||||
|
the time period being observed, but that difference is minimal. In otherwords if we
|
||||||
|
the period of time being measured is 1 second, and it's been a minute since this class
|
||||||
|
was initialized, the first period reported will be CPU load over the minute, not 1 second,
|
||||||
|
and the second period reported will be for a second...
|
||||||
|
|
||||||
|
This is usually not an issue.
|
||||||
|
'''
|
||||||
def __init__( self ):
|
def __init__( self ):
|
||||||
self._previousData = self._getRawData()
|
self._previousData : dict[str,tuple] = self._getRawData()
|
||||||
self._names = []
|
self._names : list[str] = []
|
||||||
for item in self._previousData:
|
for item in self._previousData:
|
||||||
self._names.append( item )
|
self._names.append( item )
|
||||||
|
|
||||||
def _getRawData( self ):
|
def _getRawData( self ) -> dict[str : tuple]:
|
||||||
|
'''
|
||||||
|
Obtain the raw CPU data from the system (located in /prop/stat), and
|
||||||
|
return just the cpu0 -> cpux values. No assumption is made on the number of
|
||||||
|
cpus.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary is returned, the format is name = (total, idle). The total
|
||||||
|
time and idle time are use to determine the percent utilization of the system.
|
||||||
|
'''
|
||||||
result = {}
|
result = {}
|
||||||
with open( "/proc/stat", "r") as f:
|
with open( "/proc/stat", "r") as f:
|
||||||
allLines = f.readlines()
|
allLines = f.readlines()
|
||||||
@@ -25,22 +50,50 @@ class CPULoad:
|
|||||||
result[cpu[0]] = (total,idle)
|
result[cpu[0]] = (total,idle)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def getPercentages( self ):
|
def getPercentages( self ) -> dict[ str : float ]:
|
||||||
|
'''
|
||||||
|
Obtain the percent CPU utilization of the system for a period of time.
|
||||||
|
|
||||||
|
This routine gets the current raw data from the system, and then performs
|
||||||
|
a delta from the prior time this function was called. This data is then run
|
||||||
|
through the following equation:
|
||||||
|
|
||||||
|
utilization = ((total - idle)/total) * 100
|
||||||
|
|
||||||
|
If the snapshots are taken at relativy consistent intervals, the CPU
|
||||||
|
utilization in percent, is reasonably lose to the actual percentage.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary consisting of the name of the CPU, and a floating point
|
||||||
|
number representing the current utilization of that CPU.
|
||||||
|
'''
|
||||||
results = {}
|
results = {}
|
||||||
current = self._getRawData()
|
current = self._getRawData()
|
||||||
for item in current:
|
for item in current:
|
||||||
total = current[item][0] - self._previousData[item][0]
|
total = current[item][0] - self._previousData[item][0]
|
||||||
idle = current[item][1] - self._previousData[item][1]
|
idle = current[item][1] - self._previousData[item][1]
|
||||||
percent = ((total - idle)/total) * 100
|
percent = ((total - idle)/total) * 100
|
||||||
results[item] = percent
|
results[item] = round(percent,2)
|
||||||
self._previousData = current
|
self._previousData = current
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cpuNames( self ):
|
def cpuNames( self ) -> list[str]:
|
||||||
|
'''
|
||||||
|
Get a list of CPU names from the system.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
a list of strings
|
||||||
|
'''
|
||||||
return self._names
|
return self._names
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self) -> int:
|
||||||
|
'''
|
||||||
|
handle getting the length (or count of CPU's).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of CPU's
|
||||||
|
'''
|
||||||
return len(self._previousData)
|
return len(self._previousData)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@@ -48,6 +101,7 @@ if __name__ == "__main__":
|
|||||||
print( f"Number of CPU's = {len(load)}" )
|
print( f"Number of CPU's = {len(load)}" )
|
||||||
while True:
|
while True:
|
||||||
time.sleep( 1 )
|
time.sleep( 1 )
|
||||||
percentage = load.getPercentages()
|
percentage : dict[str:float] = load.getPercentages()
|
||||||
|
print( f"percentage: {percentage}" )
|
||||||
for item in percentage:
|
for item in percentage:
|
||||||
print( f"{item} : {percentage[item]:.02f}" )
|
print( f"{item} : {percentage[item]:.02f}" )
|
||||||
|
|||||||
@@ -7,12 +7,9 @@ Requires: PyQt5 (including QtCharts)
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
|
||||||
from typing import Tuple, List
|
|
||||||
from gpiozero import CPUTemperature
|
|
||||||
from oneUpSupport import systemData
|
|
||||||
from cpuload import CPULoad
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
from systemsupport import systemData, CPULoad
|
||||||
|
|
||||||
# --------------------------
|
# --------------------------
|
||||||
# Globals
|
# Globals
|
||||||
@@ -30,25 +27,27 @@ from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QGridLayout, QLa
|
|||||||
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis
|
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis
|
||||||
|
|
||||||
class RollingChart(QWidget):
|
class RollingChart(QWidget):
|
||||||
"""
|
'''
|
||||||
A reusable chart widget with one or more QLineSeries and a rolling X window.
|
A reusable chart widget with one or more QLineSeries and a rolling X window.
|
||||||
|
|
||||||
Args:
|
Parameters:
|
||||||
title: Chart title
|
title - Chart title.
|
||||||
series_defs: List of (name, color_qt_str or None) for each line
|
series_defs - List of (name, color_qt_str or None) for each line.
|
||||||
y_min, y_max: Fixed Y axis range
|
y_min,y_max - Fixed Y axis range.
|
||||||
window: number of points to keep (points are 1 per tick by default)
|
window - Number of points to keep (points are 1 per tick by default).
|
||||||
"""
|
'''
|
||||||
def __init__(self, title: str, series_defs: List[tuple], y_min: float, y_max: float, window: int = 120, parent=None):
|
def __init__(self, title: str, series_defs: list[tuple], y_min: float, y_max: float, window: int = 120, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
|
|
||||||
self.window = window
|
self.window = window
|
||||||
self.x = 0
|
self.xpos = window - 1
|
||||||
self.series: List[QLineSeries] = []
|
self.chart = QChart()
|
||||||
self.chart = QChart()
|
|
||||||
self.chart.setTitle(title)
|
self.chart.setTitle(title)
|
||||||
self.chart.legend().setVisible(len(series_defs) > 1)
|
self.chart.legend().setVisible(len(series_defs) > 1)
|
||||||
self.chart.legend().setAlignment(Qt.AlignBottom)
|
self.chart.legend().setAlignment(Qt.AlignBottom)
|
||||||
|
|
||||||
|
self.series:list[QLineSeries] = []
|
||||||
for name, color in series_defs:
|
for name, color in series_defs:
|
||||||
s = QLineSeries()
|
s = QLineSeries()
|
||||||
s.setName(name)
|
s.setName(name)
|
||||||
@@ -57,14 +56,20 @@ class RollingChart(QWidget):
|
|||||||
self.series.append(s)
|
self.series.append(s)
|
||||||
self.chart.addSeries(s)
|
self.chart.addSeries(s)
|
||||||
|
|
||||||
# Axes
|
# Setup X Axis... Note, setVisible disables all of this, however whatI
|
||||||
|
# want is the tick count etc, but NO lable on the axis. There does not
|
||||||
|
# appear to be a way to do that.
|
||||||
self.axis_x = QValueAxis()
|
self.axis_x = QValueAxis()
|
||||||
self.axis_x.setRange(0, self.window)
|
self.axis_x.setRange(0, self.window)
|
||||||
#self.axis_x.setTitleText("Seconds")
|
self.axis_x.setMinorTickCount( 2 )
|
||||||
|
self.axis_x.setTickCount( 10 )
|
||||||
self.axis_x.setLabelFormat("%d")
|
self.axis_x.setLabelFormat("%d")
|
||||||
|
self.axis_x.setVisible(False)
|
||||||
|
|
||||||
|
# Setup Y Axis...
|
||||||
self.axis_y = QValueAxis()
|
self.axis_y = QValueAxis()
|
||||||
self.axis_y.setRange(y_min, y_max)
|
self.axis_y.setRange(y_min, y_max)
|
||||||
|
self.axis_y.setLabelFormat( "%d" )
|
||||||
|
|
||||||
self.chart.addAxis(self.axis_x, Qt.AlignBottom)
|
self.chart.addAxis(self.axis_x, Qt.AlignBottom)
|
||||||
self.chart.addAxis(self.axis_y, Qt.AlignLeft)
|
self.chart.addAxis(self.axis_y, Qt.AlignLeft)
|
||||||
@@ -80,26 +85,31 @@ class RollingChart(QWidget):
|
|||||||
layout.setContentsMargins(0, 0, 0, 0)
|
layout.setContentsMargins(0, 0, 0, 0)
|
||||||
layout.addWidget(self.view, 0, 0)
|
layout.addWidget(self.view, 0, 0)
|
||||||
|
|
||||||
def append(self, values: List[float]):
|
def append(self, values: list[float]):
|
||||||
"""
|
'''
|
||||||
Append one sample (for each series) at the next x value. Handles rolling window.
|
Append one sample (for each series) at the next x value. Handles rolling window.
|
||||||
values must match the number of series.
|
values must match the number of series.
|
||||||
"""
|
|
||||||
self.x += 1
|
Parameters:
|
||||||
|
values - A list of floating point numbers, on per data series in the
|
||||||
|
chart.
|
||||||
|
'''
|
||||||
|
self.xpos += 1
|
||||||
for s, v in zip(self.series, values):
|
for s, v in zip(self.series, values):
|
||||||
# Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip
|
# Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip
|
||||||
try:
|
try:
|
||||||
if v is None:
|
if v is None:
|
||||||
continue
|
continue
|
||||||
# If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v))
|
# If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v))
|
||||||
s.append(self.x, float(v))
|
s.append(self.xpos, float(v))
|
||||||
except Exception:
|
except Exception as error:
|
||||||
# ignore bad data points
|
# ignore bad data points
|
||||||
|
print( f"Exception error {error}" )
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Trim series to rolling window
|
# Trim series to rolling window
|
||||||
min_x_to_keep = max(0, self.x - self.window)
|
min_x_to_keep = max(0, self.xpos - self.window)
|
||||||
self.axis_x.setRange(min_x_to_keep, self.x)
|
self.axis_x.setRange(min_x_to_keep, self.xpos)
|
||||||
|
|
||||||
for s in self.series:
|
for s in self.series:
|
||||||
# Efficient trim: remove points with x < min_x_to_keep
|
# Efficient trim: remove points with x < min_x_to_keep
|
||||||
@@ -116,11 +126,141 @@ class RollingChart(QWidget):
|
|||||||
hi = mid
|
hi = mid
|
||||||
s.replace(points[lo:]) # keep tail only
|
s.replace(points[lo:]) # keep tail only
|
||||||
|
|
||||||
|
class scaleValues:
|
||||||
|
def __init__( self, range_y ):
|
||||||
|
self.index = 0
|
||||||
|
self.valueRange = range_y
|
||||||
|
|
||||||
|
@property
|
||||||
|
def scale( self ):
|
||||||
|
return self.valueRange[self.index][1]
|
||||||
|
|
||||||
|
def scaleValue(self, value : float ):
|
||||||
|
return value / self.scale
|
||||||
|
|
||||||
|
def nextScale( self ):
|
||||||
|
if (self.index + 1) < len(self.valueRange):
|
||||||
|
self.index += 1
|
||||||
|
#print( f"Switched scale to {self.valueRange[self.index]}")
|
||||||
|
|
||||||
|
def prevScale( self ):
|
||||||
|
if self.index > 0:
|
||||||
|
self.index -= 1
|
||||||
|
#print( f"Switches scale to {self.valueRange[self.index]}")
|
||||||
|
|
||||||
|
def scalePointsDown( self, points ):
|
||||||
|
amount = self.valueRange[self.index][1]
|
||||||
|
for point in points:
|
||||||
|
point.setY(point.y() / amount)
|
||||||
|
|
||||||
|
def scaleDown( self, value ):
|
||||||
|
return value / 1024
|
||||||
|
|
||||||
|
def scaleUp( self, value ):
|
||||||
|
return value * 1024
|
||||||
|
|
||||||
|
def scalePointsUp( self, points ):
|
||||||
|
amount = self.valueRange[self.index][1]
|
||||||
|
for point in points:
|
||||||
|
point.setY(point.y() * amount)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name( self ):
|
||||||
|
return self.valueRange[self.index][0]
|
||||||
|
|
||||||
|
|
||||||
|
class RollingChartDynamic(RollingChart):
|
||||||
|
def __init__(self, title : str, series_defs: list[tuple], range_y : list[tuple], window=120,parent=None):
|
||||||
|
self.maxY = 512
|
||||||
|
super().__init__(title,series_defs,0,self.maxY,window,parent)
|
||||||
|
self.title = title
|
||||||
|
self.max = 0
|
||||||
|
self.scale = scaleValues(range_y)
|
||||||
|
self.chart.setTitle( title+ f" ({self.scale.name})" )
|
||||||
|
|
||||||
|
def getBestFit( self, value ):
|
||||||
|
values = [4,8,16,32,64,128,256,512,1024]
|
||||||
|
for i in values:
|
||||||
|
if value < i:
|
||||||
|
return i
|
||||||
|
return 4
|
||||||
|
def append(self, values: list[float]):
|
||||||
|
'''
|
||||||
|
Append one sample (for each series) at the next x value. Handles rolling window.
|
||||||
|
values must match the number of series.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
values - A list of floating point numbers, on per data series in the
|
||||||
|
chart.
|
||||||
|
'''
|
||||||
|
scaleUp = False
|
||||||
|
self.xpos += 1
|
||||||
|
for s, v in zip(self.series, values):
|
||||||
|
# Handle NaN by skipping, or plot zero—here we clamp None/NaN to None and skip
|
||||||
|
try:
|
||||||
|
if v is None:
|
||||||
|
continue
|
||||||
|
sv = self.scale.scaleValue(v)
|
||||||
|
if sv > 1024:
|
||||||
|
scaleUp = True
|
||||||
|
# If you want to clamp, do it here: v = max(self.axis_y.min(), min(self.axis_y.max(), v))
|
||||||
|
#if v:
|
||||||
|
# print( f"value : {v} scaled: {sv} " )
|
||||||
|
s.append(self.xpos, float(sv))
|
||||||
|
except Exception as error:
|
||||||
|
# ignore bad data points
|
||||||
|
print( f"Exception error {error}" )
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Trim series to rolling window
|
||||||
|
min_x_to_keep = max(0, self.xpos - self.window)
|
||||||
|
self.axis_x.setRange(min_x_to_keep, self.xpos)
|
||||||
|
|
||||||
|
if scaleUp:
|
||||||
|
self.scale.nextScale()
|
||||||
|
self.chart.setTitle(self.title + f" ({self.scale.name})" )
|
||||||
|
|
||||||
|
maxV = 0
|
||||||
|
for s in self.series:
|
||||||
|
drop = 0
|
||||||
|
points = s.pointsVector()
|
||||||
|
for index, point in enumerate(points):
|
||||||
|
if point.x() < min_x_to_keep:
|
||||||
|
drop = index
|
||||||
|
if scaleUp:
|
||||||
|
point.setY( self.scale.scaleDown(point.y()))
|
||||||
|
if maxV < point.y():
|
||||||
|
maxV = point.y()
|
||||||
|
s.replace( points[drop:] )
|
||||||
|
|
||||||
|
if maxV > 1:
|
||||||
|
self.axis_y.setRange( 0, self.getBestFit(maxV) )
|
||||||
|
|
||||||
|
#print( f"maxV left is {maxV}" )
|
||||||
|
if maxV < 1:
|
||||||
|
self.scale.prevScale()
|
||||||
|
self.chart.setTitle( self.title + f" ({self.scale.name})")
|
||||||
|
for s in self.series:
|
||||||
|
points = s.pointsVector()
|
||||||
|
for point in points:
|
||||||
|
point.setY( self.scale.scaleUp(point.y()))
|
||||||
|
s.replace(points)
|
||||||
|
|
||||||
class MonitorWindow(QMainWindow):
|
class MonitorWindow(QMainWindow):
|
||||||
|
'''
|
||||||
|
Creating a window to monitor various system aspects.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
refresh_ms - Time between refreshes of data on screen, in milliseconds, the
|
||||||
|
default is 1 second.
|
||||||
|
window - How much data do we want to store in the graph? Each data point
|
||||||
|
is a data refresh period.
|
||||||
|
Parent - Owning parent of this window... default is None.
|
||||||
|
'''
|
||||||
def __init__(self, refresh_ms: int = 1000, window = 120, parent=None):
|
def __init__(self, refresh_ms: int = 1000, window = 120, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
self.setWindowTitle("Argon 1UP Monitor")
|
|
||||||
|
self.setWindowTitle("System Monitor")
|
||||||
self.setMinimumSize(900, 900)
|
self.setMinimumSize(900, 900)
|
||||||
|
|
||||||
central = QWidget(self)
|
central = QWidget(self)
|
||||||
@@ -155,14 +295,14 @@ class MonitorWindow(QMainWindow):
|
|||||||
window=window
|
window=window
|
||||||
)
|
)
|
||||||
|
|
||||||
self.io_chart = RollingChart(
|
self.io_chart = RollingChartDynamic(
|
||||||
title="NVMe I/O (MB/s)",
|
title="Disk I/O",
|
||||||
series_defs=[
|
series_defs=[
|
||||||
("Read MB/s", None),
|
("Read", None),
|
||||||
("Write MB/s", None),
|
("Write", None),
|
||||||
],
|
],
|
||||||
y_min=0, y_max=1100, # adjust ceiling for your device
|
range_y=[("Bytes/s", 1),("KiB/s",1024),("MiB/s", 1024*1024),("GiB/s",1024*1024*1024)],
|
||||||
window=window
|
window=window,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Layout: 2x2 grid (CPU, NVMe on top; IO full width bottom)
|
# Layout: 2x2 grid (CPU, NVMe on top; IO full width bottom)
|
||||||
@@ -171,30 +311,40 @@ class MonitorWindow(QMainWindow):
|
|||||||
grid.addWidget(self.cpu_chart, 2, 0, 1, 1 )
|
grid.addWidget(self.cpu_chart, 2, 0, 1, 1 )
|
||||||
grid.addWidget(self.fan_chart, 2, 1, 1, 1 )
|
grid.addWidget(self.fan_chart, 2, 1, 1, 1 )
|
||||||
|
|
||||||
|
# Get the initial information from the syste
|
||||||
|
self.refresh_metrics()
|
||||||
|
|
||||||
# Timer
|
# Timer
|
||||||
self.timer = QTimer(self)
|
self.timer = QTimer(self)
|
||||||
self.timer.timeout.connect(self.refresh_metrics)
|
self.timer.timeout.connect(self.refresh_metrics)
|
||||||
self.timer.start(refresh_ms)
|
self.timer.start(refresh_ms)
|
||||||
|
|
||||||
self.refresh_metrics()
|
|
||||||
|
|
||||||
def refresh_metrics(self):
|
def refresh_metrics(self):
|
||||||
# Gather metrics with safety
|
'''
|
||||||
|
This routine is called periodically, as setup in the __init__ functon. Since this
|
||||||
|
routine calls out to other things, we want to make sure that there is no possible
|
||||||
|
exception, so everything needs to be wrapped in a handler.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# Obtain the CPU temperature
|
||||||
try:
|
try:
|
||||||
cpu_c = float(sysdata.CPUTemperature)
|
cpu_c = float(sysdata.CPUTemperature)
|
||||||
except Exception:
|
except Exception:
|
||||||
cpu_c = None
|
cpu_c = None
|
||||||
|
|
||||||
|
# Obtain the current fan speed
|
||||||
try:
|
try:
|
||||||
fan_speed = sysdata.fanSpeed
|
fan_speed = sysdata.fanSpeed
|
||||||
except Exception:
|
except Exception:
|
||||||
fan_speed = None
|
fan_speed = None
|
||||||
|
|
||||||
|
# Obtain the NVMe device temperature
|
||||||
try:
|
try:
|
||||||
nvme_c = sysdata.driveTemp
|
nvme_c = sysdata.driveTemp
|
||||||
except Exception:
|
except Exception:
|
||||||
nvme_c = None
|
nvme_c = None
|
||||||
|
|
||||||
|
# Obtain the NVMe Device read and write rates
|
||||||
try:
|
try:
|
||||||
read_mb, write_mb = sysdata.driveStats
|
read_mb, write_mb = sysdata.driveStats
|
||||||
read_mb = float(read_mb)
|
read_mb = float(read_mb)
|
||||||
@@ -202,13 +352,12 @@ class MonitorWindow(QMainWindow):
|
|||||||
except Exception:
|
except Exception:
|
||||||
read_mb, write_mb = None, None
|
read_mb, write_mb = None, None
|
||||||
|
|
||||||
|
# Get the CPU load precentages
|
||||||
try:
|
try:
|
||||||
p = cpuload.getPercentages()
|
p = cpuload.getPercentages()
|
||||||
values = []
|
values = [p[name] for name in cpuload.cpuNames]
|
||||||
for i in range( len(cpuload) ):
|
|
||||||
values.append( round( p[f'cpu{i}'], 2 ) )
|
|
||||||
except Exception:
|
except Exception:
|
||||||
values = [ None for i in range( len( cpuload) ) ]
|
values = [ None for name in cpuload.cpuNames ]
|
||||||
|
|
||||||
# Append to charts
|
# Append to charts
|
||||||
self.cpu_chart.append([cpu_c,nvme_c])
|
self.cpu_chart.append([cpu_c,nvme_c])
|
||||||
|
|||||||
@@ -113,7 +113,6 @@ class systemData:
|
|||||||
def CPUTemperature(self) -> int:
|
def CPUTemperature(self) -> int:
|
||||||
return self._cpuTemp.temperature
|
return self._cpuTemp.temperature
|
||||||
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def fanSpeed( self ) -> int:
|
def fanSpeed( self ) -> int:
|
||||||
speed= 0
|
speed= 0
|
||||||
@@ -157,8 +156,8 @@ class systemData:
|
|||||||
@property
|
@property
|
||||||
def driveStats(self) -> tuple[float,float]:
|
def driveStats(self) -> tuple[float,float]:
|
||||||
data = self._stats.readWriteSectors()
|
data = self._stats.readWriteSectors()
|
||||||
readMB = (float(data[0]) * 512.0) / (1024.0 * 1024.0)
|
readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0)
|
||||||
writeMB = (float(data[1]) * 512.0) / (1024.0 * 1024.0)
|
writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0)
|
||||||
return (readMB, writeMB )
|
return (readMB, writeMB )
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
273
monitor/systemsupport.py
Executable file
273
monitor/systemsupport.py
Executable file
@@ -0,0 +1,273 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
#
|
||||||
|
# Setup environment and pull in all of the items we need from gpiozero. The
|
||||||
|
# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect
|
||||||
|
# will be new direction for all prior version the RPIi.)
|
||||||
|
#
|
||||||
|
from gpiozero import CPUTemperature
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
class DriveStats:
|
||||||
|
'''
|
||||||
|
DriveStat class -
|
||||||
|
|
||||||
|
This class gets the drive statistics from sysfs for the device passed
|
||||||
|
in. There are several statistics can can be obtained. Note that since
|
||||||
|
all of the data is pulled at the same time, it is upto the caller to
|
||||||
|
make sure all the stats needed are obtained at the same time.
|
||||||
|
|
||||||
|
See: https://www.kernel.org/doc/html/latest/block/stat.html
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
device - the name of the device to track
|
||||||
|
'''
|
||||||
|
|
||||||
|
READ_IOS = 0
|
||||||
|
READ_MERGES = 1
|
||||||
|
READ_SECTORS = 2
|
||||||
|
READ_TICKS = 3
|
||||||
|
WRITE_IOS = 4
|
||||||
|
WRITE_MERGES = 5
|
||||||
|
WRITE_SECTORS = 6
|
||||||
|
WRITE_TICKS = 7
|
||||||
|
IN_FLIGHT = 8
|
||||||
|
IO_TICKS = 9
|
||||||
|
TIME_IN_QUEUE = 10
|
||||||
|
DISCARD_IOS = 11
|
||||||
|
DISCARD_MERGES = 12
|
||||||
|
DISCARD_SECTORS = 13
|
||||||
|
DISCARD_TICS = 14
|
||||||
|
FLUSH_IOS = 15
|
||||||
|
FLUSH_TICKS = 16
|
||||||
|
|
||||||
|
def __init__( self, device:str ):
|
||||||
|
self._last : list[int] = []
|
||||||
|
self._stats : list[int] = []
|
||||||
|
self._device = device
|
||||||
|
self._readStats()
|
||||||
|
|
||||||
|
def _readStats( self ):
|
||||||
|
'''
|
||||||
|
Read the disk statistics. The stored statics in sysfs are stored as a single file
|
||||||
|
so that when the data is read, all of the stats correlate to the same time. The data
|
||||||
|
is from the time the device has come online.
|
||||||
|
|
||||||
|
last and set to the old version of the data, and the latest data is stored in stats
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
self._last = self._stats
|
||||||
|
with open( f"/sys/block/{self._device}/stat", "r") as f:
|
||||||
|
curStats = f.readline().strip().split(" ")
|
||||||
|
self._stats = [int(l) for l in curStats if l]
|
||||||
|
except Exception as e:
|
||||||
|
print( f"Failure reading disk statistics for {self._device} error {e}" )
|
||||||
|
|
||||||
|
def _getStats( self ) -> list[int]:
|
||||||
|
'''
|
||||||
|
Read the devices statistics from the device,and return it.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
An array containing all of the data colleected about the device.
|
||||||
|
'''
|
||||||
|
curData : list[int] = []
|
||||||
|
|
||||||
|
self._readStats()
|
||||||
|
if self._last == []:
|
||||||
|
curData = self._stats[:]
|
||||||
|
else:
|
||||||
|
curData = [ d-self._last[i] for i,d in enumerate( self._stats ) ]
|
||||||
|
return curData
|
||||||
|
|
||||||
|
def readAllStats( self ) -> list[int]:
|
||||||
|
'''
|
||||||
|
read all of the drive statisics from sysfs for the device.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
A list of all of the device stats
|
||||||
|
'''
|
||||||
|
return self._getStats()
|
||||||
|
|
||||||
|
def readSectors( self )-> int:
|
||||||
|
return self._getStats()[DriveStats.READ_SECTORS]
|
||||||
|
|
||||||
|
def writeSectors( self ) -> int:
|
||||||
|
return self._getStats()[DriveStats.WRITE_SECTORS]
|
||||||
|
|
||||||
|
def discardSectors( self ) -> int:
|
||||||
|
return self._getStats()[DriveStats.DISCARD_SECTORS]
|
||||||
|
|
||||||
|
def readWriteSectors( self ) -> tuple[int,int]:
|
||||||
|
curData = self._getStats()
|
||||||
|
return (curData[DriveStats.READ_SECTORS],curData[DriveStats.WRITE_SECTORS])
|
||||||
|
|
||||||
|
|
||||||
|
class systemData:
|
||||||
|
def __init__( self, drive : str = 'nvme0n1' ):
|
||||||
|
self._drive = drive
|
||||||
|
self._cpuTemp = CPUTemperature()
|
||||||
|
self._stats = DriveStats( self._drive )
|
||||||
|
|
||||||
|
@property
|
||||||
|
def CPUTemperature(self) -> int:
|
||||||
|
return self._cpuTemp.temperature
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fanSpeed( self ) -> int:
|
||||||
|
speed= 0
|
||||||
|
try:
|
||||||
|
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
|
||||||
|
speed = int( command.read().strip())
|
||||||
|
except Exception as error:
|
||||||
|
print( f"Could not determine fan speed, error {error}" )
|
||||||
|
finally:
|
||||||
|
command.close()
|
||||||
|
|
||||||
|
return speed
|
||||||
|
|
||||||
|
@property
|
||||||
|
def driveTemp(self) -> float:
|
||||||
|
smartOutRaw = ""
|
||||||
|
cmd = f'sudo smartctl -A /dev/{self._drive}'
|
||||||
|
try:
|
||||||
|
command = os.popen( cmd )
|
||||||
|
smartOutRaw = command.read()
|
||||||
|
except Exception as error:
|
||||||
|
print( f"Could not launch {cmd} error is {error}" )
|
||||||
|
return 0.0
|
||||||
|
finally:
|
||||||
|
command.close()
|
||||||
|
|
||||||
|
smartOut = [ l for l in smartOutRaw.split('\n') if l]
|
||||||
|
for smartAttr in ["Temperature:","194","190"]:
|
||||||
|
try:
|
||||||
|
line = [l for l in smartOut if l.startswith(smartAttr)][0]
|
||||||
|
parts = [p for p in line.replace('\t',' ').split(' ') if p]
|
||||||
|
if smartAttr == "Temperature:":
|
||||||
|
return float(parts[1])
|
||||||
|
else:
|
||||||
|
return float(parts[0])
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return float(0.0)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def driveStats(self) -> tuple[float,float]:
|
||||||
|
data = self._stats.readWriteSectors()
|
||||||
|
readMB = (float(data[0]) * 512.0) #/ (1024.0 * 1024.0)
|
||||||
|
writeMB = (float(data[1]) * 512.0) #/ (1024.0 * 1024.0)
|
||||||
|
return (readMB, writeMB )
|
||||||
|
|
||||||
|
class CPULoad:
|
||||||
|
'''
|
||||||
|
A class to help with obtaining the CPU load of the system. If there is more information
|
||||||
|
needed, we can add to this.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
This code automatically attempts to load the data from the system to initialize the
|
||||||
|
object with names, and an initial set of data.
|
||||||
|
|
||||||
|
This may result in th first actual call return some not very consistent values, for
|
||||||
|
the time period being observed, but that difference is minimal. In otherwords if we
|
||||||
|
the period of time being measured is 1 second, and it's been a minute since this class
|
||||||
|
was initialized, the first period reported will be CPU load over the minute, not 1 second,
|
||||||
|
and the second period reported will be for a second...
|
||||||
|
|
||||||
|
This is usually not an issue.
|
||||||
|
'''
|
||||||
|
def __init__( self ):
|
||||||
|
self._previousData : dict[str,tuple] = self._getRawData()
|
||||||
|
self._names : list[str] = []
|
||||||
|
for item in self._previousData:
|
||||||
|
self._names.append( item )
|
||||||
|
|
||||||
|
def _getRawData( self ) -> dict[str : tuple]:
|
||||||
|
'''
|
||||||
|
Obtain the raw CPU data from the system (located in /prop/stat), and
|
||||||
|
return just the cpu0 -> cpux values. No assumption is made on the number of
|
||||||
|
cpus.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary is returned, the format is name = (total, idle). The total
|
||||||
|
time and idle time are use to determine the percent utilization of the system.
|
||||||
|
'''
|
||||||
|
result = {}
|
||||||
|
with open( "/proc/stat", "r") as f:
|
||||||
|
allLines = f.readlines()
|
||||||
|
for line in allLines:
|
||||||
|
cpu = line.replace('\t', ' ').strip().split()
|
||||||
|
if (len(cpu[0]) > 3) and (cpu[0][:3] == "cpu"):
|
||||||
|
total = 0
|
||||||
|
idle = 0
|
||||||
|
for i in range( 1, len(cpu)):
|
||||||
|
total += int(cpu[i])
|
||||||
|
if i == 4 or i == 5:
|
||||||
|
idle += int(cpu[i])
|
||||||
|
result[cpu[0]] = (total,idle)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def getPercentages( self ) -> dict[ str : float ]:
|
||||||
|
'''
|
||||||
|
Obtain the percent CPU utilization of the system for a period of time.
|
||||||
|
|
||||||
|
This routine gets the current raw data from the system, and then performs
|
||||||
|
a delta from the prior time this function was called. This data is then run
|
||||||
|
through the following equation:
|
||||||
|
|
||||||
|
utilization = ((total - idle)/total) * 100
|
||||||
|
|
||||||
|
If the snapshots are taken at relativy consistent intervals, the CPU
|
||||||
|
utilization in percent, is reasonably lose to the actual percentage.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary consisting of the name of the CPU, and a floating point
|
||||||
|
number representing the current utilization of that CPU.
|
||||||
|
'''
|
||||||
|
results = {}
|
||||||
|
current = self._getRawData()
|
||||||
|
for item in current:
|
||||||
|
total = current[item][0] - self._previousData[item][0]
|
||||||
|
idle = current[item][1] - self._previousData[item][1]
|
||||||
|
percent = ((total - idle)/total) * 100
|
||||||
|
results[item] = round(percent,2)
|
||||||
|
self._previousData = current
|
||||||
|
return results
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cpuNames( self ) -> list[str]:
|
||||||
|
'''
|
||||||
|
Get a list of CPU names from the system.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
a list of strings
|
||||||
|
'''
|
||||||
|
return self._names
|
||||||
|
|
||||||
|
def __len__(self) -> int:
|
||||||
|
'''
|
||||||
|
handle getting the length (or count of CPU's).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of CPU's
|
||||||
|
'''
|
||||||
|
return len(self._previousData)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
data = systemData()
|
||||||
|
print( f"CPU Temp : {data.CPUTemperature}" )
|
||||||
|
print( f"Fan Speed: {data.fanSpeed}" )
|
||||||
|
print( f"NVME Temp: {data.driveTemp}" )
|
||||||
|
print( f"Stats : {data.driveStats}" )
|
||||||
|
|
||||||
|
load = CPULoad()
|
||||||
|
print( f"Number of CPU's = {len(load)}" )
|
||||||
|
for i in range(10):
|
||||||
|
time.sleep( 1 )
|
||||||
|
percentage : dict[str:float] = load.getPercentages()
|
||||||
|
print( f"percentage: {percentage}" )
|
||||||
|
for item in percentage:
|
||||||
|
print( f"{item} : {percentage[item]:.02f}" )
|
||||||
|
|
||||||
Reference in New Issue
Block a user