# Synchronizes a fileserver with an IMAP inbox and generates
# index.html and rss_feed.xml files.
#
# Written by Eric Meehan and GPT4o-mini

import email
import email.utils
import html
import imaplib
import os
import re
from collections import defaultdict
from datetime import datetime, timezone
from email.errors import HeaderParseError
from email.header import decode_header
from urllib.parse import quote

# Email account credentials
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")
IMAP_SERVER = os.getenv("IMAP_SERVER")  # e.g., 'imap.gmail.com' for Gmail
SAVE_DIR = os.getenv("SAVE_DIR")  # Directory to save .eml files

DOCUMENTS_DIR = f'{SAVE_DIR}/archive/Documents'
DOWNLOADS_DIR = f'{SAVE_DIR}/archive/Downloads'
MAIL_DIR = f'{SAVE_DIR}/archive/Mail'
MUSIC_DIR = f'{SAVE_DIR}/archive/Music'
PICTURES_DIR = f'{SAVE_DIR}/archive/Pictures'
VIDEOS_DIR = f'{SAVE_DIR}/archive/Videos'

# URL prefix under which the raw .eml files are published.
RAW_MAIL_URL = "/archive/Mail/raw"

# Page template; the only placeholder is {content} (the table rows), so the
# markup must not contain any other curly braces.
# NOTE(review): the structural markup was rebuilt around the page's visible
# text. The link targets of the navigation bar and of the "here" source-code
# link are not recoverable from this file and still need to be restored.
HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>eom.dev - Mailbox</title>
</head>
<body>
<h1>eom.dev</h1>
<h2>Mailbox</h2>
<p>Home | Archive | Repositories | Live Stream | Mailbox | Chat | Donate</p>
<h3>Browse Email Threads</h3>
<p>
This is a public discussion board powered by email. The source code is available here.<br>
Email or CC public-mailbox@eom.dev to have your message appear here.<br>
Import the .eml file linked in the ID column into your email client to join an existing thread.<br>
Digital signatures are encouraged. Encryption is not supported.<br>
This board is slightly easier to browse when threads are quoted in replies.<br>
Get updates via the <a href="/archive/Mail/rss_feed.xml">RSS feed</a>.
</p>
<table>
<tr><th>Date</th><th>From</th><th>Subject</th><th>ID</th></tr>
{content}
</table>
</body>
</html>
"""

# Attachment extensions mapped to the archive directory they are stored in;
# anything not listed goes to DOWNLOADS_DIR.
_ATTACHMENT_DIRS = (
    (('.jpg', '.jpeg', '.png', '.gif', '.bmp'), PICTURES_DIR),
    (('.pdf', '.txt', '.json', '.yml', '.yaml', '.csv'), DOCUMENTS_DIR),
    (('.mp3', '.wav', '.aac', '.flac'), MUSIC_DIR),
    (('.mp4', '.mov', '.avi', '.wmv', '.flv'), VIDEOS_DIR),
)


def connect_to_email():
    """Connect to IMAP_SERVER over SSL, log in, and return the connection."""
    mail = imaplib.IMAP4_SSL(IMAP_SERVER)
    mail.login(USERNAME, PASSWORD)
    return mail


def create_save_directory():
    """Create the archive directory tree, including MAIL_DIR/raw."""
    directories = [
        SAVE_DIR,
        DOCUMENTS_DIR,
        DOWNLOADS_DIR,
        MAIL_DIR,
        MUSIC_DIR,
        PICTURES_DIR,
        VIDEOS_DIR,
        f'{MAIL_DIR}/raw',
    ]
    for directory in directories:
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(directory, exist_ok=True)


def sanitize_filename(filename):
    """Replace spaces and characters that are invalid in file names with '_'."""
    return re.sub(r'[ <>:"/\\|?*]', '_', filename).strip()


def _file_stem(subject, msg_id):
    """Return the '<subject>_<message-id>' stem shared by saved files and links."""
    kept = "".join(c for c in subject if c.isalnum() or c in (' ', '-', '_'))
    clean_subject = sanitize_filename(kept.strip())
    clean_msg_id = sanitize_filename((msg_id or "").strip('<>').replace('@', '_'))
    return f"{clean_subject}_{clean_msg_id}"


def _unique_path(directory, stem, extension):
    """Return a path in *directory* that does not exist yet, adding _N if needed."""
    candidate = os.path.join(directory, f"{stem}{extension}")
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem}_{counter}{extension}")
        counter += 1
    return candidate


def _decode_header_value(raw_value, default=""):
    """Decode every RFC 2047 chunk of a header into one string.

    Returns *default* when the header is missing or decodes to nothing.
    """
    if raw_value is None:
        return default
    try:
        chunks = decode_header(raw_value)
    except HeaderParseError:
        # Malformed encoded-word: fall back to the raw text rather than abort.
        return str(raw_value) or default
    pieces = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or 'utf-8', errors='replace')
            except LookupError:
                # Unknown charset label supplied by the sender.
                chunk = chunk.decode('utf-8', errors='replace')
        pieces.append(chunk)
    return "".join(pieces) or default


def get_email_content(msg):
    """Return (payload_bytes, content_type) of the first text part of *msg*.

    For a multipart message the first text/plain or text/html part wins; a
    non-multipart message returns its own payload and type. Returns
    (None, None) when a multipart message has no text part.
    """
    if not msg.is_multipart():
        return msg.get_payload(decode=True), msg.get_content_type()
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type in ("text/plain", "text/html"):
            return part.get_payload(decode=True), content_type
    return None, None


def fetch_emails(mail):
    """Download every message in the inbox and archive it.

    Each message is saved as a .eml file, its text body is saved alongside,
    and its attachments are stored by type.

    Returns:
        (threads, message_ids): *threads* maps a Message-ID to the list of
        Message-IDs replying to it; *message_ids* maps each Message-ID to a
        dict with keys 'subject', 'id', 'message', 'from' and 'date'.
    """
    threads = defaultdict(list)
    message_ids = {}

    mail.select("inbox")  # Select the mailbox
    status, messages = mail.search(None, "ALL")
    if status != "OK" or not messages or not messages[0]:
        return threads, message_ids

    for num in messages[0].split():
        status, msg_data = mail.fetch(num, '(RFC822)')
        if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
            continue  # Skip messages the server could not deliver
        raw_email = msg_data[0][1]
        msg = email.message_from_bytes(raw_email)

        subject = _decode_header_value(msg['Subject'], "No subject")
        msg_from = _decode_header_value(msg['From'])
        msg_date = str(format_date(msg['Date']) or "")

        # Messages without a Message-ID still need a unique key.
        msg_id = (msg.get('Message-ID') or "").strip()
        if not msg_id:
            msg_id = f"<no-message-id-{num.decode('ascii', 'replace')}@localhost>"
        in_reply_to = (msg.get('In-Reply-To') or "").strip()

        # Store message for threading
        message_ids[msg_id] = {
            'subject': subject,
            'id': num,
            'message': msg,
            'from': msg_from,
            'date': msg_date,
        }
        if in_reply_to:
            threads[in_reply_to].append(msg_id)
        else:
            # Top-level message. setdefault keeps replies that were fetched
            # before their parent instead of overwriting them with [].
            threads.setdefault(msg_id, [])

        # Save the email as a .eml file with Message-ID
        save_email_as_eml(raw_email, subject, msg_id)

        # Save the email content in an appropriate file
        content, content_type = get_email_content(msg)
        if content_type:
            save_email_content(content, subject, msg_id, content_type)

        # Handle attachments
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_disposition() == 'attachment':
                    save_attachment(part)

    return threads, message_ids


def save_attachment(part):
    """Save an attachment into the archive directory matching its extension.

    Returns the path written, or None when the part has no usable file name
    or no decodable payload.
    """
    filename = part.get_filename()
    if not filename:
        return None

    # The name is sender-controlled: drop any directory components so that
    # names like "../../x" cannot escape the archive directories.
    safe_name = os.path.basename(filename.replace('\\', '/'))
    if safe_name in ("", ".", ".."):
        return None

    payload = part.get_payload(decode=True)
    if payload is None:
        return None

    target_dir = DOWNLOADS_DIR
    lowered = safe_name.lower()
    for extensions, directory in _ATTACHMENT_DIRS:
        if lowered.endswith(extensions):
            target_dir = directory
            break

    file_path = os.path.join(target_dir, safe_name)
    with open(file_path, 'wb') as f:
        f.write(payload)
    return file_path


def save_email_as_eml(raw_email, subject, msg_id):
    """Write the raw message bytes to MAIL_DIR/raw/<subject>_<msg_id>.eml.

    A numeric suffix is appended when the file name is already taken.
    """
    file_path = _unique_path(
        os.path.join(MAIL_DIR, 'raw'), _file_stem(subject, msg_id), ".eml"
    )
    with open(file_path, 'wb') as f:
        f.write(raw_email)


def save_email_content(content, subject, msg_id, content_type):
    """Write the message body next to the .eml file.

    text/html bodies get a .html extension; everything else is saved as .txt.
    A numeric suffix is appended when the file name is already taken.
    """
    extension = ".html" if content_type == "text/html" else ".txt"
    file_path = _unique_path(
        os.path.join(MAIL_DIR, 'raw'), _file_stem(subject, msg_id), extension
    )
    with open(file_path, 'wb') as f:
        f.write(content or b"")


def display_threaded_emails(threads, message_ids):
    """Render all messages as HTML table rows, replies nested under parents.

    Threads are listed in reverse of the order in which *message_ids* was
    filled; replies are indented under the message they answer. Returns the
    rows as one string for the {content} slot of the HTML template.
    """
    displayed = set()  # Track displayed messages to avoid duplicates

    def display_message(msg_id, indent_level):
        # Always return a string so callers can concatenate the result.
        if msg_id in displayed or msg_id not in message_ids:
            return ""
        displayed.add(msg_id)
        msg = message_ids[msg_id]

        # Subject and sender come from untrusted mail: escape them.
        msg_date = html.escape(str(msg['date'] or ""))
        msg_from = html.escape(str(msg['from'] or ""))
        subject_text = html.escape(str(msg['subject'] or ""))

        number = msg['id']
        if isinstance(number, bytes):
            # imaplib hands back message numbers as bytes.
            number = number.decode('ascii', 'replace')

        file_name = quote(f"{_file_stem(msg['subject'], msg_id)}.eml")
        href = html.escape(f"{RAW_MAIL_URL}/{file_name}")
        id_hyperlink = f'<a href="{href}">({html.escape(str(number))})</a>'

        indent = "&nbsp;" * 4 * indent_level
        parts = [
            f"<tr><td>{msg_date}</td><td>{msg_from}</td>"
            f"<td>{indent}{subject_text}</td><td>{id_hyperlink}</td></tr>\n"
        ]
        # Display replies, if any
        for reply_id in threads.get(msg_id, []):
            parts.append(display_message(reply_id, indent_level + 1))
        return "".join(parts)

    # A message is a root unless it replies to a message we actually have.
    replies_with_known_parent = {
        reply_id
        for parent_id, reply_ids in threads.items()
        if parent_id in message_ids
        for reply_id in reply_ids
    }
    rendered = [
        display_message(root_id, 0)
        for root_id in message_ids
        if root_id not in replies_with_known_parent
    ]
    # Messages caught in a reply cycle have no root; show them at top level.
    rendered.extend(
        display_message(msg_id, 0)
        for msg_id in message_ids
        if msg_id not in displayed
    )
    return "".join(reversed(rendered))


def generate_rss_feed(threads, message_ids):
    """Build an RSS 2.0 document with one item per message.

    *threads* is accepted for signature compatibility and is not used.
    """
    rss_channel_title = "eom.dev"
    rss_channel_link = "https://eom.dev/archive/Mail/rss_feed.xml"
    rss_channel_description = "RSS feed of eom.dev discussion board"

    rss_items = []
    for msg_id, msg in message_ids.items():
        # Escape special characters for XML
        subject = html.escape(str(msg['subject'] or ""))
        msg_from = html.escape(str(msg['from'] or ""))
        msg_date = html.escape(str(msg['date'] or ""))
        pub_date = html.escape(
            str(format_date(msg['date']) or "") if msg['date'] else ""
        )
        guid = html.escape((msg_id or "").strip('<>'))

        # Build the link from the unescaped subject so that it matches the
        # file name produced by save_email_as_eml().
        file_name = quote(f"{_file_stem(msg['subject'], msg_id)}.eml")
        msg_link = html.escape(f"{RAW_MAIL_URL}/{file_name}")

        # Kept outside the f-string below: a backslash inside an f-string
        # expression is a syntax error before Python 3.12.
        description = f"Sender: {msg_from} \nDate: {msg_date}"

        # Create RSS item
        rss_items.append(
            f"""<item>
