상세 컨텐츠

본문 제목

[node]대용량 데이터 처리에 필수적인 stream 이해하기

언어/Javascript + Typescript

by moonionn 2021. 11. 28. 23:19

본문

적절한 짤을 찾았다. [출처: reddit]

 

이 글에서는 node의 stream 모듈 사용법과 예시를 담았습니다.

 

목차

- Stream 간단 설명

- pipe 메서드

- Stream 종류

  - Readable + Writable

    - dummy 예시

    - 좀 더 실질적인 예시

  - Transform

    - 예시

 


stream 간단 설명

stream을 한 문장으로 설명하라면, 저는 아래와 같이 정의할 것 같습니다.

데이터를 정해진 흐름대로 읽고, 쓰고, 주고받기 위해 인터페이스 역할을 하는 추상화된 객체

 

 

Node 공식 문서는 stream의 역할에 대해 아래와 같이 설명하고 있습니다.

They are a way to handle reading/writing files, network communications, or any kind of end-to-end information exchange in an efficient way.

 

번역하자면

file을 읽고 쓰거나, 네트워크 통신을 하는 등 데이터를 주고받을 일이 있을때 효과적으로 작업을 수행할 수 있는 방법이라고 합니다.

 

공식 문서의 예시를 보겠습니다.

(자꾸 공식문서 내용으로 날로 먹는 것 같지만... 간단하고 명료한지라... 역시 공식문서 짱)

const http = require('http')
const fs = require('fs')

const server = http.createServer(function(req, res) {
  fs.readFile(__dirname + '/data.txt', (err, data) => {
    res.end(data)
  })
})
server.listen(3000)

data.txt 파일을 response로 전송하는 코드입니다.

하지만 data.txt 파일의 용량이 얼마냐에 따라 응답속도는 천차만별이 되어버리죠.

왜냐하면 fs.readFile()data.txt의 내용을 전부 다 읽어들인 다음 콜백을 실행하기 때문입니다.

 

하지만 stream을 사용한다면?

const http = require('http')
const fs = require('fs')

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + '/data.txt')
  stream.pipe(res)
})
server.listen(3000)

stream을 사용하면 파일이 다 읽힐때까지 기다리지 않습니다.

대신 데이터들이 준비되는대로 클라이언트에게 보내는 식으로 동작합니다.

 

뭐 이런거랄까요

 

대용량 데이터를 처리해야 할 일이 있을때 꼭 stream을 사용하는 이유가 여기에 있습니다.

데이터베이스에서 수천만건의 데이터를 집계해야 할 일이 있을때 stream을 사용하지 않는다면... node가 죽는다거나... 이를 막기 위해... 메모리를 늘려줘야 한다거나(무의미한 방어책 😵)... 할테니까요.

 

🌿 + ⚙️ = 💪🏻?

 

많은 분들이 node를 처음 공부할때 http 모듈, path 모듈, fs 모듈 등을 먼저 접하실 겁니다.

앞서 설명한 이유로, stream 모듈도 이 다른 모듈들과 함께 node의 핵심을 이룬다고 봅니다.

 


.pipe()

바로 위 예시코드에 .pipe()이란 메서드가 호출되는 걸 보셨을 겁니다.

이 메서드는 stream 동작에 있어서 핵심적인 기능이라고 할 수 있는데요, 파이프라인이 뭔지 아시는 분들은 금방 아셨겠지만, 설명하자면 데이터(스트림)를 보낼 대상(destination)을 향해 쏘는(?!) 역할을 합니다.

출처 : https://www.astronomer.io/blog/data-pipeline

 

그래서 stream.pipe(res) 라는 코드는 streamresponse 객체로 보내는 코드라고 이해하면 되는겁니다.

.pipe() 메서드는 여러개를 체이닝해서 사용할 수도 있습니다.

stream.pipe(destination1).pipe(destination2);

파이프 체이닝 - 다들 한번쯤 이런 게임 해보지 않았나요?

 


stream의 종류

stream은 크게 writable stream과 readable stream으로 나뉩니다.

(+ Duplex(Transform)도 있지만 하단에 별도로 설명하는걸로)

 

Readable, Writable

Readable stream은 데이터를 Buffer로 읽는 역할을 하며, data pipeline의 origin 부분에 해당합니다.(위위 이미지 참고) destination에 readable이 들어오면 에러가 납니다! Writable stream은 그 반대입니다.

 

말보다는 예시를 보는게 이해가 빠르니 얼른 예시로 넘어갑시다.

 

Readable 예시 - dummy편

 

여기 꽤 많은 내용을 담고 있는 dummy.txt라는 파일을 불러올 stream을 생성해봅시다.

