# Synchronizes a fileserver with an IMAP inbox and generates
# index.html and rss_feed.xml files.
#
# Written by Eric Meehan and GPT4o-mini
import email
import html
import imaplib
import os
import re
from collections import defaultdict
from datetime import datetime
from email.header import decode_header
# Email account credentials and archive location, read from the environment
# when the module is imported. os.getenv returns None for an unset variable,
# so an unset SAVE_DIR makes every path below start with the text "None".
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")
IMAP_SERVER = os.getenv("IMAP_SERVER") # e.g., 'imap.gmail.com' for Gmail
SAVE_DIR = os.getenv("SAVE_DIR") # Directory to save .eml files
# Per-category archive directories under SAVE_DIR. save_attachment picks one
# of these by file extension; MAIL_DIR holds the raw messages, index.html and
# rss_feed.xml.
DOCUMENTS_DIR = f'{SAVE_DIR}/archive/Documents'
DOWNLOADS_DIR = f'{SAVE_DIR}/archive/Downloads'
MAIL_DIR = f'{SAVE_DIR}/archive/Mail'
MUSIC_DIR = f'{SAVE_DIR}/archive/Music'
PICTURES_DIR = f'{SAVE_DIR}/archive/Pictures'
VIDEOS_DIR = f'{SAVE_DIR}/archive/Videos'
# Page template for index.html. The main block fills it with
# HTML.format(content=...), so {content} is the only placeholder and any
# literal brace added here must be doubled.
# NOTE(review): no HTML tags are visible in this template (the words "here",
# "RSS feed" and the Date/From/Subject/ID headings read like link text and
# table headers) - presumably the markup was lost; confirm against the
# deployed file.
HTML = """
This is a public discussion board powered by email. The source code is available here.
Email or CC public-mailbox@eom.dev to have your message appear here.
Import the .eml file linked in the ID column into your email client to join an existing thread. Digital signatures are encouraged. Encryption is not supported.
This board is slightly easier to browse when threads are quoted in replies.
Get updates via the RSS feed.
Date
From
Subject
ID
{content}
"""
# Connect to the IMAP server and log in
def connect_to_email():
    """Open an SSL IMAP session and authenticate.

    Uses the module-level IMAP_SERVER, USERNAME and PASSWORD values.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` client.
    """
    client = imaplib.IMAP4_SSL(IMAP_SERVER)
    client.login(USERNAME, PASSWORD)
    return client
# Create the directories used for saving emails and attachments
def create_save_directory():
    """Ensure every archive directory exists, including MAIL_DIR/raw.

    ``exist_ok=True`` replaces the separate exists-then-create check, so a
    directory that appears between the check and the creation no longer
    raises, and a path that exists as a regular file is reported immediately
    instead of being skipped silently.

    Raises:
        OSError: if a directory cannot be created.
    """
    required_dirs = [
        SAVE_DIR,
        DOCUMENTS_DIR,
        DOWNLOADS_DIR,
        MAIL_DIR,
        MUSIC_DIR,
        PICTURES_DIR,
        VIDEOS_DIR,
        f'{MAIL_DIR}/raw',
    ]
    for directory in required_dirs:
        os.makedirs(directory, exist_ok=True)
# Sanitize the filename by removing invalid characters
def sanitize_filename(filename):
    """Return *filename* with each unsafe character replaced by an underscore.

    Spaces and the characters ``< > : " / \\ | ? *`` are replaced; any other
    leading or trailing whitespace is then stripped.
    """
    unsafe_chars = re.compile(r'[ <>:"/\\|?*]')
    replaced = unsafe_chars.sub('_', filename)
    return replaced.strip()
# Get the content of the email message
def get_email_content(msg):
    """Return the decoded body of *msg* and its content type.

    For a multipart message the first ``text/plain`` or ``text/html`` part
    that is not an attachment is used. Parts marked
    ``Content-Disposition: attachment`` are skipped so an attached text file
    is not mistaken for the message body. A non-multipart message is returned
    as-is with whatever content type it declares.

    Args:
        msg: an ``email.message.Message`` instance.

    Returns:
        A ``(payload_bytes, content_type)`` tuple, or ``(None, None)`` when a
        multipart message has no usable text part.
    """
    if not msg.is_multipart():
        return msg.get_payload(decode=True), msg.get_content_type()

    for part in msg.walk():
        if part.get_content_disposition() == 'attachment':
            continue
        content_type = part.get_content_type()
        if content_type in ("text/plain", "text/html"):
            payload = part.get_payload(decode=True)
            # A part without a decodable payload cannot serve as the body.
            if payload is not None:
                return payload, content_type
    return None, None
