/**
* Methods for ABR video creation and management
*
 * For more information on how to publish ABR content, see <a href="./abr/index.html">this detailed guide</a>
*
* @module ElvClient/ABRPublishing
*/
const R = require("ramda");
const UrlJoin = require("url-join");
const {
ValidateLibrary,
ValidateVersion,
ValidateParameters
} = require("../Validation");
// When `/abr_mezzanine/offerings` contains more than one entry, only one is the 'real' offering; the others are
// additional copies to be modified upon finalization, created because addlOfferingSpecs was specified in the call to
// `CreateABRMezzanine()`. The 'real' offering will have an object stored in `mez_prep_specs`; the copies will not.
// This function accepts the metadata retrieved from `/abr_mezzanine/offerings` and returns the offering key for the
// 'real' offering that actually has transcode LROs.
// If no suitable offering is found, throws an error.
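//
// For illustration, given hypothetical metadata like:
//   {
//     "default": {"mez_prep_specs": {...}, ...},  // the 'real' offering
//     "default_copy": {...}                       // additional copy, no mez_prep_specs
//   }
// MezJobMainOfferingKey(metadata) returns "default".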
const MezJobMainOfferingKey = function(abrMezOfferings) {
if(!abrMezOfferings) throw Error("No mezzanine preparation job info found at /abr_mezzanine");
const mainOfferingKey = Object.keys(abrMezOfferings).find(offKey => abrMezOfferings[offKey].mez_prep_specs);
if(!mainOfferingKey) throw Error("Could not determine offering key for last submitted job");
return mainOfferingKey;
};
/**
* Create a master media content object with the given files.
*
 * - If uploading local files, use the fileInfo parameter (see UploadFiles for format)
 * - If uploading from an S3 bucket, use the fileInfo, access and copy parameters (see the UploadFilesFromS3 method)
*
* @methodGroup ABR Publishing
* @namedParams
* @param {string} libraryId - ID of the library
* @param {string=} type - ID or version hash of the content type for this master
* @param {string} name - Name of the content
* @param {string=} description - Description of the content
* @param {Object=} metadata - Additional metadata for the content object
* @param {Array<Object>=} fileInfo - Files to upload (See UploadFiles/UploadFilesFromS3 method)
* @param {boolean=} encrypt=true - (Local or copied files only) - Unless `false` is passed in explicitly, any uploaded/copied files will be stored encrypted
 * @param {boolean=} copy=false - (S3 only) If true, files will be copied from S3 rather than referenced
* @param {function=} callback - Progress callback for file upload (See UploadFiles/UploadFilesFromS3 method)
* @param {("warn"|"info"|"debug")=} respLogLevel=warn - The level of logging to return in http response
* @param {("none"|"error"|"warn"|"info"|"debug")=} structLogLevel=none - The level of logging to save to object metadata
* @param {Array<Object>=} access=[] - Array of cloud credentials, along with path matching regex strings - Required if any files in the masters are cloud references (currently only AWS S3 is supported)
* - If this parameter is non-empty, all items in fileInfo are assumed to be items in cloud storage
* - Format: [
* - {
* - path_matchers: ["FILE_PATH_MATCH_REGEX_1", "FILE_PATH_MATCH_REGEX_2" ...],
* - remote_access: {
* - protocol: "s3",
* - platform: "aws",
* - path: "YOUR_AWS_S3_BUCKET_NAME" + "/",
* - storage_endpoint: {
* - region: "YOUR_AWS_REGION_NAME"
* - },
* - cloud_credentials: {
* - access_key_id: "YOUR_AWS_S3_ACCESS_KEY",
* - secret_access_key: "YOUR_AWS_S3_SECRET"
* - }
* - }
* - },
* - {
* - path_matchers: [".*"], // <-- catch-all for any remaining unmatched items in fileInfo
* - remote_access: {
* - ...
* - }
* - },
* - ...
* - ]
* -
 * - The simplest case is a one-element array with .path_matchers == [".*"], in which case the same credentials will be used for all items in fileInfo
*
* @throws {Object} error - If the initialization of the master fails, error details can be found in error.body
* @return {Object} - The finalize response for the object, as well as logs, warnings and errors from the master initialization
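 *
 * @example
 * // A minimal sketch of an S3-based call. The library ID, bucket, region, paths and
 * // credential environment variables are hypothetical; adapt them to your setup.
 * const result = await client.CreateProductionMaster({
 *   libraryId: "ilib_EXAMPLE",
 *   name: "Big Buck Bunny Master",
 *   fileInfo: [{path: "bbb.mp4", source: "s3://my-bucket/bbb.mp4"}],
 *   copy: true,
 *   access: [{
 *     path_matchers: [".*"],
 *     remote_access: {
 *       protocol: "s3",
 *       platform: "aws",
 *       path: "my-bucket" + "/",
 *       storage_endpoint: {region: "us-west-2"},
 *       cloud_credentials: {
 *         access_key_id: process.env.AWS_ACCESS_KEY_ID,
 *         secret_access_key: process.env.AWS_SECRET_ACCESS_KEY
 *       }
 *     }
 *   }]
 * });
 * // result.hash is the version hash of the new master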
*/
exports.CreateProductionMaster = async function({
libraryId,
type,
name,
description,
metadata={},
fileInfo,
encrypt=true,
access=[],
copy=false,
callback,
respLogLevel = "warn",
structLogLevel="none"
}) {
ValidateLibrary(libraryId);
const {id, write_token} = await this.CreateContentObject({
libraryId,
options: type ? { type } : {}
});
// any files specified?
if(fileInfo) {
// are they stored in cloud?
if(access.length > 0) {
// S3 Upload
const s3prefixRegex = /^s3:\/\/([^/]+)\//i; // for matching and extracting bucket name when full s3:// path is specified
// batch the cloud storage files by matching credential set, check each file's source path against credential set path_matchers
for(let i = 0; i < fileInfo.length; i++) {
const oneFileInfo = fileInfo[i];
let matched = false;
for(let j = 0; !matched && j < access.length; j++) {
let credentialSet = access[j];
// strip trailing slash to get bucket name for credential set
const credentialSetBucket = credentialSet.remote_access.path.replace(/\/$/, "");
const matchers = credentialSet.path_matchers;
for(let k = 0; !matched && k < matchers.length; k++) {
const matcher = new RegExp(matchers[k]);
const fileSourcePath = oneFileInfo.source;
if(matcher.test(fileSourcePath)) {
matched = true;
// if full s3 path supplied, check bucket name
const s3prefixMatch = (s3prefixRegex.exec(fileSourcePath));
if(s3prefixMatch) {
const bucketName = s3prefixMatch[1];
if(bucketName !== credentialSetBucket) {
throw Error("Full S3 file path \"" + fileSourcePath + "\" matched to credential set with different bucket name '" + credentialSetBucket + "'");
}
}
if(credentialSet.hasOwnProperty("matched")) {
credentialSet.matched.push(oneFileInfo);
} else {
// first matching file path for this credential set,
// initialize new 'matched' property to 1-element array
credentialSet.matched = [oneFileInfo];
}
}
}
}
if(!matched) {
throw Error("no credential set found for file path: \"" + filePath + "\"");
}
}
// iterate over credential sets, if any matching files were found, upload them using that credential set
for(let i = 0; i < access.length; i++) {
const credentialSet = access[i];
if(credentialSet.hasOwnProperty("matched") && credentialSet.matched.length > 0) {
const region = credentialSet.remote_access.storage_endpoint.region;
const bucket = credentialSet.remote_access.path.replace(/\/$/, "");
const accessKey = credentialSet.remote_access.cloud_credentials.access_key_id;
const secret = credentialSet.remote_access.cloud_credentials.secret_access_key;
await this.UploadFilesFromS3({
libraryId,
objectId: id,
writeToken: write_token,
fileInfo: credentialSet.matched,
region,
bucket,
accessKey,
secret,
copy,
callback,
encryption: encrypt ? "cgck" : "none"
});
}
}
} else {
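      // Local file upload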
await this.UploadFiles({
libraryId,
objectId: id,
writeToken: write_token,
fileInfo,
callback,
encryption: encrypt ? "cgck" : "none"
});
}
}
await this.CreateEncryptionConk({libraryId, objectId: id, writeToken: write_token, createKMSConk: true});
const { logs, errors, warnings } = await this.CallBitcodeMethod({
libraryId,
objectId: id,
writeToken: write_token,
method: UrlJoin("media", "production_master", "init"),
queryParams: {
response_log_level: respLogLevel,
struct_log_level: structLogLevel
},
body: {
access
},
constant: false
});
await this.MergeMetadata({
libraryId,
objectId: id,
writeToken: write_token,
metadata: {
...(metadata || {}),
name,
description,
      reference: access.length > 0 && !copy,
public: {
...((metadata || {}).public || {}),
name: name || "",
description: description || ""
},
elv_created_at: new Date().getTime(),
}
});
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId: id,
writeToken: write_token,
commitMessage: "Create master",
awaitCommitConfirmation: false
});
return {
errors: errors || [],
logs: logs || [],
warnings: warnings || [],
...finalizeResponse
};
};
/**
 * Create (or edit) a mezzanine offering based on a given master content object version and variant key
*
* @methodGroup ABR Publishing
* @namedParams
* @param {Object=} abrProfile - Custom ABR profile. If not specified, the profile of the mezzanine library will be used
* @param {Object=} addlOfferingSpecs - Specs for additional offerings to create by patching the offering being created/edited
* @param {string=} description - Description for mezzanine content object
 * @param {boolean=} keepOtherStreams=false - If objectId is specified, whether to preserve existing streams whose keys are not among those specified in the production master
* @param {string} libraryId - ID of the mezzanine library
* @param {string} masterVersionHash - The version hash of the production master content object
* @param {Object=} metadata - Additional metadata for mezzanine content object
* @param {string} name - Name for mezzanine content object
* @param {string=} objectId - ID of existing object (if not specified, new object will be created)
 * @param {string=} offeringKey=default - The key of the offering to create (or edit)
 * @param {("warn"|"info"|"debug")=} respLogLevel=warn - The level of logging to return in the HTTP response
 * @param {("none"|"error"|"warn"|"info"|"debug")=} structLogLevel=none - The level of logging to save to object metadata
 * @param {Array<string>=} streamKeys - List of stream keys from the variant to include. If not supplied, all streams will be included.
 * @param {string=} type - ID or version hash of the content type for the mezzanine
 * @param {string=} variant=default - Which variant of the master content object to use
*
* @return {Object} - The finalize response for the object, as well as logs, warnings and errors from the mezzanine initialization
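 *
 * @example
 * // A minimal sketch (the library ID and version hash are hypothetical). With no
 * // abrProfile specified, the mezzanine library's ABR profile is used.
 * const mezResult = await client.CreateABRMezzanine({
 *   libraryId: "ilib_EXAMPLE",
 *   name: "Big Buck Bunny Mezzanine",
 *   masterVersionHash: "hq__EXAMPLE",
 *   variant: "default",
 *   offeringKey: "default"
 * });
 * // Transcoding has not started yet; call StartABRMezzanineJobs next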
*/
exports.CreateABRMezzanine = async function({
libraryId,
objectId,
type,
name,
description,
metadata,
masterVersionHash,
abrProfile,
addlOfferingSpecs,
variant="default",
offeringKey="default",
keepOtherStreams= false,
respLogLevel = "warn",
structLogLevel="none",
streamKeys
}) {
ValidateLibrary(libraryId);
  if(!masterVersionHash) {
    throw Error("Master version hash not specified");
  }
  ValidateVersion(masterVersionHash);
  if(!objectId && keepOtherStreams) {
throw Error("Existing mezzanine object ID required in order to use 'keepOtherStreams'");
}
if(addlOfferingSpecs && !abrProfile) {
throw Error("abrProfile required when using addlOfferingSpecs");
}
const existingMez = !!objectId;
let options = type ? { type } : {};
let id, write_token;
if(existingMez) {
// Edit existing
const editResponse = await this.EditContentObject({
libraryId,
objectId,
options
});
id = editResponse.id;
write_token = editResponse.write_token;
} else {
// Create new
const createResponse = await this.CreateContentObject({
libraryId,
options
});
id = createResponse.id;
write_token = createResponse.write_token;
}
await this.CreateEncryptionConk({libraryId, objectId: id, writeToken: write_token, createKMSConk: true});
const masterName = await this.ContentObjectMetadata({
versionHash: masterVersionHash,
metadataSubtree: "public/name"
});
// Include authorization for library, master, and mezzanine
let authorizationTokens = [];
authorizationTokens.push(await this.authClient.AuthorizationToken({libraryId, objectId: id, update: true}));
authorizationTokens.push(await this.authClient.AuthorizationToken({libraryId}));
authorizationTokens.push(await this.authClient.AuthorizationToken({versionHash: masterVersionHash}));
const headers = {
Authorization: authorizationTokens.map(token => `Bearer ${token}`).join(",")
};
const body = {
additional_offering_specs: addlOfferingSpecs,
offering_key: offeringKey,
keep_other_streams: keepOtherStreams,
prod_master_hash: masterVersionHash,
stream_keys: streamKeys,
variant_key: variant
};
let storeClear = false;
if(abrProfile) {
body.abr_profile = abrProfile;
storeClear = abrProfile.store_clear;
} else {
// Retrieve ABR profile from library to check store clear
storeClear = await this.ContentObjectMetadata({
libraryId,
objectId: this.utils.AddressToObjectId(this.utils.HashToAddress(libraryId)),
metadataSubtree: "abr_profile/store_clear"
});
}
if(!storeClear) {
// If mez parts are to be encrypted, generate encryption conks
await this.EncryptionConk({
libraryId,
objectId: id,
writeToken: write_token
});
}
const {logs, errors, warnings} = await this.CallBitcodeMethod({
libraryId,
objectId: id,
writeToken: write_token,
method: UrlJoin("media", "abr_mezzanine", "init"),
queryParams: {
response_log_level: respLogLevel,
struct_log_level: structLogLevel
},
headers,
body,
constant: false
});
if(!metadata) { metadata = {}; }
if(!metadata.public) { metadata.public = {}; }
if(!metadata.public.asset_metadata) { metadata.public.asset_metadata = {}; }
metadata.master = {
name: masterName,
id: this.utils.DecodeVersionHash(masterVersionHash).objectId,
hash: masterVersionHash,
variant
};
metadata.public = {
...metadata.public
};
metadata.public.asset_metadata = {
sources: {
[offeringKey]: {
"/": `./rep/playout/${offeringKey}/options.json`
}
},
...metadata.public.asset_metadata
};
if(name || !existingMez) {
metadata.name = name || `${masterName} Mezzanine`;
metadata.public.name = name || `${masterName} Mezzanine`;
}
if(description || !existingMez) {
metadata.description = description || "";
metadata.public.description = description || "";
}
// retrieve existing metadata to merge with updated metadata
const existingMetadata = await this.ContentObjectMetadata({
libraryId,
objectId: id,
writeToken: write_token,
});
// newer metadata values replace existing metadata, unless both new and old values are objects,
// in which case their keys are merged recursively
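  // e.g. R.mergeDeepRight({a: {x: 1}, b: 2}, {a: {y: 3}, b: 9}) returns {a: {x: 1, y: 3}, b: 9}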
metadata = R.mergeDeepRight(existingMetadata, metadata);
if(!existingMez) {
// set creation date
metadata.elv_created_at = new Date().getTime();
}
// write metadata to mezzanine object
await this.ReplaceMetadata({
libraryId,
objectId: id,
writeToken: write_token,
metadata
});
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId: id,
writeToken: write_token,
commitMessage: "Create ABR mezzanine"
});
return {
logs: logs || [],
warnings: warnings || [],
errors: errors || [],
...finalizeResponse
};
};
/**
* Start transcoding jobs previously set up by CreateABRMezzanine() on the specified mezzanine
*
* @methodGroup ABR Publishing
* @namedParams
* @param {string} libraryId - ID of the mezzanine library
* @param {string} objectId - ID of the mezzanine object
 * @param {Array<Object>=} access - Array of cloud credentials, along with path matching regex strings - Required if any files in the masters are cloud references (see the CreateProductionMaster method for format)
 * @param {Array<number>=} jobIndexes - Array of LRO job indexes to start. LROs are listed in a map under the metadata key /abr_mezzanine/offerings/(offeringKey)/mez_prep_specs/; job indexes start at 0 and correspond to the map keys in alphabetical order
*
* @return {Promise<Object>} - A write token for the mezzanine object, as well as any logs, warnings and errors from the job initialization
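 *
 * @example
 * // A minimal sketch (IDs are hypothetical). jobIndexes is omitted here, which
 * // submits all of the offering's LROs.
 * const startResponse = await client.StartABRMezzanineJobs({
 *   libraryId: "ilib_EXAMPLE",
 *   objectId: "iq__EXAMPLE"
 * });
 * // startResponse.writeToken identifies the draft the LROs write to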
*/
exports.StartABRMezzanineJobs = async function({
libraryId,
objectId,
access=[],
  jobIndexes=null
}) {
ValidateParameters({libraryId, objectId});
const lastJobOfferingsInfo = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: UrlJoin("abr_mezzanine", "offerings")
});
const offeringKey = MezJobMainOfferingKey(lastJobOfferingsInfo);
const prepSpecs = lastJobOfferingsInfo[offeringKey].mez_prep_specs;
if(!prepSpecs) throw Error("No stream preparation specs found");
// Retrieve all masters associated with this offering
let masterVersionHashes = Object.keys(prepSpecs).map(spec =>
(prepSpecs[spec].source_streams || []).map(stream => stream.source_hash)
);
  // Flatten, remove empty values, and dedupe
  masterVersionHashes = [...new Set(masterVersionHashes.flat().filter(hash => hash))];
// Retrieve authorization tokens for all masters and the mezzanine
let authorizationTokens = await Promise.all(
masterVersionHashes.map(async versionHash => await this.authClient.AuthorizationToken({versionHash}))
);
authorizationTokens = [
await this.authClient.AuthorizationToken({libraryId, objectId, update: true}),
...authorizationTokens
];
const headers = {
Authorization: authorizationTokens.map(token => `Bearer ${token}`).join(",")
};
const processingDraft = await this.EditContentObject({libraryId, objectId});
const lroInfo = {
write_token: processingDraft.write_token,
node: processingDraft.nodeUrl,
offering: offeringKey
};
// Update metadata with LRO version write token
const statusDraft = await this.EditContentObject({libraryId, objectId});
await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: statusDraft.write_token,
metadataSubtree: "lro_draft",
metadata: lroInfo
});
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken: statusDraft.write_token,
commitMessage: "Mezzanine LRO status"
});
const {data, errors, warnings, logs} = await this.CallBitcodeMethod({
libraryId,
objectId,
writeToken: processingDraft.write_token,
headers,
method: UrlJoin("media", "abr_mezzanine", "prep_start"),
constant: false,
body: {
access,
offering_key: offeringKey,
job_indexes: jobIndexes
}
});
return {
hash: finalizeResponse.hash,
lro_draft: lroInfo,
writeToken: processingDraft.write_token,
nodeUrl: processingDraft.nodeUrl,
data,
logs: logs || [],
warnings: warnings || [],
errors: errors || []
};
};
/**
* Retrieve node and write token for a mezzanine's current offering preparation job (if any).
* Also returns the offering key.
*
* @methodGroup ABR Publishing
* @namedParams
* @param {string} libraryId - ID of the library
* @param {string} objectId - ID of the object
*
 * @return {Promise<Object>} - LRO draft info (write token, node URL, and offering key)
*/
exports.LRODraftInfo = async function({libraryId, objectId}) {
  ValidateParameters({libraryId, objectId});
const standardPathContents = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: "lro_draft"
});
if(standardPathContents) return standardPathContents;
// get last job info, under /abr_mezzanine/offerings/
const lastJobOfferingsInfo = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: UrlJoin("abr_mezzanine", "offerings")
});
  const mainOfferingKey = MezJobMainOfferingKey(lastJobOfferingsInfo);
// see if offering from last job was finalized
const ready = lastJobOfferingsInfo[mainOfferingKey].ready;
// old location for LRO draft info
const oldPathContents = await this.ContentObjectMetadata({
libraryId,
objectId,
metadataSubtree: `lro_draft_${mainOfferingKey}`
});
if(oldPathContents) {
return oldPathContents;
} else {
if(ready) {
throw Error("No LRO draft found for this mezzanine - looks like last mez prep job was already finalized.");
} else {
throw Error("No LRO draft found for this mezzanine - looks like last mez prep job was either cancelled or discarded.");
}
}
};
/**
* Retrieve status information for mezzanine transcoding jobs, aka long running operations (LROs) on the given object.
*
* @methodGroup ABR Publishing
* @namedParams
* @param {string} libraryId - ID of the library
* @param {string} objectId - ID of the object
*
* @return {Promise<Object>} - LRO status
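 *
 * @example
 * // A polling sketch. The 15 second interval and the `run_state` value checked
 * // here are illustrative assumptions, not a definitive status contract.
 * let running = true;
 * while(running) {
 *   const statusMap = await client.LROStatus({libraryId, objectId});
 *   running = Object.values(statusMap).some(lro => lro.run_state === "running");
 *   if(running) { await new Promise(resolve => setTimeout(resolve, 15000)); }
 * }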
*/
exports.LROStatus = async function({libraryId, objectId}) {
ValidateParameters({libraryId, objectId});
const lroDraft = await this.LRODraftInfo({libraryId, objectId});
this.RecordWriteToken({writeToken: lroDraft.write_token, fabricNodeUrl: lroDraft.node});
return await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: lroDraft.write_token,
metadataSubtree: "lro_status"
});
};
/**
* Finalize a mezzanine object after all jobs have finished
*
* @methodGroup ABR Publishing
* @namedParams
* @param {string} libraryId - ID of the mezzanine library
* @param {string} objectId - ID of the mezzanine object
 * @param {function=} preFinalizeFn - A function to call before finalizing changes, to allow further modifications to the offering. The function will be invoked with {elvClient, nodeUrl, offeringKey, writeToken} to allow access to the draft, and MUST NOT finalize the draft.
* @param {boolean=} preFinalizeThrow - If set to `true` then any error thrown by preFinalizeFn will not be caught. Otherwise, any exception will be appended to the `warnings` array returned after finalization.
*
* @return {Promise<Object>} - The finalize response for the mezzanine object, as well as any logs, warnings and errors from the finalization
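 *
 * @example
 * // Sketch of a preFinalizeFn that edits the draft before it is committed
 * // (the metadata path written here is hypothetical):
 * await client.FinalizeABRMezzanine({
 *   libraryId,
 *   objectId,
 *   preFinalizeFn: async ({elvClient, writeToken}) => {
 *     await elvClient.ReplaceMetadata({
 *       libraryId,
 *       objectId,
 *       writeToken,
 *       metadataSubtree: "offerings/default/display_name",
 *       metadata: "Default Offering"
 *     });
 *   }
 * });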
*/
exports.FinalizeABRMezzanine = async function({libraryId, objectId, preFinalizeFn, preFinalizeThrow}) {
ValidateParameters({libraryId, objectId});
const lroDraft = await this.LRODraftInfo({libraryId, objectId});
// tell http client what node to contact for this write token
this.RecordWriteToken({writeToken: lroDraft.write_token, fabricNodeUrl: lroDraft.node});
const lastJobOfferingsInfo = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: lroDraft.write_token,
metadataSubtree: UrlJoin("abr_mezzanine", "offerings")
});
const offeringKey = MezJobMainOfferingKey(lastJobOfferingsInfo);
const masterHash = lastJobOfferingsInfo[offeringKey].prod_master_hash;
// Authorization token for mezzanine and master
let authorizationTokens = [
await this.authClient.AuthorizationToken({libraryId, objectId, update: true}),
await this.authClient.AuthorizationToken({versionHash: masterHash})
];
const headers = {
Authorization: authorizationTokens.map(token => `Bearer ${token}`).join(",")
};
const {data, errors, warnings, logs} = await this.CallBitcodeMethod({
objectId,
libraryId,
writeToken: lroDraft.write_token,
method: UrlJoin("media", "abr_mezzanine", "offerings", offeringKey, "finalize"),
headers,
constant: false
});
let preFinalizeWarnings = [];
if(preFinalizeFn) {
    const params = {
      elvClient: this,
      nodeUrl: lroDraft.node,
      offeringKey,
      writeToken: lroDraft.write_token
    };
try {
await preFinalizeFn(params);
} catch(err) {
if(preFinalizeThrow) {
// noinspection ExceptionCaughtLocallyJS
throw new Error("Error running preFinalize function", {cause: err});
} else {
        preFinalizeWarnings = [`Error running preFinalize function: ${err}`];
}
}
}
const finalizeResponse = await this.FinalizeContentObject({
libraryId,
    objectId,
writeToken: lroDraft.write_token,
commitMessage: "Finalize ABR mezzanine",
awaitCommitConfirmation: false
});
return {
data,
logs: logs || [],
warnings: (warnings || []).concat(preFinalizeWarnings),
errors: errors || [],
...finalizeResponse
};
};