"""Classes for odorants, mixtures, chemical orders, etc."""
import base64
import io
import json
import re
import time
import warnings
from collections import OrderedDict
from datetime import datetime
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import urlopen
import numpy as np
import pandas as pd
import pubchempy as pcp
from IPython.display import display
from PIL import Image
import quantities as pq
from pyrfume import load_data, logger, tqdm, trange
from pyrfume.physics import mackay
from quantities.constants.statisticalmechanics import R
from typing import Dict
try:
from rdkit import Chem
from rdkit.Chem import Draw, AllChem, SaltRemover
from rdkit import RDLogger
rdkit_logger = RDLogger.logger()
except ImportError:
warnings.warn(
"Parts of mordred and/or rdkit could not be imported; try installing rdkit via conda",
UserWarning,
)
ROOM_TEMP = (22 + 273.15) * pq.Kelvin
ROOM_PRESSURE = 1 * pq.atm
GAS_MOLAR_DENSITY = ROOM_PRESSURE / (R * ROOM_TEMP)
ODORANTS_BASIC_INFO_PATH = "molecules/all-cids-properties.csv"
ODORANT_SOURCES_PATH = "molecules/all-cids.csv"
PUBCHEM_KINDS = ['name', 'smiles', 'inchi', 'inchikey', 'formula', 'sdf', None]
[docs]class Solution:
components: Dict["Compound", pq.quantity.Quantity] = None
date_created: datetime = None
def __init__(self, components: dict, date_created: datetime=None):
self.total_volume = 0 * pq.mL
assert isinstance(components, dict), "Components must be a dict"
for component, volume in components.items():
assert isinstance(
component, (Compound, Solution)
), "Each component must be a Compound or a Solution"
try:
volume = volume.rescale(pq.mL)
except ValueError:
raise ValueError("Components must be provided with volumes")
self.total_volume += volume # Assume that volume is conserved
self.components = components
if not date_created:
date_created = str(datetime.now())[:-7]
self.date_created = date_created if date_created else datetime.now()
@property
def compounds(self):
return self._compounds()
def _compounds(self, result: dict = None):
if result is None:
result = {}
for component, volume in self.components.items():
if isinstance(component, Compound):
if component in result:
result[component] += volume
else:
result[component] = volume
else: # If it is a Solution
component._compounds(result=result)
return result
@property
def molecules(self):
"""Returns a dictionary with the moles of each Molecule"""
compounds = self.compounds
assert all([c.density for c, v in compounds.items() if v and not c.is_solvent]), (
"All non-solvent compounds must have a known density " "in order to compute moles"
)
assert all([c.molecular_weight for c, v in compounds.items() if v and not c.is_solvent]), (
"All non-solvent compounds must have a known molecular weight "
"in order to compute moles"
)
return {
c.molecule: (v * c.molarity).rescale(pq.mol) for c, v in self.compounds.items() if v
}
@property
def molarities(self):
"""Returns a dictionary with the molarity of each Molecule"""
return {m: mol / self.total_volume for m, mol in self.molecules.items() if mol}
@property
def mole_fractions(self):
"""Returns a dictionary with the mole fraction of each Molecule"""
molecules = self.molecules
assert [moles for molecule, moles in molecules.items()], (
"All compounds must have a known number of moles " "in order to compute mole fraction"
)
# A Quantities bug prevents me from simply summing molecules.values()
total_moles = 0 * pq.mol
for moles in molecules.values():
total_moles += moles
return {molecule: moles / total_moles for molecule, moles in molecules.items()}
[docs] def mole_fraction(self, molecule):
return self.mole_fractions[molecule] if molecule in self.mole_fractions else 0
@property
def dilutions(self):
return {
c.molecule: self.total_volume / c.volume for c in self.compounds if not c.is_solvent
}
@property
def partial_pressures(self):
"""Computes partial pressures for each odorant
in the mixture using Raoult's law"""
return {
m: self.mole_fraction(m) * m.vapor_pressure for m in self.molecules if m.vapor_pressure
}
[docs] def partial_pressure(self, molecule):
return self.partial_pressures[molecule]
@property
def total_pressure(self):
"""Computes total pressure of the vapor using Dalton's law"""
preferred_units = pq.Pa
partial_pressures = [pressure.rescale(preferred_units) for pressure in self.partial_pressures.values()]
return preferred_units * np.sum(partial_pressures)
@property
def vapor_concentrations(self):
"""Concentrations of each component in the vapor phase at steady state.
Units are fraction of volume. Air is assumed to make up the balance"""
pp = self.partial_pressures
result = {}
for m, p in pp.items():
ratio = (p / pq.atm).simplified
assert ratio.units == pq.dimensionless
result[m] = float(ratio)
return result
[docs] def vapor_concentration(self, molecule):
return self.vapor_concentrations[molecule]
@property
def molar_evaporation_rates(self):
mf = self.mole_fractions
result = {
molecule: mole_fraction * molecule.molar_evaporation_rate
for molecule, mole_fraction in mf.items()
}
return result
[docs] def molar_evaporation_rate(self, molecule):
return self.molar_evaporation_rates[molecule]
[docs]class Molecule:
def __init__(self, cid: int, name: str=None, fill: bool=False):
self.cid = cid
if fill:
self.fill_details()
if name:
self.name = name
# Integer Chemical ID number (CID) from PubChem
cid: int = 0
# Chemical Abstract Service (CAS) number
cas: str = ""
# Principal name
name: str = ""
# Synonyms
synonyms: str = ""
# IUPAC name (long, unique name)
iupac: str = ""
# Density (pq.g / pq.ml)
density: float = None
# Vapor pressure (pq.Pa)
vapor_pressure: float = None
# Molecular weight (pq.g / pq.mol)
molecular_weight: float = None
@property
def molarity(self):
if not (self.molecular_weight and self.density):
result = None
else:
result = self.density / self.molecular_weight
result = result.rescale(pq.mol / pq.L)
return result
@property
def molar_evaporation_rate(self):
return mackay(self.vapor_pressure)
[docs] def fill_details(self):
assert self.cid is not None
url_template = (
"https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/cid/%d/property/" "%s/JSON"
)
property_list = ["MolecularWeight", "IsomericSMILES"]
url = url_template % (self.cid, ",".join(property_list))
json_data = url_to_json(url)
details = json_data["PropertyTable"]["Properties"][0]
def convert(name):
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
for key, value in details.items():
if key == "CID":
assert value == self.cid, "REST API CID does not match provided CID"
key = convert(key)
if key == "molecular_weight":
value = float(value) * pq.g / pq.mol
setattr(self, key, value)
if not self.name:
self.name = self.get_name_from_api()
[docs] def get_name_from_api(self):
url_template = "https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/cid/%d/synonyms/JSON"
url = url_template % (self.cid)
json_data = url_to_json(url)
name = None
if json_data:
information = json_data["InformationList"]["Information"][0]
synonyms = information["Synonym"]
name = synonyms[0].lower()
return name
[docs] def get_cid_from_api(self):
url_template = "https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/%s/%s/cids/JSON"
options = [getattr(self, x) for x in ("cas", "name") if len(getattr(self, x))]
cid = None
query = self.name
for option in options:
url = url_template % (option, query)
json_data = url_to_json(url)
cid = json_data["IdentifierList"]["CID"][0]
return cid
def __eq__(self, other):
if self.cid:
return self.cid == other.cid
else:
return self.name == self.name
def __lt__(self, other):
if self.cid:
return self.cid < other.cid
else:
return self.name < self.name
def __hash__(self):
return id(self)
def __repr__(self):
if self.cid and self.name:
result = "%d (%s)" % (self.cid, self.name)
elif self.cid:
result = "%d" % self.cid
elif self.name:
result = "%s" % self.name
else:
result = "Unknown"
return result
[docs]class Vendor:
def __init__(self, name: str, url: str):
self.name = name
self.url = url
name: str = ""
url: str = ""
[docs]class ChemicalOrder:
def __init__(self, molecule: "Molecule", vendor: "Vendor", part_id: str, purity: float=1, known_impurities: list=None):
self.molecule = molecule
self.vendor = vendor
self.part_id = part_id
self.purity = purity
self.known_impurities = known_impurities
# Molecule
molecule: Molecule = None
# Vendor, e.g. Sigma-Aldrich
vendor: Vendor = None
# ID number of compound at vendor
part_id: str = ""
# Reported purity as a fraction
purity: float = 1
# List of known impurities (Molecules)
known_impurities: list = None
[docs]class Compound:
def __init__(
self, chemical_order: ChemicalOrder, stock: str="", date_arrived: datetime=None,
date_opened: datetime=None, is_solvent: bool=False
):
self.chemical_order = chemical_order
self.stock = stock
self.date_arrived = date_arrived if date_arrived else datetime.now
self.date_opened = date_opened
self.is_solvent = is_solvent
# ChemicalOrder
chemical_order: ChemicalOrder = None
# Stock number (supplied by vendor, usually on bottle)
stock: str = ""
# Date arrived at the lab/clinic
date_arrived: datetime = None
# Date opened
date_opened: datetime = None
# Is it a solvent?
is_solvent: bool = False
def __getattr__(self, attr):
"""If no attribute is found, try looking up on the
ChemicalOrder or the Molecule"""
try:
return getattr(self.chemical_order, attr)
except AttributeError:
return getattr(self.chemical_order.molecule, attr)
[docs]def url_to_json(url, verbose=True) -> str:
json_data = None
msgs = []
try:
page = urlopen(url)
string = page.read().decode("utf-8")
json_data = json.loads(string)
except HTTPError:
msgs.append("HTTPError for query '%s'" % url)
if json_data is None and verbose:
for msg in msgs:
print(msg)
return json_data
[docs]def is_kind(identifier: str, kind: str) -> bool:
if kind == 'smiles':
rdkit_logger.setLevel(RDLogger.CRITICAL)
result = Chem.MolFromSmiles(identifier) is not None
rdkit_logger.setLevel(RDLogger.WARNING)
return result
elif kind == 'inchikey':
return len(identifier) == 27 and identifier[14]=='-' and identifier[25]=='-'
elif kind == 'inchi':
return Chem.inchi.MolFromInchi(identifier, logLevel=None) is not None
elif kind == 'name':
return True
else:
return False
[docs]def get_kind(identifier: str):
kinds = [kind for kind in PUBCHEM_KINDS if is_kind(identifier, kind)]
return kinds[-1] # Return most sophisticated kind ('name' will always be in the list)
[docs]def deisomerize_smiles(smiles: str) -> str:
mol = Chem.MolFromSmiles(smiles)
if mol: # If a mol object was successfully create (i.e. not `None`)
smiles = Chem.MolToSmiles(mol, isomericSmiles=False)
else:
smiles = smiles.replace('@','').replace('@@','').replace('/','').replace('\\','')
return smiles
[docs]def canonical_smiles(smiles: str, kekulize: bool = False) -> str:
"""Use rdkit to convert the `smiles` string to canonical form"""
mol = Chem.MolFromSmiles(smiles)
if mol: # If a mol object was successfully create (i.e. not `None`)
if kekulize:
Chem.Kekulize(mol)
smiles = Chem.MolToSmiles(mol, isomericSmiles=True)
else: # No mol object means the `smiles` string was invalid
smiles = ""
return smiles
[docs]def get_cids(
identifiers: list,
kind: str = None,
verbose: bool = True,
wait: float = 0,
results: dict = None,
) -> dict:
"""Return CIDs for molecule based on any synonym,
including a chemical name or a CAS"""
assert kind in PUBCHEM_KINDS
if results is None:
results = {}
p = tqdm(identifiers)
for identifier in p:
#if not isinstance(identifier, str):
# logger.warning("%s is not a string" % identifier)
# continue
p.set_description(str(identifier))
cid = get_cid(identifier, kind=kind, verbose=verbose)
if not cid:
logger.warning("Could not find %s" % identifier)
results[identifier] = cid
if wait:
time.sleep(wait)
return results
[docs]def get_cid(
identifier: str, kind: str = None, verbose: bool = True, fix_smiles_on_error: bool = True, attempt=0
) -> int:
"""
Return data about a molecule from any synonym,
including a chemical name or a CAS.
change_kind: Whether to try other kinds if the search fails.
"""
if isinstance(identifier, float) and np.isnan(identifier):
return 0
replace = [('α', 'alpha'), ('β', 'beta'), ('γ', 'gamma'), ('δ', 'delta')]
for a, b in replace:
identifier = identifier.replace(a, b)
if kind is None:
kind = get_kind(identifier)
else:
kind = kind.lower()
try:
result = pcp.get_cids(identifier, namespace=kind)
except pcp.BadRequestError:
logger.warning('Request Error for "%s"' % identifier)
result = []
except pcp.PubChemHTTPError as e:
if attempt == 0:
import time
time.sleep(10)
return get_cid(identifier,kind,verbose, fix_smiles_on_error, 1)
else:
raise e
if not len(result):
cid = 0
else:
if (len(result) > 1) and verbose:
logger.warning("Multiple CIDs for %s: %s" % (identifier, result))
cid = result[0]
if not cid and kind == "smiles" and fix_smiles_on_error:
# Retry with canonical SMILES
identifier = canonical_smiles(identifier)
if identifier:
cid = get_cid(identifier, kind=kind, verbose=verbose, fix_smiles_on_error=False, change_kind=change_kind)
return cid
[docs]def from_cids(cids: list, property_list: bool = None) -> list:
if property_list is None:
property_list = ["MolecularWeight", "IsomericSMILES", "IUPACName"]
result = []
chunk_size = 100
for start in trange(0, len(cids), chunk_size):
stop = min(start + chunk_size, len(cids))
print("Retrieving %d through %d" % (start, stop - 1))
cid_subset = [
str(x) for x in [cid for cid in cids[start:stop] if int(cid) > 0 and cid is not None]
]
cid_subset = ",".join(cid_subset)
properties_template = (
"https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/cid/%s/property/" "%s/JSON"
)
url = properties_template % (cid_subset, ",".join(property_list))
json_data = url_to_json(url)
data = json_data["PropertyTable"]["Properties"]
synonyms_template = (
"https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/cid/%s/synonyms/JSON"
)
url = synonyms_template % (cid_subset)
json_data = url_to_json(url)
information = json_data["InformationList"]["Information"]
for i, d in enumerate(data):
try:
synonyms = information[i]["Synonym"]
except KeyError:
try:
d["name"] = d["IUPACName"]
except KeyError:
d["name"] = ""
else:
d["name"] = synonyms[0].lower()
result += data
return result
[docs]def cids_to_smiles(cids: list) -> dict:
"""Returns an ordered dictionary of SMILES strings with CIDs as keys"""
info = from_cids(cids, property_list=["IsomericSMILES"])
smiles = {item["CID"]: item["IsomericSMILES"] for item in info}
return smiles
[docs]def cids_to_cas(cids: list) -> OrderedDict:
result = OrderedDict()
chunk_size = 100
for start in trange(0, len(cids), chunk_size):
stop = min(start + chunk_size, len(cids))
# msg = "Retrieving %d through %d" % (start, stop-1)
cid_subset = [str(x) for x in cids[start:stop]]
cid_subset = ",".join(cid_subset)
synonyms_template = (
"https://pubchem.ncbi.nlm.nih.gov/" "rest/pug/compound/cid/%s/synonyms/JSON"
)
url = synonyms_template % (cid_subset)
json_data = url_to_json(url)
information = json_data["InformationList"]["Information"]
for i, info in enumerate(information):
cid = info["CID"]
try:
synonyms = info["Synonym"]
except KeyError:
result[cid] = []
else:
result[cid] = cas_from_synonyms(synonyms)
result
return result
[docs]def cas_from_synonyms(synonyms: list) -> list:
result = []
for s in synonyms:
if re.match(r"^[0-9]+\-[0-9]+\-[0-9]+$", s):
result.append(s)
return result
[docs]def cactus(identifier: str, output: str = "cas") -> str:
url_template = "https://cactus.nci.nih.gov/chemical/structure/%s/%s"
identifier = identifier.replace(' ', '%20')
url = url_template % (identifier, output)
page = urlopen(url)
result = page.read().decode("utf-8")
return result
[docs]def cactus_image(smiles: str) -> None:
url_template = "https://cactus.nci.nih.gov/chemical/structure/%s/image"
smiles = smiles.replace(' ', '%20')
url = url_template % smiles
image_data = urlopen(url).read()
image = Image(image_data)
display(image)
[docs]def get_compound_summary(cid: int, heading: str):
"""Get summary info about `heading` from PubChem for the compound
given by `cid`. Example heading: 'Physical Description'"""
url_template = (
"https://pubchem.ncbi.nlm.nih.gov/" "rest/pug_view/data/compound/%d/JSON?heading=%s"
)
escaped_heading = quote(heading) # Escape the string
url = url_template % (cid, escaped_heading)
json_data = url_to_json(url, verbose=False)
return json_data
[docs]def get_compound_odor(cid, raw=False):
info = []
for heading in ["Odor", "Physical Description"]:
json_data = get_compound_summary(cid, heading)
if raw:
info += [] if json_data is None else [json_data]
else:
info += _parse_odor_info(json_data)
return info
def _parse_odor_info(info, odors=None, any_string=False):
if odors is None:
odors = []
if isinstance(info, dict):
for key, value in info.items():
if key == "TOCHeading" and value == "Odor":
any_string = True
if key == "String" and (any_string or "odor" in value.lower()):
odors.append(value)
else:
_parse_odor_info(value, odors=odors, any_string=any_string)
elif isinstance(info, list):
for value in info:
_parse_odor_info(value, odors=odors, any_string=any_string)
return odors
def _parse_other_info(info, records=None):
if records is None:
records = []
if isinstance(info, dict):
for key, value in info.items():
if key == "String":
records.append(value)
elif key == "Value" and "Number" in value:
records.append(value)
else:
_parse_other_info(value, records=records)
elif isinstance(info, list):
for value in info:
_parse_other_info(value, records=records)
return records
[docs]def display_molecules(molecules: pd.DataFrame, no_of_columns=5, figsize=(15, 15)):
import matplotlib.pyplot as plt
from IPython.display import display
fig = plt.figure(figsize=figsize)
column = 0
for i, (cid, info) in enumerate(molecules.iterrows()):
column += 1
# check for end of column and create a new figure
if column == no_of_columns+1:
fig = plt.figure(figsize=figsize)
column = 1
fig.add_subplot(1, no_of_columns, column)
image = smiles_to_image(info['IsomericSMILES'], png=False)
plt.imshow(image)
plt.axis('off')
plt.title("%d: %s" % (cid, info['name']))
[docs]def embed_molecules(molecules: pd.DataFrame):
import matplotlib.pyplot as plt
plt.figure(figsize=(6, 6))
ax = plt.gca()
embedding = load_data('embedding/pf_umap.pkl')
ax = embedding.plot.scatter(x=0, y=1, alpha=0.05, c='k', ax=ax)
smiles = molecules['IsomericSMILES']
embedding_ = embedding.loc[smiles]
embedding_.plot.scatter(x=0, y=1, alpha=1, c='r', s=100, ax=ax)
ax.set_xlabel('Dimension 1')
ax.set_ylabel('Dimension 2')
[docs]def smiles_to_image(smiles, png=True, b64=False, crop=True, padding=10, size=300):
"""
png: Whether to convert to .png data (or to leave as a PIL image)
b64: Whether to base64 encode (only possible for .png data)
"""
buffer = io.BytesIO()
mol = Chem.MolFromSmiles(smiles)
image = Draw.MolToImage(mol, fitImage=True, size=(size, size))
if crop:
image = crop_image(image, padding=padding)
if png:
image.save(buffer, format="PNG")
image = buffer.getvalue()
if b64:
assert png, "Can only base64 encode PNG data, not a PIL image"
image = base64.b64encode(image).decode("utf8")
return image
[docs]def smiles_to_mol(smiles: list, max_attempts: int=10, use_random_coords: bool=False, deisomerize=False) -> dict:
if deisomerize:
f = deisomerize_smiles
else:
f = lambda x: x
mols_raw = [Chem.MolFromSmiles(f(smi)) for smi in smiles]
logger.info("Computing 3D coordinates...")
s = SaltRemover.SaltRemover()
mols = {}
n = len(mols_raw)
pbar = tqdm(total=n)
for i, mol in enumerate(mols_raw):
pbar.update()
logger.debug("Embedding %s" % smiles[i])
try:
mol = s.StripMol(mol, dontRemoveEverything=True)
mol = Chem.AddHs(mol)
AllChem.Compute2DCoords(mol)
AllChem.EmbedMolecule(mol, maxAttempts=max_attempts, useRandomCoords=use_random_coords)
AllChem.UFFOptimizeMolecule(mol) # Is this deterministic?
except Exception as e:
logger.warning("Exception for %s: %s" % (smiles[i], str(e)))
else:
mols[smiles[i]] = mol
logger.info("Finished embedding all molecules")
return mols
[docs]def crop_image(img, padding=0):
"""Crop white out of a PIL image."""
as_array = np.array(img) # N x N x (r,g,b,a)
if as_array.shape[2] == 4:
as_array[as_array[:, :, 3] == 0] = [255, 255, 255, 255]
has_content = np.sum(as_array, axis=2, dtype=np.uint32) != 255 * 4
xs, ys = np.nonzero(has_content)
x_range = max([min(xs) - padding, 0]), min([max(xs) + padding, as_array.shape[0]])
y_range = max([min(ys) - padding, 0]), min([max(ys) + padding, as_array.shape[1]])
as_array_cropped = as_array[x_range[0] : x_range[1], y_range[0] : y_range[1], 0:3]
img = Image.fromarray(as_array_cropped, mode="RGB")
return img
[docs]def all_odorants():
"""All CIDs, SMILES, Names, and Molecular Weights found in the
file at ODORANTS_BASIC_INFO_PATH"""
df = load_data(ODORANTS_BASIC_INFO_PATH)
df = df.sort_index()
return df
[docs]def all_sources():
"""Whether or not each odorant (by CID) is in each of the data sources"""
df = load_data(ODORANT_SOURCES_PATH)
df = df.sort_index()
return df
[docs]def all_cids():
"""All CIDs found in the file at ODORANTS_BASIC_INFO_PATH"""
df = all_odorants()
return list(df.index)
[docs]def all_smiles():
"""All SMILES found in the file at ODORANTS_BASIC_INFO_PATH.
May contain duplicates (if two CIDs give the same SMILES)"""
df = all_odorants()
return list(df["SMILES"])
if __name__ == "__main__":
x = Molecule(325, fill=True)
print(x.__dict__)