import PushStream from "zen-push"

const onlyChannel =
  name =>
  ({ channel }) =>
    channel === name
const onlyType =
  type =>
  ({ header }) =>
    header.msg_type === type
const onlyTypes =
  types =>
  ({ header }) =>
    types.includes(header.msg_type)
const onlyComm =
  id =>
  ({ content }) =>
    content.comm_id === id

const onlyChildMessage =
  frame =>
  ({ header, parent_header }) => {
    return parent_header.msg_id === frame.header.msg_id
  }

// If it is intentional that a kernel's ws connection is terminated, then this
// code should be used with it to prevent a reconnect attempt.
const closeWSCode = 4001

const makeKernel = (ws, cname, serverName, kernel, kernelsAPI, reconnect) => {
  const kernelStream = new PushStream()

  ws.onmessage = message => kernelStream.next(JSON.parse(message.data))
  ws.onerror = err => kernelStream.error(err)
  ws.onclose = event => {
    kernelStream.complete()
    if (event.code !== closeWSCode) reconnect()
  }

  const connect = () => kernelStream.observable
  const connectChannel = name => connect().filter(onlyChannel(name))

  const send = name => frame => {
    if (ws.readyState === 1) {
      ws.send(JSON.stringify({ ...frame, channel: name }))
    }
    return connect()
  }

  const channel = name => [connectChannel(name), send(name)]

  const interrupt = () => {
    return kernelsAPI.interrupt(cname, serverName, kernel.id)
  }

  const restart = () => {
    return kernelsAPI.restart(cname, serverName, kernel.id)
  }

  const destroy = () => {
    return kernelsAPI.destroy(cname, serverName, kernel.id)
  }

  return {
    send,
    channel,
    connect,
    destroy,
    restart,
    interrupt,
    connectChannel,
  }
}

export {
  onlyType,
  onlyTypes,
  onlyComm,
  makeKernel,
  closeWSCode,
  onlyChannel,
  onlyChildMessage,
}
