Repo created

This commit is contained in:
Fr4nz D13trich 2025-11-22 13:58:55 +01:00
parent 4af19165ec
commit 68073add76
12458 changed files with 12350765 additions and 2 deletions

View file

@ -0,0 +1,39 @@
import os
# Directory with the resources needed by the type mappings (feature_types.init
# reads "types.txt" from it). First candidate: "data" three levels above the
# directory containing this file.
resource_path = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), "..", "..", "..", "data",
)
if not os.path.exists(resource_path):
    # Fallback: ask data_files for the "omim-data" location.
    # NOTE(review): presumably this resolves data installed as a package —
    # confirm against the data_files module.
    from data_files import find_data_files
    resource_path = find_data_files("omim-data")
    assert resource_path is not None
# The type mappings must be loaded before the names below are re-exported.
from mwm.feature_types import init as _init
_init(resource_path)
from mwm.feature_types import INDEX_TO_NAME_TYPE_MAPPING
from mwm.feature_types import NAME_TO_INDEX_TYPE_MAPPING
from mwm.feature_types import readable_type
from mwm.feature_types import type_index
from mwm.mwm_interface import GeomType
from mwm.mwm_interface import MapType
from mwm.mwm_interface import MetadataField
from mwm.mwm_interface import Point
from mwm.mwm_interface import Rect
from mwm.mwm_interface import RegionDataField
from mwm.mwm_interface import Triangle
from mwm.mwm_python import get_region_info
from mwm.utils import EnumAsStrEncoder
# Prefer the pygen-backed implementation and initialize it with the same
# resources; if its import fails, fall back to the pure-Python reader so that
# `Mwm` and `Feature` are always available from this package.
try:
    from mwm.mwm_pygen import MwmPygen as Mwm
    from mwm.mwm_pygen import FeaturePygen as Feature
    from mwm.mwm_pygen import init as _init
    _init(resource_path)
except ImportError:
    from mwm.mwm_python import MwmPython as Mwm
    from mwm.mwm_python import FeaturePython as Feature

View file

@ -0,0 +1,134 @@
import argparse
import logging
import sys
from mwm.decode_id import decode_id
from mwm.dump_mwm import dump_mwm
from mwm.find_feature import find_and_print_features
from mwm.ft2osm import ft2osm
from mwm.mwm_feature_compare import compare_mwm
logger = logging.getLogger("mwm")
logger.setLevel(logging.ERROR)
class Mwm:
    """Command line dispatcher for the mwm utilities.

    Instantiating the class parses ``sys.argv``: the first argument selects a
    subcommand, the remaining arguments are parsed by that subcommand's own
    parser. The process exits with status 1 on an unknown subcommand or when a
    lookup subcommand finds nothing.
    """

    # Explicit whitelist of subcommands. Dispatching on ``hasattr`` alone would
    # accept any attribute name (``__class__``, ``__init__``, ...) as a command.
    COMMANDS = (
        "decode_id",
        "dump_mwm",
        "find_feature",
        "ft2osm",
        "mwm_feature_compare",
    )

    def __init__(self):
        parser = argparse.ArgumentParser(
            description="Mwm utils",
            usage="""mwm <command> [<args>]
The most commonly used mwm commands are:
    decode_id            Unpacks maps.me OSM id to an OSM object link.
    dump_mwm             Dumps some MWM structures.
    find_feature         Finds features in an mwm file based on a query.
    ft2osm               Finds an OSM object for a given feature id.
    mwm_feature_compare  Compares feature count in .mwm files.
""",
        )
        parser.add_argument("command", help="Subcommand to run")
        # Only the subcommand name is parsed here; the rest of the command
        # line belongs to the subcommand parser.
        args = parser.parse_args(sys.argv[1:2])
        if args.command not in self.COMMANDS:
            print(f"Unrecognized command {args.command}")
            parser.print_help()
            sys.exit(1)
        getattr(self, args.command)()

    @staticmethod
    def decode_id():
        """Run the ``decode_id`` subcommand and print the decoded value."""
        parser = argparse.ArgumentParser(
            description="Unpacks maps.me OSM id to an OSM object link."
        )
        parser.add_argument(
            "--id", type=str, required=True, help="OsmId or url from osm.org."
        )
        args = parser.parse_args(sys.argv[2:])
        decoded = decode_id(args.id)
        if decoded is None:
            print("Decode id error.")
            sys.exit(1)
        print(decoded)

    @staticmethod
    def dump_mwm():
        """Run the ``dump_mwm`` subcommand."""
        parser = argparse.ArgumentParser(description="Dumps some MWM structures.")
        parser.add_argument("--path", type=str, required=True, help="Path to mwm.")
        parser.add_argument(
            "--format",
            type=str,
            default="str",
            choices=("str", "json"),
            help="Output format.",
        )
        parser.add_argument(
            "--need_features", action="store_true", help="Need to dump features."
        )
        args = parser.parse_args(sys.argv[2:])
        dump_mwm(args.path, args.format, args.need_features)

    @staticmethod
    def find_feature():
        """Run the ``find_feature`` subcommand."""
        parser = argparse.ArgumentParser(
            description="Finds features in an mwm file based on a query."
        )
        parser.add_argument("--path", type=str, required=True, help="Path to mwm.")
        parser.add_argument(
            "--type",
            type=str,
            required=True,
            choices=["t", "et", "n", "m", "id"],
            help="""Type:
    t for inside types ("t hwtag" will find all hwtags-*)
    et for exact type ("et shop" won't find shop-chemist)
    n for names, case-sensitive ("n Starbucks" for all starbucks)
    m for metadata keys ("m flats" for features with flats)
    id for feature id ("id 1234" for feature #1234)""",
        )
        parser.add_argument(
            "--str", type=str, required=True, help="String to find in mwm"
        )
        args = parser.parse_args(sys.argv[2:])
        find_and_print_features(args.path, args.type, args.str)

    @staticmethod
    def ft2osm():
        """Run the ``ft2osm`` subcommand and print the resulting link."""
        parser = argparse.ArgumentParser(
            description="Finds an OSM object for a given feature id."
        )
        parser.add_argument(
            "--path", type=str, required=True, help="Path to osm to feature mapping."
        )
        parser.add_argument("--id", type=str, required=True, help="Feature id.")
        args = parser.parse_args(sys.argv[2:])
        link = ft2osm(args.path, args.id)
        if link is None:
            print("Error: id not found.")
            sys.exit(1)
        print(link)

    @staticmethod
    def mwm_feature_compare():
        """Run the ``mwm_feature_compare`` subcommand."""
        parser = argparse.ArgumentParser(
            description="Compares feature count in .mwm files."
        )
        parser.add_argument(
            "-n", "--new", help="New mwm files path", type=str, required=True
        )
        parser.add_argument(
            "-o", "--old", help="Old mwm files path", type=str, required=True
        )
        parser.add_argument(
            "-f", "--feature", help="Feature name to count", type=str, required=True
        )
        parser.add_argument(
            "-t",
            "--threshold",
            help="Threshold in percent to warn",
            type=int,
            default=20,
        )
        args = parser.parse_args(sys.argv[2:])
        compare_mwm(args.old, args.new, args.feature, args.threshold)
Mwm()

25
tools/python/mwm/decode_id.py Executable file
View file

