HDMI-CEC: auto-disable in dev mode + docs/tests synced
README: document dev-mode CEC auto-disable; add testing notes; set CEC_POWER_OFF_WAIT=5 Copilot instructions: add dev-mode CEC behavior and test script guidance test-hdmi-cec.sh: respect ENV=development (skip option 5), explicit .env load, ASCII output, 30s wait display_manager: remove emoji from logs to avoid Unicode errors
This commit is contained in:
@@ -45,6 +45,16 @@ CHECK_INTERVAL = int(os.getenv("DISPLAY_CHECK_INTERVAL", "5")) # seconds
|
||||
PRESENTATION_DIR = os.path.join(os.path.dirname(__file__), "presentation")
|
||||
EVENT_FILE = os.path.join(os.path.dirname(__file__), "current_event.json")
|
||||
|
||||
# HDMI-CEC Configuration
|
||||
# Note: CEC is automatically disabled in development mode to avoid constantly switching TV on/off
|
||||
CEC_ENABLED = os.getenv("CEC_ENABLED", "true").lower() in ("true", "1", "yes")
|
||||
if ENV == "development":
|
||||
CEC_ENABLED = False # Override: disable CEC in development mode
|
||||
CEC_DEVICE = os.getenv("CEC_DEVICE", "TV") # Target device name (TV, 0, etc.)
|
||||
CEC_TURN_OFF_DELAY = int(os.getenv("CEC_TURN_OFF_DELAY", "30")) # seconds after last event ends
|
||||
CEC_POWER_ON_WAIT = int(os.getenv("CEC_POWER_ON_WAIT", "3")) # seconds to wait after turning TV on
|
||||
CEC_POWER_OFF_WAIT = int(os.getenv("CEC_POWER_OFF_WAIT", "2")) # seconds to wait after turning TV off
|
||||
|
||||
# Setup logging
|
||||
LOG_PATH = os.path.join(os.path.dirname(__file__), "..", "logs", "display_manager.log")
|
||||
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
|
||||
@@ -58,6 +68,247 @@ logging.basicConfig(
|
||||
]
|
||||
)
|
||||
|
||||
# Log CEC mode after logging is configured
|
||||
if ENV == "development":
|
||||
logging.info("[DEV MODE] HDMI-CEC automatically disabled (TV control off)")
|
||||
elif CEC_ENABLED:
|
||||
logging.info(f"[CEC] HDMI-CEC enabled: TV control active (device: {CEC_DEVICE})")
|
||||
else:
|
||||
logging.info("[CEC] HDMI-CEC disabled in configuration")
|
||||
|
||||
|
||||
class HDMICECController:
|
||||
"""Controls HDMI-CEC to turn TV on/off automatically
|
||||
|
||||
Uses cec-client from libcec to send CEC commands to the connected TV.
|
||||
Automatically turns TV on when events start and off when events end (with configurable delay).
|
||||
"""
|
||||
|
||||
def __init__(self, enabled: bool = True, device: str = "TV", turn_off_delay: int = 30,
|
||||
power_on_wait: int = 3, power_off_wait: int = 2):
|
||||
"""
|
||||
Args:
|
||||
enabled: Whether CEC control is enabled
|
||||
device: Target CEC device (TV, 0, etc.)
|
||||
turn_off_delay: Seconds to wait after last event ends before turning off TV
|
||||
power_on_wait: Seconds to wait after sending power ON command (for TV to boot)
|
||||
power_off_wait: Seconds to wait after sending power OFF command
|
||||
"""
|
||||
self.enabled = enabled
|
||||
self.device = device
|
||||
self.turn_off_delay = turn_off_delay
|
||||
self.power_on_wait = power_on_wait
|
||||
self.power_off_wait = power_off_wait
|
||||
self.tv_state = None # None = unknown, True = on, False = off
|
||||
self.turn_off_timer = None
|
||||
|
||||
if not self.enabled:
|
||||
logging.info("HDMI-CEC control disabled")
|
||||
return
|
||||
|
||||
# Check if cec-client is available
|
||||
if not self._check_cec_available():
|
||||
logging.warning("cec-client not found - HDMI-CEC control disabled")
|
||||
logging.info("Install with: sudo apt-get install cec-utils")
|
||||
self.enabled = False
|
||||
return
|
||||
|
||||
logging.info(f"HDMI-CEC controller initialized (device: {self.device}, turn_off_delay: {self.turn_off_delay}s)")
|
||||
|
||||
# Try to detect current TV state
|
||||
self._detect_tv_state()
|
||||
|
||||
def _check_cec_available(self) -> bool:
|
||||
"""Check if cec-client command is available"""
|
||||
try:
|
||||
subprocess.run(
|
||||
['which', 'cec-client'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
check=True
|
||||
)
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def _run_cec_command(self, command: str, timeout: int = 10) -> bool:
|
||||
"""Run a CEC command via cec-client
|
||||
|
||||
Args:
|
||||
command: CEC command to send (e.g., 'on 0', 'standby 0')
|
||||
timeout: Command timeout in seconds
|
||||
|
||||
Returns:
|
||||
True if command succeeded, False otherwise
|
||||
"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Use echo to pipe command to cec-client
|
||||
# cec-client -s -d 1 means: single command mode, log level 1
|
||||
result = subprocess.run(
|
||||
f'echo "{command}" | cec-client -s -d 1',
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=timeout,
|
||||
check=False
|
||||
)
|
||||
|
||||
output = result.stdout.decode('utf-8', errors='ignore')
|
||||
|
||||
# Check for common success indicators in output
|
||||
success = (
|
||||
result.returncode == 0 or
|
||||
'power status changed' in output.lower() or
|
||||
'power on' in output.lower() or
|
||||
'standby' in output.lower()
|
||||
)
|
||||
|
||||
if success:
|
||||
logging.debug(f"CEC command '{command}' executed successfully")
|
||||
else:
|
||||
logging.warning(f"CEC command '{command}' may have failed (rc={result.returncode})")
|
||||
|
||||
return success
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logging.error(f"CEC command '{command}' timed out after {timeout}s")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Error running CEC command '{command}': {e}")
|
||||
return False
|
||||
|
||||
def _detect_tv_state(self):
|
||||
"""Try to detect current TV power state"""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
try:
|
||||
# Query power status of device 0 (TV)
|
||||
result = subprocess.run(
|
||||
'echo "pow 0" | cec-client -s -d 1',
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=5,
|
||||
check=False
|
||||
)
|
||||
|
||||
output = result.stdout.decode('utf-8', errors='ignore').lower()
|
||||
|
||||
if 'power status: on' in output or 'power status: 0' in output:
|
||||
self.tv_state = True
|
||||
logging.info("TV detected as ON")
|
||||
elif 'power status: standby' in output or 'power status: 1' in output:
|
||||
self.tv_state = False
|
||||
logging.info("TV detected as STANDBY/OFF")
|
||||
else:
|
||||
logging.debug(f"Could not detect TV state. Output: {output[:200]}")
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(f"Could not detect TV state: {e}")
|
||||
|
||||
def turn_on(self) -> bool:
|
||||
"""Turn TV on via HDMI-CEC
|
||||
|
||||
Returns:
|
||||
True if command succeeded or TV was already on
|
||||
"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
# Cancel any pending turn-off timer
|
||||
if self.turn_off_timer:
|
||||
self.turn_off_timer.cancel()
|
||||
self.turn_off_timer = None
|
||||
logging.debug("Cancelled pending TV turn-off timer")
|
||||
|
||||
# Skip if TV is already on
|
||||
if self.tv_state is True:
|
||||
logging.debug("TV already on, skipping CEC command")
|
||||
return True
|
||||
|
||||
logging.info("Turning TV ON via HDMI-CEC...")
|
||||
|
||||
# Send power on command to device 0 (TV)
|
||||
success = self._run_cec_command(f'on {self.device}')
|
||||
|
||||
if success:
|
||||
self.tv_state = True
|
||||
logging.info("TV turned ON successfully")
|
||||
# Give TV time to actually power on (TVs can be slow)
|
||||
if self.power_on_wait > 0:
|
||||
logging.debug(f"Waiting {self.power_on_wait} seconds for TV to power on...")
|
||||
import time
|
||||
time.sleep(self.power_on_wait)
|
||||
else:
|
||||
logging.warning("Failed to turn TV ON")
|
||||
|
||||
return success
|
||||
|
||||
def turn_off(self, delayed: bool = False) -> bool:
|
||||
"""Turn TV off via HDMI-CEC
|
||||
|
||||
Args:
|
||||
delayed: If True, uses configured delay before turning off
|
||||
|
||||
Returns:
|
||||
True if command succeeded or was scheduled
|
||||
"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
if delayed and self.turn_off_delay > 0:
|
||||
# Schedule delayed turn-off
|
||||
if self.turn_off_timer:
|
||||
self.turn_off_timer.cancel()
|
||||
|
||||
logging.info(f"Scheduling TV turn-off in {self.turn_off_delay}s...")
|
||||
self.turn_off_timer = threading.Timer(
|
||||
self.turn_off_delay,
|
||||
self._turn_off_now
|
||||
)
|
||||
self.turn_off_timer.daemon = True
|
||||
self.turn_off_timer.start()
|
||||
return True
|
||||
else:
|
||||
# Immediate turn-off
|
||||
return self._turn_off_now()
|
||||
|
||||
def _turn_off_now(self) -> bool:
|
||||
"""Internal method to turn TV off immediately"""
|
||||
# Skip if TV is already off
|
||||
if self.tv_state is False:
|
||||
logging.debug("TV already off, skipping CEC command")
|
||||
return True
|
||||
|
||||
logging.info("Turning TV OFF via HDMI-CEC...")
|
||||
|
||||
# Send standby command to device 0 (TV)
|
||||
success = self._run_cec_command(f'standby {self.device}')
|
||||
|
||||
if success:
|
||||
self.tv_state = False
|
||||
logging.info("TV turned OFF successfully")
|
||||
# Give TV time to actually power off
|
||||
if self.power_off_wait > 0:
|
||||
logging.debug(f"Waiting {self.power_off_wait} seconds for TV to power off...")
|
||||
import time
|
||||
time.sleep(self.power_off_wait)
|
||||
else:
|
||||
logging.warning("Failed to turn TV OFF")
|
||||
|
||||
return success
|
||||
|
||||
def cancel_turn_off(self):
|
||||
"""Cancel any pending turn-off timer"""
|
||||
if self.turn_off_timer:
|
||||
self.turn_off_timer.cancel()
|
||||
self.turn_off_timer = None
|
||||
logging.debug("Cancelled TV turn-off timer")
|
||||
|
||||
|
||||
class DisplayProcess:
|
||||
"""Manages a running display application process"""
|
||||
@@ -216,6 +467,15 @@ class DisplayManager:
|
||||
self.last_file_mtime: Optional[float] = None
|
||||
self.running = True
|
||||
|
||||
# Initialize HDMI-CEC controller
|
||||
self.cec = HDMICECController(
|
||||
enabled=CEC_ENABLED,
|
||||
device=CEC_DEVICE,
|
||||
turn_off_delay=CEC_TURN_OFF_DELAY,
|
||||
power_on_wait=CEC_POWER_ON_WAIT,
|
||||
power_off_wait=CEC_POWER_OFF_WAIT
|
||||
)
|
||||
|
||||
# Setup signal handlers for graceful shutdown
|
||||
signal.signal(signal.SIGTERM, self._signal_handler)
|
||||
signal.signal(signal.SIGINT, self._signal_handler)
|
||||
@@ -225,6 +485,8 @@ class DisplayManager:
|
||||
logging.info(f"Received signal {signum}, shutting down gracefully...")
|
||||
self.running = False
|
||||
self.stop_current_display()
|
||||
# Turn off TV when shutting down (delayed)
|
||||
self.cec.turn_off(delayed=True)
|
||||
sys.exit(0)
|
||||
|
||||
def read_event_file(self) -> Optional[Dict]:
|
||||
@@ -339,8 +601,12 @@ class DisplayManager:
|
||||
|
||||
return f"unknown_{abs(hash(json.dumps(event))) }"
|
||||
|
||||
def stop_current_display(self):
|
||||
"""Stop the currently running display process"""
|
||||
def stop_current_display(self, turn_off_tv: bool = True):
|
||||
"""Stop the currently running display process
|
||||
|
||||
Args:
|
||||
turn_off_tv: If True, schedule TV turn-off (with delay)
|
||||
"""
|
||||
if self.current_process:
|
||||
logging.info(f"Stopping current display: {self.current_process.event_type}")
|
||||
self.current_process.terminate()
|
||||
@@ -352,6 +618,10 @@ class DisplayManager:
|
||||
|
||||
self.current_process = None
|
||||
self.current_event_data = None
|
||||
|
||||
# Turn off TV when display stops (with configurable delay)
|
||||
if turn_off_tv:
|
||||
self.cec.turn_off(delayed=True)
|
||||
|
||||
def start_presentation(self, event: Dict) -> Optional[DisplayProcess]:
|
||||
"""Start presentation display (PDF/PowerPoint/LibreOffice) using Impressive
|
||||
@@ -1083,17 +1353,22 @@ class DisplayManager:
|
||||
if self.current_process.event_type == 'presentation' and exit_code == 0:
|
||||
logging.info("Presentation process ended with exit code 0 (likely normal completion).")
|
||||
self.current_process = None
|
||||
# Don't turn off TV yet - event might still be active
|
||||
return
|
||||
|
||||
logging.info("Restarting display process...")
|
||||
self.current_process = None
|
||||
# Don't turn off TV when restarting same event
|
||||
else:
|
||||
# Everything is fine, continue
|
||||
# Cancel any pending TV turn-off since event is still active
|
||||
self.cec.cancel_turn_off()
|
||||
return
|
||||
else:
|
||||
# Different event - stop current and start new
|
||||
logging.info(f"Event changed from {self.current_process.event_id} to {event_id}")
|
||||
self.stop_current_display()
|
||||
# Don't turn off TV when switching between events
|
||||
self.stop_current_display(turn_off_tv=False)
|
||||
|
||||
# Start new display
|
||||
logging.info(f"Starting display for event: {event_id}")
|
||||
@@ -1103,6 +1378,9 @@ class DisplayManager:
|
||||
if 'end' in active_event:
|
||||
logging.info(f" Event end time (UTC): {active_event['end']}")
|
||||
|
||||
# Turn on TV before starting display
|
||||
self.cec.turn_on()
|
||||
|
||||
new_process = self.start_display_for_event(active_event)
|
||||
|
||||
if new_process:
|
||||
|
||||
Reference in New Issue
Block a user