Mapped TriggerDagRun operator with trigger_dag_id as partial crashes on accessing extra_link #32150

Closed
tirkarthi opened this issue Jun 26, 2023 · 4 comments · Fixed by #42563
Labels
area:core-operators (Operators, Sensors and hooks within Core Airflow), area:webserver (Webserver related Issues), kind:bug (This is clearly a bug)

Comments

tirkarthi (Contributor) commented on Jun 26, 2023

Apache Airflow version

main (development)

What happened

When the webserver unmaps a mapped TriggerDagRun operator to render its extra link, trigger_dag_id comes back as None: the value is stored in partial_kwargs during serialization, and the unmapped operator has no partial_kwargs applied from which to fetch it. The None value then causes the link endpoint to return a 500 error, as shown in the traceback below.

query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when}
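For context on why this turns into a 500: Flask's url_for drops None values, so werkzeug is left without the required dag_id rule variable and raises BuildError. A minimal, self-contained sketch of that failure mode (the route below only approximates the shape of Airflow's grid URL; it is not the real view):

from flask import Flask, url_for
from werkzeug.routing import BuildError

app = Flask(__name__)

# Stand-in route with the same shape as the grid view URL.
@app.route("/dags/<string:dag_id>/grid", endpoint="grid")
def grid(dag_id):
    return dag_id

with app.test_request_context():
    try:
        # url_for drops the None value, so the required dag_id rule
        # variable is missing and werkzeug raises BuildError.
        url_for("grid", dag_id=None, base_date="2022-01-02")
    except BuildError as exc:
        print(exc)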

I tried the patch below, but I am not sure it is a good approach.

diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 66b75923fd..1fc8ce2134 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -636,6 +636,10 @@ class MappedOperator(AbstractOperator):
 
         op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True)
         SerializedBaseOperator.populate_operator(op, self.operator_class)
+
+        for k, v in getattr(self, 'partial_kwargs', {}).items():
+            setattr(op, k, v)
+
         return op
 
     def _get_specified_expand_input(self) -> ExpandInput:

import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    catchup=False,
    start_date=datetime.datetime(2022, 1, 2),
    schedule_interval=datetime.timedelta(seconds=3600),
    dag_id="my_dag_trigger",
    dagrun_timeout=datetime.timedelta(seconds=3600),
    default_args={
        "retry_delay": datetime.timedelta(seconds=3600),
    }
) as dag:
    t1 = TriggerDagRunOperator.partial(
        task_id="my_task", trigger_dag_id="example_bash_operator"
    ).expand(conf=[{"params": {"name": "foo!!!"}}, {"params": {"name": "foo!!!"}}])

    t2 = TriggerDagRunOperator(
        task_id="my_task_1", trigger_dag_id="example_bash_operator"
    )

    t1 >> t2

Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2552, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2532, in wsgi_app
    response = self.handle_exception(e)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/karthikeyan/stuff/python/airflow/airflow/www/auth.py", line 48, in decorated
    return func(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/airflow/www/decorators.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/airflow/www/views.py", line 3700, in extra_links
    url = task.get_extra_links(ti, link_name)
  File "/home/karthikeyan/stuff/python/airflow/airflow/models/abstractoperator.py", line 390, in get_extra_links
    return link.get_link(self.unmap(None), ti_key=ti.key)
  File "/home/karthikeyan/stuff/python/airflow/airflow/operators/trigger_dagrun.py", line 65, in get_link
    return build_airflow_url_with_query(query)
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/helpers.py", line 253, in build_airflow_url_with_query
    return flask.url_for(f"Airflow.{view}", **query)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/helpers.py", line 256, in url_for
    return current_app.url_for(
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2034, in url_for
    return self.handle_url_build_error(error, endpoint, values)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2023, in url_for
    rv = url_adapter.build(  # type: ignore[union-attr]
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/werkzeug/routing/map.py", line 917, in build
    raise BuildError(endpoint, values, method, self)
werkzeug.routing.exceptions.BuildError: Could not build url for endpoint 'Airflow.grid' with values ['base_date']. Did you forget to specify values ['dag_id']?

What you think should happen instead

The appropriate extra link should be returned.
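For the DAG above, the link should resolve to the triggered DAG's grid view, e.g. (assuming the URL scheme implied by the traceback, with dag_id as a path segment on the grid endpoint):

/dags/example_bash_operator/grid?base_date=<logical date of the run>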

How to reproduce

  1. Run the attached DAG.
  2. Try accessing the extra link URL from the graph view (a direct request equivalent is sketched below).
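The same request the graph view makes can also be issued directly. A rough sketch, assuming a local webserver on port 8080, no authentication in the way, and the /extra_links path; the logical date and exact path are assumptions and may differ per deployment and Airflow version:

import urllib.error
import urllib.parse
import urllib.request

# Mirrors the query the graph view sends for map_index 0 of the mapped task.
params = urllib.parse.urlencode({
    "dag_id": "my_dag_trigger",
    "task_id": "my_task",
    "execution_date": "2022-01-02T00:00:00+00:00",  # logical date of an existing run
    "link_name": "Triggered DAG",
    "map_index": "0",
})
url = f"http://localhost:8080/extra_links?{params}"

try:
    with urllib.request.urlopen(url) as resp:
        print(resp.status, resp.read().decode())
except urllib.error.HTTPError as err:
    # With the bug present, this prints a 500 error instead of the link JSON.
    print(err.code, err.read().decode())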

Test case

diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py
index db5d832ef7..e0fd3b1322 100644
--- a/tests/www/views/test_views_extra_links.py
+++ b/tests/www/views/test_views_extra_links.py
@@ -26,6 +26,8 @@ import pytest
 
 from airflow.models import DAG
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -115,6 +117,18 @@ def task_3(dag):
     return Dummy3TestOperator(task_id="some_dummy_task_3", dag=dag)
 
 
+@pytest.fixture(scope="module", autouse=True)
+def trigger_task(dag):
+    TRIGGERED_DAG_ID = "dag"
+
+    return TriggerDagRunOperator.partial(
+        task_id="test_trigger_expand",
+        trigger_dag_id=TRIGGERED_DAG_ID,
+        reset_dag_run=True,
+        dag=dag,
+    ).expand(execution_date=[DEFAULT_DATE])
+
+
 @pytest.fixture(scope="module", autouse=True)
 def init_blank_task_instances():
     """Make sure there are no runs before we test anything.
@@ -290,3 +304,17 @@ def test_operator_extra_link_multiple_operators(dag_run, task_2, task_3, viewer_
     if isinstance(response.data, bytes):
         response_str = response_str.decode()
     assert json.loads(response_str) == {"url": "https://www.google.com", "error": None}
+
+
+def test_trigger_dagrun_expand_view(app, trigger_task, dag_run, viewer_client):
+    """Test TriggerDagRunOperator with expand."""
+    with mock.patch.object(app, "dag_bag") as mock_dag_bag:
+        SerializedDagModel.write_dag(trigger_task.dag)
+        mock_dag_bag.get_dag.return_value = SerializedDagModel.get_dag(trigger_task.dag.dag_id)
+
+        response = viewer_client.get(
+            f"{ENDPOINT}?dag_id={trigger_task.dag_id}&task_id={trigger_task.task_id}"
+            f"&execution_date={DEFAULT_DATE}&link_name=Triggered DAG&map_index=0",
+            follow_redirects=True,
+        )
+        assert response.status_code == 200

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

tirkarthi added the area:core, kind:bug, and needs-triage labels on Jun 26, 2023
tirkarthi (Contributor, Author) commented

cc: @uranusjr, since #25360 was fixed and this was found while backporting it to 2.3.4.

hussein-awala (Member) commented

I cannot reproduce the issue. The mapped task does not seem to have an extra link. Could you please provide a screenshot showing what you are seeing?

tirkarthi (Contributor, Author) commented

The extra link is not present for mapped tasks in the grid view; I am working on that as a separate enhancement. Are you trying this in the grid view? Could you try reproducing it in the graph view by selecting the mapped task and then choosing the map index from the dropdown in the modal?

[Screenshot attached.]

hussein-awala (Member) commented

Ah yes, I have the same error.

hussein-awala added the area:webserver and area:core-operators labels and removed the area:core and needs-triage labels on Jun 26, 2023
hussein-awala self-assigned this on Jun 26, 2023