레디스 클러스터의 Sharded Pub/Sub

Sharded Pub/Sub이란?

AWS contributions to Valkey

AWS has a long history of contributing to Redis OSS. For example, AWS previously contributed several major features in Redis OSS 7 including fine grained access control over keys and commands, native hostname support for clustered configuration which enables TLS security, and partitioned channels for scalable pub/sub.

https://aws.amazon.com/ko/blogs/database/amazon-elasticache-and-amazon-memorydb-announce-support-for-valkey/

Redis 7.0부터 도입된 Sharded Pub/Sub은 기존 pub/sub의 확장성 문제를 해결하기 위해 설계되었다. 기존 pub/sub은 메시지가 클러스터의 모든 노드로 전파되어 네트워크 대역폭과 CPU 사용량이 클러스터 크기에 비례해서 증가하는 문제가 있었다.

Sharded Pub/Sub에서는 채널이 Redis 키와 동일한 해시 알고리즘을 통해 특정 슬롯에 할당된다. 메시지는 해당 슬롯을 담당하는 노드에서만 처리되며, 그 노드의 마스터와 레플리카에만 전파된다. 이를 통해 클러스터 내 데이터 전송량을 제한하고 수평 확장이 가능해진다.

# Sharded pub/sub 구독
SSUBSCRIBE channel1 channel2

# Sharded pub/sub 메시지 발행  
SPUBLISH channel1 "message"

# Sharded pub/sub 구독 해제
SUNSUBSCRIBE channel1 channel2

1. Redis 서버의 Pub/Sub 상태 저장

/* pubsub.c */

/* Structure to hold the pubsub related metadata. Currently used
 * for pubsub and pubsubshard feature. */
typedef struct pubsubtype {
    int shard;
    dict *(*clientPubSubChannels)(client*);
    int (*subscriptionCount)(client*);
    kvstore **serverPubSubChannels;
    robj **subscribeMsg;
    robj **unsubscribeMsg;
    robj **messageBulk;
}pubsubtype;

Sharded Pub/Sub 기능 추가에서 처음 등장한 pubsubtype 이라는 구조체가 있다. 일반 pub/sub과 sharded pub/sub의 차이점을 추상화하기 위한 함수 포인터 테이블이다. Redis 서버 시작 시 두 개의 싱글턴 인스턴스가 초기화되고, 모든 pub/sub 함수들이 이를 파라미터로 받아 동작을 결정한다.

아래와 같이 일반 Pub/Sub 을 위한 것과 Sharded Pub/Sub 을 위한 정의가 되어있다.

/* pubsub.c */

/*
 * Pub/Sub type for global channels.
 */
pubsubtype pubSubType = {
    .shard = 0,
    .clientPubSubChannels = getClientPubSubChannels,
    .subscriptionCount = clientSubscriptionsCount,
    .serverPubSubChannels = &server.pubsub_channels,
    .subscribeMsg = &shared.subscribebulk,
    .unsubscribeMsg = &shared.unsubscribebulk,
    .messageBulk = &shared.messagebulk,
};

/*
 * Pub/Sub type for shard level channels bounded to a slot.
 */
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk,
    .messageBulk = &shared.smessagebulk,
};

.shard 필드는 해당 pubsub 타입이 클러스터 슬롯 기반으로 분산되는지를 나타내는 플래그다.

이 구조체를 통해 Redis는 일반 pub/sub과 sharded pub/sub의 차이점을 하나의 코드베이스로 처리한다. 모든 pub/sub 관련 함수들이 pubsubtype 파라미터를 받아서 동작을 결정하는 방식이다. 예를 들어 구독을 처리하는 pubsubSubscribeChannel 함수를 보면, 클러스터가 활성화되어 있고 type.shard가 1인 경우에만 슬롯을 계산한다. 일반 pub/sub의 경우 shard가 0이므로 항상 slot=0을 사용하고, sharded pub/sub의 경우 getKeySlot()을 통해 채널명을 해시해서 실제 슬롯 번호를 구한다.

구독 시

