概要
本文将介绍调用已训练模型进行批量预测的步骤,这里使用的模型是 LightGBM(LGB)。
代码
导包
# NOTE: the original only did `from datetime import date, timedelta` but then
# called `datetime.datetime(...)` — the module itself must be imported too.
import datetime
from datetime import date, timedelta

# First day of the current month formatted as "YYYYMMDD" — this is the
# prediction-month anchor (`dt` partition value) used by every query below.
now = datetime.datetime.now()
day1 = datetime.datetime(now.year, now.month, 1)
day1 = day1.strftime("%Y%m%d")
print(day1)  # e.g. 20240201
设置时间参数
# Derive the date anchors for the run:
#   this_month_start -> first day of the current (prediction) month, datetime
#   last_month_start -> first day of the previous month, "YYYYMMDD" string
#   last_month_end   -> last day of the previous month, "YYYYMMDD" string
now = datetime.datetime.now()
this_month_start = datetime.datetime(now.year, now.month, 1)
this_month_start  # e.g. datetime.datetime(2024, 2, 1, 0, 0)

# Stepping one day back from the 1st lands on the previous month's last day.
prev_month_last_day = this_month_start - timedelta(days=1)
last_month_start = f"{datetime.datetime(prev_month_last_day.year, prev_month_last_day.month, 1):%Y%m%d}"
last_month_end = f"{prev_month_last_day:%Y%m%d}"
print(last_month_start, last_month_end)  # e.g. 20240101 20240131

print("预测的月,取月初:", day1)  # e.g. 20240201
print("上个月月初: ", last_month_start)  # e.g. 20240101
print("上个月月末: ", last_month_end)  # e.g. 20240131
读取数据
# Feature query for the prediction month:
#   - base features from ku.lstable at dt = {day1} (prediction-month snapshot),
#     restricted to customers present in kb.cust_info at last month-end with a
#     non-empty ka_type;
#   - left-joined monthly flag duartion_is_lm = 1 when sum(duartion) > 60 over
#     the previous month ("duartion" is the upstream tables' misspelled column
#     name — do NOT "correct" it here or the query breaks);
#   - row_number() as rn so the scoring loop below can page through in 1M-row
#     batches.
# NOTE(review): `a.*(除customer_id)` is the write-up's placeholder for "all of
# a's columns except customer_id" — replace it with an explicit column list (or
# Hive's regex column spec) before running; it is not valid SQL as written.
project = spark.sql(f"""
    select *,
           row_number() over (order by customer_id desc) as rn
    from (
        select a.cust_no,
               a.*(除customer_id),
               nvl(duartion_is_lm, 0) as duartion_is_lm,
               {day1} as dt
        from (
            select t1.*
            from ku.lstable t1
            inner join (
                select customer_id
                from kb.cust_info
                where dt = {last_month_end} and ka_type != ''
            ) t2 on t1.customer_id = t2.customer_id
            where t1.dt = {day1}
        ) a
        left join (
            select customer_id,
                   case when sum(duartion) > 60 then 1 else 0 end as duartion_is_lm
            from ku.duartiontable
            where dt between {last_month_start} and {last_month_end}
            group by customer_id
        ) b on a.customer_id = b.customer_id
    )
""")
project.show(3)
获取跑的批数
import math

# Persist the feature set, then work out how many 1M-row batches to score.
project.write.format("hive").mode("overwrite").saveAsTable("ku.cunchu")
# Original had `del ls` — `ls` is never defined (NameError); the intent was to
# release the driver-side reference to the big DataFrame just written out.
del project

rn = spark.sql("""select max(rn) as max_rn from ku.cunchu""")
rn.show(3)
rn_data = rn.toPandas()
del rn

# One batch per 1,000,000 rows; ceil so the trailing partial batch is counted.
# (Original used np.ceil on rn_data.values, but numpy was never imported.)
rn_data_num = int(math.ceil(rn_data["max_rn"].iloc[0] / 1000000))
del rn_data
print("最多次数:", rn_data_num)  # e.g. 12
导入模型
import time

import joblib

# Load the trained LightGBM model serialized with joblib.
model_joblib = joblib.load("project_joblib.pkl")
# Start the wall-clock timer for the scoring loop (`time` was used at the end
# of the loop but never imported in the original).
t1 = time.time()
模型预测
def _score_batch(num):
    """Score one 1M-row slice of ku.cunchu and append predictions to ku.cunchu_result."""
    print("开始第%d个" % num)
    # rn is 1-based, so batch `num` covers rows (num-1)*1e6 + 1 .. num*1e6.
    # (This also covers num == 1; the original duplicated the entire body for
    # that case, and its first branch's SQL was `select from` — missing `*`.)
    data_rn = spark.sql(
        f"""select * from ku.cunchu where rn between 1000000*{num-1}+1 and 1000000*{num}"""
    )
    df2 = data_rn.toPandas()  # original called .toPanas() — typo
    del data_rn
    df2 = df2.drop(['rn'], axis=1)
    # Drop rows whose customer_id is not numeric (the keyword is `errors=`,
    # not `error=` as in the original), then force integer dtypes.
    df2['customer_id'] = pd.to_numeric(df2['customer_id'], errors="coerce")
    df2 = df2.dropna(subset=['customer_id'])
    # Re-index after dropna: pd.concat(axis=1) aligns on the index, and the
    # fresh 0..n-1 index of y_pred would otherwise misalign predictions with
    # the wrong customers wherever dropna left gaps.
    df2 = df2.reset_index(drop=True)
    for col in df2.columns:
        df2[col] = df2[col].astype("int")

    X = df2.drop(['customer_id', 'dt'], axis=1)
    y_pred = pd.DataFrame(model_joblib.predict(X))
    result = pd.concat((df2, y_pred), axis=1)
    del df2
    result.rename({0: u'y_pred'}, axis=1, inplace=True)
    result = result.fillna(0)
    result = result[result["customer_id"] != 0]
    for col in result.columns:
        result[col] = result[col].astype("int")

    spark_df = spark.createDataFrame(result)
    del result
    spark_df.write.format("hive").mode("append").saveAsTable("ku.cunchu_result")
    del spark_df
    print("第%d个跑完" % num)


for num in range(1, rn_data_num + 1):
    _score_batch(num)

# Original lacked the `f` prefix, so the elapsed time was never interpolated.
print(f"预测建表跑了 {time.time()-t1:.05f}s")
结果存储
# Pull this month's predictions and count customers per predicted class.
df_pjzb = spark.sql(f"""select cust_no, y_pred from ku.cunchu_result where dt = {day1}""")
# Original aggregated "customer_id", which is not among the selected columns
# above (only cust_no and y_pred) — count cust_no instead.
df_pjzb2 = df_pjzb.groupBy("y_pred").agg({"cust_no": "count"}).toDF("y_pred", "cou")
查看结果分布
# Inspect the class distribution of this month's predictions.
df_pjzb2.show()
# Example output:
# y_pred cou
# 0 9000
# 1 3000
参考资料:自行摸索实践,并整合多处资料整理而成。