Skip to content

Commit 701d0ce

Browse files
committed
Attempt to get 'Topic.publish()' working with GAX.
See #1869 for remaining issue.
1 parent cebd7d2 commit 701d0ce

2 files changed

Lines changed: 42 additions & 5 deletions

File tree

gcloud/pubsub/_gax.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,14 @@ def topic_publish(self, topic_path, messages):
165165
message_pbs = [_message_pb_from_dict(message)
166166
for message in messages]
167167
try:
168-
response = self._gax_api.publish(topic_path, message_pbs)
168+
event = self._gax_api.publish(topic_path, message_pbs)
169+
if not event.is_set():
170+
event.wait()
169171
except GaxError as exc:
170172
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
171173
raise NotFound(topic_path)
172174
raise
173-
return response.message_ids
175+
return event.result.message_ids
174176

175177
def topic_list_subscriptions(self, topic_path, page_size=0,
176178
page_token=None):

gcloud/pubsub/test__gax.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,30 @@ def test_topic_publish_hit(self):
207207
MSGID = 'DEADBEEF'
208208
MESSAGE = {'data': B64, 'attributes': {}}
209209
response = _PublishResponsePB([MSGID])
210-
gax_api = _GAXPublisherAPI(_publish_response=response)
210+
event = _Event(response)
211+
event.wait() # already received result
212+
gax_api = _GAXPublisherAPI(_publish_response=event)
213+
api = self._makeOne(gax_api)
214+
215+
resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
216+
217+
self.assertEqual(resource, [MSGID])
218+
topic_path, message_pbs, options = gax_api._publish_called_with
219+
self.assertEqual(topic_path, self.TOPIC_PATH)
220+
message_pb, = message_pbs
221+
self.assertEqual(message_pb.data, B64)
222+
self.assertEqual(message_pb.attributes, {})
223+
self.assertEqual(options, None)
224+
225+
def test_topic_publish_hit_with_wait(self):
226+
import base64
227+
PAYLOAD = b'This is the message text'
228+
B64 = base64.b64encode(PAYLOAD).decode('ascii')
229+
MSGID = 'DEADBEEF'
230+
MESSAGE = {'data': B64, 'attributes': {}}
231+
response = _PublishResponsePB([MSGID])
232+
event = _Event(response)
233+
gax_api = _GAXPublisherAPI(_publish_response=event)
211234
api = self._makeOne(gax_api)
212235

213236
resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
@@ -897,12 +920,24 @@ def __init__(self, items, page_token):
897920
self.page_token = page_token
898921

899922
def next(self):
900-
if self._items is None:
901-
raise StopIteration()
902923
items, self._items = self._items, None
903924
return items
904925

905926

927+
class _Event(object):
928+
929+
result = None
930+
931+
def __init__(self, result):
932+
self._result = result
933+
934+
def is_set(self):
935+
return self.result is not None
936+
937+
def wait(self, *_):
938+
self.result = self._result
939+
940+
906941
class _TopicPB(object):
907942

908943
def __init__(self, name):

0 commit comments

Comments
 (0)