# Source code for the pdb2pqr I/O module.

"""Functions related to reading and writing data."""
import io
import logging

# import argparse
from collections import Counter
from pathlib import Path
from sys import path as sys_path

import requests

from . import cif
from . import definitions as defns
from . import inputgen
from . import pdb
from . import psize
from .config import AA_DEF_PATH, NA_DEF_PATH, PATCH_DEF_PATH
from .config import FILTER_WARNINGS, FILTER_WARNINGS_LIMIT
from .config import FORCE_FIELDS, TITLE_STR
from .structures import Atom

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

class DuplicateFilter(logging.Filter):
    """Filter duplicate messages.

    Counts WARNING records whose message starts with one of the
    prefixes in ``FILTER_WARNINGS`` and suppresses them once the count
    exceeds ``FILTER_WARNINGS_LIMIT``.
    """

    def __init__(self):
        super().__init__()
        # Per-prefix count of matching WARNING messages seen so far.
        self.warn_count = Counter()

    def filter(self, record):
        """Filter current record.

        :param record: the log record under consideration
        :returns: False to suppress the record, True to let it through
        :rtype: bool
        """
        if record.levelname != "WARNING":
            return True
        message = record.getMessage()
        for prefix in FILTER_WARNINGS:
            if not message.startswith(prefix):
                continue
            self.warn_count.update([prefix])
            count = self.warn_count[prefix]
            if count > FILTER_WARNINGS_LIMIT:
                return False
            if count == FILTER_WARNINGS_LIMIT:
                # Announce once that further copies will be dropped.
                _LOGGER.warning(
                    f'Suppressing further "{prefix}" messages'
                )
                return False
            return True
        return True
def get_old_header(pdblist):
    """Get old header from list of :mod:`pdb` objects.

    Collects the leading run of header-type records and stops at the
    first record of any other type.

    :param pdblist: list of :mod:`pdb` block objects
    :type pdblist: []
    :return: old header as string
    :rtype: str
    """
    header_types = (
        pdb.HEADER,
        pdb.TITLE,
        pdb.COMPND,
        pdb.SOURCE,
        pdb.KEYWDS,
        pdb.EXPDTA,
        pdb.AUTHOR,
        pdb.REVDAT,
        pdb.JRNL,
        pdb.REMARK,
        pdb.SPRSDE,
        pdb.NUMMDL,
    )
    header_lines = []
    for record in pdblist:
        if not isinstance(record, header_types):
            break
        header_lines.append(str(record))
    return "".join(line + "\n" for line in header_lines)
def dump_apbs(output_pqr, output_path):
    """Generate and dump APBS input files related to output_pqr.

    :param output_pqr: path to PQR file used to generate APBS input file
    :type output_pqr: str
    :param output_path: path for APBS input file output
    :type output_path: str
    """
    # Estimate grid dimensions from the PQR contents.
    grid_size = psize.Psize()
    grid_size.parse_input(output_pqr)
    grid_size.run_psize(output_pqr)
    apbs_input = inputgen.Input(
        output_pqr, grid_size, "mg-auto", 0, potdx=True
    )
    apbs_input.print_input_files(output_path)
[docs]def test_for_file(name, type_): """Test for the existence of a file with a few name permutations. :param name: name of file :type name: str :param type_: type of file :type type_: str :return: path to file :raises FileNotFoundError: if file not found :rtype: Path """ if name is None: return "" test_names = [name, name.upper(), name.lower()] test_suffixes = ["", f".{type_.upper()}", f".{type_.lower()}"] test_dirs = [ Path(p).joinpath("pdb2pqr", "dat") for p in sys_path + [Path.cwd()] ] if name.lower() in FORCE_FIELDS: name = name.upper() for test_dir in test_dirs: test_dir = Path(test_dir) for test_name in test_names: for test_suffix in test_suffixes: test_path = test_dir / (test_name + test_suffix) if test_path.is_file(): _LOGGER.debug(f"Found {type_} file {test_path}") return test_path err = f"Unable to find {type_} file for {name}" raise FileNotFoundError(err)
def test_names_file(name):
    """Test for the .names file that contains the XML mapping.

    Thin wrapper around :func:`test_for_file` for ``NAMES`` files.

    :param name: the name of the forcefield
    :type name: str
    :returns: the path to the file
    :rtype: Path
    :raises FileNotFoundError: file not found
    """
    return test_for_file(name, "NAMES")
def test_dat_file(name):
    """Test for the forcefield file with a few name permutations.

    Thin wrapper around :func:`test_for_file` for ``DAT`` files.

    :param name: the name of the dat file
    :type name: str
    :returns: the path to the file
    :rtype: Path
    :raises FileNotFoundError: file not found
    """
    return test_for_file(name, "DAT")
