Коннектор Flink
Это официальный Apache Flink Sink Connector с поддержкой от ClickHouse. Он построен на основе AsyncSinkBase Flink и официального java client ClickHouse.
Коннектор поддерживает DataStream API Apache Flink. Поддержка Table API запланирована в одном из будущих релизов.
- Требования
- Матрица совместимости версий Flink
- Установка и настройка
- Использование DataStream API
- Поддерживаемые типы данных
- Поддерживаемые входные форматы
- Метрики
- Ограничения
- Совместимость версий ClickHouse и безопасность
- Расширенные и рекомендуемые варианты использования
- Устранение неполадок
- Участие в разработке и поддержка
Требования
- Java 11+ (для Flink 1.17+) или 17+ (для Flink 2.0+)
- Apache Flink 1.17+
Матрица совместимости версий Flink
Коннектор разбит на два артефакта для поддержки Flink 1.17+ и Flink 2.0+. Выберите артефакт, соответствующий нужной версии Flink:
| Версия Flink | Артефакт | Версия ClickHouse Java Client | Требуемая версия Java |
|---|---|---|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
Коннектор не тестировался на версиях Flink ниже 1.17.2
Установка и настройка
Добавьте как зависимость
Для Flink 2.0+
- Maven
- Gradle
- SBT
Для Flink 1.17+
- Maven
- Gradle
- SBT
Скачайте бинарный файл
Шаблон имени JAR-файла:
где:
flink_version— одно из значений:2.0.0или1.17stable_version— версия стабильного релиза артефакта
Все доступные JAR-файлы опубликованных релизов можно найти в репозитории Maven Central.
Использование DataStream API
Пример
Предположим, вы хотите вставить необработанные данные CSV в ClickHouse:
- Java
Дополнительные примеры и фрагменты кода можно найти в наших тестах:
Пример быстрого запуска
Мы подготовили пример на базе Maven для быстрого начала работы с ClickHouse Sink:
Более подробные инструкции см. в руководстве по примеру
Варианты подключения к DataStream API
Параметры клиента ClickHouse
| Параметры | Описание | Значение по умолчанию | Обязательно |
|---|---|---|---|
url | Полный URL ClickHouse | Н/Д | Да |
username | Имя пользователя базы данных ClickHouse | Н/Д | Да |
password | Пароль базы данных ClickHouse | Н/Д | Да |
database | Имя базы данных ClickHouse | Н/Д | Да |
table | Имя таблицы ClickHouse | Н/Д | Да |
options | map параметров конфигурации Java-клиента | Пустой map | Нет |
serverSettings | map настроек сессии сервера ClickHouse | Пустой map | Нет |
enableJsonSupportAsString | Настройка сервера ClickHouse, при которой для типа данных JSON ожидается String в формате JSON | true | Нет |
options и serverSettings следует передавать клиенту как Map<String, String>. Если для любого из них передан пустой map, будут использоваться значения по умолчанию клиента или сервера соответственно.
Все доступные параметры Java-клиента перечислены в ClientConfigProperties.java и на этой странице документации.
Все доступные настройки сессии сервера перечислены на этой странице документации.
Например:
- Java
Параметры sink
Следующие параметры напрямую взяты из AsyncSinkBase во Flink:
| Parameters | Description | Default Value | Required |
|---|---|---|---|
maxBatchSize | Максимальное количество записей, вставляемых за один пакет | N/A | Да |
maxInFlightRequests | Максимальное количество запросов в обработке, допустимое до того, как sink начнет применять обратное давление | N/A | Да |
maxBufferedRequests | Максимальное количество записей, которое может быть буферизовано в sink до применения обратного давления | N/A | Да |
maxBatchSizeInBytes | Максимальный размер пакета (в байтах). Все отправляемые пакеты будут меньше либо равны этому значению | N/A | Да |
maxTimeInBufferMS | Максимальное время, в течение которого запись может находиться в sink перед сбросом | N/A | Да |
maxRecordSizeInBytes | Максимальный размер записи, который принимает sink; записи большего размера будут автоматически отклонены | N/A | Да |
Поддерживаемые типы данных
В таблице ниже приведена краткая справка по преобразованию типов данных при вставке данных из Flink в ClickHouse.
Вставка данных из Flink в ClickHouse
| Тип Java | Тип ClickHouse | Поддерживается | Метод сериализации |
|---|---|---|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | Н/Д |
long/Long | Time64 | ❌ | Н/Д |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | Н/Д |
Примечания:
- При выполнении операций с датами необходимо указать
ZoneId. - При выполнении операций с десятичными числами необходимо указать точность и масштаб.
- Чтобы ClickHouse мог разобрать строку Java как JSON, необходимо включить
enableJsonSupportAsStringвClickHouseClientConfig. - Коннектору требуется
ElementConvertorдля преобразования элементов входного DataStream в данные для ClickHouse. Для этого коннектор предоставляетClickHouseConvertorиPOJOConvertor, которые можно использовать для реализации этого преобразования с помощью указанных выше методов сериализацииDataWriter.
Поддерживаемые входные форматы
Список доступных входных форматов ClickHouse можно найти на этой странице документации и в ClickHouseFormat.java.
Чтобы указать формат, который коннектор должен использовать для сериализации вашего DataStream в данные для ClickHouse, используйте функцию setClickHouseFormat. Например:
По умолчанию коннектор использует RowBinaryWithDefaults или RowBinary, если параметр setSupportDefault в ClickHouseClientConfig явно установлен в true или false соответственно.
Метрики
Коннектор предоставляет следующие дополнительные метрики в дополнение к уже существующим метрикам Flink:
| Metric | Description | Type | Status |
|---|---|---|---|
numBytesSend | Общее количество байтов, отправленных в ClickHouse в полезной нагрузке запроса. Примечание: эта метрика измеряет размер сериализованных данных, переданных по сети, и может отличаться от written_bytes в system.query_log ClickHouse, который отражает фактическое количество байтов, записанных в хранилище после обработки | Counter | ✅ |
numRecordSend | Общее количество записей, отправленных в ClickHouse | Counter | ✅ |
numRequestSubmitted | Общее количество отправленных запросов (фактическое количество выполненных сбросов) | Counter | ✅ |
numOfDroppedBatches | Общее количество батчей, отброшенных из-за ошибок, не допускающих повторной попытки | Counter | ✅ |
numOfDroppedRecords | Общее количество записей, отброшенных из-за ошибок, не допускающих повторной попытки | Counter | ✅ |
totalBatchRetries | Общее количество повторных попыток отправки батчей из-за ошибок, допускающих повторную попытку | Counter | ✅ |
writeLatencyHistogram | Гистограмма распределения задержки успешной записи (мс) | Histogram | ✅ |
writeFailureLatencyHistogram | Гистограмма распределения задержки неуспешной записи (мс) | Histogram | ✅ |
triggeredByMaxBatchSizeCounter | Общее количество сбросов, вызванных достижением maxBatchSize | Counter | ✅ |
triggeredByMaxBatchSizeInBytesCounter | Общее количество сбросов, вызванных достижением maxBatchSizeInBytes | Counter | ✅ |
triggeredByMaxTimeInBufferMSCounter | Общее количество сбросов, вызванных достижением maxTimeInBufferMS | Counter | ✅ |
actualRecordsPerBatch | Гистограмма распределения фактического размера батча | Histogram | ✅ |
actualBytesPerBatch | Гистограмма распределения фактического количества байтов в батче | Histogram | ✅ |
Ограничения
- В настоящее время sink предоставляет гарантию доставки как минимум один раз. Работа над семантикой exactly-once отслеживается здесь.
- Sink пока не поддерживает очередь необрабатываемых сообщений (DLQ) для буферизации записей, которые не удалось обработать. Пока коннектор будет пытаться повторно вставить записи, завершившиеся ошибкой, и отбрасывать их в случае неудачи. Эта возможность отслеживается здесь.
- Sink пока не поддерживает создание через Table API Flink или Flink SQL. Эта возможность отслеживается здесь.
Совместимость версий ClickHouse и безопасность
- Коннектор ежедневно тестируется в CI с рядом последних версий ClickHouse, включая latest и head. Список тестируемых версий периодически обновляется по мере выхода новых релизов ClickHouse. Список версий, с которыми коннектор ежедневно проходит тесты, см. здесь.
- Сведения об известных уязвимостях и инструкции по сообщению о новой уязвимости см. в политике безопасности ClickHouse.
- Мы рекомендуем регулярно обновлять коннектор, чтобы своевременно получать исправления безопасности и другие улучшения.
- Если у вас возникла проблема с миграцией, создайте issue в GitHub, и мы ответим!
Расширенные и рекомендуемые варианты использования
- Для оптимальной производительности убедитесь, что тип элементов вашего DataStream не является Generic — см. описание различий между типами во Flink. Элементы не типа Generic позволяют избежать накладных расходов на сериализацию через Kryo и повысить пропускную способность при записи в ClickHouse.
- Мы рекомендуем установить
maxBatchSizeкак минимум в 1000, а в идеале — в диапазоне от 10 000 до 100 000. Подробнее см. в этом руководстве по пакетным вставкам. - Чтобы выполнять дедупликацию в стиле OLTP или upsert в ClickHouse, обратитесь к этой странице документации. Примечание: не путайте это с дедупликацией пакетов при повторных попытках, которая подробно описана ниже.
Устранение неполадок
CANNOT_READ_ALL_DATA
Может возникнуть следующая ошибка:
Причина: Чаще всего ошибка CANNOT_READ_ALL_DATA означает, что схема таблицы ClickHouse перестала соответствовать схеме записей Flink. Это может произойти, если одна из них была изменена с нарушением обратной совместимости.
Решение: Обновите схему таблицы ClickHouse, входной тип данных коннектора или и то и другое, чтобы они снова стали совместимыми. При необходимости см. сопоставление типов, чтобы понять, как типы Java соотносятся с типами ClickHouse. Примечание: если какие-то записи все еще находятся в обработке, при перезапуске коннектора потребуется сбросить состояние Flink.
Низкая пропускная способность
Вы можете заметить, что пропускная способность коннектора не масштабируется вместе с параллелизмом задания (числом задач Flink) при записи в ClickHouse.
Причина: фоновый процесс слияния частей в ClickHouse может замедлять вставки. Это может происходить, если настроенный размер пакета слишком мал, коннектор слишком часто выполняет сброс, или из-за сочетания обоих факторов.
Решение: Мониторьте метрики numRequestSubmitted и actualRecordsPerBatch, чтобы понять, как подобрать размер пакета (maxBatchSize) и частоту сброса. Рекомендации по размеру пакета также приведены в разделе Расширенное и рекомендуемое использование.
В таблице ClickHouse отсутствуют строки
Причина: Пакеты были отброшены либо из-за ошибки, не подлежащей повторной попытке, либо потому, что их не удалось вставить за заданное число повторных попыток (настраивается через ClickHouseClientConfig.setNumberOfRetries()). Примечание: по умолчанию коннектор пытается повторно вставить пакет до 3 раз, прежде чем отбросить его.
Решение: Проверьте логи TaskManager и/или трассировки стека, чтобы определить первопричину.
Участие в разработке и поддержка
Если вы хотите внести вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашей помощи! Перейдите в наш репозиторий GitHub, чтобы создать issue, предложить улучшения или отправить pull request.
Мы приветствуем ваш вклад! Перед началом работы, пожалуйста, ознакомьтесь с руководством для участников в репозитории. Спасибо, что помогаете улучшать коннектор ClickHouse для Flink!