@senspond
>
Java Spring WebFlux 로 ChatGPT OpenAI Streaming API 만들고 자바스크립트에서 스트리밍 요청처리를 하는 방법을 정리해봅니다.
ChatGPT와 같은 LLM(Large Language Model, 대형 언어 모델)으로 ChatBot서비스들이 많이 등장하면서 웹에서 텍스트를 스트리밍하는 사례가 점점 늘어나고 있습니다. 또한 대용량 파일을 웹에서 읽을 때도 사용됩니다.
< 저의 고민상담 ChatBot >
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
<h1>하이</h1>
<div id="output"></div>
<script src="./stream.js"></script>
</body>
</html>
아래는 가상의 데이터 스트림을 만들고 출력해봅니다.
function streaming(){
const stream = new ReadableStream({
start(controller) {
console.log("start");
let num = 0;
const interval = setInterval(() => {
controller.enqueue(num++);
if (num === 10) {
controller.close();
clearInterval(interval);
}
}, 500);
},
});
const element = document.querySelector('#output')
const reader = stream.getReader();
reader.read().then(function print({ done, value }) {
if (done) return console.log("done");
console.log({ value });
const item = document.createElement("div");
item.innerHTML = `<div>${value}</div>`
element.append(item)
reader.read().then(print);
});
}
streaming()
0.5초 간격으로 0부터 9까지 하나씩 출력되고 있는 것을 볼 수 있습니다.
Javascript Streams API은 크게 4가지가 있는데 간략히 정리하면 이러합니다.
Readable Streams : 파일에서 읽을 때와 같이 데이터 읽기를 허용
Writable Streams : 파일에 쓸 때와 같이 데이터 쓰기를 허용한다.
Duplex Streams : TCP 소켓처럼 읽기 및 쓰기가 모두 가능
Transform Streams : 데이터를 쓰고 읽을 때 데이터를 수정하거나 변환할 수 있는 이중 스트림
WebFlux는 Spring 5에서 새롭게 추가된 Reactive-stack의 웹 프레임워크이며, 클라이언트/서버에서 리액티브(reactive) 애플리케이션 개발을 위한 논블로킹 리액티브 스트림을 지원합니다.
Java 백엔드에서 NodeJS처럼 비동기 스트리밍 데이터를 응답해 줄 수 있습니다.
JDK 17
Spring Boot 3.0.1
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '3.0.1'
@CrossOrigin(origins = "*") // 잠시 테스트를 위해
@RestController
public class ExampleRestController{
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Map<String, Integer>> stream() {
Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
return Flux.fromStream(stream.limit(200))
.map(i -> {
try {
Thread.sleep(150);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Collections.singletonMap("value", i);
}
);
}
}
응답이 정적으로 한번에 오는 것이 아니라 connection 이 closed 될 때까지 계속 이어지는 것을 볼 수 있습니다.
프론트엔드 에서 fetch API를 이용해 받아올 때는 아래처럼 사용할 수 있습니다.
일반적인 Rest API와는 다르게 Stream 을 읽어올 수 있는 reader 를 통해 읽어와야 합니다.
그리고 Stream이 닫힐 때까지 재귀적으로 반복적으로 읽어오게 합니다.
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
<h1>Stream</h1>
<div id="output"></div>
</body>
</html>
<script>
function fetchData(){
const element = document.querySelector('#output')
fetch("http://localhost:9090/stream").then( res=>{
const reader = res.body
.pipeThrough(new TextDecoderStream())
.getReader();
reader.read().then(function print({ done, value }) {
if (done) return console.log("done");
console.log({ value });
const item = document.createElement("div");
item.innerHTML = `<div>${value}</div>`
element.append(item)
reader.read().then(print);
});
console.log(reader)
console.log('Response fully received');
}
fetchData();
</script>
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage implements Serializable {
private String role;
private String content;
}
@Getter
@NoArgsConstructor
public class ChatOpenAiRequest implements Serializable {
private String model;
@JsonProperty("max_tokens")
private Integer maxTokens;
private Double temperature;
@JsonProperty("top_p")
private Double topP;
private Boolean stream;
private List<ChatMessage> messages;
@Builder
public ChatOpenAiRequest(String model, Integer maxTokens, Double temperature,
Boolean stream, List<ChatMessage> messages, Double topP) {
this.model = model;
this.maxTokens = maxTokens;
this.temperature = temperature;
this.stream = stream;
this.messages = messages;
this.topP = topP;
}
}
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(of="id")
@Getter
public class ChatCompletionChunkResponse implements Serializable {
private String id;
private List<Choices> choices;
@NoArgsConstructor
@AllArgsConstructor
@Getter
public static class Choices{
private Delta delta;
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class Delta{
private String content;
}
}
}
중요한것은 content를 꺼내오는 것이기 때문에 다른 응답 필드는 굳이 맵핑 할 필요가 없습니다.
@Component
public class ChatOpenAiProvider {
@Value("${openai.key}")
private String openAiKey;
private WebClient webClient;
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE );
@PostConstruct
public void init() {
var client = HttpClient.create().responseTimeout(Duration.ofSeconds(45));
webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(client))
.baseUrl("https://api.openai.com/v1/chat/completions")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader("Authorization", "Bearer " + openAiKey)
.build();
}
public ChatOpenAiProvider(){ }
public Flux<String> ask(ChatOpenAiRequest request) throws JsonProcessingException {
String requestValue = objectMapper.writeValueAsString(request);
return webClient.post()
.bodyValue(requestValue)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(ChatCompletionChunkResponse.class)
.onErrorResume(error -> {
if (error.getMessage().contains("JsonToken.START_ARRAY")) {
return Flux.empty();
}
else {
return Flux.error(error);
}
})
.filter(response -> {
var content = response.getChoices().get(0).getDelta().getContent();
return content != null && !content.equals("\n\n");
})
.map(response -> response.getChoices().get(0).getDelta().getContent());
}
}
@Service
@RequiredArgsConstructor
public class OpenAiService {
private final ChatOpenAiProvider chatOpenAiProvider;
public Flux<String> rgbit_chat(String message) throws JsonProcessingException {
List<ChatMessage> messages = new ArrayList<>();
messages.add(new ChatMessage("system", """
당신은 RGB코딩이 만든 훌륭한 AI어시스턴스 입니다.
사용자가 질문을 하면 답변을 해주고 다함께 코딩!코딩! 외쳐 주세요.
"""));
messages.add(new ChatMessage("user", message));
var request = ChatOpenAiRequest.builder()
.model("gpt-3.5-turbo")
.topP(1.0)
.temperature(0.7)
.messages(messages)
.maxTokens(2000)
.stream(true)
.build();
return chatOpenAiProvider.ask(request);
}
}
@CrossOrigin(origins = "*") // 잠시 테스트를 위해
@RestController
@RequiredArgsConstructor
public class OpenAiController {
private final OpenAiService openAiService;
private final ChatService chatService;
@PostMapping(value="/rgbit_chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> rgbit_chat(@RequestParam(value = "message") String message) throws JsonProcessingException {
return openAiService.rgbit_chat(message);
}
}
html은 위 Example 의 코드와 동일합니다.
function fetchData2(){
const element = document.querySelector('#output')
const formData = new FormData();
formData.append("message", "너는 누구니?")
fetch("http://localhost:9090/rgbit_chat", {
method: 'POST',
body : formData
}).then(res=>{
const reader = res.body
.pipeThrough(new TextDecoderStream())
.getReader();
reader.read().then(function print({ done, value }) {
if (done) return console.log("done");
console.log({ value });
const item = document.createElement("div");
item.innerHTML = `<div>${value}</div>`
element.append(item)
reader.read().then(print);
});
});
console.log('Response fully received');
}
fetchData2()
value값을 보면 data: 나 \n 같은 문자가 포함되어 들어오고 있는데, 좀 처리를 해서 출력해줄 필요가 있습니다.
async function fetchData2() {
const element = document.querySelector('#output')
const formData = new FormData();
formData.append("message", "너는 누구니?")
const res = await fetch("http://localhost:9090/rgbit_chat", {
method: 'POST',
body: formData
})
const data = res.body
const reader = data.getReader();
const decoder = new TextDecoder()
let done = false;
let lastMessage = "";
while (!done && reader) {
const { value, done: doneReading } = await reader.read();
done = doneReading;
const chunkValue = decoder.decode(value);
lastMessage = lastMessage + chunkValue;
const text = lastMessage.replaceAll("data:", "").trim()
const item = document.createElement("div");
item.innerHTML = `<div>${text}</div>`
element.append(text)
}
}
계속 chunkValue를 붙인 lastMessage를 차례대로 출력한 것 입니다.
async function fetchData2() {
const element = document.querySelector('#output')
const formData = new FormData();
formData.append("message", "너는 누구니?")
const res = await fetch("http://localhost:9090/rgbit_chat", {
method: 'POST',
body: formData
})
const data = res.body
const reader = data?.getReader();
const decoder = new TextDecoder()
let done = false;
let lastMessage = "";
while (!done && reader) {
const { value, done: doneReading } = await reader.read();
done = doneReading;
const chunkValue = decoder.decode(value);
lastMessage = lastMessage + chunkValue;
const text = lastMessage.replaceAll("data:", "").trim()
element.innerHTML = text
}
}
이렇게 하면 이제 타이핑 하듯이 텍스트가 출력되는 것을 볼 수 있습니다.
그런데 Java로 구현한 코드들은 NodeJS로 구현하면 단 몇 줄이면 끝나는 코드라는 것 ㅠㅠ
그리고 React 와 결합된 NextJS 를 사용하면 이런거 만들때 정말 좋은 것 같습니다.
React도 이렇게 DOM을 자주 변경해야 하는 경우에 이점을 가지고 있고요.
나중에 Full-Stack 프레임워크인 NextJS에서 백엔드 API를 만들고 React 로 하는 부분을 정리해보도록 하겠습니다.
안녕하세요. Red, Green, Blue 가 만나 새로운 세상을 만들어 나가겠다는 이상을 가진 개발자의 개인공간입니다.
현재글에서 작성자가 발행한 같은 카테고리내 이전, 다음 글들을 보여줍니다
@senspond
>