Source code for npfc.load

"""
Module load
============

A module for loading files in different formats into DataFrames.
"""

# standard
import logging
import gzip
from pathlib import Path
# data science
import pandas as pd
from pandas import DataFrame
# chemoinformatics
from rdkit import Chem
# dev
from npfc import utils


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ GLOBALS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #


from npfc.utils import COLUMN_IDM
from npfc.utils import COLUMNS_MOL
from npfc.utils import COLUMNS_ENCODED
from npfc.utils import FORMATS_IO
# from npfc.utils import FORMATS_CONFIG

CONVERTERS = {'molblock': lambda x: Chem.MolFromMolBlock(x),
              'smiles': lambda x: Chem.MolFromSmiles(x),
              'rdkit': lambda x: x,  # nothing to do here, but removes the needs for more if/elif
              }


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FUNCTIONS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #


def _from_sdf(input_sdf: str, col_mol: str = 'mol'):
    """
    Parses a SDF and load molecules into a DataFrame.
    Contrarly to PandasTools.LoadSDF function, empty molecules are not silently
    filtered out, so their properties can be accessed for better error tracking.

    :param input_sdf: input SDF
    :param col_mol: column name with the RDKit Mol objects
    :param compression: 
    """
    # determine compression
    compression = None
    if str(input_sdf).endswith('.gz'):
        compression = 'gzip'
    
    # create two file handlers, one for mols and one for properties (raw)
    if compression is None:
        logging.debug("Read sdf from uncompressed file")
        FH_mols = open(input_sdf, 'rb')
        FH_props = open(input_sdf, 'rb')
    elif compression == 'gzip':
        logging.debug("Read sdf from compressed file (%s)", compression)
        FH_mols = gzip.open(input_sdf)
        FH_props = gzip.open(input_sdf)
    else:
        raise ValueError(f"Error! Unknown compression type for SDF: '{compression}'")
    # init
    i = 0
    rows = []
    row_idx = []
    # double iteration over sdf but generators so it should be ok memory-wise
    for mol, mol_raw in zip(Chem.ForwardSDMolSupplier(FH_mols), Chem.ForwardSDMolSupplier(FH_props, sanitize=False)):
        try:
            # properties
            row = dict((k, mol_raw.GetProp(k)) for k in mol_raw.GetPropNames())
            # molecule title
            row['_Name'] = mol_raw.GetProp('_Name')
            # mol
            row[col_mol] = mol
        except AttributeError:
            logging.warning(f"Molecule #{i} could not be parsed and was skipped!")
            row = None
        # record entry
        if row is not None:
            rows.append(row)
            row_idx.append(i)
        i += 1

    # clean-up
    FH_mols.close()
    FH_props.close()

    return DataFrame(rows, index=row_idx)


