import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, ReplaySubject, debounceTime } from 'rxjs';
import { Model, ModelType, cloneLatest, getIdFields, getPath } from 'tw2-common';
import { UpdateType, registerModelHook } from 'tw2-mh';
import { ModelService } from './model.service';
import { MQTTService } from './mqtt.service';

/**
 * Read-only {@link ModelService} implementation that is fed by MQTT model hooks.
 *
 * `list()` and `read()` lazily register a model hook on the MQTT client the
 * first time a given (model type, id-field values) combination is requested
 * and cache the resulting subject, so later calls with the same combination
 * share one hook registration and one stream.
 *
 * `create()`, `update()` and `delete()` are not supported and throw
 * synchronously.
 */
@Injectable()
export class ModelMQTTService implements ModelService {
	/** Cached list streams, indexed by model type name and then by serialized id-field values. */
	private listSubjects: { [type: string]: { [keys: string]: BehaviorSubject<any[]> } } = {};
	/** Cached single-model streams, indexed by model type name and then by serialized id-field values. */
	private readSubjects: { [type: string]: { [keys: string]: ReplaySubject<any> } } = {};

	constructor(
		private mqttService: MQTTService
	) {
	}

	/**
	 * Not supported by this transport.
	 *
	 * @throws Error always, synchronously (no observable is returned).
	 */
	public create<T extends Model>(modelType: ModelType<T>, data: T): Observable<T> {
		throw new Error('Create not supported over MQTT');
	}

	/**
	 * Not supported by this transport.
	 *
	 * @throws Error always, synchronously (no observable is returned).
	 */
	public delete<T extends Model>(modelType: ModelType<T>, data: T): Observable<T> {
		throw new Error('Delete not supported over MQTT');
	}

	/**
	 * Returns a stream of the current set of models reported by the model hook
	 * for `modelType` and `keys`.
	 *
	 * The set starts empty and is maintained from hook callbacks: an 'update'
	 * inserts the model or replaces the entry with the same path, a 'delete'
	 * removes the entry with the same path. A fresh array copy is emitted after
	 * every callback, so subscribers never see the internal array.
	 *
	 * @param modelType Model type to observe.
	 * @param keys Key values handed to the model hook. Only the id fields of
	 *   `modelType` take part in the cache key.
	 * @param debounceTimeout Debounce window in milliseconds applied to the
	 *   returned stream; values `<= 0` return the raw subject (which emits its
	 *   current value immediately on subscribe).
	 * @returns Observable emitting the full list after each change.
	 */
	public list<T extends Model>(
		modelType: ModelType<T>,
		keys: { [key in keyof T]?: string[] | string } | Partial<T> = {},
		debounceTimeout = 50

	): Observable<T[]> {
		const id = this.subjectKey(modelType, keys);
		if (!this.listSubjects[modelType.name])
			this.listSubjects[modelType.name] = {};

		if (!this.listSubjects[modelType.name][id]) {
			// Truthy result means models whose id is 'latest' are left out of the list.
			// NOTE(review): presumably those are duplicates of the newest entry - confirm against tw2-common.
			const cLatest = cloneLatest(modelType);
			// Backing array, created only on a cache miss and owned by the hook callback below.
			const list: T[] = [];
			const created = new BehaviorSubject<T[]>(list);
			this.listSubjects[modelType.name][id] = created;

			this.mqttService.getClient().subscribe(client => {
				registerModelHook(
					client,
					modelType,
					async (updateType: UpdateType, model: T) => {
						if (cLatest && model.id === 'latest')
							return;
						// Entries are identified by their joined model path.
						const path = getPath(modelType, model).join('/');
						const i = list.findIndex(p => getPath(modelType, p).join('/') === path);
						switch (updateType) {
							case 'update':
								if (i === -1)
									list.push(model);
								else
									list.splice(i, 1, model);
								break;
							case 'delete':
								if (i !== -1)
									list.splice(i, 1);
								break;
						}
						// Emit a copy so consumers cannot mutate the backing array.
						created.next([...list]);
					},
					keys
				);
			});
		}

		const subject = this.listSubjects[modelType.name][id];
		if (debounceTimeout > 0)
			return subject.pipe(debounceTime(debounceTimeout));
		return subject;
	}

	/**
	 * Returns a stream of the model whose id is 'latest' for the given keys.
	 *
	 * The caller's `keys` object is left untouched; the 'latest' id is merged
	 * into a copy before delegating to {@link read}.
	 *
	 * @param modelType Model type to observe.
	 * @param keys Key values identifying the model; any `id` entry is overridden with 'latest'.
	 * @returns Observable as produced by `read()` for the 'latest' id.
	 * @throws Error 'Invalid model type' when `cloneLatest(modelType)` is falsy.
	 */
	public latest<T extends Model>(
		modelType: ModelType<T>,
		keys: { [key in keyof T]?: string[] | string } | Partial<T> = {}
	): Observable<T> {
		if (!cloneLatest(modelType))
			throw Error('Invalid model type');
		// Merge into a fresh object: assigning onto `keys` directly would leak
		// `id: 'latest'` into an object the caller may reuse for other queries.
		return this.read(modelType, Object.assign({}, keys, { id: 'latest' }));
	}

	/**
	 * Returns a stream of a single model reported by the model hook for
	 * `modelType` and `keys`.
	 *
	 * The stream replays the most recent value to late subscribers. An 'update'
	 * emits the model; a 'delete' emits `undefined`.
	 *
	 * @param modelType Model type to observe.
	 * @param keys Key values handed to the model hook. Only the id fields of
	 *   `modelType` take part in the cache key.
	 * @returns Observable emitting the model on update and `undefined` on delete.
	 */
	public read<T extends Model>(
		modelType: ModelType<T>,
		keys: { [key in keyof T]?: string[] | string } | Partial<T> = {}
	): Observable<T> {
		const id = this.subjectKey(modelType, keys);

		if (!this.readSubjects[modelType.name])
			this.readSubjects[modelType.name] = {};

		if (!this.readSubjects[modelType.name][id]) {
			// Carries single models (or undefined after a delete), not arrays.
			const created = new ReplaySubject<T | undefined>(1);
			this.readSubjects[modelType.name][id] = created;

			this.mqttService.getClient().subscribe(client => {
				registerModelHook(
					client,
					modelType,
					async (updateType: UpdateType, model: T) => {
						switch (updateType) {
							case 'delete':
								return created.next(undefined)
							case 'update':
								return created.next(model)
						}

					},
					keys
				);
			});
		}
		return this.readSubjects[modelType.name][id];
	}

	/**
	 * Not supported by this transport.
	 *
	 * @throws Error always, synchronously (no observable is returned).
	 */
	public update<T extends Model>(modelType: ModelType<T>, data: T): Observable<T> {
		throw new Error('Update not supported over MQTT');
	}

	/**
	 * Builds the cache key for a subject: the JSON serialization of the values
	 * that `keys` holds for each id field of `modelType`.
	 *
	 * NOTE(review): entries of `keys` that are not id fields are not part of the
	 * key, so two requests differing only in such entries share one subject -
	 * confirm this is intended.
	 */
	private subjectKey<T extends Model>(
		modelType: ModelType<T>,
		keys: { [key in keyof T]?: string[] | string } | Partial<T>
	): string {
		const idValues: { [key: string]: any } = {};
		getIdFields(modelType).forEach(f => {
			idValues[f as string] = (keys as any)[f];
		});
		return JSON.stringify(idValues);
	}
}
