first commit

This commit is contained in:
Thomas Faour 2026-02-18 21:41:08 -05:00
commit 79872c6607
6 changed files with 619 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
config.yaml
.processed_ids
venv/
__pycache__/
*.pyc

103
README.md Normal file
View File

@ -0,0 +1,103 @@
# Yoga Email Finder
Monitors an IMAP inbox via IDLE for District Flow Yoga reservation emails and automatically creates events on a CalDAV calendar.
## How it works
1. On startup it scans all existing emails from `info@districtflowyoga.com` and adds any unprocessed reservations to your calendar.
2. It then enters IMAP IDLE so the server pushes new email notifications instantly — no polling.
3. Each processed `Message-ID` is recorded in `.processed_ids` so restarts never create duplicate events.
---
## Installation on Ubuntu Server
### 1. Copy files to the server
```bash
scp -r yoga_email_finder/ youruser@yourserver:/tmp/yoga-email-finder
```
Or clone/copy however you prefer.
### 2. Create a dedicated user and install directory
```bash
sudo useradd --system --no-create-home --shell /usr/sbin/nologin yoga-email-finder
sudo mkdir /opt/yoga-email-finder
sudo cp /tmp/yoga-email-finder/* /opt/yoga-email-finder/
sudo chown -R yoga-email-finder:yoga-email-finder /opt/yoga-email-finder
```
### 3. Create a Python virtual environment and install dependencies
```bash
sudo apt install python3 python3-venv -y
sudo -u yoga-email-finder python3 -m venv /opt/yoga-email-finder/venv
sudo -u yoga-email-finder /opt/yoga-email-finder/venv/bin/pip install -r /opt/yoga-email-finder/requirements.txt
```
### 4. Create the config file
```bash
sudo cp /opt/yoga-email-finder/config.example.yaml /opt/yoga-email-finder/config.yaml
sudo nano /opt/yoga-email-finder/config.yaml
```
Fill in your IMAP and CalDAV credentials. See `config.example.yaml` for all options.
Lock down permissions so only the service user can read the credentials:
```bash
sudo chown yoga-email-finder:yoga-email-finder /opt/yoga-email-finder/config.yaml
sudo chmod 600 /opt/yoga-email-finder/config.yaml
```
### 5. Install and enable the systemd service
```bash
sudo cp /opt/yoga-email-finder/yoga-email-finder.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable yoga-email-finder
sudo systemctl start yoga-email-finder
```
### 6. Check it's running
```bash
sudo systemctl status yoga-email-finder
sudo journalctl -u yoga-email-finder -f
```
---
## Configuration reference
| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| `imap.host` | yes | — | IMAP server hostname |
| `imap.port` | no | `993` | IMAP port |
| `imap.ssl` | no | `true` | Use TLS |
| `imap.username` | yes | — | IMAP login |
| `imap.password` | yes | — | IMAP password |
| `imap.folder` | no | `INBOX` | Mailbox folder to watch |
| `caldav.url` | yes | — | CalDAV calendar URL |
| `caldav.username` | yes | — | CalDAV login |
| `caldav.password` | yes | — | CalDAV password |
| `caldav.calendar_name` | no | first calendar | Target calendar name |
| `sender_filter.email` | no | — | Filter by sender address |
| `sender_filter.name` | no | — | Filter by sender display name (fallback) |
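| `timezone` | no | none (floating time) | IANA timezone for events (e.g. `America/New_York`). If omitted, events are created without a timezone, so calendar apps show them at the same wall-clock time in whatever timezone the viewer uses |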
| `class_duration_minutes` | no | `60` | Assumed class length |
| `processed_ids_file` | no | `.processed_ids` | Path to the duplicate-tracking file |
---
## Updating
```bash
sudo systemctl stop yoga-email-finder
sudo cp new-version/yoga_email_finder.py /opt/yoga-email-finder/
sudo systemctl start yoga-email-finder
```

25
config.example.yaml Normal file
View File

@ -0,0 +1,25 @@
# Yoga Email Finder — example configuration
# Copy to config.yaml and fill in your credentials.
imap:
  host: imap.example.com
  port: 993 # optional, default 993
  ssl: true # optional, default true
  username: you@example.com
  password: your-imap-password
  folder: INBOX # optional, default INBOX
caldav:
  url: https://caldav.example.com/dav/principals/users/you/calendars/
  username: you
  password: your-caldav-password
  calendar_name: Yoga # optional — uses first calendar if omitted
sender_filter:
  email: info@districtflowyoga.com
  name: District Flow Yoga # optional fallback check on the From display name
# Optional settings
timezone: America/New_York # IANA timezone for calendar events; omit to create floating (timezone-less) events
class_duration_minutes: 60 # assumed class length (default: 60)
processed_ids_file: .processed_ids # tracks processed Message-IDs across restarts

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
imapclient>=2.3.1
caldav>=1.3.9
icalendar>=5.0.12
pyyaml>=6.0.2
beautifulsoup4>=4.12.3

21
yoga-email-finder.service Normal file
View File

@ -0,0 +1,21 @@
[Unit]
Description=Yoga Email Finder — District Flow Yoga to CalDAV
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=yoga-email-finder
WorkingDirectory=/opt/yoga-email-finder
ExecStart=/opt/yoga-email-finder/venv/bin/python yoga_email_finder.py config.yaml
Restart=on-failure
RestartSec=30
# Least-privilege hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/yoga-email-finder
[Install]
WantedBy=multi-user.target

459
yoga_email_finder.py Normal file
View File

@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""
Yoga Email Finder
Monitors an IMAP inbox via IDLE for District Flow Yoga reservation emails
and creates CalDAV calendar events automatically.
Usage:
python yoga_email_finder.py [config.yaml]
"""
import email
import email.header
import email.message
import logging
import re
import sys
import time
import unicodedata
import uuid
from datetime import datetime, timedelta, timezone
from email.utils import parseaddr
from pathlib import Path
from zoneinfo import ZoneInfo

import caldav
import icalendar
import yaml
from bs4 import BeautifulSoup
from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError
# Configure root logging once at import time: INFO level, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
# How often to refresh the IDLE command (servers drop it after ~30 min).
# Used as the idle_check() timeout, in seconds.
IDLE_REFRESH_SECONDS = 29 * 60
# ---------------------------------------------------------------------------
# Email parsing helpers
# ---------------------------------------------------------------------------
def decode_header(value: str) -> str:
    """Decode an RFC 2047 encoded email header value (e.g. =?UTF-8?B?...?=).

    Args:
        value: Raw header text, possibly containing encoded words. ``None``
            or an empty string yields ``""``.

    Returns:
        The header as a single ``str``. Bytes that are invalid in the declared
        charset are replaced with U+FFFD, and a chunk labelled with a charset
        Python does not know is decoded as UTF-8 instead of raising
        ``LookupError``.
    """
    if not value:
        return ""
    decoded = []
    for part, charset in email.header.decode_header(value):
        if isinstance(part, bytes):
            try:
                decoded.append(part.decode(charset or "utf-8", errors="replace"))
            except LookupError:
                # Unknown or bogus charset label (e.g. "unknown-8bit"):
                # a best-effort decode beats crashing the whole mail scan.
                decoded.append(part.decode("utf-8", errors="replace"))
        else:
            # Headers without any encoded word come back as plain str.
            decoded.append(part)
    return "".join(decoded)
def normalize_whitespace(text: str) -> str:
    """
    Flatten every kind of spacing in *text* to single ASCII spaces.

    Any character in a Unicode separator category (Z*), any control (Cc) or
    format (Cf) character, and the ASCII whitespace controls are turned into
    a plain space (this covers e.g. the narrow no-break space U+202F).
    Consecutive spaces are then collapsed to one. Leading and trailing
    spaces are not stripped.
    """

    def _is_spacing(ch: str) -> bool:
        category = unicodedata.category(ch)
        return (
            category[0] == "Z"
            or category in ("Cc", "Cf")
            or ch in "\t\n\r\f\v"
        )

    spaced = "".join(" " if _is_spacing(ch) else ch for ch in text)
    return re.sub(r" +", " ", spaced)
def extract_text_from_html(html: str) -> str:
    """Return the text content of *html*, parsed with BeautifulSoup's html.parser.

    Text fragments are stripped and joined with a single space.
    """
    document = BeautifulSoup(html, "html.parser")
    text = document.get_text(separator=" ", strip=True)
    return text
def _decode_part(part: email.message.Message) -> str:
    """Decode one MIME part's payload to text, tolerating bad charset labels."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The part declares a charset Python does not know; fall back to
        # UTF-8 rather than letting one odd email abort the scan.
        return payload.decode("utf-8", errors="replace")


def get_email_body(msg: email.message.Message) -> str:
    """Extract the best available plain-text body from an email message.

    For multipart messages the first non-empty ``text/plain`` part and the
    first non-empty ``text/html`` part are collected; parts marked
    ``Content-Disposition: attachment`` are ignored so an attached text or
    HTML file is never mistaken for the body. For single-part messages the
    payload is treated as HTML when the content type is ``text/html`` and as
    plain text otherwise.

    Returns:
        The whitespace-normalized plain text if there is any, otherwise the
        whitespace-normalized text extracted from the HTML part, otherwise
        ``""``.
    """
    plain = ""
    html = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_disposition() == "attachment":
                continue
            ct = part.get_content_type()
            if ct == "text/plain" and not plain:
                plain = _decode_part(part)
            elif ct == "text/html" and not html:
                html = _decode_part(part)
    else:
        content = _decode_part(msg)
        if msg.get_content_type() == "text/html":
            html = content
        else:
            plain = content
    # Prefer the plain-text alternative; fall back to HTML converted to text.
    if plain.strip():
        return normalize_whitespace(plain)
    if html.strip():
        return normalize_whitespace(extract_text_from_html(html))
    return ""
def parse_reservation_email(msg: email.message.Message) -> dict | None:
    """
    Extract class details from a District Flow Yoga reservation confirmation.

    The subject must contain "you reserved" (any case), and the body must
    contain text of the form
        "Your spot is reserved for {class} with {instructor}! ...
         We will see you at {time} on [{weekday}, ]{date}."

    Returns a dict with the keys ``class_name``, ``instructor``, ``start_dt``
    (a naive datetime), ``summary`` and ``description``, or None when the
    message is not a reservation email or its date/time cannot be parsed.
    """
    subject = decode_header(msg.get("Subject", ""))
    # Cheap subject gate so unrelated mail never reaches body extraction.
    if re.search(r"you reserved", subject, re.IGNORECASE) is None:
        return None

    body = get_email_body(msg)
    if not body:
        logger.warning("Email body is empty")
        return None
    logger.debug("Body snippet: %s", body[:300])

    # Sentence pair from the confirmation body; up to 80 characters of filler
    # are allowed between the two sentences, and the weekday is optional.
    reservation_re = (
        r"Your spot is reserved for (.+?) with (.+?)!"
        r".{0,80}?"
        r"We will see you at (.+?) on (?:\w+,\s+)?(.+?)\."
    )
    found = re.search(reservation_re, body, re.DOTALL | re.IGNORECASE)
    if found is None:
        logger.warning("Reservation body pattern not found. Subject: %s", subject)
        return None

    class_name, instructor, time_str, date_str = (
        group.strip() for group in found.groups()
    )

    # Try each known date/time layout in turn; the first that parses wins.
    dt_combined = f"{date_str} {time_str}".strip()
    candidate_formats = (
        "%B %d, %Y %I:%M %p",  # March 2, 2026 8:00 PM
        "%B %d, %Y %I:%M%p",  # March 2, 2026 8:00PM
        "%B %d, %Y %H:%M",  # March 2, 2026 20:00
        "%m/%d/%Y %I:%M %p",  # 3/2/2026 8:00 PM
        "%m/%d/%Y %I:%M%p",  # 3/2/2026 8:00PM
    )
    for fmt in candidate_formats:
        try:
            start_dt = datetime.strptime(dt_combined, fmt)
        except ValueError:
            continue
        break
    else:
        # No layout matched.
        logger.warning("Could not parse datetime from: '%s'", dt_combined)
        return None

    when = start_dt.strftime('%I:%M %p on %A, %B %d, %Y')
    description = (
        f"District Flow Yoga\n"
        f"Class: {class_name}\n"
        f"Instructor: {instructor}\n"
        f"Time: {when}"
    )
    return {
        "class_name": class_name,
        "instructor": instructor,
        "start_dt": start_dt,
        "summary": f"Yoga: {class_name} with {instructor}",
        "description": description,
    }
# ---------------------------------------------------------------------------
# CalDAV helpers
# ---------------------------------------------------------------------------
class CalendarManager:
    """Creates yoga-class events on a CalDAV calendar.

    Reads the ``caldav`` section of the config (``url``, ``username``,
    ``password``, optional ``calendar_name``) plus the optional top-level
    ``class_duration_minutes`` (default 60) and ``timezone`` keys.
    """

    def __init__(self, config: dict):
        """Connect to the CalDAV server and pick the target calendar.

        Raises:
            KeyError: if a required ``caldav`` setting is missing.
            RuntimeError: if the account has no calendars.
        """
        caldav_cfg = config["caldav"]
        self.client = caldav.DAVClient(
            url=caldav_cfg["url"],
            username=caldav_cfg["username"],
            password=caldav_cfg["password"],
        )
        self.calendar = self._get_calendar(caldav_cfg.get("calendar_name"))
        self.class_duration = timedelta(
            minutes=config.get("class_duration_minutes", 60)
        )
        tz_name = config.get("timezone")
        # None means events are written with naive (timezone-less) datetimes.
        self.tzinfo = ZoneInfo(tz_name) if tz_name else None

    def _get_calendar(self, name: str | None):
        """Return the calendar called *name* (case-insensitive).

        Falls back to the first calendar when *name* is empty or no calendar
        matches; the fallback on a mismatch is logged as a warning.
        """
        principal = self.client.principal()
        calendars = principal.calendars()
        if not calendars:
            raise RuntimeError("No calendars found on CalDAV server")
        if name:
            wanted = name.lower()
            for cal in calendars:
                if cal.name and cal.name.lower() == wanted:
                    logger.info("Using calendar: %s", cal.name)
                    return cal
            logger.warning(
                "Calendar '%s' not found, using: %s", name, calendars[0].name
            )
        else:
            logger.info("Using calendar: %s", calendars[0].name)
        return calendars[0]

    def event_exists(self, summary: str, start_dt: datetime) -> bool:
        """Check whether an event with the same title already exists on that day.

        Only the SUMMARY is compared, not the start time. This check is
        best-effort: if the lookup fails, a warning is logged and False is
        returned so the caller still creates the event.
        """
        try:
            day_start = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
            day_end = day_start + timedelta(days=1)
            events = self.calendar.date_search(start=day_start, end=day_end)
            for ev in events:
                try:
                    cal_obj = icalendar.Calendar.from_ical(ev.data)
                    for component in cal_obj.walk():
                        if component.name == "VEVENT":
                            ev_summary = str(component.get("SUMMARY", ""))
                            if ev_summary == summary:
                                return True
                except Exception as e:
                    # One malformed object must not hide the remaining
                    # events, but leave a trace instead of failing silently.
                    logger.debug("Skipping unreadable calendar object: %s", e)
        except Exception as e:
            logger.warning("Could not check for duplicate events: %s", e)
        return False

    def add_event(self, event_info: dict) -> bool:
        """
        Add a yoga class event to the calendar.

        *event_info* must provide ``start_dt`` (naive datetime), ``summary``
        and ``description``. The end time is the start plus the configured
        class duration.

        Returns True if added, False if a duplicate was detected.
        """
        start_dt = event_info["start_dt"]
        # Apply timezone if configured
        if self.tzinfo:
            start_dt = start_dt.replace(tzinfo=self.tzinfo)
        if self.event_exists(event_info["summary"], start_dt):
            logger.info("Event already exists, skipping: %s", event_info["summary"])
            return False
        end_dt = start_dt + self.class_duration
        cal = icalendar.Calendar()
        cal.add("prodid", "-//Yoga Email Finder//EN")
        cal.add("version", "2.0")
        ev = icalendar.Event()
        ev.add("uid", str(uuid.uuid4()))
        ev.add("summary", event_info["summary"])
        ev.add("description", event_info["description"])
        ev.add("dtstart", start_dt)
        ev.add("dtend", end_dt)
        # RFC 5545 requires DTSTAMP in UTC; a naive local "now" would be
        # written as a floating time and read as the wrong instant.
        ev.add("dtstamp", datetime.now(timezone.utc))
        cal.add_component(ev)
        self.calendar.add_event(cal.to_ical().decode("utf-8"))
        logger.info("Added event: %s at %s", event_info["summary"], start_dt)
        return True
# ---------------------------------------------------------------------------
# Duplicate tracking across restarts
# ---------------------------------------------------------------------------
class ProcessedTracker:
    """
    Tracks Message-IDs of processed emails in a flat file so we don't
    re-add calendar events after a restart.

    The file holds one ID per line, UTF-8 encoded. IDs are stored with runs
    of whitespace collapsed to a single space, so a folded Message-ID can
    never span two lines of the file.
    """

    def __init__(self, path: str = ".processed_ids"):
        """Load previously recorded IDs from *path* (a missing file is fine)."""
        self.path = Path(path)
        self.ids: set[str] = set()
        self._load()

    @staticmethod
    def _key(msg_id: str) -> str:
        """Return the single-line form of *msg_id* used for storage and lookup."""
        return " ".join(msg_id.split())

    def _load(self):
        """Populate ``self.ids`` from the tracking file, ignoring blank lines."""
        try:
            text = self.path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # First run: nothing has been processed yet.
            return
        self.ids = {key for line in text.splitlines() if (key := self._key(line))}
        logger.debug("Loaded %d processed message IDs", len(self.ids))

    def contains(self, msg_id: str) -> bool:
        """Return True if *msg_id* has already been recorded."""
        return self._key(msg_id) in self.ids

    def add(self, msg_id: str):
        """Record *msg_id* in memory and append it to the tracking file.

        Empty IDs and IDs that are already recorded are ignored, so the file
        never gains blank or duplicate lines.
        """
        key = self._key(msg_id)
        if not key or key in self.ids:
            return
        self.ids.add(key)
        with self.path.open("a", encoding="utf-8") as f:
            f.write(key + "\n")
# ---------------------------------------------------------------------------
# IMAP / message processing
# ---------------------------------------------------------------------------
def is_from_sender(msg: email.message.Message, sender_filter: dict) -> bool:
    """Return True if the message is from the configured sender.

    Args:
        msg: The parsed email.
        sender_filter: Mapping with optional ``email`` and ``name`` keys.
            Missing keys, ``None`` values and empty strings all mean "not
            configured".

    Returns:
        True when the From address equals ``email`` (case-insensitive), or
        when ``name`` occurs anywhere in the raw From header
        (case-insensitive). When neither filter is configured every message
        is accepted, because there is nothing to filter on.
    """
    sender_filter = sender_filter or {}
    filter_email = (sender_filter.get("email") or "").strip().lower()
    filter_name = (sender_filter.get("name") or "").strip().lower()
    if not filter_email and not filter_name:
        # No filter configured: rejecting everything would silently mark
        # every email as processed without ever looking at it.
        return True
    # str() also covers Header objects returned for non-ASCII raw headers.
    from_raw = str(msg.get("From") or "")
    _, from_addr = parseaddr(from_raw)
    if filter_email and from_addr.lower() == filter_email:
        return True
    if filter_name and filter_name in from_raw.lower():
        return True
    return False
def _extract_event(msg: email.message.Message, sender_filter: dict) -> dict | None:
    """Return the reservation details in *msg*, or None if it should be ignored."""
    if not is_from_sender(msg, sender_filter):
        logger.debug("Skipping email from: %s", msg.get("From", ""))
        return None
    subject = decode_header(msg.get("Subject", ""))
    logger.info("Processing email: %s", subject)
    event_info = parse_reservation_email(msg)
    if not event_info:
        logger.info("Not a reservation email, skipping")
    return event_info


def process_messages(
    client: IMAPClient,
    uids: list,
    sender_filter: dict,
    calendar: CalendarManager,
    tracker: ProcessedTracker,
):
    """Fetch and process a list of message UIDs.

    Each message is fetched in full, skipped if its Message-ID is already
    tracked, and otherwise checked against the sender filter and parsed. A
    parsed reservation is added to the calendar. Messages that were handled
    (including ones that turned out not to be reservations) have their
    Message-ID recorded in *tracker*.

    A message whose parsing raises is logged and skipped without being
    recorded, so one malformed email cannot block the ones after it.
    Exceptions from ``calendar.add_event`` are deliberately NOT caught: they
    propagate to the caller before the ID is recorded, so the message is
    retried on the next scan.
    """
    if not uids:
        return
    fetch_data = client.fetch(uids, ["RFC822"])
    for uid, data in fetch_data.items():
        raw = data.get(b"RFC822")
        if not raw:
            continue
        msg = email.message_from_bytes(raw)
        msg_id = str(msg.get("Message-ID") or "").strip()
        if msg_id and tracker.contains(msg_id):
            logger.debug("Already processed %s, skipping", msg_id)
            continue
        try:
            event_info = _extract_event(msg, sender_filter)
        except Exception:
            # Parsing failures are specific to this one message; keep going.
            logger.exception(
                "Could not parse message UID %s (%s), skipping it for now",
                uid,
                msg_id or "no Message-ID",
            )
            continue
        if event_info:
            calendar.add_event(event_info)
        if msg_id:
            tracker.add(msg_id)
def has_new_messages(responses) -> bool:
    """Tell whether a batch of IDLE responses announces new mail.

    A response counts when it is a 2-tuple whose second element is
    ``b"EXISTS"`` or ``b"RECENT"``; anything else is ignored.
    """
    new_mail_keywords = (b"EXISTS", b"RECENT")
    return any(
        isinstance(item, tuple)
        and len(item) == 2
        and item[1] in new_mail_keywords
        for item in responses
    )
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def run(config_path: str):
    """Load the configuration and run the monitor loop; never returns normally.

    On every (re)connection the selected folder is scanned once for all
    matching emails, then the connection sits in IMAP IDLE and processes new
    mail as the server announces it. Any error tears the connection down,
    waits 30 seconds and starts over with a fresh scan.

    Args:
        config_path: Path to the YAML configuration file.

    Raises:
        KeyError: if the ``imap`` or ``caldav`` section is missing.
        OSError: if the configuration file cannot be read.
    """
    with open(config_path, encoding="utf-8") as f:
        # An empty file parses to None; use an empty mapping so the lookup
        # below fails with a clear KeyError('imap') instead of a TypeError.
        config = yaml.safe_load(f) or {}
    imap_cfg = config["imap"]
    # "sender_filter:" with nothing under it loads as None.
    sender_filter = config.get("sender_filter") or {}
    logger.info("Connecting to CalDAV...")
    calendar = CalendarManager(config)
    tracker = ProcessedTracker(config.get("processed_ids_file", ".processed_ids"))
    filter_email = sender_filter.get("email") or ""
    sender_criteria = ["FROM", filter_email] if filter_email else []
    while True:
        try:
            logger.info("Connecting to IMAP %s...", imap_cfg["host"])
            with IMAPClient(
                imap_cfg["host"],
                port=imap_cfg.get("port", 993),
                ssl=imap_cfg.get("ssl", True),
            ) as client:
                client.login(imap_cfg["username"], imap_cfg["password"])
                folder = imap_cfg.get("folder", "INBOX")
                client.select_folder(folder)
                logger.info("Selected folder: %s", folder)

                # --- Initial scan: process all historical emails from sender ---
                all_uids = client.search(sender_criteria or ["ALL"])
                logger.info(
                    "Initial scan: %d emails to check from sender", len(all_uids)
                )
                process_messages(client, all_uids, sender_filter, calendar, tracker)
                # Highest UID handled on this connection. New mail is found
                # by UID rather than by the UNSEEN flag, so a message that
                # another client or a server-side filter marked as read
                # first is still picked up.
                last_uid = max(all_uids, default=0)

                # --- IDLE loop: wait for new emails in real time ---
                logger.info("Entering IDLE mode, waiting for new emails...")
                client.idle()
                while True:
                    responses = client.idle_check(timeout=IDLE_REFRESH_SECONDS)
                    # IDLE must be ended before any other command is sent.
                    client.idle_done()
                    if responses and has_new_messages(responses):
                        logger.info("New messages detected: %s", responses)
                        found = client.search(
                            ["UID", f"{last_uid + 1}:*"] + sender_criteria
                        )
                        # "N:*" always matches the newest message, even when
                        # its UID is below N, so drop anything already seen.
                        new_uids = [uid for uid in found if uid > last_uid]
                        process_messages(
                            client, new_uids, sender_filter, calendar, tracker
                        )
                        last_uid = max(new_uids, default=last_uid)
                    else:
                        logger.debug("IDLE keepalive/timeout, refreshing connection")
                    # Re-enter IDLE for the next window
                    client.idle()
        except (IMAPClientError, OSError) as e:
            # ConnectionError and TimeoutError are OSError subclasses.
            logger.error("Connection error: %s — reconnecting in 30s...", e)
            time.sleep(30)
        except Exception as e:
            logger.exception("Unexpected error: %s — reconnecting in 30s...", e)
            time.sleep(30)
# Script entry point; does nothing when the module is imported.
if __name__ == "__main__":
    # The config path is the optional first CLI argument; it defaults to
    # config.yaml, resolved relative to the current working directory.
    config_file = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
    run(config_file)