@ -0,0 +1,25 @@
import re
from mwm.ft2osm import OsmIdCode
from mwm.ft2osm import unpack_osmid
def decode_id(id):
    """Convert between a packed OSM id and an openstreetmap.org link.

    :param id: either a string of digits (a packed id) or a string containing
        ``/node/<n>``, ``/way/<n>`` or ``/relation/<n>``.
    :return: an openstreetmap.org URL for a packed id, the packed integer id
        for a link, or None when the input cannot be decoded.
    """
    if id.isdigit():
        osm_id = unpack_osmid(int(id))
        if osm_id is None:
            # The number carries no node/way/relation type bits.
            return None
        type_abbr = {"n": "node", "w": "way", "r": "relation"}
        return f"https://www.openstreetmap.org/{type_abbr[osm_id[0]]}/{osm_id[1]}"

    m = re.search(r"/(node|way|relation)/(\d+)", id)
    if m is None:
        return None

    type_codes = {
        "node": OsmIdCode.NODE,
        "way": OsmIdCode.WAY,
        "relation": OsmIdCode.RELATION,
    }
    # Mark the numeric id with the type bits of the matched object kind.
    return int(m.group(2)) | type_codes[m.group(1)]

21
tools/python/mwm/dump_mwm.py Executable file
View file

@ -0,0 +1,21 @@
import json
from mwm import EnumAsStrEncoder
from mwm import Mwm
def dump_mwm(path, format, need_features):
    """Print an mwm file summary and, optionally, every feature in it.

    :param path: path to the mwm file.
    :param format: "str" prints the objects as is, "json" prints their
        ``to_json()`` representation; any other value prints nothing.
    :param need_features: when true, features are printed after the summary.
    """

    def emit(obj):
        if format == "str":
            print(obj)
        elif format == "json":
            print(json.dumps(obj.to_json(), ensure_ascii=False, cls=EnumAsStrEncoder))

    mwm = Mwm(path)
    emit(mwm)

    if not need_features:
        return
    for ft in mwm:
        emit(ft)

View file

@ -0,0 +1,6 @@
class MwmError(Exception):
    """Base class for the errors raised by the mwm package."""


class FeaturesSectionParseError(MwmError):
    """Raised when the features section of an mwm file cannot be parsed."""

View file

@ -0,0 +1,35 @@
import os
# Type name -> index and index -> type name; both are (re)built by init().
NAME_TO_INDEX_TYPE_MAPPING = {}
INDEX_TO_NAME_TYPE_MAPPING = {}


def init(resource_path):
    """Load the type mappings from ``<resource_path>/types.txt``.

    Every line of the file is one type and its zero-based line number is the
    type index. "|" separators become "-" and one leading "*" is dropped.
    """
    global NAME_TO_INDEX_TYPE_MAPPING
    global INDEX_TO_NAME_TYPE_MAPPING

    NAME_TO_INDEX_TYPE_MAPPING, INDEX_TO_NAME_TYPE_MAPPING = {}, {}

    types_file = os.path.join(resource_path, "types.txt")
    with open(types_file) as types:
        for index, raw_line in enumerate(types):
            stripped = raw_line.strip()
            type_name = stripped.replace("|", "-")
            if stripped.startswith("*"):
                type_name = type_name[1:]
            NAME_TO_INDEX_TYPE_MAPPING[type_name] = index
            INDEX_TO_NAME_TYPE_MAPPING[index] = type_name
def readable_type(index: int) -> str:
    """Return the type name for ``index``, or "unknown" if it is not mapped."""
    return INDEX_TO_NAME_TYPE_MAPPING.get(index, "unknown")
def type_index(type_name: str) -> int:
    """Return the index of ``type_name``, or -1 if the name is not mapped."""
    return NAME_TO_INDEX_TYPE_MAPPING.get(type_name, -1)

View file

@ -0,0 +1,52 @@
import json
from typing import List
from mwm import EnumAsStrEncoder
from mwm import Feature
from mwm import Mwm
from mwm import readable_type
def find_features(path: str, typ: str, string: str) -> List[Feature]:
    """Collect the features of the mwm at ``path`` that match a query.

    :param typ: query kind:
        "n"  - ``string`` is a substring of any feature name;
        "t"  - ``string`` is a substring of any readable type;
        "et" - ``string`` equals a readable type;
        "m"  - ``string`` is a substring of a metadata field name;
        "id" - ``string`` is the decimal feature index.
    :param string: the query value.
    :return: matching features in file order.
    """
    wanted_index = int(string) if typ == "id" else None

    def matches(feature) -> bool:
        if typ == "n":
            return any(string in value for value in feature.names().values())
        if typ in ("t", "et"):
            for type_id in feature.types():
                type_name = readable_type(type_id)
                if type_name == string or (typ == "t" and string in type_name):
                    return True
            return False
        if typ == "m":
            return any(string in field.name for field in feature.metadata())
        return typ == "id" and wanted_index == feature.index()

    return [feature for feature in Mwm(path) if matches(feature)]
def find_and_print_features(path: str, typ: str, string: str):
    """Print every feature matching the query as one JSON document per line.

    The arguments are passed to ``find_features`` unchanged.
    """
    for feature in find_features(path, typ, string):
        serialized = json.dumps(
            feature.to_json(),
            ensure_ascii=False,
            sort_keys=True,
            cls=EnumAsStrEncoder,
        )
        print(serialized)

108
tools/python/mwm/ft2osm.py Executable file
View file

@ -0,0 +1,108 @@
from mwm.mwm_python import read_uint
from mwm.mwm_python import read_varuint
class OsmIdCode:
    """Bit layout of a packed OSM id: type bits on top, object id below."""

    # The obsolete id types are used here. If they are ever replaced by the
    # new types, this class has to be updated. See base/geo_object_id.hpp.
    NODE = 0x4000000000000000
    WAY = 0x8000000000000000
    RELATION = 0xC000000000000000
    FULL_MASK = NODE | WAY | RELATION
    RESET = ~FULL_MASK

    @staticmethod
    def is_node(code):
        return (code & OsmIdCode.FULL_MASK) == OsmIdCode.NODE

    @staticmethod
    def is_way(code):
        return (code & OsmIdCode.FULL_MASK) == OsmIdCode.WAY

    @staticmethod
    def is_relation(code):
        return (code & OsmIdCode.FULL_MASK) == OsmIdCode.RELATION

    @staticmethod
    def get_type(code):
        """Return "n", "w" or "r" for the type bits of ``code``, else None."""
        abbreviations = {
            OsmIdCode.RELATION: "r",
            OsmIdCode.NODE: "n",
            OsmIdCode.WAY: "w",
        }
        return abbreviations.get(code & OsmIdCode.FULL_MASK)

    @staticmethod
    def get_id(code):
        """Return ``code`` with the type bits cleared."""
        return code & OsmIdCode.RESET
def unpack_osmid(num):
    """Split a packed id into ``(type letter, object id)``.

    Returns None when ``num`` has no recognizable type bits.
    """
    type_letter = OsmIdCode.get_type(num)
    if type_letter is not None:
        return type_letter, OsmIdCode.get_id(num)
    return None
def _read_osm2ft_v0(f, ft2osm, tuples):
    """Read the records of the headerless mapping format.

    Each record is an 8-byte osm id, a 4-byte feature id and 4 filler bytes.
    With ``tuples`` the osm id is unpacked and records that fail to unpack are
    dropped. The result is keyed by feature id when ``ft2osm`` is true,
    otherwise by osm id.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        key, value = (feature_id, osm_id) if ft2osm else (osm_id, feature_id)
        mapping[key] = value
    return mapping
def _read_osm2ft_v1(f, ft2osm, tuples):
    """Read the records of mapping format version 1.

    Same as the headerless format, except that every osm id is followed by a
    second 8-byte part, which is read and ignored.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        # V1 uses complex ids. The second part of the complex id is skipped
        # to keep the old osm2ft interface.
        read_uint(f, 8)
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        key, value = (feature_id, osm_id) if ft2osm else (osm_id, feature_id)
        mapping[key] = value
    return mapping
