import cv2
import fiona
import numpy as np
from pyproj import Transformer
from tqdm import tqdm

from grid_optim import get_optimal_grid


def shapefile_to_latlong(file_path: str):
    """Read every ring of a shapefile and reproject it to EPSG:4326.

    Args:
        file_path: Path of the shapefile to read.

    Returns:
        A list with one ``np.ndarray`` per coordinate set found in the file,
        each holding the transformed points. ``always_xy=True`` means the
        output order is (x, y), i.e. (longitude, latitude) for EPSG:4326.
    """
    contours = []
    # The context manager guarantees the collection is closed even if a
    # feature fails to transform.
    with fiona.open(file_path) as c:
        transformer = Transformer.from_crs(c.crs, 4326, always_xy=True)
        for poly in c:
            coords = poly["geometry"].coordinates
            for coord_set in coords:
                # A single-element set is a wrapper around one ring (as in a
                # multi-part geometry); unwrap it, otherwise use it directly.
                ring = coord_set[0] if len(coord_set) == 1 else coord_set
                contours.append(np.array(list(transformer.itransform(ring))))
    return contours


def mask_shapefile_to_grid_indices(
    file_path: str, side_len_m: float = 100, meters_per_px: float = 10
):
    """Place grid points inside each polygon of a shapefile.

    Each coordinate set is rasterised into a square mask, the mask is handed
    to ``get_optimal_grid``, and the resulting points are mapped back to the
    source CRS and finally reprojected to EPSG:4326.

    Args:
        file_path: Path of the shapefile to read. Every feature must carry a
            ``City`` property.
        side_len_m: Grid cell side length, in the units of the source CRS
            (presumably metres -- confirm against the data).
        meters_per_px: Size of one mask pixel in source-CRS units.

    Returns:
        A tuple ``(points, labels)``: ``points`` is an ``np.ndarray`` of the
        reprojected grid points, ``labels`` a list of strings of the form
        ``"<first letter of City>-<running index>"``, one per point.
    """
    all_indices = []
    all_labels = []
    city_counts = {}
    side_len = side_len_m / meters_per_px

    with fiona.open(file_path, "r") as c:
        # Built while the collection is open, since it needs the source CRS.
        transformer = Transformer.from_crs(c.crs, 4326, always_xy=True)

        for poly in c:
            coords = poly["geometry"].coordinates
            city = poly["properties"]["City"]
            city_counts.setdefault(city, 0)

            for coord_set in tqdm(coords):
                contour = np.array(
                    coord_set[0] if len(coord_set) == 1 else coord_set
                )
                # Shift the contour so its bounding box starts at the origin.
                cmin = contour.min(axis=0)
                contour -= cmin
                # Largest extent over both axes, in pixels: the mask is square.
                cmax = int(contour.max() / meters_per_px)
                contour = contour // meters_per_px
                # Shapes smaller than one grid cell cannot hold a point.
                if cmax < side_len:
                    continue

                # NOTE(review): the farthest vertex lands on pixel index
                # ``cmax``, one past the last index of a (cmax, cmax) mask, so
                # that edge is presumably clipped by OpenCV -- confirm whether
                # a (cmax + 1) mask is wanted.
                mask = np.zeros((cmax, cmax), dtype="uint8")
                mask = cv2.drawContours(
                    mask,
                    [contour.reshape((-1, 1, 2)).astype(np.int32)],
                    -1,
                    255,
                    thickness=cv2.FILLED,
                )

                # Only the first element of the helper's result is used;
                # presumably the grid points in mask pixel coordinates.
                indices = np.array(get_optimal_grid(mask, side_len=side_len)[0])
                # Back from pixel space to source-CRS coordinates.
                indices = indices * meters_per_px + cmin

                all_indices += sorted(indices.tolist(), key=lambda x: x[1])
                all_labels += [
                    f"{city[0]}-{city_counts[city] + i}"
                    for i in range(len(indices))
                ]
                city_counts[city] += len(indices)

    all_indices = list(transformer.itransform(all_indices))
    return np.array(all_indices), all_labels


def points_to_shapefile(points: np.ndarray, labels: list, file_path: str):
    """Write labelled points to an ESRI Shapefile in EPSG:4326.

    Args:
        points: Iterable of ``(x, y)`` pairs, one per output feature.
        labels: Names stored in the ``Name`` property; indexed in step with
            ``points``, so it must be at least as long.
        file_path: Destination path of the shapefile.

    Raises:
        IndexError: If ``labels`` is shorter than ``points``.
    """
    schema = {"geometry": "Point", "properties": [("ID", "int"), ("Name", "str")]}
    # The context manager closes (and flushes) the file even if a write fails.
    with fiona.open(
        file_path,
        mode="w",
        driver="ESRI Shapefile",
        schema=schema,
        crs="EPSG:4326",
    ) as point_shp:
        # One Point feature per input row; ID is the row position.
        for i, (x, y) in enumerate(points):
            point_shp.write(
                {
                    "geometry": {"type": "Point", "coordinates": (x, y)},
                    "properties": {"ID": i, "Name": labels[i]},
                }
            )


def get_cached_grid_indices(file_path: str):
    """Load previously saved grid points and their names from a shapefile.

    Args:
        file_path: Path of the shapefile to read.

    Returns:
        A tuple ``(points, labels)``: ``points`` is an ``np.ndarray`` built
        from every feature's coordinates, ``labels`` the ``Name`` property of
        each feature that has one (features without it contribute no label).
    """
    all_indices = []
    all_labels = []
    with fiona.open(file_path, "r") as c:
        for poly in c:
            all_indices.append(poly["geometry"].coordinates)
            if "Name" in poly["properties"]:
                all_labels.append(poly["properties"]["Name"])
    return np.array(all_indices), all_labels