import { HubConnection, HubConnectionBuilder } from '@microsoft/signalr';
import { IHttpConnectionOptions } from '@microsoft/signalr/src/IHttpConnectionOptions';
import { IStreamResult, ISubscription } from '@microsoft/signalr/src/Stream';
import Debug from 'debug';
import { forEach } from 'lodash';
import Semaphore from 'semaphore-async-await';

import { EventEmitter, ProgressMonitor } from '../basic';
import { Connector as WebServicesConnector } from '../../utils/connector';

/** Path segment appended to the API base URL to build hub URLs. */
const HUB_API = '/hubs';

/** Module-wide counter used to build the `#<n>` ids of connections and channels (shared by both). */
let idGenerator = 1;

/** Debug logger of this module. */
const debug = Debug('common:ws:WebSocketConnector');
/** Sub-logger; when it is enabled, `connect()` asks SignalR for a more verbose log level. */
const debugWS = debug.extend('signalR');

/**
 * Converts a raw message into the typed value emitted by a channel.
 *
 * `type` is the hub message type (listen channels) or the stream name
 * (subscribed channels); `messageContent` is the payload as received.
 */
export type PrepareMessageCallback<T> = (type: string, messageContent: any) => T;

/**
 * Singleton that opens SignalR hub connections and shares one connection per
 * hub URL between all channels opened on that URL.
 *
 * A shared connection is closed automatically once its last channel is closed.
 */
export class WebSocketConnector {
    private static instance: WebSocketConnector;

    readonly #connector: WebServicesConnector = WebServicesConnector.getInstance();

    /** Shared connections, keyed by full hub URL. */
    readonly #connectionsByAPI: Record<string, WebSocketConnection> = {};

    /** One binary semaphore per hub URL, serialising connection creation for that URL. */
    readonly #connectionSemaphoreByURL: Record<string, Semaphore>;

    private constructor() {
        this.#connectionSemaphoreByURL = {};
    }

    /** Returns the process-wide connector, creating it on first use. */
    static getInstance(): WebSocketConnector {
        if (!WebSocketConnector.instance) {
            WebSocketConnector.instance = new WebSocketConnector();
        }

        return WebSocketConnector.instance;
    }

    /**
     * Builds and starts a new hub connection to `url`.
     *
     * The returned connection is NOT stored in the shared cache; `subscribeChannel`
     * and `listenChannel` do that themselves.
     *
     * @param url Full hub URL.
     * @param progressMonitor Checked for cancellation once the hub has started.
     * @returns The started connection.
     * @throws Whatever the hub start or `progressMonitor.verifyCancelled()` throws.
     */
    async connect(url: string, progressMonitor: ProgressMonitor = ProgressMonitor.empty()): Promise<WebSocketConnection> {
        const headers: NonNullable<IHttpConnectionOptions['headers']> = {};
        this.#connector.fillAuthentication(headers);

        const connectionOptions: IHttpConnectionOptions = {
            headers,
            accessTokenFactory: this.#connector.getAccessTokenFactory(),
            // SignalR LogLevel values: 1 = Debug, 3 = Warning.
            logger: (debugWS.enabled) ? 1 : 3,
        };

        const hubConnection = new HubConnectionBuilder()
            .withUrl(url, connectionOptions)
            .withAutomaticReconnect()
            .build();

        debug('Connect to url=', url, ' options=', connectionOptions);

        const connection = new WebSocketConnection(hubConnection);

        connection.on('OnStopping', () => {
            // Only evict our own entry: another connection may be cached under the same URL.
            if (this.#connectionsByAPI[url] === connection) {
                delete this.#connectionsByAPI[url];
            }
        });

        await connection.start(progressMonitor);

        try {
            progressMonitor.verifyCancelled();
        } catch (error) {
            // The hub is already running but nobody will ever receive it: stop it
            // so it does not leak, then report the original error.
            await connection.close().catch((closeError: unknown) => {
                debug('Failed to close connection url=', url, 'error=', closeError);
            });

            throw error;
        }

        debug('Connection started');

        return connection;
    }

