Search results for 'queue'

Apache Kafka 0.9.0.0 변경점

아파치 카프카 0.9.0.0 는 이전 버전 대비 인증, SSL레이어 추가 등 많은 변화가 있었습니다.
kafka topic 중심의 변경점은 다음과 같음.

변경
  • 더 이상 Java 1.6 지원하지않음.
  • 더 이상 Scala 2.9 지원하지 않음.
  • 1000이상의 Broker ID는 자동으로 Broker ID를 할당하기 위해 예약되어 있음. 이미 1000 이상의 Broker ID를 사용하고 있다면 Broker Configuration의 reserved.broker.max.id 값을 임계값 이상으로 설정 해야 함.
  • replica.lag.max.messages 설정은 제거되었으며, 동기화 되는 복제본 결정할 때 파티션 리더는 더 이상 후행(lag) 메시지의 수를 고려 하지 않음.
  • replica.lag.time.max.ms 설정은 마지막 복제 요청으로 경과한 시간이 아닌 복제가 마지막으로 이뤄진 시간까지를 포함. 복제는 여전히 리더로부터 패치하며  replica.lag.time.max.ms 시간내에 복제가 되지 않으면 sync가 어긋난것으로 간주.
  • log.cleaner.enable 는 true 가 기본값이 됨. 이는 cleanup.policy=compact 을 설정 시 topic은 기본적으로 compact 적용, 128MB 힙사이즈가 log.cleaner.dedupe.buffer.size 설정을 통한 clear process에게 할당. compacted topics의 사용량에 따라 log.cleaner.dedupe.buffer.size 와 다른 log.cleaner 계열의 설정을 조정해야 할 수 있음.
  • MirrorMaker는 더 이상 multiple target clusters를 지원하지 않음. 그 결과로 단일 consumer.config 설정만 허용되며, 다중 소스 클러스터를 미러링 하려면 각각의 consumer configuration의 소스 클러스터당 적어도 하나 이상의 MirrorMaker 인스턴스가 필요함.
  • org.apache.kafka.clients.tools.* 는 org.apache.kafka.tools.* 로 이관. 
  • kafka-run-class.sh 내의 JVM 성능 옵션 (KAFKA_JVM_PERFORMANCE_OPTS) 변경.
  • kafka-topics.sh 스크립트 ( kafka.admin.TopicCommand )는 이제 실행 실패 시 0이 아닌 종료 코드로 종료.
  • kafka-topics.sh 스크립트 ( kafka.admin.TopicCommand )는 이제 '.' 이나 '_' 등 topic 이름이 충돌할 수 있는 경우 경고 메세지를 출력.
  • kafka-console-producer.sh 스크립트 ( kafka.tools.ConsoleProducer )는 기본값으로 이전 생산자 대신 새 프로듀서 를 사용,  기존의 프로듀서를 사용하려면  이전버전의 producer 사용을 명시 해야 함.
  • 기본적으로 명령줄 메세지는 stderr를 통해 출력.


삭제
  • kafka-topics.sh script (kafka.admin.TopicCommand)를 통한 topic 변경 명령은 deprecate됨. 앞으로는 kafka-configs.sh script (kafka.admin.ConfigCommand)를 사용할 것.
  • 오프셋을 확인용 명령 kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 역시 deprecate됨. kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand)를 사용 할 것.
  • kafka.tools.ProducerPerformance 클래스 deprecate 됨. org.apache.kafka.tools.ProducerPerformance 클래스를 사용 할 것. kafka-producer-perf-test.sh 역시 새로운 클래스 사용으로 변경 됨.
2016/02/03 09:53 2016/02/03 09:53
Trackback Address:이 글에는 트랙백을 보낼 수 없습니다

QUEUE와 DELAYED 프로세싱

