Skip to content

SPI Event Listener Extension 说明文档

功能概述

spi-event-listener-extension 是一个 Keycloak 扩展,实现了 Keycloak 的事件监听器 SPI,用于将 Keycloak 的事件(如用户登录、登出、管理操作等)发送到消息队列,以便进行审计、监控或其他业务处理。

该扩展支持多种消息队列系统,包括 Kafka、RabbitMQ 和 RocketMQ,并且提供了可扩展的架构,允许添加新的消息通道实现。

界面预览

SPI Event Listener Extension

技术支持

核心组件

1. AuditEventListenerProvider

功能:实现了 Keycloak 的 EventListenerProvider 接口,负责监听 Keycloak 事件并将其发送到配置的消息通道。

主要方法

  • onEvent(Event event):处理用户事件(如登录、登出等)

    • 参数event - Keycloak 用户事件对象,包含事件类型、用户ID、客户端ID等信息
    • 返回值:无
    • 功能:将用户事件转换为JSON字符串并发送到配置的消息通道
  • onEvent(AdminEvent adminEvent, boolean includeRepresentation):处理管理事件(如用户创建、角色分配等)

    • 参数
      • adminEvent - Keycloak 管理事件对象,包含操作类型、资源类型、资源ID等信息
      • includeRepresentation - 是否包含资源表示
    • 返回值:无
    • 功能:将管理事件转换为JSON字符串并发送到配置的消息通道
  • sendToMessageChannels(String message):将事件消息发送到所有配置的消息通道

    • 参数message - 要发送的消息字符串
    • 返回值:无
    • 功能:遍历所有注册的消息通道工厂,创建通道实例并发送消息

2. AuditEventListenerProviderFactory

功能:实现了 Keycloak 的 EventListenerProviderFactory 接口,负责创建 AuditEventListenerProvider 实例。

主要方法

  • getId():返回提供者工厂的ID

    • 返回值:字符串,固定为 "bima-spi-event-listener-extension"
  • create(KeycloakSession session):创建事件监听器提供者实例

    • 参数session - Keycloak 会话对象
    • 返回值AuditEventListenerProvider 实例
  • init(Config.Scope config):初始化配置,注册消息通道工厂

    • 参数config - Keycloak 配置对象
    • 返回值:无
    • 功能:初始化消息通道工厂映射,注册 Kafka、RabbitMQ 和 RocketMQ 通道工厂
  • postInit(KeycloakSessionFactory factory):后初始化

    • 参数factory - Keycloak 会话工厂对象
    • 返回值:无
    • 功能:在所有提供者初始化后执行的操作
  • close():清理资源

    • 返回值:无
    • 功能:清理消息通道工厂映射

3. 消息通道组件

MessageChannel 接口

功能:定义了消息通道的基本操作。

主要方法

  • send(String message):发送消息

    • 参数message - 要发送的消息字符串
    • 返回值:无
    • 功能:将消息发送到消息队列
  • close():关闭通道

    • 返回值:无
    • 功能:关闭消息通道,释放资源

MessageChannelFactory 接口

功能:定义了消息通道工厂的基本操作。

主要方法

  • create(ComponentModel model):创建消息通道实例
    • 参数model - Keycloak 组件模型对象,包含通道配置
    • 返回值MessageChannel 实例
    • 功能:根据配置创建消息通道实例

具体实现

  • KafkaChannel:发送消息到 Kafka

    • 功能:将消息发送到 Kafka 主题
    • 配置参数
      • bootstrap.servers:Kafka 集群地址
      • topic:目标主题名称
      • acks:确认级别
  • RabbitMQChannel:发送消息到 RabbitMQ

    • 功能:将消息发送到 RabbitMQ 队列
    • 配置参数
      • host:RabbitMQ 服务器地址
      • port:RabbitMQ 服务器端口
      • username:RabbitMQ 用户名
      • password:RabbitMQ 密码
      • queue:目标队列名称
  • RocketMQChannel:发送消息到 RocketMQ

    • 功能:将消息发送到 RocketMQ 主题
    • 配置参数
      • namesrvAddr:NameServer 地址
      • topic:目标主题名称
      • producerGroup:生产者组名称

配置与使用

1. 开发环境搭建

