import {
  CloudWatchLogsClient as Client,
  DescribeLogStreamsCommand,
  DescribeLogStreamsResponse,
  InputLogEvent,
  PutLogEventsCommand,
  PutLogEventsCommandOutput,
} from "@aws-sdk/client-cloudwatch-logs";
import { Credentials } from "aws-sdk";
import { ILog } from "lib/Logger";

import AbstractClientAggregator from "../AbstractClientAggregator/AbstractClientAggregator";

export interface CloudWatchLogsSettings {
  cloudWatchNamespace: string;
  cloudWatchLogGroup: string;
  cloudWatchLogStream: string;
}

/**
 * CloudWatchLogsClient is a wrapper around @aws-sdk/client-cloudwatch-logs
 * that initializes the SDK, and handles the lifecycle of a sequenceToken, which
 * is used to make subsequent requests. Cloudwatch throttles events if they
 * get sent quickly. To combat this, CloudWatchLogsClient sends events at a fixed
 * interval using the AbstractClientAggregator.
 */
class CloudWatchLogsClient extends AbstractClientAggregator<
  InputLogEvent,
  PutLogEventsCommandOutput
> {
  private static instance: CloudWatchLogsClient | undefined;
  public static getInstance(
    credentials: Credentials,
    region: string,
    logger: ILog,
    settings: CloudWatchLogsSettings
  ): CloudWatchLogsClient {
    if (!CloudWatchLogsClient.instance) {
      CloudWatchLogsClient.instance = new CloudWatchLogsClient(
        credentials,
        region,
        logger,
        settings
      );
    }

    return CloudWatchLogsClient.instance;
  }
  private cloudWatchLogsClient: Client;
  private sequenceToken: string | undefined;
  constructor(
    credentials: Credentials,
    region: string,
    private readonly logger: ILog,
    private readonly settings: CloudWatchLogsSettings
  ) {
    super({
      pollMs: 2500,
      pollEngineName: "CloudWatchLogsClientPoll",
      chunkSize: 100,
    });

    // Register the function and context that the PollEngine will use to execute
    this.pollEngine.registerContext(this);
    this.pollEngine.registerPollFn(this.sendQueuedCommands);

    this.cloudWatchLogsClient = new Client({
      apiVersion: "2010-08-01",
      region,
      credentials,
    });

    this.init()
      .then(() => {
        this.pollEngine.startPolling();
      })
      .catch((err) => {
        this.logger.error(
          "Unable to initialize CloudWatchLogs, cannot send logs.",
          err
        );
      });

    return this;
  }
  /**
   * Before any calls are made into the PutLogEvent command, we need
   * access to the uploadSequenceToken.
   */
  private async init() {
    const logStreamInput: DescribeLogStreamsCommand = new DescribeLogStreamsCommand(
      {
        logGroupName: this.settings.cloudWatchLogGroup,
        logStreamNamePrefix: this.settings.cloudWatchLogStream,
      }
    );

    const describeLogStreams = await this.executeDescribeLogStreams(
      logStreamInput
    );

    const [logStream] = describeLogStreams?.logStreams || [];
    this.sequenceToken = logStream?.uploadSequenceToken;
  }
  public executeDescribeLogStreams = (
    describeLogStreams: DescribeLogStreamsCommand
  ): Promise<DescribeLogStreamsResponse> => {
    return this.cloudWatchLogsClient.send(describeLogStreams);
  };
  public executePutLogEventStreams = async (): Promise<PutLogEventsCommandOutput | void> => {
    if (this.dataChunk.isEmpty()) {
      return Promise.resolve();
    }

    const [chunkedLogEvents] = this.dataChunk.getDataChunks().data;

    const putEventCommand: PutLogEventsCommand = new PutLogEventsCommand({
      logGroupName: this.settings.cloudWatchLogGroup,
      logStreamName: this.settings.cloudWatchLogStream,
      sequenceToken: this.sequenceToken || undefined,
      logEvents: chunkedLogEvents,
    });

    try {
      const response: PutLogEventsCommandOutput = await this.cloudWatchLogsClient.send(
        putEventCommand
      );

      this.dataChunk.clear(chunkedLogEvents.length);
      this.sequenceToken = response?.nextSequenceToken;
      return response;
    } catch (e: any) {
      const didStopPolling = this.trackError();

      if (didStopPolling) {
        return;
      }

      /**
       * https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
       *
       * DataAlreadyAcceptedException - The event was already logged.
       * InvalidSequenceTokenException - The sequence token is not valid.
       */
      if (e?.name === "DataAlreadyAcceptedException") {
        this.dataChunk.clear(chunkedLogEvents.length);
        this.sequenceToken = e.expectedSequenceToken;
        return;
      } else if (e?.name === "InvalidSequenceTokenException") {
        this.sequenceToken = e.expectedSequenceToken;
        // if the token was invalid, immediately flush and resolve logs
        this.flush();
      }

      this.logger.error(
        "Unable to send CloudWatchLogs. Will try again later.",
        e
      );
    }
  };
  public flush = () => {
    this.sendQueuedCommands();
  };
  /**
   * We do not want to throttle the API. Add logs in a queue, and send logs
   * at a the this.putLogPollMs specified interval.
   */
  public enqueuePutLogEvent = (putLogEvents: InputLogEvent[]): void => {
    this.dataChunk.addData(putLogEvents);
  };
  protected sendQueuedCommands = () => {
    return this.executePutLogEventStreams();
  };

  public static __removeInstance__() {
    if (process.env.NODE_ENV !== "test") {
      return;
    }
    CloudWatchLogsClient.instance = undefined;
  }
}

export default CloudWatchLogsClient;
