[rank] adds reduce background job that reduces all the counts to ranks (sourcegraph#48008)

Adds reduce background job that reduces all the counts to ranks

## Test plan
Test manually
<!-- All pull requests REQUIRE a test plan:
https://docs.sourcegraph.com/dev/background-information/testing_principles
-->
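
For readers skimming the diff, here is a toy illustration of what "reducing counts to ranks" means here: per-path reference counts (the path_counts_inputs rows) get aggregated per repository and ordered into ranks. Everything in this sketch is illustrative only; in the actual commit the reduction happens in SQL via store.InsertPathRanks, as the ranking.go diff below shows.

```go
package main

import (
	"fmt"
	"sort"
)

// pathCountInput loosely mirrors one path_counts_inputs row: a repository,
// a document path, and a reference count for that path.
type pathCountInput struct {
	repo  string
	path  string
	count int
}

// reduce sums counts per (repo, path), then ranks each repository's paths
// by descending total: a toy stand-in for what InsertPathRanks does in SQL.
func reduce(inputs []pathCountInput) map[string][]string {
	totals := map[string]map[string]int{}
	for _, in := range inputs {
		if totals[in.repo] == nil {
			totals[in.repo] = map[string]int{}
		}
		totals[in.repo][in.path] += in.count
	}

	ranks := map[string][]string{}
	for repo, byPath := range totals {
		paths := make([]string, 0, len(byPath))
		for p := range byPath {
			paths = append(paths, p)
		}
		sort.Slice(paths, func(i, j int) bool {
			return byPath[paths[i]] > byPath[paths[j]]
		})
		ranks[repo] = paths
	}
	return ranks
}

func main() {
	fmt.Println(reduce([]pathCountInput{
		{"repo-a", "cmd/main.go", 2},
		{"repo-a", "internal/util.go", 5},
		{"repo-a", "cmd/main.go", 1},
	}))
	// Output: map[repo-a:[internal/util.go cmd/main.go]]
}
```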
cesrjimenez committed Feb 22, 2023
1 parent c9ab2fe commit a4098c5
Showing 6 changed files with 88 additions and 145 deletions.
7 changes: 7 additions & 0 deletions enterprise/internal/codeintel/uploads/init.go
@@ -216,5 +216,12 @@ func NewGraphExporters(observationCtx *observation.Context, uploadSvc *Service)
ConfigExportInst.RankingInterval,
ConfigExportInst.RankingJobsEnabled,
),
background.NewRankingGraphReducer(
observationCtx,
uploadSvc,
ConfigExportInst.NumRankingRoutines,
ConfigExportInst.RankingInterval,
ConfigExportInst.RankingJobsEnabled,
),
}
}
@@ -22,6 +22,7 @@ import (
type UploadService interface {
ExportRankingGraph(ctx context.Context, numRankingRoutines int, numBatchSize int, rankingJobEnabled bool) error
MapRankingGraph(ctx context.Context, numRankingRoutines int, rankingJobEnabled bool) error
ReduceRankingGraph(ctx context.Context, numRankingRoutines int, rankingJobEnabled bool) (float64, float64, error)
VacuumRankingGraph(ctx context.Context) error
}
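
An aside on this seam (hypothetical, not part of the commit): since the concrete implementation lives in the uploads package, a compile-time assertion placed there would catch drift between *Service and this interface without creating an import cycle.

```go
// Hypothetical guard in package uploads (which already imports the
// background package): fails the build if *Service stops satisfying
// the background job's UploadService interface.
var _ background.UploadService = (*Service)(nil)
```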

@@ -31,7 +31,8 @@ func NewRankingGraphExporter(
// }

return nil
}))
}),
)
}

func NewRankingGraphMapper(
@@ -50,5 +51,32 @@ func NewRankingGraphMapper(
return err
}
return nil
}))
}),
)
}

func NewRankingGraphReducer(
observationCtx *observation.Context,
uploadsService UploadService,
numRankingRoutines int,
interval time.Duration,
rankingJobEnabled bool,
) goroutine.BackgroundRoutine {
operations := newRankingOperations(observationCtx)
return goroutine.NewPeriodicGoroutine(
context.Background(),
"rank.graph-reducer", "reduces path_counts_inputs into a count of paths per repository and stores it in path_ranks table in store.",
interval,
goroutine.HandlerFunc(func(ctx context.Context) error {
numPathRanksInserted, numPathCountsInputsProcessed, err := uploadsService.ReduceRankingGraph(ctx, numRankingRoutines, rankingJobEnabled)
if err != nil {
return err
}

operations.numPathCountsInputsRowsProcessed.Add(numPathCountsInputsProcessed)
operations.numPathRanksInserted.Add(numPathRanksInserted)

return nil
}),
)
}
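
For readers unfamiliar with goroutine.NewPeriodicGoroutine, its contract is roughly "run the handler once per interval until shutdown". Below is a self-contained sketch of that behaviour; it illustrates the contract only and is not Sourcegraph's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodically approximates what a periodic background routine does:
// invoke handler every interval until the context is cancelled.
func runPeriodically(ctx context.Context, interval time.Duration, handler func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := handler(ctx); err != nil {
				fmt.Println("handler error:", err) // real routines report to logs/metrics
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runPeriodically(ctx, 100*time.Millisecond, func(context.Context) error {
		fmt.Println("tick: reduce ranking graph")
		return nil
	})
}
```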
@@ -102,3 +102,35 @@ func newOperations(observationCtx *observation.Context) *operations {
numReconcileDeletesFromCodeIntelDB: numReconcileDeletesFromCodeIntelDB,
}
}

type rankingOperations struct {
numPathCountsInputsRowsProcessed prometheus.Counter
numPathRanksInserted prometheus.Counter
}

func newRankingOperations(observationCtx *observation.Context) *rankingOperations {
counter := func(name, help string) prometheus.Counter {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Name: name,
Help: help,
})

observationCtx.Registerer.MustRegister(counter)
return counter
}

numPathCountInputsRowsProcessed := counter(
"src_codeintel_ranking_path_count_inputs_rows_processed_total",
"The number of input row records merged into document scores for a single repo.",
)

numPathRanksInserted := counter(
"src_codeintel_ranking_path_ranks_inserted_total",
"The number of path ranks inserted and merged in for a single repo.",
)

return &rankingOperations{
numPathCountsInputsRowsProcessed: numPathCountInputsRowsProcessed,
numPathRanksInserted: numPathRanksInserted,
}
}
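
The counter helper above is just a closure over observationCtx.Registerer. The same pattern, shrunk to a self-contained program against the plain client_golang API (the metric name below is made up):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as the helper above: construct, register, hand back.
	counter := func(name, help string) prometheus.Counter {
		c := prometheus.NewCounter(prometheus.CounterOpts{Name: name, Help: help})
		reg.MustRegister(c) // panics if the same name is registered twice
		return c
	}

	inserted := counter(
		"example_path_ranks_inserted_total", // made-up name for this sketch
		"Example counter in the style of the ranking metrics.",
	)
	inserted.Add(42)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println("metric families registered:", len(mfs)) // 1
}
```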
2 changes: 2 additions & 0 deletions enterprise/internal/codeintel/uploads/observability.go
@@ -65,6 +65,7 @@ type operations struct {
// Ranking
exportRankingGraph *observation.Operation
mapRankingGraph *observation.Operation
reduceRankingGraph *observation.Operation

numUploadsRead prometheus.Counter
numBytesUploaded prometheus.Counter
@@ -183,6 +184,7 @@ func newOperations(observationCtx *observation.Context) *operations {
// Ranking
exportRankingGraph: op("ExportRankingGraph"),
mapRankingGraph: op("MapRankingGraph"),
reduceRankingGraph: op("ReduceRankingGraph"),

numUploadsRead: numUploadsRead,
numBytesUploaded: numBytesUploaded,
159 changes: 16 additions & 143 deletions enterprise/internal/codeintel/uploads/ranking.go
@@ -1,13 +1,8 @@
package uploads

import (
"bufio"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"time"

"cloud.google.com/go/storage"
"github.com/sourcegraph/log"
@@ -17,7 +12,6 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/uploads/internal/store"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/group"
)

@@ -195,147 +189,26 @@ func (s *Service) MapRankingGraph(ctx context.Context, numRankingRoutines int, r
return nil
}

const maxBytesPerObject = 1024 * 1024 * 1024 // 1GB

func (s *Service) serializeAndPersistRankingGraphForUpload(
func (s *Service) ReduceRankingGraph(
ctx context.Context,
id int,
repo string,
root string,
objectPrefix string,
) (err error) {
writers := map[string]*gcsObjectWriter{}
defer func() {
for _, wc := range writers {
if closeErr := wc.Close(); closeErr != nil {
err = errors.Append(err, closeErr)
}
}
}()

return s.serializeRankingGraphForUpload(ctx, id, repo, root, func(filename string, format string, args ...any) error {
path := fmt.Sprintf("%s/%s", objectPrefix, filename)

ow, ok := writers[path]
if !ok {
handle := s.rankingBucket.Object(path)
if err := handle.Delete(ctx); err != nil && err != storage.ErrObjectNotExist {
return err
}

wc := handle.NewWriter(ctx)
ow = &gcsObjectWriter{
Writer: bufio.NewWriter(wc),
c: wc,
written: 0,
}
writers[path] = ow
}

if n, err := io.Copy(ow, strings.NewReader(fmt.Sprintf(format, args...))); err != nil {
return err
} else {
ow.written += n
s.operations.numBytesUploaded.Add(float64(n))

if ow.written > maxBytesPerObject {
return errors.Newf("CSV output exceeds max bytes (%d)", maxBytesPerObject)
}
}

return nil
})
}

type gcsObjectWriter struct {
*bufio.Writer
c io.Closer
written int64
}

func (b *gcsObjectWriter) Close() error {
return errors.Append(b.Flush(), b.c.Close())
}

func (s *Service) serializeRankingGraphForUpload(
ctx context.Context,
id int,
repo string,
root string,
write func(filename string, format string, args ...any) error,
) error {
documentsDefiningSymbols := map[string][]int64{}
documentsReferencingSymbols := map[string][]int64{}

if err := s.lsifstore.ScanDocuments(ctx, id, func(path string, document *scip.Document) error {
documentID := hash(strings.Join([]string{repo, root, path}, ":"))
documentMonikers := map[string]map[string]struct{}{}

for _, occurrence := range document.Occurrences {
if occurrence.Symbol == "" || scip.IsLocalSymbol(occurrence.Symbol) {
continue
}

if scip.SymbolRole_Definition.Matches(occurrence) {
if _, ok := documentMonikers[occurrence.Symbol]; !ok {
documentMonikers[occurrence.Symbol] = map[string]struct{}{}
}
documentMonikers[occurrence.Symbol]["definition"] = struct{}{}
documentsDefiningSymbols[occurrence.Symbol] = append(documentsDefiningSymbols[occurrence.Symbol], documentID)
} else {
if _, ok := documentMonikers[occurrence.Symbol]; !ok {
documentMonikers[occurrence.Symbol] = map[string]struct{}{}
}
documentMonikers[occurrence.Symbol]["reference"] = struct{}{}
documentsReferencingSymbols[occurrence.Symbol] = append(documentsReferencingSymbols[occurrence.Symbol], documentID)
}
}

if err := write("documents.csv", "%d,%s,%s\n", documentID, repo, filepath.Join(root, path)); err != nil {
return err
}

for identifier, monikerTypes := range documentMonikers {
for monikerType := range monikerTypes {
if err := write("monikers.csv", "%d,%s,%s:%s\n", documentID, monikerType, "scip", identifier); err != nil {
return err
}
}
}

return nil
}); err != nil {
return err
}

for symbolName, referencingDocumentIDs := range documentsReferencingSymbols {
for _, definingDocumentID := range documentsDefiningSymbols[symbolName] {
for _, referencingDocumentID := range referencingDocumentIDs {
if referencingDocumentID == definingDocumentID {
continue
}

if err := write("references.csv", "%d,%d\n", referencingDocumentID, definingDocumentID); err != nil {
return err
}
}
}
}
numRankingRoutines int,
rankingJobEnabled bool,
) (numPathRanksInserted float64, numPathCountInputsProcessed float64, err error) {
ctx, _, endObservation := s.operations.reduceRankingGraph.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})

if err := write("done", "%s\n", time.Now().Format(time.RFC3339)); err != nil {
return err
if !rankingJobEnabled {
return 0, 0, nil
}

return nil
}

func hash(v string) (h int64) {
if len(v) == 0 {
return 0
}
for _, r := range v {
h = 31*h + int64(r)
numPathRanksInserted, numPathCountInputsProcessed, err = s.store.InsertPathRanks(
ctx,
rankingGraphKey,
rankingMapReduceBatchSize,
)
if err != nil {
return 0, 0, err
}

return h
return numPathRanksInserted, numPathCountInputsProcessed, nil
}
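
Since the test plan is "test manually", one way to exercise the reducer end to end is a drain loop that keeps calling it until a pass reports zero processed input rows. The helper below is hypothetical (not part of this commit) and narrows the service to the single method it needs:

```go
package ranking // hypothetical package for this sketch

import "context"

// rankingReducer narrows the uploads service to the one method this
// sketch uses; the interface name is made up.
type rankingReducer interface {
	ReduceRankingGraph(ctx context.Context, numRankingRoutines int, rankingJobEnabled bool) (float64, float64, error)
}

// drainRankingInputs repeatedly reduces until path_counts_inputs is empty,
// returning the total number of path ranks inserted across all passes.
func drainRankingInputs(ctx context.Context, svc rankingReducer, numRoutines int) (float64, error) {
	var totalInserted float64
	for {
		inserted, processed, err := svc.ReduceRankingGraph(ctx, numRoutines, true)
		if err != nil {
			return totalInserted, err
		}
		totalInserted += inserted
		if processed == 0 {
			return totalInserted, nil // nothing left to reduce
		}
	}
}
```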
