feat(conversions): end-to-end PPT/PPTX/ODP -> PDF pipeline with RQ worker + Gotenberg
DB/model
- Add Conversion model + ConversionStatus enum (pending, processing, ready, failed).
- Alembic migrations: create conversions table, indexes, unique
  (source_event_media_id, target_format, file_hash), and NOT NULL on file_hash.

API
- Enqueue on upload (ppt|pptx|odp) in routes/eventmedia.py: compute sha256,
  upsert Conversion, enqueue job.
- New routes:
  - POST /api/conversions/<media_id>/pdf — ensure/enqueue conversion
  - GET /api/conversions/<media_id>/status — latest status/details
  - GET /api/files/converted/<path> — serve converted PDFs
- Register conversions blueprint in wsgi.

Worker
- server/worker.py: convert_event_media_to_pdf calls Gotenberg
  /forms/libreoffice/convert, writes to server/media/converted/, and updates
  Conversion status, timestamps, and error messages.
- Fix media root resolution to /server/media.
- Prefer function enqueue over string path; expose server.worker in the package
  init for RQ string compatibility.

Queue/infra
- server/task_queue.py: RQ queue helper (REDIS_URL, default redis://redis:6379/0).
- docker-compose: add redis and gotenberg services; add worker service
  (rq worker conversions); pass REDIS_URL and GOTENBERG_URL to server/worker;
  mount the shared media volume in prod for API/worker parity.
- docker-compose.override: add dev redis/gotenberg/worker services; ensure
  PYTHONPATH + working_dir allow importing server.worker; use the rq CLI instead
  of python -m rq for the worker; dashboard dev: run as the appropriate
  user/root and pre-create/chown caches to avoid EACCES.

Dashboard dev UX
- Vite: set cacheDir .vite to avoid EACCES in node_modules.
- Disable the Node inspector by default to avoid port conflicts.

Docs
- Update copilot-instructions.md with the conversion system: flow, services,
  env vars, endpoints, storage paths, and data model.
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
"""Server package initializer.

Expose submodules required by external importers (e.g., RQ string paths).

Importing this package therefore imports ``server.worker`` as a side effect;
keep ``worker``'s module-level code cheap (no connections, no I/O).
"""

# Ensure 'server.worker' is available as an attribute of the 'server' package
# so that RQ can resolve 'server.worker.convert_event_media_to_pdf'.
from . import worker  # noqa: F401
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
"""merge heads after conversions

Revision ID: 2b627d0885c3
Revises: 5b3c1a2f8d10, 8d1df7199cb7
Create Date: 2025-10-06 20:27:53.974926

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '2b627d0885c3'
# Merge revisions point at *both* parent heads, so the value here is a tuple of
# revision ids; the annotation must include Sequence[str] for that case.
down_revision: Union[str, Sequence[str], None] = ('5b3c1a2f8d10', '8d1df7199cb7')
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema. Pure merge point: no schema changes are performed."""
    pass


def downgrade() -> None:
    """Downgrade schema. Pure merge point: no schema changes are performed."""
    pass
||||
@@ -0,0 +1,53 @@
|
||||
"""Add conversions table

Revision ID: 5b3c1a2f8d10
Revises: e6eaede720aa
Create Date: 2025-10-06 12:00:00.000000

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '5b3c1a2f8d10'
down_revision: Union[str, None] = 'e6eaede720aa'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the conversions table, its indexes and the dedup unique constraint."""
    op.create_table(
        'conversions',
        sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True),
        sa.Column('source_event_media_id', sa.Integer(), nullable=False),
        sa.Column('target_format', sa.String(length=10), nullable=False),
        sa.Column('target_path', sa.String(length=512), nullable=True),
        sa.Column('status', sa.Enum('pending', 'processing', 'ready', 'failed', name='conversionstatus'),
                  nullable=False, server_default='pending'),
        sa.Column('file_hash', sa.String(length=64), nullable=True),
        sa.Column('started_at', sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column('completed_at', sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column('error_message', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['source_event_media_id'], ['event_media.id'],
                                name='fk_conversions_event_media', ondelete='CASCADE'),
    )

    op.create_index('ix_conv_source_event_media_id', 'conversions', ['source_event_media_id'])
    op.create_index('ix_conversions_target_format', 'conversions', ['target_format'])
    op.create_index('ix_conv_status_target', 'conversions', ['status', 'target_format'])
    op.create_index('ix_conv_source_target', 'conversions', ['source_event_media_id', 'target_format'])

    # One conversion row per (source, format, content hash); re-uploads with the
    # same bytes reuse the existing row.
    op.create_unique_constraint('uq_conv_source_target_hash', 'conversions',
                                ['source_event_media_id', 'target_format', 'file_hash'])


def downgrade() -> None:
    """Drop the conversions table and everything created by upgrade()."""
    op.drop_constraint('uq_conv_source_target_hash', 'conversions', type_='unique')
    op.drop_index('ix_conv_source_target', table_name='conversions')
    op.drop_index('ix_conv_status_target', table_name='conversions')
    op.drop_index('ix_conversions_target_format', table_name='conversions')
    op.drop_index('ix_conv_source_event_media_id', table_name='conversions')
    op.drop_table('conversions')
    # On PostgreSQL, sa.Enum implicitly creates a 'conversionstatus' TYPE that
    # drop_table does not remove; drop it explicitly so a later upgrade() does
    # not fail with "type already exists". checkfirst makes this a no-op on
    # backends that never created a standalone type.
    sa.Enum(name='conversionstatus').drop(op.get_bind(), checkfirst=True)
|
||||
@@ -0,0 +1,40 @@
|
||||
"""Make conversions.file_hash NOT NULL

Revision ID: b5a6c3d4e7f8
Revises: 2b627d0885c3
Create Date: 2025-10-06 21:05:00.000000

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "b5a6c3d4e7f8"
down_revision: Union[str, None] = "2b627d0885c3"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Backfill NULL hashes, then tighten conversions.file_hash to NOT NULL."""
    # Ensure no NULLs remain before altering nullability
    # NOTE(review): if two NULL-hash rows share (source_event_media_id,
    # target_format), both become '' and would collide with
    # uq_conv_source_target_hash — confirm such duplicates cannot exist.
    op.execute("UPDATE conversions SET file_hash = '' WHERE file_hash IS NULL")
    op.alter_column(
        "conversions",
        "file_hash",
        existing_type=sa.String(length=64),
        nullable=False,
        existing_nullable=True,
    )


def downgrade() -> None:
    """Relax conversions.file_hash back to nullable (backfilled '' values remain)."""
    op.alter_column(
        "conversions",
        "file_hash",
        existing_type=sa.String(length=64),
        nullable=True,
        existing_nullable=False,
    )
|
||||
@@ -6,3 +6,6 @@ python-dotenv>=1.1.0
|
||||
SQLAlchemy>=2.0.41
|
||||
flask
|
||||
gunicorn
|
||||
redis>=5.0.1
|
||||
rq>=1.16.2
|
||||
requests>=2.32.3
|
||||
|
||||
94
server/routes/conversions.py
Normal file
94
server/routes/conversions.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from flask import Blueprint, jsonify, request
|
||||
from server.database import Session
|
||||
from models.models import Conversion, ConversionStatus, EventMedia, MediaType
|
||||
from server.task_queue import get_queue
|
||||
from server.worker import convert_event_media_to_pdf
|
||||
from datetime import datetime, timezone
|
||||
import hashlib
|
||||
|
||||
# Blueprint exposing the conversion endpoints under /api/conversions.
conversions_bp = Blueprint("conversions", __name__,
                           url_prefix="/api/conversions")
|
||||
def sha256_file(abs_path: str) -> str:
    """Return the hex SHA-256 digest of the file at *abs_path*.

    Reads in fixed-size chunks so arbitrarily large uploads never have to be
    held in memory at once.
    """
    digest = hashlib.sha256()
    with open(abs_path, "rb") as stream:
        while chunk := stream.read(8192):
            digest.update(chunk)
    return digest.hexdigest()
||||
|
||||
|
||||
@conversions_bp.route("/<int:media_id>/pdf", methods=["POST"])
def ensure_conversion(media_id: int):
    """Ensure a PDF conversion exists for *media_id*, enqueueing a job if needed.

    Returns:
        202 with a job id when a conversion was (re-)enqueued,
        200 when conversion is unnecessary or already processing/ready,
        404 when the media row or its file on disk is missing.
    """
    import os

    session = Session()
    try:
        # Session.get is the SQLAlchemy 2.0 API; Query.get is deprecated.
        media = session.get(EventMedia, media_id)
        if not media or not media.file_path:
            return jsonify({"error": "Media not found or no file"}), 404

        # Only enqueue for office presentation formats
        if media.media_type not in {MediaType.ppt, MediaType.pptx, MediaType.odp}:
            return jsonify({"message": "No conversion required for this media_type"}), 200

        # Resolve the absolute source path under <server package>/media.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        media_root = os.path.join(base_dir, "media")
        abs_source = os.path.join(media_root, media.file_path)
        if not os.path.isfile(abs_source):
            # DB row exists but the file is gone; answer 404 instead of letting
            # open() raise FileNotFoundError into a 500.
            return jsonify({"error": "Source file missing on disk"}), 404
        file_hash = sha256_file(abs_source)

        # Find or create the conversion row keyed by (media, format, content hash).
        conv = (
            session.query(Conversion)
            .filter_by(
                source_event_media_id=media.id,
                target_format="pdf",
                file_hash=file_hash,
            )
            .one_or_none()
        )
        if not conv:
            conv = Conversion(
                source_event_media_id=media.id,
                target_format="pdf",
                status=ConversionStatus.pending,
                file_hash=file_hash,
            )
            session.add(conv)
            session.commit()

        # Enqueue if not already processing/ready
        if conv.status in {ConversionStatus.pending, ConversionStatus.failed}:
            if conv.status is ConversionStatus.failed:
                # Reset a failed row before retrying so the status endpoint
                # reports the retry as pending rather than still failed.
                conv.status = ConversionStatus.pending
                conv.error_message = None
                session.commit()
            q = get_queue()
            job = q.enqueue(convert_event_media_to_pdf, conv.id)
            return jsonify({"id": conv.id, "status": conv.status.value, "job_id": job.get_id()}), 202
        else:
            return jsonify({"id": conv.id, "status": conv.status.value, "target_path": conv.target_path}), 200
    finally:
        session.close()
|
||||
|
||||
|
||||
@conversions_bp.route("/<int:media_id>/status", methods=["GET"])
def conversion_status(media_id: int):
    """Report the most recent PDF conversion for *media_id* (404 if none)."""
    session = Session()
    try:
        # Highest id == most recently created conversion row for this media.
        latest = (
            session.query(Conversion)
            .filter_by(source_event_media_id=media_id, target_format="pdf")
            .order_by(Conversion.id.desc())
            .first()
        )
        if latest is None:
            return jsonify({"status": "missing"}), 404

        started = latest.started_at.isoformat() if latest.started_at else None
        completed = latest.completed_at.isoformat() if latest.completed_at else None
        payload = {
            "id": latest.id,
            "status": latest.status.value,
            "target_path": latest.target_path,
            "started_at": started,
            "completed_at": completed,
            "error_message": latest.error_message,
        }
        return jsonify(payload)
    finally:
        session.close()
|
||||
@@ -1,7 +1,10 @@
|
||||
from re import A
|
||||
from flask import Blueprint, request, jsonify, send_from_directory
|
||||
from server.database import Session
|
||||
from models.models import EventMedia, MediaType
|
||||
from models.models import EventMedia, MediaType, Conversion, ConversionStatus
|
||||
from server.task_queue import get_queue
|
||||
from server.worker import convert_event_media_to_pdf
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
eventmedia_bp = Blueprint('eventmedia', __name__, url_prefix='/api/eventmedia')
|
||||
@@ -134,6 +137,41 @@ def filemanager_upload():
|
||||
uploaded_at=datetime.now(timezone.utc)
|
||||
)
|
||||
session.add(media)
|
||||
session.commit()
|
||||
|
||||
# Enqueue conversion for office presentation types
|
||||
if media_type in {MediaType.ppt, MediaType.pptx, MediaType.odp}:
|
||||
# compute file hash
|
||||
h = hashlib.sha256()
|
||||
with open(file_path, 'rb') as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
h.update(chunk)
|
||||
file_hash = h.hexdigest()
|
||||
|
||||
# upsert Conversion row
|
||||
conv = (
|
||||
session.query(Conversion)
|
||||
.filter_by(
|
||||
source_event_media_id=media.id,
|
||||
target_format='pdf',
|
||||
file_hash=file_hash,
|
||||
)
|
||||
.one_or_none()
|
||||
)
|
||||
if not conv:
|
||||
conv = Conversion(
|
||||
source_event_media_id=media.id,
|
||||
target_format='pdf',
|
||||
status=ConversionStatus.pending,
|
||||
file_hash=file_hash,
|
||||
)
|
||||
session.add(conv)
|
||||
session.commit()
|
||||
|
||||
if conv.status in {ConversionStatus.pending, ConversionStatus.failed}:
|
||||
q = get_queue()
|
||||
q.enqueue(convert_event_media_to_pdf, conv.id)
|
||||
|
||||
session.commit()
|
||||
return jsonify({'success': True})
|
||||
|
||||
|
||||
@@ -55,3 +55,14 @@ def download_media_file(media_id: int, filename: str):
|
||||
served_name = os.path.basename(abs_path)
|
||||
session.close()
|
||||
return send_from_directory(directory, served_name, as_attachment=True)
|
||||
|
||||
|
||||
@files_bp.route("/converted/<path:relpath>", methods=["GET"])
def download_converted(relpath: str):
    """Serve converted files (e.g., PDFs) relative to media/converted.

    The candidate path is resolved with realpath() *before* the containment
    check: the raw os.path.join result can contain '..' segments that escape
    MEDIA_ROOT while still passing a naive startswith() test.
    """
    root = os.path.realpath(MEDIA_ROOT)
    abs_path = os.path.realpath(os.path.join(root, relpath))
    try:
        inside = os.path.commonpath([abs_path, root]) == root
    except ValueError:
        # Different drives / mixed absolute-relative on some platforms.
        inside = False
    if not inside:
        return jsonify({"error": "Invalid path"}), 400
    if not os.path.isfile(abs_path):
        return jsonify({"error": "File not found"}), 404
    return send_from_directory(os.path.dirname(abs_path), os.path.basename(abs_path), as_attachment=True)
|
||||
|
||||
15
server/rq_worker.py
Normal file
15
server/rq_worker.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import os
|
||||
from rq import Worker
|
||||
from server.task_queue import get_queue, get_redis_url
|
||||
import redis
|
||||
|
||||
|
||||
def main():
    """Run a blocking RQ worker bound to the single 'conversions' queue."""
    connection = redis.from_url(get_redis_url())
    queue_name = get_queue().name  # single queue named 'conversions'
    worker = Worker([queue_name], connection=connection)
    worker.work(with_scheduler=True)


if __name__ == "__main__":
    main()
|
||||
14
server/task_queue.py
Normal file
14
server/task_queue.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import os
|
||||
import redis
|
||||
from rq import Queue
|
||||
|
||||
|
||||
def get_redis_url() -> str:
    """Return the Redis connection URL, honoring the REDIS_URL env var."""
    # Default to the 'redis' service name on the compose network.
    default_url = "redis://redis:6379/0"
    return os.getenv("REDIS_URL", default_url)
|
||||
|
||||
|
||||
def get_queue(name: str = "conversions") -> Queue:
    """Return an RQ queue bound to a fresh Redis connection.

    Jobs default to a 10-minute timeout, matching the Gotenberg request
    timeout used by the worker.
    """
    connection = redis.from_url(get_redis_url())
    queue = Queue(name, connection=connection, default_timeout=600)
    return queue
||||
94
server/worker.py
Normal file
94
server/worker.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import os
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import requests
|
||||
from sqlalchemy.orm import Session as SASession
|
||||
|
||||
from server.database import Session
|
||||
from models.models import Conversion, ConversionStatus, EventMedia, MediaType
|
||||
|
||||
# Base URL of the Gotenberg conversion service. Defaults to the compose
# service name; override with the GOTENBERG_URL environment variable.
GOTENBERG_URL = os.getenv("GOTENBERG_URL", "http://gotenberg:3000")


def _now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _mark_failed(session: "SASession", conversion_id: int, message: str) -> None:
    """Record a failure on the conversion row (best effort)."""
    # The session may be in a failed/dirty state if the exception came from a
    # flush or commit; roll back before reusing it, or the queries below raise.
    session.rollback()
    conv = session.get(Conversion, conversion_id)
    if conv:
        conv.status = ConversionStatus.failed
        conv.error_message = message
        conv.completed_at = _now()
        session.commit()


def convert_event_media_to_pdf(conversion_id: int):
    """
    Job entry point: convert a single EventMedia to PDF using Gotenberg.

    Steps:
    - Load conversion + source media
    - Set status=processing, started_at
    - POST to Gotenberg /forms/libreoffice/convert with the source file bytes
    - Save response bytes to target_path
    - Set status=ready, completed_at, target_path
    - On error: set status=failed, error_message
    """
    session: SASession = Session()
    try:
        # Session.get is the SQLAlchemy 2.0 API; Query.get is deprecated.
        conv: Conversion = session.get(Conversion, conversion_id)
        if not conv:
            return

        media: EventMedia = session.get(EventMedia, conv.source_event_media_id)
        if not media or not media.file_path:
            conv.status = ConversionStatus.failed
            conv.error_message = "Source media or file_path missing"
            conv.completed_at = _now()
            session.commit()
            return

        conv.status = ConversionStatus.processing
        conv.started_at = _now()
        session.commit()

        # Media lives under <server package dir>/media (this file sits in server/).
        server_dir = os.path.dirname(os.path.abspath(__file__))
        media_root = os.path.join(server_dir, "media")
        abs_source = os.path.join(media_root, media.file_path)

        # Output target under media/converted. Suffix the name with the
        # conversion id so two source files sharing a basename cannot
        # overwrite each other's PDF.
        converted_dir = os.path.join(media_root, "converted")
        os.makedirs(converted_dir, exist_ok=True)
        filename_wo_ext = os.path.splitext(os.path.basename(media.file_path))[0]
        pdf_name = f"{filename_wo_ext}-{conv.id}.pdf"
        abs_target = os.path.join(converted_dir, pdf_name)

        # Send to Gotenberg (file handle must stay open for the upload).
        with open(abs_source, "rb") as f:
            files = {"files": (os.path.basename(abs_source), f)}
            resp = requests.post(
                f"{GOTENBERG_URL}/forms/libreoffice/convert",
                files=files,
                timeout=600,
            )
        resp.raise_for_status()

        with open(abs_target, "wb") as out:
            out.write(resp.content)

        conv.status = ConversionStatus.ready
        # Store the path relative to media/ so serving routes can resolve it.
        conv.target_path = os.path.relpath(abs_target, media_root)
        conv.completed_at = _now()
        session.commit()
    except requests.exceptions.Timeout:
        _mark_failed(session, conversion_id, "Conversion timeout")
    except Exception as e:
        _mark_failed(session, conversion_id, f"{e}\n{traceback.format_exc()}")
    finally:
        session.close()
|
||||
@@ -2,6 +2,7 @@
|
||||
from server.routes.eventmedia import eventmedia_bp
|
||||
from server.routes.files import files_bp
|
||||
from server.routes.events import events_bp
|
||||
from server.routes.conversions import conversions_bp
|
||||
from server.routes.holidays import holidays_bp
|
||||
from server.routes.academic_periods import academic_periods_bp
|
||||
from server.routes.groups import groups_bp
|
||||
@@ -24,6 +25,7 @@ app.register_blueprint(eventmedia_bp)
|
||||
app.register_blueprint(files_bp)
|
||||
app.register_blueprint(holidays_bp)
|
||||
app.register_blueprint(academic_periods_bp)
|
||||
app.register_blueprint(conversions_bp)
|
||||
|
||||
|
||||
@app.route("/health")
|
||||
|
||||
Reference in New Issue
Block a user