Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/admin/api/client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Admin Client API
================

.. automodule:: google.cloud.pubsublite.admin_client
:members:
:inherited-members:
80 changes: 80 additions & 0 deletions docs/admin/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Admin Operations
================

Admin operations are handled through the
:class:`~.pubsublite.admin_client.AdminClient` class (aliased as
``google.cloud.pubsublite.AdminClient``).

Instantiating an admin client requires you to provide a valid cloud region
for the Pub/Sub Lite service:

.. code-block:: python

from google.cloud.pubsublite import AdminClient
cloud_region = CloudRegion("us-central1")
admin_client = AdminClient(cloud_region)


Create a topic
--------------

To create a message, use the
:meth:`~.pubsublite.admin_client.AdminClient.create_topic` method. This method accepts
one positional arguments: a :class:`~.pubsublite_v1.types.Topic` object, where the name
of the topic is passed along as a string.

Pub/Sub Lite topics have the canonical form of

``projects/{project_number}/locations/{location}/topics/{topic_id}``

A location (a.k.a. `zone`_) is comprised of a cloud region and a zone ID.

.. _zone: https://cloud.google.com/pubsub/lite/docs/locations/

A call to create a Pub/Sub Lite topic looks like:

.. code-block:: python

from google.cloud.pubsublite import AdminClient, Topic
from google.cloud.pubsublite.types import (
CloudRegion, CloudZone, TopicPath,
)

project_number = 1122334455
zone_id = "a"
topic_id = "your-topic-id"

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)

topic = Topic(
name=str(topic_path),
partition_config=Topic.PartitionConfig(
# 1 partition
count=1,
# Publish at 4 MiB/s and subscribe at 8 MiB/s
capacity=Topic.PartitionConfig.Capacity(
publish_mib_per_sec=4,
subscribe_mib_per_sec=8,
),
),
retention_config=Topic.RetentionConfig(
# 30 GiB
per_partition_bytes=30 * 1024 * 1024 * 1024,
# 7 days
period=Duration(seconds=60 * 60 * 24 * 7),
),
)

admin_client = AdminClient(cloud_region)
response = admin_client.create_topic(topic)


API Reference
-------------

.. toctree::
:maxdepth: 2

api/client
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"python": ("http://python.readthedocs.org/en/latest/", None),
"python": ("https://python.readthedocs.io/en/latest/", None),
"google-auth": ("https://google-auth.readthedocs.io/en/stable", None),
"google.api_core": ("https://googleapis.dev/python/google-api-core/latest/", None,),
"grpc": ("https://grpc.io/grpc/python/", None),
"grpc": ("https://grpc.github.io/grpc/python/", None),
"proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None),
}

Expand Down
8 changes: 5 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
API Reference
-------------
.. toctree::
:maxdepth: 2
:maxdepth: 3

pubsublite_v1/services
pubsublite_v1/types
Admin Client <admin/index>
Publisher Client <publisher/index>
Subscriber Client <subscriber/index>
Types <types>

Changelog
---------
Expand Down
6 changes: 6 additions & 0 deletions docs/publisher/api/client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Publisher Client API
====================

.. automodule:: google.cloud.pubsublite.cloudpubsub.publisher_client
:members:
:inherited-members:
60 changes: 60 additions & 0 deletions docs/publisher/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
Publisher Client
================

Publish operations are handled through the
:class:`~.pubsublite.cloudpubsub.publisher_client.PublisherClient` class (aliased as
``google.cloud.pubsublite.cloudpubsub.PublisherClient``).

You should instantiate a publisher client using a context manager:

.. code-block:: python

from google.cloud.pubsublite.cloudpubsub import PublisherClient

with PublisherClient() as publisher_client:
# Use publisher_client

Comment thread
anguillanneuf marked this conversation as resolved.
When not using a context manager, you need to call
:meth:`~.pubsublite.cloudpubsub.publisher_client.PublisherClient.__enter__`.

Publish a message
-----------------

To publish a message, use the
:meth:`~.pubsublite.cloudpubsub.publisher_client.PublisherClient.publish`
method. This method accepts
two positional arguments: a :class:`~.pubsublite.types.TopicPath` object
and a message in byte string.

A call to publish a message looks like:

.. code-block:: python

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion, CloudZone, TopicPath,
)

project_number = 1122334455
cloud_region = "us-central1"
zone_id = "a"
topic_id = "your-topic-id"

