import React from 'react'

import qs from 'qs'
import { connect, ConnectedProps } from 'react-redux'

import { service, wsPaths } from 'api'
import { NODE_ENV } from 'env'
import { RequestStreamBody, RootState, StreamDescriptor, StreamMessage, StreamMultiMeta } from 'types'
import { fastApiWsUrl, log, logoutSessionExpired } from 'utils'
import {
  HIGH_QUALITY_STREAM_PARAMS,
  HIGHEST_QUALITY_STREAM_PARAMS,
  STANDARD_QUALITY_STREAM_PARAMS,
} from 'utils/constants'

// Payload the client sends periodically to test that the connection is still alive; video handlers also ignore incoming messages of this same `{ ping: 'ok' }` shape
const clientTestConnectionMessage = { ping: 'ok' }

/**
 * A single video frame handed to `onFrame`, together with the meta message
 * that preceded it on the websocket, if any.
 */
export interface FrameWithMeta {
  /** Binary frame exactly as received from the websocket */
  frame: Blob
  /** Meta parsed from the text message received before this frame; undefined if none was received */
  meta?: StreamMultiMeta
  /** Optional timestamp; not populated by the listener in this file */
  time?: number
}

/**
 * Props shared by every listener mode, on top of the props injected by Redux.
 */
export interface CommonProps extends PropsFromRedux {
  /** Stream descriptors, for reading from multiple streams; must be memoized, since a new array identity triggers a reconnect */
  streams?: StreamDescriptor[]
  /** Sent to the server as the websocket URL querystring */
  params?: {
    /** On first read, read all messages within past_ms published to stream */
    past_ms?: number
    /** On first read, read last N messages published to stream */
    last_n?: number
    /** Downsamples the stream to this frequency */
    max_frequency?: number
    /** Deserialization the server applies to messages read from streams */
    deserialization?: 'none' | 'msgpack' | 'arrow'
    /** Max framerate at which to read video feed */
    framerate?: number
  }
  /** Extra fields merged into the handshake payload sent when the socket opens */
  handshake?: {
    /** Instruct server to return only this key's value in stream messages */
    data_key?: string
  }
  /** Whether to reconnect once the socket is closing or closed; treated as true when omitted */
  reconnectOnClose?: boolean
}

/** Props for `multi_video` mode: frames are delivered one at a time through `onFrame`, connecting to a full ws `url`. */
interface MultiVideoStreamProps extends CommonProps {
  mode: 'multi_video'
  /** Full url to the ws endpoint, excluding querystring */
  connect: { url: string }
  /** Receives each frame as it comes in, with the meta message that preceded it */
  onFrame: (frameWithMeta: FrameWithMeta) => any
}

/** Props for `video` mode: a single robot's video feed, addressed by robot id and relative ws path. */
interface VideoStreamProps extends CommonProps {
  mode: 'video'
  /** Robot to read from and the ws path relative to the computed base URL */
  connect: { robotId: string; relativeUrl: string }
  /** Receives each frame as it comes in, with the meta message that preceded it */
  onFrame: (frameWithMeta: FrameWithMeta) => any
}

/** Props for `message` mode: JSON stream messages are delivered in batches through `onMessages`. */
interface MessageStreamProps extends CommonProps {
  mode: 'message'
  /** Either a full ws `url` (excluding querystring), or `robotId` plus `relativeUrl` */
  connect: { url: string } | { robotId: string; relativeUrl: string }
  /** Receives each non-empty batch of new messages read from the stream */
  onMessages: (messages: StreamMessage[]) => any
}

// Union discriminated by `mode`; narrowing on `mode` selects the matching `connect` shape and callback
type Props = VideoStreamProps | MessageStreamProps | MultiVideoStreamProps
/**
 * Renders nothing. Connects over websocket and passes messages to callback.
 * Once a second the socket state is checked; if the socket is closing or
 * closed (after an error, or because the server closed the connection) and
 * `reconnectOnClose` is not false, a new connection is opened. The websocket
 * is closed when the component unmounts.
 *
 * If neither `past_ms` nor `last_n` are passed, starts reading all messages
 * published after websocket connection.
 *
 * @param connect - Object with either full `url` to ws endpoint, excluding
 *     querystring, or `relativeUrl` and `robotId`; if second form is passed, we
 *     try to connect to colocated robot Redis, else fall back to cloud Redis
 * @param onMessages - Callback that receives new messages read from stream
 * @param onFrame - Callback that receives each frame as it comes in
 * @param streams - Stream descriptors, for reading from multiple streams; these
 *     must be memoized
 * @param mode - message, video or multi_video
 * @param params.past_ms - On first read, read all messages within past_ms
 *     published to stream
 * @param params.last_n - On first read, read last N messages published to
 *     stream
 * @param params.max_frequency - Downsamples the stream to this frequency
 * @param params.deserialization - Deserialization format; presumably applied
 *     by FastAPI to messages read from streams after the handshake (TODO
 *     confirm)
 * @param params.framerate - Max framerate at which to read video feed
 * @param handshake.data_key - Instruct server to return only this key's value
 *     in stream messages
 * @param reconnectOnClose - Set to false to disable automatic reconnection;
 *     defaults to true
 */
