IMAP_CalDAV_Yoga_Identifier/yoga_email_finder.py
2026-02-18 21:41:08 -05:00

460 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Yoga Email Finder
Monitors an IMAP inbox via IDLE for District Flow Yoga reservation emails
and creates CalDAV calendar events automatically.
Usage:
python yoga_email_finder.py [config.yaml]
"""
import re
import sys
import uuid
import logging
import time
import email
import email.header
import unicodedata
from datetime import datetime, timedelta
from email.utils import parseaddr
from pathlib import Path
from zoneinfo import ZoneInfo
import yaml
import caldav
import icalendar
from bs4 import BeautifulSoup
from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# How often to refresh the IDLE command (servers drop it after ~30 min)
IDLE_REFRESH_SECONDS = 29 * 60
# ---------------------------------------------------------------------------
# Email parsing helpers
# ---------------------------------------------------------------------------
def decode_header(value: str) -> str:
"""Decode an encoded email header value (e.g. =?UTF-8?B?...?=)."""
parts = email.header.decode_header(value)
decoded = []
for part, charset in parts:
if isinstance(part, bytes):
decoded.append(part.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(part)
return "".join(decoded)
def normalize_whitespace(text: str) -> str:
"""
Replace all Unicode whitespace variants (narrow no-break space U+202F, etc.)
with a plain ASCII space, then collapse runs.
"""
result = []
for ch in text:
cat = unicodedata.category(ch)
if cat.startswith("Z") or cat in ("Cc", "Cf") or ch in "\t\n\r\f\v":
result.append(" ")
else:
result.append(ch)
return re.sub(r" +", " ", "".join(result))
def extract_text_from_html(html: str) -> str:
"""Convert HTML to plain text using BeautifulSoup."""
soup = BeautifulSoup(html, "html.parser")
return soup.get_text(separator=" ", strip=True)
def get_email_body(msg: email.message.Message) -> str:
"""Extract the best available plain-text body from an email message."""
plain = ""
html = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain" and not plain:
payload = part.get_payload(decode=True)
if payload:
plain = payload.decode(
part.get_content_charset() or "utf-8", errors="replace"
)
elif ct == "text/html" and not html:
payload = part.get_payload(decode=True)
if payload:
html = payload.decode(
part.get_content_charset() or "utf-8", errors="replace"
)
else:
payload = msg.get_payload(decode=True)
if payload:
content = payload.decode(
msg.get_content_charset() or "utf-8", errors="replace"
)
if msg.get_content_type() == "text/html":
html = content
else:
plain = content
if plain.strip():
return normalize_whitespace(plain)
if html.strip():
return normalize_whitespace(extract_text_from_html(html))
return ""
def parse_reservation_email(msg: email.message.Message) -> dict | None:
"""
Parse a District Flow Yoga reservation confirmation email.
Expected subject: "You reserved {class} at {time} on {date}!"
Expected body: "Your spot is reserved for {class} with {instructor}!
We will see you at {time} on {day}, {date}."
Returns a dict with event details, or None if not a reservation email.
"""
subject = decode_header(msg.get("Subject", ""))
# Quick subject check before doing expensive body parsing
if not re.search(r"you reserved", subject, re.IGNORECASE):
return None
body = get_email_body(msg)
if not body:
logger.warning("Email body is empty")
return None
logger.debug("Body snippet: %s", body[:300])
# Primary pattern from the email body
pattern = (
r"Your spot is reserved for (.+?) with (.+?)!"
r".{0,80}?"
r"We will see you at (.+?) on (?:\w+,\s+)?(.+?)\."
)
match = re.search(pattern, body, re.DOTALL | re.IGNORECASE)
if not match:
logger.warning("Reservation body pattern not found. Subject: %s", subject)
return None
class_name = match.group(1).strip()
instructor = match.group(2).strip()
time_str = match.group(3).strip()
date_str = match.group(4).strip()
# Parse datetime — try several formats to be robust
dt_combined = f"{date_str} {time_str}".strip()
start_dt = None
for fmt in (
"%B %d, %Y %I:%M %p", # March 2, 2026 8:00 PM
"%B %d, %Y %I:%M%p", # March 2, 2026 8:00PM
"%B %d, %Y %H:%M", # March 2, 2026 20:00
"%m/%d/%Y %I:%M %p", # 3/2/2026 8:00 PM
"%m/%d/%Y %I:%M%p", # 3/2/2026 8:00PM
):
try:
start_dt = datetime.strptime(dt_combined, fmt)
break
except ValueError:
continue
if start_dt is None:
logger.warning("Could not parse datetime from: '%s'", dt_combined)
return None
return {
"class_name": class_name,
"instructor": instructor,
"start_dt": start_dt,
"summary": f"Yoga: {class_name} with {instructor}",
"description": (
f"District Flow Yoga\n"
f"Class: {class_name}\n"
f"Instructor: {instructor}\n"
f"Time: {start_dt.strftime('%I:%M %p on %A, %B %d, %Y')}"
),
}
# ---------------------------------------------------------------------------
# CalDAV helpers
# ---------------------------------------------------------------------------
class CalendarManager:
"""Manages creating events on a CalDAV calendar."""
def __init__(self, config: dict):
caldav_cfg = config["caldav"]
self.client = caldav.DAVClient(
url=caldav_cfg["url"],
username=caldav_cfg["username"],
password=caldav_cfg["password"],
)
self.calendar = self._get_calendar(caldav_cfg.get("calendar_name"))
self.class_duration = timedelta(
minutes=config.get("class_duration_minutes", 60)
)
tz_name = config.get("timezone")
self.tzinfo = ZoneInfo(tz_name) if tz_name else None
def _get_calendar(self, name: str | None):
principal = self.client.principal()
calendars = principal.calendars()
if not calendars:
raise RuntimeError("No calendars found on CalDAV server")
if name:
for cal in calendars:
if cal.name and cal.name.lower() == name.lower():
logger.info("Using calendar: %s", cal.name)
return cal
logger.warning(
"Calendar '%s' not found, using: %s", name, calendars[0].name
)
else:
logger.info("Using calendar: %s", calendars[0].name)
return calendars[0]
def event_exists(self, summary: str, start_dt: datetime) -> bool:
"""Check whether an event with the same title already exists on that day."""
try:
day_start = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
day_end = day_start + timedelta(days=1)
events = self.calendar.date_search(start=day_start, end=day_end)
for ev in events:
try:
cal_obj = icalendar.Calendar.from_ical(ev.data)
for component in cal_obj.walk():
if component.name == "VEVENT":
ev_summary = str(component.get("SUMMARY", ""))
if ev_summary == summary:
return True
except Exception:
pass
except Exception as e:
logger.warning("Could not check for duplicate events: %s", e)
return False
def add_event(self, event_info: dict) -> bool:
"""
Add a yoga class event to the calendar.
Returns True if added, False if a duplicate was detected.
"""
start_dt = event_info["start_dt"]
# Apply timezone if configured
if self.tzinfo:
start_dt = start_dt.replace(tzinfo=self.tzinfo)
if self.event_exists(event_info["summary"], start_dt):
logger.info("Event already exists, skipping: %s", event_info["summary"])
return False
end_dt = start_dt + self.class_duration
cal = icalendar.Calendar()
cal.add("prodid", "-//Yoga Email Finder//EN")
cal.add("version", "2.0")
ev = icalendar.Event()
ev.add("uid", str(uuid.uuid4()))
ev.add("summary", event_info["summary"])
ev.add("description", event_info["description"])
ev.add("dtstart", start_dt)
ev.add("dtend", end_dt)
ev.add("dtstamp", datetime.now())
cal.add_component(ev)
self.calendar.add_event(cal.to_ical().decode("utf-8"))
logger.info("Added event: %s at %s", event_info["summary"], start_dt)
return True
# ---------------------------------------------------------------------------
# Duplicate tracking across restarts
# ---------------------------------------------------------------------------
class ProcessedTracker:
"""
Tracks Message-IDs of processed emails in a flat file so we don't
re-add calendar events after a restart.
"""
def __init__(self, path: str = ".processed_ids"):
self.path = Path(path)
self.ids: set[str] = set()
self._load()
def _load(self):
if self.path.exists():
self.ids = set(self.path.read_text().splitlines())
logger.debug("Loaded %d processed message IDs", len(self.ids))
def contains(self, msg_id: str) -> bool:
return msg_id in self.ids
def add(self, msg_id: str):
self.ids.add(msg_id)
with self.path.open("a") as f:
f.write(msg_id + "\n")
# ---------------------------------------------------------------------------
# IMAP / message processing
# ---------------------------------------------------------------------------
def is_from_sender(msg: email.message.Message, sender_filter: dict) -> bool:
"""Return True if the message is from the configured sender."""
from_raw = msg.get("From", "")
_, from_addr = parseaddr(from_raw)
filter_email = sender_filter.get("email", "").lower()
filter_name = sender_filter.get("name", "").lower()
if filter_email and from_addr.lower() == filter_email:
return True
if filter_name and filter_name in from_raw.lower():
return True
return False
def process_messages(
client: IMAPClient,
uids: list,
sender_filter: dict,
calendar: CalendarManager,
tracker: ProcessedTracker,
):
"""Fetch and process a list of message UIDs."""
if not uids:
return
fetch_data = client.fetch(uids, ["RFC822"])
for uid, data in fetch_data.items():
raw = data.get(b"RFC822")
if not raw:
continue
msg = email.message_from_bytes(raw)
msg_id = msg.get("Message-ID", "").strip()
if msg_id and tracker.contains(msg_id):
logger.debug("Already processed %s, skipping", msg_id)
continue
if not is_from_sender(msg, sender_filter):
logger.debug("Skipping email from: %s", msg.get("From", ""))
if msg_id:
tracker.add(msg_id)
continue
subject = decode_header(msg.get("Subject", ""))
logger.info("Processing email: %s", subject)
event_info = parse_reservation_email(msg)
if event_info:
calendar.add_event(event_info)
else:
logger.info("Not a reservation email, skipping")
if msg_id:
tracker.add(msg_id)
def has_new_messages(responses) -> bool:
"""Return True if any IDLE server response indicates new or changed messages."""
for resp in responses:
if isinstance(resp, tuple) and len(resp) == 2:
if resp[1] in (b"EXISTS", b"RECENT"):
return True
return False
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def run(config_path: str):
with open(config_path) as f:
config = yaml.safe_load(f)
imap_cfg = config["imap"]
sender_filter = config.get("sender_filter", {})
logger.info("Connecting to CalDAV...")
calendar = CalendarManager(config)
tracker = ProcessedTracker(config.get("processed_ids_file", ".processed_ids"))
filter_email = sender_filter.get("email", "")
while True:
try:
logger.info("Connecting to IMAP %s...", imap_cfg["host"])
with IMAPClient(
imap_cfg["host"],
port=imap_cfg.get("port", 993),
ssl=imap_cfg.get("ssl", True),
) as client:
client.login(imap_cfg["username"], imap_cfg["password"])
folder = imap_cfg.get("folder", "INBOX")
client.select_folder(folder)
logger.info("Selected folder: %s", folder)
# --- Initial scan: process all historical emails from sender ---
search_criteria = (
["FROM", filter_email] if filter_email else ["ALL"]
)
all_uids = client.search(search_criteria)
logger.info(
"Initial scan: %d emails to check from sender", len(all_uids)
)
process_messages(client, all_uids, sender_filter, calendar, tracker)
# --- IDLE loop: wait for new emails in real time ---
logger.info("Entering IDLE mode, waiting for new emails...")
client.idle()
while True:
responses = client.idle_check(timeout=IDLE_REFRESH_SECONDS)
client.idle_done()
if responses and has_new_messages(responses):
logger.info("New messages detected: %s", responses)
# Fetch any unseen emails from the sender
unseen_criteria = (
["UNSEEN", "FROM", filter_email]
if filter_email
else ["UNSEEN"]
)
new_uids = client.search(unseen_criteria)
process_messages(
client, new_uids, sender_filter, calendar, tracker
)
else:
logger.debug("IDLE keepalive/timeout, refreshing connection")
# Re-enter IDLE for the next window
client.idle()
except (IMAPClientError, ConnectionError, TimeoutError, OSError) as e:
logger.error("Connection error: %s — reconnecting in 30s...", e)
time.sleep(30)
except Exception as e:
logger.exception("Unexpected error: %s — reconnecting in 30s...", e)
time.sleep(30)
if __name__ == "__main__":
config_file = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
run(config_file)