1 org.springframework.kafka.core 包
2 调用地方
// Caller side: inject the MessageService bean registered under the name "kafkaMessageServiceImpl".
@Autowired @Qualifier("kafkaMessageServiceImpl") private MessageService messageService;
// Build the send-log entry (getMessageSendLog is defined outside this excerpt), then pass it to commonPlatform.
MessageSendLog messageSendLog = getMessageSendLog(subject, content, taskSubQuery.getTaskExeId(), info.getUserId(), taskExeName, taskConfig.getCreatorId()); messageService.commonPlatform(messageSendLog);
3 原始类
@Slf4j @Service("kafkaMessageServiceImpl") @ConditionalOnExpression("!'${environment.mode}'.equals('pre')") public class KafkaMessageServiceImpl implements MessageService {@Autowired@Qualifier("kafkaTemplateSj")private KafkaTemplate<String, Object> kafkaTemplateSj;
@Override public void platform(NotificationParam notificationParam) {List<AddMessageDTO> dtoList = conformityMessageDto(notificationParam);for (AddMessageDTO dto:dtoList) {notificationParam.setReceverid(dto.getReceiver());ItemNotificationRecord itemNotificationRecord = TaskItemTranslator.saveTaskItemNotificationRel(notificationParam);try {kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, /*dto.getMessgaeId(),*/ JSON.toJSONString(dto)).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {if (notificationParam.getFailCallback() != null) {notificationParam.getFailCallback().accept(ex.getMessage());}itemNotificationRecord.setStatus("fail");itemNotificationRecordMapper.insert(itemNotificationRecord);log.error("platform sendKafkaMsg log : send topic error, msg = {}", ex.getMessage(), ex);}@Overridepublic void onSuccess(SendResult<String, Object> result) {if (notificationParam.getSuccCallback() != null) {notificationParam.getSuccCallback().accept("success");}itemNotificationRecordMapper.insert(itemNotificationRecord);log.info("platform sendKafkaMsg log : send topic success !");}});} catch (Exception e) {itemNotificationRecord.setStatus("fail");itemNotificationRecordMapper.insert(itemNotificationRecord);log.error("record cron sendKafkaMsgForChangeEventStatus error!", e);}}}
@Override public void commonPlatform(MessageSendLog messageSendLog) {List<AddMessageDTO> dtoList = getByAddMessageNewDtOlist(messageSendLog);for (AddMessageDTO dto:dtoList) {MessageSendLog messageInfo = messageSendLogVo(messageSendLog,dto);try {kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, JSON.toJSONString(dto)).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {if (messageSendLog.getFailCallback() != null) {messageSendLog.getFailCallback().accept(ex.getMessage());}messageInfo.setStatus("fail");messageSendLogMapper.insert(messageInfo);log.error("platform commonPlatKafkaMsg log : send topic error, msg = {}", ex.getMessage(), ex);}@Overridepublic void onSuccess(SendResult<String, Object> result) {if (messageSendLog.getSuccCallback() != null) {messageSendLog.getSuccCallback().accept("success");}messageInfo.setStatus("success");messageSendLogMapper.insert(messageInfo);log.info("platform commonPlatKafkaMsg log : send topic success !");}});} catch (Exception e) {messageInfo.setStatus("fail");messageSendLogMapper.insert(messageInfo);log.error("record cron commonPlatKafkaMsg error!", e);}} }
/**
 * Kafka topic name for outgoing platform messages.
 * NOTE(review): presumably the constant referenced as MqConstant.ICWP_MSG_RECEIVE by the sender - confirm.
 */
public static final String ICWP_MSG_RECEIVE = "icwp-message-receive";
package com.sf.gis.common.entity;

import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;
import java.util.function.Consumer;

/**
 * Entity mapped to the {@code message_send_log} table: one row per message send attempt.
 *
 * <p>The two callback fields are marked {@code exist = false}, so they are not table columns.
 *
 * @author xxx
 * @since 2024-04-22
 */
@Getter
@Setter
@TableName("message_send_log")
@ApiModel(value = "MessageSendLog对象", description = "")
public class MessageSendLog {

    /** Primary key, generated as a UUID. */
    @ApiModelProperty("UUID")
    @TableId(value = "message_send_log_id", type = IdType.UUID)
    private String messageSendLogId;

    /** Message subject. */
    @ApiModelProperty("消息主题")
    @TableField("subject")
    private String subject;

    /** Code of the business system the message originates from. */
    @ApiModelProperty("消息来源来源的业务系统(1:数字网格+、2:政法平安、3、事件分拨、 4、决策分析、5、三级工作平台)")
    @TableField("source")
    private Integer source;

    /** Message body. */
    @ApiModelProperty("消息内容")
    @TableField("content")
    private String content;

    /** Message type code. */
    @ApiModelProperty("消息类型(0 普通、1 预警、2 待办、 3 公告) 5 领导批示 6 领导关注")
    @TableField("type")
    private Integer type;

    /** Link to the detail page in the business system. */
    @ApiModelProperty("链接地址支持跳转到业务系统的详 情地址")
    @TableField("link")
    private String link;

    /** Link for mobile clients. */
    @ApiModelProperty("移动端链接地址")
    @TableField("mobile_link")
    private String mobileLink;

    /** Priority code (0 normal, 1 urgent). */
    @ApiModelProperty("消息等级(0 一般、1 紧急)")
    @TableField("priority")
    private Integer priority;

    /** Id of the user who initiated the message. */
    @ApiModelProperty("发起人 ID")
    @TableField("creator")
    private String creator;

    /** Receiver identifier; its meaning depends on {@link #receiverType}. */
    @ApiModelProperty("接收者(根据 receiverType 来传参) 到人:用户 ID 到部门:部门 Code 到角色:角色 Code ")
    @TableField("receiver")
    private String receiver;

    /** Receiver type code (0 person, 1 department, 2 role). */
    @ApiModelProperty("接收者类型 0 到人、1 到部门、2 到 角色(默认 0)")
    @TableField("receiver_type")
    private Integer receiverType;

    /** Business type code. */
    @ApiModelProperty("业务类型code")
    @TableField("bus_code")
    private String busCode;

    /** Name of the workflow step state. */
    @ApiModelProperty("环节状态名称")
    @TableField("link_state_name")
    private String linkStateName;

    /** Send status of the message. */
    @ApiModelProperty("消息发送状态")
    @TableField("status")
    private String status;

    /** Serial number of the related record. */
    @ApiModelProperty("流水号")
    @TableField("rel_id")
    private String relId;

    /** Creation time, filled on insert. */
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /** Failure callback; not a table column. */
    @TableField(exist = false)
    private Consumer<String> failCallback;

    /** Success callback; not a table column. */
    @TableField(exist = false)
    private Consumer<String> succCallback;
}