// Downloader.ts

import { EventEmitter } from 'node:events';
import { cpus } from 'node:os';
import pLimit, { LimitFunction } from 'p-limit';
import { HlsUtils } from './HLSUtils.js';
import FileService from './services/FileWriter.js';
import HttpClient, { HttpClientOptions } from './services/HttpClient.js';
import PlaylistParser from './services/PlaylistParser.js';

/**
 * Events emitted by {@link Downloader}, keyed by event name, with the
 * listener signature for each.
 *
 * @category Types
 */
interface DownloaderEvents {
  /** Emitted once the list of URLs to process is known, before any of them is processed. */
  start: (data: { total: number; destination: string }) => void;
  /** Emitted after each resource is successfully processed. */
  progress: (data: SegmentDownloadedData) => void;
  /** Emitted for each resource (or the entry playlist) that fails. */
  error: (error: SegmentDownloadErrorData) => void;
  /** Emitted with the final report of a run. */
  end: (summary: DownloadSummary) => void;
}

/**
 * A single failed download attempt, for either a segment or a playlist.
 *
 * @category Types
 */
interface DownloadError {
  /** Full URL of the resource whose download failed. */
  url: string;
  /** Class name of the underlying error (e.g. 'HTTPError', 'TimeoutError'). */
  name: string;
  /** Error message reported by the network client or the parser. */
  message: string;
}

/**
 * Progress payload describing one successfully processed resource
 * (a media segment or a playlist).
 *
 * @category Types
 */
interface SegmentDownloadedData {
  /** The resource URL as referenced in the HLS playlist (.m3u8). */
  url: string;

  /**
   * Local file-system path the resource was written to, as returned by the
   * file service. Absent in dry-run mode, where nothing is written to disk.
   */
  path?: string;

  /** Total number of queued resources, including the playlists themselves. */
  total: number;

  /**
   * Running count (1-based) of resources handled so far. Failed resources
   * are counted as well, so progress keeps advancing past errors.
   */
  processed: number;
}

/**
 * Payload of the `error` event, and the concrete shape of each entry in
 * {@link DownloadSummary.errors}: a {@link DownloadError} (`url`, `name`,
 * `message`) plus the progress counter at the moment of failure.
 *
 * @category Types
 */
interface SegmentDownloadErrorData extends DownloadError {
  /**
   * Running count (1-based) of resources handled so far, including this failure.
   */
  processed: number;
}
/**
 * Configuration contract for {@link Downloader}.
 *
 * Any property not listed here (e.g. `headers`, `timeout`, `retry`, `proxy`,
 * `noProxy`) is forwarded unchanged to the HTTP client.
 *
 * @category Types
 */
interface DownloaderOptions extends HttpClientOptions {
  /**
   * The absolute URL of the master or variant .m3u8 playlist to start from.
   */
  playlistURL: string;
  /**
   * The local directory where files will be saved.
   * If omitted or empty, the downloader runs in dry-run mode: every resource
   * is requested, but nothing is written to disk.
   * @default ""
   */
  destination?: string;
  /**
   * Whether existing files should be overwritten.
   * @default false
   */
  overwrite?: boolean;
  /**
   * Maximum number of simultaneous network requests.
   * @default Math.max(1, os.cpus().length - 1)
   */
  concurrency?: number;
  /**
   * Open-ended pass-through: any other option is handed to the HTTP client as-is.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- forwarded verbatim to the HTTP client
  [key: string]: any;
}

/**
 * Final execution report returned by {@link Downloader.startDownload}.
 *
 * @category Types
 */
interface DownloadSummary {
  /** Total number of queued resources (playlists and segments). */
  total: number;

  /** Every error recorded during the run, in the order it was recorded. */
  errors: DownloadError[];

