From f99640d828df9410a9c23078f2c787ac5e27dc8e Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Mon, 27 Feb 2023 12:11:49 -0500 Subject: [PATCH 1/8] chore(python): upgrade gcp-releasetool in .kokoro [autoapprove] (#876) Source-Link: https://github.com/googleapis/synthtool/commit/5f2a6089f73abf06238fe4310f6a14d6f6d1eed3 Post-Processor: gcr.io/cloud-devrel-public-resources/owlbot-python:latest@sha256:8555f0e37e6261408f792bfd6635102d2da5ad73f8f09bcb24f25e6afb5fac97 Co-authored-by: Owl Bot --- .github/.OwlBot.lock.yaml | 2 +- .kokoro/requirements.in | 2 +- .kokoro/requirements.txt | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 894fb6bc9..5fc5daa31 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,4 +13,4 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:f62c53736eccb0c4934a3ea9316e0d57696bb49c1a7c86c726e9bb8a2f87dadf + digest: sha256:8555f0e37e6261408f792bfd6635102d2da5ad73f8f09bcb24f25e6afb5fac97 diff --git a/.kokoro/requirements.in b/.kokoro/requirements.in index cbd7e77f4..882178ce6 100644 --- a/.kokoro/requirements.in +++ b/.kokoro/requirements.in @@ -1,5 +1,5 @@ gcp-docuploader -gcp-releasetool +gcp-releasetool>=1.10.5 # required for compatibility with cryptography>=39.x importlib-metadata typing-extensions twine diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 096e4800a..fa99c1290 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -154,9 +154,9 @@ gcp-docuploader==0.6.4 \ --hash=sha256:01486419e24633af78fd0167db74a2763974765ee8078ca6eb6964d0ebd388af \ --hash=sha256:70861190c123d907b3b067da896265ead2eeb9263969d6955c9e0bb091b5ccbf # via -r requirements.in -gcp-releasetool==1.10.0 \ - --hash=sha256:72a38ca91b59c24f7e699e9227c90cbe4dd71b789383cb0164b088abae294c83 \ - --hash=sha256:8c7c99320208383d4bb2b808c6880eb7a81424afe7cdba3c8d84b25f4f0e097d +gcp-releasetool==1.10.5 \ + --hash=sha256:174b7b102d704b254f2a26a3eda2c684fd3543320ec239baf771542a2e58e109 \ + --hash=sha256:e29d29927fe2ca493105a82958c6873bb2b90d503acac56be2c229e74de0eec9 # via -r requirements.in google-api-core==2.10.2 \ --hash=sha256:10c06f7739fe57781f87523375e8e1a3a4674bf6392cd6131a3222182b971320 \ From df4d54100b4fee5fe2f83de386b8075005038ccb Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Wed, 1 Mar 2023 10:41:27 +0000 Subject: [PATCH 2/8] chore(deps): update dependency google-cloud-bigquery to v3.6.0 (#873) Co-authored-by: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Co-authored-by: Anthonios Partheniou --- samples/snippets/requirements-test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index ba89d85dc..3c743a349 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -2,4 +2,4 @@ backoff==2.2.1 pytest==7.2.1 mock==5.0.1 flaky==3.7.0 -google-cloud-bigquery==3.5.0 +google-cloud-bigquery==3.6.0 From 1dcff30f6254cb6429565fe74b9c8cdc3e0338bb Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Wed, 1 Mar 2023 13:49:02 +0000 Subject: [PATCH 3/8] chore(deps): update dependency google-cloud-pubsub to v2.15.0 (#878) --- samples/snippets/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index af598f2ff..a69515e44 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,2 +1,2 @@ -google-cloud-pubsub==2.14.1 +google-cloud-pubsub==2.15.0 avro==1.11.1 From eb9f233d656c4e335829f11ef8617b9af38466bb Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 10 Mar 2023 21:59:59 -0500 Subject: [PATCH 4/8] samples: Schema evolution (#881) * samples: schema evolution * Add command-line commands * Fix tag for rollback * Make formatting fixes * Formatting fixes * Fix exceptions --- .../snippets/resources/us-states-plus.avsc | 24 ++ .../snippets/resources/us-states-plus.proto | 9 + samples/snippets/schema.py | 366 +++++++++++++++++- samples/snippets/schema_test.py | 132 ++++++- 4 files changed, 523 insertions(+), 8 deletions(-) create mode 100644 samples/snippets/resources/us-states-plus.avsc create mode 100644 samples/snippets/resources/us-states-plus.proto diff --git a/samples/snippets/resources/us-states-plus.avsc b/samples/snippets/resources/us-states-plus.avsc new file mode 100644 index 000000000..74225ae7e --- /dev/null +++ b/samples/snippets/resources/us-states-plus.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"State", + "namespace":"utilities", + "doc":"A list of states in the United States of America.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the state." + }, + { + "name":"post_abbr", + "type":"string", + "doc":"The postal code abbreviation of the state." + }, + { + "name":"population", + "type":"long", + "default":0, + "doc":"The population of the state." + } + ] +} diff --git a/samples/snippets/resources/us-states-plus.proto b/samples/snippets/resources/us-states-plus.proto new file mode 100644 index 000000000..9f845d9f4 --- /dev/null +++ b/samples/snippets/resources/us-states-plus.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package utilities; + +message StateProto { + string name = 1; + string post_abbr = 2; + int64 population = 3; +} diff --git a/samples/snippets/schema.py b/samples/snippets/schema.py index e2a171d1d..9c0dd656f 100644 --- a/samples/snippets/schema.py +++ b/samples/snippets/schema.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2021 Google LLC. All Rights Reserved. +# Copyright 2023 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -54,6 +54,7 @@ def create_avro_schema(project_id: str, schema_id: str, avsc_file: str) -> None: request={"parent": project_path, "schema": schema, "schema_id": schema_id} ) print(f"Created a schema using an Avro schema file:\n{result}") + return result except AlreadyExists: print(f"{schema_id} already exists.") # [END pubsub_create_avro_schema] @@ -88,11 +89,76 @@ def create_proto_schema(project_id: str, schema_id: str, proto_file: str) -> Non request={"parent": project_path, "schema": schema, "schema_id": schema_id} ) print(f"Created a schema using a protobuf schema file:\n{result}") + return result except AlreadyExists: print(f"{schema_id} already exists.") # [END pubsub_create_proto_schema] +def commit_avro_schema(project_id: str, schema_id: str, avsc_file: str) -> None: + """Commit a schema resource from a JSON-formatted Avro schema file.""" + # [START pubsub_commit_avro_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + from google.pubsub_v1.types import Schema + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json" + + # Read a JSON-formatted Avro schema file as a string. + with open(avsc_file, "rb") as f: + avsc_source = f.read().decode("utf-8") + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source) + + try: + result = schema_client.commit_schema( + request={"schema": schema, "name": schema_path} + ) + print(f"Committed a schema revision using an Avro schema file:\n{result}") + return result + except NotFound: + print(f"{schema_id} does not exist.") + # [END pubsub_commit_avro_schema] + + +def commit_proto_schema(project_id: str, schema_id: str, proto_file: str) -> None: + """Commit a schema revision from a protobuf schema file.""" + # [START pubsub_commit_proto_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + from google.pubsub_v1.types import Schema + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers" + + # Read a protobuf schema file as a string. + with open(proto_file, "rb") as f: + proto_source = f.read().decode("utf-8") + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + schema = Schema( + name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source + ) + + try: + result = schema_client.commit_schema( + request={"schema": schema, "name": schema_path} + ) + print(f"Committed a schema revision using a protobuf schema file:\n{result}") + return result + except NotFound: + print(f"{schema_id} does not exist.") + # [END pubsub_commit_proto_schema] + + def get_schema(project_id: str, schema_id: str) -> None: """Get a schema resource.""" # [START pubsub_get_schema] @@ -114,6 +180,32 @@ def get_schema(project_id: str, schema_id: str) -> None: # [END pubsub_get_schema] +def get_schema_revision( + project_id: str, schema_id: str, schema_revision_id: str +) -> None: + """Get a schema revision.""" + # [START pubsub_get_schema_revision] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # schema_revision_id = "your-schema-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path( + project_id, schema_id + "@" + schema_revision_id + ) + + try: + result = schema_client.get_schema(request={"name": schema_path}) + print(f"Got a schema revision:\n{result}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_get_schema_revision] + + def list_schemas(project_id: str) -> None: """List schema resources.""" # [START pubsub_list_schemas] @@ -132,6 +224,51 @@ def list_schemas(project_id: str) -> None: # [END pubsub_list_schemas] +def list_schema_revisions(project_id: str, schema_id: str) -> None: + """List schema revisions for a schema resource.""" + # [START pubsub_list_schema_revisions] + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + for schema in schema_client.list_schema_revisions(request={"name": schema_path}): + print(schema) + + print("Listed schema revisions.") + # [END pubsub_list_schema_revisions] + + +def rollback_schema_revision( + project_id: str, schema_id: str, schema_revision_id: str +) -> None: + """Roll back a schema revision.""" + # [START pubsub_rollback_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # schema_revision_id = "your-schema-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + try: + result = schema_client.rollback_schema( + request={"name": schema_path, "revision_id": schema_revision_id} + ) + print(f"Rolled back a schema revision:\n{result}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_rollback_schema] + + def delete_schema(project_id: str, schema_id: str) -> None: """Delete a schema resource.""" # [START pubsub_delete_schema] @@ -153,6 +290,28 @@ def delete_schema(project_id: str, schema_id: str) -> None: # [END pubsub_delete_schema] +def delete_schema_revision(project_id: str, schema_id: str, revision_id: str) -> None: + """Delete a schema revision.""" + # [START pubsub_delete_schema_revision] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # revision_id = "your-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id + "@" + revision_id) + + try: + schema_client.delete_schema_revision(request={"name": schema_path}) + print(f"Deleted a schema revision:\n{schema_path}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_delete_schema_revision] + + def create_topic_with_schema( project_id: str, topic_id: str, schema_id: str, message_encoding: str ) -> None: @@ -194,10 +353,106 @@ def create_topic_with_schema( except AlreadyExists: print(f"{topic_id} already exists.") except InvalidArgument: - print("Please choose either BINARY or JSON as a valid message encoding type.") + print("Schema settings are not valid.") # [END pubsub_create_topic_with_schema] +def update_topic_schema( + project_id: str, topic_id: str, first_revision_id: str, last_revision_id: str +) -> None: + """Update a topic resource's first schema revision.""" + # [START pubsub_update_topic_schema] + from google.api_core.exceptions import InvalidArgument, NotFound + from google.cloud.pubsub import PublisherClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # first_revision_id = "your-revision-id" + # last_revision_id = "your-revision-id" + + publisher_client = PublisherClient() + topic_path = publisher_client.topic_path(project_id, topic_id) + + try: + response = publisher_client.update_topic( + request={ + "topic": { + "name": topic_path, + "schema_settings": { + "first_revision_id": first_revision_id, + "last_revision_id": last_revision_id, + }, + }, + "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId", + } + ) + print(f"Updated a topic schema:\n{response}") + + except NotFound: + print(f"{topic_id} not found.") + except InvalidArgument: + print("Schema settings are not valid.") + # [END pubsub_update_topic_schema] + + +def create_topic_with_schema_revisions( + project_id: str, + topic_id: str, + schema_id: str, + first_revision_id: str, + last_revision_id: str, + message_encoding: str, +) -> None: + """Create a topic resource with a schema.""" + # [START pubsub_create_topic_with_schema_revisions] + from google.api_core.exceptions import AlreadyExists, InvalidArgument + from google.cloud.pubsub import PublisherClient, SchemaServiceClient + from google.pubsub_v1.types import Encoding + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # schema_id = "your-schema-id" + # first_revision_id = "your-revision-id" + # last_revision_id = "your-revision-id" + # Choose either BINARY or JSON as valid message encoding in this topic. + # message_encoding = "BINARY" + + publisher_client = PublisherClient() + topic_path = publisher_client.topic_path(project_id, topic_id) + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + if message_encoding == "BINARY": + encoding = Encoding.BINARY + elif message_encoding == "JSON": + encoding = Encoding.JSON + else: + encoding = Encoding.ENCODING_UNSPECIFIED + + try: + response = publisher_client.create_topic( + request={ + "name": topic_path, + "schema_settings": { + "schema": schema_path, + "encoding": encoding, + "first_revision_id": first_revision_id, + "last_revision_id": last_revision_id, + }, + } + ) + print(f"Created a topic:\n{response}") + + except AlreadyExists: + print(f"{topic_id} already exists.") + except InvalidArgument: + print("Please choose either BINARY or JSON as a valid message encoding type.") + # [END pubsub_create_topic_with_schema_revisions] + + def publish_avro_records(project_id: str, topic_id: str, avsc_file: str) -> None: """Pulbish a BINARY or JSON encoded message to a topic configured with an Avro schema.""" # [START pubsub_publish_avro_records] @@ -359,6 +614,90 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscribe_avro_records] +def subscribe_with_avro_schema_with_revisions( + project_id: str, + subscription_id: str, + avsc_file: str, + timeout: Optional[float] = None, +) -> None: + """Receive and decode messages sent to a topic with an Avro schema.""" + # [START pubsub_subscribe_avro_records_with_revisions] + import avro + from avro.io import BinaryDecoder, DatumReader + from concurrent.futures import TimeoutError + import io + import json + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient, SubscriberClient + + schema_client = SchemaServiceClient() + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json" + # Number of seconds the subscriber listens for messages + # timeout = 5.0 + + subscriber = SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + writer_avro_schema = avro.schema.parse(open(avsc_file, "rb").read()) + # Dict to keep readers for different schema revisions. + revisions_to_readers = {} + + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Get the message serialization type. + schema_name = message.attributes.get("googclient_schemaname") + schema_revision_id = message.attributes.get("googclient_schemarevisionid") + encoding = message.attributes.get("googclient_schemaencoding") + + if schema_revision_id not in revisions_to_readers: + schema_path = schema_name + "@" + schema_revision_id + try: + received_avro_schema = schema_client.get_schema( + request={"name": schema_path} + ) + except NotFound: + print(f"{schema_path} not found.") + message.nack() + return + reader_avro_schema = avro.schema.parse(received_avro_schema.definition) + revisions_to_readers[schema_revision_id] = DatumReader( + writer_avro_schema, reader_avro_schema + ) + reader = revisions_to_readers[schema_revision_id] + + # Deserialize the message data accordingly. + if encoding == "BINARY": + bout = io.BytesIO(message.data) + decoder = BinaryDecoder(bout) + message_data = reader.read(decoder) + print(f"Received a binary-encoded message:\n{message_data}") + elif encoding == "JSON": + message_data = json.loads(message.data) + print(f"Received a JSON-encoded message:\n{message_data}") + else: + print(f"Received a message with no encoding:\n{message}") + message.nack() + + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print(f"Listening for messages on {subscription_path}..\n") + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception occurs first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_subscribe_avro_records_with_revisions] + + def subscribe_with_proto_schema( project_id: str, subscription_id: str, timeout: float ) -> None: @@ -484,16 +823,35 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: create_avro_schema(args.project_id, args.schema_id, args.avsc_file) if args.command == "create-proto": create_proto_schema(args.project_id, args.schema_id, args.proto_file) + if args.command == "commit-avro": + commit_avro_schema(args.project_id, args.schema_id, args.avsc_file) + if args.command == "commit-proto": + commit_proto_schema(args.project_id, args.schema_id, args.proto_file) if args.command == "get": get_schema(args.project_id, args.schema_id) + if args.command == "get-revision": + get_schema_revision(args.project_id, args.schema_id, args.revision_id) if args.command == "list": list_schemas(args.project_id) + if args.command == "list-revisions": + list_schema_revisions(args.project_id, args.schema_id) if args.command == "delete": delete_schema(args.project_id, args.schema_id) + if args.command == "delete-revision": + delete_schema_revision(args.project_id, args.schema_id, args.revision_id) if args.command == "create-topic": create_topic_with_schema( args.project_id, args.topic_id, args.schema_id, args.message_encoding ) + if args.command == "create-topic-with-revisions": + create_topic_with_schema_revisions( + args.project_id, + args.topic_id, + args.schema_id, + args.first_revision_id, + args.last_revision_id, + args.message_encoding, + ) if args.command == "publish-avro": publish_avro_records(args.project_id, args.topic_id, args.avsc_file) if args.command == "publish-proto": @@ -502,5 +860,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: subscribe_with_avro_schema( args.project_id, args.subscription_id, args.avsc_file, args.timeout ) + if args.command == "receive-avro-with-revisions": + subscribe_with_avro_schema_with_revisions( + args.project_id, args.subscription_id, args.avsc_file, args.timeout + ) if args.command == "receive-proto": subscribe_with_proto_schema(args.project_id, args.subscription_id, args.timeout) diff --git a/samples/snippets/schema_test.py b/samples/snippets/schema_test.py index 7780bebc1..f62449e88 100644 --- a/samples/snippets/schema_test.py +++ b/samples/snippets/schema_test.py @@ -15,17 +15,14 @@ # limitations under the License. import os -from typing import Any, Callable, cast, Generator, TypeVar +from typing import Any, Callable, Generator, TypeVar, cast import uuid from _pytest.capture import CaptureFixture from flaky import flaky -from google.api_core.exceptions import InternalServerError -from google.api_core.exceptions import NotFound +from google.api_core.exceptions import InternalServerError, NotFound from google.cloud import pubsub_v1 -from google.cloud.pubsub import PublisherClient -from google.cloud.pubsub import SchemaServiceClient -from google.cloud.pubsub import SubscriberClient +from google.cloud.pubsub import PublisherClient, SchemaServiceClient, SubscriberClient from google.pubsub_v1.types import Encoding import pytest @@ -39,12 +36,15 @@ raise KeyError("Need to set GOOGLE_CLOUD_PROJECT as an environment variable.") AVRO_TOPIC_ID = f"schema-test-avro-topic-{UUID}" PROTO_TOPIC_ID = f"schema-test-proto-topic-{UUID}" +PROTO_WITH_REVISIONS_TOPIC_ID = f"schema-test-proto-with-revisions-topic-{UUID}" AVRO_SUBSCRIPTION_ID = f"schema-test-avro-subscription-{UUID}" PROTO_SUBSCRIPTION_ID = f"schema-test-proto-subscription-{UUID}" AVRO_SCHEMA_ID = f"schema-test-avro-schema-{UUID}" PROTO_SCHEMA_ID = f"schema-test-proto-schema-{UUID}" AVSC_FILE = "resources/us-states.avsc" +AVSC_REVISION_FILE = "resources/us-states.avsc" PROTO_FILE = "resources/us-states.proto" +PROTO_REVISION_FILE = "resources/us-states.proto" # These tests run in parallel if pytest-parallel is installed. # Avoid modifying resources that are shared across tests, @@ -229,6 +229,30 @@ def test_create_proto_schema( assert f"{proto_schema}" in out +def test_commit_avro_schema( + schema_client: pubsub_v1.SchemaServiceClient, + avro_schema: str, + capsys: CaptureFixture[str], +) -> None: + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + + out, _ = capsys.readouterr() + assert "Committed a schema revision using an Avro schema file:" in out + # assert f"{avro_schema}" in out + + +def test_commit_proto_schema( + schema_client: pubsub_v1.SchemaServiceClient, + proto_schema: str, + capsys: CaptureFixture[str], +) -> None: + schema.commit_proto_schema(PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE) + + out, _ = capsys.readouterr() + assert "Committed a schema revision using a protobuf schema file:" in out + # assert f"{proto_schema}" in out + + def test_get_schema(avro_schema: str, capsys: CaptureFixture[str]) -> None: schema.get_schema(PROJECT_ID, AVRO_SCHEMA_ID) out, _ = capsys.readouterr() @@ -236,12 +260,54 @@ def test_get_schema(avro_schema: str, capsys: CaptureFixture[str]) -> None: assert f"{avro_schema}" in out +def test_get_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.get_schema_revision(PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id) + out, _ = capsys.readouterr() + assert "Got a schema revision" in out + assert f"{avro_schema}" in out + + +def test_rollback_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + schema.rollback_schema_revision( + PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id + ) + out, _ = capsys.readouterr() + assert "Rolled back a schema revision" in out + # assert f"{avro_schema}" in out + + +def test_delete_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + schema.delete_schema_revision( + PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id + ) + out, _ = capsys.readouterr() + assert "Deleted a schema revision" in out + # assert f"{avro_schema}" in out + + def test_list_schemas(capsys: CaptureFixture[str]) -> None: schema.list_schemas(PROJECT_ID) out, _ = capsys.readouterr() assert "Listed schemas." in out +def test_list_schema_revisions(capsys: CaptureFixture[str]) -> None: + schema.list_schema_revisions(PROJECT_ID, AVRO_SCHEMA_ID) + out, _ = capsys.readouterr() + assert "Listed schema revisions." in out + + def test_create_topic_with_schema( avro_schema: str, capsys: CaptureFixture[str] ) -> None: @@ -253,6 +319,45 @@ def test_create_topic_with_schema( assert "BINARY" in out or "2" in out +def test_create_topic_with_schema_revisions( + proto_schema: str, capsys: CaptureFixture[str] +) -> None: + committed_schema = schema.commit_proto_schema( + PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE + ) + schema.create_topic_with_schema_revisions( + PROJECT_ID, + PROTO_WITH_REVISIONS_TOPIC_ID, + PROTO_SCHEMA_ID, + committed_schema.revision_id, + committed_schema.revision_id, + "BINARY", + ) + out, _ = capsys.readouterr() + assert "Created a topic" in out + assert f"{PROTO_WITH_REVISIONS_TOPIC_ID}" in out + assert f"{proto_schema}" in out + assert "BINARY" in out or "2" in out + + +def test_update_topic_schema( + proto_schema: str, proto_topic: str, capsys: CaptureFixture[str] +) -> None: + committed_schema = schema.commit_proto_schema( + PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE + ) + schema.update_topic_schema( + PROJECT_ID, + PROTO_WITH_REVISIONS_TOPIC_ID, + committed_schema.revision_id, + committed_schema.revision_id, + ) + out, _ = capsys.readouterr() + assert "Updated a topic schema" in out + assert f"{PROTO_WITH_REVISIONS_TOPIC_ID}" in out + assert f"{proto_schema}" in out + + def test_publish_avro_records( avro_schema: str, avro_topic: str, capsys: CaptureFixture[str] ) -> None: @@ -275,6 +380,21 @@ def test_subscribe_with_avro_schema( assert "Received a binary-encoded message:" in out +def test_subscribe_with_avro_schema_revisions( + avro_schema: str, + avro_topic: str, + avro_subscription: str, + capsys: CaptureFixture[str], +) -> None: + schema.publish_avro_records(PROJECT_ID, AVRO_TOPIC_ID, AVSC_FILE) + + schema.subscribe_with_avro_schema_with_revisions( + PROJECT_ID, AVRO_SUBSCRIPTION_ID, AVSC_FILE, 9 + ) + out, _ = capsys.readouterr() + assert "Received a binary-encoded message:" in out + + def test_publish_proto_records(proto_topic: str, capsys: CaptureFixture[str]) -> None: schema.publish_proto_messages(PROJECT_ID, PROTO_TOPIC_ID) out, _ = capsys.readouterr() From f3f744c412b092aceda0d9b0998e2c0af86a1069 Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Sat, 11 Mar 2023 03:33:14 +0000 Subject: [PATCH 5/8] chore(deps): update dependency pytest to v7.2.2 (#882) --- samples/snippets/requirements-test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 3c743a349..97457d941 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -1,5 +1,5 @@ backoff==2.2.1 -pytest==7.2.1 +pytest==7.2.2 mock==5.0.1 flaky==3.7.0 google-cloud-bigquery==3.6.0 From a6df376efd522bfc78a90e0613f93e601bceebda Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Mon, 13 Mar 2023 21:25:36 +0000 Subject: [PATCH 6/8] chore(deps): update dependency google-cloud-bigquery to v3.7.0 (#883) --- samples/snippets/requirements-test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 97457d941..9dca1d1d5 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -2,4 +2,4 @@ backoff==2.2.1 pytest==7.2.2 mock==5.0.1 flaky==3.7.0 -google-cloud-bigquery==3.6.0 +google-cloud-bigquery==3.7.0 From 0d247e6b189409b4d57c95dbbbf3df3e0fac0fa2 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 14 Mar 2023 11:02:53 -0400 Subject: [PATCH 7/8] fix: set x-goog-request-params for streaming pull request (#884) * samples: schema evolution * Add command-line commands * Fix tag for rollback * Make formatting fixes * Formatting fixes * Fix exceptions * fix: Set x-goog-request-params for streaming pull request --- .../subscriber/_protocol/streaming_pull_manager.py | 4 ++++ .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 13974ebe4..2f5a31e49 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -279,6 +279,9 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 + self._stream_metadata = [ + ["x-goog-request-params", "subscription=" + subscription] + ] # If max_duration_per_lease_extension is the default # we set the stream_ack_deadline to the default of 60 @@ -845,6 +848,7 @@ def open( initial_request=get_initial_request, should_recover=self._should_recover, should_terminate=self._should_terminate, + metadata=self._stream_metadata, throttle_reopen=True, ) self._rpc.add_done_callback(self._on_rpc_done) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index e01299ef9..199aea256 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -91,6 +91,7 @@ def test__wrap_callback_errors_error(): def test_constructor_and_default_state(): + mock.sentinel.subscription = str() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription ) @@ -113,6 +114,7 @@ def test_constructor_and_default_state(): def test_constructor_with_default_options(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, @@ -128,6 +130,7 @@ def test_constructor_with_default_options(): def test_constructor_with_min_and_max_duration_per_lease_extension_(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( min_duration_per_lease_extension=15, max_duration_per_lease_extension=20 ) @@ -142,6 +145,7 @@ def test_constructor_with_min_and_max_duration_per_lease_extension_(): def test_constructor_with_min_duration_per_lease_extension_too_low(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( min_duration_per_lease_extension=9, max_duration_per_lease_extension=9 ) @@ -156,6 +160,7 @@ def test_constructor_with_min_duration_per_lease_extension_too_low(): def test_constructor_with_max_duration_per_lease_extension_too_high(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( max_duration_per_lease_extension=601, min_duration_per_lease_extension=601 ) @@ -1181,6 +1186,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi initial_request=mock.ANY, should_recover=manager._should_recover, should_terminate=manager._should_terminate, + metadata=manager._stream_metadata, throttle_reopen=True, ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] From 89bd082fcf3331d53ace786eeb134a258af85c36 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 14 Mar 2023 20:33:04 -0400 Subject: [PATCH 8/8] chore(main): release 2.15.1 (#885) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- .release-please-manifest.json | 2 +- CHANGELOG.md | 7 +++++++ google/pubsub/gapic_version.py | 2 +- google/pubsub_v1/gapic_version.py | 2 +- .../snippet_metadata_google.pubsub.v1.json | 2 +- 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 4fa518a27..6abca04f4 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,4 +1,4 @@ { - ".": "2.15.0" + ".": "2.15.1" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 201301eb4..0108ba02e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ [1]: https://pypi.org/project/google-cloud-pubsub/#history +## [2.15.1](https://github.com/googleapis/python-pubsub/compare/v2.15.0...v2.15.1) (2023-03-14) + + +### Bug Fixes + +* Set x-goog-request-params for streaming pull request ([#884](https://github.com/googleapis/python-pubsub/issues/884)) ([0d247e6](https://github.com/googleapis/python-pubsub/commit/0d247e6b189409b4d57c95dbbbf3df3e0fac0fa2)) + ## [2.15.0](https://github.com/googleapis/python-pubsub/compare/v2.14.1...v2.15.0) (2023-02-22) diff --git a/google/pubsub/gapic_version.py b/google/pubsub/gapic_version.py index 2788e5e55..505a42f15 100644 --- a/google/pubsub/gapic_version.py +++ b/google/pubsub/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "2.15.0" # {x-release-please-version} +__version__ = "2.15.1" # {x-release-please-version} diff --git a/google/pubsub_v1/gapic_version.py b/google/pubsub_v1/gapic_version.py index 2788e5e55..505a42f15 100644 --- a/google/pubsub_v1/gapic_version.py +++ b/google/pubsub_v1/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "2.15.0" # {x-release-please-version} +__version__ = "2.15.1" # {x-release-please-version} diff --git a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json index 3e40ad50d..40e872753 100644 --- a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json +++ b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-pubsub", - "version": "2.15.0" + "version": "2.15.1" }, "snippets": [ {