import {DataDispatcher} from "../test_runner/data_dispatcher";
import LatencyCollectorProvider from "../tools/latency_collector_provider";
import Pipe from "../pipeline/pipe";
import BytesCollectorPipe from "../pipeline/bytes_collector_pipe";
import TimeFilterPipe from "../pipeline/time_filter_pipe";
import BroadcastingPipe from "../pipeline/broadcasting_pipe";
import AveragingBandwidthCalculationPipe from "../pipeline/averaging_bandwidth_calculation_pipe";
import {AppConfiguration} from "../app_configuration";


export default class BandwidthMeasurementDispatcher {
  async run(
    latency_channel,
    bytes_channel,
    bandwidth_channel,
    warmup_debug_channel,
    bandwidth_collector,
    duration_s) {
    const latency_collector = LatencyCollectorProvider.provide(
      (latency) => {
        console.debug("latency", latency);
        DataDispatcher.broadcast_to(
          latency_channel,
          latency
        )
      }
    );
    let warmed_up = false, warmup_start = Date.now();
    const data_source = new Pipe();
    const bw_collector = new bandwidth_collector((chunk_size) => {
      data_source.feed(chunk_size);
    });
    const start_measuring_latency_and_cancel_bw_measurement_after_specified_duration = () => {
      latency_collector.start();
      setTimeout(() => {
        bw_collector.cancel();
      }, duration_s * 1e3);
    };
    data_source
      .pipe_to(new BytesCollectorPipe())
      .pipe_to(new BroadcastingPipe(bytes_channel))
      .pipe_to(new TimeFilterPipe(AppConfiguration.IGNORE_FIRST_N_SECONDS * 1e3))
      .pipe_to(new AveragingBandwidthCalculationPipe(AppConfiguration.MINIMUM_AVERAGE_WINDOW))
      .pipe_to(new BroadcastingPipe(bandwidth_channel))
      .pipe_to(() => {
        if (warmed_up) {
          return;
        }
        warmed_up = true;
        const warmup_time = Date.now() - warmup_start;
        DataDispatcher.broadcast_to(warmup_debug_channel, warmup_time);
        console.debug(`Warm up took: ${warmup_time}`);
        start_measuring_latency_and_cancel_bw_measurement_after_specified_duration();
      });
    await bw_collector.start_measuring();
    latency_collector.cancel();
    console.debug("Cancelled latency measurement");
  }
}
