import { useRef, useMemo, useCallback, useEffect, useState } from "react"
import { debounce } from "lodash"
import { useDispatch } from "react-redux"

import { getJwt, useCurrentUser } from "@dbai/ui-staples"
import { uuidv4, goToFocus } from "@dbai/tool-box"

import store from "reducers"
import { useSessionId } from "hooks"
import { actions } from "reducers/notebookReducer"
import { createPath, getCellById } from "lib/utils"
import { useKernel, frames, onlyType, onlyTypes } from "./kernel"
import {
  selectNodes,
  selectFocused,
  selectAutoScroll,
  selectFocusedCell,
  selectCellsWithNodeData,
} from "selectors"

const getDataframesCode =
  'import pandas as pd\n\nvariables = list(globals().keys())\n\ndataframes = []\nfor variable in variables:\n  if isinstance(globals()[variable], pd.DataFrame) and not variable.startswith("_"):\n    dataframes.append(variable)\n  \ndataframes\n'

// Checks if this is the last cell _out of all cells_, not just the current
// node.
export const checkIfLastCell = () => {
  const state = store.getState()
  const nodes = selectNodes(state)
  const { node, cellIdx } = selectFocused(state)

  const currentNodeIndex = nodes.findIndex(iteratee => iteratee.id === node)
  const isLastNode = currentNodeIndex === nodes.length - 1

  // Quit early and save some time. We know this isn't the last cell if this
  // isn't the last node.
  if (!isLastNode) return false

  const currentNode = nodes[currentNodeIndex]
  return currentNode.cells.length - 1 === cellIdx
}

const getPathDetails = path => ({
  output: `${path}.outputs`,
  execCount: `${path}.executionCount`,
})

const executeMarkdownCell = ({ cellId = null, dispatch, onExec }) => {
  dispatch(actions.updateCellCodeVisibility({ cellId, visible: false }))
  onExec()
}

export const executeCell =
  ({ error, loading, kernel, dispatch, user }) =>
  ({
    doc,
    docValue = doc.getValue(),
    cellId,
    nodeId,
    path,
    cellType,
    onExec = () => {},
    handleAutosave = true,
  }) => {
    if (loading || error) return
    const sessionId = useSessionId()
    switch (cellType) {
      case "markdown":
        executeMarkdownCell({ cellId, dispatch, onExec })
        break
      case "code":
        const { output } = getPathDetails(path)

        const [iopub] = kernel.channel("iopub")
        const [shell, exec] = kernel.channel("shell")
        const jwt = getJwt()

        const frame = frames.makeExec(docValue, {
          metadata: {
            cell: path,
            nodeId,
            cellId,
            jwt,
            clientSessionId: sessionId,
          },
          username: user && user.email,
        })
        onExec && onExec(docValue)
        dispatch(actions.set({ name: output, value: [] }))

        const updateStatus = status => {
          dispatch(
            actions.setCellStatus({
              cellId,
              status,
            })
          )
        }

        updateStatus("waiting")

        const executeInputSub = iopub
          .filter(onlyType("execute_input"))
          .subscribe(({ content, parent_header }) => {
            if (parent_header?.metadata?.cellId === cellId) {
              updateStatus("loading")
              handleAutosave && dispatch(actions.pauseAutosave())
              executeInputSub.unsubscribe()
            }
          })

        const executeReplySub = shell
          .filter(onlyType("execute_reply"))
          .subscribe(({ content, parent_header }) => {
            if (parent_header?.metadata?.cellId === cellId) {
              const { status } = content
              updateStatus(status)
              handleAutosave && dispatch(actions.unpauseAutosave())
              executeReplySub.unsubscribe()
            }
          })

        exec(frame)
          .filter(onlyType("comm_open"))
          .subscribe(({ content }) => {})
        break
      default:
        return
    }
  }

/*
 * Returns a function that accepts a start and finish index and executes all
 * cells inside that range. The given indices are expected to be for an array
 * containing all the cells in the workflow.
 */