def test_xml_file(name):
    """Test for the XML file with a few name permutations.

    Thin wrapper around :func:`test_for_file` for ``xml`` files.

    :param name: the name of the xml file
    :type name: str
    :returns: the path to the file
    :rtype: Path
    :raises FileNotFoundError: file not found
    """
    return test_for_file(name, "xml")
def get_pdb_file(name):
    """Obtain a PDB file.

    First check the path given on the command line - if that file is not
    available, obtain the file from the RCSB PDB download server at
    <https://files.rcsb.org/download/>.

    .. todo::  This should be a context manager (to close the open file).

    .. todo::  Remove hard-coded parameters.

    :param name:  name of PDB file (path) or PDB ID
    :type name:  str
    :return:  file-like object containing PDB file
    :rtype:  file
    :raises IOError:  if the remote fetch returns a non-200 status code
    """
    path = Path(name)
    if path.is_file():
        return open(path, "rt", encoding="utf-8")
    # Not a local file: treat the stem as a PDB ID and fetch it from the
    # RCSB download service.  (The base URL was missing in the previous
    # version, so requests.get() received a bare relative path like
    # "1fas.pdb" and could never succeed.)
    url_path = f"https://files.rcsb.org/download/{path.stem}.pdb"
    _LOGGER.debug(f"Attempting to fetch PDB from {url_path}")
    resp = requests.get(url_path)
    if resp.status_code != 200:
        errstr = f"Got code {resp.status_code} while retrieving {url_path}"
        raise IOError(errstr)
    return io.StringIO(resp.text)
def get_molecule(input_path):
    """Get molecular structure information as a series of parsed lines.

    :param input_path: structure file PDB ID or path
    :type input_path: str
    :return: (list of molecule records, Boolean indicating whether entry
        is CIF)
    :rtype: ([str], bool)
    :raises RuntimeError: problems with structure file
    """
    path = Path(input_path)
    is_cif = path.suffix.lower() == ".cif"
    input_file = get_pdb_file(input_path)
    # Dispatch to the CIF or PDB parser based on the file suffix.
    if is_cif:
        pdblist, errlist = cif.read_cif(input_file)
    else:
        pdblist, errlist = pdb.read_pdb(input_file)
    input_file.close()
    if not pdblist and not errlist:
        raise RuntimeError(f"Unable to find file {path}!")
    if errlist:
        if is_cif:
            _LOGGER.warning(f"Warning: {path} is a non-standard CIF file.\n")
        else:
            _LOGGER.warning(f"Warning: {path} is a non-standard PDB file.\n")
        _LOGGER.error(errlist)
    return pdblist, is_cif
def get_definitions(
    aa_path=AA_DEF_PATH, na_path=NA_DEF_PATH, patch_path=PATCH_DEF_PATH
):
    """Load topology definition files.

    :param aa_path: likely location of amino acid topology definitions
    :type aa_path: str
    :param na_path: likely location of nucleic acid topology definitions
    :type na_path: str
    :param patch_path: likely location of patch topology definitions
    :type patch_path: str
    :return: topology Definitions object.
    :rtype: Definition
    """
    # Resolve each definition file to a concrete on-disk path first.
    aa_file_path = test_xml_file(aa_path)
    na_file_path = test_xml_file(na_path)
    patch_file_path = test_xml_file(patch_path)
    with open(aa_file_path, "rt") as aa_file, open(
        na_file_path, "rt"
    ) as na_file, open(patch_file_path, "rt") as patch_file:
        return defns.Definition(
            aa_file=aa_file, na_file=na_file, patch_file=patch_file
        )
def setup_logger(output_pqr, level="DEBUG"):
    """Set up the logger.

    Set up logger to output the log file to the same directory as PQR
    output.

    :param str output_pqr:  path to PQR file
    :param str level:  logging level
    """
    # Get the output logging location
    output_pth = Path(output_pqr)
    log_file = Path(output_pth.parent, output_pth.stem + ".log")
    log_format = (
        "%(asctime)s %(levelname)s:%(filename)s:"
        "%(lineno)d:%(funcName)s:%(message)s"
    )
    # Restored statement: the previous version was corrupted to
    # ')"Logs stored: {log_file}")', a syntax error.  Use print() because
    # logging has not been configured yet at this point.
    print(f"Logs stored: {log_file}")
    logging.basicConfig(
        filename=log_file,
        format=log_format,
        level=getattr(logging, level),
    )
    # Mirror log output to the console with a terser format.
    console = logging.StreamHandler()
    formatter = logging.Formatter("%(levelname)s:%(message)s")
    console.setFormatter(formatter)
    console.setLevel(level=getattr(logging, level))
    logging.getLogger("").addHandler(console)
    logging.getLogger("").addFilter(DuplicateFilter())
