aboutsummaryrefslogtreecommitdiff
path: root/scripts/lmdb-publishdata-export.scm
blob: 84271129009d5ee562d7363947bccf8d134b7e20 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#! /usr/bin/env guile
!#
;; To run this script run:
;;
;; $ guix shell guile guile-hashing guile3-dbi guile3-dbd-mysql \
;; guile-lmdb -- guile lmdb-publishdata-export.scm
;; conn.scm
;;
;; Example conn.scm:
;; ((sql-username . "webqtlout")
;;  (sql-password . "xxxx")
;;  (sql-database . "db_webqtl")
;;  (sql-host . "localhost")
;;  (sql-port . 3306)
;;  (output-dir . "/tmp/data")
;;  (log-file . "/tmp/export.log"))

(use-modules (dbi dbi)
             (rnrs bytevectors)
             (system foreign)
             (ice-9 match)
             (srfi srfi-1)
             (srfi srfi-26)
	     (srfi srfi-43)
             (rnrs io ports)
             (hashing md5)
             ((lmdb lmdb) #:prefix mdb:)
             (ice-9 format)
             (ice-9 exceptions)
	     (json)
             (logging logger)
             (logging rotating-log)
             (logging port-log)
             (oop goops))


;; Set up logging
(define* (setup-logging #:key (log-file "lmdb-dump-log"))
  "Initialize the logging system with rotating file logs and error port output.
   Creates a new logger, adds rotating and error port handlers,
   sets it as the default logger, and opens the log for writing."
  (let ((lgr (make <logger>))
	(rotating (make <rotating-log>
		    #:num-files 3
		    #:size-limit 1024
		    #:file-name log-file))
	(err (make <port-log> #:port (current-error-port))))
    ;; add the handlers to our logger
    (add-handler! lgr rotating)
    (add-handler! lgr err)
    ;; make this the application's default logger
    (set-default-logger! lgr)
    (open-log! lgr)))

(define (shutdown-logging)
  "Properly shutdown the logging system.
   Flushes any pending log messages, closes the log handlers,
   and removes the default logger reference."
  (flush-log)  ;; since no args, it uses the default
  (close-log!) ;; since no args, it uses the default
  (set-default-logger! #f))

(define (call-with-database backend connection-string proc)
  "Execute PROC with an open database connection.  BACKEND is the
database type (e.g. \"mysql\").  CONNECTION-STRING is the database
connection string."
  (let ((db #f))
    (dynamic-wind
      (lambda ()
        (set! db (dbi-open backend connection-string)))
      (cut proc db)
      (lambda ()
        (when db
          (dbi-close db))))))

(define (call-with-target-database connection-settings proc)
  "Connect to the target database using CONNECTION-SETTINGS and execute
PROC."
  (call-with-database "mysql" (string-join
                               (list (assq-ref connection-settings 'sql-username)
                                     (assq-ref connection-settings 'sql-password)
                                     (assq-ref connection-settings 'sql-database)
                                     "tcp"
                                     (assq-ref connection-settings 'sql-host)
                                     (number->string
                                      (assq-ref connection-settings 'sql-port)))
                               ":")
                      proc))

(define* (lmdb-save path key value)
  "Save a NUM with KEY to PATH."
  (mdb:with-env-and-txn
     (path) (env txn)
     (let ((dbi (mdb:dbi-open txn #f 0)))
       (mdb:put txn dbi key
		(if (number? value)
                    (number->string value)
		    value)))))

(define (sql-exec db statement)
  "Execute an SQL STATEMENT on database connection DB.  Throws an error
if the statement execution fails."
  (dbi-query db statement)
  (database-check-status db))

(define (sql-fold proc init db statement)
  "Fold over SQL query results."
  (sql-exec db statement)
  (let loop ((result init))
    (let ((row (dbi-get_row db)))
      (if row
          (loop (proc row result))
          result))))

(define (sql-for-each proc db statement)
  "Apply PROC to each row returned by STATEMENT."
  (sql-fold (lambda (row _)
              (proc row))
            #f db statement))

(define (sql-map proc db statement)
  "Map PROC over rows returned by STATEMENT."
  (sql-fold (lambda (row result)
              (cons (proc row) result))
            (list) db statement))

(define (sql-find db statement)
  (sql-exec db statement)
  (dbi-get_row db))

(define (database-check-status db)
  "Check the status of the last database operation on DB.  Throws an
error if the status code is non-zero."
  (match (dbi-get_status db)
    ((code . str)
     (unless (zero? code)
       (error str)))))

(define* (save-dataset-values settings)
  "Main function to extract and save dataset values.  Queries the
database for datasets and their values, computes MD5 hashes for
dataset-trait combinations, and saves strain values to LMDB files in
/export5/lmdb-data-hashes/."
  (dynamic-wind
    (lambda ()
      (setup-logging #:log-file (assq-ref settings 'log-file))
      (log-msg 'INFO "Starting dataset value extraction"))
    (lambda ()
      (call-with-target-database
       settings
       (lambda (db)
         (sql-for-each
          (lambda (row)
	    (match row
	      ((("Name" . dataset-name)
		("Id" . trait-id))
			    (let* ((data-query (format #f "SELECT
JSON_ARRAYAGG(JSON_ARRAY(Strain.Name, PublishData.Value)) AS data,
 MD5(JSON_ARRAY(Strain.Name, PublishData.Value)) as md5hash
FROM
    PublishData
    INNER JOIN Strain ON PublishData.StrainId = Strain.Id
    INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId
    INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId
LEFT JOIN PublishSE ON
    PublishSE.DataId = PublishData.Id AND
    PublishSE.StrainId = PublishData.StrainId
LEFT JOIN NStrain ON
    NStrain.DataId = PublishData.Id AND
    NStrain.StrainId = PublishData.StrainId
WHERE
    PublishFreeze.Name = \"~a\" AND
    PublishXRef.Id = ~a AND
    PublishFreeze.public > 0 AND
    PublishData.value IS NOT NULL AND
    PublishFreeze.confidentiality < 1
ORDER BY
    LENGTH(Strain.Name), Strain.Name" dataset-name trait-id)))
			      (match (call-with-target-database
				      settings
				      (lambda (db2) (sql-find db2 data-query)))
				((("data" . data)
				  ("md5hash" . md5-hash))
				 (let* ((trait-name (format #f "~a~a" dataset-name trait-id))
					(base-dir (assq-ref settings 'output-dir))
					(out (format #f "~a-~a" trait-name
						     (substring md5-hash 0 12)))
					(out-dir (format #f "~a/~a" base-dir out)))
				   (log-msg
				    'INFO (format #f "Writing ~a to: ~a" trait-name out-dir))
				   (unless (file-exists? out-dir)
				     (mkdir out-dir))
				   (lmdb-save (format #f "~a/index" base-dir) trait-name out)
				   (vector-for-each
				    (lambda (_ x)
				      (match x
					(#(strain value)
					 (lmdb-save out-dir strain value))))
				    (json-string->scm data)))))))))
          db
          "SELECT DISTINCT PublishFreeze.Name, PublishXRef.Id FROM
PublishData INNER JOIN Strain ON PublishData.StrainId = Strain.Id
INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId
INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId
LEFT JOIN PublishSE ON
    PublishSE.DataId = PublishData.Id AND
    PublishSE.StrainId = PublishData.StrainId
LEFT JOIN NStrain ON
    NStrain.DataId = PublishData.Id AND
    NStrain.StrainId = PublishData.StrainId
WHERE
    PublishFreeze.public > 0 AND
    PublishFreeze.confidentiality < 1
ORDER BY
    PublishFreeze.Id, PublishXRef.Id"))))
    (lambda ()
      (shutdown-logging))))

(define main
  (match-lambda*
    ((_ connection-settings-file)
     (save-dataset-values (call-with-input-file connection-settings-file
			    read)))
    ((arg0 _ ...)
     (display (format "Usage: ~a CONNECTION-SETTINGS-FILE~%" arg0)
	      (current-error-port))
     (exit #f))))

(apply main (command-line))