export const useExecuteCells = () => {
  const [user] = useCurrentUser()
  const dispatch = useDispatch()
  const { error, loading, kernel } = useKernel()
  const exec = useCallback(
    (start, finish) => {
      const cells = selectCellsWithNodeData(store.getState())
      const safeStart = Math.max(0, start)
      const safeFinish = Math.floor(finish, cells.length - 1)
      const execute = executeCell({ error, loading, kernel, dispatch, user })
      cells
        .slice(safeStart, safeFinish)
        .forEach(({ cell, parentData }, idx) => {
          // We should probably start getting away form these "path" and "prefix"
          // values, as they are really brittle and getting out of hand.
          const path = `workflow.spec.nodes[${parentData.nodeIdx}].cells[${parentData.cellIdx}]`
          execute({
            docValue: cell.source.join(""),
            path,
            cellId: cell.uuid,
            cellType: cell.cellType,
            handleAutosave: false,
            nodeId: parentData.parentNodeId,
          })
        })
    },
    [dispatch, error, kernel, loading, user]
  )
  return exec
}

const directions = {
  before: "before",
  after: "after",
}

export const useExecuteAdjacentCells = () => {
  const execute = useExecuteCells()

  const exec = useCallback(
    (cell, direction) => {
      const cells = selectCellsWithNodeData(store.getState())
      const grandIndex = cells.findIndex(({ cell: c }) => c.uuid === cell.uuid)
      if (direction === directions.before) return execute(0, grandIndex)
      if (direction === directions.after)
        return execute(grandIndex + 1, cells.length)
      console.warn(
        "useExecuteAdjacentCells was called with an invalid direction"
      )
      console.warn(
        `Permitted values: "${directions.before}", "${directions.after}"`
      )
      console.warn(`Provided value: "${direction}"`)
    },
    [execute]
  )
  return exec
}

export const useExecuteBeforeCell = () => {
  const execute = useExecuteAdjacentCells()
  return useCallback(cell => execute(cell, directions.before), [execute])
}

export const useExecuteAfterCell = () => {
  const execute = useExecuteAdjacentCells()
  return useCallback(cell => execute(cell, directions.after), [execute])
}

export const useMuteCell = cellId => {
  const dispatch = useDispatch()
  return useCallback(
    () => dispatch(actions.toggleCellMute({ cellId })),
    [cellId, dispatch]
  )
}

export const useExecCell = () => {
  const [user] = useCurrentUser()
  const dispatch = useDispatch()
  const { error, loading, kernel } = useKernel()
  const exec = useMemo(() => {
    return executeCell({ error, loading, kernel, dispatch, user })
  }, [dispatch, error, kernel, loading, user])
  return exec
}

export const useExecCellByUuid = () => {
  const exec = useExecCell()
  const runCellById = useCallback(
    (cellUuid, onExec) => {
      const {
        cellId,
        nodeId,
        cellIdx,
        nodeIdx,
        cell: { source, cellType },
      } = getCellById(cellUuid, store.getState())
      const path = createPath({ nodeIdx, cellIdx })
      const docValue = source.join("")

      return exec({
        path,
        cellId,
        nodeId,
        onExec,
        cellType,
        docValue,
      })
    },
    [exec]
  )

  return runCellById
}

export const useExecFocusedCell = () => {
  const exec = useExecCellByUuid()
  const runFocusedCell = useCallback(
    onExec => {
      const focusedCell = selectFocusedCell(store.getState())
      return exec(focusedCell.uuid, onExec)
    },
    [exec]
  )

  return runFocusedCell
}

const createFilterForCell =
  cellUuid =>
  ({ parent_header = {} }) => {
    const { metadata = {} } = parent_header
    const { cellId } = metadata
    return cellUuid === cellId
  }

// Given a cell path this hook creates a subscription that responds to updates
// for kernel messages that have a msg_id prefixed with the given shell path
// and updates the execution_count for that cell in redux.
export const useExecutionCountSubscription = ({
  nodeIdx,
  cellIdx,
  cellUuid,
}) => {
  const dispatch = useDispatch()
  const { error, loading, kernel } = useKernel()

  useEffect(() => {
    if (error || loading || !kernel) return

    const channel = kernel.connect()

    const onlyForNode = createFilterForCell(cellUuid)

    const subscription = channel
      .filter(onlyForNode)
      .filter(onlyType("execute_reply"))
      .subscribe(({ header, content, parent_header }) => {
        const { execution_count } = content
        dispatch(
          actions.updateCellExecutionCount({
            nodeIdx,
            cellIdx,
            executionCount: execution_count,
          })
        )
      })

    return () => subscription.unsubscribe()
  }, [cellIdx, cellUuid, dispatch, error, kernel, loading, nodeIdx])
}

const outputTypes = ["stream", "display_data", "execute_result"]

const errorTypes = ["error"]

const desiredMsgTypes = [
  ...outputTypes,
  ...errorTypes,
  "status",
  "clear_output",
  "execute_input",
]

