import crypto from "crypto";
import fs from "fs/promises";
import Parser from "rss-parser";
import { config } from "../services/config.js";
import {
  closeBrowser,
  enhanceArticleContent,
} from "../services/content-extractor.js";
import {
  fetchActiveFeeds,
  getFeedById,
  getFeedByUrl,
  getQueueItems,
  getUnprocessedArticles,
  markArticleAsProcessed,
  removeFromQueue,
  saveArticle,
  saveEpisode,
  saveFeed,
  updateQueueItemStatus,
} from "../services/database.js";
import {
  openAI_ClassifyEpisode,
  openAI_ClassifyFeed,
  openAI_GeneratePodcastContent,
} from "../services/llm.js";
import {
  regenerateStartupFiles,
  updatePodcastRSS,
} from "../services/podcast.js";
import { generateTTS, generateTTSWithoutQueue } from "../services/tts.js";

interface FeedItem {
  id?: string;
  title?: string;
  link?: string;
  pubDate?: string;
  contentSnippet?: string;
  content?: string;
  description?: string;
}

/** Article limit used when LIMIT_UNPROCESSED_ARTICLES is unset or not a number. */
const DEFAULT_UNPROCESSED_ARTICLE_LIMIT = 10;

/** Number of items pulled from the TTS retry queue per batch. */
const RETRY_QUEUE_BATCH_SIZE = 5;

/** Total TTS attempts (including the first) before a queue item is marked failed. */
const MAX_TTS_ATTEMPTS = 3;

/**
 * Returns true when `error` represents a cancellation: an `Error` named
 * "AbortError" or one whose message contains "cancelled" (the wording used by
 * every cancellation error thrown in this module).
 */
function isCancellationError(error: unknown): error is Error {
  return (
    error instanceof Error &&
    (error.message.includes("cancelled") || error.name === "AbortError")
  );
}

/**
 * Throws `new Error(message)` if `abortSignal` has been aborted.
 * The message deliberately contains "cancelled" so `isCancellationError`
 * recognizes it further up the call stack.
 */
function throwIfAborted(
  abortSignal: AbortSignal | undefined,
  message: string,
): void {
  if (abortSignal?.aborted) {
    throw new Error(message);
  }
}

/**
 * Closes the shared Puppeteer browser, logging instead of throwing on failure
 * so cleanup can never mask the batch's own result or error.
 */
async function closeBrowserSafely(): Promise<void> {
  try {
    await closeBrowser();
  } catch (error) {
    console.warn("⚠️ Warning: Failed to close browser:", error);
  }
}

/**
 * Reads LIMIT_UNPROCESSED_ARTICLES as a base-10 integer.
 * Falls back to DEFAULT_UNPROCESSED_ARTICLE_LIMIT when unset or non-numeric,
 * instead of passing NaN through to the database query.
 */
function resolveUnprocessedArticleLimit(): number {
  const raw = import.meta.env["LIMIT_UNPROCESSED_ARTICLES"];
  if (!raw) {
    return DEFAULT_UNPROCESSED_ARTICLE_LIMIT;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    console.warn(
      `⚠️ Invalid LIMIT_UNPROCESSED_ARTICLES "${raw}", using ${DEFAULT_UNPROCESSED_ARTICLE_LIMIT}`,
    );
    return DEFAULT_UNPROCESSED_ARTICLE_LIMIT;
  }
  return parsed;
}

/**
 * Main batch processing function.
 *
 * Loads active feed URLs via `fetchActiveFeeds`, saves newly discovered
 * articles from each feed, then generates podcast episodes for unprocessed
 * articles. A non-cancellation failure in one feed is logged and the
 * remaining feeds are still processed.
 *
 * The Puppeteer browser is closed once the whole batch finishes,
 * whether it succeeds, fails or is cancelled.
 *
 * @param abortSignal - Optional signal; once aborted, processing stops at the
 *   next cancellation checkpoint.
 * @throws An `Error` named "AbortError" when the batch is cancelled; any other
 *   error from feed loading or article processing is rethrown unchanged.
 */