    /**
     * Opens a server-to-client stream on the shared connection of the given service.
     *
     * @param api Base URL of the API.
     * @param serviceName Hub name; a leading `/` is added when missing.
     * @param channelName Name of the hub stream method.
     * @param params Argument passed to the stream method (`{}` when nullish).
     * @param prepareMessage Optional converter applied to every streamed item.
     * @param progressMonitor Checked for cancellation between the asynchronous steps.
     * @returns The channel emitting the streamed items.
     */
    async subscribeChannel<T>(api: string, serviceName: string, channelName: string, params: Record<string, any>, prepareMessage?: PrepareMessageCallback<T>, progressMonitor: ProgressMonitor = ProgressMonitor.empty()): Promise<WebSocketChannel<T>> {
        const url = WebSocketConnector.buildHubURL(api, serviceName);
        const connection = await this.getOrCreateConnection(url, progressMonitor);

        progressMonitor.verifyCancelled();

        const channel = await connection.subscribe<T>(channelName, params || {}, prepareMessage, progressMonitor);

        progressMonitor.verifyCancelled();

        return channel;
    }

    /**
     * Listens to hub messages of the given types on the shared connection of the service.
     *
     * @param serverApi Base URL of the API.
     * @param serviceName Hub name; a leading `/` is added when missing.
     * @param messageTypes Hub message types to listen to.
     * @param prepareMessage Optional converter applied to every received message.
     * @param progressMonitor Checked for cancellation between the asynchronous steps.
     * @returns The channel emitting the received messages.
     */
    async listenChannel<T>(serverApi: string, serviceName: string, messageTypes: string[], prepareMessage?: PrepareMessageCallback<T>, progressMonitor: ProgressMonitor = ProgressMonitor.empty()): Promise<WebSocketChannel<T>> {
        const url = WebSocketConnector.buildHubURL(serverApi, serviceName);
        const connection = await this.getOrCreateConnection(url, progressMonitor);

        progressMonitor.verifyCancelled();

        const channel = await connection.listen<T>(messageTypes, prepareMessage, progressMonitor);

        progressMonitor.verifyCancelled();

        return channel;
    }

    /** Builds `<api>/hubs/<serviceName>`, adding the `/` before the service name when missing. */
    private static buildHubURL(api: string, serviceName: string): string {
        const servicePath = (serviceName.charAt(0) === '/') ? serviceName : `/${serviceName}`;

        return `${api}${HUB_API}${servicePath}`;
    }

    /**
     * Returns the cached connection for `url`, or creates, starts and caches one.
     * Creation is serialised per URL so concurrent callers share a single connection.
     */
    private async getOrCreateConnection(url: string, progressMonitor: ProgressMonitor): Promise<WebSocketConnection> {
        let semaphore = this.#connectionSemaphoreByURL[url];
        if (!semaphore) {
            semaphore = new Semaphore(1);
            this.#connectionSemaphoreByURL[url] = semaphore;
        }

        await semaphore.acquire();
        try {
            const cached = this.#connectionsByAPI[url];
            if (cached) {
                return cached;
            }

            const connection = await this.connect(url, progressMonitor);

            connection.on('OnChannelClosed', () => {
                if (connection.openedChannelsCount) {
                    return;
                }

                // Last channel gone: release the connection. close() is asynchronous and
                // may reject, so the rejection is handled here instead of being left floating.
                connection.close().catch((error: unknown) => {
                    debug('Failed to close connection url=', url, 'error=', error);
                });
            });

            this.#connectionsByAPI[url] = connection;

            return connection;
        } finally {
            semaphore.release();

            // The permit is only back to 1 when nobody is waiting; then the semaphore can be dropped.
            if (semaphore.getPermits() === 1) {
                delete this.#connectionSemaphoreByURL[url];
            }
        }
    }
}

