implement forced password change for generated passwords
This commit is contained in:
@ -10,33 +10,36 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Authentication(object):
|
||||
def isAuthenticated(self, request):
|
||||
def getUser(self, request):
|
||||
if "owrx-session" not in request.cookies:
|
||||
return False
|
||||
return None
|
||||
session = SessionStorage.getSharedInstance().getSession(request.cookies["owrx-session"].value)
|
||||
if session is None:
|
||||
return False
|
||||
return None
|
||||
if "user" not in session:
|
||||
return False
|
||||
return None
|
||||
userList = UserList.getSharedInstance()
|
||||
try:
|
||||
user = userList[session["user"]]
|
||||
return user.is_enabled()
|
||||
return userList[session["user"]]
|
||||
except KeyError:
|
||||
return False
|
||||
return None
|
||||
|
||||
|
||||
class AdminController(WebpageController):
|
||||
def __init__(self, handler, request, options):
|
||||
self.authentication = Authentication()
|
||||
self.user = self.authentication.getUser(request)
|
||||
super().__init__(handler, request, options)
|
||||
|
||||
def isAuthorized(self):
|
||||
return self.user is not None and self.user.is_enabled() and not self.user.must_change_password
|
||||
|
||||
def handle_request(self):
|
||||
config = Config.get()
|
||||
if "webadmin_enabled" not in config or not config["webadmin_enabled"]:
|
||||
self.send_response("Web Admin is disabled", code=403)
|
||||
return
|
||||
if self.authentication.isAuthenticated(self.request):
|
||||
if self.isAuthorized():
|
||||
super().handle_request()
|
||||
else:
|
||||
target = "/login?{0}".format(parse.urlencode({"ref": self.request.path}))
|
||||
|
23
owrx/controllers/profile.py
Normal file
23
owrx/controllers/profile.py
Normal file
@ -0,0 +1,23 @@
|
||||
from owrx.controllers.admin import AdminController
|
||||
from owrx.users import UserList, DefaultPasswordClass
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
|
||||
class ProfileController(AdminController):
|
||||
def isAuthorized(self):
|
||||
return self.user is not None and self.user.is_enabled() and self.user.must_change_password
|
||||
|
||||
def indexAction(self):
|
||||
self.serve_template("pwchange.html", **self.template_variables())
|
||||
|
||||
def processPwChange(self):
|
||||
data = parse_qs(self.get_body().decode("utf-8"))
|
||||
data = {k: v[0] for k, v in data.items()}
|
||||
userlist = UserList.getSharedInstance()
|
||||
if "password" in data and "confirm" in data and data["password"] == data["confirm"]:
|
||||
self.user.setPassword(DefaultPasswordClass(data["password"]), must_change_password=False)
|
||||
userlist.store()
|
||||
target = self.request.query["ref"][0] if "ref" in self.request.query else "/settings"
|
||||
else:
|
||||
target = "/pwchange"
|
||||
self.send_redirect(target)
|
@ -1,5 +1,5 @@
|
||||
from .template import WebpageController
|
||||
from urllib.parse import parse_qs
|
||||
from urllib.parse import parse_qs, urlencode
|
||||
from uuid import uuid4
|
||||
from http.cookies import SimpleCookie
|
||||
from owrx.users import UserList
|
||||
@ -51,6 +51,8 @@ class SessionController(WebpageController):
|
||||
cookie = SimpleCookie()
|
||||
cookie["owrx-session"] = key
|
||||
target = self.request.query["ref"][0] if "ref" in self.request.query else "/settings"
|
||||
if user.must_change_password:
|
||||
target = "/pwchange?{0}".format(urlencode({"ref": target}))
|
||||
self.send_redirect(target, cookies=cookie)
|
||||
return
|
||||
self.send_redirect("/login")
|
||||
|
@ -6,6 +6,7 @@ from owrx.controllers.api import ApiController
|
||||
from owrx.controllers.metrics import MetricsController
|
||||
from owrx.controllers.settings import SettingsController, GeneralSettingsController, SdrSettingsController
|
||||
from owrx.controllers.session import SessionController
|
||||
from owrx.controllers.profile import ProfileController
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import re
|
||||
@ -109,6 +110,8 @@ class Router(object):
|
||||
StaticRoute("/login", SessionController, options={"action": "loginAction"}),
|
||||
StaticRoute("/login", SessionController, method="POST", options={"action": "processLoginAction"}),
|
||||
StaticRoute("/logout", SessionController, options={"action": "logoutAction"}),
|
||||
StaticRoute("/pwchange", ProfileController),
|
||||
StaticRoute("/pwchange", ProfileController, method="POST", options={"action": "processPwChange"}),
|
||||
]
|
||||
|
||||
def find_route(self, request):
|
||||
|
@ -56,7 +56,7 @@ class CleartextPassword(Password):
|
||||
class HashedPassword(Password):
|
||||
def __init__(self, pwinfo, algorithm="sha256"):
|
||||
self.iterations = 100000
|
||||
if (isinstance(pwinfo, str)):
|
||||
if isinstance(pwinfo, str):
|
||||
self._createFromString(pwinfo, algorithm)
|
||||
else:
|
||||
self._loadFromDict(pwinfo)
|
||||
@ -91,20 +91,30 @@ DefaultPasswordClass = HashedPassword
|
||||
|
||||
|
||||
class User(object):
|
||||
def __init__(self, name: str, enabled: bool, password: Password):
|
||||
def __init__(self, name: str, enabled: bool, password: Password, must_change_password: bool = False):
|
||||
self.name = name
|
||||
self.enabled = enabled
|
||||
self.password = password
|
||||
self.must_change_password = must_change_password
|
||||
|
||||
def toJson(self):
|
||||
return {
|
||||
"user": self.name,
|
||||
"enabled": self.enabled,
|
||||
"must_change_password": self.must_change_password,
|
||||
"password": self.password.toJson()
|
||||
}
|
||||
|
||||
def setPassword(self, password: Password):
|
||||
@staticmethod
|
||||
def fromJson(d):
|
||||
if "user" in d and "password" in d and "enabled" in d:
|
||||
mcp = d["must_change_password"] if "must_change_password" in d else False
|
||||
return User(d["user"], d["enabled"], Password.from_dict(d["password"]), mcp)
|
||||
|
||||
def setPassword(self, password: Password, must_change_password: bool = None):
|
||||
self.password = password
|
||||
if must_change_password is not None:
|
||||
self.must_change_password = must_change_password
|
||||
|
||||
def is_enabled(self):
|
||||
return self.enabled
|
||||
@ -150,7 +160,7 @@ class UserList(object):
|
||||
with open(usersFile, "r") as f:
|
||||
users_json = json.load(f)
|
||||
|
||||
users = {u.name: u for u in [self._jsonToUser(d) for d in users_json]}
|
||||
users = {u.name: u for u in [User.fromJson(d) for d in users_json]}
|
||||
self.file_modified = modified
|
||||
return users
|
||||
except FileNotFoundError:
|
||||
@ -162,10 +172,6 @@ class UserList(object):
|
||||
logger.exception("error while processing users from %s", usersFile)
|
||||
return {}
|
||||
|
||||
def _jsonToUser(self, d):
|
||||
if "user" in d and "password" in d and "enabled" in d:
|
||||
return User(d["user"], d["enabled"], Password.from_dict(d["password"]))
|
||||
|
||||
def _userToJson(self, u):
|
||||
return u.toJson()
|
||||
|
||||
|
Reference in New Issue
Block a user