Paimon, 大数据

paimon-04 DDL

内部表

CREATE TABLE test (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

分区表

CREATE TABLE test_p (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
  • 如果设置了主键,分区字段必须是主键的子集
  • 通过配置partition.expiration-time可以自动删除过期的分区

可以定义以下三类字段为分区字段

  • 创建时间(推荐):创建时间通常时不可变的,因此可以放心将其视为分区字段并将其添加到主键中
  • 事件时间:事件时间时原表中一个字段,对于cdc数据来说,比如从mysqlcdc中同步的表或者paimon生成的changelogs他们都是完整的cdc数据,包括update_before记录,即使你声明了包含分区字段的主键,也能达到独特的效果
    • 删除原来事件时间分区的数据,在新的事件事件分区插入新数据
  • cdc op_ts:不能定义为分区字段,无法知道之前的记录时间戳

CREATE AS

CREATE TABLE test1 (
    user_id BIGINT,
    item_id BIGINT
);
insert into test1 values(1,1),(1, 2);
CREATE TABLE test2 AS SELECT * FROM test1;

指定分区

CREATE TABLE test2_p WITH ('partition' = 'dt') AS SELECT * FROM test_p;

指定配置

CREATE TABLE test3(
    user_id BIGINT,
    item_id BIGINT
) WITH ('file.format' = 'orc');

CREATE TABLE test3_op WITH ('file.format' = 'parquet') AS SELECT * FROM test3;

指定主键

CREATE TABLE test_pk WITH ('primary-key' = 'dt,hh') AS SELECT * FROM test;

指定配置

CREATE TABLE test3 (
    user_id BIGINT,
    item_id BIGINT
) WITH ('file.format'='orc');

CREATE TABLE test3_op WITH ('file.format'='parquet') AS SELECT * FROM test3;

CREATE TABLE LIKE

创建与另一个表具有相同schema分区和表属性的表

  • 拷贝表结构 没有数据
CREATE TABLE test_ct1 LIKE test;

表属性

https://paimon.apache.org/docs/master/maintenance/configurations/

外部表

paimon外部表可以在任何catalogh中使用,如果不想创建parmon catalog只想读写表 则可以考虑外部表

CREATE TABLE ex (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'path' = 'hdfs://mycluster/paimon/external/ex',
    'auto-create'='true'
);

临时表

临时表只是记录,如果临时表被删除,其资源将不会被删除,flink会话关闭时,临时表也会被删除。

如果想将paimon catalog与其他表一起使用,但不想将他们存储在其他catalog中,可以创建临时表

USE CATALOG hive_catalog;
CREATE TEMPORARY TABLE temp(
    k INT,
    v STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://mycluster/temp.csv',
    'format'='csv'
);

SELECT test.k, test.v, temp.v FROM test JOIN temp ON test.k = temp.k;

修改表

更改 添加表属性

ALTER TABLE test WITH (
    'write-buffer-size' = '256MB'
);

重命名表名称

ALTER TABLE test1 RENAME TO test_new;

删除表属性

ALTER TABLE test RESET ('write-buffer-size');

修改列

添加新列

ALTER TABLE test ADD (c1 INT, c2 STRING);

重命名列

ALTER TABLE test RENAME c1 TO c0;

删除列

ALTER TABLE test DROP (c0, c2);

更改列的可为空性

CREATE TABLE test_null (
    id INT PRIMARY KEY NOT ENFORCED,
    coupon_info FLOAT NOT NULL
);
-- 将coupon_info修改为允许为null
ALTER TABLE test_null MODIFY coupon_info FLOAT;
-- 如果表中已经有null值,修改之前先设置下参数删除null值
SET 'table.exec.sink.not-null-enforcer' = 'DROP'
ALTER TABLE test_null MODIFY coupon_info FLOAT;

修改列注释

ALTER TABLE test MODIFY user_id BIGINT COMMENT '用户id';

添加列位置

ALTER TABLE test ADD a INT FIRST;
ALTER TABLE test ADD b INT AFTER a;

更改列位置

ALTER TABLE test MODIFY b INT FIRST;
ALTER TABLE test MODIFY a INT AFTER user_id;

更改列类型

ALTER TABLE test MODIFY a DOUBLE;

修改水印

添加水印

CREATE TABLE test_wm (
    id INT,
    name STRING,
    ts BIGINT
);
ALTER TABLE test_wm ADD (
    et AS to_timestamp_ltz(ts, 3),
    WATERMARK FOR et AS et - INTERVAL '1' SECOND
);

更改水印

ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL '2' SECOND;

去掉水印

ALTER TABLE test_wm DROP WATERMARK;