Spaces:
Build error
Build error
File size: 7,925 Bytes
0bfe2e3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import {
Addon,
ParsedStream,
StrictManifestResource,
UserData,
} from '../db/schemas';
import { constants, createLogger, getTimeTakenSincePoint } from '../utils';
import { Wrapper } from '../wrapper';
import { GroupConditionEvaluator } from '../parser/streamExpression';
import { getAddonName } from '../utils/general';
import StreamFilter from './filterer';
import StreamPrecompute from './precomputer';
const logger = createLogger('fetcher');
class StreamFetcher {
private userData: UserData;
private filter: StreamFilter;
private precompute: StreamPrecompute;
constructor(userData: UserData) {
this.userData = userData;
this.filter = new StreamFilter(userData);
this.precompute = new StreamPrecompute(userData);
}
public async fetch(
addons: Addon[],
type: string,
id: string
): Promise<{
streams: ParsedStream[];
errors: {
title: string;
description: string;
}[];
}> {
const allErrors: {
title: string;
description: string;
}[] = [];
let allStreams: ParsedStream[] = [];
const start = Date.now();
let totalTimeTaken = 0;
let previousGroupStreams: ParsedStream[] = [];
let previousGroupTimeTaken = 0;
// Helper function to fetch streams from an addon and log summary
const fetchFromAddon = async (addon: Addon) => {
let summaryMsg = '';
const start = Date.now();
try {
const streams = await new Wrapper(addon).getStreams(type, id);
const errorStreams = streams.filter(
(s) => s.type === constants.ERROR_STREAM_TYPE
);
const addonErrors = errorStreams.map((s) => ({
title: `[β] ${s.error?.title || getAddonName(addon)}`,
description: s.error?.description || 'Unknown error',
}));
if (errorStreams.length > 0) {
logger.error(
`Found ${errorStreams.length} error streams from ${getAddonName(addon)}`,
{
errorStreams: errorStreams.map((s) => s.error?.title),
}
);
}
summaryMsg = `
ββββββββββββββββββββββββββββββββββββββ
${errorStreams.length > 0 ? 'π ' : 'π’'} [${getAddonName(addon)}] Scrape Summary
ββββββββββββββββββββββββββββββββββββββ
β Status : ${errorStreams.length > 0 ? 'PARTIAL SUCCESS' : 'SUCCESS'}
π¦ Streams : ${streams.length}
${errorStreams.length > 0 ? ` β Errors : ${errorStreams.map((s) => ` β’ ${s.error?.title || 'Unknown error'}: ${s.error?.description || 'No description'}`).join('\n')}` : ''}
π Details : ${
errorStreams.length > 0
? `Found errors:\n${errorStreams.map((s) => ` β’ ${s.error?.title || 'Unknown error'}: ${s.error?.description || 'No description'}`).join('\n')}`
: 'Successfully fetched streams.'
}
β±οΈ Time : ${getTimeTakenSincePoint(start)}
ββββββββββββββββββββββββββββββββββββββ`;
return {
success: true as const,
streams: streams.filter(
(s) => s.type !== constants.ERROR_STREAM_TYPE
),
errors: addonErrors,
timeTaken: Date.now() - start,
};
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
const addonErrors = {
title: `[β] ${getAddonName(addon)}`,
description: errMsg,
};
summaryMsg = `
ββββββββββββββββββββββββββββββββββββββ
π΄ [${getAddonName(addon)}] Scrape Summary
ββββββββββββββββββββββββββββββββββββββ
β Status : FAILED
π« Error : ${errMsg}
β±οΈ Time : ${getTimeTakenSincePoint(start)}
ββββββββββββββββββββββββββββββββββββββ`;
return {
success: false as const,
errors: [addonErrors],
timeTaken: 0,
streams: [],
};
} finally {
logger.info(summaryMsg);
}
};
// Helper function to fetch from a group of addons and track time
const fetchFromGroup = async (addons: Addon[]) => {
const groupStart = Date.now();
const results = await Promise.all(addons.map(fetchFromAddon));
const groupStreams = results.flatMap((r) => r.streams);
const groupErrors = results.flatMap((r) => r.errors);
allErrors.push(...groupErrors);
const filteredStreams = await this.filter.filter(groupStreams, type, id);
await this.precompute.precompute(filteredStreams);
const groupTime = Date.now() - groupStart;
logger.info(`Filtered to ${filteredStreams.length} streams`);
return {
totalTime: groupTime,
streams: filteredStreams,
};
};
// If groups are configured, handle group-based fetching
if (this.userData.groups && this.userData.groups.length > 0) {
// Always fetch from first group
const firstGroupAddons = addons.filter(
(addon) =>
addon.presetInstanceId &&
this.userData.groups![0].addons.includes(addon.presetInstanceId)
);
logger.info(
`Fetching streams from first group with ${firstGroupAddons.length} addons`
);
// Fetch streams from first group
const firstGroupResult = await fetchFromGroup(firstGroupAddons);
allStreams.push(...firstGroupResult.streams);
totalTimeTaken = firstGroupResult.totalTime;
previousGroupStreams = firstGroupResult.streams;
previousGroupTimeTaken = firstGroupResult.totalTime;
// For each subsequent group, evaluate condition and fetch if true
for (let i = 1; i < this.userData.groups.length; i++) {
const group = this.userData.groups[i];
// Skip if no condition or addons
if (!group.condition || !group.addons.length) continue;
try {
const evaluator = new GroupConditionEvaluator(
previousGroupStreams,
allStreams,
previousGroupTimeTaken,
totalTimeTaken,
type
);
const shouldFetch = await evaluator.evaluate(group.condition);
if (shouldFetch) {
logger.info(`Condition met for group ${i + 1}, fetching streams`);
const groupAddons = addons.filter(
(addon) =>
addon.presetInstanceId &&
group.addons.includes(addon.presetInstanceId)
);
const groupResult = await fetchFromGroup(groupAddons);
allStreams.push(...groupResult.streams);
totalTimeTaken += groupResult.totalTime;
previousGroupStreams = groupResult.streams;
previousGroupTimeTaken = groupResult.totalTime;
} else {
logger.info(
`Condition not met for group ${i + 1}, skipping remaining groups`
);
// if we meet a group whose condition is not met, we do not need to fetch from any subsequent groups
break;
}
} catch (error) {
logger.error(`Error evaluating condition for group ${i}:`, error);
continue;
}
}
} else {
// If no groups configured, fetch from all addons in parallel
const result = await fetchFromGroup(addons);
allStreams.push(...result.streams);
totalTimeTaken = result.totalTime;
}
logger.info(
`Fetched ${allStreams.length} streams from ${addons.length} addons in ${getTimeTakenSincePoint(start)}`
);
return { streams: allStreams, errors: allErrors };
}
}
export default StreamFetcher;
|