"""IMAP connection, IDLE loop, and email fetching.""" import logging import time from collections.abc import Callable from imapclient import IMAPClient from .config import ImapConfig log = logging.getLogger(__name__) IDLE_TIMEOUT = 300 # 5 minutes, per RFC 2177 MAX_BACKOFF = 300 # 5 minutes max backoff def run_idle_loop( config: ImapConfig, on_message: Callable[[bytes], None], shutdown_event=None, ) -> None: """Main IMAP IDLE loop with automatic reconnection. Calls on_message(raw_email_bytes) for each new message received. Runs until shutdown_event is set (if provided) or forever. """ backoff = 1 while not (shutdown_event and shutdown_event.is_set()): try: _idle_session(config, on_message, shutdown_event) backoff = 1 # reset on clean disconnect except Exception: log.error("IMAP connection error, reconnecting in %ds", backoff, exc_info=True) if shutdown_event: shutdown_event.wait(backoff) else: time.sleep(backoff) backoff = min(backoff * 2, MAX_BACKOFF) def _idle_session( config: ImapConfig, on_message: Callable[[bytes], None], shutdown_event=None, ) -> None: """Run a single IMAP IDLE session until disconnection.""" client = IMAPClient(config.host, port=config.port, ssl=config.ssl) try: client.login(config.username, config.password) log.info("Logged in to IMAP %s as %s", config.host, config.username) client.select_folder(config.folder, readonly=True) log.info("Monitoring folder: %s", config.folder) # Get the current highest UID so we only process new messages last_uid = _get_highest_uid(client) log.debug("Starting UID: %s", last_uid) while not (shutdown_event and shutdown_event.is_set()): client.idle() try: responses = client.idle_check(timeout=IDLE_TIMEOUT) finally: client.idle_done() if shutdown_event and shutdown_event.is_set(): break # Check if any EXISTS response indicates new mail has_new = any( isinstance(resp, tuple) and len(resp) >= 2 and resp[1] == b"EXISTS" for resp in responses ) if not has_new: continue # Fetch new messages since last_uid new_uids = _fetch_new_uids(client, last_uid) if not new_uids: continue log.info("Fetching %d new message(s)", len(new_uids)) # Fetch full message bodies messages = client.fetch(new_uids, ["BODY.PEEK[]"]) for uid, data in messages.items(): raw = data.get(b"BODY[]") if raw: try: on_message(raw) except Exception: log.error("Error processing message UID %s", uid, exc_info=True) last_uid = max(new_uids) finally: try: client.logout() except Exception: pass def _get_highest_uid(client: IMAPClient) -> int: """Get the highest UID currently in the mailbox.""" uids = client.search(["ALL"]) if uids: return max(uids) return 0 def _fetch_new_uids(client: IMAPClient, last_uid: int) -> list[int]: """Search for messages with UID greater than last_uid.""" if last_uid > 0: uids = client.search(["UID", f"{last_uid + 1}:*"]) # IMAP UID search with * can return last_uid if no new messages return [u for u in uids if u > last_uid] return client.search(["ALL"])