diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm
index d7c0764..b2eb93e 100755
--- a/examples/probesets-experiment-metadata.scm
+++ b/examples/probesets-experiment-metadata.scm
@@ -70,11 +70,10 @@
(name "ProbeSet Experiments Metadata")
(connection %connection-settings)
(table-metadata? #f)
- ;; Use composite keyset pagination over joined table identifiers.
- (keyset-pagination? #t)
- (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id"))
- ;; (total-rows 1000000)
- (rows-per-chunk 10000)
+ (total-rows (assoc-ref
+ (sql-find db "SELECT count(*) AS count from ProbeSetXRef")
+ "count"))
+ (rows-per-chunk 1000000)
(prefixes
'(("dcat:" "<http://www.w3.org/ns/dcat#>")
("gn:" "<http://rdf.genenetwork.org/v1/id/>")
diff --git a/transform/special-forms.scm b/transform/special-forms.scm
index 3cf6a24..0c07a0a 100644
--- a/transform/special-forms.scm
+++ b/transform/special-forms.scm
@@ -31,16 +31,19 @@
(else #t)))
(define (emit-short-turtle subject po-alist)
- ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)).
- ;; Build filtered pairs and count in one pass.
- (let collect ((pairs po-alist) (kept '()) (count 0))
- (if (null? pairs)
- (let loop ((pairs (reverse kept)) (index 0))
- (match pairs
- (((p . o) rest ...)
- (if (zero? index)
- (format #t "~a " subject)
- (display "\t"))
+ (let loop ((pairs po-alist) (first? #t))
+ (match pairs
+ (((p . o) rest ...)
+ (if (not (emittable-object? o))
+ (loop rest first?) ; skip malformed or empty objects
+ (begin
+ ;; subject only once
+ (when first?
+ (format #t "~a " subject))
+ (when (not first?)
+ (format #t "\t"))
+
+ ;; emit predicate–object
(match o
((? symbol?)
(format #t "~a ~a" p (symbol->string o)))
@@ -48,47 +51,16 @@
(format #t "~a \"~a\"" p o))
(_
(format #t "~a ~s" p o)))
- (if (< (+ index 1) count)
- (display " ;\n")
- (display " .\n"))
- (loop rest (+ index 1)))
- (() #f)))
- (match (car pairs)
- ((p . o)
- (if (emittable-object? o)
- (collect (cdr pairs) (cons (cons p o) kept) (+ count 1))
- (collect (cdr pairs) kept count)))))))
-
-(define (make-short-turtle-emitter subject)
- "Return two procedures: EMIT! and FINISH! for streaming short-turtle output."
- (let ((first? #t)
- (pending #f))
- (define (emit-one pair tail)
- (let ((p (car pair))
- (o (cdr pair)))
- (if first?
- (format #t "~a " subject)
- (display "\t"))
- (match o
- ((? symbol?)
- (format #t "~a ~a" p (symbol->string o)))
- ((? string?)
- (format #t "~a \"~a\"" p o))
- (_
- (format #t "~a ~s" p o)))
- (display tail)
- (set! first? #f)))
- (define (emit! predicate object)
- (when (emittable-object? object)
- (let ((current (cons predicate object)))
- (when pending
- (emit-one pending " ;\n"))
- (set! pending current))))
- (define (finish!)
- (when pending
- (emit-one pending " .\n")
- (set! pending #f)))
- (values emit! finish!)))
+
+ ;; separator depends on *remaining emittable pairs*
+ (if (any (match-lambda
+ ((p . o) (emittable-object? o)))
+ rest)
+ (format #t " ;~%")
+ (format #t " .~%"))
+
+ (loop rest #f))))
+ (() #f))))
@@ -212,76 +184,65 @@ Example:
(syntax-case x ()
((_ alist actions ...)
;; TODO: Check that all actions are valid.
- (let* ((actions #'(actions ...))
- (filter-set-keys
- (filter-map (lambda (action)
- (syntax-case action (filter-set)
- ((filter-set key expression) #'key)
- (_ #f)))
- actions))
- (set/filter-pairs
- (filter-map (lambda (action)
- (syntax-case action (set filter-set)
- ((set key expression)
- #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
- ((filter-set key expression)
- #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression)))
- (_ #f)))
- actions))
- (multiset-exprs
- (filter-map (lambda (action)
- (syntax-case action (multiset)
- ((multiset key expression)
- #`(map (cut cons 'key <>)
- #,(key->assoc-ref #'evaluated-alist #'expression)))
- (_ #f)))
- actions))
- (else-proc
- (any (lambda (action)
- (syntax-case action (else=>)
- ((else=> proc) #'proc)
- (_ #f)))
- actions))
- (unspecified-keys
- #`(list
- ;; Keys that were referenced
- #,@(append-map (lambda (action)
- (syntax-case action ()
- ((_ key expression)
- (collect-keys #'expression))
- (_ '())))
- actions)
- ;; Keys that were deleted
- #,@(filter-map (lambda (action)
- (syntax-case action (remove)
- ((remove key) #''key)
- (_ #f)))
- actions)
- ;; Keys that were set
- #,@(filter-map (lambda (action)
- (syntax-case action ()
- ((_ key expression) #''key)
- (_ #f)))
- actions))))
- #`(let ((evaluated-alist alist))
- (append
- ;; Do set and filter-set.
- #,(if (null? filter-set-keys)
- #`(list #,@set/filter-pairs)
- #`(remove (match-lambda
- ((key . #f)
- (member key '#,filter-set-keys))
- (_ #f))
- (list #,@set/filter-pairs)))
- ;; Do multiset.
- #,@multiset-exprs
- ;; Apply else=> only when explicitly present.
- #,@(if else-proc
- (list
- #`(filter-map #,else-proc
- (alist-delete* evaluated-alist
- #,unspecified-keys)))
- '()))))))))
+ #`(let ((evaluated-alist alist))
+ (append (remove (match-lambda
+ ;; Filter out results of filter-set actions.
+ ((key . #f)
+ (member key '#,(filter-map (lambda (action)
+ (syntax-case action (filter-set)
+ ((filter-set key expression) #'key)
+ (_ #f)))
+ #'(actions ...))))
+ (_ #f))
+ ;; Do set and filter-set.
+ `#,(filter-map (lambda (action)
+ (syntax-case action (set filter-set)
+ ((set key expression)
+ #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
+ ((filter-set key expression)
+ #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression)))
+ (_ #f)))
+ #'(actions ...)))
+ ;; Do multiset.
+ #,@(filter-map (lambda (action)
+ (syntax-case action (multiset)
+ ((multiset key expression)
+ #`(map (cut cons 'key <>)
+ #,(key->assoc-ref #'evaluated-alist #'expression)))
+ (_ #f)))
+ #'(actions ...))
+ ;; Apply else=> procedure on unspecified keys. If
+ ;; no else=> procedure is specified, delete
+ ;; unspecified keys.
+ (filter-map #,(or (any (lambda (action)
+ (syntax-case action (else=>)
+ ((else=> proc) #'proc)
+ (_ #f)))
+ #'(actions ...))
+ #'(const #f))
+ ;; The unspecified part of the input
+ ;; alist
+ (alist-delete* evaluated-alist
+ (list
+ ;; Keys that were referenced
+ #,@(append-map (lambda (action)
+ (syntax-case action ()
+ ((_ key expression)
+ (collect-keys #'expression))
+ (_ '())))
+ #'(actions ...))
+ ;; Keys that were deleted
+ #,@(filter-map (lambda (action)
+ (syntax-case action (remove)
+ ((remove key) #''key)
+ (_ #f)))
+ #'(actions ...))
+ ;; Keys that were set
+ #,@(filter-map (lambda (action)
+ (syntax-case action ()
+ ((_ key expression) #''key)
+ (_ #f)))
+ #'(actions ...)))))))))))
@@ -455,56 +416,22 @@ must be remedied."
#'(schema-triples)))
((triples subject predicate-clauses ...) (triples)
(find-clause #'(clauses ...) 'triples)))
- #`(define* (name db #:key
- (metadata? #f)
- (data? #t)
- (documentation? #f)
- (keyset? #f)
- (chunk-key "Id")
- (limit #f)
- (offset #f)
- (cursor #f))
- (let* ((base-sql
- (select-query #,(collect-fields #'(subject predicate-clauses ...))
- (primary-table other-tables ...)
- tables-raw ...))
- (chunk-keys
- (if (list? chunk-key)
- chunk-key
- (list chunk-key)))
- (chunk-key-exprs
- (map (lambda (k)
- (if (string-index k #\.)
- k
- (format #f "~a.~a"
- #,(symbol->string (syntax->datum #'primary-table))
- k)))
- chunk-keys))
- (sql-for-keyset
- (prepend-select-expr
- base-sql
- (string-join
- (map (lambda (expr idx)
- (format #f "~a AS __chunk_key_~a" expr (+ idx 1)))
- chunk-key-exprs
- (iota (length chunk-key-exprs)))
- ", ")))
- (sql
- (cond
- (keyset?
- (append-order-limit
- (if cursor
- (append-filter-clause
- sql-for-keyset
- (build-keyset-where chunk-key-exprs cursor))
- sql-for-keyset)
- (build-keyset-order chunk-key-exprs)
- (or limit 100000)))
- ((and limit offset)
- (format #f "~a LIMIT ~a OFFSET ~a"
- base-sql limit offset))
- (else base-sql))))
- (when metadata?
+ #`(define* (name db #:key
+ (metadata? #f)
+ (data? #t)
+ (documentation? #f)
+ (limit #f)
+ (offset #f))
+ (let* ((base-sql
+ (select-query #,(collect-fields #'(subject predicate-clauses ...))
+ (primary-table other-tables ...)
+ tables-raw ...))
+ (sql
+ (if (and limit offset)
+ (format #f "~a LIMIT ~a OFFSET ~a"
+ base-sql limit offset)
+ base-sql)))
+ (when metadata?
#,@(let ((table (symbol->string (syntax->datum #'primary-table)))
(subject-type (any (lambda (predicate)
(syntax-case predicate (rdf:type)
@@ -630,144 +557,16 @@ The above query results to triples that have the form:
(list 'triple-predicate ...)
(list 'triple-object ...)))
(_ (error "Invalid schema triples clause:" #'schema-triples-clause)))
- (let ((rows-written 0)
- (last-cursor cursor))
- (sql-for-each
- (lambda (row)
- (set! rows-written (+ rows-written 1))
- (when keyset?
- (set! last-cursor
- (map (lambda (idx)
- (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1))))
- (iota (if (list? chunk-key)
- (length chunk-key)
- 1)))))
- (let ((subject-val #,(field->assoc-ref #'row #'subject)))
- (call-with-values
- (lambda () (make-short-turtle-emitter subject-val))
- (lambda (emit! finish!)
- #,@(map (lambda (predicate-clause)
- (syntax-case predicate-clause (set multiset filter-set)
- ((set predicate object)
- #`(emit! 'predicate
- #,(field->assoc-ref #'row #'object)))
- ((filter-set predicate object)
- #`(let ((val #,(field->assoc-ref #'row #'object)))
- (when val
- (emit! 'predicate val))))
- ((multiset predicate object)
- #`(for-each (lambda (val)
- (emit! 'predicate val))
- #,(field->assoc-ref #'row #'object)))
- (_ (error "Invalid predicate clause:" predicate-clause))))
- #'(predicate-clauses ...))
- (finish!)))))
- db
- sql)
- (if keyset?
- (cons rows-written last-cursor)
- rows-written)))))))
+ (sql-for-each
+ (lambda (row)
+ (let* ((subject-val #,(field->assoc-ref #'row #'subject))
+ (po-alist
+ (map-alist row #,@(field->key #'(predicate-clauses ...)))))
+ (emit-short-turtle subject-val po-alist)))
+ db
+ sql))))))
(_ (error "Invalid define-transformer syntax:" (syntax->datum x))))))
-(define (sql-literal v)
- (cond
- ((number? v) (number->string v))
- ((string? v)
- (format #f "'~a'" (replace-substrings v '(("'" . "''")))))
- ((symbol? v)
- (format #f "'~a'" (symbol->string v)))
- (else
- (format #f "'~a'" v))))
-
-(define (normalize-keyset-cursor cursor key-count)
- (cond
- ((not cursor) #f)
- ((and (= key-count 1) (not (list? cursor)))
- (list cursor))
- ((and (list? cursor) (= (length cursor) key-count))
- cursor)
- (else
- (error "Invalid keyset cursor, expected "
- key-count " values, got: " cursor))))
-
-(define (normalize-key-exprs key-spec)
- "Accept either a list of SQL key expressions or a key count."
- (cond
- ((list? key-spec) key-spec)
- ((number? key-spec)
- (map (lambda (idx)
- (format #f "__chunk_key_~a" (+ idx 1)))
- (iota key-spec)))
- (else
- (error "Invalid key specification: " key-spec))))
-
-(define (build-keyset-order key-spec)
- (string-join
- (normalize-key-exprs key-spec)
- ", "))
-
-(define (build-keyset-where key-spec cursor)
- (let* ((key-exprs (normalize-key-exprs key-spec))
- (key-count (length key-exprs))
- (vals (normalize-keyset-cursor cursor key-count)))
- (unless vals
- (error "build-keyset-where called with empty cursor"))
- (string-join
- (map (lambda (i)
- (let ((eq-prefix
- (string-join
- (map (lambda (j)
- (format #f "~a = ~a"
- (list-ref key-exprs j)
- (sql-literal (list-ref vals j))))
- (iota i))
- " AND "))
- (gt-part
- (format #f "~a > ~a"
- (list-ref key-exprs i)
- (sql-literal (list-ref vals i)))))
- (if (zero? i)
- gt-part
- (string-append "(" eq-prefix " AND " gt-part ")"))))
- (iota key-count))
- " OR ")))
-
-(define (find-sql-tail-start sql)
- (let ((positions
- (filter-map identity
- (list (string-contains sql " GROUP BY ")
- (string-contains sql " ORDER BY ")
- (string-contains sql " LIMIT ")))))
- (if (null? positions) #f (apply min positions))))
-
-(define (append-filter-clause sql clause)
- (let* ((tail-start (find-sql-tail-start sql))
- (head (if tail-start (substring sql 0 tail-start) sql))
- (tail (if tail-start (substring sql tail-start) "")))
- (if (string-contains head " WHERE ")
- (string-append head " AND (" clause ")" tail)
- (string-append head " WHERE (" clause ")" tail))))
-
-(define (append-order-limit sql order-expr limit)
- (let* ((tail-start (find-sql-tail-start sql))
- (head (if tail-start (substring sql 0 tail-start) sql))
- (tail (if tail-start (substring sql tail-start) "")))
- (string-append head tail
- " ORDER BY " order-expr
- " LIMIT " (number->string limit))))
-
-(define (prepend-select-expr sql expr)
- "Prepend EXPR to the SELECT list of SQL."
- (cond
- ((and (>= (string-length sql) 16)
- (string=? (substring sql 0 16) "SELECT DISTINCT "))
- (string-append "SELECT DISTINCT " expr ", " (substring sql 16)))
- ((and (>= (string-length sql) 7)
- (string=? (substring sql 0 7) "SELECT "))
- (string-append "SELECT " expr ", " (substring sql 7)))
- (else
- (error "Expected SQL SELECT query" sql))))
-
(define (get-keyword-value args keyword default)
(let ((kv (memq keyword args)))
(if (and kv (>= (length kv) 2))
@@ -783,12 +582,10 @@ The above query results to triples that have the form:
(table-metadata? (assoc-ref alist 'table-metadata?))
(prefixes (assoc-ref alist 'prefixes))
(inputs (assoc-ref alist 'inputs))
- (outputs (assoc-ref alist 'outputs))
- (total-rows (assoc-ref alist 'total-rows))
- (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
- (keyset-pagination? (assoc-ref alist 'keyset-pagination?))
- (chunk-key-field (assoc-ref alist 'chunk-key-field))
- (chunking? (and total-rows rows-per-chunk))
+ (outputs (assoc-ref alist 'outputs))
+ (total-rows (assoc-ref alist 'total-rows))
+ (rows-per-chunk (assoc-ref alist 'rows-per-chunk))
+ (chunking? (and total-rows rows-per-chunk))
(chunks (if chunking?
(ceiling (/ total-rows rows-per-chunk))
1))
@@ -817,76 +614,32 @@ The above query results to triples that have the form:
inputs))
#:encoding "UTF-8"))
- ;; Dumping the actual data
- (when rdf-path
- (if keyset-pagination?
- ;; Cursor-based chunking with configurable key column.
- (let loop ((chunk-index 0)
- (states (map (lambda (_) (cons #f #f)) inputs)))
- (unless (every cdr states)
- (let* ((out-file
- (if (= chunk-index 0)
- rdf-path
- (string-append (path-without-extension rdf-path)
- "." (number->string (+ chunk-index 1)) ".ttl")))
- (next-states
- (with-output-to-file
- out-file
- (lambda ()
- (for-each
- (match-lambda
- ((k v)
- (prefix k v)))
- prefixes)
- (newline)
- (let walk ((procs inputs)
- (st states)
- (acc '()))
- (if (null? procs)
- (reverse acc)
- (let* ((state (car st))
- (cursor (car state))
- (done? (cdr state)))
- (if done?
- (walk (cdr procs) (cdr st) (cons state acc))
- (let* ((result ((car procs)
- db
- #:metadata? table-metadata?
- #:keyset? #t
- #:chunk-key (or chunk-key-field "Id")
- #:limit rows-per-chunk
- #:cursor cursor))
- (rows (car result))
- (next-cursor (cdr result))
- (proc-done? (or (= rows 0)
- (< rows rows-per-chunk))))
- (walk (cdr procs)
- (cdr st)
- (cons (cons next-cursor proc-done?) acc))))))))
- #:encoding "UTF-8")))
- (loop (+ chunk-index 1) next-states))))
- ;; Legacy OFFSET chunking.
- (do ((i 0 (+ i 1)))
- ((>= i chunks))
- (let* ((offset (* i (or rows-per-chunk 0)))
- (out-file
- (if (= chunks 1)
- rdf-path
- (string-append (path-without-extension rdf-path)
- "." (number->string (+ i 1)) ".ttl"))))
- (with-output-to-file
- out-file
- (lambda ()
- (for-each
- (match-lambda
- ((k v)
- (prefix k v)))
- prefixes)
- (newline)
- (for-each
- (lambda (proc)
- (proc db #:metadata? table-metadata?
- #:limit rows-per-chunk
- #:offset offset))
- inputs))
- #:encoding "UTF-8")))))))))))
+ ;; Dumping the actual data
+ (when rdf-path
+ (do ((i 0 (+ i 1)))
+ ((>= i chunks))
+ (let* ((offset (* i (or rows-per-chunk 0)))
+ (out-file
+ (if (= chunks 1)
+ rdf-path
+ (string-append (path-without-extension rdf-path)
+ "." (number->string (+ i 1)) ".ttl"))))
+ (with-output-to-file
+ out-file
+ (lambda ()
+ ;; Add the prefixes
+ (for-each
+ (match-lambda
+ ((k v)
+ (begin
+ (prefix k v))))
+ prefixes)
+ (newline)
+ (for-each
+ (lambda (proc)
+ (proc db #:metadata? table-metadata?
+ #:limit rows-per-chunk
+ #:offset offset))
+ inputs))
+ #:encoding "UTF-8"))))))))))
+
|