Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ public final Object apply(Object input, FunctionInvocationWrapper targetFunction
boolean functionalTracingEnabled = !StringUtils.hasText(functionalTracingEnabledStr)
|| Boolean.parseBoolean(functionalTracingEnabledStr);
if (functionalTracingEnabled && !(input instanceof Publisher) && input instanceof Message && !FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) {
Object result = this.doApply(input, targetFunction);
targetFunction.wrapped = false;
return result;
try {
return this.doApply(input, targetFunction);
}
finally {
targetFunction.wrapped = false;
}
}
else {
return targetFunction.apply(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,33 @@ public void testWrappedWithAroundAdviseConfiguration() {
assertThat(result.getHeaders().get("after")).isEqualTo("bar");
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testAroundWrapperAppliedOnEveryInvocation() {
FunctionCatalog catalog = this.configureCatalog(AroundWrapperExceptionResetConfiguration.class);
FunctionInvocationWrapper f = catalog.lookup("uppercase");
AtomicInteger wrapperCallCount = (AtomicInteger) this.context.getBean("wrapperCallCount");

// successful invocation
Message result = (Message) f.apply(MessageBuilder.withPayload("hello").build());
assertThat(result.getPayload()).isEqualTo("HELLO");
assertThat(wrapperCallCount.get()).isEqualTo(1);

// failed invocation
try {
f.apply(MessageBuilder.withPayload("exception").build());
}
catch (RuntimeException e) {
// expected
}
assertThat(wrapperCallCount.get()).isEqualTo(2);

// subsequent invocation must still go through the wrapper
result = (Message) f.apply(MessageBuilder.withPayload("world").build());
assertThat(result.getPayload()).isEqualTo("WORLD");
assertThat(wrapperCallCount.get()).isEqualTo(3);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testEachElementInFluxIsProcessed() {
Expand Down Expand Up @@ -1616,4 +1643,35 @@ public Function<Message<?>, Message<?>> myFunction() {
return msg -> msg;
}
}

@EnableAutoConfiguration
@Configuration
protected static class AroundWrapperExceptionResetConfiguration {

@Bean
public Function<Message<String>, Message<String>> uppercase() {
return v -> {
if ("exception".equals(v.getPayload())) {
throw new RuntimeException("Expected exception");
}
return MessageBuilder.withPayload(v.getPayload().toUpperCase(Locale.ROOT)).copyHeaders(v.getHeaders()).build();
};
}

@Bean
public AtomicInteger wrapperCallCount() {
return new AtomicInteger();
}

@Bean
public FunctionAroundWrapper wrapper(AtomicInteger wrapperCallCount) {
return new FunctionAroundWrapper() {
@Override
protected Object doApply(Object input, FunctionInvocationWrapper targetFunction) {
wrapperCallCount.incrementAndGet();
return targetFunction.apply(input);
}
};
}
}
}