mq消息队列消费者代码编写
parent
868fc51f2d
commit
bef9b98546
|
@ -27,6 +27,7 @@ public class RocketMQFactory {
|
|||
configMap = RocketUtil.initMQConfigMap(configName);
|
||||
CONFIG_MAPS.put(configName, configMap);
|
||||
}
|
||||
// 最大重试次数
|
||||
int maxReconsumeTimes = Util.getIntValue(Util.null2String(configMap.get("MaxReconsumeTimes")), RocketMQConstant.DEFAULT_MAX_RECONSUME_TIMES);
|
||||
// 声明一个消费者consumer,需要传入一个组 weaver-consumer
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Util.null2String(configMap.get("ConsumerGroup")));
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package weaver.xuanran.wang.shyl.mq.constant;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* <h1>RocketMQ常量</h1>
|
||||
*
|
||||
|
@ -12,4 +15,20 @@ public class RocketMQConstant {
|
|||
public static final String DELETE_ACTION = "DELETE_ACTION";
|
||||
public static final String PASSWORD_ACTION = "PASSWORD_ACTION";
|
||||
public static final int DEFAULT_MAX_RECONSUME_TIMES = 5;
|
||||
public static final String SEX_GIRL = "1";
|
||||
public static final String SEX_BOY = "2";
|
||||
public static final String STATUS_ENABLE = "1";
|
||||
public static final String STATUS_NO_ENABLE = "4";
|
||||
public static final String ID_TYPE_UNKNOWN = "0";
|
||||
public static final String ID_TYPE_ID_CARD = "1";
|
||||
public static final String ID_TYPE_PASSPORT = "3";
|
||||
public static final String ID_TYPE_STU_CARD = "4";
|
||||
public static final String ID_TYPE_SOLDIER_CARD = "5";
|
||||
public static final String DEFAULT_PASSWORD = "1";
|
||||
|
||||
public static Map<String, String> SEX_MAPPING = new HashMap<>();
|
||||
|
||||
static {
|
||||
SEX_MAPPING.put(SEX_BOY, "0");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import weaver.xuanran.wang.shyl.mq.RocketMQFactory;
|
||||
import weaver.xuanran.wang.shyl.mq.RocketMQListener;
|
||||
import weaver.xuanran.wang.shyl.mq.service.impl.OrgServiceImpl;
|
||||
import weaver.xuanran.wang.shyl.mq.util.RocketUtil;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -24,6 +25,7 @@ public class OrgConsumer extends RocketMQListener {
|
|||
|
||||
private static final Logger log = Util.getLogger();
|
||||
private static final String CONFIG_NAME = "OrgConsumer";
|
||||
private final OrgServiceImpl orgService = new OrgServiceImpl();
|
||||
|
||||
public OrgConsumer(){
|
||||
super(CONFIG_NAME);
|
||||
|
@ -31,11 +33,7 @@ public class OrgConsumer extends RocketMQListener {
|
|||
|
||||
@Override
|
||||
public MessageListenerConcurrently service() {
|
||||
return (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)->{
|
||||
Map<String, Object> map = RocketMQFactory.CONFIG_MAPS.get(CONFIG_NAME);
|
||||
log.info(Util.logStr("{} service config is {}", CONFIG_NAME, JSONObject.toJSONString(map)));
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
};
|
||||
return (List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> RocketUtil.execute(msg, consumeConcurrentlyContext, orgService, CONFIG_NAME);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import weaver.xuanran.wang.shyl.mq.RocketMQFactory;
|
||||
import weaver.xuanran.wang.shyl.mq.RocketMQListener;
|
||||
import weaver.xuanran.wang.shyl.mq.service.impl.PassWordServiceImpl;
|
||||
import weaver.xuanran.wang.shyl.mq.util.RocketUtil;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -28,13 +30,11 @@ public class PassWordConsumer extends RocketMQListener {
|
|||
super(CONFIG_NAME);
|
||||
}
|
||||
|
||||
private final PassWordServiceImpl passWordService = new PassWordServiceImpl();
|
||||
|
||||
@Override
|
||||
public MessageListenerConcurrently service() {
|
||||
return (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)->{
|
||||
Map<String, Object> map = RocketMQFactory.CONFIG_MAPS.get(CONFIG_NAME);
|
||||
log.info(Util.logStr("{} service config is {}", CONFIG_NAME, JSONObject.toJSONString(map)));
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
};
|
||||
return (List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> RocketUtil.execute(msg, consumeConcurrentlyContext, passWordService, CONFIG_NAME);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package weaver.xuanran.wang.shyl.mq.entity;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* <h1>密码修改</h1>
|
||||
*
|
||||
* @Author xuanran.wang
|
||||
* @Date 2022/12/30 13:59
|
||||
*/
|
||||
@Data
|
||||
public class ModifyPassWord {
|
||||
/**
|
||||
* <h2>主键</h2>
|
||||
|
|
|
@ -3,6 +3,8 @@ package weaver.xuanran.wang.shyl.mq.mapper;
|
|||
import aiyh.utils.annotation.recordset.ParamMapper;
|
||||
import aiyh.utils.annotation.recordset.Update;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* <h1>消费者mapper</h1>
|
||||
*
|
||||
|
@ -15,7 +17,6 @@ public interface ConsumerMapper {
|
|||
* <h1>通过outKey更新人员状态为离职</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2022/12/30 14:33
|
||||
* @param outKey 外部系统id
|
||||
* @return 更新成功/失败
|
||||
**/
|
||||
@Update("update hrmresource set status = 5 where outkey = #{outKey}")
|
||||
|
@ -31,5 +32,44 @@ public interface ConsumerMapper {
|
|||
@Update("delete hrmdepartment where outkey = #{outKey}")
|
||||
boolean deleteDepartmentByOutKey(@ParamMapper("outKey") String outKey);
|
||||
|
||||
/**
|
||||
* <h1>通过outKey获取部门数据信息</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2022/12/30 14:33
|
||||
* @param outKey 外部系统id
|
||||
* @return map key : id, val : 分部id
|
||||
**/
|
||||
@Update("select id departmentId, subcompanyid1 subCompanyId from hrmdepartment where outkey = #{outKey}")
|
||||
Map<String, Integer> getDepInfoByOutKey(@ParamMapper("outKey") String outKey);
|
||||
|
||||
/**
|
||||
* <h1>通过outKey获取人力资源id</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2022/12/30 14:33
|
||||
* @param outKey 外部系统id
|
||||
* @return id
|
||||
**/
|
||||
@Update("select id from hrmresource where outkey = #{outKey}")
|
||||
String getHrmIdByOutKey(@ParamMapper("outKey") String outKey);
|
||||
|
||||
/**
|
||||
* <h1>通过outKey获取oa部门id</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2022/12/30 14:33
|
||||
* @param outKey 外部系统id
|
||||
* @return id
|
||||
**/
|
||||
@Update("select id from hrmdepartment where outkey = #{outKey}")
|
||||
String getDepIdByOutKey(@ParamMapper("outKey") String outKey);
|
||||
|
||||
/**
|
||||
* <h1>通过outKey获取oa部门id</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2022/12/30 14:33
|
||||
* @param id 外部系统id
|
||||
* @return true/false
|
||||
**/
|
||||
@Update("update hrmresource set password = #{password} where id = #{id}")
|
||||
boolean updatePasswordById(@ParamMapper("id") String id,
|
||||
@ParamMapper("password") String password);
|
||||
}
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
package weaver.xuanran.wang.shyl.mq.service;
|
||||
|
||||
import aiyh.utils.Util;
|
||||
import org.apache.log4j.Logger;
|
||||
import weaver.conn.RecordSet;
|
||||
import weaver.xuanran.wang.shyl.mq.mapper.ConsumerMapper;
|
||||
import weaver.xuanran.wang.shyl.mq.service.interfaces.CreateAction;
|
||||
import weaver.xuanran.wang.shyl.mq.service.interfaces.DeleteAction;
|
||||
import weaver.xuanran.wang.shyl.mq.service.interfaces.PassWordAction;
|
||||
import weaver.xuanran.wang.shyl.mq.service.interfaces.UpdateAction;
|
||||
|
||||
/**
|
||||
|
@ -12,9 +15,27 @@ import weaver.xuanran.wang.shyl.mq.service.interfaces.UpdateAction;
|
|||
* @Author xuanran.wang
|
||||
* @Date 2022/12/30 13:04
|
||||
*/
|
||||
public abstract class CusInfoActionService implements CreateAction, DeleteAction, UpdateAction {
|
||||
public abstract class CusInfoActionService implements CreateAction, DeleteAction, UpdateAction, PassWordAction {
|
||||
protected final RecordSet recordSet = new RecordSet();
|
||||
|
||||
protected final Logger logger = Util.getLogger();
|
||||
|
||||
/**
|
||||
* <h2>consumer-mapper</h2>
|
||||
**/
|
||||
protected final ConsumerMapper consumerMapper = Util.getMapper(ConsumerMapper.class);
|
||||
|
||||
/**
|
||||
* <h1>获取下一个人员id</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/01/03 10:50
|
||||
* @return next id
|
||||
**/
|
||||
protected synchronized String getNextHrmId(){
|
||||
recordSet.executeProc("HrmResourceMaxId_Get", "");
|
||||
recordSet.next();
|
||||
//新增的部门id
|
||||
return Util.null2String(recordSet.getInt(1));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,9 +1,22 @@
|
|||
package weaver.xuanran.wang.shyl.mq.service.impl;
|
||||
|
||||
import aiyh.utils.Util;
|
||||
import aiyh.utils.excention.CustomerException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import weaver.conn.RecordSet;
|
||||
import weaver.general.TimeUtil;
|
||||
import weaver.hrm.company.DepartmentComInfo;
|
||||
import weaver.interfaces.hrm.HrmServiceManager;
|
||||
import weaver.matrix.MatrixUtil;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.MQMessage;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.Org;
|
||||
import weaver.xuanran.wang.shyl.mq.service.CusInfoActionService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <h1></h1>
|
||||
*
|
||||
|
@ -11,18 +24,151 @@ import weaver.xuanran.wang.shyl.mq.service.CusInfoActionService;
|
|||
* @Date 2022/12/30 15:05
|
||||
*/
|
||||
public class OrgServiceImpl extends CusInfoActionService {
|
||||
/**
|
||||
* <h1>部门创建</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 14:05
|
||||
* @param message mq消息
|
||||
* @return 成功/重试
|
||||
**/
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusCreateAction(MQMessage message) {
|
||||
return null;
|
||||
try {
|
||||
String content = message.getContent();
|
||||
Org org = JSONObject.parseObject(content, Org.class);
|
||||
char separator = weaver.general.Util.getSeparator();
|
||||
// 分部id
|
||||
String subId = "";
|
||||
RecordSet insertRs = new RecordSet();
|
||||
if(StringUtils.isBlank(subId)){
|
||||
throw new CustomerException("SubCompany can not be empey!");
|
||||
}
|
||||
//使用存储过程新增分部
|
||||
String para = org.getOrgName() + separator + org.getOrgName() + separator +
|
||||
"" + separator + "" + separator + subId + separator + org.getSortIndex() + separator + "";
|
||||
insertRs.executeProc("HrmDepartment_Insert", para);
|
||||
if (insertRs.next()) {
|
||||
int depId = insertRs.getInt(1);
|
||||
updateDepartmentInfo(String.valueOf(depId), subId, org);
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("orgCreateAction error : {}", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <h1>部门删除</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:59
|
||||
* @param message mq消息
|
||||
* @return 成功/重试
|
||||
**/
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusDeleteAction(MQMessage message) {
|
||||
return null;
|
||||
try {
|
||||
String content = message.getContent();
|
||||
Org org = JSONObject.parseObject(content, Org.class);
|
||||
String id = org.getId();
|
||||
if(StringUtils.isBlank(id)){
|
||||
throw new CustomerException("userInfo id can not be empty!");
|
||||
}
|
||||
boolean success = consumerMapper.deleteDepartmentByOutKey(id);
|
||||
if (!success) {
|
||||
throw new CustomerException(Util.logStr("update user status error!"));
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("orgDeleteAction execute error : [{}]!", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusUpdateAction(MQMessage message) {
|
||||
try {
|
||||
String content = message.getContent();
|
||||
Org org = JSONObject.parseObject(content, Org.class);
|
||||
String outKey = org.getId();
|
||||
// 分部待明确
|
||||
String subId = "";
|
||||
String depId = consumerMapper.getDepIdByOutKey(outKey);
|
||||
if(StringUtils.isBlank(depId)){
|
||||
throw new CustomerException(Util.logStr("The department information data obtained by foreign key is empty!"));
|
||||
}
|
||||
updateDepartmentInfo(depId, subId, org);
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("orgUpdateAction error : {}", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
public void updateDepartmentInfo(String depId, String subId, Org org){
|
||||
String insertSQL = "update hrmdepartment set created = ?, creater = ?, modified = ?, modifier = ?," +
|
||||
" departmentcode = ?, tlevel = ?, showorder = ?, canceled = ?,departmentmark = ?, " +
|
||||
" departmentname = ?, supdepid = ?, subcompanyid1 = ?, outkey = ? where id = ?";
|
||||
RecordSet updateRs = new RecordSet();
|
||||
List<Object> params = initDepartParam(String.valueOf(depId), subId, org);
|
||||
if (!updateRs.executeUpdate(insertSQL, params)) {
|
||||
throw new CustomerException(Util.logStr("insert HrmDepartment error sql : {}, params : {}", insertSQL, JSONObject.toJSONString(params)));
|
||||
}
|
||||
DepartmentComInfo dci = new DepartmentComInfo();
|
||||
//清除全部部门缓存
|
||||
dci.removeCompanyCache();
|
||||
//新增单个部门缓存
|
||||
HrmServiceManager hrmServiceManager = new HrmServiceManager();
|
||||
hrmServiceManager.SynInstantDepartment(String.valueOf(depId), "2");
|
||||
//同步分部数据到矩阵
|
||||
MatrixUtil.updateDepartmentData("" + depId);
|
||||
}
|
||||
|
||||
/**
|
||||
* <h1>封装部门更新/插入参数集合</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:34
|
||||
* @param depId oa部门id
|
||||
* @param org 部门
|
||||
* @return 更新/插入参数
|
||||
**/
|
||||
private List<Object> initDepartParam(String depId, String subId, Org org){
|
||||
List<Object> params = new ArrayList<>();
|
||||
String currentTime = TimeUtil.getCurrentTimeString();
|
||||
// 创建时间
|
||||
params.add(currentTime);
|
||||
// 创建人
|
||||
params.add("1");
|
||||
// 修改时间
|
||||
params.add(currentTime);
|
||||
// 修改人
|
||||
params.add("1");
|
||||
// 部门编码
|
||||
params.add(org.getOrgCode());
|
||||
// 层级结构
|
||||
params.add(org.getSortIndex());
|
||||
// 显示顺序
|
||||
params.add(org.getSortIndex());
|
||||
// 是否封存
|
||||
params.add("0");
|
||||
// 部门标识
|
||||
params.add(org.getOrgName());
|
||||
// 部门名称
|
||||
params.add(org.getOrgName());
|
||||
String parentId = org.getParentId();
|
||||
// 父级部门id
|
||||
String supDepId = "";
|
||||
if(StringUtils.isNotBlank(parentId)){
|
||||
supDepId = consumerMapper.getDepIdByOutKey(parentId);
|
||||
}
|
||||
params.add(supDepId);
|
||||
// 分部
|
||||
params.add(subId);
|
||||
// out key
|
||||
params.add(org.getId());
|
||||
params.add(depId);
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusPassWordAction(MQMessage message) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
package weaver.xuanran.wang.shyl.mq.service.impl;
|
||||
|
||||
import aiyh.utils.Util;
|
||||
import aiyh.utils.excention.CustomerException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.MQMessage;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.ModifyPassWord;
|
||||
import weaver.xuanran.wang.shyl.mq.service.CusInfoActionService;
|
||||
|
||||
/**
|
||||
* <h1>密码修改业务方法</h1>
|
||||
*
|
||||
* @author xuanran.wang
|
||||
* @date 2023/1/3 16:07
|
||||
*/
|
||||
public class PassWordServiceImpl extends CusInfoActionService {
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusCreateAction(MQMessage message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusDeleteAction(MQMessage message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusPassWordAction(MQMessage message) {
|
||||
try {
|
||||
String content = message.getContent();
|
||||
ModifyPassWord passWord = JSONObject.parseObject(content, ModifyPassWord.class);
|
||||
String outKey = passWord.getId();
|
||||
String hrmId = consumerMapper.getHrmIdByOutKey(outKey);
|
||||
if(StringUtils.isBlank(hrmId)){
|
||||
throw new CustomerException(Util.logStr("the userId is {} , no personnel information found in oa!", hrmId));
|
||||
}
|
||||
if (!consumerMapper.updatePasswordById(hrmId, Util.getEncrypt(passWord.getPassword()))) {
|
||||
throw new CustomerException("update user password error!");
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
} catch (Exception e) {
|
||||
throw new CustomerException(Util.logStr("passWordAction execute error : [{}]!", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusUpdateAction(MQMessage message) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -3,12 +3,27 @@ package weaver.xuanran.wang.shyl.mq.service.impl;
|
|||
import aiyh.utils.Util;
|
||||
import aiyh.utils.excention.CustomerException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.weaver.general.TimeUtil;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import weaver.conn.RecordSet;
|
||||
import weaver.hrm.finance.SalaryManager;
|
||||
import weaver.hrm.resource.ResourceComInfo;
|
||||
import weaver.xiao.commons.utils.JsonResult;
|
||||
import weaver.xuanran.wang.shyl.mq.constant.RocketMQConstant;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.MQMessage;
|
||||
import weaver.xuanran.wang.shyl.mq.entity.UserInfo;
|
||||
import weaver.xuanran.wang.shyl.mq.mapper.ConsumerMapper;
|
||||
import weaver.xuanran.wang.shyl.mq.service.CusInfoActionService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.api.ecme.excel.HtmlLayoutOperate.user;
|
||||
|
||||
/**
|
||||
* <h1>用户业务方法</h1>
|
||||
*
|
||||
|
@ -17,33 +32,206 @@ import weaver.xuanran.wang.shyl.mq.service.CusInfoActionService;
|
|||
*/
|
||||
public class UserServiceImpl extends CusInfoActionService {
|
||||
|
||||
/**
|
||||
* <h1>用户新增</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:37
|
||||
* @param message mq消息
|
||||
* @return 成功/重试
|
||||
**/
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusCreateAction(MQMessage message) {
|
||||
try {
|
||||
String content = message.getContent();
|
||||
UserInfo userInfo = JSONObject.parseObject(content, UserInfo.class);
|
||||
return null;
|
||||
String userInfoDepartmentId = userInfo.getDepartmentId();
|
||||
// 部门id
|
||||
if(StringUtils.isBlank(userInfoDepartmentId)){
|
||||
throw new CustomerException("userInfo userInfoDepartmentId can not be empty!");
|
||||
}
|
||||
Map<String, Integer> depInfoByOutKey = consumerMapper.getDepInfoByOutKey(userInfo.getDepartmentId());
|
||||
if (MapUtils.isEmpty(depInfoByOutKey)) {
|
||||
// 通过外键获取部门信息数据为空
|
||||
throw new CustomerException("The department information data obtained by foreign key is empty!");
|
||||
}
|
||||
String nextHrmId = getNextHrmId();
|
||||
List<Object> params = initHrmParam(nextHrmId, userInfo, depInfoByOutKey);
|
||||
String departmentId = Util.null2DefaultStr(depInfoByOutKey.get("departmentId"), "");
|
||||
String subCompanyId = Util.null2DefaultStr(depInfoByOutKey.get("subCompanyId"), "");
|
||||
RecordSet insertRs = new RecordSet();
|
||||
//使用sql新增人员
|
||||
String insertSql = "insert into HrmResource(systemlanguage,workcode,departmentid,subcompanyid1," +
|
||||
" status,createrid,createdate,lastmodid,lastmoddate,lastname,sex,loginid," +
|
||||
" password,birthday,certificatenum,email, mobile,outkey,id) " +
|
||||
" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||
if (insertRs.executeUpdate(insertSql, params)) {
|
||||
char separator = Util.getSeparator();
|
||||
Calendar todayCal = Calendar.getInstance();
|
||||
String today = Util.add0(todayCal.get(Calendar.YEAR), 4) + "-" +
|
||||
Util.add0(todayCal.get(Calendar.MONTH) + 1, 2) + "-" +
|
||||
Util.add0(todayCal.get(Calendar.DAY_OF_MONTH), 2);
|
||||
String userPara = "" + 1 + separator + today;
|
||||
insertRs.executeProc("HrmResource_CreateInfo", "" + nextHrmId + separator + userPara + separator + userPara);
|
||||
ResourceComInfo resourceComInfo = new ResourceComInfo();
|
||||
resourceComInfo.addResourceInfoCache(nextHrmId);
|
||||
SalaryManager salaryManager = new SalaryManager();
|
||||
salaryManager.initResourceSalary(nextHrmId);
|
||||
String para1 = "" + nextHrmId + separator + "" + separator + departmentId + separator + subCompanyId + separator + "0" + separator + "0";
|
||||
insertRs.executeProc("HrmResource_Trigger_Insert", para1);
|
||||
String sql_1 = ("insert into HrmInfoStatus (itemid,hrmid) values(1," + nextHrmId + ")");
|
||||
insertRs.execute(sql_1);
|
||||
String sql_2 = ("insert into HrmInfoStatus (itemid,hrmid) values(2," + nextHrmId + ")");
|
||||
insertRs.execute(sql_2);
|
||||
String sql_3 = ("insert into HrmInfoStatus (itemid,hrmid) values(3," + nextHrmId + ")");
|
||||
insertRs.execute(sql_3);
|
||||
String sql_10 = ("insert into HrmInfoStatus (itemid,hrmid) values(10," + nextHrmId + ")");
|
||||
insertRs.execute(sql_10);
|
||||
resourceComInfo.updateResourceInfoCache(nextHrmId);
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}else {
|
||||
throw new CustomerException(Util.logStr("insert HrmResource error sql : {}, params : {}", insertSql, JSONObject.toJSONString(params)));
|
||||
}
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("hrmCreateAction error : {}", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <h1>用户删除</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:38
|
||||
* @param message mq消息
|
||||
* @return 成功/重试
|
||||
**/
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusDeleteAction(MQMessage message) {
|
||||
try {
|
||||
String content = message.getContent();
|
||||
UserInfo userInfo = JSONObject.parseObject(content, UserInfo.class);
|
||||
String id = userInfo.getId();
|
||||
if(StringUtils.isBlank(id)){
|
||||
throw new CustomerException("userInfo id can not be empty!");
|
||||
}
|
||||
boolean success = consumerMapper.updateUserStatusByOutKey(id);
|
||||
if (!success) {
|
||||
throw new CustomerException(Util.logStr("update user status error!"));
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
} catch (Exception e) {
|
||||
throw new CustomerException(Util.logStr("cusDeleteAction execute error : [{}]!", e.getMessage()));
|
||||
throw new CustomerException(Util.logStr("hrmDeleteAction execute error : [{}]!", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <h1>用户更新</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:39
|
||||
* @param message mq消息
|
||||
* @return 成功/重试
|
||||
**/
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusUpdateAction(MQMessage message) {
|
||||
try {
|
||||
String content = message.getContent();
|
||||
UserInfo userInfo = JSONObject.parseObject(content, UserInfo.class);
|
||||
String userInfoId = userInfo.getId();
|
||||
String userInfoDepartmentId = userInfo.getDepartmentId();
|
||||
// 接口人员id
|
||||
if(StringUtils.isBlank(userInfoId)){
|
||||
throw new CustomerException("userInfo id can not be empty!");
|
||||
}
|
||||
// 部门id
|
||||
if(StringUtils.isBlank(userInfoDepartmentId)){
|
||||
throw new CustomerException("userInfo userInfoDepartmentId can not be empty!");
|
||||
}
|
||||
// oa部门信息
|
||||
Map<String, Integer> depInfoByOutKey = consumerMapper.getDepInfoByOutKey(userInfo.getDepartmentId());
|
||||
if (MapUtils.isEmpty(depInfoByOutKey)) {
|
||||
// 通过外键获取部门信息数据为空
|
||||
throw new CustomerException("The department information data obtained by foreign key is empty!");
|
||||
}
|
||||
// oa人员id
|
||||
String hrmId = consumerMapper.getHrmIdByOutKey(userInfoId);
|
||||
if(StringUtils.isBlank(hrmId)){
|
||||
throw new CustomerException(Util.logStr("userInfoId = [{}], No personnel information found in oa!", userInfoId));
|
||||
}
|
||||
//使用sql新增人员
|
||||
String updateSql = "update HrmResource set systemlanguage = ?, workcode = ?, departmentid = ?, subcompanyid1 = ?," +
|
||||
" status = ?,createrid = ?, createdate = ?, lastmodid = ? ,lastmoddate = ? ,lastname = ? ,sex = ?, loginid = ?, " +
|
||||
" password = ?, birthday = ?,certificatenum = ?,email = ?, mobile = ? , outkey = ? where id = ? ";
|
||||
List<Object> params = initHrmParam(hrmId, userInfo, depInfoByOutKey);
|
||||
RecordSet updateRs = new RecordSet();
|
||||
if(!updateRs.executeUpdate(updateSql, params)){
|
||||
throw new CustomerException(Util.logStr("update HrmResource error sql : {}, params : {}", updateSql, JSONObject.toJSONString(params)));
|
||||
}
|
||||
ResourceComInfo resourceComInfo = new ResourceComInfo();
|
||||
// 清空缓存
|
||||
resourceComInfo.updateResourceInfoCache(hrmId);
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("hrmUpdateAction execute error : [{}]!", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <h1>封装人员更新/插入参数集合</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/1/3 13:34
|
||||
* @param nextHrmId oa人员id
|
||||
* @param userInfo mq人员对象
|
||||
* @param depInfoByOutKey 部门信息
|
||||
* @return 更新/插入参数
|
||||
**/
|
||||
private List<Object> initHrmParam(String nextHrmId, UserInfo userInfo, Map<String, Integer> depInfoByOutKey){
|
||||
String password = Util.getEncrypt(RocketMQConstant.DEFAULT_PASSWORD);
|
||||
String date = TimeUtil.getCurrentDateString();
|
||||
ArrayList<Object> params = new ArrayList<>();
|
||||
String userName = Util.null2DefaultStr(userInfo.getUserName(), "");
|
||||
String departmentId = Util.null2DefaultStr(depInfoByOutKey.get("departmentId"), "");
|
||||
String subCompanyId = Util.null2DefaultStr(depInfoByOutKey.get("subCompanyId"), "");
|
||||
// 语言
|
||||
params.add(7);
|
||||
// 工号
|
||||
params.add(userName);
|
||||
// 部门id
|
||||
params.add(departmentId);
|
||||
// 分部id
|
||||
params.add(subCompanyId);
|
||||
// 状态
|
||||
params.add(1);
|
||||
// 创建人
|
||||
params.add(1);
|
||||
// 创建日期
|
||||
params.add(date);
|
||||
// 最后修改人
|
||||
params.add(1);
|
||||
// 最后修改日期
|
||||
params.add(date);
|
||||
// 人员名称
|
||||
params.add(Util.null2DefaultStr(userInfo.getDisplayName(), ""));
|
||||
// 性别 如果没传就默认男
|
||||
params.add(RocketMQConstant.SEX_MAPPING.get(Util.null2DefaultStr(userInfo.getGender(), "1")));
|
||||
// 登陆名
|
||||
params.add(userName);
|
||||
// 密码
|
||||
params.add(password);
|
||||
// 生日
|
||||
params.add(Util.null2DefaultStr(userInfo.getBirthDate(), ""));
|
||||
// 身份证号码
|
||||
params.add(Util.null2DefaultStr(userInfo.getIdCardNo(), ""));
|
||||
// 邮箱
|
||||
params.add(Util.null2DefaultStr(userInfo.getEmail(), ""));
|
||||
// 手机号
|
||||
params.add(Util.null2DefaultStr(userInfo.getMobile(), ""));
|
||||
// 用户out key
|
||||
params.add(Util.null2DefaultStr(userInfo.getId(),""));
|
||||
// oaId
|
||||
params.add(nextHrmId);
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus cusPassWordAction(MQMessage message) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,10 +71,17 @@ public class RocketUtil {
|
|||
MQMessage mqMessage = null;
|
||||
try {
|
||||
msgBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
|
||||
mqMessage = JSONObject.parseObject(msgBody, MQMessage.class);
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("parse msgBody to Message error current msgBody is {}, the error is {}", msg, e.getMessage()));
|
||||
if(StringUtils.isBlank(msgBody)){
|
||||
throw new CustomerException("MQ msgBody is empty!");
|
||||
}
|
||||
mqMessage = JSONObject.parseObject(msgBody, MQMessage.class);
|
||||
// 业务主体
|
||||
String content = Util.null2DefaultStr(mqMessage.getContent(),"");
|
||||
if(StringUtils.isBlank(content)){
|
||||
throw new CustomerException(Util.logStr("the messageId : {}, content is empty!", Util.null2DefaultStr(mqMessage.getId(), "")));
|
||||
}
|
||||
log.info(Util.logStr("MQMessage : {} ", mqMessage));
|
||||
// 业务类型
|
||||
String actionType = mqMessage.getActionType();
|
||||
switch (actionType){
|
||||
case RocketMQConstant.CREATE_ACTION:{
|
||||
|
@ -86,8 +93,14 @@ public class RocketUtil {
|
|||
case RocketMQConstant.DELETE_ACTION:{
|
||||
return cusInfoActionService.cusDeleteAction(mqMessage);
|
||||
}
|
||||
case RocketMQConstant.PASSWORD_ACTION:{
|
||||
return cusInfoActionService.cusPassWordAction(mqMessage);
|
||||
}
|
||||
default: throw new CustomerException(Util.logStr("current actionType : [{}] is not supported!", actionType));
|
||||
}
|
||||
}catch (Exception e){
|
||||
throw new CustomerException(Util.logStr("parse msgBody to Message error current msgBody is {}, the error is {}", msg, e.getMessage()));
|
||||
}
|
||||
}else {
|
||||
log.error("the msgList is empty!");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue