""" User management routes. This module provides endpoints for managing users (CRUD operations). Access is role-based: admin can manage user/editor/admin, superadmin can manage all. """ from flask import Blueprint, request, jsonify, session from server.database import Session from models.models import User, UserRole from server.permissions import require_role, superadmin_only import bcrypt import sys from datetime import datetime, timezone sys.path.append('/workspace') users_bp = Blueprint("users", __name__, url_prefix="/api/users") @users_bp.route("", methods=["GET"]) @require_role('admin', 'superadmin') def list_users(): """ List all users (filtered by current user's role). Admin: sees user, editor, admin Superadmin: sees all including superadmin Returns: 200: [ { "id": int, "username": "string", "role": "string", "isActive": boolean, "createdAt": "ISO8601", "updatedAt": "ISO8601" } ] """ db_session = Session() try: current_role = session.get('role') query = db_session.query(User) # Admin cannot see superadmin users if current_role == 'admin': query = query.filter(User.role.in_([UserRole.user, UserRole.editor, UserRole.admin])) users = query.order_by(User.username).all() result = [] for user in users: result.append({ "id": user.id, "username": user.username, "role": user.role.value, "isActive": user.is_active, "lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None, "lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None, "failedLoginAttempts": user.failed_login_attempts, "createdAt": user.created_at.isoformat() if user.created_at else None, "updatedAt": user.updated_at.isoformat() if user.updated_at else None }) return jsonify(result), 200 finally: db_session.close() @users_bp.route("", methods=["POST"]) @require_role('admin', 'superadmin') def create_user(): """ Create a new user. Admin: can create user, editor, admin Superadmin: can create any role including superadmin Request body: { "username": "string", "password": "string", "role": "user|editor|admin|superadmin", "isActive": boolean (optional, default true) } Returns: 201: { "id": int, "username": "string", "role": "string", "isActive": boolean, "message": "User created successfully" } 400: {"error": "Validation error"} 403: {"error": "Permission denied"} 409: {"error": "Username already exists"} """ data = request.get_json() if not data: return jsonify({"error": "Request body required"}), 400 username = data.get("username", "").strip() password = data.get("password", "") role_str = data.get("role", "user") is_active = data.get("isActive", True) # Validation if not username: return jsonify({"error": "Username is required"}), 400 if len(username) < 3: return jsonify({"error": "Username must be at least 3 characters"}), 400 if not password: return jsonify({"error": "Password is required"}), 400 if len(password) < 6: return jsonify({"error": "Password must be at least 6 characters"}), 400 # Check if role is valid try: new_role = UserRole[role_str] except KeyError: return jsonify({"error": f"Invalid role: {role_str}"}), 400 # Check permissions: admin cannot create superadmin current_role = session.get('role') if current_role == 'admin' and new_role == UserRole.superadmin: return jsonify({"error": "Admin cannot create superadmin accounts"}), 403 db_session = Session() try: # Check if username already exists existing = db_session.query(User).filter_by(username=username).first() if existing: return jsonify({"error": "Username already exists"}), 409 # Hash password password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') # Create user new_user = User( username=username, password_hash=password_hash, role=new_role, is_active=is_active ) db_session.add(new_user) db_session.commit() return jsonify({ "id": new_user.id, "username": new_user.username, "role": new_user.role.value, "isActive": new_user.is_active, "message": "User created successfully" }), 201 finally: db_session.close() @users_bp.route("/", methods=["GET"]) @require_role('admin', 'superadmin') def get_user(user_id): """ Get a single user by ID. Admin: cannot get superadmin users Superadmin: can get any user Returns: 200: { "id": int, "username": "string", "role": "string", "isActive": boolean, "createdAt": "ISO8601", "updatedAt": "ISO8601" } 403: {"error": "Permission denied"} 404: {"error": "User not found"} """ db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: return jsonify({"error": "User not found"}), 404 # Admin cannot view superadmin users current_role = session.get('role') if current_role == 'admin' and user.role == UserRole.superadmin: return jsonify({"error": "Permission denied"}), 403 return jsonify({ "id": user.id, "username": user.username, "role": user.role.value, "isActive": user.is_active, "lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None, "lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None, "lastFailedLoginAt": user.last_failed_login_at.isoformat() if user.last_failed_login_at else None, "failedLoginAttempts": user.failed_login_attempts, "lockedUntil": user.locked_until.isoformat() if user.locked_until else None, "deactivatedAt": user.deactivated_at.isoformat() if user.deactivated_at else None, "createdAt": user.created_at.isoformat() if user.created_at else None, "updatedAt": user.updated_at.isoformat() if user.updated_at else None }), 200 finally: db_session.close() @users_bp.route("/", methods=["PUT"]) @require_role('admin', 'superadmin') def update_user(user_id): """ Update a user's details. Admin: cannot edit superadmin users, cannot assign superadmin role Superadmin: can edit any user Restrictions: - Cannot change own role - Cannot change own active status Request body: { "username": "string" (optional), "role": "string" (optional), "isActive": boolean (optional) } Returns: 200: { "id": int, "username": "string", "role": "string", "isActive": boolean, "message": "User updated successfully" } 400: {"error": "Validation error"} 403: {"error": "Permission denied"} 404: {"error": "User not found"} 409: {"error": "Username already exists"} """ data = request.get_json() if not data: return jsonify({"error": "Request body required"}), 400 current_user_id = session.get('user_id') current_role = session.get('role') db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: return jsonify({"error": "User not found"}), 404 # Admin cannot edit superadmin users if current_role == 'admin' and user.role == UserRole.superadmin: return jsonify({"error": "Cannot edit superadmin users"}), 403 # Update username if provided if "username" in data: new_username = data["username"].strip() if new_username and new_username != user.username: if len(new_username) < 3: return jsonify({"error": "Username must be at least 3 characters"}), 400 # Check if username already exists existing = db_session.query(User).filter( User.username == new_username, User.id != user_id ).first() if existing: return jsonify({"error": "Username already exists"}), 409 user.username = new_username # Update role if provided if "role" in data: role_str = data["role"] # Cannot change own role if user_id == current_user_id: return jsonify({"error": "Cannot change your own role"}), 403 try: new_role = UserRole[role_str] except KeyError: return jsonify({"error": f"Invalid role: {role_str}"}), 400 # Admin cannot assign superadmin role if current_role == 'admin' and new_role == UserRole.superadmin: return jsonify({"error": "Cannot assign superadmin role"}), 403 user.role = new_role # Update active status if provided if "isActive" in data: # Cannot deactivate own account if user_id == current_user_id: return jsonify({"error": "Cannot deactivate your own account"}), 403 new_status = bool(data["isActive"]) user.is_active = new_status # Track deactivation if not new_status and not user.deactivated_at: user.deactivated_at = datetime.now(timezone.utc) user.deactivated_by = current_user_id db_session.commit() return jsonify({ "id": user.id, "username": user.username, "role": user.role.value, "isActive": user.is_active, "lastLoginAt": None, "lastPasswordChangeAt": None, "failedLoginAttempts": 0, "lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None, "lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None, "failedLoginAttempts": user.failed_login_attempts, "message": "User updated successfully" }), 200 finally: db_session.close() @users_bp.route("//password", methods=["PUT"]) @require_role('admin', 'superadmin') def reset_password(user_id): """ Reset a user's password. Admin: cannot reset superadmin passwords Superadmin: can reset any password Request body: { "password": "string" } Returns: 200: {"message": "Password reset successfully"} 400: {"error": "Validation error"} 403: {"error": "Permission denied"} 404: {"error": "User not found"} """ data = request.get_json() if not data: return jsonify({"error": "Request body required"}), 400 password = data.get("password", "") if not password: return jsonify({"error": "Password is required"}), 400 if len(password) < 6: return jsonify({"error": "Password must be at least 6 characters"}), 400 current_role = session.get('role') current_user_id = session.get('user_id') db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: return jsonify({"error": "User not found"}), 404 # Users must change their own password via /auth/change-password (requires current password) if user.id == current_user_id: return jsonify({"error": "Use /api/auth/change-password to change your own password"}), 403 # Admin cannot reset superadmin passwords if current_role == 'admin' and user.role == UserRole.superadmin: return jsonify({"error": "Cannot reset superadmin passwords"}), 403 # Hash new password and update timestamp password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') user.password_hash = password_hash user.last_password_change_at = datetime.now(timezone.utc) db_session.commit() return jsonify({"message": "Password reset successfully"}), 200 finally: db_session.close() @users_bp.route("/", methods=["DELETE"]) @superadmin_only def delete_user(user_id): """ Permanently delete a user (superadmin only). Cannot delete own account. Returns: 200: {"message": "User deleted successfully"} 403: {"error": "Cannot delete your own account"} 404: {"error": "User not found"} """ current_user_id = session.get('user_id') # Cannot delete own account if user_id == current_user_id: return jsonify({"error": "Cannot delete your own account"}), 403 db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: return jsonify({"error": "User not found"}), 404 username = user.username # Store for message db_session.delete(user) db_session.commit() return jsonify({"message": f"User '{username}' deleted successfully"}), 200 finally: db_session.close()