Files
www/mailsync.py
2026-02-19 19:10:13 -05:00

302 lines
12 KiB
Python

# Synchronizes a fileserver with an IMAP inbox and generates
# index.html and rss_feed.xml files.
#
# Written by Eric Meehan and GPT4o-mini
import email
import html
import imaplib
import os
import re
from collections import defaultdict
from datetime import datetime
from email.header import decode_header
# Email account credentials and storage root, all read from environment variables.
# NOTE(review): os.getenv returns None for an unset variable; an unset SAVE_DIR
# makes every derived path below start with the literal text "None/" — confirm
# the deployment always sets it.
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")
IMAP_SERVER = os.getenv("IMAP_SERVER") # e.g., 'imap.gmail.com' for Gmail
SAVE_DIR = os.getenv("SAVE_DIR") # Directory to save .eml files
# Archive subdirectories under SAVE_DIR. save_attachment sorts attachments into
# Documents/Downloads/Music/Pictures/Videos by file extension; MAIL_DIR receives
# the raw messages (in its "raw" subdirectory), index.html and rss_feed.xml.
DOCUMENTS_DIR = f'{SAVE_DIR}/archive/Documents'
DOWNLOADS_DIR = f'{SAVE_DIR}/archive/Downloads'
MAIL_DIR = f'{SAVE_DIR}/archive/Mail'
MUSIC_DIR = f'{SAVE_DIR}/archive/Music'
PICTURES_DIR = f'{SAVE_DIR}/archive/Pictures'
VIDEOS_DIR = f'{SAVE_DIR}/archive/Videos'
# Page template for the mailbox index. It is rendered with str.format in the
# main block, which replaces the {content} placeholder with the table rows built
# by display_threaded_emails; any literal brace added to this template must
# therefore be doubled.
HTML = """
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>eom.dev - Mailbox</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<link rel="stylesheet" href="/common/catppuccin.css">
<link rel="icon" type="image/x-icon" href="/common/favicon.ico">
<style>
</style>
<script src="/common/ddg.js"></script>
<body>
<h1>eom.dev</h1>
<h2>Mailbox</h2>
<a href="/">Home
</a> | <a href="/archive">Archive
</a> | <a href="/archive/Repositories">Repositories
</a> | <a href="/stream">Live Stream
</a> | <a href="/archive/Mail">Mailbox
</a> | <a href="/chat">Chat
</a> | <a href="/donate">Donate
</a>
<h3>Browse Email Threads</h3>
<p>
This is a public discussion board powered by email. The source code is available <a href='/archive/Repositories/?p=www;a=tree'>here</a>.<br>
Email or CC <code>public-mailbox@eom.dev</code> to have your message appear here.<br>
Import the <code>.eml</code> file linked in the <i>ID</i> column into your email client to join an existing thread.<br>
<a href='https://en.wikipedia.org/wiki/Digital_signature'>Digital signatures</a> are encouraged. Encryption is not supported.<br>
This board is slightly easier to browse when threads are quoted in replies.<br>
Get updates via the <a href="/archive/Mail/rss_feed.xml">RSS feed</a>.
</p>
<!-- DuckDuckGo Site Search -->
<form
id="ddg-site-search"
action="https://duckduckgo.com/"
method="get"
target="_blank"
>
<input
type="search"
name="q"
id="ddg-query"
placeholder="Search with DuckDuckGo..."
aria-label="Search with DuckDuckGo"
required
/>
</form>
<table>
<tr>
<th>Date</th><th>From</th><th>Subject</th><th>ID</th>
</tr>
{content}
</table>
</body>
</html>
"""
# Open an authenticated IMAP session
def connect_to_email():
    """Connect to IMAP_SERVER over SSL and log in with USERNAME / PASSWORD.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` client.
    """
    client = imaplib.IMAP4_SSL(IMAP_SERVER)
    client.login(USERNAME, PASSWORD)
    return client
# Create the directory tree that saved mail and attachments are written into
def create_save_directory():
    """Ensure SAVE_DIR, every archive subdirectory and MAIL_DIR/raw exist.

    Uses ``exist_ok=True`` rather than an exists-then-create check, so the call
    is idempotent and cannot fail when a directory appears between the check
    and the creation.

    Raises:
        FileExistsError: if one of the paths already exists but is not a
            directory.
    """
    required_dirs = [
        SAVE_DIR,
        DOCUMENTS_DIR,
        DOWNLOADS_DIR,
        MAIL_DIR,
        MUSIC_DIR,
        PICTURES_DIR,
        VIDEOS_DIR,
        f'{MAIL_DIR}/raw',
    ]
    for directory in required_dirs:
        os.makedirs(directory, exist_ok=True)
