include command to create a user
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from owrx.config import CoreConfig
|
||||
import json
|
||||
|
||||
import logging
|
||||
@ -19,17 +20,32 @@ class Password(ABC):
|
||||
return CleartextPassword(d)
|
||||
raise PasswordException("invalid passord encoding: {0}".format(d["type"]))
|
||||
|
||||
def __init__(self, pwinfo: dict):
|
||||
self.pwinfo = pwinfo
|
||||
|
||||
@abstractmethod
|
||||
def is_valid(self, inp: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def toJson(self):
|
||||
pass
|
||||
|
||||
|
||||
class CleartextPassword(Password):
|
||||
def __init__(self, pwinfo):
|
||||
if isinstance(pwinfo, str):
|
||||
self._value = pwinfo
|
||||
elif isinstance(pwinfo, dict):
|
||||
self._value = pwinfo["value"]
|
||||
else:
|
||||
raise ValueError("invalid argument to ClearTextPassword()")
|
||||
|
||||
def is_valid(self, inp: str):
|
||||
return self.pwinfo["value"] == inp
|
||||
return self._value == inp
|
||||
|
||||
def toJson(self):
|
||||
return {
|
||||
"encoding": "string",
|
||||
"value": self._value
|
||||
}
|
||||
|
||||
|
||||
class User(object):
|
||||
@ -38,6 +54,13 @@ class User(object):
|
||||
self.enabled = enabled
|
||||
self.password = password
|
||||
|
||||
def toJson(self):
|
||||
return {
|
||||
"user": self.name,
|
||||
"enabled": self.enabled,
|
||||
"password": self.password.toJson()
|
||||
}
|
||||
|
||||
|
||||
class UserList(object):
|
||||
sharedInstance = None
|
||||
@ -51,30 +74,53 @@ class UserList(object):
|
||||
def __init__(self):
|
||||
self.users = self._loadUsers()
|
||||
|
||||
def _getUsersFile(self):
|
||||
config = CoreConfig()
|
||||
return "{data_directory}/users.json".format(data_directory=config.get_data_directory())
|
||||
|
||||
def _loadUsers(self):
|
||||
for file in ["/etc/openwebrx/users.json", "users.json"]:
|
||||
try:
|
||||
f = open(file, "r")
|
||||
usersFile = self._getUsersFile()
|
||||
try:
|
||||
with open(usersFile, "r") as f:
|
||||
users_json = json.load(f)
|
||||
f.close()
|
||||
|
||||
return {u.name: u for u in [self.buildUser(d) for d in users_json]}
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except json.JSONDecodeError:
|
||||
logger.exception("error while parsing users file %s", file)
|
||||
return {}
|
||||
except Exception:
|
||||
logger.exception("error while processing users from %s", file)
|
||||
return {}
|
||||
return {}
|
||||
return {u.name: u for u in [self._jsonToUser(d) for d in users_json]}
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
except json.JSONDecodeError:
|
||||
logger.exception("error while parsing users file %s", usersFile)
|
||||
return {}
|
||||
except Exception:
|
||||
logger.exception("error while processing users from %s", usersFile)
|
||||
return {}
|
||||
|
||||
def buildUser(self, d):
|
||||
def _jsonToUser(self, d):
|
||||
if "user" in d and "password" in d and "enabled" in d:
|
||||
return User(d["user"], d["enabled"], Password.from_dict(d["password"]))
|
||||
|
||||
def _userToJson(self, u):
|
||||
return u.toJson()
|
||||
|
||||
def _store(self):
|
||||
usersFile = self._getUsersFile()
|
||||
users = [u.toJson() for u in self.users.values()]
|
||||
try:
|
||||
with open(usersFile, "w") as f:
|
||||
json.dump(users, f, indent=4)
|
||||
except Exception:
|
||||
logger.exception("error while writing users file %s", usersFile)
|
||||
|
||||
def addUser(self, user: User):
|
||||
self[user.name] = user
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.users[item]
|
||||
|
||||
def __contains__(self, item):
|
||||
return item in self.users
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key in self.users:
|
||||
raise KeyError("User {user} already exists".format(user=key))
|
||||
self.users[key] = value
|
||||
self._store()
|
||||
|
Reference in New Issue
Block a user