Skip to content

Commit 0574c82

Browse files
authored
NIFI-15885: Addressed issues that cause Offload to get stuck: when a … (#11184)
* NIFI-15885: Addressed issues that cause Offload to get stuck: when a processor extends AbstractSessionFactoryProcessor and stores a reference to a created ProcessSession but not its factory, garbage collection could cause framework to be unable to rollback sessions. When a processor is terminated, ensure we rollback retained sessions even if all threads are completed. Fixed related bug that caused background thread never to complete when waiting for processor thread count to reach 1 and terminate set it to 0. * NIFI-15885: Fix to unit test * NIFI-15885: Allow StandardProcessSession.migrate to unwrap delegating Session wrappers so FactoryRetainingProcessSession (used to keep an ActiveProcessSessionFactory reachable for offload/terminate) is recognized as a StandardProcessSession target. Adds a framework-internal DelegatingProcessSession contract implemented by the wrapper, and an IT covering the migrate-to-wrapper path that reproduces the MergeRecord CI failure.
1 parent ff6df52 commit 0574c82

9 files changed

Lines changed: 701 additions & 8 deletions

File tree

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,6 +1905,14 @@ public void run() {
19051905
}
19061906
}
19071907
}
1908+
} else if (lifecycleState.isTerminated()) {
1909+
// Termination was requested while the stop sequence was waiting for active threads to drain.
1910+
// LifecycleState.terminate() reset the active thread count to zero, so the count==1
1911+
// condition above will never be reached and rescheduling would loop forever. Complete the
1912+
// stop action and exit. completeStopAction() is idempotent if procNode.terminate() already
1913+
// invoked it.
1914+
LOG.debug("Stop sequence for {} aborted because LifecycleState was terminated", this);
1915+
completeStopAction();
19081916
} else {
19091917
// Not all of the active threads have finished. Try again in 100 milliseconds.
19101918
executor.schedule(this, 100, TimeUnit.MILLISECONDS);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.controller.repository;
19+
20+
import org.apache.nifi.processor.ProcessSession;
21+
22+
/**
23+
* Framework-internal contract implemented by {@link ProcessSession} wrappers that forward all
24+
* operations to an underlying delegate Session. This allows framework code that needs to operate
25+
* on a concrete Session implementation (for example, {@code StandardProcessSession.migrate}, which
26+
* requires its target to be another {@code StandardProcessSession}) to look through any number of
27+
* wrapping layers and recover the underlying Session.
28+
*
29+
* Extensions must not implement this interface; it is intended for framework-internal Session
30+
* wrappers only.
31+
*/
32+
public interface DelegatingProcessSession extends ProcessSession {
33+
34+
/**
35+
* @return the Session that this wrapper forwards operations to. May itself be a
36+
* {@link DelegatingProcessSession} when wrappers are nested.
37+
*/
38+
ProcessSession getDelegate();
39+
}

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,11 +1501,23 @@ public void migrate(final ProcessSession newOwner, final Collection<FlowFile> fl
15011501
throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
15021502
}
15031503

1504-
if (!(newOwner instanceof StandardProcessSession)) {
1504+
// Look through any framework-internal Session wrappers (such as the one used to keep an
1505+
// ActiveProcessSessionFactory reachable for the offload/terminate path) so the underlying
1506+
// StandardProcessSession can be located.
1507+
ProcessSession resolvedOwner = newOwner;
1508+
while (resolvedOwner instanceof DelegatingProcessSession delegating) {
1509+
resolvedOwner = delegating.getDelegate();
1510+
}
1511+
1512+
if (!(resolvedOwner instanceof StandardProcessSession standardOwner)) {
15051513
throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a " + newOwner.getClass());
15061514
}
15071515

1508-
migrate((StandardProcessSession) newOwner, flowFiles);
1516+
if (standardOwner == this) {
1517+
throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
1518+
}
1519+
1520+
migrate(standardOwner, flowFiles);
15091521
}
15101522

15111523
private synchronized void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java

Lines changed: 287 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,32 @@
1717

1818
package org.apache.nifi.controller.repository;
1919

20+
import org.apache.nifi.components.state.Scope;
21+
import org.apache.nifi.components.state.StateMap;
22+
import org.apache.nifi.controller.queue.QueueSize;
23+
import org.apache.nifi.flowfile.FlowFile;
24+
import org.apache.nifi.processor.FlowFileFilter;
2025
import org.apache.nifi.processor.ProcessSession;
2126
import org.apache.nifi.processor.ProcessSessionFactory;
27+
import org.apache.nifi.processor.Relationship;
2228
import org.apache.nifi.processor.exception.TerminatedTaskException;
29+
import org.apache.nifi.processor.io.InputStreamCallback;
30+
import org.apache.nifi.processor.io.OutputStreamCallback;
31+
import org.apache.nifi.processor.io.StreamCallback;
32+
import org.apache.nifi.processor.metrics.CommitTiming;
33+
import org.apache.nifi.provenance.ProvenanceReporter;
2334

35+
import java.io.IOException;
36+
import java.io.InputStream;
37+
import java.io.OutputStream;
38+
import java.nio.file.Path;
39+
import java.util.Collection;
40+
import java.util.List;
2441
import java.util.Map;
42+
import java.util.Set;
2543
import java.util.WeakHashMap;
44+
import java.util.function.Consumer;
45+
import java.util.regex.Pattern;
2646

2747
public class WeakHashMapProcessSessionFactory implements ActiveProcessSessionFactory {
2848
private final ProcessSessionFactory delegate;
@@ -39,9 +59,16 @@ public synchronized ProcessSession createSession() {
3959
throw new TerminatedTaskException();
4060
}
4161

42-
final ProcessSession session = delegate.createSession();
43-
sessionMap.put(session, Boolean.TRUE);
44-
return session;
62+
final ProcessSession delegateSession = delegate.createSession();
63+
64+
// Wrap the delegate Session so that the returned Session strongly references this factory. The
65+
// LifecycleState tracks ActiveProcessSessionFactory instances in a WeakHashMap, so without a
66+
// back-reference from the Session a Processor that retains the Session but not the factory across
67+
// onTrigger invocations would let the WeakHashMap entry clear, leaving terminate() unable to roll
68+
// back the orphaned Session and preventing offload from completing.
69+
final ProcessSession wrapper = new FactoryRetainingProcessSession(delegateSession, this);
70+
sessionMap.put(wrapper, Boolean.TRUE);
71+
return wrapper;
4572
}
4673

4774
@Override
@@ -56,4 +83,261 @@ public synchronized void terminateActiveSessions() {
5683

5784
sessionMap.clear();
5885
}
86+
87+
/**
88+
* A delegating {@link ProcessSession} that strongly references the factory that produced it. This
89+
* back-reference exists solely to keep the factory reachable so that the {@code LifecycleState}
90+
* WeakHashMap entry tracking the factory is not cleared while the Session is still in use. All
91+
* operations are forwarded to the underlying delegate Session.
92+
*/
93+
private static final class FactoryRetainingProcessSession implements DelegatingProcessSession {
94+
private final ProcessSession delegate;
95+
@SuppressWarnings("unused") // Retained only to keep the factory reachable; see class Javadoc.
96+
private final WeakHashMapProcessSessionFactory factoryRetention;
97+
98+
private FactoryRetainingProcessSession(final ProcessSession delegate, final WeakHashMapProcessSessionFactory factoryRetention) {
99+
this.delegate = delegate;
100+
this.factoryRetention = factoryRetention;
101+
}
102+
103+
@Override
104+
public ProcessSession getDelegate() {
105+
return delegate;
106+
}
107+
108+
@Override
109+
public void commit() {
110+
delegate.commit();
111+
}
112+
113+
@Override
114+
public void commitAsync() {
115+
delegate.commitAsync();
116+
}
117+
118+
@Override
119+
public void commitAsync(final Runnable onSuccess, final Consumer<Throwable> onFailure) {
120+
delegate.commitAsync(onSuccess, onFailure);
121+
}
122+
123+
@Override
124+
public void rollback() {
125+
delegate.rollback();
126+
}
127+
128+
@Override
129+
public void rollback(final boolean penalize) {
130+
delegate.rollback(penalize);
131+
}
132+
133+
@Override
134+
public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
135+
delegate.migrate(newOwner, flowFiles);
136+
}
137+
138+
@Override
139+
public void migrate(final ProcessSession newOwner) {
140+
delegate.migrate(newOwner);
141+
}
142+
143+
@Override
144+
public void adjustCounter(final String name, final long delta, final boolean immediate) {
145+
delegate.adjustCounter(name, delta, immediate);
146+
}
147+
148+
@Override
149+
public void recordGauge(final String name, final double value, final CommitTiming commitTiming) {
150+
delegate.recordGauge(name, value, commitTiming);
151+
}
152+
153+
@Override
154+
public FlowFile get() {
155+
return delegate.get();
156+
}
157+
158+
@Override
159+
public List<FlowFile> get(final int maxResults) {
160+
return delegate.get(maxResults);
161+
}
162+
163+
@Override
164+
public List<FlowFile> get(final FlowFileFilter filter) {
165+
return delegate.get(filter);
166+
}
167+
168+
@Override
169+
public QueueSize getQueueSize() {
170+
return delegate.getQueueSize();
171+
}
172+
173+
@Override
174+
public FlowFile create() {
175+
return delegate.create();
176+
}
177+
178+
@Override
179+
public FlowFile create(final FlowFile parent) {
180+
return delegate.create(parent);
181+
}
182+
183+
@Override
184+
public FlowFile create(final Collection<FlowFile> parents) {
185+
return delegate.create(parents);
186+
}
187+
188+
@Override
189+
public FlowFile clone(final FlowFile example) {
190+
return delegate.clone(example);
191+
}
192+
193+
@Override
194+
public FlowFile clone(final FlowFile example, final long offset, final long size) {
195+
return delegate.clone(example, offset, size);
196+
}
197+
198+
@Override
199+
public FlowFile penalize(final FlowFile flowFile) {
200+
return delegate.penalize(flowFile);
201+
}
202+
203+
@Override
204+
public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) {
205+
return delegate.putAttribute(flowFile, key, value);
206+
}
207+
208+
@Override
209+
public FlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attributes) {
210+
return delegate.putAllAttributes(flowFile, attributes);
211+
}
212+
213+
@Override
214+
public FlowFile removeAttribute(final FlowFile flowFile, final String key) {
215+
return delegate.removeAttribute(flowFile, key);
216+
}
217+
218+
@Override
219+
public FlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> keys) {
220+
return delegate.removeAllAttributes(flowFile, keys);
221+
}
222+
223+
@Override
224+
public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
225+
return delegate.removeAllAttributes(flowFile, keyPattern);
226+
}
227+
228+
@Override
229+
public void transfer(final FlowFile flowFile, final Relationship relationship) {
230+
delegate.transfer(flowFile, relationship);
231+
}
232+
233+
@Override
234+
public void transfer(final FlowFile flowFile) {
235+
delegate.transfer(flowFile);
236+
}
237+
238+
@Override
239+
public void transfer(final Collection<FlowFile> flowFiles) {
240+
delegate.transfer(flowFiles);
241+
}
242+
243+
@Override
244+
public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
245+
delegate.transfer(flowFiles, relationship);
246+
}
247+
248+
@Override
249+
public void remove(final FlowFile flowFile) {
250+
delegate.remove(flowFile);
251+
}
252+
253+
@Override
254+
public void remove(final Collection<FlowFile> flowFiles) {
255+
delegate.remove(flowFiles);
256+
}
257+
258+
@Override
259+
public void read(final FlowFile source, final InputStreamCallback reader) {
260+
delegate.read(source, reader);
261+
}
262+
263+
@Override
264+
public InputStream read(final FlowFile flowFile) {
265+
return delegate.read(flowFile);
266+
}
267+
268+
@Override
269+
public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
270+
return delegate.merge(sources, destination);
271+
}
272+
273+
@Override
274+
public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
275+
return delegate.merge(sources, destination, header, footer, demarcator);
276+
}
277+
278+
@Override
279+
public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
280+
return delegate.write(source, writer);
281+
}
282+
283+
@Override
284+
public FlowFile write(final FlowFile source, final StreamCallback writer) {
285+
return delegate.write(source, writer);
286+
}
287+
288+
@Override
289+
public OutputStream write(final FlowFile source) {
290+
return delegate.write(source);
291+
}
292+
293+
@Override
294+
public FlowFile append(final FlowFile source, final OutputStreamCallback writer) {
295+
return delegate.append(source, writer);
296+
}
297+
298+
@Override
299+
public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
300+
return delegate.importFrom(source, keepSourceFile, destination);
301+
}
302+
303+
@Override
304+
public FlowFile importFrom(final InputStream source, final FlowFile destination) {
305+
return delegate.importFrom(source, destination);
306+
}
307+
308+
@Override
309+
public void exportTo(final FlowFile flowFile, final Path destination, final boolean append) {
310+
delegate.exportTo(flowFile, destination, append);
311+
}
312+
313+
@Override
314+
public void exportTo(final FlowFile flowFile, final OutputStream destination) {
315+
delegate.exportTo(flowFile, destination);
316+
}
317+
318+
@Override
319+
public ProvenanceReporter getProvenanceReporter() {
320+
return delegate.getProvenanceReporter();
321+
}
322+
323+
@Override
324+
public void setState(final Map<String, String> state, final Scope scope) throws IOException {
325+
delegate.setState(state, scope);
326+
}
327+
328+
@Override
329+
public StateMap getState(final Scope scope) throws IOException {
330+
return delegate.getState(scope);
331+
}
332+
333+
@Override
334+
public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
335+
return delegate.replaceState(oldValue, newValue, scope);
336+
}
337+
338+
@Override
339+
public void clearState(final Scope scope) throws IOException {
340+
delegate.clearState(scope);
341+
}
342+
}
59343
}

0 commit comments

Comments
 (0)