diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm
index b2eb93e..d7c0764 100755
--- a/examples/probesets-experiment-metadata.scm
+++ b/examples/probesets-experiment-metadata.scm
@@ -70,10 +70,11 @@
(name "ProbeSet Experiments Metadata")
(connection %connection-settings)
(table-metadata? #f)
- (total-rows (assoc-ref
- (sql-find db "SELECT count(*) AS count from ProbeSetXRef")
- "count"))
- (rows-per-chunk 1000000)
+ ;; Use composite keyset pagination over joined table identifiers.
+ (keyset-pagination? #t)
+ (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id"))
+ ;; (total-rows 1000000)
+ (rows-per-chunk 10000)
(prefixes
'(("dcat:" "<http://www.w3.org/ns/dcat#>")
("gn:" "<http://rdf.genenetwork.org/v1/id/>")
diff --git a/transform/special-forms.scm b/transform/special-forms.scm
index 0c07a0a..3cf6a24 100644
--- a/transform/special-forms.scm
+++ b/transform/special-forms.scm
@@ -31,19 +31,16 @@
(else #t)))
(define (emit-short-turtle subject po-alist)
- (let loop ((pairs po-alist) (first? #t))
- (match pairs
- (((p . o) rest ...)
- (if (not (emittable-object? o))
- (loop rest first?) ; skip malformed or empty objects
- (begin
- ;; subject only once
- (when first?
- (format #t "~a " subject))
- (when (not first?)
- (format #t "\t"))
-
- ;; emit predicate–object
+ ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)).
+ ;; Build filtered pairs and count in one pass.
+ (let collect ((pairs po-alist) (kept '()) (count 0))
+ (if (null? pairs)
+ (let loop ((pairs (reverse kept)) (index 0))
+ (match pairs
+ (((p . o) rest ...)
+ (if (zero? index)
+ (format #t "~a " subject)
+ (display "\t"))
(match o
((? symbol?)
(format #t "~a ~a" p (symbol->string o)))
@@ -51,16 +48,47 @@
(format #t "~a \"~a\"" p o))
(_
(format #t "~a ~s" p o)))
-
- ;; separator depends on *remaining emittable pairs*
- (if (any (match-lambda
- ((p . o) (emittable-object? o)))
- rest)
- (format #t " ;~%")
- (format #t " .~%"))
-
- (loop rest #f))))
- (() #f))))
+ (if (< (+ index 1) count)
+ (display " ;\n")
+ (display " .\n"))
+ (loop rest (+ index 1)))
+ (() #f)))
+ (match (car pairs)
+ ((p . o)
+ (if (emittable-object? o)
+ (collect (cdr pairs) (cons (cons p o) kept) (+ count 1))
+ (collect (cdr pairs) kept count)))))))
+
+(define (make-short-turtle-emitter subject)
+  "Return two values, EMIT! and FINISH!, that stream one short-turtle
+statement about SUBJECT to the current output port.  (EMIT! predicate
+object) buffers the pair after writing any previously buffered pair
+terminated by \" ;\"; (FINISH!) writes the buffered pair terminated by
+\" .\".  Objects rejected by `emittable-object?' are ignored."
+  (let ((subject-written? #f)
+        (held #f))
+    ;; Write PAIR, led by SUBJECT (first pair) or a tab, then TERMINATOR.
+    (define (write-pair pair terminator)
+      (let ((pred (car pair))
+            (obj (cdr pair)))
+        (if subject-written?
+            (display "\t")
+            (format #t "~a " subject))
+        (cond ((symbol? obj) (format #t "~a ~a" pred (symbol->string obj)))
+              ((string? obj) (format #t "~a \"~a\"" pred obj))
+              (else (format #t "~a ~s" pred obj)))
+        (display terminator)
+        (set! subject-written? #t)))
+    ;; Hold one pair back: its terminator depends on what follows.
+    (define (emit! predicate object)
+      (when (emittable-object? object)
+        (when held (write-pair held " ;\n"))
+        (set! held (cons predicate object))))
+    (define (finish!)
+      (when held
+        (write-pair held " .\n")
+        (set! held #f)))
+    (values emit! finish!)))
@@ -184,65 +212,76 @@ Example:
(syntax-case x ()
((_ alist actions ...)
;; TODO: Check that all actions are valid.
- #`(let ((evaluated-alist alist))
- (append (remove (match-lambda
- ;; Filter out results of filter-set actions.
- ((key . #f)
- (member key '#,(filter-map (lambda (action)
- (syntax-case action (filter-set)
- ((filter-set key expression) #'key)
- (_ #f)))
- #'(actions ...))))
- (_ #f))
- ;; Do set and filter-set.
- `#,(filter-map (lambda (action)
- (syntax-case action (set filter-set)
- ((set key expression)
- #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
- ((filter-set key expression)
- #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
- (_ #f)))
- #'(actions ...)))
- ;; Do multiset.
- #,@(filter-map (lambda (action)
- (syntax-case action (multiset)
- ((multiset key expression)
- #`(map (cut cons 'key <>)
- #,(key->assoc-ref #'evaluated-alist #'expression)))
- (_ #f)))
- #'(actions ...))
- ;; Apply else=> procedure on unspecified keys. If
- ;; no else=> procedure is specified, delete
- ;; unspecified keys.
- (filter-map #,(or (any (lambda (action)
- (syntax-case action (else=>)
- ((else=> proc) #'proc)
- (_ #f)))
- #'(actions ...))
- #'(const #f))
- ;; The unspecified part of the input
- ;; alist
- (alist-delete* evaluated-alist
- (list
- ;; Keys that were referenced
- #,@(append-map (lambda (action)
- (syntax-case action ()
- ((_ key expression)
- (collect-keys #'expression))
- (_ '())))
- #'(actions ...))
- ;; Keys that were deleted
- #,@(filter-map (lambda (action)
- (syntax-case action (remove)
- ((remove key) #''key)
- (_ #f)))
- #'(actions ...))
- ;; Keys that were set
- #,@(filter-map (lambda (action)
- (syntax-case action ()
- ((_ key expression) #''key)
- (_ #f)))
- #'(actions ...)))))))))))
+ (let* ((actions #'(actions ...))
+ (filter-set-keys
+ (filter-map (lambda (action)
+ (syntax-case action (filter-set)
+ ((filter-set key expression) #'key)
+ (_ #f)))
+ actions))
+ (set/filter-pairs
+ (filter-map (lambda (action)
+ (syntax-case action (set filter-set)
+ ((set key expression)
+ #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
+ ((filter-set key expression)
+ #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
+ (_ #f)))
+ actions))
+ (multiset-exprs
+ (filter-map (lambda (action)
+ (syntax-case action (multiset)
+ ((multiset key expression)
+ #`(map (cut cons 'key <>)
+ #,(key->assoc-ref #'evaluated-alist #'expression)))
+ (_ #f)))
+ actions))
+ (else-proc
+ (any (lambda (action)
+ (syntax-case action (else=>)
+ ((else=> proc) #'proc)
+ (_ #f)))
+ actions))
+ (unspecified-keys
+ #`(list
+ ;; Keys that were referenced
+ #,@(append-map (lambda (action)
+ (syntax-case action ()
+ ((_ key expression)
+ (collect-keys #'expression))
+ (_ '())))
+ actions)
+ ;; Keys that were deleted
+ #,@(filter-map (lambda (action)
+ (syntax-case action (remove)
+ ((remove key) #''key)
+ (_ #f)))
+ actions)
+ ;; Keys that were set
+ #,@(filter-map (lambda (action)
+ (syntax-case action ()
+ ((_ key expression) #''key)
+ (_ #f)))
+ actions))))
+ #`(let ((evaluated-alist alist))
+ (append
+ ;; Do set and filter-set.
+ #,(if (null? filter-set-keys)
+ #`(list #,@set/filter-pairs)
+ #`(remove (match-lambda
+ ((key . #f)
+ (member key '#,filter-set-keys))
+ (_ #f))
+ (list #,@set/filter-pairs)))
+ ;; Do multiset.
+ #,@multiset-exprs
+ ;; Apply else=> only when explicitly present.
+ #,@(if else-proc
+ (list
+ #`(filter-map #,else-proc
+ (alist-delete* evaluated-alist
+ #,unspecified-keys)))
+ '()))))))))
@@ -416,22 +455,56 @@ must be remedied."
#'(schema-triples)))
((triples subject predicate-clauses ...) (triples)
(find-clause #'(clauses ...) 'triples)))
- #`(define* (name db #:key
- (metadata? #f)
- (data? #t)
- (documentation? #f)
- (limit #f)
- (offset #f))
- (let* ((base-sql
- (select-query #,(collect-fields #'(subject predicate-clauses ...))
- (primary-table other-tables ...)
- tables-raw ...))
- (sql
- (if (and limit offset)
- (format #f "~a LIMIT ~a OFFSET ~a"
- base-sql limit offset)
- base-sql)))
- (when metadata?
+ #`(define* (name db #:key
+ (metadata? #f)
+ (data? #t)
+ (documentation? #f)
+ (keyset? #f)
+ (chunk-key "Id")
+ (limit #f)
+ (offset #f)
+ (cursor #f))
+ (let* ((base-sql
+ (select-query #,(collect-fields #'(subject predicate-clauses ...))
+ (primary-table other-tables ...)
+ tables-raw ...))
+ (chunk-keys
+ (if (list? chunk-key)
+ chunk-key
+ (list chunk-key)))
+ (chunk-key-exprs
+ (map (lambda (k)
+ (if (string-index k #\.)
+ k
+ (format #f "~a.~a"
+ #,(symbol->string (syntax->datum #'primary-table))
+ k)))
+ chunk-keys))
+ (sql-for-keyset
+ (prepend-select-expr
+ base-sql
+ (string-join
+ (map (lambda (expr idx)
+ (format #f "~a AS __chunk_key_~a" expr (+ idx 1)))
+ chunk-key-exprs
+ (iota (length chunk-key-exprs)))
+ ", ")))
+ (sql
+ (cond
+ (keyset?
+ (append-order-limit
+ (if cursor
+ (append-filter-clause
+ sql-for-keyset
+ (build-keyset-where chunk-key-exprs cursor))
+ sql-for-keyset)
+ (build-keyset-order chunk-key-exprs)
+ (or limit 100000)))
+ ((and limit offset)
+ (format #f "~a LIMIT ~a OFFSET ~a"
+ base-sql limit offset))
+ (else base-sql))))
+ (when metadata?
#,@(let ((table (symbol->string (syntax->datum #'primary-table)))
(subject-type (any (lambda (predicate)
(syntax-case predicate (rdf:type)
@@ -557,16 +630,144 @@ The above query results to triples that have the form:
(list 'triple-predicate ...)
(list 'triple-object ...)))
(_ (error "Invalid schema triples clause:" #'schema-triples-clause)))
- (sql-for-each
- (lambda (row)
- (let* ((subject-val #,(field->assoc-ref #'row #'subject))
- (po-alist
- (map-alist row #,@(field->key #'(predicate-clauses ...)))))
- (emit-short-turtle subject-val po-alist)))
- db
- sql))))))
+ (let ((rows-written 0)
+ (last-cursor cursor))
+ (sql-for-each
+ (lambda (row)
+ (set! rows-written (+ rows-written 1))
+ (when keyset?
+ (set! last-cursor
+ (map (lambda (idx)
+ (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1))))
+ (iota (if (list? chunk-key)
+ (length chunk-key)
+ 1)))))
+ (let ((subject-val #,(field->assoc-ref #'row #'subject)))
+ (call-with-values
+ (lambda () (make-short-turtle-emitter subject-val))
+ (lambda (emit! finish!)
+ #,@(map (lambda (predicate-clause)
+ (syntax-case predicate-clause (set multiset filter-set)
+ ((set predicate object)
+ #`(emit! 'predicate
+ #,(field->assoc-ref #'row #'object)))
+ ((filter-set predicate object)
+ #`(let ((val #,(field->assoc-ref #'row #'object)))
+ (when val
+ (emit! 'predicate val))))
+ ((multiset predicate object)
+ #`(for-each (lambda (val)
+ (emit! 'predicate val))
+ #,(field->assoc-ref #'row #'object)))
+ (_ (error "Invalid predicate clause:" predicate-clause))))
+ #'(predicate-clauses ...))
+ (finish!)))))
+ db
+ sql)
+ (if keyset?
+ (cons rows-written last-cursor)
+ rows-written)))))))
(_ (error "Invalid define-transformer syntax:" (syntax->datum x))))))
+(define (sql-literal v)
+  "Return V rendered as a SQL literal: numbers are written bare; any
+other value is displayed, has embedded single quotes doubled, and is
+wrapped in single quotes."
+  (if (number? v)
+      (number->string v)
+      ;; Escape every non-numeric value, not only strings, so a quote in
+      ;; a symbol or other object cannot terminate the literal early.
+      (format #f "'~a'" (replace-substrings (format #f "~a" v) '(("'" . "''"))))))
+
+(define (normalize-keyset-cursor cursor key-count)
+  "Return CURSOR as a list of KEY-COUNT values, or #f when CURSOR is #f.
+A non-list CURSOR is wrapped in a list when KEY-COUNT is 1.  Signal an
+error for any other shape."
+  (let ((listy? (list? cursor)))
+    (cond ((not cursor) #f)
+          ((and listy? (= key-count (length cursor))) cursor)
+          ((and (not listy?) (= 1 key-count)) (list cursor))
+          (else (error "Invalid keyset cursor, expected "
+                       key-count " values, got: " cursor)))))
+
+(define (normalize-key-exprs key-spec)
+  "Return the key expressions named by KEY-SPEC.  A list is returned
+unchanged; a number N expands to the aliases \"__chunk_key_1\" through
+\"__chunk_key_N\".  Signal an error for any other KEY-SPEC."
+  (define (alias zero-based)
+    (format #f "__chunk_key_~a" (1+ zero-based)))
+  (cond ((list? key-spec) key-spec)
+        ((number? key-spec) (map alias (iota key-spec)))
+        (else
+         (error "Invalid key specification: " key-spec))))
+
+(define (build-keyset-order key-spec)
+  "Return KEY-SPEC's key expressions joined with \", \", for ORDER BY."
+  (let ((ordering-columns (normalize-key-exprs key-spec)))
+    (string-join ordering-columns ", ")))
+
+(define (build-keyset-where key-spec cursor)
+  "Return a SQL condition matching rows that sort strictly after CURSOR
+under the key expressions of KEY-SPEC, compared lexicographically, e.g.
+  k1 > v1 OR (k1 = v1 AND k2 > v2)
+Signal an error when CURSOR is #f or does not match the key count."
+  (let* ((columns (normalize-key-exprs key-spec))
+         (bounds (normalize-keyset-cursor cursor (length columns)))
+         (compare (lambda (template column bound)
+                    (format #f template column (sql-literal bound)))))
+    (unless bounds
+      (error "build-keyset-where called with empty cursor"))
+    ;; EQUALITIES collects, newest first, the "=" tests of keys already seen.
+    (let walk ((columns columns) (bounds bounds) (equalities '()) (terms '()))
+      (if (null? columns)
+          (string-join (reverse terms) " OR ")
+          (let* ((greater (compare "~a > ~a" (car columns) (car bounds)))
+                 (term (if (null? equalities)
+                           greater
+                           (string-append
+                            "(" (string-join (reverse equalities) " AND ")
+                            " AND " greater ")"))))
+            (walk (cdr columns)
+                  (cdr bounds)
+                  (cons (compare "~a = ~a" (car columns) (car bounds)) equalities)
+                  (cons term terms)))))))
+
+(define (find-sql-tail-start sql)
+  "Return the smallest index in SQL at which \" GROUP BY \", \" ORDER BY \"
+or \" LIMIT \" occurs (case-sensitive), or #f when none occurs."
+  (let ((hits (filter-map (lambda (keyword) (string-contains sql keyword))
+                          '(" GROUP BY " " ORDER BY " " LIMIT "))))
+    ;; `reduce' returns the default #f for an empty list.
+    (reduce min #f hits)))
+
+(define (append-filter-clause sql clause)
+  "AND the condition CLAUSE into SQL, ahead of any GROUP BY/ORDER BY/LIMIT tail."
+  (let* ((cut (or (find-sql-tail-start sql) (string-length sql)))
+         (head (substring sql 0 cut)))
+    (string-append head
+                   (if (string-contains head " WHERE ") " AND (" " WHERE (")
+                   clause ")" (substring sql cut))))
+
+(define (append-order-limit sql order-expr limit)
+  "Return SQL followed by \" ORDER BY ORDER-EXPR LIMIT LIMIT\".  LIMIT
+must be a number.  SQL is used verbatim: an ORDER BY or LIMIT it already
+contains is neither replaced nor removed."
+  ;; Both clauses go at the very end of the statement, so SQL need not
+  ;; be split at its tail first.
+  (string-append sql " ORDER BY " order-expr " LIMIT " (number->string limit)))
+
+(define (prepend-select-expr sql expr)
+  "Return SQL with EXPR inserted as the first item of its SELECT list.
+SQL must begin with \"SELECT DISTINCT \" or \"SELECT \" (exact case);
+otherwise an error is signalled."
+  (let ((lead (find (lambda (candidate) (string-prefix? candidate sql))
+                    ;; Longest first, so DISTINCT stays in front of EXPR.
+                    '("SELECT DISTINCT " "SELECT "))))
+    (if lead
+        (string-append lead expr ", "
+                       (substring sql (string-length lead)))
+        (error "Expected SQL SELECT query" sql))))
+
(define (get-keyword-value args keyword default)
(let ((kv (memq keyword args)))
(if (and kv (>= (length kv) 2))
@@ -582,10 +783,12 @@ The above query results to triples that have the form:
(table-metadata? (assoc-ref alist 'table-metadata?))
(prefixes (assoc-ref alist 'prefixes))
(inputs (assoc-ref alist 'inputs))
- (outputs (assoc-ref alist 'outputs))
- (total-rows (assoc-ref alist 'total-rows))
- (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
- (chunking? (and total-rows rows-per-chunk))
+ (outputs (assoc-ref alist 'outputs))
+ (total-rows (assoc-ref alist 'total-rows))
+ (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
+ (keyset-pagination? (assoc-ref alist 'keyset-pagination?))
+ (chunk-key-field (assoc-ref alist 'chunk-key-field))
+ (chunking? (and total-rows rows-per-chunk))
(chunks (if chunking?
(ceiling (/ total-rows rows-per-chunk))
1))
@@ -614,32 +817,76 @@ The above query results to triples that have the form:
inputs))
#:encoding "UTF-8"))
- ;; Dumping the actual data
- (when rdf-path
- (do ((i 0 (+ i 1)))
- ((>= i chunks))
- (let* ((offset (* i (or rows-per-chunk 0)))
- (out-file
- (if (= chunks 1)
- rdf-path
- (string-append (path-without-extension rdf-path)
- "." (number->string (+ i 1)) ".ttl"))))
- (with-output-to-file
- out-file
- (lambda ()
- ;; Add the prefixes
- (for-each
- (match-lambda
- ((k v)
- (begin
- (prefix k v))))
- prefixes)
- (newline)
- (for-each
- (lambda (proc)
- (proc db #:metadata? table-metadata?
- #:limit rows-per-chunk
- #:offset offset))
- inputs))
- #:encoding "UTF-8"))))))))))
-
+ ;; Dumping the actual data
+ (when rdf-path
+ (if keyset-pagination?
+ ;; Cursor-based chunking with configurable key column.
+ (let loop ((chunk-index 0)
+ (states (map (lambda (_) (cons #f #f)) inputs)))
+ (unless (every cdr states)
+ (let* ((out-file
+ (if (= chunk-index 0)
+ rdf-path
+ (string-append (path-without-extension rdf-path)
+ "." (number->string (+ chunk-index 1)) ".ttl")))
+ (next-states
+ (with-output-to-file
+ out-file
+ (lambda ()
+ (for-each
+ (match-lambda
+ ((k v)
+ (prefix k v)))
+ prefixes)
+ (newline)
+ (let walk ((procs inputs)
+ (st states)
+ (acc '()))
+ (if (null? procs)
+ (reverse acc)
+ (let* ((state (car st))
+ (cursor (car state))
+ (done? (cdr state)))
+ (if done?
+ (walk (cdr procs) (cdr st) (cons state acc))
+ (let* ((result ((car procs)
+ db
+ #:metadata? table-metadata?
+ #:keyset? #t
+ #:chunk-key (or chunk-key-field "Id")
+ #:limit rows-per-chunk
+ #:cursor cursor))
+ (rows (car result))
+ (next-cursor (cdr result))
+ (proc-done? (or (= rows 0)
+ (< rows rows-per-chunk))))
+ (walk (cdr procs)
+ (cdr st)
+ (cons (cons next-cursor proc-done?) acc))))))))
+ #:encoding "UTF-8")))
+ (loop (+ chunk-index 1) next-states))))
+ ;; Legacy OFFSET chunking.
+ (do ((i 0 (+ i 1)))
+ ((>= i chunks))
+ (let* ((offset (* i (or rows-per-chunk 0)))
+ (out-file
+ (if (= chunks 1)
+ rdf-path
+ (string-append (path-without-extension rdf-path)
+ "." (number->string (+ i 1)) ".ttl"))))
+ (with-output-to-file
+ out-file
+ (lambda ()
+ (for-each
+ (match-lambda
+ ((k v)
+ (prefix k v)))
+ prefixes)
+ (newline)
+ (for-each
+ (lambda (proc)
+ (proc db #:metadata? table-metadata?
+ #:limit rows-per-chunk
+ #:offset offset))
+ inputs))
+ #:encoding "UTF-8")))))))))))
|