|
```
import pandas as pd
from tpf.mlib import ModelEval as me
y_prob_file = "/ai/data/tmp/y_prob.csv"
df = pd.read_csv(y_prob_file)
df
```
```
y_probs = df["y_prob"].to_numpy()
y_label = df["y_label"].to_numpy()
count_max_list, interval_width =me.interval_distribution(y_probs,y_label,interval=10,)
```
|
|
```
@staticmethod
def interval_distribution(y_probs, y_test, interval=100):
"""计算y_test在y_probs区间内的分布,按区间最小值升序排列
params
-------------------------------------------------------
- y_probs: 模型概率输出,1维数组,每个元素为正样本概率
- y_test: 真实标签,1维数组,二分类标签(0或1)
- interval: 区间数量,默认为100,即将概率范围划分为100个等宽区间
return
-------------------------
- count_max_list: 三元组列表,每个元素格式为 (zero_count, positive_count, interval_min)
- zero_count: 该区间内标签为0的样本数量
- positive_count: 该区间内标签为1的样本数量
- interval_min: 该区间的最小概率值
- 列表按interval_min升序排列,仅包含有正样本的区间
- interval_width: 每个区间的宽度值,计算方式为 (max_val - min_val) / interval
实际逻辑说明:
1. 将y_probs的取值范围[min_val, max_val]等分为interval个区间
2. 统计每个区间内y_test中标签为1和标签为0的样本数量
3. 只返回有正样本(count > 0)的区间信息
4. 按区间最小值(interval_min)升序排列结果
examples
--------------------------------------------------------
count_max_list, interval_width = ModelEval.interval_distribution(y_probs, y_test)
# count_max_list: [(zero_count1, positive_count1, interval_min1),
# (zero_count2, positive_count2, interval_min2), ...]
# interval_width: 浮点数,表示每个区间的宽度
for zero_count, positive_count, interval_min in count_max_list:
print(f"区间[{interval_min:.3f}, {interval_min+interval_width:.3f}): "
f"正样本{positive_count}个, 负样本{zero_count}个")
"""
if y_probs.ndim != 1:
raise Exception(f"y_probs必须为1维,实际为{y_probs.ndim}维")
min_val = np.min(y_probs)
max_val = np.max(y_probs)
interval_width = (max_val - min_val) / interval
result = []
for i in range(interval):
interval_min = min_val + i * interval_width
interval_max = min_val + (i + 1) * interval_width
# 找到该区间对应的y_probs的索引
interval_indices = np.where((y_probs >= interval_min) & (y_probs < interval_max))[0]
# 统计相同索引位置上y_test中为1的个数
count = np.sum(y_test[interval_indices] == 1)
result.append((round(float(interval_min),4), round(float(interval_max),4), int(count)))
count_max_list = sorted([(np.sum(y_test[interval_indices] == 0), count, interval_min)
for interval_min, interval_max, count in result
if count > 0
for interval_indices in [np.where((y_probs >= interval_min) & (y_probs < interval_max))[0]]],
key=lambda x: x[2])
count_max_list = [(int(zero_count), int(count), round(float(interval_min), 4))
for zero_count, count, interval_min in count_max_list]
return count_max_list, interval_width
```
|
|
|
|
|
|
|
|
```
@staticmethod
def eval_num_interval(y_probs, y_test, interval=10):
"""
按y_probs值从高到低取固定个数进行统计分析
与eval_prob_interval方法不同,此方法按y_probs中值从高到低取个数分组,
比如y_probs中值最大为0.7,从0.7开始往下数interval个样本,然后统计对应
y_test中标签为1的个数
参数:
- y_probs: 模型概率输出,1维数组,每个元素为正样本概率
- y_test: 真实标签,1维数组
- yuzhi: 阈值(此方法中不使用,保留参数一致性)
- interval: 每组样本数量,默认为10
返回:
- 四元组列表 [(min_prob, max_prob, count_ones, cumulative_count), ...]
- min_prob: 当前组的最小概率值
- max_prob: 当前组的最大概率值
- count_ones: 当前组中y_test标签为1的个数
- cumulative_count: 当前组的累积样本数(第1组为interval,第2组为2*interval,以此类推)
"""
# 输入验证
if y_probs.ndim != 1:
raise Exception(f"y_probs必须为1维,实际为{y_probs.ndim}维")
if y_test.ndim != 1:
raise Exception(f"y_test必须为1维,实际为{y_test.ndim}维")
if len(y_probs) != len(y_test):
raise Exception(f"y_probs和y_test长度必须相同,实际分别为{len(y_probs)}和{len(y_test)}")
if interval <= 0:
raise Exception(f"interval必须大于0,实际为{interval}")
# 获取原始索引,并按y_probs值降序排列
sorted_indices = np.argsort(y_probs)[::-1] # 降序排列的索引
sorted_y_probs = y_probs[sorted_indices] # 排序后的概率值
sorted_y_test = y_test[sorted_indices] # 对应的真实标签
total_samples = len(y_probs)
result = []
# 按interval大小分组处理
for start_idx in range(0, total_samples, interval):
end_idx = min(start_idx + interval, total_samples)
# 当前组的概率范围
current_probs = sorted_y_probs[start_idx:end_idx]
current_labels = sorted_y_test[start_idx:end_idx]
min_prob = float(np.min(current_probs))
max_prob = float(np.max(current_probs))
# 统计当前组中标签为1的个数
count_ones = int(np.sum(current_labels == 1))
# 累积样本数(当前组的结束位置)
cumulative_count = end_idx
result.append((min_prob, max_prob, count_ones, cumulative_count))
return result
```
|
|
|
|
|
|
|
|
|
|
```
@staticmethod
def eval_prob_interval( y_probs, y_test, interval=0.01):
"""
1. y_probs中值大于yuzhi的预测结果为1 否则为0;但实际上y_probs的值是一个概率,
2. 现在对y_probs按interval划分区间,比如y_probs的最小值为min,最大值为max,在[min,min+0.1]的范围内,y_test中1的个数是多少,y_probs在[min+0.1,min+0.2]的概率区中,y_test中1的个数是多少,以此类推,直到y_probs的最小值和最大值之间的区间,返回一个列表,列表中每个元素为[min,min+interval]的概率区间中,y_test中1的个数是多少
3. 返回一个列表,列表中每个元素为[min,min+interval]的概率区间中,y_test中1的个数是多少
4. 首先划定y_probs的[min,min+interval]区间,然后求y_probs在这个区间中的索引index,然后统计y_test[index]中1的个数,通过index一一对应
4. 最后的结果是一个四元组列表,[(min, max, count_ones, count_total)],按count_ones降序排列
"""
# 输入验证
if y_probs.ndim != 1:
raise Exception(f"y_probs必须为1维,实际为{y_probs.ndim}维")
if y_test.ndim != 1:
raise Exception(f"y_test必须为1维,实际为{y_test.ndim}维")
if len(y_probs) != len(y_test):
raise Exception(f"y_probs和y_test长度必须相同,实际分别为{len(y_probs)}和{len(y_test)}")
if interval <= 0:
raise Exception(f"interval必须大于0,实际为{interval}")
# 获取y_probs的最小值和最大值
min_val = np.min(y_probs)
max_val = np.max(y_probs)
# 如果最大值等于最小值,返回单个区间
if max_val == min_val:
count_ones = np.sum(y_test == 1)
count_total = len(y_test)
return [(float(min_val), float(max_val), int(count_ones), int(count_total))]
# 计算区间数量,确保覆盖整个范围
num_intervals = int(np.ceil((max_val - min_val) / interval))
result = []
# 遍历每个区间
for i in range(num_intervals):
# 计算当前区间的最小值和最大值
interval_min = min_val + i * interval
interval_max = min_val + (i + 1) * interval
# 对于最后一个区间,确保包含最大值
if i == num_intervals - 1:
interval_max = max_val + 1e-10 # 添加一个小的epsilon以包含最大值
# 找到在当前区间内的y_probs的索引
interval_indices = np.where((y_probs >= interval_min) & (y_probs < interval_max))[0]
# 统计对应索引位置上y_test中为1的个数
count_ones = np.sum(y_test[interval_indices] == 1)
# 统计该区间内y_probs的总个数
count_total = len(interval_indices)
# 添加到结果列表中
result.append((float(interval_min), float(interval_max), int(count_ones), int(count_total)))
# 按count_ones降序排列
result.sort(key=lambda x: x[2], reverse=True)
return result
```
|
|
|
|
|
|
|
|
|