"""REST endpoints for school holidays (listing, CSV import, create, update, delete).

Holidays are stored as ``SchoolHoliday`` rows, optionally bound to an
``AcademicPeriod``. Whenever holidays change, skip exceptions for recurring
events flagged ``skip_holidays`` are regenerated for the affected period.
"""
import csv
import io
import logging
from datetime import datetime, date, timedelta

from flask import Blueprint, request, jsonify
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError

from server.permissions import admin_or_higher
from server.database import Session
from models.models import AcademicPeriod, SchoolHoliday, Event, EventException

logger = logging.getLogger(__name__)

holidays_bp = Blueprint("holidays", __name__, url_prefix="/api/holidays")


def _one_year_after(moment):
    """Return ``moment`` shifted by one calendar year.

    Feb 29 has no counterpart in the following year, so it maps to Feb 28
    instead of raising ``ValueError``.
    """
    try:
        return moment.replace(year=moment.year + 1)
    except ValueError:
        return moment.replace(year=moment.year + 1, day=28)


def _regenerate_for_period(session, academic_period_id) -> int:
    """Re-generate holiday skip exceptions for recurring events of a period.

    For every event with ``skip_holidays`` set and a recurrence rule in the
    given period (or with no period when ``academic_period_id`` is ``None``),
    the plain skip exceptions (skipped, without any override) are deleted and
    re-created for each occurrence date that falls on a holiday of the same
    period.

    Args:
        session: SQLAlchemy session; nothing is committed here.
        academic_period_id: Period id, or ``None`` for period-less rows.

    Returns:
        Number of events that were considered.
    """
    from dateutil.rrule import rrulestr
    from dateutil.tz import UTC

    q = session.query(Event).filter(
        Event.skip_holidays == True,  # noqa: E712
        Event.recurrence_rule.isnot(None),
    )
    if academic_period_id is not None:
        q = q.filter(Event.academic_period_id == academic_period_id)
    else:
        q = q.filter(Event.academic_period_id.is_(None))
    events = q.all()

    holidays = _apply_period_filter(session.query(SchoolHoliday), academic_period_id).all()

    # Expand every holiday range into its individual calendar days.
    holiday_dates = set()
    for h in holidays:
        d = h.start_date
        while d <= h.end_date:
            holiday_dates.add(d)
            d = d + timedelta(days=1)

    for ev in events:
        # Only auto-generated skips are removed; exceptions carrying overrides stay.
        session.query(EventException).filter(
            EventException.event_id == ev.id,
            EventException.is_skipped == True,  # noqa: E712
            EventException.override_title.is_(None),
            EventException.override_description.is_(None),
            EventException.override_start.is_(None),
            EventException.override_end.is_(None),
        ).delete(synchronize_session=False)

        if not holiday_dates:
            continue

        try:
            dtstart = ev.start.astimezone(UTC)
            rule = rrulestr(ev.recurrence_rule, dtstart=dtstart)
            # Without an explicit recurrence end, look one year ahead.
            window_end = (
                ev.recurrence_end.astimezone(UTC)
                if ev.recurrence_end
                else _one_year_after(dtstart)
            )
            # A set yields at most one skip exception per calendar day, even
            # when the rule produces several occurrences on that day.
            skipped_dates = {
                occ_start.date()
                for occ_start in rule.between(dtstart, window_end, inc=True)
            } & holiday_dates
        except Exception:
            # Best effort: a malformed rule must not block the holiday change,
            # but it should be visible in the logs.
            logger.warning(
                "Could not regenerate holiday exceptions for event %s",
                getattr(ev, "id", None),
                exc_info=True,
            )
            continue

        for occ_date in sorted(skipped_dates):
            session.add(EventException(
                event_id=ev.id,
                exception_date=occ_date,
                is_skipped=True,
            ))

    return len(events)


def _parse_academic_period_id(raw_value):
    """Convert a raw request value to an int id; ``None``/``""`` yield ``None``.

    Raises:
        ValueError: If the value cannot be converted to an integer.
    """
    if raw_value in (None, ""):
        return None
    try:
        return int(raw_value)
    except (TypeError, ValueError) as exc:
        raise ValueError("Invalid academicPeriodId") from exc


