""" Authentication and user management routes. This module provides endpoints for user authentication and role information. Currently implements a basic session-based auth that can be extended with JWT or Flask-Login later. """ from flask import Blueprint, request, jsonify, session import os from server.database import Session from models.models import User, UserRole from server.permissions import require_auth import bcrypt import sys from datetime import datetime, timezone sys.path.append('/workspace') auth_bp = Blueprint("auth", __name__, url_prefix="/api/auth") @auth_bp.route("/login", methods=["POST"]) def login(): """ Authenticate a user and create a session. Request body: { "username": "string", "password": "string" } Returns: 200: { "message": "Login successful", "user": { "id": int, "username": "string", "role": "string" } } 401: {"error": "Invalid credentials"} 400: {"error": "Username and password required"} """ data = request.get_json() if not data: return jsonify({"error": "Request body required"}), 400 username = data.get("username") password = data.get("password") if not username or not password: return jsonify({"error": "Username and password required"}), 400 db_session = Session() try: # Find user by username user = db_session.query(User).filter_by(username=username).first() if not user: return jsonify({"error": "Invalid credentials"}), 401 # Check if user is active if not user.is_active: return jsonify({"error": "Account is disabled"}), 401 # Verify password if not bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')): # Track failed login attempt user.last_failed_login_at = datetime.now(timezone.utc) user.failed_login_attempts = (user.failed_login_attempts or 0) + 1 db_session.commit() return jsonify({"error": "Invalid credentials"}), 401 # Successful login: update last_login_at and reset failed attempts user.last_login_at = datetime.now(timezone.utc) user.failed_login_attempts = 0 db_session.commit() # Create session session['user_id'] = user.id session['username'] = user.username session['role'] = user.role.value # Persist session across browser restarts (uses PERMANENT_SESSION_LIFETIME) session.permanent = True return jsonify({ "message": "Login successful", "user": { "id": user.id, "username": user.username, "role": user.role.value } }), 200 finally: db_session.close() @auth_bp.route("/logout", methods=["POST"]) def logout(): """ End the current user session. Returns: 200: {"message": "Logout successful"} """ session.clear() return jsonify({"message": "Logout successful"}), 200 @auth_bp.route("/me", methods=["GET"]) def get_current_user(): """ Get the current authenticated user's information. Returns: 200: { "id": int, "username": "string", "role": "string", "is_active": bool } 401: {"error": "Not authenticated"} """ user_id = session.get('user_id') if not user_id: return jsonify({"error": "Not authenticated"}), 401 db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: # Session is stale, user was deleted session.clear() return jsonify({"error": "Not authenticated"}), 401 if not user.is_active: # User was deactivated session.clear() return jsonify({"error": "Account is disabled"}), 401 # For SQLAlchemy Enum(UserRole), ensure we return the string value role_value = user.role.value if isinstance(user.role, UserRole) else str(user.role) return jsonify({ "id": user.id, "username": user.username, "role": role_value, "is_active": user.is_active }), 200 except Exception as e: # Avoid naked 500s; return a JSON error with minimal info (safe in dev) env = os.environ.get("ENV", "production").lower() msg = str(e) if env in ("development", "dev") else "Internal server error" return jsonify({"error": msg}), 500 finally: db_session.close() @auth_bp.route("/check", methods=["GET"]) def check_auth(): """ Quick check if user is authenticated (lighter than /me). Returns: 200: {"authenticated": true, "role": "string"} 200: {"authenticated": false} """ user_id = session.get('user_id') role = session.get('role') if user_id and role: return jsonify({ "authenticated": True, "role": role }), 200 return jsonify({"authenticated": False}), 200 @auth_bp.route("/change-password", methods=["PUT"]) @require_auth def change_password(): """ Allow the authenticated user to change their own password. Request body: { "current_password": "string", "new_password": "string" } Returns: 200: {"message": "Password changed successfully"} 400: {"error": "Validation error"} 401: {"error": "Invalid current password"} 404: {"error": "User not found"} """ data = request.get_json() or {} current_password = data.get("current_password", "") new_password = data.get("new_password", "") if not current_password or not new_password: return jsonify({"error": "Current password and new password are required"}), 400 if len(new_password) < 6: return jsonify({"error": "New password must be at least 6 characters"}), 400 user_id = session.get('user_id') db_session = Session() try: user = db_session.query(User).filter_by(id=user_id).first() if not user: session.clear() return jsonify({"error": "User not found"}), 404 # Verify current password if not bcrypt.checkpw(current_password.encode('utf-8'), user.password_hash.encode('utf-8')): return jsonify({"error": "Current password is incorrect"}), 401 # Update password hash and timestamp new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') user.password_hash = new_hash user.last_password_change_at = datetime.now(timezone.utc) db_session.commit() return jsonify({"message": "Password changed successfully"}), 200 finally: db_session.close() @auth_bp.route("/dev-login-superadmin", methods=["POST"]) def dev_login_superadmin(): """ Development-only endpoint to quickly establish a superadmin session without a password. Enabled only when ENV is 'development' or 'dev'. Returns 404 otherwise. """ env = os.environ.get("ENV", "production").lower() if env not in ("development", "dev"): # Pretend the route does not exist in non-dev environments return jsonify({"error": "Not found"}), 404 db_session = Session() try: # Prefer explicit username from env, else pick any superadmin preferred_username = os.environ.get("DEFAULT_SUPERADMIN_USERNAME", "superadmin") user = ( db_session.query(User) .filter((User.username == preferred_username) | (User.role == UserRole.superadmin)) .order_by(User.id.asc()) .first() ) if not user: return jsonify({ "error": "No superadmin user found. Seed a superadmin first (DEFAULT_SUPERADMIN_PASSWORD)." }), 404 # Establish session session['user_id'] = user.id session['username'] = user.username session['role'] = user.role.value session.permanent = True return jsonify({ "message": "Dev login successful (superadmin)", "user": { "id": user.id, "username": user.username, "role": user.role.value } }), 200 finally: db_session.close()