开发者

Java调用ChatGPT API并实现流式接收方式(Server-Sent Events,SSE)

开发者 https://www.devze.com 2025-08-20 10:28 出处:网络 作者: 陈震_
目录简介OkHttp 流式获取 GPT 响应通过 SSE 流式推送前端后端代码消息实体接口接口实现数据推送给前端前端代码创建sseClient.jsvue3代码优化后端代码踩坑总结简介
目录
  • 简介
  • OkHttp 流式获取 GPT 响应
  • 通过 SSE 流式推送前端
    • 后端代码
      • 消息实体
      • 接口
      • 接口实现
      • 数据推送给前端
    • 前端代码
      • 创建sseClient.js
      • vue3代码
    • 优化后端代码
    • 踩坑
      • 总结

        简介

        用过 ChatGPT 的伙伴应该想过自己通过调用ChatGPT官网提供的接口来实现一个自己的问答机器人,但是在调用的时候发现,请求总是以传统的HTTP请求/响应模式进行,这意味着我们没发送一个请求后需要等待 ChatGPT 服务器返回完整的响应。这种方式在生成文本时并不不是我们理想的,因为用户体验不够流畅。

        为了提供更好的用户体验,我们可以使用Server-Sent Events(SSE)技术来实现流式接收。这样,当ChatGPT 服务器可以在生成响应的同时逐步将内容推送给我们,我们在通过 SSE 流式推送到前端页面,让用户能够实时看到生成的内容。我将详细介绍如何在Java中实现这一功能。

        OkHttp 流式获取 GPT 响应

        其实市面上已经有很多现成的框架支持,但我们这里使用 okHttp 这个轻量级的HTTP客户端库来实现。

        需要先引用相关maven:

            <dependency>
                <groupId>com.squareup.okhttp3</groupId>
                <artifactId>okhttp</artifactId>
            </dependency>
            <dependency>
                <groupId>com.squareup.okhttp3</groupId>
                <artifactId>okhttp-sse</artifactId>
            </dehttp://www.devze.compendency>
        

        构建请求体,必须加上参数 stream 值为true

         //构建发送内容
                String messageStr = StrUtil.format(prompt, params);
        
                // 创建一个Message对象,该对象表示一个消息,并设置其属性
                Message message = new Message(Message.Role.USER.getRole(), messageStr);
                // 创建一个ChatCompletion对象,表示聊天完成请求,并将刚创建的消息添加到其中
                ChatCompletionRequest request = ChatCompletionRequest.builder()
                        .model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getName())
                        .messages(Arrays.asList(message))
                        .stream(true)
                        .build();
        
               
        // 定义see接口
        Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions")
                .header("Authorization","xxx")
                .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString()))
                .build();
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .connectTimeout(10, TimeUnit.MINUTES)
                .readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
                .build();
        
        // 实例化EventSource,注册EventSource监听器
        RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
        
            @Override
            public void onOpen(EventSource eventSource, Response response) {
         编程客栈       log.info("onOpen");
            }
        
            @SneakyThrows
            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
        		// log.info("onEvent");
        		// 在实际应用中,你可以在这里将数据推送给前端
                log.info(data);//请求到的数据
            
            }
        
            @Override
            public void onClosed(EventSource eventSource) {
                log.info("onClosed");
        //                emitter.complete();
            }
        
            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开
        //                emitter.complete();
            }
        });
        realEventSource.connect(okHttpClient);//真正开始请求的一步
        

        通过 SSE 流式推送前端

        sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件

        我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式

        原理是先建立链接,然后不断发消息就可以

        我们利用 springboot 封装的 SseEmitter 来完成推送,需要用到以下依赖:

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        
        

        后端代码

        消息实体

        其中客户端 ID 是每个 SSE 链接的唯一标识,拿到 ID 可以精准的给唯一的用户推送消息,消息通过字符串的方式进行传递

        import lombok.AllArgsConstructor;
        import lombok.Data;
        import lombok.NoArgsConstructor;
        
        /**
         * 消息体
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public class MessageVo {
            /**
             * 客户端id
             */
            private String clientId;
            /**
             * 传输数据体(json)
             */
            private String data;
        }
        
        

        接口

        import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
        
        
        public interface SseEmitterService {
            /**
             * 创建连接
             *
             * @param clientId 客户端ID
             */
            SseEmitter createConnect(String clientId);
        
            /**
             * 根据客户端id获取SseEmitter对象
             *
             * @param clientId 客户端ID
             */
            SseEmitter getSseEmitterByClientId(String clientId);
        
            /**
             * 发送消息给所有客户端
             *
             * @param msg 消息内容
             */
            void sendMessageToAllClient(String msg);
        
            /**
             * 给指定客户端发送消息
             *
             * @param clientId 客户端ID
             * @param msg      消息内容
             */
            void sendMessageToOneClient(String clientId, String msg);
        
            /**
             * 关闭连接
             *
             * @param clientId 客户端ID
             */
            void closeConnect(String clientId);
        }
        
        

        接口实现

        @Slf4j
        @Service
        public class SseEmitterServiceImpl implements SseEmitterService {
            /**
             * 容器,保存连接,用于输出返回 ;可使用其他方法实现
             */
            private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
        
        
            /**
             * 根据客户端id获取SseEmitter对象
             *
             * @param clientId 客户端ID
             */
            @Override
            public SseEmitter getSseEmitterByClientId(String clientId) {
                return sseCache.get(clientId);
            }
        
        
            /**
             * 创建连接
             *
             * @param clientId 客户端ID
             */
            @Override
            public SseEmitter createConnect(String clientId) {
                // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
                SseEmitter sseEmitter = new SseEmitter(0L);
                // 是否需要给客户端推送ID
                if (StrUtil.isBlank(clientId)) {
                    clientId = IdUtil.simpleUUID();
                }
                // 注册回调
                sseEmitter.onCompletion(completionCallBack(clientId));     // 长链接完成后回调接口(即关闭连接时调用)
                sseEmitter.onTimeout(timeoutCallBack(clientId));        // 连接超时回调
                sseEmitter.onError(errorCallBack(clientId));          // 推送消息异常时,回调方法
                sseCache.put(clientId, sseEmitter);
                log.info("创建新的sse连接,当前用户:{}    累计用户:{}", clientId, sseCache.size());
                try {
                    // 注册成功返回用户信息
                    sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));
                } catch (IOException e) {
                    log.error("创建长链接异常,客户端ID:{}   异常信息:{}", clientId, e.getMessage());
                }
                return sseEmitter;
            }
        
            /**
             * 发送消息给所有客户端
             *
             * @param msg 消息内容
             */
            @Override
            public void sendMessageToAllClient(String msg) {
                if (MapUtil.isEmpty(sseCache)) {
                    return;
                }
                // 判断发送的消息是否为空
        
                for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
                    MessageVo messageVo = new MessageVo();
                    messageVo.setClientId(entry.getKey());
                    messageVo.setData(msg);
                    sendMsgToClientByClientId(entry.getKey(), messageVo, entry.getValue());
                }
        
            }
        
            /**
             * 给指定客户端发送消息
             *
             * @param clientId 客户端ID
             * @param msg      消息内容
             */
            @Override
            public void sendMessageToOneClient(String clientId, String msg) {
                MessageVo messageVo = new MessageVo(clientId, msg);
                sendMsgToClientByClientId(clientId, messageVo, sseCache.get(clientId));
            }
        
            /**
             * 关闭连接
             *
             * @param clientId 客户端ID
             */
            @Override
            public void closeConnect(String clientId) {
                SseEmitter sseEmitter = sseCache.get(clientId);
                if (sseEmitter != null) {
                    sseEmitter.complete();
                    removeUser(clientId);
                }
            }
        
            /**
             * 推送消息到客户端
             * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
             *
             * @param clientId  客户端ID
             * @param messageVo 推送信息,此处结合具体业务,定义自己的返回值即可
             **/
            private void sendMsgToClientByClientId(String clientId, MessageVo messageVo, SseEmitter sseEmitter) {
                if (sseEmitter == null) {
                    log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",
                            clientId, messageVo.toString());
                    return;
                }
                SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK))
                        .data(messageVo, MediaType.APPLICATION_JSON);
                try {
                    sseEmitter.send(sendData);
                } catch (IOException e) {
                    // 推送消息失败,记录错误日志,进行重推
                    log.error("推送消息失败:{},尝试进行重推", messageVo.toString());
                    boolean isSuccess = true;
                    // 推送消息失败后,每隔10s推送一次,推送5次
                    for (int i = 0; i < 5; i++) {
                        try {
                            Thread.sleep(10000);
                            sseEmitter = sseCache.get(clientId);
                            if (sseEmitter == null) {
                                log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                                continue;
                            }
                            sseEmitter.send(sendData);
                        } catch (Exception ex) {
                            log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);
                            continue;
                        }
                        log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, messageVo.toString());
                        return;
                    }
                }
            }
        
        
            /**
             * 长链接完成后回调接口(即关闭连接时调用)
             *
             * @param clientId 客户端ID
             **/
            private Runnable completionCallBack(String clientId) {
                return () -> {
                    log.info("结束连接:{}", clientId);
                    removeUser(clientjavascriptId);
                };
            }
        
            /**
             * 连接超时时调用
             *
             * @param clientId 客户端ID
             **/
            private Runnable timeoutCallBack(String clientId) {
                return () -> {
                    log.info("连接超时:{}", clientId);
                    removeUser(clientId);
                };
            }
        
            /**
             * 推送消息异常时,回调方法
             *
             * @param clientId 客户端ID
             **/
            private Consumer<Throwable> errorCallBack(String clientId) {
                return throwable -> {
                    log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
        
                    // 推送消息失败后,每隔10s推送一次,推送5次
                    for (int i = 0; i < 5; i++) {
                        try {
                            Thread.sleep(10000);
                            SseEmitter sseEmitter = sseCache.get(clientId);
                            if (sseEmitter == null) {
                                log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
                                continue;
                            }
                            sseEmitter.send("失败后重新推送");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
            }
        
            /**
             * 移除用户连接
             *
             * @param clientId 客户端ID
             **/
            private void removeUser(String clientId) {
                sseCache.remove(clientId);
                log.inf编程客栈o("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
            }
        }
        
        

        数据推送给前端

        在 onEvent 回调中添加代码,每接收到消息后就推送到前端

               
        // 定义see接口
        Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions")
                .header("Authorization","xxx")
                .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString()))
                .build();
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .connectTimeout(10, TimeUnit.MINUTES)
                .readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
                .build();
        
        // 实例化EventSource,注册EventSource监听器
        RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
        
            @Override
            public void onOpen(EventSource eventSource, Response response) {
                log.info("onOpen");
            }
        
            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
        		if ("[DONE]".equals(data)) {
                    System.out.println("收到 [DONE] 信号");
                    return;
                }
                ChatCompletionResp chatCompletionResp = JSON.parseobject(data, ChatCompletionResp.class);
                // 获得生成的文章内容
                if (CollUtil.isEmpty(chatCompletionResp.getChoices())){
                    return;
                }
                Message delta = chatCompletionResp.getChoices().get(0).getDelta();
                if (delta == null){
                    return;
                }
        
                sseEmitterService.sendMessageToOneClient(clientId , delta);
        		log.info(data);//请求到的数据
            
            }
        
            @Override
            public void onClosed(EventSource eventSource) {
                log.info("onClosed");
        //                emitter.complete();
            }
        
            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
              log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开
        //                emitter.complete();
            }
        });
        realEventSource.connect(okHttpClient);//真正开始请求的一步
        

        前端代码

        由于 EventSource 不允许直接配置请求头,普通的 EventSource 如果需要携带token请求,那就需要引入一个插件

        安装 EventSourcePolyfill

        你可以通过npm安装 event-source-polyfill:

        npm install event-source-polyfill
        

        引入 EventSourcePolyfill 后,它会自动替换浏览器中的原生 EventSource,其用法与原生的 API 一致。你可以像使用 EventSource 一样使用它:

        创建sseClient.js

        封装一下, sse 最佳实践,

        // utils/sseClient.js
        import { EventSourcePolyfill } from 'event-source-polyfill'
        import { baseURL } from '../config';
        
        
        // 封装一个创建 SSE 连接的方法
        export function newEventSource({ clientId = '', headers = {}, onMessage, onError, onOpen }) {
            const token = sessionStorage.getItem('token') || ''
        
            const es = new EventSourcePolyfill(baseURL + 'p/sse/createConnect?clientId=' + clientId  , {
                headers: {
                    'Authorization': `Bearer ${token}`
                    ...headers
                },
                heartbeatTimeout: 60 * 1000, // 心跳超时(可选)
            })
        
            es.onopen = (event) => {
                console.log('SSE 连接已开启')
                onOpen && onOpen(event)
            }
        
            es.onmessage = (event) => {
            	//前端:在接收到结束标识后立即销毁
                if (event.data === '[DONE]') {
                    console.log('SSE 连接已关闭')
                    es.close()
                }
                onMessage && onMessage(event)
            }
        
            es.onerror = (event) => {
                console.error('SSE 错误:', event)
                onError && onError(event)
                es.close() // 出错时自动关闭
            }
        
            return es // 返回实例,方便外部主动关闭
        }
        
        

        vue3代码

        import { newEventSource } from '@/utils/sseClient.js'
        
        const createSseConnection = () => {
          return newEventSource({
            clientId: 'xxx',
            onMessage: (event) => {
              console.log('Received SSE message:', event.data);
            }
          });
        };
        
        

        优化后端代码

        按需建立连接并及时关闭 是非常关键的实践策略,每一个 SseEmitter 在服务端都是一个线程或者任务挂起的状态,太多不关闭会导致资源消耗(线程、连接、内存等);

        如果每个用户长时间挂一个 SSE,不及时关闭,可能造成内存泄露或线程池耗尽,所以我们优化一下后端代码,在完成输出后及时关闭连接.

        在关闭和异常的回调方法中添加:

        sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
        sseEmitterService.closeConnect(clientId);
        

        修改后:

               
        // 定义see接口
        Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions")
                .header("Authorization","xxx")
                .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString()))
                .build();
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .connectTimeout(10, TimeUnit.MINUTES)
                .readTimeout(10, TimeWhcupUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
                .build();
        
        // 实例化EventSource,注册EventSource监听器
        RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
        
            @Override
            public void onOpen(EventSource eventSource, Response response) {
                log.info("onOpen");
            }
        
            @SneakyThrows
            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
        		// log.info("onEvent");
        		// 在实际应用中,你可以在这里将数据推送给前端
                log.info(data);//请求到的数据
            }
        
            @Override
            public void onClosed(EventSource eventSource) {
                log.info("onClosed");
                sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
                sseEmitterService.closeConnect(clientId);
        //                emitter.complete();
            }
        
            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开
                sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
                sseEmitterService.closeConnect(clientId);
        //                emitter.complete();
            }
        });
        realEventSource.connect(okHttpClient);//真正开始请求的一步
        

        输出效果如下:

        Java调用ChatGPT API并实现流式接收方式(Server-Sent Events,SSE)

        踩坑

        sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
        

        使用该方法发送纯文本时,里面不能有任何回车符 \n,因为发送到前端后换行符可能被当作字段结束符处理,最好封装对象然后转JSON字符串发送

        总结

        以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

        0

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        关注公众号