From 94837fe3439d291bd76b63090bcc484bb5e80a17 Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Sat, 20 Jun 2026 22:49:20 +0800 Subject: [PATCH] feat(deepmd): add hdf5 mixed format --- dpdata/formats/deepmd/hdf5.py | 14 +- dpdata/formats/deepmd/mixed.py | 103 ++++++++- dpdata/plugins/deepmd.py | 385 +++++++++++++++++++++++++++++++++ dpdata/system.py | 17 +- tests/test_deepmd_hdf5.py | 230 +++++++++++++++++++- 5 files changed, 730 insertions(+), 19 deletions(-) diff --git a/dpdata/formats/deepmd/hdf5.py b/dpdata/formats/deepmd/hdf5.py index c2b3bd424..6349a960c 100644 --- a/dpdata/formats/deepmd/hdf5.py +++ b/dpdata/formats/deepmd/hdf5.py @@ -45,10 +45,6 @@ def to_system_data( data["atom_types"] = g["type.raw"][:] ntypes = np.max(data["atom_types"]) + 1 natoms = data["atom_types"].size - data["atom_numbs"] = [] - for ii in range(ntypes): - data["atom_numbs"].append(np.count_nonzero(data["atom_types"] == ii)) - data["atom_names"] = [] # if find type_map.raw, use it if "type_map.raw" in g.keys(): my_type_map = list(np.char.decode(g["type_map.raw"][:])) @@ -60,9 +56,11 @@ def to_system_data( my_type_map = [] for ii in range(ntypes): my_type_map.append("Type_%d" % ii) # noqa: UP031 - assert len(my_type_map) >= len(data["atom_numbs"]) - for ii in range(len(data["atom_numbs"])): - data["atom_names"].append(my_type_map[ii]) + assert len(my_type_map) >= ntypes + data["atom_names"] = my_type_map + data["atom_numbs"] = [] + for ii, _ in enumerate(data["atom_names"]): + data["atom_numbs"].append(np.count_nonzero(data["atom_types"] == ii)) data["orig"] = np.zeros([3]) if "nopbc" in g.keys(): @@ -81,7 +79,6 @@ def to_system_data( "atom_names", "atom_types", "orig", - "real_atom_types", "real_atom_names", "nopbc", ): @@ -184,7 +181,6 @@ def dump( "atom_names", "atom_types", "orig", - "real_atom_types", "real_atom_names", "nopbc", ): diff --git a/dpdata/formats/deepmd/mixed.py b/dpdata/formats/deepmd/mixed.py index 734b6a730..dbaf21125 100644 --- a/dpdata/formats/deepmd/mixed.py +++ b/dpdata/formats/deepmd/mixed.py @@ -123,9 +123,34 @@ def _strip_virtual_atoms(atom_types_row, coords, extra_data, dtypes): return atom_types, coords, stripped -def to_system_data(folder, type_map=None, labels=True): - data = comp_to_system_data(folder, type_map, labels) - # data is empty +def _to_system_data(data, type_map=None, labels=True): + """Split one mixed-type data dict into regular System data dicts. + + Mixed DeePMD data stores all atoms as one placeholder atom type and keeps + the original atom type of every frame in ``real_atom_types``. This helper + groups frames with the same ``real_atom_types`` row, restores the original + ``atom_types`` and ``atom_numbs``, and strips virtual atoms introduced by + ``atom_numb_pad``. + + Parameters + ---------- + data : dict + Mixed-type data loaded by a backend reader. The dict must contain + ``real_atom_types`` and the usual System/LabeledSystem frame data. + type_map : list[str], optional + Type map used to remap stored atom types while loading. Virtual atoms + marked by ``-1`` are preserved during remapping. + labels : bool, default=True + Whether the data should be interpreted with + :class:`dpdata.LabeledSystem` data types. Set to ``False`` for + unlabeled System data. + + Returns + ------- + list[dict] + Regular System/LabeledSystem data dicts, one for each unique real atom + type layout found in the mixed input. + """ old_type_map = data["atom_names"].copy() if type_map is not None: assert isinstance(type_map, list) @@ -220,7 +245,73 @@ def to_system_data(folder, type_map=None, labels=True): return data_list -def dump(folder, data, set_size=2000, comp_prec=np.float32, remove_sets=True): +def to_system_data(folder, type_map=None, labels=True, load_func=None): + """Load mixed-type DeePMD data and split it into regular systems. + + By default this function reads the ``deepmd/npy/mixed`` directory layout + through :mod:`dpdata.formats.deepmd.comp`. Other storage backends can pass + ``load_func`` to reuse the same mixed-type reconstruction logic. The loader + must return the same data dict shape as ``deepmd/npy`` and include + ``real_atom_types``. + + Parameters + ---------- + folder + Backend-specific location to load. For the default npy backend this is + a directory; HDF5 callers pass an HDF5 group. + type_map : list[str], optional + Type map used to remap atom types while loading. + labels : bool, default=True + Whether labeled data such as energies and forces should be loaded. + load_func : callable, optional + Backend reader with signature ``load_func(folder, type_map, labels)``. + + Returns + ------- + list[dict] + Regular System/LabeledSystem data dicts split out of the mixed input. + """ + if load_func is None: + load_func = comp_to_system_data + data = load_func(folder, type_map=type_map, labels=labels) + return _to_system_data(data, type_map=type_map, labels=labels) + + +def dump( + folder, + data, + set_size=2000, + comp_prec=np.float32, + remove_sets=True, + dump_func=None, +): + """Dump one System data dict in mixed-type DeePMD layout. + + If ``data`` has not already been converted to mixed type, it is copied and + converted first. The converted data stores the original element names in + ``real_atom_names`` and the per-frame real atom type table in + ``real_atom_types``; the backend writer receives the converted data with + ``real_atom_names`` exposed as ``atom_names`` so it is written to + ``type_map.raw``. + + Parameters + ---------- + folder + Backend-specific destination. For the default npy backend this is a + directory; HDF5 callers pass an HDF5 group. + data : dict + System or LabeledSystem data dict to dump. + set_size : int, default=2000 + Maximum number of frames per ``set.*`` chunk. + comp_prec : numpy.dtype, default=numpy.float32 + Floating point precision used by the backend writer. + remove_sets : bool, default=True + Whether existing npy ``set.*`` directories should be removed before + dumping. Backends that do not use directories may ignore this argument. + dump_func : callable, optional + Backend writer with signature + ``dump_func(folder, data, set_size, comp_prec, remove_sets)``. + """ # if not converted to mixed if "real_atom_types" not in data: from dpdata import LabeledSystem, System @@ -236,7 +327,9 @@ def dump(folder, data, set_size=2000, comp_prec=np.float32, remove_sets=True): data = data.copy() data["atom_names"] = data.pop("real_atom_names") - comp_dump(folder, data, set_size, comp_prec, remove_sets) + if dump_func is None: + dump_func = comp_dump + dump_func(folder, data, set_size, comp_prec, remove_sets) def mix_system(*system, type_map, atom_numb_pad=None, **kwargs): diff --git a/dpdata/plugins/deepmd.py b/dpdata/plugins/deepmd.py index 99bd9b237..a0262ca98 100644 --- a/dpdata/plugins/deepmd.py +++ b/dpdata/plugins/deepmd.py @@ -422,6 +422,391 @@ def to_multi_systems( yield f.create_group(ff) +@Format.register("deepmd/hdf5/mixed") +class DeePMDHDF5MixedFormat(DeePMDMixedFormat): + """Mixed type HDF5 format for DeePMD-kit. + + Mixed type data stores frames with the same atom count in one dataset even + when their formulas differ. The placeholder ``type.raw`` contains only the + mixed token type, while ``set.*/real_atom_types.npy`` stores the real atom + type layout for each frame. Loading reconstructs regular Systems by + splitting frames with different ``real_atom_types`` rows. + + The HDF5 layout mirrors ``deepmd/npy/mixed`` inside HDF5 groups. For + :class:`dpdata.MultiSystems`, each top-level mixed group is keyed by the + number of atoms after optional padding, such as ``"4"`` or ``"8"``. A + string path may include ``"#group/path"`` to read or write mixed data under + a nested HDF5 group. + + Examples + -------- + Dump a :class:`dpdata.MultiSystems` object to a mixed HDF5 file: + + >>> systems.to_deepmd_hdf5_mixed("mixed.hdf5") + + Dump with atom-count padding: + + >>> systems.to_deepmd_hdf5_mixed("mixed.hdf5", atom_numb_pad=8) + + Load a mixed HDF5 file into :class:`dpdata.MultiSystems`: + + >>> dpdata.MultiSystems().from_deepmd_hdf5_mixed("mixed.hdf5") + """ + + @staticmethod + def _load_hdf5_mixed_data(group, type_map=None, labels=True): + """Load one mixed HDF5 group as a backend data dict. + + Parameters + ---------- + group : h5py.Group or h5py.File + HDF5 object containing one mixed DeePMD system group. The group must + contain ``type.raw``, ``type_map.raw`` and ``set.*`` children. + type_map : list[str], optional + Type map used by the generic HDF5 loader. + labels : bool, default=True + Whether labeled data such as energies and forces should be loaded. + + Returns + ------- + dict + Mixed-type data dict consumed by + :func:`dpdata.formats.deepmd.mixed.to_system_data`. + """ + return dpdata.formats.deepmd.hdf5.to_system_data( + group, "", type_map=type_map, labels=labels + ) + + @staticmethod + def _dump_hdf5_mixed_data(group, data, set_size, comp_prec, remove_sets=True): + """Dump one mixed data dict to an HDF5 group. + + Parameters + ---------- + group : h5py.Group or h5py.File + Destination HDF5 object. + data : dict + Mixed-type data dict prepared by + :func:`dpdata.formats.deepmd.mixed.dump`. + set_size : int + Maximum number of frames per ``set.*`` group. + comp_prec : numpy.dtype + Floating point precision for dumped frame data. + remove_sets : bool, default=True + Accepted for backend compatibility. HDF5 groups are recreated by the + caller, so this argument is not used. + """ + dpdata.formats.deepmd.hdf5.dump( + group, "", data, set_size=set_size, comp_prec=comp_prec + ) + + @staticmethod + def _iter_mixed_groups(group): + """Yield mixed DeePMD HDF5 groups under ``group``. + + A group is considered a mixed system group when it contains + ``type.raw``, ``type_map.raw`` and at least one ``set.*`` group with a + ``real_atom_types.npy`` dataset. If the current group is not a system + group, nested HDF5 groups are searched recursively. This supports files + written either as a single mixed system at the file root or as + MultiSystems groups such as ``/4`` and ``/8``. + + Parameters + ---------- + group : h5py.Group or h5py.File + HDF5 group or file to scan. + + Yields + ------ + h5py.Group or h5py.File + Mixed system groups to pass to ``from_system_mix``. + """ + import h5py + + set_groups = [ + item + for key, item in group.items() + if key.startswith("set.") and isinstance(item, h5py.Group) + ] + is_mixed_group = ( + "type.raw" in group + and "type_map.raw" in group + and any("real_atom_types.npy" in set_group for set_group in set_groups) + ) + if is_mixed_group: + yield group + return + for item in group.values(): + if isinstance(item, h5py.Group): + yield from DeePMDHDF5MixedFormat._iter_mixed_groups(item) + + @staticmethod + def _get_group(file, name): + """Return ``file`` or a named child group. + + Parameters + ---------- + file : h5py.File or h5py.Group + Root HDF5 object. + name : str + Child group path. An empty string selects ``file`` itself. + + Returns + ------- + h5py.File or h5py.Group + Selected HDF5 object. + """ + if not name: + return file + return file[name] + + @staticmethod + def _create_group(file, name): + """Create a named child group. + + Parameters + ---------- + file : h5py.File or h5py.Group + Root HDF5 object. + name : str + Child group path. An empty string selects ``file`` itself. + + Returns + ------- + h5py.File or h5py.Group + Created group, or ``file`` when ``name`` is empty. + """ + if not name: + return file + return file.create_group(name) + + def from_system_mix(self, file_name, type_map=None, **kwargs): + """Load unlabeled mixed HDF5 data and split it into Systems. + + Parameters + ---------- + file_name : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. + type_map : list[str], optional + Type map used to remap real atom types while loading. + **kwargs : dict + Additional keyword arguments accepted for format API compatibility. + + Returns + ------- + list[dict] + Unlabeled System data dicts reconstructed from the mixed data. + """ + return self._from_system_mix(file_name, type_map=type_map, labels=False) + + def from_labeled_system_mix(self, file_name, type_map=None, **kwargs): + """Load labeled mixed HDF5 data and split it into LabeledSystems. + + Parameters + ---------- + file_name : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. + type_map : list[str], optional + Type map used to remap real atom types while loading. + **kwargs : dict + Additional keyword arguments accepted for format API compatibility. + + Returns + ------- + list[dict] + LabeledSystem data dicts reconstructed from the mixed data. + """ + return self._from_system_mix(file_name, type_map=type_map, labels=True) + + def _from_system_mix(self, file_name, type_map=None, labels=True): + """Load mixed HDF5 data through the shared mixed backend. + + Parameters + ---------- + file_name : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. + When a file object is given, the object itself is interpreted as the + mixed system group. + type_map : list[str], optional + Type map used to remap real atom types while loading. + labels : bool, default=True + Whether labeled data such as energies and forces should be loaded. + + Returns + ------- + list[dict] + System or LabeledSystem data dicts split out of the mixed HDF5 data. + + Raises + ------ + TypeError + If ``file_name`` is not a string, HDF5 group, or HDF5 file. + """ + import h5py + + register_spin() + + if isinstance(file_name, (h5py.Group, h5py.File)): + return dpdata.formats.deepmd.mixed.to_system_data( + file_name, + type_map=type_map, + labels=labels, + load_func=self._load_hdf5_mixed_data, + ) + elif isinstance(file_name, str): + s = file_name.split("#") + name = s[1] if len(s) > 1 else "" + with h5py.File(s[0], "r") as f: + return dpdata.formats.deepmd.mixed.to_system_data( + self._get_group(f, name), + type_map=type_map, + labels=labels, + load_func=self._load_hdf5_mixed_data, + ) + else: + raise TypeError("Unsupported file_name") + + def to_system( + self, + data, + file_name, + set_size: int = 2000, + prec=np.float64, + comp_prec=None, + **kwargs, + ): + """Dump a System data dict in mixed HDF5 format. + + Parameters + ---------- + data : dict + System or LabeledSystem data dict. If it is not already in mixed + type form, it is copied and converted before dumping. + file_name : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. + Strings open the target file in write mode. HDF5 objects are written + in place. + set_size : int, default=2000 + Maximum number of frames per ``set.*`` group. + prec : numpy.dtype, default=numpy.float64 + Floating point precision for dumped frame data. Kept for + consistency with ``deepmd/npy/mixed``. + comp_prec : numpy.dtype, optional + Explicit floating point precision. When provided, this overrides + ``prec``. + **kwargs : dict + Additional keyword arguments accepted for format API compatibility. + + Raises + ------ + TypeError + If ``file_name`` is not a string, HDF5 group, or HDF5 file. + """ + import h5py + + if comp_prec is None: + comp_prec = prec + + if isinstance(file_name, (h5py.Group, h5py.File)): + dpdata.formats.deepmd.mixed.dump( + file_name, + data, + set_size=set_size, + comp_prec=comp_prec, + dump_func=self._dump_hdf5_mixed_data, + ) + elif isinstance(file_name, str): + s = file_name.split("#") + name = s[1] if len(s) > 1 else "" + with h5py.File(s[0], "w") as f: + dpdata.formats.deepmd.mixed.dump( + self._create_group(f, name), + data, + set_size=set_size, + comp_prec=comp_prec, + dump_func=self._dump_hdf5_mixed_data, + ) + else: + raise TypeError("Unsupported file_name") + + def from_multi_systems(self, directory, **kwargs): + """Generate mixed HDF5 groups for MultiSystems loading. + + Parameters + ---------- + directory : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. The + selected object may be either one mixed system group or a container + of mixed groups. + **kwargs : dict + Additional keyword arguments accepted for format API compatibility. + + Yields + ------ + h5py.Group or h5py.File + Mixed HDF5 groups that will be passed to ``from_system_mix``. + + Raises + ------ + TypeError + If ``directory`` is not a string, HDF5 group, or HDF5 file. + """ + import h5py + + register_spin() + + if isinstance(directory, (h5py.Group, h5py.File)): + yield from self._iter_mixed_groups(directory) + elif isinstance(directory, str): + s = directory.split("#") + name = s[1] if len(s) > 1 else "" + with h5py.File(s[0], "r") as f: + yield from self._iter_mixed_groups(self._get_group(f, name)) + else: + raise TypeError("Unsupported directory") + + def to_multi_systems(self, formulas, directory, **kwargs): + """Generate HDF5 groups for MultiSystems mixed dumping. + + Parameters + ---------- + formulas : list[str] + Mixed group names produced by ``mix_system``. For mixed HDF5 these + names are atom counts after optional padding. + directory : str or h5py.Group or h5py.File + HDF5 file, HDF5 group, or string in ``"file.hdf5#group"`` form. + Strings open the target file in write mode. + **kwargs : dict + Additional keyword arguments accepted for format API compatibility. + + Yields + ------ + h5py.Group + Destination groups that will be passed to ``to_system``. + + Raises + ------ + TypeError + If ``directory`` is not a string, HDF5 group, or HDF5 file. + """ + import h5py + + if isinstance(directory, (h5py.Group, h5py.File)): + for ff in formulas: + if ff in directory: + del directory[ff] + yield directory.create_group(ff) + elif isinstance(directory, str): + s = directory.split("#") + name = s[1] if len(s) > 1 else "" + with h5py.File(s[0], "w") as f: + root = self._create_group(f, name) + for ff in formulas: + yield root.create_group(ff) + else: + raise TypeError("Unsupported directory") + + @Driver.register("dp") @Driver.register("deepmd") @Driver.register("deepmd-kit") diff --git a/dpdata/system.py b/dpdata/system.py index 4150abc89..a777ccb6b 100644 --- a/dpdata/system.py +++ b/dpdata/system.py @@ -123,6 +123,7 @@ def __init__( - ``lammps/dump``: Lammps dump - ``deepmd/raw``: deepmd-kit raw - ``deepmd/npy``: deepmd-kit compressed format (numpy binary) + - ``deepmd/npy/mixed``: deepmd-kit mixed type compressed format (numpy binary) - ``vasp/poscar``: vasp POSCAR - ``vasp/contcar``: vasp contcar - ``vasp/string``: vasp string @@ -164,6 +165,7 @@ def __init__( - ``gaussian/gjf``: gaussian gjf - ``deepmd/comp``: deepmd comp - ``deepmd/hdf5``: deepmd hdf5 + - ``deepmd/hdf5/mixed``: deepmd mixed type hdf5 - ``gromacs/gro``: gromacs gro - ``cp2k/aimd_output``: cp2k aimd_output - ``cp2k/output``: cp2k output @@ -1180,6 +1182,9 @@ class LabeledSystem(System): - ``vasp/outcar``: vasp OUTCAR - ``deepmd/raw``: deepmd-kit raw - ``deepmd/npy``: deepmd-kit compressed format (numpy binary) + - ``deepmd/npy/mixed``: deepmd-kit mixed type compressed format (numpy binary) + - ``deepmd/hdf5``: deepmd hdf5 + - ``deepmd/hdf5/mixed``: deepmd mixed type hdf5 - ``qe/cp/traj``: Quantum Espresso CP trajectory files. should have: file_name+'.in', file_name+'.pos', file_name+'.evp' and file_name+'.for' - ``qe/pw/scf``: Quantum Espresso PW single point calculations. Both input and output files are required. If file_name is a string, it denotes the output file name. Input file name is obtained by replacing 'out' by 'in' from file_name. Or file_name is a list, with the first element being the input file name and the second element being the output filename. - ``siesta/output``: siesta SCF output file @@ -1417,10 +1422,14 @@ def to_fmt_obj(self, fmtobj: Format, directory, *args: Any, **kwargs: Any): mixed_systems = fmtobj.mix_system( *list(self.systems.values()), type_map=self.atom_names, **kwargs ) - for fn in mixed_systems: - mixed_systems[fn].to_fmt_obj( - fmtobj, os.path.join(directory, fn), *args, **kwargs - ) + for fn, ss in zip( + fmtobj.to_multi_systems( + list(mixed_systems.keys()), directory, **kwargs + ), + mixed_systems.values(), + strict=True, + ): + ss.to_fmt_obj(fmtobj, fn, *args, **kwargs) return self def to(self, fmt: str, *args: Any, **kwargs: Any) -> MultiSystems: diff --git a/tests/test_deepmd_hdf5.py b/tests/test_deepmd_hdf5.py index b4a22f3c1..9d65f29a6 100644 --- a/tests/test_deepmd_hdf5.py +++ b/tests/test_deepmd_hdf5.py @@ -3,8 +3,17 @@ import os import unittest +import h5py # noqa: TID253 import numpy as np -from comp_sys import CompLabeledSys, CompSys, IsNoPBC, IsPBC, MultiSystems +from comp_sys import ( + CompLabeledMultiSys, + CompLabeledSys, + CompSys, + IsNoPBC, + IsPBC, + MSAllIsNoPBC, + MultiSystems, +) from context import dpdata @@ -72,3 +81,222 @@ def setUp(self): def tearDown(self): if os.path.exists("tmp.deepmd.hdf5"): os.remove("tmp.deepmd.hdf5") + + +class TestHDF5MixedMulti( + unittest.TestCase, CompLabeledMultiSys, MultiSystems, MSAllIsNoPBC +): + def setUp(self): + self.places = 6 + self.e_places = 6 + self.f_places = 6 + self.v_places = 6 + + system_1 = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system_2 = dpdata.LabeledSystem( + "gaussian/methane_sub.gaussianlog", fmt="gaussian/log" + ) + + tmp_data = system_1.data.copy() + tmp_data["atom_numbs"] = [1, 1, 1, 2] + tmp_data["atom_names"] = ["C", "H", "A", "B"] + tmp_data["atom_types"] = np.array([0, 1, 2, 3, 3]) + system_3 = dpdata.LabeledSystem(data=tmp_data) + + self.ms = dpdata.MultiSystems(system_1, system_2, system_3) + self.ms.to_deepmd_hdf5_mixed("tmp.deepmd.mixed.hdf5") + self.systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.hdf5" + ) + self.ms_1 = self.ms + self.ms_2 = self.systems + + self.system_names = ["C1H4A0B0", "C1H3A0B0", "C1H1A1B2"] + self.system_sizes = {"C1H4A0B0": 1, "C1H3A0B0": 1, "C1H1A1B2": 1} + self.atom_names = ["C", "H", "A", "B"] + + def tearDown(self): + if os.path.exists("tmp.deepmd.mixed.hdf5"): + os.remove("tmp.deepmd.mixed.hdf5") + + def test_hdf5_group_layout(self): + with h5py.File("tmp.deepmd.mixed.hdf5", "r") as f: + self.assertEqual(set(f.keys()), {"4", "5"}) + for group in f.values(): + self.assertIn("type_map.raw", group) + self.assertIn("set.000/real_atom_types.npy", group) + + +class TestHDF5MixedPadding( + unittest.TestCase, CompLabeledMultiSys, MultiSystems, MSAllIsNoPBC +): + def setUp(self): + self.places = 6 + self.e_places = 6 + self.f_places = 6 + self.v_places = 6 + + system_1 = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system_2 = dpdata.LabeledSystem( + "gaussian/methane_sub.gaussianlog", fmt="gaussian/log" + ) + + self.ms = dpdata.MultiSystems(system_1, system_2) + self.ms.to_deepmd_hdf5_mixed("tmp.deepmd.mixed.pad.hdf5", atom_numb_pad=8) + self.systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.pad.hdf5" + ) + self.ms_1 = self.ms + self.ms_2 = self.systems + + self.system_names = ["C1H4", "C1H3"] + self.system_sizes = {"C1H4": 1, "C1H3": 1} + self.atom_names = ["C", "H"] + + def tearDown(self): + if os.path.exists("tmp.deepmd.mixed.pad.hdf5"): + os.remove("tmp.deepmd.mixed.pad.hdf5") + + def test_single_padded_group(self): + with h5py.File("tmp.deepmd.mixed.pad.hdf5", "r") as f: + self.assertEqual(list(f.keys()), ["8"]) + real_atom_types = f["8/set.000/real_atom_types.npy"][:] + self.assertEqual(real_atom_types.shape[1], 8) + self.assertTrue(np.any(real_atom_types == -1)) + + +class TestHDF5MixedIOVariants(unittest.TestCase): + def tearDown(self): + for file_name in ( + "tmp.deepmd.mixed.single.hdf5", + "tmp.deepmd.mixed.group.hdf5", + "tmp.deepmd.mixed.object.hdf5", + "tmp.deepmd.mixed.unlabeled.hdf5", + "tmp.deepmd.mixed.typemap.hdf5", + "tmp.deepmd.regular.hdf5", + ): + if os.path.exists(file_name): + os.remove(file_name) + + def test_single_system_string_round_trip(self): + system = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system.to("deepmd/hdf5/mixed", "tmp.deepmd.mixed.single.hdf5") + + systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.single.hdf5" + ) + + self.assertEqual(len(systems), 1) + self.assertIn("C1H4", systems.systems) + np.testing.assert_allclose( + systems["C1H4"].data["coords"], system.data["coords"] + ) + + def test_hash_group_round_trip(self): + system_1 = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system_2 = dpdata.LabeledSystem( + "gaussian/methane_sub.gaussianlog", fmt="gaussian/log" + ) + + dpdata.MultiSystems(system_1, system_2).to_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.group.hdf5#mixed" + ) + systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.group.hdf5#mixed" + ) + + self.assertEqual(set(systems.systems), {"C1H4", "C1H3"}) + with h5py.File("tmp.deepmd.mixed.group.hdf5", "r") as f: + self.assertEqual(set(f["mixed"].keys()), {"4", "5"}) + + def test_hdf5_object_round_trip(self): + system_1 = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system_2 = dpdata.LabeledSystem( + "gaussian/methane_sub.gaussianlog", fmt="gaussian/log" + ) + + with h5py.File("tmp.deepmd.mixed.object.hdf5", "w") as f: + f.create_group("5") + dpdata.MultiSystems(system_1, system_2).to_deepmd_hdf5_mixed(f) + + with h5py.File("tmp.deepmd.mixed.object.hdf5", "r") as f: + systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed(f) + + self.assertEqual(set(systems.systems), {"C1H4", "C1H3"}) + np.testing.assert_allclose( + systems["C1H4"].data["forces"], system_1.data["forces"] + ) + + def test_unlabeled_round_trip(self): + system = dpdata.System("poscars/POSCAR.h2o.md", fmt="vasp/poscar") + system.to("deepmd/hdf5/mixed", "tmp.deepmd.mixed.unlabeled.hdf5") + + systems = dpdata.MultiSystems().load_systems_from_file( + "tmp.deepmd.mixed.unlabeled.hdf5", + fmt="deepmd/hdf5/mixed", + labeled=False, + ) + + self.assertEqual(len(systems), 1) + self.assertNotIn("energies", list(systems.systems.values())[0].data) + np.testing.assert_allclose( + list(systems.systems.values())[0].data["coords"], system.data["coords"] + ) + + def test_type_map_round_trip(self): + system = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + dpdata.MultiSystems(system).to_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.typemap.hdf5" + ) + + systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.mixed.typemap.hdf5", type_map=["H", "C"] + ) + system_ref = system.copy() + system_ref.apply_type_map(["H", "C"]) + + self.assertEqual(set(systems.systems), {system_ref.formula}) + np.testing.assert_allclose( + systems[system_ref.formula].data["forces"], system_ref.data["forces"] + ) + + def test_unsupported_inputs(self): + fmt = dpdata.plugins.deepmd.DeePMDHDF5MixedFormat() + + with self.assertRaises(TypeError): + fmt.from_system_mix(object()) + with self.assertRaises(TypeError): + fmt.to_system({}, object()) + with self.assertRaises(TypeError): + list(fmt.from_multi_systems(object())) + with self.assertRaises(TypeError): + list(fmt.to_multi_systems(["1"], object())) + + def test_regular_hdf5_groups_are_not_mixed(self): + system_1 = dpdata.LabeledSystem( + "gaussian/methane.gaussianlog", fmt="gaussian/log" + ) + system_2 = dpdata.LabeledSystem( + "gaussian/methane_sub.gaussianlog", fmt="gaussian/log" + ) + dpdata.MultiSystems(system_1, system_2).to_deepmd_hdf5( + "tmp.deepmd.regular.hdf5" + ) + + systems = dpdata.MultiSystems().from_deepmd_hdf5_mixed( + "tmp.deepmd.regular.hdf5" + ) + + self.assertEqual(len(systems), 0)