Ktor Server 어플리케이션에서 모니터링 도구 설정하기 - Datadog, Sentry

Ktor로 개발한 서버 어플리케이션에서 모니터링 도구를 설정하는 방법을 공유한다. 레퍼런스도 거의 없고, 원하는 대로 동작하는 믿을만한 도구 / 라이브러리도 없어서 시행 착오 끝에 중간 정리하는 글이다.

Datadog

datadog

이번 글에서 소개하는 작업은 아래와 같은 문제를 해결하기 위함이다.

  • 예상하지 못한 운영 상 문제 상황을 빠르게 파악하기 위한 모니터링 도구 설정
  • 원하는 데이터 (api 요청 별 root span / trace)가 안잡히는 문제 해결

Datadog에 데이터를 보내기 위한 dd-trace-java 라이브러리와 커스터마이즈를 위한 OpenTelemetry API를 사용한다. Sentry로 데이터를 보내기 위해 Sentry SDK를 사용한다.

Datadog Trace Java Agent

Configuring the Java Tracing Library에 자세한 인스트럭션이 있으므로 Dockerfile만 공유한다.

FROM gradle:8.4.0-jdk21 AS build
COPY --chown=gradle:gradle .. /home/gradle/src
WORKDIR /home/gradle/src
ADD dd-java-agent-1.34.0.jar dd-java-agent.jar
RUN gradle build -x test --no-daemon


FROM amazoncorretto:21
EXPOSE 8000
COPY --from=build /home/gradle/src/build/libs/example.app.jar app.jar
COPY --from=build /home/gradle/src/dd-java-agent.jar dd-java-agent.jar

ENV DD_TRACE_OTEL_ENABLED=true
CMD ["java", "-javaagent:dd-java-agent.jar", "-Ddd.service=example-app", "-jar", "/app.jar"]

Customize tracing with OpenTelemetry API

Datadog은 OpenTelemetry을 지원한다. OpenTelemetry은 클라우드 네이티브 환경에서의 모니터링을 위한 표준이다. 자바 어플리케이션의 커스텀한 트레이싱을 위해 OpenTelemetry api를 사용한다. CIO (Coroutine based IO)를 이용하고 있기 때문에 커스터마이즈 없이는 제대로된 데이터를 Datadog으로 보낼 수 없다.

아래와 같은 의존성이 필요하다. gradle 기준으로 아래와 같이 추가한다.

api(platform("io.opentelemetry:opentelemetry-bom:1.40.0"))
api("io.opentelemetry:opentelemetry-api")
api("io.opentelemetry:opentelemetry-extension-kotlin")
api("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.6.0")
api("io.opentelemetry.semconv:opentelemetry-semconv:1.26.0-alpha")

문서를 참고했다. 먼저 어플리케이션에 설치할 수 있는 플러그인을 구현했다.

import io.ktor.http.HttpHeaders
import io.ktor.server.application.Application
import io.ktor.server.application.ApplicationCall
import io.ktor.server.application.ApplicationCallPipeline
import io.ktor.server.application.BaseApplicationPlugin
import io.ktor.server.application.application
import io.ktor.server.application.call
import io.ktor.server.auth.jwt.JWTPrincipal
import io.ktor.server.auth.principal
import io.ktor.server.plugins.origin
import io.ktor.server.request.httpMethod
import io.ktor.server.request.uri
import io.ktor.server.routing.RoutingResolveTrace
import io.ktor.server.routing.routing
import io.ktor.util.AttributeKey
import io.ktor.util.pipeline.PipelinePhase
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.semconv.ClientAttributes
import io.opentelemetry.semconv.HttpAttributes
import io.opentelemetry.semconv.UrlAttributes
import kotlinx.coroutines.withContext

