from flask import Blueprint, request, jsonify
from server.permissions import admin_or_higher
from server.database import Session
from models.models import SchoolHoliday
from datetime import datetime
import csv
import io

holidays_bp = Blueprint("holidays", __name__, url_prefix="/api/holidays")

# Non-numeric date layouts accepted by the importer, tried in this order.
_DATE_FORMATS = ("%Y-%m-%d", "%d.%m.%Y", "%Y/%m/%d")

# Columns that must all be present for a file to be treated as headered CSV.
_REQUIRED_HEADERS = frozenset({"name", "start_date", "end_date"})


@holidays_bp.route("", methods=["GET"])
def list_holidays():
    """List stored school holidays ordered by ascending start date.

    An optional ``region`` query parameter restricts the result to rows whose
    region equals the given value.

    Returns:
        A JSON response of the form ``{"holidays": [...]}`` where each entry
        is the ``to_dict()`` representation of a ``SchoolHoliday`` row.
    """
    session = Session()
    try:
        query = session.query(SchoolHoliday)
        region = request.args.get("region")
        if region:
            query = query.filter(SchoolHoliday.region == region)
        rows = query.order_by(SchoolHoliday.start_date.asc()).all()
        # Serialize while the session is still open.
        data = [row.to_dict() for row in rows]
    finally:
        # Always release the session, even when the query or to_dict() fails.
        session.close()
    return jsonify({"holidays": data})


def _decode_upload(raw):
    """Decode uploaded bytes to text: UTF-8 first, then cp1252, then latin-1."""
    try:
        # "utf-8-sig" drops a leading BOM, which would otherwise be glued to
        # the first header name and defeat header detection.
        return raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        pass
    try:
        return raw.decode("cp1252")
    except UnicodeDecodeError:
        # latin-1 maps every byte, so this last resort cannot fail.
        return raw.decode("latin-1", errors="replace")


def _sniff_dialect(content):
    """Guess the CSV dialect from the first 2048 characters, or return None."""
    try:
        return csv.Sniffer().sniff(content[:2048])
    except csv.Error:
        # Sniffing is best effort; callers fall back to the default dialect.
        return None


def _parse_date(value):
    """Parse a date cell into a ``date``.

    Returns None for an empty cell and raises ValueError when the text
    matches none of the supported formats.
    """
    text = (value or "").strip()
    if not text:
        return None
    # Compact numeric form YYYYMMDD.
    if text.isdigit() and len(text) == 8:
        try:
            return datetime.strptime(text, "%Y%m%d").date()
        except ValueError:
            pass
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    raise ValueError(f"Unsupported date format: {text}")


def _make_reader(reader_factory, content, dialect):
    """Build a csv reader over ``content``, using ``dialect`` when one was sniffed."""
    stream = io.StringIO(content)
    if dialect:
        return reader_factory(stream, dialect=dialect)
    return reader_factory(stream)


def _headered_records(dict_reader):
    """Yield (name, start_date, end_date, region) from a headered CSV reader.

    Rows whose dates cannot be parsed are skipped.
    """
    for row in dict_reader:
        # DictReader stores surplus cells under the key None (as a list);
        # only real, string-named columns are normalized.
        norm = {
            key.strip().lower(): (cell or "").strip()
            for key, cell in row.items()
            if isinstance(key, str)
        }
        try:
            start_date = _parse_date(norm.get("start_date"))
            end_date = _parse_date(norm.get("end_date"))
        except ValueError:
            # Skip rows with unparseable dates.
            continue
        # A missing or blank region is stored as NULL.
        yield norm.get("name"), start_date, end_date, norm.get("region") or None


def _headerless_records(reader):
    """Yield (name, start_date, end_date, None) from headerless rows.

    Column layout: [internal, name, start, end, optional_internal]; only the
    second to fourth columns are used. Short rows and rows with unparseable
    dates are skipped.
    """
    for row in reader:
        cols = [cell.strip() for cell in row]
        if len(cols) < 4:
            # Blank line or not enough data.
            continue
        name = cols[1].strip('"')
        try:
            start_date = _parse_date(cols[2])
            end_date = _parse_date(cols[3])
        except ValueError:
            continue
        yield name, start_date, end_date, None


def _upsert_holiday(session, name, start_date, end_date, region, source_file_name):
    """Insert the holiday or refresh the matching existing row.

    Returns "inserted", "updated", or None when the record is incomplete
    (missing name, start date or end date) and was therefore ignored.
    """
    if not name or not start_date or not end_date:
        return None
    # NULL has to be matched with IS NULL rather than "=".
    if region is None:
        region_filter = SchoolHoliday.region.is_(None)
    else:
        region_filter = SchoolHoliday.region == region
    existing = (
        session.query(SchoolHoliday)
        .filter(
            SchoolHoliday.name == name,
            SchoolHoliday.start_date == start_date,
            SchoolHoliday.end_date == end_date,
            region_filter,
        )
        .first()
    )
    if existing:
        existing.region = region
        existing.source_file_name = source_file_name
        return "updated"
    session.add(SchoolHoliday(
        name=name,
        start_date=start_date,
        end_date=end_date,
        region=region,
        source_file_name=source_file_name,
    ))
    return "inserted"


@holidays_bp.route("/upload", methods=["POST"])
@admin_or_higher
def upload_holidays():
    """
    Accepts a CSV/TXT file upload (multipart/form-data, field name "file").

    Supported formats:
    1) Headered CSV with columns (case-insensitive):
       name, start_date, end_date[, region]
       - Dates: YYYY-MM-DD, DD.MM.YYYY, YYYY/MM/DD, or YYYYMMDD
    2) Headerless CSV/TXT lines with columns:
       [internal, name, start_yyyymmdd, end_yyyymmdd, optional_internal]
       - Only columns 2-4 are used; 1 and 5 are ignored.

    Rows with a missing name or date, or with an unparseable date, are
    skipped. A row matching an existing holiday on name, start date, end date
    and region updates that holiday; any other row is inserted.

    Returns:
        ``{"success": true, "inserted": n, "updated": m}`` on success, or
        ``({"error": message}, 400)`` when no file was supplied or the import
        failed. On failure the transaction is rolled back.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files["file"]
    # An empty (or absent) filename means no file was actually selected.
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400

    try:
        content = _decode_upload(file.read())
        dialect = _sniff_dialect(content)

        # First, try headered CSV via DictReader.
        dict_reader = _make_reader(csv.DictReader, content, dialect)
        headers = {h.strip().lower() for h in (dict_reader.fieldnames or [])}
        if _REQUIRED_HEADERS.issubset(headers):
            records = _headered_records(dict_reader)
        else:
            # Fallback: re-read from the start as headerless rows.
            records = _headerless_records(_make_reader(csv.reader, content, dialect))

        counts = {"inserted": 0, "updated": 0}
        session = Session()
        try:
            for name, start_date, end_date, region in records:
                outcome = _upsert_holiday(
                    session, name, start_date, end_date, region, file.filename
                )
                if outcome:
                    counts[outcome] += 1
            session.commit()
        except Exception:
            # Leave no half-applied import behind.
            session.rollback()
            raise
        finally:
            session.close()

        return jsonify({
            "success": True,
            "inserted": counts["inserted"],
            "updated": counts["updated"],
        })
    except Exception as e:
        # Request boundary: any failure is reported to the client as a 400.
        return jsonify({"error": str(e)}), 400