# sparkastML: Training the Intention Classification Model

This is the model we use for intent recognition. It uses a **CNN architecture** for classification and an **Energy-based Model** to implement OSR (Open-Set Recognition).

In this notebook, **positive samples** are inputs that belong to one of the existing classes, while **negative samples** are inputs that do not belong to any of the existing classes.

In [1]:
import json
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
from scipy.spatial.distance import euclidean
from scipy.stats import weibull_min
from sklearn.preprocessing import normalize
import torch.nn.functional as F


In [2]:
# Checkpoint name passed to AutoTokenizer below; only the tokenizer is loaded from it in this notebook.
model_name="microsoft/Phi-3-mini-4k-instruct"
# Channel width of every TextCNN convolution; also used as the model's input embedding size (input_dim).
DIMENSIONS = 128
# Tokenizer used to turn sentences into token ids, which are then looked up in the embedding map.
tokenizer = AutoTokenizer.from_pretrained(model_name)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


## Load Data

We load the labelled data from `data.json` and the negative samples from `noise.json`.

In [3]:
# Load the labelled corpus. preprocess_data iterates it as
# {class name: [sentence, ...]}.
# JSON text is UTF-8, so decode it explicitly instead of relying on the
# platform's locale encoding (which breaks on non-ASCII sentences on some OSes).
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Create map: class to index (indices follow the key order of data.json)
class_to_idx = {cls: idx for idx, cls in enumerate(data)}
# Reverse map, used at inference time to turn a predicted index back into a class name
idx_to_class = {idx: cls for cls, idx in class_to_idx.items()}

# Preprocess data, convert sentences to the format of (class idx, embedding)
def preprocess_data(data, embedding_map, tokenizer, max_length=64):
    """Convert labelled sentences into ``(class index, embedding matrix)`` pairs.

    Args:
        data: Mapping of class name to an iterable of sentences.
        embedding_map: Lookup indexed by token id that yields that token's
            embedding vector.
        tokenizer: Object providing ``tokenize`` and ``convert_tokens_to_ids``.
        max_length: Maximum number of tokens kept per sentence; any tokens
            beyond this are dropped.

    Returns:
        A list of ``(class_index, embeddings)`` tuples, where ``embeddings`` is
        a tensor with one row per kept token. Class indices come from the
        module-level ``class_to_idx`` map.
    """
    dataset = []
    for label, sentences in data.items():
        class_idx = class_to_idx[label]  # same for every sentence of this label
        for sentence in sentences:
            # Tokenize the sentence and convert tokens to embedding vectors
            tokens = tokenizer.tokenize(sentence)
            token_ids = tokenizer.convert_tokens_to_ids(tokens)
            vectors = [embedding_map[token_id] for token_id in token_ids[:max_length]]
            # torch.tensor() on a Python list of ndarrays takes a very slow
            # element-wise path and emits a UserWarning; stack the rows into a
            # single array first. Other value types are passed through unchanged.
            if vectors and isinstance(vectors[0], np.ndarray):
                vectors = np.stack(vectors)
            dataset.append((class_idx, torch.tensor(vectors)))
    return dataset

# Load embedding map
# Lookup from token id to its embedding vector (it is indexed by token id in preprocess_data).
# NOTE(review): torch.load may unpickle arbitrary objects depending on the PyTorch version and
# arguments, so only load a file from a trusted source.
embedding_map = torch.load('token_id_to_reduced_embedding.pt')

# Get preprocessed dataset
dataset = preprocess_data(data, embedding_map, tokenizer)

# Train-test split
# 80/20 split with a fixed seed so the validation set is the same on every run.
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)

class TextDataset(Dataset):
    """Dataset view over a sequence of ``(label, embeddings)`` samples."""

    def __init__(self, data):
        # Keep a reference to the caller's samples; nothing is copied.
        self.data = data

    def __len__(self):
        sample_count = len(self.data)
        return sample_count

    def __getitem__(self, idx):
        sample = self.data[idx]
        return sample

    def collate_fn(self, batch):
        """Merge samples into ``(labels, embeddings)`` batch tensors.

        The embedding sequences are padded to the longest one in the batch,
        with the batch dimension first.
        """
        label_seq, embedding_seq = zip(*batch)
        label_tensor = torch.tensor(label_seq)
        padded = pad_sequence(embedding_seq, batch_first=True)
        return label_tensor, padded

