import { of, NEVER, Observable } from "rxjs";
import { switchMap, first, map } from "rxjs/operators";
import { Realtime } from "ably";
import { fetchAsObservable } from "./fetcher";
import canopyUrls from "canopy-urls!sofe";
import auth from "cp-client-auth!sofe";
import { onlineListener } from "online-listener";

// Ably Realtime client shared by every onAbly() subscription in this module.
// It stays undefined until the first subscription needs it, then is reused.
let ablyClient;

/**
 * Returns the module-wide Ably Realtime client, creating it on first use.
 *
 * A new client authenticates through the `/ably-authenticate` endpoint of the
 * Canopy API. An existing client whose connection is "closed" or "failed" is
 * asked to connect again: per the Ably connection-state documentation the
 * library never leaves those states on its own, so without the explicit
 * `connect()` a later subscriber would wait forever on a dead connection.
 *
 * @returns {Realtime} the shared client.
 */
function getAblyClient() {
  if (!ablyClient) {
    ablyClient = new Realtime({
      // Ably invokes this whenever it needs a token; the response of the
      // endpoint is handed back to Ably unchanged.
      authCallback: (tokenParams, callback) => {
        fetchAsObservable(
          `${canopyUrls.getAPIUrl()}/ably-authenticate`
        ).subscribe(
          (authData) => callback(null, authData),
          (err) => callback(err)
        );
      },
    });
    return ablyClient;
  }

  const { state } = ablyClient.connection;
  if (state === "closed" || state === "failed") {
    ablyClient.connect();
  }
  return ablyClient;
}

/**
 * Streams the payload of Ably messages named `eventName` while the
 * application is online.
 *
 * Every time `onlineListener` emits, the previous channel subscription is torn
 * down; a new one is only created when the emitted value is truthy.
 *
 * @param {string} eventName - Message name passed to `channel.subscribe`.
 * @param {string} [channelName] - Channel to listen on. When omitted (or
 *   falsy), `private:<id>` is used, where `<id>` belongs to the first value
 *   emitted by `auth.getLoggedInUserAsObservable()`.
 * @returns {Observable} Emits `message.data` for each matching message and
 *   errors with the state change's `reason` when the Ably connection enters
 *   the "failed" state. Unsubscribing removes both the channel listener and
 *   the connection listener.
 */
export function onAbly(eventName, channelName) {
  return onlineListener.pipe(
    switchMap((online) => {
      if (!online) {
        // Offline: stay silent until the next online/offline notification.
        return NEVER;
      }

      const channelName$ = channelName
        ? of(channelName)
        : auth.getLoggedInUserAsObservable().pipe(
            first(),
            map((loggedInUser) => `private:${loggedInUser.id}`)
          );

      return channelName$.pipe(
        switchMap((resolvedChannelName) => {
          const client = getAblyClient();
          const channel = client.channels.get(resolvedChannelName);

          return new Observable((observer) => {
            const messageHandler = (message) => {
              observer.next(message.data);
            };

            // Only a terminal "failed" connection is surfaced as an error;
            // every other state change is ignored.
            const errorHandler = (stateChange) => {
              if (stateChange.current === "failed") {
                observer.error(stateChange.reason);
              }
            };

            channel.subscribe(eventName, messageHandler);
            client.connection.on(errorHandler);

            return () => {
              channel.unsubscribe(eventName, messageHandler);
              client.connection.off(errorHandler);
            };
          });
        })
      );
    })
  );
}
