Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cli/commands/wp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import { isServerRunning, sendWpCliCommand } from 'cli/lib/wordpress-server-mana
import { Logger, LoggerError } from 'cli/logger';
import { GlobalOptions } from 'cli/types';

// Sending WP-CLI messages to the child process is controlled by this feature flag. We've disabled
// it until we can figure out the problems with race conditions and `MaxPhpInstancesError`s from
// Playground
const IS_WP_CLI_CHILD_PROCESS_EXECUTION_ENABLED = false;

const logger = new Logger< '' >();

export async function runCommand(
Expand All @@ -30,7 +25,7 @@ export async function runCommand(
// a different PHP version, pass the command to it…
const useCustomPhpVersion = options.phpVersion && options.phpVersion !== site.phpVersion;

if ( IS_WP_CLI_CHILD_PROCESS_EXECUTION_ENABLED && ! useCustomPhpVersion ) {
if ( ! useCustomPhpVersion ) {
try {
await connect();

Expand Down
4 changes: 2 additions & 2 deletions cli/lib/tests/pm2-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ describe( 'PM2 Manager', () => {

const message: ManagerMessage = {
topic: 'stop-server',
messageId: 1,
messageId: '1',
};

await sendMessageToProcess( 42, message );
Expand All @@ -302,7 +302,7 @@ describe( 'PM2 Manager', () => {

const message: ManagerMessage = {
topic: 'stop-server',
messageId: 1,
messageId: '1',
};

await expect( sendMessageToProcess( 42, message ) ).rejects.toThrow( 'Send failed' );
Expand Down
6 changes: 3 additions & 3 deletions cli/lib/types/wordpress-server-ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const _managerMessagePayloadSchema = z.discriminatedUnion( 'topic', [
] );
export type ManagerMessagePayload = z.infer< typeof _managerMessagePayloadSchema >;

const managerMessageBase = z.object( { messageId: z.number() } );
const managerMessageBase = z.object( { messageId: z.string() } );
export const managerMessageSchema = z.discriminatedUnion( 'topic', [
managerMessageBase.merge( managerMessageStartServer ),
managerMessageBase.merge( managerMessageRunBlueprint ),
Expand All @@ -75,13 +75,13 @@ const childMessageActivity = z.object( {
} );

const childMessageResult = z.object( {
originalMessageId: z.number(),
originalMessageId: z.string(),
topic: z.literal( 'result' ),
result: z.unknown(),
} );

const childMessageError = z.object( {
originalMessageId: z.number(),
originalMessageId: z.string(),
topic: z.literal( 'error' ),
errorMessage: z.string(),
errorStack: z.string().optional(),
Expand Down
10 changes: 6 additions & 4 deletions cli/lib/wordpress-server-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ async function waitForReadyMessage( pmId: number ): Promise< void > {
* - Checks periodically for inactivity
* - Has both inactivity timeout and max total timeout
*/
let nextMessageId = 0;
const messageActivityTrackers = new Map<
number,
string,
{
activityCheckIntervalId: NodeJS.Timeout;
}
Expand All @@ -175,7 +174,7 @@ async function sendMessage(
): Promise< unknown > {
const { maxTotalElapsedTime = PLAYGROUND_CLI_MAX_TIMEOUT, logger } = options;
const bus = await getPm2Bus();
const messageId = nextMessageId++;
const messageId = crypto.randomUUID();
let responseHandler: ( packet: unknown ) => void;

return new Promise( ( resolve, reject ) => {
Expand Down Expand Up @@ -223,7 +222,10 @@ async function sendMessage(
} else if ( validPacket.raw.topic === 'console-message' ) {
lastActivityTimestamp = Date.now();
logger?.reportProgress( validPacket.raw.message );
} else if ( validPacket.raw.topic === 'error' ) {
} else if (
validPacket.raw.topic === 'error' &&
validPacket.raw.originalMessageId === messageId
) {
const error = new Error( validPacket.raw.errorMessage ) as Error & {
cliArgs?: Record< string, unknown >;
};
Expand Down
54 changes: 31 additions & 23 deletions cli/wordpress-server-child.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { sanitizeRunCLIArgs } from 'common/lib/cli-args-sanitizer';
import { isWordPressDirectory } from 'common/lib/fs-utils';
import { getMuPlugins } from 'common/lib/mu-plugins';
import { formatPlaygroundCliMessage } from 'common/lib/playground-cli-messages';
import { sequential } from 'common/lib/sequential';
import { isWordPressDevVersion } from 'common/lib/wordpress-version-utils';
import { z } from 'zod';
import { getSqliteCommandPath, getWpCliPharPath } from 'cli/lib/server-files';
Expand Down Expand Up @@ -285,30 +286,31 @@ async function runBlueprint( config: ServerConfig ): Promise< void > {
}
}

async function runWpCliCommand(
args: string[]
): Promise< { stdout: string; stderr: string; exitCode: number } > {
Comment on lines -288 to -290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const runWpCliCommand = sequential( 3, async ( args: string[] ): Promise< WpCliResult > => {

await Promise.allSettled( [ startingPromise ] );
const runWpCliCommand = sequential(
async ( args: string[] ): Promise< { stdout: string; stderr: string; exitCode: number } > => {
await Promise.allSettled( [ startingPromise ] );

if ( ! server ) {
throw new Error( `Failed to run WP CLI command because server is not running` );
}
if ( ! server ) {
throw new Error( `Failed to run WP CLI command because server is not running` );
}

const response = await server.playground.cli( [
'php',
'/tmp/wp-cli.phar',
`--path=${ await server.playground.documentRoot }`,
...args,
] );

return {
stdout: await response.stdoutText,
stderr: await response.stderrText,
exitCode: await response.exitCode,
};
}
const response = await server.playground.cli( [
'php',
'/tmp/wp-cli.phar',
`--path=${ await server.playground.documentRoot }`,
...args,
] );

return {
stdout: await response.stdoutText,
stderr: await response.stderrText,
exitCode: await response.exitCode,
};
},
{ concurrent: 3, max: 10 }
);

function sendErrorMessage( messageId: number, error: unknown ) {
function sendErrorMessage( messageId: string, error: unknown ) {
const errorResponse: ChildMessageRaw = {
originalMessageId: messageId,
topic: 'error',
Expand All @@ -325,7 +327,7 @@ async function ipcMessageHandler( packet: unknown ) {
if ( ! messageResult.success ) {
errorToConsole( 'Invalid message received:', messageResult.error );

const minimalMessageSchema = z.object( { id: z.number() } );
const minimalMessageSchema = z.object( { id: z.string() } );
const minimalMessage = minimalMessageSchema.safeParse( packet );
if ( minimalMessage.success ) {
sendErrorMessage( minimalMessage.data.id, messageResult.error );
Expand All @@ -349,7 +351,13 @@ async function ipcMessageHandler( packet: unknown ) {
result = await stopServer();
break;
case 'wp-cli-command':
result = await runWpCliCommand( validMessage.data.args );
try {
result = await runWpCliCommand( validMessage.data.args );
} catch ( wpCliError ) {
errorToConsole( `WP-CLI error:`, wpCliError );
sendErrorMessage( validMessage.messageId, wpCliError );
return; // Don't crash, just return error to caller
}
break;
default:
throw new Error( `Unknown message.` );
Expand Down
25 changes: 18 additions & 7 deletions common/lib/sequential.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
const sequentialLocks = new Map< () => Promise< unknown >, Set< Promise< unknown > > >();
type SequentialOptions = {
concurrent?: number;
max?: number;
};

// Ensures that calls to the provided function are executed sequentially
export function sequential< Args extends unknown[], Return >(
fn: ( ...args: Args ) => Promise< Return >
fn: ( ...args: Args ) => Promise< Return >,
options?: SequentialOptions
) {
const concurrentCount = options?.concurrent ?? 1;
const maxQueueSize = options?.max;
const locks = new Set< Promise< Return > >();
let queueCount = 0;

return async ( ...args: Args ) => {
const locks = sequentialLocks.get( fn ) ?? new Set();
if ( ! sequentialLocks.has( fn ) ) {
sequentialLocks.set( fn, locks );
if ( maxQueueSize !== undefined && queueCount >= maxQueueSize ) {
throw new Error(
`Queue is full (${ maxQueueSize } pending commands). Please try again later.`
);
}

while ( locks.size ) {
while ( locks.size >= concurrentCount ) {
queueCount++;
await Promise.allSettled( [ ...locks ] );
queueCount--;
}

const fnPromise = fn( ...args );
Expand Down
Loading