def read_pqr(pqr_file):
    """Read PQR file.

    Lines that do not parse into atoms are silently skipped.

    :param pqr_file: file object ready for reading as text
    :type pqr_file: file
    :returns: list of atoms read from file
    :rtype: [Atom]
    """
    parsed = (Atom.from_pqr_line(line) for line in pqr_file)
    return [atom for atom in parsed if atom is not None]
def read_qcd(qcd_file):
    """Read QCD (UHDB QCARD format) file.

    :param file qcd_file: file object ready for reading as text
    :returns: list of atoms read from file
    :rtype: [Atom]
    """
    atoms = []
    for line in qcd_file:
        # Serial numbers advance only for lines that parse into atoms,
        # so the next serial is always len(atoms) + 1.
        atom = Atom.from_qcd_line(line, len(atoms) + 1)
        if atom is not None:
            atoms.append(atom)
    return atoms
def read_dx(dx_file):
    """Read DX-format volumetric information.

    The OpenDX scalar file format is described at
    <https://www.ks.uiuc.edu/Research/vmd/plugins/molfile/dxplugin.html>
    (URL lost in the previous version; TODO confirm against upstream
    docs).

    .. note::  This function is not a general-format OpenDX file parser
       and makes many assumptions about the input data type, grid
       structure, etc.

    .. todo::  This function should be moved into the APBS code base.

    :param dx_file: file object for DX file, ready for reading as text
    :type dx_file: file
    :returns: dictionary with data from DX file
    :rtype: dict
    :raises ValueError: on parsing error
    """
    dx_dict = {
        "grid spacing": [],
        "values": [],
        "number of grid points": None,
        "lower left corner": None,
    }
    for line in dx_file:
        words = [w.strip() for w in line.split()]
        if not words:
            # Blank lines (common at end-of-file) previously raised
            # IndexError on words[0]; skip them instead.
            continue
        if words[0] in ["#", "attribute", "component"]:
            # Comments and metadata records carry no grid data.
            pass
        elif words[0] == "object":
            # Only "object 1" (gridpositions) carries the grid counts.
            if words[1] == "1":
                dx_dict["number of grid points"] = (
                    int(words[5]),
                    int(words[6]),
                    int(words[7]),
                )
        elif words[0] == "origin":
            dx_dict["lower left corner"] = [
                float(words[1]),
                float(words[2]),
                float(words[3]),
            ]
        elif words[0] == "delta":
            spacing = [float(words[1]), float(words[2]), float(words[3])]
            dx_dict["grid spacing"].append(spacing)
        else:
            # Any remaining line is assumed to be whitespace-separated
            # data values.
            for word in words:
                dx_dict["values"].append(float(word))
    return dx_dict
def write_cube(cube_file, data_dict, atom_list, comment="CPMD CUBE FILE."):
    """Write a Cube-format data file.

    The Gaussian Cube file format is described at
    <http://paulbourke.net/dataformats/cube/>.

    .. todo::  This function should be moved into the APBS code base.

    :param cube_file: file object ready for writing text data
    :type cube_file: file
    :param data_dict: dictionary of volumetric data as produced by
        :func:`read_dx`
    :type data_dict: dict
    :param atom_list: atoms to include in the Cube header; each must
        provide ``serial``, ``charge``, ``x``, ``y``, and ``z``
    :type atom_list: list
    :param comment: comment for Cube file
    :type comment: str
    """
    cube_file.write(comment + "\n")
    cube_file.write("OUTER LOOP: X, MIDDLE LOOP: Y, INNER LOOP: Z\n")
    num_atoms = len(atom_list)
    origin = data_dict["lower left corner"]
    cube_file.write(
        f"{num_atoms:>4} {origin[0]:>11.6f} {origin[1]:>11.6f} "
        f"{origin[2]:>11.6f}\n"
    )
    num_points = data_dict["number of grid points"]
    spacings = data_dict["grid spacing"]
    for i in range(3):
        cube_file.write(
            f"{-num_points[i]:>4} "
            f"{spacings[i][0]:>11.6f} "
            f"{spacings[i][1]:>11.6f} "
            f"{spacings[i][2]:>11.6f}\n"
        )
    for atom in atom_list:
        cube_file.write(
            f"{atom.serial:>4} {atom.charge:>11.6f} {atom.x:>11.6f} "
            f"{atom.y:>11.6f} {atom.z:>11.6f}\n"
        )
    # Emit values six per line; the previous version hard-coded 6 in two
    # places alongside an unused `stride` variable, which invited
    # desynchronization.  Behavior is unchanged: the final (possibly
    # partial) line carries no trailing newline.
    stride = 6
    values = data_dict["values"]
    for i in range(0, len(values), stride):
        words = [f"{val:< 13.5E}" for val in values[i:i + stride]]
        if i + stride < len(values):
            cube_file.write(" ".join(words) + "\n")
        else:
            cube_file.write(" ".join(words))