https://zeppelin.apache.org/download.html
cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml
vi conf/zeppelin-site.xml
Edit zeppelin-site.xml to set the bind IP and port.
bin/zeppelin-daemon.sh start
bin/zeppelin-daemon.sh stop
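A quick sanity check once the daemon is up; a minimal sketch that assumes the stock property names zeppelin.server.addr / zeppelin.server.port and an example port of 8080:

bin/zeppelin-daemon.sh status
grep -A1 -E '<name>zeppelin\.server\.(addr|port)</name>' conf/zeppelin-site.xml
curl -sI http://127.0.0.1:8080/ | head -n1    # expect an HTTP 200 from the web UI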
Supported components
alluxio, angular, beam, bigquery, cassandra, elasticsearch, file, flink, flink-cmd, geode, groovy, hazelcastjet, hbase, ignite, influxdb, java, jdbc, jupyter, kotlin, ksql, kylin, lens, livy, md, mongodb, neo4j, pig, python, r, sap, scalding, scio, sh, spark, spark-submit, sparql, submarine, zeppelin-interpreter-shaded-0.10.1.jar
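The list above is simply the contents of the interpreter directory in the Zeppelin install; assuming the command is run from the Zeppelin home directory, it can be reproduced with:

ls interpreter/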
Encrypting interpreter passwords with credentials: add the config item zeppelin.credentials.encryptKey to the configuration file (note the key must be 16/24/32 bytes). Do not configure a default password when creating the interpreter; each user adds their own credentials instead.
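A minimal sketch of wiring this up, assuming conf/zeppelin-site.xml is the configuration file being edited; the openssl call is just one convenient way to produce a key of an accepted length:

openssl rand -hex 16    # 32 hex characters, usable as a 32-byte key string
# then add to conf/zeppelin-site.xml:
#   <property>
#     <name>zeppelin.credentials.encryptKey</name>
#     <value>PASTE_THE_GENERATED_KEY_HERE</value>
#   </property>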
SparkThriftServer approach: configure personal credentials to connect to a Thrift server started by a restricted user (proxyuser); the Thrift server can then show which user each SQL statement belongs to, while file operations are performed as the restricted user.
Set the JDBC interpreter's dependency to org.apache.hive:hive-jdbc:2.3.4 and it will be downloaded automatically (for an offline deployment use hive-jdbc-2.3.10-standalone.jar instead, otherwise the other transitive dependencies would also have to be fetched). The driver is org.apache.hive.jdbc.HiveDriver. The downloaded artifacts land under local-repo:
./local-repo/org/apache/hive/hive-jdbc
./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.pom
./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.pom.sha1
./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.jar
./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.jar.sha1
./local-repo/query-48/hive-jdbc-2.3.4.jar
./local-repo/query-194/hive-jdbc-2.3.4.jar
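For the offline case, a sketch of wiring in the standalone driver by hand instead of letting Zeppelin resolve the Maven coordinates; the directory and file locations are placeholders, and the assumption being relied on is that the interpreter's dependency/artifact field can reference a local jar path:

mkdir -p /data/soft/zeppelin/offline-jars
cp hive-jdbc-2.3.10-standalone.jar /data/soft/zeppelin/offline-jars/
# then set the JDBC interpreter's artifact to the absolute path:
#   /data/soft/zeppelin/offline-jars/hive-jdbc-2.3.10-standalone.jar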
To connect with the identity of the logged-in user (otherwise queries still run as the STS process user), add the custom property default.proxy.user.property=hive.server2.proxy.user and set hive.server2.enable.doAs=true.
To avoid permission-denied problems where directories are created as the logged-in user while data is written as the user who started the STS, set hive.server2.enable.doAs=false so that every operation runs as the STS launch user.
The approach finally adopted: start the Thrift server with --proxy-user zeppelin, so all I/O is done as zeppelin while the Thrift server still shows who executed each statement (see the sketch below).
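A sketch of that launch; only --proxy-user zeppelin comes from the setup described above, the master and port are illustrative:

$SPARK_HOME/sbin/start-thriftserver.sh \
  --master yarn \
  --proxy-user zeppelin \
  --hiveconf hive.server2.thrift.port=10000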
zeppelin.spark.run.asLoginUser: whether to run the Spark job as the Zeppelin login user (applies when running on YARN with authentication enabled).
Interpreter binding modes:
global share: started under the identity of the first user who uses it
isolated per note: every user starts their own instance independently
scoped per note: the same SparkSession, but users' variables do not interfere with each other
Check which users have logged in:
cat zeppelin-hadoop-LGJF-ZYC6-HCY-SVR556.log2023-08-0* | grep user | awk -F ':' '{print $8}' | sort -u
grep "Creating connection pool" zeppelin-interpreter-query-shared_process-hadoop-a51-dg-hcy-bi-cal-003.log | awk -F',' '{print $3}'
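A small variation on the commands above that counts how many times each user appears; the log glob and the awk field position are carried over from the first command and may need adjusting:

cat zeppelin-hadoop-LGJF-ZYC6-HCY-SVR556.log2023-08-0* | grep user | awk -F ':' '{print $8}' | sort | uniq -c | sort -rn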
User password encryption: ini approach (shiro.ini)
[main]
matcher = org.apache.shiro.authc.credential.HashedCredentialsMatcher
matcher.hashAlgorithmName = sha-256
matcher.storedCredentialsHexEncoded = true
myRealm.credentialsMatcher = $matcher

[users]
user1 = 2bb80d537b1da3e38bd30361aa855686bde0eacd7162fef6a25fe97bf527a25b, role1, role2, ...
To generate the password hash: echo -n thisIsPassword | sha256sum
Other credentials matcher classes: SimpleCredentialsMatcher, AllowAllCredentialsMatcher, HashedCredentialsMatcher, PasswordMatcher
JDBC approach (driver: Maven Repository: org.xerial » sqlite-jdbc)
[main]
myRealm = org.apache.shiro.realm.jdbc.JdbcRealm
myRealmCredentialsMatcher = org.apache.shiro.authc.credential.SimpleCredentialsMatcher
myRealm.credentialsMatcher = $myRealmCredentialsMatcher
dataSource = org.sqlite.SQLiteDataSource
dataSource.url = jdbc:sqlite:/data/zeppelin/user.db
myRealm.dataSource = $dataSource
sqlite3 user.db
create table users (username text, password text);
insert into users values ('xx','xx'); -- sha256+hex
create table user_roles(username text, role_name text);
insert into user_roles values ('xx','admin');
update user_roles set role_name='rol' where username='xx';
create table roles_permissions (username text, permission text);
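A sketch of adding a real account to that store from the shell; the user name, password and role are placeholders, and the password is stored hex-encoded sha-256 as noted in the comments above:

HASH=$(echo -n 'thisIsPassword' | sha256sum | awk '{print $1}')
sqlite3 /data/zeppelin/user.db "insert into users values ('user1', '$HASH');"
sqlite3 /data/zeppelin/user.db "insert into user_roles values ('user1', 'admin');"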
Code walkthrough: ZeppelinServer
NotebookService.java[runParagraph]: execution is triggered from the UI
-> Note.run -> Paragraph.execute -> getBindedInterpreter()
-> InterpreterFactory#getInterpreter
-> InterpreterSetting.getDefaultInterpreter(ExecutionContext executionContext)
-> InterpreterSetting#getOrCreateSession(org.apache.zeppelin.interpreter.ExecutionContext)
-> InterpreterSetting.java[getOrCreateInterpreterGroup]: creates an InterpreterGroup with a groupId
-> ManagedInterpreterGroup#getOrCreateSession
-> InterpreterSetting#createInterpreters: new RemoteInterpreter
Paragraph.execute then calls interpreter.getScheduler().submit(this) to put the job on the queue.
RemoteScheduler (one per interpreter) / AbstractScheduler#run takes the job off the queue -> runJobInScheduler -> JobRunner -> runJob; Paragraph/InterpretJob (a kind of Job) jobRun -> RemoteInterpreter.interpret initiates the execution.
getOrCreateInterpreterProcess
ManagedInterpreterGroup.java[getOrCreateInterpreterProcess]
interpreterSetting.createInterpreterProcess -> createLauncher
PluginManager.java[loadInterpreterLauncher]
StandardInterpreterLauncher.java[launchDirectly]
ProcessLauncher.java[transition] starts the interpreter process
RemoteInterpreterProcess#callRemoteFunction issues the remote call
RemoteInterpreterServer acts as the process container; on the ZeppelinServer side, RemoteInterpreterEventClient then acts as the client that creates the Interpreter
JDBCInterpreter is one such interpreter implementation
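Because the launcher ends up forking a separate JVM whose main class is RemoteInterpreterServer, the per-interpreter processes and their logs are easy to spot from the shell; a small sketch (the class name comes from the walkthrough above, the log naming follows the files grepped earlier, everything else is illustrative):

ps -ef | grep RemoteInterpreterServer | grep -v grep
ls logs/ | grep zeppelin-interpreter-    # one log file per interpreter process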
Remote auto-restart
nohup bin/zeppelin-daemon.sh restart &

nohup bash -c '
sleep 3
sh /data/soft/zeppelin/bin/restart.sh
' > /dev/null 2>&1 &
Switching notebook permissions from public to private
import json

f = open('notebook-authorization.json.bak', 'r')
data = json.load(f)

for note_id, note_data in data['authInfo'].items():
    if isinstance(note_data, dict):
        owner = note_data.get('owners', [])
        if not owner:
            continue
        first_owner = owner[0]

        readers = note_data.get('readers', [])
        if not readers:
            note_data['readers'] = [first_owner]

        writers = note_data.get('writers', [])
        if not writers:
            note_data['writers'] = [first_owner]

        runners = note_data.get('runners', [])
        if not runners:
            note_data['runners'] = [first_owner]

with open("notebook-authorization.json.bak.private", 'w') as f:
    json.dump(data, f, indent=2)
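A sketch of applying the conversion, assuming the default conf/notebook-authorization.json location and a hypothetical file name make_private.py for the script above:

cp conf/notebook-authorization.json notebook-authorization.json.bak
python3 make_private.py
cp notebook-authorization.json.bak.private conf/notebook-authorization.json
bin/zeppelin-daemon.sh restart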
HDFS browsing in Python
%python
class HDFSForm:
    def __init__(self):
        self.host = z.input("HDFS Host", "10.27.48.7:50070")
        self.path = z.input("Path", "/")

form = HDFSForm()

class HDFSQuery:
    def __init__(self, host):
        self.base_url = "http://" + host + "/webhdfs/v1"

    def query(self, path):
        url = self.base_url + path + "?user.name=biclusterhcyhadoop&op=LISTSTATUS"
        try:
            import urllib.request
            import json
            with urllib.request.urlopen(url) as response:
                data = response.read().decode('utf-8')
            return json.loads(data)
        except Exception as e:
            return {"error": str(e)}

def format_size(bytes_val):
    if not bytes_val:
        return "-"
    bytes_val = float(bytes_val)
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes_val < 1024.0:
            return "%.1f %s" % (bytes_val, unit)
        bytes_val /= 1024.0
    return "%.1f TB" % bytes_val

def format_time(timestamp):
    if not timestamp or timestamp == 0:
        return "-"
    from datetime import datetime
    return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')

hdfs = HDFSQuery(form.host)
parsed_data = hdfs.query(form.path)

print("Path: " + form.path)
if "error" in parsed_data:
    print("<div style='color: red;'><strong>Error:</strong> " + parsed_data["error"] + "</div>")
else:
    file_list = parsed_data["FileStatuses"]["FileStatus"]
    total_files = len(file_list)
    file_count = sum(1 for f in file_list if f["type"] == "FILE")
    dir_count = sum(1 for f in file_list if f["type"] == "DIRECTORY")
    total_size = sum(f["length"] for f in file_list if f["type"] == "FILE")

    print(f"Total items: {total_files}  Files: {file_count}  Directories: {dir_count}  Total size: {format_size(total_size)}")
    print("=" * 100)
    print(f"{'Name':<35} {'Type':<10} {'Size':<10} {'Owner':<10} {'Perm':<8} {'Modified':<20}")
    print("-" * 100)

    for file_info in file_list:
        name = file_info['pathSuffix']
        file_type = file_info['type']
        size = format_size(file_info['length'])
        owner = file_info['owner']
        permission = file_info['permission']
        mod_time = format_time(file_info['modificationTime'])
        icon = "📄" if file_type == "FILE" else "📁"
        print(f"{icon} {name:<33} {file_type:<9} {size:<9} {owner:<9} {permission:<7} {mod_time:<19}")