钉钉发送消息有频率限制,一分钟最多20次,超过会报错,为了防止消息丢失,下面采用消息发送到队列中,Executors.newScheduledThreadPool每分钟执行一次,并且从队列取数据,当前计数周期只取20条,超过20条之后重置次数
package com.cf.isoc.services.utils;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.cf.support.utils.HttpClientUtil;
import com.cf.support.utils.SpringUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.Data;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
public class DingAlarmsUtil {
private static final Logger log = LoggerFactory.getLogger(DingAlarmsUtil.class);
private static final int MAX_NOTIFICATIONS_PER_MINUTE = 20;
private static final int RATE_LIMIT_PERIOD_SECONDS = 60;
// 初识延时10s
private static final int INITAL_DELAY_SECONDS = 10;
private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static final Queue<NotificationRequest> notificationQueue = new ConcurrentLinkedQueue<>();
private static int notificationsSentThisMinute = 0;
private static DingAlarmsUtil instance = new DingAlarmsUtil();
static {
scheduler.scheduleAtFixedRate(instance::processNotificationQueue, INITAL_DELAY_SECONDS, RATE_LIMIT_PERIOD_SECONDS, TimeUnit.SECONDS);
}
private DingAlarmsUtil() {
}
public static void alarmException(String content, List<String> userIds, boolean isAtAll) {
try {
Thread.sleep(1000);
executorService.execute(() -> {
// 入队
notificationQueue.add(new NotificationRequest(content, userIds, isAtAll));
});
} catch (Exception e) {
log.error("alarmException", e);
}
}
/**
* 消费队列数据
*/
private void processNotificationQueue() {
// rateLimit
while (notificationsSentThisMinute < MAX_NOTIFICATIONS_PER_MINUTE) {
log.info("[*]processNotificationQueue notificationQueue池中还有:" + notificationQueue.size());
NotificationRequest notificationRequest = notificationQueue.poll();
if (ObjectUtil.isNull(notificationRequest)) {
break;
}
sendDingDing(notificationRequest);
notificationsSentThisMinute++;
}
// 重置计数
if (notificationsSentThisMinute >= MAX_NOTIFICATIONS_PER_MINUTE) {
resetNotificationsSent();
}
}
/**
* 重置
*/
private static void resetNotificationsSent() {
notificationsSentThisMinute = 0;
}
/**
* 发送钉钉通知
*
* @param request
*/
private void sendDingDing(NotificationRequest request) {
try {
Map<String, String> config = getConfig();
if (config == null) {
log.error("获取配置出错: 配置文件钉钉配置不全");
return;
}
String baseUrl = config.get("baseUrl");
String sign = config.get("sign");
// 钉钉机器人地址(配置机器人的webhook)
long timestamp = System.currentTimeMillis();
String stringToSign = timestamp + "\n" + sign;
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(sign.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
String dingUrl = baseUrl + "×tamp=" + timestamp + "&sign=" + URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8");
// 组装请求内容
String reqStr = packageBuildReqData(request.getContent(), request.isAtAll(), request.getUserIds());
// 推送消息(http请求)
String result = HttpClientUtil.post(dingUrl, reqStr);
log.info("result: {}---notificationsSentThisMinute:{}", result, notificationsSentThisMinute);
// System.out.println("time:" + System.currentTimeMillis() + "---" + notificationsSentThisMinute);
} catch (Exception e) {
log.error("sendDingDingError: {}", e.getMessage());
}
}
/**
* 获取配置
*
* @return
*/
private static Map<String, String> getConfig() {
String defaultPrefix = "dingtalk";
String baseUrl = SpringUtils.getProperty(defaultPrefix + ".url");
if (StringUtils.isBlank(baseUrl)) {
log.error("配置[{}.url]不存在", defaultPrefix);
return null;
}
String sign = SpringUtils.getProperty(defaultPrefix + ".sign");
if (StringUtils.isBlank(sign)) {
log.error("配置[{}.sign]不存在", defaultPrefix);
return null;
}
return ImmutableMap.of("baseUrl", baseUrl, "sign", sign);
}
/**
* 组装请求报文
*/
private static String packageBuildReqData(String content, boolean isAtAll, List<String> userIds) {
Map<String, Object> atMap = Maps.newHashMap();
atMap.put("isAtAll", isAtAll);
atMap.put("atUserIds", userIds);
Map<String, String> contentMap = Maps.newHashMap();
contentMap.put("content", content);
Map<String, Object> reqMap = Maps.newHashMap();
reqMap.put("msgtype", "text");
reqMap.put("text", contentMap);
reqMap.put("at", atMap);
return JSON.toJSONString(reqMap);
}
}
@Data
class NotificationRequest {
private String content;
private List<String> userIds;
private boolean isAtAll;
public NotificationRequest(String content, List<String> userIds, boolean isAtAll) {
this.content = content;
this.userIds = userIds;
this.isAtAll = isAtAll;
}
}
配置文件
dingtalk:
url:
sign: