package publicsWay;
|
import java.time.LocalDate;
|
import java.time.format.DateTimeFormatter;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
import databases.DBConnector;
|
import scheduled_task.TrackTableManager;
|
|
public class TrackDataBatchInserter {
|
private static final int BATCH_SIZE = 1000;
|
private static final long BATCH_INTERVAL_MS = 1000;
|
private static final int MAX_QUEUE_SIZE = 100000;
|
|
private static final ConcurrentLinkedQueue<TrackData> trackDataQueue = new ConcurrentLinkedQueue<>();
|
private static final ScheduledExecutorService batchInsertExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
static {
|
batchInsertExecutor.scheduleAtFixedRate(() -> {
|
try {
|
batchInsertTrackData();
|
} catch (Exception e) {
|
System.err.println("ÅúÁ¿²åÈëÊý¾ÝÒì³£: " + e.getMessage());
|
}
|
}, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
}
|
|
|
// Ìí¼ÓÊý¾Ýµ½¶ÓÁÐ
|
public static void addTrackData(TrackData data) {
|
if (trackDataQueue.size() < MAX_QUEUE_SIZE) {
|
trackDataQueue.offer(data);
|
}
|
}
|
|
private static void batchInsertTrackData() {
|
if (trackDataQueue.isEmpty()) {
|
return;
|
}
|
|
String tableName = "tb_track_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
List<TrackData> batchList = new ArrayList<>(BATCH_SIZE);
|
int count = 0;
|
|
while (count < BATCH_SIZE && !trackDataQueue.isEmpty()) {
|
TrackData data = trackDataQueue.poll();
|
if (data != null) {
|
batchList.add(data);
|
count++;
|
}
|
}
|
|
if (batchList.isEmpty()) {
|
return;
|
}
|
|
try {
|
// ʹÓÃDBConnector¼ì²é±íÊÇ·ñ´æÔÚ£¬²»´æÔÚÔò´´½¨
|
if (!DBConnector.tableExists(tableName)) {
|
DBConnector.executeDDL(TrackTableManager.getcreateSQL(tableName));
|
}
|
|
// ×¼±¸ÅúÁ¿²åÈëÊý¾Ý
|
List<Object[]> paramsList = new ArrayList<>();
|
for (TrackData data : batchList) {
|
Object[] params = new Object[]{
|
data.getDeviceNumber(),
|
data.getDeviceName(),
|
data.getxCoordinate(),
|
data.getxCoordinate(),
|
data.getxCoordinate(),
|
data.getLayer(),
|
data.getBattery(),
|
data.getUtcTimes(),
|
data.getLatitude(),
|
data.getLongitude(),
|
data.getPositioningQuality(),
|
data.getSatelliteCount(),
|
data.getHdop(),
|
data.getAltitude(),
|
data.getGeoidHeight(),
|
data.getDifferentialTime(),
|
data.getSource(),
|
data.getCompany(),
|
data.getSaveTime()
|
};
|
paramsList.add(params);
|
}
|
|
// ʹÓÃDBConnector½øÐÐÅúÁ¿²åÈë
|
String sql = "INSERT INTO " + tableName + " (device_number, device_name, x_coordinate, " +
|
"y_coordinate, z_coordinate, layer, battery, utc_times, latitude, longitude, " +
|
"positioning_quality, satellite_count, hdop, altitude, geoid_height, " +
|
"differential_time, source, company, save_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
|
DBConnector.executeBatchUpdate(sql, paramsList);
|
|
} catch (Exception e) {
|
System.err.println("ÅúÁ¿²åÈëÊý¾Ý±íÒì³£: " + e.getMessage());
|
// ½«´íÎóµÄÊý¾ÝÖØÐ·ŻضÓÁÐ
|
trackDataQueue.addAll(batchList);
|
}
|
}
|
|
// ¹Ø±ÕÅúÁ¿²åÈë·þÎñ
|
public static void shutdown() {
|
batchInsertExecutor.shutdown();
|
try {
|
if (!batchInsertExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
batchInsertExecutor.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
batchInsertExecutor.shutdownNow();
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|