const fs = require('fs');

const readStream = fs.createReadStream(__dirname + '/dummy.txt');

readStream.on('data', (data) => {
  console.log(data);
});

// => <Buffer 4c 6f 72 65 6d 20 69 70 73 75 6d 20 64 6f 6c 6f 72 20 73 69 74 20 61 6d 65 74 2c 20 63 6f 6e 73 65 63 74 65 74 75 65 72 20 61 64 69 70 69 73 63 69 6e ... 65486 more bytes>
// => <Buffer 69 6e 20 65 72 61 74 20 75 74 20 75 72 6e 61 20 63 75 72 73 75 73 20 76 65 73 74 69 62 75 6c 75 6d 2e 20 46 75 73 63 65 20 63 6f 6d 6d 6f 64 6f 20 61 ... 34413 more bytes>

결과물을 보면 아시겠지만 Readable stream은 데이터를 Buffer로 읽습니다.

(Buffer에 대한 설명은 또 장황해질 수 있어서 여기에 별도로 적진 않겠습니다)

그래서 dummy.txt 파일의 데이터를 통째로 처리하지 않고, 정해진 단위로 쪼개어 처리할 수 있게 합니다.

 

Writable 예시 - dummy편

아까 읽어온 dummy.txt의 내용을 좀 변환해서 다른 파일로 작성한다고 가정해봅시다.

const fs = require('fs');

const readStream = fs.createReadStream(__dirname + '/dummy.txt');
const writeStream = fs.createWriteStream(__dirname + '/dummy2.txt');

readStream.on('data', (data) => {
  writeStream.write(data.slice(0, 10));
});

파일이 이렇게 생성된다

 

잠깐, 저기 .on("data", () => {}) 이건 뭐죠?

스트림의 이벤트를 핸들링하는 구간입니다. 첫번째 인자로는 node에서 정해놓은 event의 종류가 들어갑니다. ('end', 'data', 'finish' 등)
이벤트의 종류도 writable stream이냐 readable stream이냐에 따라 각기 다릅니다. 자세한건 공식문서를 참고하세요!

 

 

Readable 예시 - 좀 더 실질적인 편

(sequelize는 stream처리에 불편한 라이브러리지만... 가장 대중적이니 일단 들고와봄)

(여러분 knex나 typeORM 쓰세요 ㅋㅋㅋ 평소처럼 select~ from~ 한 다음에 뒤에 .stream()만 붙여주면 됩니다)

// 모든 거래 데이터를 불러들인다는 예시코드
// sequelize에 대한 글이 아니기 때문에 pseudo code에 가깝습니다.

const { Readable } = require('stream');
const { Sequelize } = require('sequelize');

const sequelize = new Sequelize(...);

const readStream = new Readable({
  async read(size) {
    const transactions = await sequelize.query('SELECT * FROM transactions', {
      type: sequelize.QueryTypes.SELECT,
    });
    this.push(transactions);
    this.push(null);
  },
});

원본 데이터가 파일이 아니라면(ex: 데이터베이스에서 추출된 데이터)

new stream.Readable() 로 인스턴스를 생성합니다.

(물론 knex나 typeORM등으로 추출한 데이터라면 위에 말한 것처럼 .stream()만 붙여주면 됩니다만... 이런건 개인마다 상황에 맞게 잘 판단하시길 바랍니다)

 

_read()

readable stream은 _read()라는 메서드를 내장하고 있는데,

이 메서드는 데이터를 Buffer object로 읽는 작업을 할 때, pipe를 타게 되면 자동으로 호출됩니다.

그리고 이 read 메서드의 행위를 new Readable({ read(){} }) 이렇게 객체를 정의할때 설정해줄 수 있죠.

 

Writadable 예시 - 좀 더 실질적인 편

이제 불러온 데이터를 가공해봅시다. 아까 적었던 Readable 실질적 예시와 연동됩니다.

const { Readable, Writable } = require('stream');

// db에서 꺼내온 거래 정보중 id와 현재 time을 더해 uniqueCode를 생성한다는 컨셉의 코드

const uniqueCodes = [];

const readStream = new Readable({
  async read(size) {
    const transactions = await sequelize.query('SELECT * FROM transactions', {
      type: sequelize.QueryTypes.SELECT,
    });
    this.push(transactions);
    this.push(null);
  },
});

const writeStream = new Writable({
  write(chunk, encoding, callback) {
    uniqueCodes.push({ uniqueCode: chunk.id + Date.now() });
    callback();
  },
});

readStream.pipe(writeStream).on('finish', () => console.log(uniqueCodes));

