Browse Source

ai 业务处理

yangshun 3 weeks ago
parent
commit
b04bc0bb7c

+ 224 - 0
guoyan-ai/src/main/java/com/cy/guoyan/admin/module/ai/service/aiservice/AiSelectionService.java

@@ -0,0 +1,224 @@
+package com.cy.guoyan.admin.module.ai.service.aiservice;
+
+import cn.hutool.core.io.IoUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.cy.guoyan.admin.framework.common.util.object.BeanUtils;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatdocument.vo.ChatDocumentSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatknowledgedataset.vo.ChatKnowledgeDatasetSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageRespVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageSendReqVO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatdocument.ChatDocumentDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatknowledgedataset.ChatKnowledgeDatasetDO;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatdocument.ChatDocumentMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatknowledgedataset.ChatKnowledgeDatasetMapper;
+import com.cy.guoyan.admin.module.infra.api.file.FileApi;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static com.cy.guoyan.admin.framework.common.exception.util.ServiceExceptionUtil.exception;
+import static com.cy.guoyan.admin.module.system.enums.ErrorCodeConstants.*;
+
+@Service
+@Slf4j
+public class AiSelectionService implements AiService {
+
+    private final AiService difyService;
+
+    private final AiService ragflowService;
+
+    @Resource
+    private ChatKnowledgeDatasetMapper chatKnowledgeDatasetMapper;
+
+    @Resource
+    private ChatDocumentMapper chatDocumentMapper;
+
+    @Resource
+    private FileApi fileApi;
+
+    public AiSelectionService(
+            @Qualifier("dify") AiService difyService,
+            @Qualifier("ragflow") AiService ragflowService) {
+        this.difyService = difyService;
+        this.ragflowService = ragflowService;
+    }
+
+
+    @Override
+    @Transactional
+    public JSONObject createDataset(ChatKnowledgeDatasetSaveReqVO req) throws IOException {
+            JSONObject difyJson = difyService.createDataset(req);
+
+            String difyDatasetId = difyJson.getString("id");
+
+            String difyEmbeddingModel  = difyJson.getString("embedding_model");
+
+            JSONObject ragflowJson = ragflowService.createDataset(req);
+
+            JSONObject data = ragflowJson.getJSONObject("data");
+            if (data == null) {
+                throw new IOException("返回结果中没有 data 字段:" + ragflowJson);
+            }
+
+            String ragflowDatasetId  = data.getString("id");
+            String ragflowEmbeddingModel  = data.getString("embedding_model");
+
+            ChatKnowledgeDatasetDO aDo = new ChatKnowledgeDatasetDO();
+            aDo.setName(req.getName());
+            aDo.setDescription(req.getDescription());
+            aDo.setDifyDatasetId(difyDatasetId);
+            aDo.setRagflowDatasetId(ragflowDatasetId);
+            aDo.setDifyEmbeddingModel(difyEmbeddingModel);
+            aDo.setRagflowEmbeddingModel(ragflowEmbeddingModel);
+            aDo.setDifyMetadataJson(difyJson.toJSONString());
+            aDo.setRagflowMetadataJson(data.toJSONString());
+            chatKnowledgeDatasetMapper.insert(aDo);
+            return difyJson;
+    }
+
+    @Override
+    @Transactional
+    public void updateDataset(ChatKnowledgeDatasetSaveReqVO req) throws IOException {
+            difyService.updateDataset(req);
+            ragflowService.updateDataset(req);
+    }
+
+    @Override
+    @Transactional
+    public String deleteDataset(String id) throws IOException {
+            ChatKnowledgeDatasetDO datasetDO = chatKnowledgeDatasetMapper.selectById(id);
+            if (datasetDO == null) {
+                throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+            }
+
+            String difyResult = difyService.deleteDataset(datasetDO.getDifyDatasetId());
+            log.info("删除dify知识库:{}", difyResult);
+
+            String ragflowResult = ragflowService.deleteDataset(datasetDO.getRagflowDatasetId());
+
+            log.info("删除ragflow知识库t:{}", ragflowResult);
+
+            chatKnowledgeDatasetMapper.deleteById(id);
+
+
+        return "success";
+    }
+
+
+
+    @Override
+    public JSONObject createDocumentByFile(MultipartFile file, ChatDocumentSaveReqVO createReqVO, File tmpFile) throws IOException {
+
+        ChatKnowledgeDatasetDO dataset = chatKnowledgeDatasetMapper.selectById(createReqVO.getDatasetId());
+        if (dataset == null) {
+            throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+        }
+
+        JSONObject difyResp = null;
+        JSONObject ragflowResp = null;
+        String originalFilename = file.getOriginalFilename();
+        createReqVO.setName(originalFilename);
+
+        ChatDocumentDO doc = BeanUtils.toBean(createReqVO, ChatDocumentDO.class);
+        doc.setDataSourceType("file_upload");
+        doc.setDatasetId(createReqVO.getDatasetId());
+        doc.setProcessRuleMode(createReqVO.getProcessRuleMode());
+
+
+        Map<String, Object> apiFile = fileApi.getFile(originalFilename, IoUtil.readBytes(file.getInputStream()), "restricted");
+        log.info("DEBUG - File Upload Response: {}", apiFile);
+        String fileUrl = apiFile.get("path").toString();
+        doc.setFilePath(fileUrl);
+        File tmp = File.createTempFile("tem_up", "_" + file.getOriginalFilename());
+        file.transferTo(tmp);
+        tmp.deleteOnExit();
+
+        difyResp = difyService.createDocumentByFile(null, createReqVO.setDatasetId(dataset.getDifyDatasetId()), tmp);
+        log.info("DEBUG - difyResp: {}", difyResp);
+
+        ragflowResp = ragflowService.createDocumentByFile(null, createReqVO.setDatasetId(dataset.getRagflowDatasetId()), tmp);
+        log.info("DEBUG - ragflowResp: {}", ragflowResp);
+
+        JSONObject difyJson = difyResp.getJSONObject("document");
+        JSONObject ragflowJson = ragflowResp.getJSONArray("data").getJSONObject(0);
+        doc.setDifyDocumentId(difyJson.getString("id"));
+        doc.setRagflowDocumentId(ragflowJson.getString("id"));
+        doc.setWordCount(difyJson.getIntValue("word_count"));
+        doc.setIndexingStatus(difyJson.getString("indexing_status"));
+        doc.setBatch(difyResp.getString("batch"));
+        doc.setFileSize(apiFile.get("size").toString());
+        chatDocumentMapper.insert(doc);
+        if (StringUtils.isNotBlank(doc.getDifyDocumentId())) {
+            difyService.getDocumentIndexingStatus(dataset.getDifyDatasetId(), String.valueOf(doc.getId()));
+        }
+        if (StringUtils.isNotBlank(doc.getRagflowDocumentId())) {
+            ragflowService.getDocumentIndexingStatus(dataset.getRagflowDatasetId(), String.valueOf(doc.getId()));
+        }
+        return difyResp;
+
+    }
+
+    @Override
+    public void deleteDocument(String datasetId, String documentId) throws IOException {
+        ChatDocumentDO documentDO = chatDocumentMapper.selectById(documentId);
+        if (documentDO == null) {
+            throw exception(CHAT_DOCUMENT_NOT_EXISTS);
+        }
+
+        ChatKnowledgeDatasetDO dataset = chatKnowledgeDatasetMapper.selectById(documentDO.getDatasetId());
+        if (dataset == null) {
+            throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+        }
+
+        difyService.deleteDocument(dataset.getDifyDatasetId(), documentDO.getDifyDocumentId());
+
+        ragflowService.deleteDocument(dataset.getRagflowDatasetId(), documentDO.getRagflowDocumentId());
+
+        chatDocumentMapper.deleteById(documentId);
+
+    }
+
+
+    @Override
+    public JSONObject getDocumentIndexingStatus(String datasetId, String batch) throws IOException {
+        ChatKnowledgeDatasetDO dataset = chatKnowledgeDatasetMapper.selectById(datasetId);
+        if (dataset == null) {
+            throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+        }
+
+        ChatDocumentDO doc = chatDocumentMapper.selectById(batch);
+
+        JSONObject documentIndexingStatus = null;
+
+        if (StringUtils.isNotBlank(doc.getDifyDocumentId())) {
+            difyService.getDocumentIndexingStatus(dataset.getDifyDatasetId(), String.valueOf(doc.getId()));
+        }
+        if (StringUtils.isNotBlank(doc.getRagflowDocumentId())) {
+             documentIndexingStatus = ragflowService.getDocumentIndexingStatus(dataset.getRagflowDatasetId(), String.valueOf(doc.getId()));
+        }
+        return documentIndexingStatus;
+    }
+
+    @Override
+    public ChatMessageRespVO sendChatMessageBlock(ChatMessageSendReqVO sendReqVO) throws IOException {
+        ChatMessageRespVO respVO = null;
+        if (sendReqVO.getAppType() == null) {
+            throw exception(CHAT_MODEL_NOT_EXISTS);
+        }
+        if (sendReqVO.getAppType() == 2) {
+            respVO = difyService.sendChatMessageBlock(sendReqVO);
+        } else if (sendReqVO.getAppType() == 1) {
+            respVO = ragflowService.sendChatMessageBlock(sendReqVO);
+        }
+        return respVO;
+    }
+}
+

