1818
1919import com .google .api .core .AbstractApiService ;
2020import com .google .api .core .ApiClock ;
21+ import com .google .api .core .ApiFuture ;
2122import com .google .api .core .ApiFutureCallback ;
2223import com .google .api .core .ApiFutures ;
2324import com .google .api .core .InternalApi ;
2425import com .google .api .core .SettableApiFuture ;
2526import com .google .api .gax .batching .FlowController ;
2627import com .google .api .gax .core .Distribution ;
28+ import com .google .api .gax .grpc .GrpcCallContext ;
2729import com .google .api .gax .grpc .GrpcStatusCode ;
2830import com .google .api .gax .rpc .ApiException ;
2931import com .google .api .gax .rpc .ApiExceptionFactory ;
32+ import com .google .api .gax .rpc .ClientStream ;
33+ import com .google .api .gax .rpc .ResponseObserver ;
34+ import com .google .api .gax .rpc .StreamController ;
3035import com .google .cloud .pubsub .v1 .MessageDispatcher .AckProcessor ;
3136import com .google .cloud .pubsub .v1 .MessageDispatcher .PendingModifyAckDeadline ;
37+ import com .google .cloud .pubsub .v1 .stub .SubscriberStub ;
3238import com .google .common .collect .Lists ;
3339import com .google .common .util .concurrent .MoreExecutors ;
3440import com .google .protobuf .Empty ;
3541import com .google .pubsub .v1 .AcknowledgeRequest ;
3642import com .google .pubsub .v1 .ModifyAckDeadlineRequest ;
3743import com .google .pubsub .v1 .StreamingPullRequest ;
3844import com .google .pubsub .v1 .StreamingPullResponse ;
39- import com .google .pubsub .v1 .SubscriberGrpc .SubscriberStub ;
4045import io .grpc .Status ;
41- import io .grpc .stub .ClientCallStreamObserver ;
42- import io .grpc .stub .ClientResponseObserver ;
43- import io .grpc .stub .StreamObserver ;
4446import java .util .ArrayList ;
4547import java .util .Deque ;
4648import java .util .List ;
@@ -62,9 +64,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
6264 private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration .ofMillis (100 );
6365 private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration .ofSeconds (10 );
6466 private static final int MAX_PER_REQUEST_CHANGES = 1000 ;
65- private static final Duration UNARY_TIMEOUT = Duration .ofSeconds (60 );
6667
6768 private final SubscriberStub stub ;
69+ private final int channelAffinity ;
6870 private final String subscription ;
6971 private final ScheduledExecutorService systemExecutor ;
7072 private final MessageDispatcher messageDispatcher ;
@@ -73,7 +75,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
7375 new AtomicLong (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
7476
7577 private final Lock lock = new ReentrantLock ();
76- private ClientCallStreamObserver <StreamingPullRequest > requestObserver ;
78+ private ClientStream <StreamingPullRequest > clientStream ;
7779
7880 public StreamingSubscriberConnection (
7981 String subscription ,
@@ -82,6 +84,7 @@ public StreamingSubscriberConnection(
8284 Duration maxAckExtensionPeriod ,
8385 Distribution ackLatencyDistribution ,
8486 SubscriberStub stub ,
87+ int channelAffinity ,
8588 FlowController flowController ,
8689 Deque <MessageDispatcher .OutstandingMessageBatch > outstandingMessageBatches ,
8790 ScheduledExecutorService executor ,
@@ -90,6 +93,7 @@ public StreamingSubscriberConnection(
9093 this .subscription = subscription ;
9194 this .systemExecutor = systemExecutor ;
9295 this .stub = stub ;
96+ this .channelAffinity = channelAffinity ;
9397 this .messageDispatcher =
9498 new MessageDispatcher (
9599 receiver ,
@@ -118,15 +122,14 @@ protected void doStop() {
118122
119123 lock .lock ();
120124 try {
121- requestObserver . onError (Status .CANCELLED .asException ());
125+ clientStream . closeSendWithError (Status .CANCELLED .asException ());
122126 } finally {
123127 lock .unlock ();
124128 notifyStopped ();
125129 }
126130 }
127131
128- private class StreamingPullResponseObserver
129- implements ClientResponseObserver <StreamingPullRequest , StreamingPullResponse > {
132+ private class StreamingPullResponseObserver implements ResponseObserver <StreamingPullResponse > {
130133
131134 final SettableApiFuture <Void > errorFuture ;
132135
@@ -137,20 +140,21 @@ private class StreamingPullResponseObserver
137140 * the user can deal with -- so we save the request observer this response observer is "paired
138141 * with". If the stream has already errored, requesting more messages is a no-op.
139142 */
140- ClientCallStreamObserver < StreamingPullRequest > thisRequestObserver ;
143+ StreamController thisController ;
141144
142145 StreamingPullResponseObserver (SettableApiFuture <Void > errorFuture ) {
143146 this .errorFuture = errorFuture ;
144147 }
145148
146149 @ Override
147- public void beforeStart (ClientCallStreamObserver <StreamingPullRequest > requestObserver ) {
148- thisRequestObserver = requestObserver ;
149- requestObserver .disableAutoInboundFlowControl ();
150+ public void onStart (StreamController controller ) {
151+ thisController = controller ;
152+ thisController .disableAutoInboundFlowControl ();
153+ thisController .request (1 );
150154 }
151155
152156 @ Override
153- public void onNext (StreamingPullResponse response ) {
157+ public void onResponse (StreamingPullResponse response ) {
154158 channelReconnectBackoffMillis .set (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
155159 messageDispatcher .processReceivedMessages (
156160 response .getReceivedMessagesList (),
@@ -163,7 +167,7 @@ public void run() {
163167 if (isAlive () && !errorFuture .isDone ()) {
164168 lock .lock ();
165169 try {
166- thisRequestObserver .request (1 );
170+ thisController .request (1 );
167171 } catch (Exception e ) {
168172 logger .log (Level .WARNING , "cannot request more messages" , e );
169173 } finally {
@@ -180,29 +184,30 @@ public void onError(Throwable t) {
180184 }
181185
182186 @ Override
183- public void onCompleted () {
187+ public void onComplete () {
184188 logger .fine ("Streaming pull terminated successfully!" );
185189 errorFuture .set (null );
186190 }
187191 }
188192
189193 private void initialize () {
190194 final SettableApiFuture <Void > errorFuture = SettableApiFuture .create ();
191- final ClientResponseObserver < StreamingPullRequest , StreamingPullResponse > responseObserver =
195+ final ResponseObserver < StreamingPullResponse > responseObserver =
192196 new StreamingPullResponseObserver (errorFuture );
193- final ClientCallStreamObserver <StreamingPullRequest > requestObserver =
194- (ClientCallStreamObserver <StreamingPullRequest >) (stub .streamingPull (responseObserver ));
195- logger .log (
196- Level .FINER ,
197- "Initializing stream to subscription {0}" ,subscription );
198- // We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt anyway.
199- // Set to some big-ish value in case we modack late.
200- requestObserver .onNext (
197+ ClientStream <StreamingPullRequest > initClientStream =
198+ stub .streamingPullCallable ()
199+ .splitCall (
200+ responseObserver ,
201+ GrpcCallContext .createDefault ().withChannelAffinity (channelAffinity ));
202+
203+ logger .log (Level .FINER , "Initializing stream to subscription {0}" , subscription );
204+ // We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
205+ // anyway. Set to some big-ish value in case we modack late.
206+ initClientStream .send (
201207 StreamingPullRequest .newBuilder ()
202208 .setSubscription (subscription )
203209 .setStreamAckDeadlineSeconds (60 )
204210 .build ());
205- requestObserver .request (1 );
206211
207212 /**
208213 * Must make sure we do this after sending the subscription name and deadline. Otherwise, some
@@ -211,7 +216,7 @@ private void initialize() {
211216 */
212217 lock .lock ();
213218 try {
214- this .requestObserver = requestObserver ;
219+ this .clientStream = initClientStream ;
215220 } finally {
216221 lock .unlock ();
217222 }
@@ -273,45 +278,43 @@ private boolean isAlive() {
273278 @ Override
274279 public void sendAckOperations (
275280 List <String > acksToSend , List <PendingModifyAckDeadline > ackDeadlineExtensions ) {
276- SubscriberStub timeoutStub =
277- stub .withDeadlineAfter (UNARY_TIMEOUT .toMillis (), TimeUnit .MILLISECONDS );
278- StreamObserver <Empty > loggingObserver = new StreamObserver <Empty >() {
279- @ Override
280- public void onCompleted () {
281- // noop
282- }
283-
284- @ Override
285- public void onNext (Empty e ) {
286- // noop
287- }
281+ ApiFutureCallback <Empty > loggingCallback =
282+ new ApiFutureCallback <Empty >() {
283+ @ Override
284+ public void onSuccess (Empty empty ) {
285+ // noop
286+ }
288287
289- @ Override
290- public void onError (Throwable t ) {
291- Level level = isAlive () ? Level .WARNING : Level .FINER ;
292- logger .log (level , "failed to send operations" , t );
293- }
294- };
288+ @ Override
289+ public void onFailure (Throwable t ) {
290+ Level level = isAlive () ? Level .WARNING : Level .FINER ;
291+ logger .log (level , "failed to send operations" , t );
292+ }
293+ };
295294
296295 for (PendingModifyAckDeadline modack : ackDeadlineExtensions ) {
297296 for (List <String > idChunk : Lists .partition (modack .ackIds , MAX_PER_REQUEST_CHANGES )) {
298- timeoutStub .modifyAckDeadline (
299- ModifyAckDeadlineRequest .newBuilder ()
300- .setSubscription (subscription )
301- .addAllAckIds (idChunk )
302- .setAckDeadlineSeconds (modack .deadlineExtensionSeconds )
303- .build (),
304- loggingObserver );
297+ ApiFuture <Empty > future =
298+ stub .modifyAckDeadlineCallable ()
299+ .futureCall (
300+ ModifyAckDeadlineRequest .newBuilder ()
301+ .setSubscription (subscription )
302+ .addAllAckIds (idChunk )
303+ .setAckDeadlineSeconds (modack .deadlineExtensionSeconds )
304+ .build ());
305+ ApiFutures .addCallback (future , loggingCallback );
305306 }
306307 }
307308
308309 for (List <String > idChunk : Lists .partition (acksToSend , MAX_PER_REQUEST_CHANGES )) {
309- timeoutStub .acknowledge (
310- AcknowledgeRequest .newBuilder ()
311- .setSubscription (subscription )
312- .addAllAckIds (idChunk )
313- .build (),
314- loggingObserver );
310+ ApiFuture <Empty > future =
311+ stub .acknowledgeCallable ()
312+ .futureCall (
313+ AcknowledgeRequest .newBuilder ()
314+ .setSubscription (subscription )
315+ .addAllAckIds (idChunk )
316+ .build ());
317+ ApiFutures .addCallback (future , loggingCallback );
315318 }
316319 }
317320
0 commit comments