스프링부트

Spring Data Redis - RedisTemplate 트랜잭션

알쓸개잡 2024. 4. 25. 20:09

RedisTemplate의 트랜잭션 기능은 데이터의 일관성과 무결성을 유지하기 위해 여러 명령을 그룹화하여 원자적으로 실행해야 할 때 사용한다. 이번 포스팅에서는 RedisTemplate를 이용하여 트랜잭션을 사용하는 방법과 트랜잭션 도중 키 감시기능을 통해서 데이터의 일관성을 보장하는 방식에 대한 내용을 다루고자 한다.

 

트랜잭션 사용 사례

트랜잭션은 데이터의 일관성과 무결성을 보장하기 위해서 여러 명령 셋이 모두 성공을 해야 하는 경우, 더불어 하나라도 명령이 실패하면 이전에 수행했던 명령은 롤백이 되어야 하는 경우에 사용한다.

간단한 사용 사례를 살펴보자.

  • 재고 관리 시스템
    • 온라인 쇼핑몰에서 상품의 주문 처리 과정에서 재고 수량을 감소시키고, 관련된 주문 정보를 업데이트해야 하는 경우.
  • 은행 계좌 이체
    • 두 계좌간 금액 이체 시에 한 계좌에서 금액을 차감하고 다른 계좌에 그 금액을 추가해야 하는 경우.
  • 포인트 시스템 업데이트
    • 사용자가 특정 이벤트를 수행했을 때 포인트를 적립하고 해당 포인트에 따라 사용자 등급을 업데이트해야 하는 경우.
  • 회원 가입과 동시에 여러 서비스 활성화
    • 신규 사용자가 가입할 때 여러 초기 설정을 저장하고, 여러 서비스를 동시에 활성화 해야 하는 경우.
  • 세션 만료와 데이터 정리
    • 웹 어플리케이션에서 사용자 세션이 만료될 때 세션 키를 삭제하고 사용자의 임시 데이터를 정리해야 하는 경우.

이 외에도 트랜잭션이 필요한 경우는 무수히 많을 것이다. 그만큼 트랜잭션의 처리는 중요하다.

 

RedisCallback과 SessionCallback

Spring Data Redis에서 RedisTemplate을 이용하여 트랜잭션을 관리할 때 사용되는 인터페이스는 RedisCallback과 SessionCallback이 있다. 두 클래스의 정의는 다음과 같다.

public interface RedisCallback<T> {

	/**
	 * Method called by {@link RedisTemplate} with an active {@link RedisConnection}.
	 * <p>
	 * Callback code need not care about activating/opening or closing the {@link RedisConnection},
	 * nor handling {@link Exception exceptions}.
	 *
	 * @param connection active {@link RedisConnection Redis connection}.
	 * @return the {@link Object result} of the operation performed in the callback or {@code null}.
	 * @throws DataAccessException if the operation performed by the callback fails to execute in the context of Redis
	 * using the given {@link RedisConnection}.
	 */
	@Nullable
	T doInRedis(RedisConnection connection) throws DataAccessException;

}
public interface SessionCallback<T> {

	/**
	 * Executes all the given operations inside the same session.
	 *
	 * @param operations Redis operations
	 * @return return value
	 */
	@Nullable
	<K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException;
}
  • RedisCallback의 doInRedis 메서드와 SessionCallback의 execute 메서드의 파라미터 타입을 보면 RedisCallback은 RedisConnection 인스턴스를 전달받고 SessionCallback은 RedisOperations<K, V> 인스턴스를 전달받는다.
  • RedisCallback의 경우 RedisConnection 인스턴스를 전달받기 때문에 더 낮은 수준의 접근이 가능하며 사용자가 Redis 서버와의 통신을 더 세밀하게 제어할 수 있어서 성능 최적화나 특수한 사용 사례에 유용할 수 있다. 하지만 저수준의 접근을 해야 하므로 사용이 더 어렵다.
  • SessionCallback의 경우 높은 수준의 추상화된 API를 제공(RedisOperations) 하여 트랜잭션을 비롯한 다양한 Redis 연산을 비교적 손쉽게 처리할 수 있다. 내부적으로 SessionCallback으로 전달되는 RedisOperations<K, V> 인스턴스는 RedisTemplate 인스턴스의 execute() 메서드를 호출하는 자신의 인스턴스가 전달된다.
  • RedisTemplate에서 트랜잭션을 처리할 때 SessionCallback을 사용하는 것이 일반적인 방법이라고 할 수 있다.

