package network import androidx.compose.runtime.mutableStateListOf import androidx.compose.runtime.mutableStateOf import androidx.compose.ui.graphics.Color import io.ktor.client.* import io.ktor.client.engine.cio.* import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.websocket.* import io.ktor.http.* import io.ktor.websocket.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.consumeAsFlow import model.KisSession import model.RealTimeTrade import model.TradeType class KisWebSocketManager { private val client = HttpClient(CIO) { install(WebSockets) { pingInterval = 20_000 } install(HttpTimeout) { requestTimeoutMillis = 15_000 connectTimeoutMillis = 15_000 } } private var session: DefaultClientWebSocketSession? = null private val scope = CoroutineScope(Dispatchers.Default + Job()) // UI 상태값 val currentPrice = mutableStateOf("0") val tradeLogs = mutableStateListOf() // 콜백: 체결 발생 시 (주문번호, 종목코드, 가격, 수량, 매수/매도여부) var onExecutionReceived: ((orderNo: String, code: String, price: String, qty: String, isBuy: Boolean) -> Unit)? = null // 콜백: 감시 조건 도달 시 (종목코드, 현재가, 타입) var onTargetReached: ((code: String, price: Double, isProfit: Boolean) -> Unit)? = null suspend fun connect() { val config = KisSession.config if (config.websocketToken.isEmpty()) return val hostUrl = "ops.koreainvestment.com" val port = 21000 // 실전: 21000, 모의: 21000 (동일하나 TR_ID 등에 따라 다름) scope.launch { try { client.webSocket(method = HttpMethod.Get, host = hostUrl, port = port, path = "/tryitout/H0STCNT0") { session = this println("✅ 웹소켓 서버 연결 성공") incoming.consumeAsFlow().collect { frame -> if (frame is Frame.Text) { parseTradeData(frame.readText()) } } } } catch (e: Exception) { println("❌ 웹소켓 연결 오류: ${e.localizedMessage}") } } } private fun parseTradeData(data: String) { // KIS 데이터 포맷: 수신구분|TRID|데이터건수|체결데이터 val parts = data.split("|") if (parts.size < 4) return val trId = parts[1] val body = parts[3] when (trId) { "H0STCNT0" -> handlePriceData(body) // [1] 실시간 시세 처리 "H0STCNI0" -> handleExecutionData(body) // [2] 실시간 체결 통보 처리 } } /** * [1] 실시간 가격 데이터 처리 및 감시 로직 */ private fun handlePriceData(body: String) { val rows = body.split("^") if (rows.size < 16) return val stockCode = rows[0] val priceStr = rows[2] val currentPriceInt = priceStr.toIntOrNull() ?: 0 val newTrade = RealTimeTrade( time = rows[1].chunked(2).joinToString(":"), price = priceStr, change = rows[4], volume = rows[12], type = if (rows[15] == "1") TradeType.BUY else TradeType.SELL ) scope.launch(Dispatchers.Main) { tradeLogs.add(0, newTrade) if (tradeLogs.size > 30) tradeLogs.removeLast() currentPrice.value = String.format("%,d", currentPriceInt) // 실시간 감시 엔진 작동 checkAutoTradeTargets(stockCode, currentPriceInt.toDouble()) } } /** * [2] 실시간 개인 체결 통보 처리 */ private fun handleExecutionData(body: String) { val rows = body.split("^") if (rows.size < 13) return val orderNo = rows[1] val stockCode = rows[7] val side = rows[9] // 01: 매도, 02: 매수 val price = rows[11] val qty = rows[12] scope.launch(Dispatchers.Main) { val isBuy = side == "02" println("📣 체결 통보 수신: $stockCode | ${if(isBuy) "매수" else "매도"} | $price 원") // 외부 콜백 실행 (DB 업데이트 및 UI 전환 트리거) onExecutionReceived?.invoke(orderNo, stockCode, price, qty, isBuy) // 매수 체결 시 즉시 해당 종목 실시간 시세 구독 시작 if (isBuy) subscribeStock(stockCode) } } /** * 자동매매 목표가 도달 여부 판단 */ private fun checkAutoTradeTargets(code: String, currentPrice: Double) { // DB에서 해당 종목의 감시 설정(익절/손절가)을 가져와 비교 // 효율성을 위해 Map 등에 캐싱하여 사용할 것을 권장 scope.launch(Dispatchers.IO) { val config = DatabaseFactory.findConfigByCode(code) ?: return@launch if (currentPrice >= config.targetPrice) { withContext(Dispatchers.Main) { onTargetReached?.invoke(code, currentPrice, true) } } else if (currentPrice <= config.stopLossPrice) { withContext(Dispatchers.Main) { onTargetReached?.invoke(code, currentPrice, false) } } } } /** * 개인 체결 통보 구독 (HTS ID 필요) */ suspend fun subscribeExecution(htsId: String) { sendRequest(htsId, trType = "1", trId = "H0STCNI0") println("📡 실시간 체결 통보 구독 시작: $htsId") } suspend fun subscribeStock(stockCode: String) { sendRequest(stockCode, trType = "1", trId = "H0STCNT0") } suspend fun unsubscribeStock(stockCode: String) { if (stockCode.isNotEmpty()) sendRequest(stockCode, trType = "2", trId = "H0STCNT0") } private suspend fun sendRequest(key: String, trType: String, trId: String) { val currentSession = session ?: return val config = KisSession.config val requestJson = """ { "header": { "approval_key": "${config.websocketToken}", "custtype": "P", "tr_type": "$trType", "content-type": "utf-8" }, "body": { "input": { "tr_id": "$trId", "tr_key": "$key" } } } """.trimIndent() try { currentSession.send(Frame.Text(requestJson)) } catch (e: Exception) { println("❌ 웹소켓 요청 실패 ($trId): ${e.localizedMessage}") } } fun clearData() { tradeLogs.clear() currentPrice.value = "0" } }