灰气球

Elasticsearch Data Synchronization Framework: elasticsearch-sync

2021-07-11

Preface

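As mentioned in the previous article, "Elasticsearch Storage Design and MySQL Data Synchronization Schemes", there are three common synchronization approaches.
Why build this framework? Apologies: at this stage it is more accurately called a demo.
On the one hand, after discussing it with the team, we concluded that, given our current situation, option two (scheduled SELECT-based synchronization) is the most suitable one to put into production.
On the other hand, Binlog-based synchronization between MySQL and Elasticsearch already has an open-source implementation, canal, so there is no need to reinvent the wheel.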

Implementation Plan

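Each MySQL table maintains a business-agnostic update-time column, and every operation that modifies the table's content also updates this column;
A producer scheduled task scans the MySQL table at a fixed interval and pushes the identifiers (or primary keys) of the rows changed during that interval to the MQ;
A consumer scheduled task consumes the MQ, assembles the data and syncs it to Elasticsearch.
MySQL stores the data for business transactions, while Elasticsearch serves the system's data search and data export.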
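To make the flow concrete, here is a minimal in-memory sketch of producer and consumer rounds. It assumes a half-open window `[start, end)` on the update-time column and uses a de-duplicating queue in place of the MQ (the code later in this article uses a Redis sorted set). `Row` and `TimeWindowSync` are hypothetical names, not classes from the repository.

```java
import java.time.LocalDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical row: primary key, business key (userId) and the business-agnostic update time.
final class Row {
    final long id;
    final long userId;
    final LocalDateTime updateTime;

    Row(long id, long userId, LocalDateTime updateTime) {
        this.id = id;
        this.userId = userId;
        this.updateTime = updateTime;
    }
}

// Hypothetical in-memory model of "scan changed rows -> push IDs -> consume IDs".
final class TimeWindowSync {
    private final Deque<Long> queue = new ArrayDeque<>(); // stands in for the MQ
    private final Set<Long> queued = new HashSet<>();     // de-duplicates pending userIds
    private LocalDateTime windowStart;                    // checkpoint of the last finished scan

    TimeWindowSync(LocalDateTime initialStart) {
        this.windowStart = initialStart;
    }

    // Producer: push the userId of every row updated in [windowStart, windowEnd), then advance the checkpoint.
    void produce(List<Row> table, LocalDateTime windowEnd) {
        for (Row row : table) {
            boolean inWindow = !row.updateTime.isBefore(windowStart) && row.updateTime.isBefore(windowEnd);
            if (inWindow && queued.add(row.userId)) {
                queue.addLast(row.userId);
            }
        }
        windowStart = windowEnd;
    }

    // Consumer: take up to batchSize userIds; a real consumer would re-read them from MySQL and write to Elasticsearch.
    List<Long> consume(int batchSize) {
        List<Long> taken = new ArrayList<>();
        while (taken.size() < batchSize && !queue.isEmpty()) {
            Long userId = queue.pollFirst();
            queued.remove(userId);
            taken.add(userId);
        }
        return taken;
    }
}
```

Pushing only identifiers, and re-reading the current row at consume time, means several updates to the same user inside one window collapse into a single sync.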

Code Repository

Elasticsearch Storage Design

MySQL initialization script: mysql-init.sql
Elasticsearch initialization script: elasticsearch-init.txt
Relational table design in MySQL: user, user_extend, user_operation_log
The MySQL relationships are as follows:

user : user_extend : user_operation_log = 1 : 1 : n

The Elasticsearch mapping is as follows:

{
    "properties": {
        "userId": {
            "type": "long"
        },
        "name": {
            "type": "keyword"
        },
        "age": {
            "type": "integer"
        },
        "email": {
            "type": "keyword"
        },
        "headPortrait": {
            "type": "text"
        },
        "imgs": {
            "type": "text"
        },
        "userOperationLogs": {
            "properties": {
                "id": {
                    "type": "long"
                },
                "userId": {
                    "type": "long"
                },
                "desc": {
                    "type": "text"
                }
            }
        }
    }
}
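Because `user : user_extend : user_operation_log = 1 : 1 : n`, each Elasticsearch document represents one user, with the n log rows embedded as an array. The sketch below assembles such a document as a plain `Map` whose keys follow the mapping above. Which table each field comes from (for example `headPortrait` and `imgs` from `user_extend`) is an assumption, and `OperationLog` and `UserDocAssembler` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row of user_operation_log (the "n" side of the relationship).
final class OperationLog {
    final long id;
    final long userId;
    final String desc;

    OperationLog(long id, long userId, String desc) {
        this.id = id;
        this.userId = userId;
        this.desc = desc;
    }
}

final class UserDocAssembler {
    private UserDocAssembler() {
    }

    // Builds one denormalized document per user; keys match the Elasticsearch mapping.
    static Map<String, Object> assemble(long userId, String name, int age, String email,
                                        String headPortrait, String imgs, List<OperationLog> allLogs) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("userId", userId);             // from user
        doc.put("name", name);
        doc.put("age", age);
        doc.put("email", email);
        doc.put("headPortrait", headPortrait); // assumed to come from user_extend
        doc.put("imgs", imgs);
        List<Map<String, Object>> logs = new ArrayList<>();
        for (OperationLog row : allLogs) {
            if (row.userId == userId) {        // keep only this user's log rows
                Map<String, Object> entry = new LinkedHashMap<>();
                entry.put("id", row.id);
                entry.put("userId", row.userId);
                entry.put("desc", row.desc);
                logs.add(entry);
            }
        }
        doc.put("userOperationLogs", logs);
        return doc;
    }
}
```

Note that `userOperationLogs` is mapped as a plain object rather than `nested`. Elasticsearch flattens arrays of plain objects, so a query combining conditions on two log fields can match values that came from different log entries; the `nested` type is the usual alternative when that matters.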

Key Components (Java)

The system is implemented in Java. This article shows only the main code; pull the full implementation from GitHub.

Jar Dependencies

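Component | Version | Notes
spring-boot | 2.5.2 | -
mybatis | 3.3.2 | -
elasticsearch-rest-high-level-client | 7.12.1 | Choose it according to your Elasticsearch version, ideally matching it exactly; above all, do not use a client jar from a different version!
Elasticsearch | 7.12.1 | Same as above
jedis | 3.6.1 | -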
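The version rule above can be turned into a start-up check. The compatibility section of the 7.x Java High Level REST Client documentation describes the client as forward compatible: it is expected to work against nodes of the same major version with an equal or greater minor version. The sketch below encodes that rule; `ClientVersionCheck` is a hypothetical helper, and obtaining the server version (for example from the cluster's root endpoint) is left out.

```java
// Hypothetical start-up check encoding the 7.x client compatibility rule:
// same major version, and the server's minor version >= the client's minor version.
final class ClientVersionCheck {
    private ClientVersionCheck() {
    }

    // Parses "major.minor[.patch]" into {major, minor}.
    static int[] majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch]: " + version);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    static boolean compatible(String clientVersion, String serverVersion) {
        int[] client = majorMinor(clientVersion);
        int[] server = majorMinor(serverVersion);
        return client[0] == server[0] && server[1] >= client[1];
    }
}
```

Matching the versions exactly, as recommended above, trivially satisfies this rule.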

Main Classes

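  • ElasticsearchConfig: configuration; initializes the Elasticsearch client, RestHighLevelClient;
  • BaseEsPo: base class of the ORM objects for Elasticsearch documents (type);
  • BaseDao: base class implementing CRUD operations on an Elasticsearch type;
  • BaseProducer: base class of the producer scheduled tasks; periodically pulls the identifiers of updated rows from a given MySQL table and pushes them to the MQ;
  • BaseConsumer: base class of the consumer scheduled tasks; assembles data from the identifiers in the MQ and syncs it to Elasticsearch.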

Main Configuration Options

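# Producer configuration example
## [Common] whether to log the produced content
es.producer.log.enable=0
## Whether this producer is enabled
UserInfoProducer.enable=1
## Initial update time, used when no checkpoint is stored in Redis yet
UserInfoProducer.default.updateTimeStart=0
## Maximum number of rows produced per batch (query page size)
UserInfoProducer.maxSize=1
## Redis key of the queue
UserInfoProducer.queue.redis.key=UserInfoProducer:queue:9
## Redis key storing the update time up to which the previous run completed
UserInfoProducer.update.time.start.redis.key=UserInfoProducer:updateTimeStart:9

# Consumer configuration example
## [Common] switch for the queue-backlog alarm
es.consumer.queue.size.alarm.enable=0
## [Common] queue-backlog alarm threshold, expressed as a multiple of the maximum consume size
es.consumer.queue.size.threshold.multiple=0
## Whether this consumer is enabled
UserInfoConsumer.enable=1
## Maximum number of items consumed per run
UserInfoConsumer.consumeSize=1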
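To show how the consumer keys combine, here is a sketch that reads them with `java.util.Properties` and evaluates the backlog alarm the way `BaseConsumer` does: alarm when the queue holds more than `threshold.multiple × consumeSize` items. The defaults mirror the `@Value` defaults in the code. Treating `1` as "alarm on" is an assumption, since the original compares against `DelStatus.DELETED.getCode()`, whose value is not shown. `ConsumerSettings` is a hypothetical class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the consumer keys shown above.
final class ConsumerSettings {
    final boolean enabled;
    final int consumeSize;
    final boolean alarmEnabled;
    final int thresholdMultiple;

    private ConsumerSettings(Properties p) {
        // Same defaults as the @Value expressions in BaseConsumer / UserInfoConsumer; 0 disables the consumer.
        enabled = Integer.parseInt(p.getProperty("UserInfoConsumer.enable", "0")) != 0;
        consumeSize = Integer.parseInt(p.getProperty("UserInfoConsumer.consumeSize", "1"));
        // Assumption: 1 switches the alarm on.
        alarmEnabled = Integer.parseInt(p.getProperty("es.consumer.queue.size.alarm.enable", "0")) == 1;
        thresholdMultiple = Integer.parseInt(p.getProperty("es.consumer.queue.size.threshold.multiple", "10"));
    }

    static ConsumerSettings parse(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ConsumerSettings(p);
    }

    // Backlog alarm: the queue size exceeds multiple * consumeSize.
    boolean backlogAlarm(long queueSize) {
        return alarmEnabled && (long) thresholdMultiple * consumeSize < queueSize;
    }
}
```

With the example values above (`threshold.multiple=0`), an enabled alarm would fire as soon as the queue is non-empty, so a positive multiple is the practical choice once the alarm is switched on.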

Implementation (Java)

Note: this article lists only the main code; see GitHub for the full details.

  • Elasticsearch configuration: initializes RestHighLevelClient
@Slf4j
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private String port;

    @Value("${elasticsearch.username:}")
    private String username;

    @Value("${elasticsearch.password:}")
    private String password;

    /**
     * Index name
     */
    public static final String ES_INDEX_NAME = "user";
    /**
     * Type name (mapping types are deprecated in Elasticsearch 7.x; _doc is the default type)
     */
    public static final String ES_TYPE_NAME = "_doc";


    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // Do not log the password: it would end up in plain text in the log files
        log.info("restHighLevelClient init start, host = {}, port = {}, username = {}", host, port, username);
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host, Integer.parseInt(port), "http"))
                .setHttpClientConfigCallback((HttpAsyncClientBuilder httpAsyncClientBuilder) -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
    }
}
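With Elasticsearch security enabled, the `UsernamePasswordCredentials` configured above end up being sent as HTTP Basic authentication (RFC 7617), that is, an `Authorization: Basic base64(username:password)` header. The stdlib sketch below builds that header value and, as a design choice not in the original, skips authentication when no username is configured (the original always installs a credentials provider, even with the empty defaults). `BasicAuth` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical helper: the value of an HTTP Basic "Authorization" header (RFC 7617).
final class BasicAuth {
    private BasicAuth() {
    }

    // Returns empty when no username is configured, i.e. the cluster is accessed without authentication.
    static Optional<String> header(String username, String password) {
        if (username == null || username.trim().isEmpty()) {
            return Optional.empty();
        }
        String pair = username + ":" + (password == null ? "" : password);
        String encoded = Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }
}
```

In the Spring configuration, the same idea would mean registering the credentials provider only when `elasticsearch.username` is non-blank.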
  • BaseEsPo
@Data
public abstract class BaseEsPo {

    private String esId;

    private Long userId;

    public void checkElseThrow() {
        if (userId == null || userId == 0) {
            throw new RuntimeException("userId must not be null or 0");
        }
    }

}
  • UserInfo: maps the content of the user table
@Getter
@Setter
@NoArgsConstructor
@ToString(callSuper = true)
public class UserInfo extends BaseEsPo {

    private Long id;
    private String name;
    private Integer age;
    private String email;
}
  • BaseDao: CRUD operations on Elasticsearch objects
@Slf4j
@Getter
public abstract class BaseDao {

    @Resource
    protected RestHighLevelClient restHighLevelClient;

    protected static final Integer MAX_BULK_SIZE = 100;


    public <T extends BaseEsPo> Optional<T> findByUserId(Long userId, Class<T> clazz) {
        String className = this.getClass().getSimpleName();
        List<T> pos = new ArrayList<>();
        SearchRequest request = new SearchRequest();
        // userId is mapped as long, so an exact term query fits better than a phrase query
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("userId", userId));
        request.source(searchSourceBuilder);
        request.indices(ElasticsearchConfig.ES_INDEX_NAME);
        // No request.types(...): mapping types are deprecated in 7.x
        try {
            log.info("className = {}, restHighLevelClient.search, req = {}", className, JSON.toJSONString(request));
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            log.info("className = {}, restHighLevelClient.search, res = {}", className, JSON.toJSONString(response));
            for (SearchHit hit : response.getHits().getHits()) {
                String id = hit.getId();
                T po = JSON.parseObject(hit.getSourceAsString(), clazz);
                po.setEsId(id);
                pos.add(po);
            }
        } catch (IOException e) {
            log.error(String.format("className = %s, Elasticsearch query failed, userId = %s", className, userId), e);
            throw new RuntimeException(e);
        }
        if (CollectionUtils.isEmpty(pos)) {
            return Optional.empty();
        }
        if (CollectionUtils.size(pos) > 1) {
            log.warn("Multiple documents found in Elasticsearch, userId = {}", userId);
        }
        return Optional.of(pos.get(0));
    }


    public <T extends BaseEsPo> void insertOrUpdate(T po, Class<T> clazz) {
        String className = this.getClass().getSimpleName();
        log.info("className = {}, insertOrUpdate po = {}", className, JSON.toJSONString(po));
        po.checkElseThrow();
        Optional<T> optional = findByUserId(po.getUserId(), clazz);
        if (optional.isPresent()) {
            T poInEs = optional.get();
            log.info("className = {}, userId = {}, esId = {}", className, po.getUserId(), poInEs.getEsId());
            // Typeless constructor (index, id); the (index, type, id) variant is deprecated in 7.x
            UpdateRequest request = new UpdateRequest(ElasticsearchConfig.ES_INDEX_NAME, poInEs.getEsId());
            request.doc(JSON.toJSONString(po), XContentType.JSON);
            request.fetchSource(true);
            UpdateResponse response;
            try {
                log.info("className = {}, restHighLevelClient.update req = {}", className, JSON.toJSONString(request));
                response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
                log.info("className = {}, restHighLevelClient.update res = {}", className, JSON.toJSONString(response));
            } catch (ElasticsearchException | IOException e) {
                log.error("className = {}, failed to sync to Elasticsearch, userId = {}", className, po.getUserId(), e);
                throw new RuntimeException(e);
            }
            if (response != null) {
                if (response.getResult() == DocWriteResponse.Result.CREATED) {
                    log.info("className = {}, document created, userId = {}", className, po.getUserId());
                } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                    log.info("className = {}, document updated, userId = {}", className, po.getUserId());
                }
            }
        } else {
            // Typeless constructor; Elasticsearch generates the document _id
            IndexRequest request = new IndexRequest(ElasticsearchConfig.ES_INDEX_NAME);
            request.source(JSON.toJSONString(po), XContentType.JSON);
            IndexResponse response;
            try {
                log.info("className = {}, restHighLevelClient.index req = {}", className, JSON.toJSONString(request));
                response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
                log.info("className = {}, restHighLevelClient.index res = {}", className, JSON.toJSONString(response));
            } catch (ElasticsearchException | IOException e) {
                log.error("className = {}, failed to sync to Elasticsearch, userId = {}", className, po.getUserId(), e);
                throw new RuntimeException(e);
            }
            if (response != null) {
                if (response.getResult() == DocWriteResponse.Result.CREATED) {
                    log.info("className = {}, document created, userId = {}", className, po.getUserId());
                } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                    log.info("className = {}, document updated, userId = {}", className, po.getUserId());
                }
            }
        }
    }

}
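`insertOrUpdate` first looks the user up by `userId`. If a document exists, it sends an `UpdateRequest` with a partial `doc`; otherwise it sends an `IndexRequest` and lets Elasticsearch generate the `_id`. A partial update merges the given fields into the stored source, so syncing `UserInfo` (name, age, email) leaves fields written by other sync services, such as `userOperationLogs`, untouched; fastjson omits null fields by default, so unset fields are not overwritten with null either. The in-memory sketch below models only this find-then-merge-or-create flow with a top-level merge (Elasticsearch merges object fields recursively, which is not modeled). `InMemoryIndex` is a hypothetical stand-in, not the client API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory stand-in for one index: _id -> _source.
final class InMemoryIndex {
    private final Map<String, Map<String, Object>> docs = new LinkedHashMap<>();
    private int nextId = 1;

    // Mirrors findByUserId: returns the _id of the first document with this userId, or null.
    String findIdByUserId(long userId) {
        for (Map.Entry<String, Map<String, Object>> e : docs.entrySet()) {
            if (Objects.equals(e.getValue().get("userId"), userId)) {
                return e.getKey();
            }
        }
        return null;
    }

    // Mirrors insertOrUpdate: top-level merge if found, otherwise index with a generated _id.
    // userId is required, as checkElseThrow enforces in the original.
    String insertOrUpdate(Map<String, Object> partialDoc) {
        long userId = (Long) partialDoc.get("userId");
        String id = findIdByUserId(userId);
        if (id != null) {
            docs.get(id).putAll(partialDoc); // fields absent from partialDoc are kept
            return id;
        }
        id = "auto-" + nextId++;
        docs.put(id, new HashMap<>(partialDoc));
        return id;
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }
}
```

Because find-then-write is not atomic, two concurrent syncs of the same user could both miss and create two documents; the per-user Redis lock in `BaseUserSyncService` and the "multiple documents" warning in `findByUserId` appear to guard against exactly this.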
  • UserInfoDao, UserInfoDaoImpl: CRUD implementation for UserInfo
public interface UserInfoDao {

    /**
     * Insert or update the user's basic information
     *
     * @param userInfo basic user information
     */
    void insertOrUpdate(UserInfo userInfo);

}

@Slf4j
@Service
public class UserInfoDaoImpl extends BaseDao implements UserInfoDao {


    @Override
    public void insertOrUpdate(UserInfo userInfo) {
        insertOrUpdate(userInfo, UserInfo.class);
    }

}
  • UserSyncService, BaseUserSyncService, UserInfoSyncServiceImpl: service-layer interface and implementations
public interface UserSyncService {

    /**
     * Synchronize the user profile by userId
     *
     * @param userId userId
     */
    void syncByUserId(Long userId);
}

public abstract class BaseUserSyncService implements UserSyncService {

    @Resource
    protected EsSyncMapper esSyncMapper;
    @Resource
    private RedisUtil redisUtil;

    private static final String REDIS_KEY_PREFIX = "UserSyncService:";

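    /**
     * Synchronize by userId; a per-user Redis lock prevents concurrent syncs of the same user
     *
     * @param userId userId
     */
    @Override
    public void syncByUserId(Long userId) {
        String redisKey = REDIS_KEY_PREFIX + userId;
        try {
            redisUtil.repetitionRequestLockOrElseThrow(redisKey);
        } catch (Exception e) {
            throw new EsSyncConcurrentLockException(e);
        }
        try {
            BaseEsPo po = selectOneByUserId(userId);
            // Row no longer exists in MySQL: nothing to sync (deletions are not handled here)
            if (po != null) {
                insertOrUpdate(po);
            }
        } finally {
            // Release the lock even if the sync fails, otherwise later syncs of this user are rejected
            redisUtil.remove(redisKey);
        }
    }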

    /**
     * Query one record by userId
     *
     * @param userId userId
     * @return po
     */
    protected abstract BaseEsPo selectOneByUserId(Long userId);

    /**
     * Sync the data to Elasticsearch
     *
     * @param po object to sync
     */
    protected abstract void insertOrUpdate(BaseEsPo po);
}

@Service
public class UserInfoSyncServiceImpl extends BaseUserSyncService {

    @Resource
    private UserInfoDao userInfoDao;

    @Override
    protected BaseEsPo selectOneByUserId(Long userId) {
        return esSyncMapper.selectUserInfoByUserId(userId);
    }

    @Override
    protected void insertOrUpdate(BaseEsPo po) {
        userInfoDao.insertOrUpdate((UserInfo) po);
    }
}
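`syncByUserId` guards each user with a Redis lock so that two consumers never sync the same user at once; when the lock is taken, the consumer requeues the user instead of failing. The sketch below models that lock-or-throw pattern with `ConcurrentHashMap.putIfAbsent` in place of Redis and releases the lock in `finally`. `RedisUtil.repetitionRequestLockOrElseThrow` is not shown in the article, so a SET-NX-style lock is an assumption; `UserLock` and `LockBusyException` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical counterpart of EsSyncConcurrentLockException.
final class LockBusyException extends RuntimeException {
    LockBusyException(String key) {
        super("lock busy: " + key);
    }
}

// Hypothetical in-process stand-in for a Redis "SET key NX" style lock.
final class UserLock {
    private final ConcurrentMap<String, Boolean> locks = new ConcurrentHashMap<>();

    // Runs the task while holding the per-user lock; throws if another caller already holds it.
    void runLocked(long userId, Runnable task) {
        String key = "UserSyncService:" + userId;
        if (locks.putIfAbsent(key, Boolean.TRUE) != null) {
            throw new LockBusyException(key);
        }
        try {
            task.run();
        } finally {
            locks.remove(key); // always release, even when the task throws
        }
    }

    boolean isLocked(long userId) {
        return locks.containsKey("UserSyncService:" + userId);
    }
}
```

A real Redis lock would also carry an expiry, so that a crashed process cannot keep a user locked forever; this in-process model has no such need.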
  • Producer scheduled tasks: BaseProducer, UserInfoProducer
@Slf4j
public abstract class BaseProducer {

    @Resource
    private EsSyncMapper esSyncMapper;
    @Resource
    private RedisUtil redisUtil;
    @Value("${es.producer.log.enable:0}")
    protected Integer logEnable;


    public void produce(Integer enable, String producerQueueRedisKey, String updateTimeStartRedisKey,
                        String defaultUpdateTimeStart, Integer maxSize, String primaryKeyColumnName,
                        String uniqueColumnName, String tableName, String updateColumnName) {
        long startTime = System.currentTimeMillis();
        log.info("produce start");
        String taskName = this.getClass().getSimpleName();
        if (enable != null && enable == 0) {
            log.info("taskName = {}, produce disable end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
            return;
        }
        String updateTimeStart = redisUtil.get(updateTimeStartRedisKey);
        if (StringUtils.isBlank(updateTimeStart)) {
            updateTimeStart = defaultUpdateTimeStart;
        }
        String updateTimeEnd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Long latestMaxPrimaryKey = 0L;
        List<Long> updatedDataPrimaryKeys;
        do {
            log.info("taskName = {}, primaryKeyColumnName = {}, tableName = {}, updateColumnName = {}, updateTimeStart = {}, updateTimeEnd = {}, latestMaxPrimaryKey = {}, maxSize = {}",
                    taskName, primaryKeyColumnName, tableName, updateColumnName, updateTimeStart, updateTimeEnd, latestMaxPrimaryKey, maxSize);
            updatedDataPrimaryKeys = esSyncMapper.selectUpdatedDataPrimaryKey(primaryKeyColumnName, tableName, updateColumnName, updateTimeStart, updateTimeEnd, latestMaxPrimaryKey, maxSize);
            log.info("taskName = {}, updatedDataPrimaryKeys size = {}", taskName, updatedDataPrimaryKeys.size());
            if (CollectionUtils.isNotEmpty(updatedDataPrimaryKeys)) {
                if (DelStatus.DELETED.getCode() == logEnable) {
                    log.info("updatedDataPrimaryKeys = {}", JSON.toJSONString(updatedDataPrimaryKeys));
                }
                if (CollectionUtils.size(updatedDataPrimaryKeys) >= maxSize) {
                    log.info("Number of updated rows reached the threshold, maxSize = {}", maxSize);
                }
                Map<String, Double> scoreMembers = new HashMap<>(updatedDataPrimaryKeys.size());
                List<Map<String, Object>> updatedDatas = esSyncMapper.selectUniqueKeyByPrimaryKey(primaryKeyColumnName, tableName, uniqueColumnName, updateColumnName, updatedDataPrimaryKeys, maxSize);
                if (CollectionUtils.isNotEmpty(updatedDatas)) {
                    for (Map<String, Object> updatedData : updatedDatas) {
                        Object userIdObject = updatedData.get(uniqueColumnName);
                        String userIdString = String.valueOf(userIdObject);
                        Double score = redisUtil.zscore(producerQueueRedisKey, userIdString);
                        if (score != null) {
                            log.info("score != null, userIdString = {}", userIdString);
                        } else {
                            log.info("score == null, userIdString = {}", userIdString);
                            LocalDateTime updateTime = (LocalDateTime) updatedData.get(updateColumnName);
                            Double updateTimeDouble = (double) (updateTime == null ? 0L : updateTime.toEpochSecond(DateUtil.ZONE_OFFSET));
                            scoreMembers.put(userIdString, updateTimeDouble);
                        }
                    }
                    if (!scoreMembers.isEmpty()) {
                        if (DelStatus.DELETED.getCode() == logEnable) {
                            log.info("scoreMembers = {}", JSON.toJSONString(scoreMembers));
                        }
                        redisUtil.zadd(producerQueueRedisKey, scoreMembers);
                    }
                }
                latestMaxPrimaryKey = updatedDataPrimaryKeys.get(updatedDataPrimaryKeys.size() - 1);
            } else {
                latestMaxPrimaryKey = 0L;
            }
        } while (CollectionUtils.size(updatedDataPrimaryKeys) >= maxSize && latestMaxPrimaryKey > 0L);
        redisUtil.set(updateTimeStartRedisKey, updateTimeEnd, 60 * 60 * 24 * 30);
        log.info("taskName = {}, produce end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
    }
}
@Slf4j
@Configuration
@EnableScheduling
public class UserInfoProducer extends BaseProducer {

    @Value("${UserInfoProducer.enable:0}")
    private Integer enable;
    @Value("${UserInfoProducer.default.updateTimeStart:2021-07-09 14:00:00}")
    private String defaultUpdateTimeStart;
    @Value("${UserInfoProducer.maxSize:1}")
    private Integer maxSize;
    @Value("${UserInfoProducer.queue.redis.key:UserInfoProducer:queue:9}")
    private String producerQueueRedisKey;
    @Value("${UserInfoProducer.update.time.start.redis.key:UserInfoProducer:updateTimeStart:9}")
    private String producerUpdateTimeStartRedisKey;

    @Scheduled(initialDelay = 10000, fixedDelayString = "${UserInfoProducer.fixedDelayString:1000}")
    public void produce() {
        try {
            String primaryKeyColumnName = "id";
            String uniqueColumnName = "userId";
            String tableName = "user_extend";
            String updateColumnName = "update_time";
            produce(enable, producerQueueRedisKey, producerUpdateTimeStartRedisKey,
                    defaultUpdateTimeStart, maxSize, primaryKeyColumnName,
                    uniqueColumnName, tableName, updateColumnName);
        } catch (Throwable throwable) {
            log.error("UserInfoProducer: unexpected exception", throwable);
        }
    }

}
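`produce` pages through the changed rows with keyset pagination: each query asks for at most `maxSize` primary keys greater than the last one seen, and the loop stops when a page comes back short. It also skips userIds that are already queued (the `zscore != null` check). The sketch below reproduces that loop over an in-memory map of already time-filtered rows. The SQL behind `selectUpdatedDataPrimaryKey` is not shown in the article, so the `id > lastId ORDER BY id LIMIT maxSize` semantics is an assumption, and `KeysetProducer` is a hypothetical name; for simplicity every new member gets the same score, whereas the original uses each row's update time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of BaseProducer's loop over rows already filtered to the current time window.
final class KeysetProducer {
    private final Map<Long, Long> changedRows;          // primary key -> userId
    final List<Integer> pageSizes = new ArrayList<>();  // result size of each simulated query

    KeysetProducer(Map<Long, Long> changedRows) {
        this.changedRows = changedRows;
    }

    // Stand-in for: SELECT id ... WHERE id > lastId ORDER BY id LIMIT maxSize
    private List<Long> page(long lastId, int maxSize) {
        List<Long> ids = new ArrayList<>();
        changedRows.keySet().stream().filter(id -> id > lastId).sorted().limit(maxSize).forEach(ids::add);
        return ids;
    }

    // Adds each not-yet-queued userId to the queue (member -> score), like ZADD after a ZSCORE miss.
    void produce(Map<String, Double> queue, int maxSize, double score) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        long lastId = 0L;
        List<Long> ids;
        do {
            ids = page(lastId, maxSize);
            pageSizes.add(ids.size());
            for (Long id : ids) {
                queue.putIfAbsent(String.valueOf(changedRows.get(id)), score);
            }
            if (!ids.isEmpty()) {
                lastId = ids.get(ids.size() - 1);
            }
        } while (ids.size() >= maxSize);
    }
}
```

Keyset pagination keeps each query cheap on an indexed primary key regardless of how deep the scan goes. As in the original loop, a row count that is an exact multiple of `maxSize` costs one extra, empty query.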
  • Consumer scheduled tasks: BaseConsumer, UserInfoConsumer
@Slf4j
public abstract class BaseConsumer {

    @Resource
    protected RedisUtil redisUtil;
    @Value("${es.consumer.queue.size.alarm.enable:0}")
    protected Integer queueSizeAlarmEnable;
    @Value("${es.consumer.queue.size.threshold.multiple:10}")
    protected Integer queueSizeThresholdMultiple;

    private final UserSyncService userSyncService;

    protected BaseConsumer(UserSyncService userSyncService) {
        this.userSyncService = userSyncService;
    }

    public void consume(Integer enable, Integer consumeSize, String queueName) {
        long startTime = System.currentTimeMillis();
        String taskName = this.getClass().getSimpleName();
        log.info("taskName = {}, consume start, enable = {}, consumeSize = {}, queueName = {}", taskName, enable, consumeSize, queueName);
        if (enable != null && enable == 0) {
            log.info("taskName = {}, consume end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
            return;
        }
        // Use the same offset as the producer's scores and the retry score below
        long currentSecond = LocalDateTime.now().toEpochSecond(DateUtil.ZONE_OFFSET);
        Set<String> uniqueValueString = redisUtil.zrangeByScore(queueName, 0, currentSecond, 0, consumeSize);
        log.info("taskName = {}, userIdStrings size = {}, content = {}", taskName, uniqueValueString.size(), JSON.toJSONString(uniqueValueString));
        uniqueValueString.removeIf(Objects::isNull);
        uniqueValueString.removeIf(item -> Long.parseLong(item) == 0);
        for (String uniqueValue : uniqueValueString) {
            Long userId = null;
            try {
                userId = Long.valueOf(uniqueValue);
                userSyncService.syncByUserId(userId);
                redisUtil.zrem(queueName, uniqueValue);
            } catch (Throwable throwable) {
                if (throwable instanceof EsSyncConcurrentLockException) {
                    double updateTimeDouble = LocalDateTime.now().toEpochSecond(DateUtil.ZONE_OFFSET) + 60;
                    redisUtil.zadd(queueName, updateTimeDouble, uniqueValue);
                    log.info("Concurrent Elasticsearch sync, requeued for retry in 60 s, userId = {}", userId);
                } else {
                    throw throwable;
                }
            }
        }
        if (DelStatus.DELETED.getCode() == queueSizeAlarmEnable) {
            Long queueSize = redisUtil.zcard(queueName);
            if ((long) queueSizeThresholdMultiple * consumeSize < queueSize) {
                log.info("taskName = {}, queue backlog detected, queueName = {}", taskName, queueName);
            }
        }
        log.info("taskName = {}, consume end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
    }

}
@Slf4j
@Configuration
@EnableScheduling
public class UserInfoConsumer extends BaseConsumer {

    @Value("${UserInfoConsumer.enable:0}")
    private Integer enable;
    @Value("${UserInfoConsumer.consumeSize:1}")
    private Integer consumeSize;
    @Value("${UserInfoProducer.queue.redis.key:UserInfoProducer:queue:9}")
    private String producerQueueRedisKey;

    public UserInfoConsumer(UserInfoSyncServiceImpl userInfoSyncService) {
        super(userInfoSyncService);
    }


    @Scheduled(initialDelay = 10000, fixedDelayString = "${UserInfoConsumer.fixedDelayString:1000}")
    public void consume() {
        try {
            super.consume(enable, consumeSize, producerQueueRedisKey);
        } catch (Throwable throwable) {
            log.error("UserInfoConsumer: unexpected exception", throwable);
        }
    }
}
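The consumer uses the sorted set as a delay queue: the score is a Unix timestamp in seconds, `zrangeByScore(queue, 0, now, 0, consumeSize)` returns only members that are due, and a user whose lock is busy is re-added with score `now + 60`, postponing it by a minute. The sketch below models that with a plain map instead of Redis; it orders members with equal scores by member name, as Redis does (lexicographically). `DelayQueueModel` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical in-memory model of a Redis sorted set used as a delay queue.
final class DelayQueueModel {
    private final Map<String, Double> scores = new HashMap<>();

    // ZADD: insert or overwrite the member's score.
    void zadd(String member, double score) {
        scores.put(member, score);
    }

    // ZREM
    void zrem(String member) {
        scores.remove(member);
    }

    // ZRANGEBYSCORE key 0 now LIMIT 0 count: due members ordered by score, then by member.
    List<String> due(double now, int count) {
        return scores.entrySet().stream()
                .filter(e -> e.getValue() >= 0 && e.getValue() <= now)
                .sorted(Map.Entry.<String, Double>comparingByValue()
                        .thenComparing(Map.Entry.<String, Double>comparingByKey()))
                .limit(count)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // One consumer run: synced members are removed; members whose lock was busy are retried 60 s later.
    void consumeOnce(double now, int count, Predicate<String> syncedWithoutLockConflict) {
        for (String member : due(now, count)) {
            if (syncedWithoutLockConflict.test(member)) {
                zrem(member);
            } else {
                zadd(member, now + 60);
            }
        }
    }

    int size() {
        return scores.size();
    }
}
```

One consequence visible in `BaseConsumer`: a failure other than a lock conflict is rethrown, which ends the current batch and leaves the member in the queue, so it is picked up again on the next run.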