QUEUE와 DELAYED 프로세싱
J2SE 5.0 출시에는 새로운 top-level Queue
인터페이스가 Collections Framework에 추가되어 Map
, List
, Set
인터페이스와 어우러진다. 일반적으로 queue는 먼저 들어온 것이 먼저 나가는 데이터 스트럭쳐이지만 priority queue같은 몇몇 구현들에서는 queue 뒤에 엘리먼트가 첨부되지 않는다. 이 queue와 FIFO(first in, first out)구조에 관한 얘기는 은행에서의 대기선에 비유할 수 있겠다. 은행직원은 고객이 대기선에 있는지 확인한 후 대기선의 첫번째 손님을 맞이하여 손님이 원하는 거래를 처리한다. 그리고 그 손님은 이제 대기선에서 제외된다.
J2SE 5.0에는 Queue
인터페이스와 함께 몇 가지 새로운 queue 구현이 있다. DelayQueue
가 그 중 하나인데, DelayQueue
에서 queue 안의 아이템들은 지연시간 동안 처리되지 않는다. 이번 테크팁에서는 새로운 Queue
인터페이스와 DelayQueue
구현에 대해 알아보도록 하자.
첫번째로 Queue
인터페이스를 분석해보자. 이 인터페이스는 Collection
을 확장하고 다섯개의 고유 메서드들을 추가한다.
E element()
boolean offer(E o)
E peek()
E poll()
E remove()
J2SE 5.0이 새롭게 generics를 지원하므로 여기에서의 E
는 어느 타입이든지 가능하며, Queue
가 생성될 때 정의된 엘리먼트 타입으로 결정된다.
Queue
의 엘리먼트를 추가하고 제거하는 데 당연히 Collection
인터페이스 메서드들을 사용할 수도 있지만, 그 메서드들은 동작할 때 부가적인 요구사항을 가지므로, Queue
인터페이스에서는 사용하지 않을 것을 권장하고 있다. 예를 들어 Collection.add
메서드로 queue에 엘리먼트를 추가하는 대신에 offer
메서드로 queue에 엘리먼트를 추가할 수 있다. 이 둘은 무슨 차이가 있을까? add 실행이 오류가 날 수 있다. 그 한 예는 queue에 사이즈 제한이 있을 때이다.(은행 대기선에 비유하자면, 10명만 대기 가능할 경우) Collection
의 add
메서드를 사용하면 add
는 예외를 던지면서 실패한다. 이와 비교할 때 offer
메서드는 false를 리턴하며 "실패"하게 된다. 따라서 offer
메서드를 사용하면 실제로 예외적인 상황에서만(특히 체크 안 된 런타임 예외가 던져졌을 경우) 예외처리(exception handling)을 사용하게 된다.
Queue
의 다른 네가지 메서드는 두개씩 짝지어 설명할 수 있다. remove
/poll
, element
/peek
. remove
와 poll
메서드는 둘 다 queue의 첫번째 엘리먼트 즉 "head"를 제거하는 데 사용된다. 빈 Collection 객체에서 호출되었을 때, remove
메서드는 예외를 던지고, poll
메서드는 단순히 null 값을 리턴한다. head 엘리먼트를 제거하는 대신 단지 그 엘리먼트를 살펴볼 수도 있다. 이 때 element와 peek 메서드가 사용된다. 여기서 element
메서드는 빈 queue에서는 예외를 던지고, peek
은 null값을 리턴한다. Queue는 일반적으로 태스크를 진행하는 데 사용되므로 빈 queue를 갖는 것이 예외상황일 필요는 없다. 따라서, poll
/peek
모델이 사용하기에 보다 적합할 것이다. (앞에서 본 메서드들 중 예외를 안 던지고 null 리턴하는 메서드들)
Queue
는 다음과 같은 상황에서 일반적으로 사용된다.
class Producer implements Runnable { private final Queue queue; Producer(Queue q) { queue = q; } public void run() { try { while(true) { queue.offer(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final Queue queue; Consumer(Queue q) { queue = q; } public void run() { try { Object o; while((o = queue.poll()) != null) { consume(o); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } }
Queue가 꽉 차 있거나(생산자의 입장에서) 비어있을 때(소비자의 입장에서) 어떤 일이 일어나는 지 궁금할 것이다. 이 시점에서 새로운 queue 인터페이스 BlockingQueue
를 설명하는 것이 좋겠다. Queue
를 사용하여 엘리먼트를 무작정 추가하거나(offer사용) 삭제하는(poll
사용) 대신 BlockingQueue
의 put
메서드로 엘리먼트를 추가하고 take
메서드로 제거할 수 있다. put
과 take
는 모두 이를 호출한 쓰레드가, 특정 조건 하에서 블로킹되게끔 한다. put
은 queue가 꽉 차 있을 경우, take
는 queue가 비어있을 경우가 블로킹 조건이다.
BlockingQueue
의 일반적인 사용패턴은 다음과 같다.
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } }
각 생성된 생산자가 꽉 찬 queue에 새로운 아이템을 추가하려 하면 put
메서드에서 기다리고, take
메서드에서는 꺼내갈 것이 추가될 때까지 기다린다. queue가 비어있다면 while(true)
조건문에는 아무런 변화도 일어나지 않을 것이다.
DelayQueue는 BlockingQueue 인터페이스의 구체적인 구현이다. DelayQueue 에 추가된 아이템들은 반드시 새로운 Delayed 인터페이스를 구현해야하며, 이 Delayed는 한 개의 메서드, long getDelay(TimeUnit unit)를 갖고 있다. DelayQueue는 우선순위 힙 데이터 스트럭쳐에 기반한 시간 기준 스케쥴링 Queue로 동작한다
데모를 위해, 다음의 프로그램 DelayTest
는 몇 초안에 실행되는 Delayed 인터페이스를 구현한다. 알아둬야할 것은1) nanosecond는 10억분의 1초, 2) nanosecond 유니트에서 작업을 가능케하는 System의 새로운 메서드, nanoTime 이 있다는 것이다. getDelay 메서드가 nanosecond로 리턴된 횟수를 필요로 하기 때문에 nanosecond에서 작업하는 것이 중요하다.
import java.util.Random; import java.util.concurrent.Delayed; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class DelayTest { public static long BILLION = 1000000000; static class SecondsDelayed implements Delayed { long trigger; String name; SecondsDelayed(String name, long i) { this.name = name; trigger = System.nanoTime() + (i * BILLION); } public int compareTo(Delayed d) { long i = trigger; long j = ((SecondsDelayed)d).trigger; int returnValue; if (i < j) { returnValue = -1; } else if (i > j) { returnValue = 1; } else { returnValue = 0; } return returnValue; } public boolean equals(Object other) { return ((SecondsDelayed)other).trigger == trigger; } public long getDelay(TimeUnit unit) { long n = trigger - System.nanoTime(); return unit.convert(n, TimeUnit.NANOSECONDS); } public long getTriggerTime() { return trigger; } public String getName() { return name; } public String toString() { return name + " / " + String.valueOf(trigger); } } public static void main(String args[]) throws InterruptedException { Random random = new Random(); DelayQueue<SecondsDelayed> queue = new DelayQueue<SecondsDelayed>(); for (int i=0; i < 10; i++) { int delay = random.nextInt(10); System.out.println("Delaying: " + delay + " for loop " + i); queue.add(new SecondsDelayed("loop " + i, delay)); } long last = 0; for (int i=0; i < 10; i++) { SecondsDelayed delay = (SecondsDelayed)(queue.take()); String name = delay.getName(); long tt = delay.getTriggerTime(); if (i != 0) { System.out.println("Delta: " + (tt - last) / (double)BILLION); } System.out.println(name + " / Trigger time: " + tt); last = tt; } } }
DelayTest
프로그램은 엘리먼트를 실행하기 전 DelayQueue
안에 그 10개의 엘리먼트를 위치시킨다.
다음은 DelayQueue
를 한번 구동했을 때의 출력물이다.
Delaying: 8 for loop 0 Delaying: 7 for loop 1 Delaying: 2 for loop 2 Delaying: 4 for loop 3 Delaying: 0 for loop 4 Delaying: 9 for loop 5 Delaying: 3 for loop 6 Delaying: 4 for loop 7 Delaying: 6 for loop 8 Delaying: 2 for loop 9 loop 4 / Trigger time: 1883173869520000 Delta: 1.9995545 loop 2 / Trigger time: 1883175869074500 Delta: 0.0012475 loop 9 / Trigger time: 1883175870322000 Delta: 0.9995177 loop 6 / Trigger time: 1883176869839700 Delta: 0.9995187 loop 3 / Trigger time: 1883177869358400 Delta: 6.408E-4 loop 7 / Trigger time: 1883177869999200 Delta: 2.0001667 loop 8 / Trigger time: 1883179870165900 Delta: 0.9986953 loop 1 / Trigger time: 1883180868861200 Delta: 0.9995595 loop 0 / Trigger time: 1883181868420700 Delta: 1.001262 loop 5 / Trigger time: 1883182869682700
이 출력물은 loop4를 위한 아이템이 time 0, 즉 지연 없이 시작하기로 설정되어있다는 것을 가리키고 있으며,
Delaying: 0 for loop 4
따라서 첫번째로 다음과 같이 구동한다.
loop 4 / Trigger time: 1883173869520000
Loop 2에서는 2초간의 지연이 있으며,
Delaying: 2 for loop 2
따라서 다음과 같이 나타난다.
loop 2 / Trigger time: 1883175869074500
여기서의 delta는 1.9995545로 약 2초이다.
Delta: 1.9995545
다른 loop를 위한 비슷한 delta들도 존재한다.
좀 더 실제적인 예를 위해 DelayQueue
에서 pull된 것을 단지 출력하는 대신, queue의 아이템들을 Runnable
구현시키고, 그 아이템들의 run
메서드를 호출할 수 있다.
Queue
, DelayQueue
와 J2SE 5.0에서 바뀐 다른 Collections Framework에 대한 좀 더 많은 정보를 원한다면 Collection Framework Enhancements를 참조하기 바란다.
@http://kr.sun.com/developers/techtips/c2004_1019.html