This repository was archived by the owner on Mar 13, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathtest_read_only_transaction.py
More file actions
119 lines (111 loc) · 4.41 KB
/
test_read_only_transaction.py
File metadata and controls
119 lines (111 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.testing import eq_, is_instance_of
from google.cloud.spanner_v1 import (
FixedSizePool,
BatchCreateSessionsRequest,
ExecuteSqlRequest,
BeginTransactionRequest,
TransactionOptions,
)
from test.mockserver_tests.mock_server_test_base import MockServerTestBase
from test.mockserver_tests.mock_server_test_base import add_result
import google.cloud.spanner_v1.types.type as spanner_type
import google.cloud.spanner_v1.types.result_set as result_set
class TestReadOnlyTransaction(MockServerTestBase):
def test_read_only_transaction(self):
from test.mockserver_tests.read_only_model import Singer
add_singer_query_result("SELECT singers.id, singers.name \n" + "FROM singers")
engine = create_engine(
"spanner:///projects/p/instances/i/databases/d",
echo=True,
connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
)
for i in range(2):
with Session(engine.execution_options(read_only=True)) as session:
# Execute two queries in a read-only transaction.
session.scalars(select(Singer)).all()
session.scalars(select(Singer)).all()
# Verify the requests that we got.
requests = self.spanner_service.requests
eq_(7, len(requests))
is_instance_of(requests[0], BatchCreateSessionsRequest)
is_instance_of(requests[1], BeginTransactionRequest)
is_instance_of(requests[2], ExecuteSqlRequest)
is_instance_of(requests[3], ExecuteSqlRequest)
is_instance_of(requests[4], BeginTransactionRequest)
is_instance_of(requests[5], ExecuteSqlRequest)
is_instance_of(requests[6], ExecuteSqlRequest)
# Verify that the transaction is a read-only transaction.
for index in [1, 4]:
begin_request: BeginTransactionRequest = requests[index]
eq_(
TransactionOptions(
dict(
read_only=TransactionOptions.ReadOnly(
dict(
strong=True,
return_read_timestamp=True,
)
)
)
),
begin_request.options,
)
def add_singer_query_result(sql: str):
result = result_set.ResultSet(
dict(
metadata=result_set.ResultSetMetadata(
dict(
row_type=spanner_type.StructType(
dict(
fields=[
spanner_type.StructType.Field(
dict(
name="singers_id",
type=spanner_type.Type(
dict(code=spanner_type.TypeCode.INT64)
),
)
),
spanner_type.StructType.Field(
dict(
name="singers_name",
type=spanner_type.Type(
dict(code=spanner_type.TypeCode.STRING)
),
)
),
]
)
)
)
),
)
)
result.rows.extend(
[
(
"1",
"Jane Doe",
),
(
"2",
"John Doe",
),
]
)
add_result(sql, result)