merge openwebrx-admin into openwebrx

This commit is contained in:
Jakob Ketterl
2021-04-24 19:39:48 +02:00
parent 6ad3a80fc6
commit 71b0fa968b
11 changed files with 43 additions and 32 deletions

View File

@ -10,7 +10,9 @@ from owrx.websocket import WebSocketConnection
from owrx.reporting import ReportingEngine
from owrx.version import openwebrx_version
from owrx.audio.queue import DecoderQueue
from owrx.admin import add_admin_parser, run_admin_action
import signal
import argparse
import logging
@ -31,6 +33,26 @@ def handleSignal(sig, frame):
def main():
parser = argparse.ArgumentParser(description="OpenWebRX - Open Source SDR Web App for Everyone!")
parser.add_argument("-v", "--version", action="store_true", help="Show the software version")
moduleparser = parser.add_subparsers(title="Modules", dest="module")
adminparser = moduleparser.add_parser("admin", help="OpenWebRX admin actions")
add_admin_parser(adminparser)
args = parser.parse_args()
if args.version:
print("OpenWebRX version {version}".format(version=openwebrx_version))
elif args.module == "admin":
# override loglevel for admin commands, they shouldn't be that verbose
logging.basicConfig(level=logging.INFO, force=True)
run_admin_action(adminparser, args)
else:
start_receiver()
def start_receiver():
print(
"""

59
owrx/admin/__init__.py Normal file
View File

@ -0,0 +1,59 @@
from owrx.admin.commands import NewUser, DeleteUser, ResetPassword, ListUsers, DisableUser, EnableUser, HasUser
import sys
import traceback
def add_admin_parser(moduleparser):
subparsers = moduleparser.add_subparsers(title="Commands", dest="command")
adduser_parser = subparsers.add_parser("adduser", help="Add a new user")
adduser_parser.add_argument("user", help="Username to be added")
adduser_parser.set_defaults(cls=NewUser)
removeuser_parser = subparsers.add_parser("removeuser", help="Remove an existing user")
removeuser_parser.add_argument("user", help="Username to be remvoed")
removeuser_parser.set_defaults(cls=DeleteUser)
resetpassword_parser = subparsers.add_parser("resetpassword", help="Reset a user's password")
resetpassword_parser.add_argument("user", help="Username to be remvoed")
resetpassword_parser.set_defaults(cls=ResetPassword)
listusers_parser = subparsers.add_parser("listusers", help="List enabled users")
listusers_parser.add_argument("-a", "--all", action="store_true", help="Show all users (including disabled ones)")
listusers_parser.set_defaults(cls=ListUsers)
disableuser_parser = subparsers.add_parser("disableuser", help="Disable a user")
disableuser_parser.add_argument("user", help="Username to be disabled")
disableuser_parser.set_defaults(cls=DisableUser)
enableuser_parser = subparsers.add_parser("enableuser", help="Enable a user")
enableuser_parser.add_argument("user", help="Username to be enabled")
enableuser_parser.set_defaults(cls=EnableUser)
hasuser_parser = subparsers.add_parser("hasuser", help="Test if a user exists")
hasuser_parser.add_argument("user", help="Username to be checked")
hasuser_parser.set_defaults(cls=HasUser)
moduleparser.add_argument(
"--noninteractive", action="store_true", help="Don't ask for any user input (useful for automation)"
)
moduleparser.add_argument("--silent", action="store_true", help="Ignore errors (useful for automation)")
def run_admin_action(parser, args):
if hasattr(args, "cls"):
command = args.cls()
else:
if not args.silent:
parser.print_help()
sys.exit(1)
sys.exit(0)
try:
command.run(args)
except Exception:
if not args.silent:
print("Error running command:")
traceback.print_exc()
sys.exit(1)
sys.exit(0)

115
owrx/admin/commands.py Normal file
View File

@ -0,0 +1,115 @@
from abc import ABC, ABCMeta, abstractmethod
from getpass import getpass
from owrx.users import UserList, User, DefaultPasswordClass
import sys
import random
import string
import os
class Command(ABC):
@abstractmethod
def run(self, args):
pass
class UserCommand(Command, metaclass=ABCMeta):
def getPassword(self, args, username):
if args.noninteractive:
if "OWRX_PASSWORD" in os.environ:
password = os.environ["OWRX_PASSWORD"]
generated = False
else:
print("Generating password for user {username}...".format(username=username))
password = self.getRandomPassword()
generated = True
print('Password for {username} is "{password}".'.format(username=username, password=password))
print('This password is suitable for initial setup only, you will be asked to reset it on initial use.')
print('This password cannot be recovered from the system, please copy it now.')
else:
password = getpass("Please enter the new password for {username}: ".format(username=username))
confirm = getpass("Please confirm the new password: ")
if password != confirm:
print("ERROR: Password mismatch.")
sys.exit(1)
generated = False
return password, generated
def getRandomPassword(self, length=10):
printable = list(string.ascii_letters) + list(string.digits)
return ''.join(random.choices(printable, k=length))
class NewUser(UserCommand):
def run(self, args):
username = args.user
userList = UserList()
# early test to bypass the password stuff if the user already exists
if username in userList:
raise KeyError("User {username} already exists".format(username=username))
password, generated = self.getPassword(args, username)
print("Creating user {username}...".format(username=username))
user = User(name=username, enabled=True, password=DefaultPasswordClass(password), must_change_password=generated)
userList.addUser(user)
class DeleteUser(UserCommand):
def run(self, args):
username = args.user
print("Deleting user {username}...".format(username=username))
userList = UserList()
userList.deleteUser(username)
class ResetPassword(UserCommand):
def run(self, args):
username = args.user
password, generated = self.getPassword(args, username)
userList = UserList()
userList[username].setPassword(DefaultPasswordClass(password), must_change_password=generated)
# this is a change to an object in the list, not the list itself
# in this case, store() is explicit
userList.store()
class DisableUser(UserCommand):
def run(self, args):
username = args.user
userList = UserList()
userList[username].disable()
userList.store()
class EnableUser(UserCommand):
def run(self, args):
username = args.user
userList = UserList()
userList[username].enable()
userList.store()
class ListUsers(Command):
def run(self, args):
userList = UserList()
print("List of enabled users:")
for u in userList.values():
if args.all or u.enabled:
print(" {name}".format(name=u.name))
class HasUser(Command):
"""
internal command used by the debian config scripts to test if the admin user has already been created
"""
def run(self, args):
userList = UserList()
if args.user in userList:
if not args.silent:
print('User "{name}" exists.'.format(name=args.user))
else:
if not args.silent:
print('User "{name}" does not exist.'.format(name=args.user))
# in bash, a return code > 0 is interpreted as "false"
sys.exit(1)