+ def __init__(self):
+ # Has our parse() method been invoked yet?
+ self.config_parse_called = False
+
+ # A configuration dictionary that will contain parsed
+ # arguments. This is the data that is most interesting to our
+ # callers as it will hold the configuration result.
+ self.config: Dict[str, Any] = {}
+
+ # Defer logging messages until later when logging has been
+ # initialized.
+ self.saved_messages: List[str] = []
+
+ # A zookeeper client that is lazily created so as to not incur
+ # the latency of connecting to zookeeper for programs that are
+ # not reading or writing their config data into zookeeper.
+ self.zk: Optional[KazooClient] = None
+
+ # Per known zk file, what is the max version we have seen?
+ self.max_version: Dict[str, int] = {}
+
+ def __getitem__(self, key: str) -> Optional[Any]:
+ """If someone uses []'s on us, pass it onto self.config."""
+ return self.config.get(key, None)
+
+ def __setitem__(self, key: str, value: Any) -> None:
+ self.config[key] = value
+
+ def __contains__(self, key: str) -> bool:
+ return key in self.config
+
+ def get(self, key: str, default: Any = None) -> Optional[Any]:
+ return self.config.get(key, default)
+
+ @staticmethod
+ def add_commandline_args(title: str, description: str = "") -> argparse._ArgumentGroup:
+ """Create a new context for arguments and return a handle.
+
+ Args:
+ title: A title for your module's commandline arguments group.
+ description: A helpful description of your module.
+
+ Returns:
+ An argparse._ArgumentGroup to be populated by the caller.
+ """
+ return ARGS.add_argument_group(title, description)
+
+ @staticmethod
+ def overwrite_argparse_epilog(msg: str) -> None:
+ """Allows your code to override the default epilog created by
+ argparse.
+
+ Args:
+ msg: The epilog message to substitute for the default.
+ """
+ ARGS.epilog = msg
+
+ @staticmethod
+ def is_flag_already_in_argv(var: str) -> bool:
+ """Returns true if a particular flag is passed on the commandline
+ and false otherwise.
+
+ Args:
+ var: The flag to search for.
+ """
+ for _ in sys.argv:
+ if var in _:
+ return True
+ return False
+
+ @staticmethod
+ def print_usage() -> None:
+ """Prints the normal help usage message out."""
+ ARGS.print_help()
+
+ @staticmethod
+ def usage() -> str:
+ """
+ Returns:
+ program usage help text as a string.
+ """
+ return ARGS.format_usage()
+
+ @staticmethod
+ def _reorder_arg_action_groups_before_help(entry_module: Optional[str]):
+ """Internal. Used to reorder the arguments before dumping out a
+ generated help string such that the main program's arguments come
+ last.
+
+ """
+ reordered_action_groups = []
+ for grp in ARGS._action_groups:
+ if entry_module is not None and entry_module in grp.title: # type: ignore
+ reordered_action_groups.append(grp)
+ elif PROGRAM_NAME in GROUP.title: # type: ignore
+ reordered_action_groups.append(grp)
+ else:
+ reordered_action_groups.insert(0, grp)
+ return reordered_action_groups
+
+ @staticmethod
+ def _parse_arg_into_env(arg: str) -> Optional[Tuple[str, str, List[str]]]:
+ """Internal helper to parse commandling args into environment vars."""
+ arg = arg.strip()
+ if not arg.startswith('['):
+ return None
+ arg = arg.strip('[')
+ if not arg.endswith(']'):
+ return None
+ arg = arg.strip(']')
+
+ chunks = arg.split()
+ if len(chunks) > 1:
+ var = chunks[0]
+ else:
+ var = arg
+
+ # Environment vars the same as flag names without
+ # the initial -'s and in UPPERCASE.
+ env = var.upper()
+ while env[0] == '-':
+ env = env[1:]
+ return var, env, chunks
+
+ def _augment_sys_argv_from_environment_variables(self):
+ """Internal. Look at the system environment for variables that match
+ commandline arg names. This is done via some munging such that:
+
+ :code:`--argument_to_match`
+
+ ...is matched by:
+
+ :code:`ARGUMENT_TO_MATCH`
+
+ This allows users to set args via shell environment variables
+ in lieu of passing them on the cmdline.
+
+ """
+ usage_message = Config.usage()
+ optional = False
+ arg = ''
+
+ # Foreach valid optional commandline option (chunk) generate
+ # its analogous environment variable.
+ for chunk in usage_message.split():
+ if chunk[0] == '[':
+ optional = True
+ if optional:
+ arg += f'{chunk} '
+ if chunk[-1] == ']':
+ optional = False
+ _ = Config._parse_arg_into_env(arg)
+ if _:
+ var, env, chunks = _
+ if env in os.environ:
+ if not Config.is_flag_already_in_argv(var):
+ value = os.environ[env]
+ self.saved_messages.append(
+ f'Initialized from environment: {var} = {value}'
+ )
+ from string_utils import to_bool
+
+ if len(chunks) == 1 and to_bool(value):
+ sys.argv.append(var)
+ elif len(chunks) > 1:
+ sys.argv.append(var)
+ sys.argv.append(value)
+ arg = ''
+
+ def _process_dynamic_args(self, event: WatchedEvent):
+ assert self.zk
+ logger = logging.getLogger(__name__)
+ contents, meta = self.zk.get(event.path, watch=self._process_dynamic_args)
+ logger.debug('Update for %s at version=%d.', event.path, meta.version)
+ logger.debug(
+ 'Max known version for %s is %d.', event.path, self.max_version.get(event.path, 0)
+ )
+ if meta.version > self.max_version.get(event.path, 0):
+ self.max_version[event.path] = meta.version
+ contents = contents.decode()
+ temp_argv = []
+ for arg in contents.split():
+ if 'dynamic' in arg:
+ temp_argv.append(arg)
+ logger.info("Updating %s from zookeeper async config change.", arg)
+ if len(temp_argv) > 0:
+ old_argv = sys.argv
+ sys.argv = temp_argv
+ known, _ = ARGS.parse_known_args()
+ sys.argv = old_argv
+ self.config.update(vars(known))
+
+ def _augment_sys_argv_from_loadfile(self):
+ """Internal. Augment with arguments persisted in a saved file."""
+
+ loadfile = None
+ saw_other_args = False
+ grab_next_arg = False
+ for arg in sys.argv[1:]:
+ if 'config_loadfile' in arg:
+ pieces = arg.split('=')
+ if len(pieces) > 1:
+ loadfile = pieces[1]
+ else:
+ grab_next_arg = True
+ elif grab_next_arg:
+ loadfile = arg
+ else:
+ saw_other_args = True
+
+ if loadfile is not None:
+ zkpath = None
+ if loadfile[:3] == 'zk:':
+ try:
+ if self.zk is None:
+ self.zk = KazooClient(
+ hosts=scott_secrets.ZOOKEEPER_NODES,
+ use_ssl=True,
+ verify_certs=False,
+ keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+ certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ )
+ self.zk.start()
+ zkpath = loadfile[3:]
+ if not zkpath.startswith('/config/'):
+ zkpath = '/config/' + zkpath
+ zkpath = re.sub(r'//+', '/', zkpath)
+ if not self.zk.exists(zkpath):
+ raise Exception(
+ f'ERROR: --config_loadfile argument must be a file, {loadfile} not found (in zookeeper)'
+ )
+ except Exception as e:
+ raise Exception(
+ f'ERROR: Error talking with zookeeper while looking for {loadfile}'
+ ) from e
+ elif not os.path.exists(loadfile):
+ raise Exception(
+ f'ERROR: --config_loadfile argument must be a file, {loadfile} not found.'
+ )
+
+ if saw_other_args:
+ msg = f'Augmenting commandline arguments with those from {loadfile}.'
+ else:
+ msg = f'Reading commandline arguments from {loadfile}.'
+ print(msg, file=sys.stderr)
+ self.saved_messages.append(msg)
+
+ newargs = []
+ if zkpath:
+ try:
+ assert self.zk
+ contents, meta = self.zk.get(zkpath, watch=self._process_dynamic_args)
+ contents = contents.decode()
+ newargs = [
+ arg.strip('\n')
+ for arg in contents.split('\n')
+ if 'config_savefile' not in arg
+ ]
+ self.saved_messages.append(f'Setting {zkpath}\'s max_version to {meta.version}')
+ self.max_version[zkpath] = meta.version
+ except Exception as e:
+ raise Exception(f'Error reading {zkpath} from zookeeper.') from e
+ self.saved_messages.append(f'Loaded config from zookeeper from {zkpath}')
+ else:
+ with open(loadfile, 'r') as rf:
+ newargs = rf.readlines()
+ newargs = [arg.strip('\n') for arg in newargs if 'config_savefile' not in arg]
+ sys.argv += newargs
+
+ def dump_config(self):
+ """Print the current config to stdout."""
+ print("Global Configuration:", file=sys.stderr)
+ pprint.pprint(self.config, stream=sys.stderr)
+ print()
+
+ def parse(self, entry_module: Optional[str]) -> Dict[str, Any]:
+ """Main program should call this early in main(). Note that the
+ :code:`bootstrap.initialize` wrapper takes care of this automatically.
+ This should only be called once per program invocation.
+
+ """
+ if self.config_parse_called:
+ return self.config
+
+ # If we're about to do the usage message dump, put the main
+ # module's argument group last in the list (if possible) so that
+ # when the user passes -h or --help, it will be visible on the
+ # screen w/o scrolling.
+ for arg in sys.argv:
+ if arg in ('--help', '-h'):
+ if entry_module is not None:
+ entry_module = os.path.basename(entry_module)
+ ARGS._action_groups = Config._reorder_arg_action_groups_before_help(entry_module)
+
+ # Examine the environment for variables that match known flags.
+ # For a flag called --example_flag the corresponding environment
+ # variable would be called EXAMPLE_FLAG. If found, hackily add
+ # these into sys.argv to be parsed.
+ self._augment_sys_argv_from_environment_variables()
+
+ # Look for loadfile and read/parse it if present. This also
+ # works by jamming these values onto sys.argv.
+ self._augment_sys_argv_from_loadfile()
+
+ # Parse (possibly augmented, possibly completely overwritten)
+ # commandline args with argparse normally and populate config.
+ known, unknown = ARGS.parse_known_args()
+ self.config.update(vars(known))
+
+ # Reconstruct the argv with unrecognized flags for the benefit of
+ # future argument parsers. For example, unittest_main in python
+ # has some of its own flags. If we didn't recognize it, maybe
+ # someone else will.
+ if len(unknown) > 0:
+ if config['config_rejects_unrecognized_arguments']:
+ raise Exception(
+ f'Encountered unrecognized config argument(s) {unknown} with --config_rejects_unrecognized_arguments enabled; halting.'
+ )
+ self.saved_messages.append(
+ f'Config encountered unrecognized commandline arguments: {unknown}'
+ )
+ sys.argv = sys.argv[:1] + unknown
+
+ # Check for savefile and populate it if requested.
+ savefile = config['config_savefile']
+ if savefile and len(savefile) > 0:
+ data = '\n'.join(ORIG_ARGV[1:])
+ if savefile[:3] == 'zk:':
+ zkpath = savefile[3:]
+ if not zkpath.startswith('/config/'):
+ zkpath = '/config/' + zkpath
+ zkpath = re.sub(r'//+', '/', zkpath)
+ try:
+ if not self.zk:
+ self.zk = KazooClient(
+ hosts=scott_secrets.ZOOKEEPER_NODES,
+ use_ssl=True,
+ verify_certs=False,
+ keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+ certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ )
+ self.zk.start()
+ data = data.encode()
+ if len(data) > 1024 * 1024:
+ raise Exception(f'Saved args are too large! ({len(data)} bytes)')
+ if not self.zk.exists(zkpath):
+ self.zk.create(zkpath, data)
+ self.saved_messages.append(
+ f'Just created {zkpath}; setting its max_version to 0'
+ )
+ self.max_version[zkpath] = 0
+ else:
+ meta = self.zk.set(zkpath, data)
+ self.saved_messages.append(
+ f'Setting {zkpath}\'s max_version to {meta.version}'
+ )
+ self.max_version[zkpath] = meta.version
+ except Exception as e:
+ raise Exception(f'Failed to create zookeeper path {zkpath}') from e
+ self.saved_messages.append(f'Saved config to zookeeper in {zkpath}')
+ else:
+ with open(savefile, 'w') as wf:
+ wf.write(data)