Function: Read File

How to Read a File

Overview

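This function reads a file from a readable stream, gunzips it when the key ends in `gz` or `zip`, parses it as CSV, JSON Lines, or raw text lines, and routes each row to the next node one at a time. The final event is flagged with `eof: true`. (@Oleg: please confirm this overview.)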

Function Nodes

Description
The Read File function has one input and one output.

Function Code

exports.handler = async({ readable, ...ev }, ctx) => {
  const { route } = ctx;

  if (!readable) throw new Error('event.readable is required');
  if (!ev.fileName) throw new Error('event.fileName is required');

  ev.streams = [ readable ];
  route(ev);
};
const zlib = require('zlib');

exports.handler = async(ev, ctx) => {
  const { route, params } = ctx;
  const { streams } = ev;

  if (/(gz|zip)$/i.test(params.key)) {
    streams.push(zlib.createGunzip())
  }

  route(ev);
};
const csv = require('fast-csv');
const splitRows = require('split2');
const stream = require('stream');

exports.handler = async(ev, ctx) => {
  const { route, log, params } = ctx;
  const { streams } = ev;

  const format = getFormat(params);

  if (format === 'csv') {
    const parseCsv = csv.parse(params.csv);
    parseCsv.on('error', err => log.warn(err));
    streams.push(parseCsv);
  
  } else {
    const parse = format === 'jsonl' ? JSON.parse : row => row;

    streams.push(splitRows());
    streams.push(new stream.Transform({
      objectMode: true,
      transform(ev, enc, done) {
        try {
          this.push(parse(ev));
        } catch (err) {
          log.warn(err.message);
        }
        done();
      }
    }));
  }

  route(ev);
};

/**
 * Resolves the row format to parse with.
 *
 * Any `params.format` other than `'auto'` is returned untouched. For `'auto'`
 * the format is guessed from `params.fileName`: a name containing `.csv`
 * yields `'csv'`, one containing `.json` yields `'jsonl'`, and everything
 * else yields `'raw'` (matching is case-insensitive and not anchored to the
 * end of the name).
 *
 * @param {object} params - Node parameters (`format`, `fileName`).
 * @returns {*} The resolved format.
 */
const getFormat = params => {
  const { format, fileName } = params;

  if (format !== 'auto') return format;

  if (/\.csv/i.test(fileName)) return 'csv';
  if (/\.json/i.test(fileName)) return 'jsonl';
  return 'raw';
};
const stream = require('stream');

exports.handler = async({ streams, ...ev }, ctx) => {
  streams.push(route(ev, ctx));

  await new Promise((resolve, reject) => {
    stream.pipeline(streams, err => {
      if (err) reject(err);
      else resolve();
    });
  });
};

/**
 * Builds the object-mode sink that forwards rows one at a time via `ctx.route`.
 *
 * Rows are emitted with a one-row delay: each `write` first routes the
 * previously received row, then stores the new one. `final` routes the last
 * stored row with `eof: true`, so only the last event carries the flag. If no
 * row was ever written, `final` still routes a single event with `rowNum: 0`
 * and `eof: true`.
 *
 * Every routed event is a shallow copy of `ev` plus `row` and a 1-based
 * `rowNum`.
 *
 * @param {object} ev - Base event whose properties are copied onto every
 *   routed event.
 * @param {object} ctx - Node context; only `ctx.route` is used.
 * @returns {stream.Writable} Object-mode writable to place at the end of a
 *   pipeline.
 */
const route = (ev, { route: emit }) => {
  const state = {
    ...ev,
    rowNum: 0
  };

  return new stream.Writable({
    objectMode: true,
    highWaterMark: 1000,

    async write(row, enc, done) {
      try {
        // rowNum (not the row's truthiness) marks a pending row, so falsy
        // rows such as 0, '' or false are still forwarded.
        if (state.rowNum > 0) await emit({ ...state });
      } catch (err) {
        // Report the failure to the stream; otherwise `done` is never called
        // and the pipeline stalls with an unhandled rejection.
        done(err);
        return;
      }

      state.row = row;
      state.rowNum++;
      done();
    },

    async final(done) {
      try {
        await emit({ ...state, eof: true });
      } catch (err) {
        done(err);
        return;
      }
      done();
    }
  });
};

Function Schema

The schema below describes an object and currently declares no properties. (Detailed explanation pending — @Oleg.)

{
  "type": "object",
  "properties": {}
}