So, I find many of the node.js stream compound operations such as `pipeline()` and `.pipe()` to be really bad/incomplete at error handling. For example, if you just do this:
```
fs.createReadStream("input.txt")
    .pipe(fs.createWriteStream("output.txt"))
    .on('error', err => {
        console.log(err);
    }).on('finish', () => {
        console.log("all done");
    });
```
You would expect that if there was an error opening the readStream, you'd get that error in your error handler here, but "no", that is not the case. An error opening that input file will be unhandled. There's some logic to that, since `.pipe()` returns the output stream and an input error isn't an error on the output stream, but when that's not passed through, it makes it very easy to miss errors on the input stream. The `.pipe()` operation could have listened for errors on the input stream and passed an error through (even if it was a `pipeErr` or something different), and then it could also have cleaned up the writeStream properly upon a read error. But `.pipe()` wasn't implemented that thoroughly. It seems to assume that there will never be an error on the input stream.

Instead, you have to separately save the readStream object and attach an error handler to it directly in order to see that error, as sketched below. So, I just don't trust this compound stuff any more, and the docs never really explain how to do proper error handling. I tried to look at the code for `pipeline()` to see if I could understand its error handling, and that did not prove to be a fruitful endeavor.
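For illustration, a minimal sketch of that workaround, keeping a reference to the readStream so its errors can be observed (using the same file names as above):

```
const fs = require('fs');

const rs = fs.createReadStream("input.txt");
// open/read errors on the source surface here, not on the .pipe() chain
rs.on('error', err => {
    console.log("read error", err);
});
rs.pipe(fs.createWriteStream("output.txt"))
    .on('error', err => console.log("write error", err))
    .on('finish', () => console.log("all done"));
```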
So, your particular problem seems like it could be done with a transform stream:
```
const fs = require('fs');
const { Transform } = require('stream');

// transform stream that upper-cases the text passing through it
const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, { encoding: 'utf8' });
        let outputStream = fs.createWriteStream(output, { emitClose: true });

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });

        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});
```
As you can see, about 2/3 of this code is dealing with errors in a robust manner (the part that the built-in operations don't do properly).
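For comparison, a minimal sketch of the same operation using `stream.pipeline`, which does propagate errors from every stream in the chain and destroys them all on failure; the partial-file unlink still has to be done by hand (this reuses `myTransform` from above):

```
const { pipeline } = require('stream');

function upperFilePipeline(input, output) {
    return new Promise((resolve, reject) => {
        pipeline(
            fs.createReadStream(input, { encoding: 'utf8' }),
            myTransform,
            fs.createWriteStream(output),
            (err) => {
                if (err) {
                    // pipeline() has already destroyed all the streams;
                    // we still remove the partial output file ourselves
                    fs.unlink(output, () => reject(err));
                } else {
                    resolve();
                }
            }
        );
    });
}
```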
TLDR;

- `pipe` has these problems
- `pipeline` was created to solve them all, and it does
- `pipeline` is great, but it doesn't work if you only have part of the chain: the `stream.compose` function is coming to solve that

Long-winded answer:
The accepted answer just brushes off `pipeline`, but it was designed specifically to solve this problem. `pipe` definitely suffers from it (more below), but I haven't found a case where `pipeline` fails to properly close the streams around files, http, etc. YMMV with random npm packages, but as long as they have a `close` or `destroy` function, plus an `on('error')` event, they should be fine.

To demonstrate, this makes a call to the shell so we can see whether our test files are open.
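A minimal sketch of such a check, assuming a POSIX shell with `lsof` available and the test file names used in the examples above:

```
import { execSync } from 'child_process';

// list any open handles this process has on the test files
function openFiles() {
    try {
        console.log(execSync(`lsof -p ${process.pid} | grep -E 'input|output'`).toString());
    } catch {
        // grep exits non-zero when nothing matches, i.e. no test files are open
        console.log('no test files open');
    }
}
```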
If you call it inside the loop in the example above, the output will keep repeating, showing the test files as open. If you call it again after the catch, you can see that everything has been closed.
About the docs you quoted: what the `pipeline` documentation refers to in the first 2 bullet points is that it won't close streams that are already closed, because... well, they're already closed. As for dangling listeners, those are indeed left on the individual streams passed to `pipeline`. However, in your example (the typical case), you don't keep references to the individual streams anyway, so they will be garbage-collected as soon as the pipeline completes. The docs are warning about potential side effects if you do hold a constant reference to one of them; it's better to use "clean" instances instead. Generator functions now make it easy to chain without keeping a reference to a transform at all, but you can simply create a function that returns a new instance rather than holding on to a constant one.
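A minimal sketch of that factory pattern, reusing the upper-casing transform from the first answer:

```
import { Transform } from 'stream';

// returns a fresh Transform on each call, so no long-lived instance
// accumulates listeners across pipelines
function upperCaseTransform() {
    return new Transform({
        transform(chunk, encoding, callback) {
            callback(null, chunk.toString('utf8').toUpperCase());
        }
    });
}
```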
In short, the example above is fine on all 3 points.
More about `pipe`:

`pipe`, on the other hand, does have the propagation problems described above. It is generally agreed to be painful/unintuitive, but it's too late to change it now. I avoid it where I can and recommend `pipeline` wherever possible. It's important to note, however, that `pipeline` requires all the parts to be put together, so for the example above you also need the final `Writable` destination. If you only want to build part of the chain, you still have to use `pipe`.
The workaround for this is easier to reason about on its own.

There is good news, though. It's still experimental, but soon streams will expose a `stream.compose` function. It has all the propagation/cleanup advantages of `pipeline`, but simply returns a new stream. Essentially, it's what most people thought `pipe` would do. ;)

```
// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);
```

Until then, check out:

```
import { pipeline } from 'stream';
import { promisify } from 'util';

await promisify(pipeline)( // ...
```
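A fuller sketch of that promisified form, with hypothetical streams standing in for the elided arguments (the file names and the `upperCaseTransform` factory are the ones sketched earlier):

```
import { pipeline } from 'stream';
import { promisify } from 'util';
import fs from 'fs';

const pipelineAsync = promisify(pipeline);

// hypothetical chain: any Readable -> Transform -> Writable works, and
// pipeline destroys every stream in it if any one of them errors
await pipelineAsync(
    fs.createReadStream('input.txt'),
    upperCaseTransform(),
    fs.createWriteStream('output.txt')
);
```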