前提条件

  • JDK 11 或更高版本
  • Maven 3.6 或更高版本
  • Keycloak 17.0.0 或更高版本
  • 消息队列系统(Kafka、RabbitMQ 或 RocketMQ)

编译构建

bash
cd spi-event-listener-extension
mvn clean package

编译完成后,在 target 目录下会生成 spi-event-listener-extension-1.0.0.jar 文件。

2. 部署

将编译好的 JAR 文件放入 Keycloak 的 standalone/deployments 目录。

3. 配置

3.1 Keycloak 事件监听器配置

  1. 登录 Keycloak 管理控制台
  2. 进入 Realm 设置 → 事件 → 配置
  3. 在事件监听器中添加 bima-spi-event-listener-extension
  4. 保存配置

3.2 消息队列配置

当前实现中,消息队列的配置是硬编码在 AuditEventListenerProviderFactory.init() 方法中的。在实际使用中,建议通过 Keycloak 的组件配置来管理这些设置。

配置示例

java
// 在 AuditEventListenerProviderFactory.init() 方法中
@Override
public void init(org.keycloak.Config.Scope config) {
    // 初始化消息通道工厂
    channelFactories = new HashMap<>();
    
    // 注册 Kafka 通道工厂
    KafkaChannelFactory kafkaFactory = new KafkaChannelFactory();
    // 设置默认配置
    Map<String, String> kafkaConfig = new HashMap<>();
    kafkaConfig.put("bootstrap.servers", "localhost:9092");
    kafkaConfig.put("topic", "keycloak-events");
    kafkaConfig.put("acks", "all");
    kafkaFactory.setDefaultConfig(kafkaConfig);
    channelFactories.put("kafka", kafkaFactory);
    
    // 注册 RabbitMQ 通道工厂
    RabbitMQChannelFactory rabbitFactory = new RabbitMQChannelFactory();
    // 设置默认配置
    Map<String, String> rabbitConfig = new HashMap<>();
    rabbitConfig.put("host", "localhost");
    rabbitConfig.put("port", "5672");
    rabbitConfig.put("username", "guest");
    rabbitConfig.put("password", "guest");
    rabbitConfig.put("queue", "keycloak-events");
    rabbitFactory.setDefaultConfig(rabbitConfig);
    channelFactories.put("rabbitmq", rabbitFactory);
    
    // 注册 RocketMQ 通道工厂
    RocketMQChannelFactory rocketFactory = new RocketMQChannelFactory();
    // 设置默认配置
    Map<String, String> rocketConfig = new HashMap<>();
    rocketConfig.put("namesrvAddr", "localhost:9876");
    rocketConfig.put("topic", "keycloak-events");
    rocketConfig.put("producerGroup", "keycloak-producer-group");
    rocketFactory.setDefaultConfig(rocketConfig);
    channelFactories.put("rocketmq", rocketFactory);
}

4. 测试

4.1 单元测试

bash
cd spi-event-listener-extension
mvn test

4.2 集成测试

  1. 启动 Keycloak 服务器
  2. 启动消息队列服务(如 Kafka)
  3. 执行用户操作(如登录、登出)
  4. 检查消息队列中是否收到事件消息

扩展与定制

添加新的消息通道

  1. 实现 MessageChannel 接口
java
public class CustomMessageChannel implements MessageChannel {
    
    private String endpoint;
    
    public CustomMessageChannel(String endpoint) {
        this.endpoint = endpoint;
    }
    
    @Override
    public void send(String message) {
        // 实现消息发送逻辑
        System.out.println("Sending message to " + endpoint + ": " + message);
    }
    
    @Override
    public void close() {
        // 实现资源清理逻辑
    }
}
  1. 实现 MessageChannelFactory 接口
java
public class CustomMessageChannelFactory implements MessageChannelFactory {
    
    private Map<String, String> defaultConfig;
    
    public void setDefaultConfig(Map<String, String> defaultConfig) {
        this.defaultConfig = defaultConfig;
    }
    
    @Override
    public MessageChannel create(ComponentModel model) {
        // 从组件模型获取配置,如果没有则使用默认配置
        String endpoint = model != null ? 
            model.getConfig().getFirst("endpoint") : 
            defaultConfig.get("endpoint");
        
        return new CustomMessageChannel(endpoint);
    }
}
  1. 在 AuditEventListenerProviderFactory.init() 方法中注册新的通道工厂