J2SE 5.0 출시에는 새로운 top-level Queue 인터페이스가 Collections Framework에 추가되어 Map, List, Set 인터페이스와 어우러진다. 일반적으로 queue는 먼저 들어온 것이 먼저 나가는 데이터 스트럭쳐이지만 priority queue같은 몇몇 구현들에서는 queue 뒤에 엘리먼트가 첨부되지 않는다. 이 queue와 FIFO(first in, first out)구조에 관한 얘기는 은행에서의 대기선에 비유할 수 있겠다. 은행직원은 고객이 대기선에 있는지 확인한 후 대기선의 첫번째 손님을 맞이하여 손님이 원하는 거래를 처리한다. 그리고 그 손님은 이제 대기선에서 제외된다.

J2SE 5.0에는 Queue 인터페이스와 함께 몇 가지 새로운 queue 구현이 있다. DelayQueue가 그 중 하나인데, DelayQueue에서 queue 안의 아이템들은 지연시간 동안 처리되지 않는다. 이번 테크팁에서는 새로운 Queue 인터페이스와 DelayQueue 구현에 대해 알아보도록 하자.

첫번째로 Queue 인터페이스를 분석해보자. 이 인터페이스는 Collection을 확장하고 다섯개의 고유 메서드들을 추가한다.

  • E element()
  • boolean offer(E o)
  • E peek()
  • E poll()
  • E remove()

J2SE 5.0이 새롭게 generics를 지원하므로 여기에서의 E는 어느 타입이든지 가능하며, Queue가 생성될 때 정의된 엘리먼트 타입으로 결정된다.

Queue의 엘리먼트를 추가하고 제거하는 데 당연히 Collection 인터페이스 메서드들을 사용할 수도 있지만, 그 메서드들은 동작할 때 부가적인 요구사항을 가지므로, Queue 인터페이스에서는 사용하지 않을 것을 권장하고 있다. 예를 들어 Collection.add 메서드로 queue에 엘리먼트를 추가하는 대신에 offer 메서드로 queue에 엘리먼트를 추가할 수 있다. 이 둘은 무슨 차이가 있을까? add 실행이 오류가 날 수 있다. 그 한 예는 queue에 사이즈 제한이 있을 때이다.(은행 대기선에 비유하자면, 10명만 대기 가능할 경우) Collectionadd 메서드를 사용하면 add는 예외를 던지면서 실패한다. 이와 비교할 때 offer 메서드는 false를 리턴하며 "실패"하게 된다. 따라서 offer 메서드를 사용하면 실제로 예외적인 상황에서만(특히 체크 안 된 런타임 예외가 던져졌을 경우) 예외처리(exception handling)을 사용하게 된다.

Queue의 다른 네가지 메서드는 두개씩 짝지어 설명할 수 있다. remove/poll, element/peek. removepoll 메서드는 둘 다 queue의 첫번째 엘리먼트 즉 "head"를 제거하는 데 사용된다. 빈 Collection 객체에서 호출되었을 때, remove 메서드는 예외를 던지고, poll 메서드는 단순히 null 값을 리턴한다. head 엘리먼트를 제거하는 대신 단지 그 엘리먼트를 살펴볼 수도 있다. 이 때 element와 peek 메서드가 사용된다. 여기서 element 메서드는 빈 queue에서는 예외를 던지고, peek은 null값을 리턴한다. Queue는 일반적으로 태스크를 진행하는 데 사용되므로 빈 queue를 갖는 것이 예외상황일 필요는 없다. 따라서, poll/peek 모델이 사용하기에 보다 적합할 것이다. (앞에서 본 메서드들 중 예외를 안 던지고 null 리턴하는 메서드들)