def read_osm2ft(f, ft2osm=False, tuples=True):
    """Reads mwm.osm2ft file, returning a dict of feature id <-> osm id.

    A leading 0xFFFFFFFF marks the versioned format (only version 1 is
    supported); anything else is treated as the headerless format and the
    file is rewound before reading.
    """
    if read_uint(f, 4) != 0xFFFFFFFF:
        f.seek(0)
        return _read_osm2ft_v0(f, ft2osm, tuples)

    version = read_uint(f, 1)
    if version != 1:
        raise Exception("Format {0} is not supported".format(version))
    return _read_osm2ft_v1(f, ft2osm, tuples)
def ft2osm(path, ftid):
    """Return the openstreetmap.org link for feature ``ftid``.

    :param path: path to the osm-to-feature mapping file.
    :param ftid: feature id, anything accepted by ``int()``.
    :return: the link, or None when the feature id is not in the mapping.
    """
    with open(path, "rb") as f:
        feature_to_osm = read_osm2ft(f, ft2osm=True)

    type_abbr = {"n": "node", "w": "way", "r": "relation"}
    ftid = int(ftid)
    if ftid not in feature_to_osm:
        return None
    return f"https://www.openstreetmap.org/{type_abbr[feature_to_osm[ftid][0]]}/{feature_to_osm[ftid][1]}"

View file

@ -0,0 +1,38 @@
import multiprocessing
import os
from mwm.find_feature import find_features
def compare_feature_num(old_mwm, new_mwm, type_name, threshold):
    """Check that the number of ``type_name`` features did not drop too much.

    :param old_mwm: path to the old mwm file.
    :param new_mwm: path to the new mwm file.
    :param type_name: exact readable type to count.
    :param threshold: maximum tolerated decrease, in percent.
    :return: False (after printing a warning) when the count decreased by more
        than ``threshold`` percent, True otherwise.
    """
    old_count = len(find_features(old_mwm, "et", type_name))
    new_count = len(find_features(new_mwm, "et", type_name))

    delta = new_count - old_count
    if delta >= 0:
        return True

    # delta < 0 implies old_count > 0, so the division is safe.
    p_change = float(abs(delta)) / old_count * 100
    if p_change <= threshold:
        return True

    print(
        f'In "{os.path.basename(new_mwm)}" number of "{type_name}" '
        f"decreased by {round(p_change)}% ({old_count} -> {new_count})"
    )
    return False
def compare_mwm(old_mwm_path, new_mwm_path, type_name, threshold):
    """Compare feature counts for every mwm present in both directories.

    Files whose name starts with "World" are skipped. The per-file checks run
    in a process pool.

    :return: True when no file lost more than ``threshold`` percent of its
        ``type_name`` features.
    """

    def generate_names(path):
        return {
            file_name: os.path.abspath(os.path.join(path, file_name))
            for file_name in os.listdir(path)
            if file_name.endswith(".mwm") and not file_name.startswith("World")
        }

    old_mwms = generate_names(old_mwm_path)
    new_mwms = generate_names(new_mwm_path)
    same_mwms = set(new_mwms) & set(old_mwms)

    args = [(old_mwms[mwm], new_mwms[mwm], type_name, threshold) for mwm in same_mwms]

    # starmap unpacks every tuple into the four positional parameters of
    # compare_feature_num; imap would pass the tuple as a single argument.
    with multiprocessing.Pool() as pool:
        return all(pool.starmap(compare_feature_num, args))

View file

@ -0,0 +1,409 @@
import enum
import os
from abc import ABC
from abc import abstractmethod
from typing import Dict
from typing import Iterable
from typing import List
from typing import Union
from mwm.feature_types import readable_type
# See coding/string_utf8_multilang.cpp to synchronize languages.
# The position of a name in this tuple is the numeric language code that the
# readers use as an index, so entries must never be reordered or removed;
# "reserved (...)" entries only keep the positions of retired codes.
LANGS = (
    "default",
    "en",
    "ja",
    "fr",
    "ko_rm",
    "ar",
    "de",
    "int_name",
    "ru",
    "sv",
    "zh",
    "fi",
    "be",
    "ka",
    "ko",
    "he",
    "nl",
    "ga",
    "ja_rm",
    "el",
    "it",
    "es",
    "zh_pinyin",
    "th",
    "cy",
    "sr",
    "uk",
    "ca",
    "hu",
    "reserved (earlier hsb)",
    "eu",
    "fa",
    "reserved (earlier br)",
    "pl",
    "hy",
    "reserved (earlier kn)",
    "sl",
    "ro",
    "sq",
    "am",
    "no",
    "cs",
    "id",
    "sk",
    "af",
    "ja_kana",
    "reserved (earlier lb)",
    "pt",
    "hr",
    "da",
    "vi",
    "tr",
    "bg",
    "alt_name",
    "lt",
    "old_name",
    "kk",
    "reserved (earlier gsw)",
    "et",
    "ku",
    "mn",
    "mk",
    "lv",
    "hi",
)
class MetadataField(enum.Enum):
    """Feature metadata fields.

    The values are the numeric field ids: readers build a member directly from
    the integer decoded from the file. The ids are not contiguous (17 and
    32-39 are not assigned here).
    """

    cuisine = 1
    open_hours = 2
    phone_number = 3
    fax_number = 4
    stars = 5
    operator = 6
    url = 7
    website = 8
    internet = 9
    ele = 10
    turn_lanes = 11
    turn_lanes_forward = 12
    turn_lanes_backward = 13
    email = 14
    postcode = 15
    wikipedia = 16
    flats = 18
    height = 19
    min_height = 20
    denomination = 21
    building_levels = 22
    test_id = 23
    sponsored_id = 24
    price_rate = 25
    rating = 26
    banner_url = 27
    level = 28
    airport_iata = 29
    brand = 30
    duration = 31
    building_min_level = 40
class RegionDataField(enum.Enum):
    """Region info fields; the values are the numeric ids read from the file."""

    languages = 0
    driving = 1
    timezone = 2
    address_format = 3
    phone_format = 4
    postcode_format = 5
    public_holidays = 6
    allow_housenames = 7
class MapType(enum.Enum):
    """Kind of map stored in an mwm file; values are the ids readers decode."""

    world = 0
    world_coasts = 1
    country = 2
class GeomType(enum.Enum):
    """Geometry kind of a feature as exposed by the Feature interface."""

    undefined = -1
    point = 0
    line = 1
    area = 2
class SectionInfo:
    """Name, offset and size of one section of an mwm file."""

    __slots__ = "name", "offset", "size"

    def __init__(self, name, offset, size):
        self.name, self.offset, self.size = name, offset, size

    def __repr__(self):
        parts = (
            f"SectionInfo[name: {self.name}, ",
            f"offset: {self.offset}, ",
            f"size: {self.size}]",
        )
        return "".join(parts)

    def to_json(self):
        """Return the three attributes as a dict keyed by attribute name."""
        return {slot: getattr(self, slot) for slot in self.__slots__}
class MwmVersion:
    """Format number, build timestamp and version of an mwm file."""

    __slots__ = "format", "seconds_since_epoch", "version"

    def __init__(self, format, seconds_since_epoch, version):
        self.format, self.seconds_since_epoch, self.version = (
            format,
            seconds_since_epoch,
            version,
        )

    def __repr__(self):
        parts = (
            f"MwmVersion[format: {self.format}, ",
            f"seconds since epoch: {self.seconds_since_epoch}, ",
            f"version: {self.version}]",
        )
        return "".join(parts)

    def to_json(self):
        """Return the attributes as a dict with camelCase keys."""
        result = {"format": self.format}
        result["secondsSinceEpoch"] = self.seconds_since_epoch
        result["version"] = self.version
        return result