  /** Human-readable completion status. */
  message: string;
}

/**
 * Result type of the downloader's internal queue processing: a promise of
 * one settled result per queued URL.
 */
type DownloadInProgress = Promise<
  | Array<PromiseSettledResult<void>>
  | Array<PromiseSettledResult<ReadableStream<Uint8Array<ArrayBufferLike>> | undefined>>
>;

/**
 * The main orchestrator service for managing HLS stream acquisition.
 *
 * Fetches the entry playlist, expands the variant playlists it references,
 * and then either saves every discovered resource under `destination` or,
 * when no destination is configured, only requests each one (dry-run).
 * Progress is reported through `start`, `progress`, `error` and `end` events.
 *
 * @author Nur Rony<pro.nmrony@gmail.com>
 */
class Downloader extends EventEmitter {
  /** Every URL to process: the entry playlist plus everything parsed from playlists. */
  private items: Set<string> = new Set();
  private playlistURL: string;
  /** Output directory; an empty string selects dry-run mode. */
  private destination: string;
  /** Shared limiter that caps the number of in-flight network requests. */
  private pool: LimitFunction;
  private http: HttpClient;
  private fileService: FileService;
  private errors: DownloadSummary['errors'] = [];
  private concurrency = 1;
  private processedCount = 0;
  /** Guards against overlapping `startDownload` calls on one instance. */
  private isDownloading = false;

  /**
   * Creates a new Downloader instance.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param options - Configuration object.
   * @param options.playlistURL - URL of the master HLS playlist.
   * @param [options.destination] - Output directory for downloaded files; omit for dry-run mode.
   * @param [options.overwrite] - Whether to overwrite existing files.
   * @param [options.concurrency] - Maximum concurrent requests (default: max(1, CPU cores - 1)).
   * @param [options.headers] - Custom HTTP headers to be sent with every request.
   * @param [options.timeout] - Request timeout in milliseconds (Default: 10000)
   * @param [options.retry] - Retry strategy for transient network failures
   * @param [options.proxy] - Proxy URL (e.g., http://proxy.corp.com:8080)
   * @param [options.noProxy] - List of hostnames that should bypass the proxy
   */
  constructor(private options: DownloaderOptions) {
    super(); // Initialize EventEmitter

    const {
      destination = '',
      playlistURL = '',
      overwrite = false,
      concurrency = Math.max(1, cpus().length - 1),
      headers = {},
      ...httpClientOptions
    } = this.options || {};

    this.playlistURL = playlistURL;
    // Keep our own copy so we never have to read FileService's private state.
    this.destination = destination;
    this.concurrency = concurrency;
    this.pool = pLimit(concurrency);

    this.http = new HttpClient({ headers, ...httpClientOptions });
    this.http.setPrimaryOrigin(this.playlistURL);

    this.fileService = new FileService(destination, overwrite);

    this.items.add(this.playlistURL);
  }

  /**
   * Runs one complete download: resolves the URL list, processes it, and
   * reports the outcome.
   *
   * Download failures do not reject the returned promise. They are collected
   * into the summary, and emitted as `error` events when a listener is
   * attached. `end` is emitted with the summary on both success and fatal
   * failure.
   *
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns The {@link DownloadSummary} for this run.
   * @throws Error if a download is already running on this instance.
   */
  async startDownload(): Promise<DownloadSummary> {
    if (this.isDownloading) {
      throw new Error('Download already in progress on this instance.');
    }

    this.isDownloading = true;
    try {
      this.processedCount = 0;
      this.errors = [];

      try {
        await this.collectItems();

        // Signal the start of the process
        this.emit('start', { total: this.items.size, destination: this.destination });

        await this.processQueue();
      } catch (error: unknown) {
        // Fatal failure: the entry playlist is unreachable, the destination is not writable, etc.
        this.handleError(this.playlistURL, Downloader.toError(error));
      }

      const summary = this.generateSummary();
      this.emit('end', summary);
      return summary;
    } finally {
      this.isDownloading = false;
    }
  }

  /**
   * Emits `event` to its registered listeners. This is a plain pass-through
   * to `EventEmitter#emit`.
   * @param event - Name of the event to emit.
   * @param args - Arguments passed to each listener.
   * @returns `true` if the event had listeners, `false` otherwise.
   */
  public override emit(event: string | symbol, ...args: any[]): boolean {
    return super.emit(event, ...args);
  }

  /**
   * Fetches the entry playlist and queues every URL it references, then does
   * the same for each referenced playlist (one level deep).
   *
   * Variant playlists that fail to load are skipped here. They stay queued,
   * and their failure is reported when {@link processQueue} requests them again.
   */
  private async collectItems(): Promise<void> {
    // NOTE(review): the return value of isValidUrl is ignored, so this only has
    // an effect if it throws on invalid input. Confirm against HlsUtils.
    HlsUtils.isValidUrl(this.playlistURL);

    const mainContent = await this.http.fetchText(this.playlistURL);
    const urls = PlaylistParser.parse(this.playlistURL, mainContent);
    urls.forEach(url => this.items.add(url));

    // Go through the shared pool so the `concurrency` limit applies here too.
    const variantResults = await Promise.allSettled(
      urls
        .filter(url => PlaylistParser.isPlaylist(url))
        .map(playlist => this.pool(async () => ({ playlist, content: await this.http.fetchText(playlist) })))
    );

    for (const result of variantResults) {
      if (result.status === 'fulfilled') {
        const { playlist, content } = result.value;
        PlaylistParser.parse(playlist, content).forEach(url => this.items.add(url));
      }
    }
  }