[docs]def file(input_file: str, col_mol: str = None, col_idm: str = None, mol_format: str = 'rdkit', keep_props: bool = True, decode: bool = True, csv_sep = '|', ): """Load a file into a DataFrame. :param input_file: the input file to load :param col_idm: the column/property to use for molecule ids. If left by default and no idm col is found, then _Name is used instead. If this property is not set, then a sequential idm will be generated (MOL_0000001, etc.). :param col_mol: the column to use for molecules (irrerlevant for SDF) :param csv_sep: the column separator to use for parsing the input file (CSV) :param mol_format: the input format for molecules :param out_id: the column name used for storing molecule ids :param out_mol: the column name used for storing molecules :param keep_props: keep all properties found in the input file. If False, then only out_id and out_mol are kept. :param decode: decode base64 strings into objects. Columns with encoded objects are labelled with a leading '_'. For molecules, reserved names are 'mol' and 'mol_frag'. :return: a DataFrame ..warning:: if a 'idm' property exists in the input file but the user picks another property for in_id, the pre-existing 'idm' will be renamed into 'idm.1' (and overwritten if already present). """ # init out_id = 'idm' out_mol = 'mol' # check arguments # TODO: merge all check_arg_input files/config etc. into one function with a type parameter. utils.check_arg_input_file(input_file) # path_input_file = Path(input_file) format, compression = utils.get_file_format(input_file) if format not in FORMATS_IO and format not in FORM: ############ raise ValueError("Error! Unsupported format for input file '{input_file}' ('{format}').") logging.debug("Loading file '%s' (format=%s, compression=%s)", input_file, format, compression) # read mols if format == 'SDF': df = _from_sdf(input_file) elif format == 'HDF': df = pd.read_hdf(input_file) #, key=path_input_file.stem) # does not work anymore if pytables is not used? elif format == 'CSV': df = pd.read_csv(input_file, sep=csv_sep) else: raise ValueError("Error! Unsupported format for file '{input_file}' ('{format}')") # process data # logging.debug("EXCERPT OF PARSED DATA BEFORE MODIFICATION:\n%s", df.head(3)) # automatically use idm and mol columns, when possible, until stated otherwise if col_idm is None: if 'idm' in df.columns: col_idm = 'idm' elif format == 'SDF': col_idm = '_Name' if col_mol is None and 'mol' in df.columns: col_mol = 'mol' # back up idm col if already existing in props but not used as idm if keep_props and col_idm != out_id and out_id in df.columns: df = df.rename({out_id: f"{out_id}.1"}, axis=1) logging.warning("Warning! Reserved name col_idm '%s' is already present in DF, renaming it to '%s' to avoid overwriting column.", out_id, f"{out_id}.1") # set idm/mol if col_idm is not None: df['idm'] = df[col_idm] if 'idm' in df.columns: df['idm'] = df['idm'].astype(str) # ids should never be numbers, everything is simpler as str to compare between datasets, etc. if col_mol is not None and col_mol != 'mol': df['mol'] = df[col_mol] # keep_props if not keep_props: df = df.drop([c for c in df.columns if c not in (out_id, out_mol)], axis=1) # mol_format: create RDKit Mol objects for SMILEs and MolBlocks, but leave (encoded) RDKit as found if out_mol in df.columns: df[out_mol] = df[out_mol].map(CONVERTERS[mol_format]) # decode if decode: # handle mols differently becasue it is faster this way # alread in Mol format for SDF files cols_to_ignore = [] # in case of SDF, mols are already RDKit Mol objects if format == 'SDF': cols_to_ignore.append('mol') # decode mols cols_to_decode = [c for c in COLUMNS_MOL if c not in cols_to_ignore and c in df.columns] logging.debug("COLUMNS WITH MOLECULES TO DECODE: %s", ','.join(cols_to_decode)) for col in cols_to_decode: df[col] = df[col].map(utils.decode_mol) # decode other objects cols_to_decode = [c for c in COLUMNS_ENCODED if c in df.columns] logging.debug("COLUMNS WITH OBJECTS TO DECODE: %s", ','.join(cols_to_decode)) for col in cols_to_decode: df[col] = df[col].map(utils.decode_object) logging.debug("First 3 rows loaded:\n\n%s\n", df.head(3)) return df
[docs]def count_mols(input_file: str, buffer_size: int = 10240, keep_uncompressed: bool = False): """ Count the number of molecules in an input file. The method varies depending on the format: - SDF: count the $$$$ pattern - CSV: count the number of lines, minus 1 for column headers - HDF: load file into memory using Pandas and then count number of rows - PARQUET: not implemented yet This function is optmized for memory, so it should handle very large files, apart from HDF files. :param input_file: input file :param buffer_size: buffer size in bytes to use for scanning the input text file (SDF and CSV). Default is 10Mb. :param keep_uncompressed: in case of gzip file, leave the uncompressed file after execution :return: counf of molecules """ def _count_generator(reader, buffer_size): """Use a generator it iterate over the raw data for larger files: https://pynative.com/python-count-number-of-lines-in-file/ """ b = reader(buffer_size * buffer_size) while b: yield b b = reader(buffer_size * buffer_size) # check arguments utils.check_arg_input_file(input_file) path_input_file = Path(input_file) format, compression = utils.get_file_format(input_file) if format not in FORMATS_IO: raise ValueError("Error! Unsupported format for input file '{input_file}' ('{format}').") # in case of compressed file, uncompress it if compression == 'gzip': uncompressed_file = input_file.split('.gz')[0] with open(uncompressed_file, 'wb') as uncompressed, gzip.open(input_file, 'rb') as compressed: bindata = compressed.read() uncompressed.write(bindata) input_file = uncompressed_file # count mols with open(input_file, 'rb') as FH: # create an iterator c_generator = _count_generator(FH.raw.read, buffer_size) # apply pattern to count molecules if format == 'SDF': count = sum(buffer.count(b'$$$$') for buffer in c_generator) elif format == 'CSV': count = sum(buffer.count(b'\n') for buffer in c_generator) - 1 # ignore headers elif format == 'HDF': count = len(file(input_file)) elif format == 'PARQUET': count = -1 logging.warning("PARQUET FORMAT NOT IMPLEMENTED YET") else: raise ValueError("Error! Unsupported format for input file '{input_file}' ('{format}').") # remove uncompressed file if compression == 'gzip' and not keep_uncompressed: Path(uncompressed_file).unlink() return count