# Decode a (possibly RFC 2047 encoded) header into one string
def _decode_header_text(raw_value, default=""):
    """Return the decoded text of a header value, or *default* if absent/empty.

    Every chunk returned by ``decode_header`` is decoded and joined, so a
    header that mixes encoded and plain text (for example an encoded display
    name followed by an address) is kept whole. Unknown or wrong charsets fall
    back to UTF-8 with replacement characters instead of raising.
    """
    if raw_value is None:
        return default
    pieces = []
    for chunk, charset in decode_header(str(raw_value)):
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or 'utf-8', errors='replace')
            except LookupError:
                # The charset label is not known to Python.
                chunk = chunk.decode('utf-8', errors='replace')
        pieces.append(chunk)
    return "".join(pieces) or default


# Fetch emails
def fetch_emails(mail):
    """Download every inbox message, archive it, and build thread indexes.

    Each message is saved as a .eml file, its text body is saved alongside
    it, and its attachments are stored via ``save_attachment``.

    Args:
        mail: a logged-in IMAP client (as returned by ``connect_to_email``).

    Returns:
        A ``(threads, message_ids)`` tuple. ``threads`` maps a Message-ID to
        the list of Message-IDs that reply to it. ``message_ids`` maps each
        Message-ID to a dict with the keys ``subject``, ``id`` (the IMAP
        sequence number), ``message``, ``from`` and ``date``.
    """
    mail.select("inbox")  # Select the mailbox
    status, messages = mail.search(None, "ALL")
    threads = defaultdict(list)
    message_ids = {}
    if status != 'OK' or not messages or not messages[0]:
        return threads, message_ids

    for num in messages[0].split():
        status, msg_data = mail.fetch(num, '(RFC822)')
        if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"Error fetching message {num!r}: {status}")
            continue
        raw_email = msg_data[0][1]
        msg = email.message_from_bytes(raw_email)

        subject = _decode_header_text(msg['Subject'], "No subject")
        msg_from = _decode_header_text(msg['From'])
        # format_date hands back its input on failure, which is None when the
        # Date header is missing; keep the stored value a string.
        msg_date = format_date(msg['Date']) or ""

        msg_id = str(msg.get('Message-ID') or "").strip()
        if not msg_id:
            # The save helpers and the thread index need a key, so derive a
            # stable one from the IMAP sequence number.
            seq = num.decode('ascii', 'replace') if isinstance(num, bytes) else str(num)
            msg_id = f"<missing-message-id-{seq}@localhost>"
        in_reply_to = str(msg.get('In-Reply-To') or "").strip()

        # Store message for threading
        message_ids[msg_id] = {'subject': subject, 'id': num, 'message': msg, 'from': msg_from, 'date': msg_date}
        if in_reply_to:
            threads[in_reply_to].append(msg_id)
        else:
            # Top-level message. setdefault keeps replies that were already
            # recorded when a reply was fetched before its parent.
            threads.setdefault(msg_id, [])

        # Save the email as a .eml file with Message-ID
        save_email_as_eml(raw_email, subject, msg_id)

        # Save the email content in an appropriate file
        content, content_type = get_email_content(msg)
        if content_type:
            save_email_content(content, subject, msg_id, content_type)

        # Handle attachments
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_disposition() == 'attachment':
                    save_attachment(part)
    return threads, message_ids
# Save attachments based on their type
def save_attachment(part):
    """Write an attachment part to the archive directory matching its type.

    The filename comes from the sender and is therefore untrusted. Only its
    final path component is used, so names such as ``../../x`` or an absolute
    path cannot escape the archive directories. An existing file with the same
    name is overwritten.

    Args:
        part: an ``email.message.Message`` part.

    Returns:
        The path written, or ``None`` when the part has no usable filename or
        no decodable payload.
    """
    filename = part.get_filename()
    if not filename:
        return None

    # Treat both separator styles as path separators before taking the
    # basename, and drop NUL bytes that open() would reject.
    safe_name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '').strip()
    if safe_name in ('', '.', '..'):
        return None

    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) yields None for multipart payloads.
        return None

    # Determine save directory based on file type
    lowered = safe_name.lower()
    directory_by_extension = (
        (('.jpg', '.jpeg', '.png', '.gif', '.bmp'), PICTURES_DIR),
        (('.pdf', '.txt', '.json', '.yml', '.yaml', '.csv'), DOCUMENTS_DIR),
        (('.mp3', '.wav', '.aac', '.flac'), MUSIC_DIR),
        (('.mp4', '.mov', '.avi', '.wmv', '.flv'), VIDEOS_DIR),
    )
    for extensions, directory in directory_by_extension:
        if lowered.endswith(extensions):
            target_dir = directory
            break
    else:
        target_dir = DOWNLOADS_DIR

    # Save the attachment
    file_path = os.path.join(target_dir, safe_name)
    with open(file_path, 'wb') as f:
        f.write(payload)
    return file_path