class Point:
    """Mutable 2D point with ``x`` and ``y`` coordinates."""

    __slots__ = "x", "y"

    def __init__(self, x=0.0, y=0.0):
        self.x = x
        self.y = y

    def __add__(self, other):
        """Return a new point that is the coordinate-wise sum.

        Raises NotImplementedError when ``other`` is not a Point.
        """
        if isinstance(other, Point):
            return Point(self.x + other.x, self.y + other.y)
        raise NotImplementedError

    def __iadd__(self, other):
        """Add ``other`` to this point in place and return this point.

        Raises NotImplementedError when ``other`` is not a Point.
        """
        if isinstance(other, Point):
            self.x += other.x
            self.y += other.y
            # In-place operators must return the result; without this the
            # method fell through to the raise below on every call.
            return self
        raise NotImplementedError

    def __repr__(self):
        return f"({self.x}, {self.y})"

    def to_json(self):
        """Return ``{"x": ..., "y": ...}``."""
        return {"x": self.x, "y": self.y}
class Rect:
    """Rectangle described by its left-bottom and right-top corner points."""

    __slots__ = "left_bottom", "right_top"

    def __init__(self, left_bottom: Point, right_top: Point):
        self.left_bottom, self.right_top = left_bottom, right_top

    def __repr__(self):
        corners = f"{self.left_bottom}, {self.right_top}"
        return f"Rect[{corners}]"

    def to_json(self):
        """Return both corners serialized with ``Point.to_json``."""
        left_bottom = self.left_bottom.to_json()
        right_top = self.right_top.to_json()
        return {"leftBottom": left_bottom, "rightTop": right_top}
class Triangle:
    """Triangle described by three vertices stored as ``x``, ``y`` and ``z``."""

    __slots__ = "x", "y", "z"

    def __init__(self, x: Point, y: Point, z: Point):
        self.x, self.y, self.z = x, y, z

    def __repr__(self):
        vertices = f"{self.x}, {self.y}, {self.z}"
        return f"Triangle[{vertices}]"

    def to_json(self):
        """Return the three vertices serialized with ``Point.to_json``."""
        return {name: getattr(self, name).to_json() for name in self.__slots__}
class Mwm(ABC):
    """Interface of an mwm file reader.

    Implementations provide the version, map type, bounds, sections table,
    feature count and feature iteration; the representation helpers here are
    built on top of those.
    """

    def __init__(self, filename: str):
        self.filename = filename

    def name(self) -> str:
        """Return the file name without its directory."""
        return os.path.basename(self.filename)

    def path(self) -> str:
        """Return the path the reader was created with."""
        return self.filename

    @abstractmethod
    def version(self) -> MwmVersion:
        pass

    @abstractmethod
    def type(self) -> MapType:
        pass

    @abstractmethod
    def bounds(self) -> Rect:
        pass

    @abstractmethod
    def sections_info(self) -> Dict[str, SectionInfo]:
        pass

    @abstractmethod
    def __len__(self) -> int:
        pass

    @abstractmethod
    def __iter__(self) -> Iterable:
        pass

    def __repr__(self):
        # Sections are listed in the order they appear in the file.
        ordered_sections = sorted(
            self.sections_info().values(), key=lambda x: x.offset
        )
        si = "\n".join(f" {s}" for s in ordered_sections)
        return (
            f"Mwm[\n"
            f" name: {self.name()}\n"
            f" type: {self.type()}\n"
            f" version: {self.version()}\n"
            f" number of features: {len(self)}\n"
            f" bounds: {self.bounds()}\n"
            f" sections info: [\n{si} \n ]\n"
            f"]"
        )

    def to_json(self, with_features=False):
        """Return the file summary as a dict.

        With ``with_features`` the dict also gets a "features" list holding
        the ``to_json()`` of every feature.
        """
        m = {
            "name": self.name(),
            "version": self.version().to_json(),
            "type": self.type(),
            "bounds": self.bounds().to_json(),
            "sections_info": {
                tag: info.to_json() for tag, info in self.sections_info().items()
            },
            "size": len(self),
        }
        if not with_features:
            return m
        m["features"] = [f.to_json() for f in self]
        return m
class Feature(ABC):
    """Interface of a single map feature.

    Implementations provide the accessors; ``readable_types``, ``__repr__``
    and ``to_json`` are derived from them.
    """

    @abstractmethod
    def index(self) -> int:
        pass

    @abstractmethod
    def types(self) -> List[int]:
        pass

    def readable_types(self) -> List[str]:
        """Return the readable names of the feature's types."""
        return [readable_type(i) for i in self.types()]

    @abstractmethod
    def metadata(self) -> Dict[MetadataField, str]:
        pass

    @abstractmethod
    def names(self) -> Dict[str, str]:
        pass

    @abstractmethod
    def readable_name(self) -> str:
        pass

    @abstractmethod
    def rank(self) -> int:
        pass

    @abstractmethod
    def population(self) -> int:
        pass

    @abstractmethod
    def road_number(self) -> str:
        pass

    @abstractmethod
    def house_number(self) -> str:
        pass

    @abstractmethod
    def layer(self) -> int:
        pass

    @abstractmethod
    def geom_type(self) -> GeomType:
        pass

    @abstractmethod
    def center(self) -> Point:
        pass

    @abstractmethod
    def geometry(self) -> Union[List[Point], List[Triangle]]:
        pass

    @abstractmethod
    def limit_rect(self) -> Rect:
        pass

    @abstractmethod
    def parse(self):
        pass

    def __repr__(self):
        return (
            f"Feature[\n"
            f" index: {self.index()}\n"
            f" readable name: {self.readable_name()}\n"
            f" types: {self.readable_types()}\n"
            f" names: {self.names()}\n"
            f" metadata: {self.metadata()}\n"
            f" geom_type: {self.geom_type()}\n"
            f" center: {self.center()}\n"
            f" limit_rect: {self.limit_rect()}\n"
            f"]"
        )

    def to_json(self):
        """Return the feature as a dict.

        "center" and "limit_rect" are None when the implementation returns a
        falsy value for them.
        """
        # Each accessor is called exactly once: implementations may compute
        # the value, or log a warning, on every call.
        center = self.center()
        limit_rect = self.limit_rect()

        return {
            "index": self.index(),
            "types": {t: readable_type(t) for t in self.types()},
            "metadata": {k.name: v for k, v in self.metadata().items()},
            "names": self.names(),
            "readable_name": self.readable_name(),
            "rank": self.rank(),
            "population": self.population(),
            "road_number": self.road_number(),
            "house_number": self.house_number(),
            "layer": self.layer(),
            "geom_type": self.geom_type(),
            "center": center.to_json() if center else None,
            "limit_rect": limit_rect.to_json() if limit_rect else None,
        }

View file

@ -0,0 +1,133 @@
from typing import Dict
from typing import Iterable
from typing import List
from typing import Union
from pygen import classif
from pygen import geometry
from pygen import mwm
from mwm import mwm_interface as mi
def init(resource_path):
    """Initialize the pygen classificator with ``resource_path``."""
    classif.init_classificator(resource_path)
