insert all files for the moment

This commit is contained in:
Jeff Curless
2025-07-11 20:53:43 -04:00
parent 6dd148b4e1
commit e85b5dbae4
6 changed files with 727 additions and 0 deletions

373
argondashboard.py Executable file
View File

@@ -0,0 +1,373 @@
#!/bin/python3
import time
import os
import sys
import signal
import curses
sys.path.append("/etc/argon/")
from argonsysinfo import *
from argonregister import *
############
# Constants
############
COLORPAIRID_DEFAULT=1
COLORPAIRID_LOGO=2
COLORPAIRID_DEFAULTINVERSE=3
COLORPAIRID_ALERT=4
COLORPAIRID_WARNING=5
COLORPAIRID_GOOD=6
INPUTREFRESHMS=100
DISPLAYREFRESHMS=5000
UPS_LOGFILE="/dev/shm/upslog.txt"
###################
# Display Elements
###################
def displaydatetime(stdscr):
try:
curtimenow = time.localtime()
stdscr.addstr(1, 1, time.strftime("%A", curtimenow), curses.color_pair(COLORPAIRID_DEFAULT))
stdscr.addstr(2, 1, time.strftime("%b %d,%Y", curtimenow), curses.color_pair(COLORPAIRID_DEFAULT))
stdscr.addstr(3, 1, time.strftime("%I:%M%p", curtimenow), curses.color_pair(COLORPAIRID_DEFAULT))
except:
pass
def displayipbattery(stdscr):
try:
displaytextright(stdscr,1, argonsysinfo_getip()+" ", COLORPAIRID_DEFAULT)
except:
pass
try:
status = ""
level = ""
outobj = {}
# Load status
fp = open(UPS_LOGFILE, "r")
logdata = fp.read()
alllines = logdata.split("\n")
ctr = 0
while ctr < len(alllines):
tmpval = alllines[ctr].strip()
curinfo = tmpval.split(":")
if len(curinfo) > 1:
tmpattrib = curinfo[0].lower().split(" ")
# The rest are assumed to be value
outobj[tmpattrib[0]] = tmpval[(len(curinfo[0])+1):].strip()
ctr = ctr + 1
# Map to data
try:
statuslist = outobj["power"].lower().split(" ")
if statuslist[0] == "battery":
tmp_charging = 0
else:
tmp_charging = 1
tmp_battery = int(statuslist[1].replace("%",""))
colorpairidx = COLORPAIRID_DEFAULT
if tmp_charging:
if tmp_battery > 99:
status="Plugged"
level=""
else:
status="Charging"
level=str(tmp_battery)+"%"
else:
status="Battery"
level=str(tmp_battery)+"%"
if tmp_battery <= 20:
colorpairidx = COLORPAIRID_ALERT
elif tmp_battery <= 50:
colorpairidx = COLORPAIRID_WARNING
else:
colorpairidx = COLORPAIRID_GOOD
displaytextright(stdscr,2, status+" ", colorpairidx)
displaytextright(stdscr,3, level+" ", colorpairidx)
except:
pass
except:
pass
def displayramcpu(stdscr, refcpu, rowstart, colstart):
curusage_b = argonsysinfo_getcpuusagesnapshot()
try:
outputlist = []
tmpraminfo = argonsysinfo_getram()
outputlist.append({"title": "ram ", "value": tmpraminfo[1]+" "+tmpraminfo[0]+" Free"})
for cpuname in refcpu:
if cpuname == "cpu":
continue
if refcpu[cpuname]["total"] == curusage_b[cpuname]["total"]:
outputlist.append({"title": cpuname, "value": "Loading"})
else:
total = curusage_b[cpuname]["total"]-refcpu[cpuname]["total"]
idle = curusage_b[cpuname]["idle"]-refcpu[cpuname]["idle"]
outputlist.append({"title": cpuname, "value": str(int(100*(total-idle)/(total)))+"% Used"})
displaytitlevaluelist(stdscr, rowstart, colstart, outputlist)
except:
pass
return curusage_b
def displaytempfan(stdscr, rowstart, colstart):
print( f"displaytempfan( {stdscr}, {rowstart}, {colstart} )" )
try:
outputlist = []
try:
if bus is not None:
fanspeed = argonregister_getfanspeed(bus)
fanspeedstr = "Off"
if fanspeed > 0:
fanspeedstr = str(fanspeed)+"%"
outputlist.append({"title": "Fan ", "value": fanspeedstr})
except:
outputlist.append( {"title":"Fan ", "value": 'Error!'})
pass
# Todo load from config
temperature = "C"
hddtempctr = 0
maxcval = 0
mincval = 200
# Get min/max of hdd temp
hddtempobj = argonsysinfo_gethddtemp()
for curdev in hddtempobj:
if hddtempobj[curdev] < mincval:
mincval = hddtempobj[curdev]
if hddtempobj[curdev] > maxcval:
maxcval = hddtempobj[curdev]
hddtempctr = hddtempctr + 1
cpucval = argonsysinfo_getcputemp()
if hddtempctr > 0:
alltempobj = {"cpu": cpucval,"hdd min": mincval, "hdd max": maxcval}
# Update max C val to CPU Temp if necessary
if maxcval < cpucval:
maxcval = cpucval
displayrowht = 8
displayrow = 8
for curdev in alltempobj:
if temperature == "C":
# Celsius
tmpstr = str(alltempobj[curdev])
if len(tmpstr) > 4:
tmpstr = tmpstr[0:4]
else:
# Fahrenheit
tmpstr = str(32+9*(alltempobj[curdev])/5)
if len(tmpstr) > 5:
tmpstr = tmpstr[0:5]
if len(curdev) <= 3:
outputlist.append({"title": curdev.upper(), "value": tmpstr +temperature})
else:
outputlist.append({"title": curdev.upper(), "value": tmpstr +temperature})
else:
maxcval = cpucval
if temperature == "C":
# Celsius
tmpstr = str(cpucval)
if len(tmpstr) > 4:
tmpstr = tmpstr[0:4]
else:
# Fahrenheit
tmpstr = str(32+9*(cpucval)/5)
if len(tmpstr) > 5:
tmpstr = tmpstr[0:5]
outputlist.append({"title": "Temp", "value": tmpstr +temperature})
displaytitlevaluelist(stdscr, rowstart, colstart, outputlist)
except:
pass
def displaystorage(stdscr, rowstart, colstart):
try:
outputlist = []
tmpobj = argonsysinfo_listhddusage()
for curdev in tmpobj:
outputlist.append({"title": curdev, "value": argonsysinfo_kbstr(tmpobj[curdev]['total'])+ " "+ str(int(100*tmpobj[curdev]['used']/tmpobj[curdev]['total']))+"% Used" })
displaytitlevaluelist(stdscr, rowstart, colstart, outputlist)
except:
pass
##################
# Helpers
##################
# Initialize I2C Bus
bus = argonregister_initializebusobj()
def handle_resize(signum, frame):
# TODO: Not working?
curses.update_lines_cols()
# Ideally redraw here
def displaytitlevaluelist(stdscr, rowstart, leftoffset, curlist):
rowidx = rowstart
while rowidx < curses.LINES and len(curlist) > 0:
curline = ""
tmpitem = curlist.pop(0)
curline = tmpitem["title"]+": "+str(tmpitem["value"])
stdscr.addstr(rowidx, leftoffset, curline)
rowidx = rowidx + 1
def displaytextcentered(stdscr, rownum, strval, colorpairidx = COLORPAIRID_DEFAULT):
leftoffset = 0
numchars = len(strval)
if numchars < 1:
return
elif (numchars > curses.COLS):
leftoffset = 0
strval = strval[0:curses.COLS]
else:
leftoffset = (curses.COLS - numchars)>>1
stdscr.addstr(rownum, leftoffset, strval, curses.color_pair(colorpairidx))
def displaytextright(stdscr, rownum, strval, colorpairidx = COLORPAIRID_DEFAULT):
leftoffset = 0
numchars = len(strval)
if numchars < 1:
return
elif (numchars > curses.COLS):
leftoffset = 0
strval = strval[0:curses.COLS]
else:
leftoffset = curses.COLS - numchars
stdscr.addstr(rownum, leftoffset, strval, curses.color_pair(colorpairidx))
def displaylinebreak(stdscr, rownum, colorpairidx = COLORPAIRID_DEFAULTINVERSE):
strval = " "
while len(strval) < curses.COLS:
strval = strval + " "
stdscr.addstr(rownum, 0, strval, curses.color_pair(colorpairidx))
##################
# Main Loop
##################
def mainloop(stdscr):
try:
# Set up signal handler
signal.signal(signal.SIGWINCH, handle_resize)
maxloopctr = int(DISPLAYREFRESHMS/INPUTREFRESHMS)
sleepsecs = INPUTREFRESHMS/1000
loopctr = maxloopctr
loopmode = True
stdscr = curses.initscr()
# Turn off echoing of keys, and enter cbreak mode,
# where no buffering is performed on keyboard input
curses.noecho()
curses.cbreak()
curses.curs_set(0)
curses.start_color()
#curses.COLOR_BLACK
#curses.COLOR_BLUE
#curses.COLOR_CYAN
#curses.COLOR_GREEN
#curses.COLOR_MAGENTA
#curses.COLOR_RED
#curses.COLOR_WHITE
#curses.COLOR_YELLOW
curses.init_pair(COLORPAIRID_DEFAULT, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(COLORPAIRID_LOGO, curses.COLOR_WHITE, curses.COLOR_RED)
curses.init_pair(COLORPAIRID_DEFAULTINVERSE, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(COLORPAIRID_ALERT, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(COLORPAIRID_WARNING, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(COLORPAIRID_GOOD, curses.COLOR_GREEN, curses.COLOR_BLACK)
stdscr.nodelay(True)
refcpu = argonsysinfo_getcpuusagesnapshot()
while True:
try:
key = stdscr.getch()
# if key == ord('x') or key == ord('X'):
# Any key
if key > 0:
break
except curses.error:
# No key was pressed
pass
loopctr = loopctr + 1
if loopctr >= maxloopctr:
loopctr = 0
# Screen refresh loop
# Clear screen
stdscr.clear()
displaytextcentered(stdscr, 0, " ", COLORPAIRID_LOGO)
displaytextcentered(stdscr, 1, " Argon40 Dashboard ", COLORPAIRID_LOGO)
displaytextcentered(stdscr, 2, " ", COLORPAIRID_LOGO)
displaytextcentered(stdscr, 3, "Press any key to close")
displaylinebreak(stdscr, 5)
# Display Elements
displaydatetime(stdscr)
displayipbattery(stdscr)
# Data Columns
rowstart = 7
colstart = 20
refcpu = displayramcpu(stdscr, refcpu, rowstart, colstart)
displaystorage(stdscr, rowstart, colstart+30)
displaytempfan(stdscr, rowstart, colstart+60)
# Main refresh even
stdscr.refresh()
time.sleep(sleepsecs)
except Exception as initerr:
pass
##########
# Cleanup
##########
try:
curses.curs_set(1)
curses.echo()
curses.nocbreak()
curses.endwin()
except Exception as closeerr:
pass
curses.wrapper(mainloop)

43
fioscript/fio-ran-r4k.job Normal file
View File

@@ -0,0 +1,43 @@
[global]
name=fio-rand-r
filename=readdatafile
rw=randread
bs=4K
direct=1
time_based=1
runtime=300
group_reporting=1
ioengine=libaio
[job1]
size=10g
iodepth=64
[job2]
size=10g
iodepth=64
[job3]
size=10g
iodepth=64
[job4]
size=10g
iodepth=64
[job5]
size=10g
iodepth=64
[job6]
size=10g
iodepth=64
[job7]
size=10g
iodepth=64
[job8]
size=10g
iodepth=64

42
fioscript/fio-ran-w4k.job Normal file
View File

@@ -0,0 +1,42 @@
[global]
name=fio-rand-w
filename=writedatafile
rw=randwrite
bs=4K
direct=1
time_based=1
runtime=300
group_reporting=1
ioengine=libaio
[job1]
size=10g
iodepth=64
[job2]
size=10g
iodepth=64
[job3]
size=10g
iodepth=64
[job4]
size=10g
iodepth=64
[job5]
size=10g
iodepth=64
[job6]
size=10g
iodepth=64
[job7]
size=10g
iodepth=64
[job8]
size=10g
iodepth=64

43
fioscript/fio-seq-r4k.job Normal file
View File

@@ -0,0 +1,43 @@
[global]
name=fio-seq-r
filename=readdatafile
rw=read
bs=4K
direct=1
time_based=1
runtime=300
group_reporting=1
ioengine=libaio
[job1]
size=10g
iodepth=64
[job2]
size=10g
iodepth=64
[job3]
size=10g
iodepth=64
[job4]
size=10g
iodepth=64
[job5]
size=10g
iodepth=64
[job6]
size=10g
iodepth=64
[job7]
size=10g
iodepth=64
[job8]
size=10g
iodepth=64

42
fioscript/fio-seq-w4k.job Normal file
View File

@@ -0,0 +1,42 @@
[global]
name=fio-seq-w
filename=writedatafile
rw=write
bs=4K
direct=1
time_based=1
runtime=300
group_reporting=1
ioengine=libaio
[job1]
size=10g
iodepth=64
[job2]
size=10g
iodepth=64
[job3]
size=10g
iodepth=64
[job4]
size=10g
iodepth=64
[job5]
size=10g
iodepth=64
[job6]
size=10g
iodepth=64
[job7]
size=10g
iodepth=64
[job8]
size=10g
iodepth=64

184
monitor1up.py Executable file
View File

@@ -0,0 +1,184 @@
#!/usr/bin/python3
#
# Setup environment and pull in all of the items we need from gpiozero. The
# gpiozero library is the new one that supports Raspberry PI 5's (and I suspect
# will be new direction for all prior version the RPIi.)
#
from gpiozero import CPUTemperature
import time
import os
class DriveStat:
'''
DriveStat class -
This class gets the drive statistics from sysfs for the device passed
in. There are several statistics can can be obtained. Note that since
all of the data is pulled at the same time, it is upto the caller to
make sure all the stats needed are obtained at the same time.
See: https://www.kernel.org/doc/html/latest/block/stat.html
Parameters:
device - the name of the device to track
'''
READ_IOS = 0
READ_MERGES = 1
READ_SECTORS = 2
READ_TICKS = 3
WRITE_IOS = 4
WRITE_MERGES = 5
WRITE_SECTORS = 6
WRITE_TICKS = 7
IN_FLIGHT = 8
IO_TICKS = 9
TIME_IN_QUEUE = 10
DISCARD_IOS = 11
DISCARD_MERGES = 12
DISCARD_SECTORS = 13
DISCARD_TICS = 14
FLUSH_IOS = 15
FLUSH_TICKS = 16
def __init__( self, device:str ):
self.last = []
self.stats = []
self.device = device
self._readStats()
def _readStats( self ):
'''
Read the disk statistics. The stored statics in sysfs are stored as a single file
so that when the data is read, all of the stats correlate to the same time. The data
is from the time the device has come online.
last and set to the old version of the data, and the latest data is stored in stats
'''
try:
self.last = self.stats
with open( f"/sys/block/{self.device}/stat", "r") as f:
stats = f.readline().strip().split(" ")
self.stats = [int(l) for l in stats if l]
except Exception as e:
print( f"Failure reading disk statistics for {device} error {e}" )
def _getStats( self ) -> list[int]:
'''
Read the devices statistics from the device,and return it.
Returns:
An array containing all of the data colleected about the device.
'''
self._readStats()
if self.last == []:
data = self.stats[:]
else:
data = [ d-self.last[i] for i,d in enumerate( self.stats ) ]
return data
def readAllStats( self ) -> list[int]:
'''
read all of the drive statisics from sysfs for the device.
Returns
A list of all of the device stats
'''
return self._getStats()
def readSectors( self )-> int:
return self._getStats()[DriveStat.READ_SECTORS]
def writeSectors( self ) -> int:
return self._getStats()[DriveStat.WRITE_SECTORS]
def discardSectors( self ) -> int:
return self._getStats()[DriveStat.DISCARD_SECTORS]
def readWriteSectors( self ) -> (int,int):
data = self._getStats()
return (data[DriveStat.READ_SECTORS],data[DriveStat.WRITE_SECTORS])
def setupTemperatureObject():
'''
Get a cpu temperature object, and set the min and max range.
When the ranges are set to the non-default values, if the temperature is
less than min_temp we get 0, and when the temperature reaches the max we get
a value of 1. This value can be used directly as a duty cycle for the fan.
Return:
A CPU temperature object
'''
cpuTemp = None
try:
cpuTemp = CPUTemperature()
except Exception as error:
log.error( f"Error creating CPU temperature object, error is {error}" )
return cpuTemp
def getFanSpeed() -> int:
'''
Obtain the speed of the fan attached to the CPU. This is accomplished reading
the information from sysfs.
NOTE: There is an assumption that the fan s hanging off of /hwmon/hwmon3. This may
or may not be the case in all situations.
'''
fanSpeed = 0
try:
command = os.popen( 'cat /sys/devices/platform/cooling_fan/hwmon/*/fan1_input' )
fanSpeed = command.read().strip()
except:
pass
return int(fanSpeed)
def getNVMETemp(device : str) -> float:
'''
Obtain the temperature of the device passed in, using smartctl.
Parameters :
device - A string containing the device name.
Returns:
The temperature as a float
'''
smartOutRaw = ""
try:
command = os.popen( f'smartctl -A /dev/{device}' )
smartOutRaw = command.read()
except Exception as e:
return 0
finally:
command.close()
smartOut = [ l for l in smartOutRaw.split('\n') if l]
for smartAttr in ["Temperature:","194","190"]:
try:
line = [l for l in smartOut if l.startswith(smartAttr)][0]
parts = [p for p in line.replace('\t',' ').split(' ') if p]
if smartAttr == "Temperature:":
return float(parts[1])
else:
return float(parts[0])
except IndexError:
pass
return float(0.0)
stats = DriveStat( 'nvme0n1' )
cpuTemp = setupTemperatureObject()
while True:
print( f"CPU : {cpuTemp.temperature}" )
print( f"Fan : {getFanSpeed()}" )
print( f"NVME : {getNVMETemp('nvme0n1')}" )
data = stats.readWriteSectors()
print( f"Read : {data[0]*512}" )
print( f"Write: {data[1] *512}" )
time.sleep( 1 )