"""IMAP connection, IDLE loop, and email fetching.""" import logging import time from collections.abc import Callable from imapclient import IMAPClient from .config import ImapConfig log = logging.getLogger(__name__) IDLE_TIMEOUT = 300 # 5 minutes total IDLE before re-idle (RFC 2177) IDLE_POLL = 30 # check shutdown every 30 seconds within an IDLE MAX_BACKOFF = 300 # 5 minutes max backoff def run_idle_loop( config: ImapConfig, on_message: Callable[[bytes], None], shutdown_event=None, ) -> None: """Main IMAP IDLE loop with automatic reconnection. Calls on_message(raw_email_bytes) for each new message received. Runs until shutdown_event is set (if provided) or forever. """ backoff = 1 while not (shutdown_event and shutdown_event.is_set()): try: _idle_session(config, on_message, shutdown_event) backoff = 1 # reset on clean disconnect except Exception: log.error("IMAP connection error, reconnecting in %ds", backoff, exc_info=True) if shutdown_event: shutdown_event.wait(backoff) else: time.sleep(backoff) backoff = min(backoff * 2, MAX_BACKOFF) def _idle_session( config: ImapConfig, on_message: Callable[[bytes], None], shutdown_event=None, ) -> None: """Run a single IMAP IDLE session until disconnection.""" client = IMAPClient(config.host, port=config.port, ssl=config.ssl) try: client.login(config.username, config.password) log.info("Logged in to IMAP %s as %s", config.host, config.username) client.select_folder(config.folder, readonly=True) log.info("Monitoring folder: %s", config.folder) # Get the current highest UID so we only process new messages last_uid = _get_highest_uid(client) log.debug("Starting UID: %s", last_uid) while not (shutdown_event and shutdown_event.is_set()): client.idle() try: # Poll in short intervals so we can check shutdown_event # between iterations, rather than blocking for the full # IDLE_TIMEOUT (which causes a hang on Ctrl-C). responses = [] deadline = time.monotonic() + IDLE_TIMEOUT while time.monotonic() < deadline: if shutdown_event and shutdown_event.is_set(): break chunk = client.idle_check(timeout=IDLE_POLL) if chunk: responses.extend(chunk) break # got activity, exit IDLE to process finally: done_result = client.idle_done() if done_result and done_result[0]: responses.extend(done_result[0]) if shutdown_event and shutdown_event.is_set(): break # Always search for new UIDs, not just when we see EXISTS. # A message can arrive between idle_done() and the next idle() # call — the server already sent EXISTS while we weren't # listening, so we'd never see it. The UID SEARCH is cheap # and eliminates this race condition entirely. new_uids = _fetch_new_uids(client, last_uid) if not new_uids: continue log.info("Fetching %d new message(s)", len(new_uids)) # Fetch full message bodies messages = client.fetch(new_uids, ["BODY.PEEK[]"]) for uid, data in messages.items(): raw = data.get(b"BODY[]") if raw: try: on_message(raw) except Exception: log.error("Error processing message UID %s", uid, exc_info=True) last_uid = max(new_uids) finally: try: client.logout() except Exception: pass def _get_highest_uid(client: IMAPClient) -> int: """Get the highest UID currently in the mailbox.""" uids = client.search(["ALL"]) if uids: return max(uids) return 0 def _fetch_new_uids(client: IMAPClient, last_uid: int) -> list[int]: """Search for messages with UID greater than last_uid.""" if last_uid > 0: uids = client.search(["UID", f"{last_uid + 1}:*"]) # IMAP UID search with * can return last_uid if no new messages return [u for u in uids if u > last_uid] return client.search(["ALL"])