@@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
5555 // TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
5656 // occuring.
5757 private boolean retrying = false ;
58+ private boolean checkingForLastChunk = false ;
5859
5960 boolean isRetrying () {
6061 return retrying ;
@@ -64,129 +65,141 @@ StorageObject getStorageObject() {
6465 return storageObject ;
6566 }
6667
68+ private StorageObject transmitChunk (
69+ int chunkOffset , int chunkLength , long position , boolean last ) {
70+ return getOptions ()
71+ .getStorageRpcV1 ()
72+ .writeWithResponse (getUploadId (), getBuffer (), chunkOffset , position , chunkLength , last );
73+ }
74+
75+ private long getRemotePosition () {
76+ return getOptions ().getStorageRpcV1 ().getCurrentUploadOffset (getUploadId ());
77+ }
78+
79+ private StorageObject getRemoteStorageObject () {
80+ return getOptions ().getStorageRpcV1 ().get (getEntity ().toPb (), null );
81+ }
82+
83+ private StorageException unrecoverableState (
84+ int chunkOffset , int chunkLength , long localPosition , long remotePosition , boolean last ) {
85+ StringBuilder sb = new StringBuilder ();
86+ sb .append ("Unable to recover in upload.\n " );
87+ sb .append (
88+ "This may be a symptom of multiple clients uploading to the same upload session.\n \n " );
89+ sb .append ("For debugging purposes:\n " );
90+ sb .append ("uploadId: " ).append (getUploadId ()).append ('\n' );
91+ sb .append ("chunkOffset: " ).append (chunkOffset ).append ('\n' );
92+ sb .append ("chunkLength: " ).append (chunkLength ).append ('\n' );
93+ sb .append ("localOffset: " ).append (localPosition ).append ('\n' );
94+ sb .append ("remoteOffset: " ).append (remotePosition ).append ('\n' );
95+ sb .append ("lastChunk: " ).append (last ).append ("\n \n " );
96+ return new StorageException (0 , sb .toString ());
97+ }
98+
99+ // Retriable interruption occurred.
100+ // Variables:
101+ // chunk = getBuffer()
102+ // localNextByteOffset == getPosition()
103+ // chunkSize = getChunkSize()
104+ //
105+ // Case 1: localNextByteOffset == remoteNextByteOffset:
106+ // Retrying the entire chunk
107+ //
108+ // Case 2: localNextByteOffset < remoteNextByteOffset
109+ // && driftOffset < chunkSize:
110+ // Upload progressed and localNextByteOffset is not in-sync with
111+ // remoteNextByteOffset and driftOffset is less than chunkSize.
112+ // driftOffset must be less than chunkSize for it to retry using
113+ // chunk maintained in memory.
114+ // Find the driftOffset by subtracting localNextByteOffset from
115+ // remoteNextByteOffset.
116+ // Use driftOffset to determine where to restart from using the chunk in
117+ // memory.
118+ //
119+ // Case 3: localNextByteOffset < remoteNextByteOffset
120+ // && driftOffset == chunkSize:
121+ // Special case of Case 2.
122+ // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
123+ // to the next chunk.
124+ //
125+ // Case 4: localNextByteOffset < remoteNextByteOffset
126+ // && driftOffset > chunkSize:
127+ // Throw exception as remoteNextByteOffset has drifted beyond the retriable
128+ // chunk maintained in memory. This is not possible unless there's multiple
129+ // clients uploading to the same resumable upload session.
130+ //
131+ // Case 5: localNextByteOffset > remoteNextByteOffset:
132+ // For completeness, this case is not possible because it would require retrying
133+ // a 400 status code which is not allowed.
134+ //
135+ // Case 6: remoteNextByteOffset==-1 && last == true
136+ // Upload is complete and retry occurred in the "last" chunk. Data sent was
137+ // received by the service.
138+ //
139+ // Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
140+ // Not last chunk and are not checkingForLastChunk, allow for the client to
141+ // catch up to final chunk which meets
142+ // Case 6.
143+ //
144+ // Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
145+ // Not last chunk and checkingForLastChunk means this is the second time we
146+ // hit this case, meaning the upload was completed by a different client.
147+ //
148+ // Case 9: Only possible if the client local offset continues beyond the remote
149+ // offset which is not possible.
150+ //
67151 @ Override
68- protected void flushBuffer (final int length , final boolean last ) {
152+ protected void flushBuffer (final int length , final boolean lastChunk ) {
69153 try {
70154 runWithRetries (
71155 callable (
72156 new Runnable () {
73157 @ Override
74158 public void run () {
159+ // Get remote offset from API
160+ final long localPosition = getPosition ();
161+ // For each request it should be possible to retry from its location in this code
162+ final long remotePosition = isRetrying () ? getRemotePosition () : getPosition ();
163+ final int chunkOffset = (int ) (remotePosition - localPosition );
164+ final int chunkLength = length - chunkOffset ;
165+ final boolean uploadAlreadyComplete = remotePosition == -1 ;
166+ // Enable isRetrying state to reduce number of calls to getRemotePosition()
75167 if (!isRetrying ()) {
76- // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
77168 retrying = true ;
169+ }
170+ if (uploadAlreadyComplete && lastChunk ) {
171+ // Case 6
172+ // Request object metadata if not available
173+ if (storageObject == null ) {
174+ storageObject = getRemoteStorageObject ();
175+ }
176+ // Verify that with the final chunk we match the blob length
177+ if (storageObject .getSize ().longValue () != getPosition () + length ) {
178+ throw unrecoverableState (
179+ chunkOffset , chunkLength , localPosition , remotePosition , lastChunk );
180+ }
181+ retrying = false ;
182+ } else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk ) {
183+ // Case 7
184+ // Make sure this is the second to last chunk.
185+ checkingForLastChunk = true ;
186+ // Continue onto next chunk in case this is the last chunk
187+ } else if (localPosition <= remotePosition && chunkOffset < getChunkSize ()) {
188+ // Case 1 && Case 2
189+ // We are in a position to send a chunk
78190 storageObject =
79- getOptions ()
80- .getStorageRpcV1 ()
81- .writeWithResponse (
82- getUploadId (), getBuffer (), 0 , getPosition (), length , last );
191+ transmitChunk (chunkOffset , chunkLength , remotePosition , lastChunk );
192+ retrying = false ;
193+ } else if (localPosition < remotePosition && chunkOffset == getChunkSize ()) {
194+ // Case 3
195+ // Continue to next chunk to catch up with remotePosition we are one chunk
196+ // behind
197+ retrying = false ;
83198 } else {
84- // Retriable interruption occurred.
85- // Variables:
86- // chunk = getBuffer()
87- // localNextByteOffset == getPosition()
88- // chunkSize = getChunkSize()
89- //
90- // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
91- // we are retrying from first chunk start from 0 offset.
92- //
93- // Case 2: localNextByteOffset == remoteNextByteOffset:
94- // Special case of Case 1 when a chunk is retried.
95- //
96- // Case 3: localNextByteOffset < remoteNextByteOffset
97- // && driftOffset < chunkSize:
98- // Upload progressed and localNextByteOffset is not in-sync with
99- // remoteNextByteOffset and driftOffset is less than chunkSize.
100- // driftOffset must be less than chunkSize for it to retry using
101- // chunk maintained in memory.
102- // Find the driftOffset by subtracting localNextByteOffset from
103- // remoteNextByteOffset.
104- // Use driftOffset to determine where to restart from using the chunk in
105- // memory.
106- //
107- // Case 4: localNextByteOffset < remoteNextByteOffset
108- // && driftOffset == chunkSize:
109- // Special case of Case 3.
110- // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
111- // to the next chunk.
112- //
113- // Case 5: localNextByteOffset < remoteNextByteOffset
114- // && driftOffset > chunkSize:
115- // Throw exception as remoteNextByteOffset has drifted beyond the retriable
116- // chunk maintained in memory. This is not possible unless there's multiple
117- // clients uploading to the same resumable upload session.
118- //
119- // Case 6: localNextByteOffset > remoteNextByteOffset:
120- // For completeness, this case is not possible because it would require retrying
121- // a 400 status code which is not allowed.
122- //
123- // Case 7: remoteNextByteOffset==-1 && last == true
124- // Upload is complete and retry occurred in the "last" chunk. Data sent was
125- // received by the service.
126- //
127- // Case 8: remoteNextByteOffset==-1 && last == false
128- // Upload was completed by another client because this retry did not occur
129- // during the last chunk.
130- //
131- // Get remote offset from API
132- long remoteNextByteOffset =
133- getOptions ().getStorageRpcV1 ().getCurrentUploadOffset (getUploadId ());
134- long localNextByteOffset = getPosition ();
135- int driftOffset = (int ) (remoteNextByteOffset - localNextByteOffset );
136- int retryChunkLength = length - driftOffset ;
137-
138- if (localNextByteOffset == 0 && remoteNextByteOffset == 0
139- || localNextByteOffset == remoteNextByteOffset ) {
140- // Case 1 and 2
141- storageObject =
142- getOptions ()
143- .getStorageRpcV1 ()
144- .writeWithResponse (
145- getUploadId (), getBuffer (), 0 , getPosition (), length , last );
146- } else if (localNextByteOffset < remoteNextByteOffset
147- && driftOffset < getChunkSize ()) {
148- // Case 3
149- storageObject =
150- getOptions ()
151- .getStorageRpcV1 ()
152- .writeWithResponse (
153- getUploadId (),
154- getBuffer (),
155- driftOffset ,
156- remoteNextByteOffset ,
157- retryChunkLength ,
158- last );
159- } else if (localNextByteOffset < remoteNextByteOffset
160- && driftOffset == getChunkSize ()) {
161- // Case 4
162- // Continue to next chunk
163- retrying = false ;
164- return ;
165- } else if (localNextByteOffset < remoteNextByteOffset
166- && driftOffset > getChunkSize ()) {
167- // Case 5
168- StringBuilder sb = new StringBuilder ();
169- sb .append (
170- "Remote offset has progressed beyond starting byte offset of next chunk." );
171- sb .append (
172- "This may be a symptom of multiple clients uploading to the same upload session.\n \n " );
173- sb .append ("For debugging purposes:\n " );
174- sb .append ("uploadId: " ).append (getUploadId ()).append ('\n' );
175- sb .append ("localNextByteOffset: " ).append (localNextByteOffset ).append ('\n' );
176- sb .append ("remoteNextByteOffset: " ).append (remoteNextByteOffset ).append ('\n' );
177- sb .append ("driftOffset: " ).append (driftOffset ).append ("\n \n " );
178- throw new StorageException (0 , sb .toString ());
179- } else if (remoteNextByteOffset == -1 && last ) {
180- // Case 7
181- retrying = false ;
182- return ;
183- } else if (remoteNextByteOffset == -1 && !last ) {
184- // Case 8
185- throw new StorageException (0 , "Resumable upload is already complete." );
186- }
199+ // Case 4 && Case 8 && Case 9
200+ throw unrecoverableState (
201+ chunkOffset , chunkLength , localPosition , remotePosition , lastChunk );
187202 }
188- // Request was successful and retrying state is now disabled.
189- retrying = false ;
190203 }
191204 }),
192205 getOptions ().getRetrySettings (),
0 commit comments