import { BehaviorSubject, forkJoin, Observable, of, Subject } from 'rxjs';
import {
    bufferToggle, catchError, concatMap,
    delay, distinctUntilChanged, filter,
    first, map, scan, startWith, tap
} from 'rxjs/operators';
import { retryWithDelay } from './retryWithDelay';

/**
 * The pair of subjects that make up one multiplexed pipeline for a single key.
 */
export interface IMuxRegistryEntry<TRequest, TResponse> {
    /** Requests are pushed here; the pipeline completes it once a request has been picked for the worker. */
    ingress$: Subject<TRequest>;
    /** Emits the outcome of the worker call once and then completes. */
    result$: Subject<TResponse>;
}
/**
 * Pipelines indexed by the string key computed from a request.
 */
export type MuxRegistry<TRequest, TResponse> = {
    [key: string]: IMuxRegistryEntry<TRequest, TResponse>;
};
/**
 * Tuning options for a `Mux`.
 */
export type MuxConfig = {
    /** Delay (passed to the RxJS `delay` operator, i.e. milliseconds) during which requests are collected while online. */
    bufferTime: number;
    /** Forwarded as the first argument to `retryWithDelay`; presumably the wait between retries - confirm in ./retryWithDelay. */
    retryCooldown: number;
    /** Forwarded as the second argument to `retryWithDelay`; presumably the retry limit - confirm in ./retryWithDelay. */
    maxRetries: number;
};

/**
 * Coalesces requests that share a key into a single worker call.
 *
 * For every key a small pipeline is created: requests pushed into it are
 * buffered until the buffer window closes (after `config.bufferTime` while
 * online, or as soon as the client comes back online), the most recent
 * buffered request is handed to the worker, and the outcome is delivered to
 * everyone waiting on that pipeline.
 *
 * Note: when the worker ultimately fails, the error object is delivered as a
 * regular value on the result stream rather than as an error notification.
 */
export class Mux<TRequest, TResponse> {
    /** Set to false to silence the diagnostic console output of every Mux instance. */
    static showConsoleLog = true;

    /**
     * @param config Buffer window and retry settings.
     * @param getKey Computes the coalescing key of a request.
     * @param worker Performs the actual operation; only its first emission is used.
     * @param onlineNotifier Emits the connectivity state; defaults to "always online".
     */
    constructor(
        private config: MuxConfig,
        private getKey: (req: TRequest) => string,
        private worker: (req: TRequest) => Observable<TResponse>,
        private onlineNotifier: Observable<boolean> = of(true)
    ) { }

    /** Latest pipeline created for each key; entries are overwritten, never removed. */
    registry: MuxRegistry<TRequest, TResponse> = {};

    /** Receives +1 when a pipeline is created and -1 when it settles. */
    private updatePendingOperationsCount$ = new Subject<number>();
    /** Running total of pipelines that have not settled yet. */
    private currentPendingOperations$ = new BehaviorSubject(0);
    /** Last known connectivity state, mirrored from `onlineNotifier`. */
    private isOnline$ = new BehaviorSubject<boolean>(true);

    /** Folds the +1/-1 deltas into the running total and publishes it. */
    private modifyPendingOperationsCountEffect$ = this.updatePendingOperationsCount$.pipe(
        scan((acc, x) => acc + x, 0),
        tap(x => { this.log('mux: pending operations count', x); }),
        distinctUntilChanged(),
        tap(this.currentPendingOperations$)
    ).subscribe();

    /** Copies distinct connectivity changes into `isOnline$`. */
    private onlineNotificationEffect$ = this.onlineNotifier
        .pipe(
            distinctUntilChanged(),
            tap((isOnline) => this.isOnline$.next(isOnline))
        )
        .subscribe();

    /** The number of pipelines that have not settled yet. */
    public get notifyPendingOperationsCount$() {
        return this.currentPendingOperations$;
    }

    /**
     * Operator-style function: routes every request of `stream` through the
     * pipeline of its key and emits the corresponding result.
     *
     * Requests of one stream are processed strictly one after another.
     */
    next$ = (stream: Observable<TRequest>) =>
        stream.pipe(
            concatMap(req => {
                // The pipeline is resolved at dispatch time. Resolving it when the
                // request arrives (while an earlier request is still running) can
                // capture a pipeline that is already finished, or whose buffer
                // window has already closed, by the time this request is sent.
                const { ingress$, result$ } = this.getRegistryEntry(this.getKey(req));
                this.log('worksheet-mux:next ingress', req);
                ingress$.next(req);
                return result$.pipe(
                    first(),
                    tap(() => {
                        this.log('worksheet-mux:operation complete:', req);
                    })
                );
            })
        );

