Merge pull request #992 from go-kivik/reduceCache
Reduce cache
flimzy committed Jun 3, 2024
2 parents 8acaaa6 + be0f9fb commit 3577771
Showing 7 changed files with 95 additions and 49 deletions.
13 changes: 11 additions & 2 deletions x/sqlite/options.go
@@ -18,6 +18,7 @@ import (
"math"
"net/http"
"strconv"
"strings"

"github.com/go-kivik/kivik/v4/driver"
internal "github.com/go-kivik/kivik/v4/int/errors"
@@ -589,10 +590,18 @@ func (v viewOptions) buildWhere(args *[]any) []string {
return where
}

func (v viewOptions) buildOrderBy() string {
func (v viewOptions) buildOrderBy(moreColumns ...string) string {
if v.sorted {
direction := descendingToDirection(v.descending)
return "ORDER BY view.key " + direction
if len(moreColumns) == 0 {
return "ORDER BY view.key " + direction
}
conditions := make([]string, 0, len(moreColumns)+1)
conditions = append(conditions, "view.key "+direction)
for _, col := range moreColumns {
conditions = append(conditions, "view."+col+" "+direction)
}
return "ORDER BY " + strings.Join(conditions, ", ")
}
return ""
}
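
For reference, a standalone sketch of the clause the extended helper builds (self-contained; direction literals are assumed here, while the real method delegates to descendingToDirection and returns "" for unsorted views):

package main

import (
	"fmt"
	"strings"
)

// buildOrderBy mirrors the updated helper: view.key is always the primary
// sort, and any extra columns (such as "pk") are appended with the same
// direction so the row order is fully deterministic.
func buildOrderBy(descending bool, moreColumns ...string) string {
	direction := "ASC"
	if descending {
		direction = "DESC"
	}
	conditions := make([]string, 0, len(moreColumns)+1)
	conditions = append(conditions, "view.key "+direction)
	for _, col := range moreColumns {
		conditions = append(conditions, "view."+col+" "+direction)
	}
	return "ORDER BY " + strings.Join(conditions, ", ")
}

func main() {
	fmt.Println(buildOrderBy(false, "pk")) // ORDER BY view.key ASC, view.pk ASC
	fmt.Println(buildOrderBy(true))        // ORDER BY view.key DESC
}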
64 changes: 48 additions & 16 deletions x/sqlite/query.go
@@ -130,12 +130,13 @@ func (d *db) performQuery(
AND func_name = $8
)
-- Metadata header
SELECT
COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date,
reduce.reducible,
reduce.reduce_func,
IIF($4, last_seq, "") AS update_seq,
NULL,
MAX(last_seq) AS last_seq,
NULL,
0 AS attachment_count,
NULL AS filename,
@@ -154,14 +155,16 @@
UNION ALL
SELECT *
-- View map to pass to reduce
SELECT
*
FROM (
SELECT
id AS id,
key AS key,
value AS value,
NULL AS rev,
NULL AS doc,
pk AS first,
pk AS last,
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
@@ -173,11 +176,12 @@
FROM {{ .Map }} AS view
JOIN reduce
WHERE reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
ORDER BY key
%[5]s -- ORDER BY
)
UNION ALL
-- Normal query results
SELECT
CASE WHEN row_number = 1 THEN id END AS id,
CASE WHEN row_number = 1 THEN key END AS key,
@@ -232,7 +236,8 @@
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip)
`), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip,
vopts.buildOrderBy("pk"))
results, err := d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck // Err checked in Next
switch {
case errIsNoSuchTable(err):
@@ -252,8 +257,7 @@
}

if meta.reducible && (vopts.reduce == nil || *vopts.reduce) {
ri := &reduceRowIter{results: results}
return reduce.Reduce(ri, meta.reduceFuncJS, d.logger, vopts.reduceGroupLevel(), nil)
return d.reduce(ctx, meta.lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel())
}

// If the results are up to date, OR, we're in false/lazy update mode,
@@ -272,9 +276,12 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
results *sql.Rows
reducible bool
reduceFuncJS string
rev revision
err error
lastSeq int
)
for {
rev, err := d.updateIndex(ctx, ddoc, view, vopts.update)
rev, err = d.updateIndex(ctx, ddoc, view, vopts.update)
if err != nil {
return nil, err
}
@@ -296,7 +303,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date,
reduce.reducible,
reduce.reduce_func,
NULL,
MAX(last_seq) AS last_seq,
NULL,
NULL,
0 AS attachment_count,
@@ -322,8 +329,8 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
id AS id,
COALESCE(key, "null") AS key,
value AS value,
NULL AS rev,
NULL AS doc,
pk AS first,
pk AS last,
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
@@ -337,7 +344,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
WHERE reduce.reducible AND ($6 IS NULL OR $6 == TRUE)
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy())
`), vopts.buildOrderBy("pk"))

results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in iterator
ctx, query,
@@ -361,7 +368,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
}
var upToDate bool
if err := results.Scan(
&upToDate, &reducible, &reduceFuncJS, discard{}, discard{}, discard{},
&upToDate, &reducible, &reduceFuncJS, &lastSeq, discard{}, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
return nil, err
@@ -378,8 +385,33 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *vi
}
}

ri := &reduceRowIter{results: results}
return reduce.Reduce(ri, reduceFuncJS, d.logger, vopts.reduceGroupLevel(), nil)
return d.reduce(ctx, lastSeq, ddoc, view, rev.String(), results, reduceFuncJS, vopts.reduceGroupLevel())
}

func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, results *sql.Rows, reduceFuncJS string, groupLevel int) (driver.Rows, error) {
stmt, err := d.db.PrepareContext(ctx, d.ddocQuery(ddoc, name, rev, `
INSERT INTO {{ .Reduce }} (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`))
if err != nil {
return nil, err
}
callback := func(depth uint, rows []reduce.Row) {
for _, row := range rows {
key, _ := json.Marshal(row.Key)
var value []byte
if row.Value != nil {
value, _ = json.Marshal(row.Value)
}
fmt.Printf("INSERTING: %v, %v, %v, %v, %v, %v, %v\n", seq, depth, string(key), row.First, string(key), row.Last, string(value))
if _, err = stmt.ExecContext(ctx, seq, depth, key, row.First, key, row.Last, value); err != nil {
d.logger.Printf("Failed to insert reduce result [%v, %v, %v, %v, %v, %v, %v]: %s",
seq, depth, key, row.First, key, row.Last, value,
err)
}
}
}
return reduce.Reduce(&reduceRowIter{results: results}, reduceFuncJS, d.logger, groupLevel, callback)
}

const batchSize = 100
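
A hedged sketch of the new cache-write path in d.reduce: the reduce.Callback closure marshals each intermediate row and feeds it to the prepared INSERT into the {{ .Reduce }} table. The helper below is not part of the commit; it restates the closure in isolation, with error handling trimmed:

package cache // hypothetical package, for the sketch only

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"

	"github.com/go-kivik/kivik/x/sqlite/v4/reduce"
)

// cacheCallback returns a reduce.Callback that persists every batch of
// intermediate reduce rows, keyed by (seq, depth, key range), so a later
// query could reuse them instead of re-reducing from the map output.
func cacheCallback(ctx context.Context, stmt *sql.Stmt, seq int, logger *log.Logger) reduce.Callback {
	return func(depth uint, rows []reduce.Row) {
		for _, row := range rows {
			key, _ := json.Marshal(row.Key)
			var value []byte
			if row.Value != nil {
				value, _ = json.Marshal(row.Value)
			}
			// first_key and last_key both hold the group key at this depth;
			// First and Last bound the map rows that fed this reduction.
			if _, err := stmt.ExecContext(ctx, seq, depth, key, row.First, key, row.Last, value); err != nil {
				logger.Printf("failed to cache reduce row at depth %d: %s", depth, err)
			}
		}
	}
}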
25 changes: 1 addition & 24 deletions x/sqlite/reduce.go
@@ -17,7 +17,6 @@ import (
"encoding/json"
"io"

"github.com/go-kivik/kivik/v4/driver"
"github.com/go-kivik/kivik/x/sqlite/v4/reduce"
)

@@ -34,7 +33,7 @@ func (r *reduceRowIter) ReduceNext(row *reduce.Row) error {
}
var key, value *[]byte
err := r.results.Scan(
&row.ID, &key, &value, discard{}, discard{}, discard{},
&row.ID, &key, &value, &row.First, &row.Last, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
)
if err != nil {
@@ -54,27 +53,5 @@ func (r *reduceRowIter) ReduceNext(row *reduce.Row) error {
} else {
row.Value = nil
}
return err
}

type reducedRows []driver.Row

var _ driver.Rows = (*reducedRows)(nil)

func (r *reducedRows) Close() error {
*r = nil
return nil
}

func (r *reducedRows) Next(row *driver.Row) error {
if len(*r) == 0 {
return io.EOF
}
*row = (*r)[0]
*r = (*r)[1:]
return nil
}

func (*reducedRows) Offset() int64 { return 0 }
func (*reducedRows) TotalRows() int64 { return 0 }
func (*reducedRows) UpdateSeq() string { return "" }
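
For context, a stand-in for the row shape ReduceNext now produces (the real reduce.Row lives in x/sqlite/reduce; the exact field types here are assumptions):

package reducedoc // hypothetical package, for the sketch only

// mapRow stands in for reduce.Row: ID, Key, and Value are decoded from the
// JSON stored in the map table, while First and Last carry the map table's
// pk so cached reductions can later be addressed by row range.
type mapRow struct {
	ID          string
	Key, Value  any
	First, Last int
}

// A single unreduced map row: both bounds point at the same pk.
var example = mapRow{
	ID:    "doc1",
	Key:   []any{"2024", "06"},
	Value: float64(1),
	First: 17,
	Last:  17,
}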
19 changes: 16 additions & 3 deletions x/sqlite/reduce/reduce.go
@@ -92,6 +92,10 @@ func (*Rows) UpdateSeq() string { return "" }
// [CouchDB reduce function]: https://docs.couchdb.org/en/stable/ddocs/ddocs.html#reduce-and-rereduce-functions
type Func func(keys [][2]interface{}, values []interface{}, rereduce bool) ([]interface{}, error)

// Callback is called with the group depth and result of each intermediate
// reduce call. It can be used to cache intermediate results.
type Callback func(depth uint, rows []Row)

// Reduce calls fn on rows, and returns the results. The input must be in
// key-sorted order, and may contain both previously reduced rows, and map
// output rows. cb, if not nil, is called with the results of every
@@ -104,15 +108,15 @@ type Func func(keys [][2]interface{}, values []interface{}, rereduce bool) ([]in
// -1: Maximum grouping, same as group=true
// 0: No grouping, same as group=false
// 1+: Group by the first N elements of the key, same as group_level=N
func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb func([]Row)) (*Rows, error) {
func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb Callback) (*Rows, error) {
fn, err := ParseFunc(javascript, logger)
if err != nil {
return nil, err
}
return reduce(rows, fn, groupLevel, cb)
}

func reduce(rows Reducer, fn Func, groupLevel int, cb func([]Row)) (*Rows, error) {
func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
out := make(Rows, 0, 1)
var first, last int

@@ -148,7 +152,16 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb func([]Row)) (*Rows, error
first, last = 0, 0
}
if cb != nil {
cb(rows)
var depth uint
switch t := key.(type) {
case nil:
// depth is 0 for non-grouped results
case []any:
depth = uint(len(t))
default:
depth = 1
}
cb(depth, rows)
}
out = append(out, rows...)
return nil
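
The depth handed to the callback depends only on the shape of the group key; restated as a standalone helper:

package reduce // sketch only; mirrors the switch added above

// groupDepth returns 0 for the ungrouped total (nil key), the key length
// for array keys, and 1 for any scalar key.
func groupDepth(key any) uint {
	switch t := key.(type) {
	case nil:
		return 0
	case []any:
		return uint(len(t))
	default:
		return 1
	}
}

// groupDepth(nil)                       == 0  // group=false, whole-view total
// groupDepth([]any{"2024", "06", "03"}) == 3  // array key, e.g. group_level=3
// groupDepth("user1")                   == 1  // scalar key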
2 changes: 1 addition & 1 deletion x/sqlite/reduce/reduce_test.go
@@ -147,7 +147,7 @@ func TestReduce(t *testing.T) {

tests.Run(t, func(t *testing.T, tt test) {
var cache [][]Row
cb := func(rows []Row) {
cb := func(_ uint, rows []Row) {
cache = append(cache, rows)
}
got, err := Reduce(tt.input, tt.javascript, log.New(io.Discard, "", 0), tt.groupLevel, cb)
14 changes: 12 additions & 2 deletions x/sqlite/schema.go
@@ -87,7 +87,7 @@ var schema = []string{
}

var viewSchema = []string{
`CREATE TABLE IF NOT EXISTS {{ .Map }} (
`CREATE TABLE {{ .Map }} (
pk INTEGER PRIMARY KEY,
id TEXT NOT NULL,
rev INTEGER NOT NULL,
@@ -96,5 +96,15 @@
value TEXT,
FOREIGN KEY (id, rev, rev_id) REFERENCES {{ .Docs }} (id, rev, rev_id)
)`,
`CREATE INDEX IF NOT EXISTS {{ .IndexMap }} ON {{ .Map }} (key)`,
`CREATE INDEX {{ .IndexMap }} ON {{ .Map }} (key)`,
`CREATE TABLE {{ .Reduce }} (
seq INTEGER NOT NULL,
depth INTEGER NOT NULL,
first_key TEXT COLLATE COUCHDB_UCI,
first_pk INTEGER NOT NULL,
last_key TEXT COLLATE COUCHDB_UCI,
last_pk INTEGER NOT NULL,
value TEXT,
UNIQUE (depth, first_key, first_pk, last_key, last_pk)
)`,
}
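
A minimal, hypothetical demonstration of the new cache table's shape and its UNIQUE constraint (uses github.com/mattn/go-sqlite3 for the demo only, and omits the driver-specific COUCHDB_UCI collation):

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same columns as {{ .Reduce }}, minus the custom collation.
	if _, err := db.Exec(`CREATE TABLE reduce_cache (
		seq INTEGER NOT NULL,
		depth INTEGER NOT NULL,
		first_key TEXT,
		first_pk INTEGER NOT NULL,
		last_key TEXT,
		last_pk INTEGER NOT NULL,
		value TEXT,
		UNIQUE (depth, first_key, first_pk, last_key, last_pk)
	)`); err != nil {
		log.Fatal(err)
	}

	key, _ := json.Marshal([]any{"a"})
	value, _ := json.Marshal(3)
	ins := `INSERT INTO reduce_cache (seq, depth, first_key, first_pk, last_key, last_pk, value)
		VALUES (?, ?, ?, ?, ?, ?, ?)`
	if _, err := db.Exec(ins, 42, 1, key, 1, key, 7, value); err != nil {
		log.Fatal(err)
	}
	// A second row covering the same depth and key range violates the UNIQUE
	// constraint, so the cache holds at most one value per reduced range.
	if _, err := db.Exec(ins, 43, 1, key, 1, key, 7, value); err != nil {
		fmt.Println("duplicate range rejected:", err)
	}
}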
7 changes: 6 additions & 1 deletion x/sqlite/views.go
@@ -184,6 +184,7 @@ type viewMetadata struct {
reducible bool
reduceFuncJS string
updateSeq string
lastSeq int
}

// readFirstRow reads the first row from the resultset, which contains. In the
@@ -195,8 +196,9 @@ func readFirstRow(results *sql.Rows, vopts *viewOptions) (*viewMetadata, error)
return nil, errors.New("no rows returned")
}
var meta viewMetadata
var lastSeq *int
if err := results.Scan(
&meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, discard{}, discard{},
&meta.upToDate, &meta.reducible, &meta.reduceFuncJS, &meta.updateSeq, &lastSeq, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
_ = results.Close() //nolint:sqlclosecheck // Aborting
Expand All @@ -206,6 +208,9 @@ func readFirstRow(results *sql.Rows, vopts *viewOptions) (*viewMetadata, error)
_ = results.Close() //nolint:sqlclosecheck // Aborting
return nil, &internal.Error{Status: http.StatusBadRequest, Message: "reduce is invalid for map-only views"}
}
if lastSeq != nil {
meta.lastSeq = *lastSeq
}
return &meta, nil
}
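
A standalone restatement of the nullable scan above: MAX(last_seq) is NULL until the view index has been built, so it is read into a *int and copied into the plain int field only when present (the table name below is hypothetical):

package views // hypothetical package, for the sketch only

import (
	"context"
	"database/sql"
)

// readLastSeq mirrors the nullable-scan pattern in isolation: a NULL
// MAX(last_seq) leaves lastSeq nil and maps to 0.
func readLastSeq(ctx context.Context, db *sql.DB) (int, error) {
	var lastSeq *int
	if err := db.QueryRowContext(ctx,
		`SELECT MAX(last_seq) FROM view_map`, // hypothetical table name
	).Scan(&lastSeq); err != nil {
		return 0, err
	}
	if lastSeq == nil {
		return 0, nil // index never built yet
	}
	return *lastSeq, nil
}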

