Contents
  1. Encrypting interpreter passwords with credentials
  2. SparkThriftServer approach
  3. Checking user logins
  4. User password encryption
    4.1. ini approach
    4.2. JDBC approach
  • Code
  • Remote restart
  • Changing notebook permissions from public to private
  • Browsing HDFS with Python
  • https://zeppelin.apache.org/download.html

    cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml
    vi conf/zeppelin-site.xml
    # edit zeppelin-site.xml to set the bind address and port
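
    The two properties to change, for reference (the values here are examples):

    <property>
      <name>zeppelin.server.addr</name>
      <value>0.0.0.0</value>
    </property>
    <property>
      <name>zeppelin.server.port</name>
      <value>8080</value>
    </property>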

    bin/zeppelin-daemon.sh start

    bin/zeppelin-daemon.sh stop

    Supported components (contents of the interpreter directory):

    alluxio     angular    beam          bigquery   cassandra     elasticsearch
    file        flink      flink-cmd     geode      groovy        hazelcastjet
    hbase       ignite     influxdb      java       jdbc          jupyter
    kotlin      ksql       kylin         lens       livy          md
    mongodb     neo4j      pig           python     r             sap
    scalding    scio       sh            spark      spark-submit  sparql
    submarine   zeppelin-interpreter-shaded-0.10.1.jar

    Encrypting interpreter passwords with credentials

    Add the config item zeppelin.credentials.encryptKey to the configuration file; note that the key must be 16/24/32 bytes long.
    When creating an interpreter, leave the default password unset and have each user add their own credentials instead.
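
    A minimal sketch of the property (the key value here is a made-up 16-byte example):

    <property>
      <name>zeppelin.credentials.encryptKey</name>
      <value>0123456789abcdef</value>
    </property>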

    SparkThriftServer approach

    Each user configures personal credentials to connect to a Thrift server started by a restricted user (proxyuser); the Thrift server can then show which user each SQL statement belongs to, while file operations are performed as the restricted user.

    Set the JDBC interpreter dependency to org.apache.hive:hive-jdbc:2.3.4 and it is downloaded from the network automatically. (For offline deployments use hive-jdbc-2.3.10-standalone.jar instead; otherwise the transitive dependencies would also have to be downloaded.)
    The driver is org.apache.hive.jdbc.HiveDriver (interpreter settings are sketched after the listing below).

    ./local-repo/org/apache/hive/hive-jdbc
    ./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.pom
    ./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.pom.sha1
    ./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.jar
    ./local-repo/org/apache/hive/hive-jdbc/2.3.4/hive-jdbc-2.3.4.jar.sha1
    ./local-repo/query-48/hive-jdbc-2.3.4.jar
    ./local-repo/query-194/hive-jdbc-2.3.4.jar
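
    The matching JDBC interpreter settings might look like this (a sketch; host and port are placeholders):

    default.driver  org.apache.hive.jdbc.HiveDriver
    default.url     jdbc:hive2://sts-host:10000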

    Connecting as the logged-in user still runs queries as the STS process user; to pass the login identity through, add the custom config item default.proxy.user.property=hive.server2.proxy.user
    and set hive.server2.enable.doAs=true

    To avoid permission-denied errors caused by directories being created as the logged-in user while the data is written as the user that started STS, set hive.server2.enable.doAs=false so that all operations run as the STS launch user.

    The approach finally adopted: start the thriftserver with --proxy-user zeppelin, so all I/O is done as the zeppelin user while the thriftserver still shows who executed each statement.
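
    A sketch of the start command under this scheme (the master and other flags are illustrative):

    sbin/start-thriftserver.sh \
      --master yarn \
      --proxy-user zeppelin \
      --hiveconf hive.server2.enable.doAs=false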

    Related Spark interpreter setting: zeppelin.spark.run.asLoginUser. Interpreter binding modes:

    • global share: started with the identity of the first user to use it
    • isolated per note: each user gets their own separate process
    • scoped per note: a single SparkSession, but users' variables do not interfere with each other

    Checking user logins

    cat zeppelin-hadoop-LGJF-ZYC6-HCY-SVR556.log2023-08-0* | grep user | awk -F ':' '{print $8}' | sort -u

    grep "Creating connection pool" zeppelin-interpreter-query-shared_process-hadoop-a51-dg-hcy-bi-cal-003.log | awk -F',' '{print $3}'

    User password encryption

    ini approach

    shiro.ini


    [main]
    matcher=org.apache.shiro.authc.credential.HashedCredentialsMatcher
    matcher.hashAlgorithmName=sha-256
    ## whether the stored hashed password is hex-encoded: true = hex, false = base64
    matcher.storedCredentialsHexEncoded=true
    ## number of hash iterations
    #matcher.hashIterations=2

    myRealm.credentialsMatcher = $matcher

    [users]
    # user1 = sha256-hashed hex-encoded password, role1, role2, ...
    user1 = 2bb80d537b1da3e38bd30361aa855686bde0eacd7162fef6a25fe97bf527a25b, role1, role2, ...

    To generate the password hash: echo -n thisIsPassword | sha256sum

    Other credentials matcher classes:
    SimpleCredentialsMatcher
    AllowAllCredentialsMatcher
    HashedCredentialsMatcher
    PasswordMatcher
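
    Of these, PasswordMatcher delegates to a PasswordService and handles salted, iterated hashes; a minimal shiro.ini sketch (not used in this setup):

    [main]
    passwordMatcher = org.apache.shiro.authc.credential.PasswordMatcher
    myRealm.credentialsMatcher = $passwordMatcher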

    JDBC approach

    Add the sqlite-jdbc driver jar (Maven Repository: org.xerial » sqlite-jdbc).

    [main]
    myRealm=org.apache.shiro.realm.jdbc.JdbcRealm
    # for plaintext passwords use SimpleCredentialsMatcher; for SHA-256 use Sha256CredentialsMatcher
    myRealmCredentialsMatcher=org.apache.shiro.authc.credential.SimpleCredentialsMatcher
    myRealm.credentialsMatcher=$myRealmCredentialsMatcher

    # data source configuration
    dataSource=org.sqlite.SQLiteDataSource
    dataSource.url=jdbc:sqlite:/data/zeppelin/user.db
    myRealm.dataSource=$dataSource

    sqlite3 user.db
    create table users (username text, password text);
    insert into users values ('xx','xx'); -- password stored as sha256 hex

    create table user_roles (username text, role_name text);
    insert into user_roles values ('xx','admin');
    update user_roles set role_name='rol' where username='xx';

    -- JdbcRealm's default permissionsQuery looks permissions up by role_name, not username
    create table roles_permissions (role_name text, permission text);
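
    These table and column names line up with JdbcRealm's built-in default queries; if your schema differs, the queries can be overridden in shiro.ini (a sketch):

    myRealm.authenticationQuery = select password from users where username = ?
    myRealm.userRolesQuery = select role_name from user_roles where username = ?
    myRealm.permissionsQuery = select permission from roles_permissions where role_name = ?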

    Code

    ZeppelinServer

    NotebookService.java[runParagraph]  execution initiated from the UI
    Note.run
    Paragraph.execute getBindedInterpreter()
    InterpreterFactory#getInterpreter
    InterpreterSetting getDefaultInterpreter(ExecutionContext executionContext)
    InterpreterSetting#getOrCreateSession(org.apache.zeppelin.interpreter.ExecutionContext)
    InterpreterSetting.java[getOrCreateInterpreterGroup] Create InterpreterGroup with groupId
    ManagedInterpreterGroup#getOrCreateSession
    InterpreterSetting#createInterpreters : new RemoteInterpreter
    Paragraph.execute  interpreter.getScheduler().submit(this);  enqueues the paragraph

    RemoteScheduler (one per interpreter) / AbstractScheduler#run dequeues -> runJobInScheduler -> JobRunner -> runJob
    Paragraph/InterpretJob (a kind of Job)  jobRun
    RemoteInterpreter.interpret  starts the actual execution

    • getOrCreateInterpreterProcess

    • ManagedInterpreterGroup.java[getOrCreateInterpreterProcess]

      • interpreterSetting.createInterpreterProcess -> createLauncher
      • PluginManager.java[loadInterpreterLauncher]
      • StandardInterpreterLauncher.java[launchDirectly]
      • ProcessLauncher.java[transition]  starts the interpreter process
      • RemoteInterpreterProcess#callRemoteFunction  issues the remote call
    • RemoteInterpreterServer serves as the process container; ZeppelinServer's RemoteInterpreterEventClient then acts as the client to create the Interpreter

      • JDBCInterpreter  one concrete Interpreter implementation

    Remote restart

    # simple variant: restart in the background and return immediately
    nohup bin/zeppelin-daemon.sh restart &

    # when triggered from inside Zeppelin itself: delay the restart by 3 seconds and
    # run it detached, so the shell paragraph can return before its interpreter dies
    nohup bash -c '
    sleep 3
    sh /data/soft/zeppelin/bin/restart.sh
    ' > /dev/null 2>&1 &

    Changing notebook permissions from public to private

    import json

    # load the backed-up authorization file
    with open('notebook-authorization.json.bak', 'r') as f:
        data = json.load(f)

    for note_id, note_data in data['authInfo'].items():
        if isinstance(note_data, dict):
            owner = note_data.get('owners', [])
            if not owner:  # skip notes without an owner
                continue

            first_owner = owner[0]

            # restrict readers/writers/runners to the first owner when unset
            readers = note_data.get('readers', [])
            if not readers:
                note_data['readers'] = [first_owner]

            writers = note_data.get('writers', [])
            if not writers:
                note_data['writers'] = [first_owner]

            runners = note_data.get('runners', [])
            if not runners:
                note_data['runners'] = [first_owner]

    # save the modified data back to a new file
    with open("notebook-authorization.json.bak.private", 'w') as f:
        json.dump(data, f, indent=2)
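
    To apply the result (a sketch; assumes the default conf layout and that the generated file has been verified first):

    cp notebook-authorization.json.bak.private conf/notebook-authorization.json
    bin/zeppelin-daemon.sh restart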

    Browsing HDFS with Python

    %python
    import json
    import urllib.request
    from datetime import datetime

    # dynamic form for the WebHDFS host and the path to list
    class HDFSForm:
        def __init__(self):
            self.host = z.input("HDFS Host", "10.27.48.7:50070")
            self.path = z.input("Path", "/")

    form = HDFSForm()

    # queries the WebHDFS REST API
    class HDFSQuery:
        def __init__(self, host):
            self.base_url = "http://" + host + "/webhdfs/v1"

        def query(self, path):
            url = self.base_url + path + "?user.name=biclusterhcyhadoop&op=LISTSTATUS"
            try:
                with urllib.request.urlopen(url) as response:
                    data = response.read().decode('utf-8')
                return json.loads(data)
            except Exception as e:
                return {"error": str(e)}

    def format_size(bytes_val):
        if not bytes_val:
            return "-"
        bytes_val = float(bytes_val)
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_val < 1024.0:
                return "%.1f %s" % (bytes_val, unit)
            bytes_val /= 1024.0
        return "%.1f TB" % bytes_val

    def format_time(timestamp):
        if not timestamp or timestamp == 0:
            return "-"
        return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')

    hdfs = HDFSQuery(form.host)
    parsed_data = hdfs.query(form.path)

    print("Path: " + form.path)

    if "error" in parsed_data:
        # bug fix: the original referenced an undefined variable `result` here
        print("Error: " + parsed_data["error"])
    else:
        file_list = parsed_data["FileStatuses"]["FileStatus"]

        total_files = len(file_list)
        file_count = sum(1 for f in file_list if f["type"] == "FILE")
        dir_count = sum(1 for f in file_list if f["type"] == "DIRECTORY")
        total_size = sum(f["length"] for f in file_list if f["type"] == "FILE")

        print(f"Total: {total_files}  Files: {file_count}  Dirs: {dir_count}  Total size: {format_size(total_size)}")

        print("=" * 100)
        print(f"{'Name':<35} {'Type':<10} {'Size':<10} {'Owner':<10} {'Perm':<8} {'Modified':<20}")
        print("-" * 100)

        for file_info in file_list:
            name = file_info['pathSuffix']
            file_type = file_info['type']
            size = format_size(file_info['length'])
            owner = file_info['owner']
            permission = file_info['permission']
            mod_time = format_time(file_info['modificationTime'])

            icon = "📄" if file_type == "FILE" else "📁"

            print(f"{icon} {name:<33} {file_type:<9} {size:<9} {owner:<9} {permission:<7} {mod_time:<19}")