RedisTemplate에서 제공하는 트랜잭션의 이점은 다음과 같다.

  • 세션 연속성
    • SessionCallback은 하나의 Redis 연결을 통해 여러 연산을 수행하게 함으로써 연결을 반복적으로 열고 닫는 비용을 줄여준다.
  • 트랜잭션 관리
    • 트랜잭션 시작(multi)과 종료(exec 또는 discard)를 세션 내에서 명확하게 관리할 수 있다.
  • 데이터 무결성
    • watch 메서드와 같이 트랜잭션을 안전하게 관리할 수 있는 명령을 사용할 때 SessionCallback은 트랜잭션 내에서 명령들이 순차적으로 실행되도록 보장한다.

 

RedisTemplate 트랜잭션 관련 주요 메서드

multi() Redis 트랜잭션을 시작한다. 이 메서드 호출 이후에 실행되는 모든 명령은 실제로 서버에 전송되지 않고 큐에 저장된다. exec() 호출시 큐에 저장된 모든 명령이 원자적으로 실행된다.
exec() 큐에 저장된 모든 명령을 실행하고 트랜잭션을 종료한다. 모든 명령은 원자적으로 처리되며 명령 실행 결과를 리스트 형태로 반환한다.
discard() 현재 트랜잭션을 취소하고 큐에 저장된 모든 명령을 삭제한다. 이 메서드는 트랜잭션 중 오류가 발생했을 때 또는 조건에 따라 트랜잭션을 중단하고자 할 때 사용한다.
watch(K key) key로 전달된 redis 키를 감시한다. 이 메서드 호출 이후 감시되고 있는 키의 값이 변경되면 exec() 호출시 트랜잭션이 실패한다. (빈 결과를 리턴한다.)
watch(Collection<K> keys) keys로 전달된 redis 키들을 감시한다. 이 메서드 호출 이후 감시되고 있는 키의 값이 변경되면 exec() 호출시 트랜잭션이 실패한다. (빈 결과를 리턴한다.)
unwatch() watch()로 설정된 모든 키의 감시를 해제한다. 이 메서드는 감시를 더 이상 필요하지 않을 때, 또는 감시한 후 트랜잭션이 시작되기 전에 호출 될 수 있다.
exec()를 통해서 트랜잭션이 실행되는 경우 자동으로 감시되는 키들은 해제된다. 이 때 unwatch()를 별도로 호출할 필요는 없다.
execute(SessionCallback<T> session) SessionCallback 구현체를 인자로 전달 받아 트랜잭션을 실행하고 트랜잭션 실행 결과를 리턴 받는다.

 

 

RedisTemplate 트랜잭션 기본 사용

주문/재고 관리 케이스를 예를 들어 샘플 코드를 살펴보자.

주문/재고 관리 키는 각각 다음과 같다.

재고
stock#<productId>

주문
order#<productId>

간단히 재고, 주문의 각 키에는 정수형 타입의 수량만 저장하도록 하였다.

따라서 샘플 코드에서 사용하는 RedisTemplate은 RedisTemplate<String, Integer> 타입이 되겠다.