class MwmPygen(mi.Mwm):
    """``mi.Mwm`` implementation that delegates to ``pygen.mwm.Mwm``."""

    def __init__(self, filename: str, parse: bool = False):
        super().__init__(filename)
        self.mwm = mwm.Mwm(filename, parse)

    def version(self) -> mi.MwmVersion:
        raw_version = self.mwm.version()
        # The format number reported by pygen is shifted by one.
        return mi.MwmVersion(
            format=int(raw_version.format()) + 1,
            seconds_since_epoch=raw_version.seconds_since_epoch(),
            version=raw_version.version(),
        )

    def type(self) -> mi.MapType:
        return mi.MapType(int(self.mwm.type()))

    def bounds(self) -> mi.Rect:
        return from_pygen_rect(self.mwm.bounds())

    def sections_info(self) -> Dict[str, mi.SectionInfo]:
        return {
            tag: mi.SectionInfo(name=info.tag, offset=info.offset, size=info.size)
            for tag, info in self.mwm.sections_info().items()
        }

    def __len__(self) -> int:
        return self.mwm.__len__()

    def __iter__(self) -> Iterable:
        return FeaturePygenIter(self.mwm.__iter__())
class FeaturePygenIter:
    """Iterator that wraps every pygen feature into a ``FeaturePygen``."""

    def __init__(self, iter: mwm.MwmIter):
        self.iter = iter

    def __iter__(self) -> "FeaturePygenIter":
        return self

    def __next__(self) -> "FeaturePygen":
        return FeaturePygen(self.iter.__next__())
class FeaturePygen(mi.Feature):
    """``mi.Feature`` implementation that delegates to a pygen feature."""

    def __init__(self, ft: mwm.FeatureType):
        self.ft = ft

    def index(self) -> int:
        return self.ft.index()

    def types(self) -> List[int]:
        return self.ft.types()

    def metadata(self) -> Dict[mi.MetadataField, str]:
        raw_metadata = self.ft.metadata()
        return {
            mi.MetadataField(int(key)): value for key, value in raw_metadata.items()
        }

    def names(self) -> Dict[str, str]:
        return self.ft.names()

    def readable_name(self) -> str:
        return self.ft.readable_name()

    def rank(self) -> int:
        return self.ft.rank()

    def population(self) -> int:
        return self.ft.population()

    def road_number(self) -> str:
        return self.ft.road_number()

    def house_number(self) -> str:
        return self.ft.house_number()

    def layer(self) -> int:
        return self.ft.layer()

    def geom_type(self) -> mi.GeomType:
        return mi.GeomType(int(self.ft.geom_type()))

    def center(self) -> mi.Point:
        return from_pygen_point(self.ft.center())

    def geometry(self) -> Union[List[mi.Point], List[mi.Triangle]]:
        # Area geometry is a list of triangles, everything else is points.
        if self.geom_type() == mi.GeomType.area:
            convert = from_pygen_triangle
        else:
            convert = from_pygen_point
        return [convert(item) for item in self.ft.geometry()]

    def limit_rect(self) -> mi.Rect:
        return from_pygen_rect(self.ft.limit_rect())

    def parse(self):
        self.ft.parse()
def from_pygen_point(p: geometry.PointD) -> mi.Point:
    """Copy the coordinates of a pygen point into an ``mi.Point``."""
    x, y = p.x, p.y
    return mi.Point(x, y)
def from_pygen_rect(r: geometry.RectD) -> mi.Rect:
    """Convert a pygen rectangle into an ``mi.Rect``."""
    left_bottom = from_pygen_point(r.left_bottom)
    right_top = from_pygen_point(r.right_top)
    return mi.Rect(left_bottom, right_top)
def from_pygen_triangle(t: geometry.TriangleD) -> mi.Triangle:
    """Convert a pygen triangle into an ``mi.Triangle``."""
    first = from_pygen_point(t.x())
    second = from_pygen_point(t.y())
    third = from_pygen_point(t.z())
    return mi.Triangle(first, second, third)

View file

@ -0,0 +1,469 @@
import logging
import mmap
import struct
from datetime import datetime
from typing import AnyStr
from typing import Dict
from typing import Iterable
from typing import List
from typing import Tuple
from typing import Union
import math
from mwm import mwm_interface as mi
from mwm.exceptions import FeaturesSectionParseError
logger = logging.getLogger(__name__)
class MwmPython(mi.Mwm):
    """Pure-Python reader of mwm files.

    The file is memory-mapped read-only. The sections table, the header, the
    version and (for formats below 10) the metadata index are read in the
    constructor. Call ``close()`` or use the object in a ``with`` block to
    release the file.
    """

    def __init__(self, filename: str, parse: bool = False):
        # ``parse`` is not used by this implementation.
        super().__init__(filename)
        self.f = open(filename, "rb")
        try:
            self.file = mmap.mmap(self.f.fileno(), 0, access=mmap.ACCESS_READ)
            self.tags = self._read_sections_info()

            self.seek_tag("header")
            coord_bits = read_varuint(self.file)
            self.coord_size = (1 << coord_bits) - 1
            self.base_point = mwm_bitwise_split(read_varuint(self.file))
            self.bounds_ = read_bounds(self.file, self.coord_size)
            self.scales = read_uint_array(self.file)
            self.langs = [mi.LANGS[code] for code in read_uint_array(self.file)]
            self.map_type = mi.MapType(read_varint(self.file))

            self.version_ = self._read_version()
            self.metadata_offsets = self._read_metadata_offsets()
        except Exception:
            # Do not leak the file handle when the file cannot be parsed.
            self.close()
            raise

    def close(self):
        """Release the memory map and the underlying file object."""
        mapped = getattr(self, "file", None)
        if mapped is not None:
            mapped.close()
        self.f.close()

    def __enter__(self) -> "MwmPython":
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def version(self) -> mi.MwmVersion:
        return self.version_

    def type(self) -> mi.MapType:
        return self.map_type

    def bounds(self) -> mi.Rect:
        return self.bounds_

    def sections_info(self) -> Dict[str, mi.SectionInfo]:
        return self.tags

    def __len__(self) -> int:
        """Count the features by walking the size prefixes of the records."""
        old_pos = self.file.tell()
        pos, end = self._get_features_offset_and_size()
        size = 0
        while pos < end:
            self.file.seek(pos)
            feature_size = read_varuint(self.file)
            pos = self.file.tell() + feature_size
            size += 1
        self.file.seek(old_pos)
        return size

    def __iter__(self) -> Iterable:
        return MwmPythonIter(self)

    def get_tag(self, name: str) -> mi.SectionInfo:
        return self.tags[name]

    def seek_tag(self, name: str):
        """Move the file position to the start of section ``name``."""
        self.file.seek(self.tags[name].offset)

    def has_tag(self, name: str) -> bool:
        """Return True when section ``name`` exists and is not empty."""
        return name in self.tags and self.tags[name].size > 0

    def _read_sections_info(self) -> Dict[str, mi.SectionInfo]:
        # The first 8 bytes of the file hold the offset of the sections table.
        self.file.seek(0)
        self.file.seek(read_uint(self.file, 8))
        tags = {}
        for _ in range(read_varuint(self.file)):
            name = read_string(self.file, plain=True)
            offset = read_varuint(self.file)
            length = read_varuint(self.file)
            tags[name] = mi.SectionInfo(name=name, offset=offset, size=length)
        return tags

    def _read_metadata_offsets(self) -> Union[Dict[int, int], None]:
        """Read the feature index -> metadata offset table.

        Returns None for format 10 and newer, which is not supported here.
        """
        if self.version_.format >= 10:
            logger.warning(
                "Method _read_metadata_offsets() does not have an implementation."
            )
            return None

        self.seek_tag("metaidx")
        tag_info = self.get_tag("metaidx")
        current = 0
        metadata_offsets = {}
        # The section is a sequence of (4-byte id, 4-byte offset) pairs.
        while current < tag_info.size:
            feature_id = read_uint(self.file, 4)
            offs = read_uint(self.file, 4)
            metadata_offsets[feature_id] = offs
            current += 8
        return metadata_offsets

    def _get_features_offset_and_size(self) -> Tuple[int, int]:
        """Return the start and end file positions of the feature records.

        The file position is restored before returning or raising.

        Raises FeaturesSectionParseError when the "features" section has an
        unexpected version or an offset outside of the section.
        """
        old_pos = self.file.tell()
        pos = 0
        end = 0
        if self.version_.format < 10:
            assert self.has_tag("dat")
            tag_info = self.get_tag("dat")
            pos = tag_info.offset
            end = pos + tag_info.size
        else:
            assert self.has_tag("features")
            tag_info = self.get_tag("features")
            self.seek_tag("features")
            version = read_uint(self.file, 1)
            if version != 0:
                self.file.seek(old_pos)
                raise FeaturesSectionParseError(
                    f"Unexpected features section version: {version}."
                )
            features_offset = read_uint(self.file, bytelen=4)
            if features_offset >= tag_info.size:
                self.file.seek(old_pos)
                raise FeaturesSectionParseError(
                    f"Wrong features offset: {features_offset}."
                )
            pos = tag_info.offset + features_offset
            end = pos + tag_info.size - features_offset
        self.file.seek(old_pos)
        return pos, end

    def _read_version(self) -> mi.MwmVersion:
        self.seek_tag("version")
        # Skip prolog.
        self.file.read(4)
        fmt = read_varuint(self.file) + 1
        seconds_since_epoch = read_varuint(self.file)
        # NOTE(review): fromtimestamp() converts to the local time zone, so
        # the derived version may depend on the machine — confirm whether UTC
        # is intended.
        vdate = datetime.fromtimestamp(seconds_since_epoch)
        version = int(vdate.strftime("%y%m%d"))
        return mi.MwmVersion(
            format=fmt, seconds_since_epoch=seconds_since_epoch, version=version
        )
