diff options
author | Pjotr Prins | 2015-03-14 16:45:37 +0300 |
---|---|---|
committer | Pjotr Prins | 2015-03-14 16:45:37 +0300 |
commit | 58f43e9e767890031c77e44d10939c48bc1c81fe (patch) | |
tree | 27fe9a63201261f4c5616312768d103b4b52c954 | |
parent | 7759c755cb8d525ba3739864021921888bdf47e9 (diff) | |
download | genenetwork2-58f43e9e767890031c77e44d10939c48bc1c81fe.tar.gz |
Kinship multi-core: don't overload the queue with jobs
-rw-r--r-- | wqflask/wqflask/my_pylmm/pyLMM/kinship.py | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py index a5497a77..967eca81 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py @@ -22,7 +22,8 @@ import sys import os import numpy as np import multiprocessing as mp # Multiprocessing is part of the Python stdlib -import Queue +import Queue +import time from optmatrix import matrix_initialize, matrixMultT @@ -90,6 +91,8 @@ def kinship(G,options): q = mp.Queue() p = mp.Pool(numThreads, f_init, [q]) + cpu_num = mp.cpu_count() + print "CPU cores:",cpu_num iterations = snps/options.computeSize+1 if options.testing: iterations = 8 @@ -105,6 +108,7 @@ def kinship(G,options): sys.stderr.write("Processing job %d first %d SNPs\n" % (job, ((job+1)*options.computeSize))) W = compute_W(job,G,n,snps,options.computeSize) if numThreads == 1: + # Single-core compute_matrixMult(job,W,q) j,x = q.get() if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") @@ -112,25 +116,29 @@ def kinship(G,options): # print j,K_j[:,0] K = K + K_j else: + # Multi-core results.append(p.apply_async(compute_matrixMult, (job,W))) # Do we have a result? - try: - j,x = q.get_nowait() - if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") - K_j = x - # print j,K_j[:,0] - K = K + K_j - completed += 1 - except Queue.Empty: - pass - + while (len(results)-completed>cpu_num): + try: + j,x = q.get_nowait() + if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") + K_j = x + # print j,K_j[:,0] + K = K + K_j + completed += 1 + except Queue.Empty: + pass + if numThreads == None or numThreads > 1: + # results contains the growing result set for job in range(len(results)-completed): j,x = q.get(True,15) if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") K_j = x # print j,K_j[:,0] K = K + K_j + completed += 1 K = K / float(snps) outFile = 'runtest.kin' |