Fix hive_partition_sensor system test #40023

Merged: 1 commit, Jun 10, 2024
23 changes: 17 additions & 6 deletions airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -431,7 +431,7 @@ def execute(self, context: Context) -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
self.log.info("Creating Dataproc Metastore service: %s", self.service_id)
try:
operation = hook.create_service(
region=self.region,
@@ -548,13 +548,24 @@ def execute(self, context: Context) -> None:
class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
"""Delete a single service.

- :param request: The request object. Request message for
- [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
- :param gcp_conn_id:
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
"""

template_fields: Sequence[str] = (
@@ -589,7 +600,7 @@ def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
self.log.info("Deleting Dataproc Metastore service: %s", self.service_id)
operation = hook.delete_service(
region=self.region,
project_id=self.project_id,
@@ -599,7 +610,7 @@ def execute(self, context: Context):
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s deleted successfully", self.project_id)
self.log.info("Service %s deleted successfully", self.service_id)


class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
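For reference, a minimal usage sketch of the operator whose docstring is expanded above, based only on the parameters it documents (project_id, region, service_id); the constant values are placeholders and are not part of this PR:

```python
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreDeleteServiceOperator,
)

# Placeholder values; the system test below builds these from environment variables.
PROJECT_ID = "demo-project"
REGION = "europe-west1"
METASTORE_SERVICE_ID = "metastore-hive-partition-sensor-demo"

delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_metastore_service",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=METASTORE_SERVICE_ID,
)
```

With the fix above, the operator's log lines now report the service_id being created or deleted rather than the project_id.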
Second changed file: the hive_partition_sensor example DAG used by the system test
@@ -16,8 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
- Example Airflow DAG that show how to check Hive partitions existence
- using Dataproc Metastore Sensor.
+ Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.

Note that Metastore service must be configured to use gRPC endpoints.
"""
@@ -47,7 +46,7 @@
DAG_ID = "hive_partition_sensor"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
REGION = "us-central1"
REGION = "europe-west1"
NETWORK = "default"

METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@
"network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
}
METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
- DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+ DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
DATAPROC_CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -133,7 +132,7 @@

@task(task_id="get_hive_warehouse_bucket_task")
def get_hive_warehouse_bucket(**kwargs):
"""Returns Hive Metastore Warehouse GCS bucket name."""
"""Return Hive Metastore Warehouse GCS bucket name."""
ti = kwargs["ti"]
metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
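The hunk is truncated at this point; a hedged sketch of how the task presumably derives the bucket name from the warehouse directory override (the config key and XCom key are assumptions for illustration):

```python
        # config_overrides typically contains something like
        # "hive.metastore.warehouse.dir": "gs://<bucket>/hive-warehouse"
        destination_dir: str = config_overrides["hive.metastore.warehouse.dir"]
        bucket = destination_dir.removeprefix("gs://").split("/", 1)[0]
        ti.xcom_push(key="bucket", value=bucket)
```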
@@ -216,19 +215,16 @@ def get_hive_warehouse_bucket(**kwargs):
trigger_rule=TriggerRule.ALL_DONE,
)

- # TEST SETUP
(
+ # TEST SETUP
create_metastore_service
>> create_cluster
>> get_hive_warehouse_bucket_task
>> copy_source_data
>> create_external_table
>> create_partitioned_table
+ >> partition_data
- )
- (
- create_metastore_service
# TEST BODY
- >> partition_data
>> hive_partition_sensor
# TEST TEARDOWN
>> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
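The hive_partition_sensor task itself is outside this diff; a hedged sketch of how it is typically defined in this system test (table and partition values are placeholders):

```python
from airflow.providers.google.cloud.sensors.dataproc_metastore import (
    MetastoreHivePartitionSensor,
)

hive_partition_sensor = MetastoreHivePartitionSensor(
    task_id="hive_partition_sensor",
    service_id=METASTORE_SERVICE_ID,
    region=REGION,
    table="transactions_partitioned",  # placeholder table name
    partitions=["date=2023-01-01"],  # placeholder partition spec
)
```

Collapsing the two dependency chains into one, as done above, makes partition_data wait for the table-creation tasks and run before the sensor, which is presumably what the original test layout got wrong.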