238 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			238 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from abc import ABC, abstractmethod
 | |
| from owrx.config.core import CoreConfig
 | |
| from datetime import datetime, timezone
 | |
| import json
 | |
| import hashlib
 | |
| import os
 | |
| import stat
 | |
| 
 | |
| import logging
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class PasswordException(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class Password(ABC):
 | |
|     @staticmethod
 | |
|     def from_dict(d: dict):
 | |
|         if "encoding" not in d:
 | |
|             raise PasswordException("password encoding not set")
 | |
|         if d["encoding"] == "string":
 | |
|             return CleartextPassword(d)
 | |
|         elif d["encoding"] == "hash":
 | |
|             return HashedPassword(d)
 | |
|         raise PasswordException("invalid passord encoding: {0}".format(d["type"]))
 | |
| 
 | |
|     @abstractmethod
 | |
|     def is_valid(self, inp: str) -> bool:
 | |
|         pass
 | |
| 
 | |
|     @abstractmethod
 | |
|     def toJson(self) -> dict:
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class CleartextPassword(Password):
 | |
|     def __init__(self, pwinfo):
 | |
|         if isinstance(pwinfo, str):
 | |
|             self._value = pwinfo
 | |
|         elif isinstance(pwinfo, dict):
 | |
|             self._value = pwinfo["value"]
 | |
|         else:
 | |
|             raise ValueError("invalid argument to ClearTextPassword()")
 | |
| 
 | |
|     def is_valid(self, inp: str) -> bool:
 | |
|         return self._value == inp
 | |
| 
 | |
|     def toJson(self) -> dict:
 | |
|         return {
 | |
|             "encoding": "string",
 | |
|             "value": self._value
 | |
|         }
 | |
| 
 | |
| 
 | |
| class HashedPassword(Password):
 | |
|     def __init__(self, pwinfo, algorithm="sha256"):
 | |
|         self.iterations = 100000
 | |
|         if isinstance(pwinfo, str):
 | |
|             self._createFromString(pwinfo, algorithm)
 | |
|         else:
 | |
|             self._loadFromDict(pwinfo)
 | |
| 
 | |
|     def _createFromString(self, pw: str, algorithm: str):
 | |
|         self._algorithm = algorithm
 | |
|         self._salt = os.urandom(32)
 | |
|         dk = hashlib.pbkdf2_hmac(self._algorithm, pw.encode(), self._salt, self.iterations)
 | |
|         self._hash = dk.hex()
 | |
|         pass
 | |
| 
 | |
|     def _loadFromDict(self, d: dict):
 | |
|         self._hash = d["value"]
 | |
|         self._algorithm = d["algorithm"]
 | |
|         self._salt = bytes.fromhex(d["salt"])
 | |
|         pass
 | |
| 
 | |
|     def is_valid(self, inp: str) -> bool:
 | |
|         dk = hashlib.pbkdf2_hmac(self._algorithm, inp.encode(), self._salt, self.iterations)
 | |
|         return dk.hex() == self._hash
 | |
| 
 | |
|     def toJson(self) -> dict:
 | |
|         return {
 | |
|             "encoding": "hash",
 | |
|             "value": self._hash,
 | |
|             "algorithm": self._algorithm,
 | |
|             "salt": self._salt.hex(),
 | |
|         }
 | |
| 
 | |
| 
 | |
| DefaultPasswordClass = HashedPassword
 | |
| 
 | |
| 
 | |
| class User(object):
 | |
|     def __init__(self, name: str, enabled: bool, password: Password, must_change_password: bool = False):
 | |
|         self.name = name
 | |
|         self.enabled = enabled
 | |
|         self.password = password
 | |
|         self.must_change_password = must_change_password
 | |
| 
 | |
|     def toJson(self):
 | |
|         return {
 | |
|             "user": self.name,
 | |
|             "enabled": self.enabled,
 | |
|             "must_change_password": self.must_change_password,
 | |
|             "password": self.password.toJson()
 | |
|         }
 | |
| 
 | |
|     @staticmethod
 | |
|     def fromJson(d):
 | |
|         if "user" in d and "password" in d and "enabled" in d:
 | |
|             mcp = d["must_change_password"] if "must_change_password" in d else False
 | |
|             return User(d["user"], d["enabled"], Password.from_dict(d["password"]), mcp)
 | |
| 
 | |
|     def setPassword(self, password: Password, must_change_password: bool = None):
 | |
|         self.password = password
 | |
|         if must_change_password is not None:
 | |
|             self.must_change_password = must_change_password
 | |
| 
 | |
|     def is_enabled(self):
 | |
|         return self.enabled
 | |
| 
 | |
|     def enable(self):
 | |
|         self.enabled = True
 | |
| 
 | |
|     def disable(self):
 | |
|         self.enabled = False
 | |
| 
 | |
| 
 | |
| class UserList(object):
 | |
|     sharedInstance = None
 | |
| 
 | |
|     @staticmethod
 | |
|     def getSharedInstance():
 | |
|         if UserList.sharedInstance is None:
 | |
|             UserList.sharedInstance = UserList()
 | |
|         return UserList.sharedInstance
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.file_modified = None
 | |
|         self.users = {}
 | |
| 
 | |
|     def refresh(self):
 | |
|         if self.file_modified is None or self._getUsersFileModifiedTimestamp() > self.file_modified:
 | |
|             logger.debug("reloading users from disk due to file modification")
 | |
|             self.users = self._loadUsers()
 | |
| 
 | |
|     def _getUsersFile(self):
 | |
|         config = CoreConfig()
 | |
|         return "{data_directory}/users.json".format(data_directory=config.get_data_directory())
 | |
| 
 | |
|     def _getUsersFileModifiedTimestamp(self):
 | |
|         timestamp = 0
 | |
|         try:
 | |
|             timestamp = os.path.getmtime(self._getUsersFile())
 | |
|         except FileNotFoundError:
 | |
|             pass
 | |
|         return datetime.fromtimestamp(timestamp, timezone.utc)
 | |
| 
 | |
|     def _loadUsers(self):
 | |
|         usersFile = self._getUsersFile()
 | |
|         # to avoid concurrency issues and problems when parsing errors occur:
 | |
|         # get early, store late
 | |
|         modified = self._getUsersFileModifiedTimestamp()
 | |
|         try:
 | |
|             with open(usersFile, "r") as f:
 | |
|                 users_json = json.load(f)
 | |
| 
 | |
|             users = {u.name: u for u in [User.fromJson(d) for d in users_json]}
 | |
|             self.file_modified = modified
 | |
|             return users
 | |
|         except FileNotFoundError:
 | |
|             self.file_modified = modified
 | |
|             return {}
 | |
|         except json.JSONDecodeError:
 | |
|             logger.exception("error while parsing users file %s", usersFile)
 | |
|             return {}
 | |
|         except Exception:
 | |
|             logger.exception("error while processing users from %s", usersFile)
 | |
|             return {}
 | |
| 
 | |
|     def _userToJson(self, u):
 | |
|         return u.toJson()
 | |
| 
 | |
|     def store(self):
 | |
|         usersFile = self._getUsersFile()
 | |
|         users = [u.toJson() for u in self.values()]
 | |
|         try:
 | |
|             # don't write directly to file to avoid corruption on exceptions
 | |
|             jsonContent = json.dumps(users, indent=4)
 | |
|             with open(usersFile, "w") as f:
 | |
|                 f.write(jsonContent)
 | |
|             # file should be readable by us only
 | |
|             os.chmod(usersFile, stat.S_IWUSR + stat.S_IRUSR)
 | |
|         except Exception:
 | |
|             logger.exception("error while writing users file %s", usersFile)
 | |
|         self.refresh()
 | |
| 
 | |
|     def _getUsername(self, user):
 | |
|         if isinstance(user, User):
 | |
|             return user.name
 | |
|         elif isinstance(user, str):
 | |
|             return user
 | |
|         else:
 | |
|             raise ValueError("invalid user type")
 | |
| 
 | |
|     def addUser(self, user: User):
 | |
|         self[user.name] = user
 | |
| 
 | |
|     def deleteUser(self, user):
 | |
|         del self[self._getUsername(user)]
 | |
| 
 | |
|     def __delitem__(self, key):
 | |
|         self.refresh()
 | |
|         if key not in self.users:
 | |
|             raise KeyError("User {user} doesn't exist".format(user=key))
 | |
|         del self.users[key]
 | |
|         self.store()
 | |
| 
 | |
|     def __getitem__(self, item):
 | |
|         self.refresh()
 | |
|         return self.users[item]
 | |
| 
 | |
|     def __contains__(self, item):
 | |
|         self.refresh()
 | |
|         return item in self.users
 | |
| 
 | |
|     def __setitem__(self, key, value):
 | |
|         self.refresh()
 | |
|         if key in self.users:
 | |
|             raise KeyError("User {user} already exists".format(user=key))
 | |
|         self.users[key] = value
 | |
|         self.store()
 | |
| 
 | |
|     def values(self):
 | |
|         self.refresh()
 | |
|         return self.users.values()
 | 
