Appearance
SPI Event Listener Extension 说明文档
功能概述
spi-event-listener-extension 是一个 Keycloak 扩展,实现了 Keycloak 的事件监听器 SPI,用于将 Keycloak 的事件(如用户登录、登出、管理操作等)发送到消息队列,以便进行审计、监控或其他业务处理。
该扩展支持多种消息队列系统,包括 Kafka、RabbitMQ 和 RocketMQ,并且提供了可扩展的架构,允许添加新的消息通道实现。
界面预览

技术支持
- 技术支持: 北京必码科技工作室
- 官方文档: https://bima.cc
- 官方店铺: https://bima.taobao.com
- 联系邮箱: bima.cc@qq.com
- 联系微信: e18929958
核心组件
1. AuditEventListenerProvider
功能:实现了 Keycloak 的 EventListenerProvider 接口,负责监听 Keycloak 事件并将其发送到配置的消息通道。
主要方法:
onEvent(Event event):处理用户事件(如登录、登出等)- 参数:
event- Keycloak 用户事件对象,包含事件类型、用户ID、客户端ID等信息 - 返回值:无
- 功能:将用户事件转换为JSON字符串并发送到配置的消息通道
- 参数:
onEvent(AdminEvent adminEvent, boolean includeRepresentation):处理管理事件(如用户创建、角色分配等)- 参数:
adminEvent- Keycloak 管理事件对象,包含操作类型、资源类型、资源ID等信息includeRepresentation- 是否包含资源表示
- 返回值:无
- 功能:将管理事件转换为JSON字符串并发送到配置的消息通道
- 参数:
sendToMessageChannels(String message):将事件消息发送到所有配置的消息通道- 参数:
message- 要发送的消息字符串 - 返回值:无
- 功能:遍历所有注册的消息通道工厂,创建通道实例并发送消息
- 参数:
2. AuditEventListenerProviderFactory
功能:实现了 Keycloak 的 EventListenerProviderFactory 接口,负责创建 AuditEventListenerProvider 实例。
主要方法:
getId():返回提供者工厂的ID- 返回值:字符串,固定为 "bima-spi-event-listener-extension"
create(KeycloakSession session):创建事件监听器提供者实例- 参数:
session- Keycloak 会话对象 - 返回值:
AuditEventListenerProvider实例
- 参数:
init(Config.Scope config):初始化配置,注册消息通道工厂- 参数:
config- Keycloak 配置对象 - 返回值:无
- 功能:初始化消息通道工厂映射,注册 Kafka、RabbitMQ 和 RocketMQ 通道工厂
- 参数:
postInit(KeycloakSessionFactory factory):后初始化- 参数:
factory- Keycloak 会话工厂对象 - 返回值:无
- 功能:在所有提供者初始化后执行的操作
- 参数:
close():清理资源- 返回值:无
- 功能:清理消息通道工厂映射
3. 消息通道组件
MessageChannel 接口
功能:定义了消息通道的基本操作。
主要方法:
send(String message):发送消息- 参数:
message- 要发送的消息字符串 - 返回值:无
- 功能:将消息发送到消息队列
- 参数:
close():关闭通道- 返回值:无
- 功能:关闭消息通道,释放资源
MessageChannelFactory 接口
功能:定义了消息通道工厂的基本操作。
主要方法:
create(ComponentModel model):创建消息通道实例- 参数:
model- Keycloak 组件模型对象,包含通道配置 - 返回值:
MessageChannel实例 - 功能:根据配置创建消息通道实例
- 参数:
具体实现
KafkaChannel:发送消息到 Kafka
- 功能:将消息发送到 Kafka 主题
- 配置参数:
bootstrap.servers:Kafka 集群地址topic:目标主题名称acks:确认级别
RabbitMQChannel:发送消息到 RabbitMQ
- 功能:将消息发送到 RabbitMQ 队列
- 配置参数:
host:RabbitMQ 服务器地址port:RabbitMQ 服务器端口username:RabbitMQ 用户名password:RabbitMQ 密码queue:目标队列名称
RocketMQChannel:发送消息到 RocketMQ
- 功能:将消息发送到 RocketMQ 主题
- 配置参数:
namesrvAddr:NameServer 地址topic:目标主题名称producerGroup:生产者组名称
配置与使用
1. 开发环境搭建
前提条件
- JDK 11 或更高版本
- Maven 3.6 或更高版本
- Keycloak 17.0.0 或更高版本
- 消息队列系统(Kafka、RabbitMQ 或 RocketMQ)
编译构建
bash
cd spi-event-listener-extension
mvn clean package编译完成后,在 target 目录下会生成 spi-event-listener-extension-1.0.0.jar 文件。
2. 部署
将编译好的 JAR 文件放入 Keycloak 的 standalone/deployments 目录。
3. 配置
3.1 Keycloak 事件监听器配置
- 登录 Keycloak 管理控制台
- 进入 Realm 设置 → 事件 → 配置
- 在事件监听器中添加
bima-spi-event-listener-extension - 保存配置
3.2 消息队列配置
当前实现中,消息队列的配置是硬编码在 AuditEventListenerProviderFactory.init() 方法中的。在实际使用中,建议通过 Keycloak 的组件配置来管理这些设置。
配置示例:
java
// 在 AuditEventListenerProviderFactory.init() 方法中
@Override
public void init(org.keycloak.Config.Scope config) {
// 初始化消息通道工厂
channelFactories = new HashMap<>();
// 注册 Kafka 通道工厂
KafkaChannelFactory kafkaFactory = new KafkaChannelFactory();
// 设置默认配置
Map<String, String> kafkaConfig = new HashMap<>();
kafkaConfig.put("bootstrap.servers", "localhost:9092");
kafkaConfig.put("topic", "keycloak-events");
kafkaConfig.put("acks", "all");
kafkaFactory.setDefaultConfig(kafkaConfig);
channelFactories.put("kafka", kafkaFactory);
// 注册 RabbitMQ 通道工厂
RabbitMQChannelFactory rabbitFactory = new RabbitMQChannelFactory();
// 设置默认配置
Map<String, String> rabbitConfig = new HashMap<>();
rabbitConfig.put("host", "localhost");
rabbitConfig.put("port", "5672");
rabbitConfig.put("username", "guest");
rabbitConfig.put("password", "guest");
rabbitConfig.put("queue", "keycloak-events");
rabbitFactory.setDefaultConfig(rabbitConfig);
channelFactories.put("rabbitmq", rabbitFactory);
// 注册 RocketMQ 通道工厂
RocketMQChannelFactory rocketFactory = new RocketMQChannelFactory();
// 设置默认配置
Map<String, String> rocketConfig = new HashMap<>();
rocketConfig.put("namesrvAddr", "localhost:9876");
rocketConfig.put("topic", "keycloak-events");
rocketConfig.put("producerGroup", "keycloak-producer-group");
rocketFactory.setDefaultConfig(rocketConfig);
channelFactories.put("rocketmq", rocketFactory);
}4. 测试
4.1 单元测试
bash
cd spi-event-listener-extension
mvn test4.2 集成测试
- 启动 Keycloak 服务器
- 启动消息队列服务(如 Kafka)
- 执行用户操作(如登录、登出)
- 检查消息队列中是否收到事件消息
扩展与定制
添加新的消息通道
- 实现 MessageChannel 接口
java
public class CustomMessageChannel implements MessageChannel {
private String endpoint;
public CustomMessageChannel(String endpoint) {
this.endpoint = endpoint;
}
@Override
public void send(String message) {
// 实现消息发送逻辑
System.out.println("Sending message to " + endpoint + ": " + message);
}
@Override
public void close() {
// 实现资源清理逻辑
}
}- 实现 MessageChannelFactory 接口
java
public class CustomMessageChannelFactory implements MessageChannelFactory {
private Map<String, String> defaultConfig;
public void setDefaultConfig(Map<String, String> defaultConfig) {
this.defaultConfig = defaultConfig;
}
@Override
public MessageChannel create(ComponentModel model) {
// 从组件模型获取配置,如果没有则使用默认配置
String endpoint = model != null ?
model.getConfig().getFirst("endpoint") :
defaultConfig.get("endpoint");
return new CustomMessageChannel(endpoint);
}
}- 在 AuditEventListenerProviderFactory.init() 方法中注册新的通道工厂
java
@Override
public void init(org.keycloak.Config.Scope config) {
// 初始化消息通道工厂
channelFactories = new HashMap<>();
// 注册现有通道工厂...
// 注册自定义通道工厂
CustomMessageChannelFactory customFactory = new CustomMessageChannelFactory();
// 设置默认配置
Map<String, String> customConfig = new HashMap<>();
customConfig.put("endpoint", "http://localhost:8080/api/events");
customFactory.setDefaultConfig(customConfig);
channelFactories.put("custom", customFactory);
}自定义事件处理
可以修改 AuditEventListenerProvider.onEvent() 方法来定制事件处理逻辑,例如:
添加事件过滤
java
@Override
public void onEvent(Event event) {
try {
// 过滤不需要处理的事件类型
if (event.getType() == EventType.LOGIN_ERROR ||
event.getType() == EventType.REGISTER) {
// 将事件转换为JSON字符串
String eventJson = objectMapper.writeValueAsString(event);
// 发送到配置的消息通道
sendToMessageChannels(eventJson);
}
} catch (Exception e) {
e.printStackTrace();
}
}对事件进行转换或增强
java
@Override
public void onEvent(Event event) {
try {
// 创建增强的事件对象
Map<String, Object> enhancedEvent = new HashMap<>();
enhancedEvent.put("type", event.getType());
enhancedEvent.put("realmId", event.getRealmId());
enhancedEvent.put("clientId", event.getClientId());
enhancedEvent.put("userId", event.getUserId());
enhancedEvent.put("ipAddress", event.getIpAddress());
enhancedEvent.put("details", event.getDetails());
enhancedEvent.put("timestamp", event.getTime());
enhancedEvent.put("processedAt", System.currentTimeMillis());
enhancedEvent.put("environment", "production");
// 将增强的事件转换为JSON字符串
String eventJson = objectMapper.writeValueAsString(enhancedEvent);
// 发送到配置的消息通道
sendToMessageChannels(eventJson);
} catch (Exception e) {
e.printStackTrace();
}
}实现不同事件类型的不同处理策略
java
@Override
public void onEvent(Event event) {
try {
// 根据事件类型选择不同的处理策略
String eventJson;
switch (event.getType()) {
case LOGIN:
case LOGIN_ERROR:
// 登录事件特殊处理
eventJson = processLoginEvent(event);
break;
case REGISTER:
// 注册事件特殊处理
eventJson = processRegisterEvent(event);
break;
default:
// 其他事件默认处理
eventJson = objectMapper.writeValueAsString(event);
}
// 发送到配置的消息通道
sendToMessageChannels(eventJson);
} catch (Exception e) {
e.printStackTrace();
}
}
private String processLoginEvent(Event event) throws JsonProcessingException {
// 登录事件处理逻辑
Map<String, Object> loginEvent = new HashMap<>();
loginEvent.put("type", "LOGIN_ATTEMPT");
loginEvent.put("userId", event.getUserId());
loginEvent.put("clientId", event.getClientId());
loginEvent.put("ipAddress", event.getIpAddress());
loginEvent.put("success", event.getType() == EventType.LOGIN);
loginEvent.put("timestamp", event.getTime());
return objectMapper.writeValueAsString(loginEvent);
}
private String processRegisterEvent(Event event) throws JsonProcessingException {
// 注册事件处理逻辑
Map<String, Object> registerEvent = new HashMap<>();
registerEvent.put("type", "USER_REGISTERED");
registerEvent.put("userId", event.getUserId());
registerEvent.put("clientId", event.getClientId());
registerEvent.put("ipAddress", event.getIpAddress());
registerEvent.put("timestamp", event.getTime());
return objectMapper.writeValueAsString(registerEvent);
}代码结构
spi-event-listener-extension/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── cc/bima/keycloak/extension/event/
│ │ │ ├── AuditEventListenerProvider.java # 事件监听器提供者实现
│ │ │ ├── AuditEventListenerProviderFactory.java # 事件监听器提供者工厂
│ │ │ ├── KafkaChannel.java # Kafka 消息通道实现
│ │ │ ├── KafkaChannelFactory.java # Kafka 消息通道工厂
│ │ │ ├── MessageChannel.java # 消息通道接口
│ │ │ ├── MessageChannelFactory.java # 消息通道工厂接口
│ │ │ ├── RabbitMQChannel.java # RabbitMQ 消息通道实现
│ │ │ ├── RabbitMQChannelFactory.java # RabbitMQ 消息通道工厂
│ │ │ ├── RocketMQChannel.java # RocketMQ 消息通道实现
│ │ │ └── RocketMQChannelFactory.java # RocketMQ 消息通道工厂
│ │ └── resources/
│ │ └── META-INF/
│ │ └── services/
│ │ └── org.keycloak.events.EventListenerProviderFactory # 服务提供者配置
│ └── test/
│ └── java/
│ └── cc/bima/keycloak/extension/event/ # 测试代码
└── pom.xml # Maven 配置文件部署与维护
1. 部署方式
1.1 标准部署
将编译好的 JAR 文件放入 Keycloak 的 standalone/deployments 目录,Keycloak 会自动部署该扩展。
1.2 Docker 部署
如果使用 Docker 运行 Keycloak,可以将扩展 JAR 文件复制到容器的 /opt/keycloak/standalone/deployments 目录。
Dockerfile 示例:
dockerfile
FROM quay.io/keycloak/keycloak:17.0.0
COPY spi-event-listener-extension-1.0.0.jar /opt/keycloak/standalone/deployments/
ENV KEYCLOAK_ADMIN=admin
ENV KEYCLOAK_ADMIN_PASSWORD=admin
ENTRYPOINT ["/opt/keycloak/bin/kc.sh", "start-dev"]2. 监控与日志
2.1 日志配置
在 Keycloak 的 standalone/configuration/standalone.xml 文件中,可以配置日志级别:
xml
<logger category="cc.bima.keycloak.extension.event" level="info"/>2.2 监控指标
可以通过 Keycloak 的管理 API 或 JMX 监控扩展的运行状态。
3. 故障排除
3.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 事件未发送到消息队列 | 消息队列连接失败 | 检查消息队列服务是否运行,配置是否正确 |
| 扩展未加载 | JAR 文件未正确部署 | 检查 JAR 文件是否在 standalone/deployments 目录中,是否有部署错误 |
| 事件监听器未生效 | 未在 Keycloak 管理控制台中配置 | 登录管理控制台,在事件配置中添加 bima-spi-event-listener-extension |
性能优化
- 连接池管理:使用连接池管理消息队列连接,避免频繁创建和关闭连接
- 批量发送:对事件进行批量处理,减少网络传输次数
- 异步处理:使用异步方式发送消息,避免阻塞 Keycloak 主流程
- 消息压缩:对消息进行压缩,减少网络传输量
- 事件过滤:只处理必要的事件类型,减少消息量
注意事项
- 消息队列连接配置:需要根据实际环境进行调整,确保连接参数正确
- 错误处理:生产环境中应添加完善的错误处理和重试机制
- 消息序列化:建议使用高效的序列化方式,如 Protobuf 或 Avro
- 事件配置:应根据业务需求合理配置事件类型,避免过多的事件导致消息队列负载过高
- 安全性:确保消息队列连接使用安全的认证方式,避免未授权访问
- 监控:定期监控消息队列的运行状态,确保事件处理正常
- 备份:对重要的审计事件进行备份,防止数据丢失
- 版本兼容性:注意 Keycloak 版本与扩展版本的兼容性,避免因版本不匹配导致的问题
免责声明
本项目基于 GitHub 开源软件进行定制化开发,旨在为企业和开发者提供更便捷的项目基座解决方案。使用本项目时,请务必了解以下免责声明:
- 开源基础:本项目基于 GitHub 开源软件构建,遵循原开源协议的相关规定。
- 定制开发:我们对原开源软件进行了定制和扩展,以提供更优质的开发体验和功能支持。
- 责任限制:对于使用本项目可能产生的任何直接或间接的经济损失、数据丢失或其他问题,北京必码科技工作室不承担任何责任。
- 使用建议:在生产环境中使用本项目前,请进行充分的测试和验证,确保其符合您的业务需求和安全要求。
- 技术支持:我们提供技术支持服务,但不保证解决所有可能出现的问题。
- 合规使用:用户应确保在使用本项目时遵守相关法律法规和行业规范,不得用于任何违法或违规用途。