package network import androidx.compose.runtime.mutableStateOf import io.ktor.client.* import io.ktor.client.plugins.websocket.* import io.ktor.http.* import io.ktor.websocket.* import kotlinx.coroutines.* import model.KisSession import java.util.concurrent.atomic.AtomicBoolean class KisWebSocketManager { private val client = HttpClient { install(WebSockets) } private var session: DefaultClientWebSocketSession? = null private val isConnected = AtomicBoolean(false) private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) // 콜백 리스너 var onPriceUpdate: ((String, Double) -> Unit)? = null var onExecutionReceived: ((String, String, String, String, Boolean) -> Unit)? = null suspend fun connect() { if (isConnected.get()) return val url = if (KisSession.config.isSimulation) "ops.koreainvestment.com:21000" else "ops.koreainvestment.com:31000" scope.launch { try { client.webSocket(method = HttpMethod.Get, host = url.split(":")[0], port = url.split(":")[1].toInt(), path = "/last_price") { session = this isConnected.set(true) println("✅ 웹소켓 연결 성공") // 연결 직후 HTS ID 기반 체결 통보 자동 구독 val htsId = KisSession.config.htsId if (htsId.isNotEmpty()) sendRequest("1", "H0STT084R", htsId) for (frame in incoming) { if (frame is Frame.Text) handleMessage(frame.readText()) } } } catch (e: Exception) { println("❌ 웹소켓 에러: ${e.message}") } finally { isConnected.set(false) } } } private val _currentPrice = mutableStateOf("0") val currentPrice = _currentPrice val tradeLogs = androidx.compose.runtime.mutableStateListOf() suspend fun unsubscribeStock(code: String) { subscribeStock(code, isSubscribe = false) } private fun handleMessage(message: String) { if (!message.startsWith("0") && !message.startsWith("1")) return val parts = message.split("|") if (parts.size < 4) return val trId = parts[1] val dataRows = parts[3].split("^") when (trId) { "H0STCNT0" -> { val price = dataRows[2] _currentPrice.value = price // 상태 업데이트 onPriceUpdate?.invoke(dataRows[0], price.toDoubleOrNull() ?: 0.0) // 로그 추가 (예시) tradeLogs.add(0, model.RealTimeTrade( time = dataRows[1], price = price, change = dataRows[4], volume = dataRows[2], type = model.TradeType.NEUTRAL )) if (tradeLogs.size > 50) tradeLogs.removeLast() } "H0STT084R" -> onExecutionReceived?.invoke(dataRows[5], dataRows[9], dataRows[12], dataRows[13], dataRows[15] == "02") } } fun clearData() { tradeLogs.clear() _currentPrice.value = "0" } suspend fun subscribeStock(code: String, isSubscribe: Boolean = true) { val trType = if (isSubscribe) "1" else "2" sendRequest(trType, "H0STCNT0", code) if (isSubscribe) println("📡 구독 등록: $code") else println("📴 구독 해제: $code") } private suspend fun sendRequest(trType: String, trId: String, trKey: String) { val approvalKey = KisSession.getWebSocketKey() ?: return val json = """ { "header": { "approval_key": "$approvalKey", "custtype": "P", "tr_type": "$trType", "content-type": "utf-8" }, "body": { "input": { "tr_id": "$trId", "tr_key": "$trKey" } } } """.trimIndent() session?.send(json) } private val activeSubscriptions = mutableSetOf() // 현재 구독 중인 종목 코드 관리 suspend fun updateSubscriptions(requiredCodes: Set) { // 해지할 종목 (현재 구독 중이나 요구 리스트에 없는 것) val toUnsubscribe = activeSubscriptions - requiredCodes toUnsubscribe.forEach { subscribeStock(it, isSubscribe = false) } // 신규 구독 (요구 리스트에는 있으나 현재 구독 중이 아닌 것) val toSubscribe = requiredCodes - activeSubscriptions toSubscribe.forEach { subscribeStock(it, isSubscribe = true) } activeSubscriptions.clear() activeSubscriptions.addAll(requiredCodes) } }