# Make a string safe to use as a file name
def sanitize_filename(filename):
    """Replace spaces and the characters ``< > : " / \\ | ? *`` with underscores.

    Leading and trailing whitespace that survives the replacement (tabs,
    newlines, ...) is stripped from the result.
    """
    replacements = str.maketrans(dict.fromkeys(' <>:"/\\|?*', '_'))
    cleaned = filename.translate(replacements)
    return cleaned.strip()
# Get the content of the email message
def get_email_content(msg):
    """Return the decoded body of *msg* together with its content type.

    For a non-multipart message the payload and content type of the message
    itself are returned. For a multipart message the parts are walked in order
    and the first ``text/plain`` or ``text/html`` part is returned. Parts whose
    Content-Disposition is ``attachment`` are skipped so that a text attachment
    placed before the body is not mistaken for the body.

    Returns:
        A ``(payload, content_type)`` tuple, where payload is whatever
        ``get_payload(decode=True)`` yields, or ``(None, None)`` when a
        multipart message has no inline text part.
    """
    if not msg.is_multipart():
        return msg.get_payload(decode=True), msg.get_content_type()
    for part in msg.walk():
        if part.get_content_disposition() == 'attachment':
            continue
        content_type = part.get_content_type()
        if content_type in ("text/plain", "text/html"):
            return part.get_payload(decode=True), content_type
    return None, None
# Decode an RFC 2047 encoded header into one string
def _decode_header_text(raw_value, default=""):
    """Decode every fragment of a header value and join them.

    Returns *default* when the header is absent (``None``). Fragments with an
    unknown charset fall back to UTF-8; undecodable bytes are replaced rather
    than raising.
    """
    if raw_value is None:
        return default
    pieces = []
    for fragment, charset in decode_header(raw_value):
        if isinstance(fragment, bytes):
            try:
                fragment = fragment.decode(charset or 'utf-8', errors='replace')
            except LookupError:
                # Charset name not known to Python
                fragment = fragment.decode('utf-8', errors='replace')
        pieces.append(fragment)
    return "".join(pieces)


# Fetch emails
def fetch_emails(mail):
    """Download every message in the inbox, archive it, and index it.

    Each message is saved as a ``.eml`` file, its text body (if any) is saved
    alongside it, and its attachments are saved via ``save_attachment``.

    Args:
        mail: a logged-in IMAP client (as returned by ``connect_to_email``).

    Returns:
        A ``(threads, message_ids)`` tuple. ``threads`` maps a Message-ID to the
        list of Message-IDs that reply to it (top-level messages map to a
        possibly empty list). ``message_ids`` maps each Message-ID to a dict
        with the keys ``subject``, ``id`` (IMAP sequence number as text),
        ``message`` (the parsed message), ``from`` and ``date``.
    """
    mail.select("inbox")  # Select the mailbox
    status, messages = mail.search(None, "ALL")
    threads = defaultdict(list)
    message_ids = {}
    if status != 'OK' or not messages or not messages[0]:
        return threads, message_ids
    for num in messages[0].split():
        seq = num.decode('ascii', errors='replace') if isinstance(num, bytes) else str(num)
        status, msg_data = mail.fetch(num, '(RFC822)')
        if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"Skipping message {seq}: fetch returned {status}")
            continue
        raw_email = msg_data[0][1]
        msg = email.message_from_bytes(raw_email)
        # Decode all header fragments, not just the first one, so that an
        # address following an encoded display name is kept.
        subject = _decode_header_text(msg['Subject']) or "No subject"
        msg_from = _decode_header_text(msg['From'])
        raw_date = msg['Date']
        msg_date = format_date(raw_date) if raw_date else ""
        msg_id = (msg.get('Message-ID') or '').strip()
        if not msg_id:
            # Placeholder so a message without a Message-ID can still be
            # archived and listed; derived from the IMAP sequence number.
            msg_id = f"<missing-message-id-{seq}@localhost>"
        in_reply_to = (msg.get('In-Reply-To') or '').strip()
        # Store message for threading
        message_ids[msg_id] = {'subject': subject, 'id': seq, 'message': msg, 'from': msg_from, 'date': msg_date}
        if in_reply_to:
            threads[in_reply_to].append(msg_id)
        else:
            # setdefault keeps replies that were recorded before their parent
            threads.setdefault(msg_id, [])
        # Save the email as a .eml file with Message-ID
        save_email_as_eml(raw_email, subject, msg_id)
        # Save the email content in an appropriate file
        content, content_type = get_email_content(msg)
        if content_type:
            save_email_content(content, subject, msg_id, content_type)
        # Handle attachments
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_disposition() == 'attachment':
                    save_attachment(part)
    return threads, message_ids
