fix: shutdown event loop if publisher fails to start and set exception on result future#124
Conversation
feat: adding ability to create subscriptions at head
| try: | ||
| self._managed_loop.submit(self._underlying.__aenter__()).result() | ||
| except GoogleAPICallError: | ||
| self._managed_loop.__exit__(None, None, None) |
There was a problem hiding this comment.
You can't actually do this. This will result in the managed loop being closed twice when a context manager is used. You shouldn't try to catch this exception- the caller still has to call exit
There was a problem hiding this comment.
You're right, the problem is that we are not calling exit on SinglePublisherImpl from the MultiplexedPublisherClient.
The MultiplexedPublisherClient attempts to get_or_create a SinglePublisherImpl here:
The ClientMultiplexer will attempt to create and start the client here:
But, in the Not Found topic case, the client fails on enter because the call to getTopicPartitions fails. So, this raises an exception and the new publisher client is never assigned to live_clients[key].
When exit is eventually called on the ClientMultiplexer, we will never call exit on the 'not found' client because it was never assigned to live_clients[key]:
Therefore, we will never run exit on the managed_loop. I will re-work this PR to address the real issue.
|
@dpcollins-google Is this ok to merge? It seems like it but just want to check. |
When publishing to a topic that does not exist, the publisher fails to start because the call to get topic partitions fails with 'Not found'. This change does the following: