import {catchError, EMPTY, finalize, mergeMap, Observable, Subject, tap} from "rxjs";
import {action, computed, makeObservable, observable, reaction} from "mobx";
import {AxiosError, isAxiosError} from "axios";

export class PipeManager<T, R = any> {
    private readonly pipe = new Subject<[number, T]>()
    readonly inPipe = new Set<number>();
    error = ''

    constructor(process: (o: T) => Observable<R>) {
        makeObservable(this, {
            process: false,
            inPipe: observable,
            error: observable,
            setError: action,
            setInPipe: action,
            hasInPipe: computed,
            nInPipe: computed,
        })
        this.pipe.pipe(
            tap(([key, _]) => this.setInPipe(key, true)),
            mergeMap(([key, object]) =>
                process(object).pipe(
                    tap(() => {
                        this.setError('');
                    }),
                    finalize(() => this.setInPipe(key, false)),
                    catchError((err) => {
                        console.error('Cannot process element', key, JSON.stringify(object), err);
                        if (isAxiosError(err) || err?.name === 'AxiosError') {
                            const axiosError: AxiosError<any> = err;
                            const msg = axiosError.response?.data?.message
                                || axiosError.response?.data
                                || axiosError.response?.statusText
                                || axiosError.message
                                || String(axiosError)
                            this.setError(msg);
                        } else {
                            this.setError(String(err));
                        }
                        return EMPTY;
                    }),
                ))
        ).subscribe()
    }

    process(key: number, object: T) {
        this.pipe.next([key, object]);
    }

    setInPipe(key: number, inPipe: boolean) {
        if (inPipe) {
            this.inPipe.add(key);
        } else {
            this.inPipe.delete(key);
        }
    }

    setError(error: string) {
        this.error = error;
    }

    get hasInPipe() {
        return this.inPipe.size !== 0;
    }

    get nInPipe(): number {
        return this.inPipe.size;
    }
}

export class PipeManager2<T, R = any> {
    private inc = 0;
    public readonly m: PipeManager<T, R>;

    constructor(process: (o: T) => Observable<R>, private onEmpty: Subject<void>) {
        this.m = new PipeManager<T, R>(process);
        reaction(() => this.m.nInPipe, (next, prev) => {
            console.log('PipeManager2', prev, next);
            if (next === 0 && prev > 0) {
                // We went to zero
                onEmpty.next();
            }
        })
    }

    processNew(object: T) {
        this.m.process(this.inc++, object);
    }
}
