Skip to content

Commit 6a15b7e

Browse files
authored
[FLINK-29950][sql-gateway] Refactor ResultSet to an interface
This closes #21502
1 parent 5cb434c commit 6a15b7e

16 files changed

Lines changed: 664 additions & 224 deletions

File tree

flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.endpoint.hive.util;
2020

21+
import org.apache.flink.table.api.ResultKind;
2122
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
2223
import org.apache.flink.table.catalog.Column;
2324
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -41,6 +42,7 @@
4142
import org.apache.flink.table.gateway.api.SqlGatewayService;
4243
import org.apache.flink.table.gateway.api.results.FunctionInfo;
4344
import org.apache.flink.table.gateway.api.results.ResultSet;
45+
import org.apache.flink.table.gateway.api.results.ResultSetImpl;
4446
import org.apache.flink.table.gateway.api.results.TableInfo;
4547
import org.apache.flink.table.gateway.api.session.SessionHandle;
4648
import org.apache.flink.table.types.logical.DecimalType;
@@ -65,6 +67,7 @@
6567
import java.util.stream.Collectors;
6668
import java.util.stream.Stream;
6769

70+
import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
6871
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
6972
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_COLUMNS_SCHEMA;
7073
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_FUNCTIONS_SCHEMA;
@@ -537,7 +540,15 @@ private static GenericRowData wrap(Object... elements) {
537540
}
538541

539542
private static ResultSet buildResultSet(ResolvedSchema schema, List<RowData> data) {
540-
return new ResultSet(EOS, null, schema, data);
543+
return new ResultSetImpl(
544+
EOS,
545+
null,
546+
schema,
547+
data,
548+
SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
549+
false,
550+
null,
551+
ResultKind.SUCCESS_WITH_CONTENT);
541552
}
542553

