Files
VoiceRSSSummary/scripts/fetch_and_generate.ts
2025-06-08 21:53:45 +09:00

749 lines
22 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import crypto from "crypto";
import fs from "fs/promises";
import Parser from "rss-parser";
import { config } from "../services/config.js";
import {
closeBrowser,
enhanceArticleContent,
} from "../services/content-extractor.js";
import {
getFeedById,
getFeedByUrl,
getUnprocessedArticles,
markArticleAsProcessed,
saveArticle,
saveEpisode,
saveFeed,
} from "../services/database.js";
import {
openAI_ClassifyEpisode,
openAI_ClassifyFeed,
openAI_GeneratePodcastContent,
} from "../services/llm.js";
import { updatePodcastRSS } from "../services/podcast.js";
import { generateTTS, generateTTSWithoutQueue } from "../services/tts.js";
/**
 * Fields this script reads from each parsed feed entry.
 * All of them are optional; callers check `title`/`link` before using an entry.
 */
interface FeedItem {
  // Identity / location of the entry
  id?: string;
  link?: string;
  // Headline metadata
  title?: string;
  pubDate?: string;
  // Body text, in whichever forms the feed provides
  description?: string;
  contentSnippet?: string;
  content?: string;
}
/**
 * Main batch processing entry point: refreshes every active feed, then
 * generates podcast episodes for articles that have not been processed yet.
 *
 * - A failure while refreshing one feed is logged and the remaining feeds are
 *   still processed.
 * - Cancellation (an aborted `abortSignal`, or an error whose message contains
 *   "cancelled" / whose name is "AbortError") stops the run and is re-thrown
 *   as an `Error` named "AbortError".
 * - The content extractor's Puppeteer browser is closed in `finally`, whatever
 *   the outcome. Otherwise a browser opened while enhancing articles would
 *   outlive the batch and could keep a one-shot script run from exiting.
 *
 * @param abortSignal Optional signal, checked before each feed and before
 *   article processing.
 * @throws Error named "AbortError" when cancelled; otherwise the original error.
 */
export async function batchProcess(abortSignal?: AbortSignal): Promise<void> {
  try {
    console.log("🚀 Starting enhanced batch process...");
    // Check for cancellation at start
    if (abortSignal?.aborted) {
      throw new Error("Batch process was cancelled before starting");
    }
    // Load the active feed URLs from the database.
    const { fetchActiveFeeds } = await import("../services/database.js");
    const feeds = await fetchActiveFeeds();
    const feedUrls = feeds.map((feed) => feed.url).filter((url) => !!url);
    if (feedUrls.length === 0) {
      console.log(" No feed URLs found.");
      return;
    }
    console.log(`📡 Processing ${feedUrls.length} feeds...`);
    for (const url of feedUrls) {
      if (abortSignal?.aborted) {
        throw new Error("Batch process was cancelled during feed processing");
      }
      try {
        await processFeedUrl(url, abortSignal);
      } catch (error) {
        // Cancellation must stop the whole batch, not just this feed.
        if (
          error instanceof Error &&
          (error.message.includes("cancelled") || error.name === "AbortError")
        ) {
          throw error;
        }
        console.error(`❌ Failed to process feed ${url}:`, error);
        // Continue with the remaining feeds.
      }
    }
    if (abortSignal?.aborted) {
      throw new Error("Batch process was cancelled before article processing");
    }
    // Generate podcasts for articles discovered in this or earlier runs.
    await processUnprocessedArticles(abortSignal);
    console.log(
      "✅ Enhanced batch process completed:",
      new Date().toISOString(),
    );
  } catch (error) {
    if (
      error instanceof Error &&
      (error.message.includes("cancelled") || error.name === "AbortError")
    ) {
      console.log("🛑 Batch process was cancelled");
      // Normalise every cancellation into a single AbortError for callers.
      const abortError = new Error("Batch process was cancelled");
      abortError.name = "AbortError";
      throw abortError;
    }
    console.error("💥 Batch process failed:", error);
    throw error;
  } finally {
    // Release the browser that article enhancement may have opened. This is
    // best-effort: a failure to close must not mask the batch result.
    try {
      await closeBrowser();
    } catch (closeError) {
      console.warn("⚠️ Warning: Failed to close browser:", closeError);
    }
  }
}
/**
 * Refresh a single feed: parse it, make sure a feed row exists for it, and
 * store any entries not seen before as articles.
 *
 * @param url Feed URL; must start with "http".
 * @param abortSignal Optional cancellation signal, checked before and after
 *   the network fetch.
 * @throws Error for an invalid URL, on cancellation, or when the feed row
 *   cannot be created/read back. Errors raised after the URL check are logged
 *   before being re-thrown.
 */
async function processFeedUrl(
  url: string,
  abortSignal?: AbortSignal,
): Promise<void> {
  if (!(url && url.startsWith("http"))) {
    throw new Error(`Invalid feed URL: ${url}`);
  }
  const throwIfCancelled = (): void => {
    if (abortSignal?.aborted) {
      throw new Error("Feed processing was cancelled");
    }
  };
  throwIfCancelled();
  console.log(`🔍 Processing feed: ${url}`);
  try {
    const parsedFeed = await new Parser<FeedItem>().parseURL(url);
    throwIfCancelled();

    // Look up the feed row, creating it on first sight.
    let feedRecord = await getFeedByUrl(url);
    if (!feedRecord) {
      console.log(` Adding new feed: ${parsedFeed.title || url}`);
      await saveFeed({
        url,
        title: parsedFeed.title,
        description: parsedFeed.description,
        lastUpdated: new Date().toISOString(),
        active: true,
      });
      feedRecord = await getFeedByUrl(url);
      if (!feedRecord) {
        throw new Error("Failed to create or retrieve feed record");
      }
    }

    const newArticlesCount = await discoverNewArticles(
      feedRecord,
      parsedFeed.items || [],
    );

    // Touch lastUpdated only when something new arrived.
    if (newArticlesCount > 0) {
      // NOTE(review): `category` is not passed here, although addNewFeedUrl()
      // passes it to saveFeed(); confirm saveFeed() keeps the stored category
      // when the field is omitted.
      await saveFeed({
        url: feedRecord.url,
        title: feedRecord.title,
        description: feedRecord.description,
        lastUpdated: new Date().toISOString(),
        active: feedRecord.active,
      });
    }
    console.log(
      `📊 Feed processed: ${parsedFeed.title || url} (${newArticlesCount} new articles)`,
    );
  } catch (error) {
    console.error(`💥 Error processing feed ${url}:`, error);
    throw error;
  }
}
/**
 * Store feed entries as unprocessed articles and count the new ones.
 *
 * Entries missing a title or link are skipped with a warning. A failure to
 * save one entry is logged and does not stop the others.
 *
 * @param feed Feed row; only its `id` is read.
 * @param items Parsed feed entries, handled in order.
 * @returns How many entries saveArticle() returned a truthy id for.
 */
async function discoverNewArticles(
  feed: any,
  items: FeedItem[],
): Promise<number> {
  let discovered = 0;
  for (const entry of items) {
    const { title, link } = entry;
    if (title && link) {
      try {
        // NOTE(review): a falsy id is treated as "already known"; presumably
        // saveArticle() dedupes by link — confirm in services/database.
        const savedId = await saveArticle({
          feedId: feed.id,
          title,
          link,
          description: entry.description || entry.contentSnippet,
          content: entry.content,
          pubDate: entry.pubDate || new Date().toISOString(),
          processed: false,
        });
        if (savedId) {
          discovered += 1;
          console.log(`📄 New article discovered: ${title}`);
        }
      } catch (error) {
        console.error(`❌ Error saving article: ${title}`, error);
      }
    } else {
      console.warn("⚠️ Skipping item without title or link");
    }
  }
  return discovered;
}
/**
 * Parse the LIMIT_UNPROCESSED_ARTICLES setting into a query limit.
 *
 * Accepts a non-negative base-10 integer. An unset or blank value uses
 * `fallback` silently. A malformed or negative value logs a warning and also
 * uses `fallback`, so NaN or a negative number never reaches the database query.
 *
 * @param raw Raw environment value (may be undefined).
 * @param fallback Limit used when `raw` is missing or invalid.
 * @returns The limit to pass to getUnprocessedArticles().
 */
function parseUnprocessedArticleLimit(
  raw: string | undefined,
  fallback = 10,
): number {
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < 0) {
    console.warn(
      `⚠️ Invalid LIMIT_UNPROCESSED_ARTICLES value "${raw}", using ${fallback}`,
    );
    return fallback;
  }
  return parsed;
}

/**
 * Generate podcasts for articles that have not been processed yet.
 *
 * First runs the TTS retry queue. Then it loads up to LIMIT_UNPROCESSED_ARTICLES
 * (default 10) unprocessed articles and tries to create an episode for each.
 * An article is marked processed, and the podcast RSS regenerated, only when
 * its episode was actually created. Other failures are logged and the article
 * is left for a later batch.
 *
 * @param abortSignal Optional cancellation signal, checked before each article.
 * @throws Re-throws cancellation errors and unexpected failures after logging.
 */
async function processUnprocessedArticles(
  abortSignal?: AbortSignal,
): Promise<void> {
  console.log("🎧 Processing unprocessed articles...");
  try {
    if (abortSignal?.aborted) {
      throw new Error("Article processing was cancelled");
    }
    // Retry previously failed TTS jobs before starting new work.
    await processRetryQueue(abortSignal);
    if (abortSignal?.aborted) {
      throw new Error("Article processing was cancelled");
    }
    // Bound the batch size so a backlog does not trigger unbounded LLM/TTS work.
    const unprocessedArticles = await getUnprocessedArticles(
      parseUnprocessedArticleLimit(
        import.meta.env["LIMIT_UNPROCESSED_ARTICLES"],
      ),
    );
    if (unprocessedArticles.length === 0) {
      console.log(" No unprocessed articles found.");
      return;
    }
    console.log(`🎯 Found ${unprocessedArticles.length} unprocessed articles`);
    // Counts articles whose audio AND episode row were both created.
    let successfullyGeneratedCount = 0;
    for (const article of unprocessedArticles) {
      if (abortSignal?.aborted) {
        throw new Error("Article processing was cancelled");
      }
      try {
        const episodeCreated = await generatePodcastForArticle(
          article,
          abortSignal,
        );
        if (episodeCreated) {
          await markArticleAsProcessed(article.id);
          console.log(`✅ Podcast generated for: ${article.title}`);
          successfullyGeneratedCount++;
          // Publish each episode as soon as it exists; an RSS failure is logged
          // but does not undo the episode.
          console.log(
            `📻 Updating podcast RSS after successful episode creation...`,
          );
          try {
            await updatePodcastRSS();
            console.log(`📻 RSS updated successfully for: ${article.title}`);
          } catch (rssError) {
            console.error(
              `❌ Failed to update RSS after episode creation for: ${article.title}`,
              rssError,
            );
          }
        } else {
          console.warn(
            `⚠️ Episode creation failed for: ${article.title} - not marking as processed`,
          );
        }
      } catch (error) {
        if (
          error instanceof Error &&
          (error.message.includes("cancelled") || error.name === "AbortError")
        ) {
          console.log(`🛑 Article processing cancelled, stopping batch`);
          throw error; // Propagate cancellation to batchProcess
        }
        console.error(
          `❌ Failed to generate podcast for article: ${article.title}`,
          error,
        );
        // Leave the article unprocessed so a later batch retries it.
      }
    }
    console.log(
      `🎯 Batch processing completed: ${successfullyGeneratedCount} episodes successfully created`,
    );
    if (successfullyGeneratedCount === 0) {
      console.log(` No episodes were successfully created in this batch`);
    }
  } catch (error) {
    console.error("💥 Error processing unprocessed articles:", error);
    throw error;
  }
}
/**
 * Process a batch (up to 5 items) of the TTS retry queue.
 *
 * For each queued item:
 * - mark it "processing";
 * - regenerate audio with generateTTSWithoutQueue(itemId, scriptText, retryCount),
 *   which does not re-queue on failure;
 * - on success, remove it from the queue and regenerate the podcast RSS (an RSS
 *   failure is only logged).
 * - On failure, mark it "failed" once retryCount >= 2 (the third attempt).
 *   Otherwise increment retry_count and reset the status to 'pending' with a
 *   raw SQL update.
 *
 * NOTE(review): a successful retry here does not call saveEpisode() or
 * markArticleAsProcessed(). generatePodcastForArticle() returns false when TTS
 * was queued, so the originating article stays unprocessed and is regenerated
 * (with a new random episode id) in a later batch. Confirm whether the retried
 * audio is ever attached to an episode.
 *
 * @param abortSignal Optional cancellation signal, checked before each item.
 * @throws Re-throws cancellation errors and unexpected failures (e.g. from
 *   getQueueItems) after logging.
 */
async function processRetryQueue(abortSignal?: AbortSignal): Promise<void> {
  const { getQueueItems, updateQueueItemStatus, removeFromQueue } =
    await import("../services/database.js");
  // Separate raw SQLite handle, used only for the retry_count update below.
  // It is opened on every call (even when the queue is empty) and closed in `finally`.
  const { Database } = await import("bun:sqlite");
  const db = new Database(config.paths.dbPath);
  console.log("🔄 Processing TTS retry queue...");
  try {
    const queueItems = await getQueueItems(5); // Process 5 items at a time
    if (queueItems.length === 0) {
      return;
    }
    console.log(`📋 Found ${queueItems.length} items in retry queue`);
    for (const item of queueItems) {
      // Check for cancellation before processing each retry item
      if (abortSignal?.aborted) {
        throw new Error("Retry queue processing was cancelled");
      }
      try {
        console.log(
          `🔁 Retrying TTS generation for: ${item.itemId} (attempt ${item.retryCount + 1}/3)`,
        );
        // Mark as processing
        await updateQueueItemStatus(item.id, "processing");
        // Attempt TTS generation without re-queuing on failure
        await generateTTSWithoutQueue(
          item.itemId,
          item.scriptText,
          item.retryCount,
        );
        // Success - remove from queue and update RSS
        await removeFromQueue(item.id);
        console.log(`✅ TTS retry successful for: ${item.itemId}`);
        // Update RSS immediately after successful retry
        console.log(`📻 Updating podcast RSS after successful retry...`);
        try {
          await updatePodcastRSS();
          console.log(
            `📻 RSS updated successfully after retry for: ${item.itemId}`,
          );
        } catch (rssError) {
          console.error(
            `❌ Failed to update RSS after retry for: ${item.itemId}`,
            rssError,
          );
        }
      } catch (error) {
        // NOTE(review): a cancellation error raised after the item was marked
        // "processing" is re-thrown here without resetting its status.
        if (
          error instanceof Error &&
          (error.message.includes("cancelled") || error.name === "AbortError")
        ) {
          console.log(`🛑 TTS retry processing cancelled for: ${item.itemId}`);
          throw error; // Re-throw cancellation errors
        }
        console.error(`❌ TTS retry failed for: ${item.itemId}`, error);
        // Record the failed attempt; a bookkeeping error is logged, not thrown,
        // so the remaining items are still processed.
        try {
          if (item.retryCount >= 2) {
            // Max retries reached (attempt 3/3 failed), mark as failed
            await updateQueueItemStatus(item.id, "failed");
            console.log(
              `💀 Max retries reached for: ${item.itemId}, marking as failed`,
            );
          } else {
            // Increment retry count and reset to pending for next retry
            const updatedRetryCount = item.retryCount + 1;
            const stmt = db.prepare(
              "UPDATE tts_queue SET retry_count = ?, status = 'pending' WHERE id = ?",
            );
            stmt.run(updatedRetryCount, item.id);
            console.log(
              `🔄 Updated retry count to ${updatedRetryCount} for: ${item.itemId}`,
            );
          }
        } catch (dbError) {
          console.error(
            `❌ Failed to update queue status for: ${item.itemId}`,
            dbError,
          );
        }
      }
    }
  } catch (error) {
    console.error("💥 Error processing retry queue:", error);
    throw error;
  } finally {
    // Clean up the raw database connection opened above
    try {
      db.close();
    } catch (closeError) {
      console.warn(
        "⚠️ Warning: Failed to close database connection:",
        closeError,
      );
    }
    // Close the Puppeteer browser. NOTE(review): this is not a process exit
    // point. processUnprocessedArticles() calls this function before it
    // enhances any article content, and nothing here uses the browser.
    await closeBrowser();
  }
}
/**
 * Generate a podcast episode for a single article.
 *
 * Pipeline: enhance the article text (web scraping if needed) → generate the
 * podcast script → classify the episode → synthesize audio → verify the audio
 * file is non-empty → save the episode row.
 *
 * @param article Article row; `id`, `feedId`, `title`, `link`, `content` and
 *   `description` are read.
 * @param abortSignal Optional cancellation signal, checked between the
 *   expensive steps.
 * @returns `true` only if audio was produced and the episode was saved; `false`
 *   for any other failure. The caller then leaves the article unprocessed.
 * @throws Re-throws cancellation errors (message containing "cancelled" or name
 *   "AbortError") so the surrounding batch stops.
 */
async function generatePodcastForArticle(
  article: any,
  abortSignal?: AbortSignal,
): Promise<boolean> {
  console.log(`🎤 Generating podcast for: ${article.title}`);
  try {
    if (abortSignal?.aborted) {
      throw new Error("Podcast generation was cancelled");
    }
    // NOTE(review): the feed record is fetched but not used. The call is kept
    // so that a failing lookup still aborts generation as before.
    await getFeedById(article.feedId);
    if (abortSignal?.aborted) {
      throw new Error("Podcast generation was cancelled");
    }
    // Enhance article content with web scraping if needed
    console.log(`🔍 Enhancing content for: ${article.title}`);
    const enhancedContent = await enhanceArticleContent(
      article.title,
      article.link,
      article.content,
      article.description,
    );
    if (abortSignal?.aborted) {
      throw new Error("Podcast generation was cancelled");
    }
    // Generate the podcast script from this single article
    const podcastContent = await openAI_GeneratePodcastContent(article.title, [
      {
        title: article.title,
        link: article.link,
        content: enhancedContent.content,
        description: enhancedContent.description,
      },
    ]);
    console.log(`🏷️ Classifying episode content for: ${article.title}`);
    const episodeCategory = await openAI_ClassifyEpisode(
      article.title,
      enhancedContent.description,
      enhancedContent.content,
    );
    console.log(`🏷️ Episode classified as: ${episodeCategory}`);
    if (abortSignal?.aborted) {
      throw new Error("Podcast generation was cancelled");
    }
    const episodeId = crypto.randomUUID();

    // TTS is the critical step that can fail. On failure, generateTTS signals
    // via its error message whether the job was added to the retry queue.
    let audioFilePath: string;
    try {
      audioFilePath = await generateTTS(episodeId, podcastContent);
      console.log(`🔊 Audio generated: ${audioFilePath}`);
    } catch (ttsError) {
      console.error(`❌ TTS generation failed for ${article.title}:`, ttsError);
      const errorMessage =
        ttsError instanceof Error ? ttsError.message : String(ttsError);
      if (errorMessage.includes("added to retry queue")) {
        console.log(
          `📋 Article will be retried later via TTS queue: ${article.title}`,
        );
      } else {
        console.error(
          `💀 TTS generation permanently failed for ${article.title} - max retries exceeded`,
        );
      }
      // Either way no episode exists, so report failure. The caller therefore
      // does NOT mark the article processed, and it is regenerated (with a new
      // episode id) in a later batch. NOTE(review): if permanent failures
      // should stop re-processing, the caller needs a distinct outcome for it.
      return false;
    }

    // Stat the audio file once. The result both validates the file and
    // supplies the size stored on the episode.
    let audioStats: { duration?: number; size: number };
    try {
      audioStats = await getAudioFileStats(audioFilePath);
    } catch (statsError) {
      console.error(
        `❌ Cannot access audio file for ${article.title}: ${audioFilePath}`,
        statsError,
      );
      return false;
    }
    if (audioStats.size === 0) {
      console.error(
        `❌ Audio file is empty for ${article.title}: ${audioFilePath}`,
      );
      return false;
    }

    // Save the episode only now that valid audio exists
    try {
      await saveEpisode({
        articleId: article.id,
        title: article.title,
        description:
          article.description || `Podcast episode for: ${article.title}`,
        audioPath: audioFilePath,
        duration: audioStats.duration,
        fileSize: audioStats.size,
        category: episodeCategory,
      });
      console.log(`💾 Episode saved for article: ${article.title}`);
      return true;
    } catch (saveError) {
      console.error(
        `❌ Failed to save episode for ${article.title}:`,
        saveError,
      );
      return false;
    }
  } catch (error) {
    if (
      error instanceof Error &&
      (error.message.includes("cancelled") || error.name === "AbortError")
    ) {
      console.log(`🛑 Podcast generation cancelled for: ${article.title}`);
      throw error; // Re-throw cancellation errors to stop the batch
    }
    console.error(
      `💥 Error generating podcast for article: ${article.title}`,
      error,
    );
    return false;
  }
}
/**
 * Read file-system statistics for a generated audio file.
 *
 * @param audioFileName File name, resolved relative to
 *   `config.paths.podcastAudioDir`.
 * @returns `{ size, duration: undefined }` when the file can be stat'ed.
 *   Otherwise it logs a warning and returns `{ size: 0 }`. It never throws.
 */
async function getAudioFileStats(
  audioFileName: string,
): Promise<{ duration?: number; size: number }> {
  try {
    const { size } = await fs.stat(
      `${config.paths.podcastAudioDir}/${audioFileName}`,
    );
    // Duration is not computed yet (TODO: use ffprobe if it becomes needed).
    return { size, duration: undefined };
  } catch (error) {
    console.warn(
      `⚠️ Could not get audio file stats for ${audioFileName}:`,
      error,
    );
    return { size: 0 };
  }
}
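/**
* Legacy feed processing, kept for reference during the migration. It built a
* single podcast from the latest five items of a feed (the pre-migration,
* multi-article format). The implementation below is commented out, so it is
* neither compiled nor callable. It also references `markAsProcessed`, which is
* not imported in this file.
*/
// Commented out to avoid TypeScript unused-variable warnings.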
/* async function legacyProcessFeedUrl(url: string): Promise<void> {
console.log(`🔄 Legacy processing for: ${url}`);
const parser = new Parser<FeedItem>();
const feed = await parser.parseURL(url);
// Feed classification
const feedTitle = feed.title || url;
const category = await openAI_ClassifyFeed(feedTitle);
console.log(`Feed classified: ${feedTitle} - ${category}`);
const latest5Items = (feed.items || []).slice(0, 5);
if (latest5Items.length === 0) {
console.log(`No items found in feed: ${feedTitle}`);
return;
}
// Generate podcast content (old way - multiple articles in one podcast)
console.log(`Generating podcast content for: ${feedTitle}`);
const validItems = latest5Items.filter((item): item is FeedItem => {
return !!item.title && !!item.link;
});
if (validItems.length === 0) {
console.log(`No valid items found in feed: ${feedTitle}`);
return;
}
const podcastContent = await openAI_GeneratePodcastContent(
feedTitle,
validItems as any
);
// Generate unique ID for this feed and category combination
const feedUrlHash = crypto.createHash("md5").update(url).digest("hex");
const categoryHash = crypto.createHash("md5").update(category).digest("hex");
const timestamp = new Date().getTime();
const uniqueId = `${feedUrlHash}-${categoryHash}-${timestamp}`;
const audioFilePath = await generateTTS(uniqueId, podcastContent);
console.log(`Audio file generated: ${audioFilePath}`);
// Save as legacy episode
const firstItem = latest5Items[0];
if (!firstItem) {
console.warn("No items found");
return;
}
const pubDate = new Date(firstItem.pubDate || new Date());
// For now, save using the new episode structure
// TODO: Remove this once migration is complete
const tempArticleId = crypto.randomUUID();
await saveEpisode({
articleId: tempArticleId,
title: `${category}: ${feedTitle}`,
description: `Legacy podcast for feed: ${feedTitle}`,
audioPath: audioFilePath
});
console.log(`Legacy episode saved: ${category} - ${feedTitle}`);
// Mark individual articles as processed (legacy)
for (const item of latest5Items) {
try {
const itemId = (item as any)["id"] as string | undefined;
const fallbackId = item.link || item.title || JSON.stringify(item);
const finalItemId = itemId && typeof itemId === "string" && itemId.trim() !== ""
? itemId
: `fallback-${Buffer.from(fallbackId).toString("base64")}`;
if (!finalItemId || finalItemId.trim() === "") {
console.warn(`Could not generate ID for feed item`, {
feedUrl: url,
itemTitle: item.title,
itemLink: item.link,
});
continue;
}
const alreadyProcessed = await markAsProcessed(url, finalItemId);
if (alreadyProcessed) {
console.log(`Already processed: ${finalItemId}`);
}
} catch (error) {
console.error(`Error marking item as processed:`, error);
}
}
} */
/**
 * Register a new feed URL (exported for use by the server).
 *
 * Fetches the feed once to read its title (falling back to the URL itself),
 * asks the LLM to classify that title, and stores the feed as active with the
 * resulting title and category.
 *
 * @param feedUrl Feed URL; must start with "http".
 * @throws Error("Invalid feed URL") for a bad URL. Parse, classification and
 *   save errors are logged and re-thrown.
 */
export async function addNewFeedUrl(feedUrl: string): Promise<void> {
  const isHttpUrl = Boolean(feedUrl) && feedUrl.startsWith("http");
  if (!isHttpUrl) {
    throw new Error("Invalid feed URL");
  }
  try {
    const { title } = await new Parser<FeedItem>().parseURL(feedUrl);
    const feedTitle = title || feedUrl;
    const category = await openAI_ClassifyFeed(feedTitle);
    await saveFeed({
      url: feedUrl,
      title: feedTitle,
      category,
      active: true,
    });
    console.log(
      `✅ Feed URL added: ${feedUrl} (Title: ${feedTitle}, Category: ${category})`,
    );
  } catch (error) {
    console.error(`❌ Failed to add feed URL: ${feedUrl}`, error);
    throw error;
  }
}
// When this module is the entry point (`import.meta.main`), run one batch and
// exit with status 1 if it fails.
if (import.meta.main) {
  void batchProcess().then(undefined, (err: unknown) => {
    console.error("💥 Batch process failed:", err);
    process.exit(1);
  });
}