location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)

with PublisherClient() as publisher_client:
data = "Hello world!"
api_future = publisher_client.publish(t
opic_path, data.encode("utf-8")
)
message_id = api_future.result()


API Reference
-------------

.. toctree::
:maxdepth: 2

api/client
21 changes: 0 additions & 21 deletions docs/pubsublite_v1/services.rst

This file was deleted.

6 changes: 0 additions & 6 deletions docs/pubsublite_v1/types.rst

This file was deleted.

6 changes: 6 additions & 0 deletions docs/subscriber/api/client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Subscriber Client API
=====================

.. automodule:: google.cloud.pubsublite.cloudpubsub.subscriber_client
:members:
:inherited-members:
104 changes: 104 additions & 0 deletions docs/subscriber/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
Subscriber Client
=================

Subscribe operations are handled through the
:class:`~.pubsublite.cloudpubsub.subscriber_client.SubscriberClient` class (aliased as
``google.cloud.pubsublite.cloudpubsub.SubscriberClient``).

You should instantiate a subscriber client using a context manager:

.. code-block:: python

from google.cloud.pubsublite.cloudpubsub import SubscriberClient
Comment thread
anguillanneuf marked this conversation as resolved.

with SubscriberClient() as subscriber_client:
# Use subscriber_client

When not using a context manager, you need to call
:meth:`~.pubsublite.cloudpubsub.subscriber_client.SubscriberClient.__enter__`.

Receive messages
----------------

To receive messages, use the
:meth:`~.pubsublite.cloudpubsub.subscriber_client.SubscriberClient.subscribe`
method. This method requires
three positional arguments: a :class:`~.pubsublite.types.SubscriptionPath` object,
a callback function, and a :class:`~.pubsublite.types.FlowControlSettings` object.

Receiving messages looks like:

.. code-block:: python

from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
SubscriptionPath,
DISABLED_FLOW_CONTROL,
)

project_number = 1122334455
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "your-subscription-id"

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

with SubscriberClient() as subscriber_client:
streaming_pull_future = subscriber_client.subscribe(
subscription_path,
callback=callback,
per_partition_flow_control_settings=DISABLED_FLOW_CONTROL,
)

streaming_pull_future.result()

Subscriber Callbacks
--------------------

Received messages are processed in a callback function. This callback function
only takes one argument of type :class:`google.cloud.pubsub_v1.subscriber.message.Message`.
After this message has been processed, the function should either call
:meth:`~.pubsub_v1.subscriber.message.Message.ack` to acknowledge the message or
:meth:`~.pubsub_v1.subscriber.message.Message.nack` to send a negative acknowledgement.

.. code-block:: python

def callback(message):
message_data = message.data.decode("utf-8")
print(f"Received {message_data}.")
message.ack()

Flow Control Settings
---------------------

Flow control settings are applied per partition.
They control when to pause the message stream to a partition so the server temporarily
stops sending out more messages (known as outstanding messages) from this partition.

Comment thread
dpcollins-google marked this conversation as resolved.
You can configure flow control settings by setting the maximum number and size of
outstanding messages. The message stream is paused when either condition is met.

.. code-block:: python

from google.cloud.pubsublite.types import FlowControlSettings

flow_control_settings = FlowControlSettings(
# 1,000 outstanding messages. Must be >0.
messages_outstanding=1000,
# 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
bytes_outstanding=10 * 1024 * 1024,
)

You may also turn off flow control settings by setting it to
:class:`google.cloud.pubsublite.types.DISABLED_FLOW_CONTROL`.

API Reference
-------------

.. toctree::
:maxdepth: 2

api/client
18 changes: 18 additions & 0 deletions docs/types.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Pub/Sub Lite Client Types
=========================

.. automodule:: google.cloud.pubsublite.types
:members:
:noindex:

.. autoclass:: google.cloud.pubsublite_v1.Cursor
:members:
:noindex:

.. autoclass:: google.cloud.pubsublite_v1.Subscription
:members:
:noindex:

.. autoclass:: google.cloud.pubsublite_v1.Topic
:members:
:noindex:
16 changes: 16 additions & 0 deletions google/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pkg_resources

pkg_resources.declare_namespace(__name__)
16 changes: 16 additions & 0 deletions google/cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pkg_resources

pkg_resources.declare_namespace(__name__)
Loading