302 lines
12 KiB
Python
302 lines
12 KiB
Python
# Synchronizes a fileserver with an IMAP inbox and generates
|
|
# index.html and rss_feed.xml files.
|
|
#
|
|
# Written by Eric Meehan and GPT4o-mini
|
|
import email
|
|
import html
|
|
import imaplib
|
|
import os
|
|
import re
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from email.header import decode_header
|
|
|
|
# Account credentials and server settings, all read from the environment.
# Any variable that is unset yields None here.
USERNAME = os.environ.get("USERNAME")
PASSWORD = os.environ.get("PASSWORD")
IMAP_SERVER = os.environ.get("IMAP_SERVER")  # e.g., 'imap.gmail.com' for Gmail
SAVE_DIR = os.environ.get("SAVE_DIR")  # Directory to save .eml files

# Every category directory lives under SAVE_DIR/archive.
_ARCHIVE_ROOT = f'{SAVE_DIR}/archive'

DOCUMENTS_DIR = f'{_ARCHIVE_ROOT}/Documents'
DOWNLOADS_DIR = f'{_ARCHIVE_ROOT}/Downloads'
MAIL_DIR = f'{_ARCHIVE_ROOT}/Mail'
MUSIC_DIR = f'{_ARCHIVE_ROOT}/Music'
PICTURES_DIR = f'{_ARCHIVE_ROOT}/Pictures'
VIDEOS_DIR = f'{_ARCHIVE_ROOT}/Videos'
|
|
|
|
# Page template for the generated mailbox index. It is rendered with
# str.format(), so the only replacement field is {content} (the table rows);
# any literal brace added to this template must be doubled.
# The search script is referenced with an absolute path, like the stylesheet
# and favicon, so it resolves the same way regardless of the page's location.
# NOTE(review): assumes ddg.js is served from /common/ next to the other
# shared assets -- confirm against the web server layout.
HTML = """
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>eom.dev - Mailbox</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<link rel="stylesheet" href="/common/catppuccin.css">
<link rel="icon" type="image/x-icon" href="/common/favicon.ico">
<style>
</style>
<script src="/common/ddg.js"></script>
<body>
<h1>eom.dev</h1>
<h2>Mailbox</h2>
<a href="/">Home
</a> | <a href="/archive">Archive
</a> | <a href="/archive/Repositories">Repositories
</a> | <a href="/stream">Live Stream
</a> | <a href="/archive/Mail">Mailbox
</a> | <a href="/chat">Chat
</a> | <a href="/donate">Donate
</a>
<h3>Browse Email Threads</h3>
<p>
This is a public discussion board powered by email. The source code is available <a href='/archive/Repositories/?p=www;a=tree'>here</a>.<br>
Email or CC <code>public-mailbox@eom.dev</code> to have your message appear here.<br>
Import the <code>.eml</code> file linked in the <i>ID</i> column into your email client to join an existing thread.<br>
<a href='https://en.wikipedia.org/wiki/Digital_signature'>Digital signatures</a> are encouraged. Encryption is not supported.<br>
This board is slightly easier to browse when threads are quoted in replies.<br>
Get updates via the <a href="/archive/Mail/rss_feed.xml">RSS feed</a>.
</p>
<!-- DuckDuckGo Site Search -->
<form
id="ddg-site-search"
action="https://duckduckgo.com/"
method="get"
target="_blank"
>
<input
type="search"
name="q"
id="ddg-query"
placeholder="Search with DuckDuckGo..."
aria-label="Search with DuckDuckGo"
required
/>
</form>
<table>
<tr>
<th>Date</th><th>From</th><th>Subject</th><th>ID</th>
</tr>
{content}
</table>
</body>
</html>
"""
|
|
|
|
|
|
# Connect to the IMAP server and log in
|
|
def connect_to_email():
    """Open an SSL IMAP session to IMAP_SERVER and authenticate.

    Logs in with USERNAME and PASSWORD and returns the resulting
    imaplib.IMAP4_SSL client.
    """
    client = imaplib.IMAP4_SSL(IMAP_SERVER)
    client.login(USERNAME, PASSWORD)
    return client