# Wrap both splits so DataLoader can index them.
train_dataset = TextDataset(train_data)
val_dataset = TextDataset(val_data)

# Sequences have different lengths, so each loader uses the dataset's collate_fn to pad per batch.
# Only the training loader is shuffled.
train_loader = DataLoader(train_dataset, batch_size=24, shuffle=True, collate_fn=train_dataset.collate_fn)
val_loader = DataLoader(val_dataset, batch_size=24, shuffle=False, collate_fn=val_dataset.collate_fn)


 embeddings = torch.tensor(embeddings)


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader

class NegativeSampleDataset(Dataset):
    """Dataset of unlabelled negative (out-of-class) samples."""

    def __init__(self, negative_samples):
        """Store the samples.

        negative_samples: indexable collection of negative samples; the
        collate function below expects each item to be a tensor.
        """
        self.samples = negative_samples

    def __len__(self):
        sample_count = len(self.samples)
        return sample_count

    def __getitem__(self, idx):
        sample = self.samples[idx]
        return sample

    def collate_fn(self, batch):
        """Pad the batch to its longest sequence, batch dimension first."""
        return pad_sequence(batch, batch_first=True)

# Load the negative-sample sentences.
# JSON text is UTF-8, so decode it explicitly instead of relying on the
# platform's locale encoding.
with open('noise.json', 'r', encoding='utf-8') as f:
    negative_samples_list = json.load(f)

negative_embedding_list = []

# Embed each sentence the same way as the labelled data: tokenize, keep at most
# 64 tokens, and look up one embedding vector per token id.
for sentence in negative_samples_list:
    tokens = tokenizer.tokenize(sentence)
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    embeddings = [embedding_map[token_id] for token_id in token_ids[:64]]
    # torch.tensor() on a Python list of ndarrays is very slow and emits a
    # UserWarning; stack the rows into a single array first.
    if embeddings and isinstance(embeddings[0], np.ndarray):
        embeddings = np.stack(embeddings)
    embeddings = torch.tensor(embeddings)
    negative_embedding_list.append(embeddings)

negative_dataset = NegativeSampleDataset(negative_embedding_list)
# Shuffled so that the negatives drawn next to each training batch vary between passes.
negative_loader = DataLoader(negative_dataset, batch_size=24, shuffle=True, collate_fn=negative_dataset.collate_fn)


## Implementing the Model

In [5]:
import torch.nn as nn
import torch.nn.functional as F

