Source code for npfc.save

"""
Module save
================

A module containing the Saver class, used for storing DataFrames with molecules
on disk.
"""

# standard
import logging
import gzip
import shutil
import os
import numpy as np
import time
from pathlib import Path
from random import random
# data science
import pandas as pd
from pandas import DataFrame
from pandas import HDFStore
# chemoinformatics
from rdkit import Chem
from rdkit.Chem import SDWriter
# docs
from typing import List
# dev
from npfc import utils


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FUNCTIONS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #


"""
Module save
===========

A module for saving DataFrames into files in different formats.
"""

# standard
import logging
import gzip
import shutil
import numpy as np
from pathlib import Path
# data science
from pandas import DataFrame
# chemoinformatics
from rdkit import Chem
from rdkit.Chem import SDWriter
# docs
from typing import List
from typing import Tuple
from typing import Union

# dev
from npfc import utils


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ GLOBALS  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #


from npfc.utils import FORMATS_IO
from npfc.utils import COLUMNS_MOL
from npfc.utils import COLUMNS_ENCODED


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FUNCTIONS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #


def _save_sdf(df: DataFrame,
              output_file: Union[str, Path],
              col_mol: str,
              col_idm: str,
              ) -> Tuple[str, int]:
    """
    Save molecules in input DF as a SDF, using RDKit.

    In case of the desired output file is an archive (i.e. file.sdf.gz), an uncompressed file will first
    be written (i.e. file.sdf), which may overwrite any pre-existing file with the same path.
    This intermediate file is deleted once the archive has been written.

    Every column other than col_mol is exported as a SDF property. The input DataFrame
    itself is left untouched: values are formatted on a shallow copy.

    :param df: input DataFrame
    :param output_file: output SDF path
    :param col_mol: column with the RDKit Mol objects
    :param col_idm: column with the molecule ids, if not None, info is saved as property and as molecule title
    :return: a tuple containing the output file path and the number of saved molecules
    :raises ValueError: if the output file is not recognized as a SDF
    """
    def _format_value(value):
        """
        Helper function to format float values for export to avoid
        awkward scientific notation or floats without trailing 0, i.e. '7.'.

        :param value: the value to format
        :return: the formatted value, always a string
        """
        # floats (Python and NumPy ones)
        if isinstance(value, (float, np.floating)):
            s = '{:f}'.format(value).rstrip("0")  # remove long trailing 0s
            if s.endswith("."):
                s += "0"  # put the "0" back on if the result is something like "7."
            return s
        # others
        return str(value)

    # init
    output_file = str(output_file)
    path_output_file = Path(output_file)
    path_output_file.parent.mkdir(parents=True, exist_ok=True)
    file_format, compression = utils.get_file_format(output_file)

    # check input
    if file_format != 'SDF':
        raise ValueError(f"Error! Output file '{output_file}' has a format different from 'SDF' ('{file_format}')")

    # in case output file has to be compressed, remove the .gz from the filename for the output file for now
    output_archive = None
    if compression == 'gzip':
        output_archive = output_file
        output_file = str(path_output_file.parent / path_output_file.stem)

    # format properties (floats) on a shallow copy so the caller's DataFrame is not modified
    df = df.copy(deep=False)
    properties = [x for x in df.columns if x != col_mol]
    for p in properties:
        df[p] = df[p].map(_format_value)

    # init writer
    writer = SDWriter(output_file)
    try:
        writer.SetProps(properties)

        # export molecule to SDF
        for _, row in df.iterrows():
            # copy mol so it can be edited without consequence
            mol = Chem.Mol(row[col_mol])
            # set molecule title
            title = str(row[col_idm]) if col_idm is not None else ''
            mol.SetProp('_Name', title)
            # set properties (all values are strings at this point)
            for p in properties:
                mol.SetProp(p, row[p])
            # save mol
            writer.write(mol)
    finally:
        # end the writing of mols, even if one of them could not be exported
        writer.close()

    # compress the now-closed output file
    if output_archive is not None:
        with open(output_file, 'rb') as OUTPUT, gzip.open(output_archive, 'wb') as ARCHIVE:
            shutil.copyfileobj(OUTPUT, ARCHIVE)
        # delete the uncompressed file as it is only a byproduct
        # (done once it is closed, deleting an open file fails on some platforms)
        Path(output_file).unlink()
        # consider output file to be the archive for return value
        output_file = output_archive

    return output_file, len(df)


