下载并解压WaterDrop1
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.1/waterdrop-1.4.1.zip
修改配置文件waterdrop-env.sh1
2
3cd /data/work/waterdrop-1.4.1
vim config/waterdrop-env.sh
SPARK_HOME=/data/work/spark-2.4  #配置为spark的路径
增加配置文件small.conf1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39spark {
  spark.streaming.batchDuration = 5
  spark.app.name = "small_spark_streaming"
  spark.ui.port = 14020
  spark.executor.instances = 3
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
  kafkaStream  {
    topics = "small"
    consumer.bootstrap.servers = "hadoop008.eqxiu.com:9092,hadoop006.eqxiu.com:9092,hadoop007.eqxiu.com:9092"
    consumer.zookeeper.connect = "hadoop004:2181,hadoop003:2181,hadoop002:2181,hadoop001:2181,hadoop005:2181"
    consumer.group.id = "clickhouse_small"
    consumer.failOnDataLoss = false
    consumer.auto.offset.reset = latest
    consumer.rebalance.max.retries = 100
  }
}
filter {
      json{
            source_field = "raw_message"
      }
}
output {
    clickhouse {
        host = "10.10.8.1:8123"
        database = "bw"
        table = "small"
        fields = ["act","b_t","b_v","bro","c_i","c_p","s_t","c_t","cit","cou","url","ref","u_i"]
        username = ""
        password = ""
        retry_codes = [209, 210 ,1002]
        retry = 10
	    bulk_size = 1000
    }
}
创建Clickhouse表1
create table bw.small( act String, b_t String, b_v String, bro String, c_i String, c_p String, s_t String, c_t String, cit String, cou String, url String, ref String, u_i String ) ENGINE = MergeTree() partition by toYYYYMMDD(toDateTime(toUInt64(s_t)/1000)) order by (s_t);
启动写入程序1
2cd /data/work/waterdrop-1.4.1
sh bin/start-waterdrop.sh --master yarn --deploy-mode client --config small.conf