class TextCNN(nn.Module):
    """Text classifier built from three parallel 1-D convolution branches.

    The branches use kernel widths 3, 4 and 5 over the token axis. Each branch
    is batch-normalised, passed through ReLU and max-pooled over the whole
    sequence; the pooled features are concatenated, regularised with dropout
    and mapped to class logits by one linear layer.

    Args:
        input_dim: Size of each token embedding (convolution input channels).
        num_classes: Number of output logits.
    """

    # Shortest sequence all branches accept: the width-4 convolution with
    # padding 1 needs seq_length + 2 >= 4. Shorter inputs are zero-padded.
    _MIN_SEQ_LENGTH = 2

    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=DIMENSIONS, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=DIMENSIONS, out_channels=DIMENSIONS, kernel_size=4, padding=1)
        self.conv3 = nn.Conv1d(in_channels=DIMENSIONS, out_channels=DIMENSIONS, kernel_size=5, padding=2)

        self.bn1 = nn.BatchNorm1d(DIMENSIONS)
        self.bn2 = nn.BatchNorm1d(DIMENSIONS)
        self.bn3 = nn.BatchNorm1d(DIMENSIONS)

        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(DIMENSIONS * 3, num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape (batch_size, seq_length, embedding_dim).

        Returns:
            Tensor of shape (batch_size, num_classes) holding unnormalised logits.
        """
        x = x.permute(0, 2, 1)  # Change the input shape to (batch_size, embedding_dim, seq_length)

        # A single-token sentence would otherwise make the width-4 convolution
        # raise; right-pad the sequence axis with zeros, the same value
        # pad_sequence uses when batching.
        deficit = self._MIN_SEQ_LENGTH - x.size(2)
        if deficit > 0:
            x = F.pad(x, (0, deficit))

        branches = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        pooled = [
            F.adaptive_max_pool1d(F.relu(bn(conv(x))), output_size=1).squeeze(2)
            for conv, bn in branches
        ]

        x = torch.cat(pooled, dim=1)
        x = self.dropout(x)
        return self.fc(x)

# Initialize model
# The model consumes the embedding vectors directly, so its input width is set to DIMENSIONS.
input_dim = DIMENSIONS
# One logit per known class.
num_classes = len(class_to_idx)
model = TextCNN(input_dim, num_classes)


## Energy-based Models

In [6]:
def energy_score(logits):
    """Return one energy value per row of ``logits``.

    The energy is the negated log-sum-exp taken over dimension 1.
    """
    log_sum = torch.logsumexp(logits, dim=1)
    return -log_sum

def generate_noise(batch_size, seq_length ,input_dim, device):
    """Draw standard-normal noise of shape (batch_size, seq_length, input_dim).

    The tensor is sampled first and then transferred to ``device``.
    """
    shape = (batch_size, seq_length, input_dim)
    noise = torch.randn(*shape)
    return noise.to(device)


## Training

In [7]:
import torch.optim as optim

# Classification loss over the known classes; the energy terms are added inside train_energy_model.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=8e-4)

from torch.utils.tensorboard import SummaryWriter
# NOTE(review): `tensorboard` is not referenced by name in the visible cells.
import tensorboard
# Module-level writer; train_energy_model logs its scalars through this object.
writer = SummaryWriter()

def train_energy_model(model, train_loader, negative_loader, criterion, optimizer, num_epochs=10):
    """Train the classifier with an energy regulariser for open-set recognition.

    Every training batch contributes three terms:
      1. the classification loss on the labelled batch plus the mean energy of
         that batch (pushed down);
      2. a hinge loss ``relu(1 - energy)`` on Gaussian noise shaped like the
         batch (energy pushed up);
      3. the same hinge loss on one batch from ``negative_loader``, which is
         restarted whenever it runs out.

    Scalars are logged through the module-level ``writer`` once per batch.

    Args:
        model: Module mapping a batch of embeddings to class logits.
        train_loader: Sized iterable yielding ``(labels, embeddings)`` batches.
        negative_loader: Iterable yielding negative embedding batches.
        criterion: Classification loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. The average loss of each epoch is printed.
    """
    model.train()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    negative_iter = iter(negative_loader)
    # Scalars are written per batch, so they need a per-batch step. Using the
    # epoch number would stack every batch of an epoch on one x value.
    global_step = 0

    for epoch in range(num_epochs):
        total_loss = 0
        for labels, embeddings in train_loader:
            embeddings = embeddings.to(device)
            labels = labels.to(device)

            # ---------------------
            # 1. Positive sample
            # ---------------------
            optimizer.zero_grad()
            outputs = model(embeddings)  # logits from the model
            class_loss = criterion(outputs, labels)

            # Energy of positive samples: lower is better
            energy_loss_known = energy_score(outputs).mean()

            # ------------------------------------
            # 2. Negative sample - Random Noise
            # ------------------------------------
            # randn_like already allocates on the device of `embeddings`.
            noise_embeddings = torch.randn_like(embeddings)
            noise_energy = energy_score(model(noise_embeddings))
            energy_loss_noise = F.relu(1 - noise_energy).mean()  # For the energy of noise, bigger is better

            # ------------------------------------
            # 3. Negative sample - custom corpus
            # ------------------------------------
            try:
                negative_samples = next(negative_iter)
            except StopIteration:
                # The negative corpus ran out; start another pass over it.
                negative_iter = iter(negative_loader)
                negative_samples = next(negative_iter)
            negative_samples = negative_samples.to(device)
            negative_energy = energy_score(model(negative_samples))
            energy_loss_negative = F.relu(1 - negative_energy).mean()  # For corpus negatives, bigger energy is better

            # -----------------------------
            # 4. Overall Loss calculation
            # -----------------------------
            total_energy_loss = energy_loss_known + energy_loss_noise + energy_loss_negative
            # The constant +10 has zero gradient; it only shifts the value
            # that is logged and printed.
            total_loss_batch = class_loss + total_energy_loss * 0.1 + 10

            writer.add_scalar("Engergy Loss", total_energy_loss.item(), global_step)
            writer.add_scalar("Loss", total_loss_batch.item(), global_step)
            writer.add_scalar("Norm Loss", (torch.exp(total_loss_batch * 0.003) * 10).item(), global_step)
            global_step += 1

            total_loss_batch.backward()
            optimizer.step()

            total_loss += total_loss_batch.item()

        avg_loss = total_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

# Train for 50 epochs (the function's default is 10), then flush the writer's pending events.
train_energy_model(model, train_loader, negative_loader, criterion, optimizer, num_epochs=50)
writer.flush()


Epoch [1/50], Loss: 12.5108
Epoch [2/50], Loss: 10.7305
Epoch [3/50], Loss: 10.2943
Epoch [4/50], Loss: 9.9350
Epoch [5/50], Loss: 9.7991
Epoch [6/50], Loss: 9.6443
Epoch [7/50], Loss: 9.4762
Epoch [8/50], Loss: 9.4637
Epoch [9/50], Loss: 9.3025
Epoch [10/50], Loss: 9.1719
Epoch [11/50], Loss: 9.0632
Epoch [12/50], Loss: 8.9741
Epoch [13/50], Loss: 8.8487
Epoch [14/50], Loss: 8.6565
Epoch [15/50], Loss: 8.5830
Epoch [16/50], Loss: 8.4196
Epoch [17/50], Loss: 8.2319
Epoch [18/50], Loss: 8.0655
Epoch [19/50], Loss: 7.7140
Epoch [20/50], Loss: 7.6921
Epoch [21/50], Loss: 7.3375
Epoch [22/50], Loss: 7.2297
Epoch [23/50], Loss: 6.8833
Epoch [24/50], Loss: 6.8534
Epoch [25/50], Loss: 6.4557
Epoch [26/50], Loss: 6.1365
Epoch [27/50], Loss: 5.8558
Epoch [28/50], Loss: 5.5030
Epoch [29/50], Loss: 5.1604
Epoch [30/50], Loss: 4.7742
Epoch [31/50], Loss: 4.5958
Epoch [32/50], Loss: 4.0713
Epoch [33/50], Loss: 3.8872
Epoch [34/50], Loss: 3.5240
Epoch [35/50], Loss: 3.3115
Epoch [36/50], Loss: 2.566

## Evaluation

In [8]:
# Decision boundary on the energy score: inputs with energy at or below this value are accepted
# as a known class; inputs above it are reported as "Unknown".
ENERGY_THRESHOLD = -3

In [9]:
from sklearn.metrics import f1_score, accuracy_score, precision_recall_fscore_support

def evaluate_energy_model(model, known_loader, unknown_loader, energy_threshold):
    """Report how well the energy threshold separates known from unknown inputs.

    A sample is predicted "known" (1) when its energy is at or below
    ``energy_threshold`` and "unknown" (0) otherwise. Samples from
    ``known_loader`` have ground truth 1 and samples from ``unknown_loader``
    have ground truth 0. Only this binary decision is scored; whether the
    predicted class of a known sample is correct is not measured here.

    Args:
        model: Module mapping a batch of embeddings to class logits.
        known_loader: Iterable yielding ``(labels, embeddings)`` batches.
        unknown_loader: Iterable yielding embedding batches.
        energy_threshold: Energy value at or below which a sample is accepted.

    Returns:
        None. Accuracy, precision, recall and F1 are printed.
    """
    model.eval()
    # Run on whatever device the model already lives on. Picking "cuda if
    # available" separately could put the inputs on a different device,
    # because this function never moves the model.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    def _accept(embeddings):
        """Return a list of 0/1 flags, 1 where the sample's energy is within the threshold."""
        logits = model(embeddings.to(device))
        return (energy_score(logits) <= energy_threshold).long().cpu().tolist()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        # Evaluate positive sample
        for _, embeddings in known_loader:
            preds = _accept(embeddings)
            all_preds.extend(preds)
            all_labels.extend([1] * len(preds))  # Positive sample labeled as 1

        # Evaluate negative sample
        for embeddings in unknown_loader:
            preds = _accept(embeddings)
            all_preds.extend(preds)
            all_labels.extend([0] * len(preds))  # Negative sample labeled as 0

    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='binary')
    accuracy = accuracy_score(all_labels, all_preds)

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