//Integer 타입 value 전용 RedisTemplate 빈 생성
@Bean
public RedisTemplate<String, Integer> integerRedisTemplate( LettuceConnectionFactory lettuceConnectionFactory ) {
    RedisTemplate<String, Integer> template = new RedisTemplate<>();
    template.setConnectionFactory( lettuceConnectionFactory );
    // set key serializer
    // template.setKeySerializer( new StringRedisSerializer(StandardCharsets.UTF_8) ); 와 동일함
    template.setKeySerializer( RedisSerializer.string() );

    // set value serializer
    // template.setDefaultSerializer( new GenericJackson2JsonRedisSerializer() ); 와 동일함
    template.setValueSerializer( RedisSerializer.json() );
    return template;
}

 

테스트 코드는 다음과 같다.

//아래 RedisTemplate<String, Integer> 빈이 이미 주입되어 있다.
@Autowired
RedisTemplate<String, Integer> integerRedisTemplate;

@Test
@DisplayName( "주문 / 재고 트랜잭션 테스트" )
void order_stock_transaction_test() {
    final String productId = UUID.randomUUID().toString();
    //현재 재고 수량은 10개
    final Integer initialStock = 10;
    //구매 수량은 3
    final Integer purchaseCount = 3;

    //재고 초기 수량 저장
    integerRedisTemplate.opsForValue().set( "stock#" + productId, initialStock );

    try {
        //executed 인스턴스에서는 executeTransaction 메서드 내부에서 수행된 redis 명령의 결과가 순차적으로 저장된다.
        List<Object> executed = executeTransaction( productId, purchaseCount, false );
        if ( executed != null ) {
            Assertions.assertThat( executed.size() ).isEqualTo( 2 );
            //총 구매 수량 검증
            Assertions.assertThat( executed.getFirst() ).isEqualTo( purchaseCount.longValue() );
            System.out.println("남은 재고 수량: " + executed.getLast());
        } else {
            System.out.println("남은 재고가 부족합니다.!!!");
        }
    } catch ( DataAccessException e ) {
        System.out.println(e.getMessage());
    }
}
  • 제품의 초기 재고 수량은 10개, 구매 수량은 3개인 경우에 대한 테스트 코드다.
  • executeTransaction 메서드가 트랜잭션을 수행하는 메서드이며 null 이 리턴된 경우는 구매 수량에 비해 재고 수량이 부족한 경우다.
  • 다음 항목에서 설명할 watch() 메서드 사용 여부를 지정하는 withWatch 플래그를 false로 지정하여 우선 watch() 메서드를 호출하지 않도록 하였다.
//아래 RedisTemplate<String, Integer> 빈이 이미 주입되어 있다.
@Autowired
RedisTemplate<String, Integer> integerRedisTemplate;

// 주문 / 재고 트랜잭션
// 현재 재고 수량에서 purchaseCount만큼 감소
// 현재 구매 수량에서 purchaseCount만큼 증가
List<Object> executeTransaction(String productId, Integer purchaseCount, boolean withWatch) {
    //리턴되는 List<Object> 에는
    //integerRedisTemplate.opsForValue().increment( "order#" + productId, purchaseCount );
    //integerRedisTemplate.opsForValue().decrement( "stock#" + productId, purchaseCount );
    //명령 수행 결과가 저장된다.
    return integerRedisTemplate.execute( new SessionCallback<>() {
        @Override
        //operations 파라미터에 integerRedisTemplate 인스턴스가 전달된다.
        //즉 operations 인스턴스와 integerRedisTemplate 인스턴스는 동일하다.
        public List<Object> execute( RedisOperations operations ) throws DataAccessException {
            if ( withWatch ) {
                integerRedisTemplate.watch( "stock#" + productId );
            }

            Integer currentStock = integerRedisTemplate.opsForValue().get( "stock#" + productId );
            Assertions.assertThat( currentStock ).isNotNull();
            if ( currentStock.compareTo( purchaseCount ) >= 0 ) {
                //트랜잭션 시작
                integerRedisTemplate.multi();
                //주문의 구매 수량을 purchaseCount 만큼 증가
                integerRedisTemplate.opsForValue().increment( "order#" + productId, purchaseCount );
                //재고의 수량을 purchaseCount 만큼 감소
                integerRedisTemplate.opsForValue().decrement( "stock#" + productId, purchaseCount );
                //트랜잭션 command 실행
                return integerRedisTemplate.exec();
            }
            else {
                if ( withWatch ) {
                    integerRedisTemplate.unwatch();
                }
                return null;
            }
        }
    } );
}
  • executeTransaction 메서드를 통해 제품에 대한 재고 수량을 구매 수량만큼 감소시키고, 주문 수량을 구매 수량만큼 증가시키도록 하는 트랜잭션을 수행한다.
  • integerRedisTemplate 인스턴스의 execute() 메서드에 SessionCallback 인터페이스 구현체 인스턴스를 인자로 전달한다.
  • SessionCallback 구현체에 정의한 execute(RedisOperations operations) 메서드로 전달되는 operations 인스턴스는 integerRedisTemplate 인스턴스가 전달된다.
  • RedisTemplate 클래스의 execute(SessionCallback<T> session) 메서드를 보면 이해할 수 있다.
