Skip to content

Commit 8c609e9

Browse files
authored
Merge pull request #1870 from tseaver/1855-moar_gax_paging_fixes
More gax paging fixes
2 parents cf481d4 + 701d0ce commit 8c609e9

3 files changed

Lines changed: 128 additions & 65 deletions

File tree

gcloud/pubsub/_gax.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class _PublisherAPI(object):
4646
def __init__(self, gax_api):
4747
self._gax_api = gax_api
4848

49-
def list_topics(self, project, page_token=None):
49+
def list_topics(self, project, page_size=0, page_token=None):
5050
"""List topics for the project associated with this API.
5151
5252
See:
@@ -55,6 +55,10 @@ def list_topics(self, project, page_token=None):
5555
:type project: string
5656
:param project: project ID
5757
58+
:type page_size: int
59+
:param page_size: maximum number of topics to return, If not passed,
60+
defaults to a value set by the API.
61+
5862
:type page_token: string
5963
:param page_token: opaque marker for the next "page" of topics. If not
6064
passed, the API will return the first page of
@@ -68,9 +72,11 @@ def list_topics(self, project, page_token=None):
6872
"""
6973
options = _build_paging_options(page_token)
7074
path = 'projects/%s' % (project,)
71-
response = self._gax_api.list_topics(path, options)
72-
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
73-
return topics, response.next_page_token
75+
page_iter = self._gax_api.list_topics(
76+
path, page_size=page_size, options=options)
77+
topics = [{'name': topic_pb.name} for topic_pb in page_iter.next()]
78+
token = page_iter.page_token or None
79+
return topics, token
7480

7581
def topic_create(self, topic_path):
7682
"""API call: create a topic
@@ -159,14 +165,17 @@ def topic_publish(self, topic_path, messages):
159165
message_pbs = [_message_pb_from_dict(message)
160166
for message in messages]
161167
try:
162-
response = self._gax_api.publish(topic_path, message_pbs)
168+
event = self._gax_api.publish(topic_path, message_pbs)
169+
if not event.is_set():
170+
event.wait()
163171
except GaxError as exc:
164172
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
165173
raise NotFound(topic_path)
166174
raise
167-
return response.message_ids
175+
return event.result.message_ids
168176

169-
def topic_list_subscriptions(self, topic_path, page_token=None):
177+
def topic_list_subscriptions(self, topic_path, page_size=0,
178+
page_token=None):
170179
"""API call: list subscriptions bound to a topic
171180
172181
See:
@@ -176,6 +185,10 @@ def topic_list_subscriptions(self, topic_path, page_token=None):
176185
:param topic_path: fully-qualified path of the topic, in format
177186
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
178187
188+
:type page_size: int
189+
:param page_size: maximum number of subscriptions to return, If not
190+
passed, defaults to a value set by the API.
191+
179192
:type page_token: string
180193
:param page_token: opaque marker for the next "page" of subscriptions.
181194
If not passed, the API will return the first page
@@ -189,15 +202,15 @@ def topic_list_subscriptions(self, topic_path, page_token=None):
189202
"""
190203
options = _build_paging_options(page_token)
191204
try:
192-
response = self._gax_api.list_topic_subscriptions(
193-
topic_path, options)
205+
page_iter = self._gax_api.list_topic_subscriptions(
206+
topic_path, page_size=page_size, options=options)
194207
except GaxError as exc:
195208
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
196209
raise NotFound(topic_path)
197210
raise
198-
subs = [{'topic': topic_path, 'name': subscription}
199-
for subscription in response.subscriptions]
200-
return subs, response.next_page_token
211+
subs = page_iter.next()
212+
token = page_iter.page_token or None
213+
return subs, token
201214

202215

203216
class _SubscriberAPI(object):
@@ -209,7 +222,7 @@ class _SubscriberAPI(object):
209222
def __init__(self, gax_api):
210223
self._gax_api = gax_api
211224

212-
def list_subscriptions(self, project, page_token=None):
225+
def list_subscriptions(self, project, page_size=0, page_token=None):
213226
"""List subscriptions for the project associated with this API.
214227
215228
See:
@@ -218,6 +231,10 @@ def list_subscriptions(self, project, page_token=None):
218231
:type project: string
219232
:param project: project ID
220233
234+
:type page_size: int
235+
:param page_size: maximum number of subscriptions to return, If not
236+
passed, defaults to a value set by the API.
237+
221238
:type page_token: string
222239
:param page_token: opaque marker for the next "page" of subscriptions.
223240
If not passed, the API will return the first page
@@ -231,10 +248,12 @@ def list_subscriptions(self, project, page_token=None):
231248
"""
232249
options = _build_paging_options(page_token)
233250
path = 'projects/%s' % (project,)
234-
response = self._gax_api.list_subscriptions(path, options)
251+
page_iter = self._gax_api.list_subscriptions(
252+
path, page_size=page_size, options=options)
235253
subscriptions = [_subscription_pb_to_mapping(sub_pb)
236-
for sub_pb in response.subscriptions]
237-
return subscriptions, response.next_page_token
254+
for sub_pb in page_iter.next()]
255+
token = page_iter.page_token or None
256+
return subscriptions, token
238257

239258
def subscription_create(self, subscription_path, topic_path,
240259
ack_deadline=None, push_endpoint=None):

gcloud/pubsub/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ def topic_list_subscriptions(self, topic_path, page_size=None,
217217
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
218218
219219
:type page_size: int
220-
:param page_size: maximum number of topics to return, If not passed,
221-
defaults to a value set by the API.
220+
:param page_size: maximum number of subscriptions to return, If not
221+
passed, defaults to a value set by the API.
222222
223223
:type page_token: string
224224
:param page_token: opaque marker for the next "page" of topics. If not

0 commit comments

Comments
 (0)