+ 28 - 0
guoyan-ai/src/main/java/com/cy/guoyan/admin/module/ai/service/aiservice/AiService.java

@@ -0,0 +1,28 @@
+package com.cy.guoyan.admin.module.ai.service.aiservice;
+
+import com.alibaba.fastjson.JSONObject;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatdocument.vo.ChatDocumentSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatknowledgedataset.vo.ChatKnowledgeDatasetSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageRespVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageSendReqVO;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.File;
+import java.io.IOException;
+
+public interface AiService {
+    JSONObject createDataset(ChatKnowledgeDatasetSaveReqVO req) throws IOException;
+
+    void updateDataset(ChatKnowledgeDatasetSaveReqVO req) throws IOException;
+
+    String deleteDataset(String id) throws IOException;
+
+    JSONObject createDocumentByFile(MultipartFile file, ChatDocumentSaveReqVO createReqVO, File tmpFile)throws IOException;
+
+    JSONObject getDocumentIndexingStatus(String datasetId, String batch) throws IOException;
+
+    void deleteDocument(String datasetId, String documentId) throws IOException;
+
+    ChatMessageRespVO sendChatMessageBlock(ChatMessageSendReqVO sendReqVO) throws IOException;
+
+}

+ 388 - 0
guoyan-ai/src/main/java/com/cy/guoyan/admin/module/ai/service/aiservice/DifyService.java

