AWS Opensearch를 Kotlin환경에서 사용하기 - 설정 및 Index / Alias 셋업

AWS Opensearch의 클라이언트 선언

구현은 Opensearch Java Client를 사용한다. aws Opensearch를 사용하고 K8s의 환경에 대응하기 위해 awssdk 의존성을 추가한다.

dependencies {
    // ...
    implementation("software.amazon.awssdk:sts:2.25.64")
    implementation("software.amazon.awssdk:apache-client:2.25.60")
    implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
    implementation("org.opensearch.client:opensearch-rest-client:2.15.0")
    implementation("org.opensearch.client:opensearch-java:2.10.3")
}

설정 관련 몇가지 코드가 생략됐지만, OpenSearchClient 빈을 생성하는 코드다.

import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.aws.AwsSdk2Transport
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region

@Configuration
class OpenSearchConfig(private val properties: OpenSearchConfigurationProperties) {

    @Bean
    fun openSearchClient(): OpenSearchClient {
        val client = ApacheHttpClient.builder().build()
        val awsSdk2Transport = AwsSdk2Transport(
            client,
            properties.hostname,
            "es",
            Region.of(properties.region),
            AwsSdk2TransportOptions.builder().build(),
        )
        return OpenSearchClient(awsSdk2Transport)
    }
}

모델링

코틀린 환경에서 저장할 문서의 모델을 data class로 선언한다. Jackson 기반으로 시리얼라이즈 하도록 구현되어있기 때문에 아래와 같이 선언할 수 있다.

import com.fasterxml.jackson.annotation.JsonProperty

data class UserDocument(
    @JsonProperty("userId")
    val userId: Long,
    @JsonProperty("gender")
    val gender: UserGender,
    @JsonProperty("dateOfBirth")
    val dateOfBirth: Date,
    @JsonProperty("location")
    val location: GeoPoint?,
)

data class GeoPoint(
    @JsonProperty("lat")
    val lat: Double,
    @JsonProperty("lon")
    val lon: Double,
)

이렇게 활용된다.

fun find(query: UserQuery): List<UserDocument> {
    val request = SearchRequest.of { b -> b.index(INDEX_ALIAS_NAME).query(query.toQuery()) }
    return client.search(request, UserDocument::class.java).hits().hits().mapNotNull { it.source() }
}

Index 와 Alias

Opensearch를 무중단으로 안정적으로 운영하기 위해 Index 1개를 직접 이용하지 않고, AliasAlias API를 이용한다.

An alias is a secondary name for a group of data streams or indices. Most Elasticsearch APIs accept an alias in place of a data stream or index name.

Index의 이름이 아닌, Index에 Alias를 추가하여 이 Alias를 이용한 API 사용이 가능하다.

Performs one or more alias actions in a single atomic operation.

그리고 이 Alias API의 변경은 atomic한 작동을 지원하기 때문에, swap 동작을 구현하여 무중단 기능 제공을 가능하게 한다. (마치 배포 시 CNAME SWAP하듯!) 과정을 다이어그램으로 표현하자면 아래와 같다.




client.indices().updateAliases(
    UpdateAliasesRequest.of { builder ->
        builder
            .actions { requests ->
                requests.add { addBuilder ->
                    addBuilder
                        .index("index-002")
                        .alias("index-alias")
                }
            }.actions { requests ->
                requests.remove { removeBuilder ->
                    removeBuilder
                        .index("index-001")
                        .alias("index-alias")
                }
            }
    },
)

이 작업을 atomic하게 완료하게 되는데, 서비스의 API 사용에 영향 없이 새로운 색인을 서빙할 수 있게 된다. 이를 Spring Batch 위에서 구현하고 Cron Job으로 실행시킨다.

    @Bean(JOB_NAME)
    fun updateUsersRelativeScoreJob(): Job = JobBuilder(JOB_NAME, jobRepository)
        .incrementer(RunIdIncrementer())
        .start(getIndicesOfAliasStep())
        .next(createNewIndexStep())
        .next(prepareNewIndexStep())
        .next(switchAliasStep()).on("FAILED").to(switchAliasFallBackStep()).end()
        .build()

Reference


comments powered by Disqus