class MwmPythonIter:
    """Iterator over the feature records of a ``MwmPython`` file."""

    def __init__(self, mwm: MwmPython):
        self.mwm = mwm
        self.pos, self.end = self.mwm._get_features_offset_and_size()
        self.index = 0

    def __iter__(self) -> "MwmPythonIter":
        return self

    def __next__(self) -> "FeaturePython":
        if self.pos >= self.end:
            raise StopIteration

        mapped = self.mwm.file
        mapped.seek(self.pos)
        # Every record is prefixed with its size; remember where the next
        # record starts before the feature itself is decoded.
        record_size = read_varuint(mapped)
        self.pos = mapped.tell() + record_size

        current_index = self.index
        feature = FeaturePython(self.mwm, current_index)
        self.index = current_index + 1
        return feature
class GeomType:
    """Geometry type values of a feature header byte (bits under mask 0x60)."""

    POINT = 0x00
    LINE = 0x20
    AREA = 0x40
    POINT_EX = 0x60
class FeaturePython(mi.Feature):
    """Feature decoded by the pure-Python reader.

    The constructor reads the feature from the current position of
    ``mwm.file``: header byte, types, names, layer and additional info. Only
    point geometry is decoded; population, center and limit rect are not
    implemented and return None.
    """

    def __init__(self, mwm: MwmPython, index: int):
        self.mwm = mwm
        self._index = index

        header_bits = read_uint(self.mwm.file, 1)
        types_count = (header_bits & 0x07) + 1
        has_name = header_bits & 0x08 > 0
        has_layer = header_bits & 0x10 > 0
        has_addinfo = header_bits & 0x80 > 0
        geom_type = header_bits & 0x60

        self._types = [read_varuint(self.mwm.file) for _ in range(types_count)]
        self._names = read_multilang(self.mwm.file) if has_name else {}
        self._layer = read_uint(self.mwm.file, 1) if has_layer else 0

        self._rank = 0
        self._road_number = ""
        self._house_number = ""

        # The meaning of the additional info depends on the geometry type.
        if has_addinfo:
            if geom_type == GeomType.POINT:
                self._rank = read_uint(self.mwm.file, 1)
            elif geom_type == GeomType.LINE:
                self._road_number = read_string(self.mwm.file)
            elif geom_type == GeomType.AREA or geom_type == GeomType.POINT_EX:
                self._house_number = read_numeric_string(self.mwm.file)

        self._geom_type = mi.GeomType.undefined
        self._geometry = []
        if geom_type == GeomType.POINT or geom_type == GeomType.POINT_EX:
            self._geom_type = mi.GeomType.point
            self._geometry = [
                read_coord(self.mwm.file, self.mwm.base_point, self.mwm.coord_size)
            ]
        elif geom_type == GeomType.LINE:
            self._geom_type = mi.GeomType.line
        elif geom_type == GeomType.AREA:
            self._geom_type = mi.GeomType.area

    def readable_name(self) -> str:
        """Return the "default" name, else "en", else any name, else ""."""
        if "default" in self._names:
            return self._names["default"]
        elif "en" in self._names:
            return self._names["en"]
        elif self._names:
            k = next(iter(self._names))
            return self._names[k]
        return ""

    def population(self) -> int:
        logger.warning("Method population() does not have an implementation.")

    def center(self) -> mi.Point:
        logger.warning("Method center() does not have an implementation.")

    def limit_rect(self) -> mi.Rect:
        logger.warning("Method limit_rect() does not have an implementation.")

    def index(self) -> int:
        return self._index

    def types(self) -> List[int]:
        return self._types

    def metadata(self) -> Dict[mi.MetadataField, str]:
        """Read the metadata of this feature from the "meta" section.

        Returns an empty dict when the file has no metadata index or the
        feature has no entry in it. The file position is restored afterwards.
        """
        mwm = self.mwm
        if mwm.metadata_offsets is None or self._index not in mwm.metadata_offsets:
            return {}

        old_pos = mwm.file.tell()
        new_pos = mwm.get_tag("meta").offset + mwm.metadata_offsets[self._index]
        mwm.file.seek(new_pos)
        metadata = {}
        if mwm.version().format >= 8:
            # Counted list of (varuint field id, string) records.
            sz = read_varuint(mwm.file)
            for _ in range(sz):
                t = read_varuint(mwm.file)
                field = mi.MetadataField(t)
                metadata[field] = read_string(mwm.file)
        else:
            # Records of (field id byte, length byte, value); the high bit of
            # the id byte marks the last record.
            while True:
                t = read_uint(mwm.file, 1)
                is_last = t & 0x80 > 0
                t = t & 0x7F
                length = read_uint(mwm.file, 1)
                field = mi.MetadataField(t)
                metadata[field] = mwm.file.read(length).decode("utf-8")
                if is_last:
                    break

        mwm.file.seek(old_pos)
        return metadata

    def names(self) -> Dict[str, str]:
        return self._names

    def rank(self) -> int:
        return self._rank

    def road_number(self) -> str:
        return self._road_number

    def house_number(self) -> str:
        return self._house_number

    def layer(self) -> int:
        return self._layer

    def geom_type(self) -> mi.GeomType:
        return self._geom_type

    def geometry(self) -> Union[List[mi.Point], List[mi.Triangle]]:
        """Return the decoded geometry: one point for point features.

        Line and area geometry is not decoded; an empty list is returned for
        them after a warning.
        """
        if self._geom_type == mi.GeomType.line:
            logger.warning(
                "Method geometry() does not have an implementation for line."
            )
        elif self._geom_type == mi.GeomType.area:
            logger.warning(
                "Method geometry() does not have an implementation for area."
            )
        return self._geometry

    def parse(self):
        pass
