记一次导出 CSV 产生的研究
2024/9/1
此文于 2020 年首发于掘金
引入
工作中做一个将数据库数据导出为 CSV 格式的功能,开发环境下一切非常正常,然后满意地上到测试环境进行测试,结果测试同学反馈导出大量数据时 Chrome 直接报错网络错误,我就很纳闷,自己开发的时候也用了大数据量来测试,为什么没有出问题,一句 It works on my computer
就要脱口而出了,本着严谨的原则还是自己跑测试环境测试了一下,发现只要数据量达到100万条左右时几乎每一次导出都会报错。
一开始先想到是不是网络超时,但这个下载过程其实是一直都有数据传输的,不应该触发超时设定才对,不然其他下载文件的过程也应该超时了,因此需要另寻思路查一查这个问题。
寻找线索
在 Stack Overflow
上面搜索一番,主要说法是以下两种:
Chrome
的Predict network actions to improve page load performance
选项- 杀毒软件不兼容
两种解决方式尝试无果,而且一看就不像正确答案/捂脸
于是用了一个偏门的方法,把这个导出接口的响应头部设置为
Content-Disposition: inline;
即把接口传输出来的 csv
数据直接当做网页显示在浏览器里面,也能够借助 Chrome DevTools
看看详细的报错信息。
为了模拟耗时较长的下载过程,将 Chrome DevTools
中 Network
面板的 Throttle
选项打开,设置成 Slow 3G
,这样可以模拟测试环境外网的网络速度。
测试了四次,有三次在 7 分钟左右报错,一次在 10 分钟左右停下来,Chrome
的下载结果只显示一个 失败-网络错误,但打开控制台就发现线索了,每次的报错都是一样的:
net::ERR_INCOMPLETED_CHUNK_ENCODING 200
借着这个报错信息再搜索 Stack Overflow
,似乎发现了问题所在:
This is typically caused by a server not sending us the terminal 0-length chunk. We sit around waiting for more data with the request hung until the server closes the socket. At that point, we have no way to know whether we've received the entire file or not. This seems to be working as intended. The server needs to be fixed.
附上链接 Bug Chrome
大概意思是服务器没有发送代表结束的最后一个 0-length chunk
,相当于响应的数据是不完整的,这个与报错信息中的 INCOMPLETED
一致。
分析原因
线索有了,来看一下业务代码:
// res 指的是 Express 的 Response 对象
const stream = Model.find({}).stream();
stream.on("data", data => {
// do some stuff
res.write(data);
});
stream.on("error", err => {
// process and log the error
});
stream.on("end", () => {
res.end();
});
这里需要介绍一下相关的概念,首先 stream
是一个 mongoose
的可读流,res
是 Express
的 Response
对象,stream
是一个可读流,res
是一个可写流。我们知道通常读取数据的速度都要快于写入数据的速度,因此为了维持读写速度的平衡,从可读流读取出来的数据会在缓冲区堆积,等待可写流空闲时再继续写入操作。
express
流式写入 response
对象在 http 协议层面使用的是 Transfer-Encoding
的首部:
Transfer-Encoding: Chunked
对于这个首部,http 协议有以下规定:
The terminating chunk is a regular chunk, with the exception that its length is zero.
在最终的分块传输完成之后,需要再传输一个零长度的块,告知客户端流数据已经传输完毕。
以上代码的问题是,没有考虑到读写速度的平衡,只考虑了数据是否被读取出来,实际上从可读流读取出来的数据可能大量积压,此时可读流 stream
触发 end
事件时,数据并不一定完全写入到了 res
流中。TCP Socket
在把缓冲区的数据通过网络发送出去之后并不会立即把缓冲区的数据删除,而是需要等待对端的 ACK
报文到达才会真正把数据删除,如果网络情况不好,则对端的 ACK
报文可能需要很长时间才能到达,此时缓冲区很快就满了,并且无法再写入,在测试环境使用外网时,网络情况较差,express
写入到 TCP Socket
写缓冲区的数据都被阻塞,此时调用 res.end()
方法会导致实际上写入的数据并不完整,并且最后的零长度分块也没写入,客户端自然就判定为数据接收不完整,就报错了,这也是为什么开发时内网环境没有出现这种情况的原因。
Backpressure的简单介绍
这里涉及到一个概念: Backpressure
通常在数据处理的时候我们会遇到一个普遍的问题:
背压
,意思是在数据传输过程中有一大堆数据在缓存之后积压着。每次当数据到达结尾又遇到复杂的运算,又或者无论什么原因它比预期的慢,这样累积下来,从源头来的数据就会变得很庞大,像一个塞子一样堵塞住。
我们需要一个合理的机制来处理这种情况,实际上 Node.js
已经提供了该类问题的解决方案。
- 在调用可写流的
write
方法时,该方法会根据缓冲区的数据堆积情况来确定返回结果,若可以继续写入则返回true
,若暂时无法继续写入则返回false
(原因可能是写队列繁忙或者读取出来的数据块太大)。在该方法返回 false 时,不应该继续调用write
方法,而是应该等待drain
事件发出之后再继续。 - 使用流的
pipe
方法,将读写平衡的控制交给Node.js
自己来完成。
但是实际上 pipe 方法使用的也是第一种方案,只不过 Node.js 已经帮我们做了这些处理了,在 ReadableStream 的源码中有这么一段
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
debug('dest.write', ret);
if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
}
}
在 src
流也就是管道的源头发起 data
事件时,执行 ondata
回调,调用 dest.write
方法写入数据到目标流也就是管道的另一端,判断返回值若为 false
(cleanedup
为 false
表示仍然有事件监听器在监听 src
上的事件),表示此时写入的目标流无法继续写入,所以需要暂停数据的写入过程,同时保存需要等待 drain
事件的可写流对象,并为该可写流添加 drain
事件监听器,它的具体实现是这样的:
function pipeOnDrain(src, dest) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
// `ondrain` will call directly,
// `this` maybe not a reference to dest,
// so we use the real dest here.
if (state.awaitDrainWriters === dest) {
debug('pipeOnDrain', 1);
state.awaitDrainWriters = null;
} else if (state.multiAwaitDrain) {
debug('pipeOnDrain', state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
监听到 drain
事件之后就恢复源也就是可读流的流动状态,继续读取数据,注意这里如果是一个可读流写入多个可写流,则必须等待所有可写流都监听到 drain
事件后才能恢复流动。
结合实际谈谈用法
结合我自己的在这一块的工作内容,对于以上两种解决方案,说说我自己的看法。首先说说第二种,平时工作上使用流导出 CSV
的方式通常是生成一个 MongoDB
的可读流,然后在 data
事件的回调中做数据结构的转换和处理,再写入 res
流中,如果使用 pipe
方法,则需要自己写一个 Transform
转换流,然后这样调用:
const { Transform } = require('stream');
class MyTransform extends Transform {
...
}
// export.js
const stream = Model.find({}).stream();
const transform = new MyTransform();
stream.pipe(transform).pipe(res);
// 发生错误时断开管道
stream.on('error', err => {
...
});
transform.on('error', err => {
...
});
因为导出时经常需要根据第一次查到的数据去其他表或库查另外的数据,所以需要在处理数据时发起异步请求,如果需要在 Transform
流中做异步操作,则需要自己写不同的 Transform
流的子类,非常麻烦,所以通常在不需要异步查其他数据时使用这种解决方式。
那么需要异步查询其他数据库或表时怎么做呢,通常用第一种解决方式,比较灵活,具体写法如下:
const stream = Model.find({}).stream();
stream.on('data', data => {
asyncOperation(data).then(result => {
const finalData = doSomething(result);
let writable = res.write(finalData);
if (!writable) {
stream.pause();
}
});
});
// 缓冲区空闲时流对象会发起 drain 事件,代表此时可以继续写入了
res.on('drain', () => {
stream.resume();
});
结语
按照上述方案修改之后,这个困扰了我一个多月的问题成功解决了,顺带收获了一波知识点,舒坦。
萌新第一次发掘金,以上有写错的地方,欢迎大佬指正。