Skip to content

Commit

Permalink
bulker: better stat stages definition
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jun 28, 2024
1 parent 2ac6880 commit d5746da
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), columnsString, columnsString, bq.fullTableName(sourceTable.Name))
query := bq.client.Query(insertFromSelectStatement)
_, state2, err := bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
state2.Name = "insert from select"
state2.Name = "insert_from_select"
state.Merge(state2)
return state, err
} else {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa

query := bq.client.Query(insertFromSelectStatement)
_, state, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
state.Name = "insert from select"
state.Name = "merge"
return state, err
}
}
Expand Down
11 changes: 8 additions & 3 deletions bulkerlib/implementations/sql/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSourc
Statement: putStatement,
})
}
state = bulker.WarehouseState{
Name: "stage_put",
TimeProcessedMs: time.Since(startTime).Milliseconds(),
}
startTime = time.Now()
defer func() {
removeStatement := fmt.Sprintf("REMOVE @~/%s", path.Base(loadSource.Path))
if _, err2 := s.txOrDb(ctx).ExecContext(ctx, removeStatement); err2 != nil {
Expand All @@ -395,10 +400,10 @@ func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSourc
})
err = multierror.Append(err, err2)
}
state = bulker.WarehouseState{
Name: "copy_from_csv",
state.Merge(bulker.WarehouseState{
Name: "copy_from_stage",
TimeProcessedMs: time.Since(startTime).Milliseconds(),
}
})
}()
columnNames := targetTable.MappedColumnNames(s.quotedColumnName)
statement := fmt.Sprintf(sfCopyStatement, quotedTableName, strings.Join(columnNames, ","), path.Base(loadSource.Path))
Expand Down

0 comments on commit d5746da

Please sign in to comment.