Skip to content

Commit

Permalink
Merge pull request #969 from go-kivik/mergeQueryMethods
Browse files Browse the repository at this point in the history
Merge query methods
  • Loading branch information
flimzy committed May 13, 2024
2 parents 0bcafec + eb36cdc commit 8b40051
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 68 deletions.
4 changes: 2 additions & 2 deletions x/sqlite/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq

var query string
if opts.includeDocs() {
query = d.normalChangesQueryWithDocs(opts.direction())
query = d.normalChangesQueryWithDocs(descendingToDirection(opts.descending()))
} else {
query = d.normalChangesQueryWithoutDocs(opts.direction())
query = d.normalChangesQueryWithoutDocs(descendingToDirection(opts.descending()))
}
if limit > 0 {
query += " LIMIT " + strconv.FormatUint(limit+1, 10)
Expand Down
4 changes: 4 additions & 0 deletions x/sqlite/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ type fullDoc struct {
Deleted bool `json:"_deleted,omitempty"`
}

func (d fullDoc) rev() (revision, error) {
return parseRev(d.Rev)
}

func (d *fullDoc) toRaw() json.RawMessage {
buf := bytes.Buffer{}
_ = buf.WriteByte('{')
Expand Down
7 changes: 0 additions & 7 deletions x/sqlite/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,6 @@ func (o optsMap) descending() bool {
return v
}

func (o optsMap) direction() string {
if o.descending() {
return "DESC"
}
return "ASC"
}

func (o optsMap) includeDocs() bool {
v, _ := toBool(o["include_docs"])
return v
Expand Down
119 changes: 85 additions & 34 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,26 @@ func fromJSValue(v interface{}) (*string, error) {

func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Options) (driver.Rows, error) {
opts := newOpts(options)

limit, err := opts.limit()
if err != nil {
return nil, err
}
skip, err := opts.skip()
if err != nil {
return nil, err
}
endkey := opts.endKey()
startkey := opts.startKey()
descending := opts.descending()
includeDocs := opts.includeDocs()
inclusiveEnd := opts.inclusiveEnd()
conflicts := opts.conflicts()

switch ddoc {
case viewAllDocs, viewLocalDocs, viewDesignDocs:
return d.queryBuiltinView(ctx, ddoc, startkey, endkey, limit, skip, includeDocs, descending, inclusiveEnd, conflicts)
}
update, err := opts.update()
if err != nil {
return nil, err
Expand All @@ -64,7 +84,7 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
return nil, err
}

results, err := d.performQuery(ctx, ddoc, view, update, reduce, group, groupLevel)
results, err := d.performQuery(ctx, ddoc, view, update, reduce, group, includeDocs, groupLevel, limit, skip)
if err != nil {
return nil, err
}
Expand All @@ -80,7 +100,14 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
return results, nil
}

func (d *db) performQuery(ctx context.Context, ddoc, view, update string, reduce *bool, group bool, groupLevel uint64) (driver.Rows, error) {
func (d *db) performQuery(
ctx context.Context,
ddoc, view, update string,
reduce *bool,
group, includeDocs bool,
groupLevel uint64,
limit, skip int64,
) (driver.Rows, error) {
if group {
return d.performGroupQuery(ctx, ddoc, view, update, groupLevel)
}
Expand All @@ -95,7 +122,7 @@ func (d *db) performQuery(ctx context.Context, ddoc, view, update string, reduce
return nil, err
}

query := d.ddocQuery(ddoc, view, rev.String(), `
query := fmt.Sprintf(d.ddocQuery(ddoc, view, rev.String(), `
WITH reduce AS (
SELECT
CASE WHEN MAX(id) IS NOT NULL THEN TRUE ELSE FALSE END AS reducable,
Expand Down Expand Up @@ -145,22 +172,25 @@ func (d *db) performQuery(ctx context.Context, ddoc, view, update string, reduce
SELECT *
FROM (
SELECT
id,
key,
value,
"" AS rev,
NULL AS doc,
map.id,
map.key,
map.value,
IIF($7, docs.rev || '-' || docs.rev_id, "") AS rev,
IIF($7, docs.doc, NULL) AS doc,
"" AS conflicts
FROM {{ .Map }}
FROM {{ .Map }} AS map
JOIN reduce
JOIN {{ .Docs }} AS docs ON map.id = docs.id AND map.rev = docs.rev AND map.rev_id = docs.rev_id
WHERE $6 == FALSE OR NOT reduce.reducable
ORDER BY key
LIMIT %[1]d OFFSET %[2]d
)
`)
`), limit, skip)

results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in Next
ctx, query,
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, reduce,
includeDocs,
)
switch {
case errIsNoSuchTable(err):
Expand Down Expand Up @@ -382,7 +412,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision

vm := goja.New()

emit := func(id string) func(interface{}, interface{}) {
emit := func(id string, rev revision) func(interface{}, interface{}) {
return func(key, value interface{}) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -397,7 +427,7 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
if err != nil {
panic(err)
}
batch.add(id, k, v)
batch.add(id, rev, k, v)
}
}

Expand All @@ -421,19 +451,24 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
return revision{}, err
}

rev, err := full.rev()
if err != nil {
return revision{}, err
}

if full.Deleted {
batch.delete(full.ID)
batch.delete(full.ID, rev)
continue
}

if err := vm.Set("emit", emit(full.ID)); err != nil {
if err := vm.Set("emit", emit(full.ID, rev)); err != nil {
return revision{}, err
}
if _, err := mapFunc(goja.Undefined(), vm.ToValue(full.toMap())); err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
d.logger.Printf("map function threw exception for %s: %s", full.ID, exception.String())
batch.delete(full.ID)
batch.delete(full.ID, rev)
} else {
return revision{}, err
}
Expand Down Expand Up @@ -508,10 +543,16 @@ func iter(docs *sql.Rows, seq *int, full *fullDoc) error {
return nil
}

type docRev struct {
id string
rev int
revID string
}

type mapIndexBatch struct {
insertCount int
entries map[string][]mapIndexEntry
deleted []string
entries map[docRev][]mapIndexEntry
deleted []docRev
}

type mapIndexEntry struct {
Expand All @@ -521,28 +562,38 @@ type mapIndexEntry struct {

func newMapIndexBatch() *mapIndexBatch {
return &mapIndexBatch{
entries: make(map[string][]mapIndexEntry, batchSize),
entries: make(map[docRev][]mapIndexEntry, batchSize),
}
}

func (b *mapIndexBatch) add(id string, key, value *string) {
b.entries[id] = append(b.entries[id], mapIndexEntry{
func (b *mapIndexBatch) add(id string, rev revision, key, value *string) {
mapKey := docRev{
id: id,
rev: rev.rev,
revID: rev.id,
}
b.entries[mapKey] = append(b.entries[mapKey], mapIndexEntry{
Key: key,
Value: value,
})
b.insertCount++
}

func (b *mapIndexBatch) delete(id string) {
b.deleted = append(b.deleted, id)
b.insertCount -= len(b.entries[id])
delete(b.entries, id)
func (b *mapIndexBatch) delete(id string, rev revision) {
mapKey := docRev{
id: id,
rev: rev.rev,
revID: rev.id,
}
b.deleted = append(b.deleted, mapKey)
b.insertCount -= len(b.entries[mapKey])
delete(b.entries, mapKey)
}

func (b *mapIndexBatch) clear() {
b.insertCount = 0
b.deleted = b.deleted[:0]
b.entries = make(map[string][]mapIndexEntry, batchSize)
b.entries = make(map[docRev][]mapIndexEntry, batchSize)
}

func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc, viewName string, batch *mapIndexBatch) error {
Expand All @@ -567,11 +618,11 @@ func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc
// Clear any stale entries
if len(batch.entries) > 0 || len(batch.deleted) > 0 {
ids := make([]interface{}, 0, len(batch.entries)+len(batch.deleted))
for id := range batch.entries {
ids = append(ids, id)
for mapKey := range batch.entries {
ids = append(ids, mapKey.id)
}
for _, id := range batch.deleted {
ids = append(ids, id)
for _, mapKey := range batch.deleted {
ids = append(ids, mapKey.id)
}
query := fmt.Sprintf(d.ddocQuery(ddoc, viewName, rev.String(), `
DELETE FROM {{ .Map }}
Expand All @@ -583,16 +634,16 @@ func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc
}

if batch.insertCount > 0 {
args := make([]interface{}, 0, batch.insertCount*3)
args := make([]interface{}, 0, batch.insertCount*5)
values := make([]string, 0, batch.insertCount)
for id, entries := range batch.entries {
for mapKey, entries := range batch.entries {
for _, entry := range entries {
values = append(values, fmt.Sprintf("($%d, $%d, $%d)", len(args)+1, len(args)+2, len(args)+3))
args = append(args, id, entry.Key, entry.Value)
values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5))
args = append(args, mapKey.id, mapKey.rev, mapKey.revID, entry.Key, entry.Value)
}
}
query := d.ddocQuery(ddoc, viewName, rev.String(), `
INSERT INTO {{ .Map }} (id, key, value)
INSERT INTO {{ .Map }} (id, rev, rev_id, key, value)
VALUES
`) + strings.Join(values, ",")
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
Expand Down
84 changes: 81 additions & 3 deletions x/sqlite/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,87 @@ func TestDBQuery(t *testing.T) {
},
}
})
tests.Add("limit=1", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
if (doc.key) {
emit(doc.key, doc.value);
}
}`,
},
},
})
_ = d.tPut("a", map[string]interface{}{"key": "a", "value": 1})
_ = d.tPut("b", map[string]interface{}{"key": "b", "value": 2})

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("limit", 1),
want: []rowResult{
{ID: "a", Key: `"a"`, Value: "1"},
},
}
})
tests.Add("limit=1, skip=1", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
if (doc.key) {
emit(doc.key, doc.value);
}
}`,
},
},
})
_ = d.tPut("a", map[string]interface{}{"key": "a", "value": 1})
_ = d.tPut("b", map[string]interface{}{"key": "b", "value": 2})
_ = d.tPut("c", map[string]interface{}{"key": "c", "value": 3})

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Params(map[string]interface{}{
"limit": 1,
"skip": 1,
}),
want: []rowResult{
{ID: "b", Key: `"b"`, Value: "2"},
},
}
})
tests.Add("include_docs=true", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
if (doc.key) {
emit(doc.key, doc.value);
}
}`,
},
},
})
rev := d.tPut("a", map[string]interface{}{"key": "a", "value": 1})

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Param("include_docs", true),
want: []rowResult{
{ID: "a", Key: `"a"`, Value: "1", Doc: `{"_id":"a","_rev":"` + rev + `","key":"a","value":1}`},
},
}
})
/*
TODO:
- _stats
Expand All @@ -1234,13 +1315,10 @@ func TestDBQuery(t *testing.T) {
- end_key
- endkey_docid
- end_key_doc_id
- include_docs
- inclusive_end
- key
- keys
- limit
- reduce
- skip
- sorted
- stable // N/A only for clusters
- stale // deprecated
Expand Down
5 changes: 4 additions & 1 deletion x/sqlite/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ var schema = []string{
var viewSchema = []string{
`CREATE TABLE IF NOT EXISTS {{ .Map }} (
id TEXT NOT NULL,
rev INTEGER NOT NULL,
rev_id TEXT NOT NULL,
key TEXT COLLATE COUCHDB_UCI,
value TEXT
value TEXT,
FOREIGN KEY (id, rev, rev_id) REFERENCES {{ .Docs }} (id, rev, rev_id)
)`,
`CREATE INDEX IF NOT EXISTS {{ .IndexMap }} ON {{ .Map }} (key)`,
}
Loading

0 comments on commit 8b40051

Please sign in to comment.