Node.js Streams: Handle Large Data Sets Efficiently (2025)

Master Node.js streams to process large datasets without memory issues. Learn readable, writable, transform streams with practical examples and performance tips.

Introduction

Processing large files or datasets in Node.js applications often leads to a critical challenge: memory consumption. Loading a 2GB CSV file or streaming video content can quickly exhaust available RAM, causing applications to crash or become unresponsive. Traditional approaches that load entire datasets into memory simply don't scale for enterprise-level applications.

This is where Node.js streams shine as a game-changing solution. Streams allow you to process data piece by piece, maintaining constant memory usage regardless of the total data size. Instead of loading gigabytes of data into memory, streams process small chunks sequentially, making it possible to handle virtually unlimited data sizes on modest hardware.

In this comprehensive guide, you'll discover how to leverage Node.js streams to build memory-efficient applications that process large datasets with ease. From basic stream concepts to advanced pipeline techniques, you'll learn practical strategies that can transform your data processing capabilities and application performance.

Understanding Node.js Streams

What Are Streams?

Node.js streams are objects that enable reading or writing data continuously in small chunks rather than loading everything into memory at once. Think of streams like a water pipe: instead of filling a massive container and then emptying it, water flows continuously through the pipe in manageable amounts.

Streams implement the EventEmitter interface, making them inherently asynchronous and event-driven. This design allows for non-blocking data processing that maintains application responsiveness even when handling massive datasets.
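
For example, because every stream is an EventEmitter, you can react to data as it arrives and to lifecycle events such as completion and failure. A minimal sketch (the file name is just a placeholder):

const fs = require('fs');

const stream = fs.createReadStream('./some-file.txt'); // placeholder path

stream.on('data', (chunk) => {
  // Called once per chunk as data becomes available
  console.log(`Received ${chunk.length} bytes`);
});

stream.on('end', () => {
  // Fires when the source has no more data
  console.log('No more data');
});

stream.on('error', (err) => {
  // Always handle errors: an unhandled 'error' event crashes the process
  console.error('Stream failed:', err.message);
});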

The Memory Problem: Before and After Streams

Traditional Approach (Problematic):

const fs = require('fs');

// ❌ Loads entire file into memory
fs.readFile('massive-log-file.txt', 'utf8', (err, data) => {
  if (err) throw err;
  
  // Process 5GB file - causes memory issues
  const lines = data.split('\n');
  lines.forEach(line => processLogLine(line));
});

Stream Approach (Efficient):

const fs = require('fs');
const readline = require('readline');

// ✅ Processes file line by line with constant memory usage
const fileStream = fs.createReadStream('massive-log-file.txt');
const rl = readline.createInterface({
  input: fileStream,
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  processLogLine(line); // Process one line at a time
});

rl.on('close', () => {
  console.log('File processing complete');
});

Core Stream Types

Node.js provides four fundamental stream types (a short example of each follows the list):

  1. Readable Streams: Sources of data you can read from
  2. Writable Streams: Destinations where you can write data
  3. Duplex Streams: Both readable and writable
  4. Transform Streams: Duplex streams that modify data as it passes through
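
A quick way to see all four in one place is to wire a few built-in streams together. This is only a sketch using core modules; the file names and host are placeholders:

const fs = require('fs');
const net = require('net');
const zlib = require('zlib');

// Readable: a source you read from
const source = fs.createReadStream('./input.txt');          // placeholder path

// Transform: a duplex stream that modifies data as it passes through
const gzip = zlib.createGzip();

// Writable: a destination you write to
const destination = fs.createWriteStream('./input.txt.gz'); // placeholder path

// Readable -> Transform -> Writable
source.pipe(gzip).pipe(destination);

// Duplex: readable and writable at the same time, e.g. a TCP socket
const socket = net.connect({ host: 'example.com', port: 80 }); // placeholder host
socket.end('HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n');    // write to the socket...
socket.on('data', (chunk) => console.log(chunk.toString()));   // ...and read from it
socket.on('error', (err) => console.error(err.message));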

Readable Streams: Consuming Data Efficiently

Creating Custom Readable Streams

Readable streams provide data that can be consumed by other parts of your application:

const { Readable } = require('stream');

class NumberGenerator extends Readable {
  constructor(options) {
    super(options);
    this.current = 0;
    this.max = 1000000; // Generate 1 million numbers
  }

  _read() {
    if (this.current < this.max) {
      // Push data in chunks
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      // Signal end of data
      this.push(null);
    }
  }
}

// Usage
const numberStream = new NumberGenerator();
let count = 0;

numberStream.on('data', (chunk) => {
  count++;
  if (count % 100000 === 0) {
    console.log(`Processed ${count} numbers`);
  }
});

numberStream.on('end', () => {
  console.log('Number generation complete');
});

File Reading with Streams

Process large files efficiently using readable streams:

const fs = require('fs');
const path = require('path');

async function processLargeFile(filePath) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(filePath, {
      encoding: 'utf8',
      highWaterMark: 64 * 1024 // 64KB chunks
    });

    let processedBytes = 0;
    let lineCount = 0;

    readStream.on('data', (chunk) => {
      processedBytes += Buffer.byteLength(chunk, 'utf8');
      lineCount += (chunk.match(/\n/g) || []).length;
      
      // Process chunk without loading entire file
      console.log(`Processed ${processedBytes} bytes, ${lineCount} lines`);
    });

    readStream.on('end', () => {
      console.log(`File processing complete: ${lineCount} total lines`);
      resolve(lineCount);
    });

    readStream.on('error', reject);
  });
}

// Process a large log file
processLargeFile('./massive-log.txt')
  .then(lines => console.log(`Total lines processed: ${lines}`))
  .catch(err => console.error('Processing failed:', err));

Writable Streams: Efficient Data Output

Creating Custom Writable Streams

Writable streams allow you to send data to destinations like files, databases, or APIs:

const { Writable } = require('stream');
const fs = require('fs');

class DatabaseWriter extends Writable {
  constructor(options) {
    super(options);
    this.batchSize = 1000;
    this.batch = [];
    this.totalRecords = 0;
  }

  _write(chunk, encoding, callback) {
    try {
      // Parse incoming data
      const record = JSON.parse(chunk.toString());
      this.batch.push(record);

      // Write in batches for efficiency
      if (this.batch.length >= this.batchSize) {
        this.flushBatch();
      }

      callback();
    } catch (error) {
      callback(error);
    }
  }

  _final(callback) {
    // Flush remaining records
    if (this.batch.length > 0) {
      this.flushBatch();
    }
    callback();
  }

  flushBatch() {
    // Simulate database write
    console.log(`Writing batch of ${this.batch.length} records to database`);
    this.totalRecords += this.batch.length;
    
    // In real implementation, write to database here
    // await database.insertMany(this.batch);
    
    this.batch = [];
  }
}

// Usage example
const dbWriter = new DatabaseWriter();

dbWriter.on('finish', () => {
  console.log(`Total records written: ${dbWriter.totalRecords}`);
});

// Simulate streaming data to the database
// (in production, check write()'s return value to respect backpressure; see the next section)
for (let i = 0; i < 10000; i++) {
  const record = JSON.stringify({ id: i, value: Math.random() }) + '\n';
  dbWriter.write(record);
}

dbWriter.end();

File Writing with Backpressure Handling

Handle backpressure properly when writing large amounts of data:

const fs = require('fs');

async function writeDataStream(outputPath, dataGenerator) {
  return new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(outputPath);
    let index = 0;

    const writeNext = () => {
      let canContinue = true;

      // Write data while stream can handle it
      while (canContinue && index < 1000000) {
        const data = `Record ${index}: ${dataGenerator(index)}\n`;
        
        if (index === 999999) {
          // Last write: end() flushes the final chunk, closes the stream,
          // and calls resolve once everything has been written
          writeStream.end(data, resolve);
        } else {
          // Check if internal buffer is full
          canContinue = writeStream.write(data);
        }
        
        index++;
      }

      if (index < 1000000 && !canContinue) {
        // Wait for drain event when buffer is full
        writeStream.once('drain', writeNext);
      }
    };

    writeStream.on('error', reject);
    writeNext();
  });
}

// Usage
function generateData(index) {
  return `Generated data for record ${index} - ${Date.now()}`;
}

writeDataStream('./output.txt', generateData)
  .then(() => console.log('Data writing complete'))
  .catch(err => console.error('Write failed:', err));

Transform Streams: Data Processing Powerhouse

Building Data Processing Pipelines

Transform streams are ideal for processing data as it flows through your application:

const { Transform } = require('stream');
const fs = require('fs');

class CSVProcessor extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
    this.headers = null;
    this.lineNumber = 0;
    this.leftover = ''; // holds a partial line that spans chunk boundaries
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.leftover + chunk.toString()).split('\n');

    // Keep the last, possibly incomplete line for the next chunk
    this.leftover = lines.pop() || '';

    for (const line of lines) {
      if (!line.trim()) continue;

      this.lineNumber++;

      if (!this.headers) {
        // First line contains headers
        this.headers = line.split(',').map(h => h.trim());
        continue;
      }

      try {
        const values = line.split(',').map(v => v.trim());
        const record = {};

        this.headers.forEach((header, index) => {
          record[header] = values[index] || '';
        });

        // Transform the record
        record.processedAt = new Date().toISOString();
        record.lineNumber = this.lineNumber;

        // Push transformed data
        this.push(JSON.stringify(record) + '\n');
      } catch (error) {
        callback(new Error(`Parse error at line ${this.lineNumber}: ${error.message}`));
        return;
      }
    }

    callback();
  }

  _flush(callback) {
    // Process a final line that did not end with a newline
    if (this.leftover.trim() && this.headers) {
      this.lineNumber++;
      const values = this.leftover.split(',').map(v => v.trim());
      const record = {};
      this.headers.forEach((header, index) => {
        record[header] = values[index] || '';
      });
      record.processedAt = new Date().toISOString();
      record.lineNumber = this.lineNumber;
      this.push(JSON.stringify(record) + '\n');
    }
    callback();
  }
}

class DataValidator extends Transform {
  constructor(options) {
    super(options);
    this.validRecords = 0;
    this.invalidRecords = 0;
  }

  _transform(chunk, encoding, callback) {
    try {
      const record = JSON.parse(chunk.toString());
      
      // Validate required fields
      if (this.isValidRecord(record)) {
        record.validated = true;
        this.validRecords++;
        this.push(JSON.stringify(record) + '\n');
      } else {
        this.invalidRecords++;
        console.warn(`Invalid record at line ${record.lineNumber}`);
      }
    } catch (error) {
      this.invalidRecords++;
      console.error('JSON parse error:', error.message);
    }

    callback();
  }

  isValidRecord(record) {
    // Add your validation logic here
    return record.name && record.email && record.email.includes('@');
  }

  _flush(callback) {
    console.log(`Validation complete: ${this.validRecords} valid, ${this.invalidRecords} invalid`);
    callback();
  }
}

// Create processing pipeline
const csvProcessor = new CSVProcessor();
const validator = new DataValidator();

fs.createReadStream('large-dataset.csv')
  .pipe(csvProcessor)
  .pipe(validator)
  .pipe(fs.createWriteStream('processed-data.json'))
  .on('finish', () => {
    console.log('Data processing pipeline complete');
  })
  .on('error', (err) => {
    console.error('Pipeline error:', err);
  });

Advanced Stream Techniques

Stream Pipelines with Error Handling

Use the pipeline utility for robust stream processing:

const { pipeline, Transform } = require('stream');
const { promisify } = require('util');
const fs = require('fs');
const zlib = require('zlib');

const pipelineAsync = promisify(pipeline);

class LogAnalyzer extends Transform {
  constructor(options) {
    super(options);
    this.errorCount = 0;
    this.warningCount = 0;
    this.infoCount = 0;
  }

  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    
    for (let line of lines) {
      if (!line.trim()) continue;
      
      const logLevel = this.extractLogLevel(line);
      
      switch (logLevel) {
        case 'ERROR':
          this.errorCount++;
          break;
        case 'WARNING':
          this.warningCount++;
          break;
        case 'INFO':
          this.infoCount++;
          break;
      }

      // Only pass through error and warning logs
      if (logLevel === 'ERROR' || logLevel === 'WARNING') {
        this.push(line + '\n');
      }
    }

    callback();
  }

  extractLogLevel(line) {
    const match = line.match(/\[(ERROR|WARNING|INFO)\]/);
    return match ? match[1] : 'UNKNOWN';
  }

  _flush(callback) {
    const summary = {
      errors: this.errorCount,
      warnings: this.warningCount,
      info: this.infoCount,
      total: this.errorCount + this.warningCount + this.infoCount
    };
    
    this.push('\n--- LOG SUMMARY ---\n');
    this.push(JSON.stringify(summary, null, 2));
    callback();
  }
}

async function processLogFile(inputPath, outputPath) {
  try {
    await pipelineAsync(
      fs.createReadStream(inputPath),
      new LogAnalyzer(),
      zlib.createGzip(), // Compress output
      fs.createWriteStream(outputPath + '.gz')
    );
    
    console.log('Log processing completed successfully');
  } catch (error) {
    console.error('Pipeline failed:', error);
    throw error;
  }
}

// Usage
processLogFile('./app.log', './filtered-logs.txt')
  .then(() => console.log('Log analysis complete'))
  .catch(err => console.error('Failed to process logs:', err));

Memory-Efficient Data Aggregation

Perform aggregations on large datasets without loading everything into memory:

const { Transform } = require('stream');
const readline = require('readline');
const fs = require('fs');

class StreamingAggregator extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
    this.recordCount = 0;
    this.aggregates = {
      totalSales: 0,
      salesByRegion: {},
      salesByMonth: {}
    };
  }

  _transform(record, encoding, callback) {
    try {
      this.recordCount++;
      
      // Parse sales data
      const amount = parseFloat(record.amount) || 0;
      const region = record.region || 'Unknown';
      const date = new Date(record.date);
      const month = `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}`;

      // Update aggregates
      this.aggregates.totalSales += amount;
      
      this.aggregates.salesByRegion[region] = 
        (this.aggregates.salesByRegion[region] || 0) + amount;
      
      this.aggregates.salesByMonth[month] = 
        (this.aggregates.salesByMonth[month] || 0) + amount;

      // Emit progress every 10,000 records
      if (this.recordCount % 10000 === 0) {
        console.log(`Processed ${this.recordCount} records, Total sales: $${this.aggregates.totalSales.toFixed(2)}`);
      }

      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    // Calculate final statistics
    const topRegions = Object.entries(this.aggregates.salesByRegion)
      .sort(([,a], [,b]) => b - a)
      .slice(0, 5);

    const result = {
      summary: {
        totalRecords: this.recordCount,
        totalSales: this.aggregates.totalSales,
        averageSale: this.aggregates.totalSales / this.recordCount
      },
      topRegions: topRegions.map(([region, sales]) => ({ region, sales })),
      monthlySales: this.aggregates.salesByMonth
    };

    this.push(JSON.stringify(result, null, 2));
    callback();
  }
}

async function analyzeSalesData(filePath) {
  return new Promise((resolve, reject) => {
    const fileStream = fs.createReadStream(filePath);
    const rl = readline.createInterface({
      input: fileStream,
      crlfDelay: Infinity
    });

    const aggregator = new StreamingAggregator();
    let headers = null;

    rl.on('line', (line) => {
      if (!headers) {
        headers = line.split(',').map(h => h.trim());
        return;
      }

      const values = line.split(',').map(v => v.trim());
      const record = {};
      
      headers.forEach((header, index) => {
        record[header] = values[index] || '';
      });

      aggregator.write(record);
    });

    rl.on('close', () => {
      aggregator.end();
    });

    aggregator.on('data', (result) => {
      resolve(JSON.parse(result.toString()));
    });

    aggregator.on('error', reject);
    rl.on('error', reject);
  });
}

// Analyze large sales dataset
analyzeSalesData('./sales-data.csv')
  .then(results => {
    console.log('Sales Analysis Results:');
    console.log(`Total Sales: $${results.summary.totalSales.toFixed(2)}`);
    console.log(`Average Sale: $${results.summary.averageSale.toFixed(2)}`);
    console.log('Top Regions:', results.topRegions);
  })
  .catch(err => console.error('Analysis failed:', err));

Performance Optimization and Memory Management

Optimizing Stream Performance

1. Choose Appropriate Buffer Sizes:

const fs = require('fs');

// Optimize for different use cases
const streamConfigs = {
  smallFiles: { highWaterMark: 16 * 1024 },     // 16KB
  mediumFiles: { highWaterMark: 64 * 1024 },    // 64KB
  largeFiles: { highWaterMark: 256 * 1024 },    // 256KB
  networkStream: { highWaterMark: 8 * 1024 }    // 8KB for network
};

function createOptimizedReadStream(filePath, fileSize) {
  let config;
  
  if (fileSize < 1024 * 1024) {          // < 1MB
    config = streamConfigs.smallFiles;
  } else if (fileSize < 100 * 1024 * 1024) {  // < 100MB
    config = streamConfigs.mediumFiles;
  } else {
    config = streamConfigs.largeFiles;
  }

  return fs.createReadStream(filePath, config);
}

2. Implement Backpressure Monitoring:

const { Writable } = require('stream');

class MonitoredWriteStream extends Writable {
  constructor(options) {
    super(options);
    this.bytesWritten = 0;
    this.backpressureEvents = 0;
    this.lastReport = 0;
    this.startTime = Date.now();
  }

  _write(chunk, encoding, callback) {
    this.bytesWritten += chunk.length;

    // A buffer at or above highWaterMark means upstream writers are seeing backpressure
    if (this.writableLength >= this.writableHighWaterMark) {
      this.backpressureEvents++;
    }

    // Simulate processing time
    setImmediate(() => {
      if (this.bytesWritten - this.lastReport >= 1024 * 1024) {
        this.lastReport = this.bytesWritten;
        this.reportProgress();
      }
      callback();
    });
  }

  reportProgress() {
    const elapsed = (Date.now() - this.startTime) / 1000;
    const throughput = (this.bytesWritten / 1024 / 1024) / elapsed;

    console.log(`Progress: ${(this.bytesWritten / 1024 / 1024).toFixed(2)}MB written`);
    console.log(`Throughput: ${throughput.toFixed(2)}MB/s`);
    console.log(`Backpressure events: ${this.backpressureEvents}`);
  }
}

3. Memory Usage Monitoring:

const { Transform } = require('stream');

class MemoryMonitorTransform extends Transform {
  constructor(options) {
    super(options);
    this.processedChunks = 0;
    this.monitorInterval = setInterval(() => {
      this.logMemoryUsage();
    }, 5000); // Check every 5 seconds
  }

  _transform(chunk, encoding, callback) {
    this.processedChunks++;
    
    // Your data processing logic here
    this.push(chunk);
    callback();
  }

  _flush(callback) {
    clearInterval(this.monitorInterval);
    this.logMemoryUsage();
    console.log(`Processing complete. Total chunks: ${this.processedChunks}`);
    callback();
  }

  logMemoryUsage() {
    const usage = process.memoryUsage();
    console.log('Memory Usage:');
    console.log(`  RSS: ${(usage.rss / 1024 / 1024).toFixed(2)}MB`);
    console.log(`  Heap Used: ${(usage.heapUsed / 1024 / 1024).toFixed(2)}MB`);
    console.log(`  Heap Total: ${(usage.heapTotal / 1024 / 1024).toFixed(2)}MB`);
    console.log(`  External: ${(usage.external / 1024 / 1024).toFixed(2)}MB`);
  }
}

Real-World Use Cases and Applications

CSV to JSON Converter for Large Datasets

const fs = require('fs');
const { Transform, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

class CSVToJSONTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.headers = null;
    this.currentLine = '';
    this.lineCount = 0;
    this.batchSize = options.batchSize || 1000;
    this.batch = [];
  }

  _transform(chunk, encoding, callback) {
    const data = this.currentLine + chunk.toString();
    const lines = data.split('\n');
    
    // Keep the last incomplete line
    this.currentLine = lines.pop() || '';

    for (const line of lines) {
      if (!line.trim()) continue;
      
      this.lineCount++;
      
      if (!this.headers) {
        this.headers = this.parseCSVLine(line);
        continue;
      }

      try {
        const values = this.parseCSVLine(line);
        const record = this.createRecord(values);
        this.batch.push(record);

        if (this.batch.length >= this.batchSize) {
          this.flushBatch();
        }
      } catch (error) {
        this.emit('error', new Error(`Parse error at line ${this.lineCount}: ${error.message}`));
        return;
      }
    }

    callback();
  }

  _flush(callback) {
    // Process any remaining line
    if (this.currentLine.trim()) {
      try {
        const values = this.parseCSVLine(this.currentLine);
        const record = this.createRecord(values);
        this.batch.push(record);
      } catch (error) {
        console.warn(`Warning: Could not parse final line: ${error.message}`);
      }
    }

    // Flush remaining batch
    if (this.batch.length > 0) {
      this.flushBatch();
    }

    callback();
  }

  parseCSVLine(line) {
    const result = [];
    let current = '';
    let inQuotes = false;

    for (let i = 0; i < line.length; i++) {
      const char = line[i];
      
      if (char === '"') {
        inQuotes = !inQuotes;
      } else if (char === ',' && !inQuotes) {
        result.push(current.trim());
        current = '';
      } else {
        current += char;
      }
    }
    
    result.push(current.trim());
    return result;
  }

  createRecord(values) {
    const record = {};
    this.headers.forEach((header, index) => {
      record[header] = values[index] || null;
    });
    return record;
  }

  flushBatch() {
    this.push(JSON.stringify(this.batch) + '\n');
    this.batch = [];
  }
}

async function convertCSVToJSON(inputFile, outputFile) {
  console.log(`Converting ${inputFile} to ${outputFile}...`);
  
  try {
    await pipelineAsync(
      fs.createReadStream(inputFile),
      new CSVToJSONTransform({ batchSize: 5000 }),
      fs.createWriteStream(outputFile)
    );
    
    console.log('Conversion completed successfully');
  } catch (error) {
    console.error('Conversion failed:', error);
    throw error;
  }
}

// Usage
convertCSVToJSON('./large-dataset.csv', './converted-data.json')
  .then(() => console.log('CSV to JSON conversion complete'))
  .catch(err => console.error('Conversion error:', err));

Log File Analysis and Processing

const fs = require('fs');
const { Transform } = require('stream');
const readline = require('readline');

class LogProcessor extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.filters = options.filters || {};
    this.stats = {
      totalLines: 0,
      errorLines: 0,
      warningLines: 0,
      filteredLines: 0,
      uniqueIPs: new Set(),
      statusCodes: {},
      hourlyStats: {}
    };
  }

  _transform(line, encoding, callback) {
    this.stats.totalLines++;
    
    try {
      const logEntry = this.parseLogLine(line.toString());
      
      if (!logEntry) {
        callback();
        return;
      }

      // Apply filters
      if (this.shouldFilterOut(logEntry)) {
        this.stats.filteredLines++;
        callback();
        return;
      }

      // Update statistics
      this.updateStats(logEntry);

      // Classify log level
      if (logEntry.message.includes('ERROR')) {
        this.stats.errorLines++;
        logEntry.level = 'ERROR';
      } else if (logEntry.message.includes('WARNING')) {
        this.stats.warningLines++;
        logEntry.level = 'WARNING';
      } else {
        logEntry.level = 'INFO';
      }

      // Enrich with additional data
      logEntry.processed_at = new Date().toISOString();
      
      this.push(JSON.stringify(logEntry) + '\n');
    } catch (error) {
      console.warn(`Failed to parse log line: ${error.message}`);
    }

    callback();
  }

  parseLogLine(line) {
    // Parse common log format: IP - - [timestamp] "method path protocol" status size "referer" "user-agent"
    const logPattern = /^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]*)" (\d+) (\d+|-) "([^"]*)" "([^"]*)"/;
    const match = line.match(logPattern);
    
    if (!match) {
      // Try parsing as simple timestamped log
      const simplePattern = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+)$/;
      const simpleMatch = line.match(simplePattern);
      
      if (simpleMatch) {
        return {
          timestamp: simpleMatch[1],
          message: simpleMatch[2],
          ip: null,
          method: null,
          path: null,
          status: null,
          size: null
        };
      }
      
      return null;
    }

    return {
      ip: match[1],
      timestamp: match[2],
      request: match[3],
      method: match[3].split(' ')[0],
      path: match[3].split(' ')[1],
      status: parseInt(match[4]),
      size: match[5] === '-' ? 0 : parseInt(match[5]),
      referer: match[6],
      userAgent: match[7],
      message: line
    };
  }

  shouldFilterOut(logEntry) {
    if (this.filters.excludeIPs && this.filters.excludeIPs.includes(logEntry.ip)) {
      return true;
    }
    
    if (this.filters.onlyErrors && logEntry.status < 400) {
      return true;
    }
    
    if (this.filters.excludePaths) {
      for (const path of this.filters.excludePaths) {
        if (logEntry.path && logEntry.path.includes(path)) {
          return true;
        }
      }
    }
    
    return false;
  }

  updateStats(logEntry) {
    if (logEntry.ip) {
      this.stats.uniqueIPs.add(logEntry.ip);
    }
    
    if (logEntry.status) {
      this.stats.statusCodes[logEntry.status] = 
        (this.stats.statusCodes[logEntry.status] || 0) + 1;
    }
    
    if (logEntry.timestamp) {
      const hour = logEntry.timestamp.substring(0, 13); // first 13 chars, e.g. YYYY-MM-DD HH for ISO-style timestamps
      this.stats.hourlyStats[hour] = (this.stats.hourlyStats[hour] || 0) + 1;
    }
  }

  _flush(callback) {
    // Generate final report
    const report = {
      summary: {
        totalLines: this.stats.totalLines,
        errorLines: this.stats.errorLines,
        warningLines: this.stats.warningLines,
        filteredLines: this.stats.filteredLines,
        uniqueIPs: this.stats.uniqueIPs.size,
        processedAt: new Date().toISOString()
      },
      statusCodeDistribution: this.stats.statusCodes,
      hourlyActivity: this.stats.hourlyStats,
      topIPs: this.getTopIPs()
    };

    this.push('\n--- PROCESSING REPORT ---\n');
    this.push(JSON.stringify(report, null, 2));
    
    callback();
  }

  getTopIPs() {
    // This is a simplified version - in real implementation, 
    // you'd track IP frequencies during processing
    return Array.from(this.stats.uniqueIPs).slice(0, 10);
  }
}

async function processLogFile(inputPath, outputPath, filters = {}) {
  return new Promise((resolve, reject) => {
    const processor = new LogProcessor({ filters });
    const writeStream = fs.createWriteStream(outputPath);
    
    const rl = readline.createInterface({
      input: fs.createReadStream(inputPath),
      crlfDelay: Infinity
    });

    rl.on('line', (line) => {
      processor.write(line);
    });

    rl.on('close', () => {
      processor.end();
    });

    processor.pipe(writeStream);

    writeStream.on('finish', () => {
      console.log('Log processing completed');
      resolve();
    });

    processor.on('error', reject);
    writeStream.on('error', reject);
    rl.on('error', reject);
  });
}

// Usage example
const filters = {
  excludeIPs: ['127.0.0.1', '::1'],
  excludePaths: ['/health', '/metrics'],
  onlyErrors: false
};

processLogFile('./access.log', './processed-logs.json', filters)
  .then(() => console.log('Log analysis complete'))
  .catch(err => console.error('Processing failed:', err));

Error Handling and Debugging

Comprehensive Error Handling Strategies

const { Transform, pipeline } = require('stream');
const { promisify } = require('util');
const fs = require('fs');

const pipelineAsync = promisify(pipeline);

class RobustDataProcessor extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.errorCount = 0;
    this.maxErrors = options.maxErrors || 100;
    this.logErrors = options.logErrors !== false;
    this.errorLog = options.errorLog || './processing-errors.log';
  }

  _transform(data, encoding, callback) {
    try {
      const processed = this.processData(data);
      
      if (processed) {
        this.push(processed);
      }
      
      callback();
    } catch (error) {
      this.handleError(error, data, callback);
    }
  }

  processData(data) {
    // Your data processing logic here
    if (typeof data === 'string') {
      try {
        return JSON.parse(data);
      } catch (e) {
        throw new Error(`Invalid JSON: ${e.message}`);
      }
    }
    return data;
  }

  handleError(error, data, callback) {
    this.errorCount++;
    
    const errorRecord = {
      timestamp: new Date().toISOString(),
      error: error.message,
      data: data.toString().substring(0, 500), // Limit data size in log
      stack: error.stack
    };

    if (this.logErrors) {
      fs.appendFileSync(this.errorLog, JSON.stringify(errorRecord) + '\n');
    }

    console.warn(`Processing error ${this.errorCount}/${this.maxErrors}: ${error.message}`);

    if (this.errorCount >= this.maxErrors) {
      callback(new Error(`Too many errors (${this.errorCount}). Processing aborted.`));
    } else {
      // Continue processing despite error
      callback();
    }
  }
}

class CircuitBreaker extends Transform {
  constructor(options = {}) {
    super(options);
    this.failureThreshold = options.failureThreshold || 5;
    this.resetTimeout = options.resetTimeout || 30000; // 30 seconds
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.nextAttempt = null;
  }

  _transform(chunk, encoding, callback) {
    if (this.state === 'OPEN') {
      if (Date.now() > this.nextAttempt) {
        this.state = 'HALF_OPEN';
        console.log('Circuit breaker: Attempting to close');
      } else {
        callback(new Error('Circuit breaker is OPEN'));
        return;
      }
    }

    try {
      // Simulate processing that might fail
      this.processChunk(chunk);
      
      if (this.state === 'HALF_OPEN') {
        this.state = 'CLOSED';
        this.failureCount = 0;
        console.log('Circuit breaker: CLOSED');
      }
      
      this.push(chunk);
      callback();
    } catch (error) {
      this.recordFailure();
      callback(error);
    }
  }

  processChunk(chunk) {
    // Simulate processing logic that might fail
    if (Math.random() < 0.1) { // 10% failure rate for demo
      throw new Error('Processing failed');
    }
  }

  recordFailure() {
    this.failureCount++;
    
    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.resetTimeout;
      console.log(`Circuit breaker: OPEN after ${this.failureCount} failures`);
    }
  }
}

// Usage with comprehensive error handling
async function robustProcessing(inputFile, outputFile) {
  try {
    await pipelineAsync(
      fs.createReadStream(inputFile),
      new RobustDataProcessor({ 
        maxErrors: 50,
        logErrors: true,
        errorLog: './errors.log'
      }),
      new CircuitBreaker({
        failureThreshold: 3,
        resetTimeout: 10000
      }),
      fs.createWriteStream(outputFile)
    );
    
    console.log('Processing completed successfully');
  } catch (error) {
    console.error('Pipeline failed:', error.message);
    
    // Implement retry logic
    if (error.message.includes('Circuit breaker')) {
      console.log('Retrying after circuit breaker timeout...');
      setTimeout(() => robustProcessing(inputFile, outputFile), 15000);
    } else {
      throw error;
    }
  }
}

Frequently Asked Questions

When should I use streams instead of loading data into memory?

Use streams when processing datasets larger than available RAM, when you need consistent memory usage regardless of data size, or when dealing with real-time data feeds. Generally, if your data exceeds 100MB or you're processing multiple files concurrently, streams provide significant benefits. Streams are essential for server applications that must handle multiple requests simultaneously without memory exhaustion.

How do I handle backpressure in Node.js streams?

Backpressure occurs when data is produced faster than the destination can consume it. Handle it by checking the return value of stream.write(): if it returns false, pause writing and resume after the 'drain' event fires. Use the pipeline utility (or pipe()) for automatic backpressure handling, or implement custom logic by comparing the stream's writableLength against its highWaterMark threshold.
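
As a compact sketch of the manual approach, you can await the 'drain' event with events.once whenever write() reports a full buffer (the output path is a placeholder):

const fs = require('fs');
const { once } = require('events');

async function writeManyLines(outputPath) {
  const out = fs.createWriteStream(outputPath);

  for (let i = 0; i < 1000000; i++) {
    // write() returns false once the internal buffer reaches highWaterMark
    if (!out.write(`line ${i}\n`)) {
      await once(out, 'drain'); // pause until the buffer empties
    }
  }

  out.end();
  await once(out, 'finish');
}

writeManyLines('./out.txt').catch(console.error); // placeholder path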

What's the difference between objectMode and regular streams?

Regular streams work with strings and buffers, processing data as binary or text chunks. Object mode streams handle JavaScript objects, allowing you to pass complex data structures through the pipeline. Use objectMode when transforming structured data like JSON objects, database records, or parsed CSV rows. Set objectMode: true in the stream constructor options.
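
A minimal sketch of an object-mode transform; the record fields here are made up for illustration:

const { Transform } = require('stream');

// Receives plain JavaScript objects and emits plain objects (no Buffer conversion)
const addTotals = new Transform({
  objectMode: true,
  transform(order, encoding, callback) {
    callback(null, { ...order, total: order.price * order.quantity });
  }
});

addTotals.on('data', (order) => console.log(order));

addTotals.write({ id: 1, price: 9.99, quantity: 3 });
addTotals.write({ id: 2, price: 4.5, quantity: 2 });
addTotals.end();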

How can I debug memory leaks in stream processing?

Monitor memory usage with process.memoryUsage(), take periodic heap snapshots, and use tools like clinic.js or Node.js's built-in --inspect flag. Common causes include not properly closing streams, keeping references to processed data, or accumulating data in transform streams. Always destroy streams on errors (stream.destroy() or the pipeline utility) and avoid storing large amounts of data in instance variables.
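
For cleanup, stream.finished() (or the pipeline utility) guarantees a callback whether a stream ends, errors, or is destroyed, which is a convenient place to release resources. A small sketch with a placeholder file path:

const fs = require('fs');
const { finished } = require('stream');

const readStream = fs.createReadStream('./big-file.txt'); // placeholder path

readStream.on('data', (chunk) => {
  // process chunk
});

// Runs once the stream ends, errors, or is destroyed
finished(readStream, (err) => {
  if (err) {
    console.error('Stream failed:', err.message);
    readStream.destroy(); // make sure the underlying resource is released
  }
  // e.g. clear timers, close database connections, drop references to buffered data
});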

Can I use streams with async/await patterns?

Yes, use util.promisify(stream.pipeline) to convert stream pipelines into promises, or wrap streams in promises manually. The stream/promises module (Node.js 15+) provides promise-based versions of stream utilities. You can also use async iterators with readable streams using for await (const chunk of stream) syntax for cleaner async processing.
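
A short sketch of both patterns, assuming Node.js 15+ for stream/promises and placeholder file names:

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises'); // Node.js 15+

async function main() {
  // Promise-based pipeline: resolves on success, rejects on any stream error
  await pipeline(
    fs.createReadStream('./app.log'),   // placeholder path
    zlib.createGzip(),
    fs.createWriteStream('./app.log.gz')
  );

  // Async iteration over a readable stream, one chunk at a time
  let bytes = 0;
  for await (const chunk of fs.createReadStream('./app.log')) {
    bytes += chunk.length;
  }
  console.log(`Read ${bytes} bytes`);
}

main().catch(console.error);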

How do I optimize stream performance for large files?

Tune the highWaterMark option based on your use case - larger values for big files (256KB+), smaller for network streams (8-16KB). Process data in batches within transform streams rather than item-by-item. Use object pooling for frequently created objects, enable compression for network streams, and consider using worker threads for CPU-intensive transformations to avoid blocking the event loop.

Conclusion

Node.js streams represent a fundamental shift from memory-intensive data processing to efficient, scalable solutions that handle datasets of any size. Throughout this guide, we've explored how streams transform the way you approach large data processing challenges.

Memory Efficiency: Streams maintain constant memory usage regardless of data size, enabling applications to process terabytes of data on modest hardware. This approach prevents memory exhaustion and ensures consistent performance across different dataset sizes.

Real-World Applications: From CSV processing and log analysis to database operations and API data transformation, streams provide practical solutions for common enterprise challenges. The pipeline approach creates maintainable, composable data processing workflows that scale with your application needs.

Performance Optimization: Proper stream configuration, backpressure handling, and error management strategies ensure robust, production-ready applications. Understanding buffer sizes, object mode, and async patterns maximizes throughput while maintaining code clarity.

Advanced Techniques: Custom transform streams, circuit breakers, and comprehensive error handling create resilient data processing pipelines. These patterns handle real-world complexities like malformed data, network failures, and processing bottlenecks.

Ready to implement streams in your Node.js applications? Start by identifying your largest data processing operations and gradually replace memory-intensive approaches with stream-based solutions. Share your stream implementation experiences in the comments below - what challenges did you overcome, and how much did memory usage improve in your applications?

For more advanced Node.js performance techniques and scalability patterns, subscribe to our newsletter and join our community of developers building efficient, production-ready applications.
