티스토리 뷰

[RabbitMQ] 원격 프로시저 호출 (RPC)

(using the Bunny client)


두 번째 강의에서 여러 worker들 사이에서 시간이 소요되는 작업을 분배하는 Work Queue에 관해서 배웠습니다.


하지만 만약 원격지의 함수를 실행시키고 해당 결과를 기다리는 경우에는 어떨까요? 이 경우는 약간 다른 경우입니다. 흔히 보통 이런 경우를 원격 프로시저 호출 또는 RPC(Remote Procedure Call)라고 알려져 있습니다.


이번 강의에서 RabbitMQ를 RPC 시스템을 구축하는데 사용해볼 것 입니다: 클라이언트와 확장가능한 RPC 서버로 구성됩니다. 실제의 시간이 소요되는 작업이 없기 때문에 피보나치 수열을 반환하는 가짜 RPC 서비스를 제작해볼 것 입니다.


전제조건

이 튜토리얼에서 RabbitMQ가 localhost에 설치되어있고 기본 포트인 5672로 동작 중인 것을 가정으로 진행됩니다. 다른 호스트나, 포트를 사용할 경우 적절한 세팅을 하셔야합니다.



클라이언트 인터페이스

RPC 서비스가 어떻게 사용되는지 묘사하기 위해 간단한 클라이언트 클래스를 만들어볼 것 입니다. RPC 요청을 보내고 답이 수신될 때 까지 기다리는 call 이름의 메서드를 만들 것 입니다.

client   = FibonacciClient.new(ch, "rpc_queue")
response = client.call(30)
puts " [.] Got #{response}"


RPC에 대해 알아두기

컴퓨터 세계에서 RPC는 흔한 패턴이지만, 종종 비판을 받기도 했습니다. 이는 프로그래머가 함수가 로컬에 있는지 혹은 느린 RPC인지 인지를 못한다면 문제가 야기될 수 있습니다. 이 경우에 예상치 못한 시스템 오류가 발생할 수 있거나 디버깅하기에 난해함을 초래할 수 있습니다. 허나 간편하게 사용하기위해 RPC를 사용하지 않든다면, 유지보수하기 힘든 스파게티 코드가 될 가능성도 있습니다.


위의 내용을 명심하며, 다음과 같은 조언을 고려해주세요.

> 호출하는 함수가 로컬에 있는지 원격지에 있는지 명백히 알아야 합니다.

> 여러분의 시스템을 문서화하세요. 구성요소간의 의존성을 명백히 하셔야 합니다.

> 에러 발생 경우를 처리하세요. RPC 서버가 오래동안 죽을 경우 클라언트는 어떻게 반응해야할 까요?


만약 RPC를 사용하지 않는다면, 비동기 파이프라인을 사용해야 합니다. 응답 결과는 비동기적으로 다음 계산 단계로 보내져야합니다.



Callback queue

RabbitMQ를 통해 RPC를 수행하는 것은 간단합니다. 클라이언트는 메시지를 요청하고 서버는 응당 메시비를 통해 답장을 보내면 됩니다. 응답을 받기위해 요청 시에 콜백 queue의 주소를 보낼 필요가 있습니다. 기본으로 제공되는 queue를 사용할 수 있습니다.

q = ch.queue("", :exclusive => true)
x = ch.default_exchange

x.publish(message, :routing_key => "rpc_queue", :reply_to => q.name)

# ... then code to read a response message from the callback_queue ...


메시지 속성

AMQP 프로토콜은 메시지에 포함되는 14가지의 속성을 사전정의 해놨습니다. 대부분의 속성들은 아래의 것을 제외한 경우 거의 사용되지 않습니다.


> :persistent : true 로 설정할 경우 영속성을 가지고 false 로 설정할 경우 가지지 않습니다.

> :content_type: 인코딩 mime-type 명시합니다. 예를들어 자주 사용되는 JSON의 경우 application/json 으로 설정합니다.