export async function batchProcess(abortSignal?: AbortSignal): Promise<void> {
  try {
    console.log("🚀 Starting enhanced batch process...");

    throwIfAborted(abortSignal, "Batch process was cancelled before starting");

    // Load active feed URLs from the database
    const feeds = await fetchActiveFeeds();
    const feedUrls = feeds.map((feed) => feed.url).filter((url) => !!url);

    if (feedUrls.length === 0) {
      console.log("ℹ️ No feed URLs found.");
      return;
    }

    console.log(`📡 Processing ${feedUrls.length} feeds...`);

    for (const url of feedUrls) {
      throwIfAborted(
        abortSignal,
        "Batch process was cancelled during feed processing",
      );

      try {
        await processFeedUrl(url, abortSignal);
      } catch (error) {
        // Cancellation must stop the whole batch, not just this feed
        if (isCancellationError(error)) {
          throw error;
        }
        console.error(`❌ Failed to process feed ${url}:`, error);
        // Continue with other feeds
      }
    }

    throwIfAborted(
      abortSignal,
      "Batch process was cancelled before article processing",
    );

    // Process unprocessed articles and generate podcasts
    await processUnprocessedArticles(abortSignal);

    console.log(
      "✅ Enhanced batch process completed:",
      new Date().toISOString(),
    );
  } catch (error) {
    if (isCancellationError(error)) {
      console.log("🛑 Batch process was cancelled");
      // Normalize every cancellation into a single AbortError for callers
      const abortError = new Error("Batch process was cancelled");
      abortError.name = "AbortError";
      throw abortError;
    }
    console.error("💥 Batch process failed:", error);
    throw error;
  } finally {
    // Article enhancement may have launched the browser; release it once per batch
    await closeBrowserSafely();
  }
}

/**
 * Parses a single feed URL, creating its feed record if needed, and saves any
 * new articles it contains.
 *
 * @param url - Feed URL; must start with "http".
 * @param abortSignal - Optional cancellation signal.
 * @throws If the URL is invalid, parsing fails, the feed record cannot be
 *   created, or the operation is cancelled.
 */
async function processFeedUrl(
  url: string,
  abortSignal?: AbortSignal,
): Promise<void> {
  if (!url || !url.startsWith("http")) {
    throw new Error(`Invalid feed URL: ${url}`);
  }

  throwIfAborted(abortSignal, "Feed processing was cancelled");

  console.log(`🔍 Processing feed: ${url}`);

  try {
    // Parse RSS feed
    const parser = new Parser();
    const feed = await parser.parseURL(url);

    throwIfAborted(abortSignal, "Feed processing was cancelled");

    // Get or create feed record
    let feedRecord = await getFeedByUrl(url);
    if (!feedRecord) {
      console.log(`➕ Adding new feed: ${feed.title || url}`);
      await saveFeed({
        url,
        title: feed.title,
        description: feed.description,
        lastUpdated: new Date().toISOString(),
        active: true,
      });
      feedRecord = await getFeedByUrl(url);
    }

    if (!feedRecord) {
      throw new Error("Failed to create or retrieve feed record");
    }

    // Process feed items and save new articles
    const newArticlesCount = await discoverNewArticles(
      feedRecord,
      feed.items || [],
    );

    // Bump the feed's last-updated timestamp only when something new arrived
    if (newArticlesCount > 0) {
      await saveFeed({
        url: feedRecord.url,
        title: feedRecord.title,
        description: feedRecord.description,
        lastUpdated: new Date().toISOString(),
        active: feedRecord.active,
      });
    }

    console.log(
      `📊 Feed processed: ${feed.title || url} (${newArticlesCount} new articles)`,
    );
  } catch (error) {
    console.error(`💥 Error processing feed ${url}:`, error);
    throw error;
  }
}

/**
 * Saves feed items as unprocessed articles.
 *
 * Items lacking a title or link are skipped. A failure to save one item is
 * logged and does not stop the rest.
 *
 * @param feed - Feed record; only `feed.id` is read.
 * @param items - Parsed feed items.
 * @returns How many saves returned a truthy article id (treated as "new").
 */
