/*
* Content Fabric live stream management
*/
const { ElvClient } = require("@eluvio/elv-client-js");
const Utils = require("@eluvio/elv-client-js/src/Utils.js");
const { execSync } = require("child_process");
const { Config } = require("./Config.js");
const fs = require("fs");
const got = require("got");
const https = require("https");
const yaml = require("js-yaml");
const MakeTxLessToken = async({client, libraryId, objectId, versionHash}) => {
const tok = await client.authClient.AuthorizationToken({libraryId, objectId,
versionHash, channelAuth: false, noCache: true,
noAuth: true});
return tok;
};
/**
* Provides live stream management operations on the Eluvio Content Fabric:
* creating and configuring streams, controlling recording sessions, copying
* live recordings to VoD objects, and measuring ingest/egress latency.
*/
class EluvioLiveStream {
/**
* Instantiate the EluvioLiveStream
*
* @namedParams
* @param {string} url - Optional node endpoint URL (overwrites config URL)
* @param {boolean} debugLogging - Optional debug logging flag
* @param {string} token - Optional static authorization token
* @return {EluvioLiveStream} - New EluvioLiveStream object connected to the specified content fabric
*/
constructor({ url, debugLogging = false, token }) {
if (url) {
this.configUrl = url+"/config?self&qspace="+Config.net;
} else {
this.configUrl = Config.networks[Config.net];
}
this.debug = debugLogging;
this.staticToken = token;
}
/**
* Initialize the EluvioLiveStream SDK, connecting to the Content Fabric
* using the PRIVATE_KEY environment variable.
*/
async Init() {
this.client = await ElvClient.FromConfigurationUrl({
configUrl: this.configUrl,
});
let wallet = this.client.GenerateWallet();
let signer = wallet.AddAccount({
privateKey: process.env.PRIVATE_KEY,
});
this.client.SetSigner({ signer });
this.client.ToggleLogging(this.debug);
if (this.staticToken) {
console.log("Use static token");
this.client.SetStaticToken({token: this.staticToken});
}
}
/**
* Prepare a stream for status retrieval by setting a transaction-less static token,
* reducing auth overhead when polling multiple streams.
*
* @namedParams
* @param {string} name - The object ID of the live stream
*/
async StatusPrep({name}) {
const objectId = name;
const libraryId = await this.client.ContentObjectLibraryId({objectId});
try {
// Set static token - avoid individual auth for separate channels/streams
let token = await MakeTxLessToken({client: this.client, libraryId});
this.client.SetStaticToken({token});
} catch (error) {
console.log("StatusPrep failed: ", error);
return null;
}
}
/**
* Retrieve the status of the current live stream session.
*
* Stream states:
* - `unconfigured` — no live_recording_config
* - `uninitialized` — no live_recording config generated
* - `inactive` — live_recording config initialized but no edge write token
* - `stopped` — edge write token exists but recording not started
* - `starting` — LRO running but no source data yet
* - `running` — stream is running and producing output
* - `stalled` — LRO running but no source data (not producing output)
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {boolean} [stopLro=false] - Stop the LRO if the stream is stalled
* @param {boolean} [showParams=false] - Include recording parameters in the response
* @param {boolean} [saveMeta=true] - Write edge metadata to a local JSON file
* @returns {Promise<Object>} Stream status object
*/
async Status({ name, stopLro = false, showParams = false, saveMeta = true }) {
let status = await this.client.StreamStatus({name, stopLro, showParams});
if (saveMeta) {
let edgeMeta = await this.client.ContentObjectMetadata({
libraryId: status.libraryId,
objectId: status.objectId,
writeToken: status.edgeWriteToken
});
fs.writeFileSync("meta-" + status.name + ".json", JSON.stringify(edgeMeta, null, 2));
}
return status;
}
/**
* Create a live stream object on the Content Fabric.
* The recording configuration may be either a named profile or a path to a
* local YAML/JSON file.
*
* @namedParams
* @param {string} [objectId] - Existing content object ID to use as the stream object
* @param {string} [libraryId] - Library in which to create a new stream object
* @param {string} url - Ingest URL for the live source
* @param {boolean} [finalize] - Finalize the object after creation
* @param {string} liveRecordingConfigArg - Named profile or path to a YAML/JSON config file
* @param {string} [name] - Display name for the stream object
* @param {string} [permission] - Permission level (e.g. "editable")
* @param {boolean} [linkToSite] - Link the stream to its site object
* @returns {Promise<Object>} Result from StreamCreate
*/
async streamCreate({ objectId, libraryId, url, finalize, liveRecordingConfigArg, name, permission, linkToSite }) {
let liveRecordingConfig;
if (fs.existsSync(liveRecordingConfigArg)) {
// Although its yaml.load it still works with JSON sources!
liveRecordingConfig = yaml.load(fs.readFileSync(liveRecordingConfigArg, "utf8"));
} else {
liveRecordingConfig = await this.client.StreamConfigProfile({profileName: liveRecordingConfigArg});
}
const options = {};
if (name !== undefined) options.name = name;
if (permission !== undefined) options.permission = permission;
if (linkToSite !== undefined) options.linkToSite = linkToSite;
return await this.client.StreamCreate({
objectId,
libraryId,
url,
finalize,
options,
liveRecordingConfig
});
}
/**
* Create multiple live stream objects in bulk from a YAML or JSON batch file.
* The file must define `library`, `streams[]`, and either `profile_name` or
* `profile_data` at the top level.
*
* @param {string} batch_file - Path to the YAML/JSON batch configuration file
* @returns {Promise<boolean>} true on success
*/
async CreateStreamObjectBatch(batch_file){
let bulkFileContents = {};
try {
const fileContents = fs.readFileSync(batch_file, "utf8");
// Although its yaml.load it still works with JSON sources!
bulkFileContents = yaml.load(fileContents);
} catch (e) {
console.error(`Error: Could not read or parse file. ${e.message}`);
process.exit(1);
}
// check if we will use a saved profile or one defined in the file
let liveRecordingConfig;
if (bulkFileContents.profile_name !== undefined) {
liveRecordingConfig = await this.client.StreamConfigProfile({profileName: bulkFileContents.profile_name});
} else if (bulkFileContents.profile_data !== undefined) {
liveRecordingConfig = bulkFileContents.profile_data;
} else {
console.log("ERROR: profile_name or profile_data not defined in batch file");
process.exit(1);
}
// iterate through all the streams and create them
const libraryId = bulkFileContents.library;
const streams = bulkFileContents.streams;
for (const stream of streams) {
try {
console.log(`CREATING ${stream.name}`);
const url = stream.url;
const options = {
name: stream.name,
displayTitle: stream.name,
linkToSite: stream.link_to_site ?? true,
permission: stream.permission ?? "editable"
};
await this.client.StreamCreate({
libraryId,
url,
options,
liveRecordingConfig
});
} catch (e) {
console.error(`Error: Could not create stream: ${e.message}`);
process.exit(1);
}
}
return true;
}
/**
* Start a new recording session by creating a new edge write token.
* Optionally prints the curl commands needed to manually control the stream.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {boolean} [start=false] - Immediately start the LRO after creating the token
* @param {boolean} [show_curl=false] - Print curl commands for manual stream control
* @returns {Promise<Object>} Recording session status
*/
async StreamStartRecording ({name, start = false, show_curl = false}) {
const status = await this.client.StreamStartRecording({name, start});
if (show_curl) {
const objectId = status.object_id;
const libraryId = status.library_id;
const edgeToken = status.edge_write_token;
const objectHash = status.hash;
const fabURI = status.fabric_api;
const response = await this.client.authClient.AuthorizationToken({
libraryId: libraryId,
objectId: objectId,
versionHash: "",
channelAuth: false,
noCache: true,
update: true,
});
const curlCmd = "curl -s -H \"$AUTH_HEADER\" ";
const fabLibHashURI = fabURI + "/qlibs/" + libraryId + "/q/" + objectHash;
const fabLibTokenURI = fabURI + "/qlibs/" + libraryId + "/q/" + edgeToken;
console.log("\nSet Authorization header:\nexport AUTH_HEADER=\"" +
"Authorization: Bearer " + response + "\"");
console.log("\nInspect metadata:\n" +
curlCmd + fabLibHashURI + "/meta | jq");
console.log("\nInspect edge metadata:\n" +
curlCmd + fabLibTokenURI + "/meta | jq");
console.log("\nStart recording (returns HANDLE):\n" +
curlCmd + fabLibTokenURI + "/call/live/start | jq");
console.log("\nStop recording (use HANDLE from start):\n" +
curlCmd + fabLibTokenURI + "/call/live/stop/HANDLE");
console.log("\nPlayout options:\n" +
curlCmd + fabLibHashURI + "/rep/live/default/options.json | jq");
console.log("\nHLS playlist:\n" +
fabLibHashURI + "/rep/live/default/hls-sample-aes/playlist.m3u8?authorization=" + response);
}
return status;
}
/**
* Start, stop, or reset a stream within the current recording session
* (current edge write token).
*
* Operations:
* - `start` — begin the LRO
* - `reset` — stop the current LRO and start a new one; creates a new recording
* period inside the existing edge write token (does NOT create a new token)
* - `stop` — stop the LRO
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {string} op - Operation: "start" | "reset" | "stop"
* @returns {Promise<Object>} Stream status
*/
async StartOrStopOrReset({name, op}) {
return this.client.StreamStartOrStopOrReset({name, op});
}
/**
* Stop the live stream session and close the edge write token.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @returns {Promise<Object>} Result from StreamStopRecording
*/
async StopSession({name}) {
return this.client.StreamStopRecording({name});
}
/**
* Initialize a live stream, generating the live_recording configuration from
* the user-supplied live_recording_config metadata.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {boolean} [drm=false] - Enable DRM for the stream output
* @param {string} [format] - Output format override
* @returns {Promise<Object>} Initialization result
*/
async Initialize({name, drm=false, format}) {
return this.client.StreamInitialize({name, drm, format});
}
/**
* Add or remove a content insertion entry in the live stream.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {number} insertionTime - Insertion time in seconds (float)
* @param {boolean} sinceStart - true if time is relative to stream start, false if Unix epoch seconds
* @param {number} [duration=20.0] - Insertion duration in seconds (float)
* @param {string} targetHash - Content hash of the playable insertion content
* @param {boolean} [remove=false] - Remove the insertion at the given time instead of adding it
* @returns {Promise<Object>} Result from StreamInsertion
*/
async Insertion({name, insertionTime, sinceStart, duration, targetHash, remove}) {
return this.client.StreamInsertion({name, insertionTime, sinceStart, duration, targetHash, remove});
}
/**
* Download the raw parts of a live stream recording period and reassemble
* them into an MP4 (or MPEG-TS) file using ffmpeg.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @param {number} [period] - Recording period index; defaults to the latest period
* @param {number} [offset=0] - Skip parts before this many seconds from the start
* @param {boolean} [makeFrame=false] - Extract a JPEG thumbnail from each video part
* @param {boolean} [mpegtsCopy=false] - Output as a concatenated MPEG-TS file instead of MP4
* @returns {Promise<Object>} Status object with `file` path and `state`
*/
async StreamDownload({name, period, offset, makeFrame, mpegtsCopy}) {
let objectId = name;
let status = {name};
try {
const libraryId = await this.client.ContentObjectLibraryId({objectId: objectId});
status.library_id = libraryId;
status.object_id = objectId;
let mainMeta = await this.client.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
if (fabURI == undefined) {
console.log("bad fabric config - missing ingress node API");
}
// Support both hostname and URL ingress_node_api
if (!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.client.SetNodes({fabricURIs: [fabURI]});
let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
let edgeMeta = await this.client.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken
});
// If a stream has never been started return state 'inactive'
if (edgeMeta.live_recording == undefined ||
edgeMeta.live_recording.recordings == undefined ||
edgeMeta.live_recording.recordings.recording_sequence == undefined) {
status.state = "no recordings";
return status;
}
let recordings = edgeMeta.live_recording.recordings;
status.recording_period_sequence = recordings.recording_sequence;
let sequence = recordings.recording_sequence;
if (period == undefined || period < 0 || period > sequence - 1) {
period = sequence - 1;
}
console.log("Downloading stream", name, " period", period, " latest", sequence - 1);
let recording = recordings.live_offering[period];
if (recording == undefined || recording.sources == undefined) {
console.log("ERROR - recording period not found: ", period);
}
let streams = Object.keys(recording.sources);
console.log("Streams", streams);
let dpath = "DOWNLOAD/" + edgeWriteToken + "." + period;
!fs.existsSync(dpath) && fs.mkdirSync(dpath, {recursive: true});
// Reorder streams list so it starts with video
let mts = [];
if (mpegtsCopy) {
mts.push("mpegts");
} else {
mts.push("video");
for (let mi = 0; mi < streams.length; mi ++) {
if (streams[mi].includes("video"))
continue;
mts.push(streams[mi]);
}
}
let inputs = "";
let inputs_map = "";
let makeFrameCmds = [];
for (let mi = 0; mi < mts.length; mi ++) {
let mt = mts[mi];
if (mt.includes("video")) {
inputs = inputs + " -i " + dpath + "/" + mt + ".mp4";
inputs_map = inputs_map + ` -map ${mi}:v:0`;
} else if (mt.includes("audio")) {
inputs = inputs + " -i " + dpath + "/" + mt + ".mp4";
inputs_map = inputs_map + ` -map ${mi}:a:0`;
}
console.log("Downloading ", mt);
let mtpath = dpath + "/" + mt;
let partsfile = dpath + "/parts_" + mt + ".txt";
!fs.existsSync(mtpath) && fs.mkdirSync(mtpath);
var sources = recording.sources[mt].parts;
for (let i = 0; i < sources.length - 1; i++) {
if (i * 30 <= offset) {
console.log(sources[i].hash, "skipped");
continue;
}
console.log(sources[i].hash);
let partHash = sources[i].hash;
let buf = await this.client.DownloadPart({
libraryId,
objectId: objectId,
partHash,
format: "buffer",
chunked: false,
callback: ({bytesFinished, bytesTotal}) => {
console.log(" progress: ", bytesFinished + "/" + bytesTotal);
}
});
let partfile = mtpath + "/" + partHash + ".mp4";
fs.appendFile(partfile, buf, (err) => {
if (err)
console.log(err);
});
fs.appendFile(partsfile, "file '" + mt + "/" + partHash + ".mp4'\n", (err) => {
if (err)
console.log(err);
});
if (makeFrame && mt.includes("video")) {
const makeFrameCmd = "ffmpeg -i " + partfile+ " -vframes 1 -update 1 -q:v 1 " + mtpath + "/" + partHash + ".jpg";
makeFrameCmds.push(makeFrameCmd);
}
}
// Make frames from parts
if (makeFrame && mt.includes("video")) {
for (let i = 0; i < makeFrameCmds.length; i++) {
console.log("Frame cmd", makeFrameCmds[i]);
execSync(makeFrameCmds[i]);
}
}
if (mpegtsCopy) {
// Concatenate parts into one ts file
status.file = dpath + "/" + mt + ".ts";
let cmd = "ffmpeg -f concat -safe 0 -i " + partsfile + " -map 0 -c copy " + status.file;
console.log("Running", cmd);
execSync(cmd);
status.state = "completed";
return status;
}
// Concatenate parts into one mp4
let cmd = "ffmpeg -f concat -safe 0 -i " + partsfile + " -c copy " + dpath + "/" + mt + ".mp4";
console.log("Running", cmd);
execSync(cmd);
}
// Create final mp4 file
let f = dpath + "/download.mp4";
let cmd = "ffmpeg " + inputs + " " + inputs_map + " -c copy -shortest " + f;
console.log("Running", cmd);
execSync(cmd);
status.file = f;
status.state = "completed";
} catch (e) {
console.log("Download failed", e);
throw e;
}
return status;
}
/**
* Copy a portion of a live stream recording into a standard VoD object using
* the zero-copy Content Fabric API. If no target object is supplied one is
* created automatically in the specified library.
*
* Limitations:
* - the target object must have content encryption keys (CAPS) set
* - audio and video sync requires the recording period to start from the
* very beginning of the desired segment; for event streams ensure the TTL
* is long enough; for 24/7 streams reset the stream first
* - `startTime` / `endTime` trimming requires the fabric node to support it
*
* @namedParams
* @param {string} stream - Object ID of the source live stream
* @param {string} [object] - Object ID of the existing target VoD object; a new object is created if omitted
* @param {string} [library] - Library ID for the new VoD object (required when `object` is omitted)
* @param {string} [name] - Name for the new VoD object
* @param {string} [title] - Title for the new VoD object's asset metadata
* @param {boolean} [drm=true] - Enable DRM on the VoD mezzanine
* @param {boolean} [includeTags=false] - Copy video tags from the live stream
* @param {boolean} [defaultDash=false] - Add a `default_dash` (Chromecast-friendly) offering
* @param {boolean} [keepExistingStreams=false] - Preserve existing stream info (e.g. thumbnails) in the target
* @param {string} [eventId] - SCTE-35 event ID used to look up start/end times automatically
* @param {string} [startTime] - ISO-8601 start time for the clip (e.g. "2023-10-03T02:09:02.00Z")
* @param {string} [endTime] - ISO-8601 end time for the clip
* @param {number} [recordingPeriod] - Recording period index to copy (-1 for latest)
* @param {string[]} [streams] - Stream tracks to include (e.g. ["video", "audio"])
* @returns {Promise<Object>} Status object including `target_hash` of the finalized VoD object
*/
/*
Example fabric API flow:
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/init -d @r1 -H "Authorization: Bearer $TOK"
{
"live_qhash": "hq__5Zk1jSN8vNLUAXjQwMJV8F8J8ESXNvmVKkhaXySmGc1BXnJPG2FvvaXee4CXqvFHuGuU3fqLJc",
"start_time": "",
"end_time": "",
"recording_period": -1,
"streams": ["video", "audio"],
"variant_key": "default"
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/init -H "Authorization: Bearer $TOK" -d @r2
{
"abr_profile": { ... },
"offering_key": "default",
"prod_master_hash": "tqw__HSQHBt7vYxWfCMPH5yXwKTfhdPcQ4Lcs9WUMUbTtnMbTZPTLo4BfJWPMGpoy1Dpv1wWQVtUtAtAr429TnVs",
"variant_key": "default",
"keep_other_streams": false
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/copy -d '{"variant_key":"","offering_key":""}' -H "Authorization: Bearer $TOK"
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/offerings/default/finalize -d '{}' -H "Authorization: Bearer $TOK"
*/
async StreamCopyToVod({stream, object, library, name, title, drm = true, includeTags = false, defaultDash = false, keepExistingStreams = false, eventId, startTime, endTime, recordingPeriod, streams}) {
const objectId = stream;
let abrProfileLiveToVod;
if (drm) {
console.log("DRM");
abrProfileLiveToVod = require("./abr_profile_live_to_vod_drm.json");
} else {
console.log("NO DRM");
abrProfileLiveToVod = require("./abr_profile_live_to_vod.json");
}
let status = await this.Status({name: stream});
let libraryId = status.libraryId;
let targetLibraryId;
let targetWriteToken;
// If a target object is not specified, create it here
if (object == undefined) {
if (library == undefined) {
throw "one of object or library must be specified";
}
const typeId = await this.FindContentType({label: "title"});
if (typeId == undefined) {
throw "content type not found: title";
}
console.log("Creating new object in library " + library);
if (!name) {
name = "VOD - Live Stream " + stream + " - " + new Date().toISOString();
}
if (!title) {
title = "Live Stream " + stream + " - " + new Date().toISOString();
}
const newObject = await this.client.CreateContentObject({libraryId: library, options: {
type: typeId,
meta: {
public: {
name: name,
asset_metadata: {
title: title
}
}
}
}});
await this.client.SetPermission({objectId: newObject.objectId, writeToken: newObject.writeToken, permission: "editable"});
object = newObject.objectId;
targetWriteToken = newObject.writeToken;
targetLibraryId = library;
} else {
targetLibraryId = await this.client.ContentObjectLibraryId({objectId: object});
}
// If updating an existing object, capture entry/exit rat from existing
// offerings so we can restore them after live-to-vod overwrites them.
let preservedOfferingPoints = {};
if (object && targetWriteToken == undefined) {
for (const offeringKey of ["default", "default_dash"]) {
const existing = await this.client.ContentObjectMetadata({
libraryId: targetLibraryId,
objectId: object,
metadataSubtree: `offerings/${offeringKey}`
});
if (existing && (existing.entry_point_rat || existing.exit_point_rat)) {
preservedOfferingPoints[offeringKey] = {
entry_point_rat: existing.entry_point_rat,
exit_point_rat: existing.exit_point_rat
};
console.log(
`Preserving offerings/${offeringKey} entry_point_rat=${existing.entry_point_rat} exit_point_rat=${existing.exit_point_rat}`
);
}
}
}
console.log("Copying stream", stream, "object", object, "drm", drm);
// Validation - require target object
if (!object) {
throw "Must specify a target object ID";
}
// Validation - ensure target object has content encryption keys
const kmsAddress = await this.client.authClient.KMSAddress({objectId: object});
const kmsCapId = `eluv.caps.ikms${Utils.AddressToHash(kmsAddress)}`;
const kmsCap = await this.client.ContentObjectMetadata({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
metadataSubtree: kmsCapId
});
if (!kmsCap) {
throw Error("No content encryption key set for this object");
}
try {
status.live_object_id = objectId;
let liveHash = await this.client.LatestVersionHash({objectId: objectId, libraryId});
status.live_hash = liveHash;
if (eventId) {
// Retrieve start and end times for the event
let event = await this.CueInfo({eventId, status});
if (event.eventStart && event.eventEnd) {
console.log("Event", event);
startTime = event.eventStart;
endTime = event.eventEnd;
}
}
if (targetWriteToken == undefined) {
let edt = await this.client.EditContentObject({
objectId: object,
libraryId: targetLibraryId
});
targetWriteToken = edt.writeToken;
}
console.log("Target write token", targetWriteToken);
status.target_object_id = object;
status.target_library_id = targetLibraryId;
status.target_write_token = targetWriteToken;
console.log("Process live source (takes around 30 sec per hour of content)");
await this.client.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
method: "/media/live_to_vod/init",
body: {
"live_qhash": liveHash,
"start_time": startTime, // eg. "2023-10-03T02:09:02.00Z",
"end_time": endTime, // eg. "2023-10-03T02:15:00.00Z",
"streams": streams,
"recording_period": recordingPeriod,
"variant_key": "default",
"include_tags": includeTags // true to copy video tags from live stream
},
constant: false,
format: "text"
});
console.log("Initialize VoD mezzanine");
let abrMezInitBody = {
abr_profile: abrProfileLiveToVod,
"offering_key": "default",
"prod_master_hash": targetWriteToken,
"variant_key": "default",
"keep_other_streams": keepExistingStreams, // true to keep existing stream info including thumbnails
...(defaultDash && {
"additional_offering_specs": {
"default_dash": [ // creates an offering for chromecasting
{
"op": "replace",
"path": "/playout/playout_formats",
"value": {
"dash-clear": {
"drm": null,
"protocol": {
"min_buffer_length": 2,
"type": "ProtoDash"
}
}
}
}
]
}
})
};
await this.client.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
method: "/media/abr_mezzanine/init",
body: abrMezInitBody,
constant: false,
format: "text"
});
console.log("Populate live parts");
await this.client.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
method: "/media/live_to_vod/copy", // Takes about 20 sec per "hour" of live
body: {
"variant_key": "default",
"offering_key": "default",
},
constant: false,
format: "text"
});
console.log("Finalize VoD mezzanine");
await this.client.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
method: "/media/abr_mezzanine/offerings/default/finalize",
body: abrMezInitBody,
constant: false,
format: "text"
});
// Restore previously captured entry/exit rat values onto the offerings
for (const [offeringKey, points] of Object.entries(preservedOfferingPoints)) {
const offering = await this.client.ContentObjectMetadata({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
metadataSubtree: `offerings/${offeringKey}`
});
if (!offering) {
console.log(`Skipping restore for offerings/${offeringKey} (not present after copy)`);
continue;
}
if (points.entry_point_rat !== undefined) {
offering.entry_point_rat = points.entry_point_rat;
}
if (points.exit_point_rat !== undefined) {
offering.exit_point_rat = points.exit_point_rat;
}
await this.client.ReplaceMetadata({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
metadataSubtree: `offerings/${offeringKey}`,
metadata: offering
});
console.log(
`Restored offerings/${offeringKey} entry_point_rat=${points.entry_point_rat} exit_point_rat=${points.exit_point_rat}`
);
}
let finalize = true;
if (finalize) {
console.log("Finalize target object");
let fin = await this.client.FinalizeContentObject({
libraryId: targetLibraryId,
objectId: object,
writeToken: targetWriteToken,
commitMessage: "Live Stream to VoD"
});
status.target_hash = fin.hash;
}
// Clean up status items we don't need
delete status.playout_urls;
delete status.lro_status_url;
delete status.recording_period;
delete status.recording_period_sequence;
delete status.edge_meta_size;
delete status.insertions;
return status;
} catch (e) {
console.log("FAILED", JSON.stringify(e));
}
}
/**
* Set or remove a simple watermark on a live stream object.
*
* @namedParams
* @param {string} op - Operation: "set" to apply a watermark, "rm" to remove it
* @param {string} objectId - Object ID of the live stream
* @param {string} [fileName] - Path to a JSON file containing the watermark definition (required for "set")
* @returns {Promise<Object>} Object with `watermark` and finalized `hash`
*/
async Watermark({op, objectId, fileName}) {
const libraryId = await this.client.ContentObjectLibraryId({objectId});
const edt = await this.client.EditContentObject({
objectId,
libraryId
});
const recordingParamsPath = "live_recording/recording_config/recording_params";
let m = await this.client.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edt.write_token,
metadataSubtree: recordingParamsPath,
resolveLinks: false
});
if (!m) {
throw "stream object must be configured";
}
switch (op) {
case "set":
const wmBuf = fs.readFileSync(fileName);
const wm = JSON.parse(wmBuf);
m.simple_watermark = wm;
break;
case "rm":
delete m.simple_watermark;
break;
default:
throw "watermark operation must be 'set' or 'rm'";
}
await this.client.ReplaceMetadata({
libraryId,
objectId,
writeToken: edt.write_token,
metadataSubtree: recordingParamsPath,
metadata: m
});
let res = {
"watermark": m.simple_watermark
};
let finalize = true;
if (finalize) {
let fin = await this.client.FinalizeContentObject({
libraryId,
objectId,
writeToken: edt.write_token,
commitMessage: "Watermark " + op
});
res.hash = fin.hash;
}
return res;
}
/**
* Retrieve the resolved recording configuration for a live stream, merging
* the user-supplied `live_recording_config` metadata with the base profile.
*
* @namedParams
* @param {string} name - The object ID of the live stream
* @returns {Promise<Object>} Resolved stream configuration
*/
async StreamConfig({name}) {
const objectId = name;
// Read user config (meta /live_recording_config)
const libraryId = await this.client.ContentObjectLibraryId({objectId});
let userConfig = await this.client.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: "live_recording_config",
resolveLinks: false
});
return this.client.StreamConfig({name, customSettings: userConfig});
}
/**
* List all playout URLs for streams associated with a site object.
*
* @namedParams
* @param {string} siteId - Object ID of the site
* @returns {Promise<Object>} Map of stream names to playout URLs
*/
async StreamListUrls({siteId}) {
return this.client.StreamListUrls({siteId});
}
async ReadEdgeMeta() {
}
/**
* Calculate live streaming latency across three dimensions: part ingest delay,
* segment egress delay, and metadata retrieval time.
*
* Probes segments at positions 1, 8, and 15 within the current mezzanine part
* and aggregates min/max/avg egress delay.
*
* @namedParams
* @param {Object} status - Stream status object (from {@link Status})
* @returns {Promise<Object>} Latency stats including `part_ingest`, `egress`, and `meta_delay`
*/
async LatencyCalculator({status}) {
const debug = this.debug;
let stats = {};
if (debug) console.log("Latency calculator ", status.object_id);
const getMetaStart = process.hrtime();
let edgeMeta = await this.client.ContentObjectMetadata({
libraryId: status.library_id,
objectId: status.object_id,
writeToken: status.edge_write_token
});
const getMetaElapsed = process.hrtime(getMetaStart);
stats.meta_delay = (getMetaElapsed[0] * 1000000000 + getMetaElapsed[1]) / 1000000;
let recordings = edgeMeta.live_recording.recordings;
let sequence = recordings.recording_sequence;
let period = recordings.live_offering[sequence - 1];
let params = edgeMeta.live_recording.recording_config.recording_params;
let sourceTimescale = params.source_timescale;
let videoSegDurationTs = params.xc_params.video_seg_duration_ts;
let mezDurationMillis = videoSegDurationTs * 1000 / sourceTimescale;
let segDurationMillis = mezDurationMillis / 15;
let startTimeMillis = period.start_time_epoch_sec * 1000;
let reps = [];
for (let i = 0; i < params.ladder_specs.length; i ++) {
reps[i] = params.ladder_specs[i].representation;
}
// Using the top rep
stats.rep = reps[0];
stats.start_time = startTimeMillis;
stats.seg_duration = segDurationMillis;
stats.mez_duration = mezDurationMillis;
// Ingest latency
let videoSources = period.sources.video;
let videoSourcesTrimmed = Number(period.sources.video_trimmed);
let min = Number.MAX_SAFE_INTEGER, max = 0, sum = 0, cnt =0;
for (let i = 0; i < videoSources.length; i ++) {
let finalized = videoSources[i].finalization_time / 1000;
if (finalized <= 0) {
continue;
}
let partDelay = finalized - startTimeMillis - (1 + i + videoSourcesTrimmed) * mezDurationMillis;
if (partDelay < min) {
min = partDelay;
}
if (partDelay > max) {
max = partDelay;
}
sum += partDelay;
cnt ++;
}
stats.part_ingest = {
delay_min: min,
delay_max: max,
delay_avg: sum / cnt
};
// Segment delivery latency
stats.egress = {
seg_delay_min: Number.MAX_SAFE_INTEGER,
seg_delay_max: 0,
seg_delay_sum: 0,
seg_delay_cnt: 0,
};
let details = {};
// Find first seg in part in the future
let nowMillis = new Date().getTime();
let segNum = Math.floor((35000 + nowMillis - startTimeMillis) / segDurationMillis);
segNum = Math.floor(segNum / 15) * 15;
details.segOne = await this.LatencySegment({status, stats, sequence, period, segNum});
details.segOne2 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 1});
details.segOne3 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 2});
details.segOne4 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 3});
// Segment 8 in the part (in the future)
nowMillis = new Date().getTime();
segNum = Math.floor((35000 + nowMillis - startTimeMillis) / segDurationMillis);
segNum = Math.floor(segNum / 15) * 15 + 8;
details.segEight = await this.LatencySegment({status, stats, sequence, period, segNum});
details.segEight2 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 1});
details.segEight3 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 2});
details.segEight4 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 3});
// Seg 15 in the part (in the future)
nowMillis = new Date().getTime();
segNum = Math.floor((35000 + nowMillis - startTimeMillis) / segDurationMillis);
segNum = Math.floor(segNum / 15) * 15 - 1;
details.segFifteen = await this.LatencySegment({status, stats, sequence, period, segNum});
details.segFifteen2 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 1});
details.segFifteen3 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 2});
details.segFifteen4 = await this.LatencySegment({status, stats, sequence, period, segNum: segNum + 3});
stats.egress.seg_delay_avg = stats.egress.seg_delay_sum / stats.egress.seg_delay_cnt;
delete stats.egress.seg_delay_sum;
delete stats.egress.seg_delay_cnt;
if (debug) stats.details = details;
return stats;
}
/**
* Measure the egress latency for a single HLS segment by timing an HTTP GET
* from its scheduled availability to first byte and full download.
*
* @namedParams
* @param {number} segNum - Segment index within the recording period
* @param {Object} stats - Cumulative stats object; egress fields are updated in place
* @param {number} sequence - Current recording sequence number
* @param {Object} period - Recording period metadata object
* @param {Object} status - Stream status object (provides library/object IDs)
* @returns {Promise<Object>} Per-segment result: segNum, segDelay, segDelayFirstByte, segSize, downloadMbps
*/
async LatencySegment({segNum, stats, sequence, period, status}) {
const debug = this.debug;
let startTimeMillis = period.start_time_epoch_sec * 1000;
let segDurationMillis = stats.seg_duration;
let nowMillis = new Date().getTime();
let targetMillis = startTimeMillis + segNum * segDurationMillis;
if (debug) console.log("Segment target", segNum, targetMillis, "from_now", targetMillis - nowMillis);
let segURL = await this.client.FabricUrl({
libraryId: status.library_id,
objectId: status.object_id,
queryParams: {rec_seq: sequence},
rep: "playout/default/hls-clear/video/" + stats.rep + "/00" + segNum + ".m4s",
});
if (debug) console.log(segURL);
let segSize = 0;
let segDelayFirstByte;
let segDelay;
let downloadMbps;
await new Promise((resolve) => {
https.get(segURL, (res) => {
res.once("readable", () => {
segDelayFirstByte = new Date().getTime() - targetMillis;
});
res.on("data", (chunk) => {
segSize += chunk.length;
});
res.on("end", () => {
segDelay = new Date().getTime() - targetMillis;
downloadMbps = segSize * 8 / (segDelay - segDelayFirstByte) / 1024;
resolve();
});
res.on("error", err => {
console.log("Error: " + err.message);
});
});
});
if (segDelay < stats.egress.seg_delay_min) {
stats.egress.seg_delay_min = segDelay;
}
if (segDelay > stats.egress.seg_delay_max) {
stats.egress.seg_delay_max = segDelay;
}
stats.egress.seg_delay_cnt ++;
stats.egress.seg_delay_sum += segDelay;
return {
segNum, segDelay, segDelayFirstByte, segSize, downloadMbps
};
}
/**
* Look up the start and end times for a SCTE-35 event by its ID,
* using the LRO status URL from the stream status object.
*
* @namedParams
* @param {string} eventId - SCTE-35 event ID to search for
* @param {Object} status - Stream status object (must include `lro_status_url`)
* @returns {Promise<Object>} Object with `eventStart`, `eventEnd`, and `eventId`
*/
async CueInfo({eventId, status}) {
let cues;
try {
let lroStatus = await got(status.lro_status_url);
cues = JSON.parse(lroStatus.body).custom.cues;
} catch (error) {
console.log("LRO status failed", error);
return {error: "failed to retrieve status", eventId};
}
let eventStart, eventEnd;
for (const value of Object.values(cues)) {
for (const event of Object.values(value.descriptors)) {
if (event.id == eventId) {
switch (event.type_id) {
case 32:
case 16:
eventStart = value.insertion_time;
break;
case 33:
case 17:
eventEnd = value.insertion_time;
break;
}
}
}
}
return {eventStart, eventEnd, eventId};
}
/**
* Switch a stream's playout source between its primary live feed and a
* backup content hash (e.g. a pre-recorded fallback).
*
* @namedParams
* @param {string} name - Object ID of the stream (site) object
* @param {string} source - "primary" to use the live feed, "backup" to use the backup hash
* @param {string} [backupHash] - Content hash of the backup object (required when source is "backup")
* @returns {Promise<Object>} Finalize result with `source` and resolved `link`
*/
async StreamSwitch({name, source, backupHash}) {
console.log("Switch", name, source, backupHash);
const objectId = name;
const libraryId = await this.client.ContentObjectLibraryId({objectId});
const edt = await this.client.EditContentObject({
libraryId,
objectId
});
const writeToken = edt.write_token;
let sources = await this.client.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "public/asset_metadata/sources",
resolveLinks: false
});
var rep = "/rep/playout/default/options.json";
var lk = "." + rep;
if (source == "backup") {
if (!backupHash) {
throw "Bad backup hash";
}
lk = "/qfab/" + backupHash + rep;
}
sources.default["/"] = lk;
await this.client.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "public/asset_metadata/sources",
metadata: sources
});
const fin = await this.client.FinalizeContentObject({
libraryId,
objectId,
writeToken
});
return {
...fin,
source,
link: lk
};
}
/**
* Find a content type object ID by its label, looking up the types registered
* in the tenant's top-level object (`public/content_types`).
*
* @namedParams
* @param {string} label - Content type label (e.g. "live-stream", "title")
* @returns {Promise<string|undefined>} Content type object ID, or undefined if not found
*/
async FindContentType({label}) {
const tenantId = await this.client.userProfileClient.TenantContractId();
const objectId = "iq__" + tenantId.substring(4);
const libraryId = await this.client.ContentObjectLibraryId({objectId});
const m = await this.client.ContentObjectMetadata({objectId, libraryId, metadataSubtree: "/public/content_types"});
return m[label];
}
} // End class
// TODO fix and add as CLI command
/*
const ConfigStreamRebroadcast = async () => {
const t = 1619850660;
try {
let client;
if (conf.clientConf.configUrl) {
client = await ElvClient.FromConfigurationUrl({
configUrl: conf.clientConf.configUrl
});
} else {
client = new ElvClient(conf.clientConf);
}
const wallet = client.GenerateWallet();
const signer = wallet.AddAccount({ privateKey: conf.signerPrivateKey });
client.SetSigner({ signer });
const fabURI = client.fabricURIs[0];
console.log("Fabric URI: " + fabURI);
const ethURI = client.ethereumURIs[0];
console.log("Ethereum URI: " + ethURI);
client.ToggleLogging(false);
let mainMeta = await client.ContentObjectMetadata({
libraryId: conf.libraryId,
objectId: conf.objectId
});
console.log("Main meta:", mainMeta);
edgeWriteToken = mainMeta.edge_write_token;
console.log("Edge: ", edgeWriteToken);
let edgeMeta = await client.ContentObjectMetadata({
libraryId: conf.libraryId,
objectId: conf.objectId,
writeToken: edgeWriteToken
});
console.log("Edge meta:", edgeMeta);
//console.log("CONFIG: ", edgeMeta.live_recording_parameters.live_playout_config);
console.log("recording_start_time: ", edgeMeta.recording_start_time);
console.log("recording_stop_time: ", edgeMeta.recording_stop_time);
// Set rebroadcast start
edgeMeta.live_recording_parameters.live_playout_config.rebroadcast_start_time_sec_epoch = t;
await client.MergeMetadata({
libraryId: conf.libraryId,
objectId: conf.objectId,
writeToken: edgeWriteToken,
metadata: {
"live_recording_parameters": {
"live_playout_config" : edgeMeta.live_recording_parameters.live_playout_config
}
}
});
} catch (error) {
console.error(error);
}
};
*/
exports.EluvioLiveStream = EluvioLiveStream;