diff --git a/shared/src/types/types.ts b/shared/src/types/types.ts index 3195adae..a8177f03 100644 --- a/shared/src/types/types.ts +++ b/shared/src/types/types.ts @@ -133,6 +133,7 @@ export interface ExtensionData { activeDecorators?: Record; solutionServerConnected: boolean; isWaitingForUserInteraction?: boolean; + isProcessingQueuedMessages?: boolean; } export type ConfigErrorType = diff --git a/vscode/core/src/commands.ts b/vscode/core/src/commands.ts index 11309621..1a82e9a7 100644 --- a/vscode/core/src/commands.ts +++ b/vscode/core/src/commands.ts @@ -15,14 +15,9 @@ import { Position, } from "vscode"; import { cleanRuleSets, loadResultsFromDataFolder, loadRuleSets, loadStaticResults } from "./data"; -import { EnhancedIncident, RuleSet, Scope, ChatMessageType } from "@editor-extensions/shared"; -import { - type KaiWorkflowMessage, - type KaiInteractiveWorkflowInput, -} from "@editor-extensions/agentic"; +import { EnhancedIncident, RuleSet } from "@editor-extensions/shared"; import { updateAnalyzerPath, - getConfigAgentMode, getAllConfigurationValues, enableGenAI, getWorkspaceRelativePath, @@ -39,12 +34,10 @@ import { fixGroupOfIncidents, IncidentTypeItem } from "./issueView"; import { paths } from "./paths"; import { checkIfExecutable, copySampleProviderSettings } from "./utilities/fileUtils"; import { handleConfigureCustomRules } from "./utilities/profiles/profileActions"; -import { v4 as uuidv4 } from "uuid"; -import { processMessage } from "./utilities/ModifiedFiles/processMessage"; -import { MessageQueueManager } from "./utilities/ModifiedFiles/queueManager"; import { VerticalDiffCodeLensProvider } from "./diff/verticalDiffCodeLens"; import type { Logger } from "winston"; import { parseModelConfig, getProviderConfigKeys } from "./modelProvider/config"; +import { SolutionWorkflowOrchestrator } from "./solutionWorkflowOrchestrator"; const isWindows = process.platform === "win32"; @@ -59,7 +52,7 @@ export function executeExtensionCommand(commandSuffix: string, ...args: any[]): /** * Helper function to execute deferred workflow disposal after solution completes */ -function executeDeferredWorkflowDisposal(state: ExtensionState, logger: Logger): void { +export function executeDeferredWorkflowDisposal(state: ExtensionState, logger: Logger): void { if (state.workflowDisposalPending && state.workflowManager && state.workflowManager.dispose) { logger.info("Executing deferred workflow disposal after solution completion"); state.workflowManager.dispose(); @@ -161,248 +154,8 @@ const commandsMap: ( analyzerClient.runAnalysis(); }, [`${EXTENSION_NAME}.getSolution`]: async (incidents: EnhancedIncident[]) => { - if (state.data.isFetchingSolution) { - logger.info("Solution already being fetched"); - window.showWarningMessage("Solution already being fetched"); - return; - } - - // Check if GenAI is disabled - if (state.data.configErrors.some((e) => e.type === "genai-disabled")) { - logger.info("GenAI disabled, cannot get solution"); - window.showErrorMessage("GenAI functionality is disabled."); - return; - } - - // Check if model provider is not initialized - if (!state.modelProvider) { - logger.info("Model provider not initialized, cannot get solution"); - window.showErrorMessage( - "Model provider is not configured. Please check your provider settings.", - ); - return; - } - - // Read agent mode from configuration instead of parameter - const agentMode = getConfigAgentMode(); - logger.info("Get solution command called", { incidents, agentMode }); - await executeExtensionCommand("showResolutionPanel"); - - // Create a scope for the solution - const scope: Scope = { incidents }; - - const clientId = uuidv4(); - state.solutionServerClient.setClientId(clientId); - logger.debug("Client ID set", { clientId }); - - // Update the state to indicate we're starting to fetch a solution - // Clear previous data to prevent stale content from showing - state.mutateData((draft) => { - draft.isFetchingSolution = true; - draft.solutionState = "started"; - draft.solutionScope = scope; - draft.chatMessages = []; // Clear previous chat messages - draft.activeDecorators = {}; - }); - - // Declare variables outside try block for proper cleanup access - const pendingInteractions = new Map void>(); - let workflow: any; - - try { - // Get the profile name from the incidents - const profileName = incidents[0]?.activeProfileName; - if (!profileName) { - window.showErrorMessage("No profile name found in incidents"); - return; - } - - // Set the state to indicate we're fetching a solution - - await state.workflowManager.init({ - modelProvider: state.modelProvider, - workspaceDir: state.data.workspaceRoot, - solutionServerClient: state.solutionServerClient, - }); - logger.debug("Agent initialized"); - - // Get the workflow instance - workflow = state.workflowManager.getWorkflow(); - // Track processed message tokens to prevent duplicates - const processedTokens = new Set(); - - // Clear any existing modified files state at the start of a new solution - state.modifiedFiles.clear(); - const modifiedFilesPromises: Array> = []; - // Queue to store messages that arrive while waiting for user interaction - - // Create the queue manager for centralized queue processing - const queueManager = new MessageQueueManager( - state, - workflow, - modifiedFilesPromises, - processedTokens, - pendingInteractions, - ); - - // Store the resolver function in the state so webview handler can access it - state.resolvePendingInteraction = (messageId: string, response: any) => { - const resolver = pendingInteractions.get(messageId); - if (resolver) { - try { - pendingInteractions.delete(messageId); - resolver(response); - return true; - } catch (error) { - logger.error(`Error executing resolver for messageId: ${messageId}:`, error); - return false; - } - } else { - return false; - } - }; - - // Set up the event listener to use our message processing function - - workflow.removeAllListeners(); - workflow.on("workflowMessage", async (msg: KaiWorkflowMessage) => { - await processMessage(msg, state, queueManager); - }); - - // Add error event listener to catch workflow errors - workflow.on("error", (error: any) => { - logger.error("Workflow error:", error); - state.mutateData((draft) => { - draft.isFetchingSolution = false; - if (draft.solutionState === "started") { - draft.solutionState = "failedOnSending"; - } - }); - executeDeferredWorkflowDisposal(state, logger); - }); - - try { - const input: KaiInteractiveWorkflowInput = { - incidents, - migrationHint: profileName, - programmingLanguage: "Java", - enableAgentMode: agentMode, - }; - - await workflow.run(input); - - // Wait for all message processing to complete before proceeding - // This is critical for non-agentic mode where ModifiedFile messages - // are processed asynchronously during the workflow - if (!agentMode) { - // Give a short delay to ensure all async message processing completes - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Wait for any remaining promises in the modifiedFilesPromises array - await Promise.all(modifiedFilesPromises); - } - } catch (err) { - logger.error(`Error in running the agent - ${err}`); - logger.info(`Error trace - `, err instanceof Error ? err.stack : "N/A"); - - // Ensure isFetchingSolution is reset on any error - state.mutateData((draft) => { - draft.isFetchingSolution = false; - if (draft.solutionState === "started") { - draft.solutionState = "failedOnSending"; - } - }); - executeDeferredWorkflowDisposal(state, logger); - } finally { - // Clear the stuck interaction monitoring - - // Ensure isFetchingSolution is reset even if workflow fails unexpectedly - state.mutateData((draft) => { - draft.isFetchingSolution = false; - if (draft.solutionState === "started") { - draft.solutionState = "failedOnSending"; - } - // Also ensure analysis flags are reset to prevent stuck tasks interactions - draft.isAnalyzing = false; - draft.isAnalysisScheduled = false; - }); - executeDeferredWorkflowDisposal(state, logger); - - // Clean up queue manager - if (queueManager) { - queueManager.dispose(); - } - - // Only clean up if we're not waiting for user interaction - // This prevents clearing pending interactions while users are still deciding on file changes - if (!state.data.isWaitingForUserInteraction) { - pendingInteractions.clear(); - state.resolvePendingInteraction = undefined; - } - - // Clean up workflow resources - if (workflow) { - workflow.removeAllListeners(); - } - - // Dispose of workflow manager if it has pending resources - if (state.workflowManager && state.workflowManager.dispose) { - state.workflowManager.dispose(); - } - } - - // In agentic mode, file changes are handled through ModifiedFile messages - // In non-agentic mode, file changes are also handled through ModifiedFile messages - - // Reset the cache after all processing is complete - state.kaiFsCache.reset(); - - // Update the state - solution fetching is complete - state.mutateData((draft) => { - draft.solutionState = "received"; - draft.isFetchingSolution = false; - // File changes are handled through ModifiedFile messages in both agent and non-agent modes - }); - executeDeferredWorkflowDisposal(state, logger); - - // Clean up pending interactions and resolver function after successful completion - // Only clean up if we're not waiting for user interaction - if (!state.data.isWaitingForUserInteraction) { - pendingInteractions.clear(); - state.resolvePendingInteraction = undefined; - - // Reset solution state - state.mutateData((draft) => { - draft.solutionState = "none"; - }); - } - } catch (error: any) { - logger.error("Error in getSolution", { error }); - - // Clean up pending interactions and resolver function on error - // Only clean up if we're not waiting for user interaction - if (!state.data.isWaitingForUserInteraction) { - pendingInteractions.clear(); - state.resolvePendingInteraction = undefined; - } - - // Update the state to indicate an error - state.mutateData((draft) => { - draft.solutionState = "failedOnSending"; - draft.isFetchingSolution = false; - draft.chatMessages.push({ - messageToken: `m${Date.now()}`, - kind: ChatMessageType.String, - value: { message: `Error: ${error instanceof Error ? error.message : String(error)}` }, - timestamp: new Date().toISOString(), - }); - }); - executeDeferredWorkflowDisposal(state, logger); - - window.showErrorMessage( - `Failed to generate solution: ${error instanceof Error ? error.message : String(error)}`, - ); - } + const orchestrator = new SolutionWorkflowOrchestrator(state, logger, incidents); + await orchestrator.run(); }, [`${EXTENSION_NAME}.getSuccessRate`]: async () => { logger.info("Getting success rate for incidents"); diff --git a/vscode/core/src/extension.ts b/vscode/core/src/extension.ts index 35ffbd7c..0e0c2a57 100644 --- a/vscode/core/src/extension.ts +++ b/vscode/core/src/extension.ts @@ -95,6 +95,7 @@ class VsCodeExtension { activeDecorators: {}, solutionServerConnected: false, isWaitingForUserInteraction: false, + isProcessingQueuedMessages: false, analysisConfig: { labelSelector: "", labelSelectorValid: false, diff --git a/vscode/core/src/extensionState.ts b/vscode/core/src/extensionState.ts index def01025..605fa0b8 100644 --- a/vscode/core/src/extensionState.ts +++ b/vscode/core/src/extensionState.ts @@ -15,6 +15,7 @@ import { EventEmitter } from "events"; import winston from "winston"; import { VerticalDiffManager } from "./diff/vertical/manager"; import { StaticDiffAdapter } from "./diff/staticDiffAdapter"; +import { MessageQueueManager } from "./utilities/ModifiedFiles/queueManager"; export interface ExtensionState { analyzerClient: AnalyzerClient; @@ -50,4 +51,6 @@ export interface ExtensionState { modelProvider: KaiModelProvider | undefined; verticalDiffManager?: VerticalDiffManager; staticDiffAdapter?: StaticDiffAdapter; + currentQueueManager?: MessageQueueManager; + pendingInteractionsMap?: Map void>; } diff --git a/vscode/core/src/solutionWorkflowOrchestrator.ts b/vscode/core/src/solutionWorkflowOrchestrator.ts new file mode 100644 index 00000000..ec639afb --- /dev/null +++ b/vscode/core/src/solutionWorkflowOrchestrator.ts @@ -0,0 +1,556 @@ +import * as vscode from "vscode"; +import { ExtensionState } from "./extensionState"; +import { EnhancedIncident, Scope, ChatMessageType } from "@editor-extensions/shared"; +import { + type KaiWorkflowMessage, + type KaiInteractiveWorkflowInput, +} from "@editor-extensions/agentic"; +import { getConfigAgentMode } from "./utilities/configuration"; +import { executeExtensionCommand, executeDeferredWorkflowDisposal } from "./commands"; +import { processMessage } from "./utilities/ModifiedFiles/processMessage"; +import { MessageQueueManager } from "./utilities/ModifiedFiles/queueManager"; +import { v4 as uuidv4 } from "uuid"; +import type { Logger } from "winston"; + +/** + * Manages the lifecycle of a solution workflow session + */ +export class SolutionWorkflowOrchestrator { + private pendingInteractions = new Map void>(); + private workflow: any; + private queueManager?: MessageQueueManager; + private processedTokens = new Set(); + private modifiedFilesPromises: Array> = []; + private agentMode: boolean; + private workflowRunCompleted = false; // Track if workflow.run() has finished + + constructor( + private state: ExtensionState, + private logger: Logger, + private incidents: EnhancedIncident[], + ) { + this.agentMode = getConfigAgentMode(); + } + + /** + * Validate preconditions before starting workflow + */ + private validatePreconditions(): { valid: boolean; error?: string } { + if (this.state.data.isFetchingSolution) { + return { valid: false, error: "Solution already being fetched" }; + } + + if (this.state.data.configErrors.some((e) => e.type === "genai-disabled")) { + return { valid: false, error: "GenAI functionality is disabled." }; + } + + if (!this.state.modelProvider) { + return { + valid: false, + error: "Model provider is not configured. Please check your provider settings.", + }; + } + + const profileName = this.incidents[0]?.activeProfileName; + if (!profileName) { + return { valid: false, error: "No profile name found in incidents" }; + } + + return { valid: true }; + } + + /** + * Initialize the workflow session + */ + private async initializeWorkflow(): Promise { + this.logger.info("Initializing workflow", { + incidentsCount: this.incidents.length, + agentMode: this.agentMode, + }); + + // Initialize workflow manager + await this.state.workflowManager.init({ + modelProvider: this.state.modelProvider!, + workspaceDir: this.state.data.workspaceRoot, + solutionServerClient: this.state.solutionServerClient, + }); + + this.workflow = this.state.workflowManager.getWorkflow(); + this.logger.debug("Workflow initialized"); + } + + /** + * Clean up stale state from previous workflow runs + */ + private cleanupStaleState(): void { + this.logger.debug("Cleaning up stale state from previous runs"); + + // Clean up existing queue manager + if (this.state.currentQueueManager) { + this.state.currentQueueManager.dispose(); + this.state.currentQueueManager = undefined; + } + + // Clean up pending interactions + if (this.state.pendingInteractionsMap && this.state.pendingInteractionsMap.size > 0) { + this.state.pendingInteractionsMap.clear(); + this.state.pendingInteractionsMap = undefined; + } + + // Clean up resolver + if (this.state.resolvePendingInteraction) { + this.state.resolvePendingInteraction = undefined; + } + + // Clear modified files + this.state.modifiedFiles.clear(); + + // Reset state flags to prevent stale UI overlays + this.state.mutateData((draft) => { + draft.isWaitingForUserInteraction = false; + draft.isProcessingQueuedMessages = false; + draft.chatMessages = []; + }); + } + + /** + * Set up the queue manager for handling workflow messages + */ + private setupQueueManager(): void { + this.queueManager = new MessageQueueManager( + this.state, + this.workflow, + this.modifiedFilesPromises, + this.processedTokens, + this.pendingInteractions, + ); + + // Store references in state for external access + this.state.currentQueueManager = this.queueManager; + this.state.pendingInteractionsMap = this.pendingInteractions; + + // Set up the resolver function + this.state.resolvePendingInteraction = this.createInteractionResolver(); + + // Register onDrain handler to trigger cleanup when queue empties + // This catches cases where the queue drains naturally without resolver calls + this.queueManager.onDrain(() => { + this.handleQueueDrained(); + }); + } + + /** + * Handle queue drain event - check if cleanup should happen + * Called automatically when the queue becomes empty + */ + private handleQueueDrained(): void { + this.logger.debug("Queue drained, checking cleanup conditions", { + workflowRunCompleted: this.workflowRunCompleted, + pendingInteractionsSize: this.pendingInteractions.size, + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + isFetchingSolution: this.state.data.isFetchingSolution, + }); + + // Check all cleanup conditions + const allComplete = + this.pendingInteractions.size === 0 && + !this.state.data.isWaitingForUserInteraction && + this.queueManager!.getQueueLength() === 0; + + if (allComplete && this.workflowRunCompleted && this.state.data.isFetchingSolution) { + this.logger.info("Queue drained and all conditions met - triggering cleanup"); + this.finalCleanup(); + } else { + this.logger.debug("Queue drained but not all conditions met for cleanup", { + allComplete, + workflowRunCompleted: this.workflowRunCompleted, + isFetchingSolution: this.state.data.isFetchingSolution, + }); + } + } + + /** + * Create the resolver function for pending interactions + */ + private createInteractionResolver() { + return (messageId: string, response: any): boolean => { + const resolver = this.pendingInteractions.get(messageId); + + if (!resolver) { + this.logger.error("Resolver not found for messageId", { + messageId, + availableResolvers: Array.from(this.pendingInteractions.keys()), + }); + return false; + } + + try { + this.pendingInteractions.delete(messageId); + resolver(response); + + const queueLength = this.state.currentQueueManager?.getQueueLength() ?? 0; + this.logger.debug("Interaction resolved", { + messageId, + remainingInteractions: this.pendingInteractions.size, + queueLength, + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + }); + + // Check if all processing is complete (works for both modes) + const allComplete = + this.pendingInteractions.size === 0 && + !this.state.data.isWaitingForUserInteraction && + queueLength === 0; + + if (allComplete) { + // CRITICAL: Only cleanup if workflow.run() has actually completed! + // The workflow might be between phases (e.g., after yes/no question in agent mode) + // We must wait until workflow.run() promise resolves before cleanup + if (this.workflowRunCompleted && this.state.data.isFetchingSolution) { + this.logger.info("All interactions complete and workflow finished - cleaning up", { + agentMode: this.agentMode, + }); + this.finalCleanup(); + } else if (!this.workflowRunCompleted) { + this.logger.debug( + "Interactions complete but workflow still running - deferring cleanup", + { + pendingInteractionsSize: this.pendingInteractions.size, + queueLength, + }, + ); + } + } else if (this.pendingInteractions.size === 0 && queueLength > 0) { + this.logger.debug( + `Pending interactions cleared but ${queueLength} messages still in queue`, + ); + } + + return true; + } catch (error) { + this.logger.error("Error executing resolver", { messageId, error }); + return false; + } + }; + } + + /** + * Set up event listeners for workflow messages + */ + private setupEventListeners(): void { + this.workflow.removeAllListeners(); + + // Handle workflow messages + this.workflow.on("workflowMessage", async (msg: KaiWorkflowMessage) => { + await processMessage(msg, this.state, this.queueManager!); + }); + + // Handle workflow errors + this.workflow.on("error", (error: any) => { + this.logger.error("Workflow error:", error); + this.state.mutateData((draft) => { + draft.isFetchingSolution = false; + if (draft.solutionState === "started") { + draft.solutionState = "failedOnSending"; + } + }); + executeDeferredWorkflowDisposal(this.state, this.logger); + }); + } + + /** + * Execute the workflow + */ + private async executeWorkflow(): Promise { + const profileName = this.incidents[0]!.activeProfileName!; + + const input: KaiInteractiveWorkflowInput = { + incidents: this.incidents, + migrationHint: profileName, + programmingLanguage: "Java", + enableAgentMode: this.agentMode, + }; + + await this.workflow.run(input); + + this.logger.info("Workflow.run() completed", { + agentMode: this.agentMode, + pendingInteractionsCount: this.pendingInteractions.size, + queueLength: this.queueManager!.getQueueLength(), + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + }); + + // Set flag to indicate we're processing queued messages + if (!this.agentMode && this.queueManager!.getQueueLength() > 0) { + this.state.mutateData((draft) => { + draft.isProcessingQueuedMessages = true; + }); + } + + // Wait for processing based on mode + await this.waitForProcessing(); + } + + /** + * Wait for message processing to complete based on agent mode + */ + private async waitForProcessing(): Promise { + if (this.agentMode) { + // In agent mode, workflow.run() has completed but messages might still be in queue + // Give a brief delay to ensure all messages are processed + await new Promise((resolve) => setTimeout(resolve, 500)); + + this.logger.info("After delay in agent mode", { + pendingInteractionsCount: this.pendingInteractions.size, + queueLength: this.queueManager!.getQueueLength(), + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + }); + } else { + // In non-agent mode, give a delay to ensure messages are queued + // The queue manager will process them and wait for user interactions + await new Promise((resolve) => setTimeout(resolve, 500)); + + this.logger.info("After delay in non-agent mode", { + pendingInteractionsCount: this.pendingInteractions.size, + queueLength: this.queueManager!.getQueueLength(), + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + chatMessagesCount: this.state.data.chatMessages.length, + }); + } + } + + /** + * Handle workflow execution errors + */ + private handleWorkflowError(err: unknown): void { + const errorMessage = err instanceof Error ? err.message : String(err); + const isStringLengthError = errorMessage.includes("Invalid string length"); + + this.logger.error("Error in running the agent", { errorMessage }); + + if (!isStringLengthError && err instanceof Error) { + this.logger.info("Error trace", { stack: err.stack }); + } else if (isStringLengthError) { + this.logger.error( + "Invalid string length error - likely due to logging large/circular objects in workflow", + ); + } + + // Update state + this.state.mutateData((draft) => { + draft.isFetchingSolution = false; + if (draft.solutionState === "started") { + draft.solutionState = "failedOnSending"; + } + if (isStringLengthError) { + draft.isWaitingForUserInteraction = false; + } + + // Add error message to chat + draft.chatMessages.push({ + messageToken: `error-${Date.now()}`, + kind: ChatMessageType.String, + value: { + message: isStringLengthError + ? "Error: Workflow failed due to internal logging issue. This typically happens with large analysis runs. Please try with fewer incidents at once." + : `Error: ${errorMessage}`, + }, + timestamp: new Date().toISOString(), + }); + }); + + // Clean up queue manager if string length error + if (isStringLengthError && this.queueManager) { + this.logger.info("Disposing queue manager due to workflow error"); + this.queueManager.dispose(); + } + + executeDeferredWorkflowDisposal(this.state, this.logger); + } + + /** + * Final cleanup - resets all state and disposes resources + * Called either from: + * - Agent mode: cleanupAfterExecution() when workflow completes and queue drains + * - Non-agent mode: createInteractionResolver() when last interaction resolves + */ + private finalCleanup(): void { + this.logger.info("Performing final cleanup"); + + this.state.mutateData((draft) => { + draft.isFetchingSolution = false; + draft.solutionState = "received"; + draft.isProcessingQueuedMessages = false; + draft.isAnalyzing = false; + draft.isAnalysisScheduled = false; + }); + + // Dispose queue manager + if (this.state.currentQueueManager) { + this.state.currentQueueManager.dispose(); + this.state.currentQueueManager = undefined; + } + + // Clear pending interactions + this.pendingInteractions.clear(); + this.state.pendingInteractionsMap = undefined; + this.state.resolvePendingInteraction = undefined; + + // Reset cache + this.state.kaiFsCache.reset(); + + // Clean up workflow resources + if (this.workflow) { + this.workflow.removeAllListeners(); + } + + // Dispose of workflow manager + if (this.state.workflowManager && this.state.workflowManager.dispose) { + this.state.workflowManager.dispose(); + } + + executeDeferredWorkflowDisposal(this.state, this.logger); + } + + /** + * Clean up resources after workflow execution + * The resolver will handle final cleanup when all conditions are met. + * This method just checks if we can cleanup immediately or need to wait. + */ + private cleanupAfterExecution(): void { + this.logger.info("Workflow execution finished", { + agentMode: this.agentMode, + pendingInteractionsCount: this.pendingInteractions.size, + queueLength: this.queueManager?.getQueueLength() ?? 0, + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + }); + + // Check if we can cleanup immediately + const queueLength = this.queueManager?.getQueueLength() ?? 0; + const hasPendingInteractions = this.pendingInteractions.size > 0; + const canCleanupNow = + !this.state.data.isWaitingForUserInteraction && !hasPendingInteractions && queueLength === 0; + + if (canCleanupNow) { + this.logger.info("Queue empty - performing immediate cleanup"); + this.finalCleanup(); + } else { + this.logger.info("Deferring cleanup - waiting for queue to drain or user interactions", { + queueLength, + hasPendingInteractions, + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + note: "Resolver will cleanup when conditions are met", + }); + + // Reset analysis flags but keep isFetchingSolution true + // The resolver will reset isFetchingSolution when everything is complete + this.state.mutateData((draft) => { + draft.isAnalyzing = false; + draft.isAnalysisScheduled = false; + }); + } + } + + /** + * Initialize state for a new workflow session + */ + private initializeState(): void { + const scope: Scope = { incidents: this.incidents }; + const clientId = uuidv4(); + + this.state.solutionServerClient.setClientId(clientId); + this.logger.debug("Client ID set", { clientId }); + + this.state.mutateData((draft) => { + draft.isFetchingSolution = true; + draft.solutionState = "started"; + draft.solutionScope = scope; + draft.chatMessages = []; + draft.activeDecorators = {}; + }); + } + + /** + * Main entry point to run the solution workflow + */ + async run(): Promise { + // Validate preconditions + const validation = this.validatePreconditions(); + if (!validation.valid) { + this.logger.info("Validation failed", { error: validation.error }); + if (validation.error?.includes("already being fetched")) { + vscode.window.showWarningMessage(validation.error); + } else { + vscode.window.showErrorMessage(validation.error!); + } + return; + } + + this.logger.info("Starting solution workflow", { + incidentsCount: this.incidents.length, + agentMode: this.agentMode, + }); + + // Show resolution panel + await executeExtensionCommand("showResolutionPanel"); + + // Initialize state + this.initializeState(); + + try { + // Initialize workflow + await this.initializeWorkflow(); + + // Clean up stale state + this.cleanupStaleState(); + + // Set up queue manager + this.setupQueueManager(); + + // Set up event listeners + this.setupEventListeners(); + + try { + // Execute workflow + await this.executeWorkflow(); + } catch (err) { + this.handleWorkflowError(err); + } finally { + // Mark that workflow.run() has completed + // This prevents premature cleanup while workflow is between phases + this.workflowRunCompleted = true; + + // cleanupAfterExecution() handles mode-specific cleanup: + // - Agent mode: cleans up if queue is drained + // - Non-agent mode: defers cleanup to interaction resolver + this.cleanupAfterExecution(); + } + + this.logger.info("Workflow execution complete", { + agentMode: this.agentMode, + pendingInteractionsCount: this.pendingInteractions.size, + queueLength: this.queueManager?.getQueueLength() ?? 0, + isWaitingForUserInteraction: this.state.data.isWaitingForUserInteraction, + }); + } catch (error: any) { + this.logger.error("Error in getSolution", { error }); + + this.state.mutateData((draft) => { + draft.solutionState = "failedOnSending"; + draft.isFetchingSolution = false; + draft.chatMessages.push({ + messageToken: `m${Date.now()}`, + kind: ChatMessageType.String, + value: { message: `Error: ${error instanceof Error ? error.message : String(error)}` }, + timestamp: new Date().toISOString(), + }); + }); + + executeDeferredWorkflowDisposal(this.state, this.logger); + + vscode.window.showErrorMessage( + `Failed to generate solution: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } +} diff --git a/vscode/core/src/utilities/ModifiedFiles/handleFileResponse.ts b/vscode/core/src/utilities/ModifiedFiles/handleFileResponse.ts index 4f5f81bd..ef3b4446 100644 --- a/vscode/core/src/utilities/ModifiedFiles/handleFileResponse.ts +++ b/vscode/core/src/utilities/ModifiedFiles/handleFileResponse.ts @@ -100,14 +100,30 @@ export async function handleFileResponse( ): Promise { const logger = state.logger.child({ component: "handleFileResponse.handleFileResponse" }); try { + logger.info(`handleFileResponse called`, { + messageToken, + responseId, + path, + hasPendingInteraction: state.pendingInteractionsMap?.has(messageToken) ?? false, + totalPendingInteractions: state.pendingInteractionsMap?.size ?? 0, + }); + const messageIndex = state.data.chatMessages.findIndex( (msg) => msg.messageToken === messageToken, ); if (messageIndex === -1) { - state.logger - .child({ component: "handleFileResponse.handleFileResponse" }) - .error("Message token not found:", messageToken); + logger.error("Message token not found in chatMessages:", { + messageToken, + totalChatMessages: state.data.chatMessages.length, + chatMessageTokens: state.data.chatMessages.map((m) => m.messageToken), + }); + + // This might be a stale interaction - clean it up + if (state.resolvePendingInteraction) { + logger.warn("Attempting to resolve stale pending interaction"); + state.resolvePendingInteraction(messageToken, { responseId, path }); + } return; } diff --git a/vscode/core/src/utilities/ModifiedFiles/handleModifiedFile.ts b/vscode/core/src/utilities/ModifiedFiles/handleModifiedFile.ts index f399a5ae..5b5a570f 100644 --- a/vscode/core/src/utilities/ModifiedFiles/handleModifiedFile.ts +++ b/vscode/core/src/utilities/ModifiedFiles/handleModifiedFile.ts @@ -6,7 +6,11 @@ import { import { createTwoFilesPatch, createPatch } from "diff"; import { ExtensionState } from "src/extensionState"; import { Uri } from "vscode"; -import { ModifiedFileState, ChatMessageType } from "@editor-extensions/shared"; +import { + ModifiedFileState, + ChatMessageType, + ModifiedFileMessageValue, +} from "@editor-extensions/shared"; // Import path module for platform-agnostic path handling import { processModifiedFile } from "./processModifiedFile"; import { MessageQueueManager, handleUserInteractionComplete } from "./queueManager"; @@ -170,9 +174,52 @@ export const handleModifiedFileMessage = async ( // Set up the pending interaction using the same mechanism as UserInteraction messages // This ensures that handleFileResponse can properly trigger queue processing - await new Promise((resolve) => { - pendingInteractions.set(msg.id, async (response: any) => { + // + // CRITICAL: In non-agent mode, we DON'T await this Promise because: + // 1. The queue processor would block waiting for user response + // 2. User response comes from webview via handleFileResponse + // 3. This creates a potential deadlock + // + // Instead, we just set up the pending interaction and return immediately. + // The resolver will be called when the user responds via handleFileResponse. + const interactionPromise = new Promise((resolve) => { + // Set up a timeout to prevent stuck interactions (5 minutes) + const timeoutId = setTimeout(async () => { + console.error( + `ModifiedFile interaction timeout for ${filePath} (${msg.id}) - auto-resolving to prevent stuck state`, + ); + state.mutateData((draft) => { + draft.chatMessages.push({ + kind: ChatMessageType.String, + messageToken: `timeout-${msg.id}`, + timestamp: new Date().toISOString(), + value: { + message: `Warning: File modification for ${filePath} timed out waiting for user response. Continuing...`, + }, + }); + const i = draft.chatMessages.findIndex( + (m) => m.kind === ChatMessageType.ModifiedFile && m.messageToken === msg.id, + ); + if (i >= 0) { + (draft.chatMessages[i].value as ModifiedFileMessageValue).status = "rejected"; + } + }); + + if (pendingInteractions.has(msg.id)) { + pendingInteractions.delete(msg.id); + } + + // Use the centralized interaction completion handler to properly resume queue + await handleUserInteractionComplete(state, queueManager); + + resolve(); + }, 300000); // 5 minutes + + pendingInteractions.set(msg.id, async (_response: any) => { try { + // Clear the timeout since we got a real response + clearTimeout(timeoutId); + // Use the centralized interaction completion handler await handleUserInteractionComplete(state, queueManager); @@ -180,6 +227,7 @@ export const handleModifiedFileMessage = async ( pendingInteractions.delete(msg.id); resolve(); } catch (error) { + clearTimeout(timeoutId); console.error(`Error in ModifiedFile resolver for messageId: ${msg.id}:`, error); // Remove the entry from pendingInteractions to prevent memory leaks pendingInteractions.delete(msg.id); @@ -187,6 +235,11 @@ export const handleModifiedFileMessage = async ( } }); }); + + // Store the promise for cleanup but DON'T await it here + // This allows the queue processor to continue processing other messages + // The promise will resolve when the user responds + modifiedFilesPromises.push(interactionPromise); } } catch (err) { console.error(`Error in handleModifiedFileMessage for ${filePath}:`, err); diff --git a/vscode/core/src/utilities/ModifiedFiles/queueManager.ts b/vscode/core/src/utilities/ModifiedFiles/queueManager.ts index 5c71848f..dbc844f5 100644 --- a/vscode/core/src/utilities/ModifiedFiles/queueManager.ts +++ b/vscode/core/src/utilities/ModifiedFiles/queueManager.ts @@ -12,6 +12,7 @@ export class MessageQueueManager { private isProcessingQueue = false; private processingTimer: NodeJS.Timeout | null = null; private logger: Logger; + private onDrainCallback?: () => void; constructor( private state: ExtensionState, @@ -27,11 +28,21 @@ export class MessageQueueManager { }); } + /** + * Register a callback to be invoked when the queue drains (becomes empty) + */ + onDrain(callback: () => void): void { + this.onDrainCallback = callback; + } + /** * Adds a message to the queue */ enqueueMessage(message: KaiWorkflowMessage): void { this.messageQueue.push(message); + this.logger.debug( + `Message enqueued: ${message.type}, id: ${message.id}, queue length: ${this.messageQueue.length}`, + ); } /** @@ -86,18 +97,22 @@ export class MessageQueueManager { async processQueuedMessages(): Promise { // Prevent concurrent queue processing if (this.isProcessingQueue) { + this.logger.debug("Already processing queue, skipping"); return; } if (this.messageQueue.length === 0) { + this.logger.debug("Queue is empty, nothing to process"); return; } // Don't process if waiting for user interaction if (this.state.data.isWaitingForUserInteraction) { + this.logger.debug("Waiting for user interaction, skipping queue processing"); return; } + this.logger.info(`Starting queue processing, ${this.messageQueue.length} messages in queue`); this.isProcessingQueue = true; try { @@ -105,6 +120,9 @@ export class MessageQueueManager { while (this.messageQueue.length > 0 && !this.state.data.isWaitingForUserInteraction) { // Take the first message from queue const msg = this.messageQueue.shift()!; + this.logger.info( + `Processing message: ${msg.type}, id: ${msg.id}, remaining in queue: ${this.messageQueue.length}`, + ); try { // Call the core processing logic directly @@ -121,6 +139,9 @@ export class MessageQueueManager { // If this message triggered user interaction, stop processing if (this.state.data.isWaitingForUserInteraction) { + this.logger.info( + `Message ${msg.id} triggered user interaction, stopping queue processing`, + ); break; } } catch (error) { @@ -128,6 +149,8 @@ export class MessageQueueManager { // Continue processing other messages even if one fails } } + + this.logger.info(`Queue processing complete, ${this.messageQueue.length} messages remaining`); } catch (error) { this.logger.error("Error in queue processing:", error); @@ -144,6 +167,17 @@ export class MessageQueueManager { }); } finally { this.isProcessingQueue = false; + + // If queue is now empty and we have a drain callback, invoke it + // This allows the orchestrator to check if cleanup should happen + if (this.messageQueue.length === 0 && this.onDrainCallback) { + this.logger.debug("Queue drained, invoking onDrain callback"); + try { + this.onDrainCallback(); + } catch (error) { + this.logger.error("Error in onDrain callback:", error); + } + } } } @@ -171,7 +205,8 @@ export async function handleUserInteractionComplete( state: ExtensionState, queueManager: MessageQueueManager, ): Promise { - // Reset the waiting flag + // CRITICAL: Always reset the waiting flag to allow queue processing to continue + // Must set to false to unblock the queue processor state.mutateData((draft) => { draft.isWaitingForUserInteraction = false; }); diff --git a/webview-ui/src/components/AnalysisPage/AnalysisPage.tsx b/webview-ui/src/components/AnalysisPage/AnalysisPage.tsx index 1fe0a636..3d32d1d6 100644 --- a/webview-ui/src/components/AnalysisPage/AnalysisPage.tsx +++ b/webview-ui/src/components/AnalysisPage/AnalysisPage.tsx @@ -374,14 +374,18 @@ const AnalysisPage: React.FC = () => { - {(isWaitingForSolution || isWaitingForUserInteraction) && ( + {(isWaitingForSolution || + isWaitingForUserInteraction || + state.isProcessingQueuedMessages) && (
{isWaitingForUserInteraction ? "Waiting for user action..." - : "Waiting for solution confirmation..."} + : state.isProcessingQueuedMessages + ? "Processing solution..." + : "Waiting for solution confirmation..."}