Skip to content

Commit

Permalink
[rank] adding batch insertions for ranking definitions and references (
Browse files Browse the repository at this point in the history
…sourcegraph#47989)

Depends on sourcegraph#47987

Adds batch insertions to the definitions and references tables for
ranking.

## Test plan
Added a test and tested manually.

<!-- All pull requests REQUIRE a test plan:
https://docs.sourcegraph.com/dev/background-information/testing_principles
-->

![letsgo](https://media.giphy.com/media/3o7TKUM3IgJBX2as9O/giphy.gif)
  • Loading branch information
cesrjimenez committed Feb 21, 2023
1 parent 7bd3a02 commit 004291a
Show file tree
Hide file tree
Showing 7 changed files with 1,200 additions and 0 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type operations struct {

reindexUploads *observation.Operation
reindexUploadByID *observation.Operation

// Ranking
insertDefinitionsAndReferencesForDocument *observation.Operation
insertDefintionsForRanking *observation.Operation
insertReferencesForRanking *observation.Operation
}

var m = new(metrics.SingletonREDMetrics)
Expand Down Expand Up @@ -176,5 +181,10 @@ func newOperations(observationCtx *observation.Context) *operations {

reindexUploads: op("ReindexUploads"),
reindexUploadByID: op("ReindexUploadByID"),

// Ranking
insertDefinitionsAndReferencesForDocument: op("InsertDefinitionsAndReferencesForDocument"),
insertDefintionsForRanking: op("InsertDefintionsForRanking"),
insertReferencesForRanking: op("InsertReferencesForRanking"),
}
}
6 changes: 6 additions & 0 deletions enterprise/internal/codeintel/uploads/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

logger "github.com/sourcegraph/log"
"github.com/sourcegraph/scip/bindings/go/scip"

"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/uploads/shared"
Expand Down Expand Up @@ -107,6 +108,11 @@ type Store interface {

ReindexUploads(ctx context.Context, opts shared.ReindexUploadsOptions) error
ReindexUploadByID(ctx context.Context, id int) error

// Ranking
InsertDefinitionsAndReferencesForDocument(ctx context.Context, upload ExportedUpload, rankingGraphKey string, rankingBatchSize int, f func(ctx context.Context, upload ExportedUpload, rankingBatchSize int, rankingGraphKey, path string, document *scip.Document) error) (err error)
InsertDefintionsForRanking(ctx context.Context, rankingGraphKey string, rankingBatchSize int, defintions []shared.RankingDefintions) (err error)
InsertReferencesForRanking(ctx context.Context, rankingGraphKey string, rankingBatchSize int, references shared.RankingReferences) (err error)
}

// store manages the database operations for uploads.
Expand Down
207 changes: 207 additions & 0 deletions enterprise/internal/codeintel/uploads/internal/store/store_ranking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package store

import (
"bytes"
"context"

"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/sourcegraph/scip/bindings/go/scip"
"google.golang.org/protobuf/proto"

"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/observation"
)

// InsertDefinitionsAndReferencesForDocument walks over every document row
// returned by getDocumentsByUploadIDQuery for the given upload, decodes the
// stored payload into a scip.Document, and hands it to setDefsAndRefs together
// with the upload, the batch number, the graph key and the document path.
//
// Iteration stops at the first error (scan, decompress, unmarshal or callback)
// and that error is returned. The result set is always released through
// basestore.CloseRows, which receives the error the function is returning.
func (s *store) InsertDefinitionsAndReferencesForDocument(
	ctx context.Context,
	upload ExportedUpload,
	rankingGraphKey string,
	rankingBatchNumber int,
	setDefsAndRefs func(ctx context.Context, upload ExportedUpload, rankingBatchNumber int, rankingGraphKey, path string, document *scip.Document) error,
) (err error) {
	ctx, _, endObservation := s.operations.insertDefinitionsAndReferencesForDocument.With(ctx, &err, observation.Args{
		LogFields: []otlog.Field{otlog.Int("id", upload.ID)},
	})
	defer endObservation(1, observation.Args{})

	query := sqlf.Sprintf(getDocumentsByUploadIDQuery, upload.ID)
	rows, err := s.db.Query(ctx, query)
	if err != nil {
		return err
	}
	// The named result is rewritten here so that a close error is surfaced to
	// the caller alongside whatever error ended the loop.
	defer func() { err = basestore.CloseRows(rows, err) }()

	for rows.Next() {
		var (
			documentPath string
			compressed   []byte
		)
		if scanErr := rows.Scan(&documentPath, &compressed); scanErr != nil {
			return scanErr
		}

		// The payload column holds a compressed, protobuf-encoded document.
		raw, decompressErr := shared.Decompressor.Decompress(bytes.NewReader(compressed))
		if decompressErr != nil {
			return decompressErr
		}

		document := new(scip.Document)
		if unmarshalErr := proto.Unmarshal(raw, document); unmarshalErr != nil {
			return unmarshalErr
		}

		if callbackErr := setDefsAndRefs(ctx, upload, rankingBatchNumber, rankingGraphKey, documentPath, document); callbackErr != nil {
			return callbackErr
		}
	}

	return nil
}

// getDocumentsByUploadIDQuery selects the document path and the raw SCIP
// payload of every document linked to an upload through
// codeintel_scip_document_lookup, ordered by document path. It takes a single
// parameter: the upload ID.
const getDocumentsByUploadIDQuery = `
SELECT
sid.document_path,
sd.raw_scip_payload
FROM codeintel_scip_document_lookup sid
JOIN codeintel_scip_documents sd ON sd.id = sid.document_id
WHERE sid.upload_id = %s
ORDER BY sid.document_path
`

// InsertDefintionsForRanking writes the given definitions into the
// codeintel_ranking_definitions table, tagging every row with rankingGraphKey.
//
// All rows are written through one batch.Inserter inside a single transaction;
// the transaction is finished via tx.Done, which receives the error this
// function is about to return.
//
// rankingBatchNumber is kept so the signature stays compatible with the Store
// interface, but it does not influence the insert. Each definition is handed
// to the same inserter one row at a time regardless of how the input is
// chunked, so copying the input into intermediate chunk slices only added
// allocations and made a negative value panic inside make. The rows written,
// and their order, are the same as with chunking.
func (s *store) InsertDefintionsForRanking(
	ctx context.Context,
	rankingGraphKey string,
	rankingBatchNumber int,
	defintions []shared.RankingDefintions,
) (err error) {
	ctx, _, endObservation := s.operations.insertDefintionsForRanking.With(
		ctx,
		&err,
		observation.Args{},
	)
	defer endObservation(1, observation.Args{})

	tx, err := s.db.Transact(ctx)
	if err != nil {
		return err
	}
	defer func() { err = tx.Done(err) }()

	return batch.WithInserter(
		ctx,
		tx.Handle(),
		"codeintel_ranking_definitions",
		batch.MaxNumPostgresParameters,
		[]string{
			"upload_id",
			"symbol_name",
			"repository",
			"document_path",
			"graph_key",
		},
		func(inserter *batch.Inserter) error {
			return insertDefinitions(ctx, inserter, rankingGraphKey, defintions)
		},
	)
}

// insertDefinitions queues one row per definition on the given inserter. The
// column values are, in order: upload ID, symbol name, repository, document
// path and rankingGraphKey. It returns the first error reported by the
// inserter, leaving the remaining definitions unqueued.
func insertDefinitions(
	ctx context.Context,
	inserter *batch.Inserter,
	rankingGraphKey string,
	definitions []shared.RankingDefintions,
) error {
	for i := range definitions {
		d := definitions[i]
		err := inserter.Insert(ctx, d.UploadID, d.SymbolName, d.Repository, d.DocumentPath, rankingGraphKey)
		if err != nil {
			return err
		}
	}

	return nil
}

// InsertReferencesForRanking writes the symbol names referenced by an upload
// into the codeintel_ranking_references table. The names are split into chunks
// of rankingBatchNumber entries; each chunk becomes one row holding the upload
// ID, the chunk as an array, and rankingGraphKey. A trailing partial chunk is
// written as a final row.
//
// When rankingBatchNumber is zero or negative no chunk ever reaches the flush
// size, so all names end up in a single row. Negative values used to panic in
// make; they now behave like zero.
//
// All rows are written through one batch.Inserter inside a single transaction;
// the transaction is finished via tx.Done, which receives the error this
// function is about to return.
func (s *store) InsertReferencesForRanking(
	ctx context.Context,
	rankingGraphKey string,
	rankingBatchNumber int,
	references shared.RankingReferences,
) (err error) {
	ctx, _, endObservation := s.operations.insertReferencesForRanking.With(
		ctx,
		&err,
		observation.Args{},
	)
	defer endObservation(1, observation.Args{})

	tx, err := s.db.Transact(ctx)
	if err != nil {
		return err
	}
	defer func() { err = tx.Done(err) }()

	// Capacity hint for each chunk. It is clamped to [0, len(SymbolNames)]:
	// make panics on a negative capacity, and a batch number far larger than
	// the input would otherwise allocate that much for every chunk.
	chunkCap := rankingBatchNumber
	if chunkCap < 0 {
		chunkCap = 0
	}
	if chunkCap > len(references.SymbolNames) {
		chunkCap = len(references.SymbolNames)
	}

	insertChunks := func(inserter *batch.Inserter) error {
		chunk := make([]string, 0, chunkCap)
		for _, symbolName := range references.SymbolNames {
			chunk = append(chunk, symbolName)
			if len(chunk) != rankingBatchNumber {
				continue
			}

			if err := inserter.Insert(ctx, references.UploadID, pq.Array(chunk), rankingGraphKey); err != nil {
				return err
			}
			// A fresh slice is allocated rather than truncating with chunk[:0].
			// NOTE(review): pq.Array appears to wrap the slice without copying
			// and the inserter may hold the value until it flushes, so reusing
			// the backing array could overwrite queued rows — confirm before
			// changing this.
			chunk = make([]string, 0, chunkCap)
		}

		if len(chunk) == 0 {
			return nil
		}
		return inserter.Insert(ctx, references.UploadID, pq.Array(chunk), rankingGraphKey)
	}

	return batch.WithInserter(
		ctx,
		tx.Handle(),
		"codeintel_ranking_references",
		batch.MaxNumPostgresParameters,
		[]string{"upload_id", "symbol_names", "graph_key"},
		insertChunks,
	)
}
Loading

0 comments on commit 004291a

Please sign in to comment.