Mysql-binlog-connect-java
一、核心类:BinaryLogClient
BinaryLogClient 是一个功能完整的 MySQL 二进制日志客户端,主要特点包括:
1、核心功能
- 实时数据同步:监听 MySQL binlog 事件,获取数据库变更
- 多数据库支持:支持 MySQL 和 MariaDB
- GTID 支持:完整的全局事务标识符支持
- SSL 安全连接:多种 SSL 模式支持
- 自动重连:内置保活和重连机制
2、架构设计
- 事件驱动:基于监听器模式处理事件
- 线程安全:使用 CopyOnWriteArrayList 和锁机制
- 资源管理:完善的连接和线程生命周期管理
- 错误处理:全面的异常处理和恢复机制
3、使用场景
- 数据同步和复制
- 实时数据变更监控
- 数据备份和恢复
- 数据分析和审计
- 这个客户端库为 Java 应用程序提供了与 MySQL 二进制日志交互的完整解决方案,是构建数据同步、实时监控等系统的重要基础组件。
二、核心方法 connect
public void connect() throws IOException, IllegalStateException {
// 1. 获取连接锁,防止重复连接
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
try {
// 2. 建立网络连接
channel = openChannel();
// 3. 接收服务器问候包
GreetingPacket greetingPacket = receiveGreeting();
// 4. 解析数据库版本
resolveDatabaseVersion(greetingPacket);
// 5. 尝试升级到 SSL(如果需要)
tryUpgradeToSSL(greetingPacket);
// 6. 进行身份认证
new Authenticator(greetingPacket, channel, schema, username, password).authenticate();
// 7. 设置连接参数
setupConnection();
// 8. 请求二进制日志流
requestBinaryLogStream();
// 9. 开始监听事件
listenForEventPackets();
} finally {
connectLock.unlock();
}
}
三、listenForEventPackets 方法
listenForEventPackets()
是 BinaryLogClient
的核心方法,负责持续监听 MySQL 服务器发送的二进制日志事件包。
1. 方法概述
private void listenForEventPackets() throws IOException {
ByteArrayInputStream inputStream = channel.getInputStream();
boolean completeShutdown = false;
// ... 事件监听循环
}
作用:
- 持续读取 MySQL 服务器发送的 binlog 事件包
- 解析事件并通知注册的监听器
- 处理各种异常情况
- 维护连接状态和位置信息
2. 主要处理流程
2.1 初始化阶段
ByteArrayInputStream inputStream = channel.getInputStream();
boolean completeShutdown = false;
- 获取网络通道的输入流
- 初始化关闭标志
2.2 主循环 - 持续监听事件
while (inputStream.peek() != -1) {
// 处理每个数据包
}
循环条件:inputStream.peek() != -1
表示还有数据可读
2.3 数据包解析
步骤1:读取数据包头部
int packetLength = inputStream.readInteger(3); // 读取3字节的数据包长度
inputStream.skip(1); // 跳过1字节的序列号
int marker = inputStream.read(); // 读取1字节的标记
MySQL 协议数据包格式:
+------------------+------------------+------------------+------------------+
| Length | Sequence | Payload | Marker |
| (3 bytes) | (1 byte) | (variable) | (1 byte) |
+------------------+------------------+------------------+------------------+
步骤2:处理特殊标记
错误包处理 (0xFF):
if (marker == 0xFF) {
ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
errorPacket.getSqlState());
}
EOF 包处理 (0xFE):
if (marker == 0xFE && !blocking) {
completeShutdown = true;
break;
}
- 在非阻塞模式下,EOF 包表示服务器主动断开连接
2.4 事件反序列化
Event event;
try {
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
// 处理反序列化异常
}
关键逻辑:
- 如果数据包长度等于
MAX_PACKET_LENGTH
(16777215),说明数据被分片 - 需要调用
readPacketSplitInChunks()
重新组装数据 - 否则直接使用原始输入流
2.5 大包分片处理
private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {
byte[] result = inputStream.read(packetLength);
int chunkLength;
do {
chunkLength = inputStream.readInteger(3); // 读取下一个分片长度
inputStream.skip(1); // 跳过序列号
result = Arrays.copyOf(result, result.length + chunkLength); // 扩展结果数组
inputStream.fill(result, result.length - chunkLength, chunkLength); // 填充数据
} while (chunkLength == Packet.MAX_LENGTH); // 如果长度等于最大值,说明还有更多分片
return result;
}
分片重组逻辑:
- MySQL 协议限制单个数据包最大 16MB
- 超过限制的数据会被分成多个包
- 需要重新组装成完整的数据
2.6 事件处理
if (isConnected()) {
eventLastSeen = System.currentTimeMillis(); // 更新最后事件时间
updateGtidSet(event); // 更新 GTID 信息
notifyEventListeners(event); // 通知事件监听器
updateClientBinlogFilenameAndPosition(event); // 更新位置信息
}
处理步骤:
- 时间戳更新:用于保活机制检测
- GTID 更新:维护全局事务标识符状态
- 事件通知:调用所有注册的事件监听器
- 位置更新:更新当前 binlog 文件名和位置
3. 异常处理机制
3.1 反序列化异常
} catch (Exception e) {
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e; // 重新抛出网络相关异常
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
}
continue; // 跳过当前事件,继续处理下一个
}
处理策略:
- 网络异常:重新抛出,终止连接
- 数据异常:通知监听器,跳过当前事件
- 继续处理:不中断整个监听循环
3.2 通信异常
} catch (Exception e) {
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onCommunicationFailure(this, e);
}
}
}
4. 资源清理
} finally {
if (isConnected()) {
if (completeShutdown) {
// 完全关闭(包括保活线程)
disconnect();
} else {
// 只关闭网络连接
disconnectChannel();
}
}
}
清理策略:
- 完全关闭:服务器主动断开时,关闭所有资源
- 部分关闭:网络异常时,只关闭网络连接,保活线程会尝试重连
5. 方法特点
5.1 阻塞性
- 方法会阻塞直到连接断开
- 在
connect()
方法中调用,是连接过程的核心
5.2 容错性
- 单个事件解析失败不会终止整个监听
- 网络异常会触发重连机制
5.3 实时性
- 持续监听,实时处理事件
- 立即通知监听器,无缓冲延迟
6. 使用场景
// 在 connect() 方法中的调用
public void connect() throws IOException, IllegalStateException {
// ... 连接建立逻辑
try {
// ... 其他初始化
listenForEventPackets(); // 开始监听事件(阻塞)
} finally {
connectLock.unlock();
}
}
7. 总结
listenForEventPackets()
方法实现了:
- 持续监听:循环读取 MySQL 事件包
- 协议解析:处理 MySQL 二进制日志协议
- 事件分发:将解析的事件通知给监听器
- 状态维护:更新 GTID、位置等状态信息
- 异常处理:优雅处理各种异常情况
- 资源管理:确保连接和资源正确清理
这是整个 BinaryLogClient
的核心,负责将 MySQL 的二进制日志事件转换为 Java 对象并分发给应用程序。
四、完整流程图
graph TD
A[客户端启动] --> B[注册监听器]
B --> C[调用 connect]
C --> D[建立网络连接]
D --> E[身份认证]
E --> F[触发 onConnect]
F --> G[开始监听事件]
G --> H[接收数据包]
H --> I{解析事件}
I -->|成功| J[触发 onEvent]
I -->|失败| K[触发 onEventDeserializationFailure]
J --> L[更新位置信息]
L --> H
K --> H
H --> M{连接状态}
M -->|正常| H
M -->|异常| N[触发 onCommunicationFailure]
M -->|断开| O[触发 onDisconnect]
N --> P[尝试重连]
P --> D
O --> Q[结束]