Spaces:
Build error
Build error
import { ParsedStream, UserData } from '../db/schemas'; | |
import { | |
createLogger, | |
getTimeTakenSincePoint, | |
DSU, | |
getSimpleTextHash, | |
} from '../utils'; | |
const logger = createLogger('deduplicator'); | |
class StreamDeduplicator { | |
private userData: UserData; | |
constructor(userData: UserData) { | |
this.userData = userData; | |
} | |
public async deduplicate(streams: ParsedStream[]): Promise<ParsedStream[]> { | |
let deduplicator = this.userData.deduplicator; | |
if (!deduplicator || !deduplicator.enabled) { | |
return streams; | |
} | |
const start = Date.now(); | |
const deduplicationKeys = deduplicator.keys || ['filename', 'infoHash']; | |
deduplicator = { | |
enabled: true, | |
keys: deduplicationKeys, | |
cached: deduplicator.cached || 'per_addon', | |
uncached: deduplicator.uncached || 'per_addon', | |
p2p: deduplicator.p2p || 'per_addon', | |
http: deduplicator.http || 'disabled', | |
live: deduplicator.live || 'disabled', | |
youtube: deduplicator.youtube || 'disabled', | |
external: deduplicator.external || 'disabled', | |
}; | |
// Group streams by their deduplication keys | |
// const streamGroups = new Map<string, ParsedStream[]>(); | |
const dsu = new DSU<string>(); | |
const keyToStreamIds = new Map<string, string[]>(); | |
for (const stream of streams) { | |
// Create a unique key based on the selected deduplication methods | |
dsu.makeSet(stream.id); | |
const currentStreamKeyStrings: string[] = []; | |
if (deduplicationKeys.includes('filename') && stream.filename) { | |
let normalisedFilename = stream.filename | |
.replace( | |
/(mkv|mp4|avi|mov|wmv|flv|webm|m4v|mpg|mpeg|3gp|3g2|m2ts|ts|vob|ogv|ogm|divx|xvid|rm|rmvb|asf|mxf|mka|mks|mk3d|webm|f4v|f4p|f4a|f4b)$/i, | |
'' | |
) | |
.replace(/[^\p{L}\p{N}+]/gu, '') | |
.replace(/\s+/g, '') | |
.toLowerCase(); | |
currentStreamKeyStrings.push(`filename:${normalisedFilename}`); | |
} | |
if (deduplicationKeys.includes('infoHash') && stream.torrent?.infoHash) { | |
currentStreamKeyStrings.push(`infoHash:${stream.torrent.infoHash}`); | |
} | |
if (deduplicationKeys.includes('smartDetect')) { | |
// generate a hash using many different attributes | |
// round size to nearest 100MB for some margin of error | |
const roundedSize = stream.size | |
? Math.round(stream.size / 100000000) * 100000000 | |
: undefined; | |
const hash = getSimpleTextHash( | |
`${roundedSize}${stream.parsedFile?.resolution}${stream.parsedFile?.quality}${stream.parsedFile?.visualTags}${stream.parsedFile?.audioTags}${stream.parsedFile?.languages}${stream.parsedFile?.encode}` | |
); | |
currentStreamKeyStrings.push(`smartDetect:${hash}`); | |
} | |
if (currentStreamKeyStrings.length > 0) { | |
for (const key of currentStreamKeyStrings) { | |
if (!keyToStreamIds.has(key)) { | |
keyToStreamIds.set(key, []); | |
} | |
keyToStreamIds.get(key)!.push(stream.id); | |
} | |
} | |
} | |
// Perform union operations based on shared keys | |
for (const streamIdsSharingCommonKey of keyToStreamIds.values()) { | |
if (streamIdsSharingCommonKey.length > 1) { | |
const firstStreamId = streamIdsSharingCommonKey[0]; | |
for (let i = 1; i < streamIdsSharingCommonKey.length; i++) { | |
dsu.union(firstStreamId, streamIdsSharingCommonKey[i]); | |
} | |
} | |
} | |
// Group actual stream objects by their DSU representative ID | |
const idToStreamMap = new Map(streams.map((s) => [s.id, s])); // For quick lookup | |
const finalDuplicateGroupsMap = new Map<string, ParsedStream[]>(); // Maps representative ID to stream objects | |
for (const stream of streams) { | |
const representativeId = dsu.find(stream.id); | |
if (!finalDuplicateGroupsMap.has(representativeId)) { | |
finalDuplicateGroupsMap.set(representativeId, []); | |
} | |
finalDuplicateGroupsMap.get(representativeId)!.push(stream); | |
} | |
const processedStreams = new Set<ParsedStream>(); | |
for (const group of finalDuplicateGroupsMap.values()) { | |
// Group streams by type | |
const streamsByType = new Map<string, ParsedStream[]>(); | |
for (const stream of group) { | |
let type = stream.type as string; | |
if ((type === 'debrid' || type === 'usenet') && stream.service) { | |
type = stream.service.cached ? 'cached' : 'uncached'; | |
} | |
const typeGroup = streamsByType.get(type) || []; | |
typeGroup.push(stream); | |
streamsByType.set(type, typeGroup); | |
} | |
// Process each type according to its deduplication mode | |
for (const [type, typeStreams] of streamsByType.entries()) { | |
const mode = deduplicator[type as keyof typeof deduplicator] as string; | |
if (mode === 'disabled') { | |
typeStreams.forEach((stream) => processedStreams.add(stream)); | |
continue; | |
} | |
switch (mode) { | |
case 'single_result': { | |
// Keep one result with highest priority service and addon | |
let selectedStream = typeStreams.sort((a, b) => { | |
// so a specific type may either have both streams not have a service, or both streams have a service | |
// if both streams have a service, then we can simpl | |
let aProviderIndex = | |
this.userData.services | |
?.filter((service) => service.enabled) | |
.findIndex((service) => service.id === a.service?.id) ?? 0; | |
let bProviderIndex = | |
this.userData.services | |
?.filter((service) => service.enabled) | |
.findIndex((service) => service.id === b.service?.id) ?? 0; | |
aProviderIndex = | |
aProviderIndex === -1 ? Infinity : aProviderIndex; | |
bProviderIndex = | |
bProviderIndex === -1 ? Infinity : bProviderIndex; | |
if (aProviderIndex !== bProviderIndex) { | |
return aProviderIndex - bProviderIndex; | |
} | |
// look at seeders for p2p and uncached streams | |
if ( | |
(type === 'p2p' || type === 'uncached') && | |
a.torrent?.seeders && | |
b.torrent?.seeders | |
) { | |
return (b.torrent.seeders || 0) - (a.torrent.seeders || 0); | |
} | |
// now look at the addon index | |
const aAddonIndex = this.userData.presets.findIndex( | |
(preset) => preset.instanceId === a.addon.presetInstanceId | |
); | |
const bAddonIndex = this.userData.presets.findIndex( | |
(preset) => preset.instanceId === b.addon.presetInstanceId | |
); | |
// the addon index MUST exist, its not possible for it to not exist | |
if (aAddonIndex !== bAddonIndex) { | |
return aAddonIndex - bAddonIndex; | |
} | |
// now look at stream type | |
let aTypeIndex = | |
this.userData.preferredStreamTypes?.findIndex( | |
(type) => type === a.type | |
) ?? 0; | |
let bTypeIndex = | |
this.userData.preferredStreamTypes?.findIndex( | |
(type) => type === b.type | |
) ?? 0; | |
aTypeIndex = aTypeIndex === -1 ? Infinity : aTypeIndex; | |
bTypeIndex = bTypeIndex === -1 ? Infinity : bTypeIndex; | |
if (aTypeIndex !== bTypeIndex) { | |
return aTypeIndex - bTypeIndex; | |
} | |
return 0; | |
})[0]; | |
processedStreams.add(selectedStream); | |
break; | |
} | |
case 'per_service': { | |
// Keep one result from each service (highest priority available addon for that service) | |
// first, ensure that all streams have a service, otherwise we can't use this mode | |
if (typeStreams.some((stream) => !stream.service)) { | |
throw new Error( | |
'per_service mode requires all streams to have a service' | |
); | |
} | |
let perServiceStreams = Object.values( | |
typeStreams.reduce( | |
(acc, stream) => { | |
acc[stream.service!.id] = acc[stream.service!.id] || []; | |
acc[stream.service!.id].push(stream); | |
return acc; | |
}, | |
{} as Record<string, ParsedStream[]> | |
) | |
).map((serviceStreams) => { | |
return serviceStreams.sort((a, b) => { | |
let aAddonIndex = this.userData.presets.findIndex( | |
(preset) => preset.instanceId === a.addon.presetInstanceId | |
); | |
let bAddonIndex = this.userData.presets.findIndex( | |
(preset) => preset.instanceId === b.addon.presetInstanceId | |
); | |
aAddonIndex = aAddonIndex === -1 ? Infinity : aAddonIndex; | |
bAddonIndex = bAddonIndex === -1 ? Infinity : bAddonIndex; | |
if (aAddonIndex !== bAddonIndex) { | |
return aAddonIndex - bAddonIndex; | |
} | |
// now look at stream type | |
let aTypeIndex = | |
this.userData.preferredStreamTypes?.findIndex( | |
(type) => type === a.type | |
) ?? 0; | |
let bTypeIndex = | |
this.userData.preferredStreamTypes?.findIndex( | |
(type) => type === b.type | |
) ?? 0; | |
aTypeIndex = aTypeIndex === -1 ? Infinity : aTypeIndex; | |
bTypeIndex = bTypeIndex === -1 ? Infinity : bTypeIndex; | |
if (aTypeIndex !== bTypeIndex) { | |
return aTypeIndex - bTypeIndex; | |
} | |
// look at seeders for p2p and uncached streams | |
if (type === 'p2p' || type === 'uncached') { | |
return (b.torrent?.seeders || 0) - (a.torrent?.seeders || 0); | |
} | |
return 0; | |
})[0]; | |
}); | |
for (const stream of perServiceStreams) { | |
processedStreams.add(stream); | |
} | |
break; | |
} | |
case 'per_addon': { | |
if (typeStreams.some((stream) => !stream.addon)) { | |
throw new Error( | |
'per_addon mode requires all streams to have an addon' | |
); | |
} | |
let perAddonStreams = Object.values( | |
typeStreams.reduce( | |
(acc, stream) => { | |
acc[stream.addon.presetInstanceId] = | |
acc[stream.addon.presetInstanceId] || []; | |
acc[stream.addon.presetInstanceId].push(stream); | |
return acc; | |
}, | |
{} as Record<string, ParsedStream[]> | |
) | |
).map((addonStreams) => { | |
return addonStreams.sort((a, b) => { | |
let aServiceIndex = | |
this.userData.services | |
?.filter((service) => service.enabled) | |
.findIndex((service) => service.id === a.service?.id) ?? 0; | |
let bServiceIndex = | |
this.userData.services | |
?.filter((service) => service.enabled) | |
.findIndex((service) => service.id === b.service?.id) ?? 0; | |
aServiceIndex = aServiceIndex === -1 ? Infinity : aServiceIndex; | |
bServiceIndex = bServiceIndex === -1 ? Infinity : bServiceIndex; | |
if (aServiceIndex !== bServiceIndex) { | |
return aServiceIndex - bServiceIndex; | |
} | |
if (type === 'p2p' || type === 'uncached') { | |
return (b.torrent?.seeders || 0) - (a.torrent?.seeders || 0); | |
} | |
return 0; | |
})[0]; | |
}); | |
for (const stream of perAddonStreams) { | |
processedStreams.add(stream); | |
} | |
break; | |
} | |
} | |
} | |
} | |
let deduplicatedStreams = Array.from(processedStreams); | |
logger.info( | |
`Filtered out ${streams.length - deduplicatedStreams.length} duplicate streams to ${deduplicatedStreams.length} streams in ${getTimeTakenSincePoint(start)}` | |
); | |
return deduplicatedStreams; | |
} | |
} | |
export default StreamDeduplicator; | |