목록으로

CompletableFuture로 블록체인 RPC 호출 5개를 동시에

1

CompletableFuture로 블록체인 RPC 호출 5개를 동시에

→ CompletableFuture는 Java에서 비동기 작업을 처리하기 위한 클래스다. 여러 작업을 동시에 시작하고, 각각의 결과를 나중에 합칠 수 있다.

2026년 2월 9일 — TX 응답속도 개선 및 비동기 병렬 처리 최적화 2026년 2월 10일 — 사전검증 RPC 병렬 조회 추가 적용

문제: 검증만 하는데 1초가 걸린다

토큰을 전송하기 전에 5가지를 확인해야 한다.

  1. 컨트랙트가 일시정지 상태인가?
  2. 보내는 계정이 동결됐는가?
  3. 받는 계정이 동결됐는가?
  4. 잔액이 충분한가?
  5. 현재 블록 타임스탬프는?

각각이 블록체인 RPC 호출이고, 한 번에 100200ms 걸린다. → RPC(Remote Procedure Call)란 네트워크를 통해 원격 서버의 함수를 호출하는 방식이다. 블록체인에서는 노드에 잔액 조회, 상태 확인 등을 요청할 때 쓴다. 순차로 5번이면 500ms1초. 전송 자체보다 검증이 더 오래 걸리는 상황이었다.

해결: 5개를 동시에 날리고, 모아서 판단

fun transferWithAuthorization(fromAddress: String, toAddress: String, amount: BigDecimal, ...) {
    // 5개 검증을 동시에 시작
    val pausedFuture = CompletableFuture.supplyAsync { isPaused() }
    val frozenFromFuture = CompletableFuture.supplyAsync { isFrozen(fromAddress) }
    val frozenToFuture = CompletableFuture.supplyAsync { isFrozen(toAddress) }
    val balanceFuture = CompletableFuture.supplyAsync { balanceOf(fromAddress) }
    val blockTimestampFuture = CompletableFuture.supplyAsync {
        try {
            web3j.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false)
                .send().block.timestamp.toLong()
        } catch (e: Exception) {
            System.currentTimeMillis() / 1000  // 실패 시 서버 시간으로 대체
        }
    }

    // 결과 수집 + 검증
    if (joinUnwrap(pausedFuture)) throw IllegalStateException("컨트랙트 일시정지 상태")
    if (joinUnwrap(frozenFromFuture)) throw IllegalStateException("송신자 계정 동결")
    if (joinUnwrap(frozenToFuture)) throw IllegalStateException("수신자 계정 동결")
    val balance = joinUnwrap(balanceFuture)
    if (balance < amount) throw IllegalStateException("잔액 부족: $balance < $amount")
    val blockTimestamp = joinUnwrap(blockTimestampFuture)
}

5개 호출이 동시에 나가고, 가장 느린 것 기준으로 ~200ms면 전부 끝난다. 순차 대비 약 2.5배 빨라졌다.

joinUnwrap: CompletionException 벗기기

CompletableFuture.join()은 내부 예외를 CompletionException으로 감싼다. → join()은 비동기 작업이 끝날 때까지 기다렸다가 결과를 반환하는 메서드다. 작업 중 에러가 나면 원래 예외를 CompletionException으로 한 번 더 감싸서 던지기 때문에, 원래 예외를 꺼내는 언래핑(unwrapping) 작업이 필요하다.

private fun <T> joinUnwrap(future: CompletableFuture<T>): T {
    try {
        return future.join()
    } catch (e: CompletionException) {
        throw e.cause ?: e  // 원래 예외를 꺼내서 던짐
    }
}

이게 없으면 "잔액 부족" 같은 비즈니스 예외가 CompletionException에 감싸져서, 에러 핸들러에서 제대로 분기되지 않는다.

배치 결제에서도 같은 패턴

여러 지갑의 잔액을 조회할 때, 같은 지갑이 여러 번 나올 수 있다 (같은 사용자의 다른 Leg). 중복 제거 후 병렬 조회:

val balanceMap = normalizedFromAddresses.distinct()  // 중복 제거
    .associateWith { address ->
        CompletableFuture.supplyAsync { tokenContractService.balanceOf(address) }
    }
    .mapValues {
        try { it.value.join() }
        catch (e: CompletionException) { throw e.cause ?: e }
    }

distinct()로 같은 주소는 한 번만 조회하고, 결과를 Map으로 저장해서 Leg별로 참조한다.

전용 스레드풀을 쓴 이유

@Bean(name = ["ioExecutor"])
fun ioExecutor(): ExecutorService {
    return ThreadPoolExecutor(
        4, 32, 60L, TimeUnit.SECONDS,
        LinkedBlockingQueue(100),
        { r -> Thread(r, "io-executor").apply { isDaemon = true } },
        ThreadPoolExecutor.CallerRunsPolicy()
    )
}

CompletableFuture.supplyAsync { ... }는 기본적으로 ForkJoinPool.commonPool()을 쓴다. → ForkJoinPool.commonPool()은 JVM이 기본 제공하는 공용 스레드풀이다. CPU 코어 수에 맞춰 스레드가 제한되어 있어서, 계산 위주 작업에는 적합하지만 네트워크 대기가 긴 I/O 작업에는 적합하지 않다. → CPU 바운드는 연산이 주된 작업(예: 암호화), I/O 바운드는 네트워크/디스크 대기가 주된 작업(예: API 호출)을 뜻한다. 이건 CPU 바운드 작업용이라, I/O 대기(RPC 호출)로 스레드가 잠기면 다른 작업까지 영향을 받는다.

전용 ioExecutor를 만들어서:

  • CPU 바운드와 I/O 바운드를 격리
  • 최대 32개 스레드로 동시 RPC 호출 가능
  • 큐가 가득 차면 CallerRunsPolicy로 호출자 스레드에서 직접 실행 (요청 거부 대신 느려지기) → CallerRunsPolicy는 스레드풀이 가득 찼을 때 작업을 거부하지 않고, 작업을 요청한 스레드가 직접 실행하게 하는 정책이다. 시스템이 멈추지 않고 속도만 느려진다.

blockTimestamp 조회 실패 시 fallback

val blockTimestampFuture = CompletableFuture.supplyAsync {
    try {
        web3j.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false)
            .send().block.timestamp.toLong()
    } catch (e: Exception) {
        log.warn("블록 타임스탬프 조회 실패, 서버 시간 사용")
        System.currentTimeMillis() / 1000
    }
}

5개 중 4개가 성공했는데 타임스탬프 하나 때문에 전체가 실패하면 아까우니까, 서버 시간으로 대체한다. 어차피 validAfter/validBefore에 ±60초 여유를 두고 있어서 서버 시간으로도 충분하다.

배운 점

  • 독립적인 I/O 호출이 여러 개면, 무조건 병렬화를 고려하자
  • CompletionException 언래핑을 빼먹으면 에러 핸들링이 깨진다
  • 같은 데이터를 여러 번 조회하는 패턴이 보이면 distinct() + Map 캐싱
  • I/O 바운드 작업은 ForkJoinPool.commonPool() 말고 전용 스레드풀을 쓰자
CompletableFuture로 블록체인 RPC 호출 5개를 동시에 | KYUDORI