安永数据同步优化 cus_get_log service编写
parent
0042972aa5
commit
3819487589
|
@ -26,13 +26,28 @@ import java.util.Map;
|
|||
*
|
||||
* @author youHong.ai
|
||||
*/
|
||||
@Path("/ebu7-dev1/common/log")
|
||||
@Path("/ebu7-dev1/common/log/")
|
||||
public class GetLogController {
|
||||
|
||||
private final Logger log = Util.getLogger();
|
||||
|
||||
private final GetLogService service = new GetLogService();
|
||||
|
||||
@GET
|
||||
@Path("ipList")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public String getIpList(@Context HttpServletRequest request, @Context HttpServletResponse response) {
|
||||
try {
|
||||
User loginUser = HrmUserVarify.getUser(request, response);
|
||||
String path = request.getRealPath("/");
|
||||
return ApiResult.success(service.getIpList(loginUser, path));
|
||||
} catch (Exception e) {
|
||||
log.error("获取ip节点出错 : " + e.getMessage());
|
||||
log.error(Util.getErrString(e));
|
||||
return ApiResult.error("获取ip节点出错!");
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("get")
|
||||
@Produces(MediaType.APPLICATION_OCTET_STREAM)
|
||||
|
|
|
@ -2,10 +2,14 @@ package com.api.ebu7dev1.common.getlog.service;
|
|||
|
||||
import aiyh.utils.Util;
|
||||
import aiyh.utils.excention.CustomerException;
|
||||
import com.api.ebu7dev1.common.getlog.util.GetLogUtil;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import weaver.hrm.User;
|
||||
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* <h1>获取日志</h1>
|
||||
|
@ -19,12 +23,51 @@ public class GetLogService {
|
|||
return null;
|
||||
}
|
||||
|
||||
public String collectLogInfo(User loginUser, Map<String, Object> params) {
|
||||
if (loginUser.getUID() != 1) {
|
||||
throw new CustomerException("无权限查看!");
|
||||
/**
|
||||
* <h1>获取所有的ip地址</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/7/1 14:19
|
||||
* @param ecologyPath ec路径
|
||||
* @return 集群ip 以及 本地ip
|
||||
**/
|
||||
public Set<String> getIpList(User loginUser, String ecologyPath){
|
||||
GetLogUtil.checkUser(loginUser);
|
||||
String weaverPropertiesPath;
|
||||
String resinPath = System.getProperty("user.dir") + File.separator;
|
||||
if (ecologyPath.endsWith(File.separator)) {
|
||||
weaverPropertiesPath = ecologyPath + "WEB-INF" + File.separator + "prop" + File.separator + "weaver.properties";
|
||||
} else {
|
||||
weaverPropertiesPath = ecologyPath + File.separator + "WEB-INF" + File.separator + "prop" + File.separator + "weaver.properties";
|
||||
}
|
||||
String localIp = GetLogUtil.getInnerIp();
|
||||
Set<String> ipList = GetLogUtil.readEcologyProp(weaverPropertiesPath);
|
||||
if(CollectionUtils.isEmpty(ipList)){
|
||||
String resinBinPath = resinPath + "bin" + File.separator + "startresin.sh";
|
||||
ipList.addAll(GetLogUtil.readResinProp(resinBinPath));
|
||||
}
|
||||
if (ipList.size() != 0) {//没有集群的时候
|
||||
boolean status = false;
|
||||
for (String ip : ipList) {
|
||||
if (ip.contains(localIp) || "127.0.0.1".equals(ip) || "127.0.0.1".equals(localIp)) {
|
||||
status = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!status) {
|
||||
ipList.add(localIp);
|
||||
}
|
||||
}else {
|
||||
ipList.add(localIp);
|
||||
}
|
||||
return ipList;
|
||||
}
|
||||
|
||||
public String collectLogInfo(User loginUser, Map<String, Object> params) {
|
||||
GetLogUtil.checkUser(loginUser);
|
||||
String startDate = Util.null2String(params.get("startDate"));
|
||||
String endDate = Util.null2String(params.get("endDate"));
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package weaver.xuanran.wang.eny.data_async.mapper;
|
||||
|
||||
import aiyh.utils.annotation.recordset.*;
|
||||
import io.swagger.models.auth.In;
|
||||
import weaver.xuanran.wang.eny.data_async.entity.DataAsyncConfigDetail;
|
||||
import weaver.xuanran.wang.eny.data_async.entity.DataAsyncConfigMain;
|
||||
|
||||
|
@ -53,6 +54,13 @@ public interface DataAsyncMapper {
|
|||
@ParamMapper("min") Integer min,
|
||||
@ParamMapper("max") Integer max);
|
||||
|
||||
@Select("select id from $t{tableName} where $t{foreignKey} between #{min} and #{max}")
|
||||
@CaseConversion(value = false)
|
||||
List<Integer> selectDataIdsByForeignKey(@ParamMapper("tableName") String tableName,
|
||||
@ParamMapper("foreignKey") String foreignKey,
|
||||
@ParamMapper("min") Integer min,
|
||||
@ParamMapper("max") Integer max);
|
||||
|
||||
@Select(custom = true)
|
||||
String selectCustomerSql(@SqlString String sql, Map<String, Object> map);
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import weaver.xuanran.wang.eny.data_async.entity.DataAsyncConfigMain;
|
|||
import weaver.xuanran.wang.eny.data_async.mapper.DataAsyncMapper;
|
||||
import weaver.xuanran.wang.eny.data_async.util.EyDataAsyncTokenUtil;
|
||||
import weaver.xuanran.wang.eny.data_async.util.ValueRuleMethod;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -71,7 +72,7 @@ public class DataAsyncServiceImpl {
|
|||
**/
|
||||
public DataAsyncConfigMain getConfig(String onlyMark) {
|
||||
DataAsyncConfigMain config = asyncMapper.selectConfigByOnlyMark(onlyMark);
|
||||
if(Objects.isNull(config) || CollectionUtils.isEmpty(config.getConfigDetailList())){
|
||||
if (Objects.isNull(config) || CollectionUtils.isEmpty(config.getConfigDetailList())) {
|
||||
throw new CustomerException("该唯一标识在配置表中未找到对应的配置!或存在该配置但是明细数据为空!");
|
||||
}
|
||||
return config;
|
||||
|
@ -79,13 +80,14 @@ public class DataAsyncServiceImpl {
|
|||
|
||||
/**
|
||||
* <h1>数据同步 有一个点 就是如果接口请求过程中报错了 怎么重新处理</h1>
|
||||
*
|
||||
* @param config 配置对象
|
||||
* @param tokenUrl token地址
|
||||
* @param params token 参数
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/6/9 13:28
|
||||
* @param config 配置对象
|
||||
* @param tokenUrl token地址
|
||||
* @param params token 参数
|
||||
**/
|
||||
public void asyncData(DataAsyncConfigMain config, String tokenUrl, Map<String, Object> params){
|
||||
public void asyncData(DataAsyncConfigMain config, String tokenUrl, Map<String, Object> params) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
log.info("数据开始同步时间 : " + TimeUtil.getCurrentTimeString());
|
||||
String asyncUrl = config.getAsync_url();
|
||||
|
@ -101,12 +103,18 @@ public class DataAsyncServiceImpl {
|
|||
CountDownLatch latch = null;
|
||||
int preNum = 0;
|
||||
// 如果不是第一页就先定位到指定位置
|
||||
if(pageNo > 1){
|
||||
if (pageNo > 1) {
|
||||
count += (pageNo - 1) * pageSize;
|
||||
preNum = count;
|
||||
}
|
||||
List<DataAsyncConfigDetail> configDetailList = config.getConfigDetailList();
|
||||
List<DataAsyncConfigDetail> primaryKeyList = configDetailList.stream().filter(item -> item.getPrimary_key() == 0).collect(Collectors.toList());
|
||||
if (CollectionUtils.isEmpty(primaryKeyList)) {
|
||||
throw new CustomerException("请先配置主键字段!");
|
||||
}
|
||||
DataAsyncConfigDetail primaryKey = primaryKeyList.get(0);
|
||||
boolean show = true;
|
||||
while (count < total){
|
||||
while (count < total) {
|
||||
// 从缓存中获取token
|
||||
String token = EyDataAsyncTokenUtil.getToken(tokenUrl, params);
|
||||
header.put("Authorization", "Bearer " + token);
|
||||
|
@ -118,11 +126,11 @@ public class DataAsyncServiceImpl {
|
|||
try {
|
||||
// 获取接口数据
|
||||
data = requestMasterPlate.apiPost(url, new HashMap<>(), header, dataSuccess);
|
||||
total = Util.getIntValue(Util.null2DefaultStr(data.get("total"),""), -1);
|
||||
if(show){
|
||||
total = Util.getIntValue(Util.null2DefaultStr(data.get("total"), ""), -1);
|
||||
if (show) {
|
||||
log.info("接口数据条数total : " + total);
|
||||
}
|
||||
if(latch == null){
|
||||
if (latch == null) {
|
||||
// 应该传入 total / pageSize
|
||||
latch = new CountDownLatch(total);
|
||||
}
|
||||
|
@ -130,52 +138,57 @@ public class DataAsyncServiceImpl {
|
|||
count += pageSize;
|
||||
List<Map<String, Object>> list = (List<Map<String, Object>>) data.get("list");
|
||||
// 进行数据处理
|
||||
maps = dealData(config, list);
|
||||
maps = dealData(config, list, primaryKey);
|
||||
show = false;
|
||||
}catch (Exception e){
|
||||
} catch (Exception e) {
|
||||
for (int i = 0; i < pageSize; i++) {
|
||||
if(latch != null){
|
||||
if (latch != null) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
log.error("请求异常 : " + e.getMessage() + " 当前页数 : " + pageNo);
|
||||
if(latch != null){
|
||||
if (latch != null) {
|
||||
log.info("当前latch信号数量 : " + latch.getCount());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
CountDownLatch finalLatch = latch;
|
||||
if(finalLatch.getCount() > 0 && maps.size() > 0){
|
||||
if (finalLatch.getCount() > 0 && maps.size() > 0) {
|
||||
// 异步提交数据库处理
|
||||
threadPoolInstance.execute(()->{
|
||||
threadPoolInstance.execute(() -> {
|
||||
log.info("=======================================================================================");
|
||||
// 查询当天同步的数据
|
||||
Map<String, Integer> map = parseMaxAndMin(maps, primaryKey);
|
||||
Integer max = map.get("max");
|
||||
Integer min = map.get("min");
|
||||
log.info(Thread.currentThread().getName() + " 入库之前信号数量 : " + finalLatch.getCount());
|
||||
boolean success = CusData2OA.baseInsertAndUpdate(config.getModel_id(), maps, finalLatch);
|
||||
log.info("数据库更新标识 : " + success);
|
||||
if (!success) {
|
||||
throw new CustomerException("更新数据失败!");
|
||||
}
|
||||
log.info(Thread.currentThread().getName() + " 入库之后信号数量 : " + finalLatch.getCount());
|
||||
List<Integer> dataIdList = asyncMapper.selectDataIdsByForeignKey(config.getTable_name(), primaryKey.getModel_field_name(), min, max);
|
||||
if (CollectionUtils.isNotEmpty(dataIdList)) {
|
||||
log.info("需要权限重构的数据条数 : " + dataIdList.size());
|
||||
for (Integer id : dataIdList) {
|
||||
moderightinfo.rebuildModeDataShareByEdit(1, Util.getIntValue(config.getModel_id(), -1), id);
|
||||
}
|
||||
}
|
||||
log.info("=======================================================================================");
|
||||
});
|
||||
}
|
||||
}
|
||||
try {
|
||||
if(latch != null){
|
||||
if (latch != null) {
|
||||
log.info("preNum : " + preNum);
|
||||
while (preNum-- > 0){
|
||||
while (preNum-- > 0) {
|
||||
latch.countDown();
|
||||
}
|
||||
boolean await = latch.await(30, TimeUnit.MINUTES);
|
||||
if(!await){
|
||||
if (!await) {
|
||||
throw new CustomerException("线程等待时间超过最大时间限制!");
|
||||
}
|
||||
// 查询当天同步的数据
|
||||
// 应该在上面 每一批数据处理完直接权限重构
|
||||
List<Integer> dataIdList = asyncMapper.selectDataIdListByCreateDate(config.getTable_name(), TimeUtil.getCurrentDateString());
|
||||
if(CollectionUtils.isNotEmpty(dataIdList)){
|
||||
log.info("需要权限重构的数据条数 : " + dataIdList.size());
|
||||
for (Integer id : dataIdList) {
|
||||
moderightinfo.rebuildModeDataShareByEdit(1, Util.getIntValue(config.getModel_id(),-1), id);
|
||||
}
|
||||
}
|
||||
log.info("数据结束同步时间 : " + TimeUtil.getCurrentTimeString());
|
||||
long endTime = System.currentTimeMillis();
|
||||
log.info("同步耗时时间 " + (endTime - startTime) / 1000 + " s");// 等待所有转换操作完成
|
||||
|
@ -187,64 +200,60 @@ public class DataAsyncServiceImpl {
|
|||
|
||||
/**
|
||||
* <h1>同步处理数据</h1>
|
||||
*
|
||||
* @param config 配置对象
|
||||
* @param data 接口数据
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/6/8 16:50
|
||||
* @param config 配置对象
|
||||
* @param data 接口数据
|
||||
**/
|
||||
public List<Map<String, Object>> dealData(DataAsyncConfigMain config, List<Map<String, Object>> data) {
|
||||
public List<Map<String, Object>> dealData(DataAsyncConfigMain config,
|
||||
List<Map<String, Object>> data,
|
||||
DataAsyncConfigDetail primaryKey) {
|
||||
List<DataAsyncConfigDetail> configDetailList = config.getConfigDetailList();
|
||||
List<DataAsyncConfigDetail> primaryKey = configDetailList.stream().filter(item -> item.getPrimary_key() == 0).collect(Collectors.toList());
|
||||
List<Map<String, Object>> maps = new ArrayList<>();
|
||||
if(CollectionUtils.isNotEmpty(primaryKey)){
|
||||
// 模版-接口外键
|
||||
String primaryKeyModelFieldName = primaryKey.get(0).getModel_field_name();
|
||||
maps = data.stream().map(item -> convert(item, primaryKey.get(0).getInterface_field(), configDetailList)).filter(MapUtils::isNotEmpty).collect(Collectors.toList());
|
||||
// 进行排序
|
||||
Optional<Integer> minClassId = maps.stream()
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName),""),-1))
|
||||
.min(Comparator.naturalOrder());
|
||||
Optional<Integer> maxClassId = maps.stream()
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName),""),-1))
|
||||
.max(Comparator.naturalOrder());
|
||||
Integer min = minClassId.orElse(0);
|
||||
Integer max = maxClassId.orElse(0);
|
||||
// 按照外键排序并且在oa中范围查询出外键与oa数据的对应关系
|
||||
List<Map<String, String>> dataIdList = asyncMapper.selectDataIds(primaryKeyModelFieldName, config.getTable_name(), min, max);
|
||||
if("1".equals(debug)){
|
||||
log.info("dataIdList : " + JSONObject.toJSONString(dataIdList));
|
||||
// List<DataAsyncConfigDetail> primaryKey = configDetailList.stream().filter(item -> item.getPrimary_key() == 0).collect(Collectors.toList());
|
||||
List<Map<String, Object>> maps;
|
||||
// 模版-接口外键
|
||||
String primaryKeyModelFieldName = primaryKey.getModel_field_name();
|
||||
maps = data.stream().map(item -> convert(item, primaryKey.getInterface_field(), configDetailList)).filter(MapUtils::isNotEmpty).collect(Collectors.toList());
|
||||
// 进行排序
|
||||
Map<String, Integer> map = parseMaxAndMin(maps, primaryKey);
|
||||
Integer max = map.get("max");
|
||||
Integer min = map.get("min");
|
||||
// 按照外键排序并且在oa中范围查询出外键与oa数据的对应关系
|
||||
List<Map<String, String>> dataIdList = asyncMapper.selectDataIds(primaryKeyModelFieldName, config.getTable_name(), min, max);
|
||||
if ("1".equals(debug)) {
|
||||
log.info("dataIdList : " + JSONObject.toJSONString(dataIdList));
|
||||
}
|
||||
if (CollectionUtils.isNotEmpty(dataIdList)) {
|
||||
HashMap<String, String> idMap = parseListMap2Map(primaryKeyModelFieldName, "id", dataIdList);
|
||||
if ("1".equals(debug)) {
|
||||
log.info("idMap : " + JSONObject.toJSONString(idMap));
|
||||
}
|
||||
if(CollectionUtils.isNotEmpty(dataIdList)){
|
||||
HashMap<String, String> idMap = parseListMap2Map(primaryKeyModelFieldName, "id", dataIdList);
|
||||
if("1".equals(debug)){
|
||||
log.info("idMap : " + JSONObject.toJSONString(idMap));
|
||||
maps.forEach(item -> {
|
||||
if ("1".equals(debug)) {
|
||||
log.info("item : " + JSONObject.toJSONString(item));
|
||||
}
|
||||
maps.forEach(item->{
|
||||
if("1".equals(debug)){
|
||||
log.info("item : " + JSONObject.toJSONString(item));
|
||||
}
|
||||
String id = Util.null2DefaultStr(item.get(primaryKeyModelFieldName),"");
|
||||
if("1".equals(debug)){
|
||||
log.info("id : " + id);
|
||||
}
|
||||
if(idMap.containsKey(id)){
|
||||
String oaId = idMap.get(id);
|
||||
item.put("id",oaId);
|
||||
}
|
||||
});
|
||||
}
|
||||
String id = Util.null2DefaultStr(item.get(primaryKeyModelFieldName), "");
|
||||
if ("1".equals(debug)) {
|
||||
log.info("id : " + id);
|
||||
}
|
||||
if (idMap.containsKey(id)) {
|
||||
String oaId = idMap.get(id);
|
||||
item.put("id", oaId);
|
||||
}
|
||||
});
|
||||
}
|
||||
return maps;
|
||||
}
|
||||
|
||||
public Map<String, Integer> parseMaxAndMin(List<Map<String, Object>> maps, DataAsyncConfigDetail primaryKey){
|
||||
public Map<String, Integer> parseMaxAndMin(List<Map<String, Object>> maps, DataAsyncConfigDetail primaryKey) {
|
||||
String primaryKeyModelFieldName = primaryKey.getModel_field_name();
|
||||
// 进行排序
|
||||
Optional<Integer> minClassId = maps.stream()
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName),""),-1))
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName), ""), -1))
|
||||
.min(Comparator.naturalOrder());
|
||||
Optional<Integer> maxClassId = maps.stream()
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName),""),-1))
|
||||
.map(map -> Util.getIntValue(Util.null2DefaultStr(map.get(primaryKeyModelFieldName), ""), -1))
|
||||
.max(Comparator.naturalOrder());
|
||||
Integer min = minClassId.orElse(0);
|
||||
Integer max = maxClassId.orElse(0);
|
||||
|
@ -257,18 +266,19 @@ public class DataAsyncServiceImpl {
|
|||
|
||||
/**
|
||||
* <h1>将数据进行转换</h1>
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/6/9 13:27
|
||||
* @param data 原始数据
|
||||
* @param foreignKey 外键
|
||||
*
|
||||
* @param data 原始数据
|
||||
* @param foreignKey 外键
|
||||
* @param configDetailList 配置集合
|
||||
* @return 转换后的数据
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/6/9 13:27
|
||||
**/
|
||||
public Map<String, Object> convert(Map<String, Object> data,
|
||||
String foreignKey,
|
||||
List<DataAsyncConfigDetail> configDetailList){
|
||||
List<DataAsyncConfigDetail> configDetailList) {
|
||||
String foreignVal = Util.null2DefaultStr(data.get(foreignKey), "");
|
||||
if(StringUtils.isBlank(foreignVal)){
|
||||
if (StringUtils.isBlank(foreignVal)) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
|
@ -280,19 +290,20 @@ public class DataAsyncServiceImpl {
|
|||
|
||||
/**
|
||||
* <h1>将集合数据放到缓存中</h1>
|
||||
*
|
||||
* @param listMap 集合数据
|
||||
* @author xuanran.wang
|
||||
* @dateTime 2023/4/10 18:33
|
||||
* @param listMap 集合数据
|
||||
**/
|
||||
public static HashMap<String, String> parseListMap2Map(String key, String value, List<Map<String, String>> listMap){
|
||||
if(CollectionUtils.isEmpty(listMap)){
|
||||
public static HashMap<String, String> parseListMap2Map(String key, String value, List<Map<String, String>> listMap) {
|
||||
if (CollectionUtils.isEmpty(listMap)) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
HashMap<String, String> res = new HashMap<>();
|
||||
listMap.forEach(map -> {
|
||||
String outKey = Util.null2DefaultStr(map.get(key),"");
|
||||
if(StringUtils.isNotBlank(outKey)){
|
||||
String id = Util.null2DefaultStr(map.get(value),"");
|
||||
String outKey = Util.null2DefaultStr(map.get(key), "");
|
||||
if (StringUtils.isNotBlank(outKey)) {
|
||||
String id = Util.null2DefaultStr(map.get(value), "");
|
||||
res.put(outKey, id);
|
||||
}
|
||||
});
|
||||
|
@ -300,6 +311,4 @@ public class DataAsyncServiceImpl {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ public class DataAsyncTest extends BaseTest {
|
|||
String clientId = "6d067ac6c14211ed8fd700163e1331c6";
|
||||
String appId = "WEAVER";
|
||||
String appSecret = "7b09e5f7c14211ed8fd700163e1331c6";
|
||||
int pageSize = 500;
|
||||
int pageSize = 100;
|
||||
int pageNo = 25;
|
||||
dataAsyncService.setPageSize(25);
|
||||
dataAsyncService.setPageSize(pageSize);
|
||||
|
|
Loading…
Reference in New Issue