Weekly Update
This report will be a bit longer than usual because it also covers all of this week's changes. So let's start.
- Status :: Finished working on `update_upstream_versions` and merged it with the current `auto-tick-bot` to run and test it. Unfortunately there was an import error, so the first attempt failed completely (hahaha), but CJ corrected it right away and the process ran as expected (at least the addition to the `cli.xsh` file). Now it remains to add the `pool` mode to `update_upstream_versions` and test it on CircleCI (since it is not really feasible to test the `sequential` mode, as it takes a long time to finish).
- Abstract :: As discussed before, the main change was the `pool` mode for `update_upstream_versions`, which consists of dividing the work done by the `get_latest_version` function across different threads or processes and accelerating our method (and it is amazing how fast it goes: roughly a 36x reduction in the time spent!). Basically, we use a generator function called `executor` to switch between the different `multiprocessing` schemes according to how it will be used:
++++
import contextlib
import typing
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


@contextlib.contextmanager
def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Executor]:
    """General purpose utility to get an executor with its as_completed handler.

    This allows us to easily use other executors as needed.
    """
    if kind == "thread":
        with ThreadPoolExecutor(max_workers=max_workers) as pool_t:
            yield pool_t
    elif kind == "process":
        with ProcessPoolExecutor(max_workers=max_workers) as pool_p:
            yield pool_p
    elif kind in ["dask", "dask-process", "dask-thread"]:
        import dask
        import distributed
        from distributed.cfexecutor import ClientExecutor

        processes = kind == "dask" or kind == "dask-process"
        with dask.config.set({"distributed.worker.daemon": daemon}):
            with distributed.LocalCluster(
                n_workers=max_workers,
                processes=processes,
            ) as cluster:
                with distributed.Client(cluster) as client:
                    yield ClientExecutor(client)
    else:
        raise NotImplementedError("That kind is not implemented")
++++
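As a quick illustration of how this is meant to be consumed (a toy workload, not the bot's real one; `slow_square` is just a stand-in), changing `kind` swaps the backend without touching the calling code:
++++
from concurrent.futures import as_completed


def slow_square(x: int) -> int:
    # Stand-in for a network-bound task such as querying an upstream source.
    return x * x


with executor(kind="thread", max_workers=4) as pool:
    # Map each Future back to its input so results can be matched up later.
    futs = {pool.submit(slow_square, i): i for i in range(10)}
    for fut in as_completed(futs):
        print(futs[fut], "->", fut.result())
++++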
This, together with `concurrent.futures` and another NumFOCUS project called Dask, which is a tool for parallel computing that works incredibly well with big datasets, gives the application more stability. That enables the pool feature for the version update:
++++
with executor(kind="dask", max_workers=20) as pool:
    _all_nodes = [t for t in gx.nodes.items()]
    random.shuffle(_all_nodes)
    for node, node_attrs in tqdm.tqdm(_all_nodes):
        with node_attrs["payload"] as attrs:
            if attrs.get("bad") or attrs.get("archived"):
                attrs["new_version"] = False
                continue
            # avoid this node
            if node == "ca-policy-lcg":
                attrs["new_version"] = False
                continue
            futures.update(
                {
                    pool.submit(get_latest_version, node, attrs, sources): (
                        node,
                        attrs,
                    ),
                },
            )
++++
Here each `pool.submit` call returns a `Future` object, which is stored in the `futures` dict; as the `concurrent.futures` documentation puts it, "The Future class encapsulates the asynchronous execution of a callable." This way we guarantee that everything runs smoothly: each submission is a request for `get_latest_version`, and all the alterations regarding the items are made through the `_all_nodes` object, so we do not touch the `gx` graph at all! Then, after the `pool.submit` phase, the usual procedure starts over the completed requests:
++++
n_tot = len(futures)
n_left = len(futures)
start = time.time()
# eta :: estimated time remaining, based on the average time per completed item
eta = -1
for f in as_completed(futures):
    up_to = {}
    n_left -= 1
    if n_left % 10 == 0:
        eta = (time.time() - start) / (n_tot - n_left) * n_left
    node, node_attrs = futures[f]
    with node_attrs as attrs:
        try:
            new_version = f.result()
            attrs["new_version"] = new_version or attrs["new_version"]
        except Exception as e:
            try:
                se = repr(e)
            except Exception as ee:
                se = f"Bad exception string: {ee}"
            logger.error(
                "itr % 5d - eta % 5ds: "
                "Error getting upstream version of %s: %s" % (n_left, eta, node, se),
            )
            attrs["bad"] = "Upstream: Error getting upstream version"
        else:
            logger.info(
                "itr % 5d - eta % 5ds: %s - %s - %s"
                % (
                    n_left,
                    eta,
                    node,
                    attrs.get("version", "<no-version>"),
                    attrs["new_version"],
                ),
            )
++++
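A quick note on the `eta` line above: it is simply the elapsed time divided by the number of completed items, multiplied by the number of items still left. For example, if 300 of 400 items have finished after 150 seconds, that is 0.5 s per item on average, so the estimate for the remaining 100 items is about 50 seconds.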
We use `concurrent.futures.as_completed()` as a tool that lets us work directly with the submissions that have completed so far; this gives `f.result()` an easy time, since a result needs a certain amount of time before it can be given. Another important change is the fix to the JSON dump, which is now written for every available node inside a directory called `versions`:
++++
# writing out file
with open(f"versions/{node}.json", "w") as outfile:
    up_to[f"{node}"] = {
        "bad": attrs.get("bad"),
        "new_version": attrs.get("new_version"),
    }
    json.dump(up_to, outfile)
++++
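To make the shape of those files concrete, here is a small sketch of how they could later be read back into a single mapping; the `load_new_versions` helper is hypothetical, not something from the bot:
++++
import json
import os


def load_new_versions(versions_dir: str = "versions") -> dict:
    # Illustrative only: merge every versions/<node>.json file into one dict
    # of the form {node: {"bad": ..., "new_version": ...}}.
    new_versions = {}
    for fname in os.listdir(versions_dir):
        if fname.endswith(".json"):
            with open(os.path.join(versions_dir, fname)) as f:
                new_versions.update(json.load(f))
    return new_versions
++++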
For handling the two modes of the `update_upstream_versions` function, I received a friendly function that does exactly this, and it also gives us a good place to set up the `sources`:
++++
def update_upstream_versions(
    gx: nx.DiGraph,
    sources: Iterable[AbstractSource] = None,
) -> None:
    sources = (
        (PyPI(), CRAN(), NPM(), ROSDistro(), RawURL(), Github())
        if sources is None
        else sources
    )
    updater = (
        _update_upstream_versions_sequential
        if CONDA_FORGE_TICK_DEBUG
        else _update_upstream_versions_process_pool
    )
    logger.info("Updating upstream versions")
    updater(gx, sources)
++++
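Just for contrast, here is a minimal sketch of what the sequential path might look like; it reuses the names from the snippets above (`gx`, `sources`, `get_latest_version`, `logger`) and is my own assumption, not the actual `_update_upstream_versions_sequential` body:
++++
def _update_upstream_versions_sequential(
    gx: nx.DiGraph,
    sources: Iterable[AbstractSource],
) -> None:
    # Same walk over the graph nodes as the pool version, but querying
    # each upstream source one node at a time (hence the long runtime).
    _all_nodes = [t for t in gx.nodes.items()]
    random.shuffle(_all_nodes)
    for node, node_attrs in _all_nodes:
        with node_attrs["payload"] as attrs:
            if attrs.get("bad") or attrs.get("archived"):
                attrs["new_version"] = False
                continue
            try:
                new_version = get_latest_version(node, attrs, sources)
                attrs["new_version"] = new_version or attrs["new_version"]
            except Exception as e:
                logger.error(
                    "Error getting upstream version of %s: %s", node, repr(e)
                )
                attrs["bad"] = "Upstream: Error getting upstream version"
++++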
- Next Steps :: With all these alterations done so far, I need to merge the new CircleCI head configuration to actually start testing the new features (all of them); the updates come as a new job called `run_versions_update`. When it finally succeeds (i.e. the `versions` folder is created and populated with the version files), I can merge the changes to `make_graph` to collect this new info and update the graph structure with it, which in turn will disable the old version update scheme and switch over to this new one.