/* eslint-disable no-console */
import mqtt from 'mqtt';
import { eventChannel } from '@redux-saga/core';
import { put, call, take } from '@redux-saga/core/effects';
import { reportMessageToSentry } from 'lib/error';
import config from 'config';
import authToken from 'models/authToken';

// Module-level registries shared by every function in this file.
// broker id -> MQTT client created by `connectToMqtt` (one client per broker).
const clients = new Map();
// broker id -> redux-saga event channel stored by `watchForMqttEvents`.
const channels = new Map();
// Topics that `watchForMqttEvents` has already been asked to watch;
// entries are removed again by a successful `unsubscribe`.
const topics = new Set();

// Identifiers of the brokers a topic can be routed to (see `chooseBroker`).
// `connectToMqtt` uses `config.mqtt.urlVg` for VG and `config.mqtt.urlLinode` otherwise.
const BROKERS = {
    VG: 'vg',
    LINODE: 'linode',
};
/**
 * Subscribes the client registered for `broker` to `topic`.
 *
 * A client for `broker` must already be present in `clients`; when it is
 * missing, the property access fails and the returned promise rejects.
 *
 * @param {Object} params
 * @param {string|string[]} params.topic - topic (or topics) handed to the client as-is
 * @param {string} params.broker - key of the client in `clients`
 * @returns {Promise<void>} resolves when the client's callback reports no error;
 *   rejects with the reported error, which is also sent to Sentry
 */
function subscribe({ topic, broker }) {
    return new Promise((resolve, reject) => {
        const onSubscribed = (error) => {
            if (!error) {
                console.debug(`MQTT: subscribe to topic: ${topic}, broker=${broker}`);
                resolve();
                return;
            }
            reportMessageToSentry({ message: `MQTT(${broker}) subscribe error - ${topic}`, extras: { error } });
            reject(error);
        };

        clients.get(broker).subscribe(topic, onSubscribed);
    });
}

/**
 * Unsubscribes from `topic` on the broker that `chooseBroker` selects for it.
 *
 * On success the topic is also removed from the `topics` registry so it can be
 * watched again later. On failure the registry is left untouched.
 *
 * @param {Object} params
 * @param {string|string[]} params.topic - topic (or topics) to unsubscribe from
 * @returns {Promise<void>} resolves when the client's callback reports no error;
 *   rejects with the reported error, which is also sent to Sentry
 */
export function unsubscribe({ topic }) {
    const broker = chooseBroker(topic);

    return new Promise((resolve, reject) => {
        const onUnsubscribed = (error) => {
            if (!error) {
                console.debug(`MQTT: unsubscribe to topic: ${topic}, broker=${broker}`);
                topics.delete(topic);
                resolve();
                return;
            }
            reportMessageToSentry({ message: `MQTT(${broker}) unsubscribe error - ${topic}`, extras: { error } });
            reject(error);
        };

        clients.get(broker).unsubscribe(topic, {}, onUnsubscribed);
    });
}

/**
 * Wraps the 'message' events of the client registered for `broker` in a
 * redux-saga event channel.
 *
 * Every incoming message is parsed as JSON and emitted as
 * `{ data, incomingTopic }`. A payload that is not valid JSON is reported to
 * Sentry and dropped instead of throwing inside the client's event handler.
 *
 * @param {Object} params
 * @param {string} params.broker - key of the client in `clients`
 * @returns {Object} the channel returned by `eventChannel`
 */
function createMqttChannel({ broker }) {
    return eventChannel((emitter) => {
        const client = clients.get(broker);

        const listener = (incomingTopic, message) => {
            let data;
            try {
                // Parse once and reuse the result for both the log and the emit.
                data = JSON.parse(message.toString());
            } catch (error) {
                reportMessageToSentry({
                    message: `MQTT(${broker}) invalid message payload - ${incomingTopic}`,
                    extras: { error },
                });
                return;
            }
            console.debug({ data, incomingTopic, broker });
            emitter({ data, incomingTopic });
        };

        client.on('message', listener);

        // This function runs when the channel is closed, so it must not call
        // close() itself. The channel is also not registered in `channels` yet
        // while this subscriber executes, so looking it up here would yield
        // `undefined`. Only undo what this subscriber set up.
        return () => {
            channels.delete(broker);
            client.off('message', listener);
        };
    });
}

/**
 * Returns `url` with its `token` query parameter set to the raw value of the
 * current auth token. It is passed to `mqtt.connect` as the `transformWsUrl` option.
 *
 * @param {string} url - absolute websocket URL
 * @returns {string} the serialized URL including `token=<raw token>`
 */
function transformWsUrl(url) {
    const token = authToken.get().raw;
    const target = new URL(url);
    target.searchParams.set('token', token);
    return target.href;
}