/** Events emitted by `WebSocketConnection`. */
export interface ConnectionWSEventTypes {
    /** Emitted by `start()` before the hub connection is started. */
    OnStarting: () => void;
    /** Emitted by `start()` once the hub connection has started. */
    OnRunning: () => void;
    /** Forwarded from the hub connection's `onreconnecting` callback. */
    OnReconnecting: (error?: Error) => void;
    /** Forwarded from the hub connection's `onreconnected` callback (emitted without argument). */
    OnReconnected: (error?: Error) => void;
    /** Emitted by `close()` before the hub connection is stopped. */
    OnStopping: () => void;
    /** Emitted by `close()` after the hub connection has stopped. */
    OnStopped: () => void;
    /** Emitted each time `listen()` or `subscribe()` opens a channel. */
    OnChannelOpened: () => void;
    /** Emitted after a channel of this connection has ended and the open-channel count was decremented. */
    OnChannelClosed: () => void;
}

/**
 * Lifecycle states of a `WebSocketConnection`.
 *
 * NOTE(review): `'started'` is declared but never assigned anywhere in this file.
 */
export type ConnectionWSState =
    'initialized'
    | 'starting'
    | 'started'
    | 'running'
    | 'stopping'
    | 'stopped'
    ;

/**
 * Wraps a SignalR `HubConnection` with an explicit lifecycle
 * (`initialized` → `starting` → `running` → `stopping` → `stopped`),
 * lifecycle events, and a count of the channels currently opened on it.
 */
export class WebSocketConnection extends EventEmitter<ConnectionWSEventTypes> {
    readonly #id = `#${idGenerator++}`;

    readonly #connection: HubConnection;
    #state: ConnectionWSState;
    #openedChannels = 0;

    /** Link counters handed to every listen channel created by this connection. */
    readonly #links: Record<string, number> = {};

    /**
     * @param connection Hub connection to wrap; it must not be started yet.
     */
    constructor(connection: HubConnection) {
        super();

        this.#connection = connection;

        connection.onreconnecting(this.handleReconnecting);
        connection.onreconnected(this.handleReconnected);
        connection.onclose(this.handleClosed);

        this.#state = 'initialized';
        this.updateStateId();
    }

    /** Number of channels opened on this connection that have not ended yet. */
    get openedChannelsCount(): number {
        return this.#openedChannels;
    }

