460 lines
15 KiB
Python
460 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Yoga Email Finder
|
|
|
|
Monitors an IMAP inbox via IDLE for District Flow Yoga reservation emails
|
|
and creates CalDAV calendar events automatically.
|
|
|
|
Usage:
|
|
python yoga_email_finder.py [config.yaml]
|
|
"""
|
|
|
|
import email
import email.errors
import email.header
import email.message
import logging
import re
import sys
import time
import unicodedata
import uuid
from datetime import datetime, timedelta, timezone
from email.utils import parseaddr
from pathlib import Path
from zoneinfo import ZoneInfo

import caldav
import icalendar
import yaml
from bs4 import BeautifulSoup
from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError
|
|
|
|
# Root logging setup: INFO level, timestamped "date time LEVEL message" lines.
_LOG_OPTIONS = {
    "level": logging.INFO,
    "format": "%(asctime)s %(levelname)s %(message)s",
    "datefmt": "%Y-%m-%d %H:%M:%S",
}
logging.basicConfig(**_LOG_OPTIONS)
logger = logging.getLogger(__name__)

# Seconds to wait inside one IDLE command before re-issuing it. Kept just
# under 30 minutes because servers drop an IDLE after roughly that long.
IDLE_REFRESH_SECONDS = 60 * 29
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Email parsing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def decode_header(value: str) -> str:
    """Decode an encoded email header value (e.g. =?UTF-8?B?...?=).

    Args:
        value: Raw header value, typically the result of ``Message.get``.

    Returns:
        The header as text. Byte chunks are decoded with the charset they
        declare (UTF-8 when none is declared); undecodable bytes are
        replaced instead of raising. An empty or ``None`` value yields "".
    """
    if not value:
        return ""

    decoded = []
    for part, charset in email.header.decode_header(value):
        if not isinstance(part, bytes):
            decoded.append(part)
            continue
        try:
            decoded.append(part.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # The header names a charset Python has no codec for; a
            # best-effort UTF-8 decode beats aborting the whole message.
            decoded.append(part.decode("utf-8", errors="replace"))
    return "".join(decoded)
|
|
|
|
|
|
def normalize_whitespace(text: str) -> str:
    """
    Map every Unicode separator, control and format character (for example
    the narrow no-break space U+202F) to an ASCII space, then squeeze each
    run of spaces down to a single one. Leading/trailing spaces are kept.
    """

    def _to_plain(ch: str) -> str:
        category = unicodedata.category(ch)
        blank = (
            category.startswith("Z")
            or category in ("Cc", "Cf")
            or ch in "\t\n\r\f\v"
        )
        return " " if blank else ch

    flattened = "".join(map(_to_plain, text))
    return re.sub(r" +", " ", flattened)
|
|
|
|
|
|
def extract_text_from_html(html: str) -> str:
    """Return the text content of *html* with all markup removed.

    The document is parsed by BeautifulSoup with the "html.parser" backend;
    text fragments are stripped and joined with single spaces.
    """
    document = BeautifulSoup(html, "html.parser")
    text = document.get_text(strip=True, separator=" ")
    return text
|
|
|
|
|
|
def _decode_part_text(part: email.message.Message) -> str:
    """Return the transfer-decoded payload of *part* as text ("" if empty)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The part declares a charset Python has no codec for.
        return payload.decode("utf-8", errors="replace")


def get_email_body(msg: email.message.Message) -> str:
    """Extract the best available plain-text body from an email message.

    For multipart messages the first non-empty ``text/plain`` part and the
    first non-empty ``text/html`` part are collected; parts whose
    Content-Disposition is ``attachment`` are skipped so an attached text
    file is not mistaken for the body. A single-part message is treated as
    HTML when its content type is ``text/html`` and as plain text otherwise.

    Args:
        msg: Parsed email message.

    Returns:
        The whitespace-normalized plain-text body if one exists, otherwise
        the HTML body converted to text, otherwise "".
    """
    plain = ""
    html = ""

    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_disposition() == "attachment":
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain" and not plain:
                plain = _decode_part_text(part)
            elif content_type == "text/html" and not html:
                html = _decode_part_text(part)
    else:
        content = _decode_part_text(msg)
        if msg.get_content_type() == "text/html":
            html = content
        else:
            plain = content

    # Prefer the plain-text alternative; fall back to stripped HTML.
    if plain.strip():
        return normalize_whitespace(plain)
    if html.strip():
        return normalize_whitespace(extract_text_from_html(html))
    return ""