def get_region_info(path):
    """Read the "rgninfo" section of the mwm file at ``path``.

    :return: dict mapping ``mi.RegionDataField`` to the field's string value;
        the value of the ``languages`` field is a list of language names
        instead. Empty when the file has no region info.
    """
    m = MwmPython(path)
    try:
        if not m.has_tag("rgninfo"):
            return {}

        region_info = {}
        m.seek_tag("rgninfo")
        sz = read_varuint(m.file)
        for _ in range(sz):
            t = read_varuint(m.file)
            field = mi.RegionDataField(t)
            value = read_string(m.file)
            # Compare the enum member, not the raw integer: an int never
            # equals an Enum member, so languages were never decoded before.
            if field is mi.RegionDataField.languages:
                value = [mi.LANGS[ord(x)] for x in value]
            region_info[field] = value
        return region_info
    finally:
        # The reader is local to this function; release its file handles.
        m.file.close()
        m.f.close()
def read_point(f, base_point: mi.Point, packed: bool = True) -> mi.Point:
    """Reads an unsigned point, returns (x, y).

    The encoded value is a varuint when ``packed``, otherwise a fixed 8-byte
    unsigned integer; it is decoded as a delta from ``base_point``.
    """
    if packed:
        encoded = read_varuint(f)
    else:
        encoded = read_uint(f, 8)
    return mwm_decode_delta(encoded, base_point)
def to_4326(coord_size: int, point: mi.Point) -> mi.Point:
    """Convert a point in maps.me-mercator CS to WGS-84 (EPSG:4326)."""
    x_min, y_min, x_max, y_max = -180.0, -180.0, 180.0, 180.0
    # Scale the integer coordinates into the mercator bounds.
    lon = point.x * (x_max - x_min) / coord_size + x_min
    merc_y = point.y * (y_max - y_min) / coord_size + y_min
    # Convert the mercator y into latitude degrees.
    lat = 360.0 * math.atan(math.tanh(merc_y * math.pi / 360.0)) / math.pi
    return mi.Point(lon, lat)
def read_coord(
    f, base_point: mi.Point, coord_size: int, packed: bool = True
) -> mi.Point:
    """Reads a pair of coords in degrees mercator, returns (lon, lat)."""
    return to_4326(coord_size, read_point(f, base_point, packed))
def read_bounds(f, coord_size) -> mi.Rect:
    """Reads mercator bounds, returns (min_lon, min_lat, max_lon, max_lat)."""
    # The two corners are stored one after another, minimum first.
    corners = [mwm_bitwise_split(read_varint(f)) for _ in range(2)]
    left_bottom, right_top = (to_4326(coord_size, corner) for corner in corners)
    return mi.Rect(left_bottom=left_bottom, right_top=right_top)
def read_string(f, plain: bool = False, decode: bool = True) -> AnyStr:
    """Read a length-prefixed string.

    The stored varuint length is the byte count when ``plain``, otherwise the
    byte count minus one. The bytes are returned decoded as UTF-8 unless
    ``decode`` is false.
    """
    size = read_varuint(f)
    if not plain:
        size += 1
    raw = f.read(size)
    if decode:
        return raw.decode("utf-8")
    return raw
def read_uint_array(f) -> List[int]:
    """Read a varuint count followed by that many varuint values."""
    count = read_varuint(f)
    values = []
    for _ in range(count):
        values.append(read_varuint(f))
    return values
def read_numeric_string(f) -> str:
    """Read a string that may be stored as a plain number.

    The low bit of the leading varuint selects the form: when set, the
    remaining bits are the number itself; when clear, they are the byte length
    minus one of a UTF-8 string that follows.
    """
    header = read_varuint(f)
    payload = header >> 1
    if header & 1:
        return str(payload)
    return f.read(payload + 1).decode("utf-8")
def read_multilang(f) -> Dict[str, str]:
    """Read a multilingual string and return a language name -> text dict.

    The raw value is a sequence of chunks; the low 6 bits of a chunk's first
    byte are the language code (an index into ``mi.LANGS``) and the rest is
    the UTF-8 text. Chunks whose text cannot be decoded are printed and
    skipped.
    """

    def byte_at(buf, pos):
        # Indexing bytes yields an int, indexing a str yields a character.
        try:
            return ord(buf[pos])
        except TypeError:
            return buf[pos]

    def find_multilang_next(s, i):
        """Return the position of the next language byte after ``i``."""
        i += 1
        while i < len(s):
            c = byte_at(s, i)
            # A byte of the form 10xxxxxx at a character boundary starts the
            # next chunk.
            if c & 0xC0 == 0x80:
                break
            # Otherwise skip the continuation bytes of a multi-byte character.
            if c & 0x80 == 0:
                pass
            elif c & 0xFE == 0xFE:
                i += 6
            elif c & 0xFC == 0xFC:
                i += 5
            elif c & 0xF8 == 0xF8:
                i += 4
            elif c & 0xF0 == 0xF0:
                i += 3
            elif c & 0xE0 == 0xE0:
                i += 2
            elif c & 0xC0 == 0xC0:
                i += 1
            i += 1
        return i

    s = read_string(f, decode=False)
    langs = {}
    i = 0
    while i < len(s):
        n = find_multilang_next(s, i)
        lng = byte_at(s, i) & 0x3F
        if lng < len(mi.LANGS):
            try:
                langs[mi.LANGS[lng]] = s[i + 1: n].decode("utf-8")
            except UnicodeDecodeError:
                print(s[i + 1: n])
        i = n
    return langs
def mwm_unshuffle(x: int) -> int:
    """De-interleave the bits of a 32-bit value.

    Even bits end up in the low 16 bits and odd bits in the high 16 bits,
    keeping their relative order.
    """
    # Each step swaps two neighbouring bit groups: (moved mask, distance,
    # mask of the bits that stay in place).
    steps = (
        (0x22222222, 1, 0x99999999),
        (0x0C0C0C0C, 2, 0xC3C3C3C3),
        (0x00F000F0, 4, 0xF00FF00F),
        (0x0000FF00, 8, 0xFF0000FF),
    )
    for moved, distance, kept in steps:
        x = ((x & moved) << distance) | ((x >> distance) & moved) | (x & kept)
    return x
def mwm_bitwise_split(v) -> mi.Point:
    """Split a 64-bit value with interleaved bits into an (x, y) point."""
    upper = mwm_unshuffle(v >> 32)
    lower = mwm_unshuffle(v & 0xFFFFFFFF)
    # After unshuffling, the low halves belong to x and the high halves to y.
    x_high = (upper & 0xFFFF) << 16
    x_low = lower & 0xFFFF
    y_high = upper & 0xFFFF0000
    y_low = lower >> 16
    return mi.Point(x_high | x_low, y_high | y_low)
def mwm_decode_delta(v, base_point: mi.Point) -> mi.Point:
    """Decode ``v`` into a point and offset it by ``base_point``."""
    delta = mwm_bitwise_split(v)
    return delta + base_point
def read_uint(f, bytelen: int = 1) -> int:
    """Read a fixed-width unsigned integer from ``f``.

    Args:
        f: Binary file-like object positioned at the value.
        bytelen: Width of the value in bytes; one of 1, 2, 4 or 8.

    Returns:
        The decoded non-negative integer.

    Raises:
        ValueError: If ``bytelen`` is not a supported width.
        struct.error: If fewer than ``bytelen`` bytes could be read.
    """
    # "<" pins little-endian byte order and the standard struct sizes, so the
    # result no longer depends on the host platform. On little-endian hosts
    # this is exactly what the unprefixed (native) formats produced.
    # NOTE(review): assumes the file stores these fields little-endian - confirm.
    formats = {1: "<B", 2: "<H", 4: "<I", 8: "<Q"}
    try:
        fmt = formats[bytelen]
    except KeyError:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError(
            "Bytelen {0} is not supported".format(bytelen)
        ) from None
    (value,) = struct.unpack(fmt, f.read(bytelen))
    return value