const createOutputFilter =
  ({ cellUuid, user, muted }) =>
  ({ header, parent_header = {} }) => {
    const sessionId = useSessionId()
    const { username, metadata = {} } = parent_header
    const { cellId, clientSessionId } = metadata
    if (cellUuid !== cellId) return false
    if (username !== user?.email) return false
    if (clientSessionId !== sessionId) return false
    if (!desiredMsgTypes.includes(header.msg_type)) {
      return false
    }
    if (muted && outputTypes.includes(header.msg_type)) return false
    return true
  }

const contentLengthLimit = 1000

const trimLines = (acc, text) => {
  const existingText = acc.text ? `${acc.text}\n` : ""

  if (text?.length > contentLengthLimit) {
    const truncatedSlice = text.slice(0, contentLengthLimit)
    return {
      ...acc,
      isTruncated: true,
      text: `${existingText}${truncatedSlice}`,
    }
  }

  return {
    ...acc,
    text: `${existingText}${text}`,
  }
}

const trimOutputText = (content = {}) => {
  if (!content?.text) return content

  const { text = "", ...rest } = content
  const slices = text.split("\n")
  return slices.reduce(trimLines, rest)
}

// Given a cell path this hook creates a subscription that responds to updates
// for kernel messages that have a msg_id prefixed with the given shell path
// and updated the output for that cell in redux.
export const useCellOutputSubscription = ({
  cellUuid,
  nodeIdx,
  cellIdx,
  muted,
}) => {
  const dispatch = useDispatch()
  const [user] = useCurrentUser()
  const cache = useRef([])
  const { error, loading, kernel } = useKernel()
  // eslint-disable-next-line react-hooks/exhaustive-deps
  const flushCachedOutput = useCallback(
    debounce(
      (parent_header, opts) => {
        dispatch(actions.updateCellOutputs({ ...opts, content: cache.current }))
        const autoScroll = selectAutoScroll(store.getState())
        cache.current = []
        // In case of dramatically changing output, scroll to the current
        // point of focus if this user triggered the event.
        if (autoScroll && user && parent_header.username === user.email) {
          goToFocus()
        }
      },
      333,
      { maxWait: 2000 }
    ),
    [dispatch]
  )

  useEffect(() => {
    if (error || loading || !kernel) return

    const channel = kernel.connect()

    const onlyForNode = createOutputFilter({ cellUuid, user, muted })

    const subscription = channel
      .filter(onlyForNode)
      .subscribe(({ header, content, parent_header }) => {
        const { msg_type } = header || {}
        const isNewExecution = content.code && msg_type === "execute_input"
        const isExplicitClearRequest = msg_type === "clear_output"

        if (msg_type === "status" && content.execution_state === "idle") {
          const startTime = new Date(parent_header.date)
          const endTime = new Date(header.date)
          const runTime = (endTime - startTime) / 1000
          dispatch(actions.setCellRunTime({ cellId: cellUuid, runTime }))
        }

        if (isNewExecution || isExplicitClearRequest) {
          /*
           * When a user triggers the execution of a cell or if there is an
           * explicit request to remove the current output of a cell, clear
           * the output.
           */
          cache.current = []
          dispatch(
            actions.clearCellOutputs({
              nodeIdx,
              cellIdx,
            })
          )
        } else {
          const newContent = [...cache.current, trimOutputText(content)]
          cache.current = newContent
          flushCachedOutput(parent_header, {
            nodeIdx,
            cellIdx,
            outputType: header.msg_type,
          })
        }
      })

    return () => subscription.unsubscribe()
  }, [
    dispatch,
    user,
    error,
    kernel,
    loading,
    cellIdx,
    nodeIdx,
    cellUuid,
    flushCachedOutput,
    muted,
  ])
}

export const useCellSubscription = ({ nodeIdx, cellIdx, cellUuid, muted }) => {
  useExecutionCountSubscription({ nodeIdx, cellIdx, cellUuid })
  useCellOutputSubscription({ nodeIdx, cellIdx, cellUuid, muted })
}

const convertDataframesText = text => {
  let dataframesList = text
  if (dataframesList === "[]") return []

  return dataframesList
    .substring(1, text.length - 2)
    .replaceAll("'", "")
    .replaceAll("\n", "")
    .split(", ")
}