java
@Override
public void init(org.keycloak.Config.Scope config) {
    // 初始化消息通道工厂
    channelFactories = new HashMap<>();
    
    // 注册现有通道工厂...
    
    // 注册自定义通道工厂
    CustomMessageChannelFactory customFactory = new CustomMessageChannelFactory();
    // 设置默认配置
    Map<String, String> customConfig = new HashMap<>();
    customConfig.put("endpoint", "http://localhost:8080/api/events");
    customFactory.setDefaultConfig(customConfig);
    channelFactories.put("custom", customFactory);
}

自定义事件处理

可以修改 AuditEventListenerProvider.onEvent() 方法来定制事件处理逻辑,例如:

添加事件过滤

java
@Override
public void onEvent(Event event) {
    try {
        // 过滤不需要处理的事件类型
        if (event.getType() == EventType.LOGIN_ERROR || 
            event.getType() == EventType.REGISTER) {
            // 将事件转换为JSON字符串
            String eventJson = objectMapper.writeValueAsString(event);
            // 发送到配置的消息通道
            sendToMessageChannels(eventJson);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

对事件进行转换或增强

java
@Override
public void onEvent(Event event) {
    try {
        // 创建增强的事件对象
        Map<String, Object> enhancedEvent = new HashMap<>();
        enhancedEvent.put("type", event.getType());
        enhancedEvent.put("realmId", event.getRealmId());
        enhancedEvent.put("clientId", event.getClientId());
        enhancedEvent.put("userId", event.getUserId());
        enhancedEvent.put("ipAddress", event.getIpAddress());
        enhancedEvent.put("details", event.getDetails());
        enhancedEvent.put("timestamp", event.getTime());
        enhancedEvent.put("processedAt", System.currentTimeMillis());
        enhancedEvent.put("environment", "production");
        
        // 将增强的事件转换为JSON字符串
        String eventJson = objectMapper.writeValueAsString(enhancedEvent);
        // 发送到配置的消息通道
        sendToMessageChannels(eventJson);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

实现不同事件类型的不同处理策略

java
@Override
public void onEvent(Event event) {
    try {
        // 根据事件类型选择不同的处理策略
        String eventJson;
        switch (event.getType()) {
            case LOGIN:
            case LOGIN_ERROR:
                // 登录事件特殊处理
                eventJson = processLoginEvent(event);
                break;
            case REGISTER:
                // 注册事件特殊处理
                eventJson = processRegisterEvent(event);
                break;
            default:
                // 其他事件默认处理
                eventJson = objectMapper.writeValueAsString(event);
        }
        
        // 发送到配置的消息通道
        sendToMessageChannels(eventJson);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private String processLoginEvent(Event event) throws JsonProcessingException {
    // 登录事件处理逻辑
    Map<String, Object> loginEvent = new HashMap<>();
    loginEvent.put("type", "LOGIN_ATTEMPT");
    loginEvent.put("userId", event.getUserId());
    loginEvent.put("clientId", event.getClientId());
    loginEvent.put("ipAddress", event.getIpAddress());
    loginEvent.put("success", event.getType() == EventType.LOGIN);
    loginEvent.put("timestamp", event.getTime());
    return objectMapper.writeValueAsString(loginEvent);
}

private String processRegisterEvent(Event event) throws JsonProcessingException {
    // 注册事件处理逻辑
    Map<String, Object> registerEvent = new HashMap<>();
    registerEvent.put("type", "USER_REGISTERED");
    registerEvent.put("userId", event.getUserId());
    registerEvent.put("clientId", event.getClientId());
    registerEvent.put("ipAddress", event.getIpAddress());
    registerEvent.put("timestamp", event.getTime());
    return objectMapper.writeValueAsString(registerEvent);
}

代码结构

spi-event-listener-extension/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── cc/bima/keycloak/extension/event/
│   │   │       ├── AuditEventListenerProvider.java       # 事件监听器提供者实现
│   │   │       ├── AuditEventListenerProviderFactory.java # 事件监听器提供者工厂
│   │   │       ├── KafkaChannel.java                     # Kafka 消息通道实现
│   │   │       ├── KafkaChannelFactory.java               # Kafka 消息通道工厂
│   │   │       ├── MessageChannel.java                   # 消息通道接口
│   │   │       ├── MessageChannelFactory.java           # 消息通道工厂接口
│   │   │       ├── RabbitMQChannel.java                  # RabbitMQ 消息通道实现
│   │   │       ├── RabbitMQChannelFactory.java            # RabbitMQ 消息通道工厂
│   │   │       ├── RocketMQChannel.java                  # RocketMQ 消息通道实现
│   │   │       └── RocketMQChannelFactory.java            # RocketMQ 消息通道工厂
│   │   └── resources/
│   │       └── META-INF/
│   │           └── services/
│   │               └── org.keycloak.events.EventListenerProviderFactory # 服务提供者配置
│   └── test/
│       └── java/
│           └── cc/bima/keycloak/extension/event/         # 测试代码
└── pom.xml                                               # Maven 配置文件

部署与维护

1. 部署方式

1.1 标准部署

将编译好的 JAR 文件放入 Keycloak 的 standalone/deployments 目录,Keycloak 会自动部署该扩展。

1.2 Docker 部署

如果使用 Docker 运行 Keycloak,可以将扩展 JAR 文件复制到容器的 /opt/keycloak/standalone/deployments 目录。

Dockerfile 示例

dockerfile
FROM quay.io/keycloak/keycloak:17.0.0

COPY spi-event-listener-extension-1.0.0.jar /opt/keycloak/standalone/deployments/

ENV KEYCLOAK_ADMIN=admin
ENV KEYCLOAK_ADMIN_PASSWORD=admin

ENTRYPOINT ["/opt/keycloak/bin/kc.sh", "start-dev"]

2. 监控与日志

2.1 日志配置

在 Keycloak 的 standalone/configuration/standalone.xml 文件中,可以配置日志级别:

xml
<logger category="cc.bima.keycloak.extension.event" level="info"/>

2.2 监控指标

可以通过 Keycloak 的管理 API 或 JMX 监控扩展的运行状态。

3. 故障排除

3.1 常见问题

问题原因解决方案
事件未发送到消息队列消息队列连接失败检查消息队列服务是否运行,配置是否正确
扩展未加载JAR 文件未正确部署检查 JAR 文件是否在 standalone/deployments 目录中,是否有部署错误
事件监听器未生效未在 Keycloak 管理控制台中配置登录管理控制台,在事件配置中添加 bima-spi-event-listener-extension

性能优化

  1. 连接池管理:使用连接池管理消息队列连接,避免频繁创建和关闭连接
  2. 批量发送:对事件进行批量处理,减少网络传输次数
  3. 异步处理:使用异步方式发送消息,避免阻塞 Keycloak 主流程
  4. 消息压缩:对消息进行压缩,减少网络传输量
  5. 事件过滤:只处理必要的事件类型,减少消息量

注意事项

  1. 消息队列连接配置:需要根据实际环境进行调整,确保连接参数正确
  2. 错误处理:生产环境中应添加完善的错误处理和重试机制
  3. 消息序列化:建议使用高效的序列化方式,如 Protobuf 或 Avro
  4. 事件配置:应根据业务需求合理配置事件类型,避免过多的事件导致消息队列负载过高
  5. 安全性:确保消息队列连接使用安全的认证方式,避免未授权访问
  6. 监控:定期监控消息队列的运行状态,确保事件处理正常
  7. 备份:对重要的审计事件进行备份,防止数据丢失
  8. 版本兼容性:注意 Keycloak 版本与扩展版本的兼容性,避免因版本不匹配导致的问题

免责声明

本项目基于 GitHub 开源软件进行定制化开发,旨在为企业和开发者提供更便捷的项目基座解决方案。使用本项目时,请务必了解以下免责声明:

  1. 开源基础:本项目基于 GitHub 开源软件构建,遵循原开源协议的相关规定。
  2. 定制开发:我们对原开源软件进行了定制和扩展,以提供更优质的开发体验和功能支持。
  3. 责任限制:对于使用本项目可能产生的任何直接或间接的经济损失、数据丢失或其他问题,北京必码科技工作室不承担任何责任。
  4. 使用建议:在生产环境中使用本项目前,请进行充分的测试和验证,确保其符合您的业务需求和安全要求。
  5. 技术支持:我们提供技术支持服务,但不保证解决所有可能出现的问题。
  6. 合规使用:用户应确保在使用本项目时遵守相关法律法规和行业规范,不得用于任何违法或违规用途。