/**
* Methods for Live Stream creation and management
*
* @module ElvClient/LiveStream
*/
const {LiveConf} = require("./LiveConf");
const path = require("path");
const fs = require("fs");
const HttpClient = require("../HttpClient");
const Fraction = require("fraction.js");
const {ValidateObject, ValidatePresence} = require("../Validation");
const ContentObjectAudit = require("../ContentObjectAudit");
const MakeTxLessToken = async({client, libraryId, objectId, versionHash}) => {
const tok = await client.authClient.AuthorizationToken({libraryId, objectId,
versionHash, channelAuth: false, noCache: true,
noAuth: true});
return tok;
};
const Sleep = (ms) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
const CueInfo = async ({eventId, status}) => {
let cues;
try {
const lroStatusResponse = await this.utils.ResponseToJson(
await HttpClient.Fetch(status.lro_status_url)
);
console.log("lroStatusResponse", lroStatusResponse);
cues = lroStatusResponse.custom.cues;
} catch(error) {
console.log("LRO status failed", error);
return {error: "failed to retrieve status", eventId};
}
let eventStart, eventEnd;
for(const value of Object.values(cues)) {
for(const event of Object.values(value.descriptors)) {
if(event.id == eventId) {
switch(event.type_id) {
case 32:
case 16:
eventStart = value.insertion_time;
break;
case 33:
case 17:
eventEnd = value.insertion_time;
break;
}
}
}
}
return {eventStart, eventEnd, eventId};
};
/**
* Set the offering for the live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {Object} client - The client object
* @param {string} libraryId - ID of the library for the new live stream object
* @param {string} objectId - ID of the new live stream object
* @param {string=} typeAbrMaster - Content type hash
* @param {string=} typeLiveStream - Content type hash
* @param {string} streamUrl - Live source URL
* @param {object} abrProfile - ABR Profile for the offering
* @param {number} aBitRate - Audio bitrate
* @param {number} aChannels - Audio channels
* @param {number} aSampleRate - Audio sample rate
* @param {number} aStreamIndex - Audio stream index
* @param {string} aTimeBase - Audio time base as a fraction, e.g. "1/48000" (usually equal to 1/aSampleRate)
* @param {string} aChannelLayout - Channel layout, e.g. "stereo"
* @param {number} vBitRate - Video bitrate
* @param {number} vHeight - Video height
* @param {number} vStreamIndex - Video stream index
* @param {number} vWidth - Video width
* @param {string} vDisplayAspectRatio - Display aspect ratio as a fraction, e.g. "16/9"
* @param {string} vFrameRate - Frame rate as an integer, e.g. "30"
* @param {string} vTimeBase - Time base as a fraction, e.g. "1/30000"
*
* @return {Promise<string>} - Final hash of the live stream object
*/
const StreamGenerateOffering = async({
client,
libraryId,
objectId,
typeAbrMaster,
typeLiveStream,
streamUrl,
abrProfile,
aBitRate,
aChannels,
aSampleRate,
aStreamIndex,
aTimeBase,
aChannelLayout,
vBitRate,
vHeight,
vStreamIndex,
vWidth,
vDisplayAspectRatio,
vFrameRate,
vTimeBase
}) => {
// compute duration_ts
const DUMMY_DURATION = 1001; // should result in integer duration_ts values for both audio and video
const aDurationTs = Fraction(aTimeBase).inverse().mul(DUMMY_DURATION).valueOf();
const vDurationTs = Fraction(vTimeBase).inverse().mul(DUMMY_DURATION).valueOf();
// construct /production_master/sources/STREAM_URL/streams
const sourceAudioStream = {
"bit_rate": aBitRate,
"channel_layout": aChannelLayout,
"channels": aChannels,
"codec_name": "aac",
"duration": DUMMY_DURATION,
"duration_ts": aDurationTs,
"frame_count": 0,
"language": "",
"max_bit_rate": aBitRate,
"sample_rate": aSampleRate,
"start_pts": 0,
"start_time": 0,
"time_base": aTimeBase,
"type": "StreamAudio"
};
const sourceVideoStream = {
"bit_rate": vBitRate,
"codec_name": "h264",
"display_aspect_ratio": vDisplayAspectRatio,
"duration": DUMMY_DURATION,
"duration_ts": vDurationTs,
"field_order": "progressive",
"frame_count": 0,
"frame_rate": vFrameRate,
"hdr": null,
"height": vHeight,
"language": "",
"max_bit_rate": vBitRate,
"sample_aspect_ratio": "1",
"start_pts": 0,
"start_time": 0,
"time_base": vTimeBase,
"type": "StreamVideo",
"width": vWidth
};
// placeholder stream to use if [aStreamIndex, vStreamIndex].sort() is not [0,1]
const DUMMY_STREAM = {
"bit_rate": 0,
"codec_name": "",
"duration": DUMMY_DURATION,
"duration_ts": 2500 * DUMMY_DURATION,
"frame_count": 1,
"language": "",
"max_bit_rate": 0,
"start_pts": 0,
"start_time": 0,
"time_base": "1/2500",
"type": "StreamData"
};
const sourceStreams = [];
const maxStreamIndex = Math.max(aStreamIndex, vStreamIndex);
for(let i = 0; i <= maxStreamIndex; i++) {
if(i === aStreamIndex) {
sourceStreams.push(sourceAudioStream);
} else if(i === vStreamIndex) {
sourceStreams.push(sourceVideoStream);
} else {
sourceStreams.push(DUMMY_STREAM);
}
}
// construct /production_master/sources
const sources = {
[streamUrl]: {
"container_format": {
"duration": DUMMY_DURATION,
"filename": streamUrl,
"format_name": "mov,mp4,m4a,3gp,3g2,mj2",
"start_time": 0
},
"streams": sourceStreams
}
};
// construct /production_master/variants
const variants = {
"default": {
"streams": {
"audio": {
"default_for_media_type": false,
"label": "",
"language": "",
"mapping_info": "",
"sources": [
{
"files_api_path": streamUrl,
"stream_index": aStreamIndex
}
]
},
"video": {
"default_for_media_type": false,
"label": "",
"language": "",
"mapping_info": "",
"sources": [
{
"files_api_path": streamUrl,
"stream_index": vStreamIndex
}
]
}
}
}
};
// construct /production_master
const production_master = {sources, variants};
// get existing metadata
console.log("Retrieving current metadata...");
let metadata = await client.ContentObjectMetadata({
libraryId,
objectId
});
// add /production_master to metadata
metadata.production_master = production_master;
// write back to object
console.log("Getting write token...");
let editResponse = await client.EditContentObject({
libraryId,
objectId,
options: {
type: typeAbrMaster
}
});
let writeToken = editResponse.write_token;
console.log(`New write token: ${writeToken}`);
console.log("Writing back metadata with /production_master added...");
await client.ReplaceMetadata({
libraryId,
metadata,
objectId,
writeToken
});
console.log("Finalizing...");
let finalizeResponse = await client.FinalizeContentObject({
libraryId,
objectId,
writeToken
});
let masterVersionHash = finalizeResponse.hash;
console.log(`Finalized, new version hash: ${masterVersionHash}`);
// Generate offering
const createResponse = await client.CreateABRMezzanine({
libraryId,
objectId,
masterVersionHash,
variant: "default",
offeringKey: "default",
abrProfile
});
if(createResponse.warnings.length > 0) {
console.log("WARNINGS:");
console.log(JSON.stringify(createResponse.warnings, null, 2));
}
if(createResponse.errors.length > 0) {
console.log("ERRORS:");
console.log(JSON.stringify(createResponse.errors, null, 2));
}
let versionHash = createResponse.hash;
console.log(`New version hash: ${versionHash}`);
// get new metadata
console.log("Retrieving revised metadata with offering...");
metadata = await client.ContentObjectMetadata({
libraryId,
versionHash
});
console.log("Moving /abr_mezzanine/offerings to /offerings and removing /abr_mezzanine...");
metadata.offerings = metadata.abr_mezzanine.offerings;
delete metadata.abr_mezzanine;
// add items to media_struct needed to use options.json handler
metadata.offerings.default.media_struct.duration_rat = `${DUMMY_DURATION}`;
// write back to object
console.log("Getting write token...");
editResponse = await client.EditContentObject({
libraryId,
objectId,
options: {
type: typeLiveStream
}
});
writeToken = editResponse.write_token;
console.log(`New write token: ${writeToken}`);
console.log("Writing back metadata with /offerings...");
await client.ReplaceMetadata({
libraryId,
metadata,
objectId,
writeToken
});
console.log("Finalizing...");
finalizeResponse = await client.FinalizeContentObject({
libraryId,
objectId,
writeToken
});
const finalHash = finalizeResponse.hash;
console.log(`Finalized, new version hash: ${finalHash}`);
return finalHash;
};
/**
* Retrieve the status of the current live stream session
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} stopLro - If specified, will stop LRO
* @param {boolean=} showParams - If specified, will return recording_params with status
* States:
* unconfigured - no live_recording_config
* uninitialized - no live_recording config generated
* inactive - live_recording config initialized but no 'edge write token'
* stopped - edge-write-token but not started
* starting - LRO running but no source data yet
* running - stream is running and producing output
* stalled - LRO running but no source data (so not producing output)
*
* @return {Promise<Object>} - The status response for the object, as well as logs, warnings and errors from the master initialization
*/
exports.StreamStatus = async function({name, stopLro=false, showParams=false}) {
let objectId = name;
let status = {name: name};
try {
let libraryId = await this.ContentObjectLibraryId({objectId});
status.library_id = libraryId;
status.object_id = objectId;
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
select: [
"live_recording_config",
"live_recording"
]
});
status.reference_url = mainMeta.live_recording_config.reference_url;
if(mainMeta.live_recording_config == undefined || mainMeta.live_recording_config.url == undefined) {
status.state = "unconfigured";
return status;
}
if(mainMeta.live_recording == undefined || mainMeta.live_recording.fabric_config == undefined ||
mainMeta.live_recording.playout_config == undefined || mainMeta.live_recording.recording_config == undefined) {
status.state = "uninitialized";
return status;
}
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
if(fabURI === undefined) {
console.log("bad fabric config - missing ingress node API");
status.state = "uninitialized";
return status;
}
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
status.fabric_api = fabURI;
status.url = mainMeta.live_recording.recording_config.recording_params.origin_url;
let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
if(!edgeWriteToken) {
status.state = "inactive";
return status;
}
this.RecordWriteToken({writeToken: edgeWriteToken, fabricNodeUrl: fabURI});
status.edge_write_token = edgeWriteToken;
status.stream_id = edgeWriteToken; // By convention the stream ID is its write token
let edgeMeta;
try {
edgeMeta = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken,
select: [
"live_recording"
]
});
} catch(error) {
console.error("Unable to read edge write token metadata. Has token been deleted?", error);
status.state = "inactive";
return status;
}
status.edge_meta_size = JSON.stringify(edgeMeta || "").length;
// If a stream has never been started return state 'inactive'
if(edgeMeta.live_recording === undefined ||
edgeMeta.live_recording.recordings === undefined ||
edgeMeta.live_recording.recordings.recording_sequence === undefined) {
status.state = "stopped";
return status;
}
let recordings = edgeMeta.live_recording.recordings;
status.recording_period_sequence = recordings.recording_sequence;
let sequence = recordings.recording_sequence;
let period = recordings.live_offering[sequence - 1];
let tlro = period.live_recording_handle;
status.tlro = tlro;
let videoLastFinalizationTimeEpochSec = -1;
let videoFinalizedParts = 0;
let sinceLastFinalize = -1;
if(period.finalized_parts_info && period.finalized_parts_info.video && period.finalized_parts_info.video.last_finalization_time) {
videoLastFinalizationTimeEpochSec = period.finalized_parts_info.video.last_finalization_time / 1000000;
videoFinalizedParts = period.finalized_parts_info.video.n_parts;
sinceLastFinalize = Math.floor(new Date().getTime() / 1000) - videoLastFinalizationTimeEpochSec;
}
let recording_period = {
activation_time_epoch_sec: period.recording_start_time_epoch_sec,
start_time_epoch_sec: period.start_time_epoch_sec,
start_time_text: new Date(period.start_time_epoch_sec * 1000).toLocaleString(),
end_time_epoch_sec: period.end_time_epoch_sec,
end_time_text: period.end_time_epoch_sec === 0 ? null : new Date(period.end_time_epoch_sec * 1000).toLocaleString(),
video_parts: videoFinalizedParts,
video_last_part_finalized_epoch_sec: videoLastFinalizationTimeEpochSec,
video_since_last_finalize_sec : sinceLastFinalize
};
status.recording_period = recording_period;
status.lro_status_url = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken,
call: "live/status/" + tlro
});
status.insertions = [];
if((edgeMeta.live_recording.playout_config.interleaves != undefined) &&
(edgeMeta.live_recording.playout_config.interleaves[sequence] != undefined)) {
let insertions = edgeMeta.live_recording.playout_config.interleaves[sequence];
for(let i = 0; i < insertions.length; i ++) {
let insertionTimeSinceEpoch = recording_period.start_time_epoch_sec + insertions[i].insertion_time;
status.insertions[i] = {
insertion_time_since_start: insertions[i].insertion_time,
insertion_time: new Date(insertionTimeSinceEpoch * 1000).toISOString(),
insertion_time_local: new Date(insertionTimeSinceEpoch * 1000).toLocaleString(),
target: insertions[i].playout};
}
}
if(showParams) {
status.recording_params = edgeMeta.live_recording.recording_config.recording_params;
}
let state = "stopped";
let lroStatus = "";
try {
lroStatus = await this.utils.ResponseToJson(
await HttpClient.Fetch(status.lro_status_url)
);
state = lroStatus.state;
status.warnings = lroStatus.custom && lroStatus.custom.warnings;
status.quality = lroStatus.custom && lroStatus.custom.quality;
if(lroStatus.custom && lroStatus.custom.status) {
status.recording_status = lroStatus.custom.status;
}
} catch(error) {
console.log("LRO Status (failed): ", error.response.statusCode);
status.state = "stopped";
status.error = error.response;
return status;
}
const segDurationMeta = edgeMeta.live_recording.recording_config.recording_params.xc_params.seg_duration
// Convert LRO 'state' to desired 'state'
if(state === "running" && videoLastFinalizationTimeEpochSec <= 0) {
state = "starting";
} else if(state === "running" && segDurationMeta !== undefined && sinceLastFinalize > (parseInt(segDurationMeta) + 5)) {
state = "stalled";
} else if(state == "terminated") {
state = "stopped";
}
status.state = state;
if((state === "running" || state === "stalled" || state === "starting") && stopLro) {
lroStopUrl = await this.FabricUrl({
libraryId,
objectId,
writeToken: edgeWriteToken,
call: "live/stop/" + tlro
});
try {
await this.utils.ResponseToJson(
await HttpClient.Fetch(lroStopUrl)
);
console.log("LRO Stop: ", lroStatus.body);
} catch(error) {
console.log("LRO Stop (failed): ", error.response.statusCode);
}
state = "stopped";
status.state = state;
}
if(state === "running") {
let playout_urls = {};
let playout_options = await this.PlayoutOptions({
objectId,
linkPath: "public/asset_metadata/sources/default"
});
let hls_clear_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods.clear !== undefined
);
if(hls_clear_enabled) {
playout_urls.hls_clear = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-clear/playlist.m3u8",
});
}
let hls_aes128_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods["aes-128"] !== undefined
);
if(hls_aes128_enabled) {
playout_urls.hls_aes128 = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-aes128/playlist.m3u8",
});
}
let hls_sample_aes_enabled = (
playout_options &&
playout_options.hls &&
playout_options.hls.playoutMethods &&
playout_options.hls.playoutMethods["sample-aes"] !== undefined
);
if(hls_sample_aes_enabled) {
playout_urls.hls_sample_aes = await this.FabricUrl({
libraryId: libraryId,
objectId: objectId,
rep: "playout/default/hls-sample-aes/playlist.m3u8",
});
}
const networkInfo = await this.NetworkInfo();
let token = await this.authClient.AuthorizationToken({
libraryId,
objectId,
channelAuth: false,
noCache: true,
noAuth: true
});
let embed_net = "main";
if(networkInfo.name.includes("demo")) {
embed_net = "demo";
}
let embed_url = `https://embed.v3.contentfabric.io/?net=${embed_net}&p&ct=h&oid=${objectId}&mt=lv&ath=${token}`;
playout_urls.embed_url = embed_url;
status.playout_urls = playout_urls;
}
} catch(error) {
console.error(error);
}
return status;
};
/**
* Create a new edge write token
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} start - If specified, will start the stream after creation
*
* @return {Promise<Object>} - The status response for the object
*
*/
exports.StreamCreate = async function({name, start=false}) {
let status = await this.StreamStatus({name});
if(status.state != "uninitialized" && status.state !== "inactive" && status.state !== "terminated" && status.state !== "stopped") {
return {
state: status.state,
error: "stream still active - must terminate first"
};
}
let objectId = status.object_id;
console.log("START: ", name, "start", start);
let libraryId = await this.ContentObjectLibraryId({objectId: objectId});
// Read live recording parameters - determine ingest node
let liveRecording = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId,
metadataSubtree: "/live_recording"
});
let fabURI = liveRecording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
console.log("Node URI", fabURI, "ID", liveRecording.fabric_config.ingress_node_id);
let response = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
const edgeToken = response.write_token;
console.log("Edge token:", edgeToken);
/*
* Set the metadata, including the edge token.
*/
response = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
let writeToken = response.write_token;
this.Log("Merging metadata: ", libraryId, objectId, writeToken);
await this.MergeMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: writeToken,
metadata: {
live_recording: {
status: {
edge_write_token: edgeToken,
state: "active" // indicates there is an active session (set to 'closed' when done)
},
fabric_config: {
edge_write_token: edgeToken
}
}
}
});
this.Log("Finalizing content draft: ", libraryId, objectId, writeToken);
response = await this.FinalizeContentObject({
libraryId: libraryId,
objectId: objectId,
writeToken: writeToken,
commitMessage: "Create stream edge write token " + edgeToken
});
const objectHash = response.hash;
this.Log("Finalized object: ", objectHash);
status = {
object_id: objectId,
hash: objectHash,
library_id: libraryId,
stream_id: edgeToken,
edge_write_token: edgeToken,
fabric_api: fabURI,
state: "stopped"
};
if(start) {
status = this.StreamStartOrStopOrReset({name, op: start});
}
return status;
};
/**
* Start, stop or reset a stream within the current session (current edge write token)
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {string} op - The operation to perform. Possible values:
* 'start'
* 'reset' - Stops current LRO recording and starts a new one. Does
* not create a new edge write token (just creates a new recording
* period in the existing edge write token)
* - 'stop'
*
* @return {Promise<Object>} - The status response for the stream
*
*/
exports.StreamStartOrStopOrReset = async function({name, op}) {
try {
let status = await this.StreamStatus({name});
if(status.state != "stopped") {
if(op === "start") {
status.error = "Unable to start stream - state: " + status.state;
return status;
}
}
if(status.state == "running" || status.state == "starting" || status.state == "stalled") {
try {
await this.CallBitcodeMethod({
libraryId: status.library_id,
objectId: status.object_id,
writeToken: status.edge_write_token,
method: "/live/stop/" + status.tlro,
constant: false
});
} catch(error) {
// The /call/lro/stop API returns empty response
// console.log("LRO Stop (failed): ", error);
}
// Wait until LRO is terminated
let tries = 10;
while(status.state != "stopped" && tries-- > 0) {
console.log("Wait to terminate - ", status.state);
await Sleep(1000);
status = await this.StreamStatus({name});
}
console.log("Status after stop - ", status.state);
if(tries <= 0) {
console.log("Failed to stop");
return status;
}
}
if(op === "stop") {
return status;
}
console.log("STARTING", "edge_write_token", status.edge_write_token);
try {
await this.CallBitcodeMethod({
libraryId: status.library_id,
objectId: status.object_id,
writeToken: status.edge_write_token,
method: "/live/start",
constant: false
});
} catch(error) {
console.log("LRO Start (failed): ", error);
return {
state: status.state,
error: "LRO start failed - must create a stream first"
};
}
// Wait until LRO is 'starting'
let tries = 10;
while(status.state != "starting" && tries-- > 0) {
console.log("Wait to start - ", status.state);
await Sleep(1000);
status = await this.StreamStatus({name});
}
console.log("Status after restart - ", status.state);
return status;
} catch(error) {
console.error(error);
}
};
/**
* Close the edge write token and make the stream object inactive.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
*
* @return {Promise<Object>} - The finalize response for the stream object
*/
exports.StreamStopSession = async function({name}) {
try {
this.Log(`Terminating stream session for: ${name}`);
let objectId = name;
let libraryId = await this.ContentObjectLibraryId({objectId});
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
const metaEdgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
if(!metaEdgeWriteToken) {
return {
state: "inactive",
error: "The stream is not active"
};
}
try {
const streamMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: metaEdgeWriteToken
});
const status = await this.StreamStatus({name});
if(status.state !== "stopped") {
return {
state: status.state,
error: "The stream must be stopped before terminating"
};
}
await this.DeleteWriteToken({
libraryId,
writeToken: metaEdgeWriteToken
});
} catch(error) {
this.Log("Unable to retrieve metadata for edge write token");
}
const {writeToken} = await this.EditContentObject({
libraryId: libraryId,
objectId: objectId
});
// Set stop time and inactive state
const newState = "inactive";
const stopTime = Math.floor(new Date().getTime() / 1000);
const finalizeMetadata = {
live_recording: {
status: {
edge_write_token: "",
state: newState,
recording_stop_time: stopTime
},
fabric_config: {
edge_write_token: ""
}
}
};
await this.MergeMetadata({
libraryId,
objectId,
writeToken,
metadata: finalizeMetadata
});
let fin = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: `Deactivate live stream - stop time ${stopTime}`
});
return {
fin,
name,
state: newState
};
} catch(error) {
console.error(error);
}
};
/**
* Initialize the stream object
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {boolean=} drm - If specified, playout will be DRM protected
* @param {string=} format - Specify the list of playout formats and DRM to support,
comma-separated (hls-clear, hls-aes128, hls-sample-aes,
hls-fairplay)
*
* @return {Promise<Object>} - The name, object ID, and state of the stream
*/
exports.StreamInitialize = async function({name, drm=false, format}) {
let typeAbrMaster;
let typeLiveStream;
// Fetch Title and Live Stream content types from tenant meta
const tenantContractId = await this.userProfileClient.TenantContractId();
const {live_stream, title} = await this.ContentObjectMetadata({
libraryId: tenantContractId.replace("iten", "ilib"),
objectId: tenantContractId.replace("iten", "iq__"),
metadataSubtree: "public/content_types",
select: [
"live_stream",
"title"
]
});
if(live_stream) {
typeLiveStream = live_stream;
}
if(title) {
typeAbrMaster = title;
}
if(typeAbrMaster === undefined || typeLiveStream === undefined) {
console.log("ERROR - unable to find content types", "ABR Master", typeAbrMaster, "Live Stream", typeLiveStream);
return {};
}
const res = await this.StreamSetOfferingAndDRM({name, typeAbrMaster, typeLiveStream, drm, format});
return res;
};
/**
* Create a dummy VoD offering and initialize DRM keys.
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {string=} typeAbrMaster - Content type hash
* @param {string=} typeLiveStream - Content type hash
* @param {boolean=} drm - If specified, DRM will be applied to the stream
* @param {string=} format - A list of playout formats and DRM to support, comma-separated
* (hls-clear, hls-aes128, hls-sample-aes, hls-fairplay). If specified,
* this will take precedence over the drm value
*
* @return {Promise<Object>} - The name, object ID, and state of the stream
*/
exports.StreamSetOfferingAndDRM = async function({name, typeAbrMaster, typeLiveStream, drm=false, format}) {
let status = await this.StreamStatus({name});
if(status.state != "uninitialized" && status.state != "inactive" && status.state != "stopped") {
return {
state: status.state,
error: "stream still active - must terminate first"
};
}
let objectId = status.object_id;
console.log("INIT: ", name, objectId);
const aBitRate = 128000;
const aChannels = 2;
const aSampleRate = 48000;
const aStreamIndex = 1;
const aTimeBase = "1/48000";
const aChannelLayout = "stereo";
const vBitRate = 14000000;
const vHeight = 720;
const vStreamIndex = 0;
const vWidth = 1280;
const vDisplayAspectRatio = "16/9";
const vFrameRate = "30000/1001";
const vTimeBase = "1/30000"; // "1/16000";
const abrProfileDefault = require("../abr_profiles/abr_profile_live_drm.js");
let playoutFormats;
let abrProfile = JSON.parse(JSON.stringify(abrProfileDefault));
if(format) {
drm = true; // Override DRM parameter
playoutFormats = {};
let formats = format.split(",");
for(let i = 0; i < formats.length; i++) {
if(formats[i] === "hls-clear") {
abrProfile.drm_optional = true;
playoutFormats["hls-clear"] = {
"drm": null,
"protocol": {
"type": "ProtoHls"
}
};
continue;
}
if(formats[i] === "dash-clear") {
abrProfile.drm_optional = true;
playoutFormats["dash-clear"] = {
"drm": null,
"protocol": {
"min_buffer_length": 2,
"type": "ProtoDash"
}
}
continue;
}
playoutFormats[formats[i]] = abrProfile.playout_formats[formats[i]];
}
} else if(!drm) {
abrProfile.drm_optional = true;
playoutFormats = {
"hls-clear": {
"drm": null,
"protocol": {
"type": "ProtoHls"
}
},
"dash-clear": {
"drm": null,
"protocol": {
"min_buffer_length": 2,
"type": "ProtoDash"
}
}
};
} else {
playoutFormats = Object.assign({}, abrProfile.playout_formats);
}
abrProfile.playout_formats = playoutFormats;
let libraryId = await this.ContentObjectLibraryId({objectId});
try {
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
let streamUrl = mainMeta.live_recording.recording_config.recording_params.origin_url;
await StreamGenerateOffering({
client: this,
libraryId,
objectId,
typeAbrMaster,
typeLiveStream,
streamUrl,
abrProfile,
aBitRate,
aChannels,
aSampleRate,
aStreamIndex,
aTimeBase,
aChannelLayout,
vBitRate,
vHeight,
vStreamIndex,
vWidth,
vDisplayAspectRatio,
vFrameRate,
vTimeBase
});
console.log("Finished generating offering");
return {
name,
object_id: objectId,
state: "initialized"
};
} catch(error) {
console.error(error);
}
};
/**
* Add a content insertion entry
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {number} insertionTime - Time in seconds (float)
* @param {boolean=} sinceStart - If specified, time specified will be elapsed seconds
* since stream start, otherwise, time will be elapsed since epoch
* @param {number=} duration - Time in seconds (float). Default: 20.0
* @param {string} targetHash - The target content object hash (playable)
* @param {boolean=} remove - If specified, will remove the inseration at the exact 'time' (instead of adding)
*
* @return {Promise<Object>} - Insertions, as well as any errors from bad insertions
*/
exports.StreamInsertion = async function({name, insertionTime, sinceStart=false, duration, targetHash, remove=false}) {
// Determine audio and video parameters of the insertion
// Content Type check is currently disabled due to permissions
/*
let ct = await this.client.ContentObject({versionHash});
if(ct.type != undefined && ct.type != "") {
let typeMeta = await this.client.ContentObjectMetadata({
versionHash: ct.type
});
if(typeMeta.bitcode_flags != "abrmaster") {
throw new Error("Not a playable VoD object " + versionHash);
}
}
*/
let offeringMeta = await this.ContentObjectMetadata({
versionHash: targetHash,
metadataSubtree: "/offerings/default"
});
var insertionInfo = {
duration_sec: 0 // Minimum of video and audio duration
};
["video", "audio"].forEach(mt => {
const stream = offeringMeta.media_struct.streams[mt];
insertionInfo[mt] = {
seg_duration_sec: stream.optimum_seg_dur.float,
duration_sec: stream.duration.float,
frame_rate_rat: stream.rate,
};
if(insertionInfo.duration_sec === 0 || stream.duration.float < insertionInfo.duration_sec) {
insertionInfo.duration_sec = stream.duration.float;
}
});
const audioAbrDuration = insertionInfo.audio.seg_duration_sec;
const videoAbrDuration = insertionInfo.video.seg_duration_sec;
if(audioAbrDuration === 0 || videoAbrDuration === 0) {
throw new Error("Bad segment duration hash:", targetHash);
}
if(duration === undefined) {
duration = insertionInfo.duration_sec; // Use full duration of the insertion
} else {
if(duration > insertionInfo.duration_sec) {
throw new Error("Bad duration - larger than insertion object duration", insertionInfo.duration_sec);
}
}
let objectId = name;
let libraryId = await this.ContentObjectLibraryId({objectId});
let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
}
this.SetNodes({fabricURIs: [fabURI]});
let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;
let edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
});
// Find stream start time (from the most recent recording section)
let recordings = edgeMeta.live_recording.recordings;
let sequence = 1;
let streamStartTime = 0;
if(recordings != undefined && recordings.recording_sequence != undefined) {
// We have at least one recording - check if still active
sequence = recordings.recording_sequence;
let period = recordings.live_offering[sequence - 1];
if(period.end_time_epoch_sec > 0) {
// The last period is closed - apply insertions to the next period
sequence ++;
} else {
// The period is active
streamStartTime = period.start_time_epoch_sec;
}
}
if(streamStartTime === 0) {
// There is no active period - must use absolute time
if(sinceStart === false) {
throw new Error("Stream not running - must use 'time since start'");
}
}
// Find the current period playout configuration
if(edgeMeta.live_recording.playout_config.interleaves === undefined) {
edgeMeta.live_recording.playout_config.interleaves = {};
}
if(edgeMeta.live_recording.playout_config.interleaves[sequence] === undefined) {
edgeMeta.live_recording.playout_config.interleaves[sequence] = [];
}
let playoutConfig = edgeMeta.live_recording.playout_config;
let insertions = playoutConfig.interleaves[sequence];
let res = {};
if(!sinceStart) {
insertionTime = insertionTime - streamStartTime;
}
// Assume insertions are sorted by insertion time
let errs = [];
let currentTime = -1;
let insertionDone = false;
let newInsertion = {
insertion_time: insertionTime,
duration: duration,
audio_abr_duration: audioAbrDuration,
video_abr_duration: videoAbrDuration,
playout: "/qfab/" + targetHash + "/rep/playout" // TO FIX - should be a link
};
for(let i = 0; i < insertions.length; i ++) {
if(insertions[i].insertion_time <= currentTime) {
// Bad insertion - must be later than current time
append(errs, "Bad insertion - time:", insertions[i].insertion_time);
}
if(remove) {
if(insertions[i].insertion_time === insertionTime) {
insertions.splice(i, 1);
break;
}
} else {
if(insertions[i].insertion_time > insertionTime) {
if(i > 0) {
insertions = [
...insertions.splice(0, i),
newInsertion,
...insertions.splice(i)
];
} else {
insertions = [
newInsertion,
...insertions.splice(i)
];
}
insertionDone = true;
break;
}
}
}
if(!remove && !insertionDone) {
// Add to the end of the insertions list
console.log("Add insertion at the end");
insertions = [
...insertions,
newInsertion
];
}
playoutConfig.interleaves[sequence] = insertions;
// Store the new insertions in the write token
await this.ReplaceMetadata({
libraryId: libraryId,
objectId: objectId,
writeToken: edgeWriteToken,
metadataSubtree: "/live_recording/playout_config",
metadata: edgeMeta.live_recording.playout_config
});
res.errors = errs;
res.insertions = insertions;
return res;
};
/**
* Configure the stream based on built-in logic and optional custom settings.
*
* Custom settings format:
* {
* "audio" {
* "1" : { // This is the stream index
* "tags" : "language: english",
* "codec" : "aac",
* "bitrate": 204000,
* "record": true,
* "recording_bitrate" : 192000,
* "recording_channels" : 2,
* "playout": bool
* "playout_label": "English (Stereo)"
* },
* "3": {
* ...
* }
* }
* }
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {Object=} customSettings - Additional options to customize configuration settings
* @param {Object=} probeMetadata - Metadata for the probe. If not specified, a new probe will be configured
* @return {Promise<Object>} - The status response for the stream
*
*/
exports.StreamConfig = async function({name, customSettings={}, probeMetadata}) {
let objectId = name;
let status = {name};
let libraryId = await this.ContentObjectLibraryId({objectId});
status.library_id = libraryId;
status.object_id = objectId;
let probe = probeMetadata;
let mainMeta = await this.ContentObjectMetadata({
libraryId: libraryId,
objectId: objectId
});
let userConfig = mainMeta.live_recording_config;
status.user_config = userConfig;
// Get node URI from user config
const parsedName = userConfig.url
.replace("udp://", "https://")
.replace("rtmp://", "https://")
.replace("srt://", "https://");
const hostName = new URL(parsedName).hostname;
const streamUrl = new URL(userConfig.url);
this.Log(`Retrieving nodes - matching: ${hostName}`);
let nodes = await this.SpaceNodes({matchEndpoint: hostName});
if(nodes.length < 1) {
status.error = "No node matching stream URL " + streamUrl.href;
return status;
}
const node = {
endpoints: nodes[0].services.fabric_api.urls,
id: nodes[0].id
};
status.node = node;
let endpoint = node.endpoints[0];
if(!probe) {
this.SetNodes({fabricURIs: [endpoint]});
// Probe the stream
probe = {};
try {
let probeUrl = await this.Rep({
libraryId,
objectId,
rep: "probe"
});
probe = await this.utils.ResponseToJson(
await HttpClient.Fetch(probeUrl, {
body: JSON.stringify({
"filename": streamUrl.href,
"listen": true
}),
method: "POST"
})
);
if(probe.errors) {
throw probe.errors[0];
}
} catch(error) {
if(error.code === "ETIMEDOUT") {
throw "Stream probe time out - make sure the stream source is available";
} else {
throw error;
}
}
probe.format.filename = streamUrl.href;
}
// Create live recording config
let lc = new LiveConf(probe, node.id, endpoint, false, false, true);
const liveRecordingConfig = lc.generateLiveConf({
customSettings
});
// Store live recording config into the stream object
let e = await this.EditContentObject({
libraryId,
objectId: objectId
});
let writeToken = e.write_token;
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording",
metadata: liveRecordingConfig.live_recording
});
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: "live_recording_config/probe_info",
metadata: probe
});
status.fin = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Apply live stream configuration"
});
return status;
};
/**
* List the pre-allocated URLs for a site
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} siteId - ID of the live stream site object
*
* @return {Promise<Object>} - The list of stream URLs
*/
exports.StreamListUrls = async function({siteId}={}) {
try {
const STATUS_MAP = {
UNCONFIGURED: "unconfigured",
UNINITIALIZED: "uninitialized",
INACTIVE: "inactive",
STOPPED: "stopped",
STARTING: "starting",
RUNNING: "running",
STALLED: "stalled",
};
if(!siteId) {
const tenantContractId = await this.userProfileClient.TenantContractId();
if(!tenantContractId) {
throw Error("No tenant contract ID configured");
}
siteId = await this.ContentObjectMetadata({
libraryId: tenantContractId.replace("iten", "ilib"),
objectId: tenantContractId.replace("iten", "iq__"),
metadataSubtree: "public/sites/live_streams",
});
}
const streamMetadata = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "public/asset_metadata/live_streams",
resolveLinks: true,
resolveIgnoreErrors: true,
resolveIncludeSource: true
});
const activeUrlMap = {};
await this.utils.LimitedMap(
10,
Object.keys(streamMetadata || {}),
async slug => {
const stream = streamMetadata[slug];
let versionHash;
if(stream && stream["."] && stream["."].source) {
versionHash = stream["."].source;
}
if(versionHash) {
const objectId = this.utils.DecodeVersionHash(versionHash).objectId;
const libraryId = await this.ContentObjectLibraryId({objectId});
const status = await this.StreamStatus({
name: objectId
});
const streamMeta = await this.ContentObjectMetadata({
objectId,
libraryId,
select: [
"live_recording_config/reference_url",
// live_recording_config/url is the old path
"live_recording_config/url"
]
});
const url = streamMeta.live_recording_config.reference_url || streamMeta.live_recording_config.url;
const isActive = [STATUS_MAP.STARTING, STATUS_MAP.RUNNING, STATUS_MAP.STALLED, STATUS_MAP.STOPPED].includes(status.state);
if(url && isActive) {
activeUrlMap[url] = true;
}
}
}
);
const streamUrlStatus = {};
const streamUrls = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "/live_stream_urls",
resolveLinks: true,
resolveIgnoreErrors: true
});
if(!streamUrls) {
throw Error("No pre-allocated URLs configured");
}
Object.keys(streamUrls || {}).forEach(protocol => {
streamUrlStatus[protocol] = streamUrls[protocol].map(url => {
return {
url,
active: activeUrlMap[url] || false
};
});
});
return streamUrlStatus;
} catch(error) {
console.error(error);
}
};
/**
* Copy a portion of a live stream recording into a standard VoD object using the zero-copy content fabric API
*
* Limitations:
* - currently requires the target object to be pre-created and have content encryption keys (CAPS)
* - for audio and video to be sync'd, the live stream needs to have the beginning of the desired recording period
* - for an event stream, make sure the TTL is long enough to allow running the live-to-vod command before the beginning of the recording expires
* - for 24/7 streams, make sure to reset the stream before the desired recording (as to create a new recording period) and have the TTL long enough
* to allow running the live-to-vod command before the beginning of the recording expires.
* - startTime and endTime are not currently implemented by this method
*
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream
* @param {string} targetObjectId - Object ID of the target VOD object
* @param {string=} eventId -
* @param {boolean=} finalize - If enabled, target object will be finalized after copy to vod operations
* @param {number=} recordingPeriod - Determines which recording period to copy, which are 0-based. -1 copies the current (or last) period
*
* @return {Promise<Object>} - The status response for the stream
*/
/*
Example fabric API flow:
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/init -d @r1 -H "Authorization: Bearer $TOK"
{
"live_qhash": "hq__5Zk1jSN8vNLUAXjQwMJV8F8J8ESXNvmVKkhaXySmGc1BXnJPG2FvvaXee4CXqvFHuGuU3fqLJc",
"start_time": "",
"end_time": "",
"recording_period": -1,
"streams": ["video", "audio"],
"variant_key": "default"
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/init -H "Authorization: Bearer $TOK" -d @r2
{
"abr_profile": { ... },
"offering_key": "default",
"prod_master_hash": "tqw__HSQHBt7vYxWfCMPH5yXwKTfhdPcQ4Lcs9WUMUbTtnMbTZPTLo4BfJWPMGpoy1Dpv1wWQVtUtAtAr429TnVs",
"variant_key": "default",
"keep_other_streams": false
}
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/live_to_vod/copy -d '{"variant_key":"","offering_key":""}' -H "Authorization: Bearer $TOK"
https://host-76-74-34-194.contentfabric.io/qlibs/ilib24CtWSJeVt9DiAzym8jB6THE9e7H/q/$QWT/call/media/abr_mezzanine/offerings/default/finalize -d '{}' -H "Authorization: Bearer $TOK"
*/
exports.StreamCopyToVod = async function({
name,
targetObjectId,
eventId,
streams=null,
finalize=true,
recordingPeriod=-1,
startTime="",
endTime=""
}) {
const objectId = name;
const abrProfile = require("../abr_profiles/abr_profile_live_to_vod.js");
const status = await this.StreamStatus({name});
const libraryId = status.library_id;
this.Log(`Copying stream ${name} to target ${targetObjectId}`);
ValidateObject(targetObjectId);
const targetLibraryId = await this.ContentObjectLibraryId({objectId: targetObjectId});
// Validation - ensure target object has content encryption keys
const kmsAddress = await this.authClient.KMSAddress({objectId: targetObjectId});
const kmsCapId = `eluv.caps.ikms${this.utils.AddressToHash(kmsAddress)}`;
const kmsCap = await this.ContentObjectMetadata({
libraryId: targetLibraryId,
objectId: targetObjectId,
metadataSubtree: kmsCapId
});
if(!kmsCap) {
throw Error(`No content encryption key set for object ${targetObjectId}`);
}
try {
status.live_object_id = objectId;
const liveHash = await this.LatestVersionHash({objectId, libraryId});
status.live_hash = liveHash;
if(eventId) {
// Retrieve start and end times for the event
let event = await this.CueInfo({eventId, status});
if(event.eventStart && event.eventEnd) {
startTime = event.eventStart;
endTime = event.eventEnd;
}
}
const {writeToken} = await this.EditContentObject({
objectId: targetObjectId,
libraryId: targetLibraryId
});
status.target_object_id = targetObjectId;
status.target_library_id = targetLibraryId;
status.target_write_token = writeToken;
this.Log("Process live source (takes around 20 sec per hour of content)");
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/live_to_vod/init",
body: {
"live_qhash": liveHash,
"start_time": startTime, // eg. "2023-10-03T02:09:02.00Z",
"end_time": endTime, // eg. "2023-10-03T02:15:00.00Z",
"streams": streams,
"recording_period": recordingPeriod,
"variant_key": "default"
},
constant: false,
format: "text"
});
const abrMezInitBody = {
abr_profile: abrProfile,
"offering_key": "default",
"prod_master_hash": writeToken,
"variant_key": "default",
"keep_other_streams": false
};
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/abr_mezzanine/init",
body: abrMezInitBody,
constant: false,
format: "text"
});
try {
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/live_to_vod/copy",
body: {},
constant: false,
format: "text"
});
} catch(error) {
console.error("Unable to call /media/live_to_vod/copy", error);
throw error;
}
await this.CallBitcodeMethod({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
method: "/media/abr_mezzanine/offerings/default/finalize",
body: abrMezInitBody,
constant: false,
format: "text"
});
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId: targetLibraryId,
objectId: targetObjectId,
writeToken,
commitMessage: "Live Stream to VoD"
});
status.target_hash = finalizeResponse.hash;
}
// Clean up unnecessary status items
delete status.playout_urls;
delete status.lro_status_url;
delete status.recording_period;
delete status.recording_period_sequence;
delete status.edge_meta_size;
delete status.insertions;
return status;
} catch(error) {
this.Log(error, true);
throw error;
}
};
/**
* Remove a watermark for a live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the live stream
* @param {string} objectId - Object ID of the live stream
* @param {string=} writeToken - Write token of the draft
* @param {Array<string>} types - Specify which type of watermark to remove. Possible values:
* - "image"
* - "text"
* - "forensic"
* @param {boolean=} finalize - If enabled, target object will be finalized after removing watermark
*
* @return {Promise<Object>} - The finalize response
*/
exports.StreamRemoveWatermark = async function({
libraryId,
objectId,
writeToken,
types,
finalize=true
}) {
ValidateObject(objectId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
if(!writeToken) {
({writeToken} = await this.EditContentObject({
objectId,
libraryId
}));
}
this.Log(`Removing watermark types: ${types.join(", ")} ${libraryId} ${objectId}`);
const edgeWriteToken = await this.ContentObjectMetadata({
objectId,
libraryId,
metadataSubtree: "/live_recording/fabric_config/edge_write_token"
});
const metadataPath = "live_recording/playout_config";
const objectMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
resolveLinks: false
});
if(!objectMetadata) {
throw Error("Stream object must be configured before removing a watermark");
}
types.forEach(type => {
if(type === "text") {
delete objectMetadata.simple_watermark;
} else if(type === "image") {
delete objectMetadata.image_watermark;
} else if(type === "forensic") {
delete objectMetadata.forensic_watermark;
}
});
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
if(edgeWriteToken) {
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
}
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Watermark removed"
});
return finalizeResponse;
}
};
/**
* Create a watermark for a live stream
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} libraryId - Library ID of the live stream
* @param {string} objectId - Object ID of the live stream
* @param {string=} writeToken - Write token of the draft
* @param {Object} simpleWatermark - Text watermark
* @param {Object} imageWatermark - Image watermark
* @param {Object} forensicWatermark - Forensic watermark
* @param {boolean=} finalize - If enabled, target object will be finalized after adding watermark
* Watermark examples:
*
* Simple Watermark:
{
"font_color": "",
"font_relative_height": 0,
"shadow": false,
"template": "",
"timecode": "",
"timecode_rate": 0,
"x": "",
"y": ""
}
*
* Image watermark:
{
"image": "",
"align_h": "",
"align_v": "",
"target_video_height": 0,
"wm_enabled": false
}
*
* Forensic watermark:
{
"algo": 6,
"forensic_duration": 0,
"forensic_start": "",
"image_a": <path_to_image>,
"image_b": <path_to_image>,
"is_stub": true,
"payload_bit_nb": 23,
"wm_enabled": true
}
*
*
* @return {Promise<Object>} - The finalize response
*/
exports.StreamAddWatermark = async function({
libraryId,
objectId,
writeToken,
simpleWatermark,
imageWatermark,
forensicWatermark,
finalize=true
}) {
ValidateObject(objectId);
if(!libraryId) {
libraryId = await this.ContentObjectLibraryId({objectId});
}
if(!writeToken) {
({writeToken} = await this.EditContentObject({
objectId,
libraryId
}));
}
const edgeWriteToken = await this.ContentObjectMetadata({
objectId,
libraryId,
metadataSubtree: "/live_recording/fabric_config/edge_write_token"
});
const watermarkType = imageWatermark ? "image" : forensicWatermark ? "forensic" : "text";
const metadataPath = "live_recording/playout_config";
this.Log(`Adding watermarking type: ${watermarkType} ${libraryId} ${objectId}`);
const objectMetadata = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
resolveLinks: false
});
if(!objectMetadata) {
throw Error("Stream object must be configured before adding a watermark");
}
const watermarkArgCount = [simpleWatermark, imageWatermark, forensicWatermark].filter(i => !!i).length;
console.log("watermark arg count", watermarkArgCount)
if(watermarkArgCount === 0) {
throw Error("No watermark was provided");
} else if(watermarkArgCount > 1) {
throw Error("Only one watermark is allowed")
}
if(simpleWatermark) {
objectMetadata.simple_watermark = simpleWatermark;
} else if(imageWatermark) {
objectMetadata.image_watermark = imageWatermark;
} else if(forensicWatermark) {
objectMetadata.forensic_watermark = forensicWatermark;
}
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
if(edgeWriteToken) {
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken,
metadataSubtree: metadataPath,
metadata: objectMetadata
});
}
const response = {
"imageWatermark": objectMetadata.image_watermark,
"textWatermark": objectMetadata.simple_watermark,
"forensicWatermark": objectMetadata.forensic_watermark
};
if(finalize) {
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken,
commitMessage: "Watermark set"
});
response.hash = finalizeResponse.hash;
}
return response;
};
/**
* Audit the specified live stream against several content fabric nodes
*
* @methodGroup Live Stream
* @namedParams
* @param {string=} objectId - Object ID of the live stream
* @param {string=} versionHash - Version hash of the live stream -- if not specified, latest version is returned
* @param {string=} salt - base64-encoded byte sequence for salting the audit hash
* @param {Array<number>=} samples - list of percentages (0.0 - <1.0) used for sampling the content part list, up to 3
* @param {string=} authorizationToken - Additional authorization token for this request
*
* @returns {Promise<Object>} - Response describing audit results
*/
exports.AuditStream = async function({objectId, versionHash, salt, samples, authorizationToken}) {
return await ContentObjectAudit.AuditContentObject({
client: this,
objectId,
versionHash,
salt,
samples,
live: true,
authorizationToken
});
};