    /** Emits true while at least one pipeline is pending, false otherwise. */
    getInProgressNotifier$() {
        return this.currentPendingOperations$
            .pipe(
                map((count) => count > 0),
                distinctUntilChanged()
            );
    }

    /** Inverse of the in-progress notifier; always starts with `false`. */
    getIsCompletedNotifier$() {
        return this.getInProgressNotifier$().pipe(
            startWith(true),
            map(isInProgress => !isInProgress),
            distinctUntilChanged()
        );
    }

    /** Emits `true` once, as soon as no pipeline is pending, then completes. */
    onComplete() {
        return this.getIsCompletedNotifier$().pipe(
            startWith(false),
            filter(x => x === true),
            first()
        );
    }

    /** Synchronous check: true if any registered pipeline has not delivered its result yet. */
    isInProgress = () => Object
        .keys(this.registry)
        .some(key => this.isMuxInProgress(this.registry[key]));

    /** Writes to the console unless logging has been switched off via `showConsoleLog`. */
    private log(...data: unknown[]) { if (Mux.showConsoleLog) { console.log(...data); } }

    /** Returns the pipeline for `key` if it still accepts requests, otherwise creates a new one. */
    private getRegistryEntry = (key: string) => {
        // Own-property check so that keys such as "constructor" do not resolve
        // to members inherited from Object.prototype.
        const muxInstance = Object.prototype.hasOwnProperty.call(this.registry, key)
            ? this.registry[key]
            : undefined;
        return muxInstance && this.isMuxOpen(muxInstance)
            ? muxInstance
            : this.createMuxInstance(key);
    };

    /** A pipeline is open while its ingress has not been completed. */
    private isMuxOpen(muxInstance: IMuxRegistryEntry<TRequest, TResponse> | undefined) {
        if (!muxInstance) {
            return false;
        }
        return !muxInstance.ingress$.isStopped;
    }

    /** A pipeline is in progress until its result stream has completed. */
    private isMuxInProgress(muxInstance: IMuxRegistryEntry<TRequest, TResponse> | undefined) {
        if (!muxInstance) {
            return false;
        }
        // `closed` is only set by unsubscribe(); completion is reflected by `isStopped`.
        return !muxInstance.result$.isStopped;
    }

    /**
     * Builds, starts and registers a new pipeline for `key`.
     *
     * The pending-operations count is incremented here and decremented exactly
     * once when the pipeline settles, whether the worker succeeded or failed.
     */
    private createMuxInstance(key: string) {
        const ingress$ = new Subject<TRequest>();
        const result$ = new Subject<TResponse>();
        this.updatePendingOperationsCount$.next(1);

        const buffer$ = forkJoin([ingress$.pipe(
            bufferToggle(
                // A single buffer is opened immediately on subscription.
                of(true),
                // It closes after `bufferTime` when online, or as soon as the
                // client is online again when it was offline.
                () => this.isOnline$.pipe(
                    concatMap((isOnline) => {
                        if (!isOnline) {
                            this.log('mux: waiting for IsOnline status to change');

                            return this.isOnline$.pipe(first(val => val === true));
                        }
                        return of(true).pipe(delay(this.config.bufferTime));
                    }),
                    first(),
                    tap(() => {
                        this.log('mux: client IsOnline, process request');
                    })
                ),
            ),
            filter(({ length }) => length > 0),
            // Only the most recent request of the window is executed.
            map(requests => requests[requests.length - 1]),
            first()
        )]);

        buffer$.pipe(
            concatMap(([req]) => {
                // Stop accepting requests; later ones get a fresh pipeline.
                ingress$.complete();
                this.log('mux: executing worker');
                return this.worker(req).pipe(
                    first(),
                    // Retry policy lives in ./retryWithDelay (not visible here).
                    retryWithDelay(this.config.retryCooldown, this.config.maxRetries),
                    tap(() => {
                        this.log('mux: worker completed');
                    })
                );
            }),
            // A failure is handed to the waiting callers as a value.
            catchError(err => {
                this.log('mux: worker failed', err);
                return of(err);
            }),
            tap(x => {
                // Settle the counter on success and on failure alike.
                this.updatePendingOperationsCount$.next(-1);
                result$.next(x);
                result$.complete();
            }),
            first()
        ).subscribe();

        return this.registry[key] = {
            ingress$,
            result$
        } as IMuxRegistryEntry<TRequest, TResponse>;
    }

}