# Save attachments based on their type
def save_attachment(part):
    """Write an attachment into the archive directory matching its extension.

    The filename comes from the (untrusted) email, so only its final path
    component is used; this prevents names such as ``../../index.html`` or an
    absolute path from writing outside the archive directories.

    Args:
        part: a message part carrying an attachment.

    Returns:
        The path the attachment was written to, or ``None`` when the part has
        no usable filename or no decodable payload.
    """
    filename = part.get_filename()
    if not filename:
        return None
    # Treat both separator styles as separators, then keep the last component.
    safe_name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '')
    if safe_name in ('', '.', '..'):
        return None
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    # Determine save directory based on file type
    lowered = safe_name.lower()
    if lowered.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp')):
        target_dir = PICTURES_DIR
    elif lowered.endswith(('.pdf', '.txt', '.json', '.yml', '.yaml', '.csv')):
        target_dir = DOCUMENTS_DIR
    elif lowered.endswith(('.mp3', '.wav', '.aac', '.flac')):
        target_dir = MUSIC_DIR
    elif lowered.endswith(('.mp4', '.mov', '.avi', '.wmv', '.flv')):
        target_dir = VIDEOS_DIR
    else:
        target_dir = DOWNLOADS_DIR
    file_path = os.path.join(target_dir, safe_name)
    # Save the attachment
    with open(file_path, 'wb') as f:
        f.write(payload)
    return file_path
# Save email as .eml file
def save_email_as_eml(raw_email, subject, msg_id):
    """Write the raw message bytes to ``MAIL_DIR/raw/<subject>_<message-id>.eml``.

    If a file with that name already holds exactly these bytes, nothing is
    written, so re-running the sync does not create a fresh ``_N`` copy of
    every message. A numeric suffix is still used when an existing file with
    the same name has different content.

    Args:
        raw_email: the message as bytes.
        subject: decoded subject, used to build the file name.
        msg_id: the Message-ID header value (angle brackets are removed).
    """
    clean_subject = sanitize_filename("".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip())
    clean_msg_id = sanitize_filename((msg_id or '').strip('<>').replace('@', '_'))
    raw_dir = os.path.join(MAIL_DIR, 'raw')
    file_path = os.path.join(raw_dir, f"{clean_subject}_{clean_msg_id}.eml")
    counter = 1
    while os.path.exists(file_path):
        try:
            with open(file_path, 'rb') as existing:
                if existing.read() == raw_email:
                    return  # already archived on an earlier run
        except OSError:
            # Unreadable entry (e.g. a directory): treat it as a name clash
            pass
        file_path = os.path.join(raw_dir, f"{clean_subject}_{clean_msg_id}_{counter}.eml")
        counter += 1
    with open(file_path, 'wb') as f:
        f.write(raw_email)
