Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/stream-text.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const { MailParser } = require('../index');
const fs = require('fs');
const emailFile = process.argv[2] || 'examples/simple.eml';

// Demonstrates the two ways MailParser can deliver text parts:
//   1. traditional mode - one aggregated `text` object pushed at the end;
//   2. streaming mode   - one `text` object per part, whose `content` is a
//      readable stream.
// Usage: node examples/stream-text.js [path/to/message.eml]

console.log('=== Configuration Options ===\n');

/**
 * Drains an attachment and releases it.
 *
 * Per the mailparser documentation the parser does not continue until every
 * attachment has been released, so a message with attachments would stall
 * this example if they were simply ignored.
 *
 * @param {Object} attachment - `data` object whose `type` is 'attachment'
 */
const discardAttachment = (attachment) => {
    attachment.content.on('data', () => {});
    attachment.content.on('end', () => attachment.release());
};

// 1. Traditional Mode (default)
const traditional = new MailParser(); // streamText: false (default)

traditional.on('data', (data) => {
    if (data.type === 'text') {
        console.log(' 📝 Final aggregated text:', data.text);
    } else if (data.type === 'attachment') {
        discardAttachment(data);
    }
});

// Without a listener a parse error would surface as an uncaught exception.
traditional.on('error', (err) => {
    console.error('Traditional mode failed:', err);
});

// 2. Streaming Mode
const streaming = new MailParser({ streamText: true });

streaming.on('data', (data) => {
    if (data.type === 'text') {
        console.log(` 🚀 Stream part (${data.contentType}):`);
        data.content.pipe(process.stdout);
        data.content.on('end', () => console.log(' [stream ended]'));
    } else if (data.type === 'attachment') {
        discardAttachment(data);
    }
});

streaming.on('error', (err) => {
    console.error('Streaming mode failed:', err);
});

// Test both
if (fs.existsSync(emailFile)) {
    console.log(`\nParsing ${emailFile}...\n`);

    // Start the streaming run only once the traditional run has finished
    // (or failed) instead of guessing with a fixed delay, so the two outputs
    // cannot interleave on slow disks or large messages.
    let streamingStarted = false;
    const startStreaming = () => {
        if (streamingStarted) {
            return;
        }
        streamingStarted = true;
        fs.createReadStream(emailFile).pipe(streaming);
    };

    traditional.once('end', startStreaming);
    traditional.once('error', startStreaming);

    fs.createReadStream(emailFile).pipe(traditional);
} else {
    console.error(`File not found: ${emailFile}`);
}
63 changes: 40 additions & 23 deletions lib/mail-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class MailParser extends Transform {

this.textTypes = ['text/plain', 'text/html'].concat(!this.options.keepDeliveryStatus ? 'message/delivery-status' : []);

// Streaming configuration
this.streamText = this.options.streamText || false;

this.decoder = this.getDecoder();

this.splitter.on('readable', () => {
Expand Down Expand Up @@ -273,13 +276,14 @@ class MailParser extends Transform {

cleanup(done) {
let finish = () => {
try {
let t = this.getTextContent();
this.push(t);
} catch (err) {
return this.emit('error', err);
if (!this.streamText) {
try {
let t = this.getTextContent();
this.push(t);
} catch (err) {
return this.emit('error', err);
}
}

done();
};

Expand Down Expand Up @@ -878,8 +882,6 @@ class MailParser extends Transform {
this.push(attachment);
this.attachmentList.push(attachment);
} else if (node.disposition === 'inline') {
let chunks = [];
let chunklen = 0;
node.contentStream = node.decoder;

if (node.contentType === 'text/plain') {
Expand Down Expand Up @@ -919,24 +921,39 @@ class MailParser extends Transform {
}
}

node.contentStream.on('readable', () => {
let chunk;
while ((chunk = node.contentStream.read()) !== null) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk);
if (this.streamText) {
let textPart = {
type: 'text',
content: node.contentStream, // Stream directly - no buffering!
contentType: node.contentType,
partId,
headers: node.headers
};

this.push(textPart);
} else {
let chunks = [];
let chunklen = 0;

node.contentStream.on('readable', () => {
let chunk;
while ((chunk = node.contentStream.read()) !== null) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk);
}
chunks.push(chunk);
chunklen += chunk.length;
}
chunks.push(chunk);
chunklen += chunk.length;
}
});
});

node.contentStream.once('end', () => {
node.textContent = Buffer.concat(chunks, chunklen).toString().replace(/\r?\n/g, '\n');
});
node.contentStream.once('end', () => {
node.textContent = Buffer.concat(chunks, chunklen).toString().replace(/\r?\n/g, '\n');
});

node.contentStream.once('error', err => {
this.emit('error', err);
});
node.contentStream.once('error', err => {
this.emit('error', err);
});
}
}

break;
Expand Down
139 changes: 139 additions & 0 deletions test/mail-parser-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1899,3 +1899,142 @@ exports['Decoder already ended on cleanup'] = test => {
test.done();
});
};