def file(df: DataFrame,
         output_file: Union[str, Path],
         col_mol: str = 'mol',
         col_idm: str = 'idm',
         csv_sep: str = '|',
         encode: bool = True,
         ) -> Tuple[Union[str, Path], int]:
    """
    Save an input DataFrame into a single file.

    The input DataFrame is not modified, a copy is encoded and exported.

    :param df: input DataFrame
    :param output_file: output file path
    :param col_mol: for SDF format only, column with the RDKit Mol objects
    :param col_idm: for SDF format only, column with the molecule ids, if not None, info is saved as property and as molecule title
    :param csv_sep: for CSV format only, delimiter to use
    :param encode: encode mols and objects into base64 strings based on predefined column names.
    :return: a tuple containing the output file name and its number of records
    :raises ValueError: if the output file format is unknown or not supported
    """
    # check some arguments
    utils.check_arg_output_file(output_file)
    file_format, _ = utils.get_file_format(output_file)
    if file_format not in FORMATS_IO:
        raise ValueError(f"Error! Output file format is unknown ('{file_format}'). Authorized formats are: {', '.join(FORMATS_IO)}.")

    # init
    logging.debug("Excerpt of the data as provided to save.file function:\n\n%s\n", df.head(5))
    path_output_file = Path(output_file)

    # avoid pandas warnings
    df = df.copy()

    # encode predefined data
    if encode:
        cols_to_encode = [x for x in COLUMNS_ENCODED if x in df.columns]
        cols_mol = [x for x in COLUMNS_MOL if x in df.columns]
    else:
        cols_to_encode = []
        cols_mol = []

    # other objects
    for c in cols_to_encode:
        df[c] = df[c].map(utils.encode_object)

    if file_format == 'SDF':
        # encode all mols except the 'mol' column, which is exported as CTAB
        if len(cols_mol) > 1:
            cols_mol_to_encode = [x for x in cols_mol if x != col_mol]
            logging.warning("More than 1 RDKit Mol objects columns detected, using CTAB from column '%s' and encoding the rest: %s", col_mol, ','.join(cols_mol_to_encode))
            for c in cols_mol_to_encode:
                df[c] = df[c].map(utils.encode_mol)
        return _save_sdf(df, output_file, col_mol, col_idm)

    # CSV/HDF: RDKit Mol objects cannot be stored as such, encode them all
    for c in cols_mol:
        logging.debug("save.file: encoding column %s", c)
        df[c] = df[c].map(utils.encode_mol)

    # export to other formats
    if file_format == 'CSV':
        df.to_csv(output_file, index=False, sep=csv_sep)
    elif file_format == 'HDF':
        # the key is the file name without any of its suffixes
        key = path_output_file.stem.split('.')[0]
        df.to_hdf(output_file, key=key)
    else:
        raise ValueError(f"Error! Output file format '{file_format}' is not supported.")

    return (output_file, len(df))
