2026-02-12 20:30:10 -05:00

128 lines
4.5 KiB
Python

"""IMAP connection, IDLE loop, and email fetching."""
import logging
import time
from collections.abc import Callable
from imapclient import IMAPClient
from .config import ImapConfig
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Seconds to remain in a single IDLE command before ending it and issuing a
# fresh one. RFC 2177 advises re-issuing IDLE at least every 29 minutes, so
# 5 minutes stays well inside that limit.
IDLE_TIMEOUT = 300 # 5 minutes total IDLE before re-idle (RFC 2177)
# Seconds passed as the timeout of each idle_check() call; the shutdown event
# is re-checked between calls, so this bounds how long shutdown can be delayed.
IDLE_POLL = 30 # check shutdown every 30 seconds within an IDLE
# Upper bound, in seconds, on the reconnect delay used by run_idle_loop.
MAX_BACKOFF = 300 # 5 minutes max backoff
def run_idle_loop(
    config: ImapConfig,
    on_message: Callable[[bytes], None],
    shutdown_event=None,
) -> None:
    """Run the IMAP IDLE loop, reconnecting with exponential backoff.

    Each iteration runs one ``_idle_session``. When a session raises, the
    error is logged and the loop waits ``backoff`` seconds before trying
    again; the delay doubles after every failure, capped at ``MAX_BACKOFF``.

    Args:
        config: Connection settings, forwarded unchanged to ``_idle_session``.
        on_message: Callback invoked with the raw bytes of each new message.
        shutdown_event: Optional event-like object exposing ``is_set()`` and
            ``wait(timeout)`` (for example ``threading.Event``). When given,
            the loop exits once it is set and backoff waits are interruptible;
            when omitted, the loop runs forever.
    """

    def stopping() -> bool:
        return bool(shutdown_event and shutdown_event.is_set())

    backoff = 1
    while not stopping():
        started = time.monotonic()
        try:
            _idle_session(config, on_message, shutdown_event)
        except Exception:
            # _idle_session only returns normally on shutdown, so every real
            # disconnect lands here. Without this reset the delay would only
            # ever grow: a connection that was healthy for hours would still
            # be retried after the maximum wait. A session that outlived
            # MAX_BACKOFF is treated as healthy and starts the ladder over.
            if time.monotonic() - started >= MAX_BACKOFF:
                backoff = 1
            log.exception("IMAP connection error, reconnecting in %ds", backoff)
            if shutdown_event:
                # Event.wait returns early if shutdown is requested mid-backoff.
                shutdown_event.wait(backoff)
            else:
                time.sleep(backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)
        else:
            backoff = 1  # reset on clean return
def _idle_session(
    config: ImapConfig,
    on_message: Callable[[bytes], None],
    shutdown_event=None,
) -> None:
    """Run a single IMAP IDLE session until shutdown or a connection error.

    Connects, logs in, selects ``config.folder`` read-only, records the
    highest existing UID, then alternates between IDLE and fetching any
    messages whose UID is above the last one seen.

    Args:
        config: Provides ``host``, ``port``, ``ssl``, ``username``,
            ``password`` and ``folder``.
        on_message: Called with the raw ``BODY[]`` bytes of each new message.
            Exceptions it raises are logged and do not end the session.
        shutdown_event: Optional event-like object with ``is_set()``; the
            session returns normally once it is set.

    Raises:
        Exception: Whatever the IMAP client raises on connection, login or
            protocol failure; the caller is expected to reconnect.
    """

    def stopping() -> bool:
        return bool(shutdown_event and shutdown_event.is_set())

    client = IMAPClient(config.host, port=config.port, ssl=config.ssl)
    try:
        client.login(config.username, config.password)
        log.info("Logged in to IMAP %s as %s", config.host, config.username)
        client.select_folder(config.folder, readonly=True)
        log.info("Monitoring folder: %s", config.folder)

        # Get the current highest UID so we only process new messages
        last_uid = _get_highest_uid(client)
        log.debug("Starting UID: %s", last_uid)

        while not stopping():
            client.idle()
            try:
                # Poll in short intervals so we can check shutdown_event
                # between iterations, rather than blocking for the full
                # IDLE_TIMEOUT (which causes a hang on Ctrl-C).
                deadline = time.monotonic() + IDLE_TIMEOUT
                while time.monotonic() < deadline and not stopping():
                    if client.idle_check(timeout=IDLE_POLL):
                        break  # got activity, exit IDLE to process
            finally:
                # The IDLE responses are intentionally not collected: new mail
                # is detected by the UID search below, not by parsing EXISTS.
                # (idle_done() returns (command_text, idle_responses); the
                # previous code indexed element 0, the server text, by mistake.)
                client.idle_done()

            if stopping():
                break

            # Always search for new UIDs, not just when we see EXISTS.
            # A message can arrive between idle_done() and the next idle()
            # call — the server already sent EXISTS while we weren't
            # listening, so we'd never see it. The UID SEARCH is cheap
            # and eliminates this race condition entirely.
            new_uids = _fetch_new_uids(client, last_uid)
            if not new_uids:
                continue

            log.info("Fetching %d new message(s)", len(new_uids))
            # Fetch full message bodies; PEEK avoids setting the \Seen flag.
            messages = client.fetch(new_uids, ["BODY.PEEK[]"])
            # Deliver in ascending UID order rather than in whatever order the
            # FETCH responses happened to arrive.
            for uid in sorted(messages):
                raw = messages[uid].get(b"BODY[]")
                if not raw:
                    continue
                try:
                    on_message(raw)
                except Exception:
                    log.error("Error processing message UID %s", uid, exc_info=True)
            # Advance even if a callback failed, so a bad message is not
            # re-delivered on every wake-up.
            last_uid = max(new_uids)
    finally:
        try:
            client.logout()
        except Exception:
            # Best effort: the connection may already be gone at this point.
            log.debug("IMAP logout failed", exc_info=True)
def _get_highest_uid(client: IMAPClient) -> int:
    """Return the largest id reported by an ``ALL`` search, or 0 if none exist.

    The ids are whatever ``client.search`` yields for the currently selected
    folder (UIDs under IMAPClient's default ``use_uid`` setting).
    """
    all_uids = client.search(["ALL"])
    return max(all_uids) if all_uids else 0
def _fetch_new_uids(client: IMAPClient, last_uid: int) -> list[int]:
    """Return the ids in the selected folder that are greater than ``last_uid``.

    With no baseline (``last_uid`` of 0 or less) every message counts as new
    and the result of an ``ALL`` search is returned as-is.
    """
    if last_uid <= 0:
        return client.search(["ALL"])

    # A "N:*" range always matches the highest existing message, even when its
    # UID is below N, so the server can hand back last_uid itself when nothing
    # new has arrived. Filter those out.
    candidates = client.search(["UID", f"{last_uid + 1}:*"])
    return [uid for uid in candidates if uid > last_uid]