@Override
public <T> T execute(SessionCallback<T> session) {

    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(session, "Callback object must not be null");

    RedisConnectionFactory factory = getRequiredConnectionFactory();
    // bind connection
    RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
    try {
        return session.execute(this);
    } finally {
        RedisConnectionUtils.unbindConnection(factory);
    }
}

위 코드는 RedisTemplate 클래스의 execute() 메서드 내용이다. return 문을 살펴보면 session.execute(this)로 RedisTemplate 자신의 인스턴스를 전달하는 것을 알 수 있다. 

따라서 SessionCallback 구현체의 execute(RedisOperations operations) 메서드 내에서 operations 인스턴스 대신 integerRedisTemplate 인스턴스를 사용해도 무방하다. (operations 인스턴스와 integerRedisTemplate 인스턴스는 같은 인스턴스)

operations와 integerRedisTemplate는 같은 인스턴스
operations와 integerRedisTemplate는 동일 인스턴스

 

RedisTemplate 트랜잭션의 watch()

Redis에서 트랜잭션을 수행할 때 watch() 메서드는 데이터 일관성과 무결성을 위해서 매우 중요한 역할을 한다. watch() 메서드는 redis의 optimistic locking 기능을 사용하여 지정된 하나 이상의 키를 감시하는 역할을 한다.

이는 트랜잭션 도중 감시된 키의 값이 변경되면 트랜잭션이 실패하도록 보장하는데 이를 통해 데이터 무결성을 유지하는데 중요한 역할을 한다.

 

Optimistic Locking

Optimistic Locking은 동시성 제어에서 사용되는 기법 중 하나로, 데이터가 업데이트되는 동안 다른 트랜잭션에 의해 해당 데이터가 변경되는 것을 방지하기 위해서 사용된다. Optimistic Locking은 실제로 데이터를 lock 하지 않는다.

  • 장점
    • 데이터를 락(lock) 하지 않기 때문에 리소스에 대한 오버헤드가 감소하고 대기 시간이 줄어든다. 특히나 redis와 같이 단일 스레드로 동작하는 서비스의 경우 lock은 성능상에 치명적인 문제를 발생시킬 수 있기 때문에 optimistic locking은 필수적인 요소라고 할 수 있겠다.
    • 데이터를 락(lock)하지 않기 때문에 데드락의 위험이 줄어든다.
  • 단점
    • 데이터가 변경된 경우 실행 동작에서 오류를 발생시키기 때문에 개발자가 이를 해결하는 로직을 직접 관리해야 하므로 코드 복잡도가 높아진다.

