从零开始构建一个任务流水线:Java 版

背景

任务流水线在开发者日常的工作中算是比较常见的任务。比如读 MySQL,然后往 Redis 或者 ElasticSearch (后文称为 ES)中灌数。

一般来讲,应付一下需求,迅速弄出一个快糙猛的方案,活干完就得了。但是,一而再再而三的接到这样的类似的需求,不管是提高效率,还是节省自己体力,最后总会想想怎么能偷点懒,更快更省事的弄出来。

本文探讨一种通用的简易方案。

需求

业务系统使用 ES 作为 DB 查询的加速器,ES 是很久很久的一个版本,需要升级到较新版本,因此,需要将全量数据(约上亿量级)从 MySQL 索引至新升级的 ES。

约束条件:

  • 总量 1 亿
  • 控制跑数机器的资源使用量
  • 跑一次全量索引用时尽量短

Approach I

本着先易后难,先具体后抽象的原则,将 Pipeline 总体框架构思出来。

API 设计

Endpoint

POST /api/backdoor/db-to-es

请求参数

名称 说明
offset 查询数据库起始偏移量
pageSize 查询数据库分页大小,简化起见,同时也是 ES 批量索引大小
maxId 最大查询到 ID 为止
esIndex ES 索引名称
esIndexType ES Type 名称
esEndpoint ES API 地址

请求举例

{ "offset": 0, "pageSize": 1000, "maxId": 10000000, "esIndex": "fake_fake3", "esIndexType": "fake_person", "esEndpoint": "http://localhost:9200" }

响应

原则上,API 提交跑数任务后,立即返回。

举例:

Submit OK

实现

逻辑上很简单,取数据,拆分任务提交线程池,执行任务写 ES。流程上,类似上图。

按页查询数据

按照分页的参数,使用 jdbc-template 查询出来对象列表

@Autowired private JdbcTemplate jdbcTemplate; private List<FakePerson> findPersonPage(final QueryPageParams pageParams) { final long offset = pageParams.offset; final long maxId = pageParams.maxId; final int pageSize = pageParams.pageSize; StringBuilder sb = new StringBuilder("("); long ceil = Math.min(offset+pageSize, maxId); for (long id = offset; id < ceil; id++) { sb.append("'") .append(id+1) .append("'"); if (id+1 < ceil) { sb.append(","); } } sb.append(")"); String sql = "select id,first_name,last_name,full_name,email from fake_person where id in " + sb.toString(); List<FakePerson> personList = jdbcTemplate.query( sql, (rs, rowNum) -> { FakePerson p = new FakePerson(); p.setId(rs.getLong("id")); p.setEmail(rs.getString("email")); p.setFirstName(rs.getString("first_name")); p.setLastName(rs.getString("last_name")); p.setFullName(rs.getString("full_name")); p.setTimestamp(new Date()); return p; }); return personList; }

独立提出来的分页逻辑,通过迭代器,将分页查询和分页逻辑组合在一起。

@Data @AllArgsConstructor @Builder static class QueryPageParams { long offset; int pageSize; long maxId; } private Iterator<List<FakePerson>> pageIterator( final SyncRequest syncRequest, final Function<QueryPageParams, List<FakePerson>> fn) { return new Iterator<List<FakePerson>>() { long offset = syncRequest.offset; long max = syncRequest.maxId; @Override public boolean hasNext() { return this.offset < this.max; } @Override public List<FakePerson> next() { List<FakePerson> page = fn.apply(QueryPageParams.builder() .offset(offset) .pageSize(syncRequest.pageSize) .maxId(syncRequest.maxId) .build()); this.offset += syncRequest.pageSize; return page; } }; }

写数至 ES

由于 ES 本身版本演进较快,官方的 Client 和社区的 Client 都较为混乱,考虑到这里对 ES 的操作需求很明确,就是 批量索引 Doc. 这里采用了 okhttp3 这个库,如果你是基于 Spring Boot 开发应用,okhttp3 是 Spring Boot 包含的依赖,简单添加就好。

修改 pom.xml

<dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> </dependency>

ES 批量索引 API 的格式如下,基于文本的协议。

POST <ES_HOST>/_bulk {"index": {"_index": "index-name", "_type": "type-name", "_id": "ID"}}\n {"id": <ID>, "field1": "value1", ...}

