160 lines
5.8 KiB
Python
160 lines
5.8 KiB
Python
from flask import Blueprint, request, jsonify
|
|
from server.database import Session
|
|
from models.models import SchoolHoliday
|
|
from datetime import datetime
|
|
import csv
|
|
import io
|
|
|
|
holidays_bp = Blueprint("holidays", __name__, url_prefix="/api/holidays")
|
|
|
|
|
|
@holidays_bp.route("", methods=["GET"])
|
|
def list_holidays():
|
|
session = Session()
|
|
region = request.args.get("region")
|
|
q = session.query(SchoolHoliday)
|
|
if region:
|
|
q = q.filter(SchoolHoliday.region == region)
|
|
rows = q.order_by(SchoolHoliday.start_date.asc()).all()
|
|
data = [r.to_dict() for r in rows]
|
|
session.close()
|
|
return jsonify({"holidays": data})
|
|
|
|
|
|
@holidays_bp.route("/upload", methods=["POST"])
|
|
def upload_holidays():
|
|
"""
|
|
Accepts a CSV/TXT file upload (multipart/form-data).
|
|
|
|
Supported formats:
|
|
1) Headered CSV with columns (case-insensitive): name, start_date, end_date[, region]
|
|
- Dates: YYYY-MM-DD, DD.MM.YYYY, YYYY/MM/DD, or YYYYMMDD
|
|
2) Headerless CSV/TXT lines with columns:
|
|
[internal, name, start_yyyymmdd, end_yyyymmdd, optional_internal]
|
|
- Only columns 2-4 are used; 1 and 5 are ignored.
|
|
"""
|
|
if "file" not in request.files:
|
|
return jsonify({"error": "No file part"}), 400
|
|
file = request.files["file"]
|
|
if file.filename == "":
|
|
return jsonify({"error": "No selected file"}), 400
|
|
|
|
try:
|
|
raw = file.read()
|
|
# Try UTF-8 first (strict), then cp1252, then latin-1 as last resort
|
|
try:
|
|
content = raw.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
try:
|
|
content = raw.decode("cp1252")
|
|
except UnicodeDecodeError:
|
|
content = raw.decode("latin-1", errors="replace")
|
|
|
|
sniffer = csv.Sniffer()
|
|
dialect = None
|
|
try:
|
|
sample = content[:2048]
|
|
# Some files may contain a lot of quotes; allow Sniffer to guess delimiter
|
|
dialect = sniffer.sniff(sample)
|
|
except Exception:
|
|
pass
|
|
|
|
def parse_date(s: str):
|
|
s = (s or "").strip()
|
|
if not s:
|
|
return None
|
|
# Numeric YYYYMMDD
|
|
if s.isdigit() and len(s) == 8:
|
|
try:
|
|
return datetime.strptime(s, "%Y%m%d").date()
|
|
except ValueError:
|
|
pass
|
|
# Common formats
|
|
for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y/%m/%d"):
|
|
try:
|
|
return datetime.strptime(s, fmt).date()
|
|
except ValueError:
|
|
continue
|
|
raise ValueError(f"Unsupported date format: {s}")
|
|
|
|
session = Session()
|
|
inserted = 0
|
|
updated = 0
|
|
|
|
# First, try headered CSV via DictReader
|
|
dict_reader = csv.DictReader(io.StringIO(
|
|
content), dialect=dialect) if dialect else csv.DictReader(io.StringIO(content))
|
|
fieldnames_lower = [h.lower() for h in (dict_reader.fieldnames or [])]
|
|
has_required_headers = {"name", "start_date",
|
|
"end_date"}.issubset(set(fieldnames_lower))
|
|
|
|
def upsert(name: str, start_date, end_date, region=None):
|
|
nonlocal inserted, updated
|
|
if not name or not start_date or not end_date:
|
|
return
|
|
existing = (
|
|
session.query(SchoolHoliday)
|
|
.filter(
|
|
SchoolHoliday.name == name,
|
|
SchoolHoliday.start_date == start_date,
|
|
SchoolHoliday.end_date == end_date,
|
|
SchoolHoliday.region.is_(
|
|
region) if region is None else SchoolHoliday.region == region,
|
|
)
|
|
.first()
|
|
)
|
|
if existing:
|
|
existing.region = region
|
|
existing.source_file_name = file.filename
|
|
updated += 1
|
|
else:
|
|
session.add(SchoolHoliday(
|
|
name=name,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
region=region,
|
|
source_file_name=file.filename,
|
|
))
|
|
inserted += 1
|
|
|
|
if has_required_headers:
|
|
for row in dict_reader:
|
|
norm = {k.lower(): (v or "").strip() for k, v in row.items()}
|
|
name = norm.get("name")
|
|
try:
|
|
start_date = parse_date(norm.get("start_date"))
|
|
end_date = parse_date(norm.get("end_date"))
|
|
except ValueError:
|
|
# Skip rows with unparseable dates
|
|
continue
|
|
region = (norm.get("region")
|
|
or None) if "region" in norm else None
|
|
upsert(name, start_date, end_date, region)
|
|
else:
|
|
# Fallback: headerless rows -> use columns [1]=name, [2]=start, [3]=end
|
|
reader = csv.reader(io.StringIO(
|
|
content), dialect=dialect) if dialect else csv.reader(io.StringIO(content))
|
|
for row in reader:
|
|
if not row:
|
|
continue
|
|
# tolerate varying column counts (4 or 5); ignore first and optional last
|
|
cols = [c.strip() for c in row]
|
|
if len(cols) < 4:
|
|
# Not enough data
|
|
continue
|
|
name = cols[1].strip().strip('"')
|
|
start_raw = cols[2]
|
|
end_raw = cols[3]
|
|
try:
|
|
start_date = parse_date(start_raw)
|
|
end_date = parse_date(end_raw)
|
|
except ValueError:
|
|
continue
|
|
upsert(name, start_date, end_date, None)
|
|
|
|
session.commit()
|
|
session.close()
|
|
return jsonify({"success": True, "inserted": inserted, "updated": updated})
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 400
|