|
|
|
|
# Create a directory for saving emails
|
|
def create_save_directory():
    """Create the archive directory tree used by this script.

    Ensures SAVE_DIR, each per-category archive directory, and the
    ``raw`` sub-directory of MAIL_DIR exist. Directories that are already
    present are left untouched.
    """
    required_dirs = [
        SAVE_DIR,
        DOCUMENTS_DIR,
        DOWNLOADS_DIR,
        MAIL_DIR,
        MUSIC_DIR,
        PICTURES_DIR,
        VIDEOS_DIR,
        f'{MAIL_DIR}/raw',
    ]
    for directory in required_dirs:
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(directory, exist_ok=True)
|
|
|
|
# Sanitize the filename by removing invalid characters
|
|
def sanitize_filename(filename):
    """Return *filename* with unsafe characters replaced by underscores.

    Spaces and each of ``< > : " / \\ | ? *`` become ``_``; afterwards any
    remaining leading/trailing whitespace (tabs, newlines, ...) is stripped.
    """
    unsafe_chars = r'[ <>:"/\\|?*]'
    replaced = re.sub(unsafe_chars, '_', filename)
    return replaced.strip()
|
|
|
|
# Get the content of the email message
|
|
def get_email_content(msg):
    """Extract the body of *msg* together with its content type.

    For a non-multipart message the decoded payload and the message's own
    content type are returned. For a multipart message the parts are walked
    in order and the first ``text/plain`` or ``text/html`` part wins.

    Returns a ``(payload, content_type)`` tuple, or ``(None, None)`` when a
    multipart message has no text part.
    """
    if not msg.is_multipart():
        return msg.get_payload(decode=True), msg.get_content_type()

    text_types = ("text/plain", "text/html")
    for section in msg.walk():
        section_type = section.get_content_type()
        if section_type in text_types:
            return section.get_payload(decode=True), section_type

    return None, None
|
|
|
|
# Fetch emails
|
|
def fetch_emails(mail):
    """Download every message in the inbox, archive it, and build thread data.

    For each message the raw ``.eml`` file, the first text body (if any) and
    all attachments are written to disk through the module's save helpers.

    Args:
        mail: A logged-in IMAP client (as returned by connect_to_email).

    Returns:
        A ``(threads, message_ids)`` tuple. ``threads`` maps a Message-ID to
        the list of Message-IDs that reply to it (top-level messages map to
        a possibly empty list). ``message_ids`` maps each Message-ID to a
        dict with the keys ``subject``, ``id`` (the IMAP sequence number),
        ``message`` (the parsed message), ``from`` and ``date``.
    """
    mail.select("inbox")  # Select the mailbox
    status, messages = mail.search(None, "ALL")
    threads = defaultdict(list)
    message_ids = {}
    if status != 'OK' or not messages or not messages[0]:
        # Search failed or the mailbox is empty: nothing to archive.
        return threads, message_ids

    for num in messages[0].split():
        status, msg_data = mail.fetch(num, '(RFC822)')
        if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"Skipping message {num!r}: fetch failed")
            continue
        raw_email = msg_data[0][1]
        msg = email.message_from_bytes(raw_email)

        # Headers may be absent or split into several encoded fragments.
        subject = _decode_header_value(msg['Subject'], default="No subject")
        msg_from = _decode_header_value(msg['From'])
        msg_date = format_date(msg['Date']) or ""

        # A message without a Message-ID still needs a stable key and file
        # name, so fall back to one derived from its IMAP sequence number.
        msg_id = str(msg.get('Message-ID') or '').strip()
        if not msg_id:
            msg_id = f"<missing-message-id-{num.decode()}>"
        in_reply_to = str(msg.get('In-Reply-To') or '').strip()

        # Store message for threading
        message_ids[msg_id] = {
            'subject': subject,
            'id': num,
            'message': msg,
            'from': msg_from,
            'date': msg_date,
        }
        if in_reply_to:
            threads[in_reply_to].append(msg_id)
        else:
            # Top-level message. setdefault keeps replies that were already
            # recorded if a reply happened to be processed before its parent.
            threads.setdefault(msg_id, [])

        # Save the email as a .eml file with Message-ID
        save_email_as_eml(raw_email, subject, msg_id)

        # Save the email content in an appropriate file
        content, content_type = get_email_content(msg)
        if content_type:
            save_email_content(content, subject, msg_id, content_type)

        # Handle attachments
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_disposition() == 'attachment':
                    save_attachment(part)

    return threads, message_ids


