레디스 클러스터의 Sharded Pub/Sub
Sharded Pub/Sub이란?
AWS contributions to Valkey
AWS has a long history of contributing to Redis OSS. For example, AWS previously contributed several major features in Redis OSS 7 including fine grained access control over keys and commands, native hostname support for clustered configuration which enables TLS security, and partitioned channels for scalable pub/sub.
Redis 7.0부터 도입된 Sharded Pub/Sub은 기존 pub/sub의 확장성 문제를 해결하기 위해 설계되었다. 기존 pub/sub은 메시지가 클러스터의 모든 노드로 전파되어 네트워크 대역폭과 CPU 사용량이 클러스터 크기에 비례해서 증가하는 문제가 있었다.
Sharded Pub/Sub에서는 채널이 Redis 키와 동일한 해시 알고리즘을 통해 특정 슬롯에 할당된다. 메시지는 해당 슬롯을 담당하는 노드에서만 처리되며, 그 노드의 마스터와 레플리카에만 전파된다. 이를 통해 클러스터 내 데이터 전송량을 제한하고 수평 확장이 가능해진다.
# Sharded pub/sub 구독
SSUBSCRIBE channel1 channel2
# Sharded pub/sub 메시지 발행
SPUBLISH channel1 "message"
# Sharded pub/sub 구독 해제
SUNSUBSCRIBE channel1 channel2
1. Redis 서버의 Pub/Sub 상태 저장
/* pubsub.c */
/* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */
typedef struct pubsubtype {
int shard;
dict *(*clientPubSubChannels)(client*);
int (*subscriptionCount)(client*);
kvstore **serverPubSubChannels;
robj **subscribeMsg;
robj **unsubscribeMsg;
robj **messageBulk;
}pubsubtype;
Sharded Pub/Sub 기능 추가에서 처음 등장한 pubsubtype
이라는 구조체가 있다.
일반 pub/sub과 sharded pub/sub의 차이점을 추상화하기 위한 함수 포인터 테이블이다.
Redis 서버 시작 시 두 개의 싱글턴 인스턴스가 초기화되고, 모든 pub/sub 함수들이 이를 파라미터로 받아 동작을 결정한다.
아래와 같이 일반 Pub/Sub 을 위한 것과 Sharded Pub/Sub 을 위한 정의가 되어있다.
/* pubsub.c */
/*
* Pub/Sub type for global channels.
*/
pubsubtype pubSubType = {
.shard = 0,
.clientPubSubChannels = getClientPubSubChannels,
.subscriptionCount = clientSubscriptionsCount,
.serverPubSubChannels = &server.pubsub_channels,
.subscribeMsg = &shared.subscribebulk,
.unsubscribeMsg = &shared.unsubscribebulk,
.messageBulk = &shared.messagebulk,
};
/*
* Pub/Sub type for shard level channels bounded to a slot.
*/
pubsubtype pubSubShardType = {
.shard = 1,
.clientPubSubChannels = getClientPubSubShardChannels,
.subscriptionCount = clientShardSubscriptionsCount,
.serverPubSubChannels = &server.pubsubshard_channels,
.subscribeMsg = &shared.ssubscribebulk,
.unsubscribeMsg = &shared.sunsubscribebulk,
.messageBulk = &shared.smessagebulk,
};
.shard
필드는 해당 pubsub 타입이 클러스터 슬롯 기반으로 분산되는지를 나타내는 플래그다.
이 구조체를 통해 Redis는 일반 pub/sub과 sharded pub/sub의 차이점을 하나의 코드베이스로 처리한다. 모든 pub/sub 관련 함수들이 pubsubtype
파라미터를 받아서 동작을 결정하는 방식이다. 예를 들어 구독을 처리하는 pubsubSubscribeChannel
함수를 보면, 클러스터가 활성화되어 있고 type.shard
가 1인 경우에만 슬롯을 계산한다. 일반 pub/sub의 경우 shard
가 0이므로 항상 slot=0
을 사용하고, sharded pub/sub의 경우 getKeySlot()
을 통해 채널명을 해시해서 실제 슬롯 번호를 구한다.
구독 시
// pubsub.c
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
unsigned int slot = 0;
// shard 값에 따른 분기 처리
if (server.cluster_enabled && type.shard) {
slot = getKeySlot(channel->ptr); // 샤드면 슬롯 계산
}
// slot = 0 (일반 pubsub은 항상 슬롯 0 사용)
// 저장소 선택: type.serverPubSubChannels 함수 포인터 사용
de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing);
// ↳ 일반: &server.pubsub_channels
// ↳ 샤드: &server.pubsubshard_channels
// 응답 메시지: type.subscribeMsg 함수 포인터 사용
addReplyPubsubSubscribed(c, channel, type);
// ↳ 일반: shared.subscribebulk ("subscribe")
// ↳ 샤드: shared.ssubscribebulk ("ssubscribe")
}
저장소도 다르게 사용된다. 일반 pub/sub은 server.pubsub_channels
를 사용하고, sharded pub/sub은 server.pubsubshard_channels
를 사용한다.
일반 pub/sub은 항상 슬롯 0에만 데이터를 저장하고, sharded pub/sub은 채널별로 계산된 실제 슬롯에 분산 저장된다.
메시지 발행 시
// pubsub.c:465 - pubsubPublishMessageInternal 함수
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
unsigned int slot = 0;
// 발행 시에도 동일한 슬롯 계산 로직
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
}
// 구독자 찾기: 동일한 슬롯에서 검색
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
if (de) {
dict *clients = dictGetVal(de);
// 해당 채널 구독자들에게 메시지 전송
while ((entry = dictNext(&iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
// ↳ 일반: shared.messagebulk ("message")
// ↳ 샤드: shared.smessagebulk ("smessage")
}
}
// 패턴 매칭 (샤드는 제외)
if (type.shard) {
return receivers; // 샤드는 패턴 구독 미지원
}
// 일반 pubsub만 패턴 매칭 수행
}
메시지 발행 시에도 동일한 로직이 적용된다.
pubsubPublishMessageInternal
함수에서 구독자를 찾을 때도 동일한 슬롯 계산을 통해 해당 슬롯에서만 검색한다.
구독 해제 시
// pubsub.c:283 - pubsubUnsubscribeChannel 함수
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
int slot = 0;
// 구독과 동일한 슬롯 계산
if (server.cluster_enabled && type.shard) {
slot = getKeySlot(channel->ptr);
}
// 클라이언트에서 채널 제거
if (dictDelete(type.clientPubSubChannels(c), channel) == DICT_OK) {
// ↳ 일반: c->pubsub_channels
// ↳ 샤드: c->pubsubshard_channels
// 서버에서 클라이언트 제거
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
clients = dictGetVal(de);
dictDelete(clients, c);
// 마지막 구독자면 채널 삭제
if (dictSize(clients) == 0) {
kvstoreDictDelete(*type.serverPubSubChannels, slot, channel);
}
}
// 응답: type.unsubscribeMsg 사용
if (notify) {
addReplyPubsubUnsubscribed(c, channel, type);
// ↳ 일반: shared.unsubscribebulk ("unsubscribe")
// ↳ 샤드: shared.sunsubscribebulk ("sunsubscribe")
}
}
구독 해제도 마찬가지다.
클라이언트에서 채널을 제거할 때는 type.clientPubSubChannels()
함수 포인터를 통해 일반 pub/sub은 c->pubsub_channels
에서, sharded pub/sub은 c->pubsubshard_channels
에서 제거한다.
서버에서도 동일한 슬롯에서 해당 클라이언트를 제거하고, 마지막 구독자였다면 채널 자체를 삭제한다.
클러스터 리밸런싱
// cluster_legacy.c
void removeChannelsInSlot(unsigned int slot) {
if (countChannelsInSlot(slot) == 0) return;
pubsubShardUnsubscribeAllChannelsInSlot(slot);
}
// pubsub.c
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
// 오직 샤드 채널만 슬롯별 정리
kvstoreDictIterator *kvs_di = kvstoreGetDictSafeIterator(server.pubsubshard_channels, slot);
// 해당 슬롯의 모든 샤드 채널 구독 해제
dictEntry *de;
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
robj *channel = dictGetKey(de);
dict *clients = dictGetVal(de);
/* For each client subscribed to the channel, unsubscribe it. */
dictIterator iter;
dictEntry *entry;
dictInitIterator(&iter, clients);
while ((entry = dictNext(&iter)) != NULL) {
client *c = dictGetKey(entry);
int retval = dictDelete(c->pubsubshard_channels, channel);
serverAssertWithInfo(c,channel,retval == DICT_OK);
addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
/* If the client has no other pubsub subscription,
* move out of pubsub mode. */
if (clientTotalPubSubSubscriptionCount(c) == 0) {
unmarkClientAsPubSub(c);
}
}
dictResetIterator(&iter);
kvstoreDictDelete(server.pubsubshard_channels, slot, channel);
}
kvstoreReleaseDictIterator(kvs_di);
}
가장 중요한 차이점은 클러스터 리밸런싱 시 나타난다.
pubsubShardUnsubscribeAllChannelsInSlot
함수는 오직 sharded pub/sub 채널만을 대상으로 한다.
특정 슬롯이 다른 노드로 이동할 때, 해당 슬롯에 있는 모든 sharded 채널의 구독을 자동으로 해제한다.
일반 pub/sub은 모든 채널이 슬롯 0에 저장되어 있고, 슬롯 0은 절대 이동하지 않기 때문에 재밸런싱의 영향을 받지 않는다. 하지만 sharded pub/sub은 채널이 실제 슬롯에 분산되어 있어서, 슬롯이 이동하면 해당 슬롯의 채널들은 더 이상 접근할 수 없게 되므로 구독을 해제해야 한다.
2. Slot 관리
/* Pubsub */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However, if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
static inline unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (likely(s == keylen)) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
0x3FFF
는 16383 이다. 레디스의 슬롯은 0~16383 의 값을 사용하는데, 이는 2^14
개이다.
즉 레디스 클러스터는 14bit 의 슬롯을 관리한다.
Sharded Pub/Sub의 경우 채널명이 CRC16 해시를 통해 특정 슬롯에 매핑되고, 그 슬롯을 담당하는 노드에서만 해당 채널의 구독/발행을 처리한다.
// pubsub.c - 슬롯별 구독자 조회
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
일반 pub/sub과 달리 sharded pub/sub은 각 노드가 담당하는 슬롯의 채널만 관리한다. 이를 통해:
- 메모리 효율성: 각 노드는 자신이 담당하는 슬롯의 채널만 저장
- 네트워크 효율성: 메시지가 모든 노드로 전파되지 않고 해당 슬롯 노드에서만 처리
- 확장성: 클러스터 크기가 커져도 각 노드의 부하가 분산됨
// cluster.h - keyHashSlot 함수에서 hash tag 처리
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
// '{' found? Check if we have the corresponding '}'.
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
// Hash what is in the middle between { and }.
return crc16(key+s+1,e-s-1) & 0x3FFF;
0. Extra
슬롯 강제 배치
위에서 발췌한 슬롯 해싱 코드를 보면 알수도 있었겠지만, Hash tag를 사용하면 여러 채널을 같은 슬롯에 강제로 배치할 수 있다:
# 모두 동일한 슬롯에 배치됨
SSUBSCRIBE order{payment}
SSUBSCRIBE shipping{payment}
SSUBSCRIBE invoice{payment}
이는 관련된 채널들을 같은 노드에서 처리하고 싶을 때 유용하다.