package network import androidx.compose.runtime.mutableStateOf import io.ktor.client.* import io.ktor.client.engine.cio.CIO import io.ktor.client.engine.okhttp.OkHttp import io.ktor.client.engine.config import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.websocket.* import io.ktor.client.engine.cio.* import io.ktor.client.engine.okhttp.* import io.ktor.client.plugins.* import io.ktor.client.plugins.logging.DEFAULT import io.ktor.client.plugins.logging.LogLevel import io.ktor.client.plugins.logging.Logger import io.ktor.client.plugins.logging.Logging import io.ktor.http.* import io.ktor.websocket.* import kotlinx.coroutines.* import kotlinx.serialization.json.Json import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive import model.KisSession import model.RealTimeTrade import util.AesCrypto import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean object KisWebSocketManager { val os = System.getProperty("os.name").lowercase() val arch = System.getProperty("os.arch").lowercase() val isWin = os.contains("win") private val client = HttpClient(if(isWin){ OkHttp } else { CIO }) { install(WebSockets) { pingInterval = 20_000 } install(Logging) { logger = Logger.DEFAULT level = LogLevel.ALL // ALL, HEADERS, BODY, INFO, NONE 중 선택 } install(HttpTimeout) { requestTimeoutMillis = 30_000 // 전체 요청 시간 connectTimeoutMillis = 10_000 // 서버 연결 시간 socketTimeoutMillis = 30_000 // 데이터 패킷 간격 시간 } // 엔진별 상세 설정 (config 대신 해당 엔진 명칭 사용) if (isWin) { engine { // OkHttp 환경 특화: 윈도우 네트워크 유휴 상태 방지 this as OkHttpConfig config { retryOnConnectionFailure(true) connectTimeout(15, TimeUnit.SECONDS) readTimeout(0, TimeUnit.SECONDS) // 무제한 대기 } } } else { engine { // CIO 엔진 설정 this as CIOEngineConfig endpoint { connectTimeout = 15_000 requestTimeout = 30_000 } } } } private var session: DefaultClientWebSocketSession? = null private val isConnected = AtomicBoolean(false) private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private var connectJob: Job? = null // 콜백 리스너 var onPriceUpdate: ((RealTimeTrade) -> Unit)? = null var onExecutionReceived: ((String, String, String, String, Boolean) -> Unit)? = null suspend fun connect() { if (isConnected.get()) return val url = if (KisSession.config.isSimulation) "ops.koreainvestment.com:31000" else "ops.koreainvestment.com:21000" connectJob?.cancelAndJoin() connectJob = scope.launch { while (isActive) { // 재연결을 위한 루프 추가 try { client.webSocket(method = HttpMethod.Get, host = url.split(":")[0], port = url.split(":")[1].toInt(), path = "/last_price") { println("✅ 웹소켓 세션 진입 성공") session = this isConnected.set(true) println("✅ 웹소켓 연결 성공") // 기존 구독 신청 로직 (H0STCNI0 등) val htsId = KisSession.config.htsId if (htsId.isNotEmpty()) sendRequest("1", "H0STCNI0", htsId) // 메시지 수신 루프 for (frame in incoming) { if (frame is Frame.Text) { val text = frame.readText() // [핵심] PINGPONG 처리: 수신된 텍스트 그대로 다시 전송 if (text.contains("PINGPONG")) { send(Frame.Text(text)) // println("🔄 PINGPONG 응답 완료") } else { handleMessage(text) } } } } } catch (e: Exception) { println("❌ 웹소켓 연결 끊김: ${e.message}") e.printStackTrace() } finally { println("🏁 웹소켓 finally 블록 진입 (연결 시도 종료)") isConnected.set(false) session = null println("⏳ 5초 후 재연결 시도...") delay(5000) // 5초 후 재연결 시도 } } } } // private val _currentPrice = mutableStateOf("0") private val currentPrice = androidx.compose.runtime.mutableStateMapOf() // val currentPrice = _currentPrice val tradeLogs = androidx.compose.runtime.mutableStateListOf() suspend fun unsubscribeStock(code: String) { subscribeStock(code, isSubscribe = false) } // 체결 통보 복호화를 위한 키 저장소 private var aesKey: String = "" private var aesIv: String = "" private fun handleMessage(message: String) { if (message.startsWith("{")) { val json = Json.parseToJsonElement(message).jsonObject val trId = json["header"]?.jsonObject?.get("tr_id")?.jsonPrimitive?.content if (trId == "H0STCNI0" || trId == "H0STCNI9") { val output = json["body"]?.jsonObject?.get("output")?.jsonObject aesKey = output?.get("key")?.jsonPrimitive?.content ?: "" aesIv = output?.get("iv")?.jsonPrimitive?.content ?: "" println("🔑 복호화 키 획득 완료: KEY[$aesKey]") } return } if (!message.startsWith("0") && !message.startsWith("1")) return // 2. 실시간 데이터 처리 val parts = message.split("|") if (parts.size < 4) return val leadingChar = message[0] // '0' 또는 '1' val trId = parts[1] when (leadingChar) { '0' -> { // 일반 시세 (암호화 안됨) if (trId == "H0STCNT0") { val dataRows = parts[3].split("^") val price = dataRows[2] currentPrice[dataRows[0]] = price // 상태 업데이트 onPriceUpdate?.invoke(RealTimeTrade( code = dataRows[0], time = dataRows[1], price = price, change = dataRows[4], volume = dataRows[12], type = model.TradeType.NEUTRAL )) // 로그 추가 (예시) // if (tradeLogs.isNotEmpty() && tradeLogs.last().code) } } '1' -> { // 체결 통보 (암호화 됨) if ((trId == "H0STCNI0" || trId == "H0STCNI9") && aesKey.isNotEmpty()) { // AES 복호화 실행 val decryptedData = AesCrypto.decrypt(parts[3], aesKey, aesIv) val dataRows = decryptedData.split("^") println("🔔 복호화된 체결 통보: ${if (dataRows[4] == "01") {"매도"} else {"매수"}} ${dataRows[8]} ${dataRows[9]}주 ${if(dataRows[13] == "01"){"체결"}else{"접수"} }") // UI 콜백 호출 (종목코드, 체결량, 체결가, 주문번호, 체결여부) onExecutionReceived?.invoke( dataRows[8], // 주식단축종목코드 dataRows[9], // 체결수량 dataRows[10], // 체결단가 dataRows[2], // 주문번호 dataRows[13] == "2" // 체결여부 (02: 체결) ) } } } } fun clearData() { tradeLogs.clear() currentPrice.clear() } suspend fun subscribeStock(code: String, isSubscribe: Boolean = true) { val trType = if (isSubscribe) "1" else "2" sendRequest(trType, "H0STCNT0", code) // if (isSubscribe) println("📡 구독 등록: $code") else println("📴 구독 해제: $code") } private suspend fun sendRequest(trType: String, trId: String, trKey: String) { val approvalKey = KisSession.getWebSocketKey() ?: return val json = """ { "header": { "approval_key": "$approvalKey", "custtype": "P", "tr_type": "$trType", "content-type": "utf-8" }, "body": { "input": { "tr_id": "$trId", "tr_key": "$trKey" } } } """.trimIndent() session?.send(json) } private val activeSubscriptions = mutableSetOf() // 현재 구독 중인 종목 코드 관리 suspend fun updateSubscriptions(requiredCodes: Set) { // 해지할 종목 (현재 구독 중이나 요구 리스트에 없는 것) val toUnsubscribe = activeSubscriptions - requiredCodes toUnsubscribe.forEach { subscribeStock(it, isSubscribe = false) } // 신규 구독 (요구 리스트에는 있으나 현재 구독 중이 아닌 것) val toSubscribe = requiredCodes - activeSubscriptions toSubscribe.forEach { subscribeStock(it, isSubscribe = true) } activeSubscriptions.clear() activeSubscriptions.addAll(requiredCodes) } suspend fun disconnect() { println("🔌 웹소켓 연결 종료 중...") isConnected.set(false) connectJob?.cancelAndJoin() // 루프 자체를 중단 session?.close() session = null connectJob = null println("🛑 웹소켓 완전 종료") } }