aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPjotr Prins2015-03-14 16:45:37 +0300
committerPjotr Prins2015-03-14 16:45:37 +0300
commit58f43e9e767890031c77e44d10939c48bc1c81fe (patch)
tree27fe9a63201261f4c5616312768d103b4b52c954
parent7759c755cb8d525ba3739864021921888bdf47e9 (diff)
downloadgenenetwork2-58f43e9e767890031c77e44d10939c48bc1c81fe.tar.gz
Kinship multi-core: don't overload the queue with jobs
-rw-r--r--wqflask/wqflask/my_pylmm/pyLMM/kinship.py30
1 files changed, 19 insertions, 11 deletions
diff --git a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
index a5497a77..967eca81 100644
--- a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
+++ b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
@@ -22,7 +22,8 @@ import sys
import os
import numpy as np
import multiprocessing as mp # Multiprocessing is part of the Python stdlib
-import Queue
+import Queue
+import time
from optmatrix import matrix_initialize, matrixMultT
@@ -90,6 +91,8 @@ def kinship(G,options):
q = mp.Queue()
p = mp.Pool(numThreads, f_init, [q])
+ cpu_num = mp.cpu_count()
+ print "CPU cores:",cpu_num
iterations = snps/options.computeSize+1
if options.testing:
iterations = 8
@@ -105,6 +108,7 @@ def kinship(G,options):
sys.stderr.write("Processing job %d first %d SNPs\n" % (job, ((job+1)*options.computeSize)))
W = compute_W(job,G,n,snps,options.computeSize)
if numThreads == 1:
+ # Single-core
compute_matrixMult(job,W,q)
j,x = q.get()
if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
@@ -112,25 +116,29 @@ def kinship(G,options):
# print j,K_j[:,0]
K = K + K_j
else:
+ # Multi-core
results.append(p.apply_async(compute_matrixMult, (job,W)))
# Do we have a result?
- try:
- j,x = q.get_nowait()
- if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
- K_j = x
- # print j,K_j[:,0]
- K = K + K_j
- completed += 1
- except Queue.Empty:
- pass
-
+ while (len(results)-completed>cpu_num):
+ try:
+ j,x = q.get_nowait()
+ if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
+ K_j = x
+ # print j,K_j[:,0]
+ K = K + K_j
+ completed += 1
+ except Queue.Empty:
+ pass
+
if numThreads == None or numThreads > 1:
+ # results contains the growing result set
for job in range(len(results)-completed):
j,x = q.get(True,15)
if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
K_j = x
# print j,K_j[:,0]
K = K + K_j
+ completed += 1
K = K / float(snps)
outFile = 'runtest.kin'