def _decode_header_value(raw_value, default=""):
    """Decode a (possibly RFC 2047-encoded) header into one string.

    All fragments returned by decode_header are joined. Bytes fragments are
    decoded with their declared charset, falling back to UTF-8 with
    replacement characters when the charset is missing or unknown. Returns
    *default* when the header is absent or decodes to an empty string.
    """
    from email.errors import HeaderParseError

    if raw_value is None:
        return default
    text = str(raw_value)
    try:
        fragments = decode_header(text)
    except HeaderParseError:
        # Malformed encoded word: show the raw header rather than fail.
        return text or default

    pieces = []
    for fragment, charset in fragments:
        if isinstance(fragment, bytes):
            try:
                fragment = fragment.decode(charset or 'utf-8', errors='replace')
            except LookupError:
                fragment = fragment.decode('utf-8', errors='replace')
        pieces.append(fragment)
    return "".join(pieces) or default
|
|
|
|
# Save attachments based on their type
|
|
def save_attachment(part):
    """Write an attachment part to the archive directory for its file type.

    The destination directory is chosen from the file extension (pictures,
    documents, music, videos; anything else goes to downloads).

    Args:
        part: A message part whose filename and decoded payload are used.

    Returns:
        The path the attachment was written to, or None when the part has
        no usable filename or no decodable payload.
    """
    filename = part.get_filename()
    if not filename:
        return None

    # The filename comes from the sender and is untrusted: keep only its
    # final component so it cannot escape the target directory (for example
    # "../../x" or an absolute path). Backslashes are treated as separators
    # too, and NUL bytes are dropped because open() rejects them.
    safe_name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '').strip()
    if safe_name in ('', '.', '..'):
        return None

    payload = part.get_payload(decode=True)
    if payload is None:
        return None

    # Determine save directory based on file type
    lowered = safe_name.lower()
    if lowered.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp')):
        target_dir = PICTURES_DIR
    elif lowered.endswith(('.pdf', '.txt', '.json', '.yml', '.yaml', '.csv')):
        target_dir = DOCUMENTS_DIR
    elif lowered.endswith(('.mp3', '.wav', '.aac', '.flac')):
        target_dir = MUSIC_DIR
    elif lowered.endswith(('.mp4', '.mov', '.avi', '.wmv', '.flv')):
        target_dir = VIDEOS_DIR
    else:
        target_dir = DOWNLOADS_DIR

    # Save the attachment
    file_path = os.path.join(target_dir, safe_name)
    with open(file_path, 'wb') as f:
        f.write(payload)
    return file_path
|
|
|
|
# Save email as .eml file
|
|
def save_email_as_eml(raw_email, subject, msg_id):
    """Write the raw message bytes to ``MAIL_DIR/raw`` as a ``.eml`` file.

    The file name is built from the cleaned subject and Message-ID. If a
    file with that name already exists and holds exactly the same bytes,
    nothing is written, so re-running the script does not pile up
    ``_1``, ``_2``, ... copies of every message. A numeric suffix is only
    used when an existing file has different content.

    Args:
        raw_email: The message as bytes.
        subject: Decoded subject line.
        msg_id: The Message-ID header value.
    """
    clean_subject = sanitize_filename("".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip())
    clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
    stem = f"{clean_subject}_{clean_msg_id}"
    raw_dir = os.path.join(MAIL_DIR, 'raw')

    file_path = os.path.join(raw_dir, f"{stem}.eml")
    counter = 1
    while os.path.exists(file_path):
        # Cheap size comparison first; read the file only when sizes match.
        if os.path.getsize(file_path) == len(raw_email):
            with open(file_path, 'rb') as existing:
                if existing.read() == raw_email:
                    return  # Already archived.
        file_path = os.path.join(raw_dir, f"{stem}_{counter}.eml")
        counter += 1

    with open(file_path, 'wb') as f:
        f.write(raw_email)