# Save email content to an appropriate file
def save_email_content(content, subject, msg_id, content_type):
    """Write the decoded message body next to the ``.eml`` file.

    The file is ``MAIL_DIR/raw/<subject>_<message-id>`` plus ``.html`` for
    ``text/html`` and ``.txt`` for everything else. If a file with that name
    already holds exactly this content, nothing is written, so re-running the
    sync does not pile up ``_N`` duplicates. Nothing is written when *content*
    is ``None``.

    Args:
        content: body as bytes (or ``None``).
        subject: decoded subject, used to build the file name.
        msg_id: the Message-ID header value (angle brackets are removed).
        content_type: MIME type of the body.
    """
    if content is None:
        return
    extension = ".html" if content_type == "text/html" else ".txt"
    clean_subject = sanitize_filename("".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip())
    clean_msg_id = sanitize_filename((msg_id or '').strip('<>').replace('@', '_'))
    raw_dir = os.path.join(MAIL_DIR, 'raw')
    file_path = os.path.join(raw_dir, f"{clean_subject}_{clean_msg_id}{extension}")
    counter = 1
    while os.path.exists(file_path):
        try:
            with open(file_path, 'rb') as existing:
                if existing.read() == content:
                    return  # already saved on an earlier run
        except OSError:
            # Unreadable entry (e.g. a directory): treat it as a name clash
            pass
        file_path = os.path.join(raw_dir, f"{clean_subject}_{clean_msg_id}_{counter}{extension}")
        counter += 1
    with open(file_path, 'wb') as f:
        f.write(content)
# Display emails in threads
def display_threaded_emails(threads, message_ids):
    """Render all messages as HTML table rows, grouped into threads.

    Each message becomes one flat ``<tr>`` row; replies follow their parent and
    are indented through the subject cell's left padding. Threads are emitted
    in reverse mailbox order. All text taken from the emails is HTML-escaped
    and link targets are URL-quoted, because the mail is untrusted input.

    Args:
        threads: mapping of Message-ID to the Message-IDs replying to it.
        message_ids: mapping of Message-ID to the per-message dict built by
            ``fetch_emails`` (keys ``subject``, ``id``, ``message``, ``from``,
            ``date``).

    Returns:
        The concatenated ``<tr>`` rows as a string.
    """
    from urllib.parse import quote

    displayed = set()  # Track displayed messages to avoid duplicates

    def render_row(msg_id, indent_level):
        msg = message_ids[msg_id]
        clean_subject = sanitize_filename("".join(c for c in msg['subject'] if c.isalnum() or c in (' ', '-', '_')).strip())
        clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
        base_url = quote(f"/archive/Mail/raw/{clean_subject}_{clean_msg_id}")
        # Use the same body lookup as the saving step so the link extension
        # matches the file that was written (the top-level type of a multipart
        # message is never text/html).
        _, content_type = get_email_content(msg['message'])
        subject_text = html.escape(msg['subject'])
        if content_type is None:
            subject_cell = subject_text  # no body file was saved, so no link
        else:
            extension = ".html" if content_type == "text/html" else ".txt"
            subject_cell = f'<a href="{base_url}{extension}">{subject_text}</a>'
        seq = msg['id']
        if isinstance(seq, bytes):
            seq = seq.decode('ascii', errors='replace')
        id_cell = f'<a href="{base_url}.eml">({html.escape(str(seq))})</a>'
        date_cell = html.escape(str(msg['date'] or ''))
        from_cell = html.escape(str(msg['from'] or ''))
        padding = indent_level * 25
        return (
            "<tr>\n"
            f"<td>{date_cell}</td><td><strong>{from_cell}</strong></td>"
            f"<td style='padding-left: {padding}px;'>{subject_cell}</td>"
            f"<td>{id_cell}</td>\n"
            "</tr>\n"
        )

    def render_thread(msg_id, indent_level):
        # Always return a string, even for repeated or unknown ids.
        if msg_id in displayed or msg_id not in message_ids:
            return ""
        displayed.add(msg_id)
        rows = [render_row(msg_id, indent_level)]
        # Display replies, if any
        for reply_id in threads.get(msg_id, []):
            rows.append(render_thread(reply_id, indent_level + 1))
        return "".join(rows)

    # Messages that reply to a message present in the mailbox are rendered
    # under their parent, never as a thread root.
    reply_ids = set()
    for parent_id, replies in threads.items():
        if parent_id in message_ids:
            reply_ids.update(replies)

    # Display top-level messages
    thread_blocks = [render_thread(root_id, 0) for root_id in message_ids if root_id not in reply_ids]
    # Anything still unrendered (e.g. messages in a reply cycle) becomes a root.
    for leftover_id in message_ids:
        if leftover_id not in displayed:
            thread_blocks.append(render_thread(leftover_id, 0))
    return "".join(reversed(thread_blocks))