# Save email as .eml file
def save_email_as_eml(raw_email, subject, msg_id):
    """Write the raw message bytes to MAIL_DIR/raw as ``<subject>_<id>.eml``.

    The subject is reduced to alphanumerics, spaces, hyphens and underscores
    before sanitizing; the Message-ID loses its angle brackets and has ``@``
    replaced by ``_``. If the target name is taken, ``_1``, ``_2``, ... is
    appended until an unused name is found.
    """
    kept_chars = [c for c in subject if c.isalnum() or c in (' ', '-', '_')]
    clean_subject = sanitize_filename("".join(kept_chars).strip())
    bare_id = msg_id.strip('<>')
    clean_msg_id = sanitize_filename(bare_id.replace('@', '_'))

    file_path = os.path.join(MAIL_DIR, 'raw', f"{clean_subject}_{clean_msg_id}.eml")
    counter = 0
    while os.path.exists(file_path):
        counter += 1
        file_path = os.path.join(MAIL_DIR, 'raw', f"{clean_subject}_{clean_msg_id}_{counter}.eml")

    with open(file_path, 'wb') as eml_file:
        eml_file.write(raw_email)
# Save email content to an appropriate file
def save_email_content(content, subject, msg_id, content_type):
    """Write the decoded body bytes to MAIL_DIR/raw next to the .eml file.

    ``text/html`` bodies get a ``.html`` extension; ``text/plain`` and any
    other content type get ``.txt``. The file name is built like the .eml
    name, and ``_1``, ``_2``, ... is appended while the name is already taken.
    """
    # Everything except HTML is stored as .txt, including unknown types.
    extension = ".html" if content_type == "text/html" else ".txt"

    kept_chars = [c for c in subject if c.isalnum() or c in (' ', '-', '_')]
    clean_subject = sanitize_filename("".join(kept_chars).strip())
    bare_id = msg_id.strip('<>')
    clean_msg_id = sanitize_filename(bare_id.replace('@', '_'))

    file_path = os.path.join(MAIL_DIR, 'raw', f"{clean_subject}_{clean_msg_id}{extension}")
    counter = 0
    while os.path.exists(file_path):
        counter += 1
        file_path = os.path.join(MAIL_DIR, 'raw', f"{clean_subject}_{clean_msg_id}_{counter}{extension}")

    with open(file_path, 'wb') as body_file:
        body_file.write(content)