class StreamListener extends React.Component<Props> {
  mounted?: boolean // True between componentDidMount and componentWillUnmount
  timerId?: number // Interval that reconnects a closing/closed socket
  testConnectionTimerId?: number // Interval that sends the client test connection message
  keepStreamOpenTimerId?: number // Interval that re-requests on-demand video streams
  socket?: WebSocket
  latest_id?: string // Id of most recently read message, to ensure we don't read messages more than once
  frameMeta?: StreamMultiMeta // Meta received for the next video frame; cleared once that frame is delivered
  latestIdByStreamKey: { [streamKey: string]: string | undefined } = {} // Per-stream equivalent of latest_id, sent back in the handshake on reconnect

  componentDidMount() {
    this.mounted = true
    this.socket = this.connectWithSocket()

    /**
     * Send a test connection frame from client to server; if underlying TCP connection is dead, this makes browser aware of it, and socket.readyState transitions to CLOSED after ~15s; here are simple steps to repro
     *
     * - create websocket connection to remote server
     * - wait a bit, then kill internet, e.g. by killing WiFi
     * - bring internet back after a minute
     * - check socket.readyState
     * - it's still OPEN (1), even though connection is broken; it stays in OPEN indefinitely
     *
     * This is very dangerous; e.g., if WiFi fails temporarily, and we don't send test connection messages, we'll never notice that TCP connection, and hence socket, is dead
     *
     * See https://stackoverflow.com/questions/26898081, https://stackoverflow.com/questions/44902181, https://stackoverflow.com/questions/33104436, https://stackoverflow.com/questions/11809267
     */
    this.testConnectionTimerId = window.setInterval(() => {
      if (this.socket && this.socket.readyState === WebSocket.OPEN) {
        // If FastAPI receives this message it's guaranteed to ignore it; e.g. it won't mistake it for a handshake or any other "contentful" message
        this.socket.send(JSON.stringify(clientTestConnectionMessage))
      }
    }, 10000)

    // Poll socket state once a second and replace the socket as soon as it is closing or closed
    this.timerId = window.setInterval(() => {
      if (!this.mounted) return

      const {
        socket,
        props: { reconnectOnClose = true },
      } = this
      if (
        socket &&
        reconnectOnClose &&
        (socket.readyState === WebSocket.CLOSING || socket.readyState === WebSocket.CLOSED)
      ) {
        this.socket = this.connectWithSocket()
      }
    }, 1000)

    // Request on-demand video streams right away, then keep re-requesting them every 25s while mounted
    this.refreshStreams()
    this.keepStreamOpenTimerId = window.setInterval(this.refreshStreams, 25000)
  }

  componentDidUpdate(prevProps: Props) {
    // If client code passes stream descriptors, they must memoize them
    if (this.getSocketUrl(prevProps) !== this.getSocketUrl() || prevProps.streams !== this.props.streams) {
      if (this.socket) this.socket.close()
      // We're now reading from a different source, so ids of previously read messages no longer apply
      this.latestIdByStreamKey = {}
      this.latest_id = undefined
      if (this.mounted) {
        this.socket = this.connectWithSocket()
        // If we change to another quality stream, we need to request that stream to Atom immediately
        this.refreshStreams()
      }
    }
  }

  componentWillUnmount() {
    this.mounted = false
    if (this.socket) this.socket.close()
    window.clearInterval(this.testConnectionTimerId)
    window.clearInterval(this.timerId)
    window.clearInterval(this.keepStreamOpenTimerId)
  }

