feat: document user management system and RBAC implementation

- Update copilot-instructions.md with user model, API routes, and frontend patterns
- Update README.md with RBAC details, user management API, and security sections
- Add user management technical documentation to TECH-CHANGELOG.md
- Bump version to 2025.1.0-alpha.13 with user management changelog entries
This commit is contained in:
RobbStarkAustria
2025-12-29 12:37:54 +00:00
parent c193209326
commit 5a0c1bc686
13 changed files with 1823 additions and 28 deletions

View File

@@ -0,0 +1,52 @@
"""add user audit fields
Revision ID: 4f0b8a3e5c20
Revises: 21226a449037
Create Date: 2025-12-29 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4f0b8a3e5c20'
down_revision: Union[str, None] = '21226a449037'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.add_column('users', sa.Column('last_login_at', sa.TIMESTAMP(timezone=True), nullable=True))
op.add_column('users', sa.Column('last_password_change_at', sa.TIMESTAMP(timezone=True), nullable=True))
op.add_column('users', sa.Column('last_failed_login_at', sa.TIMESTAMP(timezone=True), nullable=True))
op.add_column(
'users',
sa.Column('failed_login_attempts', sa.Integer(), nullable=False, server_default='0')
)
op.add_column('users', sa.Column('locked_until', sa.TIMESTAMP(timezone=True), nullable=True))
op.add_column('users', sa.Column('deactivated_at', sa.TIMESTAMP(timezone=True), nullable=True))
op.add_column('users', sa.Column('deactivated_by', sa.Integer(), nullable=True))
op.create_foreign_key(
'fk_users_deactivated_by_users',
'users',
'users',
['deactivated_by'],
['id'],
ondelete='SET NULL',
)
# Optional: keep server_default for failed_login_attempts; remove if you prefer no default after backfill
def downgrade() -> None:
"""Downgrade schema."""
op.drop_constraint('fk_users_deactivated_by_users', 'users', type_='foreignkey')
op.drop_column('users', 'deactivated_by')
op.drop_column('users', 'deactivated_at')
op.drop_column('users', 'locked_until')
op.drop_column('users', 'failed_login_attempts')
op.drop_column('users', 'last_failed_login_at')
op.drop_column('users', 'last_password_change_at')
op.drop_column('users', 'last_login_at')

View File

@@ -10,8 +10,10 @@ from flask import Blueprint, request, jsonify, session
import os
from server.database import Session
from models.models import User, UserRole
from server.permissions import require_auth
import bcrypt
import sys
from datetime import datetime, timezone
sys.path.append('/workspace')
@@ -66,8 +68,17 @@ def login():
# Verify password
if not bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
# Track failed login attempt
user.last_failed_login_at = datetime.now(timezone.utc)
user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
db_session.commit()
return jsonify({"error": "Invalid credentials"}), 401
# Successful login: update last_login_at and reset failed attempts
user.last_login_at = datetime.now(timezone.utc)
user.failed_login_attempts = 0
db_session.commit()
# Create session
session['user_id'] = user.id
session['username'] = user.username
@@ -173,6 +184,57 @@ def check_auth():
return jsonify({"authenticated": False}), 200
@auth_bp.route("/change-password", methods=["PUT"])
@require_auth
def change_password():
"""
Allow the authenticated user to change their own password.
Request body:
{
"current_password": "string",
"new_password": "string"
}
Returns:
200: {"message": "Password changed successfully"}
400: {"error": "Validation error"}
401: {"error": "Invalid current password"}
404: {"error": "User not found"}
"""
data = request.get_json() or {}
current_password = data.get("current_password", "")
new_password = data.get("new_password", "")
if not current_password or not new_password:
return jsonify({"error": "Current password and new password are required"}), 400
if len(new_password) < 6:
return jsonify({"error": "New password must be at least 6 characters"}), 400
user_id = session.get('user_id')
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
session.clear()
return jsonify({"error": "User not found"}), 404
# Verify current password
if not bcrypt.checkpw(current_password.encode('utf-8'), user.password_hash.encode('utf-8')):
return jsonify({"error": "Current password is incorrect"}), 401
# Update password hash and timestamp
new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
user.password_hash = new_hash
user.last_password_change_at = datetime.now(timezone.utc)
db_session.commit()
return jsonify({"message": "Password changed successfully"}), 200
finally:
db_session.close()
@auth_bp.route("/dev-login-superadmin", methods=["POST"])
def dev_login_superadmin():
"""

439
server/routes/users.py Normal file
View File

