about summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xexamples/probesets-experiment-metadata.scm9
-rw-r--r--transform/special-forms.scm525
2 files changed, 391 insertions, 143 deletions
diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm
index b2eb93e..d7c0764 100755
--- a/examples/probesets-experiment-metadata.scm
+++ b/examples/probesets-experiment-metadata.scm
@@ -70,10 +70,11 @@
       (name "ProbeSet Experiments Metadata")
       (connection %connection-settings)
       (table-metadata? #f)
-      (total-rows (assoc-ref
-                   (sql-find db "SELECT count(*) AS count from ProbeSetXRef")
-                   "count"))
-      (rows-per-chunk 1000000)
+      ;; Use composite keyset pagination over joined table identifiers.
+      (keyset-pagination? #t)
+      (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id"))
+      ;; (total-rows 1000000)
+      (rows-per-chunk 10000)
       (prefixes
        '(("dcat:" "<http://www.w3.org/ns/dcat#>")
          ("gn:" "<http://rdf.genenetwork.org/v1/id/>")
diff --git a/transform/special-forms.scm b/transform/special-forms.scm
index 0c07a0a..3cf6a24 100644
--- a/transform/special-forms.scm
+++ b/transform/special-forms.scm
@@ -31,19 +31,16 @@
     (else #t)))
 
 (define (emit-short-turtle subject po-alist)
-  (let loop ((pairs po-alist) (first? #t))
-    (match pairs
-      (((p . o) rest ...)
-       (if (not (emittable-object? o))
-           (loop rest first?)   ; skip malformed or empty objects
-           (begin
-             ;; subject only once
-             (when first?
-               (format #t "~a " subject))
-             (when (not first?)
-               (format #t "\t"))
-
-             ;; emit predicate–object
+  ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)).
+  ;; Build filtered pairs and count in one pass.
+  (let collect ((pairs po-alist) (kept '()) (count 0))
+    (if (null? pairs)
+        (let loop ((pairs (reverse kept)) (index 0))
+          (match pairs
+            (((p . o) rest ...)
+             (if (zero? index)
+                 (format #t "~a " subject)
+                 (display "\t"))
              (match o
                ((? symbol?)
                 (format #t "~a ~a" p (symbol->string o)))
@@ -51,16 +48,47 @@
                 (format #t "~a \"~a\"" p o))
                (_
                 (format #t "~a ~s" p o)))
-
-             ;; separator depends on *remaining emittable pairs*
-             (if (any (match-lambda
-                        ((p . o) (emittable-object? o)))
-                      rest)
-                 (format #t " ;~%")
-                 (format #t " .~%"))
-
-             (loop rest #f))))
-      (() #f))))
+             (if (< (+ index 1) count)
+                 (display " ;\n")
+                 (display " .\n"))
+             (loop rest (+ index 1)))
+            (() #f)))
+        (match (car pairs)
+          ((p . o)
+           (if (emittable-object? o)
+               (collect (cdr pairs) (cons (cons p o) kept) (+ count 1))
+               (collect (cdr pairs) kept count)))))))
+
+(define (make-short-turtle-emitter subject)
+  "Return two procedures: EMIT! and FINISH! for streaming short-turtle output."
+  (let ((first? #t)
+        (pending #f))
+    (define (emit-one pair tail)
+      (let ((p (car pair))
+            (o (cdr pair)))
+        (if first?
+            (format #t "~a " subject)
+            (display "\t"))
+        (match o
+          ((? symbol?)
+           (format #t "~a ~a" p (symbol->string o)))
+          ((? string?)
+           (format #t "~a \"~a\"" p o))
+          (_
+           (format #t "~a ~s" p o)))
+        (display tail)
+        (set! first? #f)))
+    (define (emit! predicate object)
+      (when (emittable-object? object)
+        (let ((current (cons predicate object)))
+          (when pending
+            (emit-one pending " ;\n"))
+          (set! pending current))))
+    (define (finish!)
+      (when pending
+        (emit-one pending " .\n")
+        (set! pending #f)))
+    (values emit! finish!)))
 
 
 
@@ -184,65 +212,76 @@ Example:
     (syntax-case x ()
       ((_ alist actions ...)
        ;; TODO: Check that all actions are valid.
-       #`(let ((evaluated-alist alist))
-           (append (remove (match-lambda
-                             ;; Filter out results of filter-set actions.
-                             ((key . #f)
-                              (member key '#,(filter-map (lambda (action)
-                                                           (syntax-case action (filter-set)
-                                                             ((filter-set key expression) #'key)
-                                                             (_ #f)))
-                                                         #'(actions ...))))
-                             (_ #f))
-                           ;; Do set and filter-set.
-                           `#,(filter-map (lambda (action)
-                                            (syntax-case action (set filter-set)
-                                              ((set key expression)
-                                               #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
-                                              ((filter-set key expression)
-                                               #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
-                                              (_ #f)))
-                                          #'(actions ...)))
-                   ;; Do multiset.
-                   #,@(filter-map (lambda (action)
-                                    (syntax-case action (multiset)
-                                      ((multiset key expression)
-                                       #`(map (cut cons 'key <>)
-                                              #,(key->assoc-ref #'evaluated-alist #'expression)))
-                                      (_ #f)))
-                                  #'(actions ...))
-                   ;; Apply else=> procedure on unspecified keys. If
-                   ;; no else=> procedure is specified, delete
-                   ;; unspecified keys.
-                   (filter-map #,(or (any (lambda (action)
-                                            (syntax-case action (else=>)
-                                              ((else=> proc) #'proc)
-                                              (_ #f)))
-                                          #'(actions ...))
-                                     #'(const #f))
-                               ;; The unspecified part of the input
-                               ;; alist
-                               (alist-delete* evaluated-alist
-                                              (list
-                                               ;; Keys that were referenced
-                                               #,@(append-map (lambda (action)
-                                                                (syntax-case action ()
-                                                                  ((_ key expression)
-                                                                   (collect-keys #'expression))
-                                                                  (_ '())))
-                                                              #'(actions ...))
-                                               ;; Keys that were deleted
-                                               #,@(filter-map (lambda (action)
-                                                                (syntax-case action (remove)
-                                                                  ((remove key) #''key)
-                                                                  (_ #f)))
-                                                              #'(actions ...))
-                                               ;; Keys that were set
-                                               #,@(filter-map (lambda (action)
-                                                                (syntax-case action ()
-                                                                  ((_ key expression) #''key)
-                                                                  (_ #f)))
-                                                              #'(actions ...)))))))))))
+       (let* ((actions #'(actions ...))
+              (filter-set-keys
+               (filter-map (lambda (action)
+                             (syntax-case action (filter-set)
+                               ((filter-set key expression) #'key)
+                               (_ #f)))
+                           actions))
+              (set/filter-pairs
+               (filter-map (lambda (action)
+                             (syntax-case action (set filter-set)
+                               ((set key expression)
+                                #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
+                               ((filter-set key expression)
+                                #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
+                               (_ #f)))
+                           actions))
+              (multiset-exprs
+               (filter-map (lambda (action)
+                             (syntax-case action (multiset)
+                               ((multiset key expression)
+                                #`(map (cut cons 'key <>)
+                                       #,(key->assoc-ref #'evaluated-alist #'expression)))
+                               (_ #f)))
+                           actions))
+              (else-proc
+               (any (lambda (action)
+                      (syntax-case action (else=>)
+                        ((else=> proc) #'proc)
+                        (_ #f)))
+                    actions))
+              (unspecified-keys
+               #`(list
+                  ;; Keys that were referenced
+                  #,@(append-map (lambda (action)
+                                   (syntax-case action ()
+                                     ((_ key expression)
+                                      (collect-keys #'expression))
+                                     (_ '())))
+                                 actions)
+                  ;; Keys that were deleted
+                  #,@(filter-map (lambda (action)
+                                   (syntax-case action (remove)
+                                     ((remove key) #''key)
+                                     (_ #f)))
+                                 actions)
+                  ;; Keys that were set
+                  #,@(filter-map (lambda (action)
+                                   (syntax-case action ()
+                                     ((_ key expression) #''key)
+                                     (_ #f)))
+                                 actions))))
+         #`(let ((evaluated-alist alist))
+             (append
+              ;; Do set and filter-set.
+              #,(if (null? filter-set-keys)
+                    #`(list #,@set/filter-pairs)
+                    #`(remove (match-lambda
+                                ((key . #f)
+                                 (member key '#,filter-set-keys))
+                                (_ #f))
+                              (list #,@set/filter-pairs)))
+              ;; Do multiset.
+              #,@multiset-exprs
+              ;; Apply else=> only when explicitly present.
+              #,@(if else-proc
+                     (list
+                      #`(filter-map #,else-proc
+                                    (alist-delete* evaluated-alist
+                                                   #,unspecified-keys)))
+                     '()))))))))
 
 
 
@@ -416,22 +455,56 @@ must be remedied."
                                                #'(schema-triples)))
                     ((triples subject predicate-clauses ...) (triples)
                      (find-clause #'(clauses ...) 'triples)))
-         #`(define* (name db #:key
-                          (metadata? #f)
-                          (data? #t)
-                          (documentation? #f)
-                          (limit #f)
-                          (offset #f))
-             (let* ((base-sql
-                     (select-query #,(collect-fields #'(subject predicate-clauses ...))
-                                   (primary-table other-tables ...)
-                                   tables-raw ...))
-                    (sql
-		      (if (and limit offset)
-			  (format #f "~a LIMIT ~a OFFSET ~a"
-				  base-sql limit offset)
-			  base-sql)))
-               (when metadata?
+	         #`(define* (name db #:key
+	                          (metadata? #f)
+	                          (data? #t)
+	                          (documentation? #f)
+                                  (keyset? #f)
+                                  (chunk-key "Id")
+	                          (limit #f)
+	                          (offset #f)
+                                  (cursor #f))
+	             (let* ((base-sql
+	                     (select-query #,(collect-fields #'(subject predicate-clauses ...))
+	                                   (primary-table other-tables ...)
+	                                   tables-raw ...))
+                            (chunk-keys
+                             (if (list? chunk-key)
+                                 chunk-key
+                                 (list chunk-key)))
+                            (chunk-key-exprs
+                             (map (lambda (k)
+                                    (if (string-index k #\.)
+                                        k
+                                        (format #f "~a.~a"
+                                                #,(symbol->string (syntax->datum #'primary-table))
+                                                k)))
+                                  chunk-keys))
+                            (sql-for-keyset
+                             (prepend-select-expr
+                              base-sql
+                              (string-join
+                               (map (lambda (expr idx)
+                                      (format #f "~a AS __chunk_key_~a" expr (+ idx 1)))
+                                    chunk-key-exprs
+                                    (iota (length chunk-key-exprs)))
+                               ", ")))
+		                    (sql
+                             (cond
+                              (keyset?
+                               (append-order-limit
+                                (if cursor
+                                    (append-filter-clause
+                                     sql-for-keyset
+                                     (build-keyset-where chunk-key-exprs cursor))
+                                    sql-for-keyset)
+                                (build-keyset-order chunk-key-exprs)
+                                (or limit 100000)))
+			      ((and limit offset)
+			       (format #f "~a LIMIT ~a OFFSET ~a"
+				       base-sql limit offset))
+			      (else base-sql))))
+		               (when metadata?
                  #,@(let ((table (symbol->string (syntax->datum #'primary-table)))
                           (subject-type (any (lambda (predicate)
                                                (syntax-case predicate (rdf:type)
@@ -557,16 +630,144 @@ The above query results to triples that have the form:
                                 (list 'triple-predicate ...)
                                 (list 'triple-object ...)))
                    (_ (error "Invalid schema triples clause:" #'schema-triples-clause)))
-	       (sql-for-each
-		  (lambda (row)
-                    (let* ((subject-val #,(field->assoc-ref #'row #'subject))
-                           (po-alist
-                            (map-alist row #,@(field->key #'(predicate-clauses ...)))))
-                      (emit-short-turtle subject-val po-alist)))
-                  db
-                  sql))))))
+                        (let ((rows-written 0)
+                             (last-cursor cursor))
+		         (sql-for-each
+			  (lambda (row)
+                            (set! rows-written (+ rows-written 1))
+                            (when keyset?
+                              (set! last-cursor
+                                    (map (lambda (idx)
+                                           (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1))))
+                                         (iota (if (list? chunk-key)
+                                                   (length chunk-key)
+                                                   1)))))
+	                    (let ((subject-val #,(field->assoc-ref #'row #'subject)))
+	                      (call-with-values
+	                          (lambda () (make-short-turtle-emitter subject-val))
+	                        (lambda (emit! finish!)
+	                          #,@(map (lambda (predicate-clause)
+	                                    (syntax-case predicate-clause (set multiset filter-set)
+	                                      ((set predicate object)
+	                                       #`(emit! 'predicate
+	                                                #,(field->assoc-ref #'row #'object)))
+	                                      ((filter-set predicate object)
+	                                       #`(let ((val #,(field->assoc-ref #'row #'object)))
+	                                           (when val
+	                                             (emit! 'predicate val))))
+	                                      ((multiset predicate object)
+	                                       #`(for-each (lambda (val)
+	                                                     (emit! 'predicate val))
+	                                                   #,(field->assoc-ref #'row #'object)))
+	                                      (_ (error "Invalid predicate clause:" predicate-clause))))
+	                                  #'(predicate-clauses ...))
+	                          (finish!)))))
+	                  db
+	                  sql)
+                         (if keyset?
+                             (cons rows-written last-cursor)
+                             rows-written)))))))
       (_ (error "Invalid define-transformer syntax:" (syntax->datum x))))))
 
+(define (sql-literal v)
+  (cond
+   ((number? v) (number->string v))
+   ((string? v)
+    (format #f "'~a'" (replace-substrings v '(("'" . "''")))))
+   ((symbol? v)
+    (format #f "'~a'" (symbol->string v)))
+   (else
+    (format #f "'~a'" v))))
+
+(define (normalize-keyset-cursor cursor key-count)
+  (cond
+   ((not cursor) #f)
+   ((and (= key-count 1) (not (list? cursor)))
+    (list cursor))
+   ((and (list? cursor) (= (length cursor) key-count))
+    cursor)
+   (else
+    (error "Invalid keyset cursor, expected "
+           key-count " values, got: " cursor))))
+
+(define (normalize-key-exprs key-spec)
+  "Accept either a list of SQL key expressions or a key count."
+  (cond
+   ((list? key-spec) key-spec)
+   ((number? key-spec)
+    (map (lambda (idx)
+           (format #f "__chunk_key_~a" (+ idx 1)))
+         (iota key-spec)))
+   (else
+    (error "Invalid key specification: " key-spec))))
+
+(define (build-keyset-order key-spec)
+  (string-join
+   (normalize-key-exprs key-spec)
+   ", "))
+
+(define (build-keyset-where key-spec cursor)
+  (let* ((key-exprs (normalize-key-exprs key-spec))
+         (key-count (length key-exprs))
+         (vals (normalize-keyset-cursor cursor key-count)))
+    (unless vals
+      (error "build-keyset-where called with empty cursor"))
+    (string-join
+     (map (lambda (i)
+            (let ((eq-prefix
+                   (string-join
+                    (map (lambda (j)
+                           (format #f "~a = ~a"
+                                   (list-ref key-exprs j)
+                                   (sql-literal (list-ref vals j))))
+                         (iota i))
+                    " AND "))
+                  (gt-part
+                   (format #f "~a > ~a"
+                           (list-ref key-exprs i)
+                           (sql-literal (list-ref vals i)))))
+              (if (zero? i)
+                  gt-part
+                  (string-append "(" eq-prefix " AND " gt-part ")"))))
+          (iota key-count))
+     " OR ")))
+
+(define (find-sql-tail-start sql)
+  (let ((positions
+         (filter-map identity
+                     (list (string-contains sql " GROUP BY ")
+                           (string-contains sql " ORDER BY ")
+                           (string-contains sql " LIMIT ")))))
+    (if (null? positions) #f (apply min positions))))
+
+(define (append-filter-clause sql clause)
+  (let* ((tail-start (find-sql-tail-start sql))
+         (head (if tail-start (substring sql 0 tail-start) sql))
+         (tail (if tail-start (substring sql tail-start) "")))
+    (if (string-contains head " WHERE ")
+        (string-append head " AND (" clause ")" tail)
+        (string-append head " WHERE (" clause ")" tail))))
+
+(define (append-order-limit sql order-expr limit)
+  (let* ((tail-start (find-sql-tail-start sql))
+         (head (if tail-start (substring sql 0 tail-start) sql))
+         (tail (if tail-start (substring sql tail-start) "")))
+    (string-append head tail
+                   " ORDER BY " order-expr
+                   " LIMIT " (number->string limit))))
+
+(define (prepend-select-expr sql expr)
+  "Prepend EXPR to the SELECT list of SQL."
+  (cond
+   ((and (>= (string-length sql) 16)
+         (string=? (substring sql 0 16) "SELECT DISTINCT "))
+    (string-append "SELECT DISTINCT " expr ", " (substring sql 16)))
+   ((and (>= (string-length sql) 7)
+         (string=? (substring sql 0 7) "SELECT "))
+    (string-append "SELECT " expr ", " (substring sql 7)))
+   (else
+    (error "Expected SQL SELECT query" sql))))
+
 (define (get-keyword-value args keyword default)
   (let ((kv (memq keyword args)))
     (if (and kv (>= (length kv) 2))
@@ -582,10 +783,12 @@ The above query results to triples that have the form:
             (table-metadata? (assoc-ref alist 'table-metadata?))
             (prefixes (assoc-ref alist 'prefixes))
             (inputs (assoc-ref alist 'inputs))
-            (outputs (assoc-ref alist 'outputs))
-            (total-rows (assoc-ref alist 'total-rows))
-            (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
-            (chunking? (and total-rows rows-per-chunk))
+	            (outputs (assoc-ref alist 'outputs))
+	            (total-rows (assoc-ref alist 'total-rows))
+	            (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
+                    (keyset-pagination? (assoc-ref alist 'keyset-pagination?))
+                    (chunk-key-field (assoc-ref alist 'chunk-key-field))
+	            (chunking? (and total-rows rows-per-chunk))
             (chunks (if chunking?
                         (ceiling (/ total-rows rows-per-chunk))
                         1))
@@ -614,32 +817,76 @@ The above query results to triples that have the form:
                  inputs))
               #:encoding "UTF-8"))
 
-          ;; Dumping the actual data
-          (when rdf-path
-            (do ((i 0 (+ i 1)))
-                ((>= i chunks))
-              (let* ((offset (* i (or rows-per-chunk 0)))
-                     (out-file
-                      (if (= chunks 1)
-                          rdf-path
-                          (string-append (path-without-extension rdf-path)
-                                         "." (number->string (+ i 1)) ".ttl"))))
-                (with-output-to-file
-                    out-file
-                  (lambda ()
-                    ;; Add the prefixes
-                    (for-each
-                     (match-lambda
-                       ((k v)
-                        (begin
-                          (prefix k v))))
-                     prefixes)
-                    (newline)
-                    (for-each
-                     (lambda (proc)
-                       (proc db #:metadata? table-metadata?
-                             #:limit rows-per-chunk
-                             #:offset offset))
-                     inputs))
-                  #:encoding "UTF-8"))))))))))
-
+	          ;; Dumping the actual data
+	          (when rdf-path
+                    (if keyset-pagination?
+                        ;; Cursor-based chunking with configurable key column.
+                        (let loop ((chunk-index 0)
+                                   (states (map (lambda (_) (cons #f #f)) inputs)))
+                          (unless (every cdr states)
+                            (let* ((out-file
+                                    (if (= chunk-index 0)
+                                        rdf-path
+                                        (string-append (path-without-extension rdf-path)
+                                                       "." (number->string (+ chunk-index 1)) ".ttl")))
+                                   (next-states
+                                    (with-output-to-file
+                                        out-file
+                                      (lambda ()
+                                        (for-each
+                                         (match-lambda
+                                           ((k v)
+                                            (prefix k v)))
+                                         prefixes)
+                                        (newline)
+                                        (let walk ((procs inputs)
+                                                   (st states)
+                                                   (acc '()))
+                                          (if (null? procs)
+                                              (reverse acc)
+                                              (let* ((state (car st))
+                                                     (cursor (car state))
+                                                     (done? (cdr state)))
+                                                (if done?
+                                                    (walk (cdr procs) (cdr st) (cons state acc))
+                                                    (let* ((result ((car procs)
+                                                                    db
+                                                                    #:metadata? table-metadata?
+                                                                    #:keyset? #t
+                                                                    #:chunk-key (or chunk-key-field "Id")
+                                                                    #:limit rows-per-chunk
+                                                                    #:cursor cursor))
+                                                           (rows (car result))
+                                                           (next-cursor (cdr result))
+                                                           (proc-done? (or (= rows 0)
+                                                                           (< rows rows-per-chunk))))
+                                                      (walk (cdr procs)
+                                                            (cdr st)
+                                                            (cons (cons next-cursor proc-done?) acc))))))))
+                                      #:encoding "UTF-8")))
+                              (loop (+ chunk-index 1) next-states))))
+                        ;; Legacy OFFSET chunking.
+                        (do ((i 0 (+ i 1)))
+                            ((>= i chunks))
+                          (let* ((offset (* i (or rows-per-chunk 0)))
+                                 (out-file
+                                  (if (= chunks 1)
+                                      rdf-path
+                                      (string-append (path-without-extension rdf-path)
+                                                     "." (number->string (+ i 1)) ".ttl"))))
+                            (with-output-to-file
+                                out-file
+                              (lambda ()
+                                (for-each
+                                 (match-lambda
+                                   ((k v)
+                                    (prefix k v)))
+                                 prefixes)
+                                (newline)
+                                (for-each
+                                 (lambda (proc)
+                                   (proc db #:metadata? table-metadata?
+                                         #:limit rows-per-chunk
+                                         #:offset offset))
+                                 inputs))
+                              #:encoding "UTF-8")))))))))))