async function discoverNewArticles(
  feed: any,
  items: FeedItem[],
): Promise<number> {
  let newArticlesCount = 0;

  for (const item of items) {
    if (!item.title || !item.link) {
      console.warn("⚠️ Skipping item without title or link");
      continue;
    }

    try {
      // saveArticle returns an id only for articles it considers new
      const articleId = await saveArticle({
        feedId: feed.id,
        title: item.title,
        link: item.link,
        description: item.description || item.contentSnippet,
        content: item.content,
        pubDate: item.pubDate || new Date().toISOString(),
        processed: false,
      });

      if (articleId) {
        newArticlesCount++;
        console.log(`📄 New article discovered: ${item.title}`);
      }
    } catch (error) {
      console.error(`❌ Error saving article: ${item.title}`, error);
    }
  }

  return newArticlesCount;
}

/**
 * Drains the TTS retry queue, then generates a podcast episode for each
 * unprocessed article.
 *
 * The number of articles is capped by LIMIT_UNPROCESSED_ARTICLES.
 * An article is marked processed, and the podcast RSS/startup files are
 * regenerated, only when its episode was actually created.
 *
 * @param abortSignal - Optional cancellation signal.
 * @throws On cancellation, or when the retry queue or article query fails.
 */
async function processUnprocessedArticles(
  abortSignal?: AbortSignal,
): Promise<void> {
  console.log("🎧 Processing unprocessed articles...");

  try {
    throwIfAborted(abortSignal, "Article processing was cancelled");

    // Process retry queue first
    await processRetryQueue(abortSignal);

    throwIfAborted(abortSignal, "Article processing was cancelled");

    // Get unprocessed articles (limit to prevent overwhelming)
    const unprocessedArticles = await getUnprocessedArticles(
      resolveUnprocessedArticleLimit(),
    );

    if (unprocessedArticles.length === 0) {
      console.log("ℹ️ No unprocessed articles found.");
      return;
    }

    console.log(`🎯 Found ${unprocessedArticles.length} unprocessed articles`);

    // Track articles that successfully generated audio AND episodes
    let successfullyGeneratedCount = 0;

    for (const article of unprocessedArticles) {
      throwIfAborted(abortSignal, "Article processing was cancelled");

      try {
        const episodeCreated = await generatePodcastForArticle(
          article,
          abortSignal,
        );

        if (!episodeCreated) {
          // Leave unprocessed so a later batch retries it
          console.warn(
            `⚠️ Episode creation failed for: ${article.title} - not marking as processed`,
          );
          continue;
        }

        await markArticleAsProcessed(article.id);
        console.log(`✅ Podcast generated for: ${article.title}`);
        successfullyGeneratedCount++;

        // Update RSS immediately after each successful episode creation
        console.log(
          `📻 Updating podcast RSS after successful episode creation...`,
        );
        try {
          await regenerateStartupFiles();
          console.log(`📻 RSS updated successfully for: ${article.title}`);
        } catch (rssError) {
          console.error(
            `❌ Failed to update RSS after episode creation for: ${article.title}`,
            rssError,
          );
        }
      } catch (error) {
        if (isCancellationError(error)) {
          console.log(`🛑 Article processing cancelled, stopping batch`);
          throw error; // Re-throw to propagate cancellation
        }
        console.error(
          `❌ Failed to generate podcast for article: ${article.title}`,
          error,
        );
        // Don't mark as processed if generation failed
      }
    }

    console.log(
      `🎯 Batch processing completed: ${successfullyGeneratedCount} episodes successfully created`,
    );

    if (successfullyGeneratedCount === 0) {
      console.log(`ℹ️ No episodes were successfully created in this batch`);
    }
  } catch (error) {
    console.error("💥 Error processing unprocessed articles:", error);
    throw error;
  }
}

/**
 * Retries TTS generation for up to RETRY_QUEUE_BATCH_SIZE items in the
 * retry queue.
 *
 * - On success, the item is removed from the queue and the podcast RSS is
 *   updated.
 * - On failure, the item's retry_count is incremented and it is reset to
 *   "pending". Once MAX_TTS_ATTEMPTS attempts have failed, it is marked
 *   "failed" instead.
 *
 * A dedicated bun:sqlite connection is used for the retry-count update and is
 * always closed before returning.
 *
 * NOTE(review): a successful retry only regenerates audio here. No episode is
 * saved and no article is marked processed in this function. Confirm that is
 * handled elsewhere.
 *
 * @param abortSignal - Optional cancellation signal, checked before each item.
 * @throws On cancellation, or when the queue itself cannot be read.
 */