请求体的第一行称为指令行,本例为索引指令。第二行是序列化为 JSON 字符串的 Doc。再有第二个 Doc, 重复此格式。

private OkHttpClient httpClient = new OkHttpClient(); // 直接使用 HTTP Client 访问 ES private ObjectMapper objectMapper = new ObjectMapper(); // JSON 序列化 private static final String NEW_LINE = "\n"; // 将 Page 写入到 ES private void writePageToES( final Collection<FakePerson> page, String esIndex, String esIndexType, String esEndpoint) throws IOException { StringBuilder bulkRequest = new StringBuilder(); for (FakePerson p : page) { String action = "{\"index\":{\"_index\":\""+esIndex+"\",\"_type\":\""+esIndexType+"\",\"_id\":\""+p.getId()+"\"}}"; String json = objectMapper.writeValueAsString(p); // 将 Page 序列化为 JSON bulkRequest.append(action).append(NEW_LINE).append(json).append(NEW_LINE); } Request request = new Request.Builder() .url(esEndpoint + "/_bulk") .post(okhttp3.RequestBody .create(MediaType.parse("application/json"), bulkRequest.toString())) .build(); Response response = httpClient.newCall(request).execute(); if (response != null) { response.close(); } if (response.code() > 201) { throw new IOException("write to ES got error, response code = " + response.code()); } }

任务调度

任务执行请求定义

@Data @NoArgsConstructor @AllArgsConstructor public static class SyncRequest { long offset; int pageSize; long maxId; String esIndex; String esIndexType; // ES7 不再支持 Type String esEndpoint; // ES HTTP 接口地址,例如 http://localhost:9200 }

最后,任务调度逻辑,任务提交线程池执行。

private final ExecutorService executorService = Executors.newFixedThreadPool(8); private volatile boolean db2esRunning = false; @PostMapping("/backdoor/db2es-v1") public Object syncDBToES(@RequestBody SyncRequest syncRequest) { if (db2esRunning) { return "Can't, db2esRunning is true"; } if (syncRequest.offset < 0) { return "ERROR: offset must >= 0"; } if (syncRequest.pageSize < 1) { return "ERROR: pageSize must > 0"; } if (syncRequest.maxId <= 0) { return "ERROR: maxId must > 0"; } if (syncRequest.maxId < syncRequest.offset) { return "ERROR: maxId must > offset"; } if (StringUtils.isEmpty(syncRequest.esIndex)) { return "ERROR: esIndex required"; } if (StringUtils.isEmpty(syncRequest.esIndexType)) { return "ERROR: esIndexType required when ES version before 7"; } if (StringUtils.isEmpty(syncRequest.esEndpoint)) { return "ERROR: esEndpoint required, format <http://localhost:9200>"; } final Iterator<List<FakePerson>> pageSource = pageIterator(syncRequest, this::findPersonPage); final Semaphore maxQueuedTask = new Semaphore(20); new Thread(() -> { long startAt = System.currentTimeMillis(); try { BackdoorControllerV1.this.db2esRunning = true; while (pageSource.hasNext()) { try { maxQueuedTask.acquire(); // (1) 取 token final List<FakePerson> page = pageSource.next(); executorService.submit(() -> { try { BackdoorControllerV1.this.writePageToES( Collections.unmodifiableCollection(page), syncRequest.esIndex, syncRequest.esIndexType, syncRequest.esEndpoint); } catch (JsonProcessingException e) { // TODO: 处理异常 e.printStackTrace(); } catch (IOException e) { // TODO: 处理异常 e.printStackTrace(); } finally { maxQueuedTask.release(); // (2) 还 token } }); } catch (InterruptedException e) { e.printStackTrace(); // TODO: 处理异常 } } } finally { LOGGER.info("all sub-task was done, time duration is {}", System.currentTimeMillis() - startAt); BackdoorControllerV1.this.db2esRunning = false; // 重置标志位 } }).start(); return "Submit OK"; }

限制线程池任务排队数量:

  • (1) 取 Token,如果取不到,则阻塞等待,直到有任务执行完成,归还 Token
  • (2) 归还 Token。任务执行完毕,或者抛异常结束,都务必要通过 finally 归还 Token

© 2021, XZD