feat: Major classes for Spark continuous streaming #396
  private final MultiPartitionCommitter committer;
  private PslSourceOffset startOffset;

  public PslContinuousReader(PslDataSourceOptions options) {
This doesn't belong here; it is wiring code. Remove this constructor.
I would like to keep this so that the code in PslDataSource stays super simple.
      AdminServiceClient adminServiceClient,
      CursorServiceClient cursorServiceClient,
      MultiPartitionCommitter committer) {
    this.options = options;
Don't take the options; take the things you need from them.
The options are needed at the lower level (PslContinuousInputPartition) for many of their params (subPath, flowctrl, creds), so I prefer not to disassemble them this early.
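One possible middle ground for the two threads above, sketched in plain Java: the reader's constructor takes only the values it uses, and a separate factory does the wiring so the call site stays a one-liner. Every name here (`SketchOptions`, `SketchCommitter`, `SketchReader`, `SketchReaderFactory`, and their fields) is a hypothetical stand-in, not a class from this PR, and this is not necessarily what the PR ended up doing.

```java
import java.util.Objects;

// Hypothetical stand-in for an options object; the field names are illustrative only.
final class SketchOptions {
  final String subscriptionPath;
  final long maxBytesOutstanding;

  SketchOptions(String subscriptionPath, long maxBytesOutstanding) {
    this.subscriptionPath = Objects.requireNonNull(subscriptionPath);
    this.maxBytesOutstanding = maxBytesOutstanding;
  }
}

// Hypothetical collaborator, standing in for something like a committer.
interface SketchCommitter {
  void commit(long offset);
}

// The reader takes only what it uses, so tests can pass fakes directly.
final class SketchReader {
  private final String subscriptionPath;
  private final SketchCommitter committer;

  SketchReader(String subscriptionPath, SketchCommitter committer) {
    this.subscriptionPath = Objects.requireNonNull(subscriptionPath);
    this.committer = Objects.requireNonNull(committer);
  }

  String subscriptionPath() {
    return subscriptionPath;
  }

  void commit(long offset) {
    committer.commit(offset);
  }
}

// All wiring lives here, so the data source only needs a single call.
final class SketchReaderFactory {
  private SketchReaderFactory() {}

  static SketchReader create(SketchOptions options) {
    // Assumption: a real factory would build clients from the options here.
    SketchCommitter noOpCommitter = offset -> {};
    return new SketchReader(options.subscriptionPath, noOpCommitter);
  }
}
```

With this shape, the data source would call `SketchReaderFactory.create(options)`, while tests construct `SketchReader` with fakes. The trade-off raised in the reply still applies: if lower-level classes need most of the options, passing the whole object down may be simpler.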
    assert PslSourceOffset.class.isAssignableFrom(start.get().getClass())
        : "start offset is not assignable to PslSourceOffset.";
    startOffset = (PslSourceOffset) start.get();
    return;
Do these methods need to be thread safe?
No. They are called serially, in this order: deserializeOffset (if the write-ahead log has an offset from a previous query), setStartOffset, getStartOffset, planInputPartitions... get things done... mergeOffsets, commit.
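The call order described above can be sketched with a toy reader that records each call. This is a simplified stand-in, not Spark's interface: the method names mirror the ones in the reply, but offsets are plain `long` values and `LifecycleSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy reader that records the order in which its lifecycle methods are called.
final class LifecycleSketch {
  private final List<String> calls = new ArrayList<>();
  private long startOffset;

  long deserializeOffset(String json) {
    calls.add("deserializeOffset");
    return Long.parseLong(json);
  }

  void setStartOffset(Optional<Long> start) {
    calls.add("setStartOffset");
    // Assumption: fall back to offset 0 when there is no previous offset.
    startOffset = start.orElse(0L);
  }

  long getStartOffset() {
    calls.add("getStartOffset");
    return startOffset;
  }

  int planInputPartitions() {
    calls.add("planInputPartitions");
    return 2; // pretend there are two partitions
  }

  long mergeOffsets(long[] partitionOffsets) {
    calls.add("mergeOffsets");
    // Toy merge: a real reader would keep a per-partition offset map instead.
    long max = Long.MIN_VALUE;
    for (long offset : partitionOffsets) {
      max = Math.max(max, offset);
    }
    return max;
  }

  void commit(long end) {
    calls.add("commit");
  }

  // Drives one pass through the lifecycle on a single thread, in the order given above.
  static List<String> runOnce() {
    LifecycleSketch reader = new LifecycleSketch();
    long restored = reader.deserializeOffset("5");
    reader.setStartOffset(Optional.of(restored));
    reader.getStartOffset();
    reader.planInputPartitions();
    long merged = reader.mergeOffsets(new long[] {7L, 9L});
    reader.commit(merged);
    return reader.calls;
  }
}
```

Because a single driver thread makes every call in sequence, the unsynchronized `startOffset` field is safe in this sketch, which is the point of the reply.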
There is an extra issue in this PR: the buffering pull subscriber leads to an unbounded internal message cache, so it does not respect flow control. Please ignore this for this PR; I have another PR that addresses it and makes the cache bounded. EDIT: #408 is the PR that makes it bounded, but let's finish this one before I assign that.
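To illustrate the difference between an unbounded and a bounded message cache, here is a minimal sketch using `java.util.concurrent.ArrayBlockingQueue`. It is not taken from #408, whose approach is not shown here; `BoundedBufferSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A message buffer with a fixed capacity: producers are refused once it is full,
// so memory use cannot grow without limit.
final class BoundedBufferSketch {
  private final BlockingQueue<String> messages;

  BoundedBufferSketch(int capacity) {
    this.messages = new ArrayBlockingQueue<>(capacity);
  }

  // Returns false instead of growing when the buffer is already full.
  boolean tryAdd(String message) {
    return messages.offer(message);
  }

  // Returns the oldest buffered message, or null when the buffer is empty.
  String poll() {
    return messages.poll();
  }

  int size() {
    return messages.size();
  }
}
```

A caller that sees `tryAdd` return `false` has to stop pulling until the consumer drains the buffer, which is how the bound pushes back on the producer.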
  @AutoValue.Builder
  public abstract static class Builder {

    public abstract Builder credentialsKey(String credentialsKey);
Nit: these methods should be called "set…" (for example, setCredentialsKey).
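For reference, the naming in this nit looks like the following in a hand-written builder. The sketch uses plain Java rather than AutoValue so that it compiles without annotation processing; `OptionsSketch` is a hypothetical class, and `setCredentialsKey` is the assumed reading of the suggested name.

```java
import java.util.Objects;

// Immutable value class with a builder whose mutators use the "set" prefix.
final class OptionsSketch {
  private final String credentialsKey;

  private OptionsSketch(String credentialsKey) {
    this.credentialsKey = credentialsKey;
  }

  String credentialsKey() {
    return credentialsKey;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String credentialsKey;

    // "set" prefix on the builder method; the getter keeps the bare property name.
    Builder setCredentialsKey(String credentialsKey) {
      this.credentialsKey = Objects.requireNonNull(credentialsKey);
      return this;
    }

    OptionsSketch build() {
      if (credentialsKey == null) {
        throw new IllegalStateException("credentialsKey must be set");
      }
      return new OptionsSketch(credentialsKey);
    }
  }
}
```

Usage would read `OptionsSketch.builder().setCredentialsKey("key").build()`. AutoValue builders accept the same `set`-prefixed form for abstract setter methods.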
dpcollins-google left a comment
Rebase this, fix the few comments, and then ping me back for approval.
…continuous-processing
Codecov Report
@@ Coverage Diff @@
## master #396 +/- ##
============================================
- Coverage 72.10% 71.63% -0.47%
- Complexity 845 871 +26
============================================
Files 158 163 +5
Lines 4399 4573 +174
Branches 222 226 +4
============================================
+ Hits 3172 3276 +104
- Misses 1104 1168 +64
- Partials 123 129 +6
This implements the major classes for Spark continuous streaming.