class OpenTracingServerPlugin(
    configuration: Configuration,
) {
    class Configuration {
        var filter: ((ApplicationCall) -> Boolean) = { false }
    }

    companion object Plugin : BaseApplicationPlugin<Application, Configuration, OpenTracingServerPlugin> {
        private val KTOR_ROUTING_TRACE_RESULT_REGEX = Regex("""Route resolve result:\s*SUCCESS;.*\s+@ ([^@]+)""")

        override val key = AttributeKey<OpenTracingServerPlugin>("OpenTracingServer")

        override fun install(
            pipeline: Application,
            configure: Configuration.() -> Unit,
        ): OpenTracingServerPlugin {
            val configuration = Configuration().apply(configure)
            val plugin = OpenTracingServerPlugin(configuration)
            val tracingPhase = PipelinePhase("OpenTracingServer")
            pipeline.insertPhaseAfter(ApplicationCallPipeline.Monitoring, tracingPhase)
            pipeline.intercept(tracingPhase) {
                if (configuration.filter(call)) return@intercept
                val tracer = GlobalOpenTelemetry.getTracer("ktor.server")
                val span =
                    tracer
                        .spanBuilder(call.request.let { "${it.httpMethod.value} - ${it.uri}" })
                        .setSpanKind(SpanKind.SERVER)
                        .setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, call.request.httpMethod.value)
                        .setAttribute(UrlAttributes.URL_FULL, call.request.uri)
                        .setAttribute("http.url", call.request.uri)
                        .setAttribute("http.method", call.request.httpMethod.value)
                        .apply {
                            call.request
                                .let {
                                    it.origin.remoteHost.takeIf { host ->
                                        !host.startsWith("0:0:0:0") &&
                                            !host.startsWith("10.0.") &&
                                            !host.startsWith("172.16.") &&
                                            !host.startsWith("192.168.")
                                    }
                                }?.let { setAttribute(ClientAttributes.CLIENT_ADDRESS, it) }
                        }.startSpan()
                call.attributes.put(AttributeKey<String>("traceId"), span.spanContext.traceId)
                call.attributes.put(AttributeKey<String>(HttpHeaders.XRequestId), span.spanContext.spanId)
                var routingTrace: RoutingResolveTrace? = null
                application.routing { trace { routingTrace = it } }
                try {
                    withContext(span.asContextElement()) {
                        proceed()
                    }
                    span.setStatus(StatusCode.OK, "process request success")
                } catch (e: Throwable) {
                    span.setStatus(StatusCode.ERROR, "process request error")
                    span.recordException(e)
                    throw e
                } finally {
                    call.response
                        .status()
                        ?.value
                        ?.let {
                            span.setAttribute(HttpAttributes.HTTP_RESPONSE_STATUS_CODE, it)
                            span.setAttribute("http.status_code", it.toString())
                        }
                    call
                        .principal<JWTPrincipal>()
                        ?.let { 
                            span.setAttribute("principalServiceName", 
                            it.payload.getClaim("service").asString()) 
                        }
                    call.request.headers[CLIENT_REQUEST_ID_HEADER]
                        ?.let { span.setAttribute("xClientRequestId", it) }
                    routingTrace
                        ?.buildText()
                        ?.let { text -> KTOR_ROUTING_TRACE_RESULT_REGEX.find(text) }
                        ?.groupValues
                        ?.get(1)
                        ?.trim()
                        ?.let { resourceName -> span.updateName(resourceName) }
                    span.end()
                }
            }
            return plugin
        }
    }
}

Routing 얻어내기

심플하게 call.request.uri를 span의 이름으로 설정할 수 있는데, datadog에서는 불편한 점이 있다. resource name에 path parameter가 포함되는 경우가 있기 때문에 해당 API로만 필터링하여 관찰하기 어려운 점이 있다.

ktor 가 요청에 대한 라우팅을 어떻게 처리하는지 알아내려면 내부적으로 Application에 설치된 Routing Plugin이 동작할 때 남기는 trace 객체를 얻어내야 한다. io.ktor.server.routing.Routing은 플러그인 클래스인데, 이 클래스가 라우트할 엔드포인트를 찾아내는 과정을 수행하면서 trace 객체를 남긴다. 그 객체를 통해 어떤 라우트가 선택됐는지 알 수 있다.

var routingTrace: RoutingResolveTrace? = null
application.routing { trace { routingTrace = it } }

application.routing.trace 블록 안에서 routingTrace 변수로 trace 객체를 갖고온다. 요청이 실행됐을 때, routingTrace 변수를 통해 얻어낸 라우팅 로그를 파싱해 span의 이름을 업데이트한다. RoutingResolveTrace 구현을 살펴보면 우아하게(?) finalResult를 갖고올 수 있을 것 처럼 보였는데, private 라서 어쩔 수 없이 로그를 파싱했다. 공식 문서에서도 로그에 대한 이야기밖에 없다.

private val KTOR_ROUTING_TRACE_RESULT_REGEX = Regex("""Route resolve result:\s*SUCCESS;.*\s+@ ([^@]+)""")
routingTrace
    ?.buildText()
    ?.let { text -> KTOR_ROUTING_TRACE_RESULT_REGEX.find(text) }
    ?.groupValues
    ?.get(1)
    ?.trim()
    ?.let { resourceName -> span.updateName(resourceName) }

이 정규 표현식을 이용해서 라우팅 로그에서 resource 이름을 추출한다.

Datadog을 위한 http 정보 설정

.setAttribute("http.url", call.request.uri)
.setAttribute("http.method", call.request.httpMethod.value)
.setAttribute("http.status_code", it.toString())
// ...

이 정보들이 있어야 Trace를 볼 때 status code와 같은 정보들을 쉽게 볼 수 있다. 결국 이렇게 설정한 결과, Datadog에서는 아래와 같이 트레이스를 볼 수 있었다.


하지만, 아직 부족하다. api 호출에 포함된 서비스 클래스, 저장소 클래스들의 동작 정보를 아직 수집하지 않았다.

더 기록하고 싶은 정보는 아래와 같다.

위 목적을 달성하기 위해, AspectJ를 사용했으며 다양한 사용 방법이 있지만, Compile-time으로 Weaving하지 않고 Load-Time Weaving 했다. java agent로 Aspectjweaver를 사용하기 때문에, 로컬에서는 아예 배재할 수 있고, 도커 이미지에만 포함시킬 수 있다.

구성

