Initial commit: IMAP IDLE to CalDAV daemon

Monitor an IMAP mailbox via IDLE for iMIP emails (invitations, RSVPs,
cancellations) and ICS attachments, then update a CalDAV calendar.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Faour 2026-02-12 19:40:17 -05:00
commit 37b032771f
9 changed files with 517 additions and 0 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
venv/
config.yaml
*.log

15
config.example.yaml Normal file
View File

@ -0,0 +1,15 @@
imap:
host: imap.example.com
port: 993
username: user@example.com
password: secret
folder: INBOX # folder to monitor
ssl: true
caldav:
url: https://caldav.example.com/dav/calendars/user/calendar/
username: user@example.com
password: secret
logging:
level: INFO # DEBUG, INFO, WARNING, ERROR

View File

View File

@ -0,0 +1,49 @@
"""Entry point for imap-idle-caldav daemon."""
import logging
import signal
import sys
import threading
from .caldav_client import CalDAVHandler
from .config import load_config, parse_args
from .email_processor import process_email
from .imap_client import run_idle_loop
log = logging.getLogger("imap_idle_caldav")
def main() -> None:
config_path = parse_args()
config = load_config(config_path)
logging.basicConfig(
level=getattr(logging, config.logging.level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
stream=sys.stderr,
)
caldav_handler = CalDAVHandler(config.caldav)
caldav_handler.connect()
shutdown = threading.Event()
def on_signal(signum, _frame):
log.info("Received signal %s, shutting down", signal.Signals(signum).name)
shutdown.set()
signal.signal(signal.SIGTERM, on_signal)
signal.signal(signal.SIGINT, on_signal)
def on_message(raw_bytes: bytes) -> None:
events = process_email(raw_bytes)
for event in events:
caldav_handler.handle_event(event)
log.info("Starting IMAP IDLE daemon")
run_idle_loop(config.imap, on_message, shutdown_event=shutdown)
log.info("Daemon stopped")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,134 @@
"""CalDAV operations: create, update, and cancel events."""
import logging
import caldav
import icalendar
from .config import CaldavConfig
from .email_processor import CalendarEvent
log = logging.getLogger(__name__)
class CalDAVHandler:
def __init__(self, config: CaldavConfig):
self._config = config
self._client: caldav.DAVClient | None = None
self._calendar: caldav.Calendar | None = None
def connect(self) -> None:
"""Connect to the CalDAV server and locate the calendar."""
self._client = caldav.DAVClient(
url=self._config.url,
username=self._config.username,
password=self._config.password,
)
self._calendar = caldav.Calendar(client=self._client, url=self._config.url)
log.info("Connected to CalDAV server: %s", self._config.url)
def handle_event(self, event: CalendarEvent) -> None:
"""Dispatch a calendar event based on its METHOD."""
try:
if event.method == "REQUEST" or event.method == "PUBLISH":
self._handle_request(event)
elif event.method == "REPLY":
self._handle_reply(event)
elif event.method == "CANCEL":
self._handle_cancel(event)
else:
log.warning("Unknown METHOD %s for UID %s, treating as REQUEST", event.method, event.uid)
self._handle_request(event)
except Exception:
log.error("CalDAV error processing UID %s (method=%s)", event.uid, event.method, exc_info=True)
def _find_event(self, uid: str) -> caldav.CalendarObjectResource | None:
"""Search the calendar for an event by UID."""
try:
return self._calendar.event_by_uid(uid)
except caldav.error.NotFoundError:
return None
except Exception:
log.debug("Error searching for UID %s", uid, exc_info=True)
return None
def _handle_request(self, event: CalendarEvent) -> None:
"""Handle REQUEST/PUBLISH: create or update an event."""
existing = self._find_event(event.uid)
cal = icalendar.Calendar()
cal.add("PRODID", "-//imap-idle-caldav//EN")
cal.add("VERSION", "2.0")
cal.add_component(event.vevent)
ical_data = cal.to_ical().decode("utf-8")
if existing:
existing.data = ical_data
existing.save()
log.info("Updated event UID %s", event.uid)
else:
self._calendar.save_event(ical_data)
log.info("Created event UID %s", event.uid)
def _handle_reply(self, event: CalendarEvent) -> None:
"""Handle REPLY: update attendee PARTSTAT on an existing event."""
existing = self._find_event(event.uid)
if not existing:
log.warning("REPLY for unknown UID %s, skipping", event.uid)
return
existing_cal = icalendar.Calendar.from_ical(existing.data)
reply_attendees = event.vevent.get("ATTENDEE")
if reply_attendees is None:
log.warning("REPLY with no ATTENDEE for UID %s", event.uid)
return
# Normalize to list
if not isinstance(reply_attendees, list):
reply_attendees = [reply_attendees]
# Build a map of reply attendee addresses to their PARTSTAT
reply_map: dict[str, str] = {}
for att in reply_attendees:
addr = str(att).lower()
partstat = att.params.get("PARTSTAT", "NEEDS-ACTION")
reply_map[addr] = str(partstat)
updated = False
for component in existing_cal.walk():
if component.name != "VEVENT":
continue
existing_attendees = component.get("ATTENDEE")
if existing_attendees is None:
continue
if not isinstance(existing_attendees, list):
existing_attendees = [existing_attendees]
for att in existing_attendees:
addr = str(att).lower()
if addr in reply_map:
att.params["PARTSTAT"] = reply_map[addr]
updated = True
log.info("Updated PARTSTAT for %s to %s on UID %s", addr, reply_map[addr], event.uid)
if updated:
existing.data = existing_cal.to_ical().decode("utf-8")
existing.save()
else:
log.warning("REPLY attendee not found in existing event UID %s", event.uid)
def _handle_cancel(self, event: CalendarEvent) -> None:
"""Handle CANCEL: mark event as CANCELLED."""
existing = self._find_event(event.uid)
if not existing:
log.warning("CANCEL for unknown UID %s, skipping", event.uid)
return
existing_cal = icalendar.Calendar.from_ical(existing.data)
for component in existing_cal.walk():
if component.name == "VEVENT":
component["STATUS"] = "CANCELLED"
existing.data = existing_cal.to_ical().decode("utf-8")
existing.save()
log.info("Cancelled event UID %s", event.uid)

104
imap_idle_caldav/config.py Normal file
View File

@ -0,0 +1,104 @@
"""YAML configuration loading and validation."""
import argparse
import sys
from dataclasses import dataclass, field
from pathlib import Path
import yaml
@dataclass
class ImapConfig:
host: str
port: int
username: str
password: str
folder: str = "INBOX"
ssl: bool = True
@dataclass
class CaldavConfig:
url: str
username: str
password: str
@dataclass
class LoggingConfig:
level: str = "INFO"
@dataclass
class Config:
imap: ImapConfig
caldav: CaldavConfig
logging: LoggingConfig = field(default_factory=LoggingConfig)
def load_config(path: Path) -> Config:
"""Load and validate configuration from a YAML file."""
if not path.exists():
print(f"Error: config file not found: {path}", file=sys.stderr)
sys.exit(1)
with open(path) as f:
raw = yaml.safe_load(f)
if not isinstance(raw, dict):
print("Error: config file must be a YAML mapping", file=sys.stderr)
sys.exit(1)
for section in ("imap", "caldav"):
if section not in raw:
print(f"Error: missing required config section: {section}", file=sys.stderr)
sys.exit(1)
imap_raw = raw["imap"]
for key in ("host", "username", "password"):
if key not in imap_raw:
print(f"Error: missing required imap config key: {key}", file=sys.stderr)
sys.exit(1)
caldav_raw = raw["caldav"]
for key in ("url", "username", "password"):
if key not in caldav_raw:
print(f"Error: missing required caldav config key: {key}", file=sys.stderr)
sys.exit(1)
imap_cfg = ImapConfig(
host=imap_raw["host"],
port=imap_raw.get("port", 993),
username=imap_raw["username"],
password=imap_raw["password"],
folder=imap_raw.get("folder", "INBOX"),
ssl=imap_raw.get("ssl", True),
)
caldav_cfg = CaldavConfig(
url=caldav_raw["url"],
username=caldav_raw["username"],
password=caldav_raw["password"],
)
logging_raw = raw.get("logging", {})
logging_cfg = LoggingConfig(
level=logging_raw.get("level", "INFO"),
)
return Config(imap=imap_cfg, caldav=caldav_cfg, logging=logging_cfg)
def parse_args() -> Path:
"""Parse CLI arguments and return the config file path."""
parser = argparse.ArgumentParser(
description="IMAP IDLE to CalDAV daemon",
)
parser.add_argument(
"-c", "--config",
default="config.yaml",
help="Path to YAML config file (default: config.yaml)",
)
args = parser.parse_args()
return Path(args.config)

View File

@ -0,0 +1,70 @@
"""Extract ICS calendar data from emails."""
import email
import logging
from dataclasses import dataclass
from email.message import Message
import icalendar
log = logging.getLogger(__name__)
@dataclass
class CalendarEvent:
method: str # REQUEST, REPLY, CANCEL, PUBLISH
vevent: icalendar.cal.Event
uid: str
def process_email(raw_bytes: bytes) -> list[CalendarEvent]:
"""Parse a raw email and extract calendar events from ICS content.
Looks for:
- text/calendar MIME parts (iMIP inline)
- application/ics attachments
- Attachments with .ics extension
"""
msg = email.message_from_bytes(raw_bytes)
subject = msg.get("Subject", "(no subject)")
log.debug("Processing email: %s", subject)
events: list[CalendarEvent] = []
for part in msg.walk():
content_type = part.get_content_type()
filename = part.get_filename() or ""
is_calendar = content_type in ("text/calendar", "application/ics")
is_ics_attachment = filename.lower().endswith(".ics")
if not (is_calendar or is_ics_attachment):
continue
payload = part.get_payload(decode=True)
if not payload:
continue
try:
cal = icalendar.Calendar.from_ical(payload)
except Exception:
log.warning("Failed to parse ICS from email: %s", subject, exc_info=True)
continue
method = str(cal.get("METHOD", "PUBLISH")).upper()
for component in cal.walk():
if component.name != "VEVENT":
continue
uid = str(component.get("UID", ""))
if not uid:
log.warning("VEVENT without UID in email: %s", subject)
continue
events.append(CalendarEvent(method=method, vevent=component, uid=uid))
if events:
log.info("Found %d calendar event(s) in email: %s", len(events), subject)
else:
log.debug("No calendar content in email: %s", subject)
return events

View File

@ -0,0 +1,118 @@
"""IMAP connection, IDLE loop, and email fetching."""
import logging
import time
from collections.abc import Callable
from imapclient import IMAPClient
from .config import ImapConfig
log = logging.getLogger(__name__)
IDLE_TIMEOUT = 300 # 5 minutes, per RFC 2177
MAX_BACKOFF = 300 # 5 minutes max backoff
def run_idle_loop(
config: ImapConfig,
on_message: Callable[[bytes], None],
shutdown_event=None,
) -> None:
"""Main IMAP IDLE loop with automatic reconnection.
Calls on_message(raw_email_bytes) for each new message received.
Runs until shutdown_event is set (if provided) or forever.
"""
backoff = 1
while not (shutdown_event and shutdown_event.is_set()):
try:
_idle_session(config, on_message, shutdown_event)
backoff = 1 # reset on clean disconnect
except Exception:
log.error("IMAP connection error, reconnecting in %ds", backoff, exc_info=True)
if shutdown_event:
shutdown_event.wait(backoff)
else:
time.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
def _idle_session(
config: ImapConfig,
on_message: Callable[[bytes], None],
shutdown_event=None,
) -> None:
"""Run a single IMAP IDLE session until disconnection."""
client = IMAPClient(config.host, port=config.port, ssl=config.ssl)
try:
client.login(config.username, config.password)
log.info("Logged in to IMAP %s as %s", config.host, config.username)
client.select_folder(config.folder, readonly=True)
log.info("Monitoring folder: %s", config.folder)
# Get the current highest UID so we only process new messages
last_uid = _get_highest_uid(client)
log.debug("Starting UID: %s", last_uid)
while not (shutdown_event and shutdown_event.is_set()):
client.idle()
try:
responses = client.idle_check(timeout=IDLE_TIMEOUT)
finally:
client.idle_done()
if shutdown_event and shutdown_event.is_set():
break
# Check if any EXISTS response indicates new mail
has_new = any(
isinstance(resp, tuple) and len(resp) >= 2 and resp[1] == b"EXISTS"
for resp in responses
)
if not has_new:
continue
# Fetch new messages since last_uid
new_uids = _fetch_new_uids(client, last_uid)
if not new_uids:
continue
log.info("Fetching %d new message(s)", len(new_uids))
# Fetch full message bodies
messages = client.fetch(new_uids, ["BODY.PEEK[]"])
for uid, data in messages.items():
raw = data.get(b"BODY[]")
if raw:
try:
on_message(raw)
except Exception:
log.error("Error processing message UID %s", uid, exc_info=True)
last_uid = max(new_uids)
finally:
try:
client.logout()
except Exception:
pass
def _get_highest_uid(client: IMAPClient) -> int:
"""Get the highest UID currently in the mailbox."""
uids = client.search(["ALL"])
if uids:
return max(uids)
return 0
def _fetch_new_uids(client: IMAPClient, last_uid: int) -> list[int]:
"""Search for messages with UID greater than last_uid."""
if last_uid > 0:
uids = client.search(["UID", f"{last_uid + 1}:*"])
# IMAP UID search with * can return last_uid if no new messages
return [u for u in uids if u > last_uid]
return client.search(["ALL"])

18
pyproject.toml Normal file
View File

@ -0,0 +1,18 @@
[build-system]
requires = ["setuptools>=68.0", "setuptools-scm>=8.0"]
build-backend = "setuptools.build_meta"
[project]
name = "imap-idle-caldav"
version = "0.1.0"
description = "Daemon that monitors IMAP via IDLE for iMIP emails and updates a CalDAV calendar"
requires-python = ">=3.10"
dependencies = [
"imapclient>=3.0",
"caldav>=1.3",
"icalendar>=5.0",
"pyyaml>=6.0",
]
[project.scripts]
imap-idle-caldav = "imap_idle_caldav.__main__:main"