/**
 * Creates the MQTT client for `broker` unless one is already registered.
 *
 * When `clients` already has an entry for `broker`, the promise resolves
 * immediately without checking the state of that client. Otherwise a client
 * is created, stored in `clients`, wired with logging listeners, and the
 * promise settles on whichever of its first 'connect' or first 'error' event
 * fires first.
 *
 * @param {Object} params
 * @param {string} params.broker - `BROKERS.VG` selects `config.mqtt.urlVg`;
 *   any other value selects `config.mqtt.urlLinode`
 * @returns {Promise<void>} rejects with the first error the client emits,
 *   which is also reported to Sentry
 */
function connectToMqtt({ broker }) {
    return new Promise((resolve, reject) => {
        if (clients.has(broker)) {
            resolve();
            return;
        }

        let url;
        if (broker === BROKERS.VG) {
            url = config.mqtt.urlVg;
        } else {
            url = config.mqtt.urlLinode;
        }

        const connectOptions = { ...config.mqtt.options, transformWsUrl };
        const client = mqtt.connect(url, connectOptions);
        clients.set(broker, client);

        // Lifecycle events that are only logged.
        const debugLogs = [
            ['connect', `MQTT: connected to ${url}, broker=${broker}`],
            ['reconnect', 'MQTT: reconnecting'],
            ['close', 'MQTT: connection closed'],
            ['offline', 'MQTT: went offline'],
        ];
        for (const [eventName, text] of debugLogs) {
            client.on(eventName, () => console.debug(text));
        }
        client.on('error', (error) => console.error(error, 'MQTT: error'));

        // Settle the promise on the first connect/error only.
        client.once('connect', () => resolve());
        client.once('error', (error) => {
            reportMessageToSentry({ message: `MQTT(${broker}) error`, extras: { error } });
            reject(error);
        });
    });
}
/**
 * Picks the broker a topic is served from.
 *
 * @param {string|string[]} topic
 * @returns {string} one of the `BROKERS` values
 */
const chooseBroker = (topic) => {
    // Special case for one subscription: the mqtt library appears to accept an
    // array of topics, and there is one occurrence of that in the code (yats related):
    // [`ingest/${provider}/${newAssetId}/cut/+`, `ingest/${provider}/${newAssetId}/transcode/+`];
    if (Array.isArray(topic) || topic.startsWith('live')) {
        return BROKERS.VG;
    }

    if (topic.startsWith('ingest')) {
        // Of the ingest topics only the convert ones go to Linode.
        return topic.includes('convert') ? BROKERS.LINODE : BROKERS.VG;
    }

    // this must be migrated before enabling whole `category`
    // https://github.schibsted.io/svp/src/blob/main/apps/dynamic-preview-publisher/src/services/mqtt/index.ts#L6
    // (checked before `asset` so a topic containing both stays on VG)
    if (topic.includes('category')) {
        return BROKERS.VG;
    }

    return topic.includes('asset') ? BROKERS.LINODE : BROKERS.VG;
};
/**
 * Saga that subscribes to an MQTT topic and re-dispatches incoming broker
 * messages as `MQTT/<incomingTopic>` actions.
 *
 * Does nothing when the auth token is expired or the topic is already being
 * watched. Only the first watcher per broker creates the event channel and
 * runs the dispatch loop; later watchers for the same broker return after
 * subscribing, because the existing loop already forwards every message of
 * that broker's client.
 *
 * @param {Object} params
 * @param {string|string[]} params.topic - topic (or topics) to subscribe to
 * @throws {Error} wraps any failure; the original error is available as `cause`
 */
export function* watchForMqttEvents({ topic }) {
    try {
        if (authToken.isExpired()) {
            return;
        }
        if (topics.has(topic)) {
            return;
        }

        const broker = chooseBroker(topic);

        // Reserve the topic before the first yield so a concurrent watcher for
        // the same topic bails out instead of subscribing twice.
        topics.add(topic);
        try {
            yield call(connectToMqtt, { broker });
            yield call(subscribe, { topic, broker });
        } catch (error) {
            // Release the reservation; otherwise a failed subscription would
            // block every later attempt to watch this topic.
            topics.delete(topic);
            throw error;
        }

        if (channels.has(broker)) {
            return;
        }
        const channel = yield call(createMqttChannel, { broker });
        channels.set(broker, channel);

        while (true) {
            const { incomingTopic, data } = yield take(channel);
            yield put({ type: `MQTT/${incomingTopic}`, data });
        }
    } catch (error) {
        // Same message as before, but keep the original error (and its stack)
        // reachable through `cause`.
        throw new Error(error, { cause: error });
    }
}