# Function to generate RSS feed content
def generate_rss_feed(threads, message_ids):
    """Build an RSS 2.0 document with one item per message.

    Item links use the same file-name derivation as ``save_email_as_eml`` so
    they resolve to the archived ``.eml`` file, are URL-quoted, and are made
    absolute. All text taken from the emails is XML-escaped.

    Args:
        threads: unused; accepted for interface compatibility.
        message_ids: mapping of Message-ID to the per-message dict built by
            ``fetch_emails`` (keys ``subject``, ``from``, ``date``).

    Returns:
        The RSS feed as a string.
    """
    from urllib.parse import quote

    rss_items = []
    rss_channel_title = "eom.dev"
    rss_channel_link = "https://eom.dev/archive/Mail/rss_feed.xml"
    rss_channel_description = "RSS feed of eom.dev discussion board"
    site_url = "https://eom.dev"  # same origin as the channel link
    for msg_id, msg in message_ids.items():
        # Escape special characters for XML
        subject = html.escape(msg['subject'])
        msg_from = html.escape(str(msg['from'] or ''))
        msg_date = html.escape(str(msg['date'] or ''))
        guid = html.escape(msg_id.strip('<>'))
        # File name must be derived from the raw subject exactly as the saving
        # step does, otherwise the link does not match the file on disk.
        clean_subject = sanitize_filename("".join(c for c in msg['subject'] if c.isalnum() or c in (' ', '-', '_')).strip())
        clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
        msg_link = site_url + quote(f"/archive/Mail/raw/{clean_subject}_{clean_msg_id}.eml")
        # Only emit pubDate when the message carries a date at all
        pub_date_line = ""
        if msg['date']:
            pub_date = html.escape(str(format_date(msg['date'])))
            pub_date_line = f"<pubDate>{pub_date}</pubDate>\n"
        # Create RSS item
        item = f"""
<item>
<title>{subject}</title>
<link>{msg_link}</link>
<description>Sender: {msg_from} \nDate: {msg_date}</description>
{pub_date_line}<guid isPermaLink="false">{guid}</guid> <!-- Remove angle brackets -->
</item>
"""
        rss_items.append(item)
    rss_feed = f"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>{rss_channel_title}</title>
<link>{rss_channel_link}</link>
<description>{rss_channel_description}</description>
{''.join(rss_items)}
</channel>
</rss>
"""
    return rss_feed
# Helper function to format dates for RSS
def format_date(original_date):
    """Convert an email Date header into an RFC 822 date expressed in UTC.

    The parsed time is converted to UTC before formatting, so the ``+0000``
    suffix is accurate. A date without zone information is taken to be UTC.
    The result is idempotent: formatting an already formatted value returns it
    unchanged.

    Args:
        original_date: the Date header value.

    Returns:
        The formatted date, e.g. ``"Mon, 01 Jan 2024 15:00:00 +0000"``, or
        *original_date* unchanged when it cannot be parsed.
    """
    # Imported explicitly: ``import email`` alone does not load the submodule.
    import email.utils
    from datetime import timezone

    # Convert date to RFC 822 format
    try:
        datetime_obj = email.utils.parsedate_to_datetime(original_date)
        if datetime_obj.tzinfo is None:
            datetime_obj = datetime_obj.replace(tzinfo=timezone.utc)
        # format_datetime is locale-independent, unlike strftime("%a ... %b")
        return email.utils.format_datetime(datetime_obj.astimezone(timezone.utc))
    except (TypeError, ValueError, AttributeError, OverflowError) as e:
        print(f"Error formatting date: {e}")
        return original_date
# Main execution
def main():
    """Sync the mailbox to disk, then write index.html and rss_feed.xml.

    The IMAP session is closed even if fetching fails. Both output files are
    written as UTF-8, matching the charset each document declares, so
    non-ASCII subjects or sender names do not depend on the platform's default
    encoding.
    """
    create_save_directory()  # Create directory to save emails
    mail = connect_to_email()
    try:
        threads, message_ids = fetch_emails(mail)
    finally:
        mail.logout()
    with open(f'{MAIL_DIR}/index.html', 'w', encoding='utf-8') as f:
        f.write(HTML.format(content=display_threaded_emails(threads, message_ids)))
    rss_feed_content = generate_rss_feed(threads, message_ids)
    with open(f'{MAIL_DIR}/rss_feed.xml', 'w', encoding='utf-8') as rss_file:
        rss_file.write(rss_feed_content)


if __name__ == "__main__":
    main()