
Iterator is in failed state - Kafka 0.8.x

Problem description:

I have deployed my consumer as a WAR file in WildFly 9.x. The same simple consumer works fine as a standalone JAR application.

When I deploy the WAR file, I get the following exception:

java.lang.IllegalStateException: Iterator is in failed state [1]

Why is that?

I am trying to listen to a number of topics. For each topic I spawn new threads for consuming.

My consumer:

Executor

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
            .createMessageStreams(topicMap);
    for (int i = 0; i < topicMap.size(); i++) {
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicMap.keySet().iterator().next());
        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        for (final KafkaStream stream : streams) {
            executor.submit(new KafkaMessageListenerThread(this, stream));
        }
    }
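One thing worth checking in the loop above: topicMap.keySet().iterator() creates a fresh iterator on every pass, so next() returns the first key each time. With more than one topic, the same topic's streams would be submitted again to a new thread pool, and the other topics would never be consumed. The sketch below reproduces only that loop logic with plain strings standing in for KafkaStream objects, so it runs without Kafka. The class name, method names, and sample topics are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: strings replace KafkaStream objects so the loop
// logic can be exercised without a Kafka dependency.
class TopicLoopSketch {

    // Mirrors the loop in the question: a new key iterator is created on
    // every pass, so next() yields the first topic each time.
    static List<String> pickedByOriginalLoop(Map<String, Integer> topicMap,
                                             Map<String, List<String>> consumerMap) {
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < topicMap.size(); i++) {
            picked.addAll(consumerMap.get(topicMap.keySet().iterator().next()));
        }
        return picked;
    }

    // Alternative: walk the map entries so each topic's streams are
    // visited exactly once.
    static List<String> pickedPerTopic(Map<String, List<String>> consumerMap) {
        List<String> picked = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : consumerMap.entrySet()) {
            picked.addAll(entry.getValue());
        }
        return picked;
    }

    // Sample data: two topics with one stream each (made-up names).
    static Map<String, Integer> sampleTopicMap() {
        Map<String, Integer> topicMap = new LinkedHashMap<>();
        topicMap.put("topicA", 1);
        topicMap.put("topicB", 1);
        return topicMap;
    }

    static Map<String, List<String>> sampleConsumerMap() {
        Map<String, List<String>> consumerMap = new LinkedHashMap<>();
        consumerMap.put("topicA", Arrays.asList("topicA-stream0"));
        consumerMap.put("topicB", Arrays.asList("topicB-stream0"));
        return consumerMap;
    }

    public static void main(String[] args) {
        // prints [topicA-stream0, topicA-stream0]
        System.out.println(pickedByOriginalLoop(sampleTopicMap(), sampleConsumerMap()));
        // prints [topicA-stream0, topicB-stream0]
        System.out.println(pickedPerTopic(sampleConsumerMap()));
    }
}
```

If this is what happens in the deployed application, iterating over consumerMap.entrySet() and submitting each stream exactly once would be a reasonable first thing to try. Whether it fully explains the difference between the WAR and the standalone JAR is not certain from the information given.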

Thread

    public void run() {
        try {
            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
            System.out.println("listens....");
            while (itr.hasNext()) {
                byte[] data = itr.next().message();
                .....

15:29:38,938 INFO [stdout] (ServerService Thread Pool -- 68) Kafka service is started
15:29:38,938 ERROR [stderr] (pool-8-thread-1) java.lang.IllegalStateException: Iterator is in failed state
15:29:38,938 INFO [stdout] (pool-7-thread-1) listens....
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at com.xx..listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:45)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at java.util.concurrent.FutureTask.run(FutureTask.java:266)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
15:29:38,939 ERROR [stderr] (pool-8-thread-1) at java.lang.Thread.run(Thread.java:745)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) java.lang.IllegalStateException: Iterator is in failed state
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at com.xxl.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:45)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at java.util.concurrent.FutureTask.run(FutureTask.java:266)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
15:29:38,940 ERROR [stderr] (pool-7-thread-1) at java.lang.Thread.run(Thread.java:745)
15:2
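The trace shows two different pools (pool-7 and pool-8) failing in IteratorTemplate.hasNext at almost the same moment. As far as I understand the Kafka 0.8 iterator, it marks itself as failed before it computes the next element and clears that mark only once the element has arrived, so a second thread calling hasNext() on the same iterator while the first is still waiting would see the failed state. The following is a simplified model of that behavior, written from memory for illustration; it is not Kafka's source, and every name in it is hypothetical.

```java
import java.util.concurrent.CountDownLatch;

// Simplified, hypothetical model of a template-style blocking iterator.
class FailedStateSketch {

    enum State { NOT_READY, READY, FAILED }

    static class BlockingIterator {
        private volatile State state = State.NOT_READY;
        final CountDownLatch enteredMakeNext = new CountDownLatch(1);
        final CountDownLatch dataAvailable = new CountDownLatch(1);

        boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            if (state == State.READY) {
                return true;
            }
            state = State.FAILED; // pessimistic mark while the next item is computed
            makeNext();           // blocks until data is available
            state = State.READY;  // mark cleared only after success
            return true;
        }

        private void makeNext() {
            enteredMakeNext.countDown();
            awaitQuietly(dataAvailable);
        }
    }

    // Wraps the checked InterruptedException so callers stay simple.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Returns the exception message seen by a second caller that shares the
    // iterator with a thread already blocked inside makeNext(), or null.
    static String secondCallerOutcome() {
        BlockingIterator itr = new BlockingIterator();
        Thread first = new Thread(itr::hasNext);
        first.start();
        awaitQuietly(itr.enteredMakeNext); // first thread is now inside makeNext()
        String outcome = null;
        try {
            itr.hasNext();                 // second caller on the same iterator
        } catch (IllegalStateException e) {
            outcome = e.getMessage();
        } finally {
            itr.dataAvailable.countDown(); // release the first thread
            try {
                first.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        // prints: Iterator is in failed state
        System.out.println(secondCallerOutcome());
    }
}
```

Under that assumption, the exception in the log would be consistent with one KafkaStream being handed to more than one thread, which is worth ruling out before looking at WildFly-specific causes.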
