fix: close the Watch stream when we receive an error#834
Conversation
| ResponseObserver<ResponseT> responseObserverT, | ||
| ServerStreamingCallable<RequestT, ResponseT> callable); | ||
|
|
||
| <RequestT, ResponseT> ApiStreamObserver<RequestT> streamRequest( | ||
| ApiStreamObserver<ResponseT> responseObserverT, | ||
| <RequestT, ResponseT> ClientStream<RequestT> streamRequest( | ||
| BidiStreamObserver<RequestT, ResponseT> responseObserverT, |
There was a problem hiding this comment.
These are the main changes, the rest is plumbing and cleanup.
| () -> { | ||
| synchronized (Watch.this) { | ||
| stream.onCompleted(); | ||
| stream.closeSend(); |
There was a problem hiding this comment.
This now closes the stream instead of invoking a no-op method.
| } else { | ||
| return queryResponse(DOCUMENT_NAME + "3").answer(invocation); | ||
| } | ||
| }) |
There was a problem hiding this comment.
FYI: Here and below - this is only a formatting change as I removed the unused cast.
| MoreExecutors.directExecutor()); | ||
| } | ||
| }) | ||
| mock -> { |
There was a problem hiding this comment.
Only a formatting change.
kolea2
left a comment
There was a problem hiding this comment.
changes look fine. Just a few questions around testing
| retryAttempts[0]++; | ||
| return RETRYABLE_FAILED_FUTURE; | ||
| }) | ||
| mock -> { |
There was a problem hiding this comment.
will this test the new changes?
There was a problem hiding this comment.
No, it's hard to test these changes. We could mock the backend stream, but then we are essentially only testing the behavior of the mock. If you know of a pre-existing implementation/fake of a GRPC stream that we can use to test this behavior, then we can add a test. A homegrown implementation that validates that our code follows our ow assumptions will not provide us with much meaningful test coverage.
There was a problem hiding this comment.
Would something like this be helpful? https://github.com/googleapis/java-bigtable/blob/main/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/testing/FakeStreamingApi.java
Examples in tests:
https://github.com/googleapis/java-bigtable/blob/main/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallableTest.java#L38
(It's using ApiStreamObserver, but I imagine could be refactored to this use case)
There was a problem hiding this comment.
Added test queryWatchShutsDownStreamOnPermissionDenied() that re-uses some of the existing functionality.
The Watch code called "onCompleted" to close the stream - but that did not actually do anything. This PR migrates our streaming GAPIC APIs to ClientStream (which is not deprecated) and then calls "close()" instead.
Fixes: #822