diff --git a/.gitignore b/.gitignore
index b27a6b6..31d6ddf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,6 +79,8 @@ node_modules/
 logs/
 __pycache__
 filter/runs
+pred/runs
+pred/checkpoints
 data/
 filter/checkpoints
 scripts
diff --git a/pred/count.py b/pred/count.py
new file mode 100644
index 0000000..5ed2d81
--- /dev/null
+++ b/pred/count.py
@@ -0,0 +1,12 @@
+# Count the total number of records across all JSON files in ./data/pred
+
+import os
+import json
+
+count = 0
+for filename in os.listdir('./data/pred'):
+    if filename.endswith('.json'):
+        with open('./data/pred/' + filename, 'r') as f:
+            data = json.load(f)
+            count += len(data)
+print(count)
\ No newline at end of file
diff --git a/pred/crawler.py b/pred/crawler.py
new file mode 100644
index 0000000..53008d8
--- /dev/null
+++ b/pred/crawler.py
@@ -0,0 +1,19 @@
+import os
+import requests
+import json
+import time
+
+with open("./pred/2", "r") as fp:
+    raw = fp.readlines()
+    aids = [int(x.strip()) for x in raw]
+
+for aid in aids:
+    if os.path.exists(f"./data/pred/{aid}.json"):
+        continue
+    url = f"https://api.bunnyxt.com/tdd/v2/video/{aid}/record?last_count=5000"
+    r = requests.get(url)
+    data = r.json()
+    with open(f"./data/pred/{aid}.json", "w") as fp:
+        json.dump(data, fp, ensure_ascii=False, indent=4)
+    time.sleep(5)
+    print(aid)
\ No newline at end of file
diff --git a/pred/dataset.py b/pred/dataset.py
new file mode 100644
index 0000000..9ed4846
--- /dev/null
+++ b/pred/dataset.py
@@ -0,0 +1,178 @@
+import os
+import json
+import random
+import bisect
+import numpy as np
+import pandas as pd
+import torch
+from torch.utils.data import Dataset
+import datetime
+
+class VideoPlayDataset(Dataset):
+    def __init__(self, data_dir, publish_time_path, term='long', seed=42):
+        if seed is not None:
+            random.seed(seed)
+            np.random.seed(seed)
+            torch.manual_seed(seed)
+
+        self.data_dir = data_dir
+        self.series_dict = self._load_and_process_data(publish_time_path)
+        self.valid_series = [s for s in self.series_dict.values() if len(s['abs_time']) > 1]
+        self.term = term
+        # Set time window based on term
+        self.time_window = 1000 * 24 * 3600 if term == 'long' else 7 * 24 * 3600
+        MINUTE = 60
+        HOUR = 3600
+        DAY = 24 * HOUR
+
+        if term == 'long':
+            self.feature_windows = [
+                1 * HOUR,
+                6 * HOUR,
+                1 * DAY,
+                3 * DAY,
+                7 * DAY,
+                30 * DAY,
+                100 * DAY
+            ]
+        else:
+            self.feature_windows = [
+                (15 * MINUTE,  0 * MINUTE),
+                (40 * MINUTE,  0 * MINUTE),
+                ( 1 * HOUR,    0 * HOUR),
+                ( 2 * HOUR,    1 * HOUR),
+                ( 3 * HOUR,    2 * HOUR),
+                ( 3 * HOUR,    0 * HOUR),
+                #( 6 * HOUR,   3 * HOUR),
+                ( 6 * HOUR,    0 * HOUR),
+                (18 * HOUR,   12 * HOUR),
+                #( 1 * DAY,    6 * HOUR),
+                ( 1 * DAY,     0 * DAY),
+                #( 2 * DAY,    1 * DAY),
+                ( 3 * DAY,     0 * DAY),
+                #( 4 * DAY,    1 * DAY),
+                ( 7 * DAY,     0 * DAY)
+            ]
+
+    def _extract_features(self, series, current_idx, target_idx):
+        current_time = series['abs_time'][current_idx]
+        current_play = series['play_count'][current_idx]
+        dt = datetime.datetime.fromtimestamp(current_time)
+
+        if self.term == 'long':
+            time_features = [
+                np.log2(max(current_time - series['create_time'], 1))
+            ]
+        else:
+            time_features = [
+                (dt.hour * 3600 + dt.minute * 60 + dt.second) / 86400,
+                (dt.weekday() * 24 + dt.hour) / 168,
+                np.log2(max(current_time - series['create_time'], 1))
+            ]
+
+        growth_features = []
+        if self.term == 'long':
+            for window in self.feature_windows:
+                prev_time = current_time - window
+                prev_idx = self._get_nearest_value(series, prev_time, current_idx)
+                if prev_idx is not None:
+                    time_diff = current_time - series['abs_time'][prev_idx]
+                    play_diff = current_play - series['play_count'][prev_idx]
+                    scaled_diff = play_diff / (time_diff / window) if time_diff > 0 else 0.0
+                else:
+                    scaled_diff = 0.0
+                growth_features.append(np.log2(max(scaled_diff, 1)))
+        else:
+            for window_start, window_end in self.feature_windows:
+                prev_time_start = current_time - window_start
+                prev_time_end = current_time - window_end  # window_end is typically 0
+                prev_idx_start = self._get_nearest_value(series, prev_time_start, current_idx)
+                prev_idx_end = self._get_nearest_value(series, prev_time_end, current_idx)
+                if prev_idx_start is not None and prev_idx_end is not None:
+                    time_diff = series['abs_time'][prev_idx_end] - series['abs_time'][prev_idx_start]
+                    play_diff = series['play_count'][prev_idx_end] - series['play_count'][prev_idx_start]
+                    scaled_diff = play_diff / (time_diff / (window_start - window_end)) if time_diff > 0 else 0.0
+                else:
+                    scaled_diff = 0.0
+                growth_features.append(np.log2(max(scaled_diff, 1)))
+
+        time_diff = series['abs_time'][target_idx] - current_time
+        return [np.log2(max(time_diff, 1))] + [np.log2(current_play + 1)] + growth_features + time_features
+
+    def _load_and_process_data(self, publish_time_path):
+        publish_df = pd.read_csv(publish_time_path)
+        publish_df['published_at'] = pd.to_datetime(publish_df['published_at'])
+        publish_dict = dict(zip(publish_df['aid'], publish_df['published_at']))
+        series_dict = {}
+        for filename in os.listdir(self.data_dir):
+            if not filename.endswith('.json'):
+                continue
+            with open(os.path.join(self.data_dir, filename), 'r') as f:
+                data = json.load(f)
+            if 'code' in data:
+                continue
+            for item in data:
+                aid = item['aid']
+                published_time = pd.to_datetime(publish_dict[aid]).timestamp()
+                if aid not in series_dict:
+                    series_dict[aid] = {
+                        'abs_time': [],
+                        'play_count': [],
+                        'create_time': published_time
+                    }
+                series_dict[aid]['abs_time'].append(item['added'])
+                series_dict[aid]['play_count'].append(item['view'])
+        # Sort each series by absolute time
+        for aid in series_dict:
+            sorted_indices = sorted(range(len(series_dict[aid]['abs_time'])),
+                                    key=lambda k: series_dict[aid]['abs_time'][k])
+            series_dict[aid]['abs_time'] = [series_dict[aid]['abs_time'][i] for i in sorted_indices]
+            series_dict[aid]['play_count'] = [series_dict[aid]['play_count'][i] for i in sorted_indices]
+        return series_dict
+
+    def __len__(self):
+        return 100000  # Virtual length for sampling
+
+    def _get_nearest_value(self, series, target_time, current_idx):
+        times = series['abs_time']
+        pos = bisect.bisect_right(times, target_time, 0, current_idx + 1)
+        candidates = []
+        if pos > 0:
+            candidates.append(pos - 1)
+        if pos <= current_idx:
+            candidates.append(pos)
+        if not candidates:
+            return None
+        closest_idx = min(candidates, key=lambda i: abs(times[i] - target_time))
+        return closest_idx
+
+    def __getitem__(self, _idx):
+        while True:
+            series = random.choice(self.valid_series)
+            if len(series['abs_time']) < 2:
+                continue
+            current_idx = random.randint(0, len(series['abs_time']) - 2)
+            current_time = series['abs_time'][current_idx]
+            max_target_time = current_time + self.time_window
+            candidate_indices = []
+            for j in range(current_idx + 1, len(series['abs_time'])):
+                if series['abs_time'][j] > max_target_time:
+                    break
+                candidate_indices.append(j)
+            if not candidate_indices:
+                continue
+            target_idx = random.choice(candidate_indices)
+            break
+        current_play = series['play_count'][current_idx]
+        target_play = series['play_count'][target_idx]
+        target_delta = max(target_play - current_play, 0)
+        return {
+            'features': torch.FloatTensor(self._extract_features(series, current_idx, target_idx)),
+            'target': torch.log2(torch.FloatTensor([target_delta]) + 1)
+        }
+
+def collate_fn(batch):
+    return {
+        'features': torch.stack([x['features'] for x in batch]),
+        'targets': torch.stack([x['target'] for x in batch])
+    }
\ No newline at end of file
diff --git a/pred/export_onnx.py b/pred/export_onnx.py
new file mode 100644
index 0000000..c7b4d59
--- /dev/null
+++ b/pred/export_onnx.py
@@ -0,0 +1,28 @@
+import torch
+import torch.onnx
+from model import CompactPredictor
+
+def export_model(input_size, checkpoint_path, onnx_path):
+    model = CompactPredictor(input_size)
+    model.load_state_dict(torch.load(checkpoint_path))
+
+    dummy_input = torch.randn(1, input_size)
+
+    model.eval()
+
+    torch.onnx.export(model,                    # Model to be exported
+                      dummy_input,              # Model input
+                      onnx_path,                # Save path
+                      export_params=True,       # Export model parameters
+                      opset_version=11,         # ONNX opset version
+                      do_constant_folding=True, # Apply constant-folding optimization
+                      input_names=['input'],    # Input node name
+                      output_names=['output'],  # Output node name
+                      dynamic_axes={'input': {0: 'batch_size'},   # Dynamic batch size
+                                    'output': {0: 'batch_size'}})
+
+    print(f"ONNX model has been exported to: {onnx_path}")
+
+if __name__ == '__main__':
+    export_model(10, './pred/checkpoints/long_term.pt', 'long_term.onnx')
+    export_model(12, './pred/checkpoints/short_term.pt', 'short_term.onnx')
diff --git a/pred/inference.py b/pred/inference.py
new file mode 100644
index 0000000..9a3d678
--- /dev/null
+++ b/pred/inference.py
@@ -0,0 +1,32 @@
+import datetime
+import numpy as np
+from model import CompactPredictor
+import torch
+
+def main():
+    model = CompactPredictor(16).to('cpu', dtype=torch.float32)
+    model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
+    model.eval()
+    # inference
+    initial = 999269
+    last = initial
+    start_time = '2025-03-15 01:03:21'
+    for i in range(1, 48):
+        hour = i / 0.5  # each step advances the prediction horizon by 2 hours
+        sec = hour * 3600
+        time_d = np.log2(sec)
+        data = [time_d, np.log2(initial + 1),  # time_delta, current_views
+                2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067,  # growth_feat
+                0.043993, 0.72057, 28.000902  # time_feat
+        ]
+        np_arr = np.array([data])
+        tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
+        output = model(tensor)
+        num = output.detach().numpy()[0][0]
+        views_pred = int(np.exp2(num)) + initial
+        current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
+        print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
+        last = views_pred
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/pred/model.py b/pred/model.py
new file mode 100644
index 0000000..191e3eb
--- /dev/null
+++ b/pred/model.py
@@ -0,0 +1,23 @@
+import torch.nn as nn
+
+class CompactPredictor(nn.Module):
+    def __init__(self, input_size):
+        super().__init__()
+        self.net = nn.Sequential(
+            nn.BatchNorm1d(input_size),
+            nn.Linear(input_size, 256),
+            nn.LeakyReLU(0.1),
+            nn.Dropout(0.3),
+            nn.Linear(256, 128),
+            nn.LeakyReLU(0.1),
+            nn.Dropout(0.2),
+            nn.Linear(128, 64),
+            nn.Tanh(),  # Use Tanh to limit the output range
+            nn.Linear(64, 1)
+        )
+        # Initialize the last layer to values close to zero
+        nn.init.uniform_(self.net[-1].weight, -0.01, 0.01)
+        nn.init.constant_(self.net[-1].bias, 0.0)
+
+    def forward(self, x):
+        return self.net(x)
diff --git a/pred/train.py b/pred/train.py
new file mode 100644
index 0000000..b162163
--- /dev/null
+++ b/pred/train.py
@@ -0,0 +1,114 @@
+import random
+import time
+import numpy as np
+from torch.utils.tensorboard import SummaryWriter
+from torch.utils.data import DataLoader
+import torch
+from dataset import VideoPlayDataset, collate_fn
+from model import CompactPredictor
+
+def asymmetricHuberLoss(delta=1.0, beta=1.3):
+    """
+    Create a callable asymmetric Huber loss function.
+
+    Args:
+        delta (float): The delta parameter of the Huber loss.
+        beta (float): Coefficient scaling the penalty on negative errors.
+
+    Returns:
+        callable: The loss function.
+    """
+    def loss_function(input, target):
+        error = input - target
+        abs_error = torch.abs(error)
+
+        linear_loss = abs_error - 0.5 * delta
+        quadratic_loss = 0.5 * error**2
+
+        loss = torch.where(abs_error < delta, quadratic_loss, linear_loss)
+        loss = torch.where(error < 0, beta * loss, loss)
+
+        return torch.mean(loss)
+
+    return loss_function
+
+def train(model, dataloader, device, epochs=100):
+    writer = SummaryWriter(f'./pred/runs/play_predictor_{time.strftime("%Y%m%d_%H%M")}')
+    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
+    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3,
+                                                    total_steps=len(dataloader) * 30)
+    # Asymmetric Huber loss
+    criterion = asymmetricHuberLoss(delta=1.0, beta=2.1)
+
+    model.train()
+    global_step = 0
+    for epoch in range(epochs):
+        total_loss = 0.0
+        for batch_idx, batch in enumerate(dataloader):
+            features = batch['features'].to(device)
+            targets = batch['targets'].to(device)
+
+            optimizer.zero_grad()
+            outputs = model(features)
+            loss = criterion(outputs, targets)
+            loss.backward()
+            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
+            optimizer.step()
+            scheduler.step()
+
+            total_loss += loss.item()
+            global_step += 1
+
+            if global_step % 100 == 0:
+                writer.add_scalar('Loss/train', loss.item(), global_step)
+                writer.add_scalar('LR', scheduler.get_last_lr()[0], global_step)
+            if batch_idx % 50 == 0:
+                # Monitor gradients
+                grad_norms = [
+                    torch.norm(p.grad).item()
+                    for p in model.parameters() if p.grad is not None
+                ]
+                writer.add_scalar('Grad/Norm', sum(grad_norms) / len(grad_norms), global_step)
+
+                # Monitor parameter values
+                param_means = [torch.mean(p.data).item() for p in model.parameters()]
+                writer.add_scalar('Params/Mean', sum(param_means) / len(param_means), global_step)
+
+                # Win rate against a naive baseline that extrapolates the recent growth speed
+                samples_count = len(targets)
+                good = 0
+                for _ in range(samples_count):
+                    r = random.randint(0, samples_count - 1)
+                    t = float(torch.exp2(targets[r])) - 1
+                    o = float(torch.exp2(outputs[r])) - 1
+                    d = features[r].cpu().numpy()[0]
+                    speed = np.exp2(features[r].cpu().numpy()[8]) / 6
+                    time_diff = np.exp2(d) / 3600
+                    inc = speed * time_diff
+                    model_error = abs(t - o)
+                    reg_error = abs(inc - t)
+                    if model_error < reg_error:
+                        good += 1
+                    #print(f"{t:07.1f} | {o:07.1f} | {d:07.1f} | {inc:07.1f} | {good/samples_count*100:.1f}%")
+                writer.add_scalar('Train/WinRate', good / samples_count, global_step)
+
+        print(f"Epoch {epoch+1} | Avg Loss: {total_loss/len(dataloader):.4f}")
+
+    writer.close()
+    return model
+
+if __name__ == "__main__":
+    device = 'mps'
+
+    # Initialize dataset and model
+    dataset = VideoPlayDataset('./data/pred', './data/pred/publish_time.csv', 'short')
+    dataloader = DataLoader(dataset, batch_size=128, shuffle=True, collate_fn=collate_fn)
+
+    # Get feature dimension
+    sample = next(iter(dataloader))
+    input_size = sample['features'].shape[1]
+
+    model = CompactPredictor(input_size).to(device)
+    trained_model = train(model, dataloader, device, epochs=18)
+
+    # Save model
+    torch.save(trained_model.state_dict(),
+               f"./pred/checkpoints/model_{time.strftime('%Y%m%d_%H%M')}.pt")