diff --git a/.coveragerc b/.coveragerc index 3a8249f6..d59cabf2 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,7 +2,6 @@ branch = True [report] -fail_under = 100 show_missing = True omit = google/cloud/pubsublite/__init__.py diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index da616c91..e2b39f94 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -1,3 +1,3 @@ docker: image: gcr.io/repo-automation-bots/owlbot-python:latest - digest: sha256:c66ba3c8d7bc8566f47df841f98cd0097b28fff0b1864c86f5817f4c8c3e8600 + digest: sha256:99d90d097e4a4710cc8658ee0b5b963f4426d0e424819787c3ac1405c9a26719 diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5b661519..1bf7ad8b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -6,7 +6,7 @@ # The @googleapis/api-pubsublite is the default owner for changes in this repo -* @googleapis/api-pubsublite +* @googleapis/yoshi-python @googleapis/api-pubsublite # The python-samples-reviewers team is the default owner for samples changes /samples/ @googleapis/python-samples-owners diff --git a/.kokoro/samples/python3.9/common.cfg b/.kokoro/samples/python3.9/common.cfg new file mode 100644 index 00000000..1c46942f --- /dev/null +++ b/.kokoro/samples/python3.9/common.cfg @@ -0,0 +1,40 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +# Build logs will be here +action { + define_artifacts { + regex: "**/*sponge_log.xml" + } +} + +# Specify which tests to run +env_vars: { + key: "RUN_TESTS_SESSION" + value: "py-3.9" +} + +# Declare build specific Cloud project. +env_vars: { + key: "BUILD_SPECIFIC_GCLOUD_PROJECT" + value: "python-docs-samples-tests-py39" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-pubsublite/.kokoro/test-samples.sh" +} + +# Configure the docker image for kokoro-trampoline. +env_vars: { + key: "TRAMPOLINE_IMAGE" + value: "gcr.io/cloud-devrel-kokoro-resources/python-samples-testing-docker" +} + +# Download secrets for samples +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/python-docs-samples" + +# Download trampoline resources. +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/trampoline" + +# Use the trampoline script to run in docker. +build_file: "python-pubsublite/.kokoro/trampoline.sh" \ No newline at end of file diff --git a/.kokoro/samples/python3.9/continuous.cfg b/.kokoro/samples/python3.9/continuous.cfg new file mode 100644 index 00000000..a1c8d975 --- /dev/null +++ b/.kokoro/samples/python3.9/continuous.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/.kokoro/samples/python3.9/periodic-head.cfg b/.kokoro/samples/python3.9/periodic-head.cfg new file mode 100644 index 00000000..f9cfcd33 --- /dev/null +++ b/.kokoro/samples/python3.9/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-pubsub/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/samples/python3.9/periodic.cfg b/.kokoro/samples/python3.9/periodic.cfg new file mode 100644 index 00000000..50fec964 --- /dev/null +++ b/.kokoro/samples/python3.9/periodic.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "False" +} \ No newline at end of file diff --git a/.kokoro/samples/python3.9/presubmit.cfg b/.kokoro/samples/python3.9/presubmit.cfg new file mode 100644 index 00000000..a1c8d975 --- /dev/null +++ b/.kokoro/samples/python3.9/presubmit.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4f00c7cf..62eb5a77 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,7 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v3.4.0 + rev: v4.0.1 hooks: - id: trailing-whitespace - id: end-of-file-fixer diff --git a/CHANGELOG.md b/CHANGELOG.md index a7d1de5f..1887d1ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## [0.6.0](https://www.github.com/googleapis/python-pubsublite/compare/v0.5.0...v0.6.0) (2021-07-13) + + +### Features + +* add always_use_jwt_access ([6c84e24](https://www.github.com/googleapis/python-pubsublite/commit/6c84e24ce6e3e0c50f2807c2e98db47fe0424715)) +* Add SeekSubscription and Operations to API ([#169](https://www.github.com/googleapis/python-pubsublite/issues/169)) ([2e29ba1](https://www.github.com/googleapis/python-pubsublite/commit/2e29ba1f39f299acf97e543db355bf8ebfcdf121)) + + +### Bug Fixes + +* **deps:** add packaging requirement ([#162](https://www.github.com/googleapis/python-pubsublite/issues/162)) ([94281c5](https://www.github.com/googleapis/python-pubsublite/commit/94281c5b925b550c0e0905e3f53ec9d23c45b499)) +* disable always_use_jwt_access ([#175](https://www.github.com/googleapis/python-pubsublite/issues/175)) ([6c84e24](https://www.github.com/googleapis/python-pubsublite/commit/6c84e24ce6e3e0c50f2807c2e98db47fe0424715)) +* exclude docs and tests from package ([#161](https://www.github.com/googleapis/python-pubsublite/issues/161)) ([b8a70d9](https://www.github.com/googleapis/python-pubsublite/commit/b8a70d9bafca7d62351404421c465e9dfc466420)) +* is_reset_signal should handle status that is None ([#183](https://www.github.com/googleapis/python-pubsublite/issues/183)) ([4ba484e](https://www.github.com/googleapis/python-pubsublite/commit/4ba484e1c5f8ff458a4ad462167f8907b44ebe28)) + + +### Documentation + +* omit mention of Python 2.7 in 'CONTRIBUTING.rst' ([#1127](https://www.github.com/googleapis/python-pubsublite/issues/1127)) ([#165](https://www.github.com/googleapis/python-pubsublite/issues/165)) ([cea99be](https://www.github.com/googleapis/python-pubsublite/commit/cea99be19a5415796eaddf7f51ca4bcd4af9f75f)) + ## [0.5.0](https://www.github.com/googleapis/python-pubsublite/compare/v0.4.1...v0.5.0) (2021-06-11) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 06ab24a4..c653f5d8 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -68,15 +68,12 @@ Using ``nox`` We use `nox `__ to instrument our tests. - To test your changes, run unit tests with ``nox``:: + $ nox -s unit - $ nox -s unit-2.7 - $ nox -s unit-3.8 - $ ... +- To run a single unit test:: -- Args to pytest can be passed through the nox command separated by a `--`. For - example, to run a single test:: + $ nox -s unit-3.9 -- -k - $ nox -s unit-3.8 -- -k .. note:: @@ -143,8 +140,7 @@ Running System Tests - To run system tests, you can execute:: # Run all system tests - $ nox -s system-3.8 - $ nox -s system-2.7 + $ nox -s system # Run a single system test $ nox -s system-3.8 -- -k @@ -152,9 +148,8 @@ Running System Tests .. note:: - System tests are only configured to run under Python 2.7 and - Python 3.8. For expediency, we do not run them in older versions - of Python 3. + System tests are only configured to run under Python 3.8. + For expediency, we do not run them in older versions of Python 3. This alone will not run the tests. You'll need to change some local auth settings and change some configuration in your project to @@ -218,8 +213,8 @@ Supported versions can be found in our ``noxfile.py`` `config`_. .. _config: https://github.com/googleapis/python-pubsublite/blob/master/noxfile.py -We also explicitly decided to support Python 3 beginning with version -3.6. Reasons for this include: +We also explicitly decided to support Python 3 beginning with version 3.6. +Reasons for this include: - Encouraging use of newest versions of Python 3 - Taking the lead of `prominent`_ open-source `projects`_ diff --git a/docs/conf.py b/docs/conf.py index 1fd2191f..0576e13d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -80,9 +80,9 @@ master_doc = "index" # General information about the project. -project = u"google-cloud-pubsublite" -copyright = u"2019, Google" -author = u"Google APIs" +project = "google-cloud-pubsublite" +copyright = "2019, Google" +author = "Google APIs" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -281,7 +281,7 @@ ( master_doc, "google-cloud-pubsublite.tex", - u"google-cloud-pubsublite Documentation", + "google-cloud-pubsublite Documentation", author, "manual", ) @@ -316,7 +316,7 @@ ( master_doc, "google-cloud-pubsublite", - u"google-cloud-pubsublite Documentation", + "google-cloud-pubsublite Documentation", [author], 1, ) @@ -335,7 +335,7 @@ ( master_doc, "google-cloud-pubsublite", - u"google-cloud-pubsublite Documentation", + "google-cloud-pubsublite Documentation", author, "google-cloud-pubsublite", "google-cloud-pubsublite Library", diff --git a/google/cloud/pubsublite/internal/wire/reset_signal.py b/google/cloud/pubsublite/internal/wire/reset_signal.py index ce4fa3d5..5771a0dc 100644 --- a/google/cloud/pubsublite/internal/wire/reset_signal.py +++ b/google/cloud/pubsublite/internal/wire/reset_signal.py @@ -29,14 +29,17 @@ def is_reset_signal(error: GoogleAPICallError) -> bool: return False try: status = rpc_status.from_call(error.response) + if not status: + return False for detail in status.details: - info = ErrorInfo() - if ( - detail.Unpack(info) - and info.reason == "RESET" - and info.domain == "pubsublite.googleapis.com" - ): - return True + if detail.Is(ErrorInfo.DESCRIPTOR): + info = ErrorInfo() + if ( + detail.Unpack(info) + and info.reason == "RESET" + and info.domain == "pubsublite.googleapis.com" + ): + return True except ValueError: pass return False diff --git a/google/cloud/pubsublite/testing/test_reset_signal.py b/google/cloud/pubsublite/testing/test_reset_signal.py index 97a45c42..d0d989da 100644 --- a/google/cloud/pubsublite/testing/test_reset_signal.py +++ b/google/cloud/pubsublite/testing/test_reset_signal.py @@ -30,6 +30,13 @@ def make_call(status_pb: Status) -> grpc.Call: return mock_call +def make_call_without_metadata(status_pb: Status) -> grpc.Call: + mock_call = make_call(status_pb) + # Causes rpc_status.from_call to return None. + mock_call.trailing_metadata.return_value = None + return mock_call + + def make_reset_signal() -> GoogleAPICallError: any = Any() any.Pack(ErrorInfo(reason="RESET", domain="pubsublite.googleapis.com")) diff --git a/google/cloud/pubsublite_v1/__init__.py b/google/cloud/pubsublite_v1/__init__.py index fbcd53a0..bbec89bf 100644 --- a/google/cloud/pubsublite_v1/__init__.py +++ b/google/cloud/pubsublite_v1/__init__.py @@ -47,6 +47,9 @@ from .types.admin import ListTopicsResponse from .types.admin import ListTopicSubscriptionsRequest from .types.admin import ListTopicSubscriptionsResponse +from .types.admin import OperationMetadata +from .types.admin import SeekSubscriptionRequest +from .types.admin import SeekSubscriptionResponse from .types.admin import TopicPartitions from .types.admin import UpdateReservationRequest from .types.admin import UpdateSubscriptionRequest @@ -147,6 +150,7 @@ "MessagePublishRequest", "MessagePublishResponse", "MessageResponse", + "OperationMetadata", "PartitionAssignment", "PartitionAssignmentAck", "PartitionAssignmentRequest", @@ -159,6 +163,8 @@ "Reservation", "SeekRequest", "SeekResponse", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "SequencedCommitCursorRequest", "SequencedCommitCursorResponse", "SequencedMessage", diff --git a/google/cloud/pubsublite_v1/gapic_metadata.json b/google/cloud/pubsublite_v1/gapic_metadata.json index 6e66f383..1e350404 100644 --- a/google/cloud/pubsublite_v1/gapic_metadata.json +++ b/google/cloud/pubsublite_v1/gapic_metadata.json @@ -85,6 +85,11 @@ "list_topics" ] }, + "SeekSubscription": { + "methods": [ + "seek_subscription" + ] + }, "UpdateReservation": { "methods": [ "update_reservation" @@ -180,6 +185,11 @@ "list_topics" ] }, + "SeekSubscription": { + "methods": [ + "seek_subscription" + ] + }, "UpdateReservation": { "methods": [ "update_reservation" diff --git a/google/cloud/pubsublite_v1/services/admin_service/async_client.py b/google/cloud/pubsublite_v1/services/admin_service/async_client.py index cf3e1afb..bd98ad22 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/async_client.py +++ b/google/cloud/pubsublite_v1/services/admin_service/async_client.py @@ -26,6 +26,8 @@ from google.auth import credentials as ga_credentials # type: ignore from google.oauth2 import service_account # type: ignore +from google.api_core import operation # type: ignore +from google.api_core import operation_async # type: ignore from google.cloud.pubsublite_v1.services.admin_service import pagers from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common @@ -1084,6 +1086,91 @@ async def delete_subscription( request, retry=retry, timeout=timeout, metadata=metadata, ) + async def seek_subscription( + self, + request: admin.SeekSubscriptionRequest = None, + *, + retry: retries.Retry = gapic_v1.method.DEFAULT, + timeout: float = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operation_async.AsyncOperation: + r"""Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Args: + request (:class:`google.cloud.pubsublite_v1.types.SeekSubscriptionRequest`): + The request object. Request for SeekSubscription. + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + google.api_core.operation_async.AsyncOperation: + An object representing a long-running operation. + + The result type for the operation will be + :class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse` + Response for SeekSubscription long running operation. + + """ + # Create or coerce a protobuf request object. + request = admin.SeekSubscriptionRequest(request) + + # Wrap the RPC method; this adds retry and timeout information, + # and friendly error handling. + rpc = gapic_v1.method_async.wrap_method( + self._client._transport.seek_subscription, + default_timeout=None, + client_info=DEFAULT_CLIENT_INFO, + ) + + # Certain fields should be provided within the metadata header; + # add these here. + metadata = tuple(metadata) + ( + gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), + ) + + # Send the request. + response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) + + # Wrap the response in an operation future. + response = operation_async.from_gapic( + response, + self._client._transport.operations_client, + admin.SeekSubscriptionResponse, + metadata_type=admin.OperationMetadata, + ) + + # Done; return the response. + return response + async def create_reservation( self, request: admin.CreateReservationRequest = None, diff --git a/google/cloud/pubsublite_v1/services/admin_service/client.py b/google/cloud/pubsublite_v1/services/admin_service/client.py index a4926aa1..ee16b493 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/client.py +++ b/google/cloud/pubsublite_v1/services/admin_service/client.py @@ -30,6 +30,8 @@ from google.auth.exceptions import MutualTLSChannelError # type: ignore from google.oauth2 import service_account # type: ignore +from google.api_core import operation # type: ignore +from google.api_core import operation_async # type: ignore from google.cloud.pubsublite_v1.services.admin_service import pagers from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common @@ -1299,6 +1301,92 @@ def delete_subscription( request, retry=retry, timeout=timeout, metadata=metadata, ) + def seek_subscription( + self, + request: admin.SeekSubscriptionRequest = None, + *, + retry: retries.Retry = gapic_v1.method.DEFAULT, + timeout: float = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operation.Operation: + r"""Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Args: + request (google.cloud.pubsublite_v1.types.SeekSubscriptionRequest): + The request object. Request for SeekSubscription. + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + google.api_core.operation.Operation: + An object representing a long-running operation. + + The result type for the operation will be + :class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse` + Response for SeekSubscription long running operation. + + """ + # Create or coerce a protobuf request object. + # Minor optimization to avoid making a copy if the user passes + # in a admin.SeekSubscriptionRequest. + # There's no risk of modifying the input as we've already verified + # there are no flattened fields. + if not isinstance(request, admin.SeekSubscriptionRequest): + request = admin.SeekSubscriptionRequest(request) + + # Wrap the RPC method; this adds retry and timeout information, + # and friendly error handling. + rpc = self._transport._wrapped_methods[self._transport.seek_subscription] + + # Certain fields should be provided within the metadata header; + # add these here. + metadata = tuple(metadata) + ( + gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), + ) + + # Send the request. + response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) + + # Wrap the response in an operation future. + response = operation.from_gapic( + response, + self._transport.operations_client, + admin.SeekSubscriptionResponse, + metadata_type=admin.OperationMetadata, + ) + + # Done; return the response. + return response + def create_reservation( self, request: admin.CreateReservationRequest = None, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/base.py b/google/cloud/pubsublite_v1/services/admin_service/transports/base.py index 74d8c6a4..15d64407 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/base.py @@ -23,10 +23,13 @@ from google.api_core import exceptions as core_exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import operations_v1 # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore try: @@ -47,8 +50,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class AdminServiceTransport(abc.ABC): """Abstract transport class for AdminService.""" @@ -66,6 +67,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -89,6 +91,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -98,7 +102,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -117,13 +121,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -144,27 +155,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { @@ -208,6 +198,9 @@ def _prep_wrapped_messages(self, client_info): self.delete_subscription: gapic_v1.method.wrap_method( self.delete_subscription, default_timeout=None, client_info=client_info, ), + self.seek_subscription: gapic_v1.method.wrap_method( + self.seek_subscription, default_timeout=None, client_info=client_info, + ), self.create_reservation: gapic_v1.method.wrap_method( self.create_reservation, default_timeout=None, client_info=client_info, ), @@ -230,6 +223,11 @@ def _prep_wrapped_messages(self, client_info): ), } + @property + def operations_client(self) -> operations_v1.OperationsClient: + """Return the client designed to process long-running operations.""" + raise NotImplementedError() + @property def create_topic( self, @@ -339,6 +337,15 @@ def delete_subscription( ]: raise NotImplementedError() + @property + def seek_subscription( + self, + ) -> Callable[ + [admin.SeekSubscriptionRequest], + Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]], + ]: + raise NotImplementedError() + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py index 1d98d5e9..2c6d34ed 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py @@ -17,6 +17,7 @@ from typing import Callable, Dict, Optional, Sequence, Tuple, Union from google.api_core import grpc_helpers # type: ignore +from google.api_core import operations_v1 # type: ignore from google.api_core import gapic_v1 # type: ignore import google.auth # type: ignore from google.auth import credentials as ga_credentials # type: ignore @@ -26,6 +27,7 @@ from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore from .base import AdminServiceTransport, DEFAULT_CLIENT_INFO @@ -61,6 +63,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -101,6 +104,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -111,6 +116,7 @@ def __init__( self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials self._stubs: Dict[str, Callable] = {} + self._operations_client = None if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -153,6 +159,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -208,14 +215,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -225,6 +232,20 @@ def grpc_channel(self) -> grpc.Channel: """ return self._grpc_channel + @property + def operations_client(self) -> operations_v1.OperationsClient: + """Create the client designed to process long-running operations. + + This property caches on the instance; repeated calls return the same + client. + """ + # Sanity check: Only create a new client if we do not already have one. + if self._operations_client is None: + self._operations_client = operations_v1.OperationsClient(self.grpc_channel) + + # Return the client from cache. + return self._operations_client + @property def create_topic(self) -> Callable[[admin.CreateTopicRequest], common.Topic]: r"""Return a callable for the create topic method over gRPC. @@ -534,6 +555,58 @@ def delete_subscription( ) return self._stubs["delete_subscription"] + @property + def seek_subscription( + self, + ) -> Callable[[admin.SeekSubscriptionRequest], operations_pb2.Operation]: + r"""Return a callable for the seek subscription method over gRPC. + + Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Returns: + Callable[[~.SeekSubscriptionRequest], + ~.Operation]: + A function that, when called, will call the underlying RPC + on the server. + """ + # Generate a "stub function" on-the-fly which will actually make + # the request. + # gRPC handles serialization and deserialization, so we just need + # to pass in the functions for each. + if "seek_subscription" not in self._stubs: + self._stubs["seek_subscription"] = self.grpc_channel.unary_unary( + "/google.cloud.pubsublite.v1.AdminService/SeekSubscription", + request_serializer=admin.SeekSubscriptionRequest.serialize, + response_deserializer=operations_pb2.Operation.FromString, + ) + return self._stubs["seek_subscription"] + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py index ff9841ee..a1590e0c 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py @@ -18,6 +18,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import grpc_helpers_async # type: ignore +from google.api_core import operations_v1 # type: ignore from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore import packaging.version @@ -27,6 +28,7 @@ from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore from .base import AdminServiceTransport, DEFAULT_CLIENT_INFO from .grpc import AdminServiceGrpcTransport @@ -82,14 +84,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -107,6 +109,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -148,6 +151,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -158,6 +163,7 @@ def __init__( self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials self._stubs: Dict[str, Callable] = {} + self._operations_client = None if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -199,6 +205,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -228,6 +235,22 @@ def grpc_channel(self) -> aio.Channel: # Return the channel from cache. return self._grpc_channel + @property + def operations_client(self) -> operations_v1.OperationsAsyncClient: + """Create the client designed to process long-running operations. + + This property caches on the instance; repeated calls return the same + client. + """ + # Sanity check: Only create a new client if we do not already have one. + if self._operations_client is None: + self._operations_client = operations_v1.OperationsAsyncClient( + self.grpc_channel + ) + + # Return the client from cache. + return self._operations_client + @property def create_topic( self, @@ -546,6 +569,58 @@ def delete_subscription( ) return self._stubs["delete_subscription"] + @property + def seek_subscription( + self, + ) -> Callable[[admin.SeekSubscriptionRequest], Awaitable[operations_pb2.Operation]]: + r"""Return a callable for the seek subscription method over gRPC. + + Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Returns: + Callable[[~.SeekSubscriptionRequest], + Awaitable[~.Operation]]: + A function that, when called, will call the underlying RPC + on the server. + """ + # Generate a "stub function" on-the-fly which will actually make + # the request. + # gRPC handles serialization and deserialization, so we just need + # to pass in the functions for each. + if "seek_subscription" not in self._stubs: + self._stubs["seek_subscription"] = self.grpc_channel.unary_unary( + "/google.cloud.pubsublite.v1.AdminService/SeekSubscription", + request_serializer=admin.SeekSubscriptionRequest.serialize, + response_deserializer=operations_pb2.Operation.FromString, + ) + return self._stubs["seek_subscription"] + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/services/cursor_service/transports/base.py b/google/cloud/pubsublite_v1/services/cursor_service/transports/base.py index cb025dde..bd867b29 100644 --- a/google/cloud/pubsublite_v1/services/cursor_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/cursor_service/transports/base.py @@ -24,6 +24,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import cursor @@ -45,8 +46,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class CursorServiceTransport(abc.ABC): """Abstract transport class for CursorService.""" @@ -64,6 +63,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -87,6 +87,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -96,7 +98,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -115,13 +117,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -142,27 +151,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { diff --git a/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc.py index 4fcb9e8e..5e75e176 100644 --- a/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc.py @@ -60,6 +60,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -100,6 +101,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -152,6 +155,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -207,14 +211,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) diff --git a/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc_asyncio.py index a5d385c8..4d6bf39a 100644 --- a/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/cursor_service/transports/grpc_asyncio.py @@ -81,14 +81,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -106,6 +106,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -147,6 +148,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -198,6 +201,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: diff --git a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/base.py b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/base.py index 652fbed1..3f1d6eff 100644 --- a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/base.py @@ -24,6 +24,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import subscriber @@ -45,8 +46,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class PartitionAssignmentServiceTransport(abc.ABC): """Abstract transport class for PartitionAssignmentService.""" @@ -64,6 +63,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -87,6 +87,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -96,7 +98,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -115,13 +117,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -142,27 +151,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { diff --git a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc.py index 62508860..17f1517c 100644 --- a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc.py @@ -58,6 +58,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -98,6 +99,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -150,6 +153,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -205,14 +209,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) diff --git a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc_asyncio.py index 0010ee19..61c01e87 100644 --- a/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/partition_assignment_service/transports/grpc_asyncio.py @@ -81,14 +81,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -106,6 +106,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -147,6 +148,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -198,6 +201,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: diff --git a/google/cloud/pubsublite_v1/services/publisher_service/transports/base.py b/google/cloud/pubsublite_v1/services/publisher_service/transports/base.py index e8421155..80e52699 100644 --- a/google/cloud/pubsublite_v1/services/publisher_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/publisher_service/transports/base.py @@ -24,6 +24,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import publisher @@ -45,8 +46,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class PublisherServiceTransport(abc.ABC): """Abstract transport class for PublisherService.""" @@ -64,6 +63,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -87,6 +87,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -96,7 +98,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -115,13 +117,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -142,27 +151,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { diff --git a/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc.py index 2fef8410..906ff27b 100644 --- a/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc.py @@ -61,6 +61,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -101,6 +102,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -153,6 +156,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -208,14 +212,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) diff --git a/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc_asyncio.py index 1c24ed7a..e2e64c94 100644 --- a/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/publisher_service/transports/grpc_asyncio.py @@ -82,14 +82,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -107,6 +107,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -148,6 +149,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -199,6 +202,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: diff --git a/google/cloud/pubsublite_v1/services/subscriber_service/transports/base.py b/google/cloud/pubsublite_v1/services/subscriber_service/transports/base.py index 5a06ac1e..bfe6b48e 100644 --- a/google/cloud/pubsublite_v1/services/subscriber_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/subscriber_service/transports/base.py @@ -24,6 +24,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import subscriber @@ -45,8 +46,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class SubscriberServiceTransport(abc.ABC): """Abstract transport class for SubscriberService.""" @@ -64,6 +63,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -87,6 +87,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -96,7 +98,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -115,13 +117,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -142,27 +151,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { diff --git a/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc.py index 7a4b8c97..0a6f0194 100644 --- a/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc.py @@ -58,6 +58,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -98,6 +99,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -150,6 +153,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -205,14 +209,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) diff --git a/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc_asyncio.py index 915ca112..290b854d 100644 --- a/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/subscriber_service/transports/grpc_asyncio.py @@ -79,14 +79,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -104,6 +104,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -145,6 +146,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -196,6 +199,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: diff --git a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/base.py b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/base.py index 62e99490..e150f25c 100644 --- a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/base.py @@ -24,6 +24,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore from google.auth import credentials as ga_credentials # type: ignore +from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import topic_stats @@ -45,8 +46,6 @@ except pkg_resources.DistributionNotFound: # pragma: NO COVER _GOOGLE_AUTH_VERSION = None -_API_CORE_VERSION = google.api_core.__version__ - class TopicStatsServiceTransport(abc.ABC): """Abstract transport class for TopicStatsService.""" @@ -64,6 +63,7 @@ def __init__( scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, **kwargs, ) -> None: """Instantiate the transport. @@ -87,6 +87,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: @@ -96,7 +98,7 @@ def __init__( scopes_kwargs = self._get_scopes_kwargs(self._host, scopes) # Save the scopes. - self._scopes = scopes or self.AUTH_SCOPES + self._scopes = scopes # If no credentials are provided, then determine the appropriate # defaults. @@ -115,13 +117,20 @@ def __init__( **scopes_kwargs, quota_project_id=quota_project_id ) + # If the credentials is service account credentials, then always try to use self signed JWT. + if ( + always_use_jwt_access + and isinstance(credentials, service_account.Credentials) + and hasattr(service_account.Credentials, "with_always_use_jwt_access") + ): + credentials = credentials.with_always_use_jwt_access(True) + # Save the credentials. self._credentials = credentials - # TODO(busunkim): These two class methods are in the base transport + # TODO(busunkim): This method is in the base transport # to avoid duplicating code across the transport classes. These functions - # should be deleted once the minimum required versions of google-api-core - # and google-auth are increased. + # should be deleted once the minimum required versions of google-auth is increased. # TODO: Remove this function once google-auth >= 1.25.0 is required @classmethod @@ -142,27 +151,6 @@ def _get_scopes_kwargs( return scopes_kwargs - # TODO: Remove this function once google-api-core >= 1.26.0 is required - @classmethod - def _get_self_signed_jwt_kwargs( - cls, host: str, scopes: Optional[Sequence[str]] - ) -> Dict[str, Union[Optional[Sequence[str]], str]]: - """Returns kwargs to pass to grpc_helpers.create_channel depending on the google-api-core version""" - - self_signed_jwt_kwargs: Dict[str, Union[Optional[Sequence[str]], str]] = {} - - if _API_CORE_VERSION and ( - packaging.version.parse(_API_CORE_VERSION) - >= packaging.version.parse("1.26.0") - ): - self_signed_jwt_kwargs["default_scopes"] = cls.AUTH_SCOPES - self_signed_jwt_kwargs["scopes"] = scopes - self_signed_jwt_kwargs["default_host"] = cls.DEFAULT_HOST - else: - self_signed_jwt_kwargs["scopes"] = scopes or cls.AUTH_SCOPES - - return self_signed_jwt_kwargs - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { diff --git a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc.py index 1dfa633f..15f8f7ae 100644 --- a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc.py @@ -58,6 +58,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -98,6 +99,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport @@ -150,6 +153,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: @@ -205,14 +209,14 @@ def create_channel( and ``credentials_file`` are passed. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) diff --git a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc_asyncio.py index 78b82f2c..3e5e9aeb 100644 --- a/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/topic_stats_service/transports/grpc_asyncio.py @@ -79,14 +79,14 @@ def create_channel( aio.Channel: A gRPC AsyncIO channel object. """ - self_signed_jwt_kwargs = cls._get_self_signed_jwt_kwargs(host, scopes) - return grpc_helpers_async.create_channel( host, credentials=credentials, credentials_file=credentials_file, quota_project_id=quota_project_id, - **self_signed_jwt_kwargs, + default_scopes=cls.AUTH_SCOPES, + scopes=scopes, + default_host=cls.DEFAULT_HOST, **kwargs, ) @@ -104,6 +104,7 @@ def __init__( client_cert_source_for_mtls: Callable[[], Tuple[bytes, bytes]] = None, quota_project_id=None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, ) -> None: """Instantiate the transport. @@ -145,6 +146,8 @@ def __init__( API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. Raises: google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport @@ -196,6 +199,7 @@ def __init__( scopes=scopes, quota_project_id=quota_project_id, client_info=client_info, + always_use_jwt_access=always_use_jwt_access, ) if not self._grpc_channel: diff --git a/google/cloud/pubsublite_v1/types/__init__.py b/google/cloud/pubsublite_v1/types/__init__.py index 56e6353e..3ea25f36 100644 --- a/google/cloud/pubsublite_v1/types/__init__.py +++ b/google/cloud/pubsublite_v1/types/__init__.py @@ -34,6 +34,9 @@ ListTopicsResponse, ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, + OperationMetadata, + SeekSubscriptionRequest, + SeekSubscriptionResponse, TopicPartitions, UpdateReservationRequest, UpdateSubscriptionRequest, @@ -114,6 +117,9 @@ "ListTopicsResponse", "ListTopicSubscriptionsRequest", "ListTopicSubscriptionsResponse", + "OperationMetadata", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "TopicPartitions", "UpdateReservationRequest", "UpdateSubscriptionRequest", diff --git a/google/cloud/pubsublite_v1/types/admin.py b/google/cloud/pubsublite_v1/types/admin.py index 35edd29e..140b0b9c 100644 --- a/google/cloud/pubsublite_v1/types/admin.py +++ b/google/cloud/pubsublite_v1/types/admin.py @@ -17,6 +17,7 @@ from google.cloud.pubsublite_v1.types import common from google.protobuf import field_mask_pb2 # type: ignore +from google.protobuf import timestamp_pb2 # type: ignore __protobuf__ = proto.module( @@ -38,6 +39,9 @@ "ListSubscriptionsResponse", "UpdateSubscriptionRequest", "DeleteSubscriptionRequest", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", + "OperationMetadata", "CreateReservationRequest", "GetReservationRequest", "ListReservationsRequest", @@ -341,6 +345,63 @@ class DeleteSubscriptionRequest(proto.Message): name = proto.Field(proto.STRING, number=1,) +class SeekSubscriptionRequest(proto.Message): + r"""Request for SeekSubscription. + Attributes: + name (str): + Required. The name of the subscription to + seek. + named_target (google.cloud.pubsublite_v1.types.SeekSubscriptionRequest.NamedTarget): + Seek to a named position with respect to the + message backlog. + time_target (google.cloud.pubsublite_v1.types.TimeTarget): + Seek to the first message whose publish or + event time is greater than or equal to the + specified query time. If no such message can be + located, will seek to the end of the message + backlog. + """ + + class NamedTarget(proto.Enum): + r"""A named position with respect to the message backlog.""" + NAMED_TARGET_UNSPECIFIED = 0 + TAIL = 1 + HEAD = 2 + + name = proto.Field(proto.STRING, number=1,) + named_target = proto.Field(proto.ENUM, number=2, oneof="target", enum=NamedTarget,) + time_target = proto.Field( + proto.MESSAGE, number=3, oneof="target", message=common.TimeTarget, + ) + + +class SeekSubscriptionResponse(proto.Message): + r"""Response for SeekSubscription long running operation. """ + + +class OperationMetadata(proto.Message): + r"""Metadata for long running operations. + Attributes: + create_time (google.protobuf.timestamp_pb2.Timestamp): + The time the operation was created. + end_time (google.protobuf.timestamp_pb2.Timestamp): + The time the operation finished running. Not + set if the operation has not completed. + target (str): + Resource path for the target of the operation. For example, + targets of seeks are subscription resources, structured + like: + projects/{project_number}/locations/{location}/subscriptions/{subscription_id} + verb (str): + Name of the verb executed by the operation. + """ + + create_time = proto.Field(proto.MESSAGE, number=1, message=timestamp_pb2.Timestamp,) + end_time = proto.Field(proto.MESSAGE, number=2, message=timestamp_pb2.Timestamp,) + target = proto.Field(proto.STRING, number=3,) + verb = proto.Field(proto.STRING, number=4,) + + class CreateReservationRequest(proto.Message): r"""Request for CreateReservation. Attributes: diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 05b7e31a..b18c21c5 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -1,2 +1,2 @@ -backoff==1.10.0 +backoff==1.11.0 pytest==6.2.4 \ No newline at end of file diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 9b0e34a1..f0d36dc2 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1 +1 @@ -google-cloud-pubsublite==0.4.1 \ No newline at end of file +google-cloud-pubsublite==0.5.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 4df6d538..b4926729 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ import os import setuptools # type: ignore -version = "0.5.0" +version = "0.6.0" package_root = os.path.abspath(os.path.dirname(__file__)) @@ -32,6 +32,7 @@ "grpcio >= 1.18.0", "grpcio-status >= 1.18.0", "overrides>=6.0.1, <7.0.0", + "packaging >= 14.3", ] setuptools.setup( @@ -42,7 +43,11 @@ author_email="googleapis-packages@google.com", license="Apache 2.0", url="https://github.com/googleapis/python-pubsublite", - packages=setuptools.PEP420PackageFinder.find(), + packages=[ + package + for package in setuptools.PEP420PackageFinder.find() + if package.startswith("google") + ], namespace_packages=("google", "google.cloud"), platforms="Posix; MacOS X; Windows", include_package_data=True, diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index aa61acd7..0ed3c3c6 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -7,3 +7,4 @@ # Then this file should have foo==1.14.0 google-cloud-pubsub==2.1.0 overrides==2.0.0 +packaging==14.3 diff --git a/tests/unit/gapic/pubsublite_v1/test_admin_service.py b/tests/unit/gapic/pubsublite_v1/test_admin_service.py index 30e7fd88..c59ce8a5 100644 --- a/tests/unit/gapic/pubsublite_v1/test_admin_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_admin_service.py @@ -26,31 +26,34 @@ from google.api_core import client_options from google.api_core import exceptions as core_exceptions +from google.api_core import future from google.api_core import gapic_v1 from google.api_core import grpc_helpers from google.api_core import grpc_helpers_async +from google.api_core import operation_async # type: ignore +from google.api_core import operations_v1 from google.auth import credentials as ga_credentials from google.auth.exceptions import MutualTLSChannelError from google.cloud.pubsublite_v1.services.admin_service import AdminServiceAsyncClient from google.cloud.pubsublite_v1.services.admin_service import AdminServiceClient from google.cloud.pubsublite_v1.services.admin_service import pagers from google.cloud.pubsublite_v1.services.admin_service import transports -from google.cloud.pubsublite_v1.services.admin_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.admin_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 from google.oauth2 import service_account from google.protobuf import duration_pb2 # type: ignore from google.protobuf import field_mask_pb2 # type: ignore +from google.protobuf import timestamp_pb2 # type: ignore import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -61,16 +64,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -128,6 +121,34 @@ def test_admin_service_client_from_service_account_info(client_class): assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize("client_class", [AdminServiceClient, AdminServiceAsyncClient,]) +def test_admin_service_client_service_account_always_use_jwt(client_class): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.AdminServiceGrpcTransport, "grpc"), + (transports.AdminServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_admin_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize("client_class", [AdminServiceClient, AdminServiceAsyncClient,]) def test_admin_service_client_from_service_account_file(client_class): creds = ga_credentials.AnonymousCredentials() @@ -3309,6 +3330,146 @@ async def test_delete_subscription_flattened_error_async(): ) +def test_seek_subscription( + transport: str = "grpc", request_type=admin.SeekSubscriptionRequest +): + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport, + ) + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = operations_pb2.Operation(name="operations/spam") + response = client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + # Establish that the response is the type that we expect. + assert isinstance(response, future.Future) + + +def test_seek_subscription_from_dict(): + test_seek_subscription(request_type=dict) + + +def test_seek_subscription_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + client.seek_subscription() + call.assert_called() + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + +@pytest.mark.asyncio +async def test_seek_subscription_async( + transport: str = "grpc_asyncio", request_type=admin.SeekSubscriptionRequest +): + client = AdminServiceAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport, + ) + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall( + operations_pb2.Operation(name="operations/spam") + ) + response = await client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + # Establish that the response is the type that we expect. + assert isinstance(response, future.Future) + + +@pytest.mark.asyncio +async def test_seek_subscription_async_from_dict(): + await test_seek_subscription_async(request_type=dict) + + +def test_seek_subscription_field_headers(): + client = AdminServiceClient(credentials=ga_credentials.AnonymousCredentials(),) + + # Any value that is part of the HTTP/1.1 URI should be sent as + # a field header. Set these to a non-empty value. + request = admin.SeekSubscriptionRequest() + + request.name = "name/value" + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + call.return_value = operations_pb2.Operation(name="operations/op") + client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + assert args[0] == request + + # Establish that the field header was sent. + _, _, kw = call.mock_calls[0] + assert ("x-goog-request-params", "name=name/value",) in kw["metadata"] + + +@pytest.mark.asyncio +async def test_seek_subscription_field_headers_async(): + client = AdminServiceAsyncClient(credentials=ga_credentials.AnonymousCredentials(),) + + # Any value that is part of the HTTP/1.1 URI should be sent as + # a field header. Set these to a non-empty value. + request = admin.SeekSubscriptionRequest() + + request.name = "name/value" + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall( + operations_pb2.Operation(name="operations/op") + ) + await client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + assert args[0] == request + + # Establish that the field header was sent. + _, _, kw = call.mock_calls[0] + assert ("x-goog-request-params", "name=name/value",) in kw["metadata"] + + def test_create_reservation( transport: str = "grpc", request_type=admin.CreateReservationRequest ): @@ -4962,6 +5123,7 @@ def test_admin_service_base_transport(): "list_subscriptions", "update_subscription", "delete_subscription", + "seek_subscription", "create_reservation", "get_reservation", "list_reservations", @@ -4973,6 +5135,11 @@ def test_admin_service_base_transport(): with pytest.raises(NotImplementedError): getattr(transport, method)(request=object()) + # Additionally, the LRO client (a property) should + # also raise NotImplementedError + with pytest.raises(NotImplementedError): + transport.operations_client + @requires_google_auth_gte_1_25_0 def test_admin_service_base_transport_with_credentials_file(): @@ -5099,7 +5266,6 @@ def test_admin_service_transport_auth_adc_old_google_auth(transport_class): (transports.AdminServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_admin_service_transport_create_channel(transport_class, grpc_helpers): # If credentials and host are not provided, the transport class should use # ADC credentials. @@ -5128,79 +5294,6 @@ def test_admin_service_transport_create_channel(transport_class, grpc_helpers): ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.AdminServiceGrpcTransport, grpc_helpers), - (transports.AdminServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_admin_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.AdminServiceGrpcTransport, grpc_helpers), - (transports.AdminServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_admin_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [transports.AdminServiceGrpcTransport, transports.AdminServiceGrpcAsyncIOTransport], @@ -5220,7 +5313,7 @@ def test_admin_service_grpc_transport_client_cert_source_for_mtls(transport_clas "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -5324,7 +5417,7 @@ def test_admin_service_transport_channel_mtls_with_client_cert_source(transport_ "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -5368,7 +5461,7 @@ def test_admin_service_transport_channel_mtls_with_adc(transport_class): "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -5379,6 +5472,32 @@ def test_admin_service_transport_channel_mtls_with_adc(transport_class): assert transport.grpc_channel == mock_grpc_channel +def test_admin_service_grpc_lro_client(): + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc", + ) + transport = client.transport + + # Ensure that we have a api-core operations client. + assert isinstance(transport.operations_client, operations_v1.OperationsClient,) + + # Ensure that subsequent calls to the property send the exact same object. + assert transport.operations_client is transport.operations_client + + +def test_admin_service_grpc_lro_async_client(): + client = AdminServiceAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc_asyncio", + ) + transport = client.transport + + # Ensure that we have a api-core operations client. + assert isinstance(transport.operations_client, operations_v1.OperationsAsyncClient,) + + # Ensure that subsequent calls to the property send the exact same object. + assert transport.operations_client is transport.operations_client + + def test_reservation_path(): project = "squid" location = "clam" diff --git a/tests/unit/gapic/pubsublite_v1/test_cursor_service.py b/tests/unit/gapic/pubsublite_v1/test_cursor_service.py index d865480b..f456ba14 100644 --- a/tests/unit/gapic/pubsublite_v1/test_cursor_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_cursor_service.py @@ -35,9 +35,6 @@ from google.cloud.pubsublite_v1.services.cursor_service import CursorServiceClient from google.cloud.pubsublite_v1.services.cursor_service import pagers from google.cloud.pubsublite_v1.services.cursor_service import transports -from google.cloud.pubsublite_v1.services.cursor_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.cursor_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) @@ -47,8 +44,9 @@ import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -59,16 +57,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -131,6 +119,36 @@ def test_cursor_service_client_from_service_account_info(client_class): assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize( + "client_class", [CursorServiceClient, CursorServiceAsyncClient,] +) +def test_cursor_service_client_service_account_always_use_jwt(client_class): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.CursorServiceGrpcTransport, "grpc"), + (transports.CursorServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_cursor_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize( "client_class", [CursorServiceClient, CursorServiceAsyncClient,] ) @@ -1286,7 +1304,6 @@ def test_cursor_service_transport_auth_adc_old_google_auth(transport_class): (transports.CursorServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_cursor_service_transport_create_channel(transport_class, grpc_helpers): # If credentials and host are not provided, the transport class should use # ADC credentials. @@ -1315,79 +1332,6 @@ def test_cursor_service_transport_create_channel(transport_class, grpc_helpers): ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.CursorServiceGrpcTransport, grpc_helpers), - (transports.CursorServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_cursor_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.CursorServiceGrpcTransport, grpc_helpers), - (transports.CursorServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_cursor_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [ @@ -1410,7 +1354,7 @@ def test_cursor_service_grpc_transport_client_cert_source_for_mtls(transport_cla "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -1517,7 +1461,7 @@ def test_cursor_service_transport_channel_mtls_with_client_cert_source(transport "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -1564,7 +1508,7 @@ def test_cursor_service_transport_channel_mtls_with_adc(transport_class): "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ diff --git a/tests/unit/gapic/pubsublite_v1/test_partition_assignment_service.py b/tests/unit/gapic/pubsublite_v1/test_partition_assignment_service.py index 036db0f8..37b07a95 100644 --- a/tests/unit/gapic/pubsublite_v1/test_partition_assignment_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_partition_assignment_service.py @@ -38,9 +38,6 @@ PartitionAssignmentServiceClient, ) from google.cloud.pubsublite_v1.services.partition_assignment_service import transports -from google.cloud.pubsublite_v1.services.partition_assignment_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.partition_assignment_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) @@ -49,8 +46,9 @@ import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -61,16 +59,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -137,6 +125,39 @@ def test_partition_assignment_service_client_from_service_account_info(client_cl assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize( + "client_class", + [PartitionAssignmentServiceClient, PartitionAssignmentServiceAsyncClient,], +) +def test_partition_assignment_service_client_service_account_always_use_jwt( + client_class, +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.PartitionAssignmentServiceGrpcTransport, "grpc"), + (transports.PartitionAssignmentServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_partition_assignment_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize( "client_class", [PartitionAssignmentServiceClient, PartitionAssignmentServiceAsyncClient,], @@ -811,7 +832,6 @@ def test_partition_assignment_service_transport_auth_adc_old_google_auth( (transports.PartitionAssignmentServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_partition_assignment_service_transport_create_channel( transport_class, grpc_helpers ): @@ -842,79 +862,6 @@ def test_partition_assignment_service_transport_create_channel( ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.PartitionAssignmentServiceGrpcTransport, grpc_helpers), - (transports.PartitionAssignmentServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_partition_assignment_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.PartitionAssignmentServiceGrpcTransport, grpc_helpers), - (transports.PartitionAssignmentServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_partition_assignment_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [ @@ -939,7 +886,7 @@ def test_partition_assignment_service_grpc_transport_client_cert_source_for_mtls "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -1048,7 +995,7 @@ def test_partition_assignment_service_transport_channel_mtls_with_client_cert_so "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -1095,7 +1042,7 @@ def test_partition_assignment_service_transport_channel_mtls_with_adc(transport_ "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ diff --git a/tests/unit/gapic/pubsublite_v1/test_publisher_service.py b/tests/unit/gapic/pubsublite_v1/test_publisher_service.py index 8ccc5a9e..9f93ca92 100644 --- a/tests/unit/gapic/pubsublite_v1/test_publisher_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_publisher_service.py @@ -36,9 +36,6 @@ ) from google.cloud.pubsublite_v1.services.publisher_service import PublisherServiceClient from google.cloud.pubsublite_v1.services.publisher_service import transports -from google.cloud.pubsublite_v1.services.publisher_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.publisher_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) @@ -49,8 +46,9 @@ import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -61,16 +59,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -134,6 +122,36 @@ def test_publisher_service_client_from_service_account_info(client_class): assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize( + "client_class", [PublisherServiceClient, PublisherServiceAsyncClient,] +) +def test_publisher_service_client_service_account_always_use_jwt(client_class): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.PublisherServiceGrpcTransport, "grpc"), + (transports.PublisherServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_publisher_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize( "client_class", [PublisherServiceClient, PublisherServiceAsyncClient,] ) @@ -779,7 +797,6 @@ def test_publisher_service_transport_auth_adc_old_google_auth(transport_class): (transports.PublisherServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_publisher_service_transport_create_channel(transport_class, grpc_helpers): # If credentials and host are not provided, the transport class should use # ADC credentials. @@ -808,79 +825,6 @@ def test_publisher_service_transport_create_channel(transport_class, grpc_helper ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.PublisherServiceGrpcTransport, grpc_helpers), - (transports.PublisherServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_publisher_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.PublisherServiceGrpcTransport, grpc_helpers), - (transports.PublisherServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_publisher_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [ @@ -903,7 +847,7 @@ def test_publisher_service_grpc_transport_client_cert_source_for_mtls(transport_ "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -1012,7 +956,7 @@ def test_publisher_service_transport_channel_mtls_with_client_cert_source( "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -1059,7 +1003,7 @@ def test_publisher_service_transport_channel_mtls_with_adc(transport_class): "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ diff --git a/tests/unit/gapic/pubsublite_v1/test_subscriber_service.py b/tests/unit/gapic/pubsublite_v1/test_subscriber_service.py index 342ac916..6dfa426d 100644 --- a/tests/unit/gapic/pubsublite_v1/test_subscriber_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_subscriber_service.py @@ -38,9 +38,6 @@ SubscriberServiceClient, ) from google.cloud.pubsublite_v1.services.subscriber_service import transports -from google.cloud.pubsublite_v1.services.subscriber_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.subscriber_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) @@ -50,8 +47,9 @@ import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -62,16 +60,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -135,6 +123,36 @@ def test_subscriber_service_client_from_service_account_info(client_class): assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize( + "client_class", [SubscriberServiceClient, SubscriberServiceAsyncClient,] +) +def test_subscriber_service_client_service_account_always_use_jwt(client_class): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.SubscriberServiceGrpcTransport, "grpc"), + (transports.SubscriberServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_subscriber_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize( "client_class", [SubscriberServiceClient, SubscriberServiceAsyncClient,] ) @@ -780,7 +798,6 @@ def test_subscriber_service_transport_auth_adc_old_google_auth(transport_class): (transports.SubscriberServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_subscriber_service_transport_create_channel(transport_class, grpc_helpers): # If credentials and host are not provided, the transport class should use # ADC credentials. @@ -809,79 +826,6 @@ def test_subscriber_service_transport_create_channel(transport_class, grpc_helpe ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.SubscriberServiceGrpcTransport, grpc_helpers), - (transports.SubscriberServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_subscriber_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.SubscriberServiceGrpcTransport, grpc_helpers), - (transports.SubscriberServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_subscriber_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [ @@ -904,7 +848,7 @@ def test_subscriber_service_grpc_transport_client_cert_source_for_mtls(transport "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -1013,7 +957,7 @@ def test_subscriber_service_transport_channel_mtls_with_client_cert_source( "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -1060,7 +1004,7 @@ def test_subscriber_service_transport_channel_mtls_with_adc(transport_class): "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ diff --git a/tests/unit/gapic/pubsublite_v1/test_topic_stats_service.py b/tests/unit/gapic/pubsublite_v1/test_topic_stats_service.py index c4dd171c..5d16d93b 100644 --- a/tests/unit/gapic/pubsublite_v1/test_topic_stats_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_topic_stats_service.py @@ -38,9 +38,6 @@ TopicStatsServiceClient, ) from google.cloud.pubsublite_v1.services.topic_stats_service import transports -from google.cloud.pubsublite_v1.services.topic_stats_service.transports.base import ( - _API_CORE_VERSION, -) from google.cloud.pubsublite_v1.services.topic_stats_service.transports.base import ( _GOOGLE_AUTH_VERSION, ) @@ -51,8 +48,9 @@ import google.auth -# TODO(busunkim): Once google-api-core >= 1.26.0 is required: -# - Delete all the api-core and auth "less than" test cases +# TODO(busunkim): Once google-auth >= 1.25.0 is required transitively +# through google-api-core: +# - Delete the auth "less than" test cases # - Delete these pytest markers (Make the "greater than or equal to" tests the default). requires_google_auth_lt_1_25_0 = pytest.mark.skipif( packaging.version.parse(_GOOGLE_AUTH_VERSION) >= packaging.version.parse("1.25.0"), @@ -63,16 +61,6 @@ reason="This test requires google-auth >= 1.25.0", ) -requires_api_core_lt_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) >= packaging.version.parse("1.26.0"), - reason="This test requires google-api-core < 1.26.0", -) - -requires_api_core_gte_1_26_0 = pytest.mark.skipif( - packaging.version.parse(_API_CORE_VERSION) < packaging.version.parse("1.26.0"), - reason="This test requires google-api-core >= 1.26.0", -) - def client_cert_source_callback(): return b"cert bytes", b"key bytes" @@ -136,6 +124,36 @@ def test_topic_stats_service_client_from_service_account_info(client_class): assert client.transport._host == "pubsublite.googleapis.com:443" +@pytest.mark.parametrize( + "client_class", [TopicStatsServiceClient, TopicStatsServiceAsyncClient,] +) +def test_topic_stats_service_client_service_account_always_use_jwt(client_class): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + client = client_class(credentials=creds) + use_jwt.assert_not_called() + + +@pytest.mark.parametrize( + "transport_class,transport_name", + [ + (transports.TopicStatsServiceGrpcTransport, "grpc"), + (transports.TopicStatsServiceGrpcAsyncIOTransport, "grpc_asyncio"), + ], +) +def test_topic_stats_service_client_service_account_always_use_jwt_true( + transport_class, transport_name +): + with mock.patch.object( + service_account.Credentials, "with_always_use_jwt_access", create=True + ) as use_jwt: + creds = service_account.Credentials(None, None, None) + transport = transport_class(credentials=creds, always_use_jwt_access=True) + use_jwt.assert_called_once_with(True) + + @pytest.mark.parametrize( "client_class", [TopicStatsServiceClient, TopicStatsServiceAsyncClient,] ) @@ -1152,7 +1170,6 @@ def test_topic_stats_service_transport_auth_adc_old_google_auth(transport_class) (transports.TopicStatsServiceGrpcAsyncIOTransport, grpc_helpers_async), ], ) -@requires_api_core_gte_1_26_0 def test_topic_stats_service_transport_create_channel(transport_class, grpc_helpers): # If credentials and host are not provided, the transport class should use # ADC credentials. @@ -1181,79 +1198,6 @@ def test_topic_stats_service_transport_create_channel(transport_class, grpc_help ) -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.TopicStatsServiceGrpcTransport, grpc_helpers), - (transports.TopicStatsServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_topic_stats_service_transport_create_channel_old_api_core( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - transport_class(quota_project_id="octopus") - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=("https://www.googleapis.com/auth/cloud-platform",), - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - -@pytest.mark.parametrize( - "transport_class,grpc_helpers", - [ - (transports.TopicStatsServiceGrpcTransport, grpc_helpers), - (transports.TopicStatsServiceGrpcAsyncIOTransport, grpc_helpers_async), - ], -) -@requires_api_core_lt_1_26_0 -def test_topic_stats_service_transport_create_channel_user_scopes( - transport_class, grpc_helpers -): - # If credentials and host are not provided, the transport class should use - # ADC credentials. - with mock.patch.object( - google.auth, "default", autospec=True - ) as adc, mock.patch.object( - grpc_helpers, "create_channel", autospec=True - ) as create_channel: - creds = ga_credentials.AnonymousCredentials() - adc.return_value = (creds, None) - - transport_class(quota_project_id="octopus", scopes=["1", "2"]) - - create_channel.assert_called_with( - "pubsublite.googleapis.com:443", - credentials=creds, - credentials_file=None, - quota_project_id="octopus", - scopes=["1", "2"], - ssl_credentials=None, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - - @pytest.mark.parametrize( "transport_class", [ @@ -1278,7 +1222,7 @@ def test_topic_stats_service_grpc_transport_client_cert_source_for_mtls( "squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_channel_creds, quota_project_id=None, options=[ @@ -1387,7 +1331,7 @@ def test_topic_stats_service_transport_channel_mtls_with_client_cert_source( "mtls.squid.clam.whelk:443", credentials=cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ @@ -1434,7 +1378,7 @@ def test_topic_stats_service_transport_channel_mtls_with_adc(transport_class): "mtls.squid.clam.whelk:443", credentials=mock_cred, credentials_file=None, - scopes=("https://www.googleapis.com/auth/cloud-platform",), + scopes=None, ssl_credentials=mock_ssl_cred, quota_project_id=None, options=[ diff --git a/tests/unit/pubsublite/internal/wire/reset_signal_test.py b/tests/unit/pubsublite/internal/wire/reset_signal_test.py index 8925d525..78bd0f8b 100644 --- a/tests/unit/pubsublite/internal/wire/reset_signal_test.py +++ b/tests/unit/pubsublite/internal/wire/reset_signal_test.py @@ -17,6 +17,7 @@ from google.cloud.pubsublite.internal.wire.reset_signal import is_reset_signal from google.cloud.pubsublite.testing.test_reset_signal import ( make_call, + make_call_without_metadata, make_reset_signal, ) from google.protobuf.any_pb2 import Any @@ -39,6 +40,13 @@ async def test_missing_call(): assert not is_reset_signal(Aborted("")) +async def test_extracted_status_is_none(): + status_pb = Status(code=10, details=[]) + assert not is_reset_signal( + Aborted("", response=make_call_without_metadata(status_pb)) + ) + + async def test_wrong_reason(): any = Any() any.Pack(ErrorInfo(reason="OTHER", domain="pubsublite.googleapis.com"))