From 40718e2ff77d44d01d089e253a82f40afa68e63e Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 20:52:47 -0400 Subject: [PATCH 01/14] fix assert fail for duplicate keys --- .../subscriber/_protocol/messages_on_hold.py | 49 +++++++++++-------- .../subscriber/test_messages_on_hold.py | 40 +++++++++++++++ 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index 5c3cc1a75..e83f256b5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import logging import typing from typing import Any, Callable, Iterable, Optional @@ -20,6 +21,8 @@ from google.cloud.pubsub_v1 import subscriber +_LOGGER = logging.getLogger(__name__) + class MessagesOnHold(object): """Tracks messages on hold by ordering key. Not thread-safe.""" @@ -71,12 +74,14 @@ def get(self) -> Optional["subscriber.message.Message"]: if msg.ordering_key: pending_queue = self._pending_ordered_messages.get(msg.ordering_key) if pending_queue is None: + _LOGGER.info("pending_queue is None for ordering key: %s", msg.ordering_key) # Create empty queue to indicate a message with the # ordering key is in flight. self._pending_ordered_messages[ msg.ordering_key ] = collections.deque() self._size = self._size - 1 + _LOGGER.info("Created pending_queue for ordering key: %s", msg.ordering_key) return msg else: # Another message is in flight so add message to end of @@ -113,23 +118,27 @@ def activate_ordering_keys( Args: ordering_keys: - The ordering keys to activate. May be empty. + The ordering keys to activate. May be empty, or contain duplicates. schedule_message_callback: The callback to call to schedule a message to be sent to the user. """ + activated_keys = set() for key in ordering_keys: - assert ( - self._pending_ordered_messages.get(key) is not None - ), "A message queue should exist for every ordered message in flight." - next_msg = self._get_next_for_ordering_key(key) - if next_msg: - # Schedule the next message because the previous was dropped. - # Note that this may overload the user's `max_bytes` limit, but - # not their `max_messages` limit. - schedule_message_callback(next_msg) - else: - # No more messages for this ordering key, so do clean-up. - self._clean_up_ordering_key(key) + if key not in activated_keys: + activated_keys.add(key) + pending_ordered_messages = self._pending_ordered_messages.get(key) + if pending_ordered_messages is None: + _LOGGER.warning("No message queue exists for message ordering key: %s.", key) + continue + next_msg = self._get_next_for_ordering_key(key) + if next_msg: + # Schedule the next message because the previous was dropped. + # Note that this may overload the user's `max_bytes` limit, but + # not their `max_messages` limit. + schedule_message_callback(next_msg) + else: + # No more messages for this ordering key, so do clean-up. + self._clean_up_ordering_key(key) def _get_next_for_ordering_key( self, ordering_key: str @@ -157,12 +166,12 @@ def _clean_up_ordering_key(self, ordering_key: str) -> None: Args: ordering_key: The ordering key to clean up. """ + _LOGGER.debug("Cleaning up ordering key queue for key %s:", ordering_key) message_queue = self._pending_ordered_messages.get(ordering_key) - assert ( - message_queue is not None - ), "Cleaning up ordering key that does not exist." - assert not len(message_queue), ( - "Ordering key must only be removed if there are no messages " - "left for that key." - ) + if message_queue is None: + _LOGGER.warning("Tried to clean up ordering key that does not exist %s", ordering_key) + return + if len(message_queue) > 0: + _LOGGER.warning("Tried to clean up ordering key: %s with %d messages remaining.", ordering_key, len(message_queue)) + return del self._pending_ordered_messages[ordering_key] diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 0d28ec447..74d35fc2c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -108,6 +108,46 @@ def test_ordered_messages_one_key(): assert moh.get() is None assert moh.size == 0 +def test_ordered_messages_drop_duplicate_keys(): + moh = messages_on_hold.MessagesOnHold() +# comment + msg1 = make_message(ack_id="ack1", ordering_key="key1") + moh.put(msg1) + assert moh.size == 1 + + msg2 = make_message(ack_id="ack2", ordering_key="key1") + moh.put(msg2) + assert moh.size == 2 + + # Get first message for "key1" + assert moh.get() == msg1 + assert moh.size == 1 + + # Still waiting on the previously-sent message for "key1", and there are no + # other messages, so return None. + assert moh.get() is None + assert moh.size == 1 + + # Activate "key1", the second is ignored. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1", "key1"], callback_tracker) + assert callback_tracker.called + assert callback_tracker.message == msg2 + assert moh.size == 0 + assert len(moh._pending_ordered_messages) == 1 + + # Activate "key1" again. There are no other messages for that key, so clean + # up state for that key. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again (dropping a message we already dropped) + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + def test_ordered_messages_two_keys(): moh = messages_on_hold.MessagesOnHold() From 837bd28c5447a57bdbc002df50de4528d82b38b2 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:03:43 -0400 Subject: [PATCH 02/14] cover cleanup changes --- .../subscriber/test_messages_on_hold.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 74d35fc2c..33d726323 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -318,3 +318,19 @@ def test_ordered_and_unordered_messages_interleaved(): # No messages left. assert moh.get() is None assert moh.size == 0 + +def test_cleanup_nonexistent_key(): + moh = messages_on_hold.MessagesOnHold() + moh._clean_up_ordering_key("non-existent key") + +def test_cleanup_key_with_messages(): + moh = messages_on_hold.MessagesOnHold() + + # Put message with "key1". + msg1 = make_message(ack_id="ack1", ordering_key="key1") + moh.put(msg1) + assert moh.size == 1 + + moh._clean_up_ordering_key("key1") + assert moh.size == 1 + From 94378ab657e1c4c07714d3677d1ef9cf867077ed Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:31:47 -0400 Subject: [PATCH 03/14] add unit test coverage, and fix lint --- .../subscriber/_protocol/messages_on_hold.py | 23 +++++++++++++++---- .../subscriber/test_messages_on_hold.py | 20 +++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index e83f256b5..3eb43f63e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -23,6 +23,7 @@ _LOGGER = logging.getLogger(__name__) + class MessagesOnHold(object): """Tracks messages on hold by ordering key. Not thread-safe.""" @@ -74,14 +75,18 @@ def get(self) -> Optional["subscriber.message.Message"]: if msg.ordering_key: pending_queue = self._pending_ordered_messages.get(msg.ordering_key) if pending_queue is None: - _LOGGER.info("pending_queue is None for ordering key: %s", msg.ordering_key) + _LOGGER.info( + "pending_queue is None for ordering key: %s", msg.ordering_key + ) # Create empty queue to indicate a message with the # ordering key is in flight. self._pending_ordered_messages[ msg.ordering_key ] = collections.deque() self._size = self._size - 1 - _LOGGER.info("Created pending_queue for ordering key: %s", msg.ordering_key) + _LOGGER.info( + "Created pending_queue for ordering key: %s", msg.ordering_key + ) return msg else: # Another message is in flight so add message to end of @@ -128,7 +133,9 @@ def activate_ordering_keys( activated_keys.add(key) pending_ordered_messages = self._pending_ordered_messages.get(key) if pending_ordered_messages is None: - _LOGGER.warning("No message queue exists for message ordering key: %s.", key) + _LOGGER.warning( + "No message queue exists for message ordering key: %s.", key + ) continue next_msg = self._get_next_for_ordering_key(key) if next_msg: @@ -169,9 +176,15 @@ def _clean_up_ordering_key(self, ordering_key: str) -> None: _LOGGER.debug("Cleaning up ordering key queue for key %s:", ordering_key) message_queue = self._pending_ordered_messages.get(ordering_key) if message_queue is None: - _LOGGER.warning("Tried to clean up ordering key that does not exist %s", ordering_key) + _LOGGER.warning( + "Tried to clean up ordering key that does not exist %s", ordering_key + ) return if len(message_queue) > 0: - _LOGGER.warning("Tried to clean up ordering key: %s with %d messages remaining.", ordering_key, len(message_queue)) + _LOGGER.warning( + "Tried to clean up ordering key: %s with %d messages remaining.", + ordering_key, + len(message_queue), + ) return del self._pending_ordered_messages[ordering_key] diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 33d726323..19a61cdd1 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -17,6 +17,7 @@ from google.cloud.pubsub_v1.subscriber import message from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.pubsub_v1 import types as gapic_types +from _pytest.capture import CaptureFixture def make_message(ack_id, ordering_key): @@ -108,9 +109,10 @@ def test_ordered_messages_one_key(): assert moh.get() is None assert moh.size == 0 + def test_ordered_messages_drop_duplicate_keys(): moh = messages_on_hold.MessagesOnHold() -# comment + # comment msg1 = make_message(ack_id="ack1", ordering_key="key1") moh.put(msg1) assert moh.size == 1 @@ -148,7 +150,6 @@ def test_ordered_messages_drop_duplicate_keys(): assert not callback_tracker.called - def test_ordered_messages_two_keys(): moh = messages_on_hold.MessagesOnHold() @@ -319,11 +320,17 @@ def test_ordered_and_unordered_messages_interleaved(): assert moh.get() is None assert moh.size == 0 -def test_cleanup_nonexistent_key(): + +def test_cleanup_nonexistent_key(capsys: CaptureFixture[str]): moh = messages_on_hold.MessagesOnHold() - moh._clean_up_ordering_key("non-existent key") + moh._clean_up_ordering_key("non-existent-key") + out, _ = capsys.readouterr() + assert ( + f"Tried to clean up ordering key that does not exist: non-existent-key" in out + ) + -def test_cleanup_key_with_messages(): +def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): moh = messages_on_hold.MessagesOnHold() # Put message with "key1". @@ -333,4 +340,5 @@ def test_cleanup_key_with_messages(): moh._clean_up_ordering_key("key1") assert moh.size == 1 - + out, _ = capsys.readouterr() + assert f"Tried to clean up ordering key: key1 with 1 messages remaining." in out From 39fe06c56957a28ae5677a6891533016f076440e Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:34:01 -0400 Subject: [PATCH 04/14] fix lint --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 19a61cdd1..4aadb89fe 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -326,7 +326,7 @@ def test_cleanup_nonexistent_key(capsys: CaptureFixture[str]): moh._clean_up_ordering_key("non-existent-key") out, _ = capsys.readouterr() assert ( - f"Tried to clean up ordering key that does not exist: non-existent-key" in out + "Tried to clean up ordering key that does not exist: non-existent-key" in out ) @@ -341,4 +341,4 @@ def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): moh._clean_up_ordering_key("key1") assert moh.size == 1 out, _ = capsys.readouterr() - assert f"Tried to clean up ordering key: key1 with 1 messages remaining." in out + assert "Tried to clean up ordering key: key1 with 1 messages remaining." in out From ac1f51d399185118bca2ee9cd2e03f5859a58201 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:41:04 -0400 Subject: [PATCH 05/14] fix unit test --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 4aadb89fe..42743502a 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -324,9 +324,9 @@ def test_ordered_and_unordered_messages_interleaved(): def test_cleanup_nonexistent_key(capsys: CaptureFixture[str]): moh = messages_on_hold.MessagesOnHold() moh._clean_up_ordering_key("non-existent-key") - out, _ = capsys.readouterr() + _, err = capsys.readouterr() assert ( - "Tried to clean up ordering key that does not exist: non-existent-key" in out + "Tried to clean up ordering key that does not exist: non-existent-key" in err ) @@ -340,5 +340,5 @@ def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): moh._clean_up_ordering_key("key1") assert moh.size == 1 - out, _ = capsys.readouterr() - assert "Tried to clean up ordering key: key1 with 1 messages remaining." in out + _ err = capsys.readouterr() + assert "Tried to clean up ordering key: key1 with 1 messages remaining." in err From 730cfc8f538cc4dda5a1737fb458600bd78a4d9a Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:44:03 -0400 Subject: [PATCH 06/14] fix unit test --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 42743502a..a1942a490 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -340,5 +340,5 @@ def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): moh._clean_up_ordering_key("key1") assert moh.size == 1 - _ err = capsys.readouterr() + _, err = capsys.readouterr() assert "Tried to clean up ordering key: key1 with 1 messages remaining." in err From bfa1da9bb827365857adadee4a027fbd03460e64 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:46:01 -0400 Subject: [PATCH 07/14] fix lint --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index a1942a490..29952a5cb 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -325,9 +325,7 @@ def test_cleanup_nonexistent_key(capsys: CaptureFixture[str]): moh = messages_on_hold.MessagesOnHold() moh._clean_up_ordering_key("non-existent-key") _, err = capsys.readouterr() - assert ( - "Tried to clean up ordering key that does not exist: non-existent-key" in err - ) + assert "Tried to clean up ordering key that does not exist: non-existent-key" in err def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): From a1c679b033be8f00f2469579c4524a298fa15d6b Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:55:57 -0400 Subject: [PATCH 08/14] add caplog --- .../subscriber/test_messages_on_hold.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 29952a5cb..ac09a3d2c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import queue from google.cloud.pubsub_v1.subscriber import message from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.pubsub_v1 import types as gapic_types -from _pytest.capture import CaptureFixture def make_message(ack_id, ordering_key): @@ -321,14 +321,17 @@ def test_ordered_and_unordered_messages_interleaved(): assert moh.size == 0 -def test_cleanup_nonexistent_key(capsys: CaptureFixture[str]): +def test_cleanup_nonexistent_key(caplog): moh = messages_on_hold.MessagesOnHold() moh._clean_up_ordering_key("non-existent-key") _, err = capsys.readouterr() - assert "Tried to clean up ordering key that does not exist: non-existent-key" in err + assert ( + "Tried to clean up ordering key that does not exist: non-existent-key" + in caplog.text + ) -def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): +def test_cleanup_key_with_messages(caplog): moh = messages_on_hold.MessagesOnHold() # Put message with "key1". @@ -338,5 +341,6 @@ def test_cleanup_key_with_messages(capsys: CaptureFixture[str]): moh._clean_up_ordering_key("key1") assert moh.size == 1 - _, err = capsys.readouterr() - assert "Tried to clean up ordering key: key1 with 1 messages remaining." in err + assert ( + "Tried to clean up ordering key: key1 with 1 messages remaining." in caplog.text + ) From 57e02992ef38de14d4f8380b6f73ef904973ae23 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 21:57:31 -0400 Subject: [PATCH 09/14] add caplog --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index ac09a3d2c..1e2c56bbb 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -324,7 +324,6 @@ def test_ordered_and_unordered_messages_interleaved(): def test_cleanup_nonexistent_key(caplog): moh = messages_on_hold.MessagesOnHold() moh._clean_up_ordering_key("non-existent-key") - _, err = capsys.readouterr() assert ( "Tried to clean up ordering key that does not exist: non-existent-key" in caplog.text From 6362c8b5c8f068b3c6ae9f6c432214b3cca3ccfb Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 22:40:49 -0400 Subject: [PATCH 10/14] allow duplicate ordering keys in activation --- .../subscriber/_protocol/messages_on_hold.py | 37 +++++++++---------- .../subscriber/test_messages_on_hold.py | 20 ++++++++-- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index 3eb43f63e..447360dda 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -127,25 +127,22 @@ def activate_ordering_keys( schedule_message_callback: The callback to call to schedule a message to be sent to the user. """ - activated_keys = set() for key in ordering_keys: - if key not in activated_keys: - activated_keys.add(key) - pending_ordered_messages = self._pending_ordered_messages.get(key) - if pending_ordered_messages is None: - _LOGGER.warning( - "No message queue exists for message ordering key: %s.", key - ) - continue - next_msg = self._get_next_for_ordering_key(key) - if next_msg: - # Schedule the next message because the previous was dropped. - # Note that this may overload the user's `max_bytes` limit, but - # not their `max_messages` limit. - schedule_message_callback(next_msg) - else: - # No more messages for this ordering key, so do clean-up. - self._clean_up_ordering_key(key) + pending_ordered_messages = self._pending_ordered_messages.get(key) + if pending_ordered_messages is None: + _LOGGER.warning( + "No message queue exists for message ordering key: %s.", key + ) + continue + next_msg = self._get_next_for_ordering_key(key) + if next_msg: + # Schedule the next message because the previous was dropped. + # Note that this may overload the user's `max_bytes` limit, but + # not their `max_messages` limit. + schedule_message_callback(next_msg) + else: + # No more messages for this ordering key, so do clean-up. + self._clean_up_ordering_key(key) def _get_next_for_ordering_key( self, ordering_key: str @@ -170,14 +167,14 @@ def _get_next_for_ordering_key( def _clean_up_ordering_key(self, ordering_key: str) -> None: """Clean up state for an ordering key with no pending messages. - Args: + Args ordering_key: The ordering key to clean up. """ _LOGGER.debug("Cleaning up ordering key queue for key %s:", ordering_key) message_queue = self._pending_ordered_messages.get(ordering_key) if message_queue is None: _LOGGER.warning( - "Tried to clean up ordering key that does not exist %s", ordering_key + "Tried to clean up ordering key that does not exist: %s", ordering_key ) return if len(message_queue) > 0: diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 1e2c56bbb..ac11e3e47 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -110,7 +110,7 @@ def test_ordered_messages_one_key(): assert moh.size == 0 -def test_ordered_messages_drop_duplicate_keys(): +def test_ordered_messages_drop_duplicate_keys(caplog): moh = messages_on_hold.MessagesOnHold() # comment msg1 = make_message(ack_id="ack1", ordering_key="key1") @@ -130,7 +130,7 @@ def test_ordered_messages_drop_duplicate_keys(): assert moh.get() is None assert moh.size == 1 - # Activate "key1", the second is ignored. + # Activate "key1". callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1", "key1"], callback_tracker) assert callback_tracker.called @@ -148,6 +148,7 @@ def test_ordered_messages_drop_duplicate_keys(): callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called + assert "No message queue exists for message ordering key: \"key1\"" def test_ordered_messages_two_keys(): @@ -338,8 +339,21 @@ def test_cleanup_key_with_messages(caplog): moh.put(msg1) assert moh.size == 1 - moh._clean_up_ordering_key("key1") + # Put another message "key1" + msg2 = make_message(ack_id="ack2", ordering_key="key1") + moh.put(msg2) + assert moh.size == 2 + + # Get first message for "key1" + assert moh.get() == msg1 assert moh.size == 1 + + + # Get first message for "key1" + assert moh.get() == None + assert moh.size == 1 + + moh._clean_up_ordering_key("key1") assert ( "Tried to clean up ordering key: key1 with 1 messages remaining." in caplog.text ) From 62cb2aa62cbf96b3246ac8685f4dac9d529fe18e Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 22:44:26 -0400 Subject: [PATCH 11/14] fix lint --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index ac11e3e47..d759bb935 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -148,7 +148,7 @@ def test_ordered_messages_drop_duplicate_keys(caplog): callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - assert "No message queue exists for message ordering key: \"key1\"" + assert 'No message queue exists for message ordering key: "key1"' def test_ordered_messages_two_keys(): @@ -348,7 +348,6 @@ def test_cleanup_key_with_messages(caplog): assert moh.get() == msg1 assert moh.size == 1 - # Get first message for "key1" assert moh.get() == None assert moh.size == 1 From 8a0c559cd7520576da163b6ea7d4b46ff139cfbc Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 23:12:36 -0400 Subject: [PATCH 12/14] remove logs --- .../subscriber/_protocol/messages_on_hold.py | 7 ---- .../subscriber/test_messages_on_hold.py | 33 ++++++++++++++++--- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index 447360dda..63c2edbfa 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -75,18 +75,12 @@ def get(self) -> Optional["subscriber.message.Message"]: if msg.ordering_key: pending_queue = self._pending_ordered_messages.get(msg.ordering_key) if pending_queue is None: - _LOGGER.info( - "pending_queue is None for ordering key: %s", msg.ordering_key - ) # Create empty queue to indicate a message with the # ordering key is in flight. self._pending_ordered_messages[ msg.ordering_key ] = collections.deque() self._size = self._size - 1 - _LOGGER.info( - "Created pending_queue for ordering key: %s", msg.ordering_key - ) return msg else: # Another message is in flight so add message to end of @@ -170,7 +164,6 @@ def _clean_up_ordering_key(self, ordering_key: str) -> None: Args ordering_key: The ordering key to clean up. """ - _LOGGER.debug("Cleaning up ordering key queue for key %s:", ordering_key) message_queue = self._pending_ordered_messages.get(ordering_key) if message_queue is None: _LOGGER.warning( diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index d759bb935..871c07953 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -112,7 +112,7 @@ def test_ordered_messages_one_key(): def test_ordered_messages_drop_duplicate_keys(caplog): moh = messages_on_hold.MessagesOnHold() - # comment + msg1 = make_message(ack_id="ack1", ordering_key="key1") moh.put(msg1) assert moh.size == 1 @@ -136,7 +136,12 @@ def test_ordered_messages_drop_duplicate_keys(caplog): assert callback_tracker.called assert callback_tracker.message == msg2 assert moh.size == 0 - assert len(moh._pending_ordered_messages) == 1 + assert len(moh._pending_ordered_messages) == 0 + + # Activate "key1" again + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called # Activate "key1" again. There are no other messages for that key, so clean # up state for that key. @@ -144,11 +149,31 @@ def test_ordered_messages_drop_duplicate_keys(caplog): moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - # Activate "key1" again (dropping a message we already dropped) + msg3 = make_message(ack_id="ack3", ordering_key="key1") + moh.put(msg3) + assert moh.size == 1 + + # Get next message for "key1" + assert moh.get() == msg3 + assert moh.size == 0 + + # Activate "key1". + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again. There are no other messages for that key, so clean + # up state for that key. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again after being cleaned up. There are no other messages for that key, so clean + # up state for that key. callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - assert 'No message queue exists for message ordering key: "key1"' + assert 'No message queue exists for message ordering key: key1' in caplog.text def test_ordered_messages_two_keys(): From 479dab044ceedc9d8c43dd46215da1837eb98d94 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 23:13:56 -0400 Subject: [PATCH 13/14] fix lint --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 871c07953..95a79d133 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -162,18 +162,18 @@ def test_ordered_messages_drop_duplicate_keys(caplog): moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - # Activate "key1" again. There are no other messages for that key, so clean + # Activate "key1" again. There are no other messages for that key, so clean # up state for that key. callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - # Activate "key1" again after being cleaned up. There are no other messages for that key, so clean + # Activate "key1" again after being cleaned up. There are no other messages for that key, so clean # up state for that key. callback_tracker = ScheduleMessageCallbackTracker() moh.activate_ordering_keys(["key1"], callback_tracker) assert not callback_tracker.called - assert 'No message queue exists for message ordering key: key1' in caplog.text + assert "No message queue exists for message ordering key: key1" in caplog.text def test_ordered_messages_two_keys(): From df8df31f01cdf7d29cbd801a80fd616dc7745a1b Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 4 May 2023 23:17:08 -0400 Subject: [PATCH 14/14] fix lint --- tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 95a79d133..5e1dcf91b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import queue from google.cloud.pubsub_v1.subscriber import message @@ -374,7 +373,7 @@ def test_cleanup_key_with_messages(caplog): assert moh.size == 1 # Get first message for "key1" - assert moh.get() == None + assert moh.get() is None assert moh.size == 1 moh._clean_up_ordering_key("key1")