async function processRetryQueue(abortSignal?: AbortSignal): Promise<void> {
  const { Database } = await import("bun:sqlite");
  const db = new Database(config.paths.dbPath);

  console.log("🔄 Processing TTS retry queue...");

  try {
    const queueItems = await getQueueItems(RETRY_QUEUE_BATCH_SIZE);

    if (queueItems.length === 0) {
      return;
    }

    console.log(`📋 Found ${queueItems.length} items in retry queue`);

    for (const item of queueItems) {
      throwIfAborted(abortSignal, "Retry queue processing was cancelled");

      try {
        console.log(
          `🔁 Retrying TTS generation for: ${item.itemId} (attempt ${item.retryCount + 1}/${MAX_TTS_ATTEMPTS})`,
        );

        await updateQueueItemStatus(item.id, "processing");

        // Attempt TTS generation without re-queuing on failure
        await generateTTSWithoutQueue(
          item.itemId,
          item.scriptText,
          item.retryCount,
        );

        // Success - remove from queue and update RSS
        await removeFromQueue(item.id);
        console.log(`✅ TTS retry successful for: ${item.itemId}`);

        console.log(`📻 Updating podcast RSS after successful retry...`);
        try {
          await updatePodcastRSS();
          console.log(
            `📻 RSS updated successfully after retry for: ${item.itemId}`,
          );
        } catch (rssError) {
          console.error(
            `❌ Failed to update RSS after retry for: ${item.itemId}`,
            rssError,
          );
        }
      } catch (error) {
        if (isCancellationError(error)) {
          console.log(`🛑 TTS retry processing cancelled for: ${item.itemId}`);
          throw error; // Re-throw cancellation errors
        }

        console.error(`❌ TTS retry failed for: ${item.itemId}`, error);

        try {
          // retryCount counts previous retries, so this attempt was number retryCount + 1
          if (item.retryCount + 1 >= MAX_TTS_ATTEMPTS) {
            await updateQueueItemStatus(item.id, "failed");
            console.log(
              `💀 Max retries reached for: ${item.itemId}, marking as failed`,
            );
          } else {
            // Increment retry count and reset to pending for next retry
            const updatedRetryCount = item.retryCount + 1;
            const stmt = db.prepare(
              "UPDATE tts_queue SET retry_count = ?, status = 'pending' WHERE id = ?",
            );
            stmt.run(updatedRetryCount, item.id);
            console.log(
              `🔄 Updated retry count to ${updatedRetryCount} for: ${item.itemId}`,
            );
          }
        } catch (dbError) {
          console.error(
            `❌ Failed to update queue status for: ${item.itemId}`,
            dbError,
          );
        }
      }
    }
  } catch (error) {
    console.error("💥 Error processing retry queue:", error);
    throw error;
  } finally {
    // Clean up database connection
    try {
      db.close();
    } catch (closeError) {
      console.warn(
        "⚠️ Warning: Failed to close database connection:",
        closeError,
      );
    }
  }
}

/**
 * Generates a podcast episode for a single article.
 *
 * Steps: enhance the content, generate the script, classify the episode,
 * run TTS, verify the audio file is non-empty, then save the episode.
 *
 * @param article - Unprocessed article row; `id`, `feedId`, `title`, `link`,
 *   `content` and `description` are read.
 * @param abortSignal - Optional cancellation signal, checked between stages.
 * @returns true only when the episode was saved; false on any non-cancellation
 *   failure, leaving the article to be retried by a later batch.
 * @throws Cancellation errors, so the caller can stop the batch.
 */