# Display emails in threads
# Builds the string that the main block substitutes for {content} in the HTML
# template: one entry per message in message_ids, emitted in reverse order of
# iteration.
# NOTE(review): as it stands here, several string literals below open on one
# line and close on a later one, and the replace() calls map '<' to '<' and
# '>' to '>'. That looks like HTML tags and entities were stripped from this
# text - restore the literals from the original file before editing the logic.
def display_threaded_emails(threads, message_ids):
displayed = set() # Track displayed messages to avoid duplicates
content = ""
# Returns the text for one message. The bare `return` below yields None when
# msg_id was already displayed.
def display_message(msg_id, indent_level):
# This local `content` shadows the outer one; only the return value is used.
content = ""
if msg_id in displayed:
return
displayed.add(msg_id)
msg = message_ids[msg_id]
# clean_subject / clean_msg_id mirror the file naming in save_email_as_eml;
# they are not referenced in the lines visible here (presumably they were
# part of the link targets - confirm).
clean_subject = sanitize_filename("".join(c for c in msg['subject'] if c.isalnum() or c in (' ', '-', '_')).strip())
clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
subject_hyperlink = f'{msg["subject"]}'
id_hyperlink = f'({msg["id"]})'
# NOTE(review): these replacements are no-ops as written, and the subject
# above is inserted without any escaping.
msg_date = msg['date'].replace('<', '<').replace('>', '>')
msg_from = msg['from'].replace('<', '<').replace('>', '>')
# NOTE(review): multiplying an empty string always gives '', so `indent` is
# empty for every level as written; it is also unused in the visible lines.
indent = f"{'' * 4 * indent_level}" if indent_level > 0 else ""
content += f"
{msg_date}
{msg_from}
{subject_hyperlink}
{id_hyperlink}
\n"
# Display replies, if any
# NOTE(review): as shown, the loop appends a fixed string per reply and never
# calls display_message(reply_id, ...) - verify against the original.
for reply_id in threads.get(msg_id, []):
content += "
\n"
return content
# Display top-level messages
# Iterates every key of message_ids (not only thread roots); `displayed`
# prevents a message from being rendered twice.
rows = []
for root_id in message_ids:
if root_id not in displayed:
rows.append(display_message(root_id, 0))
# Rows are emitted in reverse of the order in which message_ids was filled.
for row in reversed(rows):
content += "
\n"
content += row
content += "
\n"
return content
# Function to generate RSS feed content
def generate_rss_feed(threads, message_ids):
    """Build the RSS document describing every message in *message_ids*.

    Each item's link uses the same file-name rule as ``save_email_as_eml``
    (filter the raw subject, then sanitize), so it points at the .eml file
    that was actually written. All text placed into the feed is escaped.

    NOTE(review): the element tags follow the RSS 2.0 layout (title, link,
    description, pubDate, guid inside item; title, link, description inside
    channel). The templates this replaces carried the same fields in the same
    order but no visible markup - compare with the deployed feed.

    Args:
        threads: thread index from ``fetch_emails`` (unused, kept for the
            existing call signature).
        message_ids: mapping of Message-ID to the per-message dict built by
            ``fetch_emails`` (keys ``subject``, ``from``, ``date``).

    Returns:
        The feed as a string, intended to be written as UTF-8.
    """
    rss_channel_title = "eom.dev"
    rss_channel_link = "https://eom.dev/archive/Mail/rss_feed.xml"
    rss_channel_description = "RSS feed of eom.dev discussion board"

    rss_items = []
    for msg_id, msg in message_ids.items():
        raw_subject = msg['subject']
        # Same naming rule as save_email_as_eml, applied to the unescaped
        # subject so the link matches the file on disk.
        clean_subject = sanitize_filename("".join(c for c in raw_subject if c.isalnum() or c in (' ', '-', '_')).strip())
        bare_id = str(msg_id or "").strip('<>')
        clean_msg_id = sanitize_filename(bare_id.replace('@', '_'))
        # URL paths always use '/', independent of the host OS separator.
        msg_link = f"/archive/Mail/raw/{clean_subject}_{clean_msg_id}.eml"

        description = f"Sender: {msg['from']} \nDate: {msg['date']}"
        pub_date = format_date(msg['date'])

        # Create RSS item; every interpolated value is escaped for XML.
        item_lines = [
            "<item>",
            f"<title>{html.escape(raw_subject)}</title>",
            f"<link>{html.escape(msg_link)}</link>",
            f"<description>{html.escape(description)}</description>",
        ]
        if pub_date:
            item_lines.append(f"<pubDate>{html.escape(str(pub_date))}</pubDate>")
        # A Message-ID is not a URL, so it must not be treated as a permalink.
        item_lines.append(f'<guid isPermaLink="false">{html.escape(bare_id)}</guid>')
        item_lines.append("</item>")
        rss_items.append("\n".join(item_lines) + "\n")

    rss_feed = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<rss version="2.0">\n'
        "<channel>\n"
        f"<title>{html.escape(rss_channel_title)}</title>\n"
        f"<link>{html.escape(rss_channel_link)}</link>\n"
        f"<description>{html.escape(rss_channel_description)}</description>\n"
        f"{''.join(rss_items)}"
        "</channel>\n"
        "</rss>\n"
    )
    return rss_feed
# Helper function to format dates for RSS
def format_date(original_date):
    """Convert an email Date header to an RFC 822 date string in UTC.

    The parsed time is converted to UTC before formatting, so the ``+0000``
    suffix is accurate. A date without offset information (``-0000``) is
    taken to be UTC already. Formatting does not depend on the locale.

    Args:
        original_date: the header value, or ``None`` when it is missing.

    Returns:
        A string such as ``"Mon, 01 Jan 2024 15:00:00 +0000"``. If the value
        is missing or cannot be parsed, an error is printed and
        *original_date* is returned unchanged.
    """
    # Imported here so the helper does not depend on email.utils having been
    # loaded as a side effect of other email imports.
    from datetime import timezone
    from email.utils import format_datetime, parsedate_to_datetime

    if original_date is None:
        print("Error formatting date: no date value provided")
        return original_date
    try:
        parsed = parsedate_to_datetime(str(original_date))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        else:
            parsed = parsed.astimezone(timezone.utc)
        return format_datetime(parsed)
    except (TypeError, ValueError, OverflowError) as e:
        print(f"Error formatting date: {e}")
        return original_date
# Main execution
if __name__ == "__main__":
    create_save_directory()  # Create directory to save emails
    mail = connect_to_email()
    try:
        threads, message_ids = fetch_emails(mail)
    finally:
        # Always end the IMAP session, even when fetching fails part-way.
        mail.logout()

    # Write both outputs as UTF-8 explicitly so non-ASCII subjects and sender
    # names do not depend on the platform's default encoding.
    index_html = HTML.format(content=display_threaded_emails(threads, message_ids))
    with open(f'{MAIL_DIR}/index.html', 'w', encoding='utf-8') as f:
        f.write(index_html)

    rss_feed_content = generate_rss_feed(threads, message_ids)
    with open(f'{MAIL_DIR}/rss_feed.xml', 'w', encoding='utf-8') as rss_file:
        rss_file.write(rss_feed_content)