node - Closing streams properly after pipeline

Suppose I have the following code:

```
import fs from 'fs';
// promise version of pipeline (node 15+); see the second answer below
import { pipeline } from 'stream/promises';

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;

                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}
```

How do I make sure the streams get closed properly in every case? The node docs aren't much help; they just tell me that I'll be left with dangling event listeners:

stream.pipeline() will call stream.destroy(err) on all streams except:

Readable streams which have emitted 'end' or 'close'.

Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

Accepted answer

So, I find many of the node.js stream compound operations such as pipeline() and .pipe() to be really bad/incomplete at error handling. For example, if you just do this:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

You would expect that if there was an error opening the readStream that you'd get that error in your error handler here, but "no" that is not the case. An error opening that input file will be unhandled. There's some logic to that as .pipe() returns the output stream and an input error isn't an error on the output stream, but when that's not passed through, it makes it very easy to miss errors on the input stream. The .pipe() operation could have listened for errors on the input stream and passed an error through (even if it was a pipeErr or something different) and then it could have also cleaned up the writeStream properly upon a read error. But, .pipe() wasn't implemented that thoroughly. It seems to want to assume that there would never be an error on the input stream.

Instead, you have to separately save the readStream object and attach an error handler to it directly in order to see that error. So, I just don't trust this compound stuff any more and the doc never really explains how to do proper error handling. I tried to look at the code for pipeline() to see if I could understand the error handling and that did not prove to be a fruitful endeavor.
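For illustration, a minimal sketch of that workaround (file names are hypothetical): keep a reference to the read stream, attach an error handler to it directly, and tear down the write stream yourself when it fires.

```
const fs = require('fs');

const readStream = fs.createReadStream("input.txt");
const writeStream = fs.createWriteStream("output.txt");

// input errors are NOT forwarded through .pipe(), so listen on the
// read stream directly and clean up the write stream ourselves
readStream.on('error', err => {
    writeStream.destroy();
    console.log("read error:", err);
});

readStream.pipe(writeStream)
    .on('error', err => console.log("write error:", err))
    .on('finish', () => console.log("all done"));
```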

So, your particular problem seems like it could be done with a transform stream:

```
const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});
```

As you can see, about 2/3 of this code is dealing with errors in a robust manner (the part that the built-in operations don't do properly).


Another answer


TLDR;

  • `pipe` has these problems
  • `pipeline` was created to solve them all, and it does
  • `pipeline` is great if you have all the parts from start to finish, but if you don't:
    • a future version of node will have a `stream.compose` function to solve this
    • until then, the stream-chain library is a decent option

Long-winded answer:

The accepted answer just brushes off `pipeline`, but it was designed specifically to solve this problem. `pipe` absolutely suffered from it (more below), but I have not found a case where `pipeline` fails to properly close streams around files, http, etc. YMMV with random npm packages, but if one has a `close` or `destroy` function, as well as an `on('error'` event, it should be fine.

To demonstrate, this makes a call to the shell to check whether our test files are open:

```
const { exec } = require('child_process');
const { promisify } = require('util');

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");

  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};
```

If you call it inside the loop from the example above:

```
for await (const chunk of source) {
  await listOpenFiles();
```

The output will repeat over and over:

```
***** open files:
[
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]
```

If you call it again after the catch, you can see that everything is closed:

```
***** open files:
 []
```

Regarding the quoted docs:

What the `pipeline` docs refer to in the first 2 bullet points is that it won't close streams that have already closed, because... well, they're already closed. As for the dangling listeners, those are indeed left on the individual streams passed to `pipeline`. However, in your example (the typical case), you don't keep references to the individual streams anyway; they will be garbage collected as soon as the pipeline completes. The docs are warning about potential side effects if you, for example, hold a persistent reference to one of them:

```
// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
```

It's better to have "clean" instances instead. Now that generator functions are easy to chain, there's little reason to hold a reference to a transform at all, but you can simply create a function that returns a fresh instance rather than keeping a constant one:

```
export const createCapitalizer = () => new Transform(// ...
```
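For illustration, a completed version of such a factory might look like this (a sketch; the transform body is my assumption, not from the original answer):

```
import { Transform } from 'stream';

// a fresh Transform per call, so no listeners accumulate across pipelines
export const createCapitalizer = () =>
    new Transform({
        transform(chunk, encoding, callback) {
            callback(null, chunk.toString('utf8').toUpperCase());
        }
    });
```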

In short, the example above is fine on all 3 counts.

More info on `pipe`
--------------------------------

`pipe`, on the other hand, does have the propagation problems described above.

```
const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};
```

It's widely agreed that this is painful/unintuitive, but it's too late to change it now. I avoid it where I can and recommend `pipeline` wherever possible. However, it's important to note that `pipeline` requires all the parts, start to finish. So for the above, you'd also need the final Writable destination. You still have to use `pipe` in cases like this, where you only want to build part of the chain. The workaround is easier to reason about in isolation:

```
const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());

  return transform;
};
```

There is good news, though. It's still experimental, but soon streams will expose a `stream.compose` function. It has all the propagation/cleanup advantages of `pipeline`, but just returns a new stream. Essentially, it's what most people thought `pipe` would do. ;)

```
// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);
```

Until then, check out the stream-chain library (see the note below).
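For reference, a sketch of the same `csvStream` helper using stream-chain's `chain` export (my sketch based on that package's docs, not from the original answer):

```
const fs = require('fs');
const { chain } = require('stream-chain');

// chain() wires the parts together and returns a single Duplex,
// with error propagation and cleanup across all of them
const csvStream = (file) => chain([
  fs.createReadStream(file),
  createCsvTransform(),
]);
```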

A note about [stream-chain](https://www.npmjs.com/package/stream-chain) and `pipeline`
--------------------------------

Note that the examples above use `await`, but the linked docs are for the synchronous version. That version doesn't return a promise, so `await pipeline(//...` does nothing. From node 15 onward, you'll generally want the `stream/promises` api here:

[https://nodejs.org/api/stream.html#streams-promises-api](https://nodejs.org/api/stream.html#streams-promises-api)

```
import { pipeline } from 'stream/promises'; // NOT 'stream'
```

Before node 15, you could make it a promise with util's `promisify`:

```
import { pipeline } from 'stream';
import { promisify } from 'util';

await promisify(pipeline)(// ...
```

Or, to simplify things for the whole file:

```
import * as stream from 'stream';
import { promisify } from 'util';

const pipeline = promisify(stream.pipeline);
```

I only mention this because, if you use `await` with the synchronous version, it won't actually have completed after the `await`, which can give the false impression that cleanup failed when, in fact, the pipeline simply hasn't finished yet.
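A sketch of that pitfall (file names are illustrative): the callback version of `pipeline` returns the destination stream rather than a promise, so awaiting it resolves immediately.

```
import fs from 'fs';
import { pipeline } from 'stream'; // callback version, NOT 'stream/promises'

// BUG: this pipeline() returns the destination stream, not a promise,
// so the await resolves immediately, before the copy has finished
await pipeline(
    fs.createReadStream('lowercase.txt'),
    fs.createWriteStream('uppercase.txt'),
    (err) => {
        if (err) console.log('got error:', err);
    }
);
console.log('reached here, but the pipeline may still be running');
```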