> :reply_to: callback queue 의 이름을 명시합니다.

> :correlation_id: 요청 때 RPC 응답과 연계 시 사용됩니다.



Correlation Id

위에서 RPC 요청을 위한 callback queue를 만들어봤습니다. 이 방법은 비효율적이지만, 다행이도 더 나은 방법이 존재합니다. - 클라이언트 당 개인 callback queue를 만들어볼 것 입니다.


이 경우 새로운 문제가 발생하는데, 어느 요청에 어느 큐가 응답을 해야되는지 명백하지가 않습니다. 그래서 :correlation_id 속성이 사용됩니다. 우리는 모든 요청마다 고유의 값을 설정할 것 입니다. 나중에 callback queue가 메시지를 수신할 때 이 속성을 살펴볼 것 입니다. 이를 기반으로 요청에 맞는 응답을 일치시킬 수 있습니다. 확인되지 않은 :correlation_id 값이 수신되면, 안전하게 메시지를 제거할 것 입니다.


왜 확인되지 않은 메시지가 callback queue에 수신되면 에러를 발생시키지 않고 무시하는지 의문을 가질 수 있습니다. 이는 서버 단의 race condition 이 가능해질 수도 있기 때문입니다. 잘 일어날 것 같진 않지만 RPC 서버가 응답을 보내고 acknowledgment 메시지를 보내기 전에 죽을 수 도 있습니다. 이럴 경우 재시작된 RPC 서버는  그 작업을 다시 처리하게 됩니다. 따라서 클라이언트는 중복된 응답을 적절히 처리할 수 있어야 됩니다.


요약



저희가 만들 RPC는 이와 같이 진행될 것 입니다.

> 클라이언트가 시작되면, 익명의 전용 callback queue가 생성됩니다.

> RPC 요청을 위해, 클라이언트는 두 가지의 속성과 함께 메시지를 전송합니다. - callback queue에 셋팅되는 :reply_to 와 모든 요청 각각의 고유한 값을 설정하는 :correlation_id 입니다.

> 요청은 rpc_queue 이름의 queue에 보내집니다.

> RPC 서버는 해당 큐가 요청을 받길 기다립니다. 요청이 오면 해당 작업을 처리하고 :reply_to 필드에 명시되어있는 클라이언트에게 다시 메시지를 전송합니다.

> 클라이언트는 callback queue로 부터 응답을 기다립니다. 메시지가 수신되면 :correlation_id 속성 값을 확인합니다. 요청 때의 값과 일치하면 어플리케이션에게 응답을 리턴합니다.



종합하기

피보나치 함수입니다:

def self.fib(n)
  case n
  when 0 then 0
  when 1 then 1
  else
    fib(n - 1) + fib(n - 2)
  end
end

저희가 선언할 피보나치 함수입니다. 오직 양의 정수만 입력된다고 가정합니다.(너무 큰 수를 처리하려고 하지마세요, 이는 가장 느린 재귀형태의 방법입니다.)


RPC 서버의 rpc_sever.rb 코드입니다:

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel

class FibonacciServer

  def initialize(ch)
    @ch = ch
  end

  def start(queue_name)
    @q = @ch.queue(queue_name)
    @x = @ch.default_exchange

    @q.subscribe(:block => true) do |delivery_info, properties, payload|
      n = payload.to_i
      r = self.class.fib(n)

      @x.publish(r.to_s, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
    end
  end

  def self.fib(n)
    case n
    when 0 then 0
    when 1 then 1
    else
      fib(n - 1) + fib(n - 2)
    end
  end
end

begin
  server = FibonacciServer.new(ch)
  puts " [x] Awaiting RPC requests"
  server.start("rpc_queue")
rescue Interrupt => _
  ch.close
  conn.close
end

(rpc_server.rb 소스코드)


서버 코드는 다소 직관입니다.

> 늘 그래왔듯이 connection 과 channel 을 생성하는 것부터 시작하며, queue를 선업합니다.

> 한 대 이상의 서버 프로세스를 구동시킬 수 도 있습니다. 이 경우 적절한 업무 분배를 위해 channel에 prefetch 를 설정하는 것이 바랍직합니다.

> queue로 부터 메시지를 수신할 때 Bunny::Queue#subscribe를 사용합니다. 그리고 요청 메시지를 기다리는 while 반복문에 진입합니다. 그리고 해당 작업을 처리하고 응답을 돌려줍니다.



RPC client인 rpc_client.rb 코드입니다:

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"
require "thread"

conn = Bunny.new(:automatically_recover => false)
conn.start

ch   = conn.create_channel

class FibonacciClient
  attr_reader :reply_queue
  attr_accessor :response, :call_id
  attr_reader :lock, :condition

  def initialize(ch, server_queue)
    @ch             = ch
    @x              = ch.default_exchange

    @server_queue   = server_queue
    @reply_queue    = ch.queue("", :exclusive => true)

    @lock      = Mutex.new
    @condition = ConditionVariable.new
    that       = self

    @reply_queue.subscribe do |delivery_info, properties, payload|
      if properties[:correlation_id] == that.call_id
        that.response = payload.to_i
        that.lock.synchronize{that.condition.signal}
      end
    end
  end

  def call(n)
    self.call_id = self.generate_uuid

    @x.publish(n.to_s,
      :routing_key    => @server_queue,
      :correlation_id => call_id,
      :reply_to       => @reply_queue.name)

    lock.synchronize{condition.wait(lock)}
    response
  end

  protected

  def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
  end
end

client   = FibonacciClient.new(ch, "rpc_queue")
puts " [x] Requesting fib(30)"
response = client.call(30)
puts " [.] Got #{response}"

ch.close
conn.close

(rpc_client.rb 소스코드)


이제 RPC 서비스는 준비되었습니다. 서버를 실행시킬 수 있습니다:

$ ruby -rubygems rpc_server.rb
 [x] Awaiting RPC requests


피보나치 수열을 요청할 클라이언트를 실행시킵니다:

$ ruby -rubygems rpc_client.rb
 [x] Requesting fib(30)


여기서 사용한 디자인은 RPC 서비스를 구현 가능하게 할 뿐만아니라 매우 중요한 장점을 가진다:

> 만약 RPC 서버가 너무 느리다면, 다른 하나를 더 실행시킴으로써 확장할 수 있다. 새로운 콘솔에 두 번째 rpc_server.rb를 실행시켜보세요.

> 클라이언트 측면에서, RPC 요청은 오직 하나의 메시지로 주고 받습니다. Bunny::Channel#queue 처럼 동기적 호출이 필요하지 않습니다. 그 결과로 RPC 클라이언트는 단일 RPC 요청을 위한 오직 하나의 왕복 네트워크만 필요합니다.


예제의 코드는 매우 단순화시킨 것이므로 아래처럼 복잡한 문제를 해결할 수는 없습니다:

> 만약 동작 중인 서버가 없을 때는 클라이언트는 어떻게 반응하는가?

> 클라이언트는 RPC 응답 대기시간의 제한을 가지고 있는가?

> 만약 서버가 오동작을 하고 예외를 일으킨다면, 클라이언트에게 전송되어져야 하는가? 

> 유효하지 않은 메시지(ex. 허용범위 초과, 허용 타입 예외)가 온다면 처리하기 전 어떻게 대응하는가?


다양한 실험을 해보고 싶다면, rabbitmq-management 플러그인에서 유용한 것들을 찾아보세요.


'프로그래밍 > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] 원격 프로시저 호출 (RPC)  (0) 2015.07.11
[RabbitMQ] Topics  (0) 2015.07.10
[RabbitMQ] Routing  (0) 2015.07.09
[RabbitMQ] Publish/Subscribe  (1) 2015.07.06
[RabbitMQ] Work Queues  (0) 2015.07.05
[Message Queue] RabbitMQ란?  (0) 2015.07.04
댓글