Files
infoscreen/server/init_db.py
olaf 1a6faaa104 separation of production and development
environments
Adding new migrations for renaming and adding fields
to the database schema
persistent uuid for simclient
2025-07-16 08:50:42 +00:00

80 lines
2.4 KiB
Python

import bcrypt
from dotenv import load_dotenv
import os
from models.models import Base, User, UserRole, ClientGroup
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, text
import sys
sys.path.insert(0, '/workspace')
# Load the .env file.
load_dotenv()

# Database connection settings from the environment (None when unset).
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST")
DB_NAME = os.environ.get("DB_NAME")

# Credentials of the default admin account created further below.
DEFAULT_ADMIN_USERNAME = os.environ.get("DEFAULT_ADMIN_USERNAME")
DEFAULT_ADMIN_PASSWORD = os.environ.get("DEFAULT_ADMIN_PASSWORD")
# Connect without selecting a database first so that the target database can
# be created when it does not exist yet.
if not DB_NAME:
    # Without this guard a missing DB_NAME would be formatted as the text
    # "None" and a database with that literal name would be created.
    raise SystemExit("DB_NAME ist nicht gesetzt (.env prüfen).")

admin_conn_str = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/"
admin_engine = create_engine(admin_conn_str, echo=True)

# Quote the identifier with backticks (doubling any embedded backtick) so
# that names containing characters such as "-" or reserved words are accepted.
quoted_db_name = "`" + DB_NAME.replace("`", "``") + "`"
try:
    with admin_engine.connect() as conn:
        conn.execute(text(
            f"CREATE DATABASE IF NOT EXISTS {quoted_db_name} "
            "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"))
finally:
    # This engine is only needed for the statement above; release its
    # pooled connections instead of keeping them open for the whole run.
    admin_engine.dispose()
# Now connect to the target database itself.
db_conn_str = "mysql+pymysql://{user}:{password}@{host}/{name}".format(
    user=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    name=DB_NAME,
)
engine = create_engine(db_conn_str, echo=True)

# Tables are intentionally not created here: Base.metadata.create_all(engine)
# stays disabled because the schema is managed by Alembic.
# Seed the default records: one ClientGroup and one admin User.
# Both steps only insert when the record is not present yet.
DEFAULT_GROUP_ID = 1
DEFAULT_GROUP_NAME = "Nicht zugeordnet"

# Create the session used for all seeding work below.
Session = sessionmaker(bind=engine)
session = Session()
try:
    # Create the default ClientGroup if it does not exist yet.
    existing_group = session.query(ClientGroup).filter_by(
        id=DEFAULT_GROUP_ID).first()
    if not existing_group:
        default_group = ClientGroup(
            id=DEFAULT_GROUP_ID, name=DEFAULT_GROUP_NAME, is_active=True)
        session.add(default_group)
        session.commit()
        print(
            f"Default-ClientGroup '{DEFAULT_GROUP_NAME}' mit id={DEFAULT_GROUP_ID} wurde angelegt.")
    else:
        print(
            f"Default-ClientGroup '{DEFAULT_GROUP_NAME}' mit id={DEFAULT_GROUP_ID} existiert bereits.")

    # A missing username would make the lookup below search for a NULL
    # username and then try to insert a user without a name.
    if not DEFAULT_ADMIN_USERNAME:
        raise SystemExit(
            "DEFAULT_ADMIN_USERNAME ist nicht gesetzt (.env prüfen).")

    # Check whether the admin user already exists.
    existing_user = session.query(User).filter_by(
        username=DEFAULT_ADMIN_USERNAME).first()
    if not existing_user:
        # Fail with a clear message instead of an AttributeError on
        # None.encode(), and never create an admin with an empty password.
        if not DEFAULT_ADMIN_PASSWORD:
            raise SystemExit(
                "DEFAULT_ADMIN_PASSWORD ist nicht gesetzt (.env prüfen).")

        # Hash the password; the hash is stored as text, hence the decode.
        hashed_pw = bcrypt.hashpw(
            DEFAULT_ADMIN_PASSWORD.encode('utf-8'), bcrypt.gensalt())

        # Create the new admin user.
        admin_user = User(
            username=DEFAULT_ADMIN_USERNAME,
            password_hash=hashed_pw.decode('utf-8'),
            role=UserRole.admin
        )
        session.add(admin_user)
        session.commit()
        print(f"Admin-User '{DEFAULT_ADMIN_USERNAME}' wurde erfolgreich angelegt.")
    else:
        print(f"Admin-User '{DEFAULT_ADMIN_USERNAME}' existiert bereits.")
except Exception:
    # Discard any half-finished transaction before propagating the error.
    session.rollback()
    raise
finally:
    # Always close the session, including on errors and on SystemExit.
    session.close()