def _validate_holiday_dates_within_period(period, start_date, end_date, label="Ferienblock"):
    """Raise ``ValueError`` if the date range leaves the period's date range.

    Does nothing when the period or either date is ``None``.
    """
    if period is None or start_date is None or end_date is None:
        return
    if start_date < period.start_date or end_date > period.end_date:
        period_name = period.display_name or period.name
        raise ValueError(
            f"{label} liegt außerhalb der akademischen Periode \"{period_name}\" "
            f"({period.start_date.isoformat()} bis {period.end_date.isoformat()})"
        )


def _normalize_optional_text(value):
    """Strip surrounding whitespace; empty or ``None`` input yields ``None``."""
    normalized = (value or "").strip()
    return normalized or None


def _apply_period_filter(query, academic_period_id):
    """Restrict a ``SchoolHoliday`` query to one period (or to period-less rows)."""
    if academic_period_id is None:
        return query.filter(SchoolHoliday.academic_period_id.is_(None))
    return query.filter(SchoolHoliday.academic_period_id == academic_period_id)


def _identity_key(name, region):
    """Return the case-insensitive ``(name, region)`` key identifying a holiday."""
    normalized_name = _normalize_optional_text(name) or ""
    normalized_region = _normalize_optional_text(region) or ""
    return normalized_name.casefold(), normalized_region.casefold()


def _is_same_identity(holiday, name, region):
    """Tell whether ``holiday`` has the same identity key as ``name``/``region``."""
    return _identity_key(holiday.name, holiday.region) == _identity_key(name, region)


def _find_overlapping_holidays(session, academic_period_id, start_date, end_date, exclude_id=None):
    """Return holidays of the period that overlap or directly adjoin the range.

    The range is widened by one day on both sides, so blocks that touch the
    range without a gap are returned as well. Results are ordered by start
    date, then id.
    """
    query = _apply_period_filter(session.query(SchoolHoliday), academic_period_id).filter(
        SchoolHoliday.start_date <= end_date + timedelta(days=1),
        SchoolHoliday.end_date >= start_date - timedelta(days=1),
    )
    if exclude_id is not None:
        query = query.filter(SchoolHoliday.id != exclude_id)
    return query.order_by(SchoolHoliday.start_date.asc(), SchoolHoliday.id.asc()).all()


def _split_overlap_candidates(overlaps, name, region):
    """Split overlaps into ``(same_identity, conflicts)`` relative to name/region."""
    same_identity = []
    conflicts = []
    for holiday in overlaps:
        if _is_same_identity(holiday, name, region):
            same_identity.append(holiday)
        else:
            conflicts.append(holiday)
    return same_identity, conflicts


def _merge_holiday_group(session, keeper, others, name, start_date, end_date, region, source_file_name=None):
    """Merge a date range and the ``others`` rows into ``keeper``.

    ``keeper`` receives the union of all date ranges, the given name (if not
    empty) and region; every row in ``others`` is deleted from the session.

    Returns:
        The updated ``keeper``.
    """
    all_starts = [start_date, keeper.start_date, *[holiday.start_date for holiday in others]]
    all_ends = [end_date, keeper.end_date, *[holiday.end_date for holiday in others]]

    keeper.name = _normalize_optional_text(name) or keeper.name
    keeper.region = _normalize_optional_text(region)
    keeper.start_date = min(all_starts)
    keeper.end_date = max(all_ends)
    if source_file_name is not None:
        keeper.source_file_name = source_file_name

    for holiday in others:
        session.delete(holiday)
    return keeper


def _format_overlap_conflict(label, conflicts):
    """Build the user-facing message listing up to three conflicting blocks."""
    conflict_labels = ", ".join(
        f'{holiday.name} ({holiday.start_date.isoformat()} bis {holiday.end_date.isoformat()})'
        for holiday in conflicts[:3]
    )
    suffix = "" if len(conflicts) <= 3 else f" und {len(conflicts) - 3} weitere"
    return f"{label} überschneidet sich mit bestehenden Ferienblöcken: {conflict_labels}{suffix}"


def _find_duplicate_holiday(session, academic_period_id, name, start_date, end_date, region, exclude_id=None):
    """Return the first holiday with identical name, dates and region, if any.

    Name and region are compared case-insensitively within the given period.
    """
    normalized_name = _normalize_optional_text(name) or ""
    normalized_region = _normalize_optional_text(region)

    # The column side is lower-cased in SQL, so the Python side must use
    # lower() too: casefold() differs for characters such as "ß" and would
    # make an identical name fail to match.
    query = session.query(SchoolHoliday).filter(
        func.lower(SchoolHoliday.name) == normalized_name.lower(),
        SchoolHoliday.start_date == start_date,
        SchoolHoliday.end_date == end_date,
    )
    query = _apply_period_filter(query, academic_period_id)

    if normalized_region is None:
        query = query.filter(SchoolHoliday.region.is_(None))
    else:
        query = query.filter(func.lower(SchoolHoliday.region) == normalized_region.lower())

    if exclude_id is not None:
        query = query.filter(SchoolHoliday.id != exclude_id)
    return query.first()


