about summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xexamples/probesets-experiment-metadata.scm9
-rw-r--r--transform/special-forms.scm525
2 files changed, 143 insertions, 391 deletions
diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm
index d7c0764..b2eb93e 100755
--- a/examples/probesets-experiment-metadata.scm
+++ b/examples/probesets-experiment-metadata.scm
@@ -70,11 +70,10 @@
       (name "ProbeSet Experiments Metadata")
       (connection %connection-settings)
       (table-metadata? #f)
-      ;; Use composite keyset pagination over joined table identifiers.
-      (keyset-pagination? #t)
-      (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id"))
-      ;; (total-rows 1000000)
-      (rows-per-chunk 10000)
+      (total-rows (assoc-ref
+                   (sql-find db "SELECT count(*) AS count from ProbeSetXRef")
+                   "count"))
+      (rows-per-chunk 1000000)
       (prefixes
        '(("dcat:" "<http://www.w3.org/ns/dcat#>")
          ("gn:" "<http://rdf.genenetwork.org/v1/id/>")
diff --git a/transform/special-forms.scm b/transform/special-forms.scm
index 3cf6a24..0c07a0a 100644
--- a/transform/special-forms.scm
+++ b/transform/special-forms.scm
@@ -31,16 +31,19 @@
     (else #t)))
 
 (define (emit-short-turtle subject po-alist)
-  ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)).
-  ;; Build filtered pairs and count in one pass.
-  (let collect ((pairs po-alist) (kept '()) (count 0))
-    (if (null? pairs)
-        (let loop ((pairs (reverse kept)) (index 0))
-          (match pairs
-            (((p . o) rest ...)
-             (if (zero? index)
-                 (format #t "~a " subject)
-                 (display "\t"))
+  (let loop ((pairs po-alist) (first? #t))
+    (match pairs
+      (((p . o) rest ...)
+       (if (not (emittable-object? o))
+           (loop rest first?)   ; skip malformed or empty objects
+           (begin
+             ;; subject only once
+             (when first?
+               (format #t "~a " subject))
+             (when (not first?)
+               (format #t "\t"))
+
+             ;; emit predicate–object
              (match o
                ((? symbol?)
                 (format #t "~a ~a" p (symbol->string o)))
@@ -48,47 +51,16 @@
                 (format #t "~a \"~a\"" p o))
                (_
                 (format #t "~a ~s" p o)))
-             (if (< (+ index 1) count)
-                 (display " ;\n")
-                 (display " .\n"))
-             (loop rest (+ index 1)))
-            (() #f)))
-        (match (car pairs)
-          ((p . o)
-           (if (emittable-object? o)
-               (collect (cdr pairs) (cons (cons p o) kept) (+ count 1))
-               (collect (cdr pairs) kept count)))))))
-
-(define (make-short-turtle-emitter subject)
-  "Return two procedures: EMIT! and FINISH! for streaming short-turtle output."
-  (let ((first? #t)
-        (pending #f))
-    (define (emit-one pair tail)
-      (let ((p (car pair))
-            (o (cdr pair)))
-        (if first?
-            (format #t "~a " subject)
-            (display "\t"))
-        (match o
-          ((? symbol?)
-           (format #t "~a ~a" p (symbol->string o)))
-          ((? string?)
-           (format #t "~a \"~a\"" p o))
-          (_
-           (format #t "~a ~s" p o)))
-        (display tail)
-        (set! first? #f)))
-    (define (emit! predicate object)
-      (when (emittable-object? object)
-        (let ((current (cons predicate object)))
-          (when pending
-            (emit-one pending " ;\n"))
-          (set! pending current))))
-    (define (finish!)
-      (when pending
-        (emit-one pending " .\n")
-        (set! pending #f)))
-    (values emit! finish!)))
+
+             ;; separator depends on *remaining emittable pairs*
+             (if (any (match-lambda
+                        ((p . o) (emittable-object? o)))
+                      rest)
+                 (format #t " ;~%")
+                 (format #t " .~%"))
+
+             (loop rest #f))))
+      (() #f))))
 
 
 
@@ -212,76 +184,65 @@ Example:
     (syntax-case x ()
       ((_ alist actions ...)
        ;; TODO: Check that all actions are valid.
-       (let* ((actions #'(actions ...))
-              (filter-set-keys
-               (filter-map (lambda (action)
-                             (syntax-case action (filter-set)
-                               ((filter-set key expression) #'key)
-                               (_ #f)))
-                           actions))
-              (set/filter-pairs
-               (filter-map (lambda (action)
-                             (syntax-case action (set filter-set)
-                               ((set key expression)
-                                #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
-                               ((filter-set key expression)
-                                #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
-                               (_ #f)))
-                           actions))
-              (multiset-exprs
-               (filter-map (lambda (action)
-                             (syntax-case action (multiset)
-                               ((multiset key expression)
-                                #`(map (cut cons 'key <>)
-                                       #,(key->assoc-ref #'evaluated-alist #'expression)))
-                               (_ #f)))
-                           actions))
-              (else-proc
-               (any (lambda (action)
-                      (syntax-case action (else=>)
-                        ((else=> proc) #'proc)
-                        (_ #f)))
-                    actions))
-              (unspecified-keys
-               #`(list
-                  ;; Keys that were referenced
-                  #,@(append-map (lambda (action)
-                                   (syntax-case action ()
-                                     ((_ key expression)
-                                      (collect-keys #'expression))
-                                     (_ '())))
-                                 actions)
-                  ;; Keys that were deleted
-                  #,@(filter-map (lambda (action)
-                                   (syntax-case action (remove)
-                                     ((remove key) #''key)
-                                     (_ #f)))
-                                 actions)
-                  ;; Keys that were set
-                  #,@(filter-map (lambda (action)
-                                   (syntax-case action ()
-                                     ((_ key expression) #''key)
-                                     (_ #f)))
-                                 actions))))
-         #`(let ((evaluated-alist alist))
-             (append
-              ;; Do set and filter-set.
-              #,(if (null? filter-set-keys)
-                    #`(list #,@set/filter-pairs)
-                    #`(remove (match-lambda
-                                ((key . #f)
-                                 (member key '#,filter-set-keys))
-                                (_ #f))
-                              (list #,@set/filter-pairs)))
-              ;; Do multiset.
-              #,@multiset-exprs
-              ;; Apply else=> only when explicitly present.
-              #,@(if else-proc
-                     (list
-                      #`(filter-map #,else-proc
-                                    (alist-delete* evaluated-alist
-                                                   #,unspecified-keys)))
-                     '()))))))))
+       #`(let ((evaluated-alist alist))
+           (append (remove (match-lambda
+                             ;; Filter out results of filter-set actions.
+                             ((key . #f)
+                              (member key '#,(filter-map (lambda (action)
+                                                           (syntax-case action (filter-set)
+                                                             ((filter-set key expression) #'key)
+                                                             (_ #f)))
+                                                         #'(actions ...))))
+                             (_ #f))
+                           ;; Do set and filter-set.
+                           `#,(filter-map (lambda (action)
+                                            (syntax-case action (set filter-set)
+                                              ((set key expression)
+                                               #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
+                                              ((filter-set key expression)
+                                               #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
+                                              (_ #f)))
+                                          #'(actions ...)))
+                   ;; Do multiset.
+                   #,@(filter-map (lambda (action)
+                                    (syntax-case action (multiset)
+                                      ((multiset key expression)
+                                       #`(map (cut cons 'key <>)
+                                              #,(key->assoc-ref #'evaluated-alist #'expression)))
+                                      (_ #f)))
+                                  #'(actions ...))
+                   ;; Apply else=> procedure on unspecified keys. If
+                   ;; no else=> procedure is specified, delete
+                   ;; unspecified keys.
+                   (filter-map #,(or (any (lambda (action)
+                                            (syntax-case action (else=>)
+                                              ((else=> proc) #'proc)
+                                              (_ #f)))
+                                          #'(actions ...))
+                                     #'(const #f))
+                               ;; The unspecified part of the input
+                               ;; alist
+                               (alist-delete* evaluated-alist
+                                              (list
+                                               ;; Keys that were referenced
+                                               #,@(append-map (lambda (action)
+                                                                (syntax-case action ()
+                                                                  ((_ key expression)
+                                                                   (collect-keys #'expression))
+                                                                  (_ '())))
+                                                              #'(actions ...))
+                                               ;; Keys that were deleted
+                                               #,@(filter-map (lambda (action)
+                                                                (syntax-case action (remove)
+                                                                  ((remove key) #''key)
+                                                                  (_ #f)))
+                                                              #'(actions ...))
+                                               ;; Keys that were set
+                                               #,@(filter-map (lambda (action)
+                                                                (syntax-case action ()
+                                                                  ((_ key expression) #''key)
+                                                                  (_ #f)))
+                                                              #'(actions ...)))))))))))
 
 
 
@@ -455,56 +416,22 @@ must be remedied."
                                                #'(schema-triples)))
                     ((triples subject predicate-clauses ...) (triples)
                      (find-clause #'(clauses ...) 'triples)))
-	         #`(define* (name db #:key
-	                          (metadata? #f)
-	                          (data? #t)
-	                          (documentation? #f)
-                                  (keyset? #f)
-                                  (chunk-key "Id")
-	                          (limit #f)
-	                          (offset #f)
-                                  (cursor #f))
-	             (let* ((base-sql
-	                     (select-query #,(collect-fields #'(subject predicate-clauses ...))
-	                                   (primary-table other-tables ...)
-	                                   tables-raw ...))
-                            (chunk-keys
-                             (if (list? chunk-key)
-                                 chunk-key
-                                 (list chunk-key)))
-                            (chunk-key-exprs
-                             (map (lambda (k)
-                                    (if (string-index k #\.)
-                                        k
-                                        (format #f "~a.~a"
-                                                #,(symbol->string (syntax->datum #'primary-table))
-                                                k)))
-                                  chunk-keys))
-                            (sql-for-keyset
-                             (prepend-select-expr
-                              base-sql
-                              (string-join
-                               (map (lambda (expr idx)
-                                      (format #f "~a AS __chunk_key_~a" expr (+ idx 1)))
-                                    chunk-key-exprs
-                                    (iota (length chunk-key-exprs)))
-                               ", ")))
-		                    (sql
-                             (cond
-                              (keyset?
-                               (append-order-limit
-                                (if cursor
-                                    (append-filter-clause
-                                     sql-for-keyset
-                                     (build-keyset-where chunk-key-exprs cursor))
-                                    sql-for-keyset)
-                                (build-keyset-order chunk-key-exprs)
-                                (or limit 100000)))
-			      ((and limit offset)
-			       (format #f "~a LIMIT ~a OFFSET ~a"
-				       base-sql limit offset))
-			      (else base-sql))))
-		               (when metadata?
+         #`(define* (name db #:key
+                          (metadata? #f)
+                          (data? #t)
+                          (documentation? #f)
+                          (limit #f)
+                          (offset #f))
+             (let* ((base-sql
+                     (select-query #,(collect-fields #'(subject predicate-clauses ...))
+                                   (primary-table other-tables ...)
+                                   tables-raw ...))
+                    (sql
+		      (if (and limit offset)
+			  (format #f "~a LIMIT ~a OFFSET ~a"
+				  base-sql limit offset)
+			  base-sql)))
+               (when metadata?
                  #,@(let ((table (symbol->string (syntax->datum #'primary-table)))
                           (subject-type (any (lambda (predicate)
                                                (syntax-case predicate (rdf:type)
@@ -630,144 +557,16 @@ The above query results to triples that have the form:
                                 (list 'triple-predicate ...)
                                 (list 'triple-object ...)))
                    (_ (error "Invalid schema triples clause:" #'schema-triples-clause)))
-                        (let ((rows-written 0)
-                             (last-cursor cursor))
-		         (sql-for-each
-			  (lambda (row)
-                            (set! rows-written (+ rows-written 1))
-                            (when keyset?
-                              (set! last-cursor
-                                    (map (lambda (idx)
-                                           (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1))))
-                                         (iota (if (list? chunk-key)
-                                                   (length chunk-key)
-                                                   1)))))
-	                    (let ((subject-val #,(field->assoc-ref #'row #'subject)))
-	                      (call-with-values
-	                          (lambda () (make-short-turtle-emitter subject-val))
-	                        (lambda (emit! finish!)
-	                          #,@(map (lambda (predicate-clause)
-	                                    (syntax-case predicate-clause (set multiset filter-set)
-	                                      ((set predicate object)
-	                                       #`(emit! 'predicate
-	                                                #,(field->assoc-ref #'row #'object)))
-	                                      ((filter-set predicate object)
-	                                       #`(let ((val #,(field->assoc-ref #'row #'object)))
-	                                           (when val
-	                                             (emit! 'predicate val))))
-	                                      ((multiset predicate object)
-	                                       #`(for-each (lambda (val)
-	                                                     (emit! 'predicate val))
-	                                                   #,(field->assoc-ref #'row #'object)))
-	                                      (_ (error "Invalid predicate clause:" predicate-clause))))
-	                                  #'(predicate-clauses ...))
-	                          (finish!)))))
-	                  db
-	                  sql)
-                         (if keyset?
-                             (cons rows-written last-cursor)
-                             rows-written)))))))
+	       (sql-for-each
+		  (lambda (row)
+                    (let* ((subject-val #,(field->assoc-ref #'row #'subject))
+                           (po-alist
+                            (map-alist row #,@(field->key #'(predicate-clauses ...)))))
+                      (emit-short-turtle subject-val po-alist)))
+                  db
+                  sql))))))
       (_ (error "Invalid define-transformer syntax:" (syntax->datum x))))))
 
-(define (sql-literal v)
-  (cond
-   ((number? v) (number->string v))
-   ((string? v)
-    (format #f "'~a'" (replace-substrings v '(("'" . "''")))))
-   ((symbol? v)
-    (format #f "'~a'" (symbol->string v)))
-   (else
-    (format #f "'~a'" v))))
-
-(define (normalize-keyset-cursor cursor key-count)
-  (cond
-   ((not cursor) #f)
-   ((and (= key-count 1) (not (list? cursor)))
-    (list cursor))
-   ((and (list? cursor) (= (length cursor) key-count))
-    cursor)
-   (else
-    (error "Invalid keyset cursor, expected "
-           key-count " values, got: " cursor))))
-
-(define (normalize-key-exprs key-spec)
-  "Accept either a list of SQL key expressions or a key count."
-  (cond
-   ((list? key-spec) key-spec)
-   ((number? key-spec)
-    (map (lambda (idx)
-           (format #f "__chunk_key_~a" (+ idx 1)))
-         (iota key-spec)))
-   (else
-    (error "Invalid key specification: " key-spec))))
-
-(define (build-keyset-order key-spec)
-  (string-join
-   (normalize-key-exprs key-spec)
-   ", "))
-
-(define (build-keyset-where key-spec cursor)
-  (let* ((key-exprs (normalize-key-exprs key-spec))
-         (key-count (length key-exprs))
-         (vals (normalize-keyset-cursor cursor key-count)))
-    (unless vals
-      (error "build-keyset-where called with empty cursor"))
-    (string-join
-     (map (lambda (i)
-            (let ((eq-prefix
-                   (string-join
-                    (map (lambda (j)
-                           (format #f "~a = ~a"
-                                   (list-ref key-exprs j)
-                                   (sql-literal (list-ref vals j))))
-                         (iota i))
-                    " AND "))
-                  (gt-part
-                   (format #f "~a > ~a"
-                           (list-ref key-exprs i)
-                           (sql-literal (list-ref vals i)))))
-              (if (zero? i)
-                  gt-part
-                  (string-append "(" eq-prefix " AND " gt-part ")"))))
-          (iota key-count))
-     " OR ")))
-
-(define (find-sql-tail-start sql)
-  (let ((positions
-         (filter-map identity
-                     (list (string-contains sql " GROUP BY ")
-                           (string-contains sql " ORDER BY ")
-                           (string-contains sql " LIMIT ")))))
-    (if (null? positions) #f (apply min positions))))
-
-(define (append-filter-clause sql clause)
-  (let* ((tail-start (find-sql-tail-start sql))
-         (head (if tail-start (substring sql 0 tail-start) sql))
-         (tail (if tail-start (substring sql tail-start) "")))
-    (if (string-contains head " WHERE ")
-        (string-append head " AND (" clause ")" tail)
-        (string-append head " WHERE (" clause ")" tail))))
-
-(define (append-order-limit sql order-expr limit)
-  (let* ((tail-start (find-sql-tail-start sql))
-         (head (if tail-start (substring sql 0 tail-start) sql))
-         (tail (if tail-start (substring sql tail-start) "")))
-    (string-append head tail
-                   " ORDER BY " order-expr
-                   " LIMIT " (number->string limit))))
-
-(define (prepend-select-expr sql expr)
-  "Prepend EXPR to the SELECT list of SQL."
-  (cond
-   ((and (>= (string-length sql) 16)
-         (string=? (substring sql 0 16) "SELECT DISTINCT "))
-    (string-append "SELECT DISTINCT " expr ", " (substring sql 16)))
-   ((and (>= (string-length sql) 7)
-         (string=? (substring sql 0 7) "SELECT "))
-    (string-append "SELECT " expr ", " (substring sql 7)))
-   (else
-    (error "Expected SQL SELECT query" sql))))
-
 (define (get-keyword-value args keyword default)
   (let ((kv (memq keyword args)))
     (if (and kv (>= (length kv) 2))
@@ -783,12 +582,10 @@ The above query results to triples that have the form:
             (table-metadata? (assoc-ref alist 'table-metadata?))
             (prefixes (assoc-ref alist 'prefixes))
             (inputs (assoc-ref alist 'inputs))
-	            (outputs (assoc-ref alist 'outputs))
-	            (total-rows (assoc-ref alist 'total-rows))
-	            (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
-                    (keyset-pagination? (assoc-ref alist 'keyset-pagination?))
-                    (chunk-key-field (assoc-ref alist 'chunk-key-field))
-	            (chunking? (and total-rows rows-per-chunk))
+            (outputs (assoc-ref alist 'outputs))
+            (total-rows (assoc-ref alist 'total-rows))
+            (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
+            (chunking? (and total-rows rows-per-chunk))
             (chunks (if chunking?
                         (ceiling (/ total-rows rows-per-chunk))
                         1))
@@ -817,76 +614,32 @@ The above query results to triples that have the form:
                  inputs))
               #:encoding "UTF-8"))
 
-	          ;; Dumping the actual data
-	          (when rdf-path
-                    (if keyset-pagination?
-                        ;; Cursor-based chunking with configurable key column.
-                        (let loop ((chunk-index 0)
-                                   (states (map (lambda (_) (cons #f #f)) inputs)))
-                          (unless (every cdr states)
-                            (let* ((out-file
-                                    (if (= chunk-index 0)
-                                        rdf-path
-                                        (string-append (path-without-extension rdf-path)
-                                                       "." (number->string (+ chunk-index 1)) ".ttl")))
-                                   (next-states
-                                    (with-output-to-file
-                                        out-file
-                                      (lambda ()
-                                        (for-each
-                                         (match-lambda
-                                           ((k v)
-                                            (prefix k v)))
-                                         prefixes)
-                                        (newline)
-                                        (let walk ((procs inputs)
-                                                   (st states)
-                                                   (acc '()))
-                                          (if (null? procs)
-                                              (reverse acc)
-                                              (let* ((state (car st))
-                                                     (cursor (car state))
-                                                     (done? (cdr state)))
-                                                (if done?
-                                                    (walk (cdr procs) (cdr st) (cons state acc))
-                                                    (let* ((result ((car procs)
-                                                                    db
-                                                                    #:metadata? table-metadata?
-                                                                    #:keyset? #t
-                                                                    #:chunk-key (or chunk-key-field "Id")
-                                                                    #:limit rows-per-chunk
-                                                                    #:cursor cursor))
-                                                           (rows (car result))
-                                                           (next-cursor (cdr result))
-                                                           (proc-done? (or (= rows 0)
-                                                                           (< rows rows-per-chunk))))
-                                                      (walk (cdr procs)
-                                                            (cdr st)
-                                                            (cons (cons next-cursor proc-done?) acc))))))))
-                                      #:encoding "UTF-8")))
-                              (loop (+ chunk-index 1) next-states))))
-                        ;; Legacy OFFSET chunking.
-                        (do ((i 0 (+ i 1)))
-                            ((>= i chunks))
-                          (let* ((offset (* i (or rows-per-chunk 0)))
-                                 (out-file
-                                  (if (= chunks 1)
-                                      rdf-path
-                                      (string-append (path-without-extension rdf-path)
-                                                     "." (number->string (+ i 1)) ".ttl"))))
-                            (with-output-to-file
-                                out-file
-                              (lambda ()
-                                (for-each
-                                 (match-lambda
-                                   ((k v)
-                                    (prefix k v)))
-                                 prefixes)
-                                (newline)
-                                (for-each
-                                 (lambda (proc)
-                                   (proc db #:metadata? table-metadata?
-                                         #:limit rows-per-chunk
-                                         #:offset offset))
-                                 inputs))
-                              #:encoding "UTF-8")))))))))))
+          ;; Dumping the actual data
+          (when rdf-path
+            (do ((i 0 (+ i 1)))
+                ((>= i chunks))
+              (let* ((offset (* i (or rows-per-chunk 0)))
+                     (out-file
+                      (if (= chunks 1)
+                          rdf-path
+                          (string-append (path-without-extension rdf-path)
+                                         "." (number->string (+ i 1)) ".ttl"))))
+                (with-output-to-file
+                    out-file
+                  (lambda ()
+                    ;; Add the prefixes
+                    (for-each
+                     (match-lambda
+                       ((k v)
+                        (begin
+                          (prefix k v))))
+                     prefixes)
+                    (newline)
+                    (for-each
+                     (lambda (proc)
+                       (proc db #:metadata? table-metadata?
+                             #:limit rows-per-chunk
+                             #:offset offset))
+                     inputs))
+                  #:encoding "UTF-8"))))))))))
+