/**
 * With `streamText: true` every text part must arrive as a `text` data object
 * whose `content` is a stream, and the concatenation of everything streamed
 * must contain the fixture's known footer line.
 */
exports['Text streaming mode'] = test => {
    const source = fs.readFileSync(`${__dirname}/fixtures/mixed.eml`);

    // One stream check and one content check per text part, plus the final check
    test.expect(7);

    const parser = new MailParser({ streamText: true });
    parser.write(source);
    parser.end();

    const seenTextParts = [];
    let collected = '';

    // Asserts the part is stream-like, then accumulates whatever it emits
    const consumeTextPart = part => {
        seenTextParts.push(part);

        test.ok(part.content && typeof part.content.pipe === 'function', 'Text content should be a stream');

        part.content.on('data', piece => {
            collected += piece.toString();
        });

        part.content.on('end', () => {
            test.ok(collected.length > 0, 'Stream should contain content');
        });
    };

    // Attachments are drained and released so parsing can proceed
    const drainAttachment = attachment => {
        attachment.content.on('data', () => {});
        attachment.content.on('end', () => attachment.release());
    };

    parser.on('data', item => {
        switch (item.type) {
            case 'text':
                consumeTextPart(item);
                break;
            case 'attachment':
                drainAttachment(item);
                break;
            default:
                break;
        }
    });

    parser.on('error', failure => {
        test.done(failure);
    });

    parser.on('end', () => {
        const found = collected.includes('This e-mail message has been scanned for Viruses and Content and cleared');
        test.ok(found, 'Streamed content should contain expected text');
        test.done();
    });
};

/**
 * In streaming mode each text part gets its own stream; the n-th part's
 * streamed content is checked against the n-th expected marker string.
 */
exports['Each text part streams separately'] = test => {
    const source = fs.readFileSync(`${__dirname}/fixtures/mixed.eml`);

    // 3 parts * 2 assertions each (stream check + content validation)
    test.expect(6);

    const parser = new MailParser({ streamText: true });
    parser.write(source);
    parser.end();

    // Marker expected in each text part, in arrival order
    const markers = ['This e-mail message has been scanned for Viruses and Content and cleared', '<HTML>', 'Good Morning'];
    const seen = [];

    // Registers the part under the next index and validates its own stream
    const checkTextPart = part => {
        const position = seen.length;
        seen.push({ contentType: part.contentType, index: position });

        test.ok(typeof part.content.pipe === 'function', `Part ${position + 1} should be a stream`);

        let body = '';
        part.content.on('data', piece => {
            body += piece.toString();
        });

        part.content.on('end', () => {
            const marker = markers[position];
            test.ok(body.includes(marker), `Part ${position + 1} should contain "${marker}"`);
        });
    };

    // Attachments are drained and released so parsing can proceed
    const drainAttachment = attachment => {
        attachment.content.on('data', () => {});
        attachment.content.on('end', () => attachment.release());
    };

    parser.on('data', item => {
        switch (item.type) {
            case 'text':
                checkTextPart(item);
                break;
            case 'attachment':
                drainAttachment(item);
                break;
            default:
                break;
        }
    });

    parser.on('end', () => {
        test.done();
    });

    parser.on('error', failure => {
        test.done(failure);
    });
};

/**
 * With `streamText: false` the parser must still emit a `text` object
 * carrying `text` and `textAsHtml`, and that text must contain the fixture's
 * known footer line.
 */
exports['Traditional mode still works'] = test => {
    const source = fs.readFileSync(`${__dirname}/fixtures/mixed.eml`);

    test.expect(3);

    // Traditional mode (also default)
    const parser = new MailParser({ streamText: false });
    parser.write(source);
    parser.end();

    let lastText = null;

    parser.on('data', item => {
        switch (item.type) {
            case 'text':
                lastText = item;

                // Verify traditional properties exist
                test.ok(item.text !== undefined, 'Should have text property');
                test.ok(item.textAsHtml !== undefined, 'Should have textAsHtml property');
                break;
            case 'attachment':
                // Attachments are drained and released so parsing can proceed
                item.content.on('data', () => {});
                item.content.on('end', () => item.release());
                break;
            default:
                break;
        }
    });

    parser.on('error', failure => {
        test.done(failure);
    });

    parser.on('end', () => {
        const found = lastText.text.includes('This e-mail message has been scanned for Viruses and Content and cleared');
        test.ok(found, 'Traditional text content should contain expected text');
        test.done();
    });
};