@@ -0,0 +1,388 @@
+package com.cy.guoyan.admin.module.ai.service.aiservice;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.cy.guoyan.admin.framework.common.util.object.BeanUtils;
+import com.cy.guoyan.admin.framework.mybatis.core.query.LambdaQueryWrapperX;
+import com.cy.guoyan.admin.framework.security.core.util.SecurityFrameworkUtils;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatdocument.vo.ChatDocumentSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatknowledgedataset.vo.ChatKnowledgeDatasetSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageRespVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageSendReqVO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatapp.ChatAppDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatdocument.ChatDocumentDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatknowledgedataset.ChatKnowledgeDatasetDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatmessage.ChatMessageDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatsession.ChatSessionDO;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatapp.ChatAppMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatdocument.ChatDocumentMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatknowledgedataset.ChatKnowledgeDatasetMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatmessage.ChatMessageMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatsession.ChatSessionMapper;
+import com.cy.guoyan.admin.module.ai.service.chatdocument.ChatDocumentService;
+import com.cy.guoyan.admin.module.ai.util.DifyClient;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.web.multipart.MultipartFile;
+import javax.annotation.Resource;
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.stream.Collectors;
+import static com.cy.guoyan.admin.framework.common.exception.util.ServiceExceptionUtil.exception;
+import static com.cy.guoyan.admin.module.system.enums.ErrorCodeConstants.CHAT_APP_NOT_EXISTS;
+import static com.cy.guoyan.admin.module.system.enums.ErrorCodeConstants.CHAT_KNOWLEDGE_DATASET_NOT_EXISTS;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@Service
+@Qualifier("dify")
+@Slf4j
+public class DifyService implements AiService {
+
+    @Resource
+    private ChatKnowledgeDatasetMapper chatKnowledgeDatasetMapper;
+
+    @Resource
+    private ChatDocumentService chatDocumentService;
+
+    @Value("${dify.datasetsKey}")
+    private String difyApiKey;
+
+    @Value("${dify.baseUrl}")
+    private String difyBaseUrl;
+
+    @Resource
+    private ChatSessionMapper chatSessionMapper;
+
+    @Resource
+    private ChatMessageMapper chatMessageMapper;
+
+    @Resource
+    private ChatDocumentMapper chatDocumentMapper;
+
+    @Resource
+    private ChatAppMapper chaAppMapper;
+
+    private final DifyClient difyClient;
+
+    public DifyService(DifyClient difyClient) {
+        this.difyClient = difyClient;
+    }
+
+    private HttpURLConnection prepare(String method, String path) throws IOException {
+        URL url = new URL(difyBaseUrl + path);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod(method);
+        conn.setRequestProperty("Authorization", "Bearer " + difyApiKey);
+        return conn;
+    }
+
+    /** 读取响应 body 或抛异常 */
+    private String readResponse(HttpURLConnection conn) throws IOException {
+        int code = conn.getResponseCode();
+        InputStream is = (code >= 200 && code < 300) ? conn.getInputStream() : conn.getErrorStream();
+        if (is == null) throw new IOException("无返回流,HTTP " + code);
+        String body;
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, UTF_8))) {
+            body = reader.lines().collect(Collectors.joining());
+        }
+        if (code < 200 || code >= 300) {
+            throw new IOException("Dify 错误 HTTP " + code + "," + body);
+        }
+        return body;
+    }
+
+
+    @Override
+    public JSONObject createDataset(ChatKnowledgeDatasetSaveReqVO vo)throws IOException {
+        JSONObject body = new JSONObject();
+        body.put("name", vo.getName());
+        body.put("description", vo.getDescription());
+        body.put("indexing_technique", "high_quality");
+//        body.put("embedding_model", vo.getEmbeddingModel());
+        body.put("auto_update", vo.getAutoUpdate());
+        body.put("provider", "vendor");
+        body.put("permission", "all_team_members");
+
+        HttpURLConnection conn = prepare("POST", "/v1/datasets");
+        conn.setDoOutput(true);
+        conn.setRequestProperty("Content-Type", "application/json");
+        try (OutputStream os = conn.getOutputStream()) {
+            os.write(body.toJSONString().getBytes(UTF_8));
+        }
+        String resp = readResponse(conn);
+        return JSONObject.parseObject(resp);
+    }
+
+    @Override
+    public void updateDataset(ChatKnowledgeDatasetSaveReqVO updateReqVO) {
+        // 校验存在
+        validateChatKnowledgeDatasetExists(updateReqVO.getId());
+        // 更新
+        ChatKnowledgeDatasetDO updateObj = BeanUtils.toBean(updateReqVO, ChatKnowledgeDatasetDO.class);
+        chatKnowledgeDatasetMapper.updateById(updateObj);
+    }
+
+    @Override
+    public String deleteDataset(String datasetId) throws IOException {
+        HttpURLConnection conn = prepare("DELETE", "/v1/datasets/" + datasetId);
+        log.info("删除知识库:{}", datasetId);
+        return readResponse(conn);
+    }
+
+    private void validateChatKnowledgeDatasetExists(Long id) {
+        if (chatKnowledgeDatasetMapper.selectById(id) == null) {
+            throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+        }
+    }
+
+
+    @Override
+    public JSONObject createDocumentByFile(MultipartFile file, ChatDocumentSaveReqVO createReqVO,File tmpFile) throws IOException {
+        String datasetId = createReqVO.getDatasetId();
+        String originalFilename = createReqVO.getName();
+
+        Map<String, Object> requestData = new HashMap<>();
+        requestData.put("datasetId", datasetId);
+        requestData.put("name", originalFilename);
+        requestData.put("indexing_technique", "high_quality");
+        requestData.put("doc_form", "text_model");
+
+        Map<String, Object> processRule = new HashMap<>();
+        if (Objects.equals("custom",createReqVO.getProcessRuleMode())) {
+            processRule.put("mode", "custom");
+            // 预处理规则
+            Map<String, Object> rules = new HashMap<>();
+            List<Map<String, Object>> preProcessingRules = new ArrayList<>();
+            Map<String, Object> rule1 = new HashMap<>();
+            rule1.put("id", "remove_extra_spaces");
+            rule1.put("enabled", true);
+            preProcessingRules.add(rule1);
+            Map<String, Object> rule2 = new HashMap<>();
+            rule2.put("id", "remove_urls_emails");
+            rule2.put("enabled", true);
+            preProcessingRules.add(rule2);
+            rules.put("pre_processing_rules", preProcessingRules);
+
+            // 分段规则
+            Map<String, Object> segmentation = new HashMap<>();
+            segmentation.put("separator", "\n\n");
+            segmentation.put("max_tokens", 500);
+            rules.put("segmentation", segmentation);
+            processRule.put("rules", rules);
+        }else {
+            processRule.put("mode", "automatic");
+        }
+        requestData.put("process_rule", processRule);
+
+        return  difyClient.createDocumentByFile(tmpFile, originalFilename, requestData);
+    }
+
+    @Override
+    public JSONObject getDocumentIndexingStatus(String datasetId,String documentId) throws IOException {
+
+        ChatDocumentDO doc = chatDocumentMapper.selectById(documentId);
+
+        new Thread(() -> {
+            try {
+                int waited = 0;
+                while (waited < 60*60*24) {
+                    JSONObject status = difyClient.getDocumentIndexingStatus(datasetId, doc.getBatch());
+                    JSONArray array = status.getJSONArray("data");
+                    if (array == null || array.isEmpty()) {
+                        log.warn("dify文档处理状态为空 name={} 状态:{}", doc.getName(), status);
+                        return;
+                    }
+                    JSONObject item = array.getJSONObject(0);
+                    String state = item.getString("indexing_status");
+                    Integer completedSegments = item.getInteger("completed_segments");
+                    log.info("dify文档处理状态 name={} 状态:{},分段:{}", doc.getName(), state,  completedSegments);
+                    if ("completed".equals(state)) {
+                        if (doc.getId() == null) {
+                            ChatDocumentDO aDo = chatDocumentMapper.selectOne("dataset_id", doc.getDatasetId(), "batch", doc.getBatch());
+                            doc.setId(aDo.getId());
+                            doc.setCreator(aDo.getCreator());
+                            doc.setUpdater(aDo.getUpdater());
+                            doc.setName(aDo.getName());
+                        }
+                        log.info("dify文档处理完成 name={} ✅",  doc.getName());
+//                doc.setHitCount(completedSegments);
+                        doc.setIndexingStatus("completed");
+                        doc.setCompletedSegments(completedSegments);
+                        try {
+                            JSONObject jsonObject = difyClient.listDocuments(datasetId, doc.getName(), null, null);
+                            JSONArray dataArray = jsonObject.getJSONArray("data");
+                            if (!dataArray.isEmpty()) {
+                                JSONObject firstDocument = dataArray.getJSONObject(0);
+                                // 提取 "word_count" 字段
+                                int wordCount = firstDocument.getIntValue("word_count");
+                                doc.setWordCount(wordCount);
+                            }
+                        } catch (Exception e) {
+                            log.error("dify获取文档信息失败", e);
+                        }
+                        chatDocumentMapper.updateById(doc);
+                        return;
+                    } else if ("failed".equals(state)) {
+                        log.warn("dify文档处理失败 name={} ❌ 错误:{}", doc.getName(), status.getString("error"));
+                        // 可更新失败状态
+                        return;
+                    }
+                    Thread.sleep(5000);
+                    waited += 5;
+                }
+                log.warn("轮询超时 name={} ⏰", doc.getName());
+
+
+            } catch (Exception e) {
+                log.error("异步轮询 Dify 状态失败", e);
+            }
+        }).start();
+
+
+        return difyClient.getDocumentIndexingStatus(datasetId, doc.getBatch());
+
+    }
+
+
+
+    @Override
+    public void deleteDocument(String datasetId, String documentId) throws IOException {
+        difyClient.deleteDocument(datasetId, documentId);
+    }
+
+    @Override
+    public ChatMessageRespVO sendChatMessageBlock(ChatMessageSendReqVO sendReqVO) throws IOException {
+        String conversationId = sendReqVO.getConversationId();
+        // 1. 从登录上下文取用户昵称
+        String nickname = String.valueOf(SecurityFrameworkUtils.getLoginUser().getInfo().get("nickname"));
+        ChatMessageDO msg = new ChatMessageDO();
+        msg.setUserId(nickname);
+        msg.setContent(sendReqVO.getQuery());
+        msg.setRole("user");
+        if (StringUtils.isNotBlank(conversationId)) {
+            msg.setConversationId(conversationId);
+        }
+        chatMessageMapper.insert(msg);
+        ChatAppDO appDO = getAppKey(sendReqVO);
+
+        // 2. 构造请求体
+        JSONObject body = new JSONObject();
+        body.put("query", sendReqVO.getQuery());
+        body.put("inputs", Optional.ofNullable(sendReqVO.getInputs()).orElse(new HashMap<>()));
+        if (StringUtils.isNotBlank(conversationId)) {
+            body.put("conversation_id", conversationId);
+        }
+        body.put("user", nickname);
+        body.put("response_mode", "blocking");
+        body.put("auto_generate_name", true);
+        body.put("enable_retrieval", true);
+//        ChatKnowledgeDatasetDO datasetDO = chatKnowledgeDatasetMapper.selectById(sendReqVO.getDatasetId());
+//        body.put("datasets", Collections.singletonList(datasetDO.getDatasetId()));
+        log.info("Dify 请求体: {}", body.toJSONString());
+        // 3. 发起 HTTP POST
+        URL url = new URL(difyBaseUrl + "/v1/chat-messages");
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestProperty("Authorization", "Bearer " + appDO.getAppKey());
+        conn.setRequestProperty("Content-Type", "application/json");
+        conn.setDoOutput(true);
+        conn.setRequestMethod("POST");
+        try (OutputStream os = conn.getOutputStream()) {
+            os.write(body.toJSONString().getBytes(StandardCharsets.UTF_8));
+        }
+
+        // 5. 读取响应(
+        int code = conn.getResponseCode();
+        InputStream rawStream = (code >= 200 && code < 300)
+                ? conn.getInputStream()
+                : conn.getErrorStream();
+
+        String respStr;
+        if (rawStream == null) {
+            respStr = "Dify API 无返回流,HTTP 状态码:" + code;
+        } else {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(rawStream, StandardCharsets.UTF_8))) {
+                respStr = reader.lines().collect(Collectors.joining());
+            }
+        }
+
+        if (code < 200 || code >= 300) {
+            throw new IOException("Dify API返回错误: HTTP " + code + ",响应: " + respStr);
+        }
+
+        // 6. 解析并保存
+        return parseAndSave(respStr, nickname,msg, appDO);
+    }
+    private ChatMessageRespVO parseAndSave(String jsonStr, String nickname, ChatMessageDO msg,  ChatAppDO appDO) {
+        JSONObject json = JSONObject.parseObject(jsonStr);
+        String did = json.getString("conversation_id");
+
+        // 6.1 本地如果尚未存 Dify 的 conversation_id,则更新
+        ChatSessionDO session = chatSessionMapper.selectOne(new LambdaQueryWrapperX<ChatSessionDO>().eq(ChatSessionDO::getConversationId, did));
+        if (session == null) {
+            session = new ChatSessionDO();
+            session.setName(msg.getContent());
+            session.setUserId(nickname);
+            session.setConversationId(did);
+            session.setAppId(appDO.getId());
+            chatSessionMapper.insert(session);
+            // ======== 补充:从 Dify 获取会话名称(同步 + 异步)=======
+//            asyncUpdateSessionName(did, session.getId());
+
+        }
+        chatMessageMapper.updateById(msg.setConversationId(did).setSessionId(session.getId()));
+        JSONObject metadata = json.getJSONObject("metadata");
+        String metadataStr = metadata.toJSONString();
+        // 6.2 保存消息
+        ChatMessageDO m = new ChatMessageDO();
+        m.setConversationId(did);
+        m.setSessionId(session.getId());
+        m.setUserId(nickname);
+        m.setMessageId(json.getString("message_id"));
+        m.setContent(json.getString("answer"));
+        m.setRole("assistant");
+        m.setMetadataJson(metadataStr);
+        chatMessageMapper.insert(m);
+
+        // 7. 组装 VO 返回
+        ChatMessageRespVO vo = new ChatMessageRespVO();
+        vo.setSessionId(session.getId());
+        vo.setConversationId(did);
+        vo.setUserId(nickname);
+        vo.setMessageId(m.getMessageId());
+        vo.setContent(m.getContent());
+        vo.setRole(m.getRole());
+        vo.setMessageTime(m.getCreateTime());
+        vo.setStatus("completed");
+        vo.setResponseJson(jsonStr);
+        vo.setMetadataJson(metadata);
+        return vo;
+    }
+
+    public ChatAppDO getAppKey(ChatMessageSendReqVO sendReqVO) {
+        ChatAppDO appDO = new ChatAppDO();
+        if (StringUtils.isBlank(sendReqVO.getName())) {
+            throw exception(CHAT_APP_NOT_EXISTS);
+        } else {
+            if (sendReqVO.getNetworking() == true) {
+                appDO = chaAppMapper.selectOne(new LambdaQueryWrapperX<ChatAppDO>().like(ChatAppDO::getAppName, sendReqVO.getName()).eq(ChatAppDO::getNetworking, 1).eq(ChatAppDO::getAppType, 1));
+            }else {
+                appDO = chaAppMapper.selectOne(new LambdaQueryWrapperX<ChatAppDO>().like(ChatAppDO::getAppName, sendReqVO.getName()).eq(ChatAppDO::getNetworking, 0).eq(ChatAppDO::getAppType, 1));
+            }
+            if (appDO == null) {
+                throw exception(CHAT_APP_NOT_EXISTS);
+            }
+            return appDO;
+        }
+    }
+
+
+}

