From 296e4ef4d6890de55adc4ecbc9bef526bd5e792c Mon Sep 17 00:00:00 2001 From: alikia2x Date: Wed, 12 Mar 2025 03:51:39 +0800 Subject: [PATCH] add: the pred model prototype --- pred/1 | 4 - pred/count.py | 12 +++ pred/dataset.py | 206 ++++++++++++++++++++++++++++++++++++++++++++++++ pred/model.py | 182 ++++++++++++++++++++++++++++++++++++++++++ pred/train.py | 76 ++++++++++++++++++ 5 files changed, 476 insertions(+), 4 deletions(-) create mode 100644 pred/count.py create mode 100644 pred/dataset.py create mode 100644 pred/model.py create mode 100644 pred/train.py diff --git a/pred/1 b/pred/1 index f8504a0..8b9c350 100644 --- a/pred/1 +++ b/pred/1 @@ -8,14 +8,12 @@ 1951059019 799277283 844610791 -113469781180858 1706212240 339432 243913657 16576108 583566710 802536340 -112977722344665 2976394 8321047 261045912 @@ -24,7 +22,6 @@ 316228425 257550414 242976248 -113793849892846 9230106 517962327 752662232 @@ -63,7 +60,6 @@ 651329836 875035826 469433434 -113418006693380 58814955 33044780 946091445 \ No newline at end of file diff --git a/pred/count.py b/pred/count.py new file mode 100644 index 0000000..5ed2d81 --- /dev/null +++ b/pred/count.py @@ -0,0 +1,12 @@ +# iterate all json files in ./data/pred + +import os +import json + +count = 0 +for filename in os.listdir('./data/pred'): + if filename.endswith('.json'): + with open('./data/pred/' + filename, 'r') as f: + data = json.load(f) + count += len(data) +print(count) \ No newline at end of file diff --git a/pred/dataset.py b/pred/dataset.py new file mode 100644 index 0000000..1de35c1 --- /dev/null +++ b/pred/dataset.py @@ -0,0 +1,206 @@ +import os +import json +import random +import pandas as pd +import numpy as np +from torch.utils.data import Dataset +from datetime import datetime +import torch + +class VideoPlayDataset(Dataset): + def __init__(self, data_dir, publish_time_path, + min_seq_len=6, max_seq_len=200, + min_forecast_span=60, max_forecast_span=604800): + """ + 改进后的数据集类,支持非等间隔时间序列 + :param data_dir: JSON文件目录 + :param publish_time_path: 发布时间CSV路径 + :param min_seq_len: 最小历史数据点数 + :param max_seq_len: 最大历史数据点数 + :param min_forecast_span: 最小预测时间跨度(秒) + :param max_forecast_span: 最大预测时间跨度(秒) + """ + self.data_dir = data_dir + self.min_seq_len = min_seq_len + self.max_seq_len = max_seq_len + self.min_forecast_span = min_forecast_span + self.max_forecast_span = max_forecast_span + self.series_dict = self._load_and_process_data(data_dir, publish_time_path) + self.valid_series = self._generate_valid_series() + + def _load_and_process_data(self, data_dir, publish_time_path): + # 加载发布时间数据 + publish_df = pd.read_csv(publish_time_path) + publish_df['published_at'] = pd.to_datetime(publish_df['published_at']) + publish_dict = dict(zip(publish_df['aid'], publish_df['published_at'])) + + # 加载并处理JSON数据 + series_dict = {} + for filename in os.listdir(data_dir): + if not filename.endswith('.json'): + continue + filepath = os.path.join(data_dir, filename) + with open(filepath, 'r', encoding='utf-8') as f: + json_data = json.load(f) + for item in json_data: + aid = item['aid'] + if aid not in publish_dict: + continue + + # 计算相对时间 + added_time = datetime.fromtimestamp(item['added']) + published_time = publish_dict[aid] + rel_time = (added_time - published_time).total_seconds() + + # 按视频组织数据 + if aid not in series_dict: + series_dict[aid] = { + 'abs_time': [], + 'rel_time': [], + 'play_count': [] + } + + series_dict[aid]['abs_time'].append(item['added']) + series_dict[aid]['rel_time'].append(rel_time) + series_dict[aid]['play_count'].append(item['view']) + + # 按时间排序并计算时间间隔 + for aid in series_dict: + # 按时间排序 + sorted_idx = np.argsort(series_dict[aid]['abs_time']) + for key in ['abs_time', 'rel_time', 'play_count']: + series_dict[aid][key] = np.array(series_dict[aid][key])[sorted_idx] + + # 计算时间间隔特征 + abs_time_arr = series_dict[aid]['abs_time'] + time_deltas = np.diff(abs_time_arr, prepend=abs_time_arr[0]) + series_dict[aid]['time_delta'] = time_deltas + + return series_dict + + def _generate_valid_series(self): + # 生成有效数据序列 + valid_series = [] + for aid in self.series_dict: + series = self.series_dict[aid] + n_points = len(series['play_count']) + + # 过滤数据量不足的视频 + if n_points < self.min_seq_len + 1: + continue + + valid_series.append({ + 'aid': aid, + 'length': n_points, + 'abs_time': series['abs_time'], + 'rel_time': series['rel_time'], + 'play_count': series['play_count'], + 'time_delta': series['time_delta'] + }) + return valid_series + + def __len__(self): + return sum(s['length'] - self.min_seq_len for s in self.valid_series) + + def __getitem__(self, idx): + # 随机选择视频序列 + series = random.choice(self.valid_series) + max_start = series['length'] - self.min_seq_len - 1 + start_idx = random.randint(0, max_start) + + # 动态确定历史窗口长度 + seq_len = random.randint(self.min_seq_len, min(self.max_seq_len, series['length'] - start_idx - 1)) + end_idx = start_idx + seq_len + + # 提取历史窗口特征 + hist_slice = slice(start_idx, end_idx) + x_play = np.log1p(series['play_count'][hist_slice]) + x_abs_time = series['abs_time'][hist_slice] + x_rel_time = series['rel_time'][hist_slice] + x_time_delta = series['time_delta'][hist_slice] + + # 生成预测目标(动态时间跨度) + forecast_span = random.randint(self.min_forecast_span, self.max_forecast_span) + target_time = x_abs_time[-1] + forecast_span + + # 寻找实际目标点(处理数据间隙) + future_times = series['abs_time'][end_idx:] + future_plays = series['play_count'][end_idx:] + + # 找到第一个超过目标时间的点 + target_idx = np.searchsorted(future_times, target_time) + if target_idx >= len(future_times): + # 若超出数据范围,取最后一个点 + y_play = future_plays[-1] if len(future_plays) > 0 else x_play[-1] + actual_span = future_times[-1] - x_abs_time[-1] if len(future_times) > 0 else self.max_forecast_span + else: + y_play = future_plays[target_idx] + actual_span = future_times[target_idx] - x_abs_time[-1] + + y_play_val = np.log1p(y_play) + + # 构造时间相关特征 + time_features = np.stack([ + x_abs_time, + x_rel_time, + x_time_delta, + np.log1p(x_time_delta), # 对数变换处理长尾分布 + (x_time_delta > 3600).astype(float) # 间隔是否大于1小时 + ], axis=-1) + + return { + 'x_play': torch.FloatTensor(x_play), + 'x_time_feat': torch.FloatTensor(time_features), + 'y_play': torch.FloatTensor([y_play_val]), + 'forecast_span': torch.FloatTensor([actual_span]) + } + +def collate_fn(batch): + """动态填充处理""" + max_len = max(item['x_play'].shape[0] for item in batch) + + padded_batch = { + 'x_play': [], + 'x_time_feat': [], + 'y_play': [], + 'forecast_span': [], + 'padding_mask': [] + } + + for item in batch: + seq_len = item['x_play'].shape[0] + pad_len = max_len - seq_len + + # 填充播放量数据 + padded_play = torch.cat([ + item['x_play'], + torch.zeros(pad_len) + ]) + padded_batch['x_play'].append(padded_play) + + # 填充时间特征 + padded_time_feat = torch.cat([ + item['x_time_feat'], + torch.zeros(pad_len, item['x_time_feat'].shape[1]) + ]) + padded_batch['x_time_feat'].append(padded_time_feat) + + # 创建padding mask + mask = torch.cat([ + torch.ones(seq_len), + torch.zeros(pad_len) + ]) + padded_batch['padding_mask'].append(mask.bool()) + + # 其他字段 + padded_batch['y_play'].append(item['y_play']) + padded_batch['forecast_span'].append(item['forecast_span']) + + # 转换为张量 + padded_batch['x_play'] = torch.stack(padded_batch['x_play']) + padded_batch['x_time_feat'] = torch.stack(padded_batch['x_time_feat']) + padded_batch['y_play'] = torch.stack(padded_batch['y_play']) + padded_batch['forecast_span'] = torch.stack(padded_batch['forecast_span']) + padded_batch['padding_mask'] = torch.stack(padded_batch['padding_mask']) + + return padded_batch diff --git a/pred/model.py b/pred/model.py new file mode 100644 index 0000000..1754087 --- /dev/null +++ b/pred/model.py @@ -0,0 +1,182 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +import math + +class TimeEmbedding(nn.Module): + """时间特征编码模块""" + def __init__(self, embed_dim): + super().__init__() + self.embed_dim = embed_dim + + self.norm = nn.LayerNorm(5) + + # 时间特征编码(适配新的5维时间特征) + self.time_encoder = nn.Sequential( + nn.Linear(5, 64), # 输入维度对应x_time_feat的5个特征 + nn.GELU(), + nn.LayerNorm(64), + nn.Linear(64, embed_dim) + ) + + def forward(self, time_feat): + """ + time_feat: 时间特征 (batch, seq_len, 5) + """ + time_feat = self.norm(time_feat) # 应用归一化 + return self.time_encoder(time_feat) + + +class MultiScaleEncoder(nn.Module): + """多尺度特征编码器""" + def __init__(self, input_dim, d_model, nhead, conv_kernels=[3, 7, 23]): + super().__init__() + self.d_model = d_model + + self.conv_branches = nn.ModuleList([ + nn.Sequential( + nn.Conv1d(input_dim, d_model, kernel_size=k, padding=k//2), + nn.GELU(), + ) for k in conv_kernels + ]) + + # 添加 LayerNorm 到单独的列表中 + self.layer_norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in conv_kernels]) + + # Transformer编码器 + self.transformer = nn.TransformerEncoder( + nn.TransformerEncoderLayer( + d_model, + nhead, + dim_feedforward=d_model*4, + batch_first=True # 修改为batch_first + ), + num_layers=4 + ) + + # 特征融合层 + self.fusion = nn.Linear(d_model*(len(conv_kernels)+1), d_model) + + def forward(self, x, padding_mask=None): + """ + x: 输入特征 (batch, seq_len, input_dim) + padding_mask: 填充掩码 (batch, seq_len) + """ + + # 卷积分支处理 + conv_features = [] + x_conv = x.permute(0, 2, 1) # (batch, input_dim, seq_len) + for i, branch in enumerate(self.conv_branches): + feat = branch(x_conv) # 输出形状 (batch, d_model, seq_len) + # 手动转置并应用 LayerNorm + feat = feat.permute(0, 2, 1) # (batch, seq_len, d_model) + feat = self.layer_norms[i](feat) # 应用 LayerNorm + conv_features.append(feat) + + # Transformer分支处理 + trans_feat = self.transformer( + x, + src_key_padding_mask=padding_mask + ) # (batch, seq_len, d_model) + + # 特征拼接与融合 + combined = torch.cat(conv_features + [trans_feat], dim=-1) + fused = self.fusion(combined) # (batch, seq_len, d_model) + + return fused + +class VideoPlayPredictor(nn.Module): + def __init__(self, d_model=256, nhead=8): + super().__init__() + self.d_model = d_model + + # 特征嵌入 + self.time_embed = TimeEmbedding(embed_dim=64) + self.base_embed = nn.Linear(1 + 64, d_model) # 播放量 + 时间特征 + + # 编码器 + self.encoder = MultiScaleEncoder(d_model, d_model, nhead) + + # 时间感知预测头 + self.forecast_head = nn.Sequential( + nn.Linear(2 * d_model + 1, d_model * 4), # 关键修改:输入维度为 2*d_model +1 + nn.GELU(), + nn.Linear(d_model * 4, 1), + nn.ReLU() # 确保输出非负 + ) + + # 上下文提取器 + self.context_extractor = nn.LSTM( + input_size=d_model, + hidden_size=d_model, + num_layers=2, + bidirectional=True, + batch_first=True + ) + + # 初始化参数 + self._init_weights() + + def _init_weights(self): + for name, p in self.named_parameters(): + if 'forecast_head' in name: + if 'weight' in name: + nn.init.xavier_normal_(p, gain=1e-2) # 缩小初始化范围 + elif 'bias' in name: + nn.init.constant_(p, 0.0) + elif p.dim() > 1: + nn.init.xavier_uniform_(p) + + def forward(self, x_play, x_time_feat, padding_mask, forecast_span): + """ + x_play: 历史播放量 (batch, seq_len) + x_time_feat: 时间特征 (batch, seq_len, 5) + padding_mask: 填充掩码 (batch, seq_len) + forecast_span: 预测时间跨度 (batch, 1) + """ + batch_size = x_play.size(0) + + # 时间特征编码 + time_emb = self.time_embed(x_time_feat) # (batch, seq_len, 64) + + # 基础特征拼接 + base_feat = torch.cat([ + x_play.unsqueeze(-1), # (batch, seq_len, 1) + time_emb + ], dim=-1) # (batch, seq_len, 1+64) + + # 投影到模型维度 + embedded = self.base_embed(base_feat) # (batch, seq_len, d_model) + + # 编码特征 + encoded = self.encoder(embedded, padding_mask) # (batch, seq_len, d_model) + + # 提取上下文 + context, _ = self.context_extractor(encoded) # (batch, seq_len, d_model*2) + context = context.mean(dim=1) # (batch, d_model*2) + + # 融合时间跨度特征 + span_feat = torch.log1p(forecast_span) / 10 # 归一化 + combined = torch.cat([ + context, + span_feat + ], dim=-1) # (batch, d_model*2 + 1) + + # 最终预测 + pred = self.forecast_head(combined) # (batch, 1) + + return pred + +class MultiTaskWrapper(nn.Module): + """适配新数据结构的封装""" + def __init__(self, model): + super().__init__() + self.model = model + + def forward(self, batch): + return self.model( + batch['x_play'], + batch['x_time_feat'], + batch['padding_mask'], + batch['forecast_span'] + ) diff --git a/pred/train.py b/pred/train.py new file mode 100644 index 0000000..e73085f --- /dev/null +++ b/pred/train.py @@ -0,0 +1,76 @@ +import numpy as np +from torch.utils.data import DataLoader +from model import MultiTaskWrapper, VideoPlayPredictor +import torch +import torch.nn.functional as F +from dataset import VideoPlayDataset, collate_fn + +def train(model, dataloader, epochs=100, device='mps'): + optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) + scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs) + + steps = 0 + for epoch in range(epochs): + model.train() + total_loss = 0 + + for batch in dataloader: + optimizer.zero_grad() + + # movel whole batch to device + for k, v in batch.items(): + if isinstance(v, torch.Tensor): + batch[k] = v.to(device) + + # 前向传播 + pred = model(batch) + + y_play = batch['y_play'] + + real = np.expm1(y_play.cpu().detach().numpy()) + yhat = np.expm1(pred.cpu().detach().numpy()) + print("real", [int(real[0][0]), int(real[1][0])]) + print("yhat", [int(yhat[0][0]), int(yhat[1][0])], [float(pred.cpu().detach().numpy()[0][0]), float(pred.cpu().detach().numpy()[1][0])]) + + # 计算加权损失 + weights = torch.log1p(batch['forecast_span']) # 时间越长权重越低 + loss_per_sample = F.huber_loss(pred, y_play, reduction='none') + loss = (loss_per_sample * weights).mean() + + # 反向传播 + loss.backward() + torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) + optimizer.step() + + steps += 1 + + print(f"Epoch {epoch+1} | Step {steps} | Loss: {loss.item():.4f}") + + scheduler.step() + avg_loss = total_loss / len(dataloader) + print(f"Epoch {epoch+1:03d} | Loss: {avg_loss:.4f}") + +# 初始化模型 +device = 'mps' +model = MultiTaskWrapper(VideoPlayPredictor()) +model = model.to(device) + +data_dir = './data/pred' +publish_time_path = './data/pred/publish_time.csv' +dataset = VideoPlayDataset( + data_dir=data_dir, + publish_time_path=publish_time_path, + min_seq_len=2, # 至少2个历史点 + max_seq_len=350, # 最多350个历史点 + min_forecast_span=60, # 预测跨度1分钟到 + max_forecast_span=86400 * 10 # 10天 +) +dataloader = DataLoader( + dataset, + batch_size=2, + shuffle=True, + collate_fn=collate_fn, # 使用自定义collate函数 +) + +# 开始训练 +train(model, dataloader, epochs=20, device=device) \ No newline at end of file