Location: code7788 > text

Geolocation, City Search Service with Elasticsearch

Popularity:510 ℃/2024-07-28 13:45:10

Recently I used some simple geographic location query interfaces: based on the user's current position, obtain information about their location (province and city), and then use that information to provide services specific to their region.

Then I researched GIS on my own. As a programmer, can you implement this functionality yourself? The answer is of course yes — time to get to work.

Idea: find the data, write it to the database, and use Elasticsearch's powerful search capabilities and rich GIS data processing capabilities to implement it.

GIS-related professional reference (found via web search; fairly specialized): "Getting Started with GIS for Programmers" — both front-end and back-end developers should know a little about GIS.

After a lot of searching, I found a fairly complete dataset online (from a community GIS data provider), downloadable in Shapefile (SHAPE) format.

Step 1: Download the data from the provider

Step 2: Write a Python script to preprocess the data: Shapefile to GeoJSON — ES is stronger at handling GeoJSON.

import geopandas as gpd

# Read the Shapefile
shapefile_path = 'D:/data/gis/2023_CTAmap_version_1.12/2023Provincial/2023Provincial.shp'
gdf = gpd.read_file(shapefile_path)

# Inspect the GeoDataFrame
print(gdf.head())

# If required, the data can be pre-processed here,
# e.g. filtering rows or selecting specific columns.
# gdf = gdf[['column1', 'column2', 'geometry']]

# Convert the GeoDataFrame to a plain Pandas DataFrame (if needed):
# drop the geometry column and re-attach it serialized as GeoJSON text.
# NOTE(review): gpd.GeoSeries(x) wraps each single geometry so it can be
# serialized with .to_json() -- confirm this is the desired per-row output.
df = gdf.drop('geometry', axis=1).join(
    gdf['geometry'].apply(lambda x: gpd.GeoSeries(x).to_json())
)

# Export the Pandas DataFrame as a JSON file
output_json_path = 'D:/data/gis/province2023.geojson'
# df.to_json(output_json_path, orient='records')

# To keep the output GeoJSON-spec compliant, save the GeoDataFrame directly.
gdf.to_file(output_json_path, driver='GeoJSON')

Step 3: Write GeoJSON to Elasticsearch using Python scripts

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json

# Connect to Elasticsearch
es = Elasticsearch("http://localhost:9200")

# Check the connection
if not es.ping():
    raise ValueError("Connection failed")

# Delete the old index (if it exists) so the script is re-runnable
if es.indices.exists(index="province2023_geoshape_index_001"):
    es.indices.delete(index="province2023_geoshape_index_001")

# Create the index and define the Mapping:
# the raw GeoJSON geometry is indexed as geo_shape.
mapping = {
    "mappings": {
        "properties": {
            "location": {
                "type": "geo_shape"
            },
            "name": {
                "type": "text"
            }
        }
    }
}

# Create the index
es.indices.create(index="province2023_geoshape_index_001", body=mapping)

# Read the GeoJSON file produced in step 2
with open("D:/data/gis/province2023.geojson", "r", encoding="utf-8") as file:
    geojson_data = json.load(file)

# Extract the GeoJSON feature set
features = geojson_data.get("features", [])

# Prepare data for import: one document per GeoJSON feature
documents = []
for feature in features:
    doc = {
        "location": {
            "type": feature["geometry"]["type"],
            "coordinates": feature["geometry"]["coordinates"]
        }
    }
    # Flatten the feature properties (name, codes, ...) into the document
    if "properties" in feature:
        doc.update(feature["properties"])
    documents.append(doc)

# Define the batch size
batch_size = 100  # number of documents indexed per bulk request


# Prepare actions
def generate_actions(documents):
    """Yield one bulk-index action per document for the helpers.bulk API."""
    for doc in documents:
        yield {
            "_index": "province2023_geoshape_index_001",
            "_source": doc
        }


# Execute batch import in batches
# Execute the import batch by batch; each slice is at most batch_size docs.
i = 0
while i < len(documents):
    end = min(i + batch_size, len(documents))
    success, _ = bulk(es, generate_actions(documents[i:end]))
    print(f"Bulk {i}-{end} completed, {success} documents indexed.")
    i = end

print("All data indexed.")

Step 4: Calculate the center point of each record's area (extended feature; the raw data contains only polygon geometry)

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
import ssl

# Connect to Elasticsearch
es = Elasticsearch("http://localhost:9200")

# Check the connection
if not es.ping():
    raise ValueError("Connection failed")

# Delete the old index (if it exists) so the script is re-runnable
if es.indices.exists(index="province2023_centroid_geoshape_index_001"):
    es.indices.delete(index="province2023_centroid_geoshape_index_001")

# Create the index and define the Mapping
mapping = {
    "mappings": {
        "properties": {
            "location": {
                "type": "geo_shape"
            },
            "centroid": {  # new field: pre-computed center point of the shape
                "type": "geo_point"
            },
            "name": {
                "type": "text"
            }
        }
    }
}

# Create the index
es.indices.create(index="province2023_centroid_geoshape_index_001", body=mapping)

