Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 13 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ npm install @humanwhocodes/csv-tools

## Usage

This package exports two main functions for working with CSV data via `ReadableStream` objects:
This package exports two main functions for working with CSV data via `ReadableStreamDefaultReader` objects:

### `countRows(stream, options)`
### `countRows(reader, options)`

Counts rows in a CSV file with configurable options.

Expand All @@ -31,28 +31,30 @@ import { ReadableStream } from "node:stream/web";

const fileStream = createReadStream("data.csv");
const webStream = ReadableStream.from(fileStream);
const reader = webStream.getReader();

// Count only data rows (exclude header)
const dataRowCount = await countRows(webStream);
const dataRowCount = await countRows(reader);
console.log(`Found ${dataRowCount} data rows`);

// Count all rows including header
const fileStream2 = createReadStream("data.csv");
const webStream2 = ReadableStream.from(fileStream2);
const totalRowCount = await countRows(webStream2, { countHeaderRow: true });
const reader2 = webStream2.getReader();
const totalRowCount = await countRows(reader2, { countHeaderRow: true });
console.log(`Found ${totalRowCount} total rows`);
```

**Parameters:**

- `stream` (`ReadableStream<Uint8Array>`) - A readable stream containing CSV data
- `reader` (`ReadableStreamDefaultReader<Uint8Array>`) - A readable stream reader containing CSV data
- `options` (`Object`, optional) - Configuration options
- `countHeaderRow` (`boolean`, default: `false`) - Whether to count the header row
- `countEmptyRows` (`boolean`, default: `false`) - Whether to count empty rows

**Returns:** `Promise<number>` - The count of rows in the CSV file

### `chunk(stream, options)`
### `chunk(reader, options)`

An async generator function that yields strings of mini CSV files. Each chunk contains the header row followed by up to `chunkSize` data rows.

Expand All @@ -65,9 +67,10 @@ import { ReadableStream } from "node:stream/web";

const fileStream = createReadStream("data.csv");
const webStream = ReadableStream.from(fileStream);
const reader = webStream.getReader();

// Process CSV in chunks of 50 rows
for await (const csvChunk of chunk(webStream, { chunkSize: 50 })) {
for await (const csvChunk of chunk(reader, { chunkSize: 50 })) {
// Each csvChunk is a string with header + up to 50 data rows
console.log("Processing chunk:");
console.log(csvChunk);
Expand All @@ -77,7 +80,7 @@ for await (const csvChunk of chunk(webStream, { chunkSize: 50 })) {

**Parameters:**

- `stream` (`ReadableStream<Uint8Array>`) - A readable stream containing CSV data
- `reader` (`ReadableStreamDefaultReader<Uint8Array>`) - A readable stream reader containing CSV data
- `options` (`Object`) - Configuration options
- `chunkSize` (`number`, optional) - Number of data rows per chunk. Default: 100
- `includeEmptyRows` (`boolean`, optional) - Whether to include empty rows. Default: false
Expand All @@ -92,45 +95,15 @@ import { countRows, chunk } from "@humanwhocodes/csv-tools";
// Fetch CSV from URL
const response = await fetch("https://example.com/data.csv");
const reader = response.body.getReader();
const stream = new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader.read().then(({ done, value }) => {
if (done) {
controller.close();
return;
}
controller.enqueue(value);
return pump();
});
}
},
});

// Count rows
const rowCount = await countRows(stream);
const rowCount = await countRows(reader);
console.log(`Total rows: ${rowCount}`);

// Or process in chunks
const response2 = await fetch("https://example.com/data.csv");
const reader2 = response2.body.getReader();
const stream2 = new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader2.read().then(({ done, value }) => {
if (done) {
controller.close();
return;
}
controller.enqueue(value);
return pump();
});
}
},
});
for await (const csvChunk of chunk(stream2, { chunkSize: 100 })) {
for await (const csvChunk of chunk(reader2, { chunkSize: 100 })) {
// Process each chunk
await processData(csvChunk);
}
Expand Down
179 changes: 87 additions & 92 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,75 @@

/* @ts-self-types="./index.d.ts" */

/**
 * Adapts a ReadableStreamDefaultReader for consumption with `for await...of`.
 * @param {ReadableStreamDefaultReader<Uint8Array>} reader - The reader to wrap
 * @returns {AsyncGenerator<Uint8Array>} An async generator yielding each chunk
 * read from the reader until the underlying stream is exhausted
 */
async function* makeReaderIterable(reader) {
	// Pull chunks one at a time; stop as soon as the reader reports done.
	for (
		let result = await reader.read();
		!result.done;
		result = await reader.read()
	) {
		yield result.value;
	}
}

/**
* Counts rows in a CSV file with configurable options.
* @param {ReadableStream<Uint8Array>} stream - The readable stream containing CSV data
* @param {ReadableStreamDefaultReader<Uint8Array>} reader - The readable stream reader containing CSV data
* @param {Object} options - Options for counting
* @param {boolean} [options.countHeaderRow=false] - Whether to count the header row
* @param {boolean} [options.countEmptyRows=false] - Whether to count empty rows
* @returns {Promise<number>} The count of rows
*/
export async function countRows(stream, options = {}) {
export async function countRows(reader, options = {}) {
const { countHeaderRow = false, countEmptyRows = false } = options;
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
let rowCount = 0;
let isFirstRow = true;

try {
while (true) {
const { done, value } = await reader.read();

if (done) {
// Process any remaining data in the buffer
if (buffer.length > 0) {
const isEmpty = buffer.trim() === "";

if (!isEmpty || countEmptyRows) {
if (!isFirstRow || countHeaderRow) {
rowCount++;
}
}
}

break;
}

buffer += decoder.decode(value, { stream: true });
for await (const value of makeReaderIterable(reader)) {
buffer += decoder.decode(value, { stream: true });

// Process complete lines
let newlineIndex;
// Process complete lines
let newlineIndex;

while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
const line = buffer.substring(0, newlineIndex);
while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
const line = buffer.substring(0, newlineIndex);

buffer = buffer.substring(newlineIndex + 1);
buffer = buffer.substring(newlineIndex + 1);

const isEmpty = line.trim() === "";
const isEmpty = line.trim() === "";

// Count based on options
if (!isEmpty || countEmptyRows) {
if (isFirstRow) {
if (countHeaderRow) {
rowCount++;
}

isFirstRow = false;
} else {
// Count based on options
if (!isEmpty || countEmptyRows) {
if (isFirstRow) {
if (countHeaderRow) {
rowCount++;
}

isFirstRow = false;
} else {
rowCount++;
}
}
}
} finally {
reader.releaseLock();
}

// Process any remaining data in the buffer
if (buffer.length > 0) {
const isEmpty = buffer.trim() === "";

if (!isEmpty || countEmptyRows) {
if (!isFirstRow || countHeaderRow) {
rowCount++;
}
}
}

return rowCount;
Expand All @@ -76,80 +82,69 @@ export async function countRows(stream, options = {}) {
/**
* A generator function that yields strings of mini CSV files.
* Each chunk is a string containing the header row followed by chunkSize data rows.
* @param {ReadableStream<Uint8Array>} stream - The readable stream containing CSV data
* @param {ReadableStreamDefaultReader<Uint8Array>} reader - The readable stream reader containing CSV data
* @param {Object} options - Options for chunking
* @param {number} [options.chunkSize=100] - Number of data rows per chunk
* @param {boolean} [options.includeEmptyRows=false] - Whether to include empty rows
* @returns {AsyncGenerator<string>} Generator yielding CSV chunks
*/
export async function* chunk(stream, options = {}) {
export async function* chunk(reader, options = {}) {
const { chunkSize = 100, includeEmptyRows = false } = options;
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
let header = null;
let currentChunk = [];

try {
while (true) {
const { done, value } = await reader.read();

if (done) {
// Process any remaining data in the buffer
if (buffer.length > 0) {
const isEmpty = buffer.trim() === "";

if (header === null) {
if (!isEmpty || includeEmptyRows) {
header = buffer.trim();
}
} else {
if (!isEmpty || includeEmptyRows) {
currentChunk.push(buffer.trim());
}
}
}

// Yield any remaining rows
if (currentChunk.length > 0 && header !== null) {
yield header + "\n" + currentChunk.join("\n");
}
for await (const value of makeReaderIterable(reader)) {
buffer += decoder.decode(value, { stream: true });

break;
}
// Process complete lines
let newlineIndex;

buffer += decoder.decode(value, { stream: true });
while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
const line = buffer.substring(0, newlineIndex);

// Process complete lines
let newlineIndex;
buffer = buffer.substring(newlineIndex + 1);

while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
const line = buffer.substring(0, newlineIndex);
const isEmpty = line.trim() === "";

buffer = buffer.substring(newlineIndex + 1);
// Skip empty lines unless includeEmptyRows is true
if (isEmpty && !includeEmptyRows) {
continue;
}

const isEmpty = line.trim() === "";
// First non-empty line is the header
if (header === null) {
header = line.trim();
} else {
currentChunk.push(line.trim());

// Skip empty lines unless includeEmptyRows is true
if (isEmpty && !includeEmptyRows) {
continue;
// Yield chunk when it reaches the specified size
if (currentChunk.length === chunkSize) {
yield header + "\n" + currentChunk.join("\n");
currentChunk = [];
}
}
}
}

// First non-empty line is the header
if (header === null) {
header = line.trim();
} else {
currentChunk.push(line.trim());
// Process any remaining data in the buffer
if (buffer.length > 0) {
const isEmpty = buffer.trim() === "";

// Yield chunk when it reaches the specified size
if (currentChunk.length === chunkSize) {
yield header + "\n" + currentChunk.join("\n");
currentChunk = [];
}
}
if (header === null) {
if (!isEmpty || includeEmptyRows) {
header = buffer.trim();
}
} else {
if (!isEmpty || includeEmptyRows) {
currentChunk.push(buffer.trim());
}
}
} finally {
reader.releaseLock();
}

// Yield any remaining rows
if (currentChunk.length > 0 && header !== null) {
yield header + "\n" + currentChunk.join("\n");
}
}
Loading