|
|
|
|
|
|
def parse_reservation_email(msg: email.message.Message) -> dict | None:
    """
    Turn a District Flow Yoga reservation confirmation into event details.

    Expected subject: "You reserved {class} at {time} on {date}!"
    Expected body:    "Your spot is reserved for {class} with {instructor}!
                       We will see you at {time} on {day}, {date}."

    Returns a dict with the keys "class_name", "instructor", "start_dt"
    (a naive datetime), "summary" and "description", or None when the
    subject does not match, the body is empty, the body pattern is not
    found, or the date/time cannot be parsed.
    """
    subject = decode_header(msg.get("Subject", ""))

    # Cheap subject filter first; body extraction is the expensive step.
    if re.search(r"you reserved", subject, re.IGNORECASE) is None:
        return None

    body = get_email_body(msg)
    if not body:
        logger.warning("Email body is empty")
        return None

    logger.debug("Body snippet: %s", body[:300])

    # Confirmation sentence pair, allowing up to 80 characters between them.
    body_regex = re.compile(
        r"Your spot is reserved for (.+?) with (.+?)!"
        r".{0,80}?"
        r"We will see you at (.+?) on (?:\w+,\s+)?(.+?)\.",
        re.DOTALL | re.IGNORECASE,
    )
    found = body_regex.search(body)
    if found is None:
        logger.warning("Reservation body pattern not found. Subject: %s", subject)
        return None

    class_name, instructor, time_str, date_str = (
        piece.strip() for piece in found.groups()
    )

    # Try each known date/time layout in turn; the first that parses wins.
    dt_combined = f"{date_str} {time_str}".strip()
    known_layouts = (
        "%B %d, %Y %I:%M %p",  # March 2, 2026 8:00 PM
        "%B %d, %Y %I:%M%p",   # March 2, 2026 8:00PM
        "%B %d, %Y %H:%M",     # March 2, 2026 20:00
        "%m/%d/%Y %I:%M %p",   # 3/2/2026 8:00 PM
        "%m/%d/%Y %I:%M%p",    # 3/2/2026 8:00PM
    )
    for layout in known_layouts:
        try:
            start_dt = datetime.strptime(dt_combined, layout)
        except ValueError:
            continue
        break
    else:
        logger.warning("Could not parse datetime from: '%s'", dt_combined)
        return None

    summary = f"Yoga: {class_name} with {instructor}"
    description = (
        f"District Flow Yoga\n"
        f"Class: {class_name}\n"
        f"Instructor: {instructor}\n"
        f"Time: {start_dt.strftime('%I:%M %p on %A, %B %d, %Y')}"
    )
    return {
        "class_name": class_name,
        "instructor": instructor,
        "start_dt": start_dt,
        "summary": summary,
        "description": description,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CalDAV helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CalendarManager:
|
|
"""Manages creating events on a CalDAV calendar."""
|
|
|
|
def __init__(self, config: dict):
|
|
caldav_cfg = config["caldav"]
|
|
self.client = caldav.DAVClient(
|
|
url=caldav_cfg["url"],
|
|
username=caldav_cfg["username"],
|
|
password=caldav_cfg["password"],
|
|
)
|
|
self.calendar = self._get_calendar(caldav_cfg.get("calendar_name"))
|
|
self.class_duration = timedelta(
|
|
minutes=config.get("class_duration_minutes", 60)
|
|
)
|
|
tz_name = config.get("timezone")
|
|
self.tzinfo = ZoneInfo(tz_name) if tz_name else None
|
|
|
|
def _get_calendar(self, name: str | None):
|
|
principal = self.client.principal()
|
|
calendars = principal.calendars()
|
|
if not calendars:
|
|
raise RuntimeError("No calendars found on CalDAV server")
|
|
if name:
|
|
for cal in calendars:
|
|
if cal.name and cal.name.lower() == name.lower():
|
|
logger.info("Using calendar: %s", cal.name)
|
|
return cal
|
|
logger.warning(
|
|
"Calendar '%s' not found, using: %s", name, calendars[0].name
|
|
)
|
|
else:
|
|
logger.info("Using calendar: %s", calendars[0].name)
|
|
return calendars[0]
|
|
|
|
def event_exists(self, summary: str, start_dt: datetime) -> bool:
|
|
"""Check whether an event with the same title already exists on that day."""
|
|
try:
|
|
day_start = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
day_end = day_start + timedelta(days=1)
|
|
events = self.calendar.date_search(start=day_start, end=day_end)
|
|
for ev in events:
|
|
try:
|
|
cal_obj = icalendar.Calendar.from_ical(ev.data)
|
|
for component in cal_obj.walk():
|
|
if component.name == "VEVENT":
|
|
ev_summary = str(component.get("SUMMARY", ""))
|
|
if ev_summary == summary:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logger.warning("Could not check for duplicate events: %s", e)
|
|
return False
|
|
|
|
def add_event(self, event_info: dict) -> bool:
|
|
"""
|
|
Add a yoga class event to the calendar.
|
|
Returns True if added, False if a duplicate was detected.
|
|
"""
|
|
start_dt = event_info["start_dt"]
|
|
|
|
# Apply timezone if configured
|
|
if self.tzinfo:
|
|
start_dt = start_dt.replace(tzinfo=self.tzinfo)
|
|
|
|
if self.event_exists(event_info["summary"], start_dt):
|
|
logger.info("Event already exists, skipping: %s", event_info["summary"])
|
|
return False
|
|
|
|
end_dt = start_dt + self.class_duration
|
|
|
|
cal = icalendar.Calendar()
|
|
cal.add("prodid", "-//Yoga Email Finder//EN")
|
|
cal.add("version", "2.0")
|
|
|
|
ev = icalendar.Event()
|
|
ev.add("uid", str(uuid.uuid4()))
|
|
ev.add("summary", event_info["summary"])
|
|
ev.add("description", event_info["description"])
|
|
ev.add("dtstart", start_dt)
|
|
ev.add("dtend", end_dt)
|
|
ev.add("dtstamp", datetime.now())
|
|
|
|
cal.add_component(ev)
|
|
self.calendar.add_event(cal.to_ical().decode("utf-8"))
|
|
logger.info("Added event: %s at %s", event_info["summary"], start_dt)
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Duplicate tracking across restarts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ProcessedTracker:
    """
    Tracks Message-IDs of processed emails in a flat file so we don't
    re-add calendar events after a restart.

    The file holds one Message-ID per line and is always read and written
    as UTF-8.
    """

    def __init__(self, path: str = ".processed_ids"):
        """Load any previously recorded IDs from *path* (it need not exist)."""
        self.path = Path(path)
        self.ids: set[str] = set()
        self._load()

    def _load(self):
        """Populate ``self.ids`` from the file, ignoring blank lines."""
        try:
            text = self.path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return
        self.ids = {line.strip() for line in text.splitlines() if line.strip()}
        logger.debug("Loaded %d processed message IDs", len(self.ids))

    def contains(self, msg_id: str) -> bool:
        """Return True if *msg_id* has already been recorded."""
        return msg_id in self.ids

    def add(self, msg_id: str):
        """Record *msg_id* in memory and append it to the file.

        An ID that is already known is not written again, so the file does
        not accumulate duplicate lines.
        """
        if msg_id in self.ids:
            return
        self.ids.add(msg_id)
        with self.path.open("a", encoding="utf-8") as f:
            f.write(msg_id + "\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IMAP / message processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_from_sender(msg: email.message.Message, sender_filter: dict) -> bool:
    """Return True if the message is from the configured sender.

    Args:
        msg: Parsed email message; only its ``From`` header is read.
        sender_filter: Optional "email" (compared case-insensitively with
            the From address) and "name" (case-insensitive substring of the
            From header). Missing or ``None`` values disable that check.

    Returns:
        True when either configured check matches. When neither "email"
        nor "name" is set, nothing matches and False is returned.
    """
    from_raw = str(msg.get("From", "") or "")
    _, from_addr = parseaddr(from_raw)
    # `or ""` tolerates keys that are present but null in the YAML config.
    filter_email = (sender_filter.get("email") or "").lower()
    filter_name = (sender_filter.get("name") or "").lower()

    if filter_email and from_addr.lower() == filter_email:
        return True
    if not filter_name:
        return False
    if filter_name in from_raw.lower():
        return True

    # The display name may be RFC 2047-encoded (=?UTF-8?Q?...?=); compare
    # against the decoded form as well.
    try:
        decoded = str(email.header.make_header(email.header.decode_header(from_raw)))
    except (LookupError, UnicodeError, email.errors.HeaderParseError):
        return False
    return filter_name in decoded.lower()
|
|
|
|
|
|
def _process_raw_message(
    raw: bytes,
    sender_filter: dict,
    calendar: CalendarManager,
    tracker: ProcessedTracker,
) -> None:
    """Parse one raw RFC 822 message and add a calendar event if it qualifies."""
    msg = email.message_from_bytes(raw)
    msg_id = str(msg.get("Message-ID", "") or "").strip()

    if msg_id and tracker.contains(msg_id):
        logger.debug("Already processed %s, skipping", msg_id)
        return

    if not is_from_sender(msg, sender_filter):
        logger.debug("Skipping email from: %s", msg.get("From", ""))
        # Remember it so the same message is not examined again.
        if msg_id:
            tracker.add(msg_id)
        return

    subject = decode_header(msg.get("Subject", ""))
    logger.info("Processing email: %s", subject)

    event_info = parse_reservation_email(msg)
    if event_info:
        calendar.add_event(event_info)
    else:
        logger.info("Not a reservation email, skipping")

    # Recorded only after add_event returned: if it raised, the message
    # stays untracked and is retried on the next scan.
    if msg_id:
        tracker.add(msg_id)


def process_messages(
    client: IMAPClient,
    uids: list,
    sender_filter: dict,
    calendar: CalendarManager,
    tracker: ProcessedTracker,
    batch_size: int = 50,
):
    """Fetch and process a list of message UIDs.

    Args:
        client: Logged-in IMAP client with a folder selected.
        uids: Message UIDs to fetch; nothing happens when empty.
        sender_filter: Sender filter passed to is_from_sender.
        calendar: Receives an event for each reservation email found.
        tracker: Consulted to skip, and updated to remember, Message-IDs.
        batch_size: Maximum number of messages requested per FETCH, so an
            unfiltered scan does not pull a whole mailbox into memory at
            once. Values below 1 are treated as 1.
    """
    if not uids:
        return

    uids = list(uids)
    step = max(1, batch_size)
    for offset in range(0, len(uids), step):
        fetch_data = client.fetch(uids[offset:offset + step], ["RFC822"])
        for data in fetch_data.values():
            raw = data.get(b"RFC822")
            if not raw:
                continue
            _process_raw_message(raw, sender_filter, calendar, tracker)
|
|
|
|
|
|
def has_new_messages(responses) -> bool:
    """Tell whether any IDLE response is a two-item EXISTS or RECENT tuple.

    Entries that are not 2-tuples (other response shapes, bare values) are
    ignored.
    """
    arrival_markers = (b"EXISTS", b"RECENT")
    return any(
        isinstance(item, tuple) and len(item) == 2 and item[1] in arrival_markers
        for item in responses
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run(config_path: str):
    """Load the YAML config at *config_path* and watch the mailbox forever.

    Connects to CalDAV once, then loops: connect to IMAP, process every
    message matching the sender filter, and sit in IDLE processing unseen
    messages as the server announces them. Any error inside the loop is
    logged and followed by a 30-second pause before reconnecting, so this
    function does not return normally.
    """
    with open(config_path) as config_stream:
        config = yaml.safe_load(config_stream)

    imap_cfg = config["imap"]
    sender_filter = config.get("sender_filter", {})

    logger.info("Connecting to CalDAV...")
    calendar = CalendarManager(config)
    tracker = ProcessedTracker(config.get("processed_ids_file", ".processed_ids"))

    filter_email = sender_filter.get("email", "")
    # IMAP search clause restricting results to the configured sender, if any.
    from_clause = ["FROM", filter_email] if filter_email else []

    while True:
        try:
            logger.info("Connecting to IMAP %s...", imap_cfg["host"])
            with IMAPClient(
                imap_cfg["host"],
                port=imap_cfg.get("port", 993),
                ssl=imap_cfg.get("ssl", True),
            ) as client:
                client.login(imap_cfg["username"], imap_cfg["password"])
                folder = imap_cfg.get("folder", "INBOX")
                client.select_folder(folder)
                logger.info("Selected folder: %s", folder)

                # Initial scan: every historical email from the sender.
                all_uids = client.search(from_clause or ["ALL"])
                logger.info(
                    "Initial scan: %d emails to check from sender", len(all_uids)
                )
                process_messages(client, all_uids, sender_filter, calendar, tracker)

                # IDLE loop: react to new mail as the server announces it.
                logger.info("Entering IDLE mode, waiting for new emails...")
                client.idle()

                while True:
                    responses = client.idle_check(timeout=IDLE_REFRESH_SECONDS)
                    client.idle_done()

                    if not (responses and has_new_messages(responses)):
                        logger.debug("IDLE keepalive/timeout, refreshing connection")
                    else:
                        logger.info("New messages detected: %s", responses)
                        # Only unseen mail (from the sender) is fetched here.
                        new_uids = client.search(["UNSEEN", *from_clause])
                        process_messages(
                            client, new_uids, sender_filter, calendar, tracker
                        )

                    # Start the next IDLE window.
                    client.idle()

        except (IMAPClientError, ConnectionError, TimeoutError, OSError) as e:
            logger.error("Connection error: %s — reconnecting in 30s...", e)
            time.sleep(30)
        except Exception as e:
            logger.exception("Unexpected error: %s — reconnecting in 30s...", e)
            time.sleep(30)
|
|
|
|
|
|
if __name__ == "__main__":
    # The first command-line argument, when given, names the config file.
    cli_args = sys.argv[1:]
    config_file = cli_args[0] if cli_args else "config.yaml"
    run(config_file)
|