How can the MDC context be used in reactive Spring applications
The SLF4J library and its underlying implementations, such as Logback, offer the concept of a Mapped Diagnostic Context (MDC). In multi-threaded systems where each user request is handled by a separate thread, the MDC lets us produce different logging output for each request. We can use this technique to include a session id, a request id, or any other request-specific property in every log message emitted while that request is being processed.
The standard way of using the MDC is to set a context value bound to a specific key. Later on, we can reference that same key in our logging configuration, and the logging framework will print the value available in the MDC of the thread running the request.
```kotlin
val sessionId = ... // read session id from somewhere
MDC.put("session-id", sessionId)

// ... start of the request ...
logger.info("Request started")
// ... end of the request ...

MDC.clear()
```
```xml
<configuration>
    <appender name="msg" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <pattern>
                    <pattern>
                        {
                        "log": "%msg",
                        "sessionId": "%mdc{session-id}",
                        "time": "%d{yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ}"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="msg"/>
    </root>
</configuration>
```
```
{"log":"Request started","sessionId":"449c2a24-0fb8-430c-88fc-ab4247f4aac6","time":"2018-12-14T15:06:32.000000516+0100"}
{"log":"Request started","sessionId":"6713a28b-b570-4a04-b114-4813a8e23985","time":"2018-12-14T15:06:32.000000517+0100"}
{"log":"Request started","sessionId":"44a84121-795d-49fc-84c7-4368b30aab96","time":"2018-12-14T15:06:32.000000666+0100"}
```
This technique works well as long as a single request is processed entirely by one thread, because the MDC uses a Java ThreadLocal to store its values and keep them separate between threads. However, Spring 5 comes with Spring WebFlux, which uses Project Reactor and Netty to handle requests in a reactive way. In the reactive, non-blocking world, a request can be processed by multiple threads. This means that setting the MDC once at the beginning of the request is no longer an option.
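To make the thread-affinity problem concrete, here is a minimal sketch that uses only the Kotlin standard library. The names `fakeMdc` and `readFromOtherThread` are hypothetical and exist only for this illustration; `fakeMdc` is a plain ThreadLocal map standing in for the real MDC, not the actual SLF4J implementation.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for the MDC: one map per thread.
val fakeMdc: ThreadLocal<MutableMap<String, String>> =
    ThreadLocal.withInitial { mutableMapOf<String, String>() }

// Stores a value on the calling thread, then reads the same key from a second thread.
fun readFromOtherThread(key: String, value: String): String? {
    fakeMdc.get()[key] = value            // visible only to the calling thread
    var seen: String? = null
    thread { seen = fakeMdc.get()[key] }  // the new thread gets its own, empty map
        .join()
    return seen
}

fun main() {
    val seen = readFromOtherThread("session-id", "abc")
    // prints "same thread: abc, other thread: null"
    println("same thread: ${fakeMdc.get()["session-id"]}, other thread: $seen")
}
```

The value set on the first thread never reaches the second one. The same thing happens to MDC entries when a reactive pipeline hands a request over to another thread.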
To keep using the MDC in a reactive Spring application, we need to make sure that whenever a thread starts processing a request, the state of the MDC is updated on that thread. This takes two things:
- All the values that we previously added directly to the MDC should now be added to the Reactor context. The framework ensures that the reactive context is passed along the reactive execution, and it is not bound to any specific thread.
- Ensure that the values residing in the reactive context are copied to the MDC whenever the thread that processes the request may have changed. For this, we will implement a CoreSubscriber and hook it into Reactor using Hooks.
```kotlin
import org.reactivestreams.Subscription
import org.slf4j.MDC
import org.springframework.context.annotation.Configuration
import reactor.core.CoreSubscriber
import reactor.core.publisher.Hooks
import reactor.core.publisher.Operators
import reactor.util.context.Context
import java.util.stream.Collectors
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

@Configuration
class MdcContextLifterConfiguration {

    companion object {
        val MDC_CONTEXT_REACTOR_KEY: String = MdcContextLifterConfiguration::class.java.name
    }

    @PostConstruct
    fun contextOperatorHook() {
        Hooks.onEachOperator(MDC_CONTEXT_REACTOR_KEY, Operators.lift { _, subscriber -> MdcContextLifter(subscriber) })
    }

    @PreDestroy
    fun cleanupHook() {
        Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY)
    }
}

/**
 * Helper that copies the state of Reactor [Context] to MDC on the #onNext function.
 */
class MdcContextLifter<T>(private val coreSubscriber: CoreSubscriber<T>) : CoreSubscriber<T> {

    override fun onNext(t: T) {
        coreSubscriber.currentContext().copyToMdc()
        coreSubscriber.onNext(t)
    }

    override fun onSubscribe(subscription: Subscription) {
        coreSubscriber.onSubscribe(subscription)
    }

    override fun onComplete() {
        coreSubscriber.onComplete()
    }

    override fun onError(throwable: Throwable?) {
        coreSubscriber.onError(throwable)
    }

    override fun currentContext(): Context {
        return coreSubscriber.currentContext()
    }
}

/**
 * Extension function for the Reactor [Context]. Copies the current context to the MDC, if context is empty clears the MDC.
 * State of the MDC after calling this method should be same as Reactor [Context] state.
 * One thread-local access only.
 */
private fun Context.copyToMdc() {
    if (!this.isEmpty) {
        val map: Map<String, String> = this.stream()
            .collect(Collectors.toMap({ e -> e.key.toString() }, { e -> e.value.toString() }))

        MDC.setContextMap(map)
    } else {
        MDC.clear()
    }
}
```
With the above configuration in place, the only thing left is to add the context values to a reactive sequence. Once the sequence is subscribed to, the MDC is updated as elements flow through it, so any logging statement in the userService will correctly log the sessionId that was passed along.
```kotlin
val sessionId = ... // read session id from somewhere
return userService.getUserMono()
    // subscriberContext is deprecated since Reactor 3.4; contextWrite is its replacement
    .subscriberContext { ctx -> ctx.put("session-id", sessionId) }
```
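For illustration, the service side of this call might look like the sketch below. Only the name `getUserMono()` comes from the snippet above; the class body, the `"user-42"` placeholder value, and the hop to the parallel scheduler are assumptions made up to show a log statement that runs on a different thread than the one that subscribed.

```kotlin
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical service: only the method name getUserMono() appears in the article.
class UserService {
    private val logger = LoggerFactory.getLogger(UserService::class.java)

    fun getUserMono(): Mono<String> =
        Mono.just("user-42")                   // placeholder value, not a real lookup
            .publishOn(Schedulers.parallel())  // continue on a parallel-scheduler thread
            .map { user ->
                // Runs after the thread hop; no MDC call is made here explicitly.
                logger.info("User loaded")
                user
            }
}
```

Assuming the lifter hook is registered and the caller has put `session-id` into the Reactor context, the "User loaded" message should carry the session id even though the service itself never touches the MDC. Note that the context has to be written downstream of the operators that log, as in the call above, because the Reactor context propagates from the subscriber up the chain.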