우선 트랜잭션을 사용하는 경우 watch()를 사용하지 않았을 때 발생할 수 있는 시나리오는 다음과 같다.

 

  • 현재 제품의 재고 수량은 10개다.
  • 사용자 A가 제품 3개를 주문한다.
  • 재고 주문 관리를 위해서 트랜잭션을 수행하기 전에 제품의 현재 재고 수량을 가져온다. (10개)
  • 현재 재고 수량을 가져온 뒤 다른 사용자 B가 제품 8개를 주문하여 재고 수량이 2개로 변경되었다.
  • 이미 재고 수량을 10개라고 판단했기 때문에 사용자 A가 주문한 수량에 맞게 재고 및 주문 카운트를 변경하는 트랜잭션을 수행한다.
  • 사용자 A가 주문한 제품은 3개 현재 재고는 2개 이므로 사용자 A에 대한 주문이 처리되면 안 되지만 처리된다.

위 시나리오 대로 위 트랜잭션 테스트 코드를 디버거를 통해 실행해 보자.

우선 트랜잭션을 수행하는 executeTransaction 메서드에서 현재 제품의 재고 수량을 가져온 다음 라인에 break point를 걸었다.

Integer currentStock = integerRedisTemplate.opsForValue().get( "stock#" + productId );
Assertions.assertThat( currentStock ).isNotNull(); //이곳에 break point

break point 지점에서 데이터는 다음과 같다.

break point 지점의 현재 상태

현재 재고 수량은 10이다. 이 상태에서 사용자 B의 주문에 의해서 제품의 재고를 2로 변경하였다.

127.0.0.1:6379> set stock#e9b4953f-70e9-4c8c-b6e7-9c277d59f00f 2
OK
127.0.0.1:6379> get stock#e9b4953f-70e9-4c8c-b6e7-9c277d59f00f
"2"

이 상태에서 다음 코드를 끝까지 실행하면 다음과 같은 결과가 출력된다.

남은 재고 수량: -1

이는 재고가 2개 있는 상태에서 사용자 A의 제품 3개 주문이 처리되면서 재고가 부족함에도 주문이 처리되었다는 것을 나타낸다.

 

이제 watch() 메서드를 적용하여 stock#<productId> 키를 감시하도록 하자.

코드는 다음과 같다.

@Test
@DisplayName( "주문 / 재고 트랜잭션 테스트 with watch()" )
void order_stock_transaction_watch_test() {
    final String productId = UUID.randomUUID().toString();
    //현재 재고 수량은 10개
    final Integer initialStock = 10;
    //구매 수량은 3
    final Integer purchaseCount = 3;

    //재고 초기 수량 저장
    integerRedisTemplate.opsForValue().set( "stock#" + productId, initialStock );

    try {
        List<Object> executed;

        //watch()로 인해 감시되는 key의 값이 트랜잭션 도중에 다른 connection의 명령에 의해서 변경된 경우
        //데이터 일관성이 꺠졌으므로 트랜잭션을 다시 수행한다.
        do {
            executed = executeTransaction( productId, purchaseCount, true );
            //executed 리스트에는
            //integerRedisTemplate.opsForValue().increment( "order#" + productId, purchaseCount );
            //integerRedisTemplate.opsForValue().decrement( "stock#" + productId, purchaseCount );
            //명령 수행 결과가 저장된다.
            if ( executed != null && !executed.isEmpty()) {
                Assertions.assertThat( executed.size() ).isEqualTo( 2 );
                //총 구매 수량 검증
                Assertions.assertThat( executed.getFirst() ).isEqualTo( purchaseCount.longValue() );
                System.out.println("남은 재고 수량: " + executed.getLast());
            }
        } while (executed != null && executed.isEmpty());

        if ( executed == null ) {
            System.out.println("남은 재고가 부족합니다.!!!");
        }
    } catch ( DataAccessException e ) {
        System.out.println(e.getMessage());
    }
}
  • executeTransaction 메서드의 3번째 인자인 withWatch 플래그를 true로 지정하여 watch() 메서드를 사용하도록 하였다.
  • executeTransaction 내에서 트랜잭션 실행 이전에 stock#<productId> 키를 감시하도록 하여 해당 키 값이 변경되면 트랜잭션은 실패하고 리턴되는 결과는 empty list 인스턴스가 된다.

