HugeGraph + Cassandra: underlying storage implementation




References:

1. HugeGraph indexes:
https://blog.csdn.net/it1993/article/details/89492296

2. Underlying storage structure:
https://www.jianshu.com/p/c5b1d59b1fcb

###Underlying storage tables in Cassandra

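(Tables whose names start with g are mostly for business data, while those starting with s are mostly used by the system. There are 16 index-related tables, 3 system asynchronous task tables, 1 counter table, and 7 storage tables.)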
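As a quick sanity check of the counts above, the grouping can be reproduced from the table names alone. This is only a sketch: the name-matching rule (`g_` or `s_` followed by one letter and `i` for index tables) and the helper `classify` are assumptions read off the `desc` output, not something HugeGraph exposes.

```python
import re
from collections import Counter

# Table names copied from the `desc` output of the test keyspace.
TABLES = [
    "pk", "vl", "il", "el", "g_v", "g_ie", "g_oe",
    "s_v", "s_ie", "s_oe", "c",
    "g_si", "g_ii", "g_fi", "g_li", "g_di", "g_ai", "g_hi", "g_ui",
    "s_si", "s_ii", "s_fi", "s_li", "s_di", "s_ai", "s_hi", "s_ui",
]

def classify(table):
    """Hypothetical grouping rule based purely on the table name."""
    if table == "c":
        return "counter"
    if re.fullmatch(r"[gs]_[a-z]i", table):
        return "index"
    if table in ("s_v", "s_ie", "s_oe"):
        return "system_task"
    return "storage"

counts = Counter(classify(t) for t in TABLES)
print(dict(counts))
```

The storage group here is the four schema tables (`pk`, `vl`, `il`, `el`) plus the three graph data tables (`g_v`, `g_ie`, `g_oe`).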

cqlsh> desc schgraph    // test keyspace; change it to match your own setup
CREATE TABLE schgraph.pk (    // property key definition table
    id int PRIMARY KEY,
    aggregate_type tinyint,
    cardinality tinyint,   // whether the property holds a single value or a collection
    data_type tinyint,
    name text,
    properties set<int>,
    status tinyint,
    user_data map<text, text>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX pk_name_index ON schgraph.pk (name);

CREATE TABLE schgraph.vl (    // vertex label definition table
    id int PRIMARY KEY,
    enable_label_index boolean,
    id_strategy tinyint,
    index_labels set<int>,    // references the index_label table (il)
    name text,
    nullable_keys set<int>,
    primary_keys list<int>,
    properties set<int>,    // references the property key table (pk)
    status tinyint,
    user_data map<text, text>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX vl_name_index ON schgraph.vl (name);




CREATE TABLE schgraph.il (   // index_label table: stores index definitions
    id int PRIMARY KEY,
    base_type tinyint,
    base_value int,
    fields list<int>,
    index_type tinyint,
    name text,
    status tinyint
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX il_name_index ON schgraph.il (name);




CREATE TABLE schgraph.el (    // edge_label table: stores edge definitions
    id int PRIMARY KEY,
    enable_label_index boolean,
    frequency tinyint,
    index_labels set<int>,
    name text,
    nullable_keys set<int>,
    properties set<int>,
    sort_keys list<int>,
    source_label int,
    status tinyint,
    target_label int,
    user_data map<text, text>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX el_name_index ON schgraph.el (name);


CREATE TABLE schgraph.g_v (  // graph_vertices: the actual vertex storage
    id blob PRIMARY KEY,
    label int,
    properties map<int, blob>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX g_v_label_index ON schgraph.g_v (label);


CREATE TABLE schgraph.g_ie (   // actual edge storage (in-edges, keyed by the target vertex)
    owner_vertex blob,
    direction tinyint,
    label int,
    sort_values text,
    other_vertex blob,
    properties map<int, blob>,
    PRIMARY KEY (owner_vertex, direction, label, sort_values, other_vertex)
) WITH CLUSTERING ORDER BY (direction ASC, label ASC, sort_values ASC, other_vertex ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';        

CREATE TABLE schgraph.g_oe (  // actual edge storage (out-edges, keyed by the source vertex)
    owner_vertex blob,
    direction tinyint,
    label int,
    sort_values text,
    other_vertex blob,
    properties map<int, blob>,
    PRIMARY KEY (owner_vertex, direction, label, sort_values, other_vertex)
) WITH CLUSTERING ORDER BY (direction ASC, label ASC, sort_values ASC, other_vertex ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX g_oe_label_index ON schgraph.g_oe (label);



CREATE TABLE schgraph.s_v ( // execution status of system tasks; the system runs many asynchronous tasks, e.g. adding or modifying a vertex can trigger tasks such as rebuild index
    id blob PRIMARY KEY,
    label int,
    properties map<int, blob>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX s_v_label_index ON schgraph.s_v (label);

CREATE KEYSPACE schgraph WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;



CREATE TABLE schgraph.s_ie (   //system task
    owner_vertex blob,
    direction tinyint,
    label int,
    sort_values text,
    other_vertex blob,
    properties map<int, blob>,
    PRIMARY KEY (owner_vertex, direction, label, sort_values, other_vertex)
) WITH CLUSTERING ORDER BY (direction ASC, label ASC, sort_values ASC, other_vertex ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';





CREATE TABLE schgraph.s_oe (
    owner_vertex blob,
    direction tinyint,
    label int,
    sort_values text,
    other_vertex blob,
    properties map<int, blob>,
    PRIMARY KEY (owner_vertex, direction, label, sort_values, other_vertex)
) WITH CLUSTERING ORDER BY (direction ASC, label ASC, sort_values ASC, other_vertex ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX s_oe_label_index ON schgraph.s_oe (label);





CREATE TABLE schgraph.c (   // counter table
    schema_type text PRIMARY KEY,
    id counter
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';





CREATE TABLE schgraph.s_ii (
    index_label_id int,
    field_values int,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.g_di (  // range index table for double property values
    index_label_id int,
    field_values double,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.g_fi (  // range index table for float property values
    index_label_id int,
    field_values float,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';



CREATE TABLE schgraph.g_hi (  // index table with text field_values, partitioned by index_label_id (probably the shard index)
    index_label_id int,
    field_values text,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.s_ai (
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';



CREATE TABLE schgraph.s_hi (
    index_label_id int,
    field_values text,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';



CREATE TABLE schgraph.s_si (
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.s_di (
    index_label_id int,
    field_values double,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.s_li (
    index_label_id int,
    field_values bigint,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.s_ui (
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.g_li (  // range index table for long property values
    index_label_id int,  // the index id from the index_label table (il)
    field_values bigint,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';



CREATE TABLE schgraph.g_ii (  // range index table for int property values
    index_label_id int,
    field_values int,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.g_ai (  // unlike g_hi, partitioned by field_values rather than by index_label_id
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE schgraph.s_fi (
    index_label_id int,
    field_values float,
    element_ids blob,
    PRIMARY KEY (index_label_id, field_values, element_ids)
) WITH CLUSTERING ORDER BY (field_values ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


CREATE TABLE schgraph.g_si (
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';



CREATE TABLE schgraph.g_ui (
    field_values text,
    index_label_id int,
    element_ids blob,
    PRIMARY KEY (field_values, index_label_id, element_ids)
) WITH CLUSTERING ORDER BY (index_label_id ASC, element_ids ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

###Detailed example

####1. Graph definition

graph.schema().propertyKey("uid").asText().ifNotExist().create();
graph.schema().propertyKey("shorturl").asText().ifNotExist().create();
graph.schema().propertyKey("timestamp").asLong().ifNotExist().create();
graph.schema().vertexLabel("person").properties("uid").primaryKeys("uid").ifNotExist().create();
graph.schema().vertexLabel("shorturl").properties("shorturl").primaryKeys("shorturl").ifNotExist().create();
// Edge label "create" (person -> shorturl, carrying timestamp), as reflected in the el table output
graph.schema().edgeLabel("create").sourceLabel("person").targetLabel("shorturl").properties("timestamp").ifNotExist().create();
graph.schema().indexLabel("createByTime").onE("create").by("timestamp").range().ifNotExist().create();



cqlsh:hgraphtest> select * from pk;

 id | aggregate_type | cardinality | data_type | name      | properties | status | user_data
----+----------------+-------------+-----------+-----------+------------+--------+-----------
  1 |              0 |           1 |         8 |       uid |       null |      1 |      null
  2 |              0 |           1 |         8 |  shorturl |       null |      1 |      null
  3 |              0 |           1 |         5 | timestamp |       null |      1 |      null

cqlsh:hgraphtest> select * from vl;

 id | enable_label_index | id_strategy | index_labels | name     | nullable_keys | primary_keys | properties | status | user_data
----+--------------------+-------------+--------------+----------+---------------+--------------+------------+--------+-----------
  1 |               True |           2 |         null |   person |          null |          [1] |        {1} |      1 |      null
  2 |               True |           2 |         null | shorturl |          null |          [2] |        {2} |      1 |      null

cqlsh:hgraphtest> select * from il;

 id | base_type | base_value | fields | index_type | name         | status
----+-----------+------------+--------+------------+--------------+--------
  1 |         2 |          1 |    [3] |         23 | createByTime |      2

cqlsh:hgraphtest> select * from el;

 id | enable_label_index | frequency | index_labels | name   | nullable_keys | properties | sort_keys | source_label | status | target_label | user_data
----+--------------------+-----------+--------------+--------+---------------+------------+-----------+--------------+--------+--------------+-----------
  1 |               True |         1 |          {1} | create |          null |        {3} |      null |            1 |      1 |            2 |      null

(1 rows)
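The schema tables reference each other only by integer id, so reading a row of el means looking those ids up in pk, vl and il. The following sketch replays that lookup in Python over the rows shown above; the dictionaries and the helper `describe_edge_label` are illustrative assumptions, not part of HugeGraph.

```python
# Rows copied from the pk, vl, il and el query results (only the columns used here).
property_keys = {1: "uid", 2: "shorturl", 3: "timestamp"}
vertex_labels = {1: "person", 2: "shorturl"}
index_labels = {1: {"name": "createByTime", "fields": [3]}}
edge_labels = {
    1: {
        "name": "create",
        "source_label": 1,
        "target_label": 2,
        "properties": {3},
        "index_labels": {1},
    }
}

def describe_edge_label(edge_label_id):
    """Resolve the integer ids stored in an el row into readable names."""
    row = edge_labels[edge_label_id]
    return {
        "name": row["name"],
        "source": vertex_labels[row["source_label"]],
        "target": vertex_labels[row["target_label"]],
        "properties": sorted(property_keys[p] for p in row["properties"]),
        "indexes": {
            index_labels[i]["name"]: [property_keys[f] for f in index_labels[i]["fields"]]
            for i in row["index_labels"]
        },
    }

description = describe_edge_label(1)
print(description)
```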

Because no vertices or edges have been inserted yet, g_v, g_ie and g_oe are all empty at this point. The counter table looks like this:

cqlsh:hgraphtest> select * from c;

 schema_type  | id
--------------+----
 PROPERTY_KEY |  3
 VERTEX_LABEL |  2
   SYS_SCHEMA | 47
  INDEX_LABEL |  1
   EDGE_LABEL |  1
         TASK |  1

####2. Insert vertices

marko = graph.addVertex(T.label, "person", "uid", "2229095301")
url = graph.addVertex(T.label, "shorturl", "shorturl", "MfDALoIBLMXbuQIezN6T2Q")

cqlsh:hgraphtest> select * from g_v;

 id                                                   | label | properties
------------------------------------------------------+-------+-------------------------------------------------------
 0x8b313a32323239303935333031                         |     1 | {1: 0x0a32323239303935333031}
 0x97323a4d6644414c6f49424c4d5862755149657a4e36543251 |     2 | {2: 0x164d6644414c6f49424c4d5862755149657a4e36543251}
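The blobs in g_v become readable once they are decoded as ASCII. The sketch below assumes, based only on these two sample rows, that the first byte of a vertex id is a header (here it equals `0x80 | (length - 1)`) and that the first byte of a text property is the string length. The real HugeGraph serializer may differ for other lengths or types, and the helper names are made up for illustration.

```python
def blob_bytes(blob_hex):
    """Turn a cqlsh blob literal such as '0x8b31...' into raw bytes."""
    return bytes.fromhex(blob_hex[2:] if blob_hex.startswith("0x") else blob_hex)

def decode_vertex_id(blob_hex):
    raw = blob_bytes(blob_hex)
    # Assumption: byte 0 is a header; the rest is "<label id>:<primary key value>".
    header, body = raw[0], raw[1:]
    label_id, _, primary_key = body.decode("ascii").partition(":")
    return header, int(label_id), primary_key

def decode_text_property(blob_hex):
    raw = blob_bytes(blob_hex)
    # Assumption: byte 0 is the byte length of the string that follows.
    return raw[0], raw[1:].decode("ascii")

print(decode_vertex_id("0x8b313a32323239303935333031"))
print(decode_text_property("0x0a32323239303935333031"))
```

Under these assumptions, the vertex id is the vertex label id and the primary key value joined by a colon, which matches the `primaryKeys` definitions in the schema.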

####3. Insert an edge

marko.addEdge("create", url, "timestamp", 1587779170)

cqlsh:hgraphtest> select * from g_oe;

 owner_vertex                 | direction | label | sort_values | other_vertex                                         | properties
------------------------------+-----------+-------+-------------+------------------------------------------------------+-------------------
 0x8b313a32323239303935333031 |      -126 |     1 |             | 0x97323a4d6644414c6f49424c4d5862755149657a4e36543251 | {3: 0x85f58eac62}

(1 rows)
cqlsh:hgraphtest> select * from g_ie;

 owner_vertex                                         | direction | label | sort_values | other_vertex                 | properties
------------------------------------------------------+-----------+-------+-------------+------------------------------+-------------------
 0x97323a4d6644414c6f49424c4d5862755149657a4e36543251 |      -116 |     1 |             | 0x8b313a32323239303935333031 | {3: 0x85f58eac62}

(1 rows)
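Two details of these rows can be checked by hand. The property blob `0x85f58eac62` decodes back to the inserted timestamp if it is read as a big-endian base-128 integer (7 payload bits per byte), and the signed `direction` values -126 and -116 are simply the bytes `0x82` and `0x8c`. The Python sketch below assumes that encoding, inferred from this single sample; the function names are hypothetical.

```python
def decode_vint(blob_hex):
    """Read a blob as a big-endian base-128 integer, ignoring each byte's high bit."""
    raw = bytes.fromhex(blob_hex[2:] if blob_hex.startswith("0x") else blob_hex)
    value = 0
    for byte in raw:
        value = (value << 7) | (byte & 0x7F)
    return value

def direction_byte(tinyint_value):
    """Map a signed tinyint as printed by cqlsh to its unsigned byte value."""
    return tinyint_value & 0xFF

print(decode_vint("0x85f58eac62"))   # timestamp stored on the edge
print(hex(direction_byte(-126)))     # direction in g_oe
print(hex(direction_byte(-116)))     # direction in g_ie
```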


cqlsh:hgraphtest> select * from g_li;    // the range index built on timestamp

 index_label_id | field_values | element_ids
----------------+--------------+------------------------------------------------------------------------------------------
              1 |   1587779170 | 0x7e8b313a32323239303935333031820801ff97323a4d6644414c6f49424c4d5862755149657a4e36543251
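The `element_ids` value of the index row looks like an edge id assembled from pieces already seen in g_oe. The sketch below only locates the two vertex id blobs inside it and prints what lies around them. The meaning of the leading `0x7e` and of the bytes after the direction byte (`0x08 0x01 0xff`) is not confirmed by anything in this post, so they are left uninterpreted.

```python
element_id = bytes.fromhex(
    "7e8b313a32323239303935333031820801ff"
    "97323a4d6644414c6f49424c4d5862755149657a4e36543251"
)
owner_vertex = bytes.fromhex("8b313a32323239303935333031")
other_vertex = bytes.fromhex("97323a4d6644414c6f49424c4d5862755149657a4e36543251")

# Locate the two vertex ids inside the index entry.
owner_at = element_id.find(owner_vertex)
other_at = element_id.find(other_vertex)

prefix = element_id[:owner_at]
middle = element_id[owner_at + len(owner_vertex):other_at]

print(prefix.hex())   # bytes before the owner vertex id
print(middle.hex())   # bytes between the two vertex ids
# The first byte of the middle part equals the g_oe direction value (-126 as a byte).
print(middle[0] == (-126 & 0xFF))
```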

Spark GraphX PageRank: source code analysis




##Entry function

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {                                                      
    PageRank.runUntilConvergence(graph, tol, resetProb)
}

For the role of resetProb, see the explanation of the PageRank principle.
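To make the role of resetProb concrete, here is a minimal Python sketch of the update rule given in the GraphX PageRank documentation, PR[i] = resetProb + (1 - resetProb) * sum(PR[j] / outDeg[j]) over the in-neighbors j of i. The function name `pagerank`, the edge-list input and the fixed iteration count are illustrative assumptions; this is a plain iteration, not the Pregel-based implementation discussed in this post.

```python
def pagerank(edges, num_iter=20, reset_prob=0.15):
    """Iterate PR[i] = reset_prob + (1 - reset_prob) * sum(PR[j] / outDeg[j])."""
    vertices = sorted({v for edge in edges for v in edge})
    out_deg = {v: 0 for v in vertices}
    for src, _ in edges:
        out_deg[src] += 1

    ranks = {v: 1.0 for v in vertices}
    for _ in range(num_iter):
        # Each vertex splits its current rank evenly across its out-edges.
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_deg[src]
        # reset_prob is the share every vertex receives regardless of its in-links.
        ranks = {v: reset_prob + (1.0 - reset_prob) * incoming[v] for v in vertices}
    return ranks

ranks = pagerank([(1, 2), (2, 3), (3, 1)])
print(ranks)
```

With this formulation the ranks are not normalized to sum to 1; on a simple three-vertex cycle every vertex keeps a rank of 1.0.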

##Implementation function

  def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double] =
  {
    // Whether the caller specified a source vertex for the walk (personalized PageRank)
    val personalized = srcId.isDefined
    // Without a source vertex, src falls back to -1L, which should not match any vertex id
    val src: VertexId = srcId.getOrElse(-1L)

    // Initialize the pagerankGraph with each edge attribute
    // having weight 1/outDegree and each vertex with attribute 1.0.
    // In this version the source vertex actually starts as (resetProb, Double.NegativeInfinity)
    // and every other vertex as (0.0, 0.0); the vertex attribute is (PageRank, delta).
    val pagerankGraph: Graph[(Double, Double), Double] = graph
      // Associate the degree with each vertex
      .outerJoinVertices(graph.outDegrees) {
        (vid, vdata, deg) => deg.getOrElse(0)
      }
      // Set the weight on the edges based on the degree
      .mapTriplets( e => 1.0 / e.srcAttr )
      // Set the vertex attributes to (initialPR, delta = 0)
      .mapVertices { (id, attr) =>
        if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
      }
      .cache()

    // Define the three functions needed to implement PageRank in the GraphX
    // version of Pregel

    // vprog for Pregel: msgSum is the merged message for this vertex, taken from the
    // messages VertexRDD[Double] that mapReduceTriplets produces; it updates the vertex.
    def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
      val (oldPR, lastDelta) = attr
      val newPR = oldPR + (1.0 - resetProb) * msgSum
      (newPR, newPR - oldPR)
    }

    // vprog for the personalized case, where a source vertex was specified
    def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
      msgSum: Double): (Double, Double) = {
      val (oldPR, lastDelta) = attr
      var teleport = oldPR
      val delta = if (src==id) 1.0 else 0.0
      teleport = oldPR*delta

      val newPR = teleport + (1.0 - resetProb) * msgSum
      val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
      (newPR, newDelta)
    }

    // sendMsg for Pregel: if the source vertex's delta (srcAttr._2) exceeds tol,
    // forward it to the destination, weighted by the edge weight (edge.attr)
    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
      if (edge.srcAttr._2 > tol) {
        Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
      } else {
        Iterator.empty
      }
    }

    // mergeMsg (the reduce step) for Pregel: sum the contributions sent to a vertex
    def messageCombiner(a: Double, b: Double): Double = a + b

    // The initial message received by all vertices in PageRank
    val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)

    // Execute a dynamic version of Pregel.
    val vp = if (personalized) {
      (id: VertexId, attr: (Double, Double), msgSum: Double) =>
        personalizedVertexProgram(id, attr, msgSum)
    } else {
      (id: VertexId, attr: (Double, Double), msgSum: Double) =>
        vertexProgram(id, attr, msgSum)
    }

    Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
      vp, sendMessage, messageCombiner)
      .mapVertices((vid, attr) => attr._1)
  } // end of deltaPageRank

}
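To make the data flow of the non-personalized branch easier to follow, here is a single-machine sketch of the same delta-propagation loop in plain Java. It is not GraphX code: the class name `DeltaPageRank`, the edge-array representation, and the `hasMsg` bookkeeping (which stands in for Pregel's active set) are assumptions made for illustration.

```java
import java.util.Arrays;

class DeltaPageRank {
    /** edges[i] = {src, dst}; returns the PageRank value of every vertex 0..n-1. */
    static double[] run(int n, int[][] edges, double tol, double resetProb) {
        int[] outDeg = new int[n];
        for (int[] e : edges) outDeg[e[0]]++;

        double[] pr = new double[n];      // plays the role of attr._1
        double[] delta = new double[n];   // plays the role of attr._2
        double[] msg = new double[n];
        boolean[] hasMsg = new boolean[n];
        // Initial message of the non-personalized branch, delivered to every vertex.
        Arrays.fill(msg, resetProb / (1.0 - resetProb));
        Arrays.fill(hasMsg, true);

        boolean active = true;
        while (active) {
            // vertexProgram: only vertices that received a message are updated.
            for (int v = 0; v < n; v++) {
                if (!hasMsg[v]) continue;
                double oldPr = pr[v];
                pr[v] = oldPr + (1.0 - resetProb) * msg[v];
                delta[v] = pr[v] - oldPr;
            }
            // sendMessage + messageCombiner: a vertex whose delta exceeds tol
            // forwards delta * (1 / outDegree) along each out-edge; messages are summed.
            double[] nextMsg = new double[n];
            boolean[] nextHas = new boolean[n];
            active = false;
            for (int[] e : edges) {
                int src = e[0], dst = e[1];
                if (hasMsg[src] && delta[src] > tol) {
                    nextMsg[dst] += delta[src] / outDeg[src];
                    nextHas[dst] = true;
                    active = true;
                }
            }
            msg = nextMsg;
            hasMsg = nextHas;
        }
        return pr;
    }

    public static void main(String[] args) {
        double[] pr = run(2, new int[][] {{0, 1}, {1, 0}}, 1e-9, 0.15);
        System.out.println(Arrays.toString(pr));
    }
}
```

On a two-vertex cycle both ranks approach 1.0, because each round adds 0.85 times the previous delta to a starting value of 0.15. The loop stops once no vertex has a delta above tol, which mirrors how Pregel terminates when no messages remain.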

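###How Pregel is implemented (each mapReduceTriplets call computes one global round of messages; vprog applies the resulting update to each vertex between two rounds)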

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()  

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)  
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIter) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

Druid installation, configuration, and data ingestion




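I. Druid design and architecture

Design: http://druid.io/docs/0.9.1.1/design/design.html
White paper: http://static.druid.io/docs/druid.pdf

(https://docs.imply.io/on-premise/deploy/cluster)

Five node types:

Historical: serves historical (offline) data by loading segments from deep storage. It coordinates with the coordinator through ZooKeeper. When it is asked to load a new segment, it first checks its local cache; on a miss it uses the metadata to download the segment from deep storage. Once the segment is loaded, the node announces it in ZooKeeper, and from then on the segment can be queried.

Broker: accepts queries, uses the information in ZooKeeper to locate the segments involved, and routes the query to the right nodes. It then merges the results and returns them.

Coordinator: coordinates segment storage and decides which segments should be loaded on the historical nodes.

Indexing Service: consists of three components: peon, middle manager, and overlord.
Tasks are submitted to the overlord over HTTP, and the middle managers hand them to peons for execution.

Realtime: real-time nodes.

Other terms:

Tranquility: helps you send real-time event streams to Druid and handles partitioning, replication, service discovery, and schema rollover, seamlessly and without downtime

Tranquility server: an HTTP server. With it you can push data into Druid through an HTTP API instead of writing a Java program.

External dependencies: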

Deep Storage:
Metadata Storage:
ZooKeeper:


II. Installation and deployment:

1. References

http://druid.io/docs/0.9.1.1/tutorials/cluster.html (start each service with ./bin/xxx.sh start)
http://www.open-open.com/lib/view/open1447852962978.html

2. Download Druid, the MySQL extension, and Tranquility

http://druid.io/downloads.html

3. Copy mysql-metadata-storage-0.12.0.tar.gz into the extensions directory and unpack it, then create the metadata database in MySQL:

create database druid DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

4. Edit conf/druid/_common/common.runtime.properties

5. Copy the Hadoop configuration files into conf/druid/_common (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml)

6. Start the components (sh bin/XXX.sh start also works):

One instance of each of these is enough:
java `cat conf/druid/coordinator/jvm.config | xargs` -cp conf/druid/_common:conf/druid/coordinator:lib/* io.druid.cli.Main server coordinator
java `cat conf/druid/overlord/jvm.config | xargs` -cp conf/druid/_common:conf/druid/overlord:lib/* io.druid.cli.Main server overlord

Start as many instances of these as needed:
java `cat conf/druid/historical/jvm.config | xargs` -cp conf/druid/_common:conf/druid/historical:lib/* io.druid.cli.Main server historical
java `cat conf/druid/middleManager/jvm.config | xargs` -cp conf/druid/_common:conf/druid/middleManager:lib/* io.druid.cli.Main server middleManager
java `cat conf/druid/broker/jvm.config | xargs` -cp conf/druid/_common:conf/druid/broker:lib/* io.druid.cli.Main server broker

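Problems encountered:

The historical node fails to start because it does not have enough direct memory.

It reports this error at startup: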

12) Not enough direct memory.  Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, druid.processing.numThreads, or druid.processing.numMergeBuffers: maxDirectMemory[2,147,483,648], memoryNeeded[5,368,709,120] = druid.processing.buffer.sizeBytes[536,870,912] * (druid.processing.numMergeBuffers[2] + druid.processing.numThreads[7] + 1)

As the message suggests, raising the direct memory limit (-XX:MaxDirectMemorySize) from 2G to 5G fixes it.
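The error message spells out the formula Druid checks, so the required value can be computed before starting a node. The sketch below only reproduces that arithmetic with the numbers from the log; the class and method names are hypothetical.

```java
class DirectMemoryCheck {
    /** memoryNeeded = druid.processing.buffer.sizeBytes * (numMergeBuffers + numThreads + 1) */
    static long memoryNeeded(long bufferSizeBytes, int numMergeBuffers, int numThreads) {
        return bufferSizeBytes * (numMergeBuffers + numThreads + 1);
    }

    public static void main(String[] args) {
        long needed = memoryNeeded(536_870_912L, 2, 7);   // values from the historical node's error
        long fiveGiB = 5L * 1024 * 1024 * 1024;
        System.out.println("needed=" + needed + " fitsIn5G=" + (fiveGiB >= needed));
    }
}
```

With a 512 MiB buffer, 2 merge buffers, and 7 processing threads the result is 5,368,709,120 bytes, exactly 5 GiB, which is why 2G was too small and 5G is just enough. Lowering druid.processing.buffer.sizeBytes or druid.processing.numThreads is the other way to satisfy the check.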

https://groups.google.com/forum/#!topic/druid-user/j0sFcUIiQiE

III. Overview of Druid data ingestion (files and streams)

File ingestion does not depend on Tranquility; see http://druid.io/docs/latest/tutorials/tutorial-batch.html

There are two ways to ingest streams:

1. Tranquility (a Druid-aware client) and the indexing service (push)

2. Realtime nodes (pull; not recommended because of several limitations: http://druid.io/docs/0.9.1.1/ingestion/stream-pull.html#limitations)

stream push & stream pull & batch ingestion

Stream push comes in two flavors:

1) Push through the HTTP API of Tranquility Server
http://druid.io/docs/0.9.1.1/tutorials/tutorial-streams.html

2) Push through Tranquility Kafka

Stream pull:

Uses realtime nodes; see:
http://druid.io/docs/latest/ingestion/stream-pull.html

IV. Ingesting data into Druid

1. File ingestion; see http://druid.io/docs/latest/tutorials/tutorial-batch.html

curl -X 'POST' -H 'Content-Type:application/json' -d @./wikiticker-index.json host-170.bjyz:8090/druid/indexer/v1/task

Problems encountered:

1) The peon fails to start and reports:

3) Not enough direct memory.  Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, druid.processing.numThreads, or druid.processing.numMergeBuffers: maxDirectMemory[1,908,932,608], memoryNeeded[2,684,354,560] = druid.processing.buffer.sizeBytes[536,870,912] * (druid.processing.numMergeBuffers[2] + druid.processing.numThreads[2] + 1)

Fix: set druid.indexer.runner.javaOpts=-server -Xmx2g -XX:MaxDirectMemorySize=2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

The task status can be checked at:

http://host-170.bjyz.baidu.com:8090/console.html (the overlord's port; it shows the state of the ingestion tasks)

The process list shows that the middle manager has started a peon to run the task:

Main internal peon var/druid/task/index_hadoop_wikiticker_2018-03-27T10:07:12.646Z/task.json var/druid/task/index_hadoop_wikiticker_2018-03-27T10:07:12.646Z/d93e84a0-d9f2-40e4-8b1a-0e24072a00f3/status.json

2) By default the MapReduce jobs are submitted to the default queue of the YARN cluster. To change the queue used by the jobs the peon submits, set mapreduce.job.queuename in the mapred-site.xml under the common configuration directory.

3) The Java version on the YARN cluster differs from the one Druid needs. The fix is to point the MapReduce tasks and the application master at a suitable JDK:

conf.set("mapred.child.env", "JAVA_HOME=/home/iteblog/java/jdk1.8.0_25");
conf.set("yarn.app.mapreduce.am.env", "JAVA_HOME=/home/iteblog/java/jdk1.8.0_25");

See: https://www.iteblog.com/archives/1883.html

4) The message "JNDI lookup class is not available because this JRE does not support JNDI" appears. This is only a warning and can be ignored.

To get rid of it, follow Tip 2 of http://druid.io/docs/latest/operations/other-hadoop.html

5) The wikiticker-index.json file used:

{
 "type" : "index_hadoop",
 "spec" : {
   "ioConfig" : {
     "type" : "hadoop",
     "inputSpec" : {
       "type" : "static",
       "paths" : "/smallfile/druid/quickstart/wikiticker-2015-09-12-sampled.json.gz"
     }
   },
   "dataSchema" : {
     "dataSource" : "wikiticker",
     "granularitySpec" : {
       "type" : "uniform",
       "segmentGranularity" : "day",
       "queryGranularity" : "none",
       "intervals" : ["2015-09-12/2015-09-13"]
     },
     "parser" : {
       "type" : "hadoopyString",
       "parseSpec" : {
         "format" : "json",
         "dimensionsSpec" : {
           "dimensions" : [
             "channel",
             "cityName",
             "comment",
             "countryIsoCode",
             "countryName",
             "isAnonymous",
             "isMinor",
             "isNew",
             "isRobot",
             "isUnpatrolled",
             "metroCode",
             "namespace",
             "page",
             "regionIsoCode",
             "regionName",
             "user"
           ]
         },
         "timestampSpec" : {
           "format" : "auto",
           "column" : "time"
         }
       }
     },
     "metricsSpec" : [
       {
         "name" : "count",
         "type" : "count"
       },
       {
         "name" : "added",
         "type" : "longSum",
         "fieldName" : "added"
       },
       {
         "name" : "deleted",
         "type" : "longSum",
         "fieldName" : "deleted"
       },
       {
         "name" : "delta",
         "type" : "longSum",
         "fieldName" : "delta"
       },
       {
         "name" : "user_unique",
         "type" : "hyperUnique",
         "fieldName" : "user"
       }
     ]
   },
   "tuningConfig" : {
     "type" : "hadoop",
     "partitionsSpec" : {
       "type" : "hashed",
       "targetPartitionSize" : 5000000
     },
     "jobProperties" : {
       "mapreduce.map.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8",
       "mapreduce.reduce.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8",
       "mapred.child.env":"JAVA_HOME=/home/work/.jumbo/opt/sun-java8",
       "yarn.app.mapreduce.am.env":"JAVA_HOME=/home/work/.jumbo/opt/sun-java8",
       "mapreduce.job.queuename":"bigJob",
       "mapreduce.job.classloader": "true"
       }
   }
 }
}

6) Verify that the data was really ingested

Query the broker for the number of distinct users ingested:
curl -X POST 'host-175.bjyz:8082/druid/v2/?pretty' -H 'Content-Type:application/json' -d @query/useruniq.json

Contents of useruniq.json:

{
  "queryType": "timeseries",
  "dataSource": "wikiticker",
  "granularity": "day",
  "aggregations": [
    { "type": "hyperUnique", "name": "user_unique", "fieldName": "user_unique" }
  ],
  "intervals": [ "2015-09-12T00:00:00.000/2015-09-13T00:00:00.000" ],
  "context" : {
    "skipEmptyBuckets": "true"
  }
}

Query the broker for the metadata of the ingested segments:
curl -X POST 'host-175.bjyz:8082/druid/v2/?pretty' -H 'Content-Type:application/json' -d @query/metadata.json

{
  "queryType":"segmentMetadata",
  "dataSource":"wikiticker",
  "intervals":["2015-09-12/2015-09-13"]
}

Or use the bundled query to see the most-edited pages:
curl -X POST 'host-175.bjyz:8082/druid/v2/?pretty' -H 'Content-Type:application/json' -d @wikiticker-top-pages.json

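2. Stream push (see: http://druid.io/docs/latest/ingestion/stream-push.html)
Stream push relies on Tranquility. For an introduction to Tranquility see: https://github.com/druid-io/tranquility/blob/master/docs/overview.md

Tranquility offers several ways to ingest data:

a) Tranquility Server (HTTP API)
b) Tranquility Kafka (the user pushes data into Kafka and Tranquility writes it to Druid)
c) Your own JVM app that depends on the Tranquility library
See: https://github.com/druid-io/tranquility/blob/master/docs/core.md

d) One of the stream connectors that ship with Tranquility, for example writing from Spark to Druid:
https://github.com/druid-io/tranquility/blob/master/docs/spark.md

Options a) and b) depend on an additional service; c) and d) depend only on the Tranquility library.

How to write Spark results into Druid will be covered in a separate article.

3. Stream pull

This approach requires realtime nodes. It does not seem to be recommended, so it is not explored further here.

V. Druid & Caravel

Once the data is stored in Druid, we use the Caravel web UI to visualize it.

1) Add a Druid cluster

Fill in the coordinator and broker addresses.
After saving the configuration, refresh the Druid metadata.

Then open the datasource and start running OLAP queries. If no data shows up, check the start of the time range. (For example, the data ingested in this example is from 2015, so "4 years ago" has to be selected.)

2) Configure reports

In the datasource view, pick the grouping and metrics, run the query, and save the result as a slice. The saved slice then appears on the reports tab. On the dashboard page create a new dashboard and choose the slice you just saved as its report.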

Remote debugging Spark




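##Remote debugging Spark

Understanding Spark in depth inevitably means changing the code and debugging it, so building and debugging are essential skills:

###Building

Full build:

./build/sbt -Pyarn -Phive -Phive-thriftserver -Dhadoop.version=2.7.3 -DskipTests clean package

Incremental build (SPARK_PREPEND_CLASSES makes the Spark scripts pick up the freshly compiled classes ahead of the packaged jars):

./build/sbt -Pyarn -Phive -Phive-thriftserver -DskipTests  ~package
export SPARK_PREPEND_CLASSES=1

With Maven, builds are incremental by default.

###Remote debugging from IDEA

Prerequisite: a Spark project that compiles completely in IDEA.

####1. In IDEA, open Run -> Edit Configurations and add a Remote configuration (named "remote debug" here)

Set the debugger mode to Attach, then fill in the host and port to connect to (the machine where the Spark process is started and the port it listens on).

####2. Start the Spark program on the remote host with the debug JVM options

--driver-java-options " -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8888"

For example, a job submitted to a YARN cluster (with suspend=y the driver waits until the debugger attaches):

/home/work/dataplatform/spark-2.0.2/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.baidu.pcsdata.message.value.analysis.MsgFastCategory \
--driver-cores 8 \
--executor-cores 2 \
--num-executors 10 \
--driver-memory 5G \
--principal horus/pcsdata@PCSDATA.COM \
--keytab /home/work/chenxiue/horus.keytab \
--executor-memory 6G \
--driver-java-options " -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8888" \
/home/work/chenxiue/spark/msg-mining/pcs_fsg-assembly-1.0.jar $inputpath $outputpath $modelpath

####3. Set breakpoints in IDEA (click in the gutter next to the line so that a red dot appears), then run
Run -> Debug "remote debug"

That is all; you can now modify the code, rebuild, and debug it.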

###Reference:
https://segmentfault.com/a/1190000008867470

[Original] Optimizing Flume's HDFS write performance





When Flume writes to HDFS, every event it receives results in a call to append(event) in BucketWriter.java, and append checks shouldRotate every time. Here is that logic:

private boolean shouldRotate() {
    boolean doRotate = false;

    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }

    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

    return doRotate;
  }

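As you can see, writer.isUnderReplicated() is called every time. If the replica count is lower than expected, the file is rotated regardless of whether rollCount or rollSize has been reached. (The idea is probably that if the previous write did not reach the expected replica count, a new file should be opened for the following writes, presumably so that a fresh write pipeline is set up. I am not sure why it was designed this way.)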

public boolean isUnderReplicated() {
    try {
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        desiredBlocks = configuredMinReplicas;
      } else {
        desiredBlocks = getFsDesiredReplication();
      }
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
  }

The code that fetches the replica count is:

/**
   * This method gets the datanode replication count for the current open file.
   *
   * If the pipeline isn't started yet or is empty, you will get the default
   * replication factor.
   *
   * <p/>If this function returns -1, it means you
   * are not properly running with the HDFS-826 patch.
   * @throws InvocationTargetException
   * @throws IllegalAccessException
   * @throws IllegalArgumentException
   */
  public int getNumCurrentReplicas()
      throws IllegalArgumentException, IllegalAccessException,
          InvocationTargetException {
    if (refGetNumCurrentReplicas != null && outputStream != null) {
      OutputStream dfsOutputStream = outputStream.getWrappedStream();
      if (dfsOutputStream != null) {
        Object repl = refGetNumCurrentReplicas.invoke(dfsOutputStream, NO_ARGS);
        if (repl instanceof Integer) {
          return ((Integer)repl).intValue();
        }
      }
    }
    return -1;
  }

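As you can see, every single event write goes through a reflective call on the HDFS output stream, which is very expensive. In our workload we do not actually use rollCount or rollSize; we use rollInterval. The rollInterval logic does not live here; it is in open():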

if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,TimeUnit.SECONDS);
    }

Now let us go back and look at the process method of the HDFSEventSink class:

public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);

        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter;
        HDFSWriter hdfsWriter = null;
        // Callback to remove the reference to the bucket writer from the
        // sfWriters map so that all buffers used by the HDFS file
        // handles are garbage collected.
        WriterCallback closeCallback = new WriterCallback() {
          @Override
          public void run(String bucketPath) {
            LOG.info("Writer callback called.");
            synchronized (sfWritersLock) {
              sfWriters.remove(bucketPath);
            }
          }
        };
        synchronized (sfWritersLock) {
          bucketWriter = sfWriters.get(lookupPath);
          // we haven't seen this file yet, so open it and cache the handle
          if (bucketWriter == null) {
            hdfsWriter = writerFactory.getWriter(fileType);
            bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
            sfWriters.put(lookupPath, bucketWriter);
          }
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }

        // Write the data to HDFS
        try {
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }

      transaction.commit();

      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }

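HDFSEventSink keeps a map, sfWriters, from path to BucketWriter. For each incoming event it looks up the BucketWriter for the event's path (determined by hdfs.path and hdfs.filePrefix) and uses it as the current write handle. If none is found, it creates one and adds the new handle to sfWriters. The rotation logic in BucketWriter shown above therefore applies only within a single path. If you do not need rollSize- or rollCount-based rolling within a path, and do not require that data always be written with the full replica count, you can simply comment out the rotation check inside BucketWriter, which greatly improves Flume's write throughput. The sink settings that determine the path are: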

agent.sinks.sk_cloudui.hdfs.path = hdfs://bigfile/om/anticrack/cloudui.log/%Y%m%d/%k%M/
agent.sinks.sk_cloudui.hdfs.filePrefix = qd01-pcsdata45.qd01.baidu.com
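If commenting the check out entirely feels too drastic, a middle ground is to evaluate the expensive check only once every N events and reuse the cached answer in between. The sketch below shows that idea in isolation. It is a different technique from the one described above and not part of Flume: `ThrottledCheck` and `everyN` are hypothetical names, and wiring it into BucketWriter is left as an assumption.

```java
import java.util.function.BooleanSupplier;

class ThrottledCheck {
    private final BooleanSupplier expensiveCheck; // e.g. () -> writer.isUnderReplicated()
    private final int everyN;
    private int sinceLastCheck = 0;
    private boolean last = false;

    ThrottledCheck(BooleanSupplier expensiveCheck, int everyN) {
        if (everyN < 1) {
            throw new IllegalArgumentException("everyN must be >= 1");
        }
        this.expensiveCheck = expensiveCheck;
        this.everyN = everyN;
    }

    /** Runs the real check on the first call and then once per everyN calls. */
    boolean get() {
        if (sinceLastCheck == 0) {
            last = expensiveCheck.getAsBoolean();
        }
        sinceLastCheck = (sinceLastCheck + 1) % everyN;
        return last;
    }

    public static void main(String[] args) {
        int[] hits = {0};
        ThrottledCheck check = new ThrottledCheck(() -> { hits[0]++; return false; }, 100);
        for (int i = 0; i < 1000; i++) {
            check.get();
        }
        System.out.println("expensive checks performed: " + hits[0]);
    }
}
```

The trade-off is that an under-replicated file is noticed up to N events late, so this only makes sense when a short delay in rotating is acceptable.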

[Original] Configuring the history server for Spark on YARN





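After an app has finished we often need to analyze how it ran, whether to find out why it failed or to see whether the job needs tuning. This is where the history server comes in.

There are two kinds of logs: 1) stdout/stderr logs 2) Spark event logs
They correspond to two buttons on the YARN page: 1) logs 2) history

###Configuring logs
For the logs button, YARN needs the following configuration (it must be changed on every NodeManager machine):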

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>

<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>259200</value>
</property>

<property>
  <name>yarn.log.server.url</name>
  <!-- YARN redirects to this URL when a user clicks "logs" on a finished application; the job history server behind it is a MapReduce component -->
  <value>http://hostA:8937/jobhistory/logs/</value>
</property>

<property>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <!-- YARN moves an application's logs to this location after it finishes -->
  <value>/tmp/logs/yarn/</value>
</property>

On hostA, add the following to mapred-site.xml (the history server is started with ./sbin/mr-jobhistory-daemon.sh start historyserver):

<property>
<name>mapreduce.jobhistory.address</name>
<value>hostA:8927</value>
</property>

<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hostA:8937</value>
</property>

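(If the history server dies, check whether any of the related directories is full.)

On hostA, run $HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver to start the history server.

Once this is done, clicking the logs button of an app on the YARN page shows that app's logs, whether or not the app has finished.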

###Configuring the event log
For the history button, YARN redirects to the history server that the app itself specifies. (On hostB, start it with $SPARK_HOME/sbin/start-history-server.sh.)

spark.eventLog.enabled           true
spark.eventLog.dir  hdfs://hostNamenode:8900/spark-event-2.0
spark.yarn.historyServer.address hostB:8651
spark.history.fs.logDirectory hdfs://hostNamenode:8900/spark-event-2.0
spark.history.retainedApplications 1000
spark.history.ui.port 8651
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.interval 1d
spark.history.fs.cleaner.maxAge 3d
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Once this is done, the history button shows the Spark event log, whether or not the app has finished.

[Original] Optimizing parquet file writes in Spark




Tags: performance optimization


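In practice we often need to convert raw text files into the parquet columnar format for later querying. That parquet speeds up subsequent table queries needs no further discussion; what follows is about the efficiency of writing the parquet files.

Let us look at two programs:

1. Building the DataFrame from a case class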

package com.yundata.transtoparquet

import java.lang.Exception
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row

import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.rdd.PairRDDFunctions

import scala.collection.mutable.ArrayBuffer


import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.Row;


case class formatrow(fs_id:Long,user_id:Long,app_id:Long,parent_path:String,server_filename:String,s3_handle:String,size:Long,server_mtime:String,server_ctime:Long,local_mtime:Long,local_ctime:Long,isdir:Long,isdelete:Long,status:Long,category:Long,object_key:String,extent_int2:Long,recompute_tag:Long,user_range:Long,md5_range:Long,event_day:String) extends Product;


/**
 * Created by robert on 15-5-18.
 */
class transtoparquet(userConfFile: String,sourceFile:String,destFile:String,event_day:String) extends Serializable{



  def run(): Unit = {

    transtoparquetconf(userConfFile);

    val conf=transtoparquetconf.getSparkConf();
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName));

    val sc=new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
    import sqlContext.implicits._

    sc.hadoopConfiguration.addResource("hdfs-site.xml");
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"));

    System.out.println(sc.hadoopConfiguration.toString);



    val txtfile = sc.textFile(sourceFile);

    txtfile.take(10).foreach(println);
    println(sourceFile);


    val txtdf=txtfile.filter(_.split("\t").size==18).map(
      x=>{

            var s3_handle=x.split("\t")(5);
            var md5_range=Long2long(0);
            if(x.split("\t")(5)=="")
              {
                s3_handle=(new scala.util.Random).nextInt(99).toString(); // random bucket in 0..98 (nextInt(99) excludes 99) so that rows with an empty handle are spread out
                md5_range=s3_handle.toLong;
              }
            else if(x.split("\t")(5).size==32)
            {
                md5_range=((s3_handle.toLowerCase.substring(0, 24).toList.map("0123456789abcdef".indexOf(_)).map(BigInt(_)).reduceLeft( _ * 32 + _))%100).toLong
            }
            else
            {
                md5_range = Long2long(-1);
            }
            var user_range=(x.split("\t")(0)).toLong/100000000;


            if(user_range>=100)
            {
                user_range=100;
            }

            if(md5_range == Long2long(-1))
              {
                null
              }


            else {
              formatrow((x.split("\t")(0)).toLong, x.split("\t")(1).toLong, x.split("\t")(2).toLong, x.split("\t")(3), x.split("\t")(4), s3_handle, (x.split("\t")(6)).toLong, x.split("\t")(7), (x.split("\t")(8)).toLong, (x.split("\t")(9)).toLong, (x.split("\t")(10)).toLong, (x.split("\t")(11)).toLong, (x.split("\t")(12)).toLong, (x.split("\t")(13)).toLong, (x.split("\t")(14)).toLong, x.split("\t")(15), (x.split("\t")(16)).toLong, (x.split("\t")(17)).toLong, user_range, md5_range, event_day);
            }
      }
    ).filter(_ != null).toDF();


    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);

  }



}



object transtoparquet{

  def main(args: Array[String]): Unit = {
    if (args.size != 4) {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
      return
    }
    new transtoparquet(args(0),args(1),args(2),args(3)).run()
  }
  }

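In an actual test with 9G of raw data (about 5.5G after compression), this program ran for several hours on 50 cores without finishing the write. The executor logs showed it writing to HDFS a few bytes at a time. That was unbearable, so I switched to a different approach.

2. Building the DataFrame from Row objects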

package com.yundata.transtoparquet


import java.lang.Exception
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row

import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.rdd.PairRDDFunctions

import scala.collection.mutable.ArrayBuffer


import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType};
import org.apache.spark.sql.Row;

/**
 * Created by robert on 15-5-18.
 */
class transtoparquet(userConfFile: String,sourceFile:String,destFile:String,event_day:String) extends Serializable{

  def run(): Unit = {

    transtoparquetconf(userConfFile);

    val conf=transtoparquetconf.getSparkConf();
    conf.setAppName(conf.get("spark.app.name", this.getClass.getName));

    val sc=new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
    import sqlContext.implicits._

    sc.hadoopConfiguration.addResource("hdfs-site.xml");
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    System.out.println(sc.hadoopConfiguration.get("dfs.ha.namenodes.bigfile"));

    System.out.println(sc.hadoopConfiguration.toString);



    val txtfile = sc.textFile(sourceFile);

    txtfile.take(10).foreach(println);
    println(sourceFile);

    val schema=StructType(Array(StructField("fs_id",LongType,true), StructField("user_id",LongType,true),StructField("app_id",LongType,true),StructField("parent_path",StringType,true),StructField("server_filename",StringType,true),StructField("s3_handle",StringType,true),StructField("size",LongType,true),StructField("server_mtime",StringType,true),StructField("server_ctime",LongType,true),StructField("local_mtime",LongType,true),StructField("local_ctime",LongType,true),StructField("isdir",LongType,true),StructField("isdelete",LongType,true),StructField("status",LongType,true),StructField("category",LongType,true),StructField("object_key",StringType,true),StructField("extent_int2",LongType,true),StructField("recompute_tag",LongType,true),StructField("user_range",LongType,true),StructField("md5_range",LongType,true),StructField("event_day",StringType,true)));

    val txtrow=txtfile.filter(_.split("\t").size==18).map(
      x=>{

            var s3_handle=x.split("\t")(5);
            var md5_range=Long2long(0);
            if(x.split("\t")(5)=="")
              {
                s3_handle=(new scala.util.Random).nextInt(99).toString(); // random bucket in 0..98 (nextInt(99) excludes 99) so that rows with an empty handle are spread out
                md5_range=s3_handle.toLong;
              }
            else if(x.split("\t")(5).size==32)
            {
                md5_range=((s3_handle.toLowerCase.substring(0, 24).toList.map("0123456789abcdef".indexOf(_)).map(BigInt(_)).reduceLeft( _ * 32 + _))%100).toLong
            }
            else
            {
                md5_range = Long2long(-1);
            }
            var user_range=(x.split("\t")(0)).toLong/100000000;


            if(user_range>=100)
            {
                user_range=100;
            }

            if(md5_range == Long2long(-1))
              {
                null
              }
            else {
              Row((x.split("\t")(0)).toLong, x.split("\t")(1).toLong, x.split("\t")(2).toLong, x.split("\t")(3), x.split("\t")(4), s3_handle, (x.split("\t")(6)).toLong, x.split("\t")(7), (x.split("\t")(8)).toLong, (x.split("\t")(9)).toLong, (x.split("\t")(10)).toLong, (x.split("\t")(11)).toLong, (x.split("\t")(12)).toLong, (x.split("\t")(13)).toLong, (x.split("\t")(14)).toLong, x.split("\t")(15), (x.split("\t")(16)).toLong, (x.split("\t")(17)).toLong, user_range, md5_range, event_day);
            }
      }
    ).filter(_ != null);




    val txtdf=sqlContext.createDataFrame(txtrow, schema);

    txtdf.repartition(5).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);
  }
}



object transtoparquet{

  def main(args: Array[String]): Unit = {

    if (args.size != 4) {
      println("usage: com.yundata.transtoparquet.transtoparquet config srcfile destfile event_day")
      return
    }
    new transtoparquet(args(0),args(1),args(2),args(3)).run()
  }

}
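
The md5_range bucketing in the map step above can be checked without starting Spark by reproducing the same arithmetic in a few lines of Python. This is only a sketch: the function name `md5_range` and its `rng` parameter are made up for illustration, it assumes a 32-character handle contains only hex digits, and it keeps the multiplier 32 exactly as written in the Scala code.

```python
import random

HEX_DIGITS = "0123456789abcdef"

def md5_range(s3_handle, rng=random):
    """Return the bucket for a handle, or -1 if the row should be dropped."""
    if s3_handle == "":
        # Empty handle: the Scala job assigns a random bucket so rows spread out
        return rng.randrange(100)
    if len(s3_handle) != 32:
        # Not an md5-sized handle: marked invalid
        return -1
    acc = 0
    for ch in s3_handle.lower()[:24]:
        # Fold the first 24 hex digits; the multiplier 32 is taken from the original code
        acc = acc * 32 + HEX_DIGITS.index(ch)
    return acc % 100

print(md5_range("0" * 23 + "1" + "0" * 8))
print(md5_range("too-short"))
```

Python integers are arbitrary precision, so no BigInt equivalent is needed here.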

With this Spark job the write was fast: all 5 GB of Parquet data was written in 18 minutes.

The configuration I used for the job above was:

spark.app.name = TRANSTOPARQUET-JOB
spark.master = spark://sparkmaster:8650
spark.cores.max=50
spark.executor.instances=50
spark.executor.memory=2g
spark.speculation=true
spark.driver.maxResultSize=2g
spark.ui.port=8221
spark.ui.retainedStages=20
spark.ui.retainedJobs=20
spark.sql.parquet.compression.codec=snappy

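This program still has a problem, though. If repartition is left unset, the job writes a huge number of files, roughly num(user_range) * num(md5_range) * num(event_day) * num(repartition), where num(repartition) is the number of write tasks; this can exhaust the namenode's memory almost instantly. So repartition has to be set very small, which in turn makes the whole job very slow.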
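
To make the file-count estimate concrete, here is a small worked calculation. It is a sketch under assumptions: the helper name `max_output_files` is invented for illustration, and the value counts (101 for user_range, which the job caps at 100, and 100 for md5_range, which is taken modulo 100) are read off the job code, with a single event_day per run.

```python
def max_output_files(n_user_range, n_md5_range, n_event_day, n_write_tasks):
    """Upper bound on files: every write task may emit one file per partition directory."""
    return n_user_range * n_md5_range * n_event_day * n_write_tasks

# user_range takes values 0..100 and md5_range 0..99 in the job above; one event_day per run
for tasks in (5, 50, 200):
    print(tasks, max_output_files(101, 100, 1, tasks))
```

The bound grows linearly with the number of write tasks, which is why a large task count is dangerous for the namenode.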

3)Optimize how repartition is done

With repartition(5) the rows are distributed randomly, so almost every task ends up holding data for every partition, and each partition directory gets 5 files. If I instead hash on the partition columns, i.e. repartition(user_range, md5_range, event_day), all rows of a given partition land in a single write task, so the job produces at most num(user_range) * num(md5_range) * num(event_day) files

instead of the earlier num(repartition) * num(user_range) * num(md5_range) * num(event_day).

This means the write parallelism can be raised freely.

Change the write code to:

txtdf.repartition(txtdf("user_range"), txtdf("md5_range"), txtdf("event_day")).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile)

With this change, all the data was written in about 5 minutes.

9208 4556 3297227427 /horus/users/chenxiue

The data is also somewhat smaller than before, because the files are more concentrated.
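
The difference between the two repartition styles can be illustrated with a toy simulation outside Spark. This is a sketch, not Spark's actual partitioner: repartition(n) is modeled as a random task assignment, repartition(cols) as a hash of the key, and the 50 partition keys and 20000 rows are made-up numbers. Each distinct (task, partition key) pair stands for one output file.

```python
import random

def count_files(rows, n_tasks, assign):
    """Each distinct (task, partition key) pair becomes one output file."""
    return len({(assign(key, n_tasks), key) for key in rows})

def random_assign(rng):
    # repartition(n): a row can land in any task, regardless of its key
    return lambda key, n_tasks: rng.randrange(n_tasks)

def hash_assign(key, n_tasks):
    # repartition(cols): the task is a function of the key only
    return hash(key) % n_tasks

rng = random.Random(0)
# 20000 rows spread over 50 hypothetical (user_range, md5_range, event_day) keys
keys = [(u, m, 20170101) for u in range(5) for m in range(10)]
rows = [rng.choice(keys) for _ in range(20000)]

files_random = count_files(rows, 5, random_assign(rng))
files_hashed = count_files(rows, 5, hash_assign)
print(files_random, files_hashed)
```

With hashing, the file count equals the number of distinct partition keys no matter how many tasks are used, which is the property the change above relies on.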

Next, let's compare gzip and snappy compression:

1)Compression ratio

Original file size:

1 517 9359700501 hdfs://namenode:8700/pika_data/file_meta_data_20170101_part4/2017011421/1483873213877

Snappy output files are named like: part-r-00000-b3ff5d89-8885-42f1-bb3d-e8dc6fb692a0.snappy.parquet

Size after compression:
9206 22157 5501724946 /horus/users/chenxiue

This took about 18 minutes.

Gzip output files are named like: part-r-00003-be2d54a8-088e-4970-be30-363747930a6e.gz.parquet

This also took about 18 minutes.
9206 22157 3958666080 /horus/users/chenxiue1

So gzip achieves a higher compression ratio.
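
The sizes above can be turned into ratios with a quick calculation. The byte counts are copied from the listings above; the constant and function names are only illustrative. The result is roughly 59% of the original size for snappy and roughly 42% for gzip.

```python
ORIGINAL_BYTES = 9359700501   # raw input size reported above
SNAPPY_BYTES = 5501724946     # Parquet + snappy
GZIP_BYTES = 3958666080       # Parquet + gzip

def ratio(compressed, original=ORIGINAL_BYTES):
    """Compressed size as a fraction of the original size."""
    return compressed / original

for name, size in (("snappy", SNAPPY_BYTES), ("gzip", GZIP_BYTES)):
    print(f"{name}: {ratio(size):.1%} of the original size")
```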

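2)With random partitioning, try raising the parallelism of the final write to see whether it gets faster

txtdf.repartition(10).write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("user_range","md5_range","event_day").parquet(destFile);

The write took 19 minutes. Why?
9208 44158 4259882305 /horus/users/chenxiue
The output is also slightly larger than before.

My guess is that the data set is too small, so most of the time goes into creating files; the larger the repartition count, the more files are created.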
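
The listings in this post support that guess. The sketch below assumes the three-number lines are `hadoop fs -count` output (directory count, file count, content size, path), which matches their shape; the run labels and the helper name `parse_count` are mine. Note that the runs do not all use the same codec, so only the file counts are directly comparable; the per-file size is just a rough indication of how small the files are.

```python
def parse_count(line):
    """Split 'DIRS FILES BYTES PATH' into its four fields."""
    dirs, files, size, path = line.split(None, 3)
    return int(dirs), int(files), int(size), path

runs = {
    "repartition(5)": "9206 22157 5501724946 /horus/users/chenxiue",
    "repartition(10)": "9208 44158 4259882305 /horus/users/chenxiue",
    "repartition(cols)": "9208 4556 3297227427 /horus/users/chenxiue",
}

for name, line in runs.items():
    _, files, size, _ = parse_count(line)
    print(f"{name}: {files} files, about {size // files} bytes per file")
```

Doubling the repartition count from 5 to 10 roughly doubles the number of files, while hashing on the partition columns yields about one file per partition directory.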

Original - Building and installing Ranger in a Kerberos + HA environment




Tags (space-separated): deployment docs; please credit the source when reposting


1.Download and build the code

git clone https://github.com/apache/incubator-ranger.git
cd incubator-ranger
git checkout ranger-0.5

mvn clean compile package assembly:assembly install

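The download step hit a problem with Python's hash library; reinstalling Python fixed it.
Dependency downloads also time out frequently; retrying a few times is enough.

The build output is in the target directory.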

2.Install the ranger-admin console

1)Install the MySQL database

Configure my.cnf:

basedir =/home/bae/dataplatform/jumbo
datadir =/home/bae/dataplatform/jumbo/var
port = 3309
socket = /home/bae/dataplatform/jumbo/var/mysql.sock

Start MySQL:
./bin/mysql_install_db
./share/mysql/mysql.server start

2)Generate the keytab for each module

addprinc -randkey rangeradmin/hostA@EXAMPLE.COM

xst -k /home/bae/dataplatform/kerberos/keytab/rangeradmin.keytab rangeradmin/hostA@EXAMPLE.COM

addprinc -randkey rangerlookup/hostA@EXAMPLE.COM

xst -k /home/bae/dataplatform/kerberos/keytab/rangerlookup.keytab rangerlookup/hostA@EXAMPLE.COM

addprinc -randkey rangerusersync/hostA@EXAMPLE.COM

xst -k /home/bae/dataplatform/kerberos/keytab/rangerusersync.keytab rangerusersync/hostA@EXAMPLE.COM

addprinc -randkey rangertagsync/hostA@EXAMPLE.COM

xst -k /home/bae/dataplatform/kerberos/keytab/rangertagsync.keytab rangertagsync/hostA@EXAMPLE.COM
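
The four addprinc/xst pairs above follow one pattern, so they can be generated instead of typed by hand. The sketch below only prints the kadmin commands; it does not talk to a KDC. The helper name `kadmin_commands` is hypothetical, and the host, realm, and keytab directory are the placeholder values used in this post.

```python
KEYTAB_DIR = "/home/bae/dataplatform/kerberos/keytab"

def kadmin_commands(services, host, realm, keytab_dir=KEYTAB_DIR):
    """Return the addprinc/xst command pair for each service principal."""
    commands = []
    for service in services:
        principal = f"{service}/{host}@{realm}"
        commands.append(f"addprinc -randkey {principal}")
        commands.append(f"xst -k {keytab_dir}/{service}.keytab {principal}")
    return commands

for line in kadmin_commands(
    ["rangeradmin", "rangerlookup", "rangerusersync", "rangertagsync"],
    host="hostA",
    realm="EXAMPLE.COM",
):
    print(line)
```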

3)Configure ranger-admin

Extract ranger-0.5.4-SNAPSHOT-admin.tar.gz into the installation directory and edit install.properties. The options that need changing are:

SQL_CONNECTOR_JAR=/home/bae/dataplatform/jumbo/lib/mysql/mysql-connector-java-5.1.41-bin.jar
db_root_user=root
db_root_password=
db_host=hostMysql:3309

db_name=ranger
db_user=rangeradmin
db_password=123456


audit_store=db
audit_db_name=ranger_audit
audit_db_user=rangerlogger
audit_db_password=123456


policymgr_external_url=http://localhost:8070
policymgr_http_enabled=true

unix_user=work
unix_group=work


spnego_principal=HTTP/hostA@EXAMPLE.COM
spnego_keytab=/home/bae/dataplatform/kerberos/keytab/spnego.service.keytab
token_valid=30
cookie_domain=hostA
cookie_path=/

admin_principal=rangeradmin/hostA@EXAMPLE.COM
admin_keytab=/home/bae/dataplatform/kerberos/keytab/rangeradmin.keytab
lookup_principal=rangerlookup/hostA@EXAMPLE.COM
lookup_keytab=/home/bae/dataplatform/kerberos/keytab/rangerlookup.keytab

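Run ./setup.sh (as root; otherwise groupadd fails with a permission error)

Problems encountered:

a)Error:

SQLException : SQL state: 28000 java.sql.SQLException: Access denied for user 'rangeradmin'@'hostA' (using password: YES) ErrorCode: 1045

Checking the user table showed that the user had been created, but not for this host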

create user 'rangeradmin'@'hostA' identified by '123456';
flush privileges;

b)After changing the port in policymgr_external_url=http://localhost:8070, nothing was listening on port 8070

conf/ranger-admin-site.xml contains

<property>
<name>ranger.service.http.port</name>
<value>6080</value>
</property>

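This value has to be changed to the new port as well.

c)After restarting with ranger-admin stop/start the console is reachable. Note that the Tomcat log is at ews/logs/catalina.out

To verify: open http://localhost:8070 and log in with admin/admin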

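3.Install the usersync process

Its purpose is to sync users from unix or LDAP into Ranger.

Copy the built ranger-0.5.4-SNAPSHOT-usersync.tar.gz to a suitable directory and extract it

Edit install.properties (to sync the local unix users):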

POLICY_MGR_URL = http://localhost:8070
# sync source,  only unix and ldap are supported at present
# defaults to unix
SYNC_SOURCE = unix
#User and group for the usersync process
unix_user=work
unix_group=work
logdir=/home/bae/dataplatform/ranger-0.5.4-SNAPSHOT-usersync/logs/ranger/usersync
usersync_principal=rangerusersync/hostA@EXAMPLE.COM
usersync_keytab=/home/bae/dataplatform/kerberos/keytab/rangerusersync.keytab
hadoop_conf=/home/bae/dataplatform/hadoop/conf/

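Run ./setup.sh as root
Start usersync: ./ranger-usersync-services.sh start
To verify: in the Ranger console, check under settings->Users/Groups that the local accounts have been synced.

4.Install the hdfs-plugin (only needed on the active and standby namenodes of the cluster)

For Ranger to control HDFS, the plugin has to be installed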

Copy ranger-0.5.4-SNAPSHOT-hdfs-plugin.tar.gz to the target directory and extract it. Edit install.properties

POLICY_MGR_URL=http://hostA:8070
SQL_CONNECTOR_JAR=/home/bae/dataplatform/jumbo/lib/mysql/mysql-connector-java-5.1.41-bin.jar
REPOSITORY_NAME=hadoopdev (must match the name configured later in the console)
XAAUDIT.DB.IS_ENABLED=true
XAAUDIT.DB.FLAVOUR=MYSQL
XAAUDIT.DB.HOSTNAME=hostA:3309
XAAUDIT.DB.DATABASE_NAME=ranger_audit
XAAUDIT.DB.USER_NAME=rangeradmin
XAAUDIT.DB.PASSWORD=123456

CUSTOM_USER=work
CUSTOM_GROUP=work

Create the symlinks for hadoop_conf:
ln -s /home/bae/dataplatform/hadoop-2.7.2 /home/bae/dataplatform/hadoop
ln -s /home/bae/dataplatform/hadoop-2.7.2/etc/hadoop/ /home/bae/dataplatform/hadoop-2.7.2/conf

Make sure $HADOOP_HOME contains a lib directory. If it does not, the native libraries need to be built; see:

http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-common/NativeLibraries.html

Enable the hdfs-plugin as root:

./enable-hdfs-plugin.sh (run as root)

Restart the namenode process:

First add the new Ranger jars under $HADOOP_HOME/lib to the HADOOP_CLASSPATH variable,

by adding the following to conf/hadoop-env.sh:

for f in $HADOOP_HOME/lib/*.jar; do
  if [ "$HADOOP_CLASSPATH" ]; then
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
  else
    export HADOOP_CLASSPATH=$f
  fi
done

Then restart the namenode (if it reports that a JDBC method cannot be found, copy mysql-connector-java-5.1.41-bin.jar into $HADOOP_HOME/lib and restart again)

5.Add the plugin configuration in the Ranger console:

First create a Kerberos principal with a password

addprinc -pw password rangeradmin@example.com

Edit core-site.xml to add the mapping:

RULE:[2:$1@$0](rangeradmin@EXAMPLE.COM)s/.*/work/
RULE:[1:$1@$0](rangeradmin@EXAMPLE.COM)s/.*/work/

Restart the namenode for this to take effect, then restart ranger-admin
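
To see what the two mapping rules do, the sketch below mimics how a rule of the form RULE:[n:format](regex)s/pattern/replacement/ is evaluated, as I understand Hadoop's auth_to_local semantics: the rule applies only to principals with n name components, $0 is the realm and $1..$n are the components, the regex must match the whole formatted string, and the sed expression then rewrites it. This is not Hadoop code; `apply_rule` and `RULE_ARGS` are hypothetical names.

```python
import re

def apply_rule(principal, n_components, fmt, match_regex, sed_pattern, sed_replacement):
    """Apply one RULE:[n:fmt](regex)s/pattern/replacement/ to a principal.

    Returns the mapped short name, or None when the rule does not apply.
    """
    name, _, realm = principal.partition("@")
    components = name.split("/")
    if len(components) != n_components:
        return None  # the rule only applies to principals with exactly n components
    # $0 is the realm, $1..$n are the components of the principal name
    candidate = fmt.replace("$0", realm)
    for index, component in enumerate(components, start=1):
        candidate = candidate.replace(f"${index}", component)
    if re.fullmatch(match_regex, candidate) is None:
        return None  # the (regex) filter must match the whole candidate string
    # Without a trailing 'g', only the first match is substituted
    return re.sub(sed_pattern, sed_replacement, candidate, count=1)

RULE_ARGS = ("$1@$0", r"rangeradmin@EXAMPLE.COM", r".*", "work")
print(apply_rule("rangeradmin@EXAMPLE.COM", 1, *RULE_ARGS))
print(apply_rule("rangeradmin/hostA@EXAMPLE.COM", 2, *RULE_ARGS))
print(apply_rule("nn/hostB@EXAMPLE.COM", 2, *RULE_ARGS))
```

Both the one-component and the two-component rangeradmin principals map to the local user work, while other principals fall through to whatever rules follow.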

Under Service Manager->hdfs, add a repo named hadoopdev (the same name as set in the hdfs plugin)

username:rangeradmin@example.com
password:password
namenode url:hdfs://hostB:8900
Authorization Enabled:yes
Authentication Type:kerberos
hadoop.security.auth_to_local:RULE:[1:$1@$0](rangeradmin@EXAMPLE.COM)s/.*/work/
dfs.datanode.kerberos.principal:dn/_HOST@EXAMPLE.COM
dfs.namenode.kerberos.principal:nn/_HOST@EXAMPLE.COM
dfs.secondary.namenode.kerberos.principal:nn/_HOST@EXAMPLE.COM
RPC Protection Type:Authentication

dfs.nameservices = smallfile
dfs.ha.namenodes.smallfile= nn1,nn2
dfs.namenode.rpc-address.smallfile.nn1 = hostB:8900
dfs.namenode.rpc-address.smallfile.nn2 = hostC:8900
dfs.client.failover.proxy.provider.smallfile = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

The username/password must be a valid Kerberos principal and its password.

Click test connection; if it succeeds, save.

To verify that the plugin was added: check whether the plugin shows up under audit->plugin.
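
The HA entries in the service configuration above follow a fixed naming scheme, which is easy to get wrong by hand. In Hadoop's HA client configuration, each rpc-address key is qualified with both the nameservice id and the namenode id. The sketch below builds the property set from those ids; the function name `ha_client_properties` is hypothetical, and the hosts and ports are the placeholders used in this post.

```python
def ha_client_properties(nameservice, namenodes):
    """Build HA client properties; namenodes maps namenode id -> 'host:port'."""
    props = {
        "dfs.nameservices": nameservice,
        f"dfs.ha.namenodes.{nameservice}": ",".join(namenodes),
        f"dfs.client.failover.proxy.provider.{nameservice}":
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    }
    for nn_id, address in namenodes.items():
        # The rpc-address key carries both the nameservice and the namenode id
        props[f"dfs.namenode.rpc-address.{nameservice}.{nn_id}"] = address
    return props

ha_props = ha_client_properties("smallfile", {"nn1": "hostB:8900", "nn2": "hostC:8900"})
for key, value in ha_props.items():
    print(f"{key} = {value}")
```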

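6.Test Ranger authorization on HDFS

Note that permissions must first be revoked on HDFS itself, for example by setting a directory's mode to 000, so that access is controlled entirely by Ranger policy. Otherwise the broader HDFS permissions are what take effect.

The Access Enforcer column under audit->access shows whether ranger-acl or hadoop-acl made the decision

References: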

https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5.0+Installation#ApacheRanger0.5.0Installation-InstallandconfigureSolrorSolrCloud

Installing Ranger in a Kerberos environment:

https://cwiki.apache.org/confluence/display/RANGER/Ranger+installation+in+Kerberized++Environment

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_Security_Guide/content/hdfs_plugin_kerberos.html