<aspectj>
    <weaver options="-verbose -showWeaveInfo">
        <include within="example.app..*" />
        <include within="aws.sdk.kotlin.services.dynamodb..*" />
    </weaver>
    <aspects>
        <aspect name="example.app.plugins.opentracing.TraceAspect" />
    </aspects>
</aspectj>

이렇게 aop.xml 파일을 작성하고, AspectJWeaver를 사용하기 위해 META-INF/aop.xml 파일을 작성한다.

그리고 Dockerfile에 아래와 같이 CMD를 변경한다.

CMD ["java", "-javaagent:dd-java-agent.jar", "-javaagent:aspectjweaver.jar", "-Ddd.service=example-app", "-jar", "/app.jar"]

기동 초기에 관련 로그를 살펴보면 대상을 잘 감지하는 것을 확인할 수 있다.

Aspect 구현

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import example.app.plugins.opentracing.OpenTracingServerPlugin.Plugin.INSTRUMENTATION_SCOPE_NAME
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.reflect.CodeSignature

@Aspect
class TraceAspect {
    private val tracer = GlobalOpenTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME)

    @Around(
        "execution(public * example.app.services..*(..)) || execution(public * example.app.repositories..*(..))",
    )
    fun trace(joinPoint: ProceedingJoinPoint): Any? {
        val span = getCurrentSpan(joinPoint)
        return proceed(span, joinPoint)
    }

    @Around(
        "execution(public * aws.sdk.kotlin.services.dynamodb.DynamoDbClient.*(..)) || execution(public * aws.sdk.kotlin.services.dynamodb.DefaultDynamoDbClient.*(..))",
    )
    fun traceAWSDynamoDb(joinPoint: ProceedingJoinPoint): Any? {
        val span = getCurrentSpan(joinPoint)
        resolveDynamoDbInput(joinPoint).getOrNull()?.let {
            span.setAttribute("dynamoDbInput", it)
        }
        return proceed(span, joinPoint)
    }

    private fun getCurrentSpan(joinPoint: ProceedingJoinPoint): Span {
        val parentSpan: Span = Span.current()
        return tracer
            .spanBuilder(
                joinPoint.signature.declaringType.simpleName + "." + joinPoint.signature.name,
            ).setParent(Context.current().with(parentSpan))
            .startSpan()
    }

    private fun proceed(
        span: Span,
        joinPoint: ProceedingJoinPoint,
    ): Any? {
        try {
            span.makeCurrent().use { _ ->
                return joinPoint.proceed()
            }
        } catch (e: IllegalArgumentException) {
            span.setStatus(StatusCode.ERROR)
            throw e
        } finally {
            span.end()
        }
    }

    private fun resolveDynamoDbInput(joinPoint: ProceedingJoinPoint): String? {
        val signature = joinPoint.signature as CodeSignature
        for (i in signature.parameterNames.indices) {
            if (signature.parameterNames[i] == "input") {
                return joinPoint.args[i].toString()
            }
        }
        return null
    }
}

AspectJ를 이용해서 각 클래스들의 메소드 호출 정보를 수집하고, DynamoDb 클라이언트가 대상인 경우 어떤 API Request를 보냈는지 수집한다. 모든 DynamoDb Request는 input이라는 이름의 파라미터로 전달하고, toString()이 잘 구현되어 있기 때문에 충분한 정보를 Datadog에서 확인할 수 있게 된다.

Sentry

fun Application.configureMonitoring() {
    val stage = environment.getAsString("stage")
    if (stage in listOf("dev", "prod")) {
        install(OpenTracingServerPlugin) {
            filter = { call -> call.request.path().startsWith("/system") }
        }
        Sentry.init { options ->
            options.dsn = environment.getAsString("sentry.dsn")
            options.environment = stage
            options.tracesSampleRate = 1.0
            options.isDebug = false
        }
    }
}

위에서 언급한 OpenTracingServerPlugin을 설치하면서, Sentry까지 셋업하는 코드이다. Sentry는 올바른 DSN 정보와 같이 잘 초기화만 해주면 문제없다. Sentry에서 퍼포먼스 모니터는 하지 않을 예정이므로 이정도에서 마무리 했지만, 추가 설정이 필요하다면 이 스레드를 참고하면 될 것 같다.

StatusPages를 이용해서 예외를 처리하면서 Sentry에 예외 정보를 보내는 코드를 추가했다.

import io.ktor.http.HttpStatusCode
import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.plugins.statuspages.StatusPages
import io.ktor.server.response.respond
import io.sentry.Sentry
import example.app.exceptions.services.BaseException

fun Application.handleExceptions() {
    install(StatusPages) {
        exception<BaseException> { call, cause ->
            call.respond<ErrorSchema>(HttpStatusCode.NotFound, ErrorSchema(cause))
        }
        exception<Throwable> { _, cause -> Sentry.captureException(cause) }
    }
}

ps. io.opentelemetry.instrumentation:opentelemetry-ktor-2.0 라이브러리도 존재한다. 하지만, 원하는 결과물을 만드는데 어려워서 포기했다.


comments powered by Disqus