disruptor初始化类
disruptor的初始化和相关组件使用都是固定的套路:消息体-时间-事件工厂-生产者方法-消费者-启动-关闭。
canal同步elasticsearch实战只是抛砖引玉,为大家提供一种实现思路和架构设计方案,难免有不足之处,请大家谅解。大家可以根据自己的实际情况做一下扩展:
- 实现数据的串并联及WorkerPool处理,只需要对disruptor.handleEvents进行改造即可
disruptor.handleEventsWith().then().thenHandleEventsWithWorkerPool()
- 分库分表,比如一个库一个disruptor,一个库的表比较多时可以再对表进行分组使用不同的disruptor处理,这样我们可以有效隔离不同库的数据处理,同时也可以对热点表分配更多的资源,提高其处理效能。
- disruptor是线程级的,无持久化,没有监控API,没有宕掉后的自动恢复机制,也不能被其他程序访问,因此disruptor的使用成本还是非常高的,大家必须有心理准备。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.GsonBuilder;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.mol.elastic.base.AbstractSyncHandler;
import com.mol.elastic.base.HandlerFactory;
import com.mol.elastic.entity.InnerBinlogEntry;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadFactory;
@Component
public class messageEventQueueHelper implements InitializingBean, DisposableBean {
/**
* Disruptor 对象
*/
private Disruptor<MessageEvent> disruptor;
/**
* RingBuffer线程安全无锁
*/
private RingBuffer<MessageEvent> ringBuffer;
/**
* initQueue
*/
private List<MessageModel> initQueue = Collections.synchronizedList(new ArrayList<>());
/**
* 初始化
*/
public void init() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
//RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<MessageEvent> factory = () -> new MessageEvent();
//单线程模式,获取额外的性能
disruptor = new Disruptor<>(factory, 1024, namedThreadFactory, ProducerType.SINGLE, new YieldingWaitStrategy());
disruptor.setDefaultExceptionHandler(new MyHandlerException<>());
disruptor.handleEventsWith(new MessageHandler());
ringBuffer = disruptor.start();
//初始化数据发布,如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
for (MessageModel data : initQueue) {
ringBuffer.publishEvent((event, sequence, result) -> event.setMessageModel(result), data);
}
}
/**
* 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
*/
public void publishEvent(MessageModel data) {
if (ringBuffer == null) {
initQueue.add(data);
return;
}
ringBuffer.publishEvent((event, sequence, result) -> event.setMessageModel(result), data);
}
@Override
public void destroy() throws Exception {
disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理。
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
/**
* 消息体,通过JSON数据传输
*/
@Data
@Builder
public class MessageModel {
private String message;
}
/**
* 创建事件
*/
@Data
public class MessageEvent {
private MessageModel messageModel;
}
/**
* 创建消费者
*/
public class MessageHandler implements EventHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
//具体消费逻辑
InnerBinlogEntry innerBinlogEntry = new GsonBuilder().create().fromJson(event.getMessageModel().getMessage(), InnerBinlogEntry.class);
AbstractSyncHandler handler = HandlerFactory.getHandlerResponsibilityChain();
handler.handleSync(innerBinlogEntry);
}
}
// 异常处理类
protected class MyHandlerException<T> implements ExceptionHandler<T> {
private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
@Override
public void handleEventException(Throwable ex, long sequence, T event) {
ex.printStackTrace();
logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
}
@Override
public void handleOnStartException(Throwable ex) {
logger.error("start disruptor error ==[{}]!", ex.getMessage());
}
@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
}
}
}