欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > java中 kafka简单应用

Java 中 Kafka 的简单应用

2025/2/22 20:08:26 来源:https://blog.csdn.net/HUHAO987654321/article/details/145708585  浏览:    关键词:java中 kafka简单应用
1 依赖:org.springframework.kafka.core 包(KafkaTemplate 所在的包)

2 调用位置(注入消息服务并发送消息)

// Inject the MessageService bean registered under the name "kafkaMessageServiceImpl".
@Autowired
@Qualifier("kafkaMessageServiceImpl")
private MessageService messageService;

// Build the MessageSendLog describing the message (subject, content, task/user identifiers)
// and pass it to commonPlatform(...) on the injected service.
MessageSendLog messageSendLog = getMessageSendLog(subject, content, taskSubQuery.getTaskExeId(), info.getUserId(), taskExeName, taskConfig.getCreatorId());
messageService.commonPlatform(messageSendLog);

3 实现类(KafkaMessageServiceImpl)

@Slf4j
@Service("kafkaMessageServiceImpl")
@ConditionalOnExpression("!'${environment.mode}'.equals('pre')")
public class KafkaMessageServiceImpl implements MessageService {@Autowired@Qualifier("kafkaTemplateSj")private KafkaTemplate<String, Object> kafkaTemplateSj;

@Override
public void platform(NotificationParam notificationParam) {List<AddMessageDTO> dtoList = conformityMessageDto(notificationParam);for (AddMessageDTO dto:dtoList) {notificationParam.setReceverid(dto.getReceiver());ItemNotificationRecord itemNotificationRecord = TaskItemTranslator.saveTaskItemNotificationRel(notificationParam);try {kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, /*dto.getMessgaeId(),*/ JSON.toJSONString(dto)).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {if (notificationParam.getFailCallback() != null) {notificationParam.getFailCallback().accept(ex.getMessage());}itemNotificationRecord.setStatus("fail");itemNotificationRecordMapper.insert(itemNotificationRecord);log.error("platform sendKafkaMsg  log : send topic error, msg = {}", ex.getMessage(), ex);}@Overridepublic void onSuccess(SendResult<String, Object> result) {if (notificationParam.getSuccCallback() != null) {notificationParam.getSuccCallback().accept("success");}itemNotificationRecordMapper.insert(itemNotificationRecord);log.info("platform sendKafkaMsg log : send topic success !");}});} catch (Exception e) {itemNotificationRecord.setStatus("fail");itemNotificationRecordMapper.insert(itemNotificationRecord);log.error("record cron sendKafkaMsgForChangeEventStatus error!", e);}}}

@Override
public void commonPlatform(MessageSendLog messageSendLog) {List<AddMessageDTO> dtoList = getByAddMessageNewDtOlist(messageSendLog);for (AddMessageDTO dto:dtoList) {MessageSendLog messageInfo = messageSendLogVo(messageSendLog,dto);try {kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, JSON.toJSONString(dto)).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {if (messageSendLog.getFailCallback() != null) {messageSendLog.getFailCallback().accept(ex.getMessage());}messageInfo.setStatus("fail");messageSendLogMapper.insert(messageInfo);log.error("platform commonPlatKafkaMsg  log : send topic error, msg = {}", ex.getMessage(), ex);}@Overridepublic void onSuccess(SendResult<String, Object> result) {if (messageSendLog.getSuccCallback() != null) {messageSendLog.getSuccCallback().accept("success");}messageInfo.setStatus("success");messageSendLogMapper.insert(messageInfo);log.info("platform commonPlatKafkaMsg log : send topic success !");}});} catch (Exception e) {messageInfo.setStatus("fail");messageSendLogMapper.insert(messageInfo);log.error("record cron commonPlatKafkaMsg error!", e);}}
}
// Kafka topic name used for sending messages. The service code refers to it as
// MqConstant.ICWP_MSG_RECEIVE, so this declaration presumably belongs inside the
// MqConstant class (not shown in this excerpt) — confirm.
public final static String ICWP_MSG_RECEIVE = "icwp-message-receive";

package com.sf.gis.common.entity;import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;import java.time.LocalDateTime;
import java.util.function.Consumer;/*** <p>* * </p>** @author xxx* @since 2024-04-22*/
/**
 * Persistent entity mapped to the {@code message_send_log} table.
 *
 * <p>Each instance describes one message and its send status. The two callback
 * fields are marked as non-persistent ({@code exist = false}) and only carry
 * handlers supplied by the caller.
 */
@Getter
@Setter
@TableName("message_send_log")
@ApiModel(value = "MessageSendLog对象", description = "")
public class MessageSendLog {

    /** Primary key, generated with the UUID id strategy. */
    @TableId(value = "message_send_log_id", type = IdType.UUID)
    @ApiModelProperty("UUID")
    private String messageSendLogId;

    /** Message subject. */
    @TableField("subject")
    @ApiModelProperty("消息主题")
    private String subject;

    /** Numeric code of the business system the message originates from. */
    @TableField("source")
    @ApiModelProperty("消息来源来源的业务系统(1:数字网格+、2:政法平安、3、事件分拨、 4、决策分析、5、三级工作平台)")
    private Integer source;

    /** Message body. */
    @TableField("content")
    @ApiModelProperty("消息内容")
    private String content;

    /** Numeric message type code. */
    @TableField("type")
    @ApiModelProperty("消息类型(0 普通、1 预警、2 待办、 3 公告) 5 领导批示 6 领导关注")
    private Integer type;

    /** Link to the detail page in the business system. */
    @TableField("link")
    @ApiModelProperty("链接地址支持跳转到业务系统的详 情地址")
    private String link;

    /** Link for mobile clients. */
    @TableField("mobile_link")
    @ApiModelProperty("移动端链接地址")
    private String mobileLink;

    /** Message priority code. */
    @TableField("priority")
    @ApiModelProperty("消息等级(0 一般、1 紧急)")
    private Integer priority;

    /** Id of the user who initiated the message. */
    @TableField("creator")
    @ApiModelProperty("发起人 ID")
    private String creator;

    /** Receiver identifier; its meaning depends on {@link #receiverType}. */
    @TableField("receiver")
    @ApiModelProperty("接收者(根据 receiverType 来传参) 到人:用户 ID 到部门:部门 Code 到角色:角色 Code ")
    private String receiver;

    /** Receiver type code. */
    @TableField("receiver_type")
    @ApiModelProperty("接收者类型 0 到人、1 到部门、2 到 角色(默认 0)")
    private Integer receiverType;

    /** Business type code. */
    @TableField("bus_code")
    @ApiModelProperty("业务类型code")
    private String busCode;

    /** Name of the workflow step state. */
    @TableField("link_state_name")
    @ApiModelProperty("环节状态名称")
    private String linkStateName;

    /** Send status of the message. */
    @TableField("status")
    @ApiModelProperty("消息发送状态")
    private String status;

    /** Serial number. */
    @TableField("rel_id")
    @ApiModelProperty("流水号")
    private String relId;

    /** Creation time, filled on insert. */
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /** Failure callback; not a table column. */
    @TableField(exist = false)
    private Consumer<String> failCallback;

    /** Success callback; not a table column. */
    @TableField(exist = false)
    private Consumer<String> succCallback;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词