Add new batch scheduler service

This commit is contained in:
2025-06-07 15:31:59 +09:00
parent ffb9ba644e
commit b413162033
4 changed files with 212 additions and 21 deletions

View File

@ -20,6 +20,7 @@ interface Stats {
batchScheduler: { batchScheduler: {
enabled: boolean; enabled: boolean;
isRunning: boolean; isRunning: boolean;
canForceStop: boolean;
lastRun?: string; lastRun?: string;
nextRun?: string; nextRun?: string;
}; };
@ -169,6 +170,34 @@ function App() {
} }
}; };
const forceStopBatch = async () => {
if (!confirm('実行中のバッチ処理を強制停止しますか?\n\n進行中の処理が中断され、データの整合性に影響が出る可能性があります。')) {
return;
}
try {
const res = await fetch('/api/admin/batch/force-stop', {
method: 'POST'
});
const data = await res.json();
if (res.ok) {
if (data.result === 'STOPPED') {
setSuccess(data.message);
} else if (data.result === 'NO_PROCESS') {
setSuccess(data.message);
}
loadData(); // Refresh data to update batch status
} else {
setError(data.error || 'バッチ処理強制停止に失敗しました');
}
} catch (err) {
setError('バッチ処理強制停止に失敗しました');
console.error('Error force stopping batch:', err);
}
};
const toggleBatchScheduler = async (enable: boolean) => { const toggleBatchScheduler = async (enable: boolean) => {
try { try {
const res = await fetch(`/api/admin/batch/${enable ? 'enable' : 'disable'}`, { const res = await fetch(`/api/admin/batch/${enable ? 'enable' : 'disable'}`, {
@ -266,9 +295,22 @@ function App() {
</div> </div>
<div style={{ marginBottom: '20px' }}> <div style={{ marginBottom: '20px' }}>
<button className="btn btn-success" onClick={triggerBatch}> <button
className="btn btn-success"
onClick={triggerBatch}
disabled={stats?.batchScheduler?.isRunning}
>
{stats?.batchScheduler?.isRunning ? 'バッチ処理実行中...' : 'バッチ処理を手動実行'}
</button> </button>
{stats?.batchScheduler?.canForceStop && (
<button
className="btn btn-danger"
onClick={forceStopBatch}
style={{ marginLeft: '8px' }}
>
</button>
)}
<button className="btn btn-primary" onClick={loadData} style={{ marginLeft: '8px' }}> <button className="btn btn-primary" onClick={loadData} style={{ marginLeft: '8px' }}>
</button> </button>
@ -404,6 +446,7 @@ function App() {
<div style={{ marginBottom: '24px' }}> <div style={{ marginBottom: '24px' }}>
<h4></h4> <h4></h4>
<div style={{ display: 'flex', gap: '12px', marginTop: '12px' }}>
<button <button
className="btn btn-primary" className="btn btn-primary"
onClick={triggerBatch} onClick={triggerBatch}
@ -411,8 +454,17 @@ function App() {
> >
{stats?.batchScheduler?.isRunning ? 'バッチ処理実行中...' : 'バッチ処理を手動実行'} {stats?.batchScheduler?.isRunning ? 'バッチ処理実行中...' : 'バッチ処理を手動実行'}
</button> </button>
{stats?.batchScheduler?.canForceStop && (
<button
className="btn btn-danger"
onClick={forceStopBatch}
>
</button>
)}
</div>
<p style={{ fontSize: '14px', color: '#6c757d', marginTop: '8px' }}> <p style={{ fontSize: '14px', color: '#6c757d', marginTop: '8px' }}>
</p> </p>
</div> </div>
@ -423,6 +475,7 @@ function App() {
<li>RSS記事の取得</li> <li>RSS記事の取得</li>
<li></li> <li></li>
<li></li> <li></li>
<li><strong>:</strong> </li>
</ul> </ul>
</div> </div>
</> </>

View File

@ -378,6 +378,31 @@ app.post("/api/admin/batch/trigger", async (c) => {
} }
}); });
app.post("/api/admin/batch/force-stop", async (c) => {
try {
console.log("🛑 Force stop batch process requested via admin panel");
const stopped = batchScheduler.forceStop();
if (stopped) {
return c.json({
result: "STOPPED",
message: "Batch process force stop signal sent",
timestamp: new Date().toISOString()
});
} else {
return c.json({
result: "NO_PROCESS",
message: "No batch process is currently running",
timestamp: new Date().toISOString()
}, 200);
}
} catch (error) {
console.error("Error force stopping batch process:", error);
return c.json({ error: "Failed to force stop batch process" }, 500);
}
});
// Static file handlers for admin panel UI // Static file handlers for admin panel UI
app.get("/assets/*", async (c) => { app.get("/assets/*", async (c) => {
try { try {

View File

@ -32,10 +32,15 @@ interface FeedItem {
* Main batch processing function * Main batch processing function
* Processes all feeds and generates podcasts for new articles * Processes all feeds and generates podcasts for new articles
*/ */
export async function batchProcess(): Promise<void> { export async function batchProcess(abortSignal?: AbortSignal): Promise<void> {
try { try {
console.log("🚀 Starting enhanced batch process..."); console.log("🚀 Starting enhanced batch process...");
// Check for cancellation at start
if (abortSignal?.aborted) {
throw new Error('Batch process was cancelled before starting');
}
// Load feed URLs from file // Load feed URLs from file
const feedUrls = await loadFeedUrls(); const feedUrls = await loadFeedUrls();
if (feedUrls.length === 0) { if (feedUrls.length === 0) {
@ -47,22 +52,42 @@ export async function batchProcess(): Promise<void> {
// Process each feed URL // Process each feed URL
for (const url of feedUrls) { for (const url of feedUrls) {
// Check for cancellation before processing each feed
if (abortSignal?.aborted) {
throw new Error('Batch process was cancelled during feed processing');
}
try { try {
await processFeedUrl(url); await processFeedUrl(url, abortSignal);
} catch (error) { } catch (error) {
// Re-throw cancellation errors
if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) {
throw error;
}
console.error(`❌ Failed to process feed ${url}:`, error); console.error(`❌ Failed to process feed ${url}:`, error);
// Continue with other feeds // Continue with other feeds
} }
} }
// Check for cancellation before processing articles
if (abortSignal?.aborted) {
throw new Error('Batch process was cancelled before article processing');
}
// Process unprocessed articles and generate podcasts // Process unprocessed articles and generate podcasts
await processUnprocessedArticles(); await processUnprocessedArticles(abortSignal);
console.log( console.log(
"✅ Enhanced batch process completed:", "✅ Enhanced batch process completed:",
new Date().toISOString(), new Date().toISOString(),
); );
} catch (error) { } catch (error) {
if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) {
console.log("🛑 Batch process was cancelled");
const abortError = new Error('Batch process was cancelled');
abortError.name = 'AbortError';
throw abortError;
}
console.error("💥 Batch process failed:", error); console.error("💥 Batch process failed:", error);
throw error; throw error;
} }
@ -90,11 +115,16 @@ async function loadFeedUrls(): Promise<string[]> {
/** /**
* Process a single feed URL and discover new articles * Process a single feed URL and discover new articles
*/ */
async function processFeedUrl(url: string): Promise<void> { async function processFeedUrl(url: string, abortSignal?: AbortSignal): Promise<void> {
if (!url || !url.startsWith("http")) { if (!url || !url.startsWith("http")) {
throw new Error(`Invalid feed URL: ${url}`); throw new Error(`Invalid feed URL: ${url}`);
} }
// Check for cancellation
if (abortSignal?.aborted) {
throw new Error('Feed processing was cancelled');
}
console.log(`🔍 Processing feed: ${url}`); console.log(`🔍 Processing feed: ${url}`);
try { try {
@ -102,6 +132,11 @@ async function processFeedUrl(url: string): Promise<void> {
const parser = new Parser<FeedItem>(); const parser = new Parser<FeedItem>();
const feed = await parser.parseURL(url); const feed = await parser.parseURL(url);
// Check for cancellation after parsing
if (abortSignal?.aborted) {
throw new Error('Feed processing was cancelled');
}
// Get or create feed record // Get or create feed record
let feedRecord = await getFeedByUrl(url); let feedRecord = await getFeedByUrl(url);
if (!feedRecord) { if (!feedRecord) {
@ -189,12 +224,22 @@ async function discoverNewArticles(
/** /**
* Process unprocessed articles and generate podcasts * Process unprocessed articles and generate podcasts
*/ */
async function processUnprocessedArticles(): Promise<void> { async function processUnprocessedArticles(abortSignal?: AbortSignal): Promise<void> {
console.log("🎧 Processing unprocessed articles..."); console.log("🎧 Processing unprocessed articles...");
try { try {
// Check for cancellation
if (abortSignal?.aborted) {
throw new Error('Article processing was cancelled');
}
// Process retry queue first // Process retry queue first
await processRetryQueue(); await processRetryQueue(abortSignal);
// Check for cancellation after retry queue
if (abortSignal?.aborted) {
throw new Error('Article processing was cancelled');
}
// Get unprocessed articles (limit to prevent overwhelming) // Get unprocessed articles (limit to prevent overwhelming)
const unprocessedArticles = await getUnprocessedArticles( const unprocessedArticles = await getUnprocessedArticles(
@ -212,8 +257,13 @@ async function processUnprocessedArticles(): Promise<void> {
let successfullyGeneratedCount = 0; let successfullyGeneratedCount = 0;
for (const article of unprocessedArticles) { for (const article of unprocessedArticles) {
// Check for cancellation before processing each article
if (abortSignal?.aborted) {
throw new Error('Article processing was cancelled');
}
try { try {
const episodeCreated = await generatePodcastForArticle(article); const episodeCreated = await generatePodcastForArticle(article, abortSignal);
// Only mark as processed and update RSS if episode was actually created // Only mark as processed and update RSS if episode was actually created
if (episodeCreated) { if (episodeCreated) {
@ -233,6 +283,10 @@ async function processUnprocessedArticles(): Promise<void> {
console.warn(`⚠️ Episode creation failed for: ${article.title} - not marking as processed`); console.warn(`⚠️ Episode creation failed for: ${article.title} - not marking as processed`);
} }
} catch (error) { } catch (error) {
if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) {
console.log(`🛑 Article processing cancelled, stopping batch`);
throw error; // Re-throw to propagate cancellation
}
console.error( console.error(
`❌ Failed to generate podcast for article: ${article.title}`, `❌ Failed to generate podcast for article: ${article.title}`,
error, error,
@ -254,7 +308,7 @@ async function processUnprocessedArticles(): Promise<void> {
/** /**
* Process retry queue for failed TTS generation * Process retry queue for failed TTS generation
*/ */
async function processRetryQueue(): Promise<void> { async function processRetryQueue(abortSignal?: AbortSignal): Promise<void> {
const { getQueueItems, updateQueueItemStatus, removeFromQueue } = await import("../services/database.js"); const { getQueueItems, updateQueueItemStatus, removeFromQueue } = await import("../services/database.js");
const { Database } = await import("bun:sqlite"); const { Database } = await import("bun:sqlite");
const db = new Database(config.paths.dbPath); const db = new Database(config.paths.dbPath);
@ -271,6 +325,11 @@ async function processRetryQueue(): Promise<void> {
console.log(`📋 Found ${queueItems.length} items in retry queue`); console.log(`📋 Found ${queueItems.length} items in retry queue`);
for (const item of queueItems) { for (const item of queueItems) {
// Check for cancellation before processing each retry item
if (abortSignal?.aborted) {
throw new Error('Retry queue processing was cancelled');
}
try { try {
console.log(`🔁 Retrying TTS generation for: ${item.itemId} (attempt ${item.retryCount + 1}/3)`); console.log(`🔁 Retrying TTS generation for: ${item.itemId} (attempt ${item.retryCount + 1}/3)`);
@ -294,6 +353,11 @@ async function processRetryQueue(): Promise<void> {
} }
} catch (error) { } catch (error) {
if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) {
console.log(`🛑 TTS retry processing cancelled for: ${item.itemId}`);
throw error; // Re-throw cancellation errors
}
console.error(`❌ TTS retry failed for: ${item.itemId}`, error); console.error(`❌ TTS retry failed for: ${item.itemId}`, error);
try { try {
@ -330,20 +394,35 @@ async function processRetryQueue(): Promise<void> {
* Generate podcast for a single article * Generate podcast for a single article
* Returns true if episode was successfully created, false otherwise * Returns true if episode was successfully created, false otherwise
*/ */
async function generatePodcastForArticle(article: any): Promise<boolean> { async function generatePodcastForArticle(article: any, abortSignal?: AbortSignal): Promise<boolean> {
console.log(`🎤 Generating podcast for: ${article.title}`); console.log(`🎤 Generating podcast for: ${article.title}`);
try { try {
// Check for cancellation
if (abortSignal?.aborted) {
throw new Error('Podcast generation was cancelled');
}
// Get feed information for context // Get feed information for context
const feed = await getFeedById(article.feedId); const feed = await getFeedById(article.feedId);
const feedTitle = feed?.title || "Unknown Feed"; const feedTitle = feed?.title || "Unknown Feed";
// Check for cancellation before classification
if (abortSignal?.aborted) {
throw new Error('Podcast generation was cancelled');
}
// Classify the article/feed // Classify the article/feed
const category = await openAI_ClassifyFeed( const category = await openAI_ClassifyFeed(
`${feedTitle}: ${article.title}`, `${feedTitle}: ${article.title}`,
); );
console.log(`🏷️ Article classified as: ${category}`); console.log(`🏷️ Article classified as: ${category}`);
// Check for cancellation before content generation
if (abortSignal?.aborted) {
throw new Error('Podcast generation was cancelled');
}
// Generate podcast content for this single article // Generate podcast content for this single article
const podcastContent = await openAI_GeneratePodcastContent(article.title, [ const podcastContent = await openAI_GeneratePodcastContent(article.title, [
{ {
@ -352,6 +431,11 @@ async function generatePodcastForArticle(article: any): Promise<boolean> {
}, },
]); ]);
// Check for cancellation before TTS
if (abortSignal?.aborted) {
throw new Error('Podcast generation was cancelled');
}
// Generate unique ID for the episode // Generate unique ID for the episode
const episodeId = crypto.randomUUID(); const episodeId = crypto.randomUUID();
@ -410,6 +494,10 @@ async function generatePodcastForArticle(article: any): Promise<boolean> {
return false; return false;
} }
} catch (error) { } catch (error) {
if (error instanceof Error && (error.message.includes('cancelled') || error.name === 'AbortError')) {
console.log(`🛑 Podcast generation cancelled for: ${article.title}`);
throw error; // Re-throw cancellation errors to stop the batch
}
console.error( console.error(
`💥 Error generating podcast for article: ${article.title}`, `💥 Error generating podcast for article: ${article.title}`,
error, error,

View File

@ -6,14 +6,18 @@ interface BatchSchedulerState {
nextRun?: string; nextRun?: string;
isRunning: boolean; isRunning: boolean;
intervalId?: NodeJS.Timeout; intervalId?: NodeJS.Timeout;
canForceStop: boolean;
} }
class BatchScheduler { class BatchScheduler {
private state: BatchSchedulerState = { private state: BatchSchedulerState = {
enabled: true, enabled: true,
isRunning: false, isRunning: false,
canForceStop: false,
}; };
private currentAbortController?: AbortController;
private readonly SIX_HOURS_MS = 6 * 60 * 60 * 1000; // 6 hours in milliseconds private readonly SIX_HOURS_MS = 6 * 60 * 60 * 1000; // 6 hours in milliseconds
constructor() { constructor() {
@ -64,16 +68,26 @@ class BatchScheduler {
} }
this.state.isRunning = true; this.state.isRunning = true;
this.state.canForceStop = true;
this.state.lastRun = new Date().toISOString(); this.state.lastRun = new Date().toISOString();
// Create new AbortController for this batch run
this.currentAbortController = new AbortController();
try { try {
console.log("🔄 Running scheduled batch process..."); console.log("🔄 Running scheduled batch process...");
await batchProcess(); await batchProcess(this.currentAbortController.signal);
console.log("✅ Scheduled batch process completed"); console.log("✅ Scheduled batch process completed");
} catch (error) { } catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
console.log("🛑 Batch process was forcefully stopped");
} else {
console.error("❌ Error during scheduled batch process:", error); console.error("❌ Error during scheduled batch process:", error);
}
} finally { } finally {
this.state.isRunning = false; this.state.isRunning = false;
this.state.canForceStop = false;
this.currentAbortController = undefined;
} }
} }
@ -117,6 +131,17 @@ class BatchScheduler {
}; };
} }
public forceStop(): boolean {
if (!this.state.isRunning || !this.currentAbortController) {
console.log(" No batch process currently running to stop");
return false;
}
console.log("🛑 Force stopping batch process...");
this.currentAbortController.abort();
return true;
}
public isEnabled(): boolean { public isEnabled(): boolean {
return this.state.enabled; return this.state.enabled;
} }