async function generatePodcastForArticle(
  article: any,
  abortSignal?: AbortSignal,
): Promise<boolean> {
  console.log(`🎤 Generating podcast for: ${article.title}`);

  try {
    throwIfAborted(abortSignal, "Podcast generation was cancelled");

    // NOTE(review): the feed record is fetched but its value is unused here.
    await getFeedById(article.feedId);

    throwIfAborted(abortSignal, "Podcast generation was cancelled");

    // Enhance article content with web scraping if needed
    console.log(`🔍 Enhancing content for: ${article.title}`);
    const enhancedContent = await enhanceArticleContent(
      article.title,
      article.link,
      article.content,
      article.description,
    );

    throwIfAborted(abortSignal, "Podcast generation was cancelled");

    // Generate podcast content for this single article
    const podcastContent = await openAI_GeneratePodcastContent(article.title, [
      {
        title: article.title,
        link: article.link,
        content: enhancedContent.content,
        description: enhancedContent.description,
      },
    ]);

    // Avoid another LLM round-trip if cancellation arrived during generation
    throwIfAborted(abortSignal, "Podcast generation was cancelled");

    console.log(`🏷️ Classifying episode content for: ${article.title}`);
    const episodeCategory = await openAI_ClassifyEpisode(
      article.title,
      enhancedContent.description,
      enhancedContent.content,
    );
    console.log(`🏷️ Episode classified as: ${episodeCategory}`);

    throwIfAborted(abortSignal, "Podcast generation was cancelled");

    const episodeId = crypto.randomUUID();

    // Generate TTS audio - this is the critical step that can fail
    let audioFilePath: string;
    try {
      audioFilePath = await generateTTS(episodeId, podcastContent);
      console.log(`🔊 Audio generated: ${audioFilePath}`);
    } catch (ttsError) {
      console.error(`❌ TTS generation failed for ${article.title}:`, ttsError);

      const errorMessage =
        ttsError instanceof Error ? ttsError.message : String(ttsError);

      if (errorMessage.includes("added to retry queue")) {
        console.log(
          `📋 Article will be retried later via TTS queue: ${article.title}`,
        );
      } else {
        console.error(
          `💀 TTS generation permanently failed for ${article.title} - max retries exceeded`,
        );
      }
      // Either way no episode exists yet. The article stays unprocessed, so a
      // later batch will attempt it again.
      return false;
    }

    // getAudioFileStats never throws: a missing or unreadable file reports size 0,
    // so a single size check covers both "empty" and "inaccessible".
    const audioStats = await getAudioFileStats(audioFilePath);
    if (audioStats.size === 0) {
      console.error(
        `❌ Audio file is empty for ${article.title}: ${audioFilePath}`,
      );
      return false;
    }

    // Save episode - only if we have valid audio
    try {
      await saveEpisode({
        articleId: article.id,
        title: article.title,
        description:
          article.description || `Podcast episode for: ${article.title}`,
        audioPath: audioFilePath,
        duration: audioStats.duration,
        fileSize: audioStats.size,
        category: episodeCategory,
      });

      console.log(`💾 Episode saved for article: ${article.title}`);
      return true;
    } catch (saveError) {
      console.error(
        `❌ Failed to save episode for ${article.title}:`,
        saveError,
      );
      return false;
    }
  } catch (error) {
    if (isCancellationError(error)) {
      console.log(`🛑 Podcast generation cancelled for: ${article.title}`);
      throw error; // Re-throw cancellation errors to stop the batch
    }
    console.error(
      `💥 Error generating podcast for article: ${article.title}`,
      error,
    );
    return false;
  }
}

/**
 * Returns the size of an audio file located in `config.paths.podcastAudioDir`.
 *
 * Never throws. If the file cannot be stat'ed, a warning is logged and
 * `{ size: 0 }` is returned. `duration` is currently always undefined.
 *
 * @param audioFileName - Path of the file, relative to the podcast audio directory.
 */
async function getAudioFileStats(
  audioFileName: string,
): Promise<{ duration?: number; size: number }> {
  try {
    const audioPath = `${config.paths.podcastAudioDir}/${audioFileName}`;
    const stats = await fs.stat(audioPath);

    return {
      size: stats.size,
      // TODO: Add duration calculation using ffprobe if needed
      duration: undefined,
    };
  } catch (error) {
    console.warn(
      `⚠️ Could not get audio file stats for ${audioFileName}:`,
      error,
    );
    return { size: 0 };
  }
}

/**
 * Legacy function compatibility - process feed URL the old way
 * This is kept for backward compatibility during migration
 */