  /**
   * Processes all queued URLs using controlled concurrency: saves them when a
   * destination is configured, otherwise probes them (dry-run).
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns One settled result per queued URL.
   * @throws Error if the destination exists and overwriting is disabled.
   */
  private async processQueue(): DownloadInProgress {
    const urls = Array.from(this.items);
    const total = urls.length;

    if (this.destination) {
      if (!(await this.fileService.canWrite(this.playlistURL))) {
        throw new Error('Directory already exists and overwrite is disabled');
      }
      return Promise.allSettled(urls.map(url => this.pool(() => this.downloadFile(url))));
    }

    return Promise.allSettled(urls.map(url => this.pool(() => this.probeUrl(url, total))));
  }

  /**
   * Dry-run handler for one URL: requests it, reports progress, then discards
   * the response body. Failures are recorded through {@link handleError}.
   * @param url - Resource URL to request.
   * @param total - Number of queued URLs, reported in the progress payload.
   */
  private async probeUrl(url: string, total: number): Promise<void> {
    try {
      const stream = await this.http.getStream(url);
      this.processedCount++;
      this.emit('progress', { url, total, processed: this.processedCount } satisfies SegmentDownloadedData);
      // The body is never read in dry-run mode. Cancel it so the underlying
      // connection is released instead of lingering until garbage collection.
      await stream?.cancel().catch(() => undefined);
    } catch (error: unknown) {
      this.handleError(url, Downloader.toError(error));
    }
  }

  /**
   * Downloads a single resource and saves it under the destination directory.
   * Failures are recorded through {@link handleError} rather than thrown.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param url - Resource URL to download.
   */
  private async downloadFile(url: string): Promise<void> {
    try {
      const total = this.items.size;
      const stream = await this.http.getStream(url);
      const path = await this.fileService.prepareDirectory(url);
      await this.fileService.saveStream(stream, path);

      this.processedCount++;
      this.emit('progress', {
        url,
        path,
        total,
        processed: this.processedCount,
      } satisfies SegmentDownloadedData);
    } catch (error: unknown) {
      this.handleError(url, Downloader.toError(error));
    }
  }

  /**
   * Records a failure and notifies `error` listeners, if there are any.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param url - URL that caused the error.
   * @param error - The thrown error.
   */
  private handleError(url: string, error: Error): void {
    // Failures count as processed too, so progress keeps advancing past errors.
    this.processedCount++;
    const errorData: SegmentDownloadErrorData = {
      url,
      name: error.name,
      message: error.message,
      processed: this.processedCount,
    };
    this.errors.push(errorData);
    // EventEmitter throws when 'error' is emitted with no listener registered.
    // Failures are already captured in the summary, so emit only when observed.
    if (this.listenerCount('error') > 0) {
      this.emit('error', errorData);
    }
  }

  /**
   * Converts any caught value into an `Error`, so `name` and `message` are always defined.
   * @param value - Whatever was thrown.
   * @returns `value` itself if it is an Error; otherwise a new Error wrapping its string form.
   */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }

  /**
   * Generates a structured summary of the download operation.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns The {@link DownloadSummary} for the current state.
   */
  private generateSummary(): DownloadSummary {
    return {
      errors: this.errors,
      total: this.items.size,
      message: this.errors.length > 0 ? 'Download ended with errors' : 'Downloaded successfully',
    };
  }
}

/**
 * Downloads an HLS playlist and every resource it references, or, in
 * dry-run mode, only fetches them without saving.
 * @author Nur Rony<pro.nmrony@gmail.com>
 */
export default Downloader;

/**
 * Public type surface of the Downloader module: configuration, events,
 * and result payloads.
 * @author Nur Rony<pro.nmrony@gmail.com>
 */
export type {
  DownloaderOptions,
  DownloaderEvents,
  DownloadSummary,
  DownloadError,
  SegmentDownloadedData,
  SegmentDownloadErrorData,
};