/* eslint no-console: 0 */
/**
* Methods for Live Stream creation and management
*
* @module ElvClient/LiveStream
*/
const {LiveConf} = require("./LiveConf");
const path = require("path");
const fs = require("fs");
const HttpClient = require("../HttpClient");
const Fraction = require("fraction.js");
const {ValidateObject, ValidatePresence} = require("../Validation");
const ContentObjectAudit = require("../ContentObjectAudit");
const slugify = str => (str || "").toLowerCase().trim().replace(/ /g, "-").replace(/[^a-z0-9-]/g, "");
const LRCProfile = require("../live_recording_config_profiles/live_recording_config_default");
const R = require("ramda");
const UrlJoin = require("url-join");
const URI = require("urijs");
const MakeTxLessToken = async({client, libraryId, objectId, versionHash}) => {
const tok = await client.authClient.AuthorizationToken({libraryId, objectId,
versionHash, channelAuth: false, noCache: true,
noAuth: true});
return tok;
};
const Sleep = (ms) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
const VALID_PLAYOUT_FORMATS = [
"hls-sample-aes",
"hls-aes128",
"hls-fairplay",
"hls-widevine-cenc",
"hls-playready-cenc",
"dash-widevine",
"hls-clear",
"dash-clear"
];
/**
* Converts a list of File objects into an array of file info objects suitable for upload.
*
* @param {string} path - The destination path prefix for the files
* @param {FileList|Array<File>} fileList - The list of File objects to process
*
* @returns {Promise<Array<Object>>} - Array of file info objects with path, type, size, mime_type, and data
*/
const FileInfo = async ({path, fileList}) => {
return await Promise.all(
Array.from(fileList).map(async file => {
const data = file;
const filePath = file.webkitRelativePath || file.name;
return {
path: UrlJoin(path, filePath).replace(/^\/+/g, ""),
type: "file",
size: file.size,
mime_type: file.type,
data,
name: file.name
};
})
);
};
const GetStreamProbe = async ({client, libraryId, objectId, streamHref, endpoint}) => {
client.SetNodes({fabricURIs: [endpoint]});
let probe = {};
try {
const probeUrl = await client.Rep({
libraryId,
objectId,
rep: "probe"
});
probe = await client.utils.ResponseToJson(
await HttpClient.Fetch(probeUrl, {
body: JSON.stringify({
"filename": streamHref,
"listen": true
}),
method: "POST"
})
);
if(probe.errors) {
throw probe.errors[0];
}
} catch(error) {
if(error.code === "ETIMEDOUT") {
throw "Stream probe timed out - make sure the stream source is available";
} else {
throw error;
}
}
probe.format.filename = streamHref;
return probe;
};
const GetNodeFromStreamData = async ({client, url, nodeId, nodeApi}) => {
let nodes;
if(url) {
const parsedName = url
.replace("udp://", "https://")
.replace("rtmp://", "https://")
.replace("rtp://", "https://")
.replace("srt://", "https://");
// Use regex for hostname extraction — new URL() rejects ports > 65535 (e.g. SRT streams)
const hostName = parsedName.match(/^https?:\/\/([^/:]+)/)?.[1];
client.Log(`Retrieving nodes - matching: ${hostName}`);
nodes = await client.SpaceNodes({matchEndpoint: hostName});
} else if(nodeId) {
nodes = await client.SpaceNodes({matchNodeId: nodeId});
url = nodes?.[0].services.fabric_api?.urls?.[0];
}
// Preserve the original stream URL (including any high port numbers) as a plain string
const streamHref = nodeApi ?? url;
if(nodes.length < 1) {
throw new Error(`No node found for stream URL: ${streamHref}. Wrong network?`);
}
const node = {
endpoints: nodes[0].services.fabric_api.urls,
id: nodes[0].id
};
const endpoint = node.endpoints[0];
return {node, endpoint, streamHref};
};
const CueInfo = async ({eventId, status}) => {
let cues;
try {
const lroStatusResponse = await this.utils.ResponseToJson(
await HttpClient.Fetch(status.lro_status_url)
);
cues = lroStatusResponse.custom.cues;
} catch(error) {
console.log("LRO status failed", error);
return {error: "failed to retrieve status", eventId};
}
let eventStart, eventEnd;
for(const value of Object.values(cues)) {
for(const event of Object.values(value.descriptors)) {
if(event.id == eventId) {
switch(event.type_id) {
case 32:
case 16:
eventStart = value.insertion_time;
break;
case 33:
case 17:
eventEnd = value.insertion_time;
break;
}
}
}
}
return {eventStart, eventEnd, eventId};
};
/**
* Create a live stream object
*
* @methodGroup Live Stream
* @namedParams
* @param {string} libraryId - ID of the library for the new live stream object
* @param {string=} objectId - ID of the object
* @param {string} url - Source stream URL
* @param {boolean=} finalize - If enabled, object will be finalized after creation (default: true)
* @param {LiveRecordingConfig=} liveRecordingConfig - Configuration profile for the live stream including recording, playout, and transcoding settings
*
* @param {Object=} options - Additional options for customizing a live stream
* @param {string=} options.name - Name of the live stream
* @param {string=} options.displayTitle - Display title for the live stream
* @param {string=} options.description - Description for the live stream
* @param {Array<string>=} options.accessGroups - Access group addresses to receive 'manage' permissions
* @param {string=} options.permission - Permission level to set on the object
* @param {boolean=} options.linkToSite - If enabled, will create a link in the live stream site
* @param {boolean=} options.initializeDrm - If enabled, will initialize DRM for the object
* @param {string=} options.ingressNodeId - ID of the ingress node used for stream allocation (required for non-public nodes)
*
* @return {Promise<Object>} - Object containing objectId, libraryId, writeToken, and hash if finalized
*/
exports.StreamCreate = async function({
libraryId,
objectId,
url,
finalize=true,
liveRecordingConfig,
options={}
}) {
const defaultName = `LIVE STREAM - ${new Date().toISOString().slice(0, 10)}`;
const existingObject = !!objectId;
let contentType;
let adminGroups = options.accessGroups ?? [];
// Retrieve live stream content type
try {
const tenantId = await this.userProfileClient.TenantContractId();
const tenantMeta = await this.ContentObjectMetadata({
libraryId: tenantId.replace("iten", "ilib"),
objectId: tenantId.replace("iten", "iq__"),
metadataSubtree: "public",
select: [
"content_types/live_stream"
]
});
const tenantContentAdminGroup = await this.ContentAdminGroup({tenantContractId: tenantId});
adminGroups = adminGroups.concat(tenantContentAdminGroup ?? []);
contentType = tenantMeta.content_types?.live_stream;
if(!contentType) {
throw new Error(`No content type configured for tenant ${tenantId}`);
}
} catch(error) {
console.error("Unable to load tenant data", error);
}
let editResponse;
if(objectId) {
// Edit existing object
editResponse = await this.EditContentObject({
libraryId,
objectId
});
// If no new URL is provided, use value from saved config
if(!url) {
url = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: "live_recording_config/url"
});
}
} else {
// Create new object
editResponse = await this.CreateContentObject({
libraryId,
options: {
type: contentType
}
});
objectId = editResponse.objectId;
}
const {writeToken} = editResponse;
const {
accessGroup,
name=defaultName,
displayTitle,
description,
permission="editable",
ingressNodeId,
initializeDrm=true
} = options;
if(!liveRecordingConfig) {
liveRecordingConfig = {};
}
const playoutFormats = liveRecordingConfig?.playout_config?.playout_formats;
if(playoutFormats) {
const invalid = playoutFormats.filter(f => !VALID_PLAYOUT_FORMATS.includes(f));
if(invalid.length > 0) {
throw new Error(`Invalid playout_formats: ${invalid.join(", ")}. Valid values: ${VALID_PLAYOUT_FORMATS.join(", ")}`);
}
}
liveRecordingConfig.url = url;
liveRecordingConfig.ingress_node_id = ingressNodeId;
// Add access group permissions
await Promise.all(
adminGroups.filter(el => !!el).map(async(group) => {
if(!group) { return; }
await
this.AddContentObjectGroupPermission({
objectId,
groupAddress: group,
permission: "manage"
});
})
);
const metadata = {
public: {
name,
description,
asset_metadata: {
display_title: displayTitle || name,
title: name || displayTitle || defaultName,
title_type: "live_stream",
video_type: "live",
slug: slugify(name)
}
},
"live_recording_config": liveRecordingConfig
};
const oldProfile = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: "live_recording_config/name"
});
if(liveRecordingConfig?.name && liveRecordingConfig.name !== oldProfile) {
metadata.public.asset_metadata.profile_last_updated = new Date().toISOString();
}
await this.MergeMetadata({
libraryId,
objectId,
writeToken,
metadata
});
try {
await this.CreateLinks({
libraryId,
objectId,
writeToken,
links: [{
type: "rep",
path: "public/asset_metadata/sources/default",
target: "playout/default/options.json"
}]
});
} catch(error) {
console.log("Failed to create links", error);
}
await this.SetPermission({
objectId,
permission: permission,
writeToken
});
let returnResponse = {
objectId,
libraryId,
writeToken
};
// If stream info is provided, continue to configure
if(liveRecordingConfig?.input_stream_info) {
await this.StreamConfig({
name: objectId,
liveRecordingConfig,
inputStreamInfo: liveRecordingConfig.input_stream_info,
writeToken,
finalize: false
});
}
if(initializeDrm) {
const formats = liveRecordingConfig?.playout_config?.playout_formats;
await this.StreamInitialize({
name: objectId,
drm: (formats || []).some(el => !el.includes("clear")) ? true : false,
format: formats ? formats?.join(",") : "",
writeToken,
finalize: false
});
}
if(finalize) {
let finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: existingObject ? "Update live stream" : "Create live stream"
});
returnResponse = {...returnResponse, ...finalizeResponse};
}
if(finalize && liveRecordingConfig?.name) {
const slug = slugify(liveRecordingConfig?.name);
await this.StreamAssignProfile({profileSlug: slug, streamObjectId: objectId});
}
if(options.linkToSite) {
await this.StreamLinkToSite({
objectId
});
}
return returnResponse;
};
/**
* Load live stream data from site object
*
* @methodGroup Live Stream
* @namedParams
* @param {boolean=} resolveIncludeSource - If specified, resolved links will include the hash of the link at the root of the metadata
* @param {boolean=} resolveLinks - If specified, links in the metadata will be resolved
* @param {boolean=} resolveIgnoreErrors - If specified, link errors within the requested metadata will not cause the entire response to result in an error
*
* @return {Promise<Object>}
*/
exports.StreamSiteSettings = async function({
resolveLinks=true,
resolveIncludeSource=true,
resolveIgnoreErrors=true
}={}) {
const tenantId = await this.userProfileClient.TenantContractId();
if(!tenantId) {
throw new Error("Tenant ID not found. Ensure the user profile has a tenant contract configured.");
}
const tenantLibraryId = tenantId.replace("iten", "ilib");
const tenantObjectId = tenantId.replace("iten", "iq__");
const [siteObjectId, contentTypes] = await Promise.all([
this.ContentObjectMetadata({
libraryId: tenantLibraryId,
objectId: tenantObjectId,
metadataSubtree: "public/sites/live_streams",
}),
this.ContentObjectMetadata({
libraryId: tenantLibraryId,
objectId: tenantObjectId,
metadataSubtree: "public/content_types",
select: ["live_stream", "title"]
})
]);
const siteLibraryId = await this.ContentObjectLibraryId({objectId: siteObjectId});
const streamMetadata = await this.ContentObjectMetadata({
libraryId: siteLibraryId,
objectId: siteObjectId,
metadataSubtree: "public/asset_metadata/live_streams",
resolveIncludeSource,
resolveLinks,
resolveIgnoreErrors
});
return {
streamMetadata,
siteObjectId,
siteLibraryId,
contentTypes
};
};
/**
* Link a live stream object to a site by adding it to the site's live_streams metadata.
* Creates a fabric link to the stream object with proper ordering.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} objectId - Object ID of the live stream to link to the site
*
* @return {Promise<void>}
*/
exports.StreamLinkToSite = async function({
objectId
}) {
try {
ValidateObject(objectId);
const {streamMetadata, siteObjectId, siteLibraryId} = await this.StreamSiteSettings({resolveLinks: false, resolveIncludeSource: false, resolveIgnoreErrors: false});
const alreadyLinked = Object.values(streamMetadata || {}).some(entry => {
const source = entry["."]?.source;
return source && this.utils.DecodeVersionHash(source).objectId === objectId;
});
if(alreadyLinked) {
return;
}
const objectName = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId}),
objectId,
metadataSubtree: "public/name"
});
const streamKey = slugify(objectName);
const streamData = {
".": {
container: await this.LatestVersionHash({objectId: siteObjectId}),
auto_update: {
tag: "latest"
}
},
"/": `/qfab/${await this.LatestVersionHash({objectId})}/meta/public/asset_metadata`,
order: Object.keys(streamMetadata).length + 1
};
const {writeToken} = await this.EditContentObject({
libraryId: siteLibraryId,
objectId: siteObjectId
});
streamMetadata[streamKey] = streamData;
await this.ReplaceMetadata({
libraryId: siteLibraryId,
objectId: siteObjectId,
writeToken,
metadataSubtree: "public/asset_metadata/live_streams",
metadata: streamMetadata
});
await this.FinalizeContentObject({
libraryId: siteLibraryId,
objectId: siteObjectId,
writeToken,
commitMessage: "Add live stream",
awaitCommitConfirmation: true
});
} catch(error) {
// eslint-disable-next-line no-console
console.error("Failed to link stream object to site", JSON.stringify(error, null, 2));
}
};
/**
* Unlink a live stream object from a site by removing it from the site's live_streams metadata.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} objectId - Object ID of the live stream to link to the site
* @param {string=} slug - Slug of the object
* @param {string=} siteObjectId - Object ID of the site (defaults to rootStore.dataStore.siteId)
* @param {string=} siteLibraryId - Library ID of the site (defaults to rootStore.dataStore.siteLibraryId)
* @param {string=} writeToken - Write token for the draft object. If not provided, a new edit will be opened and finalized.
* @param {boolean=} finalize - If enabled, the site object will be finalized after the removal (default: true)
* @return {Promise<void>}
*/
exports.StreamRemoveLinkToSite = async function({objectId, slug, writeToken, finalize=true}) {
try {
if(!objectId && !slug) {
throw new Error("Either objectId or slug must be provided.");
}
const {streamMetadata, siteObjectId, siteLibraryId} = await this.StreamSiteSettings({resolveIncludeSource: false, resolveLinks: false});
if(objectId) {
Object.keys(streamMetadata || {}).forEach(streamSlug => {
let source = streamMetadata[streamSlug]["."]?.source;
// If object has been deleted, resolving the link will not return a source, so check link hq__
if(!source) {
const match = streamMetadata[streamSlug]?.["/"].match(/(hq__[^/]+)/);
source = match ? match[1] : undefined;
}
const id = this.utils.DecodeVersionHash(source).objectId;
if(id === objectId) {
slug = streamSlug;
}
});
}
if(slug) {
delete streamMetadata[slug];
if(!writeToken) {
({writeToken} = await this.EditContentObject({
libraryId: siteLibraryId,
objectId: siteObjectId
}));
}
await this.DeleteMetadata({
libraryId: siteLibraryId,
objectId: siteObjectId,
writeToken,
metadataSubtree: `public/asset_metadata/live_streams/${slug}`
});
if(finalize) {
await this.FinalizeContentObject({
libraryId: siteLibraryId,
objectId: siteObjectId,
writeToken,
commitMessage: "Remove live stream",
awaitCommitConfirmation: true
});
}
} else {
throw new Error(`Provided objectId ${objectId} not found in site live_streams`);
}
} catch(error) {
// eslint-disable-next-line no-console
console.error("Failed to remove stream object link from site", error);
}
};
/**
* Set the offering for the live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {Object} client - The client object
* @param {string} libraryId - ID of the library for the new live stream object
* @param {string} objectId - ID of the new live stream object
* @param {string=} writeToken - Write token of the draft
* @param {string=} typeAbrMaster - Content type hash
* @param {string=} typeLiveStream - Content type hash
* @param {string} streamUrl - Live source URL
* @param {object} abrProfile - ABR Profile for the offering
* @param {number} aBitRate - Audio bitrate
* @param {number} aChannels - Audio channels
* @param {number} aSampleRate - Audio sample rate
* @param {number} aStreamIndex - Audio stream index
* @param {string} aTimeBase - Audio time base as a fraction, e.g. "1/48000" (usually equal to 1/aSampleRate)
* @param {string} aChannelLayout - Channel layout, e.g. "stereo"
* @param {number} vBitRate - Video bitrate
* @param {number} vHeight - Video height
* @param {number} vStreamIndex - Video stream index
* @param {number} vWidth - Video width
* @param {string} vDisplayAspectRatio - Display aspect ratio as a fraction, e.g. "16/9"
* @param {string} vFrameRate - Frame rate as an integer, e.g. "30"
* @param {string} vTimeBase - Time base as a fraction, e.g. "1/30000"
*
* @return {Promise<string>} - Final hash of the live stream object
*/
const StreamGenerateOffering = async({
client,
libraryId,
objectId,
writeToken,
typeAbrMaster,
typeLiveStream,
streamUrl,
abrProfile,
aBitRate,
aChannels,
aSampleRate,
aStreamIndex,
aTimeBase,
aChannelLayout,
vBitRate,
vHeight,
vStreamIndex,
vWidth,
vDisplayAspectRatio,
vFrameRate,
vTimeBase,
finalize=true
}) => {
// compute duration_ts
const DUMMY_DURATION = 1001; // should result in integer duration_ts values for both audio and video
const aDurationTs = Fraction(aTimeBase).inverse().mul(DUMMY_DURATION).valueOf();
const vDurationTs = Fraction(vTimeBase).inverse().mul(DUMMY_DURATION).valueOf();
// construct /production_master/sources/STREAM_URL/streams
const sourceAudioStream = {
"bit_rate": aBitRate,
"channel_layout": aChannelLayout,
"channels": aChannels,
"codec_name": "aac",
"duration": DUMMY_DURATION,
"duration_ts": aDurationTs,
"frame_count": 0,
"language": "",
"max_bit_rate": aBitRate,
"sample_rate": aSampleRate,
"start_pts": 0,
"start_time": 0,
"time_base": aTimeBase,
"type": "StreamAudio"
};
const sourceVideoStream = {
"bit_rate": vBitRate,
"codec_name": "h264",
"display_aspect_ratio": vDisplayAspectRatio,
"duration": DUMMY_DURATION,
"duration_ts": vDurationTs,
"field_order": "progressive",
"frame_count": 0,
"frame_rate": vFrameRate,
"hdr": null,
"height": vHeight,
"language": "",
"max_bit_rate": vBitRate,
"sample_aspect_ratio": "1",
"start_pts": 0,
"start_time": 0,
"time_base": vTimeBase,
"type": "StreamVideo",
"width": vWidth
};
// placeholder stream to use if [aStreamIndex, vStreamIndex].sort() is not [0,1]
const DUMMY_STREAM = {
"bit_rate": 0,
"codec_name": "",
"duration": DUMMY_DURATION,
"duration_ts": 2500 * DUMMY_DURATION,
"frame_count": 1,
"language": "",
"max_bit_rate": 0,
"start_pts": 0,
"start_time": 0,
"time_base": "1/2500",
"type": "StreamData"
};
const sourceStreams = [];
const maxStreamIndex = Math.max(aStreamIndex, vStreamIndex);
for(let i = 0; i <= maxStreamIndex; i++) {
if(i === aStreamIndex) {
sourceStreams.push(sourceAudioStream);
} else if(i === vStreamIndex) {
sourceStreams.push(sourceVideoStream);
} else {
sourceStreams.push(DUMMY_STREAM);
}
}
// construct /production_master/sources
const sources = {
[streamUrl]: {
"container_format": {
"duration": DUMMY_DURATION,
"filename": streamUrl,
"format_name": "mov,mp4,m4a,3gp,3g2,mj2",
"start_time": 0
},
"streams": sourceStreams
}
};
// construct /production_master/variants
const variants = {
"default": {
"streams": {
"audio": {
"default_for_media_type": false,
"label": "",
"language": "",
"mapping_info": "",
"sources": [
{
"files_api_path": streamUrl,
"stream_index": aStreamIndex
}
]
},
"video": {
"default_for_media_type": false,
"label": "",
"language": "",
"mapping_info": "",
"sources": [
{
"files_api_path": streamUrl,
"stream_index": vStreamIndex
}
]
}
}
}
};
// construct /production_master
const production_master = {sources, variants};
const existingWriteToken = !!writeToken;
// get existing metadata
console.log("Retrieving current metadata...");
let metadata = await client.ContentObjectMetadata({
libraryId,
objectId,
writeToken
});
// add /production_master to metadata
metadata.production_master = production_master;
// write back to object
if(!writeToken) {
({writeToken} = await client.EditContentObject({
libraryId,
objectId,
options: {
type: typeAbrMaster
}
}));
}
console.log("Writing back metadata with /production_master added...");
await client.ReplaceMetadata({
libraryId,
metadata,
objectId,
writeToken
});
let finalizeResponse, masterVersionHash;
if(!existingWriteToken) {
finalizeResponse = await client.FinalizeContentObject({
libraryId,
objectId,
writeToken
});
masterVersionHash = finalizeResponse.hash;
}
// Generate offering
const createResponse = await client.CreateABRMezzanine({
libraryId,
objectId,
masterVersionHash: existingWriteToken ? undefined : masterVersionHash,
masterWriteToken: existingWriteToken ? writeToken : undefined,
writeToken: existingWriteToken ? writeToken : undefined,
variant: "default",
offeringKey: "default",
abrProfile
});
if(createResponse.warnings.length > 0) {
console.log("WARNINGS:");
console.log(JSON.stringify(createResponse.warnings, null, 2));
}
if(createResponse.errors.length > 0) {
console.log("ERRORS:");
console.log(JSON.stringify(createResponse.errors, null, 2));
}
let versionHash = createResponse.hash;
// get new metadata
console.log("Retrieving revised metadata with offering...");
metadata = await client.ContentObjectMetadata({
libraryId,
objectId,
writeToken: existingWriteToken ? writeToken : undefined,
versionHash: existingWriteToken ? undefined : versionHash
});
console.log("Moving /abr_mezzanine/offerings to /offerings and removing /abr_mezzanine...");
metadata.offerings = metadata.abr_mezzanine.offerings;
delete metadata.abr_mezzanine;
// add items to media_struct needed to use options.json handler
metadata.offerings.default.media_struct.duration_rat = `${DUMMY_DURATION}`;
console.log("Writing back metadata with /offerings...");
await client.ReplaceMetadata({
libraryId,
metadata,
objectId,
writeToken
});
if(finalize) {
console.log("Finalizing...");
finalizeResponse = await client.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Update offering"
});
const finalHash = finalizeResponse.hash;
console.log(`Finalized, new version hash: ${finalHash}`);
return finalHash;
}
};
/**
* Retrieve the status of the current live stream session
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} showParams - If specified, will return recording_params with status
* States:
* unconfigured - no live_recording_config
* uninitialized - no live_recording config generated
* inactive - live_recording config initialized but no 'edge write token'
* stopped - edge-write-token but not started
* starting - LRO running but no source data yet
* running - stream is running and producing output
* stalled - LRO running but no source data (so not producing output)
*
* @return {Promise<Object>} - The status response for the object, as well as logs, warnings and errors from the master initialization
*/
exports.StreamStatus = async function({name, showParams=false, writeToken}) {
let objectId = name;
let status = {name: name};
try {
let libraryId = await this.ContentObjectLibraryId({objectId});
status.libraryId = libraryId;
status.objectId = objectId;
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
select: [
"live_recording_config",
"live_recording"
]
});
status.ingressNodeApi = mainMeta.live_recording_config?.ingress_node_api;
status.url = mainMeta.live_recording_config?.url;
if(mainMeta.live_recording_config == undefined || mainMeta.live_recording_config.url == undefined) {
status.state = "unconfigured";
return status;
}
if(mainMeta.live_recording == undefined || mainMeta.live_recording.fabric_config == undefined ||
mainMeta.live_recording.playout_config == undefined || mainMeta.live_recording.recording_config == undefined) {
status.state = "uninitialized";
return status;
}
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
if(fabURI === undefined) {
console.log("bad fabric config - missing ingress node API");
status.state = "uninitialized";
return status;
}
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
status.fabricApi = fabURI;
let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
if(!edgeWriteToken) {
status.state = "inactive";
return status;
}
this.RecordWriteToken({writeToken: edgeWriteToken, fabricNodeUrl: fabURI});
status.edgeWriteToken = edgeWriteToken;
status.streamId = edgeWriteToken; // By convention the stream ID is its write token
let edgeMeta;
try {
edgeMeta = await this.CallBitcodeMethod({
libraryId: libraryId,
objectId: objectId,
method: "/live/meta",
constant: true
});
} catch(error) {
if(error.message && error.message.includes("ERR_TOO_MANY_REDIRECTS")) {
console.error("Redirect loop detected when trying to read metadata.");
status.state = "unavailable";
} else {
console.error("Unable to read edge write token metadata. Has token been deleted?", error);
status.state = "inactive";
}
return status;
}
status.edgeMetaSize = JSON.stringify(edgeMeta || "").length;
// If a stream has never been started return state 'inactive'
if(edgeMeta.live_recording === undefined ||
edgeMeta.live_recording.recordings === undefined ||
edgeMeta.live_recording.recordings.recording_sequence === undefined) {
// StreamStartOrStopOrReset relies on the state being 'stopped' before launching the LRO - and so this state cannot be changed to 'inactive'
status.state = "stopped";
return status;
}
let recordings = edgeMeta.live_recording.recordings;
status.recordingPeriodSequence = recordings.recording_sequence;
let sequence = recordings.recording_sequence;
let period = recordings.live_offering[sequence - 1];
let tlro = period.live_recording_handle;
status.tlro = tlro;
let videoLastFinalizationTimeEpochSec = -1;
let videoFinalizedParts = 0;
let sinceLastFinalize = -1;
if(period.finalized_parts_info && period.finalized_parts_info.video && period.finalized_parts_info.video.last_finalization_time) {
videoLastFinalizationTimeEpochSec = period.finalized_parts_info.video.last_finalization_time / 1000000;
videoFinalizedParts = period.finalized_parts_info.video.n_parts;
sinceLastFinalize = Math.floor(new Date().getTime() / 1000) - videoLastFinalizationTimeEpochSec;
}
const recordingPeriod = {
activationTimeEpochSec: period.recording_start_time_epoch_sec,
startTimeEpochSec: period.start_time_epoch_sec,
startTimeText: new Date(period.start_time_epoch_sec * 1000).toLocaleString(),
endTimeEpochSec: period.end_time_epoch_sec,
endTimeText: period.end_time_epoch_sec === 0 ? null : new Date(period.end_time_epoch_sec * 1000).toLocaleString(),
videoParts: videoFinalizedParts,
videoLastPartFinalizedEpochSec: videoLastFinalizationTimeEpochSec,
videoSinceLastFinalizeSec: sinceLastFinalize
};
status.recordingPeriod = recordingPeriod;
status.lroStatusUrl = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken,
call: "live/status"
});
status.insertions = [];
if((edgeMeta.live_recording.playout_config.interleaves != undefined) &&
(edgeMeta.live_recording.playout_config.interleaves[sequence] != undefined)) {
let insertions = edgeMeta.live_recording.playout_config.interleaves[sequence];
for(let i = 0; i < insertions.length; i ++) {
let insertionTimeSinceEpoch = recordingPeriod.startTimeEpochSec + insertions[i].insertion_time;
status.insertions[i] = {
insertionTimeSinceStart: insertions[i].insertion_time,
insertionTime: new Date(insertionTimeSinceEpoch * 1000).toISOString(),
insertionTimeLocal: new Date(insertionTimeSinceEpoch * 1000).toLocaleString(),
target: insertions[i].playout};
}
}
if(showParams) {
status.recordingParams = edgeMeta.live_recording.recording_config.recording_params;
}
let state = "stopped";
let lroStatus = "";
try {
lroStatus = await this.utils.ResponseToJson(
await HttpClient.Fetch(status.lroStatusUrl)
);
state = lroStatus.state;
status.warnings = lroStatus.custom && lroStatus.custom.warnings;
status.quality = lroStatus.custom && lroStatus.custom.quality;
status.input_stats = lroStatus.custom && lroStatus.custom.input_stats;
if(lroStatus.custom && lroStatus.custom.status) {
status.recordingStatus = lroStatus.custom.status;
}
} catch(error) {
console.log("LRO Status (failed): ", error.response.statusCode);
status.state = "stopped";
status.error = error.response;
return status;
}
// Convert LRO 'state' to desired 'state'
if(state === "running" && videoLastFinalizationTimeEpochSec <= 0) {
state = "starting"; // The LRO returns 'running' even if the source hasn't connected
} else if(state == "terminated") {
state = "stopped"; // The LRO reports 'terminated' which for the recording means 'stopped'
}
status.state = state;
if(state === "running") {
let playoutUrls = {};
let playout_options;
try {
playout_options = await this.PlayoutOptions({
objectId
});
} catch(error) {
console.log("Failed to generate playout options based on 'default' offering:", error);
}
let hls_clear_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods.clear !== undefined
);
if(hls_clear_enabled) {
playoutUrls.hlsClear = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-clear/playlist.m3u8",
});
}
let hls_aes128_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods["aes-128"] !== undefined
);
if(hls_aes128_enabled) {
playoutUrls.hlsAes128 = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-aes128/playlist.m3u8",
});
}
let hls_sample_aes_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods["sample-aes"] !== undefined
);
if(hls_sample_aes_enabled) {
playoutUrls.hlsSampleAes = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-sample-aes/playlist.m3u8",
});
}
const networkInfo = await this.NetworkInfo();
let token = await this.authClient.AuthorizationToken({
libraryId,
objectId,
channelAuth: false,
noCache: true,
noAuth: true
});
let embed_net = "main";
if(networkInfo.name.includes("demo")) {
embed_net = "demo";
}
playoutUrls.embedUrl = `https://embed.v3.contentfabric.io/?net=${embed_net}&p&ct=h&oid=${objectId}&mt=lv&ath=${token}`;
status.playoutUrls = playoutUrls;
}
} catch(error) {
console.error(error);
}
return status;
};
/**
* Create a new edge write token
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} start - If specified, will start the stream after creation
*
* @return {Promise<Object>} - The status response for the object
*
*/
exports.StreamStartRecording = async function({name, start=false}) {
let status = await this.StreamStatus({name});
if(status.state != "uninitialized" && status.state !== "inactive" && status.state !== "terminated" && status.state !== "stopped") {
return {
state: status.state,
error: "stream still active - must terminate first"
};
}
let objectId = status.objectId;
console.log("START: ", name, "start", start);
let libraryId = await this.ContentObjectLibraryId({objectId: objectId});
// Read live recording parameters - determine ingest node
let liveRecording = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId,
metadataSubtree: "/live_recording"
});
let fabURI = liveRecording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
console.log("Node URI", fabURI, "ID", liveRecording.fabric_config.ingress_node_id);
let response = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
const edgeToken = response.write_token;
console.log("Edge token:", edgeToken);
/*
* Set the metadata, including the edge token.
*/
response = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
let writeToken = response.write_token;
this.Log("Merging metadata: ", libraryId, objectId, writeToken);
await this.MergeMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: writeToken,
metadata: {
live_recording: {
status: {
edge_write_token: edgeToken,
state: "active" // indicates there is an active session (set to 'closed' when done)
},
fabric_config: {
edge_write_token: edgeToken
}
}
}
});
this.Log("Finalizing content draft: ", libraryId, objectId, writeToken);
response = await this.FinalizeContentObject({
libraryId: libraryId,
objectId: objectId,
writeToken: writeToken,
commitMessage: "Create stream edge write token " + edgeToken
});
const objectHash = response.hash;
this.Log("Finalized object: ", objectHash);
status = {
objectId: objectId,
hash: objectHash,
libraryId: libraryId,
streamId: edgeToken,
edgeWriteToken: edgeToken,
fabricApi: fabURI,
state: "stopped"
};
if(start) {
status = this.StreamStartOrStopOrReset({name, op: start});
}
return status;
};
/**
* Start, stop or reset a stream within the current session (current edge write token)
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {string} op - The operation to perform. Possible values:
* 'start'
* 'reset' - Stops current LRO recording and starts a new one. Does
* not create a new edge write token (just creates a new recording
* period in the existing edge write token)
* - 'stop'
*
* @return {Promise<Object>} - The status response for the stream
*
*/
exports.StreamStartOrStopOrReset = async function({name, op}) {
try {
let status = await this.StreamStatus({name});
if(status.state != "stopped") {
if(op === "start") {
status.error = "Unable to start stream - state: " + status.state;
return status;
}
}
if(status.state == "running" || status.state == "starting" || status.state == "stalled") {
try {
await this.CallBitcodeMethod({
libraryId: status.libraryId,
objectId: status.objectId,
writeToken: status.edgeWriteToken,
method: "/live/stop/" + status.tlro,
constant: false
});
} catch(error) {
// The /call/lro/stop API returns empty response
// console.log("LRO Stop (failed): ", error);
}
// Wait until LRO is terminated
let tries = 10;
while(status.state != "stopped" && tries-- > 0) {
console.log("Wait to terminate - ", status.state);
await Sleep(1000);
status = await this.StreamStatus({name});
}
console.log("Status after stop - ", status.state);
if(tries <= 0) {
console.log("Failed to stop");
return status;
}
}
if(op === "stop") {
return status;
}
console.log("STARTING", "edgeWriteToken", status.edgeWriteToken);
try {
await this.CallBitcodeMethod({
libraryId: status.libraryId,
objectId: status.objectId,
writeToken: status.edgeWriteToken,
method: "/live/start",
constant: false
});
} catch(error) {
console.log("LRO Start (failed): ", error);
return {
state: status.state,
error: "LRO start failed - must create a stream first"
};
}
// Wait until LRO is 'starting'
let tries = 10;
while(status.state != "starting" && tries-- > 0) {
console.log("Wait to start - ", status.state);
await Sleep(1000);
status = await this.StreamStatus({name});
}
console.log("Status after restart - ", status.state);
return status;
} catch(error) {
console.error(error);
}
};
/**
* Close the edge write token and make the stream object inactive.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
*
* @return {Promise<Object>} - The finalize response for the stream object
*/
exports.StreamStopRecording = async function({name}) {
try {
this.Log(`Terminating stream session for: ${name}`);
let objectId = name;
let libraryId = await this.ContentObjectLibraryId({objectId});
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
const metaEdgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
if(!metaEdgeWriteToken) {
return {
state: "inactive",
error: "The stream is not active"
};
}
try {
const streamMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: metaEdgeWriteToken
});
let status = await this.StreamStatus({name});
if(status.state !== "stopped") {
status = await this.StreamStartOrStopOrReset({name, op: start});
if(status.state !== "stopped") {
return {
status,
error: "The stream is not stopped"
}
}
}
await this.DeleteWriteToken({
writeToken: metaEdgeWriteToken
});
} catch(error) {
this.Log("Unable to retrieve metadata for edge write token");
}
const {writeToken} = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
// Set stop time and inactive state
const newState = "inactive";
const stopTime = Math.floor(new Date().getTime() / 1000);
const finalizeMetadata = {
live_recording: {
status: {
edge_write_token: "",
state: newState,
recording_stop_time: stopTime
},
fabric_config: {
edge_write_token: ""
}
}
};
await this.MergeMetadata({
libraryId,
objectId,
writeToken,
metadata: finalizeMetadata
});
let fin = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: `Deactivate live stream - stop time ${stopTime}`
});
return {
fin,
name,
state: newState
};
} catch(error) {
console.error(error);
}
};
/**
* Initialize the stream object
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} drm - If specified, playout will be DRM protected
* @param {string=} format - Specify the list of playout formats and DRM to support,
comma-separated (hls-clear, hls-aes128, hls-sample-aes,
hls-fairplay)
* @param {string=} writeToken - Write token of the draft
* @param {boolean=} finalize - If enabled, target object will be finalized after configuration
*
* @return {Promise<Object>} - The name, object ID, and state of the stream
*/
exports.StreamInitialize = async function({
name,
drm=false,
format,
writeToken,
finalize=true
}) {
try {
let typeAbrMaster;
let typeLiveStream;
// Fetch Title and Live Stream content types from tenant meta
const tenantContractId = await this.userProfileClient.TenantContractId();
const {live_stream, title} = await this.ContentObjectMetadata({
libraryId: tenantContractId.replace("iten", "ilib"),
objectId: tenantContractId.replace("iten", "iq__"),
metadataSubtree: "public/content_types",
select: [
"live_stream",
"title"
]
});
if(live_stream) {
typeLiveStream = live_stream;
}
if(title) {
typeAbrMaster = title;
}
if(typeAbrMaster === undefined || typeLiveStream === undefined) {
console.log("ERROR - unable to find content types", "ABR Master", typeAbrMaster, "Live Stream", typeLiveStream);
return {};
}
res = await this.StreamSetOfferingAndDRM({
name,
typeAbrMaster,
typeLiveStream,
drm,
format,
writeToken,
finalize
});
return res;
} catch(error) {
console.error("Unable to intitialize stream", error);
}
};
/**
* Create a dummy VoD offering and initialize DRM keys.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {string=} typeAbrMaster - Content type hash
* @param {string=} typeLiveStream - Content type hash
* @param {boolean=} drm - If specified, DRM will be applied to the stream
* @param {string=} format - A list of playout formats and DRM to support, comma-separated
* (hls-clear, hls-aes128, hls-sample-aes, hls-fairplay). If specified,
* this will take precedence over the drm value
* @param {string=} writeToken - Write token of the draft
*
* @return {Promise<Object>} - The name, object ID, and state of the stream
*/
exports.StreamSetOfferingAndDRM = async function({
name,
typeAbrMaster,
typeLiveStream,
drm=false,
format,
writeToken,
finalize=true
}) {
let status = await this.StreamStatus({name});
console.log('StreamSetOfferingAndDrm', status)
const validStates = ["uninitialized", "inactive", "stopped", "unconfigured", "initialized"];
if(!validStates.includes(status.state)) {
return {
state: status.state,
error: "stream still active - must terminate first"
};
}
let objectId = status.objectId;
console.log("INIT: ", name, objectId);
const aBitRate = 128000;
const aChannels = 2;
const aSampleRate = 48000;
const aStreamIndex = 1;
const aTimeBase = "1/48000";
const aChannelLayout = "stereo";
const vBitRate = 14000000;
const vHeight = 720;
const vStreamIndex = 0;
const vWidth = 1280;
const vDisplayAspectRatio = "16/9";
const vFrameRate = "30000/1001";
const vTimeBase = "1/30000"; // "1/16000";
const abrProfileDefault = require("../abr_profiles/abr_profile_live_drm.js");
let playoutFormats;
let abrProfile = JSON.parse(JSON.stringify(abrProfileDefault));
if(format) {
drm = true; // Override DRM parameter
playoutFormats = {};
let formats = format.split(",");
for(let option of formats) {
if(option === "hls-clear") {
abrProfile.drm_optional = true;
playoutFormats["hls-clear"] = {
"drm": null,
"protocol": {
"type": "ProtoHls"
}
};
continue;
}
if(option === "dash-clear") {
abrProfile.drm_optional = true;
playoutFormats["dash-clear"] = {
"drm": null,
"protocol": {
"min_buffer_length": 2,
"type": "ProtoDash"
}
};
continue;
}
playoutFormats[option] = abrProfile.playout_formats[option];
}
} else if(!drm) {
abrProfile.drm_optional = true;
playoutFormats = {
"hls-clear": {
"drm": null,
"protocol": {
"type": "ProtoHls"
}
},
"dash-clear": {
"drm": null,
"protocol": {
"min_buffer_length": 2,
"type": "ProtoDash"
}
}
};
} else {
playoutFormats = Object.assign({}, abrProfile.playout_formats);
}
abrProfile.playout_formats = playoutFormats;
let libraryId = await this.ContentObjectLibraryId({objectId});
try {
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken
});
const nodeId = mainMeta?.live_recording_config?.ingress_node_id;
let streamData;
if(nodeId) {
streamData = {client: this, nodeId, nodeApi: mainMeta?.live_recording_config?.url};
} else {
const url = mainMeta?.live_recording?.fabric_config?.ingress_node_api || mainMeta?.live_recording_config?.url;
streamData = {client: this, url};
}
let node, endpoint;
try {
({node, endpoint} = await GetNodeFromStreamData(streamData));
status.node = node;
} catch(error) {
status.error = error.message;
return status;
}
let fabURI = endpoint;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
let streamUrl = mainMeta?.live_recording?.recording_config?.recording_params?.origin_url || mainMeta?.live_recording_config?.url;
await StreamGenerateOffering({
client: this,
libraryId,
objectId,
typeAbrMaster,
typeLiveStream,
streamUrl,
abrProfile,
aBitRate,
aChannels,
aSampleRate,
aStreamIndex,
aTimeBase,
aChannelLayout,
vBitRate,
vHeight,
vStreamIndex,
vWidth,
vDisplayAspectRatio,
vFrameRate,
vTimeBase,
writeToken,
finalize
});
console.log("Finished generating offering");
return {
name,
objectId: objectId,
state: "initialized"
};
} catch(error) {
console.error(error);
}
};
/**
* Add a content insertion entry
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {number} insertionTime - Time in seconds (float)
* @param {boolean=} sinceStart - If specified, time specified will be elapsed seconds
* since stream start, otherwise, time will be elapsed since epoch
* @param {number=} duration - Time in seconds (float). Default: 20.0
* @param {string} targetHash - The target content object hash (playable)
* @param {boolean=} remove - If specified, will remove the inseration at the exact 'time' (instead of adding)
*
* @return {Promise<Object>} - Insertions, as well as any errors from bad insertions
*/
exports.StreamInsertion = async function({name, insertionTime, sinceStart=false, duration, targetHash, remove=false}) {
// Determine audio and video parameters of the insertion
// Content Type check is currently disabled due to permissions
/*
let ct = await this.client.ContentObject({versionHash});
if(ct.type != undefined && ct.type != "") {
let typeMeta = await this.client.ContentObjectMetadata({
versionHash: ct.type
});
if(typeMeta.bitcode_flags != "abrmaster") {
throw new Error("Not a playable VoD object " + versionHash);
}
}
*/
let offeringMeta = await this.ContentObjectMetadata({
versionHash: targetHash,
metadataSubtree: "/offerings/default"
});
var insertionInfo = {
duration_sec: 0 // Minimum of video and audio duration
};
["video", "audio"].forEach(mt => {
const stream = offeringMeta.media_struct.streams[mt];
insertionInfo[mt] = {
seg_duration_sec: stream.optimum_seg_dur.float,
duration_sec: stream.duration.float,
frame_rate_rat: stream.rate,
};
if(insertionInfo.duration_sec === 0 || stream.duration.float < insertionInfo.duration_sec) {
insertionInfo.duration_sec = stream.duration.float;
}
});
const audioAbrDuration = insertionInfo.audio.seg_duration_sec;
const videoAbrDuration = insertionInfo.video.seg_duration_sec;
if(audioAbrDuration === 0 || videoAbrDuration === 0) {
throw new Error("Bad segment duration hash:", targetHash);
}
if(duration === undefined) {
duration = insertionInfo.duration_sec; // Use full duration of the insertion
} else {
if(duration > insertionInfo.duration_sec) {
throw new Error("Bad duration - larger than insertion object duration", insertionInfo.duration_sec);
}
}
let objectId = name;
let libraryId = await this.ContentObjectLibraryId({objectId});
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
let edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
});
// Find stream start time (from the most recent recording section)
let recordings = edgeMeta.live_recording.recordings;
let sequence = 1;
let streamStartTime = 0;
if(recordings != undefined && recordings.recording_sequence != undefined) {
// We have at least one recording - check if still active
sequence = recordings.recording_sequence;
let period = recordings.live_offering[sequence - 1];
if(period.end_time_epoch_sec > 0) {
// The last period is closed - apply insertions to the next period
sequence ++;
} else {
// The period is active
streamStartTime = period.start_time_epoch_sec;
}
}
if(streamStartTime === 0) {
// There is no active period - must use absolute time
if(sinceStart === false) {
throw new Error("Stream not running - must use 'time since start'");
}
}
// Find the current period playout configuration
if(edgeMeta.live_recording.playout_config.interleaves === undefined) {
edgeMeta.live_recording.playout_config.interleaves = {};
}
if(edgeMeta.live_recording.playout_config.interleaves[sequence] === undefined) {
edgeMeta.live_recording.playout_config.interleaves[sequence] = [];
}
let playoutConfig = edgeMeta.live_recording.playout_config;
let insertions = playoutConfig.interleaves[sequence];
let res = {};
if(!sinceStart) {
insertionTime = insertionTime - streamStartTime;
}
// Assume insertions are sorted by insertion time
let errs = [];
let currentTime = -1;
let insertionDone = false;
let newInsertion = {
insertion_time: insertionTime,
duration: duration,
audio_abr_duration: audioAbrDuration,
video_abr_duration: videoAbrDuration,
playout: "/qfab/" + targetHash + "/rep/playout" // TO FIX - should be a link
};
for(let i = 0; i < insertions.length; i ++) {
if(insertions[i].insertion_time <= currentTime) {
// Bad insertion - must be later than current time
append(errs, "Bad insertion - time:", insertions[i].insertion_time);
}
if(remove) {
if(insertions[i].insertion_time === insertionTime) {
insertions.splice(i, 1);
break;
}
} else {
if(insertions[i].insertion_time > insertionTime) {
if(i > 0) {
insertions = [
...insertions.splice(0, i),
newInsertion,
...insertions.splice(i)
];
} else {
insertions = [
newInsertion,
...insertions.splice(i)
];
}
insertionDone = true;
break;
}
}
}
if(!remove && !insertionDone) {
// Add to the end of the insertions list
console.log("Add insertion at the end");
insertions = [
...insertions,
newInsertion
];
}
playoutConfig.interleaves[sequence] = insertions;
// Store the new insertions in the write token
await this.ReplaceMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken,
metadataSubtree: "/live_recording/playout_config",
metadata: edgeMeta.live_recording.playout_config
});
res.errors = errs;
res.insertions = insertions;
return res;
};
/**
* Get all available live recording config profiles from the live stream site configuration.
* Ladder profiles define settings for configuring live streams including recording config, playout config, and recording params.
*
* @methodGroup Live Stream
*
* @returns {Promise<Object>} - Object containing all live recording config profiles
*/
exports.StreamConfigProfiles = async function({resolveLinks=false}={}) {
const {siteObjectId, siteLibraryId} = await this.StreamSiteSettings();
const profiles = await this.ContentObjectMetadata({
libraryId: siteLibraryId,
objectId: siteObjectId,
metadataSubtree: "public/asset_metadata/profiles",
resolveLinks
});
if(!profiles || !resolveLinks) {
return profiles;
}
// The fabric's resolve=true only follows metadata links, not file links.
// Detect unresolved file links (e.g. { '/': './files/...' }) and fetch them individually.
const resolved = {};
await Promise.all(
Object.entries(profiles).map(async ([name, profile]) => {
if(profile && typeof profile["/"] === "string" && profile["/"].startsWith("./files/")) {
try {
resolved[name] = await this.LinkData({
libraryId: siteLibraryId,
objectId: siteObjectId,
linkPath: `public/asset_metadata/profiles/${name}`,
format: "json"
});
} catch(e) {
resolved[name] = profile;
}
}
})
);
return resolved;
};
/**
* Get a specific live recording config profile's specifications by name.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} profileName - Name of the profile to retrieve
* @param {string} profileSlug - Slug of the profile to retrieve
*
@returns {Promise<Object>} - The specifications for the requested profile
*
*/
exports.StreamConfigProfile = async function({profileName, profileSlug}) {
console.log({profileName, profileSlug})
if(!profileName && !profileSlug) {
throw new Error("Either profileName or profileSlug must be provided.");
}
if(!profileSlug) {
profileSlug = slugify(profileName);
}
const {siteObjectId, siteLibraryId} = await this.StreamSiteSettings();
const profileData = await this.ContentObjectMetadata({
libraryId: siteLibraryId,
objectId: siteObjectId,
metadataSubtree: `public/asset_metadata/profiles/${profileSlug}`,
resolveLinks: true
});
if(!profileData) {
console.warn(`Live Recording Config profile ${profileName} not found.`);
}
return profileData;
};
/**
* Save a live recording config profile to the live stream site object.
* Uploads any provided profile files or profile metadata and creates a metadata link to the profile. One must be specified. The argument profileMetadata takes precendence if both are provided.
*
* @methodGroup Live Stream
* @namedParams
* @param {FileList|Array<File>=} files - Profile files to upload to the site object
* @param {Object=} profileMetadata - Metadata for the profile
*
* @returns {Promise<void>}
*/
exports.StreamSaveConfigProfile = async function({
files,
profileMetadata,
writeToken,
finalize=true
}) {
if(!files && !profileMetadata) {
throw new Error("Missing required field: Please specify files or profileMetadata.")
}
const profiles = await this.StreamConfigProfiles({resolveLinks: true});
const {
siteObjectId: objectId,
siteLibraryId: libraryId
} = await this.StreamSiteSettings();
if(!writeToken) {
({writeToken} = await this.EditContentObject({
libraryId,
objectId
}));
}
if(profileMetadata) {
const defaultName = `Profile-${new Date().toISOString().slice(0, 10)}`;
profileMetadata.last_updated = new Date().toISOString();
const metaFileName = slugify(profileMetadata.name || defaultName);
const blob = new Blob([JSON.stringify(profileMetadata, null, 2)], {type: "application/json"});
const metaFile = new File([blob], `${metaFileName}.json`, {type: "application/json"});
files = files ? [...Array.from(files), metaFile] : [metaFile];
}
let fileInfo;
if(files) {
fileInfo = await FileInfo({
path: "live_stream_profiles",
fileList: files
});
await this.UploadFiles({
libraryId,
objectId,
writeToken,
fileInfo
});
}
const links = fileInfo.map(file => {
const fileName = path.parse(file.name).name;
return {
type: "file",
path: `public/asset_metadata/profiles/${slugify(fileName)}`,
target: file.path
}
});
await this.CreateLinks({
libraryId,
objectId,
writeToken,
links
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Add live recording config profile"
});
}
};
/**
* Assign a live recording config profile to a stream by adding the stream to the
* profile's stream index on the site object. If the stream is already assigned, this is a no-op.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} profileSlug - Slug of the profile to assign
* @param {string} streamObjectId - Object ID of the stream to assign the profile to
* @param {string=} writeToken - Write token for the site object. If not provided, a new edit will be opened and finalized.
* @param {boolean=} finalize - If enabled, the site object will be finalized after the assignment (default: true)
*
* @returns {Promise<void>}
*/
exports.StreamAssignProfile = async function({
profileSlug,
streamObjectId,
writeToken,
finalize=true
}) {
try {
const {siteObjectId: objectId, siteLibraryId: libraryId} = await this.StreamSiteSettings({resolveIncludeSource: false, resolveLinks: false});
if(!objectId || !libraryId) {
throw new Error("Site object must be configured first.")
}
const profileStreams = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: `public/asset_metadata/profile_streams/${profileSlug}`
}) || [];
if(!profileStreams.includes(streamObjectId)) {
profileStreams.push(streamObjectId);
if(!writeToken) {
({writeToken} = await this.EditContentObject({
libraryId,
objectId
}))
}
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: `public/asset_metadata/profile_streams/${profileSlug}`,
metadata: profileStreams
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Assign profile to stream"
});
}
}
} catch(error) {
console.error("Unable to assign profile to stream", error);
throw error;
}
};
/**
* Unassign a live recording config profile from a stream by removing the stream from the
* profile's stream index on the site object.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} profileSlug - Slug of the profile to unassign
* @param {string} streamObjectId - Object ID of the stream to remove from the profile
* @param {string=} writeToken - Write token for the site object. If not provided, a new edit will be opened and finalized.
* @param {boolean=} finalize - If enabled, the site object will be finalized after the unassignment (default: true)
*
* @returns {Promise<void>}
*/
exports.StreamUnassignProfile = async function({
profileSlug,
streamObjectId,
writeToken,
finalize=true
}) {
try {
const {siteObjectId: objectId, siteLibraryId: libraryId} = await this.StreamSiteSettings({resolveIncludeSource: false, resolveLinks: false});
if(!objectId || !libraryId) {
throw new Error("Site object must be configured first.")
}
let profileStreams = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: `public/asset_metadata/profile_streams/${profileSlug}`
}) || [];
profileStreams = profileStreams.filter(id => id !== streamObjectId);
if(!writeToken) {
({writeToken} = await this.EditContentObject({
libraryId,
objectId
}));
}
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: `public/asset_metadata/profile_streams/${profileSlug}`,
metadata: profileStreams
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Unassign profile to stream"
});
}
} catch(error) {
console.error("Unable to unassign profile to stream", error);
throw error;
}
};
/**
* Apply a live recording config profile to a stream, writing the merged configuration
* to the stream's live_recording_config and updating the profile's stream index on the site object.
* Handles switching profiles by unassigning the previous profile from the index.
* Use this for CLI workflows. For app workflows managing the config edit separately, use StreamAssignProfile directly.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} profileSlug - Slug of the profile to apply. Required if profile is not provided.
* @param {Object=} profile - Profile object to apply. Required if profileSlug is not provided. If both are provided, profile is used and profileSlug is derived from profile.name.
* @param {string} objectId - Object ID of the stream to apply the profile to
* @param {string=} streamWriteToken - Write token for the stream object. If not provided, a new edit will be opened.
* @param {string=} siteWriteToken - Write token for the site object. If not provided, a new edit will be opened.
* @param {boolean=} finalize - If enabled, both the stream and site objects will be finalized (default: true)
*
* @returns {Promise<{config: Object}|{streamWriteToken: string, siteWriteToken: string}>} - The merged config if finalized, otherwise the open write tokens
*/
exports.StreamApplyProfile = async function({
profileSlug,
profile,
objectId,
streamWriteToken,
siteWriteToken,
finalize=true
}) {
if(!profile && !profileSlug) {
throw new Error("Either profile or profileSlug must be provided.");
}
if(!profile) {
profile = await this.StreamConfigProfile({profileSlug});
}
const libraryId = await this.ContentObjectLibraryId({objectId});
if(!streamWriteToken) {
({writeToken: streamWriteToken} = await this.EditContentObject({
libraryId,
objectId
}));
}
// Load the base config profile and merge with overrides
const overrides = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: streamWriteToken,
metadataSubtree: "live_recording_overrides"
}) || {};
const currentConfig = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: streamWriteToken,
metadataSubtree: "live_recording_config"
}) || {};
// Preserve stream-specific fields (e.g. url) from the current config, then apply profile, then overrides
const config = R.mergeDeepRight(R.mergeDeepRight(currentConfig, profile), overrides);
const currentProfileName = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: streamWriteToken,
metadataSubtree: "live_recording_config/name"
});
const currentProfileSlug = slugify(currentProfileName);
await this.StreamUpdateConfig({
libraryId,
objectId,
writeToken: streamWriteToken,
finalize: false,
liveRecordingConfig: config
});
if(!profileSlug) {
profileSlug = slugify(profile.name);
}
const {siteObjectId, siteLibraryId} = await this.StreamSiteSettings({resolveIncludeSource: false, resolveLinks: false});
if(!siteWriteToken) {
({writeToken: siteWriteToken} = await this.EditContentObject({libraryId: siteLibraryId, objectId: siteObjectId}));
}
if(currentProfileSlug && currentProfileSlug !== profileSlug) {
await this.StreamUnassignProfile({
profileSlug: currentProfileSlug,
streamObjectId: objectId,
writeToken: siteWriteToken,
finalize: false
})
}
// Update profile update timestamp
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: streamWriteToken,
metadataSubtree: "public/asset_metadata/profile_last_updated",
metadata: new Date().toISOString()
});
await this.StreamAssignProfile({
profileSlug,
streamObjectId: objectId,
writeToken: siteWriteToken,
finalize: false
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken: streamWriteToken,
commitMessage: "Update profile"
});
await this.FinalizeContentObject({
libraryId: siteLibraryId,
objectId: siteObjectId,
writeToken: siteWriteToken,
commitMessage: "Update profile streams"
});
}
if(!finalize) {
return {streamWriteToken, siteWriteToken};
} else {
return {config};
}
};
/**
* Update the live recording configuration of a stream object.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} libraryId - Library ID of the stream object
* @param {string} objectId - Object ID of the stream
* @param {string} commitMessage - Message to include about this commit
* @param {string=} writeToken - Write token for the stream object. If not provided, a new edit will be opened.
* @param {LiveRecordingConfig} liveRecordingConfig - The live recording configuration to write
* @param {Object=} overrideSettings - Partial LiveRecordingConfig deep-merged over liveRecordingConfig
* @param {boolean=} finalize - If enabled, the stream object will be finalized after the update (default: true)
*
* @returns {Promise<{writeToken: string}|void>} - The write token if finalize is false, otherwise void
*/
exports.StreamUpdateConfig = async function({
libraryId,
objectId,
writeToken,
liveRecordingConfig,
overrideSettings,
commitMessage,
finalize=true
}) {
if(!writeToken) {
({writeToken} = await this.EditContentObject({
libraryId,
objectId
}));
}
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
if(overrideSettings) {
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording_overrides",
metadata: overrideSettings
});
liveRecordingConfig = R.mergeDeepRight(liveRecordingConfig, overrideSettings);
}
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording_config",
metadata: liveRecordingConfig
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: commitMessage ?? "Update stream config"
});
}
if(!finalize) {
return {writeToken};
}
};
/**
* @typedef {Object} InputStreamInfo
* @property {Object=} input_stream_info - Simplified probe information for the input stream
* @property {Object=} input_stream_info.format - Format information
* @property {string=} input_stream_info.format.format_name - Format name (e.g., "mpegts")
* @property {string=} input_stream_info.format.filename - Stream URL
* @property {Array<Object>=} input_stream_info.streams - Array of stream information
* @property {string=} input_stream_info.streams[].codec_name - Codec name (e.g., "h264", "aac")
* @property {string=} input_stream_info.streams[].codec_type - Codec type ("video" or "audio")
* @property {string=} input_stream_info.streams[].display_aspect_ratio - Display aspect ratio (e.g., "16/9")
* @property {string=} input_stream_info.streams[].field_order - Field order (e.g., "progressive")
* @property {string=} input_stream_info.streams[].frame_rate - Frame rate (e.g., "50")
* @property {number=} input_stream_info.streams[].height - Video height in pixels
* @property {number=} input_stream_info.streams[].width - Video width in pixels
* @property {number=} input_stream_info.streams[].level - Codec level
* @property {number=} input_stream_info.streams[].stream_id - Stream ID
* @property {number=} input_stream_info.streams[].stream_index - Stream index
* @property {number=} input_stream_info.streams[].channel_layout - Audio channel layout
* @property {number=} input_stream_info.streams[].channels - Number of audio channels
* @property {number=} input_stream_info.streams[].sample_rate - Audio sample rate
*/
/**
* @typedef {Object} LiveRecordingConfig
* @property {string=} name - Name of the profile
*
* @property {Object=} recording_config - Recording configuration settings
* @property {number=} recording_config.part_ttl - Time-to-live for stream parts in seconds
* @property {number=} recording_config.connection_timeout - Initial connection timeout when starting the stream, in seconds
* @property {number=} recording_config.reconnect_timeout - Duration to listen after disconnect detection, in seconds
* @property {boolean=} recording_config.copy_mpegts - Whether to copy MPEG-TS data
* @property {Object=} recording_config.input_cfg - Input configuration settings
* @property {boolean=} recording_config.input_cfg.bypass_libav_reader - Whether to bypass libav reader
* @property {string=} recording_config.input_cfg.copy_mode - Copy mode setting: "" (empty), "none", "raw", or "remuxed"
* @property {string=} recording_config.input_cfg.copy_packaging - Copy packaging mode: "raw_ts" or "rtp_ts"
* @property {boolean=} recording_config.input_cfg.custom_read_loop_enabled - Legacy reader
* @property {string=} recording_config.input_cfg.input_packaging - Input Packaging
*
* @property {Object=} playout_config - Playout configuration settings
* @property {Object=} playout_config.image_watermark - Image watermark configuration
* @property {string=} playout_config.image_watermark.align_h - Horizontal alignment (e.g., "left", "center", "right")
* @property {string=} playout_config.image_watermark.align_v - Vertical alignment (e.g., "top", "middle", "bottom")
* @property {string=} playout_config.image_watermark.image - Path to watermark image file
* @property {boolean=} playout_config.image_watermark.wm_enabled - Whether the image watermark is enabled
* @property {Object=} playout_config.simple_watermark - Simple text watermark configuration
* @property {string=} playout_config.simple_watermark.font_color - Font color (e.g., "white@0.5")
* @property {number=} playout_config.simple_watermark.font_relative_height - Font size relative to video height
* @property {boolean=} playout_config.simple_watermark.shadow - Whether to add shadow to text
* @property {string=} playout_config.simple_watermark.shadow_color - Shadow color (e.g., "black@0.5")
* @property {string=} playout_config.simple_watermark.template - Watermark text template
* @property {string=} playout_config.simple_watermark.x - Horizontal position expression
* @property {string=} playout_config.simple_watermark.y - Vertical position expression
* @property {boolean=} playout_config.dvr - Whether to enable DVR functionality
* TODO: update possible drm types
* TODO: update possible playout formats
* @property {Array<string>=} playout_config.playout_formats - List of playout format names (e.g., "dash-widevine", "hls-widevine")
* @property {Object=} playout_config.ladder_specs - Encoding ladder specifications
* @property {Array<Object>=} playout_config.ladder_specs.audio - Audio encoding ladder
* @property {number=} playout_config.ladder_specs.audio[].bit_rate - Audio bitrate
* @property {number=} playout_config.ladder_specs.audio[].channels - Number of audio channels
* @property {string=} playout_config.ladder_specs.audio[].codecs - Audio codec identifier
* @property {Array<Object>=} playout_config.ladder_specs.video - Video encoding ladder
* @property {number=} playout_config.ladder_specs.video[].bit_rate - Video bitrate
* @property {string=} playout_config.ladder_specs.video[].codecs - Video codec identifier
* @property {number=} playout_config.ladder_specs.video[].height - Video height in pixels
* @property {number=} playout_config.ladder_specs.video[].width - Video width in pixels
*
* @property {Object=} recording_stream_config - Stream recording configuration
* @property {Object=} recording_stream_config.audio - Audio stream recording configuration indexed by stream number
* @property {number=} recording_stream_config.audio[].bitrate - Stream bitrate
* @property {string=} recording_stream_config.audio[].codec - Audio codec (e.g., "aac")
* @property {boolean=} recording_stream_config.audio[].playout - Whether to include this stream in playout
* @property {string=} recording_stream_config.audio[].playout_label - Label for playout (e.g., "Audio 1")
* @property {boolean=} recording_stream_config.audio[].record - Whether to record this audio stream
* @property {number=} recording_stream_config.audio[].recording_bitrate - Recording bitrate
* @property {number=} recording_stream_config.audio[].recording_channels - Number of recording channels
*
* @property {InputStreamInfo=} input_stream_info - Simplified probe information for the input stream
*
* @property {Object=} recording_params - Advanced recording parameters
* @property {Object=} recording_params.xc_params - Transcoding parameters
* @property {number=} recording_params.xc_params.audio_bitrate - Audio bitrate for encoding
* @property {Object=} recording_params.xc_params.audio_index - Audio stream index mapping (indexed by output stream number)
* @property {number=} recording_params.xc_params.audio_seg_duration_ts - Audio segment duration in time scale units
* @property {number=} recording_params.xc_params.connection_timeout - Connection timeout in seconds
* @property {boolean=} recording_params.xc_params.copy_mpegts - Whether to copy MPEG-TS data
* @property {string=} recording_params.xc_params.ecodec2 - Audio encoder codec (e.g., "aac")
* @property {number=} recording_params.xc_params.enc_height - Encoding height in pixels
* @property {number=} recording_params.xc_params.enc_width - Encoding width in pixels
* @property {string=} recording_params.xc_params.filter_descriptor - FFmpeg filter descriptor string
* @property {number=} recording_params.xc_params.force_keyint - Force keyframe interval
* @property {string=} recording_params.xc_params.format - Output format (e.g., "fmp4-segment")
* @property {boolean=} recording_params.xc_params.listen - Whether to listen for incoming stream
* @property {number=} recording_params.xc_params.n_audio - Number of audio streams
* @property {string=} recording_params.xc_params.preset - Encoding preset (e.g., "faster", "medium", "slow")
* @property {number=} recording_params.xc_params.sample_rate - Audio sample rate in Hz
* @property {string=} recording_params.xc_params.seg_duration - Segment duration in seconds (as string)
* @property {boolean=} recording_params.xc_params.skip_decoding - Whether to skip decoding
* @property {string=} recording_params.xc_params.start_segment_str - Starting segment number (as string)
* @property {number=} recording_params.xc_params.stream_id - Stream ID (-1 for auto)
* @property {number=} recording_params.xc_params.sync_audio_to_stream_id - Stream ID to sync audio to
* @property {number=} recording_params.xc_params.video_bitrate - Video bitrate for encoding
* @property {number=} recording_params.xc_params.video_frame_duration_ts - Video frame duration in time scale units (null for auto)
* @property {number=} recording_params.xc_params.video_seg_duration_ts - Video segment duration in time scale units
* @property {string=} recording_params.xc_params.video_time_base - Video time base (null for auto)
* @property {number=} recording_params.xc_params.xc_type - Transcoding type identifier
*
* @property {Object=} probe_info - Full probe information (stored for historical/debugging purposes, only in live_recording_config)
*
*/
/**
* Configure the stream based on built-in logic and optional custom settings.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {LiveRecordingConfig=} liveRecordingConfig - Configuration profile for the live stream including recording, playout, and transcoding settings
* @param {InputStreamInfo=} inputStreamInfo - Simplified probe metadata
* @param {string=} writeToken - Write token of the draft
* @param {boolean=} finalize - If enabled, target object will be finalized after configuring
*
* @return {Promise<Object>} - The status response for the stream
*
*/
exports.StreamConfig = async function({
name,
liveRecordingConfig,
inputStreamInfo,
writeToken,
finalize=true
}) {
const objectId = name;
let probe = inputStreamInfo || liveRecordingConfig?.input_stream_info;
const currentStatus = await this.StreamStatus({name, writeToken});
if(!["uninitialized", "inactive", "unconfigured"].includes(currentStatus.state)) {
return {
state: currentStatus.state,
error: "Stream still active - must deactivate first"
};
}
const libraryId = await this.ContentObjectLibraryId({objectId});
const status = {
name,
libraryId: libraryId,
objectId: objectId,
}
const liveRecordingMeta = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId,
writeToken,
metadataSubtree: "/live_recording"
});
let liveRecordingConfigProfile;
if(liveRecordingConfig && Object.keys(liveRecordingConfig || {}).length > 0) {
// Extract values that may have been saved during Create but aren't being repeated in the Config step
const savedConfigData = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "/live_recording_config"
});
liveRecordingConfigProfile = R.mergeDeepRight(savedConfigData ?? {}, liveRecordingConfig);
} else {
const lrcMeta = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId,
writeToken,
metadataSubtree: "/live_recording_config",
});
// Save liveRecordingConfig as saved profile or default
liveRecordingConfigProfile = lrcMeta ?? LRCProfile;
}
let nodeId = liveRecordingConfigProfile?.ingress_node_id;
status.userConfig = liveRecordingConfigProfile;
const streamData = {
client: this
};
if(nodeId) {
streamData.nodeId = nodeId;
streamData.nodeApi = liveRecordingConfigProfile?.url;
} else {
streamData.url = liveRecordingConfigProfile.url;
}
// Get node URI from user config
let node, endpoint, streamHref;
try {
({node, endpoint, streamHref} = await GetNodeFromStreamData(streamData));
status.node = node;
nodeId = node.id;
} catch(error) {
throw error;
}
// No stream data provided ; probe the stream for info
if(!probe) {
probe = await GetStreamProbe({
client: this,
libraryId,
objectId,
streamHref,
endpoint
});
}
// Create live recording config
const liveConf = new LiveConf({
url: liveRecordingConfigProfile.url,
probeData: probe,
liveRecordingMeta,
nodeId,
nodeUrl: endpoint,
includeAVSegDurations: false,
overwriteOriginUrl: false,
syncAudioToVideo: true
});
const liveRecordingConfigMeta = liveConf.generateLiveConf({
customSettings: {
liveRecordingConfigProfile
}
});
// Store live recording config into the stream object
if(!writeToken) {
let e = await this.EditContentObject({
libraryId,
objectId: objectId
});
writeToken = e.write_token;
}
if(["uninitialized", "unconfigured"].includes(currentStatus.state)) {
const formats = liveRecordingConfigMeta?.live_recording.playout_config?.playout_formats;
await this.StreamInitialize({
name: objectId,
drm: (formats || []).some(el => !el.includes("clear")) ? true : false,
format: formats ? formats?.join(",") : "",
writeToken,
finalize: false
});
}
const allowList = ["fabric_config", "playout_config", "recording_config", "url"];
const filteredMeta = Object.fromEntries(
Object.entries(liveRecordingConfigMeta.live_recording || {}).filter(([key]) => allowList.includes(key))
);
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording",
metadata: filteredMeta
});
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording_config/probe_info",
metadata: probe
});
if(finalize) {
status.fin = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Apply live stream configuration"
});
}
return status;
};
/**
* List the pre-allocated URLs for a site
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} siteId - ID of the live stream site object
*
* @return {Promise<Object>} - The list of stream URLs
*/
exports.StreamListUrls = async function({siteId}={}) {
try {
const STATUS_MAP = {
UNCONFIGURED: "unconfigured",
UNINITIALIZED: "uninitialized",
INACTIVE: "inactive",
STOPPED: "stopped",
STARTING: "starting",
RUNNING: "running",
STALLED: "stalled",
};
if(!siteId) {
const tenantContractId = await this.userProfileClient.TenantContractId();
if(!tenantContractId) {
throw Error("No tenant contract ID configured");
}
siteId = await this.ContentObjectMetadata({
libraryId: tenantContractId.replace("iten", "ilib"),
objectId: tenantContractId.replace("iten", "iq__"),
metadataSubtree: "public/sites/live_streams",
});
}
const streamMetadata = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "public/asset_metadata/live_streams",
resolveLinks: true,
resolveIgnoreErrors: true,
resolveIncludeSource: true
});
const activeUrlMap = {};
await this.utils.LimitedMap(
10,
Object.keys(streamMetadata || {}),
async slug => {
const stream = streamMetadata[slug];
let versionHash;
if(stream && stream["."] && stream["."].source) {
versionHash = stream["."].source;
}
if(versionHash) {
const objectId = this.utils.DecodeVersionHash(versionHash).objectId;
const libraryId = await this.ContentObjectLibraryId({objectId});
const status = await this.StreamStatus({
name: objectId
});
const streamMeta = await this.ContentObjectMetadata({
objectId,
libraryId,
select: [
// live_recording_config/reference_url is an old path
"live_recording_config/reference_url",
"live_recording_config/ingress_node_api",
// live_recording_config/url is the old path
"live_recording_config/url"
]
}) || {};
const url = streamMeta.live_recording_config?.ingress_node_api || streamMeta.live_recording_config?.reference_url || streamMeta.live_recording_config?.url;
const isActive = [STATUS_MAP.STARTING, STATUS_MAP.RUNNING, STATUS_MAP.STALLED, STATUS_MAP.STOPPED].includes(status.state);
if(url && isActive) {
activeUrlMap[url] = true;
}
}
}
);
const streamUrlStatus = {};
const streamUrls = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "/live_stream_urls",
resolveLinks: true,
resolveIgnoreErrors: true
});
if(!streamUrls) {
throw Error("No pre-allocated URLs configured");
}
Object.keys(streamUrls || {}).forEach(protocol => {
streamUrlStatus[protocol] = streamUrls[protocol].map(url => {
return {
url,
active: activeUrlMap[url] || false
};
});
});
return streamUrlStatus;
} catch(error) {
console.error(error);
}
};
/**
* Copy a portion of a live stream recording into a standard VoD object using the zero-copy content fabric API
*
* Limitations:
* - currently requires the target object to be pre-created and have content encryption keys (CAPS)
* - for audio and video to be sync'd, the live stream needs to have the beginning of the desired recording period
* - for an event stream, make sure the TTL is long enough to allow running the live-to-vod command before the beginning of the recording expires
* - for 24/7 streams, make sure to reset the stream before the desired recording (as to create a new recording period) and have the TTL long enough
* to allow running the live-to-vod command before the beginning of the recording expires.
* - startTime and endTime are not currently implemented by this method
*
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream
* @param {string} targetObjectId - Object ID of the target VOD object
* @param {string=} eventId -
* @param {boolean=} finalize - If enabled, target object will be finalized after copy to vod operations
* @param {number=} recordingPeriod - Determines which recording period to copy, which are 0-based. -1 copies the current (or last) period
*
* @return {Promise<Object>} - The status response for the stream
*/
/*
Example fabric API flow:
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/init -d @r1 -H "Authorization: Bearer $TOK"
{
"live_qhash": "hq__5Zk1jSN8vNLUAXjQwMJV8F8J8ESXNvmVKkhaXySmGc1BXnJPG2FvvaXee4CXqvFHuGuU3fqLJc",
"start_time": "",
"end_time": "",
"recording_period": -1,
"streams": ["video", "audio"],
"variant_key": "default"
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/init -H "Authorization: Bearer $TOK" -d @r2
{
"abr_profile": { ... },
"offering_key": "default",
"prod_master_hash": "tqw__HSQHBt7vYxWfCMPH5yXwKTfhdPcQ4Lcs9WUMUbTtnMbTZPTLo4BfJWPMGpoy1Dpv1wWQVtUtAtAr429TnVs",
"variant_key": "default",
"keep_other_streams": false
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/copy -d '{"variant_key":"","offering_key":""}' -H "Authorization: Bearer $TOK"
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/offerings/default/finalize -d '{}' -H "Authorization: Bearer $TOK"
*/
exports.StreamCopyToVod = async function({
name,
targetObjectId,
eventId,
streams=null,
finalize=true,
recordingPeriod=-1,
startTime="",
endTime=""
}) {
const objectId = name;
const abrProfile = require("../abr_profiles/abr_profile_live_to_vod.js");
const status = await this.StreamStatus({name});
const libraryId = status.libraryId;
this.Log(`Copying stream ${name} to target ${targetObjectId}`);
ValidateObject(targetObjectId);
const targetLibraryId = await this.ContentObjectLibraryId({objectId: targetObjectId});
// Validation - ensure target object has content encryption keys
const kmsAddress = await this.authClient.KMSAddress({objectId: targetObjectId});
const kmsCapId = `eluv.caps.ikms${this.utils.AddressToHash(kmsAddress)}`;
const kmsCap = await this.ContentObjectMetadata({
libraryId: targetLibraryId,
objectId: targetObjectId,
metadataSubtree: kmsCapId
});
if(!kmsCap) {
throw Error(`No content encryption key set for object ${targetObjectId}`);
}
try {
status.liveObjectId = objectId;
const liveHash = await this.LatestVersionHash({objectId, libraryId});
status.liveHash = liveHash;
if(eventId) {
// Retrieve start and end times for the event
let event = await this.CueInfo({eventId, status});
if(event.eventStart && event.eventEnd) {
startTime = event.eventStart;
endTime = event.eventEnd;
}
}
const {writeToken} = await this.EditContentObject({
objectId: targetObjectId,
libraryId: targetLibraryId
});
status.targetObjectId = targetObjectId;
status.targetLibraryId = targetLibraryId;
status.targetWriteToken = writeToken;
this.Log("Process live source (takes around 20 sec per hour of content)");
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/live_to_vod/init",
body: {
"live_qhash": liveHash,
"start_time": startTime, // eg. "2023-10-03T02:09:02.00Z",
"end_time": endTime, // eg. "2023-10-03T02:15:00.00Z",
"streams": streams,
"recording_period": recordingPeriod,
"variant_key": "default"
},
constant: false,
format: "text"
});
const abrMezInitBody = {
abr_profile: abrProfile,
"offering_key": "default",
"prod_master_hash": writeToken,
"variant_key": "default",
"keep_other_streams": false
};
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/abr_mezzanine/init",
body: abrMezInitBody,
constant: false,
format: "text"
});
try {
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/live_to_vod/copy",
body: {},
constant: false,
format: "text"
});
} catch(error) {
console.error("Unable to call /media/live_to_vod/copy", error);
throw error;
}
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/abr_mezzanine/offerings/default/finalize",
body: abrMezInitBody,
constant: false,
format: "text"
});
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
commitMessage: "Live Stream to VoD"
});
status.targetHash = finalizeResponse.hash;
}
// Clean up unnecessary status items
delete status.playoutUrls;
delete status.lroStatusUrl;
delete status.recordingPeriod;
delete status.recordingPeriodSequence;
delete status.edgeMetaSize;
delete status.insertions;
return status;
} catch(error) {
this.Log(error, true);
throw error;
}
};
/**
* Remove a watermark for a live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the live stream
* @param {string} objectId - Object ID of the live stream
* @param {string=} writeToken - Write token of the draft
* @param {Array<string>} types - Specify which type of watermark to remove. Possible values:
* - "image"
* - "text"
* - "forensic"
* @param {boolean=} finalize - If enabled, target object will be finalized after removing watermark
*
* @return {Promise<Object>} - The finalize response
*/
exports.StreamRemoveWatermark = async function({
libraryId,
objectId,
writeToken,
types,
finalize=true
}) {
ValidateObject(objectId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
if(!writeToken) {
({writeToken} = await this.EditContentObject({
objectId,
libraryId
}));
}
this.Log(`Removing watermark types: ${types.join(", ")} ${libraryId} ${objectId}`);
const edgeWriteToken = await this.ContentObjectMetadata({
objectId,
libraryId,
metadataSubtree: "/live_recording/fabric_config/edge_write_token"
});
const metadataPath = "live_recording/playout_config";
const objectMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
resolveLinks: false
});
if(!objectMetadata) {
throw Error("Stream object must be configured before removing a watermark");
}
types.forEach(type => {
if(type === "text") {
delete objectMetadata.simple_watermark;
} else if(type === "image") {
delete objectMetadata.image_watermark;
} else if(type === "forensic") {
delete objectMetadata.forensic_watermark;
}
});
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
if(edgeWriteToken) {
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
}
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Watermark removed"
});
return finalizeResponse;
}
};
/**
* Create a watermark for a live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the live stream
* @param {string} objectId - Object ID of the live stream
* @param {string=} writeToken - Write token of the draft
* @param {Object} simpleWatermark - Text watermark
* @param {Object} imageWatermark - Image watermark
* @param {Object} forensicWatermark - Forensic watermark
* @param {boolean=} finalize - If enabled, target object will be finalized after adding watermark
* Watermark examples:
*
* Simple Watermark:
{
"font_color": "",
"font_relative_height": 0,
"shadow": false,
"template": "",
"timecode": "",
"timecode_rate": 0,
"x": "",
"y": ""
}
*
* Image watermark:
{
"image": "",
"align_h": "",
"align_v": "",
"target_video_height": 0,
"wm_enabled": false
}
*
* Forensic watermark:
{
"algo": 6,
"forensic_duration": 0,
"forensic_start": "",
"image_a": <path_to_image>,
"image_b": <path_to_image>,
"is_stub": true,
"payload_bit_nb": 23,
"wm_enabled": true
}
*
*
* @return {Promise<Object>} - The finalize response
*/
exports.StreamAddWatermark = async function({
libraryId,
objectId,
writeToken,
simpleWatermark,
imageWatermark,
forensicWatermark,
finalize=true
}) {
ValidateObject(objectId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
if(!writeToken) {
({writeToken} = await this.EditContentObject({
objectId,
libraryId
}));
}
const edgeWriteToken = await this.ContentObjectMetadata({
objectId,
libraryId,
metadataSubtree: "/live_recording/fabric_config/edge_write_token"
});
const watermarkType = imageWatermark ? "image" : forensicWatermark ? "forensic" : "text";
const metadataPath = "live_recording/playout_config";
this.Log(`Adding watermarking type: ${watermarkType} ${libraryId} ${objectId}`);
const objectMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
resolveLinks: false
});
if(!objectMetadata) {
throw Error("Stream object must be configured before adding a watermark");
}
const watermarkArgCount = [simpleWatermark, imageWatermark, forensicWatermark].filter(i => !!i).length;
console.log("watermark arg count", watermarkArgCount);
if(watermarkArgCount === 0) {
throw Error("No watermark was provided");
} else if(watermarkArgCount > 1) {
throw Error("Only one watermark is allowed");
}
if(simpleWatermark) {
objectMetadata.simple_watermark = simpleWatermark;
} else if(imageWatermark) {
objectMetadata.image_watermark = imageWatermark;
} else if(forensicWatermark) {
objectMetadata.forensic_watermark = forensicWatermark;
}
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
if(edgeWriteToken) {
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
}
const response = {
"imageWatermark": objectMetadata.image_watermark,
"textWatermark": objectMetadata.simple_watermark,
"forensicWatermark": objectMetadata.forensic_watermark
};
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Watermark set"
});
response.hash = finalizeResponse.hash;
}
return response;
};
/**
* Audit the specified live stream against several content fabric nodes
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} objectId - Object ID of the live stream
* @param {string=} versionHash - Version hash of the live stream -- if not specified, latest version is returned
* @param {string=} salt - base64-encoded byte sequence for salting the audit hash
* @param {Array<number>=} samples - list of percentages (0.0 - <1.0) used for sampling the content part list, up to 3
* @param {string=} authorizationToken - Additional authorization token for this request
*
* @returns {Promise<Object>} - Response describing audit results
*/
exports.AuditStream = async function({objectId, versionHash, salt, samples, authorizationToken}) {
return await ContentObjectAudit.AuditContentObject({
client: this,
objectId,
versionHash,
salt,
samples,
live: true,
authorizationToken
});
};
/**
* @typedef {Object} LiveOutput
* @property {boolean=} enabled - Whether the output is enabled
* @property {string=} name - Display name for the output
* @property {string=} description - Description of the output
* @property {string=} external_id - External identifier for the output
* @property {boolean=} reset - Whether to reset the output
* @property {Object=} input - Input stream configuration
* @property {string=} input.stream - Object ID of the input stream (null to disconnect)
* @property {Object=} srt_pull - SRT pull delivery configuration
* @property {Array<string>=} srt_pull.node_ids - Egress node IDs for SRT delivery (max 1)
* @property {string=} srt_pull.passphrase - SRT passphrase for encrypted delivery
* @property {boolean=} srt_pull.strip_rtp - Whether to strip RTP headers
* @property {Object=} srt_pull.connection - Additional SRT connection configuration
* @property {Array<string>=} srt_pull.urls - SRT URLs (returned by server, not set by caller)
*/
// Resolve egress node and replace SRT URL with the egress endpoint hostname.
// Necessary because the backend API doesn't return the proper SRT URLs currently.
exports.OutputsResolveSrtPullUrls = async function({value}) {
const nodeId = value.srt_pull?.node_ids?.[0];
if(!nodeId) { return value; }
const nodes = await this.SpaceNodes({matchNodeId: nodeId});
const fabricUrl = nodes?.[0]?.services?.fabric_api?.urls?.[0];
if(fabricUrl) {
const egressHost = new URL(fabricUrl).hostname;
if(value.srt_pull?.urls) {
value.srt_pull.urls = value.srt_pull.urls.map(url =>
url.replace(/^srt:\/\/[^:/?]+/, `srt://${egressHost}`)
);
}
}
return value;
};
/**
* List all live outputs for a stream object, optionally including live state.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {boolean=} includeState - If true, also retrieve live state from each output's egress node (default: true)
*
* @returns {Promise<Object<string, LiveOutput>>} - Map of output IDs to LiveOutput objects, each optionally with a `state` field
*/
exports.OutputsList = async function({libraryId, objectId, includeState=true}) {
ValidateObject(objectId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
// Route to any live egress node for the initial list call (only necessary until the API is globally available)
const {restore} = await RouteToLiveEgress({client: this});
let outputs;
try {
outputs = await this.CallBitcodeMethod({
libraryId,
objectId,
method: "live/outputs",
constant: true
});
} finally {
restore();
}
for(let [key, value] of Object.entries(outputs)) {
const streamId = value.input?.stream;
if(streamId) {
const streamMetadata = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: streamId}),
objectId: streamId,
metadataSubtree: "/public/name",
});
const streamStatus = await this.StreamStatus({name: streamId});
value.input.name = streamMetadata;
value.input.status = streamStatus?.state;
}
value = await this.OutputsResolveSrtPullUrls({value});
if(includeState) {
try {
const nodeId = value.srt_pull?.node_ids?.[0];
const result = await this.OutputsState({outputId: key, objectId, libraryId, nodeId, includeState: true});
value.state = result.state;
} catch(error) {
this.Log(`Failed to retrieve state for output ${key}: ${error.message}`, true);
value.state = {};
}
}
}
return outputs;
};
/**
* Get the configuration of a single live output by ID, optionally including live state.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output to retrieve
* @param {boolean=} includeState - If true, also retrieve live state from the output's egress node (default: true)
*
* @returns {Promise<LiveOutput>} - Output config, optionally with a `state` field containing client_stats and srt_stats
*/
exports.OutputsListItem = async function({libraryId, objectId, outputId, includeState=true}) {
ValidateObject(objectId);
ValidatePresence("outputId", outputId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
const {restore} = await RouteToLiveEgress({client: this});
let outputs;
try {
outputs = await this.CallBitcodeMethod({
libraryId,
objectId,
method: "live/outputs",
constant: true
});
} finally {
restore();
}
let value = outputs[outputId];
if(!value) {
throw new Error(`Output not found: ${outputId}`);
}
const streamId = value.input?.stream;
if(streamId) {
const streamMetadata = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: streamId}),
objectId: streamId,
metadataSubtree: "/public/name",
});
const streamStatus = await this.StreamStatus({name: streamId});
value.input.name = streamMetadata;
value.input.status = streamStatus?.state;
}
value = await this.OutputsResolveSrtPullUrls({value});
if(includeState) {
try {
const nodeId = value.srt_pull?.node_ids?.[0];
const result = await this.OutputsState({outputId, objectId, libraryId, nodeId, includeState: true});
value.state = result.state;
} catch(error) {
this.Log(`Failed to retrieve state for output ${outputId}: ${error.message}`, true);
value.state = {};
}
}
return value;
};
/**
* Get the configuration of a specific live output, optionally including live state.
* Retrieves the output config from a live egress node. If includeState is true, also
* queries the output's specific egress node for live client and SRT stats.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output to retrieve state for
* @param {string=} nodeId - Node ID to query for state. If not provided, it will be retrieved from the output's config.
* @param {boolean=} includeState - If true, also retrieve live state from the output's egress node (default: true)
*
* @returns {Promise<LiveOutput>} - Output config, optionally with a `state` field containing client_stats and srt_stats
*/
exports.OutputsState = async function({libraryId, objectId, outputId, nodeId, includeState=true}) {
ValidateObject(objectId);
ValidatePresence("outputId", outputId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
// Route to a live egress node first so the output config fetch below succeeds
const {restore} = await RouteToLiveEgress({client: this});
try {
const {config} = await RouteToOutputNode({client: this, libraryId, objectId, outputId, nodeId});
if(!includeState) {
return config;
}
const state = await this.CallBitcodeMethod({
libraryId,
objectId,
method: UrlJoin("live", "outputs", outputId, "state"),
queryParams: {
"client_stats": 1,
"srt_stats": 1
},
constant: true
});
return {
...config,
state
};
} finally {
restore();
}
};
/**
* Pin (route) the client to the egress node for a specific output. Fetches the output config
* to determine the node ID then sets the client fabric URIs.
* Currently node ID is retrieved from srt_pull.node_ids (only srt_pull outputs are supported).
* Assumes the client is already routed to an eligible egress node (via RouteToLiveEgress).
* Returns a function that restores the original fabric URIs.
*
* @param {Object} client - ElvClient instance
* @param {string} libraryId - Library ID of the output settings object
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output
* @param {string=} nodeId - Node ID if already known (skips config fetch)
* @returns {Promise<{restore: Function, config: Object}>} - restore function and output config
*/
const RouteToOutputNode = async ({client, libraryId, objectId, outputId, nodeId}) => {
const savedURIs = [...client.fabricURIs];
const restore = () => client.SetNodes({fabricURIs: savedURIs});
let config;
if(!nodeId) {
config = await client.CallBitcodeMethod({
libraryId,
objectId,
method: UrlJoin("live", "outputs", outputId),
constant: true
});
nodeId = config?.srt_pull?.node_ids?.[0];
}
if(nodeId) {
const nodes = await client.SpaceNodes({matchNodeId: nodeId});
const fabricUrl = nodes?.[0]?.services?.fabric_api?.urls?.[0];
if(fabricUrl) {
client.SetNodes({fabricURIs: [fabricUrl]});
if(config) { config = await client.OutputsResolveSrtPullUrls({value: config}); }
}
}
return {restore, config};
};
/**
* Pin (route) the client to any eligible live egress node from the /config API.
* Returns a function that restores the original fabric URIs.
*
* @param {Object} client - ElvClient instance
* @returns {Promise<{restore: Function}>} - restore function
*/
const RouteToLiveEgress = async ({client}) => {
const savedURIs = [...client.fabricURIs];
const restore = () => client.SetNodes({fabricURIs: savedURIs});
const nodeId = await RetrieveOutputNodeId({client});
if(nodeId) {
const nodes = await client.SpaceNodes({matchNodeId: nodeId});
const fabricUrl = nodes?.[0]?.services?.fabric_api?.urls?.[0];
if(fabricUrl) {
client.SetNodes({fabricURIs: [fabricUrl]});
}
}
return {restore};
};
/**
* Resolve a node ID for live egress output. If nodeIds is provided, uses the first element directly.
* Otherwise, calls the /config API (optionally filtered by geo) to get live_egress endpoints,
* then resolves the first endpoint to a node ID via SpaceNodes.
*
* @param {Object} client - ElvClient instance
* @param {Array<string>=} nodeIds - Explicit node IDs to use
* @param {Array<string>=} geos - Geo regions to filter config API results (max 1)
* @returns {Promise<string>} - A node ID for the output
*/
const RetrieveOutputNodeId = async ({client, nodeIds, geos}) => {
if(nodeIds) {
return nodeIds[0];
}
const uri = new URI(client.ConfigUrl());
uri.pathname("/config");
if(geos && geos.length > 0) {
uri.addSearch("elvgeo", geos[0]);
}
const fabricInfo = await client.utils.ResponseToJson(
HttpClient.Fetch(uri.toString())
);
const liveEgressUrls = fabricInfo.network.services.live_egress;
if(!liveEgressUrls || liveEgressUrls.length === 0) {
throw new Error("No live_egress endpoints found in fabric config");
}
// Extract hostname from the first live_egress URL and resolve to a node ID
const hostname = new URL(liveEgressUrls[0]).hostname;
const nodes = await client.SpaceNodes({matchEndpoint: hostname});
if(!nodes || nodes.length === 0) {
throw new Error(`No node found matching live_egress endpoint: ${hostname}`);
}
return nodes[0].id;
};
/**
* Create a new live output.
*
* At the current version of the live outputs API an output will be pinned to a node, by either:
* - specifying the node directly in 'nodeIds'
* - specifying an 'elvgeo' and use fabric config 'live_egress' services to pick a node ID
*
* Note: Output creation and modification is transactional. To create multiple outputs in a single
* transaction, use EditContentObject to open a write token, call CallBitcodeMethod for each output,
* then finalize with FinalizeContentObject. This method handles a single output end-to-end.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the outputs settings object
* @param {string=} streamObjectId - Object ID of the input stream to use as the output source
* @param {string=} name - Display name for the output
* @param {string=} description - Description of the output
* @param {Array<string>=} nodeIds - Explicit node ID(s) for SRT delivery (max 1)
* @param {Array<string>=} geos - Geo regions for SRT delivery (max 1) — used to resolve a node from live_egress endpoints
* @param {string=} passphrase - SRT passphrase for encrypted delivery
* @param {boolean=} stripRtp - Whether to strip RTP headers (default: false)
* @param {Object=} srtConfig - Additional SRT connection configuration (see openapi-bitcode.html#tocssrtconnectionconfig)
*
* @returns {Promise<Object>} - The created output
*/
exports.OutputsCreate = async function({
libraryId,
objectId,
streamObjectId,
enabled,
name,
description,
externalId,
nodeIds,
geos=[],
passphrase,
stripRtp=false,
srtConfig
}) {
ValidateObject(objectId);
if(nodeIds && geos.length > 0) {
throw new Error("Specify either nodeIds or geos, not both");
}
if(nodeIds && nodeIds.length > 1) {
throw new Error("Only one node ID is supported — nodeIds must have at most 1 element");
}
if(geos.length > 1) {
throw new Error("Only one geo is supported — geos must have at most 1 element");
}
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
const resolvedNodeId = await RetrieveOutputNodeId({client: this, nodeIds, geos});
// Route to any live egress node
const {restore} = await RouteToLiveEgress({client: this});
// Auto-generate passphrase if encryption is truthy
if(srtConfig?.enforced_encryption && !passphrase) {
passphrase = Buffer.from(globalThis.crypto.getRandomValues(new Uint8Array(16))).toString("base64").replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, "");
srtConfig["pb_keylen"] = 16;
}
try {
const output = {
enabled: streamObjectId ? enabled : false, // Output must be disabled if no stream specified
name,
description,
external_id: externalId,
input: streamObjectId ? {stream: streamObjectId} : undefined,
srt_pull: {
connection: srtConfig ?? undefined,
node_ids: [resolvedNodeId],
passphrase,
strip_rtp: stripRtp
}
};
const {writeToken} = await this.EditContentObject({libraryId, objectId});
// Note - you may create multiple outputs here, then finalize the transaction below
const outputs = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: "live/outputs",
constant: false,
body: output
});
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Create output"
});
return outputs;
} finally {
restore();
}
};
/**
* Modify an existing live output.
*
* Note: Supply all fields when modifying an output — read the current output first, then apply changes.
*
* Note: Output modification is transactional. To modify multiple outputs in a single
* transaction, use EditContentObject to open a write token, call CallBitcodeMethod for each output,
* then finalize with FinalizeContentObject. This method handles a single output end-to-end.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output to modify
* @param {LiveOutput} output - Full output object to PUT (read the current output first, apply changes, then pass the result)
* @param {string=} writeToken - Write token to use. If not provided, a new edit will be opened.
* @param {boolean=} finalize - If true, finalize after modifying (default: true)
*
* @returns {Promise<Object>} - The modified output
*/
exports.OutputsModify = async function({
libraryId,
objectId,
outputId,
output,
writeToken,
finalize=true
}) {
ValidateObject(objectId);
ValidatePresence("output", output);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
// Route to any live egress node
const {restore} = await RouteToLiveEgress({client: this});
try {
if(!writeToken) {
({writeToken} = await this.EditContentObject({libraryId, objectId}));
}
if(output.srt_pull?.connection?.enforced_encryption && !output.srt_pull?.passphrase) {
output.srt_pull.passphrase = Buffer.from(
globalThis.crypto.getRandomValues(new Uint8Array(16))
).toString("base64").replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, "");
}
const outputs = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: UrlJoin("live", "outputs", outputId),
verb: "PUT",
constant: false,
body: output
});
if(finalize) {
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Modify output"
});
}
return outputs;
} finally {
restore();
}
};
/**
* Modify multiple live outputs in a single transaction.
*
* Takes a map of output IDs to partial output configurations. Routes to any eligible live
* egress node, opens a write token, posts the map to live/outputs, then finalizes.
*
* Example:
* {
* "out001": { "enabled": false, "input": { "stream": "iq__..." }, "name": "A03" },
* "out002": { "enabled": true }
* }
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {Object<string, LiveOutput>} outputs - Map of output IDs to output configurations
*
* @returns {Promise<Object>} - The response from the bitcode call
*/
exports.OutputsModifyBatch = async function({libraryId, objectId, outputs}) {
ValidateObject(objectId);
ValidatePresence("outputs", outputs);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
const {restore} = await RouteToLiveEgress({client: this});
try {
// Read all current outputs and merge changes on top
const current = await this.CallBitcodeMethod({
libraryId,
objectId,
method: "live/outputs",
constant: true
});
const merged = {...current};
for(const [id, changes] of Object.entries(outputs)) {
merged[id] = {...(current[id] || {}), ...changes};
}
const {writeToken} = await this.EditContentObject({libraryId, objectId});
const result = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: "live/outputs",
verb: "PUT",
constant: false,
body: merged
});
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Modify outputs (batch)"
});
return result;
} finally {
restore();
}
};
/**
* Stop a live output.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output to stop
*
* @returns {Promise<Object>} - Response from the stop call
*/
exports.OutputsStop = async function({libraryId, objectId, outputId}) {
ValidateObject(objectId);
ValidatePresence("outputId", outputId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
// Route to a live egress node, then to the specific output's node
const {restore} = await RouteToLiveEgress({client: this});
await RouteToOutputNode({client: this, libraryId, objectId, outputId});
try {
const {writeToken} = await this.EditContentObject({libraryId, objectId});
return await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: UrlJoin("live", "outputs", outputId, "ctrl", "stop"),
constant: false
});
} finally {
restore();
}
};
/**
* Delete a live output.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {string} outputId - ID of the output to delete
*
* @returns {Promise<Object>} - Response from the delete call
*/
exports.OutputsDelete = async function({libraryId, objectId, outputId}) {
ValidateObject(objectId);
ValidatePresence("outputId", outputId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
// Route to any live egress node
const {restore} = await RouteToLiveEgress({client: this});
try {
const {writeToken} = await this.EditContentObject({libraryId, objectId});
const result = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: UrlJoin("live", "outputs", outputId),
verb: "DELETE",
constant: false,
});
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Remove output"
});
return result;
} finally {
restore();
}
};
/**
* Delete multiple live outputs in a single operation.
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the output settings object. If not provided, it will be retrieved automatically.
* @param {string} objectId - Object ID of the output settings object
* @param {Array<string>} outputs - List of output IDs to delete
*
* @returns {Promise<Object>} - Response from the delete call
*/
exports.OutputsDeleteBatch = async function({libraryId, objectId, outputs}) {
ValidateObject(objectId);
ValidatePresence("outputs", outputs);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
const {restore} = await RouteToLiveEgress({client: this});
try {
const {writeToken} = await this.EditContentObject({libraryId, objectId});
const result = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken,
method: UrlJoin("live", "outputs", outputId),
verb: "DELETE",
constant: false,
});
await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Remove outputs (batch)"
});
return result;
} finally {
restore();
}
};