From 62f6db77b22e1fb28b3355d75a30f9ecf6c94d95 Mon Sep 17 00:00:00 2001 From: Pjotr Prins Date: Mon, 16 Mar 2015 15:34:38 +0300 Subject: Adding multi-core GWAS --- wqflask/wqflask/my_pylmm/pyLMM/gwas.py | 213 ++++++++++++++++++++++++++++++ wqflask/wqflask/my_pylmm/pyLMM/kinship.py | 4 +- wqflask/wqflask/my_pylmm/pyLMM/lmm.py | 62 +++++++-- wqflask/wqflask/my_pylmm/pyLMM/runlmm.py | 8 +- 4 files changed, 271 insertions(+), 16 deletions(-) create mode 100644 wqflask/wqflask/my_pylmm/pyLMM/gwas.py (limited to 'wqflask') diff --git a/wqflask/wqflask/my_pylmm/pyLMM/gwas.py b/wqflask/wqflask/my_pylmm/pyLMM/gwas.py new file mode 100644 index 00000000..52455014 --- /dev/null +++ b/wqflask/wqflask/my_pylmm/pyLMM/gwas.py @@ -0,0 +1,213 @@ +# pylmm-based GWAS calculation +# +# Copyright (C) 2013 Nicholas A. Furlotte (nick.furlotte@gmail.com) +# Copyright (C) 2015 Pjotr Prins (pjotr.prins@thebird.nl) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +#!/usr/bin/python + +import pdb +import time +import sys +# from utility import temp_data +import lmm + + +import os +import numpy as np +import input +from optmatrix import matrix_initialize +# from lmm import LMM + +import multiprocessing as mp # Multiprocessing is part of the Python stdlib +import Queue + +def compute_snp(j,snp_ids,q = None): + # print(j,len(snp_ids),"\n") + result = [] + for snp_id in snp_ids: + # j,snp_id = collect + snp,id = snp_id + # id = collect[1] + # result = [] + # Check SNPs for missing values + x = snp[keep].reshape((n,1)) # all the SNPs + v = np.isnan(x).reshape((-1,)) + if v.sum(): + # NOTE: this code appears to be unreachable! + if verbose: + sys.stderr.write("Found missing values in "+str(x)) + keeps = True - v + xs = x[keeps,:] + if keeps.sum() <= 1 or xs.var() <= 1e-6: + # PS.append(np.nan) + # TS.append(np.nan) + # result.append(formatResult(id,np.nan,np.nan,np.nan,np.nan)) + # continue + result.append(formatResult(id,np.nan,np.nan,np.nan,np.nan)) + continue + + # Its ok to center the genotype - I used normalizeGenotype to + # force the removal of missing genotypes as opposed to replacing them with MAF. + if not normalizeGenotype: + xs = (xs - xs.mean()) / np.sqrt(xs.var()) + Ys = Y[keeps] + X0s = X0[keeps,:] + Ks = K[keeps,:][:,keeps] + if kfile2: + K2s = K2[keeps,:][:,keeps] + Ls = LMM_withK2(Ys,Ks,X0=X0s,verbose=verbose,K2=K2s) + else: + Ls = LMM(Ys,Ks,X0=X0s,verbose=verbose) + if refit: + Ls.fit(X=xs,REML=REML) + else: + #try: + Ls.fit(REML=REML) + #except: pdb.set_trace() + ts,ps,beta,betaVar = Ls.association(xs,REML=REML,returnBeta=True) + else: + if x.var() == 0: + # Note: this code appears to be unreachable! + + # PS.append(np.nan) + # TS.append(np.nan) + # result.append(formatResult(id,np.nan,np.nan,np.nan,np.nan)) # writes nan values + result.append(formatResult(id,np.nan,np.nan,np.nan,np.nan)) + continue + + if refit: + L.fit(X=x,REML=REML) + # This is where it happens + ts,ps,beta,betaVar = L.association(x,REML=REML,returnBeta=True) + result.append(formatResult(id,beta,np.sqrt(betaVar).sum(),ts,ps)) + # compute_snp.q.put([j,formatResult(id,beta,np.sqrt(betaVar).sum(),ts,ps)]) + # print [j,result[0]]," in result queue\n" + if not q: + q = compute_snp.q + q.put([j,result]) + return j + # PS.append(ps) + # TS.append(ts) + # return len(result) + # compute.q.put(result) + # return None + +def f_init(q): + compute_snp.q = q + +def gwas(Y,G,K,restricted_max_likelihood=True,refit=False,verbose=True): + """ + Execute a GWAS. The G matrix should be n inds (cols) x m snps (rows) + """ + matrix_initialize() + cpu_num = mp.cpu_count() + numThreads = 1 + kfile2 = False + + sys.stderr.write(str(G.shape)+"\n") + n = G.shape[1] # inds + inds = n + m = G.shape[0] # snps + snps = m + sys.stderr.write(str(m)+" SNPs\n") + # print "***** GWAS: G",G.shape,G + assert m>n, "n should be larger than m (snps>inds)" + + # CREATE LMM object for association + # if not kfile2: L = LMM(Y,K,Kva,Kve,X0,verbose=verbose) + # else: L = LMM_withK2(Y,K,Kva,Kve,X0,verbose=verbose,K2=K2) + + runlmm = lmm.LMM(Y,K) # ,Kva,Kve,X0,verbose=verbose) + if not refit: + if verbose: sys.stderr.write("Computing fit for null model\n") + runlmm.fit() # follow GN model in run_other + if verbose: sys.stderr.write("\t heritability=%0.3f, sigma=%0.3f\n" % (runlmm.optH,runlmm.optSigma)) + + # outFile = "test.out" + # out = open(outFile,'w') + out = sys.stderr + + def outputResult(id,beta,betaSD,ts,ps): + out.write(formatResult(id,beta,betaSD,ts,ps)) + def printOutHead(): out.write("\t".join(["SNP_ID","BETA","BETA_SD","F_STAT","P_VALUE"]) + "\n") + def formatResult(id,beta,betaSD,ts,ps): + return "\t".join([str(x) for x in [id,beta,betaSD,ts,ps]]) + "\n" + + printOutHead() + + # Set up the pool + # mp.set_start_method('spawn') + q = mp.Queue() + p = mp.Pool(numThreads, f_init, [q]) + collect = [] + + # Buffers for pvalues and t-stats + # PS = [] + # TS = [] + count = 0 + + completed = 0 + last_j = 0 + # for snp_id in G: + for snp in G: + print snp.shape,snp + snp_id = ('SNPID',snp) + count += 1 + if count % 1000 == 0: + job = count/1000 + if verbose: + sys.stderr.write("Job %d At SNP %d\n" % (job,count)) + if numThreads == 1: + compute_snp(job,collect,q) + collect = [] + j,lines = q.get() + if verbose: + sys.stderr.write("Job "+str(j)+" finished\n") + for line in lines: + out.write(line) + else: + p.apply_async(compute_snp,(job,collect)) + collect = [] + while job > completed: + try: + j,lines = q.get_nowait() + if verbose: + sys.stderr.write("Job "+str(j)+" finished\n") + for line in lines: + out.write(line) + completed += 1 + except Queue.Empty: + pass + if job > completed + cpu_num + 5: + time.sleep(1) + else: + if job >= completed: + break + + collect.append(snp_id) + + if not numThreads or numThreads > 1: + for job in range(int(count/1000)-completed): + j,lines = q.get(True,15) # time out + if verbose: + sys.stderr.write("Job "+str(j)+" finished\n") + for line in lines: + out.write(line) + + # print collect + ps = [item[1][3] for item in collect] + ts = [item[1][2] for item in collect] + print ps + return ts,ps diff --git a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py index cd2445f1..00f48939 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py @@ -26,7 +26,6 @@ import multiprocessing as mp # Multiprocessing is part of the Python stdlib import Queue import time - from optmatrix import matrix_initialize, matrixMultT def kinship_full(G): @@ -87,12 +86,13 @@ def kinship(G,computeSize=1000,numThreads=None,useBLAS=False,verbose=True): m = G.shape[0] # snps snps = m sys.stderr.write(str(m)+" SNPs\n") - assert m>n, "n should be larger than m (snps>inds)" + assert snps>inds, "snps should be larger than inds (%i snps, %i inds)" % (snps,inds) q = mp.Queue() p = mp.Pool(numThreads, f_init, [q]) cpu_num = mp.cpu_count() print "CPU cores:",cpu_num + print snps,computeSize iterations = snps/computeSize+1 # if testing: # iterations = 8 diff --git a/wqflask/wqflask/my_pylmm/pyLMM/lmm.py b/wqflask/wqflask/my_pylmm/pyLMM/lmm.py index 9fd05b42..8c6d3c3c 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/lmm.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/lmm.py @@ -51,6 +51,7 @@ from utility.benchmark import Bench from utility import temp_data from kinship import kinship, kinship_full, kvakve import genotype +import gwas try: from wqflask.my_pylmm.pyLMM import chunks @@ -253,7 +254,7 @@ def human_association(snp, # refit=False, # temp_data=None): -def run_other(pheno_vector, +def run_other_old(pheno_vector, genotype_matrix, restricted_max_likelihood=True, refit=False, @@ -269,7 +270,7 @@ def run_other(pheno_vector, """ - print("In run_other") + print("In run_other (old)") print("REML=",restricted_max_likelihood," REFIT=",refit) with Bench("Calculate Kinship"): kinship_matrix,genotype_matrix = calculate_kinship(genotype_matrix, tempdata) @@ -277,9 +278,51 @@ def run_other(pheno_vector, print("kinship_matrix: ", pf(kinship_matrix)) print("kinship_matrix.shape: ", pf(kinship_matrix.shape)) + # with Bench("Create LMM object"): + # lmm_ob = LMM(pheno_vector, kinship_matrix) + + # with Bench("LMM_ob fitting"): + # lmm_ob.fit() + + print("genotype_matrix: ", genotype_matrix.shape) + print(genotype_matrix) + + with Bench("Doing GWAS"): + t_stats, p_values = GWAS(pheno_vector, + genotype_matrix, + kinship_matrix, + restricted_max_likelihood=True, + refit=False, + temp_data=tempdata) + Bench().report() + return p_values, t_stats + +def run_other_new(pheno_vector, + genotype_matrix, + restricted_max_likelihood=True, + refit=False, + tempdata=None # <---- can not be None + ): + + """Takes the phenotype vector and genotype matrix and returns a set of p-values and t-statistics + + restricted_max_likelihood -- whether to use restricted max likelihood; True or False + refit -- whether to refit the variance component for each marker + temp_data -- TempData object that stores the progress for each major step of the + calculations ("calculate_kinship" and "GWAS" take the majority of time) + + """ + + print("In run_other (new)") + print("REML=",restricted_max_likelihood," REFIT=",refit) + with Bench("Calculate Kinship"): + kinship_matrix,genotype_matrix = calculate_kinship(genotype_matrix, tempdata) + + print("kinship_matrix: ", pf(kinship_matrix)) + print("kinship_matrix.shape: ", pf(kinship_matrix.shape)) + with Bench("Create LMM object"): lmm_ob = LMM(pheno_vector, kinship_matrix) - with Bench("LMM_ob fitting"): lmm_ob.fit() @@ -287,15 +330,15 @@ def run_other(pheno_vector, print(genotype_matrix) with Bench("Doing GWAS"): - t_stats, p_values = GWAS(pheno_vector, - genotype_matrix, - kinship_matrix, - restricted_max_likelihood=True, - refit=False, - temp_data=tempdata) + t_stats, p_values = gwas.gwas(pheno_vector, + genotype_matrix.T, + kinship_matrix, + restricted_max_likelihood=True, + refit=False,verbose=True) Bench().report() return p_values, t_stats +run_other = run_other_old def matrixMult(A,B): @@ -821,4 +864,3 @@ if __name__ == '__main__': print("Run from runlmm.py instead") - diff --git a/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py b/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py index cef0cdc4..f17f1bd1 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py @@ -21,7 +21,7 @@ from optparse import OptionParser import sys import tsvreader import numpy as np -from lmm import gn2_load_redis, calculate_kinship +from lmm import gn2_load_redis, calculate_kinship_old from kinship import kinship, kinship_full import genotype import phenotype @@ -151,17 +151,17 @@ elif cmd == 'kinship': gnt = None if options.test_kinship: - K = kinship_full(G) + K = kinship_full(np.copy(G)) print "Genotype",G.shape, "\n", G print "first Kinship method",K.shape,"\n",K k1 = round(K[0][0],4) - K2 = calculate_kinship(np.copy(G.T),temp_data=None) + K2,G = calculate_kinship_old(np.copy(G).T,temp_data=None) print "Genotype",G.shape, "\n", G print "GN2 Kinship method",K2.shape,"\n",K2 k2 = round(K2[0][0],4) print "Genotype",G.shape, "\n", G - K3 = kinship(np.copy(G),options) + K3 = kinship(G.T) print "third Kinship method",K3.shape,"\n",K3 sys.stderr.write(options.geno+"\n") k3 = round(K3[0][0],4) -- cgit v1.2.3