feat: cross socket forwarding

This commit is contained in:
ryana mittens 2024-09-06 20:54:19 +08:00
parent 08bb926220
commit 039fe9dcba
2 changed files with 34 additions and 15 deletions

View file

@ -7,9 +7,12 @@ from app.core.config import settings
from typing import List from typing import List
connected_clients: List[WebSocket] = [] connected_clients: List[WebSocket] = []
finnhub_ws: websocket.WebSocketApp = None
# WebSocket event handlers
async def on_message(ws, message): async def on_message(ws, message):
print("Client list:", connected_clients) print("Received message from Finnhub WebSocket:", message)
print("Forwarding message to connected clients:", connected_clients)
for client in connected_clients: for client in connected_clients:
try: try:
await client.send_text(message) await client.send_text(message)
@ -20,22 +23,34 @@ def on_error(ws, error):
print("Error:", error) print("Error:", error)
def on_close(ws): def on_close(ws):
print("### WebSocket closed ###") print("### Finnhub WebSocket closed ###")
def on_open(ws): def on_open(ws):
global finnhub_ws
finnhub_ws = ws
ws.send(json.dumps({'type': 'subscribe', 'symbol': 'AAPL'})) ws.send(json.dumps({'type': 'subscribe', 'symbol': 'AAPL'}))
ws.send(json.dumps({'type': 'subscribe', 'symbol': 'AMZN'})) ws.send(json.dumps({'type': 'subscribe', 'symbol': 'AMZN'}))
ws.send(json.dumps({'type': 'subscribe', 'symbol': 'BINANCE:BTCUSDT'})) ws.send(json.dumps({'type': 'subscribe', 'symbol': 'BINANCE:BTCUSDT'}))
ws.send(json.dumps({'type': 'subscribe', 'symbol': 'IC MARKETS:1'})) ws.send(json.dumps({'type': 'subscribe', 'symbol': 'IC MARKETS:1'}))
def forward_message_to_finnhub(message: str):
global finnhub_ws
if finnhub_ws:
finnhub_ws.send(message)
print("Forwarded message to Finnhub:", message)
else:
print("Finnhub WebSocket is not connected.")
def start_finnhub_websocket(): def start_finnhub_websocket():
websocket.enableTrace(True) websocket.enableTrace(True)
ws = websocket.WebSocketApp(f"wss://ws.finnhub.io?token={settings.FINNHUB_API_KEY}", ws = websocket.WebSocketApp(
on_message=lambda ws, msg: asyncio.run(on_message(ws, msg)), f"wss://ws.finnhub.io?token={settings.FINNHUB_API_KEY}",
on_error=on_error, on_message=lambda ws, msg: asyncio.run(on_message(ws, msg)),
on_close=on_close) on_error=on_error,
on_close=on_close
)
ws.on_open = on_open ws.on_open = on_open
# Running WebSocket in a separate thread # Run WebSocket in a separate thread
thread = threading.Thread(target=ws.run_forever) thread = threading.Thread(target=ws.run_forever)
thread.start() thread.start()

View file

@ -1,5 +1,6 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.core.websocket import connected_clients, start_finnhub_websocket from app.core.websocket import connected_clients, start_finnhub_websocket, forward_message_to_finnhub
from typing import List
router = APIRouter() router = APIRouter()
@ -7,19 +8,22 @@ router = APIRouter()
async def websocket_endpoint(websocket: WebSocket): async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() await websocket.accept()
connected_clients.append(websocket) connected_clients.append(websocket)
print("Client connected:", websocket)
try: try:
while True: while True:
data = await websocket.receive_text() data = await websocket.receive_text()
print(f"Received from client: {data}") print("Received message from client:", data)
# Forwarding message from client to Finnhub
forward_message_to_finnhub(data)
except WebSocketDisconnect: except WebSocketDisconnect:
connected_clients.remove(websocket) connected_clients.remove(websocket)
print("Client disconnected") print("Client disconnected:", websocket)
@router.get("/start-websocket") @router.get("/start-websocket")
async def start_websocket(): async def start_websocket():
""" """
API to trigger the Finnhub WebSocket connection. API to trigger the Finnhub WebSocket connection.
""" """
start_finnhub_websocket() # start_finnhub_websocket()
return {"message": "WebSocket connection started"} return {"message": "WebSocket connection to Finnhub started"}