// pubsub.c
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    unsigned int slot = 0;
    
    // shard 값에 따른 분기 처리
    if (server.cluster_enabled && type.shard) {
        slot = getKeySlot(channel->ptr);  // 샤드면 슬롯 계산
    }
    // slot = 0 (일반 pubsub은 항상 슬롯 0 사용)

    // 저장소 선택: type.serverPubSubChannels 함수 포인터 사용
    de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing);
    //    ↳ 일반: &server.pubsub_channels
    //    ↳ 샤드: &server.pubsubshard_channels
    
    // 응답 메시지: type.subscribeMsg 함수 포인터 사용  
    addReplyPubsubSubscribed(c, channel, type);
    //    ↳ 일반: shared.subscribebulk ("subscribe")
    //    ↳ 샤드: shared.ssubscribebulk ("ssubscribe")
}

저장소도 다르게 사용된다. 일반 pub/sub은 server.pubsub_channels를 사용하고, sharded pub/sub은 server.pubsubshard_channels를 사용한다. 일반 pub/sub은 항상 슬롯 0에만 데이터를 저장하고, sharded pub/sub은 채널별로 계산된 실제 슬롯에 분산 저장된다.

메시지 발행 시

// pubsub.c:465 - pubsubPublishMessageInternal 함수
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    unsigned int slot = 0;
    
    // 발행 시에도 동일한 슬롯 계산 로직
    if (server.cluster_enabled && type.shard) {
        slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
    }
    
    // 구독자 찾기: 동일한 슬롯에서 검색
    de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
    if (de) {
        dict *clients = dictGetVal(de);
        // 해당 채널 구독자들에게 메시지 전송
        while ((entry = dictNext(&iter)) != NULL) {
            client *c = dictGetKey(entry);
            addReplyPubsubMessage(c, channel, message, *type.messageBulk);
            //    ↳ 일반: shared.messagebulk ("message")  
            //    ↳ 샤드: shared.smessagebulk ("smessage")
        }
    }
    
    // 패턴 매칭 (샤드는 제외)
    if (type.shard) {
        return receivers;  // 샤드는 패턴 구독 미지원
    }
    // 일반 pubsub만 패턴 매칭 수행
}

메시지 발행 시에도 동일한 로직이 적용된다. pubsubPublishMessageInternal 함수에서 구독자를 찾을 때도 동일한 슬롯 계산을 통해 해당 슬롯에서만 검색한다.

구독 해제 시

// pubsub.c:283 - pubsubUnsubscribeChannel 함수
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
    int slot = 0;
    
    // 구독과 동일한 슬롯 계산
    if (server.cluster_enabled && type.shard) {
        slot = getKeySlot(channel->ptr);
    }
    
    // 클라이언트에서 채널 제거
    if (dictDelete(type.clientPubSubChannels(c), channel) == DICT_OK) {
        //    ↳ 일반: c->pubsub_channels
        //    ↳ 샤드: c->pubsubshard_channels
        
        // 서버에서 클라이언트 제거
        de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
        clients = dictGetVal(de);
        dictDelete(clients, c);
        
        // 마지막 구독자면 채널 삭제
        if (dictSize(clients) == 0) {
            kvstoreDictDelete(*type.serverPubSubChannels, slot, channel);
        }
    }
    
    // 응답: type.unsubscribeMsg 사용
    if (notify) {
        addReplyPubsubUnsubscribed(c, channel, type);
        //    ↳ 일반: shared.unsubscribebulk ("unsubscribe")
        //    ↳ 샤드: shared.sunsubscribebulk ("sunsubscribe")  
    }
}

구독 해제도 마찬가지다. 클라이언트에서 채널을 제거할 때는 type.clientPubSubChannels() 함수 포인터를 통해 일반 pub/sub은 c->pubsub_channels에서, sharded pub/sub은 c->pubsubshard_channels에서 제거한다. 서버에서도 동일한 슬롯에서 해당 클라이언트를 제거하고, 마지막 구독자였다면 채널 자체를 삭제한다.

클러스터 리밸런싱

// cluster_legacy.c
void removeChannelsInSlot(unsigned int slot) {
    if (countChannelsInSlot(slot) == 0) return;

    pubsubShardUnsubscribeAllChannelsInSlot(slot);
}


