/* eslint-disable no-console */
import * as mqtt from 'mqtt';
import { eventChannel } from '@redux-saga/core';
import { put, call, take } from '@redux-saga/core/effects';
import { reportMessageToSentry } from 'lib/error';
import config from 'config';
import authToken from 'models/authToken';

// MQTT client; created on first use by connectToMqtt() and reused afterwards.
let client;
// Event channel read by watchForMqttEvents(); reset to null by the channel's cleanup callback.
let channel;
// Topics already claimed by watchForMqttEvents(); entries are removed by unsubscribe().
const topics = new Set();

/**
 * Subscribes the module-level MQTT client to a topic.
 *
 * @param {{ topic: string }} params - Object carrying the topic to subscribe to.
 * @returns {Promise<void>} Resolves with no value once the client's subscribe
 *   callback fires without an error; rejects with that error otherwise.
 */
export function subscribe({ topic }) {
    return new Promise((resolve, reject) => {
        const onSubscribeDone = (error) => {
            if (!error) {
                console.debug(`MQTT: subscribe to topic: ${topic}`);
                resolve();
                return;
            }

            // Failures are reported to Sentry before the rejection reaches the caller.
            reportMessageToSentry({ message: 'MQTT subscribe error', extras: { error } });
            reject(error);
        };

        client.subscribe(topic, onSubscribeDone);
    });
}

/**
 * Unsubscribes the module-level MQTT client from a topic and, on success,
 * removes the topic from the `topics` set.
 *
 * @param {{ topic: string }} params - Object carrying the topic to unsubscribe from.
 * @returns {Promise<void>} Resolves with no value once the client's unsubscribe
 *   callback fires without an error; rejects with that error otherwise.
 */
export function unsubscribe({ topic }) {
    return new Promise((resolve, reject) => {
        const onUnsubscribeDone = (error) => {
            if (!error) {
                console.debug(`MQTT: unsubscribe to topic: ${topic}`);
                // Only a confirmed unsubscribe drops the topic from the set.
                topics.delete(topic);
                resolve();
                return;
            }

            reportMessageToSentry({ message: 'MQTT unsubscribe error', extras: { error } });
            reject(error);
        };

        client.unsubscribe(topic, {}, onUnsubscribeDone);
    });
}

/**
 * Creates a redux-saga event channel fed by the MQTT client's `message` events.
 *
 * Each message payload is parsed as JSON and emitted as
 * `{ data, incomingTopic }`. Payloads that are not valid JSON are reported to
 * Sentry and skipped, so a malformed message cannot throw inside the client's
 * event listener.
 *
 * @returns {object} The channel returned by `eventChannel`.
 */
function createMqttChannel() {
    return eventChannel((emitter) => {
        const listener = (incomingTopic, message) => {
            let data;
            try {
                // Parse once and reuse the result for both the log and the emit.
                data = JSON.parse(message.toString());
            } catch (error) {
                reportMessageToSentry({
                    message: 'MQTT message parse error',
                    extras: { error, incomingTopic },
                });
                return;
            }

            const event = { data, incomingTopic };
            console.debug(event);
            emitter(event);
        };

        client.on('message', listener);

        // This cleanup runs as part of the channel's own close(), so it must not
        // call close() again; it only releases the module state and the listener.
        return () => {
            channel = null;
            client.off('message', listener);
        };
    });
}

/**
 * Returns the given WebSocket URL with a `token` query parameter set to the
 * `raw` value of the current auth token.
 *
 * @param {string} url - URL to decorate.
 * @returns {string} The serialized URL including the `token` parameter.
 */
function transformWsUrl(url) {
    const token = authToken.get().raw;
    const target = new URL(url);

    // set() replaces any `token` parameter that is already present.
    target.searchParams.set('token', token);

    return target.href;
}

/**
 * Creates the module-level MQTT client on first call and waits for its first
 * `connect` or `error` event.
 *
 * When a client already exists the returned promise resolves immediately,
 * without checking the client's connection state.
 *
 * @returns {Promise<void>} Resolves on the first `connect` event; rejects with
 *   the error of the first `error` event after reporting it to Sentry.
 */
function connectToMqtt() {
    if (client) {
        return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
        client = mqtt.connect(config.mqtt.url, {
            ...config.mqtt.options,
            transformWsUrl,
        });

        // Permanent listeners that only log connection lifecycle events.
        const lifecycleLoggers = [
            ['connect', () => console.debug(`MQTT: connected to ${config.mqtt.url}`)],
            ['reconnect', () => console.debug('MQTT: reconnecting')],
            ['close', () => console.debug('MQTT: connection closed')],
            ['offline', () => console.debug('MQTT: went offline')],
            ['error', (error) => console.error(error, 'MQTT: error')],
        ];
        lifecycleLoggers.forEach(([eventName, log]) => {
            client.on(eventName, log);
        });

        // One-shot listeners that settle the promise.
        const onFirstConnect = () => resolve();
        const onFirstError = (error) => {
            reportMessageToSentry({ message: 'MQTT error', extras: { error } });
            reject(error);
        };
        client.once('connect', onFirstConnect);
        client.once('error', onFirstError);
    });
}

/**
 * Saga that subscribes to an MQTT topic and, if no channel exists yet, becomes
 * the single consumer that turns every incoming message into a
 * `MQTT/<incomingTopic>` action.
 *
 * Does nothing when the auth token is expired or the topic has already been
 * claimed by an earlier call.
 *
 * @param {{ topic: string }} params - Object carrying the topic to watch.
 * @throws {Error} The error raised while connecting, subscribing or
 *   dispatching; non-Error values are wrapped in an Error.
 */
export function* watchForMqttEvents({ topic }) {
    // True between claiming the topic and a confirmed subscription.
    let registrationPending = false;
    // Set only in the saga instance that created the shared channel.
    let ownedChannel;

    try {
        if (authToken.isExpired() || topics.has(topic)) {
            return;
        }

        topics.add(topic);
        registrationPending = true;

        yield call(connectToMqtt);
        yield call(subscribe, { topic });
        registrationPending = false;

        // Another saga instance already consumes the shared channel.
        if (channel) {
            return;
        }

        ownedChannel = yield call(createMqttChannel);
        channel = ownedChannel;

        while (true) {
            const { incomingTopic, data } = yield take(ownedChannel);
            yield put({ type: `MQTT/${incomingTopic}`, data });
        }
    } catch (error) {
        // Rethrow the original error so its stack and type are preserved.
        throw error instanceof Error ? error : new Error(String(error));
    } finally {
        // Runs on errors and on cancellation alike.
        if (registrationPending) {
            // Release the claim so a later attempt for this topic is not skipped.
            topics.delete(topic);
        }
        if (ownedChannel) {
            // Closing detaches the message listener and lets a later watcher
            // create a fresh channel.
            ownedChannel.close();
        }
    }
}
