diff --git a/README.md b/README.md index c40a3d6..5a3781d 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,9 @@ npm install @humanwhocodes/csv-tools ## Usage -This package exports two main functions for working with CSV data via `ReadableStream` objects: +This package exports two main functions for working with CSV data via `ReadableStreamDefaultReader` objects: -### `countRows(stream, options)` +### `countRows(reader, options)` Counts rows in a CSV file with configurable options. @@ -31,28 +31,30 @@ import { ReadableStream } from "node:stream/web"; const fileStream = createReadStream("data.csv"); const webStream = ReadableStream.from(fileStream); +const reader = webStream.getReader(); // Count only data rows (exclude header) -const dataRowCount = await countRows(webStream); +const dataRowCount = await countRows(reader); console.log(`Found ${dataRowCount} data rows`); // Count all rows including header const fileStream2 = createReadStream("data.csv"); const webStream2 = ReadableStream.from(fileStream2); -const totalRowCount = await countRows(webStream2, { countHeaderRow: true }); +const reader2 = webStream2.getReader(); +const totalRowCount = await countRows(reader2, { countHeaderRow: true }); console.log(`Found ${totalRowCount} total rows`); ``` **Parameters:** -- `stream` (`ReadableStream`) - A readable stream containing CSV data +- `reader` (`ReadableStreamDefaultReader`) - A readable stream reader containing CSV data - `options` (`Object`, optional) - Configuration options - `countHeaderRow` (`boolean`, default: `false`) - Whether to count the header row - `countEmptyRows` (`boolean`, default: `false`) - Whether to count empty rows **Returns:** `Promise` - The count of rows in the CSV file -### `chunk(stream, options)` +### `chunk(reader, options)` An async generator function that yields strings of mini CSV files. Each chunk contains the header row followed by up to `chunkSize` data rows. @@ -65,9 +67,10 @@ import { ReadableStream } from "node:stream/web"; const fileStream = createReadStream("data.csv"); const webStream = ReadableStream.from(fileStream); +const reader = webStream.getReader(); // Process CSV in chunks of 50 rows -for await (const csvChunk of chunk(webStream, { chunkSize: 50 })) { +for await (const csvChunk of chunk(reader, { chunkSize: 50 })) { // Each csvChunk is a string with header + up to 50 data rows console.log("Processing chunk:"); console.log(csvChunk); @@ -77,7 +80,7 @@ for await (const csvChunk of chunk(webStream, { chunkSize: 50 })) { **Parameters:** -- `stream` (`ReadableStream`) - A readable stream containing CSV data +- `reader` (`ReadableStreamDefaultReader`) - A readable stream reader containing CSV data - `options` (`Object`) - Configuration options - `chunkSize` (`number`, optional) - Number of data rows per chunk. Default: 100 - `includeEmptyRows` (`boolean`, optional) - Whether to include empty rows. Default: false @@ -92,45 +95,15 @@ import { countRows, chunk } from "@humanwhocodes/csv-tools"; // Fetch CSV from URL const response = await fetch("https://example.com/data.csv"); const reader = response.body.getReader(); -const stream = new ReadableStream({ - start(controller) { - return pump(); - function pump() { - return reader.read().then(({ done, value }) => { - if (done) { - controller.close(); - return; - } - controller.enqueue(value); - return pump(); - }); - } - }, -}); // Count rows -const rowCount = await countRows(stream); +const rowCount = await countRows(reader); console.log(`Total rows: ${rowCount}`); // Or process in chunks const response2 = await fetch("https://example.com/data.csv"); const reader2 = response2.body.getReader(); -const stream2 = new ReadableStream({ - start(controller) { - return pump(); - function pump() { - return reader2.read().then(({ done, value }) => { - if (done) { - controller.close(); - return; - } - controller.enqueue(value); - return pump(); - }); - } - }, -}); -for await (const csvChunk of chunk(stream2, { chunkSize: 100 })) { +for await (const csvChunk of chunk(reader2, { chunkSize: 100 })) { // Process each chunk await processData(csvChunk); } diff --git a/src/index.js b/src/index.js index 420daac..1709c65 100644 --- a/src/index.js +++ b/src/index.js @@ -5,69 +5,75 @@ /* @ts-self-types="./index.d.ts" */ +/** + * Helper function to make a ReadableStreamDefaultReader async iterable + * @param {ReadableStreamDefaultReader} reader - The reader to make iterable + * @returns {AsyncGenerator} An async generator that yields values from the reader + */ +async function* makeReaderIterable(reader) { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + yield value; + } +} + /** * Counts rows in a CSV file with configurable options. - * @param {ReadableStream} stream - The readable stream containing CSV data + * @param {ReadableStreamDefaultReader} reader - The readable stream reader containing CSV data * @param {Object} options - Options for counting * @param {boolean} [options.countHeaderRow=false] - Whether to count the header row * @param {boolean} [options.countEmptyRows=false] - Whether to count empty rows * @returns {Promise} The count of rows */ -export async function countRows(stream, options = {}) { +export async function countRows(reader, options = {}) { const { countHeaderRow = false, countEmptyRows = false } = options; - const reader = stream.getReader(); const decoder = new TextDecoder(); let buffer = ""; let rowCount = 0; let isFirstRow = true; - try { - while (true) { - const { done, value } = await reader.read(); - - if (done) { - // Process any remaining data in the buffer - if (buffer.length > 0) { - const isEmpty = buffer.trim() === ""; - - if (!isEmpty || countEmptyRows) { - if (!isFirstRow || countHeaderRow) { - rowCount++; - } - } - } - - break; - } - - buffer += decoder.decode(value, { stream: true }); + for await (const value of makeReaderIterable(reader)) { + buffer += decoder.decode(value, { stream: true }); - // Process complete lines - let newlineIndex; + // Process complete lines + let newlineIndex; - while ((newlineIndex = buffer.indexOf("\n")) !== -1) { - const line = buffer.substring(0, newlineIndex); + while ((newlineIndex = buffer.indexOf("\n")) !== -1) { + const line = buffer.substring(0, newlineIndex); - buffer = buffer.substring(newlineIndex + 1); + buffer = buffer.substring(newlineIndex + 1); - const isEmpty = line.trim() === ""; + const isEmpty = line.trim() === ""; - // Count based on options - if (!isEmpty || countEmptyRows) { - if (isFirstRow) { - if (countHeaderRow) { - rowCount++; - } - - isFirstRow = false; - } else { + // Count based on options + if (!isEmpty || countEmptyRows) { + if (isFirstRow) { + if (countHeaderRow) { rowCount++; } + + isFirstRow = false; + } else { + rowCount++; } } } - } finally { - reader.releaseLock(); + } + + // Process any remaining data in the buffer + if (buffer.length > 0) { + const isEmpty = buffer.trim() === ""; + + if (!isEmpty || countEmptyRows) { + if (!isFirstRow || countHeaderRow) { + rowCount++; + } + } } return rowCount; @@ -76,80 +82,69 @@ export async function countRows(stream, options = {}) { /** * A generator function that yields strings of mini CSV files. * Each chunk is a string containing the header row followed by chunkSize data rows. - * @param {ReadableStream} stream - The readable stream containing CSV data + * @param {ReadableStreamDefaultReader} reader - The readable stream reader containing CSV data * @param {Object} options - Options for chunking * @param {number} [options.chunkSize=100] - Number of data rows per chunk * @param {boolean} [options.includeEmptyRows=false] - Whether to include empty rows * @returns {AsyncGenerator} Generator yielding CSV chunks */ -export async function* chunk(stream, options = {}) { +export async function* chunk(reader, options = {}) { const { chunkSize = 100, includeEmptyRows = false } = options; - const reader = stream.getReader(); const decoder = new TextDecoder(); let buffer = ""; let header = null; let currentChunk = []; - try { - while (true) { - const { done, value } = await reader.read(); - - if (done) { - // Process any remaining data in the buffer - if (buffer.length > 0) { - const isEmpty = buffer.trim() === ""; - - if (header === null) { - if (!isEmpty || includeEmptyRows) { - header = buffer.trim(); - } - } else { - if (!isEmpty || includeEmptyRows) { - currentChunk.push(buffer.trim()); - } - } - } - - // Yield any remaining rows - if (currentChunk.length > 0 && header !== null) { - yield header + "\n" + currentChunk.join("\n"); - } + for await (const value of makeReaderIterable(reader)) { + buffer += decoder.decode(value, { stream: true }); - break; - } + // Process complete lines + let newlineIndex; - buffer += decoder.decode(value, { stream: true }); + while ((newlineIndex = buffer.indexOf("\n")) !== -1) { + const line = buffer.substring(0, newlineIndex); - // Process complete lines - let newlineIndex; + buffer = buffer.substring(newlineIndex + 1); - while ((newlineIndex = buffer.indexOf("\n")) !== -1) { - const line = buffer.substring(0, newlineIndex); + const isEmpty = line.trim() === ""; - buffer = buffer.substring(newlineIndex + 1); + // Skip empty lines unless includeEmptyRows is true + if (isEmpty && !includeEmptyRows) { + continue; + } - const isEmpty = line.trim() === ""; + // First non-empty line is the header + if (header === null) { + header = line.trim(); + } else { + currentChunk.push(line.trim()); - // Skip empty lines unless includeEmptyRows is true - if (isEmpty && !includeEmptyRows) { - continue; + // Yield chunk when it reaches the specified size + if (currentChunk.length === chunkSize) { + yield header + "\n" + currentChunk.join("\n"); + currentChunk = []; } + } + } + } - // First non-empty line is the header - if (header === null) { - header = line.trim(); - } else { - currentChunk.push(line.trim()); + // Process any remaining data in the buffer + if (buffer.length > 0) { + const isEmpty = buffer.trim() === ""; - // Yield chunk when it reaches the specified size - if (currentChunk.length === chunkSize) { - yield header + "\n" + currentChunk.join("\n"); - currentChunk = []; - } - } + if (header === null) { + if (!isEmpty || includeEmptyRows) { + header = buffer.trim(); + } + } else { + if (!isEmpty || includeEmptyRows) { + currentChunk.push(buffer.trim()); } } - } finally { - reader.releaseLock(); + } + + // Yield any remaining rows + if (currentChunk.length > 0 && header !== null) { + yield header + "\n" + currentChunk.join("\n"); } } diff --git a/tests/index.test.js b/tests/index.test.js index 44ab0cf..56d3204 100644 --- a/tests/index.test.js +++ b/tests/index.test.js @@ -7,20 +7,22 @@ import assert from "node:assert"; import { countRows, chunk } from "../dist/index.js"; /** - * Helper function to create a ReadableStream from a string - * @param {string} text - The text to convert to a stream - * @returns {ReadableStream} A readable stream + * Helper function to create a ReadableStreamDefaultReader from a string + * @param {string} text - The text to convert to a stream reader + * @returns {ReadableStreamDefaultReader} A readable stream reader */ -function createStreamFromString(text) { +function createReaderFromString(text) { const encoder = new TextEncoder(); const data = encoder.encode(text); - return new ReadableStream({ + const stream = new ReadableStream({ start(controller) { controller.enqueue(data); controller.close(); }, }); + + return stream.getReader(); } describe("chunk", () => { @@ -31,10 +33,10 @@ describe("chunk", () => { csv += `Person${i},${20 + i}\n`; } - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream)) { + for await (const chunkData of chunk(reader)) { chunks.push(chunkData); } @@ -61,10 +63,10 @@ describe("chunk", () => { it("should yield chunks with custom chunkSize", async () => { const csv = "name,age\nAlice,30\nBob,25\nCharlie,35\nDavid,40\nEve,28"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 2 })) { + for await (const chunkData of chunk(reader, { chunkSize: 2 })) { chunks.push(chunkData); } @@ -82,10 +84,10 @@ describe("chunk", () => { it("should handle CSV with only header", async () => { const csv = "name,age"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream)) { + for await (const chunkData of chunk(reader)) { chunks.push(chunkData); } @@ -94,10 +96,10 @@ describe("chunk", () => { it("should handle empty CSV", async () => { const csv = ""; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream)) { + for await (const chunkData of chunk(reader)) { chunks.push(chunkData); } @@ -106,10 +108,10 @@ describe("chunk", () => { it("should handle CSV with single data row", async () => { const csv = "name,age\nAlice,30"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream)) { + for await (const chunkData of chunk(reader)) { chunks.push(chunkData); } @@ -119,10 +121,10 @@ describe("chunk", () => { it("should skip empty lines", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\n\nCharlie,35"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 2 })) { + for await (const chunkData of chunk(reader, { chunkSize: 2 })) { chunks.push(chunkData); } @@ -133,10 +135,10 @@ describe("chunk", () => { it("should not include trailing newlines in chunks", async () => { const csv = "name,age\nAlice,30\nBob,25\n"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 2 })) { + for await (const chunkData of chunk(reader, { chunkSize: 2 })) { chunks.push(chunkData); } @@ -146,10 +148,10 @@ describe("chunk", () => { it("should handle exact multiple of chunkSize", async () => { const csv = "name,age\nAlice,30\nBob,25\nCharlie,35\nDavid,40"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 2 })) { + for await (const chunkData of chunk(reader, { chunkSize: 2 })) { chunks.push(chunkData); } @@ -161,10 +163,10 @@ describe("chunk", () => { it("should preserve header in each chunk", async () => { const csv = "id,name,email\n1,Alice,alice@example.com\n2,Bob,bob@example.com\n3,Charlie,charlie@example.com"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 1 })) { + for await (const chunkData of chunk(reader, { chunkSize: 1 })) { chunks.push(chunkData); } @@ -179,10 +181,10 @@ describe("chunk", () => { it("should include empty rows when includeEmptyRows is true", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\nCharlie,35"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { + for await (const chunkData of chunk(reader, { chunkSize: 3, includeEmptyRows: true, })) { @@ -203,10 +205,10 @@ describe("chunk", () => { it("should skip empty rows by default", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\nCharlie,35"; - const stream = createStreamFromString(csv); + const reader = createReaderFromString(csv); const chunks = []; - for await (const chunkData of chunk(stream, { chunkSize: 3 })) { + for await (const chunkData of chunk(reader, { chunkSize: 3 })) { chunks.push(chunkData); } @@ -225,40 +227,40 @@ describe("chunk", () => { describe("countRows", () => { it("should count data rows excluding header by default", async () => { const csv = "name,age\nAlice,30\nBob,25\nCharlie,35"; - const stream = createStreamFromString(csv); - const count = await countRows(stream); + const reader = createReaderFromString(csv); + const count = await countRows(reader); assert.strictEqual(count, 3); }); it("should count header row when countHeaderRow is true", async () => { const csv = "name,age\nAlice,30\nBob,25\nCharlie,35"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countHeaderRow: true }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countHeaderRow: true }); assert.strictEqual(count, 4); }); it("should not count empty rows by default", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\nCharlie,35"; - const stream = createStreamFromString(csv); - const count = await countRows(stream); + const reader = createReaderFromString(csv); + const count = await countRows(reader); assert.strictEqual(count, 3); }); it("should count empty rows when countEmptyRows is true", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\nCharlie,35"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countEmptyRows: true }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countEmptyRows: true }); assert.strictEqual(count, 5); // 3 data rows + 2 empty rows }); it("should count both header and empty rows when both options are true", async () => { const csv = "name,age\nAlice,30\n\nBob,25\n\nCharlie,35"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countHeaderRow: true, countEmptyRows: true, }); @@ -268,48 +270,48 @@ describe("countRows", () => { it("should not count trailing newlines as rows", async () => { const csv = "name,age\nAlice,30\nBob,25\n"; - const stream = createStreamFromString(csv); - const count = await countRows(stream); + const reader = createReaderFromString(csv); + const count = await countRows(reader); assert.strictEqual(count, 2); }); it("should not count trailing newlines as empty rows even when countEmptyRows is true", async () => { const csv = "name,age\nAlice,30\nBob,25\n"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countEmptyRows: true }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countEmptyRows: true }); assert.strictEqual(count, 2); // 2 data rows, trailing newline is not a row }); it("should count multiple consecutive empty rows when countEmptyRows is true", async () => { const csv = "name,age\nAlice,30\n\n\nBob,25"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countEmptyRows: true }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countEmptyRows: true }); assert.strictEqual(count, 4); // 2 data rows + 2 empty rows (header not counted) }); it("should return 0 for empty CSV", async () => { const csv = ""; - const stream = createStreamFromString(csv); - const count = await countRows(stream); + const reader = createReaderFromString(csv); + const count = await countRows(reader); assert.strictEqual(count, 0); }); it("should return 0 for CSV with only header when countHeaderRow is false", async () => { const csv = "name,age"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countHeaderRow: false }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countHeaderRow: false }); assert.strictEqual(count, 0); }); it("should return 1 for CSV with only header when countHeaderRow is true", async () => { const csv = "name,age"; - const stream = createStreamFromString(csv); - const count = await countRows(stream, { countHeaderRow: true }); + const reader = createReaderFromString(csv); + const count = await countRows(reader, { countHeaderRow: true }); assert.strictEqual(count, 1); });