import { EventEmitter } from 'node:events';
import { cpus } from 'node:os';
import pLimit, { LimitFunction } from 'p-limit';
import { HlsUtils } from './HLSUtils.js';
import FileService from './services/FileWriter.js';
import HttpClient, { HttpClientOptions } from './services/HttpClient.js';
import PlaylistParser from './services/PlaylistParser.js';
/**
* Listener signatures for the events emitted by {@link Downloader}.
* @category Types
*/
interface DownloaderEvents {
/** Emitted once, before the queue is processed, with the number of queued URLs and the configured destination. */
start: (data: { total: number; destination: string }) => void;
/** Emitted after each queued item has been processed successfully. */
progress: (data: SegmentDownloadedData) => void;
/** Emitted for each item that fails, and when the run as a whole fails. */
error: (error: DownloadError) => void;
/** Emitted with the final summary once the queue has been processed. */
end: (summary: DownloadSummary) => void;
}
/**
* Represents a failed segment or playlist download attempt.
* @category Types
*/
interface DownloadError {
/** The full URL of the resource that failed to download. */
url: string;
/** The error class name (e.g., 'HTTPError', 'TimeoutError'). */
name: string;
/** The descriptive error message taken from the thrown error. */
message: string;
}
/**
* Metadata describing a successfully processed item; payload of the `progress` event.
* @category Types
*/
interface SegmentDownloadedData {
/**
* The URL of the processed item, as queued from the HLS playlist (.m3u8).
*/
url: string;
/**
* Local file system path where the item was saved.
* Omitted when nothing was written to disk (no destination configured).
*/
path?: string;
/**
* Total number of queued items for this run.
*/
total: number;
/**
* Running count of items processed so far (1-based). Failed items are
* counted as well, so the value keeps advancing when errors occur.
*/
processed: number;
}
/**
 * Information about a failed item; payload of the `error` event.
 *
 * Inherits `url`, `name` and `message` from {@link DownloadError} and adds the
 * position of the failure within the run.
 * @category Types
 */
interface SegmentDownloadErrorData extends DownloadError {
  /**
   * Running count of items processed so far (1-based), including this failure.
   */
  processed: number;
}
/**
* Configuration contract for {@link Downloader}.
* @category Types
*/
interface DownloaderOptions extends HttpClientOptions {
/**
* The absolute URL to the master or variant .m3u8 playlist.
*/
playlistURL: string;
/**
* The local directory where files will be saved.
* If omitted, the downloader runs in 'dry-run' mode: items are fetched
* but not written to disk.
* @default ""
*/
destination?: string;
/**
* Indicates whether existing files should be overwritten.
* @default false
*/
overwrite?: boolean;
/**
* Maximum number of simultaneous network requests.
* @default Math.max(1, CPU_CORES - 1)
*/
concurrency?: number;
/**
* Optional HTTP client configuration. Any key not listed above is
* forwarded to the HTTP client unchanged.
*/
[key: string]: any; // For kyOptions
}
/**
* Final execution report returned by {@link Downloader.startDownload}.
* @category Types
*/
interface DownloadSummary {
/** Number of URLs queued for the run (the playlist URL itself is included). */
total: number;
/** An array of errors encountered during the process. */
errors: DownloadError[];
/** Human-readable completion status. */
message: string;
}
/**
* Settled results of one queue run: `void` per item when files are written to
* disk, or the fetched stream (`undefined` on failure) per item otherwise.
*/
type DownloadInProgress = Promise<
PromiseSettledResult<void>[] | PromiseSettledResult<ReadableStream<Uint8Array<ArrayBufferLike>> | undefined>[]
>;
/**
 * The main orchestrator service for managing HLS stream acquisition.
 *
 * A run fetches the playlist at `playlistURL`, queues every URL the parser
 * returns for it (following one level of nested playlists) and then processes
 * the queue with bounded concurrency. Items are written to disk when a
 * destination is configured; otherwise they are only fetched.
 *
 * Emits `start`, `progress`, `error` and `end`.
 *
 * @author Nur Rony<pro.nmrony@gmail.com>
 */