위 코드를 시나리오 대로 다시 수행해 보자.

Integer currentStock = integerRedisTemplate.opsForValue().get( "stock#" + productId );
Assertions.assertThat( currentStock ).isNotNull(); //이곳에 break point

이전과 동일하게 동일 위치에 break point를 걸고 현재 재고 수량을 가져온 뒤에 재고 수량을 2로 변경한다.

break point 지점의 현재 상태

127.0.0.1:6379> set stock#93094293-da01-464f-bb94-01dc734c30e1 2
OK
127.0.0.1:6379> get stock#93094293-da01-464f-bb94-01dc734c30e1
"2"

이 상태에서 다음 코드를 끝까지 실행하면 다음과 같은 결과가 출력된다.

남은 재고가 부족합니다.!!!

 

로직 순서를 살펴보면

  • 트랜잭션을 실행하기 전에 stock#<productId> 키를 감시한다.
  • 현재 재고 수량을 가져온다. (10개)
  • 사용자 B에 의해서 재고 수량이 2로 변경되었다.
  • 먼저 가져온 재고 수량(10개)이 주문 수량(3개) 보다 여유가 있으므로 사용자 A 주문에 대한 트랜잭션을 수행한다.
  • 감시하고 있는 stock#<productId> 키의 값이 변경되었기 때문에 결과는 empty list가 출력된다.
  • 다시 executeTransaction 메서드를 실행한다.
  • stock#<productId> 키를 감시한다.
  • 현재 재고 수량을 가져온다. (2개)
  • 먼저 가져온 재고 수량(2개)이 주문 수량(3개) 보다 적으므로 null을 리턴한다.
  • 남은 재고가 부족합니다.!!! 메시지를 출력한다.

와 같이 되겠다. 로직은 더 복잡해졌지만 데이터 일관성을 유지할 수 있다.

 

 

Redis는 단일 스레드로 동작하는 것으로 알고 있는데 여러 프로세스에서 병렬적으로 command가 실행이 될까?

Redis 트랜잭션의 watch() 메서드를 조사하면서 위와 같은 궁금증이 문득 생겼다.

단일 스레드면 명령의 순서가 보장되는 건 아닌가? 맞다. 명령의 순서가 보장된다. redis로 전송된 순서대로 명령을 실행한다.

명령을 순서대로 실행한다는 것이지 연결을 하나만 받는 것이 아니다.

여러 연결에서 명령을 전송할 것이다. redis는 수신된 명령을 큐에 적재해 두고 하나씩 pop 하면서 실행한다.

들어오는 명령은 순서를 보장할 수 없다.

위 시나리오 대로 생각했을 때

사용자 A 주문을 처리하기 위한 connection. (A connection)

사용자 B 주문을 처리하기 위한 connection. (B connection)

이 있다.

명령이 다음과 같은 순서대로 redis로 수신된다고 해보자.

사용자 A 주문을 처리하기 위해서 A connection에서 현재 재고 상태를 조회하는 명령을 전송한다. (10개)

B connection에서 사용자 B 주문을 처리하여 재고 수량을 2로 변경하는 명령을 전송한다. (2개) - 여기서 문제 발생

A connection에서 사용자 A의 트랜잭션 명령을 실행한다.

위와 같은 순서로 명령이 수신되었을 때 문제가 발생한다는 것이다.

단순히 단일 스레드로 실행되기 때문에 데이터 일관성이 보장되는 게 아닌가? 하는 일차원적인 생각을 잠시 했었다.

 

전체 샘플 코드는 GITLAB 링크에서 확인할 수 있다.

끝.