# Read the GeoJSON file produced in step 2
with open("D:/data/gis/province2023.geojson", "r", encoding="utf-8") as file:
    geojson_data = json.load(file)

# Extract the GeoJSON feature set
features = geojson_data.get("features", [])


def calculate_centroid(polygons):
    """Return the area-weighted centroid [x, y] of a collection of polygons.

    Each element of ``polygons`` may be either a ring (list of [x, y]
    pairs) or a polygon (list of rings); both shapes are handled by
    calculate_simple_polygon_centroid / calculate_polygon_area.
    Returns [0, 0] when the total area is zero (degenerate geometry).
    """
    total_area = 0.0
    total_x = 0.0
    total_y = 0.0

    for polygon in polygons:
        # Weight each part's (approximate) centroid by its area.
        centroid = calculate_simple_polygon_centroid(polygon)
        area = calculate_polygon_area(polygon)

        total_area += area
        total_x += centroid[0] * area
        total_y += centroid[1] * area

    if total_area == 0:
        # Zero total area: fall back to the origin as the center point.
        return [0, 0]
    else:
        return [total_x / total_area, total_y / total_area]


# is_coordinates_list helper
# The following structure returns True: polygon is a list of rings,
# each ring being a list of coordinates
# [
#   [[x1, y1], [x2, y2], [x3, y3], ...],
#   [[x1, y1], [x2, y2], [x3, y3], ...]  # if there are interior holes
# ]
# The following structure returns False: a list of individual coordinates
# [
#   [x1, y1],
#   [x2, y2],
#   [x3, y3],
#   ...
# ]

def is_coordinate(coord):
    """Return True if ``coord`` looks like a single [x, y] numeric pair."""
    return (
            isinstance(coord, (list, tuple)) and
            len(coord) == 2 and
            all(isinstance(c, (int, float)) for c in coord)
    )


def is_coordinates_list(coords):
    """Return True if ``coords`` is a list of rings, each a list of [x, y] pairs."""
    if isinstance(coords, list):
        if all(isinstance(c, list) and all(is_coordinate(coord) for coord in c) for c in coords):
            return True
    return False


def calculate_simple_polygon_centroid(polygon):
    """Return the vertex-average centroid [x, y] of a simple polygon.

    ``polygon`` may be a list of rings (only the outer ring, polygon[0],
    is used; holes are ignored) or a bare list of [x, y] pairs.
    NOTE(review): this is the mean of the vertices, not the true area
    centroid -- it is only area-weighted later in calculate_centroid.
    """
    # Determine the structure of the polygon
    if is_coordinates_list(polygon):
        # polygon is a list of rings; use the outer ring
        ring = polygon[0]
    else:
        # polygon is already a flat list of coordinates
        ring = polygon
    x_sum = sum(coord[0] for coord in ring)
    y_sum = sum(coord[1] for coord in ring)
    num_points = len(ring)
    # Average the coordinates
    centroid_x = x_sum / num_points
    centroid_y = y_sum / num_points

    return [centroid_x, centroid_y]


def calculate_polygon_area(polygon):
    """Return the area of a simple polygon via the shoelace formula.

    ``polygon`` may be a list of rings (only the outer ring is used;
    holes are ignored) or a bare list of [x, y] pairs.
    """
    area = 0.0
    if is_coordinates_list(polygon):  # polygon is a list of rings
        num_points = len(polygon[0])
        for i in range(num_points):
            j = (i + 1) % num_points
            area += polygon[0][i][0] * polygon[0][j][1]
            area -= polygon[0][j][0] * polygon[0][i][1]

    else:  # polygon is a flat list of coordinates
        num_points = len(polygon)
        for i in range(num_points):
            j = (i + 1) % num_points
            area += polygon[i][0] * polygon[j][1]
            area -= polygon[j][0] * polygon[i][1]
    return abs(area) / 2.0


# Prepare data for import: one document per GeoJSON feature,
# with a pre-computed centroid so it can be indexed as a geo_point.
documents = []
for feature in features:
    coordinates = feature["geometry"]["coordinates"]
    centroid = calculate_centroid(coordinates)

    doc = {
        "location": {
            "type": feature["geometry"]["type"],
            "coordinates": coordinates
        },
        "centroid": centroid,  # add the computed center point
    }
    # Flatten the feature properties (name, codes, ...) into the document
    if "properties" in feature:
        doc.update(feature["properties"])
    documents.append(doc)

# Define the batch size
batch_size = 100  # number of documents indexed per bulk request


# Prepare actions
def generate_actions(documents):
    """Yield one bulk-index action per document for the helpers.bulk API.

    BUG FIX: the index name previously said "district2023_centroid_...",
    which does not match the "province2023_centroid_geoshape_index_001"
    index created above -- documents would have been auto-indexed into a
    wrong index with no geo mapping.
    """
    for doc in documents:
        yield {
            "_index": "province2023_centroid_geoshape_index_001",
            "_source": doc
        }


