import { of, throwError, timer, BehaviorSubject, EMPTY, Observable, Observer } from 'rxjs';
import { delay, finalize, retryWhen, switchMap } from 'rxjs/operators';

import { NGXLogger } from 'ngx-logger';

import { Message } from 'google-protobuf';
import { StatusCode } from 'grpc-web';

import { RETRY_STRATEGY_CONFIG } from '@core/configs';

import { EStreamType } from '../enums/stream-type.grpc.enum';
import { grpcMetadata } from '../helpers/grpc-metadata';
import { IServiceError } from '../interfaces/common/service-error.interface';
import { IStreamData, IStreamResponse, IStreamStatus } from '../interfaces/grpc-interfaces';

export const grpcStreamLastTimestamp: BehaviorSubject<number> = new BehaviorSubject(new Date().getTime());
export const browserTabVisibility: BehaviorSubject<boolean> = new BehaviorSubject(true);

export function grpcStream<T>(
    client: Object,
    method: string,
    streamRequestData: Message,
    // @ts-ignore
    logger: Console | NGXLogger = console,
    metadata: boolean = false,
    token: string = null,
    retryStrategy: boolean = true,
): Observable<T> {
    // INFO: to enable logger for stream;
    const isEnabledLogger: boolean = true;

    const id: () => string = () => `f${(~~(Math.random() * 1e8)).toString(16)}`;
    // const isActive$: BehaviorSubject<boolean> = new BehaviorSubject(document.visibilityState === 'visible');
    const retryDelay: number = RETRY_STRATEGY_CONFIG.retryWhenTimeout ?? 5000;

    let stream: IStreamResponse<T> = null;
    let streamId: string = null;
    let retryCounter: number = retryStrategy ? RETRY_STRATEGY_CONFIG.retryWhenAttempts ?? 5 : 0;

    const innerLogger: Function = (message: string, shouldEnable: boolean): void => {
        if (shouldEnable) {
            logger.log(message);
        }
    };

    const isEmptyStreamData: Function = (data: IStreamData<T>): boolean => {
        let ret = true;

        for (const elem of data.array) {
            if (!(typeof elem === 'undefined' || (elem && elem.length === 0))) {
                ret = false;
            }
        }
        return ret;
    };

    const clearStream: Function = (): void => {
        if (stream) {
            innerLogger(`Stream ${method} (${streamId}) disconnected`, isEnabledLogger);
            stream.cancel();
            stream = null;

            innerLogger(`Stream ${method} (${streamId}) clear`, isEnabledLogger);
            streamId = null;
        }
    };

    // const visibilityChangeListener: EventListenerOrEventListenerObject = () => {
    //     isActive$.next(document.visibilityState === 'visible');
    // };

    // document.addEventListener('visibilitychange', visibilityChangeListener);

    const data: Observable<T> = new Observable((observer: Observer<T>) => {
        streamId = id();

        if (metadata) {
            stream = client[method](streamRequestData, grpcMetadata(token));
        } else {
            stream = client[method](streamRequestData);
        }

        innerLogger(`Stream ${method} (${streamId}) connected`, isEnabledLogger);

        stream.on(EStreamType.DATA, (response: IStreamData<T>) => {
            const resp: T = isEmptyStreamData(response) ? undefined : response.toObject();

            grpcStreamLastTimestamp.next(new Date().getTime());

            observer.next({ ...resp, streamId: streamId });
        });

        stream.on(EStreamType.STATUS, (status: IStreamStatus) => {
            if (status.code !== StatusCode.OK) {
                clearStream();

                innerLogger(`Stream ${method} (${streamId}) catch error (${JSON.stringify(status)})`, isEnabledLogger);

                observer.error({ ...status, method: method, streamId: streamId });
                observer.complete();
            }
        });

        stream.on(EStreamType.END, () => {
            timer(1).subscribe(() => {
                clearStream();

                innerLogger(`Stream ${method} (${streamId}) catch END code`, isEnabledLogger);

                observer.error({ details: 'end stream', method: method, streamId: streamId });
                observer.complete();
            });
        });
    });

    return /*isActive$*/ data.pipe(
        // Commented (03-02-2020) Turn off streams with 5 sec delay, turn on immediately
        // switchMap((isActive: boolean) => of(isActive).pipe(delay(+!isActive * 5000))),
        // distinctUntilChanged(),
        // tap((isActive: boolean) => {
        //     browserTabVisibility.next(isActive);

        //     if (!isActive) {
        //         innerLogger(`Stream ${method} (${streamId}) inactive page, init disconnect and clear`, isEnabledLogger);

        //         clearStream();
        //     }
        // }),
        // switchMap((isActive: boolean) => (isActive ? data : EMPTY)),
        finalize(() => {
            clearStream();
        }),
        retryWhen((errors) =>
            errors.pipe(
                switchMap((error: IServiceError) =>
                    retryCounter-- ? of(EMPTY).pipe(delay(retryDelay)) : throwError(error),
                ),
            ),
        ),
    );
}
