diff --git a/admin-panel/src/App.tsx b/admin-panel/src/App.tsx index d6f197f..fe337db 100644 --- a/admin-panel/src/App.tsx +++ b/admin-panel/src/App.tsx @@ -20,6 +20,7 @@ interface Stats { batchScheduler: { enabled: boolean; isRunning: boolean; + canForceStop: boolean; lastRun?: string; nextRun?: string; }; @@ -169,6 +170,34 @@ function App() { } }; + const forceStopBatch = async () => { + if (!confirm('実行中のバッチ処理を強制停止しますか?\n\n進行中の処理が中断され、データの整合性に影響が出る可能性があります。')) { + return; + } + + try { + const res = await fetch('/api/admin/batch/force-stop', { + method: 'POST' + }); + + const data = await res.json(); + + if (res.ok) { + if (data.result === 'STOPPED') { + setSuccess(data.message); + } else if (data.result === 'NO_PROCESS') { + setSuccess(data.message); + } + loadData(); // Refresh data to update batch status + } else { + setError(data.error || 'バッチ処理強制停止に失敗しました'); + } + } catch (err) { + setError('バッチ処理強制停止に失敗しました'); + console.error('Error force stopping batch:', err); + } + }; + const toggleBatchScheduler = async (enable: boolean) => { try { const res = await fetch(`/api/admin/batch/${enable ? 'enable' : 'disable'}`, { @@ -266,9 +295,22 @@ function App() {
- + {stats?.batchScheduler?.canForceStop && ( + + )} @@ -404,15 +446,25 @@ function App() {

手動実行

- +
+ + {stats?.batchScheduler?.canForceStop && ( + + )} +

- スケジューラーの状態に関係なく、バッチ処理を手動で実行できます。 + スケジューラーの状態に関係なく、バッチ処理を手動で実行できます。実行中の場合は強制停止も可能です。

@@ -423,6 +475,7 @@ function App() {
  • 新しいRSS記事の取得、要約生成、音声合成を行います
  • スケジューラーが無効の場合、定期実行は停止しますが手動実行は可能です
  • バッチ処理の実行中は重複実行を防ぐため、新たな実行はスキップされます
  • +
  • 強制停止: 実行中のバッチ処理を緊急停止できます(データ整合性に注意)
  • diff --git a/admin-server.ts b/admin-server.ts index bf7deac..1043f6b 100644 --- a/admin-server.ts +++ b/admin-server.ts @@ -378,6 +378,31 @@ app.post("/api/admin/batch/trigger", async (c) => { } }); +app.post("/api/admin/batch/force-stop", async (c) => { + try { + console.log("🛑 Force stop batch process requested via admin panel"); + + const stopped = batchScheduler.forceStop(); + + if (stopped) { + return c.json({ + result: "STOPPED", + message: "Batch process force stop signal sent", + timestamp: new Date().toISOString() + }); + } else { + return c.json({ + result: "NO_PROCESS", + message: "No batch process is currently running", + timestamp: new Date().toISOString() + }, 200); + } + } catch (error) { + console.error("Error force stopping batch process:", error); + return c.json({ error: "Failed to force stop batch process" }, 500); + } +}); + // Static file handlers for admin panel UI app.get("/assets/*", async (c) => { try { diff --git a/scripts/fetch_and_generate.ts b/scripts/fetch_and_generate.ts index b959041..f956279 100644 --- a/scripts/fetch_and_generate.ts +++ b/scripts/fetch_and_generate.ts @@ -32,10 +32,15 @@ interface FeedItem { * Main batch processing function * Processes all feeds and generates podcasts for new articles */ -export async function batchProcess(): Promise { +export async function batchProcess(abortSignal?: AbortSignal): Promise { try { console.log("🚀 Starting enhanced batch process..."); + // Check for cancellation at start + if (abortSignal?.aborted) { + throw new Error('Batch process was cancelled before starting'); + } + // Load feed URLs from file const feedUrls = await loadFeedUrls(); if (feedUrls.length === 0) { @@ -47,22 +52,42 @@ export async function batchProcess(): Promise { // Process each feed URL for (const url of feedUrls) { + // Check for cancellation before processing each feed + if (abortSignal?.aborted) { + throw new Error('Batch process was cancelled during feed processing'); + } + try { - await processFeedUrl(url); + await processFeedUrl(url, abortSignal); } catch (error) { + // Re-throw cancellation errors + if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) { + throw error; + } console.error(`❌ Failed to process feed ${url}:`, error); // Continue with other feeds } } + // Check for cancellation before processing articles + if (abortSignal?.aborted) { + throw new Error('Batch process was cancelled before article processing'); + } + // Process unprocessed articles and generate podcasts - await processUnprocessedArticles(); + await processUnprocessedArticles(abortSignal); console.log( "✅ Enhanced batch process completed:", new Date().toISOString(), ); } catch (error) { + if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) { + console.log("🛑 Batch process was cancelled"); + const abortError = new Error('Batch process was cancelled'); + abortError.name = 'AbortError'; + throw abortError; + } console.error("💥 Batch process failed:", error); throw error; } @@ -90,17 +115,27 @@ async function loadFeedUrls(): Promise { /** * Process a single feed URL and discover new articles */ -async function processFeedUrl(url: string): Promise { +async function processFeedUrl(url: string, abortSignal?: AbortSignal): Promise { if (!url || !url.startsWith("http")) { throw new Error(`Invalid feed URL: ${url}`); } + // Check for cancellation + if (abortSignal?.aborted) { + throw new Error('Feed processing was cancelled'); + } + console.log(`🔍 Processing feed: ${url}`); try { // Parse RSS feed const parser = new Parser(); const feed = await parser.parseURL(url); + + // Check for cancellation after parsing + if (abortSignal?.aborted) { + throw new Error('Feed processing was cancelled'); + } // Get or create feed record let feedRecord = await getFeedByUrl(url); @@ -189,12 +224,22 @@ async function discoverNewArticles( /** * Process unprocessed articles and generate podcasts */ -async function processUnprocessedArticles(): Promise { +async function processUnprocessedArticles(abortSignal?: AbortSignal): Promise { console.log("🎧 Processing unprocessed articles..."); try { + // Check for cancellation + if (abortSignal?.aborted) { + throw new Error('Article processing was cancelled'); + } + // Process retry queue first - await processRetryQueue(); + await processRetryQueue(abortSignal); + + // Check for cancellation after retry queue + if (abortSignal?.aborted) { + throw new Error('Article processing was cancelled'); + } // Get unprocessed articles (limit to prevent overwhelming) const unprocessedArticles = await getUnprocessedArticles( @@ -212,8 +257,13 @@ async function processUnprocessedArticles(): Promise { let successfullyGeneratedCount = 0; for (const article of unprocessedArticles) { + // Check for cancellation before processing each article + if (abortSignal?.aborted) { + throw new Error('Article processing was cancelled'); + } + try { - const episodeCreated = await generatePodcastForArticle(article); + const episodeCreated = await generatePodcastForArticle(article, abortSignal); // Only mark as processed and update RSS if episode was actually created if (episodeCreated) { @@ -233,6 +283,10 @@ async function processUnprocessedArticles(): Promise { console.warn(`⚠️ Episode creation failed for: ${article.title} - not marking as processed`); } } catch (error) { + if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) { + console.log(`🛑 Article processing cancelled, stopping batch`); + throw error; // Re-throw to propagate cancellation + } console.error( `❌ Failed to generate podcast for article: ${article.title}`, error, @@ -254,7 +308,7 @@ async function processUnprocessedArticles(): Promise { /** * Process retry queue for failed TTS generation */ -async function processRetryQueue(): Promise { +async function processRetryQueue(abortSignal?: AbortSignal): Promise { const { getQueueItems, updateQueueItemStatus, removeFromQueue } = await import("../services/database.js"); const { Database } = await import("bun:sqlite"); const db = new Database(config.paths.dbPath); @@ -271,6 +325,11 @@ async function processRetryQueue(): Promise { console.log(`📋 Found ${queueItems.length} items in retry queue`); for (const item of queueItems) { + // Check for cancellation before processing each retry item + if (abortSignal?.aborted) { + throw new Error('Retry queue processing was cancelled'); + } + try { console.log(`🔁 Retrying TTS generation for: ${item.itemId} (attempt ${item.retryCount + 1}/3)`); @@ -294,6 +353,11 @@ async function processRetryQueue(): Promise { } } catch (error) { + if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) { + console.log(`🛑 TTS retry processing cancelled for: ${item.itemId}`); + throw error; // Re-throw cancellation errors + } + console.error(`❌ TTS retry failed for: ${item.itemId}`, error); try { @@ -330,20 +394,35 @@ async function processRetryQueue(): Promise { * Generate podcast for a single article * Returns true if episode was successfully created, false otherwise */ -async function generatePodcastForArticle(article: any): Promise { +async function generatePodcastForArticle(article: any, abortSignal?: AbortSignal): Promise { console.log(`🎤 Generating podcast for: ${article.title}`); try { + // Check for cancellation + if (abortSignal?.aborted) { + throw new Error('Podcast generation was cancelled'); + } + // Get feed information for context const feed = await getFeedById(article.feedId); const feedTitle = feed?.title || "Unknown Feed"; + // Check for cancellation before classification + if (abortSignal?.aborted) { + throw new Error('Podcast generation was cancelled'); + } + // Classify the article/feed const category = await openAI_ClassifyFeed( `${feedTitle}: ${article.title}`, ); console.log(`🏷️ Article classified as: ${category}`); + // Check for cancellation before content generation + if (abortSignal?.aborted) { + throw new Error('Podcast generation was cancelled'); + } + // Generate podcast content for this single article const podcastContent = await openAI_GeneratePodcastContent(article.title, [ { @@ -351,6 +430,11 @@ async function generatePodcastForArticle(article: any): Promise { link: article.link, }, ]); + + // Check for cancellation before TTS + if (abortSignal?.aborted) { + throw new Error('Podcast generation was cancelled'); + } // Generate unique ID for the episode const episodeId = crypto.randomUUID(); @@ -410,6 +494,10 @@ async function generatePodcastForArticle(article: any): Promise { return false; } } catch (error) { + if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) { + console.log(`🛑 Podcast generation cancelled for: ${article.title}`); + throw error; // Re-throw cancellation errors to stop the batch + } console.error( `💥 Error generating podcast for article: ${article.title}`, error, diff --git a/services/batch-scheduler.ts b/services/batch-scheduler.ts index 0910944..ea5d1d0 100644 --- a/services/batch-scheduler.ts +++ b/services/batch-scheduler.ts @@ -6,13 +6,17 @@ interface BatchSchedulerState { nextRun?: string; isRunning: boolean; intervalId?: NodeJS.Timeout; + canForceStop: boolean; } class BatchScheduler { private state: BatchSchedulerState = { enabled: true, isRunning: false, + canForceStop: false, }; + + private currentAbortController?: AbortController; private readonly SIX_HOURS_MS = 6 * 60 * 60 * 1000; // 6 hours in milliseconds @@ -64,16 +68,26 @@ class BatchScheduler { } this.state.isRunning = true; + this.state.canForceStop = true; this.state.lastRun = new Date().toISOString(); + + // Create new AbortController for this batch run + this.currentAbortController = new AbortController(); try { console.log("🔄 Running scheduled batch process..."); - await batchProcess(); + await batchProcess(this.currentAbortController.signal); console.log("✅ Scheduled batch process completed"); } catch (error) { - console.error("❌ Error during scheduled batch process:", error); + if (error instanceof Error && error.name === 'AbortError') { + console.log("🛑 Batch process was forcefully stopped"); + } else { + console.error("❌ Error during scheduled batch process:", error); + } } finally { this.state.isRunning = false; + this.state.canForceStop = false; + this.currentAbortController = undefined; } } @@ -117,6 +131,17 @@ class BatchScheduler { }; } + public forceStop(): boolean { + if (!this.state.isRunning || !this.currentAbortController) { + console.log("ℹ️ No batch process currently running to stop"); + return false; + } + + console.log("🛑 Force stopping batch process..."); + this.currentAbortController.abort(); + return true; + } + public isEnabled(): boolean { return this.state.enabled; }