# NOTE(review): the unknown set here is the same negative_loader that was passed to
# train_energy_model, so these negatives were seen during training.
evaluate_energy_model(model, val_loader, negative_loader, ENERGY_THRESHOLD)

Accuracy: 0.9315
Precision: 1.0000
Recall: 0.9254
F1 Score: 0.9612


In [20]:
# Save the model
# NOTE(review): this serialises the whole module object, not just a state_dict; loading it
# back presumably needs the TextCNN class definition to be available. Confirm with the consumer.
torch.save(model, "model.pt")

## Inference

In [19]:
def predict_with_energy(model, sentence, embedding_map, tokenizer, idx_to_class, energy_threshold, max_length=64):
    """Classify one sentence, rejecting it as "Unknown" when its energy is too high.

    Args:
        model: Module mapping a batch of embeddings to class logits.
        sentence: Text to classify.
        embedding_map: Lookup indexed by token id that yields that token's
            embedding vector.
        tokenizer: Object providing ``tokenize`` and ``convert_tokens_to_ids``.
        idx_to_class: Mapping from predicted class index to class name.
        energy_threshold: Energies strictly above this value are rejected.
        max_length: Maximum number of tokens used from the sentence.

    Returns:
        A list ``[label, max_probability, energy]``. ``label`` is the class
        name, or ``"Unknown"`` when the energy exceeds the threshold. A
        sentence that yields no tokens returns ``["Unknown", 0.0, inf]``.
    """
    model.eval()
    tokens = tokenizer.tokenize(sentence)
    token_ids = tokenizer.convert_tokens_to_ids(tokens)[:max_length]
    if not token_ids:
        # Nothing to feed the network; an empty sequence cannot be classified.
        return ["Unknown", 0.0, float("inf")]

    vectors = [embedding_map[token_id] for token_id in token_ids]
    # torch.tensor() on a Python list of ndarrays is very slow and emits a
    # UserWarning; stack the rows into a single array first.
    if isinstance(vectors[0], np.ndarray):
        vectors = np.stack(vectors)
    embeddings = torch.tensor(vectors).unsqueeze(0)  # Add batch dimension

    # Move the input to the model's device (the training function moves the
    # model to CUDA when it is available).
    first_param = next(model.parameters(), None)
    if first_param is not None:
        embeddings = embeddings.to(first_param.device)

    with torch.no_grad():
        logits = model(embeddings)
        probabilities = F.softmax(logits, dim=1)
        max_prob, predicted = torch.max(probabilities, 1)

        # Calculate energy score
        energy = energy_score(logits)

    # If energy > threshold, consider the input as unknown class
    if energy.item() > energy_threshold:
        return ["Unknown", max_prob.item(), energy.item()]
    return [idx_to_class[predicted.item()], max_prob.item(), energy.item()]

# Example usage:
sentence = "weather today"
energy_threshold = ENERGY_THRESHOLD
predicted = predict_with_energy(model, sentence, embedding_map, tokenizer, idx_to_class, energy_threshold)
print(f'Predicted: {predicted}')


Predicted: ['weather', 0.9989822506904602, -8.016249656677246]
