Skip to content

Commit dd7640e

Browse files
authored
Back out DML/PDML support. (#6150)
PR #6048 merged too soon. Reverts #6048.
1 parent cf8eddb commit dd7640e

7 files changed

Lines changed: 362 additions & 742 deletions

File tree

packages/google-cloud-spanner/google/cloud/spanner_v1/database.py

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,25 @@
1414

1515
"""User friendly container for Cloud Spanner Database."""
1616

17-
import copy
18-
import functools
1917
import re
2018
import threading
19+
import copy
2120

2221
from google.api_core.gapic_v1 import client_info
2322
import google.auth.credentials
24-
from google.protobuf.struct_pb2 import Struct
2523
from google.cloud.exceptions import NotFound
2624
import six
2725

2826
# pylint: disable=ungrouped-imports
2927
from google.cloud.spanner_v1 import __version__
30-
from google.cloud.spanner_v1._helpers import _make_value_pb
3128
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
3229
from google.cloud.spanner_v1.batch import Batch
3330
from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient
3431
from google.cloud.spanner_v1.keyset import KeySet
3532
from google.cloud.spanner_v1.pool import BurstyPool
3633
from google.cloud.spanner_v1.pool import SessionCheckout
3734
from google.cloud.spanner_v1.session import Session
38-
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
3935
from google.cloud.spanner_v1.snapshot import Snapshot
40-
from google.cloud.spanner_v1.streamed import StreamedResultSet
41-
from google.cloud.spanner_v1.proto.transaction_pb2 import (
42-
TransactionSelector, TransactionOptions)
4336
# pylint: enable=ungrouped-imports
4437

4538

@@ -279,64 +272,6 @@ def drop(self):
279272
metadata = _metadata_with_prefix(self.name)
280273
api.drop_database(self.name, metadata=metadata)
281274

282-
def execute_partitioned_dml(
283-
self, dml, params=None, param_types=None):
284-
"""Execute a partitionable DML statement.
285-
286-
:type dml: str
287-
:param dml: DML statement
288-
289-
:type params: dict, {str -> column value}
290-
:param params: values for parameter replacement. Keys must match
291-
the names used in ``dml``.
292-
293-
:type param_types: dict[str -> Union[dict, .types.Type]]
294-
:param param_types:
295-
(Optional) maps explicit types for one or more param values;
296-
required if parameters are passed.
297-
298-
:rtype: int
299-
:returns: Count of rows affected by the DML statement.
300-
"""
301-
if params is not None:
302-
if param_types is None:
303-
raise ValueError(
304-
"Specify 'param_types' when passing 'params'.")
305-
params_pb = Struct(fields={
306-
key: _make_value_pb(value) for key, value in params.items()})
307-
else:
308-
params_pb = None
309-
310-
api = self.spanner_api
311-
312-
txn_options = TransactionOptions(
313-
partitioned_dml=TransactionOptions.PartitionedDml())
314-
315-
metadata = _metadata_with_prefix(self.name)
316-
317-
with SessionCheckout(self._pool) as session:
318-
319-
txn = api.begin_transaction(
320-
session.name, txn_options, metadata=metadata)
321-
322-
txn_selector = TransactionSelector(id=txn.id)
323-
324-
restart = functools.partial(
325-
api.execute_streaming_sql,
326-
session.name,
327-
dml,
328-
transaction=txn_selector,
329-
params=params_pb,
330-
param_types=param_types,
331-
metadata=metadata)
332-
333-
iterator = _restart_on_unavailable(restart)
334-
335-
result_set = StreamedResultSet(iterator)
336-
list(result_set) # consume all partials
337-
338-
return result_set.stats.row_count_lower_bound
339-
340275
def session(self, labels=None):
341276
"""Factory to create a session for this database.
342277

packages/google-cloud-spanner/google/cloud/spanner_v1/snapshot.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ class _SnapshotBase(_SessionWrapper):
7171
_multi_use = False
7272
_transaction_id = None
7373
_read_request_count = 0
74-
_execute_sql_count = 0
7574

7675
def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
7776
"""Helper for :meth:`read` / :meth:`execute_sql`.
@@ -196,20 +195,14 @@ def execute_sql(self, sql, params=None, param_types=None,
196195

197196
restart = functools.partial(
198197
api.execute_streaming_sql,
199-
self._session.name,
200-
sql,
201-
transaction=transaction,
202-
params=params_pb,
203-
param_types=param_types,
204-
query_mode=query_mode,
205-
partition_token=partition,
206-
seqno=self._execute_sql_count,
198+
self._session.name, sql,
199+
transaction=transaction, params=params_pb, param_types=param_types,
200+
query_mode=query_mode, partition_token=partition,
207201
metadata=metadata)
208202

209203
iterator = _restart_on_unavailable(restart)
210204

211205
self._read_request_count += 1
212-
self._execute_sql_count += 1
213206

214207
if self._multi_use:
215208
return StreamedResultSet(iterator, source=self)

packages/google-cloud-spanner/google/cloud/spanner_v1/transaction.py

Lines changed: 5 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@
1414

1515
"""Spanner read-write transaction support."""
1616

17-
from google.protobuf.struct_pb2 import Struct
17+
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector
18+
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
1819

1920
from google.cloud._helpers import _pb_timestamp_to_datetime
20-
from google.cloud.spanner_v1._helpers import _make_value_pb
2121
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
22-
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector
23-
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
2422
from google.cloud.spanner_v1.snapshot import _SnapshotBase
2523
from google.cloud.spanner_v1.batch import _BatchBase
2624

@@ -37,7 +35,6 @@ class Transaction(_SnapshotBase, _BatchBase):
3735
"""Timestamp at which the transaction was successfully committed."""
3836
_rolled_back = False
3937
_multi_use = True
40-
_execute_sql_count = 0
4138

4239
def __init__(self, session):
4340
if session._transaction is not None:
@@ -117,6 +114,9 @@ def commit(self):
117114
"""
118115
self._check_state()
119116

117+
if not self._mutations:
118+
raise ValueError("No mutations to commit")
119+
120120
database = self._session._database
121121
api = database.spanner_api
122122
metadata = _metadata_with_prefix(database.name)
@@ -128,58 +128,6 @@ def commit(self):
128128
del self._session._transaction
129129
return self.committed
130130

131-
def execute_update(self, dml, params=None, param_types=None,
132-
query_mode=None):
133-
"""Perform an ``ExecuteSql`` API request with DML.
134-
135-
:type dml: str
136-
:param dml: SQL DML statement
137-
138-
:type params: dict, {str -> column value}
139-
:param params: values for parameter replacement. Keys must match
140-
the names used in ``dml``.
141-
142-
:type param_types: dict[str -> Union[dict, .types.Type]]
143-
:param param_types:
144-
(Optional) maps explicit types for one or more param values;
145-
required if parameters are passed.
146-
147-
:type query_mode:
148-
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryMode`
149-
:param query_mode: Mode governing return of results / query plan. See
150-
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
151-
152-
:rtype: int
153-
:returns: Count of rows affected by the DML statement.
154-
"""
155-
if params is not None:
156-
if param_types is None:
157-
raise ValueError(
158-
"Specify 'param_types' when passing 'params'.")
159-
params_pb = Struct(fields={
160-
key: _make_value_pb(value) for key, value in params.items()})
161-
else:
162-
params_pb = None
163-
164-
database = self._session._database
165-
metadata = _metadata_with_prefix(database.name)
166-
transaction = self._make_txn_selector()
167-
api = database.spanner_api
168-
169-
response = api.execute_sql(
170-
self._session.name,
171-
dml,
172-
transaction=transaction,
173-
params=params_pb,
174-
param_types=param_types,
175-
query_mode=query_mode,
176-
seqno=self._execute_sql_count,
177-
metadata=metadata,
178-
)
179-
180-
self._execute_sql_count += 1
181-
return response.stats.row_count_exact
182-
183131
def __enter__(self):
184132
"""Begin ``with`` block."""
185133
self.begin()

packages/google-cloud-spanner/tests/system/test_system.py

Lines changed: 0 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -627,165 +627,6 @@ def test_transaction_read_and_insert_or_update_then_commit(self):
627627
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
628628
self._check_rows_data(rows)
629629

630-
def _generate_insert_statements(self):
631-
insert_template = (
632-
'INSERT INTO {table} ({column_list}) '
633-
'VALUES ({row_data})'
634-
)
635-
for row in self.ROW_DATA:
636-
yield insert_template.format(
637-
table=self.TABLE,
638-
column_list=', '.join(self.COLUMNS),
639-
row_data='{}, "{}", "{}", "{}"'.format(*row)
640-
)
641-
642-
@RetryErrors(exception=exceptions.ServerError)
643-
@RetryErrors(exception=exceptions.Conflict)
644-
def test_transaction_execute_sql_w_dml_read_rollback(self):
645-
retry = RetryInstanceState(_has_all_ddl)
646-
retry(self._db.reload)()
647-
648-
session = self._db.session()
649-
session.create()
650-
self.to_delete.append(session)
651-
652-
with session.batch() as batch:
653-
batch.delete(self.TABLE, self.ALL)
654-
655-
transaction = session.transaction()
656-
transaction.begin()
657-
658-
rows = list(
659-
transaction.read(self.TABLE, self.COLUMNS, self.ALL))
660-
self.assertEqual(rows, [])
661-
662-
for insert_statement in self._generate_insert_statements():
663-
result = transaction.execute_sql(insert_statement)
664-
list(result) # iterate to get stats
665-
self.assertEqual(result.stats.row_count_exact, 1)
666-
667-
# Rows inserted via DML *can* be read before commit.
668-
during_rows = list(
669-
transaction.read(self.TABLE, self.COLUMNS, self.ALL))
670-
self._check_rows_data(during_rows)
671-
672-
transaction.rollback()
673-
674-
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
675-
self._check_rows_data(rows, [])
676-
677-
@RetryErrors(exception=exceptions.ServerError)
678-
@RetryErrors(exception=exceptions.Conflict)
679-
def test_transaction_execute_update_read_commit(self):
680-
retry = RetryInstanceState(_has_all_ddl)
681-
retry(self._db.reload)()
682-
683-
session = self._db.session()
684-
session.create()
685-
self.to_delete.append(session)
686-
687-
with session.batch() as batch:
688-
batch.delete(self.TABLE, self.ALL)
689-
690-
with session.transaction() as transaction:
691-
rows = list(transaction.read(self.TABLE, self.COLUMNS, self.ALL))
692-
self.assertEqual(rows, [])
693-
694-
for insert_statement in self._generate_insert_statements():
695-
row_count = transaction.execute_update(insert_statement)
696-
self.assertEqual(row_count, 1)
697-
698-
# Rows inserted via DML *can* be read before commit.
699-
during_rows = list(
700-
transaction.read(self.TABLE, self.COLUMNS, self.ALL))
701-
self._check_rows_data(during_rows)
702-
703-
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
704-
self._check_rows_data(rows)
705-
706-
@RetryErrors(exception=exceptions.ServerError)
707-
@RetryErrors(exception=exceptions.Conflict)
708-
def test_transaction_execute_update_then_insert_commit(self):
709-
retry = RetryInstanceState(_has_all_ddl)
710-
retry(self._db.reload)()
711-
712-
session = self._db.session()
713-
session.create()
714-
self.to_delete.append(session)
715-
716-
with session.batch() as batch:
717-
batch.delete(self.TABLE, self.ALL)
718-
719-
insert_statement = list(self._generate_insert_statements())[0]
720-
721-
with session.transaction() as transaction:
722-
rows = list(transaction.read(self.TABLE, self.COLUMNS, self.ALL))
723-
self.assertEqual(rows, [])
724-
725-
row_count = transaction.execute_update(insert_statement)
726-
self.assertEqual(row_count, 1)
727-
728-
transaction.insert(self.TABLE, self.COLUMNS, self.ROW_DATA[1:])
729-
730-
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
731-
self._check_rows_data(rows)
732-
733-
def test_execute_partitioned_dml(self):
734-
retry = RetryInstanceState(_has_all_ddl)
735-
retry(self._db.reload)()
736-
737-
delete_statement = 'DELETE FROM {} WHERE true'.format(self.TABLE)
738-
739-
def _setup_table(txn):
740-
txn.execute_update(delete_statement)
741-
for insert_statement in self._generate_insert_statements():
742-
txn.execute_update(insert_statement)
743-
744-
committed = self._db.run_in_transaction(_setup_table)
745-
746-
with self._db.snapshot(read_timestamp=committed) as snapshot:
747-
before_pdml = list(snapshot.read(
748-
self.TABLE, self.COLUMNS, self.ALL))
749-
750-
self._check_rows_data(before_pdml)
751-
752-
nonesuch = 'nonesuch@example.com'
753-
target = 'phred@example.com'
754-
update_statement = (
755-
'UPDATE {table} SET {table}.email = @email '
756-
'WHERE {table}.email = @target').format(
757-
table=self.TABLE)
758-
759-
row_count = self._db.execute_partitioned_dml(
760-
update_statement,
761-
params={
762-
'email': nonesuch,
763-
'target': target,
764-
},
765-
param_types={
766-
'email': Type(code=STRING),
767-
'target': Type(code=STRING),
768-
},
769-
)
770-
self.assertEqual(row_count, 1)
771-
772-
row = self.ROW_DATA[0]
773-
updated = [row[:3] + (nonesuch,)] + list(self.ROW_DATA[1:])
774-
775-
with self._db.snapshot(read_timestamp=committed) as snapshot:
776-
after_update = list(snapshot.read(
777-
self.TABLE, self.COLUMNS, self.ALL))
778-
self._check_rows_data(after_update, updated)
779-
780-
row_count = self._db.execute_partitioned_dml(delete_statement)
781-
self.assertEqual(row_count, len(self.ROW_DATA))
782-
783-
with self._db.snapshot(read_timestamp=committed) as snapshot:
784-
after_delete = list(snapshot.read(
785-
self.TABLE, self.COLUMNS, self.ALL))
786-
787-
self._check_rows_data(after_delete, [])
788-
789630
def _transaction_concurrency_helper(self, unit_of_work, pkey):
790631
INITIAL_VALUE = 123
791632
NUM_THREADS = 3 # conforms to equivalent Java systest.

0 commit comments

Comments
 (0)