  /**
   * Asks Atom to turn on the video stream(s) this listener reads from. Only
   * does something in `video` and `multi_video` modes, and only for the
   * standard / high / highest quality Basler streams.
   */
  refreshStreams = () => {
    let requestStreamPayload: RequestStreamBody | undefined = undefined

    if (this.props.mode === 'multi_video') {
      const { streams } = this.props
      if (!streams?.length) return

      for (const stream of streams) {
        if (!stream.robot_id) continue

        requestStreamPayload = undefined
        if (stream.element === 'transcoder-basler-image-standard') requestStreamPayload = STANDARD_QUALITY_STREAM_PARAMS
        if (stream.element === 'transcoder-basler-image-high') requestStreamPayload = HIGH_QUALITY_STREAM_PARAMS
        if (stream.element === 'transcoder-basler-image-highest') requestStreamPayload = HIGHEST_QUALITY_STREAM_PARAMS

        // We need to request a stream for each robot to atom
        if (requestStreamPayload) {
          requestStreamToAtom(stream.robot_id, requestStreamPayload)
        }
      }
    }

    if (this.props.mode === 'video') {
      const { connect } = this.props
      // Quality is inferred from which known ws path the relative url matches
      if (wsPaths.videoBaslerStandard(connect.robotId) === connect.relativeUrl)
        requestStreamPayload = STANDARD_QUALITY_STREAM_PARAMS
      if (wsPaths.videoBaslerHigh(connect.robotId) === connect.relativeUrl)
        requestStreamPayload = HIGH_QUALITY_STREAM_PARAMS
      if (wsPaths.videoBaslerHighest(connect.robotId) === connect.relativeUrl)
        requestStreamPayload = HIGHEST_QUALITY_STREAM_PARAMS

      if (requestStreamPayload) {
        requestStreamToAtom(connect.robotId, requestStreamPayload)
      }
    }
  }

  /**
   * Handles a websocket message in `message` mode: parses the JSON array of
   * stream messages, records the latest ids so a reconnect resumes where we
   * left off, and passes the batch to `onMessages`.
   */
  handleMessageEvent = (event: MessageEvent) => {
    if (this.props.mode !== 'message') return

    const { onMessages } = this.props
    const url = this.getSocketUrl()

    let data: unknown
    try {
      data = JSON.parse(event.data)
    } catch (e) {
      log('src/components/StreamListener.tsx', 'messageParseError', url, e)
      return
    }

    // Valid JSON that isn't an array (e.g. an object or null) would otherwise throw below when we iterate over it
    if (!Array.isArray(data)) {
      log(
        'src/components/StreamListener.tsx',
        'messageFormatError',
        url,
        new TypeError('Expected a JSON array of stream messages'),
      )
      return
    }

    const messages: StreamMessage[] = data
    if (messages.length === 0) return // FastAPI can return empty message arrays if it wants, e.g. just to make sure client is still connected

    for (const message of messages) {
      if (message.meta) this.latestIdByStreamKey[getStreamKeyFromStreamMeta(message.meta)] = message.message_id
    }

    const messageId = messages[messages.length - 1]?.message_id
    this.latest_id = messageId
    const receivedMs = Date.now()
    if (onMessages) {
      onMessages(
        messages.map(message => {
          // Type safety: make sure element and robot_id keys are strings even if we're reading from cloud streams
          const meta: StreamMultiMeta = message.meta && { element: '', robot_id: '', ...(message.meta as any) }
          return { ...message, meta, new: true, receivedMs, messageMs: getMessageTs(message) }
        }),
      )
    }
  }

  /**
   * Handles a websocket message in `video` / `multi_video` mode. Text messages
   * carry the meta for the next frame; binary messages are the frames
   * themselves, which are passed to `onFrame` with the pending meta.
   */
  handleVideoMessageEvent = (event: MessageEvent) => {
    if (this.props.mode === 'message') return

    const { onFrame } = this.props
    const message: string | Blob = event.data

    // We get meta message first, then image frame
    if (typeof message === 'string') {
      try {
        const messageObject = JSON.parse(message)

        // This was a test connection message sent by server; not sent by non-video endpoints; these can just send empty array to test connection
        if (messageObject.ping === 'ok') return

        // Type safety: make sure stream, element and robot_id keys are strings even if we're reading from cloud streams, or single video streams
        this.frameMeta = { stream: '', element: '', robot_id: '', ...messageObject.meta }
      } catch (e) {
        // An unreadable meta must not leave an older meta behind to be attached to the frame that follows
        this.frameMeta = undefined
        log('src/components/StreamListener.tsx', 'videoMetaParseError', this.getSocketUrl(), e)
      }
    } else {
      if (onFrame) onFrame({ frame: message as Blob, meta: this.frameMeta })
      this.frameMeta = undefined // This frame meta can't possibly be useful for the next image we get; FastAPI guarantees a meta frame is followed by an image frame from the same camera even when multiplexing video feeds
    }
  }

