В один прекрасный момент появилась необходимость записи проведения экзаменов в одном из учебных учреждений, с последующей выгрузкой наработанного видео материала для обработки экспертами. Ранее запись производилась на различные видео регистраторы и специализированные сервера. Так как камер было чуть больше 50, а производить выгрузку материалов с последующим конвертированием и именованием — еще то сомнительное удовольствие, я решил немного автоматизировать данный процесс.
Для решения данной задачи будем производить запись RTSP потоков с IP-камер с использованием следующее ПО: CentOS Linux 7, FFmpeg, MySQL и Java. Захват потока в контейнер будем производить с помощью FFmpeg -набора свободных библиотек с открытым исходным кодом, позволяющем записывать , конвертировать и передавать цифровые аудио и видеозаписи в различных форматах. В базе MySQL будем хранить данные о записях и имеющихся IP камерах.
Для реализации автоматизации запуска захвата видеопотока в контейнер, а так же именования и сортировки записанных файлов, мониторинга состояния потоков было решено написать небольшое приложение на Java. На данный момент используется представленная ниже версия, она имеет некоторые недостатки, но на данный момент является работоспособной и позволила довольно сильно упростить поставленную задачу.
Ниже приведен пример класса newNvr, данный класс содержит в себе основной метод нашего приложения. В данном классе на основе данных о имеющихся камерах мы создаем потоки, в которых будет производиться запись RTSP потоков IP-камер и наполняем вектор данными о созданных потоках. Так же в данном классе производится обработка событий потоков.
Исходный код класса newNvr.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
import javax.sql.DataSource; import java.sql.Connection; import java.util.Vector; public class newNvr implements recorderThreadListener{ Vector<recorderThread> cameras = new Vector<>(); recorderThread RecorderThread; private static DataSource ds; //Собственно самый главный метод public static void main (String[] args) { ds = new sqlConnectionPool().getDataSource(); new newNvr().startRecord(); } //Получаем из базы данные о имеющихся камерах и создаем потоки, записывающие видео public void startRecord(){ String[][] cameraUrls = sqlConnectionPool.getCamera(ds) ; for (int i = 0; i < cameraUrls.length; i++) { RecorderThread = new recorderThread(this, cameraUrls[i][0], cameraUrls[i][2], cameraUrls[i][1], cameraUrls[i][3]); System.out.println("Создан поток :"+RecorderThread.getName()); //рассинхроним выполнение потоков при первом запуске try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } //методы recorderThreadListener //Наполняем вектор созданными потоками и сообщаем о запуске потока @Override public void onStartRecorderThread(recorderThread thread, String message) { cameras.add(thread); System.out.println(message); } //Обрабатываем завершение выполнения потока @Override public void onStopCameraRecorderThread(recorderThread thread, String cameraUrl, String cameraIp, String cameraId, String cameraName) { cameras.remove(thread); RecorderThread = new recorderThread(this, cameraUrl, cameraIp, cameraId, cameraName); } //обрабатываем создание каталога @Override public void onCreateFolderRecorderThread(recorderThread thread, String message) { System.out.println(message); } //В том случае если процесс завершился успешно @Override public void onSucsessRecordRecorderThread(recorderThread thread, String fileUrl, int cameraId, long newTime) { sqlConnectionPool.addRecord(cameraId, fileUrl, newTime, 1, ds); sqlConnectionPool.AddStatus(cameraId, 1,ds); System.out.println("Запись в потоке "+thread.getName() +" произведена успешно"); } //В том случае если процесс завершился неудачно, но не упал с ошибкой @Override public void onUnsuccessRecordRecorderThread(recorderThread thread, String fileUrl, int cameraId, long newTime) { sqlConnectionPool.addRecord(cameraId, fileUrl, newTime, 0, ds); sqlConnectionPool.AddStatus(cameraId, 0, ds); System.out.println("Запись в потоке "+thread.getName() +" сомнительна"); } //выводим ошибку @Override public void onRecorderThreadException(recorderThread thread, Exception e) { System.out.println("Поток " + thread.getName() + " завершился с ошибкой:"); e.printStackTrace(); } //onStartRecordFromCamera @Override public void onStartRecordFromCamera(recorderThread thread, String message) { System.out.println(message); } } |
В классе recorderThread производится работа с RTSP потоком. В данном классе мы проверяем, есть ли каталог для записи RTSP потока в контейнер (если каталог не существует — создаем его) и формируем параметры для последующего запуска утилиты FFmpeg.
Исходный код recorderThread.java :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import java.io.File; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.concurrent.TimeUnit; import java.util.*; import java.io.*; public class recorderThread extends Thread { // finalCameraUrls, finalCameraIp, finalCameraId) private String cameraUrl; private String cameraId; private String cameraIp; private String cameraName; private recorderThreadListener listener; private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd--HH:mm:ss"); private static DateFormat dateFormatForFolder = new SimpleDateFormat("yyyy-MM-dd"); public recorderThread(recorderThreadListener listener, String cameraUrl, String cameraIp, String cameraId, String cameraName ) { this.cameraIp = cameraIp; this.cameraUrl = cameraUrl; this.cameraId = cameraId; this.listener = listener; this.cameraName = cameraName; start(); } @Override public void run(){ Runtime r = Runtime.getRuntime(); Process p = null; listener.onStartRecorderThread(this,"поток запущен"); try { while (!isInterrupted()) { long newTime = System.currentTimeMillis(); String fileRoot = "/var/nvr/"; //создаем в корневой директории каталог с текущей датой для хранения каталогов с камер String directoryToRecord = fileRoot.concat(dateFormatForFolder.format(newTime)); File directory = new File(directoryToRecord); if (!directory.exists()){ directory.mkdir(); listener.onCreateFolderRecorderThread(this, "Создан новый каталог: " + directoryToRecord); } fileRoot = directoryToRecord.concat("/"); //создаем каталог для хранения записи с камеры String directoryToRecord2 = fileRoot.concat(cameraName); File directory2 = new File(directoryToRecord2); if (!directory2.exists()){ directory2.mkdir(); listener.onCreateFolderRecorderThread(this, "Создан новый каталог: " + directoryToRecord2); } //создаем имя для файла, в котором будет храниться запись String fileUrl = directoryToRecord2.concat("/").concat(cameraName)+dateFormat.format(newTime)+".avi"; String newExec = "ffmpeg -i "+ cameraUrl + " -acodec copy -vcodec copy -t 00:05:00 "+fileUrl; //listener.onStartRecordFromCamera(this, "Начинаем запись потока в файл: " + fileUrl); listener.onStartRecordFromCamera(this, "Начинаем запись потока "+this.getName()+" в файл: " + fileUrl); //Запускаем захват потока с помощью ffmpeg и ожидаем окончания его выполнения try { p = r.exec(newExec); InputStream stderr = p.getErrorStream(); InputStreamReader isr = new InputStreamReader(stderr); BufferedReader br = new BufferedReader(isr); String line = null; System.out.println("<ERROR>"); while ( (line = br.readLine()) != null) System.out.println("Ошибка с камерой :"+ cameraName+" :"+line); int exitVal = p.waitFor(); System.out.println("Process exitValue: " + exitVal); // if (p.waitFor(310, TimeUnit.SECONDS)) p.destroyForcibly(); } catch (Throwable e) { e.printStackTrace(); // listener.onRecorderThreadException(this, e); listener.onStopCameraRecorderThread(this, cameraUrl, cameraIp, cameraId, cameraName); interrupt(); } if (p.exitValue()==0){ listener.onSucsessRecordRecorderThread(this, fileUrl, Integer.parseInt(cameraId), newTime); } else { listener.onUnsuccessRecordRecorderThread(this, fileUrl, Integer.parseInt(cameraId), newTime); } } Exception e = null; listener.onStopCameraRecorderThread(this, cameraUrl, cameraIp, cameraId, cameraName); } catch (Exception e) { listener.onRecorderThreadException(this, e); listener.onStopCameraRecorderThread(this, cameraUrl, cameraIp, cameraId, cameraName); } } } |
Далее приведен исходный текст класса sqlConnectionPool. В данном классе реализованы методы для работы с БД с помощью пула соединений HicaryCP.
Исходный код класса sqlConnectionPool.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import javax.sql.DataSource; import java.sql.SQLException; import java.sql.Statement; import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariConfig; public class sqlConnectionPool { private static final String url = "jdbc:mysql://localhost:3306/nvr?autoReconnect=true&useSSL=false"; private static final String user = "root"; private static final String password = "myPass"; private Connection connection; private static DataSource DS; private static Connection con; private static Statement stmt; private static ResultSet rs; private static boolean eu; public DataSource getDataSource() { HikariConfig config = new HikariConfig(); config.setJdbcUrl(url); config.setUsername(user); config.setPassword(password); config.setDriverClassName("com.mysql.jdbc.Driver"); config.setMaximumPoolSize(20); // config.setConnectionTimeout(360000); config.addDataSourceProperty("cachePrepStmts", "true"); config.addDataSourceProperty("prepStmtCacheSize", "250"); config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); HikariDataSource ds = new HikariDataSource(config); return ds; } synchronized static void addRecord(int cameraId, String fileUrl, long nowDate, int status, DataSource ds){ String request ="INSERT INTO `record` (`cam_id`, `date`, `url`, `status`) VALUES ('"+cameraId+"', '"+nowDate+"', '"+fileUrl+"', '"+status+"')"; try { con = ds.getConnection(); stmt = con.createStatement(); eu = stmt.execute(request); } catch (SQLException e) { throw new RuntimeException(e); } try { stmt.close(); con.close(); } catch (SQLException e) { e.printStackTrace(); } } synchronized static void AddStatus(int cameraId, int cameraStatus, DataSource ds) { String request ="UPDATE `camera` SET `status` = '"+cameraStatus+"' WHERE `camera`.`id` = "+cameraId; try { con = ds.getConnection(); stmt = con.createStatement(); eu = stmt.execute(request); } catch (SQLException e) { throw new RuntimeException(e); } try { stmt.close(); con.close(); } catch (SQLException e) { e.printStackTrace(); } } synchronized static String[][] getCamera(DataSource ds) { String request = "SELECT `id`, `ip`, `login`, `password`, `url`, `name` FROM `camera`"; String countRequest = "SELECT COUNT(*) FROM `camera`"; // System.out.println(request); try { con = ds.getConnection(); stmt = con.createStatement(); rs = stmt.executeQuery(countRequest); int masLengh = 0; while (rs.next()){ masLengh = rs.getInt(1); } // System.out.println(masLengh); rs = stmt.executeQuery(request); int i = 0; String[][] cameraUrls = new String[masLengh][4]; while (rs.next()) { String id = rs.getString(1); String ip_Adr = rs.getString(2); String login = rs.getString(3); String passwd = rs.getString(4); String url = rs.getString(5); String name = rs.getString(6); String cameraUrl = "rtsp://" + login + ":" + passwd + "@" + ip_Adr + url; cameraUrls[i][0] = cameraUrl; cameraUrls[i][1] = id; cameraUrls[i][2] = ip_Adr; cameraUrls[i][3] = name; i++; } try { stmt.close(); con.close(); } catch (SQLException ex) { ex.printStackTrace(); } return cameraUrls; } catch (SQLException e) { throw new RuntimeException(e); } } } |
Обработка событий реализована с помощью класса-интерфейса, код которого приведен ниже.
Интерфейс recorderThreadListener.java :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public interface recorderThreadListener { void onStartRecorderThread(recorderThread thread, String message); void onCreateFolderRecorderThread(recorderThread thread, String message); void onSucsessRecordRecorderThread(recorderThread thread, String fileUrl, int cameraId, long newTime); void onUnsuccessRecordRecorderThread(recorderThread thread, String fileUrl, int cameraId, long newTime); void onRecorderThreadException(recorderThread thread, Exception e); void onStopCameraRecorderThread(recorderThread thread, String cameraUrl, String cameraIp, String cameraId, String cameraName); void onStartRecordFromCamera(recorderThread thread, String message); } |