def chunk(df: DataFrame,
          chunk_name_template: str,
          chunk_size: int,
          shuffle: bool = False,
          random_seed: int = None,
          col_mol: str = 'mol',
          col_idm: str = 'idm',
          csv_sep: str = '|',
          encode: bool = True,
          ) -> List[Tuple[str, int]]:
    """
    Save an input DataFrame into several chunks, written on disk.

    The input data has to be converted as a whole to a DF first.
    An empty DataFrame produces no chunk at all (an empty list is returned).

    :param df: input DataFrame
    :param chunk_name_template: path of the output file if there was only one (i.e. dir/file.csv). Is modified to add chunk IDs (i.e. dir/file_001.csv, dir/file_002.csv, etc.).
    :param chunk_size: number of record for each chunk (last chunk might contain less)
    :param shuffle: shuffle the records before splitting into chunks
    :param random_seed: random seed to use for shuffling records
    :param col_mol: for SDF format only, column with the RDKit Mol objects
    :param col_idm: for SDF format only, column with the molecule ids, if not None, info is saved as property and as molecule title
    :param csv_sep: for CSV format only, delimiter to use
    :param encode: encode RDKit Mol and other predefined objects into base64 strings.
    :return: a list of tuples containing each chunk name and its number of records
    :raises ValueError: if chunk_size is not a positive integer
    """
    # check arguments: either condition alone makes the value unusable
    if not isinstance(chunk_size, int) or chunk_size < 1:
        raise ValueError(f"Error! Argument chunk_size needs to be defined as a non-zero positive integer. (value was: '{chunk_size}')")

    # shuffle records to remove bias from input order
    if shuffle:
        df = df.sample(frac=1, random_state=random_seed)

    # init iteration
    num_records = len(df)
    output_chunks = []
    output_chunk_template = Path(chunk_name_template)
    output_dir_path = output_chunk_template.parent
    output_chunk_basename = Path(output_chunk_template.stem).stem  # 2-layer stem for cases like 'file.sdf.gz'
    output_chunk_suffixes = ''.join(output_chunk_template.suffixes)

    # iteration
    for i, start in enumerate(range(0, num_records, chunk_size)):
        end = start + chunk_size
        # chunk ids start at 1 and are zero-padded on 3 digits (i.e. file_001.csv)
        output_chunk = output_dir_path / Path(output_chunk_basename + "_" + str(i + 1).zfill(3) + output_chunk_suffixes)
        results = file(df=df.iloc[start:end],
                       output_file=output_chunk,
                       col_mol=col_mol,
                       col_idm=col_idm,
                       csv_sep=csv_sep,
                       encode=encode,
                       )
        output_chunks.append(results)

    return output_chunks