def read_varuint(f) -> int:
    """Read a variable-length unsigned integer from ``f``.

    Bytes are consumed one at a time. Each contributes its low seven bits,
    least-significant group first, and a set high bit means another byte
    follows. If ``f`` runs out of data, whatever has been accumulated so far
    (0 when nothing was read) is returned.
    """
    value = 0
    offset = 0
    while True:
        chunk = f.read(1)
        if not chunk:
            break
        try:
            byte = ord(chunk)
        except TypeError:
            byte = chunk
        value |= (byte & 0x7F) << offset
        offset += 7
        if byte < 0x80:
            break
    return value
def zigzag_decode(uint: int) -> int:
    """Decode a ZigZag-encoded unsigned integer into a signed one.

    ZigZag interleaves signed values so that small magnitudes get small
    codes: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, 4 -> 2, ...

    Even codes map to ``uint >> 1``. Odd codes map to ``-(uint >> 1) - 1``;
    without the ``- 1`` the codes 0 and 1 would both decode to 0 and every
    negative value would be off by one.
    """
    # -(uint & 1) is 0 for even codes and -1 (all ones) for odd codes, so the
    # XOR either leaves the shifted value alone or bit-inverts it.
    return (uint >> 1) ^ -(uint & 1)
def read_varint(f) -> int:
    """Read a varuint from ``f`` and return it passed through ``zigzag_decode``."""
    encoded = read_varuint(f)
    return zigzag_decode(encoded)

View file

@ -0,0 +1,136 @@
import logging
import os
import timeit
import mwm
# Fetch the logger named "mwm" and restrict it to ERROR and above while the
# examples run.
logger = logging.getLogger("mwm")
logger.setLevel(logging.ERROR)
def example__storing_features_in_a_collection(path):
    """Collect features from the mwm at ``path`` in several ways.

    Builds a list and a tuple of all features, then gathers the features
    that report metadata in two ways and prints how long ten runs of each
    take.
    """
    ft_list = [ft for ft in mwm.Mwm(path)]
    print(f"List size: {len(ft_list)}")

    ft_tuple = tuple(ft for ft in mwm.Mwm(path))
    print(f"Tuple size: {len(ft_tuple)}")

    def slow():
        # Default iteration; keep every feature that reports metadata.
        return [ft for ft in mwm.Mwm(path) if ft.metadata()]

    with_metadata = slow()
    print("Features with metadata:", len(with_metadata))
    print("First three are:", with_metadata[:3])

    def fast():
        # The second constructor argument is False here and parse() is called
        # only on the features that are kept.
        # NOTE(review): presumably False turns off eager parsing - confirm
        # against mwm.Mwm.
        return [ft.parse() for ft in mwm.Mwm(path, False) if ft.metadata()]

    tslow = timeit.timeit(slow, number=10)
    tfast = timeit.timeit(fast, number=10)
    print(f"Slow took {tslow}, fast took {tfast}.")
def example__features_generator(path):
    """Consume features lazily and stop as soon as enough have been seen.

    Prints the names of the first six features produced by a generator, then
    prints the feature at position 10 (``None`` if the file has fewer).
    """

    def make_gen(path):
        return (ft for ft in mwm.Mwm(path))

    print("Names of several first features:")
    for position, ft in enumerate(make_gen(path)):
        print(ft.names())
        if position == 5:
            break

    def return_ft(num):
        # Falls off the end (returning None) when ``num`` is past the last
        # feature.
        for position, ft in enumerate(mwm.Mwm(path)):
            if position == num:
                return ft

    print(return_ft(10))
def example__sequential_processing(path):
    """Print the readable names longer than 100 characters found in ``path``."""
    long_names = [
        ft.readable_name()
        for ft in mwm.Mwm(path)
        if len(ft.readable_name()) > 100
    ]
    print("Long names:", long_names)
def example__working_with_features(path):
    """Print what a feature exposes, using the mwm file at ``path``.

    Takes the first feature of the file and prints its members and the value
    of each accessor, then scans the remaining features and prints the first
    area geometry with fewer than 10 elements.

    Raises:
        StopIteration: If the file yields no features at all.
    """
    it = iter(mwm.Mwm(path))
    ft = next(it)
    print("Feature members are:", dir(ft))

    print("index:", ft.index())
    print(
        "types:",
        ft.types(),
        # Label typo fixed: this used to print "redable types:".
        "readable types:",
        [mwm.readable_type(t) for t in ft.types()],
    )
    print("metadata:", ft.metadata())
    print("names:", ft.names())
    print("readable_name:", ft.readable_name())
    print("rank:", ft.rank())
    print("population:", ft.population())
    print("road_number:", ft.road_number())
    print("house_number:", ft.house_number())
    print("layer:", ft.layer())
    print("geom_type:", ft.geom_type())
    print("center:", ft.center())
    print("geometry:", ft.geometry())
    print("limit_rect:", ft.limit_rect())
    print("__repr__:", ft)

    # Continue with the same iterator, so the first feature is not revisited.
    for ft in it:
        geometry = ft.geometry()
        if ft.geom_type() == mwm.GeomType.area and len(geometry) < 10:
            print("area geometry", geometry)
            break
def example__working_with_mwm(path):
    """Print the members and file-level accessors of the mwm at ``path``."""
    mwm_file = mwm.Mwm(path)
    print("Mwm members are:", dir(mwm_file))
    print(mwm_file)

    print("version:", mwm_file.version())
    print("type:", mwm_file.type())
    print("bounds:", mwm_file.bounds())
    print("sections_info:", mwm_file.sections_info())
def main(path):
    """Run every example against the mwm file at ``path``, in order."""
    examples = (
        example__storing_features_in_a_collection,
        example__features_generator,
        example__sequential_processing,
        example__working_with_features,
        example__working_with_mwm,
    )
    for example in examples:
        example(path)
if __name__ == "__main__":
    # Default input: data/minsk-pass.mwm, three directory levels above this
    # script.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    sample_mwm = os.path.join(
        script_dir, "..", "..", "..", "data", "minsk-pass.mwm"
    )
    main(sample_mwm)

View file

@ -0,0 +1,3 @@
omim-data-essential
omim-data-files
omim-pygen

View file

31
tools/python/mwm/setup.py Executable file
View file

@ -0,0 +1,31 @@
#!/usr/bin/env python3
import os
import sys
import setuptools
# Absolute path of the directory that contains this setup script.
module_dir = os.path.abspath(os.path.dirname(__file__))
# Put the directory three levels above this one first on sys.path; the
# pyhelpers imports that follow are expected to resolve from there.
sys.path.insert(0, os.path.join(module_dir, "..", "..", ".."))
from pyhelpers.setup import chdir
from pyhelpers.setup import get_version
from pyhelpers.setup import get_requirements
# chdir comes from pyhelpers.setup.
# NOTE(review): presumably it switches the working directory to this
# script's directory for the duration of the block - confirm.
with chdir(module_dir):
    classifiers = [
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: Apache Software License",
    ]
    setuptools.setup(
        name="omim-mwm",
        version=str(get_version()),
        author="CoMaps",
        author_email="info@comaps.app",
        description="This package is a library that can work with mwm files.",
        url="https://codeberg.org/comaps",
        package_dir={"mwm": ""},
        packages=["mwm"],
        classifiers=classifiers,
        python_requires=">=3.6",
        install_requires=get_requirements(),
    )

View file

@ -0,0 +1,9 @@
import enum
import json
class EnumAsStrEncoder(json.JSONEncoder):
    """JSON encoder that writes ``enum.Enum`` members as their names."""

    def default(self, obj):
        """Return ``obj.name`` for enum members; defer the rest to the base class."""
        if not isinstance(obj, enum.Enum):
            return super().default(obj)
        return obj.name