Skip to content

Commit 5577f8c

Browse files
DumengLinchin
authored andcommitted
feat: add support for proto3 optional tag (#727)
* feat: add support for proto3 optional tag * format writer.py * Add the same changes to v1beta2 * Add systen test for proto3 support * Remove v1beta2 modifications * Fix issue in the test script and reformat * fix lint * Fix typo in the test * Remove unneed offset --------- Co-authored-by: Lingqing Gan <lingqing.gan@gmail.com>
1 parent 6569227 commit 5577f8c

2 files changed

Lines changed: 94 additions & 0 deletions

File tree

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ def __init__(
100100
# The threads created in ``._open()``.
101101
self._consumer = None
102102

103+
# The protobuf payload will be decoded as proto2 on the server side. The schema is also
104+
# specified as proto2. Hence we must clear proto3-only features. This works since proto2 and
105+
# proto3 are binary-compatible.
106+
proto_descriptor = (
107+
self._inital_request_template.proto_rows.writer_schema.proto_descriptor
108+
)
109+
for field in proto_descriptor.field:
110+
field.ClearField("oneof_index")
111+
field.ClearField("proto3_optional")
112+
proto_descriptor.ClearField("oneof_decl")
113+
103114
@property
104115
def is_active(self) -> bool:
105116
"""bool: True if this manager is actively streaming.

packages/google-cloud-bigquery-storage/tests/system/test_writer.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,29 @@
1616
import pytest
1717

1818
from google.cloud.bigquery_storage_v1 import types as gapic_types
19+
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
20+
import uuid
21+
22+
23+
@pytest.fixture
24+
def table(project_id, dataset, bq_client):
25+
from google.cloud import bigquery
26+
27+
schema = [
28+
bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
29+
bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
30+
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
31+
]
32+
33+
unique_suffix = str(uuid.uuid4()).replace("-", "_")
34+
table_id = "users_" + unique_suffix
35+
table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}"
36+
bq_table = bigquery.Table(table_id_full, schema=schema)
37+
created_table = bq_client.create_table(bq_table)
38+
39+
yield created_table
40+
41+
bq_client.delete_table(created_table)
1942

2043

2144
@pytest.fixture(scope="session")
@@ -31,3 +54,63 @@ def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client)
3154

3255
with pytest.raises(exceptions.GoogleAPICallError):
3356
bqstorage_write_client.append_rows(bad_request)
57+
58+
59+
def test_append_rows_with_proto3(bqstorage_write_client, table):
60+
import proto
61+
from google.protobuf import descriptor_pb2
62+
63+
# Using Proto Plus to build proto3
64+
# Declare proto3 field `optional` for presence
65+
class PersonProto(proto.Message):
66+
first_name = proto.Field(
67+
proto.STRING,
68+
number=1,
69+
optional=True,
70+
)
71+
last_name = proto.Field(
72+
proto.STRING,
73+
number=2,
74+
optional=True,
75+
)
76+
age = proto.Field(
77+
proto.INT64,
78+
number=3,
79+
optional=True,
80+
)
81+
82+
person_pb = PersonProto.pb()
83+
84+
stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default"
85+
request_template = gapic_types.AppendRowsRequest()
86+
request_template.write_stream = stream_name
87+
88+
proto_schema = gapic_types.ProtoSchema()
89+
proto_descriptor = descriptor_pb2.DescriptorProto()
90+
person_pb.DESCRIPTOR.CopyToProto(
91+
proto_descriptor,
92+
)
93+
proto_schema.proto_descriptor = proto_descriptor
94+
proto_data = gapic_types.AppendRowsRequest.ProtoData()
95+
proto_data.writer_schema = proto_schema
96+
request_template.proto_rows = proto_data
97+
98+
append_rows_stream = AppendRowsStream(
99+
bqstorage_write_client,
100+
request_template,
101+
)
102+
103+
request = gapic_types.AppendRowsRequest()
104+
proto_data = gapic_types.AppendRowsRequest.ProtoData()
105+
proto_rows = gapic_types.ProtoRows()
106+
row = person_pb()
107+
row.first_name = "fn"
108+
row.last_name = "ln"
109+
row.age = 20
110+
proto_rows.serialized_rows.append(row.SerializeToString())
111+
proto_data.rows = proto_rows
112+
request.proto_rows = proto_data
113+
response_future = append_rows_stream.send(request)
114+
115+
assert response_future.result()
116+
# The request should success

0 commit comments

Comments
 (0)