java的一个服务cpu占用直接飙升至300%
其实这个java服务的业务很简单,就是消费kafka的公司网络安全事件报警消息,存到数据库,根据指定级别进行报警。报警是采用的钉钉群消息通知,用了一个线程池和一个newScheduledThreadPool限制一分钟只能发20条消息。多于的丢弃。
问题排查
找到cpu占用最高的线程
由上图知,cpu占用最高的进程是577456,我们看一下cpu占用最高的线程。
top -Hp 577456
得到cpu占用最高的三个线程的pid分别是577842,577838,577840
查看pid的十六进制
printf "%x\n" 577840
输出8d130
查看该线程的堆栈信息
查看前60行数据
jstack 577456 | grep 8d130 -A60
日志太长了,这里就不全部放出来了,最后观察发现,是三个kafka consumer线程,有两个会阻塞住。
[root@iZbp1amj7x33z8a3ukio89Z ~]# jstack 577456 | grep 8d130 -A60
"org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1" #369 prio=5 os_prio=0 tid=0x00007f631c430800 nid=0x8d130 runnable [0x00007f62749f5000]
java.lang.Thread.State: RUNNABLE
at java.lang.Object.notify(Native Method)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.pollHeartbeat(AbstractCoordinator.java:321)
- locked <0x000000008cab27a8> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:462)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1283)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.run(KafkaMessageListenerContainer.java:1087)
at java.util.concurrent.ExecutorsRunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
"ThreadPoolTaskScheduler-1" #368 prio=5 os_prio=0 tid=0x00007f631c42f000 nid=0x8d12f waiting on condition [0x00007f6274af6000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008cae25d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizerConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizerConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutorDelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutorDelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1" #367 prio=5 os_prio=0 tid=0x00007f631c539800 nid=0x8d12e runnable [0x00007f6274bf7000]
java.lang.Thread.State: RUNNABLE
at java.lang.Object.notify(Native Method)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.pollHeartbeat(AbstractCoordinator.java:321)
- locked <0x000000008cae55c8> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:462)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1283)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainerListenerConsumer.run(KafkaMessageListenerContainer.java:1087)
at java.util.concurrent.ExecutorsRunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
"ThreadPoolTaskScheduler-1" #366 prio=5 os_prio=0 tid=0x00007f631df1f000 nid=0x8d12d waiting on condition [0x00007f62b01db000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008caf3578> (a java.util.concurrent.locks.AbstractQueuedSynchronizerConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizerConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutorDelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutorDelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我们一开始怀疑是钉钉通知的线程池和定时newScheduledThreadPool的问题,把那块注释掉了,重新复现,确实降下来了,(是假象,他是间歇性的,隔一段时间就升高,因为这个误判,导致在哪块饶了很久,第二天才发现这个现象,意识到是消费线程的问题),但是排查代码无果。
logstash消费kfka数据发送到es,但是es关机维护了,导致logstash消费数据一直消费不成功,一直占用kafka的资源,
kafka里面是nio模型,使用的线程池去处理请求,一直都被logstash占了,导致我们另一个应用3个消费者,poll的时候更多时候处于阻塞状态
占用cpu资源。当把es启动起来之后,logstash正常消费数据了,这边的cpu使用率也降低了,降低到百分之几左右。