Skip to content

Commit

Permalink
[rank] adds background job that maps and counts references to definit…
Browse files Browse the repository at this point in the history
…ions (sourcegraph#48004)

Depends on sourcegraph#48002,
sourcegraph#47997

Adds background job that maps references to definitions and counts
references per path.

## Test plan
Tested manually.
<!-- All pull requests REQUIRE a test plan:
https://docs.sourcegraph.com/dev/background-information/testing_principles
-->
  • Loading branch information
cesrjimenez committed Feb 22, 2023
1 parent d280cca commit c70e9f8
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 0 deletions.
8 changes: 8 additions & 0 deletions enterprise/internal/codeintel/uploads/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func NewService(

var (
bucketName = env.Get("CODEINTEL_UPLOADS_RANKING_BUCKET", "lsif-pagerank-experiments", "The GCS bucket.")
rankingMapReduceBatchSize = env.MustGetInt("CODEINTEL_UPLOADS_MAP_REDUCE_RANKING_BATCH_SIZE", 10000, "How many references, definitions, and path counts to map and reduce at once.")
rankingGraphKey = env.Get("CODEINTEL_UPLOADS_RANKING_GRAPH_KEY", "dev", "An identifier of the graph export. Change to start a new export in the configured bucket.")
rankingGraphBatchSize = env.MustGetInt("CODEINTEL_UPLOADS_RANKING_GRAPH_BATCH_SIZE", 16, "How many uploads to process at once.")
rankingGraphDeleteBatchSize = env.MustGetInt("CODEINTEL_UPLOADS_RANKING_GRAPH_DELETE_BATCH_SIZE", 32, "How many stale uploads to delete at once.")
Expand Down Expand Up @@ -208,5 +209,12 @@ func NewGraphExporters(observationCtx *observation.Context, uploadSvc *Service)
ConfigExportInst.RankingBatchSize,
ConfigExportInst.RankingJobsEnabled,
),
background.NewRankingGraphMapper(
observationCtx,
uploadSvc,
ConfigExportInst.NumRankingRoutines,
ConfigExportInst.RankingInterval,
ConfigExportInst.RankingJobsEnabled,
),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type UploadService interface {
ExportRankingGraph(ctx context.Context, numRankingRoutines int, numBatchSize int, rankingJobEnabled bool) error
MapRankingGraph(ctx context.Context, numRankingRoutines int, rankingJobEnabled bool) error
VacuumRankingGraph(ctx context.Context) error
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,22 @@ func NewRankingGraphExporter(
return nil
}))
}

func NewRankingGraphMapper(
observationCtx *observation.Context,
uploadsService UploadService,
numRankingRoutines int,
interval time.Duration,
rankingJobEnabled bool,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(
context.Background(),
"rank.graph-mapper", "maps definitions and references data to path_counts_inputs table in store",
interval,
goroutine.HandlerFunc(func(ctx context.Context) error {
if err := uploadsService.MapRankingGraph(ctx, numRankingRoutines, rankingJobEnabled); err != nil {
return err
}
return nil
}))
}
2 changes: 2 additions & 0 deletions enterprise/internal/codeintel/uploads/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type operations struct {

// Ranking
exportRankingGraph *observation.Operation
mapRankingGraph *observation.Operation

numUploadsRead prometheus.Counter
numBytesUploaded prometheus.Counter
Expand Down Expand Up @@ -181,6 +182,7 @@ func newOperations(observationCtx *observation.Context) *operations {

// Ranking
exportRankingGraph: op("ExportRankingGraph"),
mapRankingGraph: op("MapRankingGraph"),

numUploadsRead: numUploadsRead,
numBytesUploaded: numBytesUploaded,
Expand Down
15 changes: 15 additions & 0 deletions enterprise/internal/codeintel/uploads/ranking.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ func (s *Service) VacuumRankingGraph(ctx context.Context) error {
return nil
}

func (s *Service) MapRankingGraph(ctx context.Context, numRankingRoutines int, rankingJobEnabled bool) (err error) {
ctx, _, endObservation := s.operations.mapRankingGraph.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})

if !rankingJobEnabled {
return nil
}

if err := s.store.InsertPathCountInputs(ctx, rankingGraphKey, rankingMapReduceBatchSize); err != nil {
return err
}

return nil
}

const maxBytesPerObject = 1024 * 1024 * 1024 // 1GB

func (s *Service) serializeAndPersistRankingGraphForUpload(
Expand Down

0 comments on commit c70e9f8

Please sign in to comment.