// Commented out to fix TypeScript unused variable warnings
/*
async function legacyProcessFeedUrl(url: string): Promise<void> {
  console.log(`🔄 Legacy processing for: ${url}`);

  const parser = new Parser();
  const feed = await parser.parseURL(url);

  // Feed classification
  const feedTitle = feed.title || url;
  const category = await openAI_ClassifyFeed(feedTitle);
  console.log(`Feed classified: ${feedTitle} - ${category}`);

  const latest5Items = (feed.items || []).slice(0, 5);

  if (latest5Items.length === 0) {
    console.log(`No items found in feed: ${feedTitle}`);
    return;
  }

  // Generate podcast content (old way - multiple articles in one podcast)
  console.log(`Generating podcast content for: ${feedTitle}`);
  const validItems = latest5Items.filter((item): item is FeedItem => {
    return !!item.title && !!item.link;
  });

  if (validItems.length === 0) {
    console.log(`No valid items found in feed: ${feedTitle}`);
    return;
  }

  const podcastContent = await openAI_GeneratePodcastContent(
    feedTitle,
    validItems as any
  );

  // Generate unique ID for this feed and category combination
  const feedUrlHash = crypto.createHash("md5").update(url).digest("hex");
  const categoryHash = crypto.createHash("md5").update(category).digest("hex");
  const timestamp = new Date().getTime();
  const uniqueId = `${feedUrlHash}-${categoryHash}-${timestamp}`;

  const audioFilePath = await generateTTS(uniqueId, podcastContent);
  console.log(`Audio file generated: ${audioFilePath}`);

  // Save as legacy episode
  const firstItem = latest5Items[0];
  if (!firstItem) {
    console.warn("No items found");
    return;
  }

  const pubDate = new Date(firstItem.pubDate || new Date());

  // For now, save using the new episode structure
  // TODO: Remove this once migration is complete
  const tempArticleId = crypto.randomUUID();
  await saveEpisode({
    articleId: tempArticleId,
    title: `${category}: ${feedTitle}`,
    description: `Legacy podcast for feed: ${feedTitle}`,
    audioPath: audioFilePath
  });

  console.log(`Legacy episode saved: ${category} - ${feedTitle}`);

  // Mark individual articles as processed (legacy)
  for (const item of latest5Items) {
    try {
      const itemId = (item as any)["id"] as string | undefined;
      const fallbackId = item.link || item.title || JSON.stringify(item);
      const finalItemId =
        itemId && typeof itemId === "string" && itemId.trim() !== ""
          ? itemId
          : `fallback-${Buffer.from(fallbackId).toString("base64")}`;

      if (!finalItemId || finalItemId.trim() === "") {
        console.warn(`Could not generate ID for feed item`, {
          feedUrl: url,
          itemTitle: item.title,
          itemLink: item.link,
        });
        continue;
      }

      const alreadyProcessed = await markAsProcessed(url, finalItemId);
      if (alreadyProcessed) {
        console.log(`Already processed: ${finalItemId}`);
      }
    } catch (error) {
      console.error(`Error marking item as processed:`, error);
    }
  }
}
*/

/**
 * Registers a new feed URL.
 *
 * Parses the feed to obtain its title, falling back to the URL when the feed
 * has none. Classifies the feed via `openAI_ClassifyFeed`, then saves it as
 * an active feed.
 *
 * @param feedUrl - Feed URL; must start with "http".
 * @throws If the URL is invalid, or parsing, classification or saving fails.
 */
export async function addNewFeedUrl(feedUrl: string): Promise<void> {
  if (!feedUrl || !feedUrl.startsWith("http")) {
    throw new Error("Invalid feed URL");
  }

  try {
    // Parse RSS feed to get feed information including title
    const parser = new Parser();
    const feed = await parser.parseURL(feedUrl);

    // Extract feed title, fallback to URL if not available
    const feedTitle = feed.title || feedUrl;

    // Classify feed category using OpenAI
    const category = await openAI_ClassifyFeed(feedTitle);

    await saveFeed({
      url: feedUrl,
      title: feedTitle,
      category: category,
      active: true,
    });

    console.log(
      `✅ Feed URL added: ${feedUrl} (Title: ${feedTitle}, Category: ${category})`,
    );
  } catch (error) {
    console.error(`❌ Failed to add feed URL: ${feedUrl}`, error);
    throw error;
  }
}

// Run if this script is executed directly
if (import.meta.main) {
  batchProcess().catch((err) => {
    console.error("💥 Batch process failed:", err);
    process.exit(1);
  });
}