|
|
|
|
# Save email content to an appropriate file
|
|
def save_email_content(content, subject, msg_id, content_type):
    """Write a message body to ``MAIL_DIR/raw`` as ``.txt`` or ``.html``.

    ``text/html`` content gets a ``.html`` extension; ``text/plain`` and any
    other type get ``.txt``. If a file with the target name already exists
    and holds exactly the same bytes, nothing is written, so re-running the
    script does not create numbered duplicates. A numeric suffix is only
    used when an existing file has different content.

    Args:
        content: The body as bytes; None is ignored.
        subject: Decoded subject line.
        msg_id: The Message-ID header value.
        content_type: MIME type of *content*.
    """
    if content is None:
        # Nothing to write; avoid creating an empty file and then failing.
        return

    extension = ".html" if content_type == "text/html" else ".txt"

    clean_subject = sanitize_filename("".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip())
    clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
    stem = f"{clean_subject}_{clean_msg_id}"
    raw_dir = os.path.join(MAIL_DIR, 'raw')

    file_path = os.path.join(raw_dir, f"{stem}{extension}")
    counter = 1
    while os.path.exists(file_path):
        # Cheap size comparison first; read the file only when sizes match.
        if os.path.getsize(file_path) == len(content):
            with open(file_path, 'rb') as existing:
                if existing.read() == content:
                    return  # Already archived.
        file_path = os.path.join(raw_dir, f"{stem}_{counter}{extension}")
        counter += 1

    with open(file_path, 'wb') as f:
        f.write(content)
|
|
|
|
# Display emails in threads
|
|
def display_threaded_emails(threads, message_ids):
    """Render all messages as HTML table rows, grouped into threads.

    Each message becomes one complete ``<tr>`` row; replies follow their
    parent and are indented through the subject cell's left padding.
    Threads are emitted in reverse of ``message_ids`` order.

    All header-derived text (date, sender, subject) is HTML-escaped because
    it comes from arbitrary senders, and link targets are URL-quoted.

    Args:
        threads: Mapping of Message-ID to the list of reply Message-IDs.
        message_ids: Mapping of Message-ID to a dict with the keys
            ``subject``, ``id``, ``message``, ``from`` and ``date``.

    Returns:
        The concatenated ``<tr>`` rows as a string ("" when there are no
        messages).
    """
    from urllib.parse import quote

    displayed = set()  # Track displayed messages to avoid duplicates

    def render_rows(msg_id, indent_level):
        """Return the rows for *msg_id* and, recursively, its replies."""
        if msg_id in displayed or msg_id not in message_ids:
            # Always return a string so callers can concatenate safely.
            return ""
        displayed.add(msg_id)
        msg = message_ids[msg_id]

        clean_subject = sanitize_filename("".join(c for c in msg['subject'] if c.isalnum() or c in (' ', '-', '_')).strip())
        clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
        stem = f"{clean_subject}_{clean_msg_id}"

        # Use the type of the body part that get_email_content selects, so
        # the extension agrees with the file written for that body.
        _, body_type = get_email_content(msg['message'])
        extension = ".html" if body_type == "text/html" else ".txt"
        body_href = html.escape(quote(f"/archive/Mail/raw/{stem}{extension}"))
        eml_href = html.escape(quote(f"/archive/Mail/raw/{stem}.eml"))

        # The IMAP sequence number is bytes; decode it so it does not
        # render as b'1'.
        imap_id = msg['id']
        if isinstance(imap_id, bytes):
            imap_id = imap_id.decode(errors='replace')

        msg_date = html.escape(str(msg['date'] or ''))
        msg_from = html.escape(str(msg['from'] or ''))
        subject_text = html.escape(str(msg['subject']))
        subject_hyperlink = f'<a href="{body_href}">{subject_text}</a>'
        id_hyperlink = f'<a href="{eml_href}">({html.escape(str(imap_id))})</a>'

        rows = [
            "<tr>\n"
            f"<td>{msg_date}</td><td><strong>{msg_from}</strong></td>"
            f"<td style='padding-left: {indent_level * 25}px;'>{subject_hyperlink}</td>"
            f"<td>{id_hyperlink}</td>\n"
            "</tr>\n"
        ]
        # Display replies, if any
        for reply_id in threads.get(msg_id, []):
            rows.append(render_rows(reply_id, indent_level + 1))
        return "".join(rows)

    # Display top-level messages
    thread_blocks = []
    for root_id in message_ids:
        if root_id not in displayed:
            thread_blocks.append(render_rows(root_id, 0))

    return "".join(reversed(thread_blocks))