<title>{subject}</title>
<link>{msg_link}</link>
<description>{description}</description>
<pubDate>{pub_date}</pubDate>
<guid isPermaLink="false">{guid}</guid>
</item>
"""
        )

    return f"""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>{rss_channel_title}</title>
<link>{rss_channel_link}</link>
<description>{rss_channel_description}</description>
{''.join(rss_items)}</channel>
</rss>
"""


def format_date(original_date):
    """Convert an email Date header to RFC 822 format in UTC.

    The timestamp is converted to UTC before it is labelled +0000; a date
    without zone information is taken to be UTC already. Returns
    *original_date* unchanged when it cannot be parsed.
    """
    try:
        datetime_obj = email.utils.parsedate_to_datetime(original_date)
        if datetime_obj.tzinfo is None:
            datetime_obj = datetime_obj.replace(tzinfo=timezone.utc)
        else:
            datetime_obj = datetime_obj.astimezone(timezone.utc)
        # format_datetime always emits English day/month names, unlike
        # strftime, which follows the process locale.
        return email.utils.format_datetime(datetime_obj)
    except (TypeError, ValueError, AttributeError, OverflowError) as e:
        print(f"Error formatting date: {e}")
        return original_date


def main():
    """Sync the inbox to disk, then write index.html and rss_feed.xml."""
    create_save_directory()  # Create directory to save emails
    mail = connect_to_email()
    try:
        threads, message_ids = fetch_emails(mail)
    finally:
        # Close the IMAP session even when fetching fails.
        mail.logout()

    # Explicit UTF-8: subjects and senders may contain any Unicode text.
    with open(f'{MAIL_DIR}/index.html', 'w', encoding='utf-8') as f:
        f.write(HTML.format(content=display_threaded_emails(threads, message_ids)))

    rss_feed_content = generate_rss_feed(threads, message_ids)
    with open(f'{MAIL_DIR}/rss_feed.xml', 'w', encoding='utf-8') as rss_file:
        rss_file.write(rss_feed_content)


# Main execution
if __name__ == "__main__":
    main()