Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
25307: release-2.0: importccl: check node health and compatibility during IMPORT planning r=mjibson a=mjibson

Backport 1/1 commits from cockroachdb#25162.

/cc @cockroachdb/release

---

Simplify the LoadCSV signature by taking just a PlanHookState for any
argument that can be fetched from it. Determine the node list using
this new health check function. We can remove the rand.Shuffle call
because the map iteration should produce some level of randomness.

Fixes cockroachdb#12876

Release note (bug fix): Fix problems with imports sometimes failing
after node decommissioning.


Co-authored-by: Matt Jibson <[email protected]>
  • Loading branch information
craig[bot] and maddyblue committed May 14, 2018
2 parents 8b1dab0 + c0277cd commit fa2b2bf
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 44 deletions.
17 changes: 2 additions & 15 deletions pkg/ccl/importccl/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -1157,16 +1157,6 @@ func doDistributedCSVTransform(
) error {
evalCtx := p.ExtendedEvalContext()

// TODO(dan): Filter out unhealthy nodes.
resp, err := p.ExecCfg().StatusServer.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return err
}
var nodes []roachpb.NodeDescriptor
for _, node := range resp.Nodes {
nodes = append(nodes, node.Desc)
}

ci := sqlbase.ColTypeInfoFromColTypes([]sqlbase.ColumnType{
{SemanticType: sqlbase.ColumnType_STRING},
{SemanticType: sqlbase.ColumnType_INT},
Expand All @@ -1183,11 +1173,8 @@ func doDistributedCSVTransform(

if err := p.DistLoader().LoadCSV(
ctx,
p,
job,
p.ExecCfg().DB,
evalCtx,
p.ExecCfg().NodeID.Get(),
nodes,
sql.NewRowResultWriter(rows),
tableDesc,
files,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func TestImportStmt(t *testing.T) {
)

data = ",5,e,,,"
if _, err := conn.Exec(query, srv.URL); !testutils.IsError(err, `parse "a" as INT: could not parse ""`) {
if _, err := conn.Exec(query, srv.URL); !testutils.IsError(err, `could not parse "" as type int`) {
t.Fatalf("unexpected: %v", err)
}
if _, err := conn.Exec(query+nullif, srv.URL); !testutils.IsError(err, `"a" violates not-null constraint`) {
Expand Down
34 changes: 22 additions & 12 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,23 +878,33 @@ func (dsp *DistSQLPlanner) getNodeIDForScan(
if err != nil {
return 0, err
}
nodeID := replInfo.NodeID

// Check node health and compatibility.
addr := replInfo.NodeDesc.Address.String()
if err := dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
log.Eventf(planCtx.ctx, "not planning on node %d. unhealthy", nodeID)
return dsp.nodeDesc.NodeID, nil
nodeID := replInfo.NodeDesc.NodeID
if err := dsp.checkNodeHealthAndVersion(planCtx, replInfo.NodeDesc); err != nil {
log.Eventf(planCtx.ctx, "not planning on node %d. %v", nodeID, err)
}
if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
log.Eventf(planCtx.ctx, "not planning on node %d. incompatible version", nodeID)
return dsp.nodeDesc.NodeID, nil
}

planCtx.nodeAddresses[nodeID] = addr
return nodeID, nil
}

// checkNodeHealthAndVersion adds the node to planCtx if it is healthy and
// has a compatible version. An error is returned otherwise.
func (dsp *DistSQLPlanner) checkNodeHealthAndVersion(
planCtx *planningCtx, desc *roachpb.NodeDescriptor,
) error {
nodeID := desc.NodeID
addr := desc.Address.String()
var err error

if err = dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
err = errors.New("unhealthy")
} else if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
err = errors.New("incompatible version")
} else {
planCtx.nodeAddresses[nodeID] = addr
}
return err
}

// createTableReaders generates a plan consisting of table reader processors,
// one for each node that has spans that we are reading.
// overridesResultColumns is optional.
Expand Down
52 changes: 36 additions & 16 deletions pkg/sql/distsql_plan_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"fmt"
"math"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/jobs"
Expand Down Expand Up @@ -112,11 +114,8 @@ var colTypeBytes = sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_BYTES}
// and stores them in enterprise backup format at to.
func (l *DistLoader) LoadCSV(
ctx context.Context,
phs PlanHookState,
job *jobs.Job,
db *client.DB,
evalCtx *extendedEvalContext,
thisNode roachpb.NodeID,
nodes []roachpb.NodeDescriptor,
resultRows *RowResultWriter,
tableDesc *sqlbase.TableDescriptor,
from []string,
Expand All @@ -128,6 +127,33 @@ func (l *DistLoader) LoadCSV(
) error {
ctx = log.WithLogTag(ctx, "import-distsql", nil)

dsp := l.distSQLPlanner
evalCtx := phs.ExtendedEvalContext()
planCtx := dsp.newPlanningCtx(ctx, evalCtx, nil /* txn */)

resp, err := phs.ExecCfg().StatusServer.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return err
}
// Because we're not going through the normal pathways, we have to set up
// the nodeID -> nodeAddress map ourselves.
for _, node := range resp.Nodes {
if err := dsp.checkNodeHealthAndVersion(&planCtx, &node.Desc); err != nil {
continue
}
}
nodes := make([]roachpb.NodeID, 0, len(planCtx.nodeAddresses))
for nodeID := range planCtx.nodeAddresses {
nodes = append(nodes, nodeID)
}
// Shuffle node order so that multiple IMPORTs done in parallel will not
// identically schedule CSV reading. For example, if there are 3 nodes and 4
// files, the first node will get 2 files while the other nodes will each get 1
// file. Shuffling will make that first node random instead of always the same.
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})

// Setup common to both stages.

// For each input file, assign it to a node.
Expand Down Expand Up @@ -168,12 +194,8 @@ func (l *DistLoader) LoadCSV(
}
}

