diff --git a/noxfile.py b/noxfile.py index 574fbd644..35e5916a9 100644 --- a/noxfile.py +++ b/noxfile.py @@ -55,6 +55,7 @@ ] SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [ "psutil", + "flaky", ] SYSTEM_TEST_LOCAL_DEPENDENCIES = [] SYSTEM_TEST_DEPENDENCIES = [] diff --git a/owlbot.py b/owlbot.py index 8787d90c6..7539adfdb 100644 --- a/owlbot.py +++ b/owlbot.py @@ -338,7 +338,7 @@ versions=gcp.common.detect_versions(path="./google", default_first=True), unit_test_python_versions=["3.7", "3.8", "3.9", "3.10", "3.11"], system_test_python_versions=["3.10"], - system_test_external_dependencies=["psutil"], + system_test_external_dependencies=["psutil","flaky"], ) s.move(templated_files, excludes=[".coveragerc", ".github/release-please.yml", "README.rst", "docs/index.rst"]) diff --git a/tests/system.py b/tests/system.py index d7f7c5bea..bb1265453 100644 --- a/tests/system.py +++ b/tests/system.py @@ -23,6 +23,7 @@ import sys import threading import time +from typing import Any, Callable, cast, TypeVar # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: @@ -30,6 +31,7 @@ else: from unittest import mock +from flaky import flaky import pytest import google.auth @@ -43,6 +45,9 @@ from test_utils.system import unique_resource_id +C = TypeVar("C", bound=Callable[..., Any]) +typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1)) + @pytest.fixture(scope="module") def project(): @@ -61,13 +66,13 @@ def subscriber(request): @pytest.fixture -def topic_path(project, publisher): +def topic_path_base(project, publisher): topic_name = "t" + unique_resource_id("-") yield publisher.topic_path(project, topic_name) @pytest.fixture -def subscription_path(project, subscriber): +def subscription_path_base(project, subscriber): sub_name = "s" + unique_resource_id("-") yield subscriber.subscription_path(project, sub_name) @@ -82,7 +87,9 @@ def cleanup(): to_call(*args, **kwargs) -def test_publish_messages(publisher, topic_path, cleanup): +def test_publish_messages(publisher, topic_path_base, cleanup): + # Customize topic path to test. + topic_path = topic_path_base + "-publish-messages" # Make sure the topic gets deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) @@ -100,7 +107,9 @@ def test_publish_messages(publisher, topic_path, cleanup): assert isinstance(result, str) -def test_publish_large_messages(publisher, topic_path, cleanup): +def test_publish_large_messages(publisher, topic_path_base, cleanup): + # Customize topic path to test. + topic_path = topic_path_base + "-publish-large-messages" # Make sure the topic gets deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) @@ -130,8 +139,11 @@ def test_publish_large_messages(publisher, topic_path, cleanup): def test_subscribe_to_messages( - publisher, topic_path, subscriber, subscription_path, cleanup + publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + # Customize topic path to test. + topic_path = topic_path_base + "-subscribe-to-messages" + subscription_path = subscription_path_base + "-subscribe-to-messages" # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -175,8 +187,12 @@ def test_subscribe_to_messages( def test_subscribe_to_messages_async_callbacks( - publisher, topic_path, subscriber, subscription_path, cleanup + publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + # Customize topic path to test. + custom_str = "-subscribe-to-messages-async-callback" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -227,8 +243,12 @@ def test_subscribe_to_messages_async_callbacks( def test_creating_subscriptions_with_non_default_settings( - publisher, subscriber, project, topic_path, subscription_path, cleanup + publisher, subscriber, project, topic_path_base, subscription_path_base, cleanup ): + # Customize topic path to test. + custom_str = "-creating-subscriptions-with-non-default-settings" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -346,7 +366,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup): assert subscriptions == {subscription_paths[0], subscription_paths[2]} -def test_managing_topic_iam_policy(publisher, topic_path, cleanup): +def test_managing_topic_iam_policy(publisher, topic_path_base, cleanup): + topic_path = topic_path_base + "-managing-topic-iam-policy" cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) # create a topic and customize its policy @@ -375,8 +396,11 @@ def test_managing_topic_iam_policy(publisher, topic_path, cleanup): def test_managing_subscription_iam_policy( - publisher, subscriber, topic_path, subscription_path, cleanup + publisher, subscriber, topic_path_base, subscription_path_base, cleanup ): + custom_str = "-managing-subscription-iam-policy" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -410,7 +434,7 @@ def test_managing_subscription_iam_policy( def test_subscriber_not_leaking_open_sockets( - publisher, topic_path, subscription_path, cleanup + publisher, topic_path_base, subscription_path_base, cleanup ): # Make sure the topic and the supscription get deleted. # NOTE: Since subscriber client will be closed in the test, we should not @@ -419,8 +443,12 @@ def test_subscriber_not_leaking_open_sockets( # Also, since the client will get closed, we need another subscriber client # to clean up the subscription. We also need to make sure that auxiliary # subscriber releases the sockets, too. + custom_str = "-not-leaking-open-sockets" + subscription_path = subscription_path_base + custom_str + topic_path = topic_path_base + custom_str subscriber = pubsub_v1.SubscriberClient(transport="grpc") subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc") + cleanup.append( (subscriber_2.delete_subscription, (), {"subscription": subscription_path}) ) @@ -460,8 +488,11 @@ def test_subscriber_not_leaking_open_sockets( def test_synchronous_pull_no_deadline_error_if_no_messages( - publisher, topic_path, subscriber, subscription_path, cleanup + publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + custom_str = "-synchronous-pull-deadline-error-if-no-messages" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -485,8 +516,11 @@ def test_synchronous_pull_no_deadline_error_if_no_messages( class TestStreamingPull(object): def test_streaming_pull_callback_error_propagation( - self, publisher, topic_path, subscriber, subscription_path, cleanup + self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + custom_str = "-streaming-pull-callback-error-propagation" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -512,9 +546,19 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) + @typed_flaky def test_streaming_pull_ack_deadline( - self, publisher, subscriber, project, topic_path, subscription_path, cleanup + self, + publisher, + subscriber, + project, + topic_path_base, + subscription_path_base, + cleanup, ): + custom_str = "-streaming-pull-ack-deadline" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -564,8 +608,11 @@ def test_streaming_pull_ack_deadline( subscription_future.cancel() def test_streaming_pull_max_messages( - self, publisher, topic_path, subscriber, subscription_path, cleanup + self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + custom_str = "-streaming-pull-max-messages" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -619,9 +666,13 @@ def test_streaming_pull_max_messages( finally: subscription_future.cancel() # trigger clean shutdown + @typed_flaky def test_streaming_pull_blocking_shutdown( - self, publisher, topic_path, subscriber, subscription_path, cleanup + self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): + custom_str = "-streaming-pull-blocking-shutdown" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -702,9 +753,11 @@ def callback2(message): ) class TestBasicRBAC(object): def test_streaming_pull_subscriber_permissions_sufficient( - self, publisher, topic_path, subscriber, subscription_path, cleanup + self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): - + custom_str = "-streaming-pull-subscriber-permissions-sufficient" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -739,9 +792,11 @@ def test_streaming_pull_subscriber_permissions_sufficient( future.cancel() def test_publisher_role_can_publish_messages( - self, publisher, topic_path, subscriber, subscription_path, cleanup + self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup ): - + custom_str = "-publisher-role-can-publish-messages" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( @@ -767,8 +822,17 @@ def test_publisher_role_can_publish_messages( "Snapshot creation is not instant on the backend, causing test falkiness." ) def test_snapshot_seek_subscriber_permissions_sufficient( - self, project, publisher, topic_path, subscriber, subscription_path, cleanup + self, + project, + publisher, + topic_path_base, + subscriber, + subscription_path_base, + cleanup, ): + custom_str = "-snapshot-seek-subscriber-permissions-sufficient" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str snapshot_name = "snap" + unique_resource_id("-") snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name) @@ -813,10 +877,10 @@ def test_snapshot_seek_subscriber_permissions_sufficient( assert len(response.received_messages) == 1 def test_viewer_role_can_list_resources( - self, project, publisher, topic_path, subscriber, cleanup + self, project, publisher, topic_path_base, subscriber, cleanup ): project_path = "projects/" + project - + topic_path = topic_path_base + "-viewer-role-can-list-resources" # Make sure the created topic gets deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) @@ -844,8 +908,17 @@ def test_viewer_role_can_list_resources( next(iter(viewer_only_subscriber.list_snapshots(project=project_path)), None) def test_editor_role_can_create_resources( - self, project, publisher, topic_path, subscriber, subscription_path, cleanup + self, + project, + publisher, + topic_path_base, + subscriber, + subscription_path_base, + cleanup, ): + custom_str = "-editor-role-can-create-resources" + topic_path = topic_path_base + custom_str + subscription_path = subscription_path_base + custom_str snapshot_name = "snap" + unique_resource_id("-") snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)