|
|
|
|
# Function to generate RSS feed content
|
|
def generate_rss_feed(threads, message_ids):
    """Build the RSS 2.0 document listing every archived message.

    Each message becomes one ``<item>`` whose link points at its ``.eml``
    file. The file name is derived with the same subject/Message-ID
    cleaning that save_email_as_eml uses when writing the file.

    Args:
        threads: Unused; accepted for signature compatibility.
        message_ids: Mapping of Message-ID to a dict with the keys
            ``subject``, ``from`` and ``date``.

    Returns:
        The complete feed as a string.
    """
    from urllib.parse import quote

    rss_items = []
    rss_channel_title = "eom.dev"
    rss_channel_link = "https://eom.dev/archive/Mail/rss_feed.xml"
    rss_channel_description = "RSS feed of eom.dev discussion board"
    # Item links are made absolute using the same origin as the channel link.
    site_root = "https://eom.dev"

    for msg_id, msg in message_ids.items():
        # Escape special characters for XML
        subject = html.escape(msg['subject'])
        msg_from = html.escape(str(msg['from'] or ''))
        msg_date = html.escape(str(msg['date'] or ''))
        pub_date = html.escape(str(format_date(msg['date']) or ''))
        guid = html.escape(msg_id.strip('<>'))  # Remove angle brackets

        # Build the file name from the unescaped subject, using the same
        # cleaning as save_email_as_eml, so the link resolves.
        clean_subject = sanitize_filename("".join(c for c in msg['subject'] if c.isalnum() or c in (' ', '-', '_')).strip())
        clean_msg_id = sanitize_filename(msg_id.strip('<>').replace('@', '_'))
        msg_path = quote(f"/archive/Mail/raw/{clean_subject}_{clean_msg_id}.eml")
        msg_link = html.escape(f"{site_root}{msg_path}")

        # Built outside the template: a backslash inside an f-string
        # replacement field is a syntax error before Python 3.12.
        description = f"Sender: {msg_from} \nDate: {msg_date}"

        # Create RSS item
        item = f"""
<item>
<title>{subject}</title>
<link>{msg_link}</link>
<description>{description}</description>
<pubDate>{pub_date}</pubDate>
<guid isPermaLink="false">{guid}</guid> <!-- Remove angle brackets -->
</item>
"""
        rss_items.append(item)

    rss_feed = f"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>{rss_channel_title}</title>
<link>{rss_channel_link}</link>
<description>{rss_channel_description}</description>
{''.join(rss_items)}
</channel>
</rss>
"""
    return rss_feed
|
|
|
|
# Helper function to format dates for RSS
|
|
def format_date(original_date):
    """Normalize an email Date header to an RFC 822 date in UTC.

    The parsed timestamp is converted to UTC before formatting, so the
    ``+0000`` suffix in the result is accurate. A timestamp parsed without
    zone information is treated as already being UTC. Formatting goes
    through email.utils.format_datetime rather than strftime.

    Args:
        original_date: The raw Date header value (may be None).

    Returns:
        The date as e.g. ``"Mon, 01 Jan 2024 10:00:00 +0000"``, or
        *original_date* unchanged when it cannot be parsed.
    """
    # Imported here so the function does not rely on email.utils having been
    # loaded as a side effect of other email imports.
    import email.utils
    from datetime import timezone

    try:
        parsed = email.utils.parsedate_to_datetime(original_date)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return email.utils.format_datetime(parsed.astimezone(timezone.utc))
    except (TypeError, ValueError, IndexError, OverflowError) as e:
        print(f"Error formatting date: {e}")
        return original_date
|
|
|
|
# Main execution
|
|
if __name__ == "__main__":
    # Fetch and archive the inbox, then regenerate index.html and
    # rss_feed.xml inside MAIL_DIR.
    create_save_directory()  # Create directory to save emails
    mail = connect_to_email()
    try:
        threads, message_ids = fetch_emails(mail)

        # Both documents declare UTF-8, so write them as UTF-8 explicitly
        # rather than with the platform's default encoding.
        with open(f'{MAIL_DIR}/index.html', 'w', encoding='utf-8') as f:
            f.write(HTML.format(content=display_threaded_emails(threads, message_ids)))

        rss_feed_content = generate_rss_feed(threads, message_ids)
        with open(f'{MAIL_DIR}/rss_feed.xml', 'w', encoding='utf-8') as rss_file:
            rss_file.write(rss_feed_content)
    finally:
        # Close the IMAP session even when fetching or rendering fails.
        mail.logout()
|
|
|