merge: branch 'lab/pred'

alikia2x (寒寒) 2025-03-15 16:47:34 +08:00
commit afffbd8ecb
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
8 changed files with 408 additions and 0 deletions

2
.gitignore vendored

@@ -79,6 +79,8 @@ node_modules/
logs/
__pycache__
filter/runs
pred/runs
pred/checkpoints
data/
filter/checkpoints
scripts

12
pred/count.py Normal file

@@ -0,0 +1,12 @@
# iterate all json files in ./data/pred
import os
import json
count = 0
for filename in os.listdir('./data/pred'):
if filename.endswith('.json'):
with open('./data/pred/' + filename, 'r') as f:
data = json.load(f)
count += len(data)
print(count)

19
pred/crawler.py Normal file

@@ -0,0 +1,19 @@
import os
import requests
import json
import time
with open("./pred/2", "r") as fp:
raw = fp.readlines()
aids = [ int(x.strip()) for x in raw ]
for aid in aids:
if os.path.exists(f"./data/pred/{aid}.json"):
continue
url = f"https://api.bunnyxt.com/tdd/v2/video/{aid}/record?last_count=5000"
r = requests.get(url)
data = r.json()
with open (f"./data/pred/{aid}.json", "w") as fp:
json.dump(data, fp, ensure_ascii=False, indent=4)
time.sleep(5)
print(aid)
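Note: pred/dataset.py below expects each downloaded JSON to be a list of records carrying at least aid, added (Unix timestamp of the snapshot), and view fields, and it skips payloads containing a code key (API error responses). A minimal sketch of the assumed shape (field names taken from dataset.py, values illustrative):

example_response = [
    {"aid": 170001, "added": 1741990000, "view": 12345},
    {"aid": 170001, "added": 1741993600, "view": 12890},
]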

178
pred/dataset.py Normal file

@@ -0,0 +1,178 @@
import os
import json
import random
import bisect
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
import datetime
class VideoPlayDataset(Dataset):
def __init__(self, data_dir, publish_time_path, term='long', seed=42):
if seed is not None:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
self.data_dir = data_dir
self.series_dict = self._load_and_process_data(publish_time_path)
self.valid_series = [s for s in self.series_dict.values() if len(s['abs_time']) > 1]
self.term = term
# Set time window based on term
self.time_window = 1000 * 24 * 3600 if term == 'long' else 7 * 24 * 3600
MINUTE = 60
HOUR = 3600
DAY = 24 * HOUR
if term == 'long':
self.feature_windows = [
1 * HOUR,
6 * HOUR,
1 * DAY,
3 * DAY,
7 * DAY,
30 * DAY,
100 * DAY
]
else:
self.feature_windows = [
( 15 * MINUTE, 0 * MINUTE),
( 40 * MINUTE, 0 * MINUTE),
( 1 * HOUR, 0 * HOUR),
( 2 * HOUR, 1 * HOUR),
( 3 * HOUR, 2 * HOUR),
( 3 * HOUR, 0 * HOUR),
#( 6 * HOUR, 3 * HOUR),
( 6 * HOUR, 0 * HOUR),
(18 * HOUR, 12 * HOUR),
#( 1 * DAY, 6 * HOUR),
( 1 * DAY, 0 * DAY),
#( 2 * DAY, 1 * DAY),
( 3 * DAY, 0 * DAY),
#( 4 * DAY, 1 * DAY),
( 7 * DAY, 0 * DAY)
]
def _extract_features(self, series, current_idx, target_idx):
current_time = series['abs_time'][current_idx]
current_play = series['play_count'][current_idx]
dt = datetime.datetime.fromtimestamp(current_time)
if self.term == 'long':
time_features = [
np.log2(max(current_time - series['create_time'], 1))
]
else:
time_features = [
(dt.hour * 3600 + dt.minute * 60 + dt.second) / 86400,
(dt.weekday() * 24 + dt.hour) / 168,
np.log2(max(current_time - series['create_time'], 1))
]
growth_features = []
if self.term == 'long':
for window in self.feature_windows:
prev_time = current_time - window
prev_idx = self._get_nearest_value(series, prev_time, current_idx)
if prev_idx is not None:
time_diff = current_time - series['abs_time'][prev_idx]
play_diff = current_play - series['play_count'][prev_idx]
scaled_diff = play_diff / (time_diff / window) if time_diff > 0 else 0.0
else:
scaled_diff = 0.0
growth_features.append(np.log2(max(scaled_diff, 1)))
else:
for window_start, window_end in self.feature_windows:
prev_time_start = current_time - window_start
prev_time_end = current_time - window_end # window_end is typically 0
prev_idx_start = self._get_nearest_value(series, prev_time_start, current_idx)
prev_idx_end = self._get_nearest_value(series, prev_time_end, current_idx)
if prev_idx_start is not None and prev_idx_end is not None:
time_diff = series['abs_time'][prev_idx_end] - series['abs_time'][prev_idx_start]
play_diff = series['play_count'][prev_idx_end] - series['play_count'][prev_idx_start]
scaled_diff = play_diff / (time_diff / (window_start - window_end)) if time_diff > 0 else 0.0
else:
scaled_diff = 0.0
growth_features.append(np.log2(max(scaled_diff, 1)))
time_diff = series['abs_time'][target_idx] - current_time
return [np.log2(max(time_diff, 1))] + [np.log2(current_play + 1)] + growth_features + time_features
def _load_and_process_data(self, publish_time_path):
publish_df = pd.read_csv(publish_time_path)
publish_df['published_at'] = pd.to_datetime(publish_df['published_at'])
publish_dict = dict(zip(publish_df['aid'], publish_df['published_at']))
series_dict = {}
for filename in os.listdir(self.data_dir):
if not filename.endswith('.json'):
continue
with open(os.path.join(self.data_dir, filename), 'r') as f:
data = json.load(f)
if 'code' in data:
continue
for item in data:
aid = item['aid']
published_time = pd.to_datetime(publish_dict[aid]).timestamp()
if aid not in series_dict:
series_dict[aid] = {
'abs_time': [],
'play_count': [],
'create_time': published_time
}
series_dict[aid]['abs_time'].append(item['added'])
series_dict[aid]['play_count'].append(item['view'])
# Sort each series by absolute time
for aid in series_dict:
sorted_indices = sorted(range(len(series_dict[aid]['abs_time'])),
key=lambda k: series_dict[aid]['abs_time'][k])
series_dict[aid]['abs_time'] = [series_dict[aid]['abs_time'][i] for i in sorted_indices]
series_dict[aid]['play_count'] = [series_dict[aid]['play_count'][i] for i in sorted_indices]
return series_dict
def __len__(self):
return 100000 # Virtual length for sampling
def _get_nearest_value(self, series, target_time, current_idx):
times = series['abs_time']
pos = bisect.bisect_right(times, target_time, 0, current_idx + 1)
candidates = []
if pos > 0:
candidates.append(pos - 1)
if pos <= current_idx:
candidates.append(pos)
if not candidates:
return None
closest_idx = min(candidates, key=lambda i: abs(times[i] - target_time))
return closest_idx
def __getitem__(self, _idx):
while True:
series = random.choice(self.valid_series)
if len(series['abs_time']) < 2:
continue
current_idx = random.randint(0, len(series['abs_time']) - 2)
current_time = series['abs_time'][current_idx]
max_target_time = current_time + self.time_window
candidate_indices = []
for j in range(current_idx + 1, len(series['abs_time'])):
if series['abs_time'][j] > max_target_time:
break
candidate_indices.append(j)
if not candidate_indices:
continue
target_idx = random.choice(candidate_indices)
break
current_play = series['play_count'][current_idx]
target_play = series['play_count'][target_idx]
target_delta = max(target_play - current_play, 0)
return {
'features': torch.FloatTensor(self._extract_features(series, current_idx, target_idx)),
'target': torch.log2(torch.FloatTensor([target_delta]) + 1)
}
def collate_fn(batch):
return {
'features': torch.stack([x['features'] for x in batch]),
'targets': torch.stack([x['target'] for x in batch])
}
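For orientation: with the short-term windows above, _extract_features produces 1 (log2 time to target) + 1 (log2 current plays) + 11 (growth windows) + 3 (time-of-day, day-of-week, log2 video age) = 16 features per sample, matching the CompactPredictor(16) used in pred/inference.py below; the long-term configuration yields 10, matching the long-term export in pred/export_onnx.py. A minimal usage sketch (paths as in pred/train.py):

from torch.utils.data import DataLoader
from dataset import VideoPlayDataset, collate_fn

ds = VideoPlayDataset('./data/pred', './data/pred/publish_time.csv', term='short')
dl = DataLoader(ds, batch_size=128, shuffle=True, collate_fn=collate_fn)
batch = next(iter(dl))
print(batch['features'].shape)  # torch.Size([128, 16]) with the default short-term windows
print(batch['targets'].shape)   # torch.Size([128, 1]); targets are log2(view delta + 1)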

28
pred/export_onnx.py Normal file

@@ -0,0 +1,28 @@
import torch
import torch.onnx
from model import CompactPredictor
def export_model(input_size, checkpoint_path, onnx_path):
model = CompactPredictor(input_size)
model.load_state_dict(torch.load(checkpoint_path))
dummy_input = torch.randn(1, input_size)
model.eval()
torch.onnx.export(model, # Model to be exported
dummy_input, # Model input
onnx_path, # Save path
export_params=True, # Whether to export model parameters
opset_version=11, # ONNX opset version
do_constant_folding=True, # Whether to perform constant folding optimization
input_names=['input'], # Input node name
output_names=['output'], # Output node name
dynamic_axes={'input': {0: 'batch_size'}, # Dynamic batch size
'output': {0: 'batch_size'}})
print(f"ONNX model has been exported to: {onnx_path}")
if __name__ == '__main__':
export_model(10, './pred/checkpoints/long_term.pt', 'long_term.onnx')
export_model(12, './pred/checkpoints/short_term.pt', 'short_term.onnx')
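A quick sanity check of an exported model could look like the following (onnxruntime is not part of this commit; the sketch assumes it is installed and that long_term.onnx was produced by the export above):

import numpy as np
import onnxruntime as ort

sess = ort.InferenceSession('long_term.onnx')
x = np.random.randn(1, 10).astype(np.float32)  # one sample with the 10 long-term features
(y,) = sess.run(None, {'input': x})            # 'input'/'output' names match export_model above
print(y.shape)  # (1, 1): predicted log2(view delta + 1)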

32
pred/inference.py Normal file

@@ -0,0 +1,32 @@
import datetime
import numpy as np
from model import CompactPredictor
import torch
def main():
model = CompactPredictor(16).to('cpu', dtype=torch.float32)
model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
model.eval()
# inference
initial = 999269
last = initial
start_time = '2025-03-15 01:03:21'
for i in range(1, 48):
hour = i / 0.5 # prediction horizon in hours (2-hour steps)
sec = hour * 3600
time_d = np.log2(sec)
data = [time_d, np.log2(initial+1), # time_delta, current_views
2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067, # growth_feat
0.043993, 0.72057, 28.000902 # time_feat
]
np_arr = np.array([data])
tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
output = model(tensor)
num = output.detach().numpy()[0][0]
views_pred = int(np.exp2(num)) + initial
current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
last = views_pred
if __name__ == '__main__':
main()

23
pred/model.py Normal file

@@ -0,0 +1,23 @@
import torch.nn as nn
class CompactPredictor(nn.Module):
def __init__(self, input_size):
super().__init__()
self.net = nn.Sequential(
nn.BatchNorm1d(input_size),
nn.Linear(input_size, 256),
nn.LeakyReLU(0.1),
nn.Dropout(0.3),
nn.Linear(256, 128),
nn.LeakyReLU(0.1),
nn.Dropout(0.2),
nn.Linear(128, 64),
nn.Tanh(), # Use Tanh to limit the output range
nn.Linear(64, 1)
)
# Initialize the last layer to values close to zero
nn.init.uniform_(self.net[-1].weight, -0.01, 0.01)
nn.init.constant_(self.net[-1].bias, 0.0)
def forward(self, x):
return self.net(x)

114
pred/train.py Normal file

@@ -0,0 +1,114 @@
import random
import time
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
import torch
from dataset import VideoPlayDataset, collate_fn
from model import CompactPredictor
def asymmetricHuberLoss(delta=1.0, beta=1.3):
"""
Create a callable asymmetric Huber loss function.
Args:
delta (float): delta parameter of the Huber loss
beta (float): coefficient scaling the penalty applied to negative errors (under-prediction)
Returns:
callable: the loss function
"""
def loss_function(input, target):
error = input - target
abs_error = torch.abs(error)
linear_loss = abs_error - 0.5 * delta
quadratic_loss = 0.5 * error**2
loss = torch.where(abs_error < delta, quadratic_loss, linear_loss)
loss = torch.where(error < 0, beta * loss, loss)
return torch.mean(loss)
return loss_function
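# In effect: loss(e) = 0.5 * e**2 if |e| < delta else |e| - 0.5 * delta,
# scaled by beta when e = input - target < 0, i.e. when the model under-predicts.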
def train(model, dataloader, device, epochs=100):
writer = SummaryWriter(f'./pred/runs/play_predictor_{time.strftime("%Y%m%d_%H%M")}')
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3,
total_steps=len(dataloader)*30)
# Asymmetric Huber loss: under-predictions (error < 0) are penalized by a factor of beta
criterion = asymmetricHuberLoss(delta=1.0, beta=2.1)
model.train()
global_step = 0
for epoch in range(epochs):
total_loss = 0.0
for batch_idx, batch in enumerate(dataloader):
features = batch['features'].to(device)
targets = batch['targets'].to(device)
optimizer.zero_grad()
outputs = model(features)
loss = criterion(outputs, targets)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
total_loss += loss.item()
global_step += 1
if global_step % 100 == 0:
writer.add_scalar('Loss/train', loss.item(), global_step)
writer.add_scalar('LR', scheduler.get_last_lr()[0], global_step)
if batch_idx % 50 == 0:
# Monitor gradients
grad_norms = [
torch.norm(p.grad).item()
for p in model.parameters() if p.grad is not None
]
writer.add_scalar('Grad/Norm', sum(grad_norms)/len(grad_norms), global_step)
# Monitor parameter values
param_means = [torch.mean(p.data).item() for p in model.parameters()]
writer.add_scalar('Params/Mean', sum(param_means)/len(param_means), global_step)
samples_count = len(targets)
good = 0
for r in range(samples_count):
r = random.randint(0, samples_count-1)
t = float(torch.exp2(targets[r])) - 1
o = float(torch.exp2(outputs[r])) - 1
d = features[r].cpu().numpy()[0] # log2 of the time delta (seconds) to the target sample
speed = np.exp2(features[r].cpu().numpy()[8]) / 6 # views per hour from growth feature index 8 (the 6-hour window in the short-term config)
time_diff = np.exp2(d) / 3600 # prediction horizon in hours
inc = speed * time_diff # naive linear-extrapolation baseline for the view delta
model_error = abs(t - o)
reg_error = abs(inc - t)
if model_error < reg_error:
good += 1
#print(f"{t:07.1f} | {o:07.1f} | {d:07.1f} | {inc:07.1f} | {good/samples_count*100:.1f}%")
writer.add_scalar('Train/WinRate', good/samples_count, global_step)
print(f"Epoch {epoch+1} | Avg Loss: {total_loss/len(dataloader):.4f}")
writer.close()
return model
if __name__ == "__main__":
device = 'mps' # Apple Metal backend; switch to 'cuda' or 'cpu' if unavailable
# Initialize dataset and model
dataset = VideoPlayDataset('./data/pred', './data/pred/publish_time.csv', 'short')
dataloader = DataLoader(dataset, batch_size=128, shuffle=True, collate_fn=collate_fn)
# Get feature dimension
sample = next(iter(dataloader))
input_size = sample['features'].shape[1]
model = CompactPredictor(input_size).to(device)
trained_model = train(model, dataloader, device, epochs=18)
# Save model
torch.save(trained_model.state_dict(), f"./pred/checkpoints/model_{time.strftime('%Y%m%d_%H%M')}.pt")