AWS Opensearch를 Kotlin환경에서 사용하기 - 설정 및 Index / Alias 셋업

🧐
이 포스트는 빈약한 Opensearch Java Client 라이브러리의 예시를 공유하려는 목적도 포함되어있어 코드가 많다....

AWS Opensearch의 클라이언트 선언

구현은 Opensearch Java Client를 사용한다. aws Opensearch를 사용하고 K8s의 환경에 대응하기 위해 awssdk 의존성을 추가한다.

dependencies {
    // ...
    implementation("software.amazon.awssdk:sts:2.25.64")
    implementation("software.amazon.awssdk:apache-client:2.25.60")
    implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
    implementation("org.opensearch.client:opensearch-rest-client:2.15.0")
    implementation("org.opensearch.client:opensearch-java:2.10.3")
}

설정 관련 몇가지 코드가 생략됐지만, OpenSearchClient 빈을 생성하는 코드다.

import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.aws.AwsSdk2Transport
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region

@Configuration
class OpenSearchConfig(private val properties: OpenSearchConfigurationProperties) {

    @Bean
    fun openSearchClient(): OpenSearchClient {
        val client = ApacheHttpClient.builder().build()
        val awsSdk2Transport = AwsSdk2Transport(
            client,
            properties.hostname,
            "es",
            Region.of(properties.region),
            AwsSdk2TransportOptions.builder().build(),
        )
        return OpenSearchClient(awsSdk2Transport)
    }
}

모델링

코틀린 환경에서 저장할 문서의 모델을 data class로 선언한다. Jackson 기반으로 시리얼라이즈 하도록 구현되어있기 때문에 아래와 같이 선언할 수 있다.

import com.fasterxml.jackson.annotation.JsonProperty

data class UserDocument(
    @JsonProperty("userId")
    val userId: Long,
    @JsonProperty("gender")
    val gender: UserGender,
    @JsonProperty("dateOfBirth")
    val dateOfBirth: Date,
    @JsonProperty("location")
    val location: GeoPoint?,
)

data class GeoPoint(
    @JsonProperty("lat")
    val lat: Double,
    @JsonProperty("lon")
    val lon: Double,
)

이렇게 활용된다.

fun find(query: UserQuery): List<UserDocument> {
    val request = SearchRequest.of { b -> b.index(INDEX_ALIAS_NAME).query(query.toQuery()) }
    return client.search(request, UserDocument::class.java).hits().hits().mapNotNull { it.source() }
}

Index 와 Alias

Opensearch를 무중단으로 안정적으로 운영하기 위해 Index 1개를 직접 이용하지 않고, AliasAlias API를 이용한다.

An alias is a secondary name for a group of data streams or indices. Most Elasticsearch APIs accept an alias in place of a data stream or index name.

Index의 이름이 아닌, Index에 Alias를 추가하여 이 Alias를 이용한 API 사용이 가능하다.

Performs one or more alias actions in a single atomic operation.

그리고 이 Alias API의 변경은 atomic한 작동을 지원하기 때문에, swap 동작을 구현하여 무중단 기능 제공을 가능하게 한다. (마치 배포 시 CNAME SWAP하듯!) 과정을 다이어그램으로 표현하자면 아래와 같다.




client.indices().updateAliases(
    UpdateAliasesRequest.of { builder ->
        builder
            .actions { requests ->
                requests.add { addBuilder ->
                    addBuilder
                        .index("index-002")
                        .alias("index-alias")
                }
            }.actions { requests ->
                requests.remove { removeBuilder ->
                    removeBuilder
                        .index("index-001")
                        .alias("index-alias")
                }
            }
    },
)

이 작업을 atomic하게 완료하게 되는데, 서비스의 API 사용에 영향 없이 새로운 색인을 서빙할 수 있게 된다.