@@ -0,0 +1,439 @@
"""
User management routes.
This module provides endpoints for managing users (CRUD operations).
Access is role-based: admin can manage user/editor/admin, superadmin can manage all.
"""
from flask import Blueprint, request, jsonify, session
from server.database import Session
from models.models import User, UserRole
from server.permissions import require_role, superadmin_only
import bcrypt
import sys
from datetime import datetime, timezone
sys.path.append('/workspace')
users_bp = Blueprint("users", __name__, url_prefix="/api/users")
@users_bp.route("", methods=["GET"])
@require_role('admin', 'superadmin')
def list_users():
"""
List all users (filtered by current user's role).
Admin: sees user, editor, admin
Superadmin: sees all including superadmin
Returns:
200: [
{
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"createdAt": "ISO8601",
"updatedAt": "ISO8601"
}
]
"""
db_session = Session()
try:
current_role = session.get('role')
query = db_session.query(User)
# Admin cannot see superadmin users
if current_role == 'admin':
query = query.filter(User.role.in_([UserRole.user, UserRole.editor, UserRole.admin]))
users = query.order_by(User.username).all()
result = []
for user in users:
result.append({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active,
"lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"createdAt": user.created_at.isoformat() if user.created_at else None,
"updatedAt": user.updated_at.isoformat() if user.updated_at else None
})
return jsonify(result), 200
finally:
db_session.close()
@users_bp.route("", methods=["POST"])
@require_role('admin', 'superadmin')
def create_user():
"""
Create a new user.
Admin: can create user, editor, admin
Superadmin: can create any role including superadmin
Request body:
{
"username": "string",
"password": "string",
"role": "user|editor|admin|superadmin",
"isActive": boolean (optional, default true)
}
Returns:
201: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"message": "User created successfully"
}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
409: {"error": "Username already exists"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
username = data.get("username", "").strip()
password = data.get("password", "")
role_str = data.get("role", "user")
is_active = data.get("isActive", True)
# Validation
if not username:
return jsonify({"error": "Username is required"}), 400
if len(username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
if not password:
return jsonify({"error": "Password is required"}), 400
if len(password) < 6:
return jsonify({"error": "Password must be at least 6 characters"}), 400
# Check if role is valid
try:
new_role = UserRole[role_str]
except KeyError:
return jsonify({"error": f"Invalid role: {role_str}"}), 400
# Check permissions: admin cannot create superadmin
current_role = session.get('role')
if current_role == 'admin' and new_role == UserRole.superadmin:
return jsonify({"error": "Admin cannot create superadmin accounts"}), 403
db_session = Session()
try:
# Check if username already exists
existing = db_session.query(User).filter_by(username=username).first()
if existing:
return jsonify({"error": "Username already exists"}), 409
# Hash password
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Create user
new_user = User(
username=username,
password_hash=password_hash,
role=new_role,
is_active=is_active
)
db_session.add(new_user)
db_session.commit()
return jsonify({
"id": new_user.id,
"username": new_user.username,
"role": new_user.role.value,
"isActive": new_user.is_active,
"message": "User created successfully"
}), 201
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["GET"])
@require_role('admin', 'superadmin')
def get_user(user_id):
"""
Get a single user by ID.
Admin: cannot get superadmin users
Superadmin: can get any user
Returns:
200: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"createdAt": "ISO8601",
"updatedAt": "ISO8601"
}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
"""
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Admin cannot view superadmin users
current_role = session.get('role')
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Permission denied"}), 403
return jsonify({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active,
"lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"lastFailedLoginAt": user.last_failed_login_at.isoformat() if user.last_failed_login_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"lockedUntil": user.locked_until.isoformat() if user.locked_until else None,
"deactivatedAt": user.deactivated_at.isoformat() if user.deactivated_at else None,
"createdAt": user.created_at.isoformat() if user.created_at else None,
"updatedAt": user.updated_at.isoformat() if user.updated_at else None
}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["PUT"])
@require_role('admin', 'superadmin')
def update_user(user_id):
"""
Update a user's details.
Admin: cannot edit superadmin users, cannot assign superadmin role
Superadmin: can edit any user
Restrictions:
- Cannot change own role
- Cannot change own active status
Request body:
{
"username": "string" (optional),
"role": "string" (optional),
"isActive": boolean (optional)
}
Returns:
200: {
"id": int,
"username": "string",
"role": "string",
"isActive": boolean,
"message": "User updated successfully"
}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
409: {"error": "Username already exists"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
current_user_id = session.get('user_id')
current_role = session.get('role')
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Admin cannot edit superadmin users
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Cannot edit superadmin users"}), 403
# Update username if provided
if "username" in data:
new_username = data["username"].strip()
if new_username and new_username != user.username:
if len(new_username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
# Check if username already exists
existing = db_session.query(User).filter(
User.username == new_username,
User.id != user_id
).first()
if existing:
return jsonify({"error": "Username already exists"}), 409
user.username = new_username
# Update role if provided
if "role" in data:
role_str = data["role"]
# Cannot change own role
if user_id == current_user_id:
return jsonify({"error": "Cannot change your own role"}), 403
try:
new_role = UserRole[role_str]
except KeyError:
return jsonify({"error": f"Invalid role: {role_str}"}), 400
# Admin cannot assign superadmin role
if current_role == 'admin' and new_role == UserRole.superadmin:
return jsonify({"error": "Cannot assign superadmin role"}), 403
user.role = new_role
# Update active status if provided
if "isActive" in data:
# Cannot deactivate own account
if user_id == current_user_id:
return jsonify({"error": "Cannot deactivate your own account"}), 403
new_status = bool(data["isActive"])
user.is_active = new_status
# Track deactivation
if not new_status and not user.deactivated_at:
user.deactivated_at = datetime.now(timezone.utc)
user.deactivated_by = current_user_id
db_session.commit()
return jsonify({
"id": user.id,
"username": user.username,
"role": user.role.value,
"isActive": user.is_active, "lastLoginAt": None,
"lastPasswordChangeAt": None,
"failedLoginAttempts": 0, "lastLoginAt": user.last_login_at.isoformat() if user.last_login_at else None,
"lastPasswordChangeAt": user.last_password_change_at.isoformat() if user.last_password_change_at else None,
"failedLoginAttempts": user.failed_login_attempts,
"message": "User updated successfully"
}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>/password", methods=["PUT"])
@require_role('admin', 'superadmin')
def reset_password(user_id):
"""
Reset a user's password.
Admin: cannot reset superadmin passwords
Superadmin: can reset any password
Request body:
{
"password": "string"
}
Returns:
200: {"message": "Password reset successfully"}
400: {"error": "Validation error"}
403: {"error": "Permission denied"}
404: {"error": "User not found"}
"""
data = request.get_json()
if not data:
return jsonify({"error": "Request body required"}), 400
password = data.get("password", "")
if not password:
return jsonify({"error": "Password is required"}), 400
if len(password) < 6:
return jsonify({"error": "Password must be at least 6 characters"}), 400
current_role = session.get('role')
current_user_id = session.get('user_id')
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
# Users must change their own password via /auth/change-password (requires current password)
if user.id == current_user_id:
return jsonify({"error": "Use /api/auth/change-password to change your own password"}), 403
# Admin cannot reset superadmin passwords
if current_role == 'admin' and user.role == UserRole.superadmin:
return jsonify({"error": "Cannot reset superadmin passwords"}), 403
# Hash new password and update timestamp
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
user.password_hash = password_hash
user.last_password_change_at = datetime.now(timezone.utc)
db_session.commit()
return jsonify({"message": "Password reset successfully"}), 200
finally:
db_session.close()
@users_bp.route("/<int:user_id>", methods=["DELETE"])
@superadmin_only
def delete_user(user_id):
"""
Permanently delete a user (superadmin only).
Cannot delete own account.
Returns:
200: {"message": "User deleted successfully"}
403: {"error": "Cannot delete your own account"}
404: {"error": "User not found"}
"""
current_user_id = session.get('user_id')
# Cannot delete own account
if user_id == current_user_id:
return jsonify({"error": "Cannot delete your own account"}), 403
db_session = Session()
try:
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
return jsonify({"error": "User not found"}), 404
username = user.username # Store for message
db_session.delete(user)
db_session.commit()
return jsonify({"message": f"User '{username}' deleted successfully"}), 200
finally:
db_session.close()

View File

@@ -9,6 +9,7 @@ from server.routes.academic_periods import academic_periods_bp
from server.routes.groups import groups_bp
from server.routes.clients import clients_bp
from server.routes.auth import auth_bp
from server.routes.users import users_bp
from server.routes.system_settings import system_settings_bp
from server.database import Session, engine
from flask import Flask, jsonify, send_from_directory, request
@@ -43,6 +44,7 @@ else:
# Blueprints importieren und registrieren
app.register_blueprint(auth_bp)
app.register_blueprint(users_bp)
app.register_blueprint(clients_bp)
app.register_blueprint(groups_bp)
app.register_blueprint(events_bp)