def chunk_sdf(input_sdf: str,
              output_dir: str,
              chunk_size: int = None,
              prefix: str = None,
              keep_uncompressed: bool = False,
              ) -> List[Tuple[str, int]]:
    """
    Split an input SDF file into SDF chunks using memory-efficient line by line text parsing,
    suitable for large files. Molecules are not parsed, no change is made to the molblocks.

    Chunks are named <prefix>_001.sdf, <prefix>_002.sdf, etc. and written into output_dir,
    which is created if needed. If the input is a gzip archive, it is first uncompressed next
    to the input file (which may overwrite any pre-existing file with the same path) and the
    chunks are gzipped as well.

    :param input_sdf: input SDF, either uncompressed or gzipped
    :param output_dir: directory where the chunks are written
    :param chunk_size: number of record for each chunk (last chunk might contain less), has to be defined
    :param prefix: prefix to use for chunks. If left to None, the input SDF filename will be used.
    :param keep_uncompressed: in case of gzip input, keep the uncompressed file instead of deleting it as a temp file
    :return: a list of tuples containing each chunk name and its number of records
    :raises ValueError: if chunk_size is not a positive integer or if the input is not a (gzipped) SDF
    """
    def _save_chunk(current_lines, output_dir_path, output_chunk_basename, current_chunk_idx, format, compression):
        """Helper function for saving currently gathered text into the current chunk, gzipped if required."""
        # save to uncompressed output file
        output_chunk = output_dir_path / Path(f"{output_chunk_basename}_{str(current_chunk_idx).zfill(3)}.{format.lower()}")
        with open(output_chunk, 'w') as OFH:
            OFH.writelines(current_lines)
        # compress the now-closed output file
        if compression == 'gzip':
            output_archive = f"{output_chunk}.gz"
            with open(output_chunk, 'rb') as OUTPUT, gzip.open(output_archive, 'wb') as ARCHIVE:
                shutil.copyfileobj(OUTPUT, ARCHIVE)
            # delete the uncompressed file as it is only a byproduct
            # (done once it is closed, deleting an open file fails on some platforms)
            Path(output_chunk).unlink()
            # consider output file to be the archive for return value
            output_chunk = output_archive
        return output_chunk

    # check arguments before touching the file system
    if not isinstance(chunk_size, int) or chunk_size < 1:
        raise ValueError(f"Error! Argument chunk_size needs to be defined as a non-zero positive integer. (value was: '{chunk_size}')")

    # check format and compression
    file_format, compression = utils.get_file_format(input_sdf)
    if file_format != 'SDF' or compression not in (None, 'gzip'):
        raise ValueError("Error! Input SDF should be of format SDF and, if compressed, be a gzip archive.")

    # define I/O
    input_path = Path(input_sdf)
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    if prefix is None:
        output_chunk_basename = Path(input_path.stem).stem  # 2-layer stem for cases like 'file.sdf.gz'
    else:
        output_chunk_basename = prefix

    # init iteration
    output_chunks = []
    current_lines = []
    current_num_record = 0
    current_chunk_idx = 1
    uncompressed_tmp = None

    try:
        # in case input file is compressed, uncompress it by streaming it to a temporary file
        if compression == 'gzip':
            uncompressed_tmp = input_path.parent / input_path.stem
            with gzip.open(input_sdf, 'rb') as FH_COMPRESSED, open(uncompressed_tmp, 'wb') as FH_UNCOMPRESSED:
                shutil.copyfileobj(FH_COMPRESSED, FH_UNCOMPRESSED)
            input_sdf = str(uncompressed_tmp)

        # iteration line by line
        with open(input_sdf, 'r') as FH:
            for line in FH:
                # append the line to the current chunk
                current_lines.append(line)
                # identify end of record
                if line.strip() != '$$$$':
                    continue
                current_num_record += 1
                # save chunk when enough records are gathered
                if current_num_record >= chunk_size:
                    output_chunk = _save_chunk(current_lines, output_dir_path, output_chunk_basename, current_chunk_idx, file_format, compression)
                    output_chunks.append((output_chunk, current_num_record))
                    # clean-up
                    current_lines = []
                    current_num_record = 0
                    current_chunk_idx += 1

        # saving last chunk with remaining records, trailing blank lines alone are not worth a chunk
        if any(line.strip() for line in current_lines):
            output_chunk = _save_chunk(current_lines, output_dir_path, output_chunk_basename, current_chunk_idx, file_format, compression)
            output_chunks.append((output_chunk, current_num_record))
    finally:
        # remove the temporary uncompressed input SDF, also when something went wrong
        if uncompressed_tmp is not None and not keep_uncompressed and uncompressed_tmp.exists():
            uncompressed_tmp.unlink()

    return output_chunks
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ CLASSES ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #
class SafeHDF5Store(HDFStore):
    """Implement safe HDFStore by obtaining file lock.

    Multiple writes will queue if lock is not obtained. The lock is a sibling file
    named '<path>.lock', created exclusively and removed when leaving the context manager.

    Edited after:
    https://stackoverflow.com/questions/41231678/obtaining-a-exclusive-lock-when-writing-to-an-hdf5-file
    """

    def __init__(self, *args, **kwargs):
        """Initialize and obtain file lock.

        Accepts the same arguments as HDFStore, plus the optional keyword 'probe_interval':
        the time in seconds to wait between two attempts at obtaining the lock
        (default: a random value in [0, 1)).
        """
        interval = kwargs.pop('probe_interval', random())
        # the store path can be provided either positionally or as keyword
        path = args[0] if args else kwargs['path']
        self._lock = f"{path}.lock"
        while True:
            try:
                self._flock = os.open(self._lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                break
            except FileExistsError:
                # lock is held by another writer, wait for it to be released.
                # Any other OSError (missing directory, permissions, ...) is not
                # recoverable by waiting and is propagated instead of looping forever.
                time.sleep(interval)
        try:
            HDFStore.__init__(self, *args, **kwargs)
        except BaseException:
            # do not leave a stale lock behind, it would block every other writer
            self._release_lock()
            raise

    def __exit__(self, *args, **kwargs):
        """Exit and remove file lock."""
        try:
            HDFStore.__exit__(self, *args, **kwargs)
        finally:
            # release the lock even if closing the store failed
            self._release_lock()

    def _release_lock(self):
        """Close and delete the lock file."""
        os.close(self._flock)
        os.remove(self._lock)