class Downloader extends EventEmitter {
  /** Every URL to process. Seeded with the playlist URL; the Set removes duplicates. */
  private readonly items: Set<string> = new Set();
  private readonly playlistURL: string;
  /** Concurrency limiter shared by all download tasks. */
  private readonly pool: LimitFunction;
  private readonly http: HttpClient;
  private readonly fileService: FileService;
  /** Failures collected during the current run; replaced at the start of each run. */
  private errors: DownloadSummary['errors'] = [];
  private concurrency = 1;
  /** Number of items handled so far in the current run (successes and failures). */
  private processedCount = 0;
  /** Guards against overlapping runs on the same instance. */
  private isDownloading = false;

  /**
   * Creates a new Downloader instance.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param options - Configuration object.
   * @param options.playlistURL - URL of the master HLS playlist.
   * @param [options.destination] - Output directory for downloaded files.
   * @param [options.overwrite] - Whether to overwrite existing files.
   * @param [options.concurrency] - Maximum concurrent downloads.
   * @param [options.headers] - Custom HTTP headers to be sent with every request.
   * @param [options.timeout] - Request timeout in milliseconds (Default: 10000)
   * @param [options.retry] - Retry strategy for transient network failures
   * @param [options.proxy] - Proxy URL (e.g., http://proxy.corp.com:8080)
   * @param [options.noProxy] - List of hostnames that should bypass the proxy
   */
  constructor(private readonly options: DownloaderOptions) {
    super(); // Initialize EventEmitter
    const {
      destination = '',
      playlistURL = '',
      overwrite = false,
      concurrency = Math.max(1, cpus().length - 1),
      headers = {},
      // Everything that is not a Downloader option is forwarded to the HTTP client.
      ...httpClientOptions
    } = options ?? {};
    this.playlistURL = playlistURL;
    this.pool = pLimit(concurrency);
    this.concurrency = concurrency;
    this.http = new HttpClient({ headers, ...httpClientOptions });
    this.http.setPrimaryOrigin(this.playlistURL);
    this.fileService = new FileService(destination, overwrite);
    this.items.add(this.playlistURL);
  }

  /**
   * Initiates the download lifecycle.
   *
   * Failures are collected rather than thrown: a failure of the run as a whole
   * is recorded against the playlist URL and the summary is still returned.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns The {@link DownloadSummary} of the run.
   * @throws Error if a download is already in progress on this instance.
   */
  async startDownload(): Promise<DownloadSummary> {
    if (this.isDownloading) {
      throw new Error('Download already in progress on this instance.');
    }
    try {
      this.isDownloading = true;
      this.processedCount = 0;
      this.errors = [];

      // NOTE(review): the return value is discarded, so this only guards the run
      // if isValidUrl throws on invalid input — confirm against HlsUtils.
      HlsUtils.isValidUrl(this.playlistURL);

      const mainContent = await this.http.fetchText(this.playlistURL);
      const urls = PlaylistParser.parse(this.playlistURL, mainContent);
      for (const url of urls) {
        this.items.add(url);
      }

      // Follow nested playlists one level deep. A playlist that cannot be fetched
      // here is skipped; it is still queued, so the failure surfaces when the
      // queue is processed.
      const variantPlaylists = urls.filter(u => PlaylistParser.isPlaylist(u));
      const variantResults = await Promise.allSettled(variantPlaylists.map(u => this.http.fetchText(u)));
      variantResults.forEach((res, index) => {
        if (res.status === 'fulfilled') {
          const subUrls = PlaylistParser.parse(variantPlaylists[index], res.value);
          for (const url of subUrls) {
            this.items.add(url);
          }
        }
      });

      // Signal the start of the process. `destination` falls back to the
      // documented default so listeners never receive `undefined`.
      this.emit('start', { total: this.items.size, destination: this.options?.destination ?? '' });
      await this.processQueue();

      const summary = this.generateSummary();
      this.emit('end', summary);
      return summary;
    } catch (error: unknown) {
      // NOTE(review): 'end' is not emitted on this path; listeners that wait for
      // it must also listen for 'error' or use the returned promise.
      this.handleError(this.playlistURL, error);
      return this.generateSummary();
    } finally {
      this.isDownloading = false;
    }
  }

