
[Bug]: grpcio versions from 1.59.0 to 1.62.1 can cause Beam Python pipelines to get stuck #30867

Closed
2 of 16 tasks
DerRidda opened this issue Apr 5, 2024 · 27 comments · Fixed by #31044

Comments

@DerRidda

DerRidda commented Apr 5, 2024

What happened?

A combination of software releases in the Beam dependency chain has surfaced a failure mode that can cause pipelines to get stuck without an obvious explanation. The issue affects Apache Beam 2.55.0 and 2.55.1, but may also affect other SDK versions when the pipeline runtime environment has

google-api-core version 2.17.0 or above, AND a grpcio version in the range 1.59.0 <= grpcio <= 1.62.1.
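A quick way to check whether a given environment falls into this combination is sketched below. This is an illustrative helper, not part of Beam, and it assumes the packaging library is available.

# Illustrative check for the affected dependency combination described above
# (assumes the 'packaging' library is installed; not part of Beam).
from importlib.metadata import version
from packaging.version import Version

grpcio_v = Version(version("grpcio"))
api_core_v = Version(version("google-api-core"))

affected = (
    api_core_v >= Version("2.17.0")
    and Version("1.59.0") <= grpcio_v <= Version("1.62.1")
)
print(f"grpcio={grpcio_v}, google-api-core={api_core_v}, potentially affected: {affected}")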

Symptoms

Beam pipelines might get stuck. Dataflow jobs might have errors like: Unable to retrieve status info from SDK harness

There are 10 consecutive failures obtaining SDK worker status info, SDK worker appears to be permanently unresponsive. Aborting the SDK.

Mitigation

Upgrade to Apache Beam 2.56.0 or above once it is available. Until then, install any of the following dependency combinations in the Beam pipeline runtime environment:

  • upgrade grpcio to version 1.62.2 or above, OR
  • downgrade grpcio and grpcio-status to 1.58.0 or below, OR
  • downgrade google-api-core to version 2.16.2 or below

You can define dependencies in the pipeline runtime environment using the --requirements_file pipeline option or the other options outlined in https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/.
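As a hedged sketch (the file name and extra options are placeholders, not from this issue), passing a requirements file from Python code could look roughly like this:

# Illustrative sketch: point the Beam runtime environment at a requirements file.
# requirements.txt could contain, for example (first mitigation above):
#   grpcio>=1.62.2
#   grpcio-status>=1.62.2
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",               # placeholder; add project/region/temp_location as needed
    "--requirements_file=requirements.txt",  # installed into the SDK worker environment
])

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | beam.Create([1, 2, 3]) | beam.Map(print)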

Users of Apache Beam 2.55.0 might be able to avoid the issue by downgrading to apache-beam==2.54.0, since the default containers for that runtime environment have a set of dependencies that does not trigger the bug.

Root cause

The issue was caused by a regression in grpcio==1.59.0 (grpc/grpc#36265), which has now been fixed in grpcio==1.62.2 and above. The regression triggered the failure mode when grpcio was used with google-api-core==2.17.0 and above.

Description updated: 2024-04-23.

Original description:

Updating the Python grpcio dependency to version 1.62.1 caused Dataflow jobs to stall, with excessive waits for responses in the gRPC multi-threaded rendezvous, probably somewhere in the SDK worker. An upstream issue exists here: grpc/grpc#36256

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@DerRidda DerRidda changed the title to [Bug]: grpcio 1.62.1 deadlocks in Beam SDK worker with Dataflow Apr 5, 2024
@liferoad
Collaborator

liferoad commented Apr 5, 2024

@tvalentyn FYI.

@tvalentyn
Contributor

tvalentyn commented Apr 5, 2024

Thanks.

(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.53.0  -c "pip list | grep grpcio" 
grpcio                          1.59.3
grpcio-status                   1.59.3
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.54.0  -c "pip list | grep grpcio" 
grpcio                          1.60.0
grpcio-status                   1.60.0
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.55.0  -c "pip list | grep grpcio" 
grpcio                          1.62.0
grpcio-status                   1.62.0
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.55.1  -c "pip list | grep grpcio" 
grpcio                          1.62.0
grpcio-status                   1.62.0

I could determine that at least version 1.60.0 doesn't exhibit this issue.

Double checking, is this still the case?

@tvalentyn tvalentyn added this to the 2.56.0 Release milestone Apr 5, 2024
@tvalentyn
Contributor

cc: @damccorm

@DerRidda
Author

DerRidda commented Apr 5, 2024

Thanks.

(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.53.0  -c "pip list | grep grpcio" 
grpcio                          1.59.3
grpcio-status                   1.59.3
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.54.0  -c "pip list | grep grpcio" 
grpcio                          1.60.0
grpcio-status                   1.60.0
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.55.0  -c "pip list | grep grpcio" 
grpcio                          1.62.0
grpcio-status                   1.62.0
(py38) :python$ docker run --rm -it --entrypoint=/bin/sh apache/beam_python3.10_sdk:2.55.1  -c "pip list | grep grpcio" 
grpcio                          1.62.0
grpcio-status                   1.62.0

I could determine that at least version 1.60.0 doesn't exhibit this issue.

Double checking, is this still the case?

Actually, it still happens on 1.60.0 for me; it just looked fixed for a brief moment.
The known-good version of my pipeline probably uses 1.59.3: in the update I also switched the SDK container image to a fully custom one, so even though the project-local deps showed 1.60.0, it never got installed into the image of the known-good service. I will investigate more on Monday.

@tvalentyn
Contributor

OK. Thank you very much for reporting the issue. Please let us know if you have more information; that might also help the grpcio maintainers.

@DerRidda
Author

DerRidda commented Apr 9, 2024

@tvalentyn Some further investigation might point to a combination of the protobuf library version and grpcio: grpc/grpc#36256 (comment)

@tvalentyn
Contributor

Can you share Dataflow Job IDs where you've seen this error?

@tvalentyn
Contributor

Also, what are the exact errors you are seeing?

@DerRidda
Author

@tvalentyn Job ID: 2024-04-03_08_14_02-12227946365357908481 in europe-west4
I saw failures to obtain SDK worker status, which might actually be a symptom of #30679, but when I manually applied the fix for that I still saw completely stuck workers, sometimes without any feedback from the Dataflow backend at all; just nothing happening. Occasionally an SDK worker went away again, without much for me to go on in the immediately visible Dataflow worker logs or when digging deeper in Logs Explorer.

It also seems to manifest in the Google Cloud Console Dataflow job viewer UI locking up in the browser until the browser considers the tab unresponsive, while a fixed job stays responsive.

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

I am observing the pattern that the jobs you start with the Beam 2.55.0 SDK have many errors like Unable to retrieve status info from SDK harness. This is definitely concerning. The message means that the Beam SDK process running inside a container, aka the SDK harness, becomes unresponsive to SDK status requests from the runner; eventually, the runner terminates the unresponsive SDK, and the worker might restart.

These errors appear fairly early in pipeline execution.

Dataflow workers serve the SDK status page on localhost:8081/sdk_status, and it can be queried manually via: gcloud compute ssh --zone "xx-somezone-z" "some-dataflow-gce-worker-01300848-wqox-harness-bvf7" --project "some-project-id" --command "curl localhost:8081/sdk_status".

Would it be possible to take a closer look at the differences between the 2.55.0 and 2.53.0 setups that you have, to narrow down the exact change that increases instances of these errors? For example: upgrading/downgrading dependency X while doing nothing else increases/decreases instances of this error. I'll also try to repro this issue myself.

It also seems to manifest in the Google Cloud Console Dataflow job viewer UI locking up in the browser until the browser considers the tab unresponsive, while a fixed job stays responsive.

That is likely an unrelated UI issue.

@tvalentyn
Contributor

One other thing I would try: use the Beam 2.55.0 setup but downgrade protobuf to "protobuf==3.20.3".

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

I see instances of "Unable to retrieve status info from SDK harness" in executions of some Beam integration tests, for example apache_beam.examples.cookbook.bigtableio_it_test.BigtableIOWriteTest; it might be reproducible in other Beam tests as well. We should be able to use these for a repro.

@tvalentyn tvalentyn added P1 and removed P2 labels Apr 10, 2024
@tvalentyn
Contributor

I was able to repro the issue with a couple of executions of an integration test:

python -m pytest  -o log_cli=True -o log_level=Info apache_beam/examples/cookbook/bigtableio_it_test.py::BigtableIOWriteTest::test_bigtable_write   --test-pipeline-options='--runner=TestDataflowRunner --project=apache-beam-testing --temp_location=valentyn-testing  --region=us-central1 --wait_until_finish_duration=36000000' --timeout=36000

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

Follow-up investigation:

SSH into the VM:

gcloud compute ssh --zone "us-central1-b" "beamapp-valentyn-04101932-04101232-3i3x-harness-3sqg" --project "apache-beam-testing"
curl localhost:8081/sdk_status 

is unresponsive.

However, we can exec into the Docker container and use pystack:

docker ps 

# look for the Beam SDK container that has 'python' in its name, note its container ID, then log into the running container:

CONTAINER ID   IMAGE                                                                                                  COMMAND                  CREATED          STATUS          PORTS     NAMES
6577f349f06d   us-central1-artifactregistry.gcr.io/google.com/dataflow-containers/worker/v1beta3/beam_python3.8_sdk   "/opt/apache/beam/bo…"   9 minutes ago    Up 8 minutes              k8s_sdk-0-0_df-beamapp-valentyn-04122309-04121609-dnrj-harness-tbzw_default_714e0e8f66081ac27ef158d49706b5bc_0

...



CONT_ID=`docker ps | grep python | awk '{print $1}'` ; docker exec --privileged  -it $CONT_ID /bin/bash

pip install pystack
ps -A

root@beamapp-valentyn-04101932-04101232-3i3x-harness-3sqg:/# ps -A
    PID TTY          TIME CMD
      1 ?        00:00:00 boot
     29 ?        00:03:02 python
     30 ?        00:02:46 python
    114 pts/0    00:00:00 bash
    125 pts/0    00:00:00 ps

pystack remote 29

Which reveals:

docker exec --privileged -it 1142b2c7eb3c /bin/bash
root@beamapp-valentyn-04101932-04101232-3i3x-harness-3sqg:/# pystack remote 29
Traceback for thread 100 (python) [Has the GIL] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 385, in _flush_rows
        response = self.table.mutate_rows(rows_to_flush)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 724, in mutate_rows
        self.name,
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 160, in name
        table_client = self._instance._client.table_data_client
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 332, in table_data_client
        transport = self._create_gapic_client_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 285, in _create_gapic_client_channel
        channel = grpc_transport.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable_v2/services/bigtable/transports/grpc.py", line 217, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(

Traceback for thread 99 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 385, in _flush_rows
        response = self.table.mutate_rows(rows_to_flush)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 724, in mutate_rows
        self.name,
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 160, in name
        table_client = self._instance._client.table_data_client
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 332, in table_data_client
        transport = self._create_gapic_client_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 285, in _create_gapic_client_channel
        channel = grpc_transport.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable_v2/services/bigtable/transports/grpc.py", line 217, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
    (Python) File "/usr/local/lib/python3.8/pkgutil.py", line 638, in get_data
        return loader.get_data(resource_name)
    (Python) File "<frozen importlib._bootstrap_external>", line 1033, in get_data

Traceback for thread 98 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 1252, in run
        self.finished.wait(self.interval)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 558, in wait
        signaled = self._cond.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 97 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)

Traceback for thread 95 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 686, in <lambda>
        target=lambda: self._read_inputs(elements_iterator),
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in _read_inputs
        for elements in elements_iterator:
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 542, in __next__
        return self._next()
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 959, in _next
        _common.wait(self._state.condition.wait, _response_ready)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 156, in wait
        _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 116, in _wait_once
        wait_fn(timeout=timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 94 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 275, in consume_request_iterator
        request = next(request_iterator)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 608, in _write_outputs
        streams = [self._to_send.get()]
    (Python) File "/usr/local/lib/python3.8/queue.py", line 170, in get
        self.not_empty.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 93 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 1728, in channel_spin
        event = state.channel.next_call_event()

Traceback for thread 92 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in pull_responses
        for response in responses:
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 542, in __next__
        return self._next()
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 959, in _next
        _common.wait(self._state.condition.wait, _response_ready)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 156, in wait
        _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 116, in _wait_once
        wait_fn(timeout=timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 91 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 275, in consume_request_iterator
        request = next(request_iterator)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1050, in request_iter
        request = self._requests.get()
    (Python) File "/usr/local/lib/python3.8/queue.py", line 170, in get
        self.not_empty.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 90 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 1728, in channel_spin
        event = state.channel.next_call_event()

Traceback for thread 89 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
        self._work_item.run()
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
        self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 385, in task
        self._execute(
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
        response = task()
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
        lambda: self.create_worker().do_instruction(request), request)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 650, in do_instruction
        return getattr(self, request_type)(
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 688, in process_bundle
        bundle_processor.process_bundle(instruction_id))
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
        input_op_by_transform_id[element.transform_id].process_encoded(
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
        self.output(decoded_value)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigtableio.py", line 156, in process
        self.batcher.mutate(row)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 266, in mutate
        self._flush_async()
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 340, in _flush_async
        future = self._executor.submit(self._flush_rows, rows_to_flush)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 188, in submit
        self._adjust_thread_count()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 213, in _adjust_thread_count
        t.start()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 857, in start
        self._started.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 558, in wait
        signaled = self._cond.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 88 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 275, in consume_request_iterator
        request = next(request_iterator)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in get_responses
        response = self._responses.get()
    (Python) File "/usr/local/lib/python3.8/queue.py", line 170, in get
        self.not_empty.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 87 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 1728, in channel_spin
        event = state.channel.next_call_event()

Traceback for thread 83 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 275, in consume_request_iterator
        request = next(request_iterator)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/worker_status.py", line 190, in _get_responses
        response = self._responses.get()
    (Python) File "/usr/local/lib/python3.8/queue.py", line 170, in get
        self.not_empty.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 82 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 1728, in channel_spin
        event = state.channel.next_call_event()

Traceback for thread 81 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/worker_status.py", line 182, in <lambda>
        target=lambda: self._log_lull_in_bundle_processor(
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/worker_status.py", line 232, in _log_lull_in_bundle_processor
        time.sleep(2 * 60)

Traceback for thread 80 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/worker_status.py", line 177, in <lambda>
        target=lambda: self._serve(), name='fn_api_status_handler')
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/worker_status.py", line 198, in _serve
        for request in self._status_stub.WorkerStatus(self._get_responses()):
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 542, in __next__
        return self._next()
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 959, in _next
        _common.wait(self._state.condition.wait, _response_ready)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 156, in wait
        _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 116, in _wait_once
        wait_fn(timeout=timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 72 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 224, in run
        while not self._finished.wait(next_call - time.time()):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 558, in wait
        signaled = self._cond.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 65 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 275, in consume_request_iterator
        request = next(request_iterator)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/log_handler.py", line 182, in _write_log_entries
        log_entries = [self._log_entry_queue.get()]
    (Python) File "/usr/local/lib/python3.8/queue.py", line 170, in get
        self.not_empty.wait()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Traceback for thread 63 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 1728, in channel_spin
        event = state.channel.next_call_event()

Traceback for thread 60 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/log_handler.py", line 99, in <lambda>
        target=lambda: self._read_log_control_messages(),
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/log_handler.py", line 216, in _read_log_control_messages
        for _ in log_control_iterator:
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 542, in __next__
        return self._next()
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 959, in _next
        _common.wait(self._state.condition.wait, _response_ready)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 156, in wait
        _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 116, in _wait_once
        wait_fn(timeout=timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

Traceback for thread 29 (python) [] (most recent call last):
    (Python) File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
    (Python) File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 367, in <module>
        main(sys.argv)
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 212, in main
        sdk_harness.run()
    (Python) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 266, in run
        for work_request in self._control_stub.Control(get_responses()):
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 542, in __next__
        return self._next()
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 959, in _next
        _common.wait(self._state.condition.wait, _response_ready)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 156, in wait
        _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_common.py", line 116, in _wait_once
        wait_fn(timeout=timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)

@tvalentyn
Contributor

TL;DR: a thread that executes bigtable/transports/grpc.create_channel() later calls into cygrpc.Channel(), likely a Python extension, which holds the GIL indefinitely, so other threads cannot run, and hence the SDK is not responsive to /sdk_status RPC calls. https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact also explains this failure mode.

Traceback for thread 100 (python) [Has the GIL] (most recent call last):
...
        transport = self._create_gapic_client_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 285, in _create_gapic_client_channel
        channel = grpc_transport.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable_v2/services/bigtable/transports/grpc.py", line 217, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
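As a side note, when a Python process is still responsive, a rough in-process alternative to attaching pystack is to dump all thread stacks from within the process itself; in a case like this one, where a native call is holding the GIL, such a dump would itself block, which is exactly why an out-of-process tool like pystack is needed. A minimal sketch, for illustration only:

# Illustrative in-process all-threads stack dump. This needs the GIL, so it
# cannot run while a native call holds the GIL (as in the traceback above);
# pystack inspects the process from the outside instead.
import sys
import threading
import traceback

def dump_all_thread_stacks():
    names = {t.ident: t.name for t in threading.enumerate()}
    for thread_id, frame in sys._current_frames().items():
        print(f"--- thread {thread_id} ({names.get(thread_id, 'unknown')}) ---")
        traceback.print_stack(frame)

dump_all_thread_stacks()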

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

With --native-all, pystack adds a bit more detail and also explicitly says that the other threads are waiting on the GIL:

root@beamapp-valentyn-04101932-04101232-3i3x-harness-3sqg:/# pystack remote 29 --native-all
Traceback for thread 100 (python) [Has the GIL] (most recent call last):
    (C) File "Python/thread_pthread.h", line 232, in pythread_wrapper (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_threadmodule.c", line 1002, in t_bootstrap (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
        work_item.run()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
        result = self.fn(*self.args, **self.kwargs)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 385, in _flush_rows
        response = self.table.mutate_rows(rows_to_flush)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 724, in mutate_rows
        self.name,
    (C) File "Objects/descrobject.c", line 1496, in property_descr_get (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 160, in name
        table_client = self._instance._client.table_data_client
    (C) File "Objects/descrobject.c", line 1496, in property_descr_get (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 332, in table_data_client
        transport = self._create_gapic_client_channel(
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 285, in _create_gapic_client_channel
        channel = grpc_transport.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable_v2/services/bigtable/transports/grpc.py", line 217, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (C) File "Objects/typeobject.c", line 6790, in slot_tp_init (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
    (C) File "???", line 0, in syscall (/usr/lib/x86_64-linux-gnu/libc.so.6)

Traceback for thread 99 (python) [Waiting for the GIL] (most recent call last):
    (C) File "Python/thread_pthread.h", line 232, in pythread_wrapper (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_threadmodule.c", line 1002, in t_bootstrap (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
        work_item.run()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
        result = self.fn(*self.args, **self.kwargs)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/batcher.py", line 385, in _flush_rows
        response = self.table.mutate_rows(rows_to_flush)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 724, in mutate_rows
        self.name,
    (C) File "Objects/descrobject.c", line 1496, in property_descr_get (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/table.py", line 160, in name
        table_client = self._instance._client.table_data_client
    (C) File "Objects/descrobject.c", line 1496, in property_descr_get (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 332, in table_data_client
        transport = self._create_gapic_client_channel(
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable/client.py", line 285, in _create_gapic_client_channel
        channel = grpc_transport.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/cloud/bigtable_v2/services/bigtable/transports/grpc.py", line 217, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (C) File "Objects/typeobject.c", line 6790, in slot_tp_init (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
    (Python) File "/usr/local/lib/python3.8/pkgutil.py", line 638, in get_data
        return loader.get_data(resource_name)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "<frozen importlib._bootstrap_external>", line 1033, in get_data
    (C) File "./Modules/_io/clinic/bufferedio.c.h", line 174, in _io__Buffered_read (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_io/bufferedio.c", line 891, in _io__Buffered_read_impl (inlined) (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_io/bufferedio.c", line 1541, in _bufferedreader_read_all (inlined) (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_io/fileio.c", line 706, in _io_FileIO_readall_impl (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Python/ceval_gil.h", line 206, in take_gil (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Python/condvar.h", line 73, in PyCOND_TIMEDWAIT (inlined) (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "???", line 0, in pthread_cond_timedwait (/usr/lib/x86_64-linux-gnu/libc.so.6)

Traceback for thread 98 (python) [Waiting for the GIL] (most recent call last):
    (C) File "Python/thread_pthread.h", line 232, in pythread_wrapper (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_threadmodule.c", line 1002, in t_bootstrap (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
        self._bootstrap_inner()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
    (C) File "Objects/call.c", line 284, in function_code_fastcall (/usr/local/lib/libpython3.8.so.1.0)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 1252, in run
        self.finished.wait(self.interval)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 558, in wait
        signaled = self._cond.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 306, in wait
        gotit = waiter.acquire(True, timeout)
    (C) File "./Modules/_threadmodule.c", line 146, in lock_PyThread_acquire_lock (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "./Modules/_threadmodule.c", line 64, in acquire_timed (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Python/ceval_gil.h", line 206, in take_gil (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "Python/condvar.h", line 73, in PyCOND_TIMEDWAIT (inlined) (/usr/local/lib/libpython3.8.so.1.0)
    (C) File "???", line 0, in pthread_cond_timedwait (/usr/lib/x86_64-linux-gnu/libc.so.6)

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

Looks like bigtableio_it_test.py::BigtableIOWriteTest::test_bigtable_write was recently re-enabled (33a0bfb), and it was known to be flaky before, so #30867 (comment) might be a separate issue specific to the Bigtable client. We should investigate it, but it would also help to confirm whether this stuckness is the same as what we have in #30867 (comment).

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

@DerRidda would it be possible for you to re-run your job using the Beam 2.55 SDK, find a stuck worker VM, and retrieve stack traces with pystack as I did in #30867 (comment)? Note: you might have to use Dataflow Classic instead of Dataflow Prime to be able to access Dataflow workers.

To find a stuck VM, look for "Unable to retrieve status info from SDK harness." logs, then find which worker emits those logs by expanding the logging entry in Cloud Logging. You might see something like:

{
  insertId: "7918190538793288800:164250:0:18926"
  jsonPayload: {
    line: "fnapi_harness_status_service.cc:212"
    message: "Unable to retrieve status info from SDK harness sdk-0-0_sibling_1 within allowed time."
    thread: "115"
  }
  labels: {
    compute.googleapis.com/resource_id: "7918190538793288800"
    compute.googleapis.com/resource_name: "df-<some_identifier>-harness-9zz3"
    compute.googleapis.com/resource_type: "instance"

The df-<some_identifier>-harness-9zz3 value would be the VM name.

Then, SSH to that VM from the UI or via a gcloud command, log into the running Python container in privileged mode, and run pystack.

@tvalentyn
Contributor

tvalentyn commented Apr 10, 2024

Given that across all our test suites I only see the error reliably in Bigtable tests, I filed a separate issue: #30927.

@DerRidda Let's try to get separate stack traces for the stuck processes in the pipelines discussed earlier in this bug.

@DerRidda
Author

@tvalentyn Sorry I didn't reply sooner, but I can't really siphon off the time to repro this issue further on Dataflow. Seeing your latest comments in the grpcio bug report, I assume you have what you need now, though?

@tvalentyn
Contributor

tvalentyn commented Apr 15, 2024

I am not certain they are the same bugs.

@tvalentyn
Contributor

We can fix the bug we are currently investigating and then come back to your issue and see if you still reproduce it.

@tvalentyn
Contributor

tvalentyn commented Apr 16, 2024

There is a confirmed issue, googleapis/python-bigtable#949, that affects the google-api-core and grpcio libraries and caused a regression in Apache Beam 2.55.0. It will be fixed in the upcoming release of grpcio and mitigated in Beam 2.56.0.

For affected 2.55.0 users, any of the following mitigations should help:

  • downgrade to apache-beam==2.54.0 or below OR

install any of the following dependency combinations in the Beam pipeline runtime environment (for example, you can use a --requirements_file pipeline option); see the sketch after this list for one way to verify the installed versions at runtime:

  • upgrade grpcio and grpcio-status to version 1.63.0 or above (currently only 1.63.0rc1 is available) OR
  • downgrade google-api-core to version 2.16.2 or below OR
  • downgrade grpcio and grpcio-status to 1.58.0 or below.
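As a hedged illustration (not part of Beam; logging from a DoFn is just one of several ways to do this), you can confirm which versions actually ended up in the runtime environment:

# Illustrative sketch: log dependency versions as seen inside the runtime
# environment, to confirm the chosen mitigation took effect.
import logging
from importlib.metadata import version

import apache_beam as beam

class LogDependencyVersions(beam.DoFn):
    def process(self, element):
        for pkg in ("grpcio", "grpcio-status", "google-api-core", "apache-beam"):
            logging.info("%s==%s", pkg, version(pkg))
        yield element

Apply it once near the start of the pipeline, e.g. p | beam.Create([None]) | beam.ParDo(LogDependencyVersions()), and check the worker logs.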

@DerRidda
Author

DerRidda commented Apr 16, 2024

So, I removed all other dependency pins and just updated grpcio to 1.63.0rc1 (I didn't even touch grpcio-status at first) on SDK version 2.55.1, with this patch manually applied to rule that out as a potential reason for pipeline stalling.

So far I can no longer repro the issue; my job performs normally under load. I have now also added the grpcio-status pin and updated in place; I will check once the job is back to expected full scaling, but I don't expect issues there.

@damccorm
Contributor

@tvalentyn is the fix here basically just disallowing 1.59.0-1.62.1 in 'grpcio>=1.33.1,!=1.48.0,<2'? Branch cut is tomorrow, so I'd love to get this in today if we can.
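For illustration only (this is not necessarily what the eventual fix in #31044 does), one way to exclude the affected range in a setuptools-style requirement string would be:

# Hypothetical requirement pin excluding the affected grpcio range; illustrative,
# not the actual change from #31044.
install_requires = [
    'grpcio>=1.33.1,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<2',
]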

@tvalentyn
Copy link
Contributor

@DerRidda Great to hear! The grpcio-status change is to make pip check happy for the downgrade scenario.
@damccorm Yes, a PR is coming shortly.

@tvalentyn
Contributor

tvalentyn commented Apr 16, 2024

@tvalentyn is the fix here basically just disallowing 1.59.0-1.62.1 in

We have at least two changes in Beam that depend on newer versions of grpcio; we'd have to roll them back. I am discussing with the gRPC maintainers the possibility of a 1.62.2 patch release.

Update: 1.62.2 has been released.

I updated the issue description to explain the root cause and mitigation options.

tvalentyn added a commit that referenced this issue Apr 18, 2024
* Exclude broken versions of GRPCIO

* Upgrade requirements.
@tvalentyn tvalentyn changed the title from [Bug]: grpcio 1.62.1 deadlocks in Beam SDK worker with Dataflow to [Bug]: grpcio versions from 1.58.0 to 1.62.1 can cause Beam Python pipelines to get stuck Apr 23, 2024
@tvalentyn tvalentyn changed the title from [Bug]: grpcio versions from 1.58.0 to 1.62.1 can cause Beam Python pipelines to get stuck to [Bug]: grpcio versions from 1.59.0 to 1.62.1 can cause Beam Python pipelines to get stuck Apr 23, 2024