import pandas as pd from pandas.api.types import is_string_dtype from pandas.api.types import is_numeric_dtype from sklearn.preprocessing import LabelEncoder import matplotlib.pyplot as plt from sklearn.preprocessing import MinMaxScaler from sklearn.model_selection import train_test_split import os import pyreadr from sklearn.linear_model import LogisticRegression import joblib from sklearn.metrics import roc_curve, auc import numpy as np def ROC_curve(clf, X_test, y_test, name): # ROC曲线绘制,AUC值计算 # 计算每一类的得分 yscore_rf = clf.predict_proba(X_test) fpr_rf, tpr_rf, thersholds = roc_curve(y_test, yscore_rf[:, 1]) plt.figure() plt.plot(fpr_rf, tpr_rf, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc(fpr_rf, tpr_rf)) plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--') plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.05]) plt.xlabel('False Positive Rate') plt.ylabel('True Positive Rate') # plt.title('Receiver operating characteristic example') plt.title(name) plt.legend(loc="lower right") # plt.savefig(plt.savefig('figs/savefig_example.png')) plt.savefig('../images/' + name + '.png') # plt.show() def save_gene_list(COAD_path, STAD_path): print(COAD_path, STAD_path) # 读取非胃癌数据 data_health = pyreadr.read_r(COAD_path) data_health = data_health['data'] # .head(10000) # 提取前10000行,后续把.head()删掉 # 读取胃癌数据 data_sick = pyreadr.read_r(STAD_path) data_sick = data_sick['data'] # .head(10000) # 提取前10000行,后续把.head()删掉 # 数据处理 data_health['D'] = 0 data_sick['D'] = 1 data = pd.concat([data_health, data_sick]) print(50, type(data)) data.to_csv('data.csv') # data_sick[['Variant_Classification','Tumor_Sample_Barcode']] sample_list = list(data.groupby(['Tumor_Sample_Barcode']).groups.keys()) # 把获取类别转为列表 gene_list = list(data.groupby(['Variant_Classification']).groups.keys()) # 把获取类别转为列表 np.save('sample_list.npy', np.array(sample_list)) # 保存为.npy格式 np.save('gene_list.npy', np.array(gene_list)) # 保存为.npy格式 def get_gene_list(): gene_list = np.load('gene_list.npy') gene_list = gene_list.tolist() return gene_list def get_sample_list(): sample_list = np.load('sample_list.npy') sample_list = sample_list.tolist() return sample_list def get_data(): snp_data = pd.read_csv('data.csv') # 假设SNP数据已保存在CSV文件中 # save_gene_list(COAD_path='../Rdata/COAD.Rdata', STAD_path='../Rdata/STAD.Rdata') # 计算基因数目并保存 def count_the_number_of_genes(): data = pd.read_csv('data.csv', low_memory=False) # 假设SNP数据已保存在CSV文件中 gene_list = get_gene_list() sample_list = get_sample_list() list_final = gene_list + ['Tumor_Sample_Barcode', 'D'] data_final = pd.DataFrame(columns=list_final) ind = 0 for sample in sample_list: tempdata = data[data['Tumor_Sample_Barcode'] == sample] message = [] for gene in gene_list: num = len(tempdata[tempdata['Variant_Classification'] == gene]) message.append(num) message = message + [sample, tempdata.iloc[0][-1]] data_final.loc[ind] = message ind = ind + 1 data_final.to_csv('data_final.csv') # count_the_number_of_genes() # 数据处理 def load_data(COAD_path, STAD_path): # 读取非胃癌数据 data_health = pyreadr.read_r('COAD.Rdata') data_health = data_health['data'] # .head(10000) # 提取前10000行,后续把.head()删掉 # 读取胃癌数据 data_sick = pyreadr.read_r('STAD.Rdata') data_sick = data_sick['data'] # .head(10000) # 提取前10000行,后续把.head()删掉 def training_SC_GRS_model(): data_final = pd.read_csv('data_final.csv', low_memory=False) # 假设SNP数据已保存在CSV文件中 gene_list = get_gene_list() data_final['Gi_sum'] = data_final[gene_list].sum(axis=1) # 模型训练 X_train, X_test, y_train, y_test = train_test_split(data_final['Gi_sum'].values.reshape(-1, 1), data_final['D'].values.astype('int'), test_size=0.2) clf = LogisticRegression(penalty='l2') clf.fit(X_train, y_train) # 模型系数 coef = clf.coef_ intercept = clf.intercept_ # 储存模型 joblib.dump(clf, 'clf_scgrs.model') # ROC绘图 SC_GRS方法 ROC_curve(clf, X_test, y_test, name='SC_GRS') print('training_SC_GRS_model') # 模型预测,假设data_final是输入数据,注意,data_final已经是处理好对应的基因数目的数据 # 模型加载 gene_list = np.load('gene_list.npy') gene_list = gene_list.tolist() clf_scgrs = joblib.load('clf_scgrs.model') # 数据处理 X_test = data_final[gene_list].sum(axis=1).values.reshape(-1, 1) y_pre = clf.predict(X_test) # 分类结果预测 prob_pre = clf.predict_proba(X_test) # 结果概率预测 # print(95, prob_pre) # print(95, np.amin(prob_pre)) # print(95, np.amax(prob_pre)) print(95, np.mean(prob_pre)) print(95, np.mean(y_pre)) training_SC_GRS_model() def training_DL_GRS_model(): print('training_DL_GRS_model') def training_OR_GRS_model(): print('training_OR_GRS_model')