diff --git a/x/sqlite/options.go b/x/sqlite/options.go index 49b3aab8..4f62e49e 100644 --- a/x/sqlite/options.go +++ b/x/sqlite/options.go @@ -18,6 +18,7 @@ import ( "math" "net/http" "strconv" + "strings" "github.com/go-kivik/kivik/v4/driver" internal "github.com/go-kivik/kivik/v4/int/errors" @@ -589,10 +590,18 @@ func (v viewOptions) buildWhere(args *[]any) []string { return where } -func (v viewOptions) buildOrderBy() string { +func (v viewOptions) buildOrderBy(moreColumns ...string) string { if v.sorted { direction := descendingToDirection(v.descending) - return "ORDER BY view.key " + direction + if len(moreColumns) == 0 { + return "ORDER BY view.key " + direction + } + conditions := make([]string, 0, len(moreColumns)+1) + conditions = append(conditions, "view.key "+direction) + for _, col := range moreColumns { + conditions = append(conditions, "view."+col+" "+direction) + } + return "ORDER BY " + strings.Join(conditions, ", ") } return "" } diff --git a/x/sqlite/query.go b/x/sqlite/query.go index d54f5b02..b30c7ccf 100644 --- a/x/sqlite/query.go +++ b/x/sqlite/query.go @@ -130,12 +130,13 @@ func (d *db) performQuery( AND func_name = $8 ) + -- Metadata header SELECT COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date, reduce.reducible, reduce.reduce_func, IIF($4, last_seq, "") AS update_seq, - NULL, + MAX(last_seq) AS last_seq, NULL, 0 AS attachment_count, NULL AS filename, @@ -154,14 +155,16 @@ func (d *db) performQuery( UNION ALL - SELECT * + -- View map to pass to reduce + SELECT + * FROM ( SELECT id AS id, key AS key, value AS value, - NULL AS rev, - NULL AS doc, + pk AS first, + pk AS last, NULL AS conflicts, 0 AS attachment_count, NULL AS filename, @@ -173,11 +176,12 @@ func (d *db) performQuery( FROM {{ .Map }} AS view JOIN reduce WHERE reduce.reducible AND ($3 IS NULL OR $3 == TRUE) - ORDER BY key + %[5]s -- ORDER BY ) UNION ALL + -- Normal query results SELECT CASE WHEN row_number = 1 THEN id END AS id, CASE WHEN row_number = 1 THEN key END AS key, @@ -232,7 +236,8 @@ func (d *db) performQuery( LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk %[1]s -- ORDER BY ) - `), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip) + `), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip, + vopts.buildOrderBy("pk")) results, err := d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck // Err checked in Next switch { case errIsNoSuchTable(err): @@ -252,8 +257,7 @@ func (d *db) performQuery( } if meta.reducible && (vopts.reduce == nil || *vopts.reduce) { - ri := &reduceRowIter{results: results} - return reduce.Reduce(ri, meta.reduceFuncJS, d.logger, vopts.reduceGroupLevel(), nil) + return d.reduce(ctx, meta.lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel()) } // If the results are up to date, OR, we're in false/lazy update mode, @@ -272,9 +276,12 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi results *sql.Rows reducible bool reduceFuncJS string + rev revision + err error + lastSeq int ) for { - rev, err := d.updateIndex(ctx, ddoc, view, vopts.update) + rev, err = d.updateIndex(ctx, ddoc, view, vopts.update) if err != nil { return nil, err } @@ -296,7 +303,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date, reduce.reducible, reduce.reduce_func, - NULL, + MAX(last_seq) AS last_seq, NULL, NULL, 0 AS attachment_count, @@ -322,8 +329,8 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi id AS id, COALESCE(key, "null") AS key, value AS value, - NULL AS rev, - NULL AS doc, + pk AS first, + pk AS last, NULL AS conflicts, 0 AS attachment_count, NULL AS filename, @@ -337,7 +344,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi WHERE reduce.reducible AND ($6 IS NULL OR $6 == TRUE) %[1]s -- ORDER BY ) - `), vopts.buildOrderBy()) + `), vopts.buildOrderBy("pk")) results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in iterator ctx, query, @@ -361,7 +368,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi } var upToDate bool if err := results.Scan( - &upToDate, &reducible, &reduceFuncJS, discard{}, discard{}, discard{}, + &upToDate, &reducible, &reduceFuncJS, &lastSeq, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, ); err != nil { return nil, err @@ -378,8 +385,33 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi } } - ri := &reduceRowIter{results: results} - return reduce.Reduce(ri, reduceFuncJS, d.logger, vopts.reduceGroupLevel(), nil) + return d.reduce(ctx, lastSeq, ddoc, view, rev.String(), results, reduceFuncJS, vopts.reduceGroupLevel()) +} + +func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, results *sql.Rows, reduceFuncJS string, groupLevel int) (driver.Rows, error) { + stmt, err := d.db.PrepareContext(ctx, d.ddocQuery(ddoc, name, rev, ` + INSERT INTO {{ .Reduce }} (seq, depth, first_key, first_pk, last_key, last_pk, value) + VALUES ($1, $2, $3, $4, $5, $6, $7) + `)) + if err != nil { + return nil, err + } + callback := func(depth uint, rows []reduce.Row) { + for _, row := range rows { + key, _ := json.Marshal(row.Key) + var value []byte + if row.Value != nil { + value, _ = json.Marshal(row.Value) + } + fmt.Printf("INSERTING: %v, %v, %v, %v, %v, %v, %v\n", seq, depth, string(key), row.First, string(key), row.Last, string(value)) + if _, err = stmt.ExecContext(ctx, seq, depth, key, row.First, key, row.Last, value); err != nil { + d.logger.Printf("Failed to insert reduce result [%v, %v, %v, %v, %v, %v, %v]: %s", + seq, depth, key, row.First, key, row.Last, value, + err) + } + } + } + return reduce.Reduce(&reduceRowIter{results: results}, reduceFuncJS, d.logger, groupLevel, callback) } const batchSize = 100 diff --git a/x/sqlite/reduce.go b/x/sqlite/reduce.go index 6d504db7..2c8e69f6 100644 --- a/x/sqlite/reduce.go +++ b/x/sqlite/reduce.go @@ -17,7 +17,6 @@ import ( "encoding/json" "io" - "github.com/go-kivik/kivik/v4/driver" "github.com/go-kivik/kivik/x/sqlite/v4/reduce" ) @@ -34,7 +33,7 @@ func (r *reduceRowIter) ReduceNext(row *reduce.Row) error { } var key, value *[]byte err := r.results.Scan( - &row.ID, &key, &value, discard{}, discard{}, discard{}, + &row.ID, &key, &value, &row.First, &row.Last, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, ) if err != nil { @@ -54,27 +53,5 @@ func (r *reduceRowIter) ReduceNext(row *reduce.Row) error { } else { row.Value = nil } - return err -} - -type reducedRows []driver.Row - -var _ driver.Rows = (*reducedRows)(nil) - -func (r *reducedRows) Close() error { - *r = nil - return nil -} - -func (r *reducedRows) Next(row *driver.Row) error { - if len(*r) == 0 { - return io.EOF - } - *row = (*r)[0] - *r = (*r)[1:] return nil } - -func (*reducedRows) Offset() int64 { return 0 } -func (*reducedRows) TotalRows() int64 { return 0 } -func (*reducedRows) UpdateSeq() string { return "" } diff --git a/x/sqlite/reduce/reduce.go b/x/sqlite/reduce/reduce.go index 9e3ab942..4a902e84 100644 --- a/x/sqlite/reduce/reduce.go +++ b/x/sqlite/reduce/reduce.go @@ -92,6 +92,10 @@ func (*Rows) UpdateSeq() string { return "" } // [CouchDB reduce function]: https://docs.couchdb.org/en/stable/ddocs/ddocs.html#reduce-and-rereduce-functions type Func func(keys [][2]interface{}, values []interface{}, rereduce bool) ([]interface{}, error) +// Callback is called with the group depth and result of each intermediate +// reduce call. It can be used to cache intermediate results. +type Callback func(depth uint, rows []Row) + // Reduce calls fn on rows, and returns the results. The input must be in // key-sorted order, and may contain both previously reduced rows, and map // output rows. cb, if not nil, is called with the results of every @@ -104,7 +108,7 @@ type Func func(keys [][2]interface{}, values []interface{}, rereduce bool) ([]in // -1: Maximum grouping, same as group=true // 0: No grouping, same as group=false // 1+: Group by the first N elements of the key, same as group_level=N -func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb func([]Row)) (*Rows, error) { +func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb Callback) (*Rows, error) { fn, err := ParseFunc(javascript, logger) if err != nil { return nil, err @@ -112,7 +116,7 @@ func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, return reduce(rows, fn, groupLevel, cb) } -func reduce(rows Reducer, fn Func, groupLevel int, cb func([]Row)) (*Rows, error) { +func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) { out := make(Rows, 0, 1) var first, last int @@ -148,7 +152,16 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb func([]Row)) (*Rows, error first, last = 0, 0 } if cb != nil { - cb(rows) + var depth uint + switch t := key.(type) { + case nil: + // depth is 0 for non-grouped results + case []any: + depth = uint(len(t)) + default: + depth = 1 + } + cb(depth, rows) } out = append(out, rows...) return nil diff --git a/x/sqlite/reduce/reduce_test.go b/x/sqlite/reduce/reduce_test.go index 5ad22485..cbaddfef 100644 --- a/x/sqlite/reduce/reduce_test.go +++ b/x/sqlite/reduce/reduce_test.go @@ -147,7 +147,7 @@ func TestReduce(t *testing.T) { tests.Run(t, func(t *testing.T, tt test) { var cache [][]Row - cb := func(rows []Row) { + cb := func(_ uint, rows []Row) { cache = append(cache, rows) } got, err := Reduce(tt.input, tt.javascript, log.New(io.Discard, "", 0), tt.groupLevel, cb) diff --git a/x/sqlite/schema.go b/x/sqlite/schema.go index ded14d53..21b720c4 100644 --- a/x/sqlite/schema.go +++ b/x/sqlite/schema.go @@ -87,7 +87,7 @@ var schema = []string{ } var viewSchema = []string{ - `CREATE TABLE IF NOT EXISTS {{ .Map }} ( + `CREATE TABLE {{ .Map }} ( pk INTEGER PRIMARY KEY, id TEXT NOT NULL, rev INTEGER NOT NULL, @@ -96,5 +96,15 @@ var viewSchema = []string{ value TEXT, FOREIGN KEY (id, rev, rev_id) REFERENCES {{ .Docs }} (id, rev, rev_id) )`, - `CREATE INDEX IF NOT EXISTS {{ .IndexMap }} ON {{ .Map }} (key)`, + `CREATE INDEX {{ .IndexMap }} ON {{ .Map }} (key)`, + `CREATE TABLE {{ .Reduce }} ( + seq INTEGER NOT NULL, + depth INTEGER NOT NULL, + first_key TEXT COLLATE COUCHDB_UCI, + first_pk INTEGER NOT NULL, + last_key TEXT COLLATE COUCHDB_UCI, + last_pk INTEGER NOT NULL, + value TEXT, + UNIQUE (depth, first_key, first_pk, last_key, last_pk) + )`, } diff --git a/x/sqlite/views.go b/x/sqlite/views.go index a1951817..236d6d13 100644 --- a/x/sqlite/views.go +++ b/x/sqlite/views.go @@ -184,6 +184,7 @@ type viewMetadata struct { reducible bool reduceFuncJS string updateSeq string + lastSeq int } // readFirstRow reads the first row from the resultset, which contains. In the @@ -195,8 +196,9 @@ func readFirstRow(results *sql.Rows, vopts *viewOptions) (*viewMetadata, error) return nil, errors.New("no rows returned") } var meta viewMetadata + var lastSeq *int if err := results.Scan( - &meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, discard{}, discard{}, + &meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, &lastSeq, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, ); err != nil { _ = results.Close() //nolint:sqlclosecheck // Aborting @@ -206,6 +208,9 @@ func readFirstRow(results *sql.Rows, vopts *viewOptions) (*viewMetadata, error) _ = results.Close() //nolint:sqlclosecheck // Aborting return nil, &internal.Error{Status: http.StatusBadRequest, Message: "reduce is invalid for map-only views"} } + if lastSeq != nil { + meta.lastSeq = *lastSeq + } return &meta, nil }