// pubsub.c
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
    // 오직 샤드 채널만 슬롯별 정리
    kvstoreDictIterator *kvs_di = kvstoreGetDictSafeIterator(server.pubsubshard_channels, slot);
    // 해당 슬롯의 모든 샤드 채널 구독 해제
    dictEntry *de;
    while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
        robj *channel = dictGetKey(de);
        dict *clients = dictGetVal(de);
        /* For each client subscribed to the channel, unsubscribe it. */
        dictIterator iter;
        dictEntry *entry;

        dictInitIterator(&iter, clients);
        while ((entry = dictNext(&iter)) != NULL) {
            client *c = dictGetKey(entry);
            int retval = dictDelete(c->pubsubshard_channels, channel);
            serverAssertWithInfo(c,channel,retval == DICT_OK);
            addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
            /* If the client has no other pubsub subscription,
             * move out of pubsub mode. */
            if (clientTotalPubSubSubscriptionCount(c) == 0) {
                unmarkClientAsPubSub(c);
            }
        }
        dictResetIterator(&iter);
        kvstoreDictDelete(server.pubsubshard_channels, slot, channel);
    }
    kvstoreReleaseDictIterator(kvs_di);
}

가장 중요한 차이점은 클러스터 리밸런싱 시 나타난다. pubsubShardUnsubscribeAllChannelsInSlot 함수는 오직 sharded pub/sub 채널만을 대상으로 한다. 특정 슬롯이 다른 노드로 이동할 때, 해당 슬롯에 있는 모든 sharded 채널의 구독을 자동으로 해제한다.

일반 pub/sub은 모든 채널이 슬롯 0에 저장되어 있고, 슬롯 0은 절대 이동하지 않기 때문에 재밸런싱의 영향을 받지 않는다. 하지만 sharded pub/sub은 채널이 실제 슬롯에 분산되어 있어서, 슬롯이 이동하면 해당 슬롯의 채널들은 더 이상 접근할 수 없게 되므로 구독을 해제해야 한다.

2. Slot 관리

/* Pubsub */
dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns;  /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */
/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However, if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
static inline unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (likely(s == keylen)) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}

0x3FFF 는 16383 이다. 레디스의 슬롯은 0~16383 의 값을 사용하는데, 이는 2^14 개이다. 즉 레디스 클러스터는 14bit 의 슬롯을 관리한다. Sharded Pub/Sub의 경우 채널명이 CRC16 해시를 통해 특정 슬롯에 매핑되고, 그 슬롯을 담당하는 노드에서만 해당 채널의 구독/발행을 처리한다.

// pubsub.c - 슬롯별 구독자 조회
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);

일반 pub/sub과 달리 sharded pub/sub은 각 노드가 담당하는 슬롯의 채널만 관리한다. 이를 통해:

  1. 메모리 효율성: 각 노드는 자신이 담당하는 슬롯의 채널만 저장
  2. 네트워크 효율성: 메시지가 모든 노드로 전파되지 않고 해당 슬롯 노드에서만 처리
  3. 확장성: 클러스터 크기가 커져도 각 노드의 부하가 분산됨
// cluster.h - keyHashSlot 함수에서 hash tag 처리
for (s = 0; s < keylen; s++)
    if (key[s] == '{') break;

// '{' found? Check if we have the corresponding '}'.
for (e = s+1; e < keylen; e++)
    if (key[e] == '}') break;

// Hash what is in the middle between { and }.
return crc16(key+s+1,e-s-1) & 0x3FFF;

0. Extra

슬롯 강제 배치

위에서 발췌한 슬롯 해싱 코드를 보면 알수도 있었겠지만, Hash tag를 사용하면 여러 채널을 같은 슬롯에 강제로 배치할 수 있다:

# 모두 동일한 슬롯에 배치됨
SSUBSCRIBE order{payment}
SSUBSCRIBE shipping{payment}  
SSUBSCRIBE invoice{payment}

이는 관련된 채널들을 같은 노드에서 처리하고 싶을 때 유용하다.