Commit 5436518

fix: Add adaptive batch retry for Ollama connection errors
Implements automatic batch splitting when embedding generation hits EOF or connection errors. Starts with larger batches (100) for throughput and recursively halves a failing batch until the requests succeed or the minimum batch size is reached. Also adds callback-based parsing in the AST splitter to handle files larger than 32KB.
1 parent 8fe50d7 commit 5436518
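
For intuition, the retry cascade described above plays out roughly like this (a hypothetical trace: the starting batch size of 100 and the floor of 5 come from this commit, but the failure pattern is invented for illustration):

// embedBatchAdaptive(100 chunks)    -> EOF, split into 50 + 50
//   embedBatchAdaptive(50 chunks)   -> EOF, split into 25 + 25
//     embedBatchAdaptive(25 chunks) -> ok
//     embedBatchAdaptive(25 chunks) -> ok
//   embedBatchAdaptive(50 chunks)   -> ok
//
// Worst case: halving from 100 down to the floor of 5 gives at most
// ceil(log2(100 / 5)) = 5 split levels before the error is re-thrown.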

4 files changed (+142 −2078 lines)

packages/core/src/context.ts

Lines changed: 64 additions & 4 deletions
@@ -1231,9 +1231,12 @@ export class Context {
         onFileProcessed?: (filePath: string, fileIndex: number, totalFiles: number) => void
     ): Promise<{ processedFiles: number; totalChunks: number; status: 'completed' | 'limit_reached' }> {
         const isHybrid = this.getIsHybrid();
+
+        // Batch size for embedding processing
         const EMBEDDING_BATCH_SIZE = Math.max(1, parseInt(envManager.get('EMBEDDING_BATCH_SIZE') || '100', 10));
+        console.log(`[Context] 🔧 Using batch size: ${EMBEDDING_BATCH_SIZE}`);
+
         const CHUNK_LIMIT = 450000;
-        console.log(`[Context] 🔧 Using EMBEDDING_BATCH_SIZE: ${EMBEDDING_BATCH_SIZE}`);

         let chunkBuffer: Array<{ chunk: CodeChunk; codebasePath: string }> = [];
         let processedFiles = 0;
@@ -1335,15 +1338,72 @@ export class Context {
         await this.processChunkBatch(chunks, codebasePath);
     }

+    /**
+     * Detect if error is a connection/EOF error that might be resolved by smaller batches
+     */
+    private isConnectionError(error: Error): boolean {
+        const errorMessage = error.message.toLowerCase();
+        return (
+            errorMessage.includes('eof') ||
+            errorMessage.includes('econnreset') ||
+            errorMessage.includes('socket hang up') ||
+            (errorMessage.includes('connection') && errorMessage.includes('reset')) ||
+            errorMessage.includes('fetch failed')
+        );
+    }
+
+    /**
+     * Adaptive batch embedding with automatic retry on smaller batches
+     * @param contents Array of text contents to embed
+     * @param minBatchSize Minimum batch size before giving up (default: 5)
+     * @returns Array of embedding vectors
+     */
+    private async embedBatchAdaptive(
+        contents: string[],
+        minBatchSize: number = 5
+    ): Promise<EmbeddingVector[]> {
+        try {
+            // Try with the full batch
+            return await this.embedding.embedBatch(contents);
+        } catch (error) {
+            if (!(error instanceof Error) || !this.isConnectionError(error)) {
+                // Not a connection error, re-throw
+                throw error;
+            }
+
+            // Connection error - try splitting the batch
+            if (contents.length <= minBatchSize) {
+                // Already at minimum batch size, can't split further
+                console.error(`[Context] ❌ Failed to embed batch of ${contents.length} chunks even at minimum batch size`);
+                throw error;
+            }
+
+            // Split batch in half and retry
+            const midpoint = Math.floor(contents.length / 2);
+            const firstHalf = contents.slice(0, midpoint);
+            const secondHalf = contents.slice(midpoint);
+
+            console.warn(`[Context] ⚠️ Batch embedding failed (${error.message}), splitting ${contents.length} chunks into ${firstHalf.length} + ${secondHalf.length}`);
+
+            // Recursively process both halves
+            const [firstResults, secondResults] = await Promise.all([
+                this.embedBatchAdaptive(firstHalf, minBatchSize),
+                this.embedBatchAdaptive(secondHalf, minBatchSize)
+            ]);
+
+            return [...firstResults, ...secondResults];
+        }
+    }
+
     /**
      * Get cached embeddings or generate new ones
      * @param chunkContents Array of chunk content strings
      * @returns Array of embedding vectors
      */
     private async getCachedOrGenerateEmbeddings(chunkContents: string[]): Promise<EmbeddingVector[]> {
         if (!this.embeddingCache.isAvailable()) {
-            // Cache not available, use regular embedding generation
-            return await this.embedding.embedBatch(chunkContents);
+            // Cache not available, use adaptive embedding generation
+            return await this.embedBatchAdaptive(chunkContents);
         }

         // Generate content hashes for all chunks
@@ -1372,7 +1432,7 @@ export class Context {
         // Generate embeddings for uncached chunks
         if (uncachedContents.length > 0) {
             console.log(`[Context] 🔄 Cache miss for ${uncachedContents.length}/${chunkContents.length} chunks, generating embeddings...`);
-            const newEmbeddings = await this.embedding.embedBatch(uncachedContents);
+            const newEmbeddings = await this.embedBatchAdaptive(uncachedContents);

             // Store new embeddings in cache
             const embeddingsToCache = new Map<string, number[]>();
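
To see the splitting behavior in isolation, here is a minimal standalone sketch (not part of the commit: mockEmbedder and the free-standing embedBatchAdaptive are hypothetical stand-ins for the class members above; the mock rejects any batch over 30 the way an overloaded Ollama host might drop oversized requests):

// Sketch only: a mock embedder that fails batches larger than 30 with an
// EOF-style error, so the halving logic has something to react to.
type EmbeddingVector = number[];

const mockEmbedder = {
    async embedBatch(contents: string[]): Promise<EmbeddingVector[]> {
        if (contents.length > 30) throw new Error('fetch failed: unexpected EOF');
        return contents.map(() => [0.1, 0.2, 0.3]); // dummy vectors
    }
};

// Free-standing version of the halving logic from the diff above.
async function embedBatchAdaptive(contents: string[], minBatchSize = 5): Promise<EmbeddingVector[]> {
    try {
        return await mockEmbedder.embedBatch(contents);
    } catch (error) {
        const msg = error instanceof Error ? error.message.toLowerCase() : '';
        const connectionError = msg.includes('eof') || msg.includes('fetch failed');
        if (!connectionError || contents.length <= minBatchSize) throw error;
        const midpoint = Math.floor(contents.length / 2);
        const [first, second] = await Promise.all([
            embedBatchAdaptive(contents.slice(0, midpoint), minBatchSize),
            embedBatchAdaptive(contents.slice(midpoint), minBatchSize),
        ]);
        return [...first, ...second];
    }
}

// 100 chunks fail at sizes 100 and 50, then succeed as four batches of 25.
embedBatchAdaptive(Array.from({ length: 100 }, (_, i) => `chunk ${i}`))
    .then(vectors => console.log(`embedded ${vectors.length} chunks`)); // embedded 100 chunks

Order is preserved because each level concatenates the first half's results before the second half's, even though the two halves are embedded concurrently.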

packages/core/src/splitter/ast-splitter.ts

Lines changed: 41 additions & 2 deletions
@@ -55,8 +55,47 @@ export class AstCodeSplitter implements Splitter {
        try {
            console.log(`🌳 Using AST splitter for ${language} file: ${filePath || 'unknown'}`);

-            this.parser.setLanguage(langConfig.parser);
-            const tree = this.parser.parse(code);
+            // Validate input before parsing
+            if (typeof code !== 'string') {
+                console.warn(`[ASTSplitter] ⚠️ Code is not a string (type: ${typeof code}), falling back to LangChain: ${filePath || 'unknown'}`);
+                return await this.langchainFallback.split(String(code || ''), language, filePath);
+            }
+
+            if (code.length === 0) {
+                console.log(`[ASTSplitter] Empty file, returning empty chunks: ${filePath || 'unknown'}`);
+                return [];
+            }
+
+            // Set language with explicit error handling
+            try {
+                this.parser.setLanguage(langConfig.parser);
+            } catch (langError) {
+                console.error(`[ASTSplitter] ❌ setLanguage failed for ${language}: ${langError}`);
+                throw langError;
+            }
+
+            // Parse with explicit error handling
+            // Note: tree-sitter has a 32KB (32767 bytes) limit for string inputs
+            // For larger files, we must use the callback-based API with chunked reads
+            let tree;
+            try {
+                if (code.length > 32767) {
+                    // Use callback-based parsing for files > 32KB
+                    // Return chunks of at most 16KB to stay well under the 32KB limit
+                    const CHUNK_SIZE = 16384;
+                    tree = this.parser.parse((offset) => {
+                        if (offset >= code.length) return null;
+                        const end = Math.min(offset + CHUNK_SIZE, code.length);
+                        return code.slice(offset, end);
+                    });
+                } else {
+                    // Use direct string parsing for smaller files
+                    tree = this.parser.parse(code);
+                }
+            } catch (parseError) {
+                console.error(`[ASTSplitter] ❌ parse failed for ${language} (${filePath}): ${parseError}`);
+                throw parseError;
+            }

            if (!tree.rootNode) {
                console.warn(`[ASTSplitter] ⚠️ Failed to parse AST for ${language}, falling back to LangChain: ${filePath || 'unknown'}`);
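
The callback API the splitter now uses can be exercised on its own. A minimal sketch, assuming the tree-sitter and tree-sitter-javascript npm packages (the callback shape mirrors the diff above; the input and chunk size are illustrative):

// Sketch: parse a string past the 32767-byte limit noted in the commit
// by serving it to the parser in 16KB slices instead of one string.
const Parser = require('tree-sitter');
const JavaScript = require('tree-sitter-javascript');

const parser = new Parser();
parser.setLanguage(JavaScript);

// ~52KB of trivial JavaScript, well past the string-input limit.
const code = 'const x = 1;\n'.repeat(4000);

const CHUNK_SIZE = 16384;
const tree = parser.parse((offset: number) => {
    if (offset >= code.length) return null; // null signals end of input
    return code.slice(offset, Math.min(offset + CHUNK_SIZE, code.length));
});

console.log(tree.rootNode.type); // 'program'

One caveat worth noting: code.length counts UTF-16 code units rather than bytes, so for heavily non-ASCII files the 32767 threshold in the splitter is approximate; the callback path is the safe branch either way.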

packages/mcp/src/embedding.ts

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ export function createEmbeddingInstance(config: ContextMcpConfig): OpenAIEmbeddi
                model: config.embeddingModel,
                host: config.ollamaHost
            });
-            console.log(`[EMBEDDING] ✅ Ollama embedding instance created successfully`);
+            console.log(`[EMBEDDING] ✅ Ollama embedding instance created`);
            return ollamaEmbedding;

        default:
