This report will be a little longer than usual, since it also covers all of this week's changes. So let's start.

  1. Status :: Finished working on update_upstream_versions and merged it into the current auto-tick-bot so it could run and be tested. Unfortunately there was an import error, so on the first attempt everything failed (hahaha), but CJ fixed it right away and the process ran as expected (at least the addition to the cli.xsh file). Now it remains to add the pool mode to update_upstream_versions and test it on CircleCI (testing the sequential mode is not affordable, as it takes far too long to finish).
  2. Abstract :: As discussed before, the main change was the pool mode for update_upstream_versions, which splits the work done by the get_latest_version function across different threads or processes and accelerates our method (and jesus it's amazing how fast it goes, roughly a 36x reduction in the time spent!). Basically we use a context-manager generator called executor to switch between the different multiprocessing schemes according to the use case:
     import contextlib
     import typing
     from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


     @contextlib.contextmanager
     def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Executor]:
         """General purpose utility to get an executor with its as_completed handler

         This allows us to easily use other executors as needed.
         """
         if kind == "thread":
             with ThreadPoolExecutor(max_workers=max_workers) as pool_t:
                 yield pool_t
         elif kind == "process":
             with ProcessPoolExecutor(max_workers=max_workers) as pool_p:
                 yield pool_p
         elif kind in ["dask", "dask-process", "dask-thread"]:
             # Dask is only imported when requested, so it stays an optional dependency
             import dask
             import distributed
             from distributed.cfexecutor import ClientExecutor

             processes = kind == "dask" or kind == "dask-process"

             with dask.config.set({"distributed.worker.daemon": daemon}):
                 with distributed.LocalCluster(
                     n_workers=max_workers, processes=processes,
                 ) as cluster:
                     with distributed.Client(cluster) as client:
                         yield ClientExecutor(client)
         else:
             raise NotImplementedError("That kind is not implemented")
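    As a quick illustration of switching schemes, the same context manager can be driven with the thread kind outside the bot; this is just a sketch, and the toy square function stands in for real work like get_latest_version:

     from concurrent.futures import as_completed

     def square(x):
         # toy stand-in for real work such as get_latest_version
         return x * x

     with executor(kind="thread", max_workers=4) as pool:
         futures = {pool.submit(square, i): i for i in range(10)}
         for fut in as_completed(futures):
             print(futures[fut], "->", fut.result())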
    

    This, together with concurrent.futures and another NumFocus project called Dask, which is a parallel-computing tool that works incredibly well with big datasets, gives the application more stability. That enables the pool feature for update-version:

     # futures maps each submitted Future back to its (node, attrs) pair
     futures = {}

     with executor(kind="dask", max_workers=20) as pool:
         _all_nodes = [t for t in gx.nodes.items()]
         random.shuffle(_all_nodes)

         for node, node_attrs in tqdm.tqdm(_all_nodes):
             with node_attrs["payload"] as attrs:
                 if attrs.get("bad") or attrs.get("archived"):
                     attrs["new_version"] = False
                     continue
                 # explicitly skip this node
                 if node == "ca-policy-lcg":
                     attrs["new_version"] = False
                     continue
                 futures.update(
                     {
                         pool.submit(get_latest_version, node, attrs, sources): (
                             node,
                             attrs,
                         ),
                     },
                 )
    

    Here pool.submit returns a Future object, which (according to concurrent.futures) "encapsulates the asynchronous execution of a callable", and futures is simply a dict mapping each Future back to its (node, attrs) pair. This way everything happens smoothly as submitted requests for get_latest_version, and all changes to the items are made through the _all_nodes entries, so we do not touch the gx graph at all! Then, after the pool.submit phase, the usual procedure runs over the completed requests:

     n_tot = len(futures)
     n_left = len(futures)
     start = time.time()
     # eta :: estimated time remaining, in seconds
     eta = -1
     for f in as_completed(futures):
         up_to = {}

         n_left -= 1
         if n_left % 10 == 0:
             eta = (time.time() - start) / (n_tot - n_left) * n_left

         node, node_attrs = futures[f]
         with node_attrs as attrs:
             try:
                 new_version = f.result()
                 attrs["new_version"] = new_version or attrs["new_version"]
             except Exception as e:
                 try:
                     se = repr(e)
                 except Exception as ee:
                     se = f"Bad exception string: {ee}"
                 logger.error(
                     "itr % 5d - eta % 5ds: "
                     "Error getting upstream version of %s: %s"
                     % (n_left, eta, node, se),
                 )
                 attrs["bad"] = "Upstream: Error getting upstream version"
             else:
                 logger.info(
                     "itr % 5d - eta % 5ds: %s - %s - %s"
                     % (
                         n_left,
                         eta,
                         node,
                         attrs.get("version", "<no-version>"),
                         attrs["new_version"],
                     ),
                 )
    

    We use the concurrent.futures.as_completed() helper to work directly with the submissions as they complete; that way f.result() is only called on futures that have already finished, instead of blocking on work that still needs time. Another important change is the fix to the JSON dump, which is now written per node into its own file inside a folder called versions.

     # writing out file
     with open(f"versions/{node}.json", "w") as outfile:
         up_to[f"{node}"] = {
             "bad": attrs.get("bad"),
             "new_version": attrs.get("new_version"),
         }
         json.dump(up_to, outfile)
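    Each of those files is small and easy to inspect or consume later. A quick sketch of reading one back (the node name and values are made up for illustration):

     import json

     # "some-feedstock" is a hypothetical node name
     with open("versions/some-feedstock.json") as f:
         data = json.load(f)

     # data would look something like:
     # {"some-feedstock": {"bad": None, "new_version": "1.2.3"}}
     print(data)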
    

    To handle the two modes of update_upstream_versions, I received a friendly function that does exactly this, and it also gives us a good place to declare the sources:

     def update_upstream_versions(
         gx: nx.DiGraph, sources: Iterable[AbstractSource] = None,
     ) -> None:
         sources = (
             (PyPI(), CRAN(), NPM(), ROSDistro(), RawURL(), Github())
             if sources is None
             else sources
         )
         updater = (
             _update_upstream_versions_sequential
             if CONDA_FORGE_TICK_DEBUG
             else _update_upstream_versions_process_pool
         )
         logger.info("Updating upstream versions")
         updater(gx, sources)
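    The CONDA_FORGE_TICK_DEBUG flag is what selects the sequential fallback; a minimal sketch of how such a flag could come from the environment (the bot's actual definition may differ):

     import os

     # debug mode keeps the slower but easier-to-trace sequential updater
     CONDA_FORGE_TICK_DEBUG = os.environ.get("CONDA_FORGE_TICK_DEBUG", False)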
    
  3. Next Steps :: With all these alterations done so far, I need to merge the new CircleCI head configuration to actually start testing the new features (all of them); the updates come as a new job called run_versions_update. Once that is finally done (the versions folder is successfully created and populated with the version files), I can merge the change to make_graph that collects this new info and updates the graph structure with it, which in turn will disable the old version-update scheme and switch over to the new one. A rough sketch of what that make_graph side could look like is shown below.
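    Just to make the idea concrete, here is a hypothetical sketch of how make_graph could pick up the dumped files and fold them back into the graph; the helper name and the exact attribute handling are assumptions, not the final implementation:

     import glob
     import json

     def load_new_versions(versions_dir: str = "versions") -> dict:
         """Collect the per-node JSON files written by the run_versions_update job."""
         new_versions = {}
         for path in glob.glob(f"{versions_dir}/*.json"):
             with open(path) as f:
                 new_versions.update(json.load(f))
         return new_versions

     # inside make_graph (hypothetical): fold the results back into the graph
     # for node, info in load_new_versions().items():
     #     if node in gx.nodes:
     #         with gx.nodes[node]["payload"] as attrs:
     #             attrs["new_version"] = info["new_version"]
     #             if info.get("bad"):
     #                 attrs["bad"] = info["bad"]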