543554
private static List<Type> getSupportedHiveType() {

flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
3535
import org.apache.flink.table.gateway.api.operation.OperationHandle;
3636
import org.apache.flink.table.gateway.api.operation.OperationStatus;
37-
import org.apache.flink.table.gateway.api.results.ResultSet;
3837
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
3938
import org.apache.flink.table.gateway.api.session.SessionHandle;
4039
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
@@ -114,6 +113,7 @@
114113
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
115114
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
116115
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
116+
import static org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
117117
import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
118118
import static org.assertj.core.api.Assertions.assertThat;
119119
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -811,7 +811,7 @@ private void runOperationRequest(
811811
sessionHandle,
812812
() -> {
813813
latch.await();
814-
return ResultSet.NOT_READY_RESULTS;
814+
return NOT_READY_RESULT;
815815
});
816816
manipulateOp.accept(
817817
toTOperationHandle(sessionHandle, operationHandle, TOperationType.UNKNOWN));

flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java

Lines changed: 23 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -19,59 +19,33 @@
1919
package org.apache.flink.table.gateway.api.results;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.JobID;
23+
import org.apache.flink.table.api.ResultKind;
2224
import org.apache.flink.table.api.TableResult;
2325
import org.apache.flink.table.catalog.ResolvedSchema;
2426
import org.apache.flink.table.data.RowData;
2527

2628
import javax.annotation.Nullable;
2729

28-
import java.util.Collections;
2930
import java.util.List;
30-
import java.util.Objects;
31-
import java.util.stream.Collectors;
3231

33-
/** The collection of the results. */
32+
/**
33+
* A {@code ResultSet} represents the collection of the results. This interface defines the methods
34+
* that can be used on the ResultSet.
35+
*/
3436
@PublicEvolving
35-
public class ResultSet {
36-
37-
private final ResultType resultType;
38-
39-
@Nullable private final Long nextToken;
40-
41-
private final ResolvedSchema resultSchema;
42-
private final List<RowData> data;
43-
44-
public static final ResultSet NOT_READY_RESULTS =
45-
new ResultSet(
46-
ResultType.NOT_READY,
47-
0L,
48-
ResolvedSchema.of(Collections.emptyList()),
49-
Collections.emptyList());
50-
51-
public ResultSet(
52-
ResultType resultType,
53-
@Nullable Long nextToken,
54-
ResolvedSchema resultSchema,
55-
List<RowData> data) {
56-
this.nextToken = nextToken;
57-
this.resultType = resultType;
58-
this.resultSchema = resultSchema;
59-
this.data = data;
60-
}
37+
public interface ResultSet {
6138

6239
/** Get the type of the results, which may indicate the result is EOS or has data. */
63-
public ResultType getResultType() {
64-
return resultType;
65-
}
40+
ResultType getResultType();
6641

6742
/**
6843
* The token indicates the next batch of the data.
6944
*
7045
* <p>When the token is null, it means all the data has been fetched.
7146
*/
72-
public @Nullable Long getNextToken() {
73-
return nextToken;
74-
}
47+
@Nullable
48+
Long getNextToken();
7549

7650
/**
7751
* The schema of the data.
@@ -90,53 +64,27 @@ public ResultType getResultType() {
9064
* +- -----------+-------------+----------+
9165
* </pre>
9266
*/
93-
public ResolvedSchema getResultSchema() {
94-
return resultSchema;
95-
}
67+
ResolvedSchema getResultSchema();
9668

9769
/** All the data in the current results. */
98-
public List<RowData> getData() {
99-
return data;
100-
}
70+
List<RowData> getData();
10171

102-
@Override
103-
public String toString() {
104-
return String.format(
105-
"ResultSet{\n"
106-
+ " resultType=%s,\n"
107-
+ " nextToken=%s,\n"
108-
+ " resultSchema=%s,\n"
109-
+ " data=[%s]\n"
110-
+ "}",
111-
resultType,
112-
nextToken,
113-
resultSchema.toString(),
114-
data.stream().map(Object::toString).collect(Collectors.joining(",")));
115-
}
72+
/** Indicates that whether the result is for a query. */
73+
boolean isQueryResult();
11674

117-
@Override
118-
public boolean equals(Object o) {
119-
if (this == o) {
120-
return true;
121-
}
122-
if (!(o instanceof ResultSet)) {
123-
return false;
124-
}
125-
ResultSet resultSet = (ResultSet) o;
126-
return resultType == resultSet.resultType
127-
&& Objects.equals(nextToken, resultSet.nextToken)
128-
&& Objects.equals(resultSchema, resultSet.resultSchema)
129-
&& Objects.equals(data, resultSet.data);
130-
}
75+
/**
76+
* If the statement was submitted to a client, returns the JobID which uniquely identifies the
77+
* job. Otherwise, returns null.
78+
*/
79+
@Nullable
80+
JobID getJobID();
13181

132-
@Override
133-
public int hashCode() {
134-
return Objects.hash(resultType, nextToken, resultSchema, data);
135-
}
82+
/** Gets the result kind of the result. */
83+
ResultKind getResultKind();
13684

13785
/** Describe the kind of the result. */
13886
@PublicEvolving
139-
public enum ResultType {
87+
enum ResultType {
14088
/** Indicate the result is not ready. */
14189
NOT_READY,
14290

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.gateway.api.results;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.JobID;
23+
import org.apache.flink.table.api.ResultKind;
24+
import org.apache.flink.table.catalog.ResolvedSchema;
25+
import org.apache.flink.table.data.RowData;
26+
import org.apache.flink.table.utils.print.RowDataToStringConverter;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.List;
31+
import java.util.Objects;
32+
import java.util.stream.Collectors;
33+
34+
/** An implementation of {@link ResultSet}. */
35+
@Internal
36+
public class ResultSetImpl implements ResultSet {
37+
38+
private final ResultType resultType;
39+
40+
@Nullable private final Long nextToken;
41+
42+
private final ResolvedSchema resultSchema;
43+
private final List<RowData> data;
44+
private final RowDataToStringConverter converter;
45+
46+
private final boolean isQueryResult;
47+
48+
@Nullable private final JobID jobID;
49+
50+
private final ResultKind resultKind;
51+
52+
public ResultSetImpl(
53+
ResultType resultType,
54+
@Nullable Long nextToken,
55+
ResolvedSchema resultSchema,
56+
List<RowData> data,
57+
RowDataToStringConverter converter,
58+
boolean isQueryResult,
59+
@Nullable JobID jobID,
60+
ResultKind resultKind) {
61+
this.nextToken = nextToken;
62+
this.resultType = resultType;
63+
this.resultSchema = resultSchema;
64+
this.data = data;
65+
this.converter = converter;
66+
this.isQueryResult = isQueryResult;
67+
this.jobID = jobID;
68+
this.resultKind = resultKind;
69+
}
70+
71+
@Override
72+
public ResultType getResultType() {
73+
return resultType;
74+
}
75+
76+
@Override
77+
public @Nullable Long getNextToken() {
78+
return nextToken;
79+
}
80+
81+
@Override
82+
public ResolvedSchema getResultSchema() {
83+
return resultSchema;
84+
}
85+
86+
@Override
87+
public List<RowData> getData() {
88+
return data;
89+
}
90+
91+
@Override
92+
public boolean isQueryResult() {
93+
return isQueryResult;
94+
}
95+
96+
@Override
97+
public JobID getJobID() {
98+
return jobID;
99+
}
100+
101+
@Override
102+
public ResultKind getResultKind() {
103+
return resultKind;
104+
}
105+
106+
@Override
107+
public String toString() {
108+
return String.format(
109+
"ResultSet{\n"
110+
+ " resultType=%s,\n"
111+
+ " nextToken=%s,\n"
112+
+ " resultSchema=%s,\n"
113+
+ " data=[%s],\n"
114+
+ " isQueryResult=%s,\n"
115+
+ " jobId=%s,\n"
116+
+ " resultKind=%s\n"
117+
+ "}",
118+
resultType,
119+
nextToken,
120+
resultSchema.toString(),
121+
data.stream().map(Object::toString).collect(Collectors.joining(",")),
122+
isQueryResult,
123+
jobID,
124+
resultKind);
125+
}
126+
127+
@Override
128+
public boolean equals(Object o) {
129+
if (this == o) {
130+
return true;
131+
}
132+
if (!(o instanceof ResultSetImpl)) {
133+
return false;
134+
}
135+
ResultSetImpl resultSet = (ResultSetImpl) o;
136+
return resultType == resultSet.resultType
137+
&& Objects.equals(nextToken, resultSet.nextToken)
138+
&& Objects.equals(resultSchema, resultSet.resultSchema)
139+
&& Objects.equals(data, resultSet.data)
140+
&& isQueryResult == resultSet.isQueryResult
141+
&& Objects.equals(jobID, resultSet.jobID)
142+
&& resultKind == resultSet.resultKind;
143+
}
144+
145+
@Override
146+
public int hashCode() {
147+
return Objects.hash(
148+
resultType, nextToken, resultSchema, data, isQueryResult, jobID, resultKind);
149+
}
150+
}

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,10 @@ public List<String> completeStatement(
348348
OperationManager operationManager = getSession(sessionHandle).getOperationManager();
349349
OperationHandle operationHandle =
350350
operationManager.submitOperation(
351-
() ->
351+
handle ->
352352
getSession(sessionHandle)
353353
.createExecutor()
354-
.getCompletionHints(statement, position));
354+
.getCompletionHints(handle, statement, position));
355355
operationManager.awaitOperationTermination(operationHandle);
356356

357357
ResultSet resultSet =

0 commit comments

Comments
 (0)