-
Notifications
You must be signed in to change notification settings - Fork 42
/
query.go
791 lines (725 loc) · 20.5 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package sqlite
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/dop251/goja"
"github.com/go-kivik/kivik/v4"
"github.com/go-kivik/kivik/v4/driver"
internal "github.com/go-kivik/kivik/v4/int/errors"
"github.com/go-kivik/kivik/x/sqlite/v4/reduce"
)
func fromJSValue(v interface{}) (*string, error) {
if v == nil {
return nil, nil
}
vv, err := json.Marshal(v)
if err != nil {
return nil, err
}
s := string(vv)
return &s, nil
}
func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Options) (driver.Rows, error) {
opts := newOpts(options)
vopts, err := opts.viewOptions(ddoc)
if err != nil {
return nil, err
}
if isBuiltinView(ddoc) {
return d.queryBuiltinView(ctx, vopts)
}
// Normalize the ddoc and view values
ddoc = strings.TrimPrefix(ddoc, "_design/")
view = strings.TrimPrefix(view, "_view/")
results, err := d.performQuery(
ctx,
ddoc, view,
vopts,
)
if err != nil {
return nil, err
}
if vopts.update == updateModeLazy {
go func() {
if _, err := d.updateIndex(context.Background(), ddoc, view, updateModeTrue); err != nil {
d.logger.Print("Failed to update index: " + err.Error())
}
}()
}
return results, nil
}
const (
leavesCTE = `
WITH leaves AS (
SELECT
rev.id,
rev.rev,
rev.rev_id,
rev.key,
doc.doc,
doc.deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON child.id = rev.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
AND NOT doc.deleted
)
`
)
func (d *db) performQuery(
ctx context.Context,
ddoc, view string,
vopts *viewOptions,
) (driver.Rows, error) {
if vopts.group {
return d.performGroupQuery(ctx, ddoc, view, vopts)
}
for {
rev, err := d.updateIndex(ctx, ddoc, view, vopts.update)
if err != nil {
return nil, err
}
args := []interface{}{
vopts.includeDocs, vopts.conflicts, vopts.reduce, vopts.updateSeq,
"_design/" + ddoc, rev.rev, rev.id, view, vopts.attachments,
}
where := append([]string{""}, vopts.buildWhere(&args)...)
query := fmt.Sprintf(d.ddocQuery(ddoc, view, rev.String(), leavesCTE+`,
reduce AS (
SELECT
CASE WHEN MAX(id) IS NOT NULL THEN TRUE ELSE FALSE END AS reducible,
COALESCE(func_body, "") AS reduce_func
FROM {{ .Design }}
WHERE id = $5
AND rev = $6
AND rev_id = $7
AND func_type = 'reduce'
AND func_name = $8
)
-- Metadata header
SELECT
COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date,
reduce.reducible,
reduce.reduce_func,
IIF($4, last_seq, "") AS update_seq,
MAX(last_seq) AS last_seq,
NULL,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Design }} AS map
JOIN reduce
WHERE id = $5
AND rev = $6
AND rev_id = $7
AND func_type = 'map'
AND func_name = $8
UNION ALL
-- View map to pass to reduce
SELECT
*
FROM (
SELECT
id AS id,
key AS key,
value AS value,
pk AS first,
pk AS last,
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Map }} AS view
JOIN reduce
WHERE reduce.reducible AND ($3 IS NULL OR $3 == TRUE)
%[5]s -- ORDER BY
)
UNION ALL
-- Normal query results
SELECT
CASE WHEN row_number = 1 THEN id END AS id,
CASE WHEN row_number = 1 THEN key END AS key,
CASE WHEN row_number = 1 THEN value END AS value,
CASE WHEN row_number = 1 THEN rev END AS rev,
CASE WHEN row_number = 1 THEN doc END AS doc,
CASE WHEN row_number = 1 THEN conflicts END AS conflicts,
COALESCE(attachment_count, 0) AS attachment_count,
filename,
content_type,
length,
digest,
rev_pos,
data
FROM (
SELECT
view.id,
view.key,
view.value,
IIF($1, view.rev || '-' || view.rev_id, "") AS rev,
view.doc,
view.conflicts,
SUM(CASE WHEN bridge.pk IS NOT NULL THEN 1 ELSE 0 END) OVER (PARTITION BY view.id, view.rev, view.rev_id) AS attachment_count,
ROW_NUMBER() OVER (PARTITION BY view.id, view.rev, view.rev_id, view.pk) AS row_number,
att.filename AS filename,
att.content_type AS content_type,
att.length AS length,
att.digest AS digest,
att.rev_pos AS rev_pos,
IIF($9, att.data, NULL) AS data
FROM (
SELECT
view.pk,
view.id,
view.key,
view.value,
docs.rev,
docs.rev_id,
IIF($1, docs.doc, NULL) AS doc,
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
FROM {{ .Map }} AS view
JOIN reduce
JOIN {{ .Docs }} AS docs ON view.id = docs.id AND view.rev = docs.rev AND view.rev_id = docs.rev_id
LEFT JOIN leaves AS conflicts ON conflicts.id = view.id AND NOT (view.rev = conflicts.rev AND view.rev_id = conflicts.rev_id)
WHERE $3 == FALSE OR NOT reduce.reducible
%[2]s -- WHERE
GROUP BY view.id, view.key, view.value, view.rev, view.rev_id
%[1]s -- ORDER BY
LIMIT %[3]d OFFSET %[4]d
) AS view
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON view.id = bridge.id AND view.rev = bridge.rev AND view.rev_id = bridge.rev_id AND $1
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy(), strings.Join(where, " AND "), vopts.limit, vopts.skip,
vopts.buildOrderBy("pk"))
results, err := d.db.QueryContext(ctx, query, args...) //nolint:rowserrcheck // Err checked in Next
switch {
case errIsNoSuchTable(err):
return nil, &internal.Error{Status: http.StatusNotFound, Message: "missing named view"}
case err != nil:
return nil, err
}
meta, err := readFirstRow(results, vopts)
if err != nil {
return nil, err
}
if !meta.upToDate && vopts.update == updateModeTrue {
_ = results.Close() //nolint:sqlclosecheck // Not up to date, so close the results and try again
continue
}
if meta.reducible && (vopts.reduce == nil || *vopts.reduce) {
return d.reduce(ctx, meta.lastSeq, ddoc, view, rev.String(), results, meta.reduceFuncJS, vopts.reduceGroupLevel())
}
// If the results are up to date, OR, we're in false/lazy update mode,
// then these results are fine.
return &rows{
ctx: ctx,
db: d,
rows: results,
updateSeq: meta.updateSeq,
}, nil
}
}
func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *viewOptions) (driver.Rows, error) {
var (
results *sql.Rows
reducible bool
reduceFuncJS string
rev revision
err error
lastSeq int
)
for {
rev, err = d.updateIndex(ctx, ddoc, view, vopts.update)
if err != nil {
return nil, err
}
query := fmt.Sprintf(d.ddocQuery(ddoc, view, rev.String(), `
WITH reduce AS (
SELECT
CASE WHEN MAX(id) IS NOT NULL THEN TRUE ELSE FALSE END AS reducible,
COALESCE(func_body, "") AS reduce_func
FROM {{ .Design }}
WHERE id = $1
AND rev = $2
AND rev_id = $3
AND func_type = 'reduce'
AND func_name = $4
)
SELECT
COALESCE(MAX(last_seq), 0) == (SELECT COALESCE(max(seq),0) FROM {{ .Docs }}) AS up_to_date,
reduce.reducible,
reduce.reduce_func,
MAX(last_seq) AS last_seq,
NULL,
NULL,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Design }} AS map
JOIN reduce
WHERE id = $1
AND rev = $2
AND rev_id = $3
AND func_type = 'map'
AND func_name = $4
UNION ALL
SELECT *
FROM (
SELECT
id AS id,
COALESCE(key, "null") AS key,
value AS value,
pk AS first,
pk AS last,
NULL AS conflicts,
0 AS attachment_count,
NULL AS filename,
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos,
NULL AS data
FROM {{ .Map }} AS view
JOIN reduce
WHERE reduce.reducible AND ($6 IS NULL OR $6 == TRUE)
%[1]s -- ORDER BY
)
`), vopts.buildOrderBy("pk"))
results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in iterator
ctx, query,
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, true,
)
switch {
case errIsNoSuchTable(err):
return nil, &internal.Error{Status: http.StatusNotFound, Message: "missing named view"}
case err != nil:
return nil, err
}
defer results.Close()
// The first row is used to verify the index is up to date
if !results.Next() {
// should never happen
return nil, errors.New("no rows returned")
}
if vopts.update != updateModeTrue {
break
}
var upToDate bool
if err := results.Scan(
&upToDate, &reducible, &reduceFuncJS, &lastSeq, discard{}, discard{},
discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
return nil, err
}
if !reducible {
field := "group"
if vopts.groupLevel > 0 {
field = "group_level"
}
return nil, &internal.Error{Status: http.StatusBadRequest, Message: field + " is invalid for map-only views"}
}
if upToDate {
break
}
}
return d.reduce(ctx, lastSeq, ddoc, view, rev.String(), results, reduceFuncJS, vopts.reduceGroupLevel())
}
func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, results *sql.Rows, reduceFuncJS string, groupLevel int) (driver.Rows, error) {
stmt, err := d.db.PrepareContext(ctx, d.ddocQuery(ddoc, name, rev, `
INSERT INTO {{ .Reduce }} (seq, depth, first_key, first_pk, last_key, last_pk, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`))
if err != nil {
return nil, err
}
callback := func(depth uint, rows []reduce.Row) {
for _, row := range rows {
key, _ := json.Marshal(row.Key)
var value []byte
if row.Value != nil {
value, _ = json.Marshal(row.Value)
}
fmt.Printf("INSERTING: %v, %v, %v, %v, %v, %v, %v\n", seq, depth, string(key), row.First, string(key), row.Last, string(value))
if _, err = stmt.ExecContext(ctx, seq, depth, key, row.First, key, row.Last, value); err != nil {
d.logger.Printf("Failed to insert reduce result [%v, %v, %v, %v, %v, %v, %v]: %s",
seq, depth, key, row.First, key, row.Last, value,
err)
}
}
}
return reduce.Reduce(&reduceRowIter{results: results}, reduceFuncJS, d.logger, groupLevel, callback)
}
const batchSize = 100
// updateIndex queries for the current index status, and returns the current
// ddoc revid and last_seq. If mode is "true", it will also update the index.
func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision, error) {
var (
ddocRev revision
mapFuncJS *string
lastSeq int
includeDesign, localSeq sql.NullBool
)
err := d.db.QueryRowContext(ctx, d.query(`
SELECT
docs.rev,
docs.rev_id,
design.func_body,
design.include_design,
design.local_seq,
COALESCE(design.last_seq, 0) AS last_seq
FROM {{ .Docs }} AS docs
LEFT JOIN {{ .Design }} AS design ON docs.id = design.id AND docs.rev = design.rev AND docs.rev_id = design.rev_id AND design.func_type = 'map'
WHERE docs.id = $1
ORDER BY docs.rev DESC, docs.rev_id DESC
LIMIT 1
`), "_design/"+ddoc).Scan(&ddocRev.rev, &ddocRev.id, &mapFuncJS, &includeDesign, &localSeq, &lastSeq)
switch {
case errors.Is(err, sql.ErrNoRows):
return revision{}, &internal.Error{Status: http.StatusNotFound, Message: "missing"}
case err != nil:
return revision{}, err
}
if mode != "true" {
return ddocRev, nil
}
if mapFuncJS == nil {
return revision{}, &internal.Error{Status: http.StatusNotFound, Message: "missing named view"}
}
query := d.query(`
WITH leaves AS (
SELECT
rev.id AS id,
rev.rev AS rev,
rev.rev_id AS rev_id,
doc.doc,
doc.deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON child.id = rev.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
)
SELECT
CASE WHEN row_number = 1 THEN seq END AS seq,
CASE WHEN row_number = 1 THEN id END AS id,
CASE WHEN row_number = 1 THEN rev END AS rev,
CASE WHEN row_number = 1 THEN doc END AS doc,
CASE WHEN row_number = 1 THEN deleted END AS deleted,
COALESCE(attachment_count, 0) AS attachment_count,
filename,
content_type,
length,
digest,
rev_pos
FROM (
SELECT
seq.seq AS seq,
doc.id AS id,
doc.rev || '-' || doc.rev_id AS rev,
seq.doc AS doc,
seq.deleted AS deleted,
doc.attachment_count,
doc.row_number,
doc.filename,
doc.content_type,
doc.length,
doc.digest,
doc.rev_pos
FROM {{ .Docs }} AS seq
LEFT JOIN (
SELECT
rev.id,
rev.rev,
rev.rev_id,
SUM(CASE WHEN bridge.pk IS NOT NULL THEN 1 ELSE 0 END) OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS attachment_count,
ROW_NUMBER() OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS row_number,
att.filename,
att.content_type,
att.length,
att.digest,
att.rev_pos
FROM (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
IIF($1, doc, NULL) AS doc,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
) AS rev
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON rev.id = bridge.id AND rev.rev = bridge.rev AND rev.rev_id = bridge.rev_id
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
WHERE rev.rank = 1
) AS doc ON seq.id = doc.id AND seq.rev = doc.rev AND seq.rev_id = doc.rev_id
WHERE seq.seq > $1
ORDER BY seq.seq
)
`)
docs, err := d.db.QueryContext(ctx, query, lastSeq)
if err != nil {
return revision{}, err
}
defer docs.Close()
batch := newMapIndexBatch()
vm := goja.New()
emit := func(id string, rev revision) func(interface{}, interface{}) {
return func(key, value interface{}) {
defer func() {
if r := recover(); r != nil {
panic(vm.ToValue(r))
}
}()
k, err := fromJSValue(key)
if err != nil {
panic(err)
}
v, err := fromJSValue(value)
if err != nil {
panic(err)
}
batch.add(id, rev, k, v)
}
}
if _, err := vm.RunString("const map = " + *mapFuncJS); err != nil {
return revision{}, err
}
mapFunc, ok := goja.AssertFunction(vm.Get("map"))
if !ok {
return revision{}, fmt.Errorf("expected map to be a function, got %T", vm.Get("map"))
}
var seq int
for {
full := &fullDoc{}
err := iter(docs, &seq, full)
if err == io.EOF {
break
}
if err != nil {
return revision{}, err
}
// Skip design/local docs
if full.ID == "" {
continue
}
rev, err := full.rev()
if err != nil {
return revision{}, err
}
// TODO move this to the query
if strings.HasPrefix(full.ID, "_local/") ||
(!includeDesign.Bool && strings.HasPrefix(full.ID, "_design/")) {
continue
}
if full.Deleted {
batch.delete(full.ID, rev)
continue
}
if localSeq.Bool {
full.LocalSeq = seq
}
if err := vm.Set("emit", emit(full.ID, rev)); err != nil {
return revision{}, err
}
if _, err := mapFunc(goja.Undefined(), vm.ToValue(full.toMap())); err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
d.logger.Printf("map function threw exception for %s: %s", full.ID, exception.String())
batch.delete(full.ID, rev)
} else {
return revision{}, err
}
}
if batch.insertCount >= batchSize {
if err := d.writeMapIndexBatch(ctx, seq, ddocRev, ddoc, view, batch); err != nil {
return revision{}, err
}
batch.clear()
}
}
if err := d.writeMapIndexBatch(ctx, seq, ddocRev, ddoc, view, batch); err != nil {
return revision{}, err
}
return ddocRev, docs.Err()
}
func iter(docs *sql.Rows, seq *int, full *fullDoc) error {
var (
attachmentsCount int
rowSeq *int
rowID, rowRev *string
rowDoc *[]byte
rowDeleted *bool
)
for {
if !docs.Next() {
if err := docs.Err(); err != nil {
return err
}
return io.EOF
}
var (
filename, contentType *string
length *int64
revPos *int
digest *md5sum
)
if err := docs.Scan(
&rowSeq, &rowID, &rowRev, &rowDoc, &rowDeleted,
&attachmentsCount,
&filename, &contentType, &length, &digest, &revPos,
); err != nil {
return err
}
if rowSeq != nil {
*seq = *rowSeq
*full = fullDoc{
ID: *rowID,
Rev: *rowRev,
Doc: *rowDoc,
Deleted: *rowDeleted,
}
}
if filename != nil {
if full.Attachments == nil {
full.Attachments = make(map[string]*attachment)
}
full.Attachments[*filename] = &attachment{
ContentType: *contentType,
Length: *length,
Digest: *digest,
RevPos: *revPos,
}
}
if attachmentsCount == len(full.Attachments) {
break
}
}
return nil
}
type docRev struct {
id string
rev int
revID string
}
type mapIndexBatch struct {
insertCount int
entries map[docRev][]mapIndexEntry
deleted []docRev
}
type mapIndexEntry struct {
Key *string
Value *string
}
func newMapIndexBatch() *mapIndexBatch {
return &mapIndexBatch{
entries: make(map[docRev][]mapIndexEntry, batchSize),
}
}
func (b *mapIndexBatch) add(id string, rev revision, key, value *string) {
mapKey := docRev{
id: id,
rev: rev.rev,
revID: rev.id,
}
b.entries[mapKey] = append(b.entries[mapKey], mapIndexEntry{
Key: key,
Value: value,
})
b.insertCount++
}
func (b *mapIndexBatch) delete(id string, rev revision) {
mapKey := docRev{
id: id,
rev: rev.rev,
revID: rev.id,
}
b.deleted = append(b.deleted, mapKey)
b.insertCount -= len(b.entries[mapKey])
delete(b.entries, mapKey)
}
func (b *mapIndexBatch) clear() {
b.insertCount = 0
b.deleted = b.deleted[:0]
b.entries = make(map[docRev][]mapIndexEntry, batchSize)
}
func (d *db) writeMapIndexBatch(ctx context.Context, seq int, rev revision, ddoc, viewName string, batch *mapIndexBatch) error {
tx, err := d.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, d.query(`
UPDATE {{ .Design }}
SET last_seq=$1
WHERE id = $2
AND rev = $3
AND rev_id = $4
AND func_type = 'map'
AND func_name = $5
`), seq, "_design/"+ddoc, rev.rev, rev.id, viewName); err != nil {
return err
}
// Clear any stale entries
if len(batch.entries) > 0 || len(batch.deleted) > 0 {
ids := make([]interface{}, 0, len(batch.entries)+len(batch.deleted))
for mapKey := range batch.entries {
ids = append(ids, mapKey.id)
}
for _, mapKey := range batch.deleted {
ids = append(ids, mapKey.id)
}
query := fmt.Sprintf(d.ddocQuery(ddoc, viewName, rev.String(), `
DELETE FROM {{ .Map }}
WHERE id IN (%s)
`), placeholders(1, len(ids)))
if _, err := tx.ExecContext(ctx, query, ids...); err != nil {
return err
}
}
if batch.insertCount > 0 {
args := make([]interface{}, 0, batch.insertCount*5)
values := make([]string, 0, batch.insertCount)
for mapKey, entries := range batch.entries {
for _, entry := range entries {
values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5))
args = append(args, mapKey.id, mapKey.rev, mapKey.revID, entry.Key, entry.Value)
}
}
query := d.ddocQuery(ddoc, viewName, rev.String(), `
INSERT INTO {{ .Map }} (id, rev, rev_id, key, value)
VALUES
`) + strings.Join(values, ",")
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
return err
}
}
return tx.Commit()
}