Files
infoscreen/server/routes/users.py
RobbStarkAustria 5a0c1bc686 feat: document user management system and RBAC implementation
- Update copilot-instructions.md with user model, API routes, and frontend patterns
- Update README.md with RBAC details, user management API, and security sections
- Add user management technical documentation to TECH-CHANGELOG.md
- Bump version to 2025.1.0-alpha.13 with user management changelog entries
2025-12-29 12:37:54 +00:00

440 lines
14 KiB
Python

"""
User management routes.
This module provides endpoints for managing users (CRUD operations).
Access is role-based: admin can manage user/editor/admin, superadmin can manage all.
"""
from flask import Blueprint, request, jsonify, session
from server.database import Session
from models.models import User, UserRole
from server.permissions import require_role, superadmin_only
import bcrypt
import sys
from datetime import datetime, timezone
sys.path.append('/workspace')
users_bp = Blueprint("users", __name__, url_prefix="/api/users")
@users_bp.route("", methods=["GET"])
@require_role('admin', 'superadmin')
def list_users():
"""
List all users (filtered by current user's role).
Admin: sees user, editor, admin
Superadmin: sees all including superadmin
Returns:
200: [
{
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"createdAt": "ISO8601",
"updatedAt": "ISO8601"
}
]
"""
db_session = Session()
try:
current_role = session.get('role')
query = db_session.query(User)
# Admin cannot see superadmin users
if current_role == 'admin':
query = query.filter(User.role.in_([UserRole.user, UserRole.editor, UserRole.admin]))
users = query.order_by(User.username).all()
result = []
for user in users:
result.append({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active,
"lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"createdAt": user.created_at.isoformat() if user.created_at else None,
"updatedAt": user.updated_at.isoformat() if user.updated_at else None
})
return jsonify(result), 200
finally:
db_session.close()
@users_bp.route("", methods=["POST"])
@require_role('admin', 'superadmin')
def create_user():
"""
Create a new user.
Admin: can create user, editor, admin
Superadmin: can create any role including superadmin
Request body:
{
"username": "string",
"password": "string",
"role": "user|editor|admin|superadmin",
"isActive": boolean (optional, default true)
}
Returns:
201: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"message": "User created successfully"
}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
409: {"error": "Username already exists"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
username = data.get("username", "").strip()
password = data.get("password", "")
role_str = data.get("role", "user")
is_active = data.get("isActive", True)
# Validation
if not username:
return jsonify({"error": "Username is required"}), 400
if len(username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
if not password:
return jsonify({"error": "Password is required"}), 400
if len(password) < 6:
return jsonify({"error": "Password must be at least 6 characters"}), 400
# Check if role is valid
try:
new_role = UserRole[role_str]
except KeyError:
return jsonify({"error": f"Invalid role: {role_str}"}), 400
# Check permissions: admin cannot create superadmin
current_role = session.get('role')
if current_role == 'admin' and new_role == UserRole.superadmin:
return jsonify({"error": "Admin cannot create superadmin accounts"}), 403
db_session = Session()
try:
# Check if username already exists
existing = db_session.query(User).filter_by(username=username).first()
if existing:
return jsonify({"error": "Username already exists"}), 409
# Hash password
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Create user
new_user = User(
username=username,
password_hash=password_hash,
role=new_role,
is_active=is_active
)
db_session.add(new_user)
db_session.commit()
return jsonify({
"id": new_user.id,
"username": new_user.username,
"role": new_user.role.value,
"isActive": new_user.is_active,
"message": "User created successfully"
}), 201
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["GET"])
@require_role('admin', 'superadmin')
def get_user(user_id):
"""
Get a single user by ID.
Admin: cannot get superadmin users
Superadmin: can get any user
Returns:
200: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"createdAt": "ISO8601",
"updatedAt": "ISO8601"
}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
"""
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Admin cannot view superadmin users
current_role = session.get('role')
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Permission denied"}), 403
return jsonify({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active,
"lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"lastFailedLoginAt": user.last_failed_login_at.isoformat() if user.last_failed_login_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"lockedUntil": user.locked_until.isoformat() if user.locked_until else None,
"deactivatedAt": user.deactivated_at.isoformat() if user.deactivated_at else None,
"createdAt": user.created_at.isoformat() if user.created_at else None,
"updatedAt": user.updated_at.isoformat() if user.updated_at else None
}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["PUT"])
@require_role('admin', 'superadmin')
def update_user(user_id):
"""
Update a user's details.
Admin: cannot edit superadmin users, cannot assign superadmin role
Superadmin: can edit any user
Restrictions:
- Cannot change own role
- Cannot change own active status
Request body:
{
"username": "string" (optional),
"role": "string" (optional),
"isActive": boolean (optional)
}
Returns:
200: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"message": "User updated successfully"
}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
409: {"error": "Username already exists"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
current_user_id = session.get('user_id')
current_role = session.get('role')
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Admin cannot edit superadmin users
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Cannot edit superadmin users"}), 403
# Update username if provided
if "username" in data:
new_username = data["username"].strip()
if new_username and new_username != user.username:
if len(new_username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
# Check if username already exists
existing = db_session.query(User).filter(
User.username == new_username,
User.id != user_id
).first()
if existing:
return jsonify({"error": "Username already exists"}), 409
user.username = new_username
# Update role if provided
if "role" in data:
role_str = data["role"]
# Cannot change own role
if user_id == current_user_id:
return jsonify({"error": "Cannot change your own role"}), 403
try:
new_role = UserRole[role_str]
except KeyError:
return jsonify({"error": f"Invalid role: {role_str}"}), 400
# Admin cannot assign superadmin role
if current_role == 'admin' and new_role == UserRole.superadmin:
return jsonify({"error": "Cannot assign superadmin role"}), 403
user.role = new_role
# Update active status if provided
if "isActive" in data:
# Cannot deactivate own account
if user_id == current_user_id:
return jsonify({"error": "Cannot deactivate your own account"}), 403
new_status = bool(data["isActive"])
user.is_active = new_status
# Track deactivation
if not new_status and not user.deactivated_at:
user.deactivated_at = datetime.now(timezone.utc)
user.deactivated_by = current_user_id
db_session.commit()
return jsonify({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active, "lastLoginAt": None,
"lastPasswordChangeAt": None,
"failedLoginAttempts": 0, "lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"message": "User updated successfully"
}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>/password", methods=["PUT"])
@require_role('admin', 'superadmin')
def reset_password(user_id):
"""
Reset a user's password.
Admin: cannot reset superadmin passwords
Superadmin: can reset any password
Request body:
{
"password": "string"
}
Returns:
200: {"message": "Password reset successfully"}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
password = data.get("password", "")
if not password:
return jsonify({"error": "Password is required"}), 400
if len(password) < 6:
return jsonify({"error": "Password must be at least 6 characters"}), 400
current_role = session.get('role')
current_user_id = session.get('user_id')
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Users must change their own password via /auth/change-password (requires current password)
if user.id == current_user_id:
return jsonify({"error": "Use /api/auth/change-password to change your own password"}), 403
# Admin cannot reset superadmin passwords
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Cannot reset superadmin passwords"}), 403
# Hash new password and update timestamp
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
user.password_hash = password_hash
user.last_password_change_at = datetime.now(timezone.utc)
db_session.commit()
return jsonify({"message": "Password reset successfully"}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["DELETE"])
@superadmin_only
def delete_user(user_id):
"""
Permanently delete a user (superadmin only).
Cannot delete own account.
Returns:
200: {"message": "User deleted successfully"}
403: {"error": "Cannot delete your own account"}
404: {"error": "User not found"}
"""
current_user_id = session.get('user_id')
# Cannot delete own account
if user_id == current_user_id:
return jsonify({"error": "Cannot delete your own account"}), 403
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
username = user.username # Store for message
db_session.delete(user)
db_session.commit()
return jsonify({"message": f"User '{username}' deleted successfully"}), 200
finally:
db_session.close()