Skip to content

Commit

Permalink
Merge pull request #959 from go-kivik/reduceCombine
Browse files Browse the repository at this point in the history
Extract reduce logic to separate function
  • Loading branch information
flimzy committed May 2, 2024
2 parents 3541516 + 50eabc2 commit 5c375e0
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 85 deletions.
87 changes: 2 additions & 85 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package sqlite

import (
"bytes"
"context"
"database/sql"
"encoding/json"
Expand All @@ -22,7 +21,6 @@ import (
"io"
"log"
"net/http"
"slices"
"strings"

"github.com/dop251/goja"
Expand Down Expand Up @@ -193,42 +191,7 @@ func (d *db) performQuery(ctx context.Context, ddoc, view, update string, reduce
}

if reducible && (reduce == nil || *reduce) {
reduceFn, err := d.reduceFunc(reduceFuncJS, d.logger)
if err != nil {
return nil, err
}
var (
keys [][2]interface{}
values []interface{}

id, key, value *string
)

for results.Next() {
if err := results.Scan(&id, &key, &value, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
keys = append(keys, [2]interface{}{id, key})
if value == nil {
values = append(values, nil)
} else {
values = append(values, *value)
}
}

if err := results.Err(); err != nil {
return nil, err
}

rv := reduceFn(keys, values, false)
tmp, _ := json.Marshal(rv)
return &reducedRows{
{
Key: json.RawMessage(`null`),
Value: bytes.NewReader(tmp),
},
}, nil

return d.reduceRows(results, reduceFuncJS, false)
}

return &rows{
Expand Down Expand Up @@ -332,53 +295,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, g
}
}

reduceFn, err := d.reduceFunc(reduceFuncJS, d.logger)
if err != nil {
return nil, err
}
var (
intermediate = map[string][]interface{}{}

id, key string
rowValue *string
)

for results.Next() {
if err := results.Scan(&id, &key, &rowValue, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
var value interface{}
if rowValue != nil {
value = *rowValue
}
rv := reduceFn([][2]interface{}{{id, key}}, []interface{}{value}, false)
intermediate[key] = append(intermediate[key], rv)
}

if err := results.Err(); err != nil {
return nil, err
}

final := make(reducedRows, 0, len(intermediate))
for key, values := range intermediate {
var value json.RawMessage
if len(values) > 1 {
rv := reduceFn(nil, values, true)
value, _ = json.Marshal(rv)
} else {
value, _ = json.Marshal(values[0])
}
final = append(final, driver.Row{
Key: json.RawMessage(key),
Value: bytes.NewReader(value),
})
}

slices.SortFunc(final, func(a, b driver.Row) int {
return couchdbCmpJSON(a.Key, b.Key)
})

return &final, nil
return d.reduceRows(results, reduceFuncJS, true)
}

const batchSize = 100
Expand Down
71 changes: 71 additions & 0 deletions x/sqlite/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,82 @@
package sqlite

import (
"bytes"
"database/sql"
"encoding/json"
"io"
"slices"

"github.com/go-kivik/kivik/v4/driver"
)

func (d *db) reduceRows(results *sql.Rows, reduceFuncJS *string, group bool) (driver.Rows, error) {
reduceFn, err := d.reduceFunc(reduceFuncJS, d.logger)
if err != nil {
return nil, err
}
var (
intermediate = map[string][]interface{}{}

id, key string
rowValue *string
)

for results.Next() {
if err := results.Scan(&id, &key, &rowValue, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
var value interface{}
if rowValue != nil {
value = *rowValue
}
rv := reduceFn([][2]interface{}{{id, key}}, []interface{}{value}, false)
intermediate[key] = append(intermediate[key], rv)
}

if err := results.Err(); err != nil {
return nil, err
}

// Note that group_level is handled at the query level, so we don't need to
// worry about it here.
if !group {
var values []interface{}
for _, v := range intermediate {
values = append(values, v...)
}
rv := reduceFn(nil, values, true)
tmp, _ := json.Marshal(rv)
return &reducedRows{
{
Key: json.RawMessage(`null`),
Value: bytes.NewReader(tmp),
},
}, nil
}

final := make(reducedRows, 0, len(intermediate))
for key, values := range intermediate {
var value json.RawMessage
if len(values) > 1 {
rv := reduceFn(nil, values, true)
value, _ = json.Marshal(rv)
} else {
value, _ = json.Marshal(values[0])
}
final = append(final, driver.Row{
Key: json.RawMessage(key),
Value: bytes.NewReader(value),
})
}

slices.SortFunc(final, func(a, b driver.Row) int {
return couchdbCmpJSON(a.Key, b.Key)
})

return &final, nil
}

type reducedRows []driver.Row

var _ driver.Rows = (*reducedRows)(nil)
Expand Down

0 comments on commit 5c375e0

Please sign in to comment.