Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
]
SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [
"psutil",
"flaky",
]
SYSTEM_TEST_LOCAL_DEPENDENCIES = []
SYSTEM_TEST_DEPENDENCIES = []
Expand Down
2 changes: 1 addition & 1 deletion owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@
versions=gcp.common.detect_versions(path="./google", default_first=True),
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10", "3.11"],
system_test_python_versions=["3.10"],
system_test_external_dependencies=["psutil"],
system_test_external_dependencies=["psutil","flaky"],
)
s.move(templated_files, excludes=[".coveragerc", ".github/release-please.yml", "README.rst", "docs/index.rst"])

Expand Down
119 changes: 96 additions & 23 deletions tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import sys
import threading
import time
from typing import Any, Callable, cast, TypeVar

# special case python < 3.8
if sys.version_info.major == 3 and sys.version_info.minor < 8:
import mock
else:
from unittest import mock

from flaky import flaky
import pytest

import google.auth
Expand All @@ -43,6 +45,9 @@

from test_utils.system import unique_resource_id

C = TypeVar("C", bound=Callable[..., Any])
typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1))


@pytest.fixture(scope="module")
def project():
Expand All @@ -61,13 +66,13 @@ def subscriber(request):


@pytest.fixture
def topic_path(project, publisher):
def topic_path_base(project, publisher):
topic_name = "t" + unique_resource_id("-")
yield publisher.topic_path(project, topic_name)


@pytest.fixture
def subscription_path(project, subscriber):
def subscription_path_base(project, subscriber):
sub_name = "s" + unique_resource_id("-")
yield subscriber.subscription_path(project, sub_name)

Expand All @@ -82,7 +87,9 @@ def cleanup():
to_call(*args, **kwargs)


def test_publish_messages(publisher, topic_path, cleanup):
def test_publish_messages(publisher, topic_path_base, cleanup):
# Customize topic path to test.
topic_path = topic_path_base + "-publish-messages"
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand All @@ -100,7 +107,9 @@ def test_publish_messages(publisher, topic_path, cleanup):
assert isinstance(result, str)


def test_publish_large_messages(publisher, topic_path, cleanup):
def test_publish_large_messages(publisher, topic_path_base, cleanup):
# Customize topic path to test.
topic_path = topic_path_base + "-publish-large-messages"
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand Down Expand Up @@ -130,8 +139,11 @@ def test_publish_large_messages(publisher, topic_path, cleanup):


def test_subscribe_to_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
# Customize topic path to test.
topic_path = topic_path_base + "-subscribe-to-messages"
subscription_path = subscription_path_base + "-subscribe-to-messages"
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -175,8 +187,12 @@ def test_subscribe_to_messages(


def test_subscribe_to_messages_async_callbacks(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
# Customize topic path to test.
custom_str = "-subscribe-to-messages-async-callback"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -227,8 +243,12 @@ def test_subscribe_to_messages_async_callbacks(


def test_creating_subscriptions_with_non_default_settings(
publisher, subscriber, project, topic_path, subscription_path, cleanup
publisher, subscriber, project, topic_path_base, subscription_path_base, cleanup
):
# Customize topic path to test.
custom_str = "-creating-subscriptions-with-non-default-settings"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -346,7 +366,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
assert subscriptions == {subscription_paths[0], subscription_paths[2]}


def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
def test_managing_topic_iam_policy(publisher, topic_path_base, cleanup):
topic_path = topic_path_base + "-managing-topic-iam-policy"
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

# create a topic and customize its policy
Expand Down Expand Up @@ -375,8 +396,11 @@ def test_managing_topic_iam_policy(publisher, topic_path, cleanup):


def test_managing_subscription_iam_policy(
publisher, subscriber, topic_path, subscription_path, cleanup
publisher, subscriber, topic_path_base, subscription_path_base, cleanup
):
custom_str = "-managing-subscription-iam-policy"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -410,7 +434,7 @@ def test_managing_subscription_iam_policy(


def test_subscriber_not_leaking_open_sockets(
publisher, topic_path, subscription_path, cleanup
publisher, topic_path_base, subscription_path_base, cleanup
):
# Make sure the topic and the supscription get deleted.
# NOTE: Since subscriber client will be closed in the test, we should not
Expand All @@ -419,8 +443,12 @@ def test_subscriber_not_leaking_open_sockets(
# Also, since the client will get closed, we need another subscriber client
# to clean up the subscription. We also need to make sure that auxiliary
# subscriber releases the sockets, too.
custom_str = "-not-leaking-open-sockets"
subscription_path = subscription_path_base + custom_str
topic_path = topic_path_base + custom_str
subscriber = pubsub_v1.SubscriberClient(transport="grpc")
subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc")

cleanup.append(
(subscriber_2.delete_subscription, (), {"subscription": subscription_path})
)
Expand Down Expand Up @@ -460,8 +488,11 @@ def test_subscriber_not_leaking_open_sockets(


def test_synchronous_pull_no_deadline_error_if_no_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-synchronous-pull-deadline-error-if-no-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -485,8 +516,11 @@ def test_synchronous_pull_no_deadline_error_if_no_messages(

class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-callback-error-propagation"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -512,9 +546,19 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@typed_flaky
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
self,
publisher,
subscriber,
project,
topic_path_base,
subscription_path_base,
cleanup,
):
custom_str = "-streaming-pull-ack-deadline"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -564,8 +608,11 @@ def test_streaming_pull_ack_deadline(
subscription_future.cancel()

def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-max-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -619,9 +666,13 @@ def test_streaming_pull_max_messages(
finally:
subscription_future.cancel() # trigger clean shutdown

@typed_flaky
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-blocking-shutdown"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -702,9 +753,11 @@ def callback2(message):
)
class TestBasicRBAC(object):
def test_streaming_pull_subscriber_permissions_sufficient(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):

custom_str = "-streaming-pull-subscriber-permissions-sufficient"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -739,9 +792,11 @@ def test_streaming_pull_subscriber_permissions_sufficient(
future.cancel()

def test_publisher_role_can_publish_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):

custom_str = "-publisher-role-can-publish-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -767,8 +822,17 @@ def test_publisher_role_can_publish_messages(
"Snapshot creation is not instant on the backend, causing test falkiness."
)
def test_snapshot_seek_subscriber_permissions_sufficient(
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
self,
project,
publisher,
topic_path_base,
subscriber,
subscription_path_base,
cleanup,
):
custom_str = "-snapshot-seek-subscriber-permissions-sufficient"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
snapshot_name = "snap" + unique_resource_id("-")
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)

Expand Down Expand Up @@ -813,10 +877,10 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
assert len(response.received_messages) == 1

def test_viewer_role_can_list_resources(
self, project, publisher, topic_path, subscriber, cleanup
self, project, publisher, topic_path_base, subscriber, cleanup
):
project_path = "projects/" + project

topic_path = topic_path_base + "-viewer-role-can-list-resources"
# Make sure the created topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand Down Expand Up @@ -844,8 +908,17 @@ def test_viewer_role_can_list_resources(
next(iter(viewer_only_subscriber.list_snapshots(project=project_path)), None)

def test_editor_role_can_create_resources(
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
self,
project,
publisher,
topic_path_base,
subscriber,
subscription_path_base,
cleanup,
):
custom_str = "-editor-role-can-create-resources"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
snapshot_name = "snap" + unique_resource_id("-")
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)

Expand Down