**Add email support for video summaries** This PR allows us to send video summary results via email instead of printing them. It also updates .env_example so that the template for environment variables is up to date. **Note:** Due to limitations with my OpenAI API token (insufficient credits), I was only able to successfully test the email sending independently. Integration of email with the transcription and summarization output has not been tested. **Closes #9** Co-authored-by: iakn0001 <imran.aknaf@ulb.be> Reviewed-on: #10 Reviewed-by: Eric Meehan <eric@eom.dev> Co-authored-by: Imran <imran.aknaf.pro@gmail.com> Co-committed-by: Imran <imran.aknaf.pro@gmail.com>
113 lines
3.6 KiB
Python
113 lines
3.6 KiB
Python
import os
|
|
import requests
|
|
|
|
from moviepy import VideoFileClip
|
|
from openai import OpenAI
|
|
from pydub import AudioSegment
|
|
|
|
from prompt import DEFAULT_PROMPT
|
|
|
|
import smtplib
|
|
from email.message import EmailMessage
|
|
|
|
# Runtime configuration, read from the process environment at import time.
# NOTE(review): setup() calls load_dotenv() only from the __main__ guard, which
# runs after these assignments have executed, so values that exist only in a
# .env file are not reflected here - confirm how the environment is populated.

# Source video URL and scratch directory for downloaded/extracted files.
VIDEO_URL = os.getenv('INPUT_VIDEO_URL', None)
OUTPUT_PATH = os.getenv('OUTPUT_PATH', 'tmp')
# Length of each exported audio chunk, in milliseconds (pydub slices by ms).
AUDIO_SEGMENT_DURATION = 30000

# OpenAI client and model settings.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', None)
OPENAI_BASE_URL = os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')
OPENAI_TRANSCRIPTION_MODEL = os.getenv('OPENAI_TRANSCRIPTION_MODEL', 'whisper-1')
OPENAI_CHAT_SYSTEM_PROMPT = os.getenv('OPENAI_CHAT_SYSTEM_PROMPT', DEFAULT_PROMPT)
# The summarization step calls the text completions endpoint, so the default
# must be a text-completion model; 'whisper-1' is an audio transcription model.
OPENAI_CHAT_MODEL = os.getenv('OPENAI_CHAT_MODEL', 'gpt-3.5-turbo-instruct')
OPENAI_CHAT_N = int(os.getenv('OPENAI_CHAT_N', '3'))

# Email delivery settings (SMTP over implicit TLS).
SENDER_EMAIL = os.getenv("SENDER_EMAIL", None)
RECEIVER_EMAIL = os.getenv("RECEIVER_EMAIL", None)
SENDER_APP_PASSWORD = os.getenv("SENDER_APP_PASSWORD", None)
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465"))
|
|
|
|
def main():
    """Run the full pipeline: download, transcribe, summarize, then email.

    Returns:
        The completion choices produced by ``summarize_transcription``.
    """
    openai_client = OpenAI(base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY)

    # Each step feeds the next; kept as named intermediates for readability.
    video_filename = get_video_from_url()
    audio_segments = get_audio_from_video(video_filename)
    transcription = transcribe_audio(openai_client, audio_segments)
    summaries = summarize_transcription(openai_client, transcription)

    # One numbered "Option" section per returned choice.
    email_body = "".join(
        f"Option {number} :\n{choice.text}\n\n"
        for number, choice in enumerate(summaries, start=1)
    )
    send_email(subject="Video summaries results", body=email_body)

    return summaries
|
|
|
|
def get_video_from_url():
    """Download the video at ``VIDEO_URL`` into ``OUTPUT_PATH``.

    Returns:
        The file name (last ``/``-separated component of the URL) under which
        the video was stored inside ``OUTPUT_PATH``.

    Raises:
        ValueError: If ``VIDEO_URL`` is unset or does not end in a file name.
        requests.HTTPError: If the server answers with an error status.
    """
    if not VIDEO_URL:
        raise ValueError("INPUT_VIDEO_URL is not set")

    filename = VIDEO_URL.split('/')[-1]
    if not filename:
        raise ValueError(f"Cannot derive a file name from URL: {VIDEO_URL}")

    # The scratch directory may not exist yet on a fresh checkout.
    os.makedirs(OUTPUT_PATH, exist_ok=True)

    # stream=True keeps the body out of memory; the timeout bounds connect and
    # per-read waits so a stalled server cannot hang the run forever.
    with requests.get(VIDEO_URL, stream=True, timeout=30) as response:
        # Fail loudly instead of saving an HTML error page as the "video".
        response.raise_for_status()
        with open(f"{OUTPUT_PATH}/{filename}", 'wb') as f:
            for chunk in response.iter_content(chunk_size=64 * 1024):
                if chunk:
                    f.write(chunk)
    return filename
