Skip to content

Commit

Permalink
[BEAM-10052] check hash and avoid duplicated artifacts
Browse files (browse the repository at this point in the history)
  • Loading branch information
ihji committed May 21, 2020
1 parent ffd74b0 commit b8e582f
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -583,6 +583,7 @@ def _stage_resources(self, pipeline, options):
raise RuntimeError('The --temp_location option must be specified.')

resources = []
hashs = {}
for _, env in sorted(pipeline.components.environments.items(),
key=lambda kv: kv[0]):
for dep in env.dependencies:
Expand All @@ -595,7 +596,16 @@ def _stage_resources(self, pipeline, options):
role_payload = (
beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
dep.role_payload))
resources.append((type_payload.path, role_payload.staged_name))
if type_payload.sha256 and type_payload.sha256 in hashs:
_LOGGER.info(
'Found duplicated artifact: %s (%s)',
type_payload.path,
type_payload.sha256)
dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=hashs[type_payload.sha256]).SerializeToString()
else:
resources.append((type_payload.path, role_payload.staged_name))
hashs[type_payload.sha256] = role_payload.staged_name

resource_stager = _LegacyDataflowStager(self)
staged_resources = resource_stager.stage_job_resources(
Expand Down

0 comments on commit b8e582f

Please sign in to comment.