基于disruptor的责任链模式的canal同步elasticsearch,实现思路整体比较清晰,特别适用于基础设施比较缺乏,运维整体能力偏弱的情况。
- 引入依赖
<!-- https://mvnrepository.com/artifact/org.jodd/jodd-core -->
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>5.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
- 数据封装类
import java.io.Serializable;
import java.util.Map;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import lombok.Data;
@Data
public class InnerBinlogEntry implements Serializable {
private static final long serialVersionUID = 1L;
/**
* es mapping信息
*/
private String esMapping;
/**
* canal原生的Entry
*/
private Entry entry;
/**
* 该Entry归属于的表名
*/
private String tableName;
/**
* 该Entry归属数据库名
*/
private String schemaName;
/**
* 该Entry本次的操作类型,对应canal原生的枚举;EventType.INSERT; EventType.UPDATE; EventType.DELETE;
*/
private EventType eventType;
/**
* 字段数据
*/
private Map<String, Object> rows;
}
3.责任链抽象类
import com.elastic.entity.InnerBinlogEntry;
public abstract class AbstractSyncHandler {
/**
* 下一个责任链成员
*/
protected AbstractSyncHandler nextHandler;
public AbstractSyncHandler getNextHandler() {
return nextHandler;
}
public void setNextHandler(AbstractSyncHandler nextHandler) {
this.nextHandler = nextHandler;
}
public abstract void handleSync(InnerBinlogEntry innerBinlogEntry);
}
4.责任链工厂类
import com.google.common.base.Preconditions;
import jodd.typeconverter.TypeConverterManager;
import org.springframework.beans.BeanUtils;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class HandlerFactory {
private static TypeConverterManager typeConverterManager = TypeConverterManager.get();
/**
* 责任链
*/
@SafeVarargs
public static AbstractSyncHandler getHandlerResponsibilityChain(Class< ? extends AbstractSyncHandler> ... handlers ) {
Preconditions.checkNotNull(handlers, "handler列表不能为空");
List list = Arrays.stream(handlers).map(BeanUtils::instantiateClass).collect(Collectors.toList());
AbstractSyncHandler result = null;
for (int i = 1; i < list.size(); i++) {
AbstractSyncHandler pre = typeConverterManager.convertType(list.get(i - 1), handlers[i - 1]);
AbstractSyncHandler cur = typeConverterManager.convertType(list.get(i), handlers[i]);
cur.setNextHandler(pre);
result = cur;
}
return result;
}
}
5.责任链实现类
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.mol.elastic.base.AbstractSyncHandler;
import com.mol.elastic.entity.InnerBinlogEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class ProductHandler extends AbstractSyncHandler {
protected final static Logger logger = LoggerFactory.getLogger(ProductHandler.class);
@Override
public void handleSync(InnerBinlogEntry innerBinlogEntry) {
String schemaName = innerBinlogEntry.getSchemaName();
String tableName = innerBinlogEntry.getTableName();
String esMapping = innerBinlogEntry.getEsMapping();
CanalEntry.EventType syncEnum = innerBinlogEntry.getEventType();
Map<String, Object> column = innerBinlogEntry.getRow();
if (schemaName.equals("mall") && tableName.equals("mall_product")) {
try {
if (syncEnum == CanalEntry.EventType.INSERT) {
//ES数据插入
}
if (syncEnum == CanalEntry.EventType.DELETE) {
//ES数据删除
}
if (syncEnum == CanalEntry.EventType.UPDATE) {
//ES数据更新
}
} catch (Exception e) {
e.printStackTrace();
logger.error("处理mall_product失败:", e);
}
} else {
if (nextHandler == null) {
return;
}
nextHandler.handleSync(innerBinlogEntry);
}
}
}