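原理:余弦相似度通过计算两个向量夹角的余弦值来衡量它们的相似度。公式为
$\displaystyle \cos\theta = \dfrac{A \cdot B}{\Vert A \Vert _2 \, \Vert B \Vert _2}$
其中 A⋅B 是向量 A 和 B 的点积,$\Vert A \Vert _2$ 和 $\Vert B \Vert _2$ 分别是 A 和 B 的模长(L2 范数)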
$\displaystyle \text{similarity} = \dfrac{x_1 \cdot x_2}{\max(\Vert x_1 \Vert _2, \epsilon) \cdot \max(\Vert x_2 \Vert _2, \epsilon)}$
numpy·1维
import numpy as np

# Two example vectors to compare
A = np.array([1, 2, 3])
B = np.array([4, 5, 6])

# Inner product of the two vectors
dot_product = A @ B

# Euclidean (L2) length of each vector
norm_A, norm_B = (np.linalg.norm(vec) for vec in (A, B))

# Cosine similarity: inner product divided by the product of the lengths
cosine_similarity = dot_product / (norm_A * norm_B)
print("余弦相似度:", cosine_similarity)
pytorch·1维
import torch
import torch.nn.functional as F

# Two example vectors
A = torch.tensor([1.0, 2.0, 3.0])
B = torch.tensor([4.0, 5.0, 6.0])

# Variant 1: the function exposed directly on the torch namespace
cosine_similarity = torch.cosine_similarity(A, B, dim=0)
print("余弦相似度:", cosine_similarity.item())

# Variant 2: the same computation through torch.nn.functional
A = torch.tensor([1.0, 2.0, 3.0])
B = torch.tensor([4.0, 5.0, 6.0])
cosine_similarity = F.cosine_similarity(A, B, dim=0)
print("余弦相似度:", cosine_similarity.item())
pytorch·2维
import torch
# 定义两个向量
A = torch.tensor([[1.0, 2.0, 3.0]])
B = torch.tensor([[1.0, 2.0, 3.0],[4.0, 5.0, 6.0]])
torch.cosine_similarity(A,B,dim=1)
tensor([1.0000, 0.9746])
如果其中一个矩阵只有一行,则会进行广播:这一行会与另一个矩阵的每一行分别计算相似度
import torch
# 定义两个向量
A = torch.tensor([[1.0, 2.0, 3.0],[4.0, 5.0, 6.0]])
B = torch.tensor([[1.0, 2.0, 3.0],[4.0, 5.0, 6.0]])
torch.cosine_similarity(A,B,dim=1)
tensor([1.0000, 1.0000])
如果两个矩阵同shape,则一对一运算
|
原理:欧氏距离是衡量两个向量在空间中距离的常用方法。公式为
$\displaystyle d(A, B) = \Vert A - B \Vert _2 = \sqrt{\sum_{i=1}^{n} (A_i - B_i)^2}$
numpy
import numpy as np

# Two example vectors
A = np.array([1, 2, 3])
B = np.array([4, 5, 6])

# Euclidean distance: L2 norm of the element-wise difference
euclidean_distance = np.linalg.norm(np.subtract(A, B))
print("欧氏距离:", euclidean_distance)
|
原理:曼哈顿距离是计算两个向量在各维度差的绝对值之和。公式为
$\displaystyle d(A, B) = \Vert A - B \Vert _1 = \sum_{i=1}^{n} \vert A_i - B_i \vert$
numpy
import numpy as np

# Two example vectors
A = np.array([1, 2, 3])
B = np.array([4, 5, 6])

# Manhattan distance: sum of the absolute per-dimension differences
manhattan_distance = np.abs(np.subtract(A, B)).sum()
print("曼哈顿距离:", manhattan_distance)
|
|
计算矩阵x1中每个行向量与矩阵x2中每个行向量的相似度,并提取x2中最相似行的相似度大小 numpy实现
#numpy实现
import numpy as np
# 定义两个矩阵
x1 = np.array([[1, 2, 3], [4, 5, 6]])
x2 = np.array([[7, 8, 9], [1, 2, 3]])
# 计算 x1 和 x2 的行向量的余弦相似度矩阵
def cosine_similarity_matrix(x1, x2, eps=1e-8):
    """Return the pairwise cosine similarity between the rows of two matrices.

    Args:
        x1: Array-like of shape (m, d); every row is one vector.
        x2: Array-like of shape (n, d); every row is one vector.
        eps: Lower bound applied to each row norm before dividing, so that
            an all-zero row produces a similarity of 0 instead of NaN.

    Returns:
        numpy.ndarray of shape (m, n); entry [i, j] is the cosine similarity
        between x1[i] and x2[j].
    """
    x1 = np.asarray(x1)
    x2 = np.asarray(x2)
    # Row norms kept as column vectors (m, 1) and (n, 1), clamped from below
    # so the division can never hit zero.
    norm_x1 = np.maximum(np.linalg.norm(x1, axis=1, keepdims=True), eps)
    norm_x2 = np.maximum(np.linalg.norm(x2, axis=1, keepdims=True), eps)
    # (m, d) @ (d, n): entry [i, j] is the dot product of x1[i] and x2[j].
    dot_product = np.dot(x1, x2.T)
    # (m, 1) * (1, n) broadcasts to the (m, n) grid of norm products.
    return dot_product / (norm_x1 * norm_x2.T)
cosine_sim_matrix = cosine_similarity_matrix(x1, x2)
# 提取 x1 中每个行向量与 x2 中最相似行的相似度大小
max_similarities = np.max(cosine_sim_matrix, axis=1)
print("最大相似度:", max_similarities)
x1中的每一行与x2.T中的每一列(也就是x2中的每一行)对应元素相乘再相加,这正是矩阵乘法的基本运算,也正好符合题目的要求
最大相似度: tensor([1.0000, 0.9982])
pytorch实现
import torch

# Example matrices; every row is one vector
x1 = torch.tensor([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
x2 = torch.tensor([[7.0, 8.0, 9.0], [1.0, 2.0, 3.0]])

# Insert a new axis so x1 becomes (m, 1, d) and x2 becomes (1, n, d);
# broadcasting then pairs every row of x1 with every row of x2 and the
# similarity is reduced over the last (feature) axis, giving an (m, n) matrix.
cosine_sim_matrix = torch.cosine_similarity(x1[:, None, :], x2[None, :, :], dim=2)

# For each row of x1 keep the similarity of its best match in x2
max_similarities, _ = cosine_sim_matrix.max(dim=1)
print("最大相似度:", max_similarities)
最大相似度: tensor([1.0000, 0.9982])
封装示例
import torch
from tpf.alg.simi import MaxSimilarity
X = torch.tensor([
[0.3745, 0.9507, 0.7320],
[0.5987, 0.1560, 0.1560],
[0.0581, 0.8662, 0.6011],
[0.7081, 0.0206, 0.9699],
[0.8324, 0.2123, 0.1818],
[0.1834, 0.3042, 0.5248],
[0.4319, 0.2912, 0.6119]]).float()
x1 = X[:2,:]
x2 = X[2:,:]
model = MaxSimilarity()
sim = model(x1,x2)
sim #tensor([0.9684, 0.9992])
数据非torch时会转为torch
import torch
import numpy as np
from tpf.alg.simi import MaxSimilarity
X = np.array([
[0.3745, 0.9507, 0.7320],
[0.5987, 0.1560, 0.1560],
[0.0581, 0.8662, 0.6011],
[0.7081, 0.0206, 0.9699],
[0.8324, 0.2123, 0.1818],
[0.1834, 0.3042, 0.5248],
[0.4319, 0.2912, 0.6119]])
x1 = X[:2,:]
x2 = X[2:,:]
model = MaxSimilarity()
sim = model(x1,x2)
sim #tensor([0.9684, 0.9992])
|
import torch
from tpf.alg.sim import TopSim
X = torch.tensor([
[0.3745, 0.9507, 0.7320],
[0.5987, 0.1560, 0.1560],
[0.0581, 0.8662, 0.6011],
[0.7081, 0.0206, 0.9699],
[0.8324, 0.2123, 0.1818],
[0.1834, 0.3042, 0.5248],
[0.4319, 0.2912, 0.6119]]).float()
x1 = X[:2,:]
x2 = X[2:,:]
model = TopSim()
sim,index = model(x1,x2,n=2)
sim,index
(tensor([
[0.9684, 0.9316],
[0.9992, 0.7791]]),
tensor([
[0, 3],
[2, 4]]))
sim_data = x2[index]
sim_data[0][0]
tensor([0.0581, 0.8662, 0.6011])
#x1中索引为1的数据在x2中最相似的数据
sim_data[1][0]
tensor([0.8324, 0.2123, 0.1818])
import torch

X = torch.tensor([
    [0.3745, 0.9507, 0.7320],
    [0.5987, 0.1560, 0.1560],
    [0.0581, 0.8662, 0.6011],
    [0.7081, 0.0206, 0.9699],
    [0.8324, 0.2123, 0.1818],
    [0.1834, 0.3042, 0.5248],
    [0.4319, 0.2912, 0.6119]]).float()

# First two rows are the queries, the remaining rows the candidates
x1, x2 = X[:2, :], X[2:, :]

# Pairwise cosine similarity: rows of x1 against rows of x2
cosine_sim_matrix = torch.cosine_similarity(x1.unsqueeze(1), x2.unsqueeze(0), dim=2)

# Number of best matches to keep per query row
n = 2

# Values and x2 row positions of the n largest similarities in each row
topk_similarities, topk_indices = cosine_sim_matrix.topk(k=n, dim=1)

# Report the results
print("余弦相似度矩阵:")
print(cosine_sim_matrix)
print("\n每个x1行向量与x2中前{}个最相似行向量的相似度:".format(n))
print(topk_similarities)
print("\n对应的x2中的行索引:")
print(topk_indices)
余弦相似度矩阵:
tensor([[0.9684, 0.6589, 0.5859, 0.9316, 0.8777],
[0.3914, 0.7548, 0.9992, 0.5914, 0.7791]])
每个x1行向量与x2中前2个最相似行向量的相似度:
tensor([[0.9684, 0.9316],
[0.9992, 0.7791]])
对应的x2中的行索引:
tensor([[0, 3],
[2, 4]])
|
# 计算距离,每个样本与所有样本的距离 distance_matrix = torch.sum((x1-x2) ** 2, dim=1) index = torch.argmin(distance_matrix, dim=0) |
import numpy as np
import torch
from torch import nn
class DLCorr(nn.Module):
    """Find which of the remaining rows is closest to the first row.

    The first row of the input is the query and every following row is a
    candidate; closeness is the squared Euclidean distance.
    """

    def __init__(self):
        """Initialise the module; it defines no parameters of its own."""
        super().__init__()

    def forward(self, X):
        """Return the index of the candidate row nearest to the first row.

        Args:
            X: Tensor indexed as X[row, feature]; row 0 is the query and
                rows 1.. are the candidates.

        Returns:
            Tensor holding the position, within ``X[1:]``, of the row with
            the smallest squared Euclidean distance to ``X[0]``.
        """
        query = X[:1, :]
        candidates = X[1:, :]
        # The single query row broadcasts against every candidate row;
        # summing the squared differences per row gives one distance each.
        squared_distances = ((query - candidates) ** 2).sum(dim=1)
        return squared_distances.argmin(dim=0)
X = torch.tensor([
[0.3745, 0.9507, 0.7320],
[0.5987, 0.1560, 0.1560],
[0.0581, 0.8662, 0.6011],
[0.7081, 0.0206, 0.9699],
[0.8324, 0.2123, 0.1818],
[0.1834, 0.3042, 0.5248],
[0.4319, 0.2912, 0.6119]]).float()
model = DLCorr()
index = model(X)
X[1:][index]
tensor([0.0581, 0.8662, 0.6011])
下面的h0要能跑通模型,大部分模型批次维度给1就行
但这个自定义模型是第1个元素与后面的元素对比,那么就至少需要2个元素
所以具体的形状还得看模型设计
# Tracing exports the static graph by actually running the model once, so an example input is required.
# NOTE(review): the original remark gave the layout as [B,C,H,W]; the example tensor built here is 2-D (3 x 3).
h0 = torch.zeros(3, 3)
# Export by tracing: avoid Python for-loops in the model design,
# keep data handling that can be done outside the model out of it,
# and avoid in-place or otherwise fancy constructs; keep it simple, otherwise ONNX may not convert everything.
torch.onnx.export(
model=model,
# Arguments for the model, i.e. the args of the usual y_out = model(args) call;
# they only need a shape that lets the model run once, real data is not required.
args=(h0,),
# Path of the file to write
f="model_knn2.onnx",
# Export the model parameters (True is the default)
export_params = True,
# Inference (eval) mode: Dropout, BatchNorm and similar layers are frozen or inactive
training=torch.onnx.TrainingMode.EVAL,
# Print detailed information
verbose=True,
# Name the input and output nodes so they are easy to inspect or address later
input_names=["input1"],
output_names=["output1"],
# The opset decides how each operator is exported (corresponds to symbolic_opset11)
opset_version=11,
# Only the batch dimension is dynamic; other dimensions are kept static
# NOTE(review): `model` is created outside this snippet; if its forward returns a 0-dim tensor,
# "output1" has no axis 0 to mark as dynamic — confirm.
dynamic_axes={
"input1": {0: "batch"},
"output1": {0: "batch"},
}
)
import onnx
import onnxruntime
import numpy as np
model_onnx = onnx.load("model_knn2.onnx") # load the exported ONNX model
onnx.checker.check_model(model_onnx) # run the ONNX checker on the loaded model
# Create an inference session on the CPU provider
session = onnxruntime.InferenceSession("model_knn2.onnx",providers=[ 'CPUExecutionProvider'])
ort_input = {session.get_inputs()[0].name: np.array(X).astype(np.float32)} # the input is passed as a float32 NumPy array
output_name = session.get_outputs()[0].name
ort_output = session.run([output_name], ort_input)
# Only one output was requested, so it is the first entry of the result list
index = ort_output[0]
# Look the index up among the candidate rows (everything after the first row of X)
X[1:][index]
tensor([0.0581, 0.8662, 0.6011])
|
返回与第1条数据最相似数据的相似度大小及索引
$ \text{similarity} = \dfrac{x_1 \cdot x_2}{\max(\Vert x_1 \Vert _2, \epsilon) \cdot \max(\Vert x_2 \Vert _2, \epsilon)}$
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
class DLCorr(nn.Module):
    """Find the row most similar to the first row, by cosine similarity.

    The first row of the input is the query and every following row is a
    candidate.
    """

    def __init__(self):
        """Initialise the module; it defines no parameters of its own."""
        super().__init__()

    def forward(self, X):
        """Return the best cosine similarity and where it was found.

        Args:
            X: Tensor indexed as X[row, feature]; row 0 is the query and
                rows 1.. are the candidates.

        Returns:
            A pair ``(similarity, index)``: the largest cosine similarity
            between ``X[0]`` and any row of ``X[1:]``, and the position of
            that row within ``X[1:]``.
        """
        query = X[:1, :]
        candidates = X[1:, :]
        # One similarity per candidate row (the query row broadcasts);
        # a larger value means the two rows are more alike.
        similarities = F.cosine_similarity(query, candidates, dim=1)
        best = torch.argmax(similarities, dim=0)
        return similarities[best], best
X = torch.tensor([
[0.3745, 0.9507, 0.7320],
[0.5987, 0.1560, 0.1560],
[0.0581, 0.8662, 0.6011],
[0.7081, 0.0206, 0.9699],
[0.8324, 0.2123, 0.1818],
[0.1834, 0.3042, 0.5248],
[0.4319, 0.2912, 0.6119]]).float()
model = DLCorr()
cosine_sim,index = model(X)
X[1:][index] #tensor([0.0581, 0.8662, 0.6011])
# Tracing exports the static graph by actually running the model once, so an example input is required.
# NOTE(review): the original remark gave the layout as [B,C,H,W]; the example tensor built here is 2-D (3 x 3).
h0 = torch.zeros(3, 3)
# Export by tracing: avoid Python for-loops in the model design,
# keep data handling that can be done outside the model out of it,
# and avoid in-place or otherwise fancy constructs; keep it simple, otherwise ONNX may not convert everything.
torch.onnx.export(
model=model,
# Arguments for the model, i.e. the args of the usual y_out = model(args) call;
# they only need a shape that lets the model run once, real data is not required.
args=(h0,),
# Path of the file to write
f="model_knn2.onnx",
# Export the model parameters (True is the default)
export_params = True,
# Inference (eval) mode: Dropout, BatchNorm and similar layers are frozen or inactive
training=torch.onnx.TrainingMode.EVAL,
# Print detailed information
verbose=True,
# Name the input and output nodes so they are easy to inspect or address later
# NOTE(review): a single output name is listed, while the inference code that follows reads
# ort_output[1] as well; presumably the second output gets an auto-generated name — confirm.
input_names=["input1"],
output_names=["output1"],
# The opset decides how each operator is exported (corresponds to symbolic_opset11)
opset_version=11,
# Only the batch dimension is dynamic; other dimensions are kept static
# NOTE(review): if "output1" is a 0-dim tensor it has no axis 0 to mark as dynamic — confirm.
dynamic_axes={
"input1": {0: "batch"},
"output1": {0: "batch"},
}
)
import onnx
import onnxruntime
import numpy as np
model_onnx = onnx.load("model_knn2.onnx") # 加载onnx模型
onnx.checker.check_model(model_onnx) # 验证onnx模型是否加载成功
# 创建会话
session = onnxruntime.InferenceSession("model_knn2.onnx",providers=[ 'CPUExecutionProvider'])
ort_input = {session.get_inputs()[0].name: np.array(X).astype(np.float32)} #这里要转为Numpy数组
ort_output = session.run(None, ort_input)
cosine_sim = ort_output[0]
index = ort_output[1]
cosine_sim,X[1:][index]
(array(0.9683632, dtype=float32), tensor([0.0581, 0.8662, 0.6011]))
|
|
|
|
|
import torch
from torch import nn
from tpf.mlib.seq import SeqOne
a = torch.randn(64,512) #模拟2维数表
model = SeqOne(seq_len=a.shape[1], out_features=2)
model(a)[:3]
tensor([[-0.2379, -0.3309],
[-0.6055, 0.4042],
[ 0.2394, 0.1917]], grad_fn=SliceBackward0)
------------------------------------------------------------------------
|
from T import train
from T import X_test
from tpf.mlib.seq import SeqOne
model = SeqOne(seq_len=X_test.shape[1], out_features=2)
# model(torch.tensor(X_test).float()[:3])[:3]
train(model)
------------------------------------------------------------------------
|
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import torch
from torch import nn
# 加载乳腺癌数据集
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target,columns=['target'])
X = torch.tensor(data.data).float()
y = torch.tensor(data.target).reshape(-1,1).float()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
"""
构建数据集
"""
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch
from torch import nn
from torch.nn import functional as F
class MyDataSet(Dataset):
    """Map-style dataset pairing feature rows with integer class labels."""

    def __init__(self, X, y):
        """Convert the samples to tensors once, up front.

        Args:
            X: Features, one sample per entry along the first axis; a tensor
                or anything ``torch.as_tensor`` accepts (e.g. a NumPy array).
            y: Labels, one per sample; flattened to 1-D, so an (n, 1) column
                is accepted as well as an (n,) vector.

        Raises:
            ValueError: If the number of labels differs from the number of
                samples.
        """
        # Converting here instead of in __getitem__ avoids re-wrapping a
        # tensor with torch.tensor() on every access, which emits a
        # copy-construct UserWarning and repeats the dtype conversion.
        self.X = torch.as_tensor(X).float()
        self.y = torch.as_tensor(y).reshape(-1).long()
        if len(self.X) != len(self.y):
            raise ValueError(
                f"X has {len(self.X)} samples but y has {len(self.y)} labels"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, label)`` as a float32 and an int64 tensor."""
        # clone() hands out independent copies, matching the previous
        # behaviour where every access built a fresh tensor.
        return self.X[idx].clone(), self.y[idx].clone()
train_dataset = MyDataSet(X=X_train, y=y_train)
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=128)
test_dataset = MyDataSet(X=X_test, y=y_test)
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=256)
# 定义训练轮次
epochs = 20
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# 定义过程监控函数
def get_acc(dataloader=train_dataloader, model=None):
    """Return the classification accuracy of ``model`` over ``dataloader``.

    The accuracy is the number of correctly predicted samples divided by the
    total number of samples, so a smaller final batch is not over-weighted
    (which averaging the per-batch accuracies would do).

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches; ``y`` holds the
            class index of every sample.
        model: Module to evaluate; must be provided. Its output is reduced
            with ``argmax(dim=1)`` to obtain the predicted class.

    Returns:
        Accuracy as a float; ``nan`` when the dataloader yields no samples.
    """
    model.to(device=device)
    model.eval()
    correct = 0
    total = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device=device)
            y = y.to(device=device)
            y_pred = model(X).argmax(dim=1)
            correct += (y_pred == y).sum().item()
            total += y.numel()
    if total == 0:
        return float("nan")
    return correct / total
# 定义训练过程
def train(model,
          epochs=epochs,
          train_dataloader=train_dataloader,
          test_dataloader=test_dataloader):
    """Train ``model`` with SGD and cross-entropy loss.

    Args:
        model: Module to train; moved to the module-level ``device``.
        epochs: Number of passes over ``train_dataloader``.
        train_dataloader: Yields the ``(X, y)`` training batches.
        test_dataloader: Yields the ``(X, y)`` batches used for the test
            accuracy printed after every epoch.
    """
    model.to(device=device)
    # Loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(params=model.parameters(), lr=1e-3)
    for epoch in range(1, epochs + 1):
        print(f"正在进行第 {epoch} 轮训练:")
        model.train()
        for batch_X, batch_y in train_dataloader:
            batch_X = batch_X.to(device=device)
            batch_y = batch_y.to(device=device)
            # Forward pass and loss
            logits = model(batch_X)
            loss = loss_fn(logits, batch_y)
            # Clear stale gradients, back-propagate, take one optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Monitor accuracy on both splits once the epoch is finished
        print(f"train_acc: {get_acc(dataloader=train_dataloader,model=model)}, test_acc: {get_acc(dataloader=test_dataloader,model=model)}")
|
|
简易示例
import torch
from tpf.mlib.seq import SeqOne
from tpf.mlib import ModelPre as mp
a = torch.randn(3,512) #模拟2维数表
model = SeqOne(seq_len=a.shape[1], out_features=2)
file_path = "seqone.dict"
torch.save(model.state_dict(), file_path)
model.load_state_dict(torch.load(file_path,weights_only=True))
封装示例:模型保存必须以.dict后缀,否则无法使用该API
import torch
from tpf.mlib.seq import SeqOne
from tpf.mlib import ModelPre as mp
a = torch.randn(3,512) #模拟2维数表
model = SeqOne(seq_len=a.shape[1], out_features=2)
file_path = "seqone.dict"
y_pred = mp.predict_proba(a,model_path=file_path,model=model)
y_pred
array([-0.03084982, 0.01873165, -0.01635017], dtype=float32)
参数
- 数据
- 模型参数
- 模型定义
batch_size默认10W,为0表示全量预测
y_pred = mp.predict_proba(a,model_path=file_path,model=model,batch_size=1000000)
y_pred
array([0.03419267, 0.0257818 , 0.04169804], dtype=float32)
|
|
|
import os
import time
import numpy as np
import torch
from torch import nn
from tpf import pkl_save,pkl_load
BASE_DIR = "/wks/datasets/ibm_aml"
file_pkl = "ibm_aml_1.pkl"
# file_path = os.path.join(BASE_DIR, file_pkl)
file_path = file_pkl
X_train, X_test, y_train, y_test = pkl_load(file_path=file_path)
len(y_test[y_test==1]),len(y_train[y_train==1]),len(X_train)
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tpf.dl import DataSet11
from tpf.dl import T11
from tpf.mlib.seq import SeqOne
model = SeqOne(seq_len=X_test.shape[1], out_features=2)
T11.train(model, X_train, y_train, X_test, y_test,
epochs=50000,
batch_size=512,
learning_rate=1e-4,
model_param_path="model_params_12.pkl.dict",
log_file="/tmp/train.log",
per_epoch=100)
ibm_aml_1.pkl
- 去除了时间列,保留7列转化为数字
- 也就是不考虑数据之间的序列特征
SeqOne
- 提取一条数据多个特征之间的关系,也不考虑多条数据之间的关系
该模型的精度最多达到70%
正在进行第 epoch= 1 轮训练...每轮次批次个数 =5,batch_size=512
[2025-06-21 21:04:50] train_acc: 0.697265625, test_acc: 0.68515625, good_acc:0.741796875
|
|
|
|
|
|
|
|
|
从决策树中提取出叶子节点中数据分布集中的树分支
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
# 加载乳腺癌数据集
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target,columns=['label'])
y = y['label']
from sklearn.tree import DecisionTreeClassifier
clf = DecisionTreeClassifier(
max_depth=3,
min_samples_leaf=30
)
clf = clf.fit(X, y)
import matplotlib.pyplot as plt
import dtreeviz
import warnings
# Silence all warnings for the rest of the session
warnings.filterwarnings("ignore")
# Wrap the fitted tree together with the data it is visualised on
viz_model = dtreeviz.model(clf,
X_train=X,
y_train=y,
target_name='label',
feature_names=X.columns,
# NOTE(review): in scikit-learn's breast-cancer data target 0 is "malignant" and 1 is "benign";
# confirm that the 'good'/'bad' naming below is the intended mapping.
class_names={0:'good',1:'bad'},
)
# Build the view with fancy=False, display it, then write it to an SVG file
v = viz_model.view(fancy=False)
v.show()
v.save("img.svg")
---------------------------------------------------------------------------
|
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
# 加载乳腺癌数据集
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target,columns=['target'])
Y = y['target']
from sklearn import tree
clf = tree.DecisionTreeClassifier(
max_depth=3,
min_samples_leaf=50
)
clf = clf.fit(X, Y)
import matplotlib.pyplot as plt

# Figure size and dpi control the sharpness; use larger values for a clearer picture
plt.figure(figsize=(8,8),dpi=1000)
tree.plot_tree(clf)
# Save the vector image (SVG) BEFORE plt.show(): once show() returns, the
# figure may already have been released, and saving afterwards can write an
# empty image.
plt.savefig('b.svg', format='svg', bbox_inches='tight')
plt.show()
|
|
|
|
|
|
|
风控策略的自动化生成-利用决策树分分钟生成上千条策略