const buildTable = (data, dfName) => {
  const rows = JSON.parse(data)
  const columns = Object.keys(rows[0]).map(x => {
    return {
      key: x,
      title: x,
      dataIndex: x,
      width: x.length * 10,
      ellipsis: true,
      onCell: (record, idx) => ({ record, dfName, dfColumn: x, idx }),
    }
  })
  const firstRow = columns.reduce((acc, col) => {
    return { ...acc, [col.dataIndex]: col.dataIndex }
  }, {})
  return { columns: columns, data: [firstRow, ...rows] }
}

export const execPythonCode = (kernel, sessionId, user, code, callback) => {
  const [iopub] = kernel.channel("iopub")
  const [, exec] = kernel.channel("shell")
  const jwt = getJwt()

  const frameId = uuidv4()

  const frame = frames.makeExec(code, {
    metadata: {
      jwt,
      id: frameId,
      clientSessionId: sessionId,
    },
    username: user && user.email,
  })

  const sub = iopub
    .filter(onlyTypes(["execute_result", "display_data"]))
    .subscribe(({ content, parent_header, rest }) => {
      if (parent_header.metadata.id !== frameId) return
      callback(content)
    })
  exec(frame)
  return sub
}

//NOTE: this will only work if the python code uses a `display` function.  Using `print` goes through iopub stream
export const usePythonResults = (code, callback) => {
  const { error: kernelError, loading: kernelLoading, kernel } = useKernel()
  const [user] = useCurrentUser()
  const sessionId = useSessionId()

  useEffect(() => {
    if (kernelLoading || kernelError) return
    const sub = execPythonCode(kernel, sessionId, user, code, callback)
    return () => sub.unsubscribe()
  }, [kernel, kernelError, kernelLoading, user, sessionId, code, callback])
}

export const useDataframes = () => {
  const [dataframesLoading, setDataframesLoading] = useState(true)
  const [dataframes, setDataframes] = useState([])
  const frameId = useMemo(() => uuidv4(), [])
  const { error, loading, kernel } = useKernel()
  const [user] = useCurrentUser()
  const sessionId = useSessionId()

  const onDataframesResult = useCallback(results => {
    const responseText = results.data["text/plain"]
    setDataframes(convertDataframesText(responseText))
    setDataframesLoading(false)
  }, [])

  // function that executes code against the kernel to get dataframes
  const getDataframes = useCallback(() => {
    const jwt = getJwt()
    const [, exec] = kernel.channel("shell")
    const frame = frames.makeExec(getDataframesCode, {
      metadata: {
        jwt,
        id: frameId,
        clientSessionId: sessionId,
      },
      username: user && user.email,
    })
    exec(frame)
  }, [kernel, sessionId, user, frameId])

  // get dataframes on first render
  useEffect(() => {
    if (loading || error) return
    getDataframes()
  }, [getDataframes, loading, error])

  // observe for new dataframes
  useEffect(() => {
    if (loading || error) return
    const [iopub] = kernel.channel("iopub")
    const sub = iopub
      .filter(onlyTypes(["execute_result", "display_data"]))
      .subscribe(({ content, parent_header, rest }) => {
        if (parent_header.metadata.id !== frameId) return
        onDataframesResult(content)
      })
    return () => sub.unsubscribe()
  }, [frameId, error, kernel, loading, onDataframesResult])

  return { dataframesLoading, dataframes, getDataframes }
}

export const useDataframe = dfName => {
  const [dataframeLoading, setDataframeLoading] = useState(false)
  const [dataframe, setDataframe] = useState([])
  const [shapeLoading, setShapeLoading] = useState(false)
  const [shape, setShape] = useState([])

  const dataframeShapeCode = useMemo(() => `${dfName}.shape`, [dfName])

  const dataframeShapeCallback = useCallback(results => {
    const responseText = results.data["text/plain"]
    setShapeLoading(false)
    setShape(responseText)
  }, [])

  usePythonResults(dataframeShapeCode, dataframeShapeCallback)

  const dataframeRowsCode = useMemo(
    () => `${dfName}.head(1000).to_json(orient="records")`,
    [dfName]
  )

  const dataframeRowsCallback = useCallback(
    results => {
      const responseText = results.data["text/plain"]
      const jsString = responseText
        .substring(1, responseText.length - 1)
        .replace(/'/g, '"')
      const dataframe = buildTable(jsString, dfName)
      setDataframe(dataframe)
      setDataframeLoading(false)
    },
    [dfName]
  )
  usePythonResults(dataframeRowsCode, dataframeRowsCallback)

  return { dataframeLoading, dataframe, shapeLoading, shape }
}
