atrade/src/main/kotlin/network/KisWebSocketManager.kt
2026-02-12 15:31:34 +09:00

195 lines
7.8 KiB
Kotlin

package network
import androidx.compose.runtime.mutableStateOf
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import model.KisSession
import model.RealTimeTrade
import util.AesCrypto
import java.util.concurrent.atomic.AtomicBoolean
class KisWebSocketManager {
private val client = HttpClient {
install(WebSockets) {
pingInterval = 20_000 // 20초마다 표준 웹소켓 핑 전송 (서버-클라이언트 연결 유지 도움)
}
}
private var session: DefaultClientWebSocketSession? = null
private val isConnected = AtomicBoolean(false)
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
// 콜백 리스너
var onPriceUpdate: ((RealTimeTrade) -> Unit)? = null
var onExecutionReceived: ((String, String, String, String, Boolean) -> Unit)? = null
suspend fun connect() {
if (isConnected.get()) return
val url = if (KisSession.config.isSimulation) "ops.koreainvestment.com:31000" else "ops.koreainvestment.com:21000"
scope.launch {
while (isActive) { // 재연결을 위한 루프 추가
try {
client.webSocket(method = HttpMethod.Get, host = url.split(":")[0], port = url.split(":")[1].toInt(), path = "/last_price") {
session = this
isConnected.set(true)
println("✅ 웹소켓 연결 성공")
// 기존 구독 신청 로직 (H0STCNI0 등)
val htsId = KisSession.config.htsId
if (htsId.isNotEmpty()) sendRequest("1", "H0STCNI0", htsId)
// 메시지 수신 루프
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
// [핵심] PINGPONG 처리: 수신된 텍스트 그대로 다시 전송
if (text.contains("PINGPONG")) {
send(Frame.Text(text))
// println("🔄 PINGPONG 응답 완료")
} else {
handleMessage(text)
}
}
}
}
} catch (e: Exception) {
println("❌ 웹소켓 연결 끊김: ${e.message}")
} finally {
isConnected.set(false)
session = null
println("⏳ 5초 후 재연결 시도...")
delay(5000) // 5초 후 재연결 시도
}
}
}
}
// private val _currentPrice = mutableStateOf("0")
private val currentPrice = androidx.compose.runtime.mutableStateMapOf<String, String>()
// val currentPrice = _currentPrice
val tradeLogs = androidx.compose.runtime.mutableStateListOf<model.RealTimeTrade>()
suspend fun unsubscribeStock(code: String) {
subscribeStock(code, isSubscribe = false)
}
// 체결 통보 복호화를 위한 키 저장소
private var aesKey: String = ""
private var aesIv: String = ""
private fun handleMessage(message: String) {
if (message.startsWith("{")) {
val json = Json.parseToJsonElement(message).jsonObject
val trId = json["header"]?.jsonObject?.get("tr_id")?.jsonPrimitive?.content
if (trId == "H0STCNI0" || trId == "H0STCNI9") {
val output = json["body"]?.jsonObject?.get("output")?.jsonObject
aesKey = output?.get("key")?.jsonPrimitive?.content ?: ""
aesIv = output?.get("iv")?.jsonPrimitive?.content ?: ""
println("🔑 복호화 키 획득 완료: KEY[$aesKey]")
}
return
}
if (!message.startsWith("0") && !message.startsWith("1")) return
// 2. 실시간 데이터 처리
val parts = message.split("|")
if (parts.size < 4) return
val leadingChar = message[0] // '0' 또는 '1'
val trId = parts[1]
when (leadingChar) {
'0' -> { // 일반 시세 (암호화 안됨)
if (trId == "H0STCNT0") {
val dataRows = parts[3].split("^")
val price = dataRows[2]
currentPrice[dataRows[0]] = price // 상태 업데이트
onPriceUpdate?.invoke(RealTimeTrade(
code = dataRows[0],
time = dataRows[1],
price = price,
change = dataRows[4],
volume = dataRows[12],
type = model.TradeType.NEUTRAL
))
// 로그 추가 (예시)
// if (tradeLogs.isNotEmpty() && tradeLogs.last().code)
}
}
'1' -> { // 체결 통보 (암호화 됨)
if ((trId == "H0STCNI0" || trId == "H0STCNI9") && aesKey.isNotEmpty()) {
// AES 복호화 실행
val decryptedData = AesCrypto.decrypt(parts[3], aesKey, aesIv)
val dataRows = decryptedData.split("^")
println("🔔 복호화된 체결 통보: ${if (dataRows[4] == "01") {"매도"} else {"매수"}} ${dataRows[8]} ${dataRows[9]}${if(dataRows[13] == "01"){"체결"}else{"접수"} }")
// UI 콜백 호출 (종목코드, 체결량, 체결가, 주문번호, 체결여부)
onExecutionReceived?.invoke(
dataRows[8], // 주식단축종목코드
dataRows[9], // 체결수량
dataRows[10], // 체결단가
dataRows[2], // 주문번호
dataRows[13] == "2" // 체결여부 (02: 체결)
)
}
}
}
}
fun clearData() {
tradeLogs.clear()
currentPrice.clear()
}
suspend fun subscribeStock(code: String, isSubscribe: Boolean = true) {
val trType = if (isSubscribe) "1" else "2"
sendRequest(trType, "H0STCNT0", code)
// if (isSubscribe) println("📡 구독 등록: $code") else println("📴 구독 해제: $code")
}
private suspend fun sendRequest(trType: String, trId: String, trKey: String) {
val approvalKey = KisSession.getWebSocketKey() ?: return
val json = """
{
"header": {
"approval_key": "$approvalKey",
"custtype": "P",
"tr_type": "$trType",
"content-type": "utf-8"
},
"body": {
"input": { "tr_id": "$trId", "tr_key": "$trKey" }
}
}
""".trimIndent()
session?.send(json)
}
private val activeSubscriptions = mutableSetOf<String>() // 현재 구독 중인 종목 코드 관리
suspend fun updateSubscriptions(requiredCodes: Set<String>) {
// 해지할 종목 (현재 구독 중이나 요구 리스트에 없는 것)
val toUnsubscribe = activeSubscriptions - requiredCodes
toUnsubscribe.forEach { subscribeStock(it, isSubscribe = false) }
// 신규 구독 (요구 리스트에는 있으나 현재 구독 중이 아닌 것)
val toSubscribe = requiredCodes - activeSubscriptions
toSubscribe.forEach { subscribeStock(it, isSubscribe = true) }
activeSubscriptions.clear()
activeSubscriptions.addAll(requiredCodes)
}
}