|
|
|
|
def get_audio_from_video(video_filename):
    """Extract the audio track of a downloaded video and split it into chunks.

    The audio is first written as ``<video_filename>.wav`` inside
    ``OUTPUT_PATH`` and then cut into consecutive pieces of
    ``AUDIO_SEGMENT_DURATION`` each (the last piece may be shorter).

    Args:
        video_filename: Name of the video file inside ``OUTPUT_PATH``.

    Returns:
        A list of paths to the exported ``audio_segment_<n>.wav`` files, in
        playback order.

    Raises:
        ValueError: If the video has no audio track.
    """
    video_path = f"{OUTPUT_PATH}/{video_filename}"
    wav_path = f"{video_path}.wav"

    # Context manager releases the reader resources held by the clip.
    with VideoFileClip(video_path) as clip:
        if clip.audio is None:
            raise ValueError(f"No audio track found in {video_path}")
        clip.audio.write_audiofile(wav_path)

    audio = AudioSegment.from_wav(wav_path)
    segments = []
    for start in range(0, len(audio), AUDIO_SEGMENT_DURATION):
        path = f"{OUTPUT_PATH}/audio_segment_{start // AUDIO_SEGMENT_DURATION}.wav"
        # export() hands back the open output file; close it so handles do not
        # pile up across many segments.
        audio[start:start + AUDIO_SEGMENT_DURATION].export(path, format='wav').close()
        segments.append(path)
    return segments
|
|
|
|
def transcribe_audio(openai_client, audio_segments):
    """Transcribe each audio segment and join the results into one string.

    Args:
        openai_client: Client exposing ``audio.transcriptions.create``.
        audio_segments: Iterable of paths to audio files, in playback order.

    Returns:
        The segment transcriptions joined with single spaces (an empty string
        when there are no segments).
    """
    texts = []
    for segment_path in audio_segments:
        # Open each file in a context manager so the handle is closed as soon
        # as the upload finishes instead of being leaked.
        with open(segment_path, 'rb') as audio_file:
            response = openai_client.audio.transcriptions.create(
                model=OPENAI_TRANSCRIPTION_MODEL,
                file=audio_file,
            )
        texts.append(response.text)
    return ' '.join(texts)
|
|
|
|
def summarize_transcription(openai_client, transcription):
    """Ask the completions endpoint for summaries of a transcription.

    The transcription is substituted into ``OPENAI_CHAT_SYSTEM_PROMPT`` via
    ``str.format`` and sent as the prompt.

    Args:
        openai_client: Client exposing ``completions.create``.
        transcription: Full transcription text to summarize.

    Returns:
        The ``choices`` list of the completion response; each item carries the
        generated summary in its ``text`` attribute.
    """
    # NOTE(review): no max_tokens is passed, so the endpoint's default output
    # limit applies - confirm summaries are not being truncated.
    return openai_client.completions.create(
        model=OPENAI_CHAT_MODEL,
        prompt=OPENAI_CHAT_SYSTEM_PROMPT.format(transcription),
        # Request the configured number of alternatives; previously the
        # OPENAI_CHAT_N setting was read but never sent.
        n=OPENAI_CHAT_N,
    ).choices
|
|
|
|
def send_email(subject, body, sender_email=SENDER_EMAIL, receiver_email=RECEIVER_EMAIL, sender_password=SENDER_APP_PASSWORD):
    """Send a plain-text email over SMTP with implicit TLS (best effort).

    Failures are reported on stdout rather than raised, so a mail problem does
    not discard results the caller has already computed.

    Args:
        subject: Subject line of the message.
        body: Plain-text message body.
        sender_email: Address used for the ``From`` header and the SMTP login.
        receiver_email: Address used for the ``To`` header.
        sender_password: Password used for the SMTP login.

    Returns:
        None.
    """
    # Without complete credentials the login cannot succeed; skip the network
    # round-trip and say clearly what is missing.
    if not (sender_email and receiver_email and sender_password):
        print("Email sending error: sender, receiver or password is not configured")
        return

    msg = EmailMessage()
    msg.set_content(body)
    msg['Subject'] = subject
    msg['From'] = sender_email
    msg['To'] = receiver_email

    try:
        with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT) as smtp:
            smtp.login(sender_email, sender_password)
            smtp.send_message(msg)
    except Exception as e:
        # Deliberately broad: delivery is best effort at this boundary.
        print(f"Email sending error: {e}")
    else:
        print("Email sent successfully!")
|
|
|
|
def setup():
    """Load environment variables via python-dotenv's ``load_dotenv``.

    NOTE(review): the module-level settings are read with ``os.getenv`` when
    the module is imported, which happens before this function is called -
    confirm that values loaded here are actually picked up.
    """
    # Imported lazily, presumably so dotenv is only needed for script runs.
    import dotenv

    dotenv.load_dotenv()
|
|
|
|
def cleanup():
    """Delete the ``OUTPUT_PATH`` scratch directory and everything inside it.

    Raises:
        OSError: If the directory does not exist or cannot be removed.
    """
    import shutil

    # The pipeline leaves the video and wav files in this directory, and
    # os.rmdir only removes empty directories, so remove the tree recursively.
    shutil.rmtree(OUTPUT_PATH)
|
|
|
|
# Script entry point: load the environment, run the pipeline, and print each
# returned summary under a separator line.
if __name__ == '__main__':
    setup()

    for choice in main():
        print("========", choice.text, sep="\n")
|