projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Clean up histogram __repr__
[python_utils.git]
/
remote_worker.py
diff --git
a/remote_worker.py
b/remote_worker.py
index 84f8d56fa33318b507f3f11618c663861718182b..b58c6ba0a66f8d32b2b81af72a66d23493c9b2e5 100755
(executable)
--- a/
remote_worker.py
+++ b/
remote_worker.py
@@
-31,35
+31,41
@@
cfg.add_argument(
type=str,
required=True,
metavar='FILENAME',
type=str,
required=True,
metavar='FILENAME',
- help='The location of the bundle of code to execute.'
+ help='The location of the bundle of code to execute.'
,
)
cfg.add_argument(
'--result_file',
type=str,
required=True,
metavar='FILENAME',
)
cfg.add_argument(
'--result_file',
type=str,
required=True,
metavar='FILENAME',
- help='The location where we should write the computation results.'
+ help='The location where we should write the computation results.'
,
)
cfg.add_argument(
'--watch_for_cancel',
action=argparse_utils.ActionNoYes,
)
cfg.add_argument(
'--watch_for_cancel',
action=argparse_utils.ActionNoYes,
- default=
Fals
e,
- help='Should we watch for the cancellation of our parent ssh process?'
+ default=
Tru
e,
+ help='Should we watch for the cancellation of our parent ssh process?'
,
)
@background_thread
def watch_for_cancel(terminate_event: threading.Event) -> None:
)
@background_thread
def watch_for_cancel(terminate_event: threading.Event) -> None:
+ logger.debug('Starting up background thread...')
p = psutil.Process(os.getpid())
while True:
saw_sshd = False
ancestors = p.parents()
for ancestor in ancestors:
name = ancestor.name()
p = psutil.Process(os.getpid())
while True:
saw_sshd = False
ancestors = p.parents()
for ancestor in ancestors:
name = ancestor.name()
+ pid = ancestor.pid
+ logger.debug(f'Ancestor process {name} (pid={pid})')
if 'ssh' in name.lower():
saw_sshd = True
break
if not saw_sshd:
if 'ssh' in name.lower():
saw_sshd = True
break
if not saw_sshd:
+ logger.error(
+ 'Did not see sshd in our ancestors list?! Committing suicide.'
+ )
os.system('pstree')
os.kill(os.getpid(), signal.SIGTERM)
time.sleep(5.0)
os.system('pstree')
os.kill(os.getpid(), signal.SIGTERM)
time.sleep(5.0)
@@
-75,6
+81,10
@@
def main() -> None:
in_file = config.config['code_file']
out_file = config.config['result_file']
in_file = config.config['code_file']
out_file = config.config['result_file']
+ stop_thread = None
+ if config.config['watch_for_cancel']:
+ (thread, stop_thread) = watch_for_cancel()
+
logger.debug(f'Reading {in_file}.')
try:
with open(in_file, 'rb') as rb:
logger.debug(f'Reading {in_file}.')
try:
with open(in_file, 'rb') as rb:
@@
-82,6
+92,7
@@
def main() -> None:
except Exception as e:
logger.exception(e)
logger.critical(f'Problem reading {in_file}. Aborting.')
except Exception as e:
logger.exception(e)
logger.critical(f'Problem reading {in_file}. Aborting.')
+ stop_thread.set()
sys.exit(-1)
logger.debug(f'Deserializing {in_file}.')
sys.exit(-1)
logger.debug(f'Deserializing {in_file}.')
@@
-90,6
+101,7
@@
def main() -> None:
except Exception as e:
logger.exception(e)
logger.critical(f'Problem deserializing {in_file}. Aborting.')
except Exception as e:
logger.exception(e)
logger.critical(f'Problem deserializing {in_file}. Aborting.')
+ stop_thread.set()
sys.exit(-1)
logger.debug('Invoking user code...')
sys.exit(-1)
logger.debug('Invoking user code...')
@@
-104,6
+116,7
@@
def main() -> None:
except Exception as e:
logger.exception(e)
logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
except Exception as e:
logger.exception(e)
logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
+ stop_thread.set()
sys.exit(-1)
logger.debug(f'Writing {out_file}.')
sys.exit(-1)
logger.debug(f'Writing {out_file}.')
@@
-113,8
+126,13
@@
def main() -> None:
except Exception as e:
logger.exception(e)
logger.critical(f'Error writing {out_file}. Aborting.')
except Exception as e:
logger.exception(e)
logger.critical(f'Error writing {out_file}. Aborting.')
+ stop_thread.set()
sys.exit(-1)
sys.exit(-1)
+ if stop_thread is not None:
+ stop_thread.set()
+ thread.join()
+
if __name__ == '__main__':
main()
if __name__ == '__main__':
main()