import json
import random

import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
SYSTEM_PROMPT_I2V = """
You are an expert in video captioning. You are given a structured video caption and you need to compose it into a more natural and fluent English caption.
## Structured Input
{structured_input}
## Notes
1. If a field is empty, just ignore it and do not mention it in the output.
2. Do not make any semantic changes to the original fields; be sure to follow the original meaning.
3. If the action field is not empty, eliminate information in it that is not related to the temporal action (such as clothing, background, and environment details) to obtain a pure action field.
## Output Principles and Order
1. First, eliminate static information in the action field that is not related to the temporal action, such as background or environment details.
2. Second, describe each subject with its pure action and expression, if these fields exist.
## Output
Please directly output the final composed caption without any additional information.
"""
SYSTEM_PROMPT_T2V = """
You are an expert in video captioning. You are given a structured video caption and you need to compose it into a more natural and fluent English caption.
## Structured Input
{structured_input}
## Notes
1. Based on the action field, replace each subject's name field with the subject pronoun used in the action.
2. If a field is empty, just ignore it and do not mention it in the output.
3. Do not make any semantic changes to the original fields; be sure to follow the original meaning.
## Output Principles and Order
1. First, describe the shot_type, then the shot_angle and shot_position fields, in natural and fluent language.
2. Second, if the action field is not empty, eliminate information in it that is not related to the temporal action, such as background or environment details.
3. Third, describe each subject with its pure action, appearance, expression, and position, if these fields exist.
4. Finally, describe the environment and lighting if those fields are not empty.
## Output
Please directly output the final composed caption without any additional information.
"""
class StructuralCaptionDataset(torch.utils.data.Dataset):
    def __init__(self, input_csv, model_path, task):
        # Accept either an in-memory DataFrame or a path to a CSV file.
        if isinstance(input_csv, pd.DataFrame):
            self.meta = input_csv
        else:
            self.meta = pd.read_csv(input_csv)
        self.task = task
        self.system_prompt = SYSTEM_PROMPT_T2V if self.task == 't2v' else SYSTEM_PROMPT_I2V
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

    def __len__(self):
        return len(self.meta)
    def __getitem__(self, index):
        row = self.meta.iloc[index]
        real_index = self.meta.index[index]
        struct_caption = json.loads(row["structural_caption"])

        # Camera motion is appended verbatim after the fused caption, so it is
        # normalized into a capitalized sentence here.
        camera_movement = struct_caption.get('camera_motion', '')
        if camera_movement != '':
            camera_movement += '.'
            camera_movement = camera_movement.capitalize()

        fusion_by_llm = False
        cleaned_struct_caption = self.clean_struct_caption(struct_caption, self.task)
        if cleaned_struct_caption.get('num_subjects', 0) > 0:
            # At least one subject survived cleaning: build a chat prompt for the LLM.
            new_struct_caption = json.dumps(cleaned_struct_caption, indent=4, ensure_ascii=False)
            conversation = [
                {
                    "role": "user",
                    "content": self.system_prompt.format(structured_input=new_struct_caption),
                },
            ]
            text = self.tokenizer.apply_chat_template(
                conversation,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False,
            )
            fusion_by_llm = True
        else:
            text = '-'
        return real_index, fusion_by_llm, text, '-', camera_movement
    def clean_struct_caption(self, struct_caption, task):
        raw_subjects = struct_caption.get('subjects', [])
        subjects = []
        for subject in raw_subjects:
            subject_type = subject.get("TYPES", {}).get('type', '')
            subject_sub_type = subject.get("TYPES", {}).get('sub_type', '')
            # Only humans and animals can carry a meaningful expression.
            if subject_type not in ["Human", "Animal"]:
                subject['expression'] = ''
            if subject_type == 'Human' and subject_sub_type == 'Accessory':
                subject['expression'] = ''
            # Prefer the finer-grained sub_type as the subject's name.
            if subject_sub_type != '':
                subject['name'] = subject_sub_type
            if 'TYPES' in subject:
                del subject['TYPES']
            if 'is_main_subject' in subject:
                del subject['is_main_subject']
            subjects.append(subject)

        to_del_subject_ids = []
        for idx, subject in enumerate(subjects):
            action = subject.get('action', '').strip()
            subject['action'] = action
            # Randomly drop appearance/position (~10% of the time each) as augmentation.
            if random.random() > 0.9 and 'appearance' in subject:
                del subject['appearance']
            if random.random() > 0.9 and 'position' in subject:
                del subject['position']
            if task == 'i2v':
                # For i2v, keep only name, action, and expression per subject.
                dropped_keys = ['appearance', 'position']
                for key in dropped_keys:
                    if key in subject:
                        del subject[key]
            if subject['action'] == '' and ('expression' not in subject or subject['expression'] == ''):
                to_del_subject_ids.append(idx)
        # Delete empty subjects back to front so earlier indices stay valid.
        for idx in sorted(to_del_subject_ids, reverse=True):
            del subjects[idx]

        new_struct_caption = {
            'num_subjects': len(subjects),
            'subjects': subjects,
            'shot_type': struct_caption.get('shot_type', ''),
            'shot_angle': struct_caption.get('shot_angle', ''),
            'shot_position': struct_caption.get('shot_position', ''),
            'environment': struct_caption.get('environment', ''),
            'lighting': struct_caption.get('lighting', ''),
        }
        if task == 't2v' and random.random() > 0.9:
            del new_struct_caption['lighting']
        if task == 'i2v':
            # Scene-level fields come from the reference image, not the caption.
            drop_keys = ['environment', 'lighting', 'shot_type', 'shot_angle', 'shot_position']
            for drop_key in drop_keys:
                del new_struct_caption[drop_key]
        return new_struct_caption
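# Sketch of the cleaning step above (values are hypothetical): for task='t2v'
# a subject such as {"TYPES": {"type": "Human", "sub_type": "Woman"},
# "action": "walks along the beach", ...} is renamed to its sub_type, stripped
# of TYPES/is_main_subject, and returned together with the shot_*, environment,
# and lighting fields; for task='i2v' those scene-level fields are dropped and
# each subject keeps only its name, action, and expression.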
class FusionCaptioner:
    def __init__(self, model_path):
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype="auto",
            device_map="cuda",
        )
        self.model_path = model_path
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

    def __call__(self, structural_caption, task='t2v'):
        # Normalize the input to a JSON string so it can sit in a one-row DataFrame.
        if isinstance(structural_caption, dict):
            structural_caption = json.dumps(structural_caption, ensure_ascii=False)
        else:
            structural_caption = json.dumps(json.loads(structural_caption), ensure_ascii=False)
        meta = pd.DataFrame([structural_caption], columns=['structural_caption'])

        dataset = StructuralCaptionDataset(meta, self.model_path, task)
        _, fusion_by_llm, text, original_text, camera_movement = dataset[0]
        # If no subject survived cleaning, skip the LLM and return the placeholder
        # text plus the camera-movement sentence.
        if not fusion_by_llm:
            caption = original_text + " " + camera_movement
            return caption

        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
        # Note: temperature only takes effect when sampling is enabled in generate().
        generated_ids = self.model.generate(**model_inputs, max_new_tokens=1024, temperature=0.1)
        # Strip the prompt tokens, keeping only the newly generated ones.
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        result = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

        llm_caption = result + " " + camera_movement
        return llm_caption
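if __name__ == "__main__":
    # Minimal usage sketch. The model path and the structured caption below are
    # assumptions for illustration only; any chat LLM whose template accepts the
    # enable_thinking flag (e.g. a Qwen3 checkpoint) should work here.
    example_caption = {
        "camera_motion": "the camera slowly pans to the right",
        "subjects": [
            {
                "TYPES": {"type": "Human", "sub_type": "Woman"},
                "name": "woman",
                "appearance": "wearing a red coat",
                "action": "walks along the shoreline",
                "expression": "smiling",
                "position": "center of the frame",
                "is_main_subject": True,
            }
        ],
        "shot_type": "medium shot",
        "shot_angle": "eye level",
        "shot_position": "front view",
        "environment": "a quiet beach at dusk",
        "lighting": "soft golden light",
    }
    captioner = FusionCaptioner("Qwen/Qwen3-8B")  # hypothetical model path
    print(captioner(example_caption, task="t2v"))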