# Execute the import batch by batch; report per-batch successes/failures.
i = 0
while i < len(documents):
    end = min(i + batch_size, len(documents))
    success, errors = bulk(es, generate_actions(documents[i:end]))
    if errors:
        print(f"Bulk {i}-{end} completed, {success} documents indexed, but {len(errors)} documents failed.")
        for error in errors:
            print(error)
    else:
        print(f"Bulk {i}-{end} completed, {success} documents indexed.")
    i = end

print("All data indexed.")

Step 5: Preprocess the data using elasticsearch's pipeline and reindex capabilities

# The geo_centroid aggregation is an advanced aggregation that computes the
# centroid of a set of geographic locations. In Elasticsearch it is an X-Pack
# feature, usually available only under a commercial license.
# A 30-day trial is available to try it out.
POST /province2023_geoshape_index_001/_search
{
  "size": 0,
  "aggs": {
    "centroid": {
      "geo_centroid": {
        "field": "location"
      }
    }
  }
}


POST province2023_centroid_geoshape_index_001/_search
{
  "query": {
    "term": {
      "province.keyword": {
        "value": "Shaanxi Province" 
      }
    }
  }
}

# NOTE: Elasticsearch has no built-in "copy" ingest processor; use the "set"
# processor with "copy_from" (ES 7.9+) to copy one field into another.
PUT _ingest/pipeline/copy_field_pipeline
{
  "description": "Copy the value of one field to another",
  "processors": [
    {
      "copy": {
        "from": "leave out", 
        "to": "province_name"
      }
    }
  ]
}
GET province2023_centroid_geoshape_index_001/_mapping

GET province2023_centroid_geoshape_index_001/_mapping


# NOTE: the source field names below ("leave out", "Provincial",
# "provincial code", "Type of province") are machine-translated placeholders;
# replace them with the actual attribute names from the Shapefile.
PUT _ingest/pipeline/province_multiple_copy_fields_pipeline
{
  "description": "Copy multiple fields to new fields and rename fields to new fields",
  "processors": [
    {
      "set": {
        "field": "province_name",
        "value": "{{{Provincial}}}"
      }
    },
    {
      "remove": {
        "field": "leave out"
      }
    },
    {
      "rename": {
        "field": "provincial code",
        "target_field": "province_code"
      }
    },
    {
      "rename": {
        "field": "Type of province",
        "target_field": "province_type"
      }
    },
    {
      "rename": {
        "field": "VAR_NAME",
        "target_field": "var_name"
      }
    },
    {
      "rename": {
        "field": "ENG_NAME",
        "target_field": "eng_name"
      }
    },
    {
      "rename": {
        "field": "FIRST_GID",
        "target_field": "first_gid"
      }
    },
    {
      "rename": {
        "field": "FIRST_TYPE",
        "target_field": "first_type"
      }
    }
  ]
}

GET province2023_centroid_geoshape_index_002/_count

GET province2023_centroid_geoshape_index_002/_mapping
DELETE province2023_centroid_geoshape_index_002

PUT province2023_centroid_geoshape_index_002
{
  "mappings": {
    "properties": {
      "eng_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "first_gid": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "first_type": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "var_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "centroid": {
        "type": "geo_point"
      },
      "location": {
        "type": "geo_shape"
      },
      "name": {
        "type": "text"
      },
      "year": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

POST _reindex
{
  "source": {
    "index": "province2023_centroid_geoshape_index_001"
  },
  "dest": {
    "index": "province2023_centroid_geoshape_index_002",
    "pipeline": "province_multiple_copy_fields_pipeline"
  }
}

GET province2023_centroid_geoshape_index_002/_search

Step 6: Query the data geo_distance

# The centroid field's type is geo_point; the latitude/longitude is stored in array form ("geopoint as an array").
# geo_bounding_box Finds all geographic coordinate points within the bounding box.
POST province2023_centroid_geoshape_index_002/_search
{
  "query": {
    "geo_bounding_box": { 
      "centroid": {
        "top_left": {
          "lat": 42,
          "lon": -72
        },
        "bottom_right": {
          "lat": 40,
          "lon": -74
        }
      }
    }
  }
}

POST province2023_centroid_geoshape_index_002/_search
{
  "query": {
    "geo_distance": {
      "distance": 100,
      "centroid": {
        "lat": 40.09937484066758,
        "lon": 116.41960604340115
      }
    }
  }
}

POST province2023_centroid_geoshape_index_002/_search
{
  "query": {
    "bool": {
      "must": {
        "match": {
          "province_name":"xx city"
        }
      },
      "filter": {
        "geo_distance": {
          "distance": "2km",
          "centroid": {
            "lat": 40.09937484066758,
            "lon": 116.41960604340115
          }
        }
      }
    }
  }
}


POST province2023_centroid_geoshape_index_002/_search
{
  "query": {
    "bool": {
      "must": {
        "match": {
          "province_name":"xx city"
        }
      },
      "filter": {
        "geo_distance": {
          "distance": "200km",
          "location": {
            "lat": 40.09937484066758,
            "lon": 116.41960604340115
          }
        }
      }
    }
  }
}