"""
try:
queue.put((True, function(*args, **kwargs)))
- except:
+ except Exception:
queue.put((False, sys.exc_info()[1]))
*,
do_signal_cleanup: bool = True,
expiration_timestamp: Optional[float] = None,
+ override_command: Optional[str] = None,
) -> None:
self.is_locked = False
self.lockfile = lockfile_path
+ self.override_command = override_command
if do_signal_cleanup:
signal.signal(signal.SIGINT, self._signal)
signal.signal(signal.SIGTERM, self._signal)
self.release()
def _get_lockfile_contents(self) -> str:
+ if self.override_command:
+ cmd = self.override_command
+ else:
+ cmd = ' '.join(sys.argv)
contents = LockFileContents(
pid = os.getpid(),
- commandline = ' '.join(sys.argv),
- expiration_timestamp = self.expiration_timestamp
+ cmd,
+ expiration_timestamp = self.expiration_timestamp,
)
return json.dumps(contents.__dict__)
centcount = int(float(chunk) * 100.0)
elif CentCount.CURRENCY_RE.match(chunk) is not None:
currency = chunk
- except:
+ except Exception:
pass
if centcount is not None and currency is not None:
return (centcount, currency)
def __init__ (
self,
- amount: Decimal = Decimal("0.0"),
+ amount: Decimal = Decimal("0"),
currency: str = 'USD',
*,
strict_mode = False
amount = Decimal(chunk)
elif Money.CURRENCY_RE.match(chunk) is not None:
currency = chunk
- except:
+ except Exception:
pass
if amount is not None and currency is not None:
return (amount, currency)