    /**
     * Starts the hub connection. Emits `OnStarting`, then `OnRunning` on success.
     *
     * @param progressMonitor Currently unused.
     * @throws Error('Invalid state') when the connection is not `initialized`;
     *         rethrows the hub start error, leaving the connection `stopped`.
     */
    async start(progressMonitor: ProgressMonitor): Promise<void> {
        debug('Start connection', this.#id, 'current state=', this.#state);

        if (this.#state !== 'initialized') {
            throw new Error('Invalid state');
        }
        this.#state = 'starting';
        this.updateStateId();
        this.emit('OnStarting');

        try {
            await this.#connection.start();
        } catch (error) {
            // Do not stay stuck in 'starting': this instance can never run.
            this.#state = 'stopped';
            this.updateStateId();

            throw error;
        }

        this.#state = 'running';
        this.updateStateId();
        this.emit('OnRunning');
    }

    /**
     * Stops the hub connection. Emits `OnStopping`, then `OnStopped`.
     *
     * Does nothing when the connection was never started, is already stopping,
     * or is already stopped, so it is safe to call from several event handlers.
     *
     * @throws Error('Invalid state') when called while the connection is still starting.
     */
    async close(): Promise<void> {
        debug('Close connection', this.#id, 'current state=', this.#state);

        if (this.#state === 'initialized' || this.#state === 'stopping' || this.#state === 'stopped') {
            return;
        }

        if (this.#state !== 'running') {
            throw new Error('Invalid state');
        }
        this.#state = 'stopping';
        this.updateStateId();
        this.emit('OnStopping');

        await this.#connection.stop();

        this.#state = 'stopped';
        this.updateStateId();
        this.emit('OnStopped');
    }

    /** Forwards the hub's "reconnecting" notification as `OnReconnecting`. */
    handleReconnecting = (error?: Error) => {
        debug('Connection', this.#id, 'received reconnecting', error);

        this.emit('OnReconnecting', error);
    };

    /** Forwards the hub's "reconnected" notification as `OnReconnected`. */
    handleReconnected = () => {
        debug('Connection', this.#id, 'received reconnected');

        this.emit('OnReconnected');
    };

    /**
     * Handles a hub connection that closed on its own (for example when automatic
     * reconnection gave up). A close requested through `close()` is ignored here
     * because `close()` drives the state and the events itself.
     */
    handleClosed = (error?: Error) => {
        debug('Connection', this.#id, 'received close', error, 'current state=', this.#state);

        if (this.#state !== 'running') {
            return;
        }

        this.#state = 'stopping';
        this.updateStateId();
        this.emit('OnStopping');

        this.#state = 'stopped';
        this.updateStateId();
        this.emit('OnStopped');
    };

    /**
     * Opens a channel that receives the hub messages of the given types.
     *
     * @param messageTypes Hub message types to listen to.
     * @param prepareMessage Optional converter applied to every received message.
     * @param progressMonitor Currently unused.
     * @returns The channel; closing it emits `OnChannelClosed` on this connection.
     */
    listen<T>(messageTypes: string[], prepareMessage?: (type: string, messageBody: any) => T, progressMonitor?: ProgressMonitor): WebSocketChannel<T> {
        const channel = new WebSocketConnectionChannel<T>(this.#connection, messageTypes, prepareMessage, this.#links);

        this.#openedChannels++;

        channel.on('OnStopped', () => {
            this.#openedChannels--;
            this.emit('OnChannelClosed');
        });

        this.emit('OnChannelOpened');

        return channel;
    }

    /**
     * Opens a channel on the hub stream method `name`.
     *
     * @param name Hub stream method name.
     * @param params Argument passed to the stream method.
     * @param prepareMessage Optional converter applied to every streamed item.
     * @param progressMonitor Currently unused.
     * @returns The channel; its end (stop or error) emits `OnChannelClosed` on this connection.
     */
    async subscribe<T>(name: string, params: Record<string, any>, prepareMessage?: PrepareMessageCallback<T>, progressMonitor?: ProgressMonitor): Promise<WebSocketChannel<T>> {
        debug('Connection', this.#id, 'subscribe channel', name, 'params=', params);

        const result: IStreamResult<any> = this.#connection.stream(name, params);

        const channel = new WebSocketResultChannel<T>(name, result, prepareMessage);

        // A channel may report both a stop and an error; count its end only once.
        let disposed = false;
        const releaseChannel = () => {
            if (disposed) {
                return;
            }
            disposed = true;
            this.#openedChannels--;
            this.emit('OnChannelClosed');
        };

        channel.on('OnStopped', releaseChannel);
        channel.on('OnError', releaseChannel);

        this.#openedChannels++;
        this.emit('OnChannelOpened');

        return channel;
    }
}

/**
 * Lifecycle states of a channel. Channels are created `running`;
 * `WebSocketResultChannel` moves to `stopping` on close request, `stopped`
 * on stream completion and `errored` on stream error.
 */
export type ConnectionChannelWSState = 'running' | 'stopping' | 'stopped' | 'errored';

/**
 * Events emitted by the channel classes of this file.
 *
 * NOTE(review): `WebSocketChannel` extends `EventEmitter` without referencing
 * this interface — confirm whether it was meant to be its type argument.
 */
export interface ConnectionChannelWSEventTypes<T> {
    /** The channel has ended normally. */
    OnStopped: () => void;
    /** The underlying stream reported an error. */
    OnError: (error: any) => void;
    /** A message was received; `type` is the message type or stream name. */
    OnMessage: (type: string, message: T) => void;
}

/**
 * Common base of the channel implementations: carries the channel id, its
 * lifecycle state and the optional message converter.
 */
export class WebSocketChannel<T> extends EventEmitter {
    /** Current lifecycle state; set to `running` by the constructor. */
    protected state: ConnectionChannelWSState;
    /** Identifier used in debug output, taken from the module-wide counter. */
    protected id = `#${idGenerator++}`;

    /** Optional converter applied to raw messages before they are emitted. */
    protected prepareMessage?: PrepareMessageCallback<T>;

    /**
     * @param prepareMessage Optional converter applied to raw messages.
     */
    constructor(prepareMessage?: PrepareMessageCallback<T>) {
        super();

        this.state = 'running';
        this.prepareMessage = prepareMessage;

        debug('Create channel', this.id);
    }

    /** Closes the channel. The base implementation does nothing; subclasses override it. */
    async close(): Promise<void> {
        // Nothing to release at this level.
    }
}

/**
 * Channel that listens to hub messages of a fixed set of message types and
 * re-emits them as `OnMessage` events.
 */
export class WebSocketConnectionChannel<T> extends WebSocketChannel<T> {
    readonly #connection: HubConnection;
    /** Handler registered on the hub for each message type, kept so `close()` can remove it. */
    readonly #handlersByMessageType: Record<string, (message: any) => void> = {};
    #closed = false;
    /** Link counters, supplied by the creator of the channel. */
    readonly #links: Record<string, number>;

    /**
     * @param connection Hub connection the handlers are registered on.
     * @param messageTypes Hub message types to listen to; duplicates are ignored.
     * @param prepareMessage Optional converter applied to every received message.
     * @param links Link counter map used by `link()` / `unlink()`.
     */
    constructor(connection: HubConnection, messageTypes: string[], prepareMessage: PrepareMessageCallback<T> | undefined, links: Record<string, number>) {
        super(prepareMessage);

        this.#connection = connection;
        this.#links = links;

        // De-duplicate: a repeated type would register a second hub handler that
        // close() could never remove, because only one handler is stored per type.
        new Set(messageTypes).forEach((messageType) => {
            const handler = (message: any) => {
                this.handleMessage(messageType, message);
            };
            this.#handlersByMessageType[messageType] = handler;
            connection.on(messageType, handler);
        });
    }

    /** The hub connection this channel listens on. */
    get connection() {
        return this.#connection;
    }

    /**
     * Increments the link counter of `name`.
     *
     * @returns `true` when this call created the link (first user), `false` otherwise.
     */
    link(name: string): boolean {
        const links = this.#links;

        const linkCount = links[name];
        if (linkCount === undefined) {
            links[name] = 1;
            debug('link', 'Open link=', name);

            return true;
        }

        debug('link', 'Add link=', name, 'count=', (linkCount + 1));

        links[name] = linkCount + 1;

        return false;
    }

    /**
     * Decrements the link counter of `name`.
     *
     * @returns `true` when this call removed the last link, `false` otherwise
     *          (including when `name` is unknown, which is reported on the console).
     */
    unlink(name: string): boolean {
        const links = this.#links;

        const linksCount = links[name];
        if (linksCount === undefined) {
            console.error('Unknown unlink name=', name);

            return false;
        }

        if (linksCount === 1) {
            debug('unlink', 'Close link=', name);
            delete links[name];

            return true;
        }

        debug('unlink', 'Remove link=', name, 'count=', (linksCount - 1));

        links[name] = linksCount - 1;

        return false;
    }

    /** Converts a hub message and emits it as `OnMessage`; ignored once the channel is closed. */
    handleMessage = (messageType: string, message: any) => {
        if (this.#closed) {
            return;
        }

        const preparedMessage = this.prepareMessage
            ? this.prepareMessage(messageType, message)
            : message as T;

        debug('Channel ', this.id, 'get message=', preparedMessage);

        this.emit('OnMessage', messageType, preparedMessage);
    };

    /**
     * Removes the hub handlers, marks the channel `stopped` and emits `OnStopped`.
     * Calling it again has no effect.
     */
    async close(): Promise<void> {
        if (this.#closed) {
            return;
        }
        this.#closed = true;

        forEach(this.#handlersByMessageType, (handler, messageType) => {
            this.#connection.off(messageType, handler);
        });

        // Keep the state in line with the event, as WebSocketResultChannel does.
        this.state = 'stopped';
        this.updateStateId();

        this.emit('OnStopped');
    }
}

/**
 * Channel backed by a SignalR stream: every streamed item is re-emitted as
 * `OnMessage`, stream completion as `OnStopped` and stream failure as `OnError`.
 */
export class WebSocketResultChannel<T> extends WebSocketChannel<T> {
    readonly #subscription: ISubscription<T>;
    #closed = false;
    readonly #streamName: string;

    /**
     * @param streamName Name of the stream; reported as the message type of `OnMessage`.
     * @param streamResult Stream to subscribe to.
     * @param prepareMessage Optional converter applied to every streamed item.
     */
    constructor(streamName: string, streamResult: IStreamResult<T>, prepareMessage?: PrepareMessageCallback<T>) {
        // The base constructor already logs the creation and sets the state to 'running'.
        super(prepareMessage);

        this.#streamName = streamName;

        this.#subscription = streamResult.subscribe({
            next: this.handleStreamNext,
            complete: this.handleStreamComplete,
            error: this.handleStreamError,
        });
    }

    /** Converts a streamed item and emits it as `OnMessage`; ignored once close was requested. */
    handleStreamNext = (message: any) => {
        if (this.#closed) {
            return;
        }

        const preparedMessage = this.prepareMessage
            ? this.prepareMessage(this.#streamName, message)
            : message as T;

        debug('Channel ', this.id, 'get message=', preparedMessage);
        this.emit('OnMessage', this.#streamName, preparedMessage);
    };

    /** Marks the channel `stopped` and emits `OnStopped` when the stream completes. */
    handleStreamComplete = () => {
        debug('Channel ', this.id, 'get stream complete. current status=', this.state);

        if (this.hasEnded()) {
            // The end of the channel was already reported; do not report it twice.
            return;
        }

        this.state = 'stopped';
        this.updateStateId();

        this.emit('OnStopped');
    };

    /** Marks the channel `errored` and emits `OnError` when the stream fails. */
    handleStreamError = (error?: any) => {
        debug('Channel ', this.id, 'get stream error. error=', error, 'current status=', this.state);

        if (this.hasEnded()) {
            // The end of the channel was already reported; do not report it twice.
            return;
        }

        this.state = 'errored';
        this.updateStateId();

        this.emit('OnError', error);
    };

    /**
     * Cancels the stream and emits `OnStopped`.
     *
     * Disposing the subscription detaches this channel's observer from the stream,
     * so no completion callback follows; the channel therefore reports its own end
     * here. Closing a channel whose stream already ended, or closing twice, does nothing.
     */
    async close(): Promise<void> {
        debug('Channel ', this.id, 'request close. current status=', this.state);

        if (this.#closed) {
            return;
        }
        this.#closed = true;

        if (this.state !== 'running') {
            // The stream completed or failed on its own: nothing left to cancel.
            return;
        }

        this.state = 'stopping';
        this.updateStateId();

        this.#subscription.dispose();

        this.state = 'stopped';
        this.updateStateId();

        this.emit('OnStopped');
    }

    /** True once the channel reached a terminal state. */
    private hasEnded(): boolean {
        return this.state === 'stopped' || this.state === 'errored';
    }
}