@holidays_bp.route("", methods=["GET"])
def list_holidays():
    """List holidays, optionally filtered by ``region`` and academic period.

    Returns 400 when the academic period id is not an integer.
    """
    session = Session()
    try:
        region = request.args.get("region")
        academic_period_id = _parse_academic_period_id(
            request.args.get("academicPeriodId") or request.args.get("academic_period_id")
        )

        q = session.query(SchoolHoliday)
        if region:
            q = q.filter(SchoolHoliday.region == region)
        if academic_period_id is not None:
            q = q.filter(SchoolHoliday.academic_period_id == academic_period_id)

        rows = q.order_by(SchoolHoliday.start_date.asc(), SchoolHoliday.end_date.asc()).all()
        data = [r.to_dict() for r in rows]
        return jsonify({"holidays": data})
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    finally:
        session.close()


@holidays_bp.route("/upload", methods=["POST"])
@admin_or_higher
def upload_holidays():
    """
    Accepts a CSV/TXT file upload (multipart/form-data).

    Supported formats:
    1) Headered CSV with columns (case-insensitive): name, start_date, end_date[, region]
       - Dates: YYYY-MM-DD, DD.MM.YYYY, YYYY/MM/DD, or YYYYMMDD
    2) Headerless CSV/TXT lines with columns:
       [internal, name, start_yyyymmdd, end_yyyymmdd, optional_internal]
       - Only columns 2-4 are used; 1 and 5 are ignored.

    Rows with unparseable dates are skipped. Rows that overlap a differently
    named block, or whose end date precedes the start date, are reported in
    ``conflicts`` and skipped. A row outside the academic period aborts the
    whole import with 400. When rows were inserted or merged, holiday skip
    exceptions of recurring events are regenerated for the period.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files["file"]
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400

    session = Session()
    try:
        raw = file.read()
        # Try UTF-8 first (strict), then cp1252, then latin-1 as last resort.
        # "utf-8-sig" also drops a leading BOM, which would otherwise stick to
        # the first header name and defeat the header detection below.
        try:
            content = raw.decode("utf-8-sig")
        except UnicodeDecodeError:
            try:
                content = raw.decode("cp1252")
            except UnicodeDecodeError:
                content = raw.decode("latin-1", errors="replace")

        dialect = None
        try:
            # Some files may contain a lot of quotes; allow Sniffer to guess delimiter
            dialect = csv.Sniffer().sniff(content[:2048])
        except csv.Error:
            # Undetectable dialect: fall back to the csv module defaults.
            pass

        def parse_date(s: str):
            """Parse one of the supported date formats; empty input gives None."""
            s = (s or "").strip()
            if not s:
                return None
            # Numeric YYYYMMDD
            if s.isdigit() and len(s) == 8:
                try:
                    return datetime.strptime(s, "%Y%m%d").date()
                except ValueError:
                    pass
            # Common formats
            for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y/%m/%d"):
                try:
                    return datetime.strptime(s, fmt).date()
                except ValueError:
                    continue
            raise ValueError(f"Unsupported date format: {s}")

        def make_reader(reader_cls):
            """Create a csv reader over the content, using the sniffed dialect if any."""
            stream = io.StringIO(content)
            return reader_cls(stream, dialect=dialect) if dialect else reader_cls(stream)

        academic_period_id = _parse_academic_period_id(
            request.form.get("academicPeriodId") or request.form.get("academic_period_id")
        )
        period = None
        if academic_period_id is not None:
            period = session.query(AcademicPeriod).get(academic_period_id)
            if not period:
                return jsonify({"error": "Academic period not found"}), 404
            if period.is_archived:
                return jsonify({"error": "Cannot import holidays into an archived academic period"}), 409

        inserted = 0
        updated = 0
        merged_overlaps = 0
        skipped_duplicates = 0
        conflicts = []

        def build_exact_key(name, start_date, end_date, region):
            """Key used to detect rows repeated within the uploaded file."""
            normalized_name = _normalize_optional_text(name)
            normalized_region = _normalize_optional_text(region)
            return (
                (normalized_name or "").casefold(),
                start_date,
                end_date,
                (normalized_region or "").casefold(),
            )

        seen_in_file = set()

        # First, try headered CSV via DictReader
        dict_reader = make_reader(csv.DictReader)
        fieldnames_lower = [(h or "").strip().lower() for h in (dict_reader.fieldnames or [])]
        has_required_headers = {"name", "start_date", "end_date"}.issubset(set(fieldnames_lower))

        def upsert(name: str, start_date, end_date, region=None, source_label="Ferienblock"):
            """Insert, refresh or merge one holiday row from the file."""
            nonlocal inserted, updated, merged_overlaps, skipped_duplicates
            if not name or not start_date or not end_date:
                return
            if end_date < start_date:
                # Same rule as the manual create/update endpoints; report it
                # instead of storing an inverted range.
                conflicts.append(f"{source_label}: Enddatum liegt vor dem Startdatum")
                return

            _validate_holiday_dates_within_period(period, start_date, end_date, source_label)
            normalized_name = _normalize_optional_text(name)
            normalized_region = _normalize_optional_text(region)

            key = build_exact_key(normalized_name, start_date, end_date, normalized_region)
            if key in seen_in_file:
                skipped_duplicates += 1
                return
            seen_in_file.add(key)

            duplicate = _find_duplicate_holiday(
                session,
                academic_period_id,
                normalized_name,
                start_date,
                end_date,
                normalized_region,
            )
            if duplicate:
                duplicate.source_file_name = file.filename
                updated += 1
                return

            overlaps = _find_overlapping_holidays(
                session,
                academic_period_id,
                start_date,
                end_date,
            )
            same_identity, conflicting = _split_overlap_candidates(overlaps, normalized_name, normalized_region)
            if conflicting:
                conflicts.append(_format_overlap_conflict(source_label, conflicting))
                return

            if same_identity:
                keeper = same_identity[0]
                _merge_holiday_group(
                    session,
                    keeper,
                    same_identity[1:],
                    normalized_name,
                    start_date,
                    end_date,
                    normalized_region,
                    source_file_name=file.filename,
                )
                merged_overlaps += 1
                return

            session.add(SchoolHoliday(
                academic_period_id=academic_period_id,
                name=normalized_name,
                start_date=start_date,
                end_date=end_date,
                region=normalized_region,
                source_file_name=file.filename,
            ))
            inserted += 1

        if has_required_headers:
            for row in dict_reader:
                # Surplus columns are collected by DictReader under the key
                # None; ignore them instead of failing on None.lower().
                norm = {
                    k.strip().lower(): (v or "").strip()
                    for k, v in row.items()
                    if isinstance(k, str)
                }
                name = norm.get("name")
                try:
                    start_date = parse_date(norm.get("start_date"))
                    end_date = parse_date(norm.get("end_date"))
                except ValueError:
                    # Skip rows with unparseable dates
                    continue
                region = norm.get("region") or None
                upsert(name, start_date, end_date, region, f"Zeile {dict_reader.line_num}")
        else:
            # Fallback: headerless rows -> use columns [1]=name, [2]=start, [3]=end
            reader = make_reader(csv.reader)
            for row_index, row in enumerate(reader, start=1):
                if not row:
                    continue
                # tolerate varying column counts (4 or 5); ignore first and optional last
                cols = [c.strip() for c in row]
                if len(cols) < 4:
                    # Not enough data
                    continue
                name = cols[1].strip().strip('"')
                try:
                    start_date = parse_date(cols[2])
                    end_date = parse_date(cols[3])
                except ValueError:
                    continue
                upsert(name, start_date, end_date, None, f"Zeile {row_index}")

        # Keep event skip exceptions in sync, as the manual endpoints do.
        # Pure source-file refreshes ("updated") change no dates.
        regenerated = 0
        if inserted or merged_overlaps:
            session.flush()
            regenerated = _regenerate_for_period(session, academic_period_id)

        session.commit()
        return jsonify({
            "success": True,
            "inserted": inserted,
            "updated": updated,
            "merged_overlaps": merged_overlaps,
            "skipped_duplicates": skipped_duplicates,
            "conflicts": conflicts,
            "academic_period_id": academic_period_id,
            "regenerated_events": regenerated,
        })
    except Exception as e:
        # Validation errors (ValueError) and anything unexpected both roll
        # back the import and are reported as 400 with the error text.
        session.rollback()
        return jsonify({"error": str(e)}), 400
    finally:
        session.close()


@holidays_bp.route("", methods=["POST"])
@admin_or_higher
def create_holiday():
    """Create a holiday from a JSON body, merging with touching same-name blocks.

    Expects ``name``, ``start_date`` and ``end_date`` (YYYY-MM-DD); ``region``
    and ``academic_period_id`` are optional. Returns 201 on success, 400 for
    invalid input, 404 for an unknown period and 409 for archived periods,
    duplicates or overlaps with differently named blocks.
    """
    data = request.json or {}
    name = _normalize_optional_text(data.get("name")) or ""
    start_date_str = (data.get("start_date") or "").strip()
    end_date_str = (data.get("end_date") or "").strip()
    region = _normalize_optional_text(data.get("region"))

    if not name or not start_date_str or not end_date_str:
        return jsonify({"error": "name, start_date und end_date sind erforderlich"}), 400
    try:
        start_date_val = date.fromisoformat(start_date_str)
        end_date_val = date.fromisoformat(end_date_str)
    except ValueError:
        return jsonify({"error": "Ung\u00fcltiges Datumsformat. Erwartet: YYYY-MM-DD"}), 400
    if end_date_val < start_date_val:
        return jsonify({"error": "Enddatum muss nach oder gleich Startdatum sein"}), 400

    # A malformed id is a client error and must not escape as a 500.
    try:
        academic_period_id = _parse_academic_period_id(data.get("academic_period_id"))
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    session = Session()
    try:
        period = None
        if academic_period_id is not None:
            period = session.query(AcademicPeriod).get(academic_period_id)
            if not period:
                return jsonify({"error": "Akademische Periode nicht gefunden"}), 404
            if period.is_archived:
                return jsonify({"error": "Archivierte Perioden k\u00f6nnen nicht bearbeitet werden"}), 409

        _validate_holiday_dates_within_period(period, start_date_val, end_date_val)

        duplicate = _find_duplicate_holiday(
            session,
            academic_period_id,
            name,
            start_date_val,
            end_date_val,
            region,
        )
        if duplicate:
            return jsonify({"error": "Ein Ferienblock mit diesem Namen und Zeitraum existiert bereits in dieser Periode"}), 409

        overlaps = _find_overlapping_holidays(session, academic_period_id, start_date_val, end_date_val)
        same_identity, conflicting = _split_overlap_candidates(overlaps, name, region)
        if conflicting:
            return jsonify({"error": _format_overlap_conflict("Der Ferienblock", conflicting)}), 409

        merged = False
        if same_identity:
            # Extend the first matching block and drop the remaining ones.
            holiday = _merge_holiday_group(
                session,
                same_identity[0],
                same_identity[1:],
                name,
                start_date_val,
                end_date_val,
                region,
                source_file_name="manual",
            )
            merged = True
        else:
            holiday = SchoolHoliday(
                academic_period_id=academic_period_id,
                name=name,
                start_date=start_date_val,
                end_date=end_date_val,
                region=region,
                source_file_name="manual",
            )
            session.add(holiday)

        session.flush()
        regenerated = _regenerate_for_period(session, academic_period_id)
        session.commit()
        return jsonify({"success": True, "holiday": holiday.to_dict(), "regenerated_events": regenerated, "merged": merged}), 201
    except IntegrityError:
        session.rollback()
        return jsonify({"error": "Ein Ferienblock mit diesem Namen und Zeitraum existiert bereits in dieser Periode"}), 409
    except Exception as e:
        # Covers validation errors (ValueError) as well as unexpected failures.
        session.rollback()
        return jsonify({"error": str(e)}), 400
    finally:
        session.close()


# The URL variable is required: the view takes ``holiday_id`` as an argument.
@holidays_bp.route("/<int:holiday_id>", methods=["PUT"])
@admin_or_higher
def update_holiday(holiday_id):
    """Update name, dates and/or region of a holiday from a JSON body.

    Only keys present in the body are changed. Returns 404 for an unknown
    holiday, 400 for invalid input and 409 for archived periods, duplicates
    or overlaps with differently named blocks. Touching blocks of the same
    name and region are merged into the updated holiday.
    """
    data = request.json or {}
    session = Session()
    try:
        holiday = session.query(SchoolHoliday).get(holiday_id)
        if not holiday:
            return jsonify({"error": "Ferienblock nicht gefunden"}), 404

        period = None
        if holiday.academic_period_id is not None:
            period = session.query(AcademicPeriod).get(holiday.academic_period_id)
            if period and period.is_archived:
                return jsonify({"error": "Archivierte Perioden k\u00f6nnen nicht bearbeitet werden"}), 409

        # Changes made before an early error return are never committed;
        # closing the session in ``finally`` discards them.
        if "name" in data:
            holiday.name = _normalize_optional_text(data["name"]) or ""
        if "start_date" in data:
            try:
                holiday.start_date = date.fromisoformat((data["start_date"] or "").strip())
            except ValueError:
                return jsonify({"error": "Ung\u00fcltiges Startdatum. Erwartet: YYYY-MM-DD"}), 400
        if "end_date" in data:
            try:
                holiday.end_date = date.fromisoformat((data["end_date"] or "").strip())
            except ValueError:
                return jsonify({"error": "Ung\u00fcltiges Enddatum. Erwartet: YYYY-MM-DD"}), 400
        if "region" in data:
            holiday.region = _normalize_optional_text(data["region"])

        if not holiday.name:
            return jsonify({"error": "Name darf nicht leer sein"}), 400
        if holiday.end_date < holiday.start_date:
            return jsonify({"error": "Enddatum muss nach oder gleich Startdatum sein"}), 400

        _validate_holiday_dates_within_period(period, holiday.start_date, holiday.end_date)

        duplicate = _find_duplicate_holiday(
            session,
            holiday.academic_period_id,
            holiday.name,
            holiday.start_date,
            holiday.end_date,
            holiday.region,
            exclude_id=holiday.id,
        )
        if duplicate:
            return jsonify({"error": "Ein Ferienblock mit diesem Namen und Zeitraum existiert bereits in dieser Periode"}), 409

        overlaps = _find_overlapping_holidays(
            session,
            holiday.academic_period_id,
            holiday.start_date,
            holiday.end_date,
            exclude_id=holiday.id,
        )
        same_identity, conflicting = _split_overlap_candidates(overlaps, holiday.name, holiday.region)
        if conflicting:
            return jsonify({"error": _format_overlap_conflict("Der Ferienblock", conflicting)}), 409

        merged = False
        if same_identity:
            _merge_holiday_group(
                session,
                holiday,
                same_identity,
                holiday.name,
                holiday.start_date,
                holiday.end_date,
                holiday.region,
                source_file_name="manual",
            )
            merged = True

        session.flush()
        academic_period_id = holiday.academic_period_id
        regenerated = _regenerate_for_period(session, academic_period_id)
        session.commit()
        return jsonify({"success": True, "holiday": holiday.to_dict(), "regenerated_events": regenerated, "merged": merged})
    except IntegrityError:
        session.rollback()
        return jsonify({"error": "Ein Ferienblock mit diesem Namen und Zeitraum existiert bereits in dieser Periode"}), 409
    except Exception as e:
        session.rollback()
        return jsonify({"error": str(e)}), 400
    finally:
        session.close()


# The URL variable is required: the view takes ``holiday_id`` as an argument.
@holidays_bp.route("/<int:holiday_id>", methods=["DELETE"])
@admin_or_higher
def delete_holiday(holiday_id):
    """Delete a holiday and regenerate skip exceptions for its period.

    Returns 404 for an unknown holiday and 409 when its period is archived.
    """
    session = Session()
    try:
        holiday = session.query(SchoolHoliday).get(holiday_id)
        if not holiday:
            return jsonify({"error": "Ferienblock nicht gefunden"}), 404

        if holiday.academic_period_id is not None:
            period = session.query(AcademicPeriod).get(holiday.academic_period_id)
            if period and period.is_archived:
                return jsonify({"error": "Archivierte Perioden k\u00f6nnen nicht bearbeitet werden"}), 409

        # Remember the period before the row is gone.
        academic_period_id = holiday.academic_period_id
        session.delete(holiday)
        session.flush()
        regenerated = _regenerate_for_period(session, academic_period_id)
        session.commit()
        return jsonify({"success": True, "regenerated_events": regenerated})
    except Exception as e:
        session.rollback()
        return jsonify({"error": str(e)}), 400
    finally:
        session.close()