"""Download a video, extract and transcribe its audio, then summarize it.

Pipeline: INPUT_VIDEO_URL -> local video file -> WAV audio split into
fixed-length segments -> per-segment transcription -> completion-based
summary. Every setting is read from environment variables (optionally
supplied through a ``.env`` file when run as a script).
"""

import os
from urllib.parse import urlparse

import requests
from moviepy import VideoFileClip
from openai import OpenAI
from pydub import AudioSegment

from prompt import DEFAULT_PROMPT

# Length of each audio segment in milliseconds (pydub slices by ms).
AUDIO_SEGMENT_DURATION = 30000
# Bytes written per iteration while streaming the download to disk.
DOWNLOAD_CHUNK_SIZE = 64 * 1024
# Seconds to wait for the server to connect / send data before giving up.
DOWNLOAD_TIMEOUT = 30


def _load_settings():
    """(Re)read every environment-driven setting into module globals.

    Called once at import time and again from ``setup()`` after the
    ``.env`` file has been loaded, so values defined there take effect.
    """
    global VIDEO_URL, OUTPUT_PATH
    global OPENAI_API_KEY, OPENAI_BASE_URL
    global OPENAI_TRANSCRIPTION_MODEL, OPENAI_CHAT_SYSTEM_PROMPT
    global OPENAI_CHAT_MODEL, OPENAI_CHAT_N

    VIDEO_URL = os.getenv('INPUT_VIDEO_URL', None)
    OUTPUT_PATH = os.getenv('OUTPUT_PATH', 'tmp')
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', None)
    OPENAI_BASE_URL = os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')
    OPENAI_TRANSCRIPTION_MODEL = os.getenv('OPENAI_TRANSCRIPTION_MODEL', 'whisper-1')
    OPENAI_CHAT_SYSTEM_PROMPT = os.getenv('OPENAI_CHAT_SYSTEM_PROMPT', DEFAULT_PROMPT)
    # 'whisper-1' is a transcription model and is rejected by the
    # completions endpoint, so default to a completions-capable model.
    OPENAI_CHAT_MODEL = os.getenv('OPENAI_CHAT_MODEL', 'gpt-3.5-turbo-instruct')
    OPENAI_CHAT_N = int(os.getenv('OPENAI_CHAT_N', '3'))


_load_settings()


def main():
    """Run the full pipeline and return the list of completion choices."""
    openai_client = OpenAI(
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
    )
    video_filename = get_video_from_url()
    audio_segments = get_audio_from_video(video_filename)
    transcription = transcribe_audio(openai_client, audio_segments)
    return summarize_transcription(openai_client, transcription)


def get_video_from_url():
    """Download VIDEO_URL into OUTPUT_PATH and return the saved filename.

    Returns:
        The bare filename (no directory) of the downloaded video.

    Raises:
        ValueError: if INPUT_VIDEO_URL is unset or has no filename part.
        requests.HTTPError: if the server answers with an error status.
    """
    if not VIDEO_URL:
        raise ValueError("INPUT_VIDEO_URL is not set")

    # Take the name from the URL path only, so a query string or fragment
    # does not end up in the filename.
    filename = os.path.basename(urlparse(VIDEO_URL).path)
    if not filename:
        raise ValueError(f"Cannot derive a filename from URL: {VIDEO_URL}")

    os.makedirs(OUTPUT_PATH, exist_ok=True)

    # stream=True keeps the video out of memory; without it the whole body
    # is downloaded before iter_content yields anything.
    with requests.get(VIDEO_URL, stream=True, timeout=DOWNLOAD_TIMEOUT) as response:
        # Fail loudly instead of saving an HTML error page as the "video".
        response.raise_for_status()
        with open(f"{OUTPUT_PATH}/{filename}", 'wb') as f:
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
    return filename


def get_audio_from_video(video_filename):
    """Extract the audio track to WAV and split it into segments.

    Args:
        video_filename: name of a video file located inside OUTPUT_PATH.

    Returns:
        Paths of the exported WAV segments, in playback order. Each segment
        is at most AUDIO_SEGMENT_DURATION milliseconds long.

    Raises:
        ValueError: if the video has no audio track.
    """
    video_path = f"{OUTPUT_PATH}/{video_filename}"
    wav_path = f"{video_path}.wav"

    # The context manager releases the reader resources held by the clip.
    with VideoFileClip(video_path) as clip:
        if clip.audio is None:
            raise ValueError(f"Video has no audio track: {video_path}")
        clip.audio.write_audiofile(wav_path)

    audio = AudioSegment.from_wav(wav_path)
    segments = []
    for index, start in enumerate(range(0, len(audio), AUDIO_SEGMENT_DURATION)):
        path = f"{OUTPUT_PATH}/audio_segment_{index}.wav"
        # export() hands back the open output file; close it right away so
        # handles do not pile up on long recordings.
        audio[start:start + AUDIO_SEGMENT_DURATION].export(path, format='wav').close()
        segments.append(path)
    return segments


def transcribe_audio(openai_client, audio_segments):
    """Transcribe each segment and return the texts joined by spaces.

    Args:
        openai_client: client exposing ``audio.transcriptions.create``.
        audio_segments: iterable of audio file paths, in order.

    Returns:
        A single string with every segment's transcription.
    """
    texts = []
    for segment_path in audio_segments:
        # Open per segment so each handle is closed once it is uploaded.
        with open(segment_path, 'rb') as audio_file:
            response = openai_client.audio.transcriptions.create(
                model=OPENAI_TRANSCRIPTION_MODEL,
                file=audio_file,
            )
        texts.append(response.text)
    return ' '.join(texts)


def summarize_transcription(openai_client, transcription):
    """Request OPENAI_CHAT_N summaries of the transcription.

    Args:
        openai_client: client exposing ``completions.create``.
        transcription: text substituted into OPENAI_CHAT_SYSTEM_PROMPT
            via ``str.format``.

    Returns:
        The ``choices`` of the completion response.
    """
    # NOTE(review): no max_tokens is sent, so the endpoint's default output
    # limit applies — confirm it is large enough for a summary.
    return openai_client.completions.create(
        model=OPENAI_CHAT_MODEL,
        prompt=OPENAI_CHAT_SYSTEM_PROMPT.format(transcription),
        n=OPENAI_CHAT_N,
    ).choices


def setup():
    """Load variables from a ``.env`` file and refresh the settings.

    The settings are first read at import time, before this function runs,
    so they must be re-read here for ``.env`` values to have any effect.
    """
    from dotenv import load_dotenv
    load_dotenv()
    _load_settings()


def cleanup():
    """Remove the OUTPUT_PATH directory.

    ``os.rmdir`` only removes an empty directory, so this raises OSError
    while downloaded or exported files are still present. It is not invoked
    by the entry point below.
    """
    os.rmdir(OUTPUT_PATH)


if __name__ == '__main__':
    setup()
    for each in main():
        print("========")
        print(each.text)