Queue는 다음과 같은 상황에서 일반적으로 사용된다.

   class Producer implements Runnable {
     private final Queue queue;
     Producer(Queue q) { queue = q; }
     public void run() {
       try {
         while(true) { queue.offer(produce()); }
       } catch (InterruptedException ex) { ... handle ...}
     }
     Object produce() { ... }
   }

   class Consumer implements Runnable {
     private final Queue queue;
     Consumer(Queue q) { queue = q; }
     public void run() {
       try {
         Object o;
         while((o = queue.poll()) != null) { consume(o); }
       } catch (InterruptedException ex) { ... handle ...}
     }
     void consume(Object x) { ... }
   }

Queue가 꽉 차 있거나(생산자의 입장에서) 비어있을 때(소비자의 입장에서) 어떤 일이 일어나는 지 궁금할 것이다. 이 시점에서 새로운 queue 인터페이스 BlockingQueue를 설명하는 것이 좋겠다. Queue를 사용하여 엘리먼트를 무작정 추가하거나(offer사용) 삭제하는(poll 사용) 대신 BlockingQueueput 메서드로 엘리먼트를 추가하고 take메서드로 제거할 수 있다. puttake는 모두 이를 호출한 쓰레드가, 특정 조건 하에서 블로킹되게끔 한다. put은 queue가 꽉 차 있을 경우, take는 queue가 비어있을 경우가 블로킹 조건이다.

BlockingQueue의 일반적인 사용패턴은 다음과 같다.

   class Producer implements Runnable {
     private final BlockingQueue queue;
     Producer(BlockingQueue q) { queue = q; }
     public void run() {
       try {
         while(true) { queue.put(produce()); }
       } catch (InterruptedException ex) { ... handle ...}
     }
       Object produce() { ... }
  }
  
   class Consumer implements Runnable {
     private final BlockingQueue queue;
     Consumer(BlockingQueue q) { queue = q; }
     public void run() {
       try {
         while(true) { consume(queue.take()); }
       } catch (InterruptedException ex) { ... handle ...}
     }
     void consume(Object x) { ... }
   }

각 생성된 생산자가 꽉 찬 queue에 새로운 아이템을 추가하려 하면 put메서드에서 기다리고, take 메서드에서는 꺼내갈 것이 추가될 때까지 기다린다. queue가 비어있다면 while(true) 조건문에는 아무런 변화도 일어나지 않을 것이다.

DelayQueue는 BlockingQueue 인터페이스의 구체적인 구현이다. DelayQueue 에 추가된 아이템들은 반드시 새로운 Delayed 인터페이스를 구현해야하며, 이 Delayed는 한 개의 메서드, long getDelay(TimeUnit unit)를 갖고 있다. DelayQueue는 우선순위 힙 데이터 스트럭쳐에 기반한 시간 기준 스케쥴링 Queue로 동작한다

데모를 위해, 다음의 프로그램 DelayTest는 몇 초안에 실행되는 Delayed 인터페이스를 구현한다. 알아둬야할 것은1) nanosecond는 10억분의 1초, 2) nanosecond 유니트에서 작업을 가능케하는 System의 새로운 메서드, nanoTime 이 있다는 것이다. getDelay 메서드가 nanosecond로 리턴된 횟수를 필요로 하기 때문에 nanosecond에서 작업하는 것이 중요하다.

   import java.util.Random;
   import java.util.concurrent.Delayed;
   import java.util.concurrent.DelayQueue;
   import java.util.concurrent.TimeUnit;

   public class DelayTest {
     public static long BILLION = 1000000000;
     static class SecondsDelayed implements Delayed { 
       long trigger;
       String name;
       SecondsDelayed(String name, long i) { 
         this.name = name;
         trigger = System.nanoTime() + (i * BILLION);
       }
       public int compareTo(Delayed d) {
         long i = trigger;
         long j = ((SecondsDelayed)d).trigger;
         int returnValue;
         if (i < j) {
           returnValue = -1;
         } else if (i > j) {
           returnValue = 1;
         } else {
           returnValue = 0;
         }
         return returnValue;
       }
       public boolean equals(Object other) {
         return ((SecondsDelayed)other).trigger == trigger;
       }
       public long getDelay(TimeUnit unit) {
         long n = trigger - System.nanoTime();
         return unit.convert(n, TimeUnit.NANOSECONDS);
       }
       public long getTriggerTime() {
         return trigger;
       }
       public String getName() {
         return name;
       }
       public String toString() {
         return name + " / " + String.valueOf(trigger);
       }
     }
     public static void main(String args[]) 
             throws InterruptedException {
       Random random = new Random();
       DelayQueue<SecondsDelayed> queue = 
             new DelayQueue<SecondsDelayed>();
       for (int i=0; i < 10; i++) {
         int delay = random.nextInt(10);
         System.out.println("Delaying: " + 
               delay + " for loop " + i);
         queue.add(new SecondsDelayed("loop " + i, delay));
       }
       long last = 0;
       for (int i=0; i < 10; i++) {
         SecondsDelayed delay = (SecondsDelayed)(queue.take());
         String name = delay.getName();
         long tt = delay.getTriggerTime();
         if (i != 0) {
           System.out.println("Delta: " + 
                 (tt - last) / (double)BILLION);
         }
         System.out.println(name + " / Trigger time: " + tt);
         last = tt;
       }
     }
   }

