package publicsWay; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import databases.DBConnector; import scheduled_task.TrackTableManager; public class TrackDataBatchInserter { private static final int BATCH_SIZE = 1000; private static final long BATCH_INTERVAL_MS = 1000; private static final int MAX_QUEUE_SIZE = 100000; private static final ConcurrentLinkedQueue trackDataQueue = new ConcurrentLinkedQueue<>(); private static final ScheduledExecutorService batchInsertExecutor = Executors.newSingleThreadScheduledExecutor(); static { batchInsertExecutor.scheduleAtFixedRate(() -> { try { batchInsertTrackData(); } catch (Exception e) { System.err.println("ÅúÁ¿²åÈëÊý¾ÝÒì³£: " + e.getMessage()); } }, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS); } // Ìí¼ÓÊý¾Ýµ½¶ÓÁÐ public static void addTrackData(TrackData data) { if (trackDataQueue.size() < MAX_QUEUE_SIZE) { trackDataQueue.offer(data); } } private static void batchInsertTrackData() { if (trackDataQueue.isEmpty()) { return; } String tableName = "tb_track_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); List batchList = new ArrayList<>(BATCH_SIZE); int count = 0; while (count < BATCH_SIZE && !trackDataQueue.isEmpty()) { TrackData data = trackDataQueue.poll(); if (data != null) { batchList.add(data); count++; } } if (batchList.isEmpty()) { return; } try { // ʹÓÃDBConnector¼ì²é±íÊÇ·ñ´æÔÚ£¬²»´æÔÚÔò´´½¨ if (!DBConnector.tableExists(tableName)) { DBConnector.executeDDL(TrackTableManager.getcreateSQL(tableName)); } // ×¼±¸ÅúÁ¿²åÈëÊý¾Ý List paramsList = new ArrayList<>(); for (TrackData data : batchList) { Object[] params = new Object[]{ data.getDeviceNumber(), data.getDeviceName(), data.getxCoordinate(), data.getxCoordinate(), data.getxCoordinate(), data.getLayer(), data.getBattery(), data.getUtcTimes(), data.getLatitude(), data.getLongitude(), data.getPositioningQuality(), data.getSatelliteCount(), data.getHdop(), data.getAltitude(), data.getGeoidHeight(), data.getDifferentialTime(), data.getSource(), data.getCompany(), data.getSaveTime() }; paramsList.add(params); } // ʹÓÃDBConnector½øÐÐÅúÁ¿²åÈë String sql = "INSERT INTO " + tableName + " (device_number, device_name, x_coordinate, " + "y_coordinate, z_coordinate, layer, battery, utc_times, latitude, longitude, " + "positioning_quality, satellite_count, hdop, altitude, geoid_height, " + "differential_time, source, company, save_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; DBConnector.executeBatchUpdate(sql, paramsList); } catch (Exception e) { System.err.println("ÅúÁ¿²åÈëÊý¾Ý±íÒì³£: " + e.getMessage()); // ½«´íÎóµÄÊý¾ÝÖØÐ·ŻضÓÁÐ trackDataQueue.addAll(batchList); } } // ¹Ø±ÕÅúÁ¿²åÈë·þÎñ public static void shutdown() { batchInsertExecutor.shutdown(); try { if (!batchInsertExecutor.awaitTermination(5, TimeUnit.SECONDS)) { batchInsertExecutor.shutdownNow(); } } catch (InterruptedException e) { batchInsertExecutor.shutdownNow(); Thread.currentThread().interrupt(); } } }