diff --git a/samples/snippets/resources/us-states-plus.avsc b/samples/snippets/resources/us-states-plus.avsc new file mode 100644 index 000000000..74225ae7e --- /dev/null +++ b/samples/snippets/resources/us-states-plus.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"State", + "namespace":"utilities", + "doc":"A list of states in the United States of America.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the state." + }, + { + "name":"post_abbr", + "type":"string", + "doc":"The postal code abbreviation of the state." + }, + { + "name":"population", + "type":"long", + "default":0, + "doc":"The population of the state." + } + ] +} diff --git a/samples/snippets/resources/us-states-plus.proto b/samples/snippets/resources/us-states-plus.proto new file mode 100644 index 000000000..9f845d9f4 --- /dev/null +++ b/samples/snippets/resources/us-states-plus.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package utilities; + +message StateProto { + string name = 1; + string post_abbr = 2; + int64 population = 3; +} diff --git a/samples/snippets/schema.py b/samples/snippets/schema.py index e2a171d1d..9c0dd656f 100644 --- a/samples/snippets/schema.py +++ b/samples/snippets/schema.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2021 Google LLC. All Rights Reserved. +# Copyright 2023 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -54,6 +54,7 @@ def create_avro_schema(project_id: str, schema_id: str, avsc_file: str) -> None: request={"parent": project_path, "schema": schema, "schema_id": schema_id} ) print(f"Created a schema using an Avro schema file:\n{result}") + return result except AlreadyExists: print(f"{schema_id} already exists.") # [END pubsub_create_avro_schema] @@ -88,11 +89,76 @@ def create_proto_schema(project_id: str, schema_id: str, proto_file: str) -> Non request={"parent": project_path, "schema": schema, "schema_id": schema_id} ) print(f"Created a schema using a protobuf schema file:\n{result}") + return result except AlreadyExists: print(f"{schema_id} already exists.") # [END pubsub_create_proto_schema] +def commit_avro_schema(project_id: str, schema_id: str, avsc_file: str) -> None: + """Commit a schema resource from a JSON-formatted Avro schema file.""" + # [START pubsub_commit_avro_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + from google.pubsub_v1.types import Schema + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json" + + # Read a JSON-formatted Avro schema file as a string. + with open(avsc_file, "rb") as f: + avsc_source = f.read().decode("utf-8") + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source) + + try: + result = schema_client.commit_schema( + request={"schema": schema, "name": schema_path} + ) + print(f"Committed a schema revision using an Avro schema file:\n{result}") + return result + except NotFound: + print(f"{schema_id} does not exist.") + # [END pubsub_commit_avro_schema] + + +def commit_proto_schema(project_id: str, schema_id: str, proto_file: str) -> None: + """Commit a schema revision from a protobuf schema file.""" + # [START pubsub_commit_proto_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + from google.pubsub_v1.types import Schema + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers" + + # Read a protobuf schema file as a string. + with open(proto_file, "rb") as f: + proto_source = f.read().decode("utf-8") + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + schema = Schema( + name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source + ) + + try: + result = schema_client.commit_schema( + request={"schema": schema, "name": schema_path} + ) + print(f"Committed a schema revision using a protobuf schema file:\n{result}") + return result + except NotFound: + print(f"{schema_id} does not exist.") + # [END pubsub_commit_proto_schema] + + def get_schema(project_id: str, schema_id: str) -> None: """Get a schema resource.""" # [START pubsub_get_schema] @@ -114,6 +180,32 @@ def get_schema(project_id: str, schema_id: str) -> None: # [END pubsub_get_schema] +def get_schema_revision( + project_id: str, schema_id: str, schema_revision_id: str +) -> None: + """Get a schema revision.""" + # [START pubsub_get_schema_revision] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # schema_revision_id = "your-schema-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path( + project_id, schema_id + "@" + schema_revision_id + ) + + try: + result = schema_client.get_schema(request={"name": schema_path}) + print(f"Got a schema revision:\n{result}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_get_schema_revision] + + def list_schemas(project_id: str) -> None: """List schema resources.""" # [START pubsub_list_schemas] @@ -132,6 +224,51 @@ def list_schemas(project_id: str) -> None: # [END pubsub_list_schemas] +def list_schema_revisions(project_id: str, schema_id: str) -> None: + """List schema revisions for a schema resource.""" + # [START pubsub_list_schema_revisions] + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + for schema in schema_client.list_schema_revisions(request={"name": schema_path}): + print(schema) + + print("Listed schema revisions.") + # [END pubsub_list_schema_revisions] + + +def rollback_schema_revision( + project_id: str, schema_id: str, schema_revision_id: str +) -> None: + """Roll back a schema revision.""" + # [START pubsub_rollback_schema] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # schema_revision_id = "your-schema-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + try: + result = schema_client.rollback_schema( + request={"name": schema_path, "revision_id": schema_revision_id} + ) + print(f"Rolled back a schema revision:\n{result}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_rollback_schema] + + def delete_schema(project_id: str, schema_id: str) -> None: """Delete a schema resource.""" # [START pubsub_delete_schema] @@ -153,6 +290,28 @@ def delete_schema(project_id: str, schema_id: str) -> None: # [END pubsub_delete_schema] +def delete_schema_revision(project_id: str, schema_id: str, revision_id: str) -> None: + """Delete a schema revision.""" + # [START pubsub_delete_schema_revision] + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # schema_id = "your-schema-id" + # revision_id = "your-revision-id" + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id + "@" + revision_id) + + try: + schema_client.delete_schema_revision(request={"name": schema_path}) + print(f"Deleted a schema revision:\n{schema_path}") + except NotFound: + print(f"{schema_id} not found.") + # [END pubsub_delete_schema_revision] + + def create_topic_with_schema( project_id: str, topic_id: str, schema_id: str, message_encoding: str ) -> None: @@ -194,10 +353,106 @@ def create_topic_with_schema( except AlreadyExists: print(f"{topic_id} already exists.") except InvalidArgument: - print("Please choose either BINARY or JSON as a valid message encoding type.") + print("Schema settings are not valid.") # [END pubsub_create_topic_with_schema] +def update_topic_schema( + project_id: str, topic_id: str, first_revision_id: str, last_revision_id: str +) -> None: + """Update a topic resource's first schema revision.""" + # [START pubsub_update_topic_schema] + from google.api_core.exceptions import InvalidArgument, NotFound + from google.cloud.pubsub import PublisherClient + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # first_revision_id = "your-revision-id" + # last_revision_id = "your-revision-id" + + publisher_client = PublisherClient() + topic_path = publisher_client.topic_path(project_id, topic_id) + + try: + response = publisher_client.update_topic( + request={ + "topic": { + "name": topic_path, + "schema_settings": { + "first_revision_id": first_revision_id, + "last_revision_id": last_revision_id, + }, + }, + "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId", + } + ) + print(f"Updated a topic schema:\n{response}") + + except NotFound: + print(f"{topic_id} not found.") + except InvalidArgument: + print("Schema settings are not valid.") + # [END pubsub_update_topic_schema] + + +def create_topic_with_schema_revisions( + project_id: str, + topic_id: str, + schema_id: str, + first_revision_id: str, + last_revision_id: str, + message_encoding: str, +) -> None: + """Create a topic resource with a schema.""" + # [START pubsub_create_topic_with_schema_revisions] + from google.api_core.exceptions import AlreadyExists, InvalidArgument + from google.cloud.pubsub import PublisherClient, SchemaServiceClient + from google.pubsub_v1.types import Encoding + + # TODO(developer): Replace these variables before running the sample. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # schema_id = "your-schema-id" + # first_revision_id = "your-revision-id" + # last_revision_id = "your-revision-id" + # Choose either BINARY or JSON as valid message encoding in this topic. + # message_encoding = "BINARY" + + publisher_client = PublisherClient() + topic_path = publisher_client.topic_path(project_id, topic_id) + + schema_client = SchemaServiceClient() + schema_path = schema_client.schema_path(project_id, schema_id) + + if message_encoding == "BINARY": + encoding = Encoding.BINARY + elif message_encoding == "JSON": + encoding = Encoding.JSON + else: + encoding = Encoding.ENCODING_UNSPECIFIED + + try: + response = publisher_client.create_topic( + request={ + "name": topic_path, + "schema_settings": { + "schema": schema_path, + "encoding": encoding, + "first_revision_id": first_revision_id, + "last_revision_id": last_revision_id, + }, + } + ) + print(f"Created a topic:\n{response}") + + except AlreadyExists: + print(f"{topic_id} already exists.") + except InvalidArgument: + print("Please choose either BINARY or JSON as a valid message encoding type.") + # [END pubsub_create_topic_with_schema_revisions] + + def publish_avro_records(project_id: str, topic_id: str, avsc_file: str) -> None: """Pulbish a BINARY or JSON encoded message to a topic configured with an Avro schema.""" # [START pubsub_publish_avro_records] @@ -359,6 +614,90 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscribe_avro_records] +def subscribe_with_avro_schema_with_revisions( + project_id: str, + subscription_id: str, + avsc_file: str, + timeout: Optional[float] = None, +) -> None: + """Receive and decode messages sent to a topic with an Avro schema.""" + # [START pubsub_subscribe_avro_records_with_revisions] + import avro + from avro.io import BinaryDecoder, DatumReader + from concurrent.futures import TimeoutError + import io + import json + from google.api_core.exceptions import NotFound + from google.cloud.pubsub import SchemaServiceClient, SubscriberClient + + schema_client = SchemaServiceClient() + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json" + # Number of seconds the subscriber listens for messages + # timeout = 5.0 + + subscriber = SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + writer_avro_schema = avro.schema.parse(open(avsc_file, "rb").read()) + # Dict to keep readers for different schema revisions. + revisions_to_readers = {} + + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Get the message serialization type. + schema_name = message.attributes.get("googclient_schemaname") + schema_revision_id = message.attributes.get("googclient_schemarevisionid") + encoding = message.attributes.get("googclient_schemaencoding") + + if schema_revision_id not in revisions_to_readers: + schema_path = schema_name + "@" + schema_revision_id + try: + received_avro_schema = schema_client.get_schema( + request={"name": schema_path} + ) + except NotFound: + print(f"{schema_path} not found.") + message.nack() + return + reader_avro_schema = avro.schema.parse(received_avro_schema.definition) + revisions_to_readers[schema_revision_id] = DatumReader( + writer_avro_schema, reader_avro_schema + ) + reader = revisions_to_readers[schema_revision_id] + + # Deserialize the message data accordingly. + if encoding == "BINARY": + bout = io.BytesIO(message.data) + decoder = BinaryDecoder(bout) + message_data = reader.read(decoder) + print(f"Received a binary-encoded message:\n{message_data}") + elif encoding == "JSON": + message_data = json.loads(message.data) + print(f"Received a JSON-encoded message:\n{message_data}") + else: + print(f"Received a message with no encoding:\n{message}") + message.nack() + + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print(f"Listening for messages on {subscription_path}..\n") + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception occurs first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_subscribe_avro_records_with_revisions] + + def subscribe_with_proto_schema( project_id: str, subscription_id: str, timeout: float ) -> None: @@ -484,16 +823,35 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: create_avro_schema(args.project_id, args.schema_id, args.avsc_file) if args.command == "create-proto": create_proto_schema(args.project_id, args.schema_id, args.proto_file) + if args.command == "commit-avro": + commit_avro_schema(args.project_id, args.schema_id, args.avsc_file) + if args.command == "commit-proto": + commit_proto_schema(args.project_id, args.schema_id, args.proto_file) if args.command == "get": get_schema(args.project_id, args.schema_id) + if args.command == "get-revision": + get_schema_revision(args.project_id, args.schema_id, args.revision_id) if args.command == "list": list_schemas(args.project_id) + if args.command == "list-revisions": + list_schema_revisions(args.project_id, args.schema_id) if args.command == "delete": delete_schema(args.project_id, args.schema_id) + if args.command == "delete-revision": + delete_schema_revision(args.project_id, args.schema_id, args.revision_id) if args.command == "create-topic": create_topic_with_schema( args.project_id, args.topic_id, args.schema_id, args.message_encoding ) + if args.command == "create-topic-with-revisions": + create_topic_with_schema_revisions( + args.project_id, + args.topic_id, + args.schema_id, + args.first_revision_id, + args.last_revision_id, + args.message_encoding, + ) if args.command == "publish-avro": publish_avro_records(args.project_id, args.topic_id, args.avsc_file) if args.command == "publish-proto": @@ -502,5 +860,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: subscribe_with_avro_schema( args.project_id, args.subscription_id, args.avsc_file, args.timeout ) + if args.command == "receive-avro-with-revisions": + subscribe_with_avro_schema_with_revisions( + args.project_id, args.subscription_id, args.avsc_file, args.timeout + ) if args.command == "receive-proto": subscribe_with_proto_schema(args.project_id, args.subscription_id, args.timeout) diff --git a/samples/snippets/schema_test.py b/samples/snippets/schema_test.py index 7780bebc1..f62449e88 100644 --- a/samples/snippets/schema_test.py +++ b/samples/snippets/schema_test.py @@ -15,17 +15,14 @@ # limitations under the License. import os -from typing import Any, Callable, cast, Generator, TypeVar +from typing import Any, Callable, Generator, TypeVar, cast import uuid from _pytest.capture import CaptureFixture from flaky import flaky -from google.api_core.exceptions import InternalServerError -from google.api_core.exceptions import NotFound +from google.api_core.exceptions import InternalServerError, NotFound from google.cloud import pubsub_v1 -from google.cloud.pubsub import PublisherClient -from google.cloud.pubsub import SchemaServiceClient -from google.cloud.pubsub import SubscriberClient +from google.cloud.pubsub import PublisherClient, SchemaServiceClient, SubscriberClient from google.pubsub_v1.types import Encoding import pytest @@ -39,12 +36,15 @@ raise KeyError("Need to set GOOGLE_CLOUD_PROJECT as an environment variable.") AVRO_TOPIC_ID = f"schema-test-avro-topic-{UUID}" PROTO_TOPIC_ID = f"schema-test-proto-topic-{UUID}" +PROTO_WITH_REVISIONS_TOPIC_ID = f"schema-test-proto-with-revisions-topic-{UUID}" AVRO_SUBSCRIPTION_ID = f"schema-test-avro-subscription-{UUID}" PROTO_SUBSCRIPTION_ID = f"schema-test-proto-subscription-{UUID}" AVRO_SCHEMA_ID = f"schema-test-avro-schema-{UUID}" PROTO_SCHEMA_ID = f"schema-test-proto-schema-{UUID}" AVSC_FILE = "resources/us-states.avsc" +AVSC_REVISION_FILE = "resources/us-states.avsc" PROTO_FILE = "resources/us-states.proto" +PROTO_REVISION_FILE = "resources/us-states.proto" # These tests run in parallel if pytest-parallel is installed. # Avoid modifying resources that are shared across tests, @@ -229,6 +229,30 @@ def test_create_proto_schema( assert f"{proto_schema}" in out +def test_commit_avro_schema( + schema_client: pubsub_v1.SchemaServiceClient, + avro_schema: str, + capsys: CaptureFixture[str], +) -> None: + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + + out, _ = capsys.readouterr() + assert "Committed a schema revision using an Avro schema file:" in out + # assert f"{avro_schema}" in out + + +def test_commit_proto_schema( + schema_client: pubsub_v1.SchemaServiceClient, + proto_schema: str, + capsys: CaptureFixture[str], +) -> None: + schema.commit_proto_schema(PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE) + + out, _ = capsys.readouterr() + assert "Committed a schema revision using a protobuf schema file:" in out + # assert f"{proto_schema}" in out + + def test_get_schema(avro_schema: str, capsys: CaptureFixture[str]) -> None: schema.get_schema(PROJECT_ID, AVRO_SCHEMA_ID) out, _ = capsys.readouterr() @@ -236,12 +260,54 @@ def test_get_schema(avro_schema: str, capsys: CaptureFixture[str]) -> None: assert f"{avro_schema}" in out +def test_get_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.get_schema_revision(PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id) + out, _ = capsys.readouterr() + assert "Got a schema revision" in out + assert f"{avro_schema}" in out + + +def test_rollback_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + schema.rollback_schema_revision( + PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id + ) + out, _ = capsys.readouterr() + assert "Rolled back a schema revision" in out + # assert f"{avro_schema}" in out + + +def test_delete_schema_revsion(avro_schema: str, capsys: CaptureFixture[str]) -> None: + committed_schema = schema.commit_avro_schema( + PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE + ) + schema.commit_avro_schema(PROJECT_ID, AVRO_SCHEMA_ID, AVSC_REVISION_FILE) + schema.delete_schema_revision( + PROJECT_ID, AVRO_SCHEMA_ID, committed_schema.revision_id + ) + out, _ = capsys.readouterr() + assert "Deleted a schema revision" in out + # assert f"{avro_schema}" in out + + def test_list_schemas(capsys: CaptureFixture[str]) -> None: schema.list_schemas(PROJECT_ID) out, _ = capsys.readouterr() assert "Listed schemas." in out +def test_list_schema_revisions(capsys: CaptureFixture[str]) -> None: + schema.list_schema_revisions(PROJECT_ID, AVRO_SCHEMA_ID) + out, _ = capsys.readouterr() + assert "Listed schema revisions." in out + + def test_create_topic_with_schema( avro_schema: str, capsys: CaptureFixture[str] ) -> None: @@ -253,6 +319,45 @@ def test_create_topic_with_schema( assert "BINARY" in out or "2" in out +def test_create_topic_with_schema_revisions( + proto_schema: str, capsys: CaptureFixture[str] +) -> None: + committed_schema = schema.commit_proto_schema( + PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE + ) + schema.create_topic_with_schema_revisions( + PROJECT_ID, + PROTO_WITH_REVISIONS_TOPIC_ID, + PROTO_SCHEMA_ID, + committed_schema.revision_id, + committed_schema.revision_id, + "BINARY", + ) + out, _ = capsys.readouterr() + assert "Created a topic" in out + assert f"{PROTO_WITH_REVISIONS_TOPIC_ID}" in out + assert f"{proto_schema}" in out + assert "BINARY" in out or "2" in out + + +def test_update_topic_schema( + proto_schema: str, proto_topic: str, capsys: CaptureFixture[str] +) -> None: + committed_schema = schema.commit_proto_schema( + PROJECT_ID, PROTO_SCHEMA_ID, PROTO_REVISION_FILE + ) + schema.update_topic_schema( + PROJECT_ID, + PROTO_WITH_REVISIONS_TOPIC_ID, + committed_schema.revision_id, + committed_schema.revision_id, + ) + out, _ = capsys.readouterr() + assert "Updated a topic schema" in out + assert f"{PROTO_WITH_REVISIONS_TOPIC_ID}" in out + assert f"{proto_schema}" in out + + def test_publish_avro_records( avro_schema: str, avro_topic: str, capsys: CaptureFixture[str] ) -> None: @@ -275,6 +380,21 @@ def test_subscribe_with_avro_schema( assert "Received a binary-encoded message:" in out +def test_subscribe_with_avro_schema_revisions( + avro_schema: str, + avro_topic: str, + avro_subscription: str, + capsys: CaptureFixture[str], +) -> None: + schema.publish_avro_records(PROJECT_ID, AVRO_TOPIC_ID, AVSC_FILE) + + schema.subscribe_with_avro_schema_with_revisions( + PROJECT_ID, AVRO_SUBSCRIPTION_ID, AVSC_FILE, 9 + ) + out, _ = capsys.readouterr() + assert "Received a binary-encoded message:" in out + + def test_publish_proto_records(proto_topic: str, capsys: CaptureFixture[str]) -> None: schema.publish_proto_messages(PROJECT_ID, PROTO_TOPIC_ID) out, _ = capsys.readouterr()