  /**
   * Emits an event by delegating to the base EventEmitter implementation.
   * @param event - Name of the event to emit.
   * @param args - Arguments passed to every listener.
   * @returns `true` if the event had listeners, `false` otherwise.
   */
  public override emit(event: string | symbol, ...args: any[]): boolean {
    return super.emit(event, ...args);
  }

  /**
   * Processes all queued URLs using controlled concurrency.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns The settled result of every queued task.
   * @throws Error if a destination is configured but may not be written to.
   */
  private async processQueue(): DownloadInProgress {
    const total = this.items.size;
    const itemsArray = Array.from(this.items);

    // NOTE(review): reads FileService's private field through bracket access;
    // a public accessor on FileService would be safer.
    if (this.fileService['destination']) {
      if (!(await this.fileService.canWrite(this.playlistURL))) {
        throw new Error('Directory already exists and overwrite is disabled');
      }
      const tasks = itemsArray.map(url => this.pool(() => this.downloadFile(url)));
      return Promise.allSettled(tasks);
    }

    // No destination: fetch every item without writing it to disk.
    // NOTE(review): the returned streams are never read or cancelled in this
    // file — confirm HttpClient releases the underlying responses.
    const fetchTasks = itemsArray.map(url =>
      this.pool(async () => {
        try {
          const stream = await this.http.getStream(url);
          this.processedCount++;
          const progress: SegmentDownloadedData = { url, total, processed: this.processedCount };
          this.emit('progress', progress);
          return stream;
        } catch (error: unknown) {
          this.handleError(url, error);
          return undefined;
        }
      })
    );
    return Promise.allSettled(fetchTasks);
  }

  /**
   * Downloads and saves a single file, then reports progress.
   * Failures are recorded through {@link Downloader.handleError}.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param url - Resource URL to download.
   * @returns Resolves once the file has been saved or the failure recorded.
   */
  private async downloadFile(url: string): Promise<void> {
    try {
      const total = this.items.size;
      const stream = await this.http.getStream(url);
      const path = await this.fileService.prepareDirectory(url);
      await this.fileService.saveStream(stream, path);
      this.processedCount++;
      const progress: SegmentDownloadedData = {
        url,
        path,
        total,
        processed: this.processedCount,
      };
      this.emit('progress', progress);
    } catch (error: unknown) {
      this.handleError(url, error);
    }
  }

  /**
   * Handles and aggregates download errors.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @param url - URL that caused the error.
   * @param error - The thrown value; anything that is not an `Error` is wrapped in one.
   */
  private handleError(url: string, error: unknown): void {
    const cause = error instanceof Error ? error : new Error(String(error));
    // We increment even on error so the progress bar is smooth
    this.processedCount++;
    const errorData: SegmentDownloadErrorData = {
      url,
      name: cause.name,
      message: cause.message,
      processed: this.processedCount,
    };
    this.errors.push(errorData);
    // EventEmitter throws when 'error' is emitted without a listener; the
    // failure is already recorded in the summary, so only notify when someone
    // is listening.
    if (this.listenerCount('error') > 0) {
      this.emit('error', errorData);
    }
  }

  /**
   * Generates a structured summary of the download operation.
   * @author Nur Rony<pro.nmrony@gmail.com>
   * @returns The {@link DownloadSummary} for the current state of the run.
   */
  private generateSummary(): DownloadSummary {
    return {
      errors: this.errors,
      total: this.items.size,
      message: this.errors.length > 0 ? 'Download ended with errors' : 'Downloaded successfully',
    };
  }
}
/**
* @author Nur Rony<pro.nmrony@gmail.com>
* @classdesc Downloads or fetches an HLS playlist and its items
*/
export default Downloader;
/**
* @author Nur Rony<pro.nmrony@gmail.com>
* Types for Downloader
*/
export type {
DownloaderEvents,
DownloaderOptions,
DownloadError,
DownloadSummary,
SegmentDownloadedData,
SegmentDownloadErrorData,
};