DelayTest 프로그램은 엘리먼트를 실행하기 전 DelayQueue 안에 그 10개의 엘리먼트를 위치시킨다.

다음은 DelayQueue를 한번 구동했을 때의 출력물이다.

   Delaying: 8 for loop 0
   Delaying: 7 for loop 1
   Delaying: 2 for loop 2
   Delaying: 4 for loop 3
   Delaying: 0 for loop 4
   Delaying: 9 for loop 5
   Delaying: 3 for loop 6
   Delaying: 4 for loop 7
   Delaying: 6 for loop 8
   Delaying: 2 for loop 9
   loop 4 / Trigger time: 1883173869520000
   Delta: 1.9995545
   loop 2 / Trigger time: 1883175869074500
   Delta: 0.0012475
   loop 9 / Trigger time: 1883175870322000
   Delta: 0.9995177
   loop 6 / Trigger time: 1883176869839700
   Delta: 0.9995187
   loop 3 / Trigger time: 1883177869358400
   Delta: 6.408E-4
   loop 7 / Trigger time: 1883177869999200
   Delta: 2.0001667
   loop 8 / Trigger time: 1883179870165900
   Delta: 0.9986953
   loop 1 / Trigger time: 1883180868861200
   Delta: 0.9995595
   loop 0 / Trigger time: 1883181868420700
   Delta: 1.001262
   loop 5 / Trigger time: 1883182869682700

이 출력물은 loop4를 위한 아이템이 time 0, 즉 지연 없이 시작하기로 설정되어있다는 것을 가리키고 있으며,

   Delaying: 0 for loop 4

따라서 첫번째로 다음과 같이 구동한다.

   loop 4 / Trigger time: 1883173869520000

Loop 2에서는 2초간의 지연이 있으며,

   Delaying: 2 for loop 2

따라서 다음과 같이 나타난다.

   loop 2 / Trigger time: 1883175869074500

여기서의 delta는 1.9995545로 약 2초이다.

   Delta: 1.9995545

다른 loop를 위한 비슷한 delta들도 존재한다.

좀 더 실제적인 예를 위해 DelayQueue에서 pull된 것을 단지 출력하는 대신, queue의 아이템들을 Runnable 구현시키고, 그 아이템들의 run 메서드를 호출할 수 있다.

Queue, DelayQueue 와 J2SE 5.0에서 바뀐 다른 Collections Framework에 대한 좀 더 많은 정보를 원한다면 Collection Framework Enhancements를 참조하기 바란다.


@http://kr.sun.com/developers/techtips/c2004_1019.html

2007/03/16 18:01 2007/03/16 18:01
Trackback Address:이 글에는 트랙백을 보낼 수 없습니다