  /**
   * Returns the websocket url, excluding querystring, for the given props
   * (current props by default).
   */
  getSocketUrl = (props: Props = this.props): string => {
    const { connect, edge } = props
    if ('url' in connect) return connect.url
    return `${fastApiWsUrl(edge, connect.robotId)}${connect.relativeUrl}`
  }

  /**
   * Opens a new websocket for the current props, wires up its handlers and
   * returns it. The handshake payload is sent as soon as the socket opens.
   */
  connectWithSocket = (): WebSocket => {
    this.frameMeta = undefined // Reset frame meta if we're reconnecting with ws

    const { auth, currentOrg, streams, mode, params, handshake } = this.props
    const { latest_id } = this

    // `params` come last, so they take precedence if they also contain `latest_id`
    const qsOptions = { latest_id, ...params }

    // Prefer full `url`, else fall back to `relativeUrl` with base URL computed based on `edge`
    const socket = new WebSocket(`${this.getSocketUrl()}?${qs.stringify(qsOptions)}`)

    socket.onopen = () => {
      // Tell the server the last id we read from each stream
      const streamsWithLatestId: StreamDescriptor[] | undefined = streams?.map(stream => {
        const streamKey = getStreamKeyFromStreamMeta(stream)
        return { ...stream, latest_id: this.latestIdByStreamKey[streamKey] }
      })
      // Send handshake payload
      const payload: {
        streams: StreamDescriptor[] | undefined
        org_id?: string
        token?: string
        data_key?: string | undefined
      } = { org_id: currentOrg?.organization.id, streams: streamsWithLatestId, ...handshake }
      // The token is only included in the handshake in development
      if (NODE_ENV === 'development') payload.token = auth.auth?.token
      socket.send(JSON.stringify(payload))
    }

    socket.onerror = e => {
      log('src/components/StreamListener.tsx', `(streamError from ${this.getSocketUrl()}`, e)
    }

    // Handler is chosen once per connection, based on the mode at connect time
    socket.addEventListener('message', mode === 'message' ? this.handleMessageEvent : this.handleVideoMessageEvent)

    socket.onclose = e => {
      // Close code 3000 triggers the session-expired logout
      if (e.code === 3000) logoutSessionExpired()
    }

    return socket
  }

  render() {
    return null
  }
}

/**
 * Returns the numeric part of a message id that precedes the first `-`,
 * parsed as a base-10 integer.
 *
 * @param message - Stream message whose `message_id` is read
 * @returns The parsed integer, or NaN if the id doesn't start with digits
 */
function getMessageTs(message: StreamMessage) {
  // `split` always yields at least one element; the default only satisfies the type checker
  const [msPart = ''] = message.message_id.split('-')
  // Explicit radix so the id is always read as decimal
  return parseInt(msPart, 10)
}

// Some streams only stay on for a period of time after being requested, so that resources aren't wasted; this sends that request to atom for the given robot
function requestStreamToAtom(robotId: string, payload: RequestStreamBody) {
  const target = 'transcoder'
  const commandName = 'request_streams'
  service.atomSendCommand(target, commandName, robotId, { command_args: { streams: [payload] } })
}

/**
 * Builds the key under which a stream is tracked.
 *
 * @param meta - Stream name, plus optional element and robot id
 * @returns `element:stream:robot_id` when both element and robot id are
 *     non-empty, otherwise just the stream name
 */
export function getStreamKeyFromStreamMeta(meta: { stream: string; element?: string; robot_id?: string }) {
  const { stream, element, robot_id: robotId } = meta
  const isQualified = Boolean(element && robotId)
  return isQualified ? element + ':' + stream + ':' + robotId : stream
}

// Picks the slices of Redux state the listener needs: auth, edge and current org
const mapStateToProps = ({ auth, edge, currentOrg }: RootState) => ({ auth, edge, currentOrg })

// Connector that injects the state selected by `mapStateToProps` as props
const connector = connect(mapStateToProps)
// Props contributed by Redux, derived from the connector so they stay in sync with `mapStateToProps`
type PropsFromRedux = ConnectedProps<typeof connector>
// Default export is the Redux-connected component
export default connector(StreamListener)
