Read csv files from zip file #451
-
|
I am trying to read the files inside a zip file using a transform stream, all wrapped into an iterator for convenience. I haven't gotten to the csv part yet because I can't figure out how to use the […] (inline code lost in extraction). Another problem is that the iterator only yields one line if I put a […] (inline code lost in extraction). From reading the docs, I suspect I might need access to the defaultWriter to ease out more values and close it after processing, but the implementation of getData is really hard to follow.
import {Uint8ArrayWriter,Uint8ArrayReader,TextReader,ZipReader,ZipWriter} from 'https://deno.land/x/zipjs/index.js'
import {TextLineStream} from 'https://deno.land/std/streams/mod.ts'
import {readCSV,CSVReader} from 'https://deno.land/x/csv/mod.ts'
/**
 * Async-iterable view over the decoded text chunks of one zip entry.
 *
 * The constructor wires `transform.readable -> TextDecoderStream -> writable`,
 * where `writable` appends every decoded chunk to `accumulator`. Iterating
 * hands `transform` to `entry.getData()` and yields the accumulated chunks
 * once both the extraction and the decode pipeline have finished.
 */
class FileIterator {
  /** Decoded text chunks, in arrival order. */
  accumulator = [];

  /** Settles once the entry has been fully extracted and decoded. */
  loaded = null;

  /**
   * @param {object} entry - Object exposing `getData(target)`; `target` is the
   *   TransformStream created here (presumably a zip.js entry - the class only
   *   relies on `getData` returning a promise).
   */
  constructor(entry) {
    const transform = new TransformStream();
    const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
    const accumulator = this.accumulator;
    this.writable = new WritableStream({
      write(chunk) {
        console.log('line');
        accumulator.push(chunk);
      },
      close() {
        console.log('close csv file iterator');
      },
      abort(e) {
        console.error('abort csv file iterator: ', e);
      },
    }, queuingStrategy);
    // Keep the pipe promise: it is the only signal that every chunk has
    // actually reached `accumulator`. A line splitter (e.g. TextLineStream)
    // could be inserted after the decoder to get one chunk per line.
    this.piped = transform.readable
      .pipeThrough(new TextDecoderStream())
      .pipeTo(this.writable);
    this.transform = transform;
    this.entry = entry;
  }

  [Symbol.asyncIterator]() {
    return this.iterateFromOffset(0);
  }

  /**
   * Yields the accumulated chunks starting at index `offset`.
   *
   * @param {number} offset - Index of the first chunk to yield.
   * @yields {string} Decoded text chunks.
   */
  async *iterateFromOffset(offset) {
    // Extract only once, so the iterator can be consumed repeatedly, and wait
    // for the pipe as well: `getData` settling does not guarantee that the
    // downstream decoder has flushed into `accumulator` yet.
    this.loaded ??= Promise.all([this.entry.getData(this.transform), this.piped]);
    await this.loaded;
    for (let index = offset; index >= 0 && index < this.accumulator.length; index++) {
      yield this.accumulator[index];
    }
  }
}
// Demo: build a two-file zip in memory, then print the content of every entry
// through FileIterator.
const zipWriter = new ZipWriter(new Uint8ArrayWriter());
await Promise.all([
  zipWriter.add('a.csv', new TextReader('a,b,c\n1,2,3')),
  zipWriter.add('b.csv', new TextReader('d,e,f\n4,5,6')),
]);
const zipFile = await zipWriter.close();
const zipReader = new ZipReader(new Uint8ArrayReader(zipFile));
try {
  for (const entry of await zipReader.getEntries()) {
    console.log('file ', entry.filename);
    const iter = new FileIterator(entry);
    for await (const content of iter) {
      console.log(content);
    }
  }
} finally {
  // Close the reader even when reading an entry fails.
  await zipReader.close();
}
console.log('END')
log: |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 11 replies
This comment has been hidden.
This comment has been hidden.
-
|
import {
ZipReader,
ZipWriter,
terminateWorkers,
Uint8ArrayWriter,
Uint8ArrayReader,
TextReader
} from 'https://deno.land/x/[email protected]/index.js';
import {
initParser,
inferSchema
} from 'https://esm.sh/[email protected]';
// Demo: build a zip holding a 100-row CSV and a 1-row CSV, then stream the
// parsed rows of every entry.
const zipWriter = new ZipWriter(new Uint8ArrayWriter());
await Promise.all([
  zipWriter.add('a.csv', new TextReader(
    ['a,b,c', ...Array.from({ length: 100 }, (_, i) => `${i},${i},${i}`)].join('\n')
  )),
  zipWriter.add('b.csv', new TextReader('d,e,f\n4,5,6')),
]);
const zipFile = await zipWriter.close();
const zipReader = new ZipReader(new Uint8ArrayReader(zipFile));
try {
  for (const entry of await zipReader.getEntries()) {
    console.log('file ', entry.filename);
    for await (const row of await csvFromZipEntry(entry)) {
      console.log('row =>', row);
    }
  }
} finally {
  // Release the reader and the workers even when parsing an entry fails.
  try {
    await zipReader.close();
  } finally {
    terminateWorkers();
  }
}
/**
 * Streams the rows of a CSV zip entry.
 *
 * The entry's bytes are written into a TransformStream, decoded to text and
 * fed chunk by chunk to a CSV parser created lazily from the first chunk
 * (`initParser(inferSchema(chunk))`). Every row reported by the parser is
 * enqueued on the returned stream.
 *
 * @param {object} entry - Object exposing `getData(writable)` that writes the
 *   entry's bytes to `writable` and returns a promise.
 * @returns {Promise<ReadableStream>} Stream of parsed rows; it errors if
 *   `getData` rejects or parsing throws.
 */
async function csvFromZipEntry(entry) {
  let csvParser;
  let rowController;
  const { readable, writable } = new TransformStream();
  const readableOutput = readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TransformStream({
      start(controller) {
        // Kept so an extraction failure can be surfaced on the output stream.
        rowController = controller;
      },
      transform(chunk, controller) {
        csvParser ??= initParser(inferSchema(chunk));
        csvParser.chunk(chunk, csvParser.typedArrs, (rows) => {
          for (const row of rows) {
            controller.enqueue(row);
          }
        });
      },
      flush() {
        // No parser exists when the entry produced no chunk at all.
        csvParser?.end();
      },
    }));
  // Deliberately not awaited: awaiting here would push the whole entry through
  // the pipeline before any consumer reads, which stalls under backpressure.
  // A failure is forwarded to the returned stream instead of being lost.
  entry.getData(writable).catch((error) => rowController.error(error));
  return readableOutput;
}
Beta Was this translation helpful? Give feedback.
I don't think it's possible, since it depends on `controller`. However, the performance could be improved by not using `await` when calling `getData` and `csvFromZipEntry`. The exception `getData` might throw can also be ignored, because the stream (i.e. `readable`) can propagate it. Here's the code below with these changes applied.