|
统计相同卡号下最大交易号
a[["卡号","交易ID"]].groupby("卡号").max()
|
|
求sum要先判断其为数字,若为字符串,pandas会直接将其拼接,而不是报错
import pandas as pd
# 假设你有以下的DataFrame
data = {
'交易号': ['T001', 'T002', 'T003', 'T004', 'T005', 'T006'],
'卡号': ['C001', 'C002', 'C001', 'C003', 'C002', 'C001'],
'金额': [100, 200, 150, 300, 250, 120]
}
df = pd.DataFrame(data)
# 按卡号分组,并计算交易笔数和总金额
grouped = df.groupby('卡号').agg({'交易号': 'size', '金额': 'sum'}).reset_index()
# 将列名更改为更直观的名称
grouped.columns = ['卡号', '交易笔数', '总金额']
print(grouped)
卡号 交易笔数 总金额
0 C001 3 370
1 C002 2 450
2 C003 1 300
|
import pandas as pd
# 示例数据
data = {
'交易号': ['T1', 'T2', 'T3', 'T4', 'T5','T6'],
'卡号': ['C1', 'C1', 'C2', 'C2', 'C3','C3'],
'商户1': ['M1', 'M1', 'M2', 'M2', 'M1','M1'],
'商户2': ['S1', 'S2', 'S1', 'S2', 'S1','S1'],
'金额': [100, 200, 300, 400, 500,500]
}
df = pd.DataFrame(data)
print(df)
交易号 卡号 商户1 商户2 金额 0 T1 C1 M1 S1 100 1 T2 C1 M1 S2 200 2 T3 C2 M2 S1 300 3 T4 C2 M2 S2 400 4 T5 C3 M1 S1 500 5 T6 C3 M1 S1 500
# 首先,我们按'商户1'和'商户2'以及'卡号'进行分组
grouped = df.groupby(['卡号', '商户1', '商户2'])
# 接着,我们计算每个组的交易笔数('size')和交易金额的总和('金额'的'sum')
# 注意:'size'是pandas分组对象的一个默认方法,用于计算每个组的行数
result = grouped.agg({'交易号': 'size', '金额': 'sum'}).reset_index()
# 如果需要,可以将'交易号'列的名称更改为'交易笔数'以使其更清晰
result = result.rename(columns={'交易号': '交易笔数'})
result
卡号 商户1 商户2 交易笔数 金额 0 C1 M1 S1 1 100 1 C1 M1 S2 1 200 2 C2 M2 S1 1 300 3 C2 M2 S2 1 400 4 C3 M1 S1 2 1000 |
|
|
|
|
import pandas as pd
import numpy as np
# 示例数据
data = {
'指标编码': [1, 2, 3, 4, 5, 6],
'类别': ['基础', '衍生', '基础', '衍生', '基础', '衍生'],
'特征算法名': ['alg1', np.nan, 'alg3', np.nan, 'alg5', 'alg6']
}
df = pd.DataFrame(data)
df
指标编码 类别 特征算法名
0 1 基础 alg1
1 2 衍生 NaN
2 3 基础 alg3
3 4 衍生 NaN
4 5 基础 alg5
5 6 衍生 alg6
count:不包含空值的个数
# 使用 groupby 和 agg 进行统计
result = df.groupby('类别')['特征算法名'].agg(
总个数='count',
非空个数=lambda x: x.notna().sum(),
空值个数=lambda x: x.isna().sum()
).reset_index()
print(result)
类别 总个数 非空个数 空值个数
0 基础 3 3 0
1 衍生 1 1 2
size:包含空值的个数
import pandas as pd
import numpy as np
# 示例数据
data = {
'指标编码': [1, 2, 3, 4, 5, 6],
'类别': ['基础', '衍生', '基础', '衍生', '基础', '衍生'],
'特征算法名': ['alg1', np.nan, 'alg3', np.nan, 'alg5', 'alg6']
}
df = pd.DataFrame(data)
# 使用 groupby 和 agg 进行统计
result = df.groupby('类别')['特征算法名'].agg(
总个数='size', # 使用 size 计算总行数,包括 NaN
非空个数=lambda x: x.notna().sum(),
空值个数=lambda x: x.isna().sum()
).reset_index()
print(result)
上面的代码并不会统计类别为空的情况,可进行以下处理
import pandas as pd
import numpy as np
# 示例数据
data = {
'指标编码': [1, 2, 3, 4, 5, 6],
'类别': ['基础', '衍生', np.nan, '衍生', '基础', np.nan],
'特征算法名': ['alg1', np.nan, 'alg3', np.nan, 'alg5', 'alg6']
}
df = pd.DataFrame(data)
# 将类别列中的空值替换为“类别空值”
df['类别'] = df['类别'].fillna('未分类')
# 使用 groupby 和 agg 进行统计
result = df.groupby('类别')['特征算法名'].agg(
总个数='size', # 使用 size 计算总行数,包括 NaN
非空个数=lambda x: x.notna().sum(),
空值个数=lambda x: x.isna().sum()
).reset_index()
print(result)
类别 总个数 非空个数 空值个数
0 基础 2 2 0
1 未分类 2 2 0
2 衍生 2 0 2
|
使用 include_groups=False
而使用 include_groups=False,则 "PARTY_ID" 列不会被包含在内,
因此你的 .apply() 函数无法直接访问 "PARTY_ID" 列(除非你在函数内部特别处理)。
def example_apply_without_group_key(group):
# 注意这里没有访问 'PARTY_ID' 列,因为我们设置了 include_groups=False
return group['value'].sum()
result = df.groupby("PARTY_ID", include_groups=False).apply(example_apply_without_group_key).reset_index()
|
y_label_stats = df.groupby('label').agg(
count=('mismatch', 'sum'),
y_label_count=('label', 'size')
).reset_index()
df = df.merge(y_label_stats, on='label', how='left')
df[:3]
会生成两列y_label_count,count - count是mismatch按label sum - y_label_count是label size |
```
df['mean_reranked_score'] = df.groupby('real_label')['reranked_score'].transform('mean')
df['mean_ms_score'] = df.groupby('real_label')['ms_score'].transform('mean')
```
|
|
|
import pandas as pd
# 示例数据
df = pd.DataFrame({
'date_diff': [0, 1, 1, 0, 1, 3, 0,1,1,2,1],
'lianxu_day': [0, 1, 1, 1, 1, 0, 1,1,1,0,1]
})
# Step 1: 标记每次清零的位置作为新的组标志
# 每当 lianxu_day == 0 时,表示一个新组开始
groups = (df['lianxu_day'] == 0).cumsum()
df['lianxu'] = df.groupby(groups)['lianxu_day'].cumsum()
print(df)
date_diff lianxu_day lianxu
0 0 0 0
1 1 1 1
2 1 1 2
3 0 1 3
4 1 1 4
5 3 0 0
6 0 1 1
7 1 1 2
8 1 1 3
9 2 0 0
10 1 1 1
|
group = df[df["PARTY_ID"]=="PARTY009"].sort_values(by="TSTM")
group["date_diff"] = group["TSTM"].diff().dt.days.fillna(0).astype(int)
group["lianxu_day"] = group["date_diff"].apply(lambda x: 1 if x in [0, 1] else 0)
# 创建分组标记
groups = (group['lianxu_day'] == 0).cumsum()
# 自定义函数:仅累加 date_diff == 1 的行
def custom_cumsum(x):
cnt = 0
result = []
for val in x['date_diff']:
if val == 1:
cnt += 1
result.append(cnt)
result = np.array(result)+1
return pd.Series(result, index=x.index)
# 应用分组和自定义累加
group['consecutive'] = group.groupby(groups, group_keys=False).apply(custom_cumsum)
group[["TSTM", "date_diff","lianxu_day","consecutive"]]
TSTM date_diff lianxu_day consecutive
5 2022-06-01 0 1 1
60 2022-06-02 1 1 2
69 2022-06-02 0 1 2
2 2022-06-03 1 1 3
4 2022-06-03 0 1 3
98 2022-06-03 0 1 3
46 2022-06-05 2 0 1
43 2022-06-06 1 1 2
80 2022-06-07 1 1 3
50 2022-06-08 1 1 4
33 2022-06-09 1 1 5
38 2022-06-09 0 1 5
|
|
|
|
|
|
|
按某列分组,然后取均值
然后分组后的个数会比原来的数据行数少,因为会存在合并的数据
import pandas as pd
df = pd.read_csv("/wks/datasets/demo_data/.csv")
df
temp =df[df["currency"] == 2]
tmp1 = temp["amt"].groupby(temp["ACCT_NUM"]).transform('mean')
df["a1_mean"] = tmp1
df
temp分组后返回的是与temp同个数的序列
tmp1 = temp["amt"].groupby(temp["ACCT_NUM"]).transform('mean')
tmp1
temp有三行,tmp1返回的就有三行,其中两行完全一样
并且tmp1的索引与temp完全一致,而temp的索引与df完全一致,
所以tmp1的列全合并后df上时会按索引进行合并
|
|
如果重置索引,或者是两个不同的数表,按索引匹配就会出现错误的结果
acc2 = df[1:]
acc2 = acc2.reset_index()
acc2["bad_index"] = tmp1
acc2
按分组键合并
avg_values = temp.groupby(temp["ACCT_NUM"])["amt"].mean()
acc2['avg received '+str(1)] = acc2["ACCT_NUM"].map(avg_values).fillna(0)
acc2
avg_values这种分组方式生成的序列index为分组键,故后面map时可以一一对应
|
|
对df添加列bm25_label,按query_text分组,取每一组中bm25_sim_score的最大值对应的sim_label的值
# 按query_text分组,取每组中bm25_sim_score最大值对应的sim_label
df_max_idx = df.groupby('query_text')['bm25_sim_score'].idxmax()
top1_labels = df.loc[df_max_idx, ['query_text', 'sim_label']].set_index('query_text')['sim_label']
df['bm25_top1_label'] = df['query_text'].map(top1_labels)
|
|
|
|
|
data_path = "/wks/datasets/ibm_aml/bb11_train_ibm_small.csv"
df = pd.read_csv(data_path)
df.shape
(5078345, 14)
df.resample('D', on='DT_TIME')
count_per_day = df.resample('D', on='DT_TIME').size().reset_index(name='rows')
count_per_day
import pandas as pd
# 假设已有 DataFrame: df
# 1) 把 DT_TIME 转成 datetime
df['DT_TIME'] = pd.to_datetime(df['DT_TIME'], format='%Y/%m/%d %H:%M')
# 2) 按“天”统计行数
count_per_day = (
df.groupby(df['DT_TIME'].dt.date) # 按日期分组
.size() # 计数
.reset_index(name='rows') # 变回 DataFrame
)
print(count_per_day)
DT_TIME rows
0 2022-09-01 1114921
1 2022-09-02 754449
2 2022-09-03 207382
3 2022-09-04 207430
4 2022-09-05 482650
5 2022-09-06 482089
6 2022-09-07 482751
7 2022-09-08 482773
8 2022-09-09 654467
9 2022-09-10 208325
10 2022-09-11 396
11 2022-09-12 281
12 2022-09-13 184
13 2022-09-14 121
14 2022-09-15 46
15 2022-09-16 46
16 2022-09-17 23
17 2022-09-18 11
import pandas as pd
# 示例数据
df = pd.DataFrame({
"DT_TIME": ["2022/09/01 00:20", "2022/09/01 12:30", "2022/09/02 01:45"]
})
# 提取日期部分(假设格式是 "YYYY/MM/DD HH:MM")
df["DATE"] = df["DT_TIME"].str.split(" ").str[0]
# 按天统计行数
daily_counts = df["DATE"].value_counts().sort_index()
print(daily_counts)
转换为 datetime 后按日期分组(推荐)
# 转换为 datetime 类型
df["DT_TIME"] = pd.to_datetime(df["DT_TIME"], format="%Y/%m/%d %H:%M")
# 方法 2.1: 使用 dt.date 提取日期
daily_counts = df.groupby(df["DT_TIME"].dt.date).size()
print(daily_counts)
# 方法 2.2: 使用 dt.floor("D") 按天分组(等价于 dt.date)
daily_counts = df.groupby(df["DT_TIME"].dt.floor("D")).size()
print(daily_counts)
直接使用 resample(时间序列专用)
# 将 DT_TIME 设为索引
df = df.set_index("DT_TIME")
# 按天统计行数
daily_counts = df.resample("D").size()
print(daily_counts)
|
有一个pandas数表,列的信息如下
```
class ParamConfig:
#字段列
time14 = "DT_TIME"
id11 = "ACCT_NUM"
id12 = "TCAC"
bank11 = "ORGANKEY"
bank12 = "CFIC"
amt1 = "CRAT" #原币交易金额,付
amt2 = "CRAT" #原币交易金额,收
amt3 = "CNY_AMT"
amt4 = "USD_AMT"
currency1 = "CRTP" #币种
currency2 = "CRTP"
channel = "TSTP"
label = "VOUCHER_NO"
drop_cols = ["TICD","CB_PK","TX_NO","CNY_AMT","USD_AMT"]
dict_file = "dict_file23.txt"
num_scaler_file = "scaler_num23.pkl"
date_scaler_file = "scaler_date23.pkl"
max_date = '2035-01-01'
model_param_path = "ibm_gnn_model_1.pkl"
def __init__(self):
"""
- identity:标识列,需要转str
- date_type:日期列
- classify_type:类型别,需要需要转str
- num_type:数字列,转float64
"""
#字段列分类
self.identity= [self.id11,self.id12,self.bank11,self.bank12]
self.date_type = [self.time14]
self.classify_type = [self.channel,self.currency1]
self.classify_type2 = [[self.bank11,self.bank12]] #一组类别使用同一个字典
self.classify_type_pre = [self.channel, self.currency1, self.bank11, self.bank12] #预测时的类别列
self.num_type = [self.amt1]
self.bool_type = []
pc = ParamConfig()
```
按以下逻辑重新合并数据:
1. pc.id11与pc.id12相同的交易进行合并
2. 日期字段可以按1天合并,也可以按2天合并,还可以按小时合并,该部分配置为参数,可选参数有按小时,按天
3. 合并时判断交易中的类别型字段,cp.classify_type,需要交易按类别分组,相同类别的交易进行合并
4. 分组后相同的类别组合中数字类型字段相加
5. 不用考虑pc.label字段
请实现以上功能
单天多类别
下面给出一段可直接落地的代码,把“时间粒度”做成可配置参数(hour / day),其余规则全部满足。
核心思路:
1. 先把日期截断成指定粒度;
2. 用 `[id11, id12, 截断时间, channel, currency1]` 组成分组键;
3. 数字列求和,其余列保留第一条。
```python
import pandas as pd
def merge_transactions(
df: pd.DataFrame,
time_grain: str = "day" # 'hour' 或 'day'
) -> pd.DataFrame:
"""
按规则合并交易
:param df: 原始 DataFrame
:param time_grain: 'hour' 或 'day'
:return: 合并后的 DataFrame
"""
# ---------- 1. 字段类型处理 ----------
str_cols = pc.identity + pc.classify_type
df[str_cols] = df[str_cols].astype(str)
df[pc.time14] = pd.to_datetime(df[pc.time14], errors='coerce')
# ---------- 2. 构造截断时间 ----------
if time_grain == "hour":
df["time_key"] = df[pc.time14].dt.floor("H")
elif time_grain == "day":
df["time_key"] = df[pc.time14].dt.floor("D")
else:
raise ValueError("time_grain 只能取 'hour' 或 'day'")
# ---------- 3. 分组键 ----------
group_keys = [pc.id11, pc.id12, "time_key"] + pc.classify_type
# ---------- 4. 聚合规则 ----------
# 数字列:求和
num_cols = [c for c in pc.num_type if c in df.columns]
# 其余列:保留第一条
other_cols = [c for c in df.columns if c not in num_cols + group_keys + [pc.label]]
agg_dict = {c: "sum" for c in num_cols}
agg_dict.update({c: "first" for c in other_cols})
# label 明确丢弃,所以不放进聚合规则
# ---------- 5. 合并 ----------
df_out = (
df
.groupby(group_keys, as_index=False, sort=False)
.agg(agg_dict)
)
# 把截断时间列删掉,保持原列
df_out = df_out.drop(columns=["time_key"])
return df_out
# -------------------- 使用示例 --------------------
# df_raw = pd.read_csv("xxx.csv")
# df_merged = merge_transactions(df_raw, time_grain="day") # 或 "hour"
```
使用方式
```python
# 按小时合并
df_hour = merge_transactions(df, time_grain="hour")
# 按天合并
df_day = merge_transactions(df, time_grain="day")
```
代码已满足
1. `id11` + `id12` 相同交易合并
2. 时间粒度可配置:hour / day
3. `classify_type`(即 `TSTP` 与 `CRTP`)同类别再分组
4. 数字列(`amt1` 等)求和
5. 忽略 `label / VOUCHER_NO`。
|
|
封装示例 import pandas as pd data_path = "/wks/datasets/ibm_aml/bb11_train_ibm_small.csv" df = pd.read_csv(data_path) df.shape from dataflow import agg_days from algtu import ParamConfig pc = ParamConfig() results = agg_days(df,pc) results 详细代码 from algtu import ParamConfig pc = ParamConfig() import pandas as pd data_path = "/wks/datasets/ibm_aml/bb11_train_ibm_small.csv" df = pd.read_csv(data_path) df.shape
import pandas as pd
from typing import List, Dict, Any, Literal, Optional
from datetime import datetime, timedelta
class PtimeAgg:
def merge_transactions_with_time_granularity(self,
df: pd.DataFrame,
pc: ParamConfig,
time_granularity: Literal['hour', 'day'] = 'day',
days_interval: int = 1,
time_column: str = None
) -> pd.DataFrame:
"""
按时间粒度合并交易数据
参数:
df: 原始交易数据DataFrame
pc: ParamConfig配置对象
time_granularity: 时间粒度,可选 'hour' 或 'day'
days_interval: 当time_granularity='day'时,指定合并的天数间隔
time_column: 时间列名,如果为None则使用pc.date_type中的第一个时间列
返回:
合并后的DataFrame
"""
# 确保必要的列存在
required_cols = pc.identity_agg + pc.classify_type + pc.date_type
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"数据框中缺少必要的列: {missing_cols}")
# 设置时间列
if time_column is None:
time_column = pc.date_type[0]
# 复制数据以避免修改原始数据
df_work = df.copy()
# 确保时间列是datetime类型
if not pd.api.types.is_datetime64_any_dtype(df_work[time_column]):
df_work[time_column] = pd.to_datetime(df_work[time_column])
# 根据时间粒度创建时间分组键
if time_granularity == 'hour':
# 按小时分组
df_work[pc.time_group] = df_work[time_column].dt.floor('h')
elif time_granularity == 'day':
if days_interval == 1:
# 按天分组
df_work[pc.time_group] = df_work[time_column].dt.date
else:
# 按多天间隔分组
df_work[pc.time_group] = df_work[time_column].apply(
lambda x: self._get_time_interval_group(x, days_interval)
)
else:
raise ValueError("time_granularity 必须是 'hour' 或 'day'")
# 定义分组键:id11, id12, 类别字段, 时间分组
groupby_cols = pc.identity_agg + pc.classify_type + [pc.time_group]
# 定义聚合规则
aggregation_rules = self._create_aggregation_rules(df_work, pc, time_column)
# 执行分组聚合
merged_df = df_work.groupby(groupby_cols, as_index=False).agg(aggregation_rules)
# 清理临时列
if pc.time_group in merged_df.columns:
merged_df = merged_df.drop(pc.time_group, axis=1)
# 重置索引
merged_df.reset_index(drop=True, inplace=True)
return merged_df
def _get_time_interval_group(self, dt: datetime, days_interval: int) -> str:
"""
根据天数间隔获取时间分组键
参数:
dt: datetime对象
days_interval: 天数间隔
返回:
分组键字符串
"""
if days_interval <= 0:
raise ValueError("days_interval 必须大于0")
# 计算从某个参考点开始的天数间隔
# 这里使用1970-01-01作为参考点
reference_date = datetime(1970, 1, 1)
days_diff = (dt - reference_date).days
group_num = days_diff // days_interval
start_date = reference_date + timedelta(days=group_num * days_interval)
end_date = start_date + timedelta(days=days_interval - 1)
return f"{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}"
def _create_aggregation_rules(self, df: pd.DataFrame, pc: ParamConfig, time_column: str) -> Dict[str, Any]:
"""
创建聚合规则字典
参数:
df: 数据框
pc: 配置对象
time_column: 时间列名
返回:
聚合规则字典
"""
aggregation_rules = {}
# 数值型字段 - 求和
for num_col in pc.num_type:
if num_col in df.columns:
aggregation_rules[num_col] = 'sum'
# 时间字段 - 取最小值(表示该时间段的开始时间)
if time_column in df.columns:
aggregation_rules[time_column] = 'min'
# 标识字段 - 取第一个值(因为这些字段在合并后应该相同)
for id_col in pc.identity:
if id_col in df.columns and id_col not in [pc.id11, pc.id12]:
aggregation_rules[id_col] = 'first'
# 类别字段已经在分组键中,不需要额外聚合
# 处理其他字段 - 取第一个值
for col in df.columns:
if (col not in aggregation_rules and
col not in pc.identity_agg + pc.classify_type + [pc.time_group] and
col != pc.label): # 排除label字段
aggregation_rules[col] = 'first'
return aggregation_rules
def merge_transactions_flexible(self,
df: pd.DataFrame,
pc: ParamConfig,
time_config: Optional[Dict[str, Any]] = None
) -> pd.DataFrame:
"""
灵活的合并函数,支持多种配置选项
参数:
df: 原始交易数据DataFrame
pc: ParamConfig配置对象
time_config: 时间配置字典,包含:
- granularity: 'hour', 'day', 'none'(不按时间分组)
- days_interval: 当granularity='day'时的天数间隔
- time_column: 时间列名
返回:
合并后的DataFrame
"""
# 默认时间配置
if time_config is None:
time_config = {
'granularity': 'day',
'days_interval': 1,
'time_column': pc.date_type[0] if pc.date_type else None
}
# 复制数据
df_work = df.copy()
# 设置分组键
groupby_cols = pc.identity_agg + pc.classify_type
# 处理时间分组
if time_config['granularity'] != 'none' and time_config['time_column']:
time_column = time_config['time_column']
# 确保时间列是datetime类型
if not pd.api.types.is_datetime64_any_dtype(df_work[time_column]):
df_work[time_column] = pd.to_datetime(df_work[time_column])
if time_config['granularity'] == 'hour':
# 按小时分组
df_work[pc.time_group] = df_work[time_column].dt.floor('H')
groupby_cols.append(pc.time_group)
elif time_config['granularity'] == 'day':
if time_config['days_interval'] == 1:
# 按天分组
df_work[pc.time_group] = df_work[time_column].dt.date
groupby_cols.append(pc.time_group)
else:
# 按多天间隔分组
df_work[pc.time_group] = df_work[time_column].apply(
lambda x: self._get_time_interval_group(x, time_config['days_interval'])
)
groupby_cols.append(pc.time_group)
# 创建聚合规则
aggregation_rules = self._create_aggregation_rules(df_work, pc, time_config['time_column'] if time_config['time_column'] else None)
# 执行分组聚合
merged_df = df_work.groupby(groupby_cols, as_index=False).agg(aggregation_rules)
# 清理临时列
if pc.time_group in merged_df.columns:
merged_df = merged_df.drop(pc.time_group, axis=1)
# 重置索引
merged_df.reset_index(drop=True, inplace=True)
return merged_df
# 使用示例
def example_usage():
"""
使用示例
"""
# 假设你已经有了数据框 df 和配置对象 pc
# df = pd.read_csv('your_data.csv')
# pc = ParamConfig()
pt = PtimeAgg()
# 示例1: 按天合并
merged_daily = pt.merge_transactions_with_time_granularity(
df, pc, time_granularity='day', days_interval=1
)
# 示例2: 按2天间隔合并
merged_2days = pt.merge_transactions_with_time_granularity(
df, pc, time_granularity='day', days_interval=2
)
# 示例3: 按小时合并
merged_hourly = pt.merge_transactions_with_time_granularity(
df, pc, time_granularity='hour'
)
# 示例4: 使用灵活配置
time_config = {
'granularity': 'day',
'days_interval': 3,
'time_column': pc.time14
}
merged_custom = pt.merge_transactions_flexible(df, pc, time_config)
print("合并完成!")
print(f"原始数据行数: {len(df)}")
print(f"按天合并后行数: {len(merged_daily)}")
print(f"按2天合并后行数: {len(merged_2days)}")
print(f"按小时合并后行数: {len(merged_hourly)}")
print(f"自定义合并后行数: {len(merged_custom)}")
return {
'daily': merged_daily,
'2days': merged_2days,
'hourly': merged_hourly,
'custom': merged_custom
}
results = example_usage()
results
|
|
|
|
|
按identity_agg作为标识字段对数据按day做了聚合df_days,聚合后的数据df_days没有pc.label字段,现在要为聚合后的数据df_days增加pc.label字段(pc.label字段的值只有0与1),方法如下:
1. 针对原始数据df,按identity_agg作为标识字段 分析pc.label字段,如果pc.label字段存在为1的值,那么聚合后数据的pc.label为1
2. 针对原始数据df,按identity_agg作为标识字段 分析pc.label字段,如果pc.label字段的值皆为0,那么聚合后数据的pc.label为0
请实现以上功能
import pandas as pd
# 假设已有
# pc = ParamConfig()
# df : 原始交易流水
# df_days : 按 identity_agg 聚合后的表(不含 pc.label)
# 1. 计算每个 identity_agg 分组是否出现过 label==1
label_flag = (
df.groupby(pc.identity_agg)[pc.label] # 按标识字段分组
.max() # 只要出现过1,max就是1
.astype(int) # 确保是 0/1 int
.rename(pc.label) # 列名保持统一
.reset_index() # 变回普通列
)
# 2. 把结果合并到聚合后的表
df_days = df_days.merge(label_flag, on=pc.identity_agg, how='left')
# 如果 df_days 里原本就可能有 pc.label,但值全为 NaN,需要覆盖
# df_days[pc.label] = df_days[pc.label].fillna(0).astype(int)
df_days[pc.label].unique()
array([0, 1])
label_flag 中有pc.label为1的数据,df_days中原本是没有pc.label标签的, 但merge之后,df_days中有了pc.label为1的数据可以理解,但pc.label为0的数据是怎么来的?
`label_flag` 这张表 **对每一个** `identity_agg` 组合 **都有一条记录**,
其中 `label_flag[pc.label]` 要么是 1,要么是 0。
在 `merge` 时,`how='left'` 以 `df_days` 为主表,去 `label_flag` 里 **精准匹配** 对应的 `identity_agg` 组合。
- 只要匹配成功,`label_flag` 里对应的 0 或 1 就被带进来;
- 由于 `label_flag` 本身就把所有出现过的组合都算了一遍,**不会出现匹配不到的情况**,因此 **不会留下 NaN**。
所以:
- 在原始 `df` 中 **出现过 1** 的组 → `label_flag` 里就是 1 → merge 后对应行就是 1;
- 在原始 `df` 中 **只有 0** 的组 → `label_flag` 里就是 0 → merge 后对应行就是 0。
这就是 **“pc.label 为 0 的数据是怎么来的”** ——
它们本来就由 `groupby(...).max()` 算出来并放在 `label_flag` 里,再通过 `merge` 带进了 `df_days`。
|
# 假设原始数据为 df,聚合后的数据为 df_days
# 1. 从原始数据df中,针对每个identity_agg分组,计算label的最大值(因为label只有0和1,最大值即为是否存在1)
label_agg = df.groupby(pc.identity_agg)[pc.label].max().reset_index()
# 重命名列,以便后续合并
label_agg.rename(columns={pc.label: pc.label}, inplace=True)
# 2. 将计算得到的label合并到聚合后的数据df_days中
# 注意:df_days中已经按identity_agg聚合(可能包含多列),这里使用left merge以确保df_days的行数不变
df_days = df_days.merge(label_agg, on=pc.identity_agg, how='left')
# 3. 检查是否有缺失值(正常情况下不应该有,因为df_days的identity_agg来自df)
# 如果有缺失,则填充为0(根据业务逻辑:如果原始数据中没有该分组,则标记为0?但通常不会出现)
# 但根据需求,原始数据df中每个identity_agg分组至少有一条记录,所以这里应该没有缺失。
# 为了安全,可以填充0:
df_days[pc.label].fillna(0, inplace=True)
# 4. 确保label列为整数类型(0和1)
df_days[pc.label] = df_days[pc.label].astype(int)
|
|
|
|
|
|
|