planCtx := l.distSQLPlanner.newPlanningCtx(ctx, evalCtx, nil /* txn */)
// Because we're not going through the normal pathways, we have to set up
// the nodeID -> nodeAddress map ourselves.
for _, node := range nodes {
planCtx.nodeAddresses[node.NodeID] = node.Address.String()
}
db := phs.ExecCfg().DB
thisNode := phs.ExecCfg().NodeID.Get()

// Determine if we need to run the sampling plan or not.

Expand Down Expand Up @@ -239,9 +261,8 @@ func (l *DistLoader) LoadCSV(
// We can reuse the phase 1 ReadCSV specs, just have to clear sampling.
for i, rcs := range csvSpecs {
rcs.SampleSize = 0
node := nodes[i]
proc := distsqlplan.Processor{
Node: node.NodeID,
Node: nodes[i],
Spec: distsqlrun.ProcessorSpec{
Core: distsqlrun.ProcessorCoreUnion{ReadCSV: rcs},
Output: []distsqlrun.OutputRouterSpec{{
Expand Down Expand Up @@ -280,7 +301,7 @@ func (l *DistLoader) LoadCSV(
Contribution: float32(len(swSpec.Spans)) / float32(len(spans)),
}
proc := distsqlplan.Processor{
Node: node.NodeID,
Node: node,
Spec: distsqlrun.ProcessorSpec{
Input: []distsqlrun.InputSyncSpec{{
ColumnTypes: firstStageTypes,
Expand Down Expand Up @@ -343,7 +364,7 @@ func (l *DistLoader) loadCSVSamplingPlan(
db *client.DB,
evalCtx *extendedEvalContext,
thisNode roachpb.NodeID,
nodes []roachpb.NodeDescriptor,
nodes []roachpb.NodeID,
from []string,
splitSize int64,
planCtx *planningCtx,
Expand Down Expand Up @@ -377,9 +398,8 @@ func (l *DistLoader) loadCSVSamplingPlan(

p.ResultRouters = make([]distsqlplan.ProcessorIdx, len(csvSpecs))
for i, rcs := range csvSpecs {
node := nodes[i]
proc := distsqlplan.Processor{
Node: node.NodeID,
Node: nodes[i],
Spec: distsqlrun.ProcessorSpec{
Core: distsqlrun.ProcessorCoreUnion{ReadCSV: rcs},
Output: []distsqlrun.OutputRouterSpec{{Type: distsqlrun.OutputRouterSpec_PASS_THROUGH}},
Expand Down

0 comments on commit fa2b2bf

Please sign in to comment.