+ 380 - 0
guoyan-ai/src/main/java/com/cy/guoyan/admin/module/ai/service/aiservice/RagFlowService.java

@@ -0,0 +1,380 @@
+package com.cy.guoyan.admin.module.ai.service.aiservice;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.cy.guoyan.admin.framework.common.util.object.BeanUtils;
+import com.cy.guoyan.admin.framework.mybatis.core.query.LambdaQueryWrapperX;
+import com.cy.guoyan.admin.framework.security.core.util.SecurityFrameworkUtils;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatdocument.vo.ChatDocumentSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatknowledgedataset.vo.ChatKnowledgeDatasetSaveReqVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageRespVO;
+import com.cy.guoyan.admin.module.ai.controller.admin.chatmessage.vo.ChatMessageSendReqVO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatapp.ChatAppDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatdocument.ChatDocumentDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatknowledgedataset.ChatKnowledgeDatasetDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatmessage.ChatMessageDO;
+import com.cy.guoyan.admin.module.ai.dal.dataobject.chatsession.ChatSessionDO;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatapp.ChatAppMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatdocument.ChatDocumentMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatknowledgedataset.ChatKnowledgeDatasetMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatmessage.ChatMessageMapper;
+import com.cy.guoyan.admin.module.ai.dal.mysql.chatsession.ChatSessionMapper;
+import com.cy.guoyan.admin.module.ai.util.DifyClient;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.annotation.Resource;
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.cy.guoyan.admin.framework.common.exception.util.ServiceExceptionUtil.exception;
+import static com.cy.guoyan.admin.module.system.enums.ErrorCodeConstants.CHAT_APP_NOT_EXISTS;
+import static com.cy.guoyan.admin.module.system.enums.ErrorCodeConstants.CHAT_KNOWLEDGE_DATASET_NOT_EXISTS;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@Service
+@Qualifier("ragflow")
+@Slf4j
+public class RagFlowService implements AiService {
+
+    @Resource
+    private ChatKnowledgeDatasetMapper chatKnowledgeDatasetMapper;
+
+    @Resource
+    private ChatDocumentMapper chatDocumentMapper;
+
+    @Value("${ragflow.apiKey}")
+    private String ragflowApiKey;
+
+
+    @Value("${ragflow.endpoint}")
+    private String ragflowBaseUrl;
+
+    @Resource
+    private ChatSessionMapper chatSessionMapper;
+
+    @Resource
+    private ChatMessageMapper chatMessageMapper;
+
+    @Resource
+    private ChatAppMapper chaAppMapper;
+
+    private final DifyClient difyClient;
+
+    public RagFlowService(DifyClient difyClient) {
+        this.difyClient = difyClient;
+    }
+
+    private HttpURLConnection prepare(String method, String path) throws IOException {
+        URL url = new URL(ragflowBaseUrl + path);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod(method);
+        conn.setRequestProperty("Authorization", "Bearer " + ragflowApiKey);
+        return conn;
+    }
+
+    /** 读取响应 body 或抛异常 */
+    private String readResponse(HttpURLConnection conn) throws IOException {
+        int code = conn.getResponseCode();
+        InputStream is = (code >= 200 && code < 300) ? conn.getInputStream() : conn.getErrorStream();
+        if (is == null) throw new IOException("无返回流,HTTP " + code);
+        String body;
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, UTF_8))) {
+            body = reader.lines().collect(Collectors.joining());
+        }
+        if (code < 200 || code >= 300) {
+            throw new IOException("Dify 错误 HTTP " + code + "," + body);
+        }
+        return body;
+    }
+
+    private LocalDateTime epoch2Local(Long epoch) {
+        return epoch == null ? null : LocalDateTime.ofInstant(
+                Instant.ofEpochSecond(epoch), ZoneId.systemDefault());
+    }
+
+    @Override
+    public JSONObject createDataset(ChatKnowledgeDatasetSaveReqVO vo)throws IOException {
+        JSONObject body = new JSONObject();
+        body.put("name", vo.getName());
+        body.put("description", vo.getDescription());
+//        body.put("language", "Chinese");
+//        body.put("embedding_model", vo.getEmbeddingModel());
+//        body.put("permission", "team");
+
+        HttpURLConnection conn = prepare("POST", "/api/v1/datasets");
+        conn.setDoOutput(true);
+        conn.setRequestProperty("Content-Type", "application/json");
+        try (OutputStream os = conn.getOutputStream()) {
+            os.write(body.toJSONString().getBytes(UTF_8));
+        }
+        String resp = readResponse(conn);
+
+        return JSONObject.parseObject(resp);
+    }
+
+    @Override
+    public void updateDataset(ChatKnowledgeDatasetSaveReqVO updateReqVO) {
+        // 校验存在
+        validateChatKnowledgeDatasetExists(updateReqVO.getId());
+        // 更新
+        ChatKnowledgeDatasetDO updateObj = BeanUtils.toBean(updateReqVO, ChatKnowledgeDatasetDO.class);
+        chatKnowledgeDatasetMapper.updateById(updateObj);
+    }
+
+    @Override
+    public String deleteDataset(String datasetId) throws IOException {
+        JSONArray idsArray = new JSONArray(Collections.singletonList(datasetId));
+        JSONObject body = new JSONObject();
+        body.put("ids", idsArray);
+        HttpURLConnection conn = prepare("DELETE", "/api/v1/datasets");
+        log.info("删除知识库:{}", datasetId);
+
+        conn.setDoOutput(true);
+        conn.setRequestProperty("Content-Type", "application/json");
+        try (OutputStream os = conn.getOutputStream()) {
+            os.write(body.toJSONString().getBytes(UTF_8));
+        }
+
+        return readResponse(conn);
+
+    }
+
+    private void validateChatKnowledgeDatasetExists(Long id) {
+        if (chatKnowledgeDatasetMapper.selectById(id) == null) {
+            throw exception(CHAT_KNOWLEDGE_DATASET_NOT_EXISTS);
+        }
+    }
+
+    @Override
+    public JSONObject createDocumentByFile(MultipartFile file, ChatDocumentSaveReqVO createReqVO,File tmpFile) throws IOException {
+
+        return difyClient.ragflowCreateDocumentByFile(Collections.singletonList(tmpFile),createReqVO.getName(), createReqVO.getDatasetId());
+    }
+
+    @Override
+    public JSONObject getDocumentIndexingStatus(String datasetId, String documentId) throws IOException {
+
+        ChatDocumentDO doc = chatDocumentMapper.selectById(documentId);
+
+        new Thread(() -> {
+            try {
+                int waited = 0;
+                while (waited < 60 * 60 * 24) {
+                    // 查询文档解析状态
+                    JSONObject status = difyClient.getRagflowDocumentStatus(datasetId, doc.getRagflowDocumentId());
+                    JSONArray array = status.getJSONObject("data").getJSONArray("docs");
+                    if (array == null || array.isEmpty()) {
+                        log.warn("ragflow文档处理状态为空 name={} 状态:{}", doc.getName(), status);
+                        return;
+                    }
+                    JSONObject item = array.getJSONObject(0);
+                    String state = item.getString("run");
+                    log.info("ragflow文档处理状态 name={} 状态:{}", doc.getName(), state);
+
+                    if ("DONE".equals(state)) {
+                        doc.setIndexingStatus("completed");
+
+                        log.info("ragflow文档处理完成 name={} ✅", doc.getName());
+                        // 更新文档状态
+                        doc.setRagflowStatus("DONE");
+
+                        chatDocumentMapper.updateById(doc);
+                        return;
+                    } else if ("FAILED".equals(state)) {
+                        log.warn("ragflow文档处理失败 name={} ❌ 错误:{}", doc.getName(), status.getString("error"));
+                        // 可更新失败状态
+                        doc.setRagflowStatus("FAIL");
+                        chatDocumentMapper.updateById(doc);
+                        return;
+                    }
+                    Thread.sleep(5000);
+                    waited += 5;
+                }
+                log.warn("轮询超时 name={} ⏰", doc.getName());
+            } catch (Exception e) {
+                log.error("异步轮询 ragflow文件 状态失败", e);
+            }
+        }).start();
+        return difyClient.ragflowDocumentChunks(datasetId, Collections.singletonList(doc.getRagflowDocumentId()));
+    }
+
+    @Override
+    public void deleteDocument(String datasetId, String documentId) throws IOException {
+        difyClient.deleteRagFlowDocuments(datasetId, documentId);
+    }
+
+    @Override
+    public ChatMessageRespVO sendChatMessageBlock(ChatMessageSendReqVO sendReqVO) throws IOException {
+        String conversationId = sendReqVO.getConversationId();
+        // 1. 从登录上下文取用户昵称
+        String nickname = String.valueOf(SecurityFrameworkUtils.getLoginUser().getInfo().get("nickname"));
+        ChatMessageDO msg = new ChatMessageDO();
+        msg.setUserId(nickname);
+        msg.setContent(sendReqVO.getQuery());
+        msg.setRole("user");
+        if (StringUtils.isNotBlank(conversationId)) {
+            msg.setConversationId(conversationId);
+        }
+        ChatAppDO appDO = getAppKey(sendReqVO);
+
+        // 2. 构造请求体
+        JSONObject body = new JSONObject();
+        body.put("question", sendReqVO.getQuery());
+        if (StringUtils.isNotBlank(conversationId)) {
+            body.put("session_id", conversationId);
+        }else {
+            String session = createChatSession(appDO.getAppKey(), sendReqVO.getQuery(), nickname);
+            body.put("session_id", session);
+        }
+        body.put("user_id", nickname);
+        body.put("stream", false);
+        chatMessageMapper.insert(msg);
+
+        log.info("ragflow 请求体: {}", body.toJSONString());
+        // 3. 发起 HTTP POST
+        URL url = new URL(ragflowBaseUrl + "/api/v1/chats/"+appDO.getAppKey()+"/completions");
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestProperty("Authorization", "Bearer " + ragflowApiKey);
+        conn.setRequestProperty("Content-Type", "application/json");
+        conn.setDoOutput(true);
+        conn.setRequestMethod("POST");
+        try (OutputStream os = conn.getOutputStream()) {
+            os.write(body.toJSONString().getBytes(StandardCharsets.UTF_8));
+        }
+
+        // 5. 读取响应(
+        int code = conn.getResponseCode();
+        InputStream rawStream = (code >= 200 && code < 300)
+                ? conn.getInputStream()
+                : conn.getErrorStream();
+
+        String respStr;
+        if (rawStream == null) {
+            respStr = "ragflow API 无返回流,HTTP 状态码:" + code;
+        } else {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(rawStream, StandardCharsets.UTF_8))) {
+                respStr = reader.lines().collect(Collectors.joining());
+            }
+        }
+
+        if (code < 200 || code >= 300) {
+            throw new IOException("ragflow API返回错误: HTTP " + code + ",响应: " + respStr);
+        }
+
+        // 6. 解析并保存
+        return parseAndSave(respStr, nickname,msg, appDO);
+    }
+    private ChatMessageRespVO parseAndSave(String jsonStr, String nickname, ChatMessageDO msg,  ChatAppDO appDO) {
+        JSONObject data = JSONObject.parseObject(jsonStr);
+        JSONObject json = JSON.parseObject(data.getString("data"));
+        String did = json.getString("session_id");
+
+        // 6.1 本地如果尚未存 ragflow 的 conversation_id,则更新
+        ChatSessionDO session = chatSessionMapper.selectOne(new LambdaQueryWrapperX<ChatSessionDO>().eq(ChatSessionDO::getConversationId, did));
+        if (session == null) {
+            session = new ChatSessionDO();
+            session.setName(msg.getContent());
+            session.setUserId(nickname);
+            session.setConversationId(did);
+            session.setAppId(appDO.getId());
+            chatSessionMapper.insert(session);
+        }
+        chatMessageMapper.updateById(msg.setConversationId(did).setSessionId(session.getId()));
+        // 6.2 保存消息
+        ChatMessageDO m = new ChatMessageDO();
+        m.setConversationId(did);
+        m.setSessionId(session.getId());
+        m.setUserId(nickname);
+        m.setMessageId(json.getString("id"));
+        m.setContent(json.getString("answer"));
+        m.setRole("assistant");
+        m.setMetadataJson(json.toJSONString());
+        chatMessageMapper.insert(m);
+
+        // 7. 组装 VO 返回
+        ChatMessageRespVO vo = new ChatMessageRespVO();
+        vo.setSessionId(session.getId());
+        vo.setConversationId(did);
+        vo.setUserId(nickname);
+        vo.setMessageId(m.getMessageId());
+        vo.setContent(m.getContent());
+        vo.setRole(m.getRole());
+        vo.setMessageTime(m.getCreateTime());
+        vo.setStatus("completed");
+        vo.setResponseJson(jsonStr);
+        vo.setMetadataJson(json);
+        return vo;
+    }
+
+    public ChatAppDO getAppKey(ChatMessageSendReqVO sendReqVO) {
+        ChatAppDO appDO = new ChatAppDO();
+        if (StringUtils.isBlank(sendReqVO.getName())) {
+            throw exception(CHAT_APP_NOT_EXISTS);
+        } else {
+            if (sendReqVO.getNetworking() == true) {
+                appDO = chaAppMapper.selectOne(new LambdaQueryWrapperX<ChatAppDO>().like(ChatAppDO::getAppName, sendReqVO.getName()).eq(ChatAppDO::getNetworking, 1).eq(ChatAppDO::getAppType, 2));
+            }else {
+                appDO = chaAppMapper.selectOne(new LambdaQueryWrapperX<ChatAppDO>().like(ChatAppDO::getAppName, sendReqVO.getName()).eq(ChatAppDO::getNetworking, 0).eq(ChatAppDO::getAppType, 2));
+            }
+            if (appDO == null) {
+                throw exception(CHAT_APP_NOT_EXISTS);
+            }
+            return appDO;
+        }
+    }
+
+    public String createChatSession(String chatId, String name, String userId) throws IOException {
+        String url = String.format("%s/api/v1/chats/%s/sessions", ragflowBaseUrl, chatId);
+
+        // 创建请求体
+        JSONObject body = new JSONObject();
+        body.put("name", name);
+        if (userId != null && !userId.isEmpty()) {
+            body.put("user_id", userId);
+        }
+
+        // 创建 POST 请求
+        HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Type", "application/json");
+        post.setHeader("Authorization", "Bearer " + ragflowApiKey);
+        post.setEntity(new StringEntity(body.toJSONString(), StandardCharsets.UTF_8));
+
+        // 执行请求
+        try (CloseableHttpClient client = HttpClients.createDefault();
+             CloseableHttpResponse response = client.execute(post)) {
+
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != 200) {
+                throw new IOException("Failed to create session, status: " + statusCode);
+            }
+
+            // 读取响应体
+            String jsonString = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+            JSONObject json = JSONObject.parseObject(jsonString);
+
+            // 提取会话 ID
+            return json.getJSONObject("data").getString("id");
+        }
+    }
+}