그런데 위의 코드를 따라하면 작동을 안할겁니다.

바로 node의 stream 모듈은 default로 Buffer와 string 타입에만 적용이 되기 때문이죠.

그 외 type의 데이터를 핸들링하기 위해서는 object mode라는 설정을 true로 명시해주어야 합니다.

 

const { Readable, Writable } = require('stream');

// objectMode가 적용된, 제대로 작동하는 코드

const uniqueCodes = [];

const readStream = new Readable({
  objectMode: true,	// objectMode 설정!!!
  
  async read(size) {
    const transactions = await sequelize.query('SELECT * FROM transactions', {
      type: sequelize.QueryTypes.SELECT,
    });
    this.push(transactions);
    this.push(null);
  },
});

const writeStream = new Writable({
  objectMode: true,	// objectMode 설정!!!
  
  write(chunk, encoding, callback) {
    uniqueCodes.push({ uniqueCode: chunk.id + Date.now() });
    callback();
  },
});

readStream.pipe(writeStream).on('finish', () => console.log(uniqueCodes));
// => [ { uniqueCode: 1638104987792 }, { uniqueCode: 1638104987793 }, ... ]

 

_write()

writable stream은 _write()라는 메서드를 내장하고 있습니다.

readable stream의 _read()와 마찬가지로 pipe를 타게 되면 자동으로 호출됩니다.

 


Transform

자 만약, 데이터베이스에서 가지고 온 데이터들을 가공한 뒤, 파일에 저장하려 한다든가, 등의 이유로 pipe를 chaining해야할 일이 생겼다면 또 다른 방법으로 코드를 구성해야 합니다. 왜냐하면 앞서 말했던 것처럼 Writable stream은 pipe의 destination에만 위치할 수 있고, Readable stream은 pipe의 origin에만 위치할 수 있기 때문인데요, 실제로 아래처럼 작동시키려 하면

readStream.pipe(writeStream).pipe(fileDestination);

//	Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable
//	at Writable.pipe

이런 에러가 발생합니다.

 

실무에서는 파이프 하나로 데이터를 처리하기 보다는, 여러 파이프로 파이프라인을 구성해 데이터를 가공할 일이 더 많습니다. 그럴때 쓰라고 있는 것이 바로 Transform입니다.

(제가 실제로 Duplex를 써본 적은 없어서 Transform 예시만 들겠습니다.)

* Duplex랑 Transform이랑 무슨 차이?

Duplex도 readable과 writable이 공존합니다. 다만 둘 사이에 큰 차이가 있는데, Duplex는 writable stream과 readable stream이 독단적으로 존재하고, 작동합니다. 반대로 Transform은 readable과 writable가 서로 영향을 끼치기 때문에 Transform 내부적으로 write한 걸 read할 수 있죠. 이에 따라 Duplex를 쓰려면 _read()와 _write() 메서드를 둘 다 꼭 명시해주어야 하지만, Transform은 _transform() 메서드로 퉁칠수(?) 있습니다.

참고 : https://stackoverflow.com/questions/18335499/nodejs-whats-the-difference-between-a-duplex-stream-and-a-transform-stream

 

Transform 예시

우선 파일을 생성하기 위해 필요한 fs.writeStream을 호출합니다.

const fs = require('fs');

const fileStream = fs.createWriteStream(
  __dirname + 'unique-codes.txt';
);

 

아까 작성한 코드를 살짝 수정해봅시다.

const { Readable, Transform } = require('stream');

const readStream = new Readable({
  objectMode: true,
  
  async read(size) {
    const transactions = await sequelize.query('SELECT * FROM transactions', {
      type: sequelize.QueryTypes.SELECT,
    });
    this.push(transactions);
    this.push(null);
  },
});

const transformStream = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    const data = { uniqueCode: chunk.id + Date.now() };
    this.push(JSON.stringify(data) + "\n");
    callback();
  },
});

 

이제 마지막으로 파이프라인에 태우면!

readStream.pipe(trasnformStream).pipe(fileStream);

짜잔

 

( + 추가 넉두리 )

이 포스트의 썸네일인 이 짤 있잖습니까

사실 아무리 stream 처리를 했다 해도 처리해야 할 대용량 데이터가 많으면 프로그램에 무리가 갑니다.

그래서 이런 류(통계)의 작업은 별도의 서버를 구성하거나, 비동기 프로세스로 넘겨버리기도 하죠.

혹은 데이터 스트림 요청이 들어올때마다 서버리스를 띄우는 방법도 있겠습니다.

무튼 토끼를 살리는 것이 중요하니까요...

 

관련글 더보기

댓글 영역