import { createWriteStream } from 'fs';
import { access, constants, mkdir, unlink } from 'fs/promises';
import ky from 'ky';
import pLimit from 'p-limit';
import { dirname, join } from 'path';
import { Readable } from 'stream';
import { URL } from 'url';
import { InvalidPlayList } from './exceptions';
import Utils from './utils';
/**
* HLS Playlist file extension
* @constant
* @type {string}
* @memberof module:HLSDownloader
*/
const HLS_PLAYLIST_EXT = '.m3u8';
/**
* @class
* @memberof module:HLSDownloader
* @author Nur Rony<pro.nmrony@gmail.com>
* @classdesc Main donwloader class of HLSDownloader Package
*/
class Downloader {
/** @lends Downloader.prototype */
/**
* @static
* @type {object}
* @description Default <a href="https://www.npmjs.com/package/ky" target="_blank">Ky</a> options values set by HLSDownloader
* @default
* <pre>
* {
* retry: { limit: 0 }
* }
* </pre>
*/
static defaultKyOptions = { retry: { limit: 0 } };
/**
* @type {object}
* @default 1
* @description concurrency controller
*/
pool = pLimit(1);
/**
* @type {boolean}
* @default false
* @description concurrency controller
*/
overwrite = false;
/**
* @type {string[]}
* @default
* <pre>
* [
* 'uri',
* 'url',
* 'json',
* 'form',
* 'body',
* 'method',
* 'setHost',
* 'isStream',
* 'parseJson',
* 'prefixUrl',
* 'cookieJar',
* 'playlistURL',
* 'concurrency',
* 'allowGetBody',
* 'stringifyJson',
* 'methodRewriting'
* ]
* </pre>
*/
static unSupportedOptions = [
'uri',
'url',
'json',
'form',
'body',
'method',
'setHost',
'isStream',
'parseJson',
'prefixUrl',
'cookieJar',
'playlistURL',
'concurrency',
'allowGetBody',
'stringifyJson',
'methodRewriting',
];
/**
* @type {string[]}
* @description items that are downloaded successfully
*/
items = [];
/**
* @type {Array<{url: string, name: string, message: string}>}
* @description items that are not downloaded successfully
*/
errors = [];
/**
* @type {number=}
* @default 1
* @description Concurrency limit to download items
*/
concurrency = 1;
/**
* @type {object=}
* @default <pre>{}</pre>
* @description Extra options to pass into <a href="https://www.npmjs.com/package/ky" target="_blank">Ky</a>
*/
kyOptions = {};
/**
* @default ''
* @type {string}
* @description Playlist URL to download
*/
playlistURL = '';
/**
* @default ''
* @type {string}
* @description Absolute path to download the TS files with corresponding playlist file
*/
destination = '';
/**
* @default null
* @type {Function | null}
* @description Function to track downloaded data
*/
onData = null;
/**
* @default null
* @type {Function | null}
* @description Function to track error
*/
onError = null;
/**
* @constructor
* @throws TypeError
* @param {object} downloderOptions - Options to build downloader
* @param {string} downloderOptions.playlistURL - Playlist URL to download
* @param {number} [downloderOptions.concurrency = 1] - concurrency limit to download playlist chunk
* @param {object} [downloderOptions.destination = ''] - Absolute path to download
* @param {object | Function} [downloderOptions.onData = null] - onData hook
* @param {object | Function} [downloderOptions.onError = null] - onError hook
* @param {boolean} [downloderOptions.overwrite = false] - Overwrite files toggler
* @param {object} [downloderOptions.options = {}] - Options to override from <a href="https://www.npmjs.com/package/ky" target="_blank">Ky</a>
* @throws ProtocolNotSupported
*/
constructor(
{ playlistURL, destination, concurrency = 1, overwrite = false, onData = null, onError = null, ...options } = {
concurrency: 1,
destination: '',
playlistURL: '',
onData: null,
onError: null,
overwrite: false,
options: {},
}
) {
try {
this.items = [playlistURL];
this.playlistURL = playlistURL;
this.concurrency = concurrency;
this.overwrite = overwrite ?? false;
this.destination = destination ?? '';
this.pool = pLimit(concurrency ?? 1);
this.kyOptions = this.mergeOptions(options);
this.onData = onData;
this.onError = onError;
// method binding
this.fetchItems = this.fetchItems.bind(this);
this.downloadItem = this.downloadItem.bind(this);
this.mergeOptions = this.mergeOptions.bind(this);
this.fetchPlaylist = this.fetchPlaylist.bind(this);
this.startDownload = this.startDownload.bind(this);
this.downloadItems = this.downloadItems.bind(this);
this.shouldOverwrite = this.shouldOverwrite.bind(this);
this.createDirectory = this.createDirectory.bind(this);
this.parsePlaylist = this.parsePlaylist.bind(this);
this.processPlaylistItems = this.processPlaylistItems.bind(this);
this.formatPlaylistContent = this.formatPlaylistContent.bind(this);
Utils.isValidUrl(playlistURL);
if (this.onData !== null && Utils.isNotFunction(this.onData)) {
throw TypeError('The `onData` must be a function');
}
if (this.onError !== null && Utils.isNotFunction(this.onError)) {
throw TypeError('The `onError` must be a function');
}
} catch (error) {
throw error;
}
}
/**
* @method
* @memberof class:Downloader
* @description Start the downloading process
*/
async startDownload() {
const { url, body: playlistContent } = await this.fetchPlaylist(this.playlistURL);
if (this.errors.length > 0) {
return {
errors: this.errors,
message: 'Unsuccessful download',
};
}
let urls = this.parsePlaylist(url, playlistContent);
this.items = [...this.items, ...urls];
const playlists = urls.filter(url => url.toLowerCase().endsWith(HLS_PLAYLIST_EXT));
const playlistContentPromiseResults = await Promise.allSettled(playlists.map(this.fetchPlaylist));
const playlistContents = this.formatPlaylistContent(playlistContentPromiseResults);
urls = playlistContents.map(content => this.parsePlaylist(content?.url, content?.body)).flat();
this.items = [...this.items, ...urls];
await this.processPlaylistItems();
if (this.errors.length > 0) {
return {
errors: this.errors,
total: this.items.length,
message: 'Download ended with some errors',
};
}
return {
total: this.items.length,
playlistURL: this.playlistURL,
message: 'Downloaded successfully',
};
}
/**
* @returns {object}
* @param {object} options
* @description merge options
*/
mergeOptions(options) {
return Object.assign(Downloader.defaultKyOptions, Utils.omit(options, ...Downloader.unSupportedOptions));
}
/**
* @method
* @param {string} playlistContent
* @returns string[] Array of url
* @description Parse playlist content and index the TS chunk to download.
*/
parsePlaylist(playlistURL, playlistContent) {
return playlistContent
.replace(/^#[\s\S].*/gim, '')
.split(/\r?\n/)
.reduce((result, item) => {
if (item !== '') {
const url = new URL(item, playlistURL).href;
//@ts-ignore
result.push(url);
}
return result;
}, []);
}
/**
* @async
* @method
* @returns {Promise<{url, body}>}
* @description fetch playlist content
*/
async fetchPlaylist(url) {
try {
const body = await ky.get(url, { ...this.kyOptions }).text();
if (!Utils.isValidPlaylist(body)) {
const { name, message } = new InvalidPlayList('Invalid playlist');
this.errors.push({ url, name, message });
return { url: '', body: '' };
}
return { url, body };
} catch ({ name, message }) {
this.errors.push({ url, name, message });
if (this.onError) {
this.onError({ name, message, url });
}
return { url: '', body: '' };
}
}
/**
* @method
* @description filter playlist contents
* @param {object[]} playlistContentResults list of fetched playlist content
* @returns {Array<{url: string, body: string}>} list of object containing url and its content
*/
formatPlaylistContent(playlistContentResults) {
return playlistContentResults.reduce((contents, { status, value }) => {
if (status.toLowerCase() === 'fulfilled' && !!value) {
contents.push(value);
}
return contents;
}, []);
}
/**
* @async
* @method
* @returns {Promise<any>}
* @description Process playlist items
*/
async processPlaylistItems() {
return (this.destination && this.downloadItems()) || this.fetchItems();
}
/**
* @async
* @method
* @description Download each iteam
* @param {string} item - item to download
* @returns {Promise<any>}
*/
async downloadItem(item) {
try {
const response = await ky.get(item, { ...this.kyOptions });
const filePath = await this.createDirectory(item);
// @ts-ignore
const readStream = Readable.fromWeb(response.body);
return new Promise((resolve, reject) => {
const writeStream = createWriteStream(filePath);
readStream.pipe(writeStream);
readStream.on('error', error => {
readStream.destroy();
writeStream.destroy();
unlink(filePath);
if (this.onError) {
this.onError({
url: item,
name: error.name,
message: error.message,
});
}
reject(error);
});
writeStream.on('finish', () => {
writeStream.close();
if (this.onData) {
this.onData({ url: item, totalItems: this.items.length, path: filePath });
}
resolve('success');
});
writeStream.on('error', error => {
writeStream.destroy();
readStream.destroy();
if (this.onError) {
this.onError({
url: item,
name: error.name,
message: error.message,
});
}
reject(error);
});
});
} catch ({ name, message }) {
this.errors.push({ name, message, url: item });
if (this.onError) {
this.onError({ name, message, url: item });
}
}
}
async downloadItems() {
try {
if (!(await this.shouldOverwrite(this.playlistURL))) {
const error = new Error('directory already exists');
error.name = 'EEXIST';
throw error;
}
await this.createDirectory(this.playlistURL);
const downloaderPromises = this.items.map(url => this.pool(this.downloadItem, url));
return Promise.allSettled(downloaderPromises);
} catch (error) {
this.errors.push({ url: this.playlistURL, name: error.name, message: error.message });
if (this.onError) {
this.onError({ url: this.playlistURL, name: error.name, message: error.message });
}
}
}
/**
* @async
* @method
* @description Fetch playlist items
* @returns {Promise<any>}
*/
async fetchItems() {
return Promise.allSettled(
this.items.map(item =>
this.pool(async () => {
try {
const item$ = await ky.get(item, { ...this.kyOptions });
if (this.onData) {
this.onData({ url: item, totalItems: this.items.length, path: null });
}
return item$;
} catch ({ name, message }) {
this.errors.push({ url: item, name, message });
if (this.onError) {
this.onError({ url: item, name, message });
}
}
})
)
);
}
/**
* @description create directory to download
* @returns {Promise<string>} destination path
* @param {string} url url to construct the path from
*/
async createDirectory(url) {
const { pathname } = Utils.parseUrl(url);
const destDirectory = join(this.destination, dirname(pathname));
await mkdir(destDirectory, { recursive: true });
return join(this.destination, Utils.stripFirstSlash(pathname));
}
/**
* @method
* @param {string} url - url to build path from
* @description Checks for overwrite flag
* @returns {Promise<boolean>}
*/
async shouldOverwrite(url) {
try {
const { pathname } = Utils.parseUrl(url);
const destDirectory = join(this.destination, dirname(pathname));
await access(destDirectory, constants.F_OK);
return this.overwrite;
} catch (error) {
if (error.code === 'ENOENT') return true;
throw error;
}
}
}
/**
* @author Nur Rony<pro.nmrony@gmail.com>
* @classdesc Downloads or fetch HLS Playlist and its items
*/
export default Downloader;