Skip to content

Commit

Permalink
Merge pull request #956 from go-kivik/reduce2
Browse files Browse the repository at this point in the history
More reduce progress
  • Loading branch information
flimzy committed Apr 29, 2024
2 parents 33b382a + b59389d commit 8cc3915
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 133 deletions.
32 changes: 28 additions & 4 deletions x/sqlite/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,34 @@ func (o optsMap) update() (string, error) {
return "", &internal.Error{Status: http.StatusBadRequest, Message: "invalid value for `update`"}
}

func (o optsMap) reduce() *bool {
v, ok := toBool(o["reduce"])
func (o optsMap) reduce() (*bool, error) {
raw, ok := o["reduce"]
if !ok {
return nil
return nil, nil
}
return &v
v, ok := toBool(raw)
if !ok {
return nil, &internal.Error{Status: http.StatusBadRequest, Message: "invalid value for `reduce`"}
}
return &v, nil
}

// group reports the boolean value of the `group` option.
//
// When the option is absent it returns false and a nil error. When the
// option is present but toBool does not accept it, it returns false and a
// 400 Bad Request error.
func (o optsMap) group() (bool, error) {
	if param, present := o["group"]; present {
		enabled, valid := toBool(param)
		if valid {
			return enabled, nil
		}
		return false, &internal.Error{Status: http.StatusBadRequest, Message: "invalid value for `group`"}
	}
	return false, nil
}

// groupLevel returns the value of the `group_level` option.
//
// When the option is absent it returns 0 and a nil error. Otherwise the raw
// value is handed to toUint64 together with the message that helper is
// presumably expected to report for an unusable value (confirm against
// toUint64's definition).
func (o optsMap) groupLevel() (uint64, error) {
	if level, present := o["group_level"]; present {
		return toUint64(level, "invalid value for `group_level`")
	}
	return 0, nil
}
230 changes: 112 additions & 118 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package sqlite

import (
"bytes"
"context"
"database/sql"
"encoding/json"
Expand Down Expand Up @@ -52,34 +53,67 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
ddoc = strings.TrimPrefix(ddoc, "_design/")
view = strings.TrimPrefix(view, "_view/")

var results *sql.Rows
reduce, err := opts.reduce()
if err != nil {
return nil, err
}
if _, err := opts.group(); err != nil {
return nil, err
}
if _, err := opts.groupLevel(); err != nil {
return nil, err
}

results, err := d.performQuery(ctx, ddoc, view, update, reduce)
if err != nil {
return nil, err
}

if update == updateModeLazy {
go func() {
if _, err := d.updateIndex(context.Background(), ddoc, view, updateModeTrue); err != nil {
d.logger.Print("Failed to update index: " + err.Error())
}
}()
}

return results, nil
}

func (d *db) performQuery(ctx context.Context, ddoc, view, update string, reduce *bool) (driver.Rows, error) {
var (
results *sql.Rows
reducible bool
reduceFuncJS *string
)
for {
rev, err := d.updateIndex(ctx, ddoc, view, update)
if err != nil {
return nil, err
}

query := d.ddocQuery(ddoc, view, rev.String(), `
WITH view AS (
SELECT EXISTS(
SELECT 1
FROM {{ .Design }}
WHERE id = $1
AND rev = $2
AND rev_id = $3
AND func_type = 'reduce'
) AS reducable
WITH reduce AS (
SELECT
CASE WHEN MAX(id) IS NOT NULL THEN TRUE ELSE FALSE END AS reducable,
func_body AS reduce_func
FROM {{ .Design }}
WHERE id = $1
AND rev = $2
AND rev_id = $3
AND func_type = 'reduce'
AND func_name = $4
)
SELECT
COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date,
view.reducable,
NULL,
reduce.reducable,
reduce.reduce_func,
NULL,
NULL,
NULL
FROM {{ .Design }}
JOIN view
FROM {{ .Design }} AS map
JOIN reduce
WHERE id = $1
AND rev = $2
AND rev_id = $3
Expand All @@ -91,15 +125,16 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
SELECT *
FROM (
SELECT
NULL AS id,
NULL AS key,
id AS id,
key AS key,
value AS value,
"" AS rev,
NULL AS rev,
NULL AS doc,
"" AS conflicts
FROM {{ .Reduce }}
JOIN view
WHERE view.reducable AND ($6 IS NULL OR $6 == TRUE)
NULL AS conflicts
FROM {{ .Map }}
JOIN reduce
WHERE reduce.reducable AND ($6 IS NULL OR $6 == TRUE)
ORDER BY id, key
)
UNION ALL
Expand All @@ -114,15 +149,15 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
NULL AS doc,
"" AS conflicts
FROM {{ .Map }}
JOIN view
WHERE $6 == FALSE OR NOT view.reducable
JOIN reduce
WHERE $6 == FALSE OR NOT reduce.reducable
ORDER BY key
)
`)

results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in Next
ctx, query,
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, opts.reduce(),
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, reduce,
)
switch {
case errIsNoSuchTable(err):
Expand All @@ -139,24 +174,55 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
if update != updateModeTrue {
break
}
var upToDate, reducible bool
if err := results.Scan(&upToDate, &reducible, discard{}, discard{}, discard{}, discard{}); err != nil {
var upToDate bool
if err := results.Scan(&upToDate, &reducible, &reduceFuncJS, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
if reduce := opts.reduce(); reduce != nil && *reduce && !reducible {
if reduce != nil && *reduce && !reducible {
return nil, &internal.Error{Status: http.StatusBadRequest, Message: "reduce is invalid for map-only views"}
}
if upToDate {
break
}
}

if update == updateModeLazy {
go func() {
if _, err := d.updateIndex(context.Background(), ddoc, view, updateModeTrue); err != nil {
d.logger.Print("Failed to update index: " + err.Error())
if reducible && (reduce == nil || *reduce) {
reduceFn, err := d.reduceFunc(reduceFuncJS, d.logger)
if err != nil {
return nil, err
}
var (
keys [][2]interface{}
values []interface{}

id, key, value *string
)

for results.Next() {
if err := results.Scan(&id, &key, &value, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
}()
keys = append(keys, [2]interface{}{id, key})
if value == nil {
values = append(values, nil)
} else {
values = append(values, *value)
}
}

if err := results.Err(); err != nil {
return nil, err
}

rv := reduceFn(keys, values, false)
tmp, _ := json.Marshal(rv)
return &reducedRows{
{
Key: json.RawMessage(`null`),
Value: bytes.NewReader(tmp),
},
}, nil

}

return &rows{
Expand All @@ -172,36 +238,22 @@ const batchSize = 100
// ddoc revid and last_seq. If mode is "true", it will also update the index.
func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision, error) {
var (
ddocRev revision
mapFuncJS, reduceFuncJS *string
lastSeq int
ddocRev revision
mapFuncJS *string
lastSeq int
)
err := d.db.QueryRowContext(ctx, d.query(`
WITH design AS (
SELECT
id,
rev,
rev_id,
MAX(CASE WHEN func_type = 'map' THEN func_body END) AS map_func,
MAX(CASE WHEN func_type = 'reduce' THEN func_body END) AS reduce_func,
MAX(last_seq) AS last_seq
FROM {{ .Design }}
WHERE id = $1
AND func_type IN ('map', 'reduce')
GROUP BY id, rev, rev_id
)
SELECT
docs.rev,
docs.rev_id,
design.map_func,
design.reduce_func,
design.func_body,
COALESCE(design.last_seq, 0) AS last_seq
FROM {{ .Docs }} AS docs
LEFT JOIN design ON docs.id = design.id AND docs.rev = design.rev AND docs.rev_id = design.rev_id
LEFT JOIN {{ .Design }} AS design ON docs.id = design.id AND docs.rev = design.rev AND docs.rev_id = design.rev_id AND design.func_type = 'map'
WHERE docs.id = $1
ORDER BY docs.rev DESC, docs.rev_id DESC
LIMIT 1
`), "_design/"+ddoc).Scan(&ddocRev.rev, &ddocRev.id, &mapFuncJS, &reduceFuncJS, &lastSeq)
`), "_design/"+ddoc).Scan(&ddocRev.rev, &ddocRev.id, &mapFuncJS, &lastSeq)
switch {
case errors.Is(err, sql.ErrNoRows):
return revision{}, &internal.Error{Status: http.StatusNotFound, Message: "missing"}
Expand Down Expand Up @@ -261,7 +313,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
}
defer docs.Close()

batch := newMapIndexBatch(reduceFuncJS)
batch := newMapIndexBatch()

vm := goja.New()

Expand Down Expand Up @@ -395,19 +447,16 @@ type mapIndexBatch struct {
insertCount int
entries map[string][]mapIndexEntry
deleted []string
// reduce is the text of the JS reduce function, if any.
reduce *string
}

// mapIndexEntry is one key/value pair as held in a mapIndexBatch's entries.
// Both fields are pointers, so either one may be nil.
type mapIndexEntry struct {
	Key, Value *string
}

func newMapIndexBatch(reduce *string) *mapIndexBatch {
func newMapIndexBatch() *mapIndexBatch {
return &mapIndexBatch{
entries: make(map[string][]mapIndexEntry, batchSize),
reduce: reduce,
}
}

Expand Down Expand Up @@ -486,71 +535,16 @@ func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc
}
}

reduceFunc, err := batch.reduceFunc(d.logger)
if err != nil {
return err
}
if reduceFunc != nil {
if _, err := tx.ExecContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), `
DELETE FROM {{ .Reduce }}
`)); err != nil {
return err
}

rows, err := tx.QueryContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), `
SELECT id, key, value
FROM {{ .Map }}
`))
if err != nil {
return err
}
defer rows.Close()

var (
keys [][2]interface{}
values []interface{}

id, key, value *string
)

for rows.Next() {
if err := rows.Scan(&id, &key, &value); err != nil {
return err
}
keys = append(keys, [2]interface{}{id, key})
if value == nil {
values = append(values, nil)
} else {
values = append(values, *value)
}
}
if err := rows.Err(); err != nil {
return err
}

rv := reduceFunc(keys, values, false)
var rvJSON *json.RawMessage
if rv != nil {
tmp, _ := json.Marshal(rv)
rvJSON = (*json.RawMessage)(&tmp)
}

if _, err := tx.ExecContext(ctx, d.ddocQuery(ddoc, viewName, rev.String(), `
INSERT INTO {{ .Reduce }} (min_key, max_key, value)
VALUES ($1, $2, $3)
`), nil, kivik.EndKeySuffix, rvJSON); err != nil {
return err
}
}

return tx.Commit()
}

func (b *mapIndexBatch) reduceFunc(logger *log.Logger) (func(keys [][2]interface{}, values []interface{}, rereduce bool) interface{}, error) {
if b.reduce == nil {
// reduceFunc is the signature shared by reduce implementations: it takes a
// list of two-element key entries, a list of values, and a rereduce flag, and
// returns the reduced result.
// NOTE(review): callers appear to build each key entry as an [id, key] pair
// with one entry per value — confirm against the call sites.
type reduceFunc func(keys [][2]interface{}, values []interface{}, rereduce bool) interface{}

func (d *db) reduceFunc(reduceFuncJS *string, logger *log.Logger) (reduceFunc, error) {
if reduceFuncJS == nil {
return nil, nil
}
switch *b.reduce {
switch *reduceFuncJS {
case "_count":
return func(_ [][2]interface{}, values []interface{}, rereduce bool) interface{} {
if !rereduce {
Expand All @@ -575,7 +569,7 @@ func (b *mapIndexBatch) reduceFunc(logger *log.Logger) (func(keys [][2]interface
default:
vm := goja.New()

if _, err := vm.RunString("const reduce = " + *b.reduce); err != nil {
if _, err := vm.RunString("const reduce = " + *reduceFuncJS); err != nil {
return nil, err
}
reduceFunc, ok := goja.AssertFunction(vm.Get("reduce"))
Expand Down
Loading

0 comments on commit 8cc3915

Please sign in to comment.