diff --git a/cli/commands/wp.ts b/cli/commands/wp.ts index 94a09666c..d6d11abb8 100644 --- a/cli/commands/wp.ts +++ b/cli/commands/wp.ts @@ -9,11 +9,6 @@ import { isServerRunning, sendWpCliCommand } from 'cli/lib/wordpress-server-mana import { Logger, LoggerError } from 'cli/logger'; import { GlobalOptions } from 'cli/types'; -// Sending WP-CLI messages to the child process is controlled by this feature flag. We've disabled -// it until we can figure out the problems with race conditions and `MaxPhpInstancesError`s from -// Playground -const IS_WP_CLI_CHILD_PROCESS_EXECUTION_ENABLED = false; - const logger = new Logger< '' >(); export async function runCommand( @@ -30,7 +25,7 @@ export async function runCommand( // a different PHP version, pass the command to it… const useCustomPhpVersion = options.phpVersion && options.phpVersion !== site.phpVersion; - if ( IS_WP_CLI_CHILD_PROCESS_EXECUTION_ENABLED && ! useCustomPhpVersion ) { + if ( ! useCustomPhpVersion ) { try { await connect(); diff --git a/cli/lib/tests/pm2-manager.test.ts b/cli/lib/tests/pm2-manager.test.ts index 0eff6dd49..5bc248649 100644 --- a/cli/lib/tests/pm2-manager.test.ts +++ b/cli/lib/tests/pm2-manager.test.ts @@ -281,7 +281,7 @@ describe( 'PM2 Manager', () => { const message: ManagerMessage = { topic: 'stop-server', - messageId: 1, + messageId: '1', }; await sendMessageToProcess( 42, message ); @@ -302,7 +302,7 @@ describe( 'PM2 Manager', () => { const message: ManagerMessage = { topic: 'stop-server', - messageId: 1, + messageId: '1', }; await expect( sendMessageToProcess( 42, message ) ).rejects.toThrow( 'Send failed' ); diff --git a/cli/lib/types/wordpress-server-ipc.ts b/cli/lib/types/wordpress-server-ipc.ts index 3c4c0bda8..41308a089 100644 --- a/cli/lib/types/wordpress-server-ipc.ts +++ b/cli/lib/types/wordpress-server-ipc.ts @@ -56,7 +56,7 @@ const _managerMessagePayloadSchema = z.discriminatedUnion( 'topic', [ ] ); export type ManagerMessagePayload = z.infer< typeof _managerMessagePayloadSchema >; -const managerMessageBase = z.object( { messageId: z.number() } ); +const managerMessageBase = z.object( { messageId: z.string() } ); export const managerMessageSchema = z.discriminatedUnion( 'topic', [ managerMessageBase.merge( managerMessageStartServer ), managerMessageBase.merge( managerMessageRunBlueprint ), @@ -75,13 +75,13 @@ const childMessageActivity = z.object( { } ); const childMessageResult = z.object( { - originalMessageId: z.number(), + originalMessageId: z.string(), topic: z.literal( 'result' ), result: z.unknown(), } ); const childMessageError = z.object( { - originalMessageId: z.number(), + originalMessageId: z.string(), topic: z.literal( 'error' ), errorMessage: z.string(), errorStack: z.string().optional(), diff --git a/cli/lib/wordpress-server-manager.ts b/cli/lib/wordpress-server-manager.ts index 544ce17ac..d83954a63 100644 --- a/cli/lib/wordpress-server-manager.ts +++ b/cli/lib/wordpress-server-manager.ts @@ -155,9 +155,8 @@ async function waitForReadyMessage( pmId: number ): Promise< void > { * - Checks periodically for inactivity * - Has both inactivity timeout and max total timeout */ -let nextMessageId = 0; const messageActivityTrackers = new Map< - number, + string, { activityCheckIntervalId: NodeJS.Timeout; } @@ -175,7 +174,7 @@ async function sendMessage( ): Promise< unknown > { const { maxTotalElapsedTime = PLAYGROUND_CLI_MAX_TIMEOUT, logger } = options; const bus = await getPm2Bus(); - const messageId = nextMessageId++; + const messageId = crypto.randomUUID(); let responseHandler: ( packet: unknown ) => void; return new Promise( ( resolve, reject ) => { @@ -223,7 +222,10 @@ async function sendMessage( } else if ( validPacket.raw.topic === 'console-message' ) { lastActivityTimestamp = Date.now(); logger?.reportProgress( validPacket.raw.message ); - } else if ( validPacket.raw.topic === 'error' ) { + } else if ( + validPacket.raw.topic === 'error' && + validPacket.raw.originalMessageId === messageId + ) { const error = new Error( validPacket.raw.errorMessage ) as Error & { cliArgs?: Record< string, unknown >; }; diff --git a/cli/wordpress-server-child.ts b/cli/wordpress-server-child.ts index efbdda133..72472ea8f 100644 --- a/cli/wordpress-server-child.ts +++ b/cli/wordpress-server-child.ts @@ -25,6 +25,7 @@ import { sanitizeRunCLIArgs } from 'common/lib/cli-args-sanitizer'; import { isWordPressDirectory } from 'common/lib/fs-utils'; import { getMuPlugins } from 'common/lib/mu-plugins'; import { formatPlaygroundCliMessage } from 'common/lib/playground-cli-messages'; +import { sequential } from 'common/lib/sequential'; import { isWordPressDevVersion } from 'common/lib/wordpress-version-utils'; import { z } from 'zod'; import { getSqliteCommandPath, getWpCliPharPath } from 'cli/lib/server-files'; @@ -285,30 +286,31 @@ async function runBlueprint( config: ServerConfig ): Promise< void > { } } -async function runWpCliCommand( - args: string[] -): Promise< { stdout: string; stderr: string; exitCode: number } > { - await Promise.allSettled( [ startingPromise ] ); +const runWpCliCommand = sequential( + async ( args: string[] ): Promise< { stdout: string; stderr: string; exitCode: number } > => { + await Promise.allSettled( [ startingPromise ] ); - if ( ! server ) { - throw new Error( `Failed to run WP CLI command because server is not running` ); - } + if ( ! server ) { + throw new Error( `Failed to run WP CLI command because server is not running` ); + } - const response = await server.playground.cli( [ - 'php', - '/tmp/wp-cli.phar', - `--path=${ await server.playground.documentRoot }`, - ...args, - ] ); - - return { - stdout: await response.stdoutText, - stderr: await response.stderrText, - exitCode: await response.exitCode, - }; -} + const response = await server.playground.cli( [ + 'php', + '/tmp/wp-cli.phar', + `--path=${ await server.playground.documentRoot }`, + ...args, + ] ); + + return { + stdout: await response.stdoutText, + stderr: await response.stderrText, + exitCode: await response.exitCode, + }; + }, + { concurrent: 3, max: 10 } +); -function sendErrorMessage( messageId: number, error: unknown ) { +function sendErrorMessage( messageId: string, error: unknown ) { const errorResponse: ChildMessageRaw = { originalMessageId: messageId, topic: 'error', @@ -325,7 +327,7 @@ async function ipcMessageHandler( packet: unknown ) { if ( ! messageResult.success ) { errorToConsole( 'Invalid message received:', messageResult.error ); - const minimalMessageSchema = z.object( { id: z.number() } ); + const minimalMessageSchema = z.object( { id: z.string() } ); const minimalMessage = minimalMessageSchema.safeParse( packet ); if ( minimalMessage.success ) { sendErrorMessage( minimalMessage.data.id, messageResult.error ); @@ -349,7 +351,13 @@ async function ipcMessageHandler( packet: unknown ) { result = await stopServer(); break; case 'wp-cli-command': - result = await runWpCliCommand( validMessage.data.args ); + try { + result = await runWpCliCommand( validMessage.data.args ); + } catch ( wpCliError ) { + errorToConsole( `WP-CLI error:`, wpCliError ); + sendErrorMessage( validMessage.messageId, wpCliError ); + return; // Don't crash, just return error to caller + } break; default: throw new Error( `Unknown message.` ); diff --git a/common/lib/sequential.ts b/common/lib/sequential.ts index 593ee246d..2667feb70 100644 --- a/common/lib/sequential.ts +++ b/common/lib/sequential.ts @@ -1,17 +1,28 @@ -const sequentialLocks = new Map< () => Promise< unknown >, Set< Promise< unknown > > >(); +type SequentialOptions = { + concurrent?: number; + max?: number; +}; -// Ensures that calls to the provided function are executed sequentially export function sequential< Args extends unknown[], Return >( - fn: ( ...args: Args ) => Promise< Return > + fn: ( ...args: Args ) => Promise< Return >, + options?: SequentialOptions ) { + const concurrentCount = options?.concurrent ?? 1; + const maxQueueSize = options?.max; + const locks = new Set< Promise< Return > >(); + let queueCount = 0; + return async ( ...args: Args ) => { - const locks = sequentialLocks.get( fn ) ?? new Set(); - if ( ! sequentialLocks.has( fn ) ) { - sequentialLocks.set( fn, locks ); + if ( maxQueueSize !== undefined && queueCount >= maxQueueSize ) { + throw new Error( + `Queue is full (${ maxQueueSize } pending commands). Please try again later.` + ); } - while ( locks.size ) { + while ( locks.size >= concurrentCount ) { + queueCount++; await Promise.allSettled( [ ...locks ] ); + queueCount--; } const fnPromise = fn( ...args );