Geospatial Spatio-Temporal Event Extraction Pipeline
2026-04-15
1. Overview
The Geospatial Spatio-Temporal Event Extraction Pipeline extracts and refines geospatial events. It first extracts event tuples with time and location information from raw text, then filters them by time range and target location, and finally uses an LLM to score each event's plausibility (rationale) and internal consistency, so that only the more reliable geospatial events remain.
This pipeline is suitable for:
- extracting floods, eruptions, earthquakes, and other geospatial events
- event retrieval under temporal and spatial constraints
- cleaning GeoKG data before graph construction
- preparing data for geospatial monitoring, analysis, and downstream QA
The main stages are:
- Event extraction: extract event tuples from raw text.
- Time filtering: keep events within the target time window.
- Location filtering: keep events related to the target location.
- Rationale evaluation and filtering: remove obviously implausible events.
- Consistency evaluation and filtering: further remove internally inconsistent events.
2. Quick Start
Step 1: Create a new DataFlow workspace
mkdir run_dataflow_kg
cd run_dataflow_kg
Step 2: Initialize pipeline code and default data
dfkg init
The initialization creates:
- Pipeline script: api_pipelines/geokg_spatiotemporal_event_pipeline.py
- Default data: example_data/GeoKGSpatiotemporalEventPipeline/input.json
Step 3: Configure the API key and optional model settings
export DF_API_KEY=sk-xxxx
The default model is gpt-4o-mini. You can override the defaults with DF_API_URL, DF_LLM_MODEL, or DF_GEOKG_INPUT_FILE.
Step 4: Run the pipeline
python api_pipelines/geokg_spatiotemporal_event_pipeline.py
3. Data Flow and Pipeline Logic
3.1 Input Data
This pipeline requires only one mandatory field:
- raw_chunk: source text describing spatio-temporal events
A sample input is:
[
{
"raw_chunk": "On June 3, 2024, a massive flood occurred along the Yangtze River in China, affecting Hubei, Hunan, and Jiangxi. Several cities including Wuhan and Yichang experienced severe inundation."
}
]
3.2 Geospatial Event Extraction Pipeline Logic (GeoKGSpatiotemporalEventPipeline)
Step 1: Event Extraction (GeoKGEventExtraction)
Functionality:
- extract event tuples with temporal and spatial fields from raw text
- provide structured event candidates for later filtering and evaluation
Input: raw_chunk
Output: tuple
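The extracted tuples are tagged strings of the form shown in section 3.3 (for example, "<event> ... <location> ... <time> ..."). As a minimal sketch of how such a string can be broken into fields for downstream filtering, assuming that tag layout (the function name and the exact tag set are illustrative, not part of the library API):

```python
import re

def parse_event_tuple(tuple_str: str) -> dict:
    """Split a tagged event string like
    '<event> ... <location> ... <time> ...' into a field dict.
    The tag set here mirrors the sample output in section 3.3;
    the real operator may emit additional tags."""
    fields = {}
    # Find every '<tag> value' segment; a value runs until the next tag.
    for match in re.finditer(r"<(\w+)>\s*([^<]*)", tuple_str):
        tag, value = match.group(1), match.group(2).strip()
        fields[tag] = value
    return fields

sample = ("<event> Yangtze River flood <location> Wuhan, Hubei, China "
          "<time> 2024-06-03 <cause> heavy rainfall <impact> urban inundation")
print(parse_event_tuple(sample)["time"])  # → 2024-06-03
```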
Step 2: Time Filtering (GeoKGEventTupleTimeFilter)
Functionality:
- filter events by query_time_start and query_time_end
- keep only records within the target time window
Input: tuple
Output: tuple
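The time filter conceptually keeps only events whose time field falls inside [query_time_start, query_time_end]. A sketch under the assumption of plain ISO dates in the <time> field (the real operator likely handles richer time expressions; the helper name is illustrative):

```python
import re
from datetime import date

def in_time_window(tuple_str: str, start: str, end: str) -> bool:
    """Keep an event whose '<time> YYYY-MM-DD' field lies inside
    [start, end]. Events without a parseable time are dropped."""
    m = re.search(r"<time>\s*(\d{4}-\d{2}-\d{2})", tuple_str)
    if not m:
        return False
    t = date.fromisoformat(m.group(1))
    return date.fromisoformat(start) <= t <= date.fromisoformat(end)

events = ["<event> flood <time> 2024-06-03", "<event> quake <time> 2019-05-01"]
kept = [e for e in events if in_time_window(e, "2024-01-01", "2024-12-31")]
# kept retains only the 2024 flood event
```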
Step 3: Location Filtering (GeoKGEventTupleLocationFilter)
Functionality:
- perform fuzzy matching over the <location> field
- keep only geospatial events relevant to location_name
Input: tuple
Output: tuple
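One way to realize fuzzy location matching is a substring check backed by a string-similarity ratio. The following is a sketch only, assuming difflib-style similarity; the real operator's matching rules may differ, and the function name and threshold are illustrative:

```python
import re
from difflib import SequenceMatcher

def matches_location(tuple_str: str, location_name: str,
                     threshold: float = 0.6) -> bool:
    """Fuzzy-match location_name against the '<location>' field:
    a case-insensitive substring hit, or a similarity ratio above
    the threshold, counts as a match."""
    m = re.search(r"<location>\s*([^<]*)", tuple_str)
    if not m:
        return False
    loc = m.group(1).strip().lower()
    name = location_name.lower()
    if name in loc:
        return True
    return SequenceMatcher(None, name, loc).ratio() >= threshold

event = "<event> flood <location> Wuhan, Hubei, China <time> 2024-06-03"
matches_location(event, "China")  # → True (substring hit)
```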
Step 4: Rationale Evaluation (GeoKGEventRationaleEvaluator)
Functionality:
- judge whether each event has basic semantic plausibility
- produce rationale_scores for event tuples
Input: tuple
Output: rationale_scores
Step 5: Rationale Filtering (GeoKGEventRationaleFilter)
Functionality:
- keep only events whose rationale_scores reach the rationale_min_score threshold
Input: tuple, rationale_scores
Output: tuple
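The filtering steps reduce to thresholding parallel score lists; the same pattern covers both rationale filtering here and consistency filtering in step 7. A minimal sketch, where filter_by_score is an illustrative name and min_score mirrors rationale_min_score / consistency_min_score from the pipeline constructor:

```python
def filter_by_score(tuples, scores, min_score=0.95):
    """Keep tuple/score pairs whose score reaches min_score,
    preserving the alignment between the two lists."""
    kept = [(t, s) for t, s in zip(tuples, scores) if s >= min_score]
    if not kept:
        return [], []
    kept_tuples, kept_scores = map(list, zip(*kept))
    return kept_tuples, kept_scores

tuples = ["event A", "event B", "event C"]
scores = [0.98, 0.80, 0.96]
filter_by_score(tuples, scores)  # → (["event A", "event C"], [0.98, 0.96])
```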
Step 6: Consistency Evaluation (GeoKGEventConsistenceEvaluator)
Functionality:
- judge whether event fields are mutually consistent
- detect obvious conflicts among time, location, and event descriptions
Input: tuple
Output: consistency_scores
Step 7: Consistency Filtering (GeoKGEventConsistenceFilter)
Functionality:
- keep only events whose consistency_scores reach the consistency_min_score threshold
Input: tuple, consistency_scores
Output: tuple
3.3 Output Data
Typical output fields include:
- tuple: the final retained geospatial event tuples
- rationale_scores: plausibility scores
- consistency_scores: consistency scores
An example output is:
{
"tuple": [
"<event> Yangtze River flood <location> Wuhan, Hubei, China <time> 2024-06-03 <cause> heavy rainfall <impact> urban inundation",
"<event> Yangtze River flood control <location> Yichang, Hubei, China <time> 2024-06-03 <impact> downstream damage mitigation"
],
"rationale_scores": [0.98, 0.96],
"consistency_scores": [0.97, 0.95]
}
4. Pipeline Example
The following is a reference view of the GeoKGSpatiotemporalEventPipeline code generated by dfkg init. For execution, use the generated api_pipelines/geokg_spatiotemporal_event_pipeline.py file.
from dataflow.core import LLMServingABC
from dataflow.operators.domain_kg.geospatial_kg import (
GeoKGEventConsistenceEvaluator,
GeoKGEventConsistenceFilter,
GeoKGEventExtraction,
GeoKGEventRationaleEvaluator,
GeoKGEventRationaleFilter,
GeoKGEventTupleLocationFilter,
GeoKGEventTupleTimeFilter,
)
from dataflow.pipeline import PipelineABC
from dataflow.utils.storage import FileStorage
class GeoKGSpatiotemporalEventPipeline(PipelineABC):
    """Geospatial pipeline for extracting and refining spatio-temporal events.

    Required dataset columns:
    - `raw_chunk`: source text describing geographic events
    """

    def __init__(
        self,
        first_entry_file_name: str,
        llm_serving: LLMServingABC,
        cache_path: str = "./cache_local",
        file_name_prefix: str = "geokg_spatiotemporal_event_pipeline_step",
        cache_type: str = "jsonl",
        lang: str = "en",
        query_time_start: str = "Q1 2021",
        query_time_end: str = "2023-12-31",
        location_name: str = "China",
        rationale_min_score: float = 0.95,
        consistency_min_score: float = 0.95,
    ):
        super().__init__()
        if llm_serving is None:
            raise ValueError(
                "llm_serving is required for GeoKGSpatiotemporalEventPipeline"
            )
        self.storage = FileStorage(
            first_entry_file_name=first_entry_file_name,
            cache_path=cache_path,
            file_name_prefix=file_name_prefix,
            cache_type=cache_type,
        )
        self.query_time_start = query_time_start
        self.query_time_end = query_time_end
        self.location_name = location_name
        self.rationale_min_score = rationale_min_score
        self.consistency_min_score = consistency_min_score
        self.event_extraction_step1 = GeoKGEventExtraction(
            llm_serving=llm_serving,
            lang=lang,
        )
        self.time_filter_step2 = GeoKGEventTupleTimeFilter(merge_to_input=True)
        self.location_filter_step3 = GeoKGEventTupleLocationFilter(
            merge_to_input=True
        )
        self.rationale_eval_step4 = GeoKGEventRationaleEvaluator(
            llm_serving=llm_serving,
            lang=lang,
        )
        self.rationale_filter_step5 = GeoKGEventRationaleFilter(
            merge_to_input=True
        )
        self.consistency_eval_step6 = GeoKGEventConsistenceEvaluator(
            llm_serving=llm_serving,
            lang=lang,
        )
        self.consistency_filter_step7 = GeoKGEventConsistenceFilter(
            merge_to_input=True
        )

    def forward(self):
        self.event_extraction_step1.run(
            storage=self.storage.step(),
            input_key="raw_chunk",
            output_key="tuple",
        )
        self.time_filter_step2.run(
            storage=self.storage.step(),
            input_key="tuple",
            output_key="tuple",
            query_time_start=self.query_time_start,
            query_time_end=self.query_time_end,
        )
        self.location_filter_step3.run(
            storage=self.storage.step(),
            input_key="tuple",
            output_key="tuple",
            location_name=self.location_name,
        )
        self.rationale_eval_step4.run(
            storage=self.storage.step(),
            input_key="tuple",
            output_key="rationale_scores",
        )
        self.rationale_filter_step5.run(
            storage=self.storage.step(),
            input_key="tuple",
            score_key="rationale_scores",
            output_key="tuple",
            min_score=self.rationale_min_score,
        )
        self.consistency_eval_step6.run(
            storage=self.storage.step(),
            input_key="tuple",
            output_key="consistency_scores",
        )
        self.consistency_filter_step7.run(
            storage=self.storage.step(),
            input_key="tuple",
            score_key="consistency_scores",
            output_key="tuple",
            min_score=self.consistency_min_score,
        )

A minimal invocation example is:
from dataflow.serving import APILLMServing_request
from dataflow.statics.pipelines.api_pipelines.geokg_spatiotemporal_event_pipeline import GeoKGSpatiotemporalEventPipeline
llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
    key_name_of_api_key="DF_API_KEY",
    model_name="gpt-4o-mini",
    max_workers=8,
    temperature=0.0,
)

pipeline = GeoKGSpatiotemporalEventPipeline(
    first_entry_file_name="example_data/GeoKGSpatiotemporalEventPipeline/input.json",
    llm_serving=llm_serving,
    cache_path="./cache_geokg_event",
    query_time_start="2024-01-01",
    query_time_end="2024-12-31",
    location_name="China",
    lang="en",
)
pipeline.forward()
