AWS Opensearch를 Kotlin환경에서 사용하기 - 설정 및 Index / Alias 셋업
AWS Opensearch의 클라이언트 선언
구현은 Opensearch Java Client를 사용한다. aws Opensearch를 사용하고 K8s의 환경에 대응하기 위해 awssdk 의존성을 추가한다.
dependencies {
// ...
implementation("software.amazon.awssdk:sts:2.25.64")
implementation("software.amazon.awssdk:apache-client:2.25.60")
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
implementation("org.opensearch.client:opensearch-rest-client:2.15.0")
implementation("org.opensearch.client:opensearch-java:2.10.3")
}
설정 관련 몇가지 코드가 생략됐지만, OpenSearchClient 빈을 생성하는 코드다.
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.aws.AwsSdk2Transport
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
@Configuration
class OpenSearchConfig(private val properties: OpenSearchConfigurationProperties) {
@Bean
fun openSearchClient(): OpenSearchClient {
val client = ApacheHttpClient.builder().build()
val awsSdk2Transport = AwsSdk2Transport(
client,
properties.hostname,
"es",
Region.of(properties.region),
AwsSdk2TransportOptions.builder().build(),
)
return OpenSearchClient(awsSdk2Transport)
}
}
모델링
코틀린 환경에서 저장할 문서의 모델을 data class로 선언한다. Jackson 기반으로 시리얼라이즈 하도록 구현되어있기 때문에 아래와 같이 선언할 수 있다.
import com.fasterxml.jackson.annotation.JsonProperty
data class UserDocument(
@JsonProperty("userId")
val userId: Long,
@JsonProperty("gender")
val gender: UserGender,
@JsonProperty("dateOfBirth")
val dateOfBirth: Date,
@JsonProperty("location")
val location: GeoPoint?,
)
data class GeoPoint(
@JsonProperty("lat")
val lat: Double,
@JsonProperty("lon")
val lon: Double,
)
이렇게 활용된다.
fun find(query: UserQuery): List<UserDocument> {
val request = SearchRequest.of { b -> b.index(INDEX_ALIAS_NAME).query(query.toQuery()) }
return client.search(request, UserDocument::class.java).hits().hits().mapNotNull { it.source() }
}
Index 와 Alias
Opensearch를 무중단으로 안정적으로 운영하기 위해 Index 1개를 직접 이용하지 않고, Alias와 Alias API를 이용한다.
An alias is a secondary name for a group of data streams or indices. Most Elasticsearch APIs accept an alias in place of a data stream or index name.
Index의 이름이 아닌, Index에 Alias를 추가하여 이 Alias를 이용한 API 사용이 가능하다.
Performs one or more alias actions in a single atomic operation.
그리고 이 Alias API의 변경은 atomic한 작동을 지원하기 때문에, swap 동작을 구현하여 무중단 기능 제공을 가능하게 한다. (마치 배포 시 CNAME SWAP하듯!) 과정을 다이어그램으로 표현하자면 아래와 같다.
client.indices().updateAliases(
UpdateAliasesRequest.of { builder ->
builder
.actions { requests ->
requests.add { addBuilder ->
addBuilder
.index("index-002")
.alias("index-alias")
}
}.actions { requests ->
requests.remove { removeBuilder ->
removeBuilder
.index("index-001")
.alias("index-alias")
}
}
},
)
이 작업을 atomic하게 완료하게 되는데, 서비스의 API 사용에 영향 없이 새로운 색인을 서빙할 수 있게 된다. 이를 Spring Batch 위에서 구현하고 Cron Job으로 실행시킨다.
@Bean(JOB_NAME)
fun updateUsersRelativeScoreJob(): Job = JobBuilder(JOB_NAME, jobRepository)
.incrementer(RunIdIncrementer())
.start(getIndicesOfAliasStep())
.next(createNewIndexStep())
.next(prepareNewIndexStep())
.next(switchAliasStep()).on("FAILED").to(switchAliasFallBackStep()).end()
.build()
getIndicesOfAliasStep
: 현재 Alias를 사용하고 있는 Index를 조회합니다.createNewIndexStep
: 새로운 Index을 생성합니다.prepareNewIndexStep
: 새로운 Index에 문서를 색인합니다.switchAliasStep
: Alias를 사용하고 있는 Index를 할당 해제, 새로운 Index에 할당합니다. (atomic)switchAliasFallBackStep
: 실패 시 기존의 Alias 할당으로 돌아갑니다.