1919import io .contract_testing .contractcase .grpc .ContractCaseStream .StateHandlerHandle .Stage ;
2020import io .grpc .Status ;
2121import io .grpc .stub .StreamObserver ;
22+ import java .util .concurrent .ExecutorService ;
23+ import java .util .concurrent .Executors ;
2224import org .jetbrains .annotations .NotNull ;
2325
2426class ContractResponseStreamObserver <T extends AbstractMessage , B extends Builder <B >> implements
@@ -29,6 +31,7 @@ class ContractResponseStreamObserver<T extends AbstractMessage, B extends Builde
2931 private final IResultPrinter resultPrinter ;
3032 private final ConfigHandle configHandle ;
3133 private final IRunTestCallback runTestCallback ;
34+ private final ExecutorService executor ;
3235
3336
3437 public ContractResponseStreamObserver (
@@ -42,15 +45,17 @@ public ContractResponseStreamObserver(
4245 this .resultPrinter = resultPrinter ;
4346 this .configHandle = configHandle ;
4447 this .runTestCallback = runTestCallback ;
48+ this .executor = Executors .newCachedThreadPool ();
4549 }
4650
4751 @ Override
48- public void onNext (final ContractResponse note ) {
52+ public void onNext (final ContractResponse coreResponse ) {
4953 /* For when we receive messages from the server */
50- final var requestId = ConnectorIncomingMapper .map (note .getId ());
51- switch (note .getKindCase ()) {
54+ final var requestId = ConnectorIncomingMapper .map (coreResponse .getId ());
55+ MaintainerLog .log ("Received id(" + requestId + "), which was: " + coreResponse );
56+ switch (coreResponse .getKindCase ()) {
5257 case RUN_STATE_HANDLER -> {
53- final var stateHandlerRunRequest = note .getRunStateHandler ();
58+ final var stateHandlerRunRequest = coreResponse .getRunStateHandler ();
5459 var stateName = stateHandlerRunRequest .getStateHandlerHandle ()
5560 .getHandle ()
5661 .getValue ();
@@ -69,7 +74,7 @@ public void onNext(final ContractResponse note) {
6974 );
7075 }
7176 case LOG_REQUEST -> {
72- final var logRequest = note .getLogRequest ();
77+ final var logRequest = coreResponse .getLogRequest ();
7378 rpcConnector .sendResponse (
7479 ResultResponse .newBuilder ().setResult (
7580 mapResult (
@@ -88,7 +93,7 @@ public void onNext(final ContractResponse note) {
8893 );
8994 }
9095 case PRINT_MATCH_ERROR_REQUEST -> {
91- final var printMatchErrorRequest = note .getPrintMatchErrorRequest ();
96+ final var printMatchErrorRequest = coreResponse .getPrintMatchErrorRequest ();
9297 rpcConnector .sendResponse (
9398 mapResultResponse (
9499 resultPrinter .printMatchError (
@@ -103,20 +108,20 @@ public void onNext(final ContractResponse note) {
103108 mapResultResponse (
104109 resultPrinter .printMessageError (
105110 mapMessageErrorRequest (
106- note .getPrintMessageErrorRequest ()
111+ coreResponse .getPrintMessageErrorRequest ()
107112 )
108113 )
109114 ),
110115 requestId
111116 );
112117 }
113118 case PRINT_TEST_TITLE_REQUEST -> {
114- final var printTestTitleRequest = note .getPrintTestTitleRequest ();
119+ final var printTestTitleRequest = coreResponse .getPrintTestTitleRequest ();
115120 rpcConnector .sendResponse (mapResultResponse (resultPrinter .printTestTitle (
116121 mapPrintableTestTitle (printTestTitleRequest ))), requestId );
117122 }
118123 case TRIGGER_FUNCTION_REQUEST -> {
119- var triggerFunctionRequest = note .getTriggerFunctionRequest ();
124+ var triggerFunctionRequest = coreResponse .getTriggerFunctionRequest ();
120125 var handle = ConnectorIncomingMapper .map (triggerFunctionRequest .getTriggerFunction ()
121126 .getHandle ());
122127 if (handle == null ) {
@@ -139,24 +144,19 @@ public void onNext(final ContractResponse note) {
139144 );
140145 }
141146 case RESULT_RESPONSE -> {
142- rpcConnector .completeWait (requestId , note .getResultResponse ().getResult ());
147+ rpcConnector .completeWait (requestId , coreResponse .getResultResponse ().getResult ());
143148 }
144149 case START_TEST_EVENT -> {
145- var startTestEvent = note .getStartTestEvent ();
146- rpcConnector .sendResponse (
150+ var startTestEvent = coreResponse .getStartTestEvent ();
151+ executor . submit (() -> rpcConnector .sendResponse (
147152 mapResultResponse (
148153 runTestCallback .runTest (
149154 startTestEvent .getTestName ().getValue (),
150155 () -> rpcConnector .executeCallAndWait (rpcConnector .makeInvokeTest (
151156 startTestEvent .getInvokerId ()))
152157 )),
153158 requestId
154- );
155- // TODO: Implement this
156- throw new ContractCaseCoreError (
157- "Received start test event incorrectly during a define contract" ,
158- "Java Internal Connector"
159- );
159+ ));
160160 }
161161 case KIND_NOT_SET -> {
162162 throw new ContractCaseCoreError (
@@ -191,6 +191,8 @@ public void onError(final Throwable t) {
191191
192192 @ Override
193193 public void onCompleted () {
194+ MaintainerLog